diff options
author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2021-12-04 18:26:58 +0100 |
---|---|---|
committer | Sander Vrijders <sander@ouroboros.rocks> | 2021-12-06 17:52:16 +0100 |
commit | 9422e6be94ac1007e8115a920379fd545055e531 (patch) | |
tree | 31075ad5ee851ef4625e3cafbd821e591e817997 /src/ipcpd/unicast/dht.c | |
parent | 11d2ecc140486949c8d81e984137263ca48d5799 (diff) | |
download | ouroboros-9422e6be94ac1007e8115a920379fd545055e531.tar.gz ouroboros-9422e6be94ac1007e8115a920379fd545055e531.zip |
ipcpd: Move DHT to stack
This makes the DHT a single directory implementation and moves it to
the stack (init/fini instead of create/destroy). This is a step
towards making it a directory policy, in line with our other policy
implementations.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/ipcpd/unicast/dht.c')
-rw-r--r-- | src/ipcpd/unicast/dht.c | 819 |
1 files changed, 376 insertions, 443 deletions
diff --git a/src/ipcpd/unicast/dht.c b/src/ipcpd/unicast/dht.c index 2b668f9f..f7cb89f2 100644 --- a/src/ipcpd/unicast/dht.c +++ b/src/ipcpd/unicast/dht.c @@ -47,6 +47,7 @@ #include "common/connmgr.h" #include "dht.h" #include "dt.h" +#include "ipcp.h" #include <stdlib.h> #include <string.h> @@ -208,7 +209,7 @@ struct cmd { struct shm_du_buff * sdb; }; -struct dht { +struct { size_t alpha; size_t b; size_t k; @@ -244,15 +245,13 @@ struct dht { struct tpm * tpm; pthread_t worker; -}; +} dht; struct join_info { - struct dht * dht; uint64_t addr; }; struct packet_info { - struct dht * dht; struct shm_du_buff * sdb; }; @@ -270,50 +269,49 @@ static uint8_t * dht_dup_key(const uint8_t * key, return dup; } -static enum dht_state dht_get_state(struct dht * dht) +static enum dht_state dht_get_state(void) { enum dht_state state; - pthread_mutex_lock(&dht->mtx); + pthread_mutex_lock(&dht.mtx); - state = dht->state; + state = dht.state; - pthread_mutex_unlock(&dht->mtx); + pthread_mutex_unlock(&dht.mtx); return state; } -static int dht_set_state(struct dht * dht, - enum dht_state state) +static int dht_set_state(enum dht_state state) { - pthread_mutex_lock(&dht->mtx); + pthread_mutex_lock(&dht.mtx); - if (state == DHT_JOINING && dht->state != DHT_INIT) { - pthread_mutex_unlock(&dht->mtx); + if (state == DHT_JOINING && dht.state != DHT_INIT) { + pthread_mutex_unlock(&dht.mtx); return -1; } - dht->state = state; + dht.state = state; - pthread_cond_broadcast(&dht->cond); + pthread_cond_broadcast(&dht.cond); - pthread_mutex_unlock(&dht->mtx); + pthread_mutex_unlock(&dht.mtx); return 0; } -int dht_wait_running(struct dht * dht) +int dht_wait_running() { int ret = 0; - pthread_mutex_lock(&dht->mtx); + pthread_mutex_lock(&dht.mtx); - pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); + pthread_cleanup_push(__cleanup_mutex_unlock, &dht.mtx); - while (dht->state == DHT_JOINING) - pthread_cond_wait(&dht->cond, &dht->mtx); + while (dht.state == DHT_JOINING) + pthread_cond_wait(&dht.cond, &dht.mtx); - if (dht->state != DHT_RUNNING) + if (dht.state != DHT_RUNNING) ret = -1; pthread_cleanup_pop(true); @@ -337,8 +335,7 @@ static uint8_t * create_id(size_t len) return id; } -static void kad_req_create(struct dht * dht, - kad_msg_t * msg, +static void kad_req_create(kad_msg_t * msg, uint64_t addr) { struct kad_req * req; @@ -361,9 +358,9 @@ static void kad_req_create(struct dht * dht, req->code = msg->code; req->key = NULL; - pthread_rwlock_rdlock(&dht->lock); - b = dht->b; - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_rdlock(&dht.lock); + b = dht.b; + pthread_rwlock_unlock(&dht.lock); if (msg->has_key) { req->key = dht_dup_key(msg->key.data, b); @@ -394,11 +391,11 @@ static void kad_req_create(struct dht * dht, pthread_condattr_destroy(&cattr); - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); - list_add(&req->next, &dht->requests); + list_add(&req->next, &dht.requests); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); } static void cancel_req_destroy(void * o) @@ -556,12 +553,11 @@ static struct bucket * iter_bucket(struct bucket * b, return iter_bucket(b->children[(byte & mask)], id); } -static struct bucket * dht_get_bucket(struct dht * dht, - const uint8_t * id) +static struct bucket * dht_get_bucket(const uint8_t * id) { - assert(dht->buckets); + assert(dht.buckets); - return iter_bucket(dht->buckets, id); + return iter_bucket(dht.buckets, id); } /* @@ -596,8 +592,7 @@ static size_t list_add_sorted(struct list_head * l, return 1; } -static size_t dht_contact_list(struct dht * dht, - struct list_head * l, +static size_t dht_contact_list(struct list_head * l, const uint8_t * key) { struct list_head * p; @@ -607,55 +602,52 @@ static size_t dht_contact_list(struct dht * dht, struct timespec t; assert(l); - assert(dht); assert(key); assert(list_is_empty(l)); clock_gettime(CLOCK_REALTIME_COARSE, &t); - b = dht_get_bucket(dht, key); + b = dht_get_bucket(key); if (b == NULL) return 0; b->t_refr = t.tv_sec + KAD_T_REFR; - if (b->n_contacts == dht->k || b->parent == NULL) { + if (b->n_contacts == dht.k || b->parent == NULL) { list_for_each(p, &b->contacts) { struct contact * c; c = list_entry(p, struct contact, next); - c = contact_create(c->id, dht->b, c->addr); + c = contact_create(c->id, dht.b, c->addr); if (list_add_sorted(l, c, key) == 1) - if (++len == dht->k) + if (++len == dht.k) break; } } else { struct bucket * d = b->parent; - for (i = 0; i < (1L << KAD_BETA) && len < dht->k; ++i) { + for (i = 0; i < (1L << KAD_BETA) && len < dht.k; ++i) { list_for_each(p, &d->children[i]->contacts) { struct contact * c; c = list_entry(p, struct contact, next); - c = contact_create(c->id, dht->b, c->addr); + c = contact_create(c->id, dht.b, c->addr); if (c == NULL) continue; if (list_add_sorted(l, c, key) == 1) - if (++len == dht->k) + if (++len == dht.k) break; } } } - assert(len == dht->k || b->parent == NULL); + assert(len == dht.k || b->parent == NULL); return len; } -static struct lookup * lookup_create(struct dht * dht, - const uint8_t * id) +static struct lookup * lookup_create(const uint8_t * id) { struct lookup * lu; pthread_condattr_t cattr; - assert(dht); assert(id); lu = malloc(sizeof(*lu)); @@ -668,7 +660,7 @@ static struct lookup * lookup_create(struct dht * dht, lu->state = LU_INIT; lu->addrs = NULL; lu->n_addrs = 0; - lu->key = dht_dup_key(id, dht->b); + lu->key = dht_dup_key(id, dht.b); if (lu->key == NULL) goto fail_id; @@ -685,13 +677,13 @@ static struct lookup * lookup_create(struct dht * dht, pthread_condattr_destroy(&cattr); - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); - list_add(&lu->next, &dht->lookups); + list_add(&lu->next, &dht.lookups); - lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); + lu->n_contacts = dht_contact_list(&lu->contacts, id); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return lu; @@ -770,8 +762,7 @@ static void lookup_destroy(struct lookup * lu) pthread_cleanup_pop(true); } -static void lookup_update(struct dht * dht, - struct lookup * lu, +static void lookup_update(struct lookup * lu, kad_msg_t * msg) { struct list_head * p = NULL; @@ -784,7 +775,7 @@ static void lookup_update(struct dht * dht, assert(lu); assert(msg); - if (dht_get_state(dht) != DHT_RUNNING) + if (dht_get_state() != DHT_RUNNING) return; pthread_mutex_lock(&lu->lock); @@ -820,16 +811,16 @@ static void lookup_update(struct dht * dht, pthread_cleanup_push(__cleanup_mutex_unlock, &lu->lock); while (lu->state == LU_INIT) { - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); pthread_cond_wait(&lu->cond, &lu->lock); - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_rdlock(&dht.lock); } pthread_cleanup_pop(false); for (n = 0; n < msg->n_contacts; ++n) { c = contact_create(msg->contacts[n]->id.data, - dht->b, msg->contacts[n]->addr); + dht.b, msg->contacts[n]->addr); if (c == NULL) continue; @@ -838,7 +829,7 @@ static void lookup_update(struct dht * dht, list_for_each(p, &lu->contacts) { struct contact * e; e = list_entry(p, struct contact, next); - if (!memcmp(e->id, c->id, dht->b)) { + if (!memcmp(e->id, c->id, dht.b)) { contact_destroy(c); c = NULL; break; @@ -853,11 +844,11 @@ static void lookup_update(struct dht * dht, if (c == NULL) continue; - if (lu->n_contacts < dht->k) { + if (lu->n_contacts < dht.k) { list_add_tail(&c->next, p); ++lu->n_contacts; mod = true; - } else if (pos == dht->k) { + } else if (pos == dht.k) { contact_destroy(c); } else { struct contact * d; @@ -1005,15 +996,12 @@ static enum lookup_state lookup_wait(struct lookup * lu) return state; } -static struct kad_req * dht_find_request(struct dht * dht, - kad_msg_t * msg) +static struct kad_req * dht_find_request(kad_msg_t * msg) { struct list_head * p; - - assert(dht); assert(msg); - list_for_each(p, &dht->requests) { + list_for_each(p, &dht.requests) { struct kad_req * r = list_entry(p, struct kad_req, next); if (r->cookie == msg->cookie) return r; @@ -1022,17 +1010,15 @@ static struct kad_req * dht_find_request(struct dht * dht, return NULL; } -static struct lookup * dht_find_lookup(struct dht * dht, - uint32_t cookie) +static struct lookup * dht_find_lookup(uint32_t cookie) { struct list_head * p; struct list_head * p2; struct list_head * h2; - assert(dht); assert(cookie > 0); - list_for_each(p, &dht->lookups) { + list_for_each(p, &dht.lookups) { struct lookup * l = list_entry(p, struct lookup, next); pthread_mutex_lock(&l->lock); list_for_each_safe(p2, h2, &l->cookies) { @@ -1079,20 +1065,18 @@ static void val_destroy(struct val * v) free(v); } -static struct ref_entry * ref_entry_create(struct dht * dht, - const uint8_t * key) +static struct ref_entry * ref_entry_create(const uint8_t * key) { struct ref_entry * e; struct timespec t; - assert(dht); assert(key); e = malloc(sizeof(*e)); if (e == NULL) return NULL; - e->key = dht_dup_key(key, dht->b); + e->key = dht_dup_key(key, dht.b); if (e->key == NULL) { free(e); return NULL; @@ -1100,7 +1084,7 @@ static struct ref_entry * ref_entry_create(struct dht * dht, clock_gettime(CLOCK_REALTIME_COARSE, &t); - e->t_rep = t.tv_sec + dht->t_repub; + e->t_rep = t.tv_sec + dht.t_repub; return e; } @@ -1111,12 +1095,10 @@ static void ref_entry_destroy(struct ref_entry * e) free(e); } -static struct dht_entry * dht_entry_create(struct dht * dht, - const uint8_t * key) +static struct dht_entry * dht_entry_create(const uint8_t * key) { struct dht_entry * e; - assert(dht); assert(key); e = malloc(sizeof(*e)); @@ -1128,7 +1110,7 @@ static struct dht_entry * dht_entry_create(struct dht * dht, e->n_vals = 0; - e->key = dht_dup_key(key, dht->b); + e->key = dht_dup_key(key, dht.b); if (e->key == NULL) { free(e); return NULL; @@ -1211,8 +1193,7 @@ static void dht_entry_del_addr(struct dht_entry * e, } } -static uint64_t dht_entry_get_addr(struct dht * dht, - struct dht_entry * e) +static uint64_t dht_entry_get_addr(struct dht_entry * e) { struct list_head * p; @@ -1221,7 +1202,7 @@ static uint64_t dht_entry_get_addr(struct dht * dht, list_for_each(p, &e->vals) { struct val * v = list_entry(p, struct val, next); - if (v->addr != dht->addr) + if (v->addr != dht.addr) return v->addr; } @@ -1229,14 +1210,12 @@ static uint64_t dht_entry_get_addr(struct dht * dht, } /* Forward declaration. */ -static struct lookup * kad_lookup(struct dht * dht, - const uint8_t * key, +static struct lookup * kad_lookup(const uint8_t * key, enum kad_code code); /* Build a refresh list. */ -static void bucket_refresh(struct dht * dht, - struct bucket * b, +static void bucket_refresh(struct bucket * b, time_t t, struct list_head * r) { @@ -1244,7 +1223,7 @@ static void bucket_refresh(struct dht * dht, if (*b->children != NULL) for (i = 0; i < (1L << KAD_BETA); ++i) - bucket_refresh(dht, b->children[i], t, r); + bucket_refresh(b->children[i], t, r); if (b->n_contacts == 0) return; @@ -1253,7 +1232,7 @@ static void bucket_refresh(struct dht * dht, struct contact * c; struct contact * d; c = list_first_entry(&b->contacts, struct contact, next); - d = contact_create(c->id, dht->b, c->addr); + d = contact_create(c->id, dht.b, c->addr); if (c != NULL) list_add(&d->next, r); return; @@ -1387,8 +1366,7 @@ static int split_bucket(struct bucket * b) } /* Locked externally to mandate update as (final) part of join transaction. */ -static int dht_update_bucket(struct dht * dht, - const uint8_t * id, +static int dht_update_bucket(const uint8_t * id, uint64_t addr) { struct list_head * p; @@ -1396,13 +1374,11 @@ static int dht_update_bucket(struct dht * dht, struct bucket * b; struct contact * c; - assert(dht); - - b = dht_get_bucket(dht, id); + b = dht_get_bucket(id); if (b == NULL) return -1; - c = contact_create(id, dht->b, addr); + c = contact_create(id, dht.b, addr); if (c == NULL) return -1; @@ -1415,8 +1391,8 @@ static int dht_update_bucket(struct dht * dht, } } - if (b->n_contacts == dht->k) { - if (bucket_has_id(b, dht->id)) { + if (b->n_contacts == dht.k) { + if (bucket_has_id(b, dht.id)) { list_add_tail(&c->next, &b->contacts); ++b->n_contacts; if (split_bucket(b)) { @@ -1424,7 +1400,7 @@ static int dht_update_bucket(struct dht * dht, contact_destroy(c); --b->n_contacts; } - } else if (b->n_alts == dht->k) { + } else if (b->n_alts == dht.k) { struct contact * d; d = list_first_entry(&b->alts, struct contact, next); list_del(&d->next); @@ -1442,8 +1418,7 @@ static int dht_update_bucket(struct dht * dht, return 0; } -static int send_msg(struct dht * dht, - kad_msg_t * msg, +static int send_msg(kad_msg_t * msg, uint64_t addr) { #ifndef __DHT_TEST__ @@ -1455,25 +1430,25 @@ static int send_msg(struct dht * dht, if (msg->code == KAD_RESPONSE) retr = KAD_RESP_RETR; - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); - if (dht->id != NULL) { + if (dht.id != NULL) { msg->has_s_id = true; - msg->s_id.data = dht->id; - msg->s_id.len = dht->b; + msg->s_id.data = dht.id; + msg->s_id.len = dht.b; } - msg->s_addr = dht->addr; + msg->s_addr = dht.addr; if (msg->code < KAD_STORE) { - msg->cookie = bmp_allocate(dht->cookies); - if (!bmp_is_id_valid(dht->cookies, msg->cookie)) { - pthread_rwlock_unlock(&dht->lock); + msg->cookie = bmp_allocate(dht.cookies); + if (!bmp_is_id_valid(dht.cookies, msg->cookie)) { + pthread_rwlock_unlock(&dht.lock); goto fail_bmp_alloc; } } - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); #ifndef __DHT_TEST__ len = kad_msg__get_packed_size(msg); @@ -1486,7 +1461,7 @@ static int send_msg(struct dht * dht, kad_msg__pack(msg, shm_du_buff_head(sdb)); - if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0) + if (dt_write_packet(addr, QOS_CUBE_BE, dht.eid, sdb) == 0) break; ipcp_sdb_release(sdb); @@ -1502,53 +1477,51 @@ static int send_msg(struct dht * dht, (void) retr; #endif /* __DHT_TEST__ */ - if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN) - kad_req_create(dht, msg, addr); + if (msg->code < KAD_STORE && dht_get_state() != DHT_SHUTDOWN) + kad_req_create(msg, addr); return msg->cookie; #ifndef __DHT_TEST__ fail_msg: - pthread_rwlock_wrlock(&dht->lock); - bmp_release(dht->cookies, msg->cookie); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); + bmp_release(dht.cookies, msg->cookie); + pthread_rwlock_unlock(&dht.lock); #endif /* !__DHT_TEST__ */ fail_bmp_alloc: return -1; } -static struct dht_entry * dht_find_entry(struct dht * dht, - const uint8_t * key) +static struct dht_entry * dht_find_entry(const uint8_t * key) { struct list_head * p; - list_for_each(p, &dht->entries) { + list_for_each(p, &dht.entries) { struct dht_entry * e = list_entry(p, struct dht_entry, next); - if (!memcmp(key, e->key, dht->b)) + if (!memcmp(key, e->key, dht.b)) return e; } return NULL; } -static int kad_add(struct dht * dht, - const kad_contact_msg_t * contacts, +static int kad_add(const kad_contact_msg_t * contacts, ssize_t n, time_t exp) { struct dht_entry * e; - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); while (n-- > 0) { - if (contacts[n].id.len != dht->b) + if (contacts[n].id.len != dht.b) log_warn("Bad key length in contact data."); - e = dht_find_entry(dht, contacts[n].id.data); + e = dht_find_entry(contacts[n].id.data); if (e != NULL) { if (dht_entry_add_addr(e, contacts[n].addr, exp)) goto fail; } else { - e = dht_entry_create(dht, contacts[n].id.data); + e = dht_entry_create(contacts[n].id.data); if (e == NULL) goto fail; @@ -1557,42 +1530,39 @@ static int kad_add(struct dht * dht, goto fail; } - list_add(&e->next, &dht->entries); + list_add(&e->next, &dht.entries); } } - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return 0; fail: - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return -ENOMEM; } -static int wait_resp(struct dht * dht, - kad_msg_t * msg, +static int wait_resp(kad_msg_t * msg, time_t timeo) { struct kad_req * req; - assert(dht); assert(msg); - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_rdlock(&dht.lock); - req = dht_find_request(dht, msg); + req = dht_find_request(msg); if (req == NULL) { - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return -EPERM; } - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return kad_req_wait(req, timeo); } -static int kad_store(struct dht * dht, - const uint8_t * key, +static int kad_store(const uint8_t * key, uint64_t addr, uint64_t r_addr, time_t ttl) @@ -1604,11 +1574,11 @@ static int kad_store(struct dht * dht, cmsg.id.data = (uint8_t *) key; cmsg.addr = addr; - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_rdlock(&dht.lock); - cmsg.id.len = dht->b; + cmsg.id.len = dht.b; - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); cmsgp[0] = &cmsg; @@ -1618,39 +1588,37 @@ static int kad_store(struct dht * dht, msg.n_contacts = 1; msg.contacts = cmsgp; - if (send_msg(dht, &msg, r_addr) < 0) + if (send_msg(&msg, r_addr) < 0) return -1; return 0; } -static ssize_t kad_find(struct dht * dht, - struct lookup * lu, +static ssize_t kad_find(struct lookup * lu, const uint64_t * addrs, enum kad_code code) { kad_msg_t msg = KAD_MSG__INIT; ssize_t sent = 0; - assert(dht); assert(lu->key); msg.code = code; msg.has_key = true; msg.key.data = (uint8_t *) lu->key; - msg.key.len = dht->b; + msg.key.len = dht.b; while (*addrs != 0) { struct cookie_el * c; int ret; - if (*addrs == dht->addr) { + if (*addrs == dht.addr) { ++addrs; continue; } - ret = send_msg(dht, &msg, *addrs); + ret = send_msg(&msg, *addrs); if (ret < 0) break; @@ -1673,38 +1641,36 @@ static ssize_t kad_find(struct dht * dht, return sent; } -static void lookup_detach(struct dht * dht, - struct lookup * lu) +static void lookup_detach(struct lookup * lu) { - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); list_del(&lu->next); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); } -static struct lookup * kad_lookup(struct dht * dht, - const uint8_t * id, +static struct lookup * kad_lookup( const uint8_t * id, enum kad_code code) { uint64_t addrs[KAD_ALPHA + 1]; enum lookup_state state; struct lookup * lu; - lu = lookup_create(dht, id); + lu = lookup_create(id); if (lu == NULL) return NULL; lookup_new_addrs(lu, addrs); if (addrs[0] == 0) { - lookup_detach(dht, lu); + lookup_detach(lu); lookup_destroy(lu); return NULL; } - if (kad_find(dht, lu, addrs, code) == 0) { - lookup_detach(dht, lu); + if (kad_find(lu, addrs, code) == 0) { + lookup_detach(lu); return lu; } @@ -1715,10 +1681,10 @@ static struct lookup * kad_lookup(struct dht * dht, if (addrs[0] == 0) break; - kad_find(dht, lu, addrs, code); + kad_find(lu, addrs, code); break; case LU_DESTROY: - lookup_detach(dht, lu); + lookup_detach(lu); lookup_set_state(lu, LU_NULL); return NULL; default: @@ -1728,13 +1694,12 @@ static struct lookup * kad_lookup(struct dht * dht, assert(state == LU_COMPLETE); - lookup_detach(dht, lu); + lookup_detach(lu); return lu; } -static void kad_publish(struct dht * dht, - const uint8_t * key, +static void kad_publish(const uint8_t * key, uint64_t addr, time_t exp) { @@ -1745,21 +1710,20 @@ static void kad_publish(struct dht * dht, time_t t_expire; - assert(dht); assert(key); - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_rdlock(&dht.lock); - k = dht->k; - t_expire = dht->t_expire; + k = dht.k; + t_expire = dht.t_expire; - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); addrs = malloc(k * sizeof(*addrs)); if (addrs == NULL) return; - lu = kad_lookup(dht, key, KAD_FIND_NODE); + lu = kad_lookup(key, KAD_FIND_NODE); if (lu == NULL) { free(addrs); return; @@ -1768,14 +1732,14 @@ static void kad_publish(struct dht * dht, n = lookup_contact_addrs(lu, addrs); while (n-- > 0) { - if (addrs[n] == dht->addr) { + if (addrs[n] == dht.addr) { kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT; msg.id.data = (uint8_t *) key; - msg.id.len = dht->b; + msg.id.len = dht.b; msg.addr = addr; - kad_add(dht, &msg, 1, exp); + kad_add(&msg, 1, exp); } else { - if (kad_store(dht, key, addr, addrs[n], t_expire)) + if (kad_store(key, addr, addrs[n], t_expire)) log_warn("Failed to send store message."); } } @@ -1785,8 +1749,7 @@ static void kad_publish(struct dht * dht, free(addrs); } -static int kad_join(struct dht * dht, - uint64_t addr) +static int kad_join(uint64_t addr) { kad_msg_t msg = KAD_MSG__INIT; @@ -1802,44 +1765,43 @@ static int kad_join(struct dht * dht, msg.t_refresh = KAD_T_REFR; msg.t_replicate = KAD_T_REPL; - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_rdlock(&dht.lock); - msg.b = dht->b; + msg.b = dht.b; - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); - if (send_msg(dht, &msg, addr) < 0) + if (send_msg(&msg, addr) < 0) return -1; - if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) + if (wait_resp(&msg, KAD_T_JOIN) < 0) return -1; - dht->id = create_id(dht->b); - if (dht->id == NULL) + dht.id = create_id(dht.b); + if (dht.id == NULL) return -1; - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); - dht_update_bucket(dht, dht->id, dht->addr); + dht_update_bucket(dht.id, dht.addr); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return 0; } -static void dht_dead_peer(struct dht * dht, - uint8_t * key, +static void dht_dead_peer(uint8_t * key, uint64_t addr) { struct list_head * p; struct list_head * h; struct bucket * b; - b = dht_get_bucket(dht, key); + b = dht_get_bucket(key); list_for_each_safe(p, h, &b->contacts) { struct contact * c = list_entry(p, struct contact, next); - if (b->n_contacts + b->n_alts <= dht->k) { + if (b->n_contacts + b->n_alts <= dht.k) { ++c->fails; return; } @@ -1852,7 +1814,7 @@ static void dht_dead_peer(struct dht * dht, } } - while (b->n_contacts < dht->k && b->n_alts > 0) { + while (b->n_contacts < dht.k && b->n_alts > 0) { struct contact * c; c = list_first_entry(&b->alts, struct contact, next); list_del(&c->next); @@ -1862,29 +1824,27 @@ static void dht_dead_peer(struct dht * dht, } } -static int dht_del(struct dht * dht, - const uint8_t * key, +static int dht_del(const uint8_t * key, uint64_t addr) { struct dht_entry * e; - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); - e = dht_find_entry(dht, key); + e = dht_find_entry(key); if (e == NULL) { - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return -EPERM; } dht_entry_del_addr(e, addr); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return 0; } -static buffer_t dht_retrieve(struct dht * dht, - const uint8_t * key) +static buffer_t dht_retrieve(const uint8_t * key) { struct dht_entry * e; struct list_head * p; @@ -1892,9 +1852,9 @@ static buffer_t dht_retrieve(struct dht * dht, uint64_t * pos; size_t addrs = 0; - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_rdlock(&dht.lock); - e = dht_find_entry(dht, key); + e = dht_find_entry(key); if (e == NULL) goto fail; @@ -1902,7 +1862,7 @@ static buffer_t dht_retrieve(struct dht * dht, if (buf.len == 0) goto fail; - pos = malloc(sizeof(dht->addr) * buf.len); + pos = malloc(sizeof(dht.addr) * buf.len); if (pos == NULL) goto fail; @@ -1915,19 +1875,18 @@ static buffer_t dht_retrieve(struct dht * dht, break; } - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return buf; fail: - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); buf.len = 0; return buf; } -static ssize_t dht_get_contacts(struct dht * dht, - const uint8_t * key, +static ssize_t dht_get_contacts(const uint8_t * key, kad_contact_msg_t *** msgs) { struct list_head l; @@ -1938,18 +1897,18 @@ static ssize_t dht_get_contacts(struct dht * dht, list_head_init(&l); - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); - len = dht_contact_list(dht, &l, key); + len = dht_contact_list(&l, key); if (len == 0) { - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); *msgs = NULL; return 0; } *msgs = malloc(len * sizeof(**msgs)); if (*msgs == NULL) { - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return 0; } @@ -1957,7 +1916,7 @@ static ssize_t dht_get_contacts(struct dht * dht, struct contact * c = list_entry(p, struct contact, next); (*msgs)[i] = malloc(sizeof(***msgs)); if ((*msgs)[i] == NULL) { - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); while (i > 0) free(*msgs[--i]); free(*msgs); @@ -1968,13 +1927,13 @@ static ssize_t dht_get_contacts(struct dht * dht, kad_contact_msg__init((*msgs)[i]); (*msgs)[i]->id.data = c->id; - (*msgs)[i]->id.len = dht->b; + (*msgs)[i]->id.len = dht.b; (*msgs)[i++]->addr = c->addr; list_del(&c->next); free(c); } - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return i; } @@ -1990,7 +1949,6 @@ static time_t gcd(time_t a, static void * work(void * o) { - struct dht * dht; struct timespec now; struct list_head * p; struct list_head * h; @@ -1998,46 +1956,46 @@ static void * work(void * o) time_t intv; struct lookup * lu; - dht = (struct dht *) o; + (void) o; - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_rdlock(&dht.lock); - intv = gcd(dht->t_expire, dht->t_repub); + intv = gcd(dht.t_expire, dht.t_repub); intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); list_head_init(&reflist); while (true) { clock_gettime(CLOCK_REALTIME_COARSE, &now); - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); /* Republish registered hashes. */ - list_for_each(p, &dht->refs) { + list_for_each(p, &dht.refs) { struct ref_entry * e; uint8_t * key; uint64_t addr; time_t t_expire; e = list_entry(p, struct ref_entry, next); if (now.tv_sec > e->t_rep) { - key = dht_dup_key(e->key, dht->b); + key = dht_dup_key(e->key, dht.b); if (key == NULL) continue; - addr = dht->addr; - t_expire = dht->t_expire; - e->t_rep = now.tv_sec + dht->t_repub; + addr = dht.addr; + t_expire = dht.t_expire; + e->t_rep = now.tv_sec + dht.t_repub; - pthread_rwlock_unlock(&dht->lock); - kad_publish(dht, key, addr, t_expire); - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); + kad_publish(key, addr, t_expire); + pthread_rwlock_wrlock(&dht.lock); free(key); } } /* Remove stale entries and republish if necessary. */ - list_for_each_safe(p, h, &dht->entries) { + list_for_each_safe(p, h, &dht.entries) { struct list_head * p1; struct list_head * h1; struct dht_entry * e; @@ -2055,39 +2013,39 @@ static void * work(void * o) } if (now.tv_sec > v->t_rep) { - key = dht_dup_key(e->key, dht->b); + key = dht_dup_key(e->key, dht.b); addr = v->addr; - t_expire = dht->t_expire = now.tv_sec; - v->t_rep = now.tv_sec + dht->t_replic; - pthread_rwlock_unlock(&dht->lock); - kad_publish(dht, key, addr, t_expire); - pthread_rwlock_wrlock(&dht->lock); + t_expire = dht.t_expire = now.tv_sec; + v->t_rep = now.tv_sec + dht.t_replic; + pthread_rwlock_unlock(&dht.lock); + kad_publish(key, addr, t_expire); + pthread_rwlock_wrlock(&dht.lock); free(key); } } } /* Check the requests list for unresponsive nodes. */ - list_for_each_safe(p, h, &dht->requests) { + list_for_each_safe(p, h, &dht.requests) { struct kad_req * r; r = list_entry(p, struct kad_req, next); if (now.tv_sec > r->t_exp) { list_del(&r->next); - bmp_release(dht->cookies, r->cookie); - dht_dead_peer(dht, r->key, r->addr); + bmp_release(dht.cookies, r->cookie); + dht_dead_peer(r->key, r->addr); kad_req_destroy(r); } } /* Refresh unaccessed buckets. */ - bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); + bucket_refresh(dht.buckets, now.tv_sec, &reflist); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); list_for_each_safe(p, h, &reflist) { struct contact * c; c = list_entry(p, struct contact, next); - lu = kad_lookup(dht, c->id, KAD_FIND_NODE); + lu = kad_lookup(c->id, KAD_FIND_NODE); if (lu != NULL) lookup_destroy(lu); list_del(&c->next); @@ -2100,11 +2058,9 @@ static void * work(void * o) return (void *) 0; } -static int kad_handle_join_resp(struct dht * dht, - struct kad_req * req, +static int kad_handle_join_resp(struct kad_req * req, kad_msg_t * msg) { - assert(dht); assert(req); assert(msg); @@ -2120,11 +2076,11 @@ static int kad_handle_join_resp(struct dht * dht, return -1; } - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); - dht->buckets = bucket_create(); - if (dht->buckets == NULL) { - pthread_rwlock_unlock(&dht->lock); + dht.buckets = bucket_create(); + if (dht.buckets == NULL) { + pthread_rwlock_unlock(&dht.lock); return -1; } @@ -2138,84 +2094,80 @@ static int kad_handle_join_resp(struct dht * dht, if (msg->t_refresh != KAD_T_REFR) log_warn("Different kademlia refresh time detected."); - dht->k = msg->k; - dht->b = msg->b; - dht->t_expire = msg->t_expire; - dht->t_repub = MAX(1, dht->t_expire - 10); + dht.k = msg->k; + dht.b = msg->b; + dht.t_expire = msg->t_expire; + dht.t_repub = MAX(1, dht.t_expire - 10); - if (pthread_create(&dht->worker, NULL, work, dht)) { - bucket_destroy(dht->buckets); - pthread_rwlock_unlock(&dht->lock); + if (pthread_create(&dht.worker, NULL, work, NULL)) { + bucket_destroy(dht.buckets); + pthread_rwlock_unlock(&dht.lock); return -1; } kad_req_respond(req); - dht_update_bucket(dht, msg->s_id.data, msg->s_addr); + dht_update_bucket(msg->s_id.data, msg->s_addr); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); log_dbg("Enrollment of DHT completed."); return 0; } -static int kad_handle_find_resp(struct dht * dht, - struct kad_req * req, +static int kad_handle_find_resp(struct kad_req * req, kad_msg_t * msg) { struct lookup * lu; - assert(dht); assert(req); assert(msg); - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_rdlock(&dht.lock); - lu = dht_find_lookup(dht, req->cookie); + lu = dht_find_lookup(req->cookie); if (lu == NULL) { - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return -1; } - lookup_update(dht, lu, msg); + lookup_update(lu, msg); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return 0; } -static void kad_handle_response(struct dht * dht, - kad_msg_t * msg) +static void kad_handle_response(kad_msg_t * msg) { struct kad_req * req; - assert(dht); assert(msg); - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); - req = dht_find_request(dht, msg); + req = dht_find_request(msg); if (req == NULL) { - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return; } - bmp_release(dht->cookies, req->cookie); + bmp_release(dht.cookies, req->cookie); list_del(&req->next); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); switch(req->code) { case KAD_JOIN: - if (kad_handle_join_resp(dht, req, msg)) + if (kad_handle_join_resp(req, msg)) log_err("Enrollment of DHT failed."); break; case KAD_FIND_VALUE: case KAD_FIND_NODE: - if (dht_get_state(dht) != DHT_RUNNING) + if (dht_get_state() != DHT_RUNNING) break; - kad_handle_find_resp(dht, req, msg); + kad_handle_find_resp(req, msg); break; default: break; @@ -2224,137 +2176,131 @@ static void kad_handle_response(struct dht * dht, kad_req_destroy(req); } -int dht_bootstrap(struct dht * dht, - size_t b, - time_t t_expire) +int dht_bootstrap() { - assert(dht); + pthread_rwlock_wrlock(&dht.lock); - pthread_rwlock_wrlock(&dht->lock); +#ifndef __DHT_TEST__ + dht.b = hash_len(ipcpi.dir_hash_algo); +#else + dht.b = DHT_TEST_KEY_LEN; +#endif + dht.t_expire = 86400; /* 1 day */ + dht.t_repub = dht.t_expire - 10; + dht.k = KAD_K; - dht->id = create_id(b); - if (dht->id == NULL) + dht.id = create_id(dht.b); + if (dht.id == NULL) goto fail_id; - dht->buckets = bucket_create(); - if (dht->buckets == NULL) + dht.buckets = bucket_create(); + if (dht.buckets == NULL) goto fail_buckets; - dht->buckets->depth = 0; - dht->buckets->mask = 0; - - dht->b = b / CHAR_BIT; - dht->t_expire = MAX(2, t_expire); - dht->t_repub = MAX(1, t_expire - 10); - dht->k = KAD_K; + dht.buckets->depth = 0; + dht.buckets->mask = 0; - if (pthread_create(&dht->worker, NULL, work, dht)) + if (pthread_create(&dht.worker, NULL, work, NULL)) goto fail_pthread_create; - dht->state = DHT_RUNNING; + dht.state = DHT_RUNNING; - dht_update_bucket(dht, dht->id, dht->addr); + dht_update_bucket(dht.id, dht.addr); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return 0; fail_pthread_create: - bucket_destroy(dht->buckets); - dht->buckets = NULL; + bucket_destroy(dht.buckets); + dht.buckets = NULL; fail_buckets: - free(dht->id); - dht->id = NULL; + free(dht.id); + dht.id = NULL; fail_id: - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return -1; } -static struct ref_entry * ref_entry_get(struct dht * dht, - const uint8_t * key) +static struct ref_entry * ref_entry_get(const uint8_t * key) { struct list_head * p; - list_for_each(p, &dht->refs) { + list_for_each(p, &dht.refs) { struct ref_entry * r = list_entry(p, struct ref_entry, next); - if (!memcmp(key, r->key, dht-> b) ) + if (!memcmp(key, r->key, dht. b) ) return r; } return NULL; } -int dht_reg(struct dht * dht, - const uint8_t * key) +int dht_reg(const uint8_t * key) { struct ref_entry * e; uint64_t addr; time_t t_expire; - assert(dht); assert(key); - assert(dht->addr != 0); + assert(dht.addr != 0); - if (dht_wait_running(dht)) + if (dht_wait_running()) return -1; - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); - if (ref_entry_get(dht, key) != NULL) { + if (ref_entry_get(key) != NULL) { log_dbg("Name already registered."); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return 0; } - e = ref_entry_create(dht, key); + e = ref_entry_create(key); if (e == NULL) { - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return -ENOMEM; } - list_add(&e->next, &dht->refs); + list_add(&e->next, &dht.refs); - t_expire = dht->t_expire; - addr = dht->addr; + t_expire = dht.t_expire; + addr = dht.addr; - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); - kad_publish(dht, key, addr, t_expire); + kad_publish(key, addr, t_expire); return 0; } -int dht_unreg(struct dht * dht, - const uint8_t * key) +int dht_unreg(const uint8_t * key) { struct list_head * p; struct list_head * h; - assert(dht); assert(key); - if (dht_get_state(dht) != DHT_RUNNING) + if (dht_get_state() != DHT_RUNNING) return -1; - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); - list_for_each_safe(p, h, &dht->refs) { + list_for_each_safe(p, h, &dht.refs) { struct ref_entry * r = list_entry(p, struct ref_entry, next); - if (!memcmp(key, r->key, dht-> b) ) { + if (!memcmp(key, r->key, dht. b) ) { list_del(&r->next); ref_entry_destroy(r); } } - dht_del(dht, key, dht->addr); + dht_del(key, dht.addr); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); return 0; } -uint64_t dht_query(struct dht * dht, - const uint8_t * key) +uint64_t dht_query(const uint8_t * key) { struct dht_entry * e; struct lookup * lu; @@ -2363,21 +2309,21 @@ uint64_t dht_query(struct dht * dht, addrs[0] = 0; - if (dht_wait_running(dht)) + if (dht_wait_running()) return 0; - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_rdlock(&dht.lock); - e = dht_find_entry(dht, key); + e = dht_find_entry(key); if (e != NULL) - addrs[0] = dht_entry_get_addr(dht, e); + addrs[0] = dht_entry_get_addr(e); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); if (addrs[0] != 0) return addrs[0]; - lu = kad_lookup(dht, key, KAD_FIND_VALUE); + lu = kad_lookup(key, KAD_FIND_VALUE); if (lu == NULL) return 0; @@ -2390,7 +2336,7 @@ uint64_t dht_query(struct dht * dht, lookup_destroy(lu); /* Current behaviour is anycast and return the first peer address. */ - if (addrs[0] != dht->addr) + if (addrs[0] != dht.addr) return addrs[0]; if (n > 1) @@ -2401,9 +2347,7 @@ uint64_t dht_query(struct dht * dht, static void * dht_handle_packet(void * o) { - struct dht * dht = (struct dht *) o; - - assert(dht); + (void) o; while (true) { kad_msg_t * msg; @@ -2416,14 +2360,14 @@ static void * dht_handle_packet(void * o) size_t t_expire; struct cmd * cmd; - pthread_mutex_lock(&dht->mtx); + pthread_mutex_lock(&dht.mtx); - pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); + pthread_cleanup_push(__cleanup_mutex_unlock, &dht.mtx); - while (list_is_empty(&dht->cmds)) - pthread_cond_wait(&dht->cond, &dht->mtx); + while (list_is_empty(&dht.cmds)) + pthread_cond_wait(&dht.cond, &dht.mtx); - cmd = list_last_entry(&dht->cmds, struct cmd, next); + cmd = list_last_entry(&dht.cmds, struct cmd, next); list_del(&cmd->next); pthread_cleanup_pop(true); @@ -2441,18 +2385,18 @@ static void * dht_handle_packet(void * o) continue; } - if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { + if (msg->code != KAD_RESPONSE && dht_wait_running()) { kad_msg__free_unpacked(msg, NULL); log_dbg("Got a request message when not running."); continue; } - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_rdlock(&dht.lock); - b = dht->b; - t_expire = dht->t_expire; + b = dht.b; + t_expire = dht.t_expire; - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); if (msg->has_key && msg->key.len != b) { kad_msg__free_unpacked(msg, NULL); @@ -2467,7 +2411,7 @@ static void * dht_handle_packet(void * o) continue; } - tpm_dec(dht->tpm); + tpm_dec(dht.tpm); addr = msg->s_addr; @@ -2510,7 +2454,7 @@ static void * dht_handle_packet(void * o) resp_msg.t_replicate = KAD_T_REPL; break; case KAD_FIND_VALUE: - buf = dht_retrieve(dht, msg->key.data); + buf = dht_retrieve(msg->key.data); if (buf.len != 0) { resp_msg.n_addrs = buf.len; resp_msg.addrs = (uint64_t *) buf.data; @@ -2520,7 +2464,7 @@ static void * dht_handle_packet(void * o) case KAD_FIND_NODE: /* Return k closest contacts. */ resp_msg.n_contacts = - dht_get_contacts(dht, msg->key.data, &cmsgs); + dht_get_contacts(msg->key.data, &cmsgs); resp_msg.contacts = cmsgs; break; case KAD_STORE: @@ -2534,11 +2478,11 @@ static void * dht_handle_packet(void * o) break; } - kad_add(dht, *msg->contacts, msg->n_contacts, + kad_add(*msg->contacts, msg->n_contacts, msg->t_expire); break; case KAD_RESPONSE: - kad_handle_response(dht, msg); + kad_handle_response(msg); break; default: assert(false); @@ -2546,19 +2490,19 @@ static void * dht_handle_packet(void * o) } if (msg->code != KAD_JOIN) { - pthread_rwlock_wrlock(&dht->lock); - if (dht_get_state(dht) == DHT_JOINING && - dht->buckets == NULL) { - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); + if (dht_get_state() == DHT_JOINING && + dht.buckets == NULL) { + pthread_rwlock_unlock(&dht.lock); goto finish; } - if (dht_update_bucket(dht, msg->s_id.data, addr)) + if (dht_update_bucket(msg->s_id.data, addr)) log_warn("Failed to update bucket."); - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.lock); } - if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0) + if (msg->code < KAD_STORE && send_msg(&resp_msg, addr) < 0) log_warn("Failed to send response."); finish: @@ -2568,7 +2512,7 @@ static void * dht_handle_packet(void * o) free(resp_msg.addrs); if (resp_msg.n_contacts == 0) { - tpm_inc(dht->tpm); + tpm_inc(dht.tpm); continue; } @@ -2577,19 +2521,22 @@ static void * dht_handle_packet(void * o) NULL); free(resp_msg.contacts); - tpm_inc(dht->tpm); + tpm_inc(dht.tpm); } return (void *) 0; } -static void dht_post_packet(void * comp, +static void dht_post_packet(void * o, struct shm_du_buff * sdb) { struct cmd * cmd; - struct dht * dht = (struct dht *) comp; - if (dht_get_state(dht) == DHT_SHUTDOWN) { + (void) o; + + assert(o == &dht); + + if (dht_get_state() == DHT_SHUTDOWN) { #ifndef __DHT_TEST__ ipcp_sdb_release(sdb); #endif @@ -2604,37 +2551,34 @@ static void dht_post_packet(void * comp, cmd->sdb = sdb; - pthread_mutex_lock(&dht->mtx); + pthread_mutex_lock(&dht.mtx); - list_add(&cmd->next, &dht->cmds); + list_add(&cmd->next, &dht.cmds); - pthread_cond_signal(&dht->cond); + pthread_cond_signal(&dht.cond); - pthread_mutex_unlock(&dht->mtx); + pthread_mutex_unlock(&dht.mtx); } -void dht_destroy(struct dht * dht) +void dht_fini() { struct list_head * p; struct list_head * h; - if (dht == NULL) - return; - #ifndef __DHT_TEST__ - tpm_stop(dht->tpm); + tpm_stop(dht.tpm); - tpm_destroy(dht->tpm); + tpm_destroy(dht.tpm); #endif - if (dht_get_state(dht) == DHT_RUNNING) { - dht_set_state(dht, DHT_SHUTDOWN); - pthread_cancel(dht->worker); - pthread_join(dht->worker, NULL); + if (dht_get_state() == DHT_RUNNING) { + dht_set_state(DHT_SHUTDOWN); + pthread_cancel(dht.worker); + pthread_join(dht.worker, NULL); } - pthread_rwlock_wrlock(&dht->lock); + pthread_rwlock_wrlock(&dht.lock); - list_for_each_safe(p, h, &dht->cmds) { + list_for_each_safe(p, h, &dht.cmds) { struct cmd * c = list_entry(p, struct cmd, next); list_del(&c->next); #ifndef __DHT_TEST__ @@ -2643,44 +2587,42 @@ void dht_destroy(struct dht * dht) free(c); } - list_for_each_safe(p, h, &dht->entries) { + list_for_each_safe(p, h, &dht.entries) { struct dht_entry * e = list_entry(p, struct dht_entry, next); list_del(&e->next); dht_entry_destroy(e); } - list_for_each_safe(p, h, &dht->requests) { + list_for_each_safe(p, h, &dht.requests) { struct kad_req * r = list_entry(p, struct kad_req, next); list_del(&r->next); kad_req_destroy(r); } - list_for_each_safe(p, h, &dht->refs) { + list_for_each_safe(p, h, &dht.refs) { struct ref_entry * e = list_entry(p, struct ref_entry, next); list_del(&e->next); ref_entry_destroy(e); } - list_for_each_safe(p, h, &dht->lookups) { + list_for_each_safe(p, h, &dht.lookups) { struct lookup * l = list_entry(p, struct lookup, next); list_del(&l->next); lookup_destroy(l); } - pthread_rwlock_unlock(&dht->lock); - - if (dht->buckets != NULL) - bucket_destroy(dht->buckets); + pthread_rwlock_unlock(&dht.lock); - bmp_destroy(dht->cookies); + if (dht.buckets != NULL) + bucket_destroy(dht.buckets); - pthread_mutex_destroy(&dht->mtx); + bmp_destroy(dht.cookies); - pthread_rwlock_destroy(&dht->lock); + pthread_mutex_destroy(&dht.mtx); - free(dht->id); + pthread_rwlock_destroy(&dht.lock); - free(dht); + free(dht.id); } static void * join_thr(void * o) @@ -2691,14 +2633,14 @@ static void * join_thr(void * o) assert(info); - while (kad_join(info->dht, info->addr)) { - if (dht_get_state(info->dht) == DHT_SHUTDOWN) { + while (kad_join(info->addr)) { + if (dht_get_state() == DHT_SHUTDOWN) { log_dbg("DHT enrollment aborted."); goto finish; } if (retr++ == KAD_JOIN_RETR) { - dht_set_state(info->dht, DHT_INIT); + dht_set_state(DHT_INIT); log_warn("DHT enrollment attempt failed."); goto finish; } @@ -2706,9 +2648,9 @@ static void * join_thr(void * o) sleep(KAD_JOIN_INTV); } - dht_set_state(info->dht, DHT_RUNNING); + dht_set_state(DHT_RUNNING); - lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE); + lu = kad_lookup(dht.id, KAD_FIND_NODE); if (lu != NULL) lookup_destroy(lu); @@ -2722,7 +2664,7 @@ static void handle_event(void * self, int event, const void * o) { - struct dht * dht = (struct dht *) self; + (void) self; if (event == NOTIFY_DT_CONN_ADD) { pthread_t thr; @@ -2733,19 +2675,18 @@ static void handle_event(void * self, /* Give the pff some time to update for the new link. */ nanosleep(&slack, NULL); - switch(dht_get_state(dht)) { + switch(dht_get_state()) { case DHT_INIT: inf = malloc(sizeof(*inf)); if (inf == NULL) break; - inf->dht = dht; inf->addr = c->conn_info.addr; - if (dht_set_state(dht, DHT_JOINING) == 0 || - dht_wait_running(dht)) { + if (dht_set_state(DHT_JOINING) == 0 || + dht_wait_running()) { if (pthread_create(&thr, NULL, join_thr, inf)) { - dht_set_state(dht, DHT_INIT); + dht_set_state(DHT_INIT); free(inf); return; } @@ -2759,7 +2700,7 @@ static void handle_event(void * self, * FIXME: this lookup for effiency reasons * causes a SEGV when stressed with rapid * enrollments. - * lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); + * lu = kad_lookup(dht, dht.id, KAD_FIND_NODE); * if (lu != NULL) * lookup_destroy(lu); */ @@ -2770,73 +2711,65 @@ static void handle_event(void * self, } } -struct dht * dht_create(uint64_t addr) +int dht_init() { - struct dht * dht; - - dht = malloc(sizeof(*dht)); - if (dht == NULL) - goto fail_malloc; + dht.buckets = NULL; - dht->buckets = NULL; + list_head_init(&dht.entries); + list_head_init(&dht.requests); + list_head_init(&dht.refs); + list_head_init(&dht.lookups); + list_head_init(&dht.cmds); - list_head_init(&dht->entries); - list_head_init(&dht->requests); - list_head_init(&dht->refs); - list_head_init(&dht->lookups); - list_head_init(&dht->cmds); - - if (pthread_rwlock_init(&dht->lock, NULL)) + if (pthread_rwlock_init(&dht.lock, NULL)) goto fail_rwlock; - if (pthread_mutex_init(&dht->mtx, NULL)) + if (pthread_mutex_init(&dht.mtx, NULL)) goto fail_mutex; - if (pthread_cond_init(&dht->cond, NULL)) + if (pthread_cond_init(&dht.cond, NULL)) goto fail_cond; - dht->cookies = bmp_create(DHT_MAX_REQS, 1); - if (dht->cookies == NULL) + dht.cookies = bmp_create(DHT_MAX_REQS, 1); + if (dht.cookies == NULL) goto fail_bmp; - dht->b = 0; - dht->addr = addr; - dht->id = NULL; + dht.b = 0; + dht.id = NULL; #ifndef __DHT_TEST__ - dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); - if (dht->tpm == NULL) + dht.addr = ipcpi.dt_addr; + dht.tpm = tpm_create(2, 1, dht_handle_packet, NULL); + if (dht.tpm == NULL) goto fail_tpm_create; - if (tpm_start(dht->tpm)) + if (tpm_start(dht.tpm)) goto fail_tpm_start; - dht->eid = dt_reg_comp(dht, &dht_post_packet, DHT); - if ((int) dht->eid < 0) + dht.eid = dt_reg_comp(&dht, &dht_post_packet, DHT); + if ((int) dht.eid < 0) goto fail_tpm_start; - notifier_reg(handle_event, dht); + notifier_reg(handle_event, NULL); #else (void) handle_event; (void) dht_handle_packet; (void) dht_post_packet; #endif - dht->state = DHT_INIT; + dht.state = DHT_INIT; - return dht; + return 0; #ifndef __DHT_TEST__ fail_tpm_start: - tpm_destroy(dht->tpm); + tpm_destroy(dht.tpm); fail_tpm_create: - bmp_destroy(dht->cookies); + bmp_destroy(dht.cookies); #endif fail_bmp: - pthread_cond_destroy(&dht->cond); + pthread_cond_destroy(&dht.cond); fail_cond: - pthread_mutex_destroy(&dht->mtx); + pthread_mutex_destroy(&dht.mtx); fail_mutex: - pthread_rwlock_destroy(&dht->lock); + pthread_rwlock_destroy(&dht.lock); fail_rwlock: - free(dht); - fail_malloc: - return NULL; + return -1; } |