diff options
Diffstat (limited to 'src/ipcpd')
-rw-r--r-- | src/ipcpd/normal/dht.c | 115 |
1 files changed, 85 insertions, 30 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index be5411da..548ae03a 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -304,17 +304,18 @@ static uint8_t * create_id(size_t len) return id; } -static struct kad_req * kad_req_create(struct dht * dht, - kad_msg_t * msg, - uint64_t addr) +static void kad_req_create(struct dht * dht, + kad_msg_t * msg, + uint64_t addr) { struct kad_req * req; pthread_condattr_t cattr; struct timespec t; + size_t b; req = malloc(sizeof(*req)); if (req == NULL) - return NULL; + return; list_head_init(&req->next); @@ -327,18 +328,22 @@ static struct kad_req * kad_req_create(struct dht * dht, req->code = msg->code; req->key = NULL; + pthread_rwlock_rdlock(&dht->lock); + b = dht->b; + pthread_rwlock_unlock(&dht->lock); + if (msg->has_key) { - req->key = dht_dup_key(msg->key.data, dht->b); + req->key = dht_dup_key(msg->key.data, b); if (req->key == NULL) { free(req); - return NULL; + return; } } if (pthread_mutex_init(&req->lock, NULL)) { free(req->key); free(req); - return NULL; + return; } pthread_condattr_init(&cattr); @@ -351,12 +356,16 @@ static struct kad_req * kad_req_create(struct dht * dht, pthread_mutex_destroy(&req->lock); free(req->key); free(req); - return NULL; + return; } pthread_condattr_destroy(&cattr); - return req; + pthread_rwlock_wrlock(&dht->lock); + + list_add(&req->next, &dht->requests); + + pthread_rwlock_unlock(&dht->lock); } static void kad_req_destroy(struct kad_req * req) @@ -1357,7 +1366,6 @@ static int send_msg(struct dht * dht, uint64_t addr) { struct shm_du_buff * sdb; - struct kad_req * req; size_t len; int retr = 0; @@ -1376,10 +1384,14 @@ static int send_msg(struct dht * dht, if (msg->code < KAD_STORE) { msg->cookie = bmp_allocate(dht->cookies); - if (!bmp_is_id_valid(dht->cookies, msg->cookie)) + if (!bmp_is_id_valid(dht->cookies, msg->cookie)) { + pthread_rwlock_unlock(&dht->lock); goto fail_bmp_alloc; + } } + pthread_rwlock_unlock(&dht->lock); + len = kad_msg__get_packed_size(msg); if (len == 0) goto fail_msg; @@ -1406,13 +1418,8 @@ static int send_msg(struct dht * dht, ipcp_sdb_release(sdb); #endif /* __DHT_TEST__ */ - if (msg->code < KAD_STORE && dht->state != DHT_SHUTDOWN) { - req = kad_req_create(dht, msg, addr); - if (req != NULL) - list_add(&req->next, &dht->requests); - } - - pthread_rwlock_unlock(&dht->lock); + if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN) + kad_req_create(dht, msg, addr); return 0; @@ -1423,7 +1430,6 @@ static int send_msg(struct dht * dht, fail_msg: bmp_release(dht->cookies, msg->cookie); fail_bmp_alloc: - pthread_rwlock_unlock(&dht->lock); return -1; } @@ -1513,9 +1519,14 @@ static int kad_store(struct dht * dht, kad_contact_msg_t * cmsgp[1]; cmsg.id.data = (uint8_t *) key; - cmsg.id.len = dht->b; cmsg.addr = addr; + pthread_rwlock_rdlock(&dht->lock); + + cmsg.id.len = dht->b; + + pthread_rwlock_unlock(&dht->lock); + cmsgp[0] = &cmsg; msg.code = KAD_STORE; @@ -1632,15 +1643,32 @@ static void kad_publish(struct dht * dht, time_t exp) { struct lookup * lu; - uint64_t addrs[KAD_K]; + uint64_t * addrs; ssize_t n; + size_t k; + time_t t_expire; + assert(dht); assert(key); + pthread_rwlock_rdlock(&dht->lock); + + k = dht->k; + t_expire = dht->t_expire; + + pthread_rwlock_unlock(&dht->lock); + + + addrs = malloc(k * sizeof(*addrs)); + if (addrs == NULL) + return; + lu = kad_lookup(dht, key, KAD_FIND_NODE); - if (lu == NULL) + if (lu == NULL) { + free(addrs); return; + } n = lookup_contact_addrs(lu, addrs); @@ -1652,12 +1680,14 @@ static void kad_publish(struct dht * dht, msg.addr = addr; kad_add(dht, &msg, 1, exp); } else { - if (kad_store(dht, key, addr, addrs[n], dht->t_expire)) + if (kad_store(dht, key, addr, addrs[n], t_expire)) log_warn("Failed to send store message."); } } lookup_destroy(lu); + + free(addrs); } static int kad_join(struct dht * dht, @@ -1879,13 +1909,24 @@ static void * work(void * o) pthread_rwlock_wrlock(&dht->lock); /* Republish registered hashes. */ - list_for_each_safe(p, h, &dht->refs) { + list_for_each(p, &dht->refs) { struct ref_entry * e; + uint8_t * key; + uint64_t addr; + time_t t_expire; e = list_entry(p, struct ref_entry, next); if (now.tv_sec > e->t_rep) { - kad_publish(dht, e->key, dht->addr, - dht->t_expire); + key = dht_dup_key(e->key, dht->b); + if (key == NULL) + continue; + addr = dht->addr; + t_expire = dht->t_expire; e->t_rep = now.tv_sec + dht->t_repub; + + pthread_rwlock_unlock(&dht->lock); + kad_publish(dht, key, addr, t_expire); + pthread_rwlock_wrlock(&dht->lock); + free(key); } } @@ -1894,19 +1935,28 @@ static void * work(void * o) struct list_head * p1; struct list_head * h1; struct dht_entry * e; + uint8_t * key; + time_t t_expire; e = list_entry (p, struct dht_entry, next); list_for_each_safe(p1, h1, &e->vals) { struct val * v; + uint64_t addr; v = list_entry(p1, struct val, next); if (now.tv_sec > v->t_exp) { list_del(&v->next); val_destroy(v); + continue; } if (now.tv_sec > v->t_rep) { - kad_publish(dht, e->key, v->addr, - dht->t_expire - now.tv_sec); + key = dht_dup_key(e->key, dht->b); + addr = v->addr; + t_expire = dht->t_expire = now.tv_sec; v->t_rep = now.tv_sec + dht->t_replic; + pthread_rwlock_unlock(&dht->lock); + kad_publish(dht, key, addr, t_expire); + pthread_rwlock_wrlock(&dht->lock); + free(key); } } } @@ -1964,7 +2014,7 @@ static int kad_handle_join_resp(struct dht * dht, return -1; } - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_wrlock(&dht->lock); dht->buckets = bucket_create(); if (dht->buckets == NULL) { @@ -2116,6 +2166,8 @@ int dht_reg(struct dht * dht, const uint8_t * key) { struct ref_entry * e; + uint64_t addr; + time_t t_expire; assert(dht); assert(key); @@ -2132,9 +2184,12 @@ int dht_reg(struct dht * dht, list_add(&e->next, &dht->refs); + t_expire = dht->t_expire; + addr = dht->addr; + pthread_rwlock_unlock(&dht->lock); - kad_publish(dht, key, dht->addr, dht->t_expire); + kad_publish(dht, key, >addr, t_expire); return 0; } |