From 1e1de177e7da8075b016b703168f3473f58e0568 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 20 Sep 2017 21:28:56 +0200 Subject: ipcpd: Handle DHT SDUs in different thread The DHT will now spawn a thread when receiving SDUs to avoid starvation of sdu scheduler threads. Also fixes some locking issues. --- src/ipcpd/normal/dht.c | 73 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 15 deletions(-) diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index e7ba4bf3..996b122a 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -227,6 +227,11 @@ struct join_info { uint64_t addr; }; +struct sdu_info { + struct dht * dht; + struct shm_du_buff * sdb; +}; + static uint8_t * dht_dup_key(const uint8_t * key, size_t len) { @@ -1665,11 +1670,16 @@ static int kad_join(struct dht * dht, msg.has_t_refresh = true; msg.has_t_replicate = true; msg.alpha = KAD_ALPHA; - msg.b = dht->b; msg.k = KAD_K; msg.t_refresh = KAD_T_REFR; msg.t_replicate = KAD_T_REPL; + pthread_rwlock_rdlock(&dht->lock); + + msg.b = dht->b; + + pthread_rwlock_unlock(&dht->lock); + if (send_msg(dht, &msg, addr)) return -1; @@ -1798,12 +1808,16 @@ static ssize_t dht_get_contacts(struct dht * dht, pthread_rwlock_rdlock(&dht->lock); len = dht_contact_list(dht, &l, key); - if (len == 0) + if (len == 0) { + pthread_rwlock_unlock(&dht->lock); return 0; + } *msgs = malloc(len * sizeof(**msgs)); - if (*msgs == NULL) + if (*msgs == NULL) { + pthread_rwlock_unlock(&dht->lock); return 0; + } list_for_each_safe(p, h, &l) { struct contact * c = list_entry(p, struct contact, next); @@ -1976,8 +1990,6 @@ static int kad_handle_join_resp(struct dht * dht, return -1; } - dht->state = DHT_RUNNING; - kad_req_respond(req); dht_update_bucket(dht, msg->s_id.data, msg->s_addr); @@ -2201,10 +2213,10 @@ uint64_t dht_query(struct dht * dht, return 0; } -void dht_post_sdu(void * ae, - struct shm_du_buff * sdb) +static void * dht_handle_sdu(void * o) { struct dht * dht; + struct shm_du_buff * sdb; kad_msg_t * msg; kad_contact_msg_t ** cmsgs; kad_msg_t resp_msg = KAD_MSG__INIT; @@ -2212,12 +2224,15 @@ void dht_post_sdu(void * ae, buffer_t buf; size_t i; - assert(ae); - assert(sdb); + assert(o); memset(&buf, 0, sizeof(buf)); - dht = (struct dht *) ae; + dht = ((struct sdu_info *) o)->dht; + sdb = ((struct sdu_info *) o)->sdb; + + assert(dht); + assert(sdb); msg = kad_msg__unpack(NULL, shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), @@ -2225,27 +2240,29 @@ void dht_post_sdu(void * ae, ipcp_sdb_release(sdb); + free((struct sdu_info *) o); + if (msg == NULL) { log_err("Failed to unpack message."); - return; + return (void *) -1; } if (msg->has_key && msg->key.len != dht->b) { kad_msg__free_unpacked(msg, NULL); log_warn("Bad key in message."); - return; + return (void *) -1; } if (msg->has_s_id && !msg->has_b && msg->s_id.len != dht->b) { kad_msg__free_unpacked(msg, NULL); log_warn("Bad source ID in message of type %d.", msg->code); - return; + return (void *) -1; } if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { kad_msg__free_unpacked(msg, NULL); log_dbg("Got a request message when not running."); - return; + return (void *) -1; } addr = msg->s_addr; @@ -2341,11 +2358,34 @@ void dht_post_sdu(void * ae, free(resp_msg.addrs); if (resp_msg.n_contacts == 0) - return; + return (void *) -1; for (i = 0; i < resp_msg.n_contacts; ++i) kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); free(resp_msg.contacts); + + return (void *) 0; +} + +static void dht_post_sdu(void * comp, + struct shm_du_buff * sdb) +{ + pthread_t thr; + struct sdu_info * info; + + info = malloc(sizeof(*info)); + if (info == NULL) + return; + + info->dht = (struct dht *) comp; + info->sdb = sdb; + + if (pthread_create(&thr, NULL, dht_handle_sdu, info)) { + free(info); + return; + } + + pthread_detach(thr); } void dht_destroy(struct dht * dht) @@ -2421,6 +2461,8 @@ static void * join_thr(void * o) sleep(KAD_JOIN_INTV); } + dht_set_state(info->dht, DHT_RUNNING); + lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE); if (lu != NULL) lookup_destroy(lu); @@ -2509,6 +2551,7 @@ struct dht * dht_create(uint64_t addr) notifier_reg(handle_event, dht); #else (void) handle_event; + (void) dht_post_sdu; #endif dht->state = DHT_INIT; -- cgit v1.2.3