From ff5063ad0e7902ce59864a466bd9d8d606d788e4 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sun, 24 Sep 2017 14:34:03 +0200 Subject: ipcpd: Add threadpool manager to DHT This adds a threadpool manager to the DHT. This was needed because the detached thread could cause a data race on shutdown. The threadpool manager is revised to allow multiple instances in a single program. The irmd and ipcp now store commands in a buffer (list) instead of a single buffer before passing it to handler threads. --- src/ipcpd/normal/dht.c | 339 +++++++++++++++++++++++++++++-------------------- 1 file changed, 203 insertions(+), 136 deletions(-) (limited to 'src/ipcpd/normal/dht.c') diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index 548ae03a..93fd4e4e 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include "connmgr.h" @@ -67,6 +68,7 @@ typedef KadContactMsg kad_contact_msg_t; #define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ #define KAD_JOIN_RETR 5 /* Number of retries sending a join. */ #define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ +#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */ enum dht_state { DHT_INIT = 0, @@ -187,6 +189,12 @@ struct bucket { struct bucket * children[1L << KAD_BETA]; }; +struct cmd { + struct list_head next; + + struct shm_du_buff * sdb; +}; + struct dht { size_t alpha; size_t b; @@ -212,6 +220,7 @@ struct dht { struct bmp * cookies; enum dht_state state; + struct list_head cmds; pthread_cond_t cond; pthread_mutex_t mtx; @@ -219,6 +228,8 @@ struct dht { int fd; + struct tpm * tpm; + pthread_t worker; }; @@ -1428,7 +1439,9 @@ static int send_msg(struct dht * dht, ipcp_sdb_release(sdb); #endif fail_msg: + pthread_rwlock_wrlock(&dht->lock); bmp_release(dht->cookies, msg->cookie); + pthread_rwlock_unlock(&dht->lock); fail_bmp_alloc: return -1; } @@ -1838,7 +1851,7 @@ static ssize_t dht_get_contacts(struct dht * dht, list_head_init(&l); - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_wrlock(&dht->lock); len = dht_contact_list(dht, &l, key); if (len == 0) { @@ -1898,9 +1911,13 @@ static void * work(void * o) dht = (struct dht *) o; + pthread_rwlock_rdlock(&dht->lock); + intv = gcd(dht->t_expire, dht->t_repub); intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; + pthread_rwlock_unlock(&dht->lock); + list_head_init(&reflist); while (true) { @@ -2189,7 +2206,7 @@ int dht_reg(struct dht * dht, pthread_rwlock_unlock(&dht->lock); - kad_publish(dht, key, >addr, t_expire); + kad_publish(dht, key, addr, t_expire); return 0; } @@ -2271,163 +2288,192 @@ uint64_t dht_query(struct dht * dht, static void * dht_handle_sdu(void * o) { - struct dht * dht; - struct shm_du_buff * sdb; - kad_msg_t * msg; - kad_contact_msg_t ** cmsgs; - kad_msg_t resp_msg = KAD_MSG__INIT; - uint64_t addr; - buffer_t buf; - size_t i; - size_t b; - size_t t_expire; + struct dht * dht = (struct dht *) o; + struct timespec dl; + struct timespec to = {(HANDLE_TIMEO / 1000), + (HANDLE_TIMEO % 1000) * MILLION}; + assert(dht); - assert(o); + while (true) { + kad_msg_t * msg; + kad_contact_msg_t ** cmsgs; + kad_msg_t resp_msg = KAD_MSG__INIT; + uint64_t addr; + buffer_t buf; + size_t i; + size_t b; + size_t t_expire; + struct cmd * cmd; + int ret = 0; + + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_mutex_lock(&dht->mtx); + + while(list_is_empty(&dht->cmds) && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&dht->cond, + &dht->mtx, &dl); + + if (ret == -ETIMEDOUT) { + pthread_mutex_unlock(&dht->mtx); + if (tpm_check(dht->tpm)) + break; + continue; + } - memset(&buf, 0, sizeof(buf)); + cmd = list_last_entry(&dht->cmds, struct cmd, next); + list_del(&cmd->next); - dht = ((struct sdu_info *) o)->dht; - sdb = ((struct sdu_info *) o)->sdb; + pthread_mutex_unlock(&dht->mtx); - assert(dht); - assert(sdb); + i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); - msg = kad_msg__unpack(NULL, - shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), - shm_du_buff_head(sdb)); + msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); - ipcp_sdb_release(sdb); + ipcp_sdb_release(cmd->sdb); + free(cmd); - free(o); + if (msg == NULL) { + log_err("Failed to unpack message."); + continue; + } - if (msg == NULL) { - log_err("Failed to unpack message."); - return (void *) -1; - } + pthread_rwlock_rdlock(&dht->lock); - pthread_rwlock_rdlock(&dht->lock); + b = dht->b; + t_expire = dht->t_expire; - b = dht->b; - t_expire = dht->t_expire; + pthread_rwlock_unlock(&dht->lock); - pthread_rwlock_unlock(&dht->lock); + if (msg->has_key && msg->key.len != b) { + kad_msg__free_unpacked(msg, NULL); + log_warn("Bad key in message."); + continue; + } - if (msg->has_key && msg->key.len != b) { - kad_msg__free_unpacked(msg, NULL); - log_warn("Bad key in message."); - return (void *) -1; - } + if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { + kad_msg__free_unpacked(msg, NULL); + log_warn("Bad source ID in message of type %d.", + msg->code); + continue; + } - if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { - kad_msg__free_unpacked(msg, NULL); - log_warn("Bad source ID in message of type %d.", msg->code); - return (void *) -1; - } + if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { + kad_msg__free_unpacked(msg, NULL); + log_dbg("Got a request message when not running."); + continue; + } - if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { - kad_msg__free_unpacked(msg, NULL); - log_dbg("Got a request message when not running."); - return (void *) -1; - } + tpm_dec(dht->tpm); - addr = msg->s_addr; + addr = msg->s_addr; - resp_msg.code = KAD_RESPONSE; - resp_msg.cookie = msg->cookie; + resp_msg.code = KAD_RESPONSE; + resp_msg.cookie = msg->cookie; - switch(msg->code) { - case KAD_JOIN: - /* Refuse enrollee on check fails. */ - if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { - log_warn("Parameter mismatch. " - "DHT enrolment refused."); - break; - } + switch(msg->code) { + case KAD_JOIN: + /* Refuse enrollee on check fails. */ + if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { + log_warn("Parameter mismatch. " + "DHT enrolment refused."); + break; + } - if (msg->t_replicate != KAD_T_REPL) { - log_warn("Replication time mismatch. " - "DHT enrolment refused."); + if (msg->t_replicate != KAD_T_REPL) { + log_warn("Replication time mismatch. " + "DHT enrolment refused."); - break; + break; } - if (msg->t_refresh != KAD_T_REFR) { - log_warn("Refresh time mismatch. " - "DHT enrolment refused."); + if (msg->t_refresh != KAD_T_REFR) { + log_warn("Refresh time mismatch. " + "DHT enrolment refused."); + break; + } + + resp_msg.has_alpha = true; + resp_msg.has_b = true; + resp_msg.has_k = true; + resp_msg.has_t_expire = true; + resp_msg.has_t_refresh = true; + resp_msg.has_t_replicate = true; + resp_msg.alpha = KAD_ALPHA; + resp_msg.b = b; + resp_msg.k = KAD_K; + resp_msg.t_expire = t_expire; + resp_msg.t_refresh = KAD_T_REFR; + resp_msg.t_replicate = KAD_T_REPL; break; - } + case KAD_FIND_VALUE: + buf = dht_retrieve(dht, msg->key.data); + if (buf.len != 0) { + resp_msg.n_addrs = buf.len; + resp_msg.addrs = (uint64_t *) buf.data; + break; + } + /* FALLTHRU */ + case KAD_FIND_NODE: + /* Return k closest contacts. */ + resp_msg.n_contacts = + dht_get_contacts(dht, msg->key.data, &cmsgs); + resp_msg.contacts = cmsgs; + break; + case KAD_STORE: + if (msg->n_contacts < 1) { + log_warn("No contacts in store message."); + break; + } - resp_msg.has_alpha = true; - resp_msg.has_b = true; - resp_msg.has_k = true; - resp_msg.has_t_expire = true; - resp_msg.has_t_refresh = true; - resp_msg.has_t_replicate = true; - resp_msg.alpha = KAD_ALPHA; - resp_msg.b = b; - resp_msg.k = KAD_K; - resp_msg.t_expire = t_expire; - resp_msg.t_refresh = KAD_T_REFR; - resp_msg.t_replicate = KAD_T_REPL; - break; - case KAD_FIND_VALUE: - buf = dht_retrieve(dht, msg->key.data); - if (buf.len != 0) { - resp_msg.n_addrs = buf.len; - resp_msg.addrs = (uint64_t *) buf.data; + if (!msg->has_t_expire) { + log_warn("No expiry time in store message."); + break; + } + + kad_add(dht, *msg->contacts, msg->n_contacts, + msg->t_expire); break; - } - /* FALLTHRU */ - case KAD_FIND_NODE: - /* Return k closest contacts. */ - resp_msg.n_contacts = - dht_get_contacts(dht, msg->key.data, &cmsgs); - resp_msg.contacts = cmsgs; - break; - case KAD_STORE: - if (msg->n_contacts < 1) { - log_warn("No contacts in store message."); + case KAD_RESPONSE: + kad_handle_response(dht, msg); + break; + default: + assert(false); break; } - if (!msg->has_t_expire) { - log_warn("No expiry time in store message."); - break; + if (msg->code != KAD_JOIN) { + pthread_rwlock_wrlock(&dht->lock); + if (dht_update_bucket(dht, msg->s_id.data, addr)) + log_warn("Failed to update bucket."); + pthread_rwlock_unlock(&dht->lock); } - kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire); - break; - case KAD_RESPONSE: - kad_handle_response(dht, msg); - break; - default: - assert(false); - break; - } + if (msg->code < KAD_STORE) { + if (send_msg(dht, &resp_msg, addr)) + log_warn("Failed to send response."); + } - if (msg->code != KAD_JOIN) { - pthread_rwlock_wrlock(&dht->lock); - if (dht_update_bucket(dht, msg->s_id.data, addr)) - log_warn("Failed to update bucket."); - pthread_rwlock_unlock(&dht->lock); - } + kad_msg__free_unpacked(msg, NULL); - if (msg->code < KAD_STORE) { - if (send_msg(dht, &resp_msg, addr)) - log_warn("Failed to send response."); - } + if (resp_msg.n_addrs > 0) + free(resp_msg.addrs); - kad_msg__free_unpacked(msg, NULL); + if (resp_msg.n_contacts == 0) { + tpm_inc(dht->tpm); + continue; + } - if (resp_msg.n_addrs > 0) - free(resp_msg.addrs); + for (i = 0; i < resp_msg.n_contacts; ++i) + kad_contact_msg__free_unpacked(resp_msg.contacts[i], + NULL); + free(resp_msg.contacts); - if (resp_msg.n_contacts == 0) - return (void *) -1; + tpm_inc(dht->tpm); + } - for (i = 0; i < resp_msg.n_contacts; ++i) - kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); - free(resp_msg.contacts); + tpm_exit(dht->tpm); return (void *) 0; } @@ -2435,22 +2481,24 @@ static void * dht_handle_sdu(void * o) static void dht_post_sdu(void * comp, struct shm_du_buff * sdb) { - pthread_t thr; - struct sdu_info * info; + struct cmd * cmd; + struct dht * dht = (struct dht *) comp; - info = malloc(sizeof(*info)); - if (info == NULL) + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Command failed. Out of memory."); return; + } - info->dht = (struct dht *) comp; - info->sdb = sdb; + cmd->sdb = sdb; - if (pthread_create(&thr, NULL, dht_handle_sdu, info)) { - free(info); - return; - } + pthread_mutex_lock(&dht->mtx); + + list_add(&cmd->next, &dht->cmds); - pthread_detach(thr); + pthread_cond_signal(&dht->cond); + + pthread_mutex_unlock(&dht->mtx); } void dht_destroy(struct dht * dht) @@ -2461,6 +2509,11 @@ void dht_destroy(struct dht * dht) if (dht == NULL) return; +#ifndef __DHT_TEST__ + tpm_stop(dht->tpm); + + tpm_destroy(dht->tpm); +#endif if (dht_get_state(dht) == DHT_RUNNING) { dht_set_state(dht, DHT_SHUTDOWN); pthread_cancel(dht->worker); @@ -2594,6 +2647,7 @@ struct dht * dht_create(uint64_t addr) list_head_init(&dht->requests); list_head_init(&dht->refs); list_head_init(&dht->lookups); + list_head_init(&dht->cmds); if (pthread_rwlock_init(&dht->lock, NULL)) goto fail_rwlock; @@ -2612,16 +2666,29 @@ struct dht * dht_create(uint64_t addr) dht->addr = addr; dht->id = NULL; #ifndef __DHT_TEST__ + dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht); + if (dht->tpm == NULL) + goto fail_tpm_create; + + if (tpm_start(dht->tpm)) + goto fail_tpm_start; + dht->fd = dt_reg_ae(dht, &dht_post_sdu); notifier_reg(handle_event, dht); #else (void) handle_event; + (void) dht_handle_sdu; (void) dht_post_sdu; #endif dht->state = DHT_INIT; return dht; - +#ifndef __DHT_TEST__ + fail_tpm_start: + tpm_destroy(dht->tpm); + fail_tpm_create: + bmp_destroy(dht->cookies); +#endif fail_bmp: pthread_cond_destroy(&dht->cond); fail_cond: -- cgit v1.2.3