summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-09-24 14:34:03 +0200
committerdimitri staessens <dimitri.staessens@ugent.be>2017-09-24 14:34:03 +0200
commitff5063ad0e7902ce59864a466bd9d8d606d788e4 (patch)
tree17f66b04659a06c018494eb732adb661111d63f2
parent7cef269be64f64b920763c6f2455931422c8bfe9 (diff)
downloadouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.tar.gz
ouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.zip
ipcpd: Add threadpool manager to DHT
This adds a threadpool manager to the DHT. This was needed because the detached thread could cause a data race on shutdown. The threadpool manager is revised to allow multiple instances in a single program. The irmd and ipcp now store commands in a buffer (list) instead of a single buffer before passing it to handler threads.
-rw-r--r--include/ouroboros/tpm.h23
-rw-r--r--src/ipcpd/ipcp.c99
-rw-r--r--src/ipcpd/ipcp.h9
-rw-r--r--src/ipcpd/normal/dht.c339
-rw-r--r--src/irmd/main.c104
-rw-r--r--src/lib/tpm.c162
6 files changed, 417 insertions, 319 deletions
diff --git a/include/ouroboros/tpm.h b/include/ouroboros/tpm.h
index dc57f485..74e45035 100644
--- a/include/ouroboros/tpm.h
+++ b/include/ouroboros/tpm.h
@@ -25,22 +25,25 @@
#include <stdbool.h>
-int tpm_init(size_t min,
- size_t inc,
- void * (* func)(void *));
+struct tpm;
-int tpm_start(void);
+struct tpm * tpm_create(size_t min,
+ size_t inc,
+ void * (* func)(void *),
+ void * o);
-void tpm_stop(void);
+void tpm_destroy(struct tpm * tpm);
-void tpm_fini(void);
+int tpm_start(struct tpm * tpm);
-bool tpm_check(void);
+void tpm_stop(struct tpm * tpm);
-void tpm_exit(void);
+bool tpm_check(struct tpm * tpm);
-void tpm_dec(void);
+void tpm_exit(struct tpm * tpm);
-void tpm_inc(void);
+void tpm_dec(struct tpm * tpm);
+
+void tpm_inc(struct tpm * tpm);
#endif /* OUROBOROS_LIB_TPM_H */
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 85d543da..513c638a 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -36,7 +36,6 @@
#include <ouroboros/dev.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/np1_flow.h>
-#include <ouroboros/tpm.h>
#include "ipcp.h"
@@ -44,6 +43,14 @@
#include <sys/socket.h>
#include <stdlib.h>
+struct cmd {
+ struct list_head next;
+
+ uint8_t cbuf[IPCP_MSG_BUF_SIZE];
+ size_t len;
+ int fd;
+};
+
void ipcp_sig_handler(int sig,
siginfo_t * info,
void * c)
@@ -107,7 +114,8 @@ static void * acceptloop(void * o)
while (ipcp_get_state() != IPCP_SHUTDOWN &&
ipcp_get_state() != IPCP_NULL) {
- ssize_t count;
+ struct cmd * cmd;
+
#if defined(__FreeBSD__) || defined(__APPLE__)
FD_ZERO(&fds);
FD_SET(ipcpi.sockfd, &fds);
@@ -122,25 +130,28 @@ static void * acceptloop(void * o)
(void *) &tv, sizeof(tv)))
log_warn("Failed to set timeout on socket.");
- pthread_mutex_lock(&ipcpi.cmd_lock);
+ cmd = malloc(sizeof(*cmd));
+ if (cmd == NULL) {
+ log_err("Out of memory");
+ break;
+ }
- assert(ipcpi.csockfd == -1);
+ pthread_mutex_lock(&ipcpi.cmd_lock);
- count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE);
- if (count <= 0) {
+ cmd->len = read(csockfd, cmd->cbuf, IPCP_MSG_BUF_SIZE);
+ if (cmd->len <= 0) {
pthread_mutex_unlock(&ipcpi.cmd_lock);
log_err("Failed to read from socket.");
close(csockfd);
+ free(cmd);
continue;
}
- ipcpi.cmd_len = count;
- ipcpi.csockfd = csockfd;
+ cmd->fd = csockfd;
- pthread_cond_signal(&ipcpi.cmd_cond);
+ list_add(&cmd->next, &ipcpi.cmds);
- while (ipcpi.csockfd != -1)
- pthread_cond_wait(&ipcpi.acc_cond, &ipcpi.cmd_lock);
+ pthread_cond_signal(&ipcpi.cmd_cond);
pthread_mutex_unlock(&ipcpi.cmd_lock);
}
@@ -159,13 +170,15 @@ static void * mainloop(void * o)
struct timespec dl;
struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000),
(IPCP_ACCEPT_TIMEOUT % 1000) * MILLION};
- (void) o;
+
+ (void) o;
while (true) {
int ret = 0;
ipcp_msg_t ret_msg = IPCP_MSG__INIT;
dif_info_msg_t dif_info = DIF_INFO_MSG__INIT;
int fd = -1;
+ struct cmd * cmd;
ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY;
@@ -174,33 +187,34 @@ static void * mainloop(void * o)
pthread_mutex_lock(&ipcpi.cmd_lock);
- while (ipcpi.csockfd == -1 && ret != -ETIMEDOUT)
+ while (list_is_empty(&ipcpi.cmds) && ret != -ETIMEDOUT)
ret = -pthread_cond_timedwait(&ipcpi.cmd_cond,
&ipcpi.cmd_lock,
&dl);
- sfd = ipcpi.csockfd;
- ipcpi.csockfd = -1;
-
- if (sfd == -1) {
+ if (ret == -ETIMEDOUT) {
pthread_mutex_unlock(&ipcpi.cmd_lock);
- if (tpm_check())
+ if (tpm_check(ipcpi.tpm))
break;
continue;
}
- pthread_cond_signal(&ipcpi.acc_cond);
+ cmd = list_last_entry(&ipcpi.cmds, struct cmd, next);
+ list_del(&cmd->next);
+
+ pthread_mutex_unlock(&ipcpi.cmd_lock);
+
+ msg = ipcp_msg__unpack(NULL, cmd->len, cmd->cbuf);
+ sfd = cmd->fd;
+
+ free(cmd);
- msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf);
if (msg == NULL) {
- pthread_mutex_unlock(&ipcpi.cmd_lock);
close(sfd);
continue;
}
- pthread_mutex_unlock(&ipcpi.cmd_lock);
-
- tpm_dec();
+ tpm_dec(ipcpi.tpm);
switch (msg->code) {
case IPCP_MSG_CODE__IPCP_BOOTSTRAP:
@@ -474,7 +488,7 @@ static void * mainloop(void * o)
if (buffer.len == 0) {
log_err("Failed to pack reply message");
close(sfd);
- tpm_inc();
+ tpm_inc(ipcpi.tpm);
continue;
}
@@ -482,7 +496,7 @@ static void * mainloop(void * o)
if (buffer.data == NULL) {
log_err("Failed to create reply buffer.");
close(sfd);
- tpm_inc();
+ tpm_inc(ipcpi.tpm);
continue;
}
@@ -492,17 +506,17 @@ static void * mainloop(void * o)
log_err("Failed to send reply message");
free(buffer.data);
close(sfd);
- tpm_inc();
+ tpm_inc(ipcpi.tpm);
continue;
}
free(buffer.data);
close(sfd);
- tpm_inc();
+ tpm_inc(ipcpi.tpm);
}
- tpm_exit();
+ tpm_exit(ipcpi.tpm);
return (void *) 0;
}
@@ -617,20 +631,14 @@ int ipcp_init(int argc,
goto fail_cmd_cond;
}
- if (pthread_cond_init(&ipcpi.acc_cond, &cattr)) {
- log_err("Failed to init convar.");
- goto fail_acc_cond;
- }
+ list_head_init(&ipcpi.cmds);
ipcpi.alloc_id = -1;
- ipcpi.csockfd = -1;
pthread_condattr_destroy(&cattr);
return 0;
- fail_acc_cond:
- pthread_cond_destroy(&ipcpi.cmd_cond);
fail_cmd_cond:
pthread_mutex_destroy(&ipcpi.cmd_lock);
fail_cmd_lock:
@@ -675,12 +683,14 @@ int ipcp_boot()
sigaction(SIGHUP, &sig_act, NULL);
sigaction(SIGPIPE, &sig_act, NULL);
- if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop))
- goto fail_tpm_init;
+ ipcpi.tpm = tpm_create(IPCP_MIN_THREADS, IPCP_ADD_THREADS,
+ mainloop, NULL);
+ if (ipcpi.tpm == NULL)
+ goto fail_tpm_create;
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
- if (tpm_start())
+ if (tpm_start(ipcpi.tpm))
goto fail_tpm_start;
ipcp_set_state(IPCP_INIT);
@@ -696,18 +706,18 @@ int ipcp_boot()
return 0;
fail_acceptor:
- tpm_stop();
+ tpm_stop(ipcpi.tpm);
fail_tpm_start:
- tpm_fini();
- fail_tpm_init:
+ tpm_destroy(ipcpi.tpm);
+ fail_tpm_create:
return -1;
}
void ipcp_shutdown()
{
pthread_join(ipcpi.acceptor, NULL);
- tpm_stop();
- tpm_fini();
+ tpm_stop(ipcpi.tpm);
+ tpm_destroy(ipcpi.tpm);
log_info("IPCP %d shutting down.", getpid());
}
@@ -724,7 +734,6 @@ void ipcp_fini()
pthread_mutex_destroy(&ipcpi.state_mtx);
pthread_cond_destroy(&ipcpi.alloc_cond);
pthread_mutex_destroy(&ipcpi.alloc_lock);
- pthread_cond_destroy(&ipcpi.acc_cond);
pthread_cond_destroy(&ipcpi.cmd_cond);
pthread_mutex_destroy(&ipcpi.cmd_lock);
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 1b2a0334..d47d224b 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -25,8 +25,10 @@
#include <ouroboros/hash.h>
#include <ouroboros/ipcp.h>
+#include <ouroboros/list.h>
#include <ouroboros/qoscube.h>
#include <ouroboros/sockets.h>
+#include <ouroboros/tpm.h>
#include <pthread.h>
#include <time.h>
@@ -92,10 +94,7 @@ struct ipcp {
int sockfd;
char * sock_path;
- uint8_t cbuf[IPCP_MSG_BUF_SIZE];
- size_t cmd_len;
- int csockfd;
- pthread_cond_t acc_cond;
+ struct list_head cmds;
pthread_cond_t cmd_cond;
pthread_mutex_t cmd_lock;
@@ -103,6 +102,8 @@ struct ipcp {
pthread_cond_t alloc_cond;
pthread_mutex_t alloc_lock;
+ struct tpm * tpm;
+
pthread_t acceptor;
} ipcpi;
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index 548ae03a..93fd4e4e 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -34,6 +34,7 @@
#include <ouroboros/notifier.h>
#include <ouroboros/random.h>
#include <ouroboros/time_utils.h>
+#include <ouroboros/tpm.h>
#include <ouroboros/utils.h>
#include "connmgr.h"
@@ -67,6 +68,7 @@ typedef KadContactMsg kad_contact_msg_t;
#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */
#define KAD_JOIN_RETR 5 /* Number of retries sending a join. */
#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */
+#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */
enum dht_state {
DHT_INIT = 0,
@@ -187,6 +189,12 @@ struct bucket {
struct bucket * children[1L << KAD_BETA];
};
+struct cmd {
+ struct list_head next;
+
+ struct shm_du_buff * sdb;
+};
+
struct dht {
size_t alpha;
size_t b;
@@ -212,6 +220,7 @@ struct dht {
struct bmp * cookies;
enum dht_state state;
+ struct list_head cmds;
pthread_cond_t cond;
pthread_mutex_t mtx;
@@ -219,6 +228,8 @@ struct dht {
int fd;
+ struct tpm * tpm;
+
pthread_t worker;
};
@@ -1428,7 +1439,9 @@ static int send_msg(struct dht * dht,
ipcp_sdb_release(sdb);
#endif
fail_msg:
+ pthread_rwlock_wrlock(&dht->lock);
bmp_release(dht->cookies, msg->cookie);
+ pthread_rwlock_unlock(&dht->lock);
fail_bmp_alloc:
return -1;
}
@@ -1838,7 +1851,7 @@ static ssize_t dht_get_contacts(struct dht * dht,
list_head_init(&l);
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht->lock);
len = dht_contact_list(dht, &l, key);
if (len == 0) {
@@ -1898,9 +1911,13 @@ static void * work(void * o)
dht = (struct dht *) o;
+ pthread_rwlock_rdlock(&dht->lock);
+
intv = gcd(dht->t_expire, dht->t_repub);
intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2;
+ pthread_rwlock_unlock(&dht->lock);
+
list_head_init(&reflist);
while (true) {
@@ -2189,7 +2206,7 @@ int dht_reg(struct dht * dht,
pthread_rwlock_unlock(&dht->lock);
- kad_publish(dht, key, >addr, t_expire);
+ kad_publish(dht, key, addr, t_expire);
return 0;
}
@@ -2271,163 +2288,192 @@ uint64_t dht_query(struct dht * dht,
static void * dht_handle_sdu(void * o)
{
- struct dht * dht;
- struct shm_du_buff * sdb;
- kad_msg_t * msg;
- kad_contact_msg_t ** cmsgs;
- kad_msg_t resp_msg = KAD_MSG__INIT;
- uint64_t addr;
- buffer_t buf;
- size_t i;
- size_t b;
- size_t t_expire;
+ struct dht * dht = (struct dht *) o;
+ struct timespec dl;
+ struct timespec to = {(HANDLE_TIMEO / 1000),
+ (HANDLE_TIMEO % 1000) * MILLION};
+ assert(dht);
- assert(o);
+ while (true) {
+ kad_msg_t * msg;
+ kad_contact_msg_t ** cmsgs;
+ kad_msg_t resp_msg = KAD_MSG__INIT;
+ uint64_t addr;
+ buffer_t buf;
+ size_t i;
+ size_t b;
+ size_t t_expire;
+ struct cmd * cmd;
+ int ret = 0;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &dl);
+ ts_add(&dl, &to, &dl);
+
+ pthread_mutex_lock(&dht->mtx);
+
+ while(list_is_empty(&dht->cmds) && ret != -ETIMEDOUT)
+ ret = -pthread_cond_timedwait(&dht->cond,
+ &dht->mtx, &dl);
+
+ if (ret == -ETIMEDOUT) {
+ pthread_mutex_unlock(&dht->mtx);
+ if (tpm_check(dht->tpm))
+ break;
+ continue;
+ }
- memset(&buf, 0, sizeof(buf));
+ cmd = list_last_entry(&dht->cmds, struct cmd, next);
+ list_del(&cmd->next);
- dht = ((struct sdu_info *) o)->dht;
- sdb = ((struct sdu_info *) o)->sdb;
+ pthread_mutex_unlock(&dht->mtx);
- assert(dht);
- assert(sdb);
+ i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb);
- msg = kad_msg__unpack(NULL,
- shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
- shm_du_buff_head(sdb));
+ msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb));
- ipcp_sdb_release(sdb);
+ ipcp_sdb_release(cmd->sdb);
+ free(cmd);
- free(o);
+ if (msg == NULL) {
+ log_err("Failed to unpack message.");
+ continue;
+ }
- if (msg == NULL) {
- log_err("Failed to unpack message.");
- return (void *) -1;
- }
+ pthread_rwlock_rdlock(&dht->lock);
- pthread_rwlock_rdlock(&dht->lock);
+ b = dht->b;
+ t_expire = dht->t_expire;
- b = dht->b;
- t_expire = dht->t_expire;
+ pthread_rwlock_unlock(&dht->lock);
- pthread_rwlock_unlock(&dht->lock);
+ if (msg->has_key && msg->key.len != b) {
+ kad_msg__free_unpacked(msg, NULL);
+ log_warn("Bad key in message.");
+ continue;
+ }
- if (msg->has_key && msg->key.len != b) {
- kad_msg__free_unpacked(msg, NULL);
- log_warn("Bad key in message.");
- return (void *) -1;
- }
+ if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) {
+ kad_msg__free_unpacked(msg, NULL);
+ log_warn("Bad source ID in message of type %d.",
+ msg->code);
+ continue;
+ }
- if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) {
- kad_msg__free_unpacked(msg, NULL);
- log_warn("Bad source ID in message of type %d.", msg->code);
- return (void *) -1;
- }
+ if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
+ kad_msg__free_unpacked(msg, NULL);
+ log_dbg("Got a request message when not running.");
+ continue;
+ }
- if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
- kad_msg__free_unpacked(msg, NULL);
- log_dbg("Got a request message when not running.");
- return (void *) -1;
- }
+ tpm_dec(dht->tpm);
- addr = msg->s_addr;
+ addr = msg->s_addr;
- resp_msg.code = KAD_RESPONSE;
- resp_msg.cookie = msg->cookie;
+ resp_msg.code = KAD_RESPONSE;
+ resp_msg.cookie = msg->cookie;
- switch(msg->code) {
- case KAD_JOIN:
- /* Refuse enrollee on check fails. */
- if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) {
- log_warn("Parameter mismatch. "
- "DHT enrolment refused.");
- break;
- }
+ switch(msg->code) {
+ case KAD_JOIN:
+ /* Refuse enrollee on check fails. */
+ if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) {
+ log_warn("Parameter mismatch. "
+ "DHT enrolment refused.");
+ break;
+ }
- if (msg->t_replicate != KAD_T_REPL) {
- log_warn("Replication time mismatch. "
- "DHT enrolment refused.");
+ if (msg->t_replicate != KAD_T_REPL) {
+ log_warn("Replication time mismatch. "
+ "DHT enrolment refused.");
- break;
+ break;
}
- if (msg->t_refresh != KAD_T_REFR) {
- log_warn("Refresh time mismatch. "
- "DHT enrolment refused.");
+ if (msg->t_refresh != KAD_T_REFR) {
+ log_warn("Refresh time mismatch. "
+ "DHT enrolment refused.");
+ break;
+ }
+
+ resp_msg.has_alpha = true;
+ resp_msg.has_b = true;
+ resp_msg.has_k = true;
+ resp_msg.has_t_expire = true;
+ resp_msg.has_t_refresh = true;
+ resp_msg.has_t_replicate = true;
+ resp_msg.alpha = KAD_ALPHA;
+ resp_msg.b = b;
+ resp_msg.k = KAD_K;
+ resp_msg.t_expire = t_expire;
+ resp_msg.t_refresh = KAD_T_REFR;
+ resp_msg.t_replicate = KAD_T_REPL;
break;
- }
+ case KAD_FIND_VALUE:
+ buf = dht_retrieve(dht, msg->key.data);
+ if (buf.len != 0) {
+ resp_msg.n_addrs = buf.len;
+ resp_msg.addrs = (uint64_t *) buf.data;
+ break;
+ }
+ /* FALLTHRU */
+ case KAD_FIND_NODE:
+ /* Return k closest contacts. */
+ resp_msg.n_contacts =
+ dht_get_contacts(dht, msg->key.data, &cmsgs);
+ resp_msg.contacts = cmsgs;
+ break;
+ case KAD_STORE:
+ if (msg->n_contacts < 1) {
+ log_warn("No contacts in store message.");
+ break;
+ }
- resp_msg.has_alpha = true;
- resp_msg.has_b = true;
- resp_msg.has_k = true;
- resp_msg.has_t_expire = true;
- resp_msg.has_t_refresh = true;
- resp_msg.has_t_replicate = true;
- resp_msg.alpha = KAD_ALPHA;
- resp_msg.b = b;
- resp_msg.k = KAD_K;
- resp_msg.t_expire = t_expire;
- resp_msg.t_refresh = KAD_T_REFR;
- resp_msg.t_replicate = KAD_T_REPL;
- break;
- case KAD_FIND_VALUE:
- buf = dht_retrieve(dht, msg->key.data);
- if (buf.len != 0) {
- resp_msg.n_addrs = buf.len;
- resp_msg.addrs = (uint64_t *) buf.data;
+ if (!msg->has_t_expire) {
+ log_warn("No expiry time in store message.");
+ break;
+ }
+
+ kad_add(dht, *msg->contacts, msg->n_contacts,
+ msg->t_expire);
break;
- }
- /* FALLTHRU */
- case KAD_FIND_NODE:
- /* Return k closest contacts. */
- resp_msg.n_contacts =
- dht_get_contacts(dht, msg->key.data, &cmsgs);
- resp_msg.contacts = cmsgs;
- break;
- case KAD_STORE:
- if (msg->n_contacts < 1) {
- log_warn("No contacts in store message.");
+ case KAD_RESPONSE:
+ kad_handle_response(dht, msg);
+ break;
+ default:
+ assert(false);
break;
}
- if (!msg->has_t_expire) {
- log_warn("No expiry time in store message.");
- break;
+ if (msg->code != KAD_JOIN) {
+ pthread_rwlock_wrlock(&dht->lock);
+ if (dht_update_bucket(dht, msg->s_id.data, addr))
+ log_warn("Failed to update bucket.");
+ pthread_rwlock_unlock(&dht->lock);
}
- kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire);
- break;
- case KAD_RESPONSE:
- kad_handle_response(dht, msg);
- break;
- default:
- assert(false);
- break;
- }
+ if (msg->code < KAD_STORE) {
+ if (send_msg(dht, &resp_msg, addr))
+ log_warn("Failed to send response.");
+ }
- if (msg->code != KAD_JOIN) {
- pthread_rwlock_wrlock(&dht->lock);
- if (dht_update_bucket(dht, msg->s_id.data, addr))
- log_warn("Failed to update bucket.");
- pthread_rwlock_unlock(&dht->lock);
- }
+ kad_msg__free_unpacked(msg, NULL);
- if (msg->code < KAD_STORE) {
- if (send_msg(dht, &resp_msg, addr))
- log_warn("Failed to send response.");
- }
+ if (resp_msg.n_addrs > 0)
+ free(resp_msg.addrs);
- kad_msg__free_unpacked(msg, NULL);
+ if (resp_msg.n_contacts == 0) {
+ tpm_inc(dht->tpm);
+ continue;
+ }
- if (resp_msg.n_addrs > 0)
- free(resp_msg.addrs);
+ for (i = 0; i < resp_msg.n_contacts; ++i)
+ kad_contact_msg__free_unpacked(resp_msg.contacts[i],
+ NULL);
+ free(resp_msg.contacts);
- if (resp_msg.n_contacts == 0)
- return (void *) -1;
+ tpm_inc(dht->tpm);
+ }
- for (i = 0; i < resp_msg.n_contacts; ++i)
- kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL);
- free(resp_msg.contacts);
+ tpm_exit(dht->tpm);
return (void *) 0;
}
@@ -2435,22 +2481,24 @@ static void * dht_handle_sdu(void * o)
static void dht_post_sdu(void * comp,
struct shm_du_buff * sdb)
{
- pthread_t thr;
- struct sdu_info * info;
+ struct cmd * cmd;
+ struct dht * dht = (struct dht *) comp;
- info = malloc(sizeof(*info));
- if (info == NULL)
+ cmd = malloc(sizeof(*cmd));
+ if (cmd == NULL) {
+ log_err("Command failed. Out of memory.");
return;
+ }
- info->dht = (struct dht *) comp;
- info->sdb = sdb;
+ cmd->sdb = sdb;
- if (pthread_create(&thr, NULL, dht_handle_sdu, info)) {
- free(info);
- return;
- }
+ pthread_mutex_lock(&dht->mtx);
+
+ list_add(&cmd->next, &dht->cmds);
- pthread_detach(thr);
+ pthread_cond_signal(&dht->cond);
+
+ pthread_mutex_unlock(&dht->mtx);
}
void dht_destroy(struct dht * dht)
@@ -2461,6 +2509,11 @@ void dht_destroy(struct dht * dht)
if (dht == NULL)
return;
+#ifndef __DHT_TEST__
+ tpm_stop(dht->tpm);
+
+ tpm_destroy(dht->tpm);
+#endif
if (dht_get_state(dht) == DHT_RUNNING) {
dht_set_state(dht, DHT_SHUTDOWN);
pthread_cancel(dht->worker);
@@ -2594,6 +2647,7 @@ struct dht * dht_create(uint64_t addr)
list_head_init(&dht->requests);
list_head_init(&dht->refs);
list_head_init(&dht->lookups);
+ list_head_init(&dht->cmds);
if (pthread_rwlock_init(&dht->lock, NULL))
goto fail_rwlock;
@@ -2612,16 +2666,29 @@ struct dht * dht_create(uint64_t addr)
dht->addr = addr;
dht->id = NULL;
#ifndef __DHT_TEST__
+ dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht);
+ if (dht->tpm == NULL)
+ goto fail_tpm_create;
+
+ if (tpm_start(dht->tpm))
+ goto fail_tpm_start;
+
dht->fd = dt_reg_ae(dht, &dht_post_sdu);
notifier_reg(handle_event, dht);
#else
(void) handle_event;
+ (void) dht_handle_sdu;
(void) dht_post_sdu;
#endif
dht->state = DHT_INIT;
return dht;
-
+#ifndef __DHT_TEST__
+ fail_tpm_start:
+ tpm_destroy(dht->tpm);
+ fail_tpm_create:
+ bmp_destroy(dht->cookies);
+#endif
fail_bmp:
pthread_cond_destroy(&dht->cond);
fail_cond:
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 27c771a6..3fceadb6 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -93,6 +93,14 @@ enum irm_state {
IRMD_RUNNING
};
+struct cmd {
+ struct list_head next;
+
+ uint8_t cbuf[IB_LEN];
+ size_t len;
+ int fd;
+};
+
struct {
struct list_head registry; /* registered names known */
@@ -112,16 +120,15 @@ struct {
int sockfd; /* UNIX socket */
- uint8_t cbuf[IB_LEN]; /* cmd message buffer */
- size_t cmd_len; /* length of cmd in cbuf */
- int csockfd; /* cmd UNIX socket */
- pthread_cond_t acc_cond; /* cmd accepted condvar */
+ struct list_head cmds; /* pending commands */
pthread_cond_t cmd_cond; /* cmd signal condvar */
pthread_mutex_t cmd_lock; /* cmd signal lock */
enum irm_state state; /* state of the irmd */
pthread_rwlock_t state_lock; /* lock for the entire irmd */
+ struct tpm * tpm; /* thread pool manager */
+
pthread_t irm_sanitize; /* clean up irmd resources */
pthread_t shm_sanitize; /* keep track of rdrbuff use */
pthread_t acceptor; /* accept new commands */
@@ -1631,7 +1638,6 @@ static void irm_fini(void)
lockfile_destroy(irmd.lf);
pthread_mutex_destroy(&irmd.cmd_lock);
- pthread_cond_destroy(&irmd.acc_cond);
pthread_cond_destroy(&irmd.cmd_cond);
pthread_rwlock_destroy(&irmd.reg_lock);
pthread_rwlock_destroy(&irmd.state_lock);
@@ -1864,7 +1870,8 @@ static void * acceptloop(void * o)
(void) o;
while (irmd_get_state() == IRMD_RUNNING) {
- ssize_t count;
+ struct cmd * cmd;
+
#if defined(__FreeBSD__) || defined(__APPLE__)
FD_ZERO(&fds);
FD_SET(irmd.sockfd, &fds);
@@ -1879,25 +1886,28 @@ static void * acceptloop(void * o)
(void *) &tv, sizeof(tv)))
log_warn("Failed to set timeout on socket.");
- pthread_mutex_lock(&irmd.cmd_lock);
+ cmd = malloc(sizeof(*cmd));
+ if (cmd == NULL) {
+ log_err("Out of memory.");
+ break;
+ }
- assert(irmd.csockfd == -1);
+ pthread_mutex_lock(&irmd.cmd_lock);
- count = read(csockfd, irmd.cbuf, IRM_MSG_BUF_SIZE);
- if (count <= 0) {
+ cmd->len = read(csockfd, cmd->cbuf, IRM_MSG_BUF_SIZE);
+ if (cmd->len <= 0) {
pthread_mutex_unlock(&irmd.cmd_lock);
log_err("Failed to read from socket.");
close(csockfd);
+ free(cmd);
continue;
}
- irmd.cmd_len = count;
- irmd.csockfd = csockfd;
+ cmd->fd = csockfd;
- pthread_cond_signal(&irmd.cmd_cond);
+ list_add(&cmd->next, &irmd.cmds);
- while(irmd.csockfd != -1)
- pthread_cond_wait(&irmd.acc_cond, &irmd.cmd_lock);
+ pthread_cond_signal(&irmd.cmd_cond);
pthread_mutex_unlock(&irmd.cmd_lock);
}
@@ -1923,6 +1933,7 @@ void * mainloop(void * o)
pid_t * apis = NULL;
struct timespec * timeo = NULL;
struct timespec ts = {0, 0};
+ struct cmd * cmd;
ret_msg.code = IRM_MSG_CODE__IRM_REPLY;
@@ -1931,33 +1942,34 @@ void * mainloop(void * o)
pthread_mutex_lock(&irmd.cmd_lock);
- while (irmd.csockfd == -1 && ret != -ETIMEDOUT)
+ while (list_is_empty(&irmd.cmds) && ret != -ETIMEDOUT)
ret = -pthread_cond_timedwait(&irmd.cmd_cond,
&irmd.cmd_lock,
&dl);
- sfd = irmd.csockfd;
- irmd.csockfd = -1;
-
- if (sfd == -1) {
+ if (ret == -ETIMEDOUT) {
pthread_mutex_unlock(&irmd.cmd_lock);
- if (tpm_check())
+ if (tpm_check(irmd.tpm))
break;
continue;
}
- pthread_cond_signal(&irmd.acc_cond);
+ cmd = list_last_entry(&irmd.cmds, struct cmd, next);
+ list_del(&cmd->next);
+
+ pthread_mutex_unlock(&irmd.cmd_lock);
+
+ msg = irm_msg__unpack(NULL, cmd->len, cmd->cbuf);
+ sfd = cmd->fd;
+
+ free(cmd);
- msg = irm_msg__unpack(NULL, irmd.cmd_len, irmd.cbuf);
if (msg == NULL) {
- pthread_mutex_unlock(&irmd.cmd_lock);
close(sfd);
continue;
}
- pthread_mutex_unlock(&irmd.cmd_lock);
-
- tpm_dec();
+ tpm_dec(irmd.tpm);
if (msg->has_timeo_sec) {
assert(msg->has_timeo_nsec);
@@ -2098,7 +2110,7 @@ void * mainloop(void * o)
if (ret_msg.result == -EPIPE || !ret_msg.has_result) {
close(sfd);
- tpm_inc();
+ tpm_inc(irmd.tpm);
continue;
}
@@ -2108,7 +2120,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(sfd);
- tpm_inc();
+ tpm_inc(irmd.tpm);
continue;
}
@@ -2117,7 +2129,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(sfd);
- tpm_inc();
+ tpm_inc(irmd.tpm);
continue;
}
@@ -2133,10 +2145,10 @@ void * mainloop(void * o)
free(buffer.data);
close(sfd);
- tpm_inc();
+ tpm_inc(irmd.tpm);
}
- tpm_exit();
+ tpm_exit(irmd.tpm);
return (void *) 0;
}
@@ -2184,12 +2196,6 @@ static int irm_init(void)
goto fail_cmd_cond;
}
- if (pthread_cond_init(&irmd.acc_cond, &cattr)) {
- log_err("Failed to initialize condvar.");
- pthread_condattr_destroy(&cattr);
- goto fail_acc_cond;
- }
-
pthread_condattr_destroy(&cattr);
list_head_init(&irmd.ipcps);
@@ -2198,6 +2204,7 @@ static int irm_init(void)
list_head_init(&irmd.spawned_apis);
list_head_init(&irmd.registry);
list_head_init(&irmd.irm_flows);
+ list_head_init(&irmd.cmds);
irmd.port_ids = bmp_create(SYS_MAX_FLOWS, 0);
if (irmd.port_ids == NULL) {
@@ -2272,7 +2279,6 @@ static int irm_init(void)
gcry_control(GCRYCTL_INITIALIZATION_FINISHED);
#endif
- irmd.csockfd = -1;
irmd.state = IRMD_RUNNING;
log_info("Ouroboros IPC Resource Manager daemon started...");
@@ -2294,8 +2300,6 @@ static int irm_init(void)
fail_lockfile:
bmp_destroy(irmd.port_ids);
fail_port_ids:
- pthread_cond_destroy(&irmd.acc_cond);
- fail_acc_cond:
pthread_cond_destroy(&irmd.cmd_cond);
fail_cmd_cond:
pthread_mutex_destroy(&irmd.cmd_lock);
@@ -2367,12 +2371,14 @@ int main(int argc,
if (irm_init() < 0)
goto fail_irm_init;
- if (tpm_init(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop)) {
+ irmd.tpm = tpm_create(IRMD_MIN_THREADS, IRMD_ADD_THREADS,
+ mainloop, NULL);
+ if (irmd.tpm == NULL) {
irmd_set_state(IRMD_NULL);
- goto fail_tpm_init;
+ goto fail_tpm_create;
}
- if (tpm_start()) {
+ if (tpm_start(irmd.tpm)) {
irmd_set_state(IRMD_NULL);
goto fail_tpm_start;
}
@@ -2396,9 +2402,9 @@ int main(int argc,
pthread_join(irmd.irm_sanitize, NULL);
pthread_join(irmd.shm_sanitize, NULL);
- tpm_stop();
+ tpm_stop(irmd.tpm);
- tpm_fini();
+ tpm_destroy(irmd.tpm);
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
@@ -2417,10 +2423,10 @@ int main(int argc,
fail_shm_sanitize:
pthread_join(irmd.irm_sanitize, NULL);
fail_irm_sanitize:
- tpm_stop();
+ tpm_stop(irmd.tpm);
fail_tpm_start:
- tpm_fini();
- fail_tpm_init:
+ tpm_destroy(irmd.tpm);
+ fail_tpm_create:
irm_fini();
fail_irm_init:
log_fini();
diff --git a/src/lib/tpm.c b/src/lib/tpm.c
index dd71d276..c883e0a8 100644
--- a/src/lib/tpm.c
+++ b/src/lib/tpm.c
@@ -50,38 +50,38 @@ enum tpm_state {
TPM_RUNNING
};
-struct {
+struct tpm {
size_t min;
size_t inc;
size_t cur;
size_t wrk;
void * (* func)(void *);
+ void * o;
struct list_head pool;
enum tpm_state state;
-
pthread_cond_t cond;
pthread_mutex_t lock;
pthread_t mgr;
-} tpm;
+};
-static void tpm_join(void)
+static void tpm_join(struct tpm * tpm)
{
struct list_head * p;
struct list_head * h;
- list_for_each_safe(p, h, &tpm.pool) {
+ list_for_each_safe(p, h, &tpm->pool) {
struct pthr_el * e = list_entry(p, struct pthr_el, next);
- if (tpm.state != TPM_RUNNING) {
+ if (tpm->state != TPM_RUNNING) {
if (!e->kill) {
e->kill = true;
- --tpm.cur;
+ --tpm->cur;
}
while (!e->join)
- pthread_cond_wait(&tpm.cond, &tpm.lock);
+ pthread_cond_wait(&tpm->cond, &tpm->lock);
}
if (e->join) {
@@ -92,12 +92,13 @@ static void tpm_join(void)
}
}
-static struct pthr_el * tpm_pthr_el(pthread_t thr)
+static struct pthr_el * tpm_pthr_el(struct tpm * tpm,
+ pthread_t thr)
{
struct list_head * p;
struct pthr_el * e;
- list_for_each(p, &tpm.pool) {
+ list_for_each(p, &tpm->pool) {
e = list_entry(p, struct pthr_el, next);
if (e->thr == thr)
return e;
@@ -109,15 +110,15 @@ static struct pthr_el * tpm_pthr_el(pthread_t thr)
return NULL;
}
-static void tpm_kill(void)
+static void tpm_kill(struct tpm * tpm)
{
struct list_head * p;
- list_for_each(p, &tpm.pool) {
+ list_for_each(p, &tpm->pool) {
struct pthr_el * e = list_entry(p, struct pthr_el, next);
if (!e->kill) {
e->kill = true;
- --tpm.cur;
+ --tpm->cur;
return;
}
}
@@ -128,25 +129,25 @@ static void * tpmgr(void * o)
struct timespec dl;
struct timespec to = {(TPM_TIMEOUT / 1000),
(TPM_TIMEOUT % 1000) * MILLION};
- (void) o;
+ struct tpm * tpm = (struct tpm *) o;
while (true) {
clock_gettime(PTHREAD_COND_CLOCK, &dl);
ts_add(&dl, &to, &dl);
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- if (tpm.state != TPM_RUNNING) {
- tpm_join();
- pthread_mutex_unlock(&tpm.lock);
+ if (tpm->state != TPM_RUNNING) {
+ tpm_join(tpm);
+ pthread_mutex_unlock(&tpm->lock);
break;
}
- tpm_join();
+ tpm_join(tpm);
- if (tpm.cur - tpm.wrk < tpm.min) {
+ if (tpm->cur - tpm->wrk < tpm->min) {
size_t i;
- for (i = 0; i < tpm.inc; ++i) {
+ for (i = 0; i < tpm->inc; ++i) {
struct pthr_el * e = malloc(sizeof(*e));
if (e == NULL)
break;
@@ -155,35 +156,41 @@ static void * tpmgr(void * o)
e->kill = false;
if (pthread_create(&e->thr, NULL,
- tpm.func, NULL)) {
+ tpm->func, tpm->o)) {
free(e);
break;
}
- list_add(&e->next, &tpm.pool);
+ list_add(&e->next, &tpm->pool);
}
- tpm.cur += i;
+ tpm->cur += i;
}
- if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl)
+ if (pthread_cond_timedwait(&tpm->cond, &tpm->lock, &dl)
== ETIMEDOUT)
- if (tpm.cur > tpm.min)
- tpm_kill();
+ if (tpm->cur > tpm->min)
+ tpm_kill(tpm);
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
}
return (void *) 0;
}
-int tpm_init(size_t min,
- size_t inc,
- void * (* func)(void *))
+struct tpm * tpm_create(size_t min,
+ size_t inc,
+ void * (* func)(void *),
+ void * o)
{
+ struct tpm * tpm;
pthread_condattr_t cattr;
- if (pthread_mutex_init(&tpm.lock, NULL))
+ tpm = malloc(sizeof(*tpm));
+ if (tpm == NULL)
+ goto fail_malloc;
+
+ if (pthread_mutex_init(&tpm->lock, NULL))
goto fail_lock;
if (pthread_condattr_init(&cattr))
@@ -192,103 +199,108 @@ int tpm_init(size_t min,
#ifndef __APPLE__
pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
#endif
- if (pthread_cond_init(&tpm.cond, &cattr))
+ if (pthread_cond_init(&tpm->cond, &cattr))
goto fail_cond;
- list_head_init(&tpm.pool);
+ list_head_init(&tpm->pool);
pthread_condattr_destroy(&cattr);
- tpm.state = TPM_INIT;
- tpm.func = func;
- tpm.min = min;
- tpm.inc = inc;
- tpm.cur = 0;
- tpm.wrk = 0;
+ tpm->state = TPM_INIT;
+ tpm->func = func;
+ tpm->o = o;
+ tpm->min = min;
+ tpm->inc = inc;
+ tpm->cur = 0;
+ tpm->wrk = 0;
- return 0;
+ return tpm;
fail_cond:
pthread_condattr_destroy(&cattr);
fail_cattr:
- pthread_mutex_destroy(&tpm.lock);
+ pthread_mutex_destroy(&tpm->lock);
fail_lock:
- return -1;
+ free(tpm);
+ fail_malloc:
+ return NULL;
}
-int tpm_start(void)
+int tpm_start(struct tpm * tpm)
{
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- if (pthread_create(&tpm.mgr, NULL, tpmgr, NULL)) {
- pthread_mutex_unlock(&tpm.lock);
+ if (pthread_create(&tpm->mgr, NULL, tpmgr, tpm)) {
+ pthread_mutex_unlock(&tpm->lock);
return -1;
}
- tpm.state = TPM_RUNNING;
+ tpm->state = TPM_RUNNING;
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
return 0;
}
-void tpm_stop(void)
+void tpm_stop(struct tpm * tpm)
{
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- tpm.state = TPM_NULL;
+ tpm->state = TPM_NULL;
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
}
-void tpm_fini(void)
+void tpm_destroy(struct tpm * tpm)
{
- pthread_join(tpm.mgr, NULL);
+ pthread_join(tpm->mgr, NULL);
+
+ pthread_mutex_destroy(&tpm->lock);
+ pthread_cond_destroy(&tpm->cond);
- pthread_mutex_destroy(&tpm.lock);
- pthread_cond_destroy(&tpm.cond);
+ free(tpm);
}
-bool tpm_check(void)
+bool tpm_check(struct tpm * tpm)
{
bool ret;
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- ret = tpm_pthr_el(pthread_self())->kill;
+ ret = tpm_pthr_el(tpm, pthread_self())->kill;
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
return ret;
}
-void tpm_inc(void)
+void tpm_inc(struct tpm * tpm)
{
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- --tpm.wrk;
+ --tpm->wrk;
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
}
-void tpm_dec(void)
+void tpm_dec(struct tpm * tpm)
{
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- ++tpm.wrk;
+ ++tpm->wrk;
- pthread_cond_signal(&tpm.cond);
+ pthread_cond_signal(&tpm->cond);
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
}
-void tpm_exit(void)
+void tpm_exit(struct tpm * tpm)
{
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- tpm_pthr_el(pthread_self())->join = true;
+ tpm_pthr_el(tpm, pthread_self())->join = true;
- pthread_cond_signal(&tpm.cond);
+ pthread_cond_signal(&tpm->cond);
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
}