From ff5063ad0e7902ce59864a466bd9d8d606d788e4 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sun, 24 Sep 2017 14:34:03 +0200 Subject: ipcpd: Add threadpool manager to DHT This adds a threadpool manager to the DHT. This was needed because the detached thread could cause a data race on shutdown. The threadpool manager is revised to allow multiple instances in a single program. The irmd and ipcp now store commands in a buffer (list) instead of a single buffer before passing it to handler threads. --- src/ipcpd/ipcp.c | 99 ++++++++------- src/ipcpd/ipcp.h | 9 +- src/ipcpd/normal/dht.c | 339 +++++++++++++++++++++++++++++-------------------- 3 files changed, 262 insertions(+), 185 deletions(-) (limited to 'src/ipcpd') diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 85d543da..513c638a 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -36,7 +36,6 @@ #include #include #include -#include #include "ipcp.h" @@ -44,6 +43,14 @@ #include #include +struct cmd { + struct list_head next; + + uint8_t cbuf[IPCP_MSG_BUF_SIZE]; + size_t len; + int fd; +}; + void ipcp_sig_handler(int sig, siginfo_t * info, void * c) @@ -107,7 +114,8 @@ static void * acceptloop(void * o) while (ipcp_get_state() != IPCP_SHUTDOWN && ipcp_get_state() != IPCP_NULL) { - ssize_t count; + struct cmd * cmd; + #if defined(__FreeBSD__) || defined(__APPLE__) FD_ZERO(&fds); FD_SET(ipcpi.sockfd, &fds); @@ -122,25 +130,28 @@ static void * acceptloop(void * o) (void *) &tv, sizeof(tv))) log_warn("Failed to set timeout on socket."); - pthread_mutex_lock(&ipcpi.cmd_lock); + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Out of memory"); + break; + } - assert(ipcpi.csockfd == -1); + pthread_mutex_lock(&ipcpi.cmd_lock); - count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE); - if (count <= 0) { + cmd->len = read(csockfd, cmd->cbuf, IPCP_MSG_BUF_SIZE); + if (cmd->len <= 0) { pthread_mutex_unlock(&ipcpi.cmd_lock); log_err("Failed to read from socket."); close(csockfd); + free(cmd); continue; } - ipcpi.cmd_len = count; - ipcpi.csockfd = csockfd; + cmd->fd = csockfd; - pthread_cond_signal(&ipcpi.cmd_cond); + list_add(&cmd->next, &ipcpi.cmds); - while (ipcpi.csockfd != -1) - pthread_cond_wait(&ipcpi.acc_cond, &ipcpi.cmd_lock); + pthread_cond_signal(&ipcpi.cmd_cond); pthread_mutex_unlock(&ipcpi.cmd_lock); } @@ -159,13 +170,15 @@ static void * mainloop(void * o) struct timespec dl; struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000), (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION}; - (void) o; + + (void) o; while (true) { int ret = 0; ipcp_msg_t ret_msg = IPCP_MSG__INIT; dif_info_msg_t dif_info = DIF_INFO_MSG__INIT; int fd = -1; + struct cmd * cmd; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -174,33 +187,34 @@ static void * mainloop(void * o) pthread_mutex_lock(&ipcpi.cmd_lock); - while (ipcpi.csockfd == -1 && ret != -ETIMEDOUT) + while (list_is_empty(&ipcpi.cmds) && ret != -ETIMEDOUT) ret = -pthread_cond_timedwait(&ipcpi.cmd_cond, &ipcpi.cmd_lock, &dl); - sfd = ipcpi.csockfd; - ipcpi.csockfd = -1; - - if (sfd == -1) { + if (ret == -ETIMEDOUT) { pthread_mutex_unlock(&ipcpi.cmd_lock); - if (tpm_check()) + if (tpm_check(ipcpi.tpm)) break; continue; } - pthread_cond_signal(&ipcpi.acc_cond); + cmd = list_last_entry(&ipcpi.cmds, struct cmd, next); + list_del(&cmd->next); + + pthread_mutex_unlock(&ipcpi.cmd_lock); + + msg = ipcp_msg__unpack(NULL, cmd->len, cmd->cbuf); + sfd = cmd->fd; + + free(cmd); - msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf); if (msg == NULL) { - pthread_mutex_unlock(&ipcpi.cmd_lock); close(sfd); continue; } - pthread_mutex_unlock(&ipcpi.cmd_lock); - - tpm_dec(); + tpm_dec(ipcpi.tpm); switch (msg->code) { case IPCP_MSG_CODE__IPCP_BOOTSTRAP: @@ -474,7 +488,7 @@ static void * mainloop(void * o) if (buffer.len == 0) { log_err("Failed to pack reply message"); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); continue; } @@ -482,7 +496,7 @@ static void * mainloop(void * o) if (buffer.data == NULL) { log_err("Failed to create reply buffer."); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); continue; } @@ -492,17 +506,17 @@ static void * mainloop(void * o) log_err("Failed to send reply message"); free(buffer.data); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); continue; } free(buffer.data); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); } - tpm_exit(); + tpm_exit(ipcpi.tpm); return (void *) 0; } @@ -617,20 +631,14 @@ int ipcp_init(int argc, goto fail_cmd_cond; } - if (pthread_cond_init(&ipcpi.acc_cond, &cattr)) { - log_err("Failed to init convar."); - goto fail_acc_cond; - } + list_head_init(&ipcpi.cmds); ipcpi.alloc_id = -1; - ipcpi.csockfd = -1; pthread_condattr_destroy(&cattr); return 0; - fail_acc_cond: - pthread_cond_destroy(&ipcpi.cmd_cond); fail_cmd_cond: pthread_mutex_destroy(&ipcpi.cmd_lock); fail_cmd_lock: @@ -675,12 +683,14 @@ int ipcp_boot() sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) - goto fail_tpm_init; + ipcpi.tpm = tpm_create(IPCP_MIN_THREADS, IPCP_ADD_THREADS, + mainloop, NULL); + if (ipcpi.tpm == NULL) + goto fail_tpm_create; pthread_sigmask(SIG_BLOCK, &sigset, NULL); - if (tpm_start()) + if (tpm_start(ipcpi.tpm)) goto fail_tpm_start; ipcp_set_state(IPCP_INIT); @@ -696,18 +706,18 @@ int ipcp_boot() return 0; fail_acceptor: - tpm_stop(); + tpm_stop(ipcpi.tpm); fail_tpm_start: - tpm_fini(); - fail_tpm_init: + tpm_destroy(ipcpi.tpm); + fail_tpm_create: return -1; } void ipcp_shutdown() { pthread_join(ipcpi.acceptor, NULL); - tpm_stop(); - tpm_fini(); + tpm_stop(ipcpi.tpm); + tpm_destroy(ipcpi.tpm); log_info("IPCP %d shutting down.", getpid()); } @@ -724,7 +734,6 @@ void ipcp_fini() pthread_mutex_destroy(&ipcpi.state_mtx); pthread_cond_destroy(&ipcpi.alloc_cond); pthread_mutex_destroy(&ipcpi.alloc_lock); - pthread_cond_destroy(&ipcpi.acc_cond); pthread_cond_destroy(&ipcpi.cmd_cond); pthread_mutex_destroy(&ipcpi.cmd_lock); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 1b2a0334..d47d224b 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -25,8 +25,10 @@ #include #include +#include #include #include +#include #include #include @@ -92,10 +94,7 @@ struct ipcp { int sockfd; char * sock_path; - uint8_t cbuf[IPCP_MSG_BUF_SIZE]; - size_t cmd_len; - int csockfd; - pthread_cond_t acc_cond; + struct list_head cmds; pthread_cond_t cmd_cond; pthread_mutex_t cmd_lock; @@ -103,6 +102,8 @@ struct ipcp { pthread_cond_t alloc_cond; pthread_mutex_t alloc_lock; + struct tpm * tpm; + pthread_t acceptor; } ipcpi; diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index 548ae03a..93fd4e4e 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include "connmgr.h" @@ -67,6 +68,7 @@ typedef KadContactMsg kad_contact_msg_t; #define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ #define KAD_JOIN_RETR 5 /* Number of retries sending a join. */ #define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ +#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */ enum dht_state { DHT_INIT = 0, @@ -187,6 +189,12 @@ struct bucket { struct bucket * children[1L << KAD_BETA]; }; +struct cmd { + struct list_head next; + + struct shm_du_buff * sdb; +}; + struct dht { size_t alpha; size_t b; @@ -212,6 +220,7 @@ struct dht { struct bmp * cookies; enum dht_state state; + struct list_head cmds; pthread_cond_t cond; pthread_mutex_t mtx; @@ -219,6 +228,8 @@ struct dht { int fd; + struct tpm * tpm; + pthread_t worker; }; @@ -1428,7 +1439,9 @@ static int send_msg(struct dht * dht, ipcp_sdb_release(sdb); #endif fail_msg: + pthread_rwlock_wrlock(&dht->lock); bmp_release(dht->cookies, msg->cookie); + pthread_rwlock_unlock(&dht->lock); fail_bmp_alloc: return -1; } @@ -1838,7 +1851,7 @@ static ssize_t dht_get_contacts(struct dht * dht, list_head_init(&l); - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_wrlock(&dht->lock); len = dht_contact_list(dht, &l, key); if (len == 0) { @@ -1898,9 +1911,13 @@ static void * work(void * o) dht = (struct dht *) o; + pthread_rwlock_rdlock(&dht->lock); + intv = gcd(dht->t_expire, dht->t_repub); intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; + pthread_rwlock_unlock(&dht->lock); + list_head_init(&reflist); while (true) { @@ -2189,7 +2206,7 @@ int dht_reg(struct dht * dht, pthread_rwlock_unlock(&dht->lock); - kad_publish(dht, key, >addr, t_expire); + kad_publish(dht, key, addr, t_expire); return 0; } @@ -2271,163 +2288,192 @@ uint64_t dht_query(struct dht * dht, static void * dht_handle_sdu(void * o) { - struct dht * dht; - struct shm_du_buff * sdb; - kad_msg_t * msg; - kad_contact_msg_t ** cmsgs; - kad_msg_t resp_msg = KAD_MSG__INIT; - uint64_t addr; - buffer_t buf; - size_t i; - size_t b; - size_t t_expire; + struct dht * dht = (struct dht *) o; + struct timespec dl; + struct timespec to = {(HANDLE_TIMEO / 1000), + (HANDLE_TIMEO % 1000) * MILLION}; + assert(dht); - assert(o); + while (true) { + kad_msg_t * msg; + kad_contact_msg_t ** cmsgs; + kad_msg_t resp_msg = KAD_MSG__INIT; + uint64_t addr; + buffer_t buf; + size_t i; + size_t b; + size_t t_expire; + struct cmd * cmd; + int ret = 0; + + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_mutex_lock(&dht->mtx); + + while(list_is_empty(&dht->cmds) && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&dht->cond, + &dht->mtx, &dl); + + if (ret == -ETIMEDOUT) { + pthread_mutex_unlock(&dht->mtx); + if (tpm_check(dht->tpm)) + break; + continue; + } - memset(&buf, 0, sizeof(buf)); + cmd = list_last_entry(&dht->cmds, struct cmd, next); + list_del(&cmd->next); - dht = ((struct sdu_info *) o)->dht; - sdb = ((struct sdu_info *) o)->sdb; + pthread_mutex_unlock(&dht->mtx); - assert(dht); - assert(sdb); + i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); - msg = kad_msg__unpack(NULL, - shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), - shm_du_buff_head(sdb)); + msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); - ipcp_sdb_release(sdb); + ipcp_sdb_release(cmd->sdb); + free(cmd); - free(o); + if (msg == NULL) { + log_err("Failed to unpack message."); + continue; + } - if (msg == NULL) { - log_err("Failed to unpack message."); - return (void *) -1; - } + pthread_rwlock_rdlock(&dht->lock); - pthread_rwlock_rdlock(&dht->lock); + b = dht->b; + t_expire = dht->t_expire; - b = dht->b; - t_expire = dht->t_expire; + pthread_rwlock_unlock(&dht->lock); - pthread_rwlock_unlock(&dht->lock); + if (msg->has_key && msg->key.len != b) { + kad_msg__free_unpacked(msg, NULL); + log_warn("Bad key in message."); + continue; + } - if (msg->has_key && msg->key.len != b) { - kad_msg__free_unpacked(msg, NULL); - log_warn("Bad key in message."); - return (void *) -1; - } + if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { + kad_msg__free_unpacked(msg, NULL); + log_warn("Bad source ID in message of type %d.", + msg->code); + continue; + } - if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { - kad_msg__free_unpacked(msg, NULL); - log_warn("Bad source ID in message of type %d.", msg->code); - return (void *) -1; - } + if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { + kad_msg__free_unpacked(msg, NULL); + log_dbg("Got a request message when not running."); + continue; + } - if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { - kad_msg__free_unpacked(msg, NULL); - log_dbg("Got a request message when not running."); - return (void *) -1; - } + tpm_dec(dht->tpm); - addr = msg->s_addr; + addr = msg->s_addr; - resp_msg.code = KAD_RESPONSE; - resp_msg.cookie = msg->cookie; + resp_msg.code = KAD_RESPONSE; + resp_msg.cookie = msg->cookie; - switch(msg->code) { - case KAD_JOIN: - /* Refuse enrollee on check fails. */ - if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { - log_warn("Parameter mismatch. " - "DHT enrolment refused."); - break; - } + switch(msg->code) { + case KAD_JOIN: + /* Refuse enrollee on check fails. */ + if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { + log_warn("Parameter mismatch. " + "DHT enrolment refused."); + break; + } - if (msg->t_replicate != KAD_T_REPL) { - log_warn("Replication time mismatch. " - "DHT enrolment refused."); + if (msg->t_replicate != KAD_T_REPL) { + log_warn("Replication time mismatch. " + "DHT enrolment refused."); - break; + break; } - if (msg->t_refresh != KAD_T_REFR) { - log_warn("Refresh time mismatch. " - "DHT enrolment refused."); + if (msg->t_refresh != KAD_T_REFR) { + log_warn("Refresh time mismatch. " + "DHT enrolment refused."); + break; + } + + resp_msg.has_alpha = true; + resp_msg.has_b = true; + resp_msg.has_k = true; + resp_msg.has_t_expire = true; + resp_msg.has_t_refresh = true; + resp_msg.has_t_replicate = true; + resp_msg.alpha = KAD_ALPHA; + resp_msg.b = b; + resp_msg.k = KAD_K; + resp_msg.t_expire = t_expire; + resp_msg.t_refresh = KAD_T_REFR; + resp_msg.t_replicate = KAD_T_REPL; break; - } + case KAD_FIND_VALUE: + buf = dht_retrieve(dht, msg->key.data); + if (buf.len != 0) { + resp_msg.n_addrs = buf.len; + resp_msg.addrs = (uint64_t *) buf.data; + break; + } + /* FALLTHRU */ + case KAD_FIND_NODE: + /* Return k closest contacts. */ + resp_msg.n_contacts = + dht_get_contacts(dht, msg->key.data, &cmsgs); + resp_msg.contacts = cmsgs; + break; + case KAD_STORE: + if (msg->n_contacts < 1) { + log_warn("No contacts in store message."); + break; + } - resp_msg.has_alpha = true; - resp_msg.has_b = true; - resp_msg.has_k = true; - resp_msg.has_t_expire = true; - resp_msg.has_t_refresh = true; - resp_msg.has_t_replicate = true; - resp_msg.alpha = KAD_ALPHA; - resp_msg.b = b; - resp_msg.k = KAD_K; - resp_msg.t_expire = t_expire; - resp_msg.t_refresh = KAD_T_REFR; - resp_msg.t_replicate = KAD_T_REPL; - break; - case KAD_FIND_VALUE: - buf = dht_retrieve(dht, msg->key.data); - if (buf.len != 0) { - resp_msg.n_addrs = buf.len; - resp_msg.addrs = (uint64_t *) buf.data; + if (!msg->has_t_expire) { + log_warn("No expiry time in store message."); + break; + } + + kad_add(dht, *msg->contacts, msg->n_contacts, + msg->t_expire); break; - } - /* FALLTHRU */ - case KAD_FIND_NODE: - /* Return k closest contacts. */ - resp_msg.n_contacts = - dht_get_contacts(dht, msg->key.data, &cmsgs); - resp_msg.contacts = cmsgs; - break; - case KAD_STORE: - if (msg->n_contacts < 1) { - log_warn("No contacts in store message."); + case KAD_RESPONSE: + kad_handle_response(dht, msg); + break; + default: + assert(false); break; } - if (!msg->has_t_expire) { - log_warn("No expiry time in store message."); - break; + if (msg->code != KAD_JOIN) { + pthread_rwlock_wrlock(&dht->lock); + if (dht_update_bucket(dht, msg->s_id.data, addr)) + log_warn("Failed to update bucket."); + pthread_rwlock_unlock(&dht->lock); } - kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire); - break; - case KAD_RESPONSE: - kad_handle_response(dht, msg); - break; - default: - assert(false); - break; - } + if (msg->code < KAD_STORE) { + if (send_msg(dht, &resp_msg, addr)) + log_warn("Failed to send response."); + } - if (msg->code != KAD_JOIN) { - pthread_rwlock_wrlock(&dht->lock); - if (dht_update_bucket(dht, msg->s_id.data, addr)) - log_warn("Failed to update bucket."); - pthread_rwlock_unlock(&dht->lock); - } + kad_msg__free_unpacked(msg, NULL); - if (msg->code < KAD_STORE) { - if (send_msg(dht, &resp_msg, addr)) - log_warn("Failed to send response."); - } + if (resp_msg.n_addrs > 0) + free(resp_msg.addrs); - kad_msg__free_unpacked(msg, NULL); + if (resp_msg.n_contacts == 0) { + tpm_inc(dht->tpm); + continue; + } - if (resp_msg.n_addrs > 0) - free(resp_msg.addrs); + for (i = 0; i < resp_msg.n_contacts; ++i) + kad_contact_msg__free_unpacked(resp_msg.contacts[i], + NULL); + free(resp_msg.contacts); - if (resp_msg.n_contacts == 0) - return (void *) -1; + tpm_inc(dht->tpm); + } - for (i = 0; i < resp_msg.n_contacts; ++i) - kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); - free(resp_msg.contacts); + tpm_exit(dht->tpm); return (void *) 0; } @@ -2435,22 +2481,24 @@ static void * dht_handle_sdu(void * o) static void dht_post_sdu(void * comp, struct shm_du_buff * sdb) { - pthread_t thr; - struct sdu_info * info; + struct cmd * cmd; + struct dht * dht = (struct dht *) comp; - info = malloc(sizeof(*info)); - if (info == NULL) + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Command failed. Out of memory."); return; + } - info->dht = (struct dht *) comp; - info->sdb = sdb; + cmd->sdb = sdb; - if (pthread_create(&thr, NULL, dht_handle_sdu, info)) { - free(info); - return; - } + pthread_mutex_lock(&dht->mtx); + + list_add(&cmd->next, &dht->cmds); - pthread_detach(thr); + pthread_cond_signal(&dht->cond); + + pthread_mutex_unlock(&dht->mtx); } void dht_destroy(struct dht * dht) @@ -2461,6 +2509,11 @@ void dht_destroy(struct dht * dht) if (dht == NULL) return; +#ifndef __DHT_TEST__ + tpm_stop(dht->tpm); + + tpm_destroy(dht->tpm); +#endif if (dht_get_state(dht) == DHT_RUNNING) { dht_set_state(dht, DHT_SHUTDOWN); pthread_cancel(dht->worker); @@ -2594,6 +2647,7 @@ struct dht * dht_create(uint64_t addr) list_head_init(&dht->requests); list_head_init(&dht->refs); list_head_init(&dht->lookups); + list_head_init(&dht->cmds); if (pthread_rwlock_init(&dht->lock, NULL)) goto fail_rwlock; @@ -2612,16 +2666,29 @@ struct dht * dht_create(uint64_t addr) dht->addr = addr; dht->id = NULL; #ifndef __DHT_TEST__ + dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht); + if (dht->tpm == NULL) + goto fail_tpm_create; + + if (tpm_start(dht->tpm)) + goto fail_tpm_start; + dht->fd = dt_reg_ae(dht, &dht_post_sdu); notifier_reg(handle_event, dht); #else (void) handle_event; + (void) dht_handle_sdu; (void) dht_post_sdu; #endif dht->state = DHT_INIT; return dht; - +#ifndef __DHT_TEST__ + fail_tpm_start: + tpm_destroy(dht->tpm); + fail_tpm_create: + bmp_destroy(dht->cookies); +#endif fail_bmp: pthread_cond_destroy(&dht->cond); fail_cond: -- cgit v1.2.3