diff options
Diffstat (limited to 'src/ipcpd/unicast/dir')
-rw-r--r-- | src/ipcpd/unicast/dir/dht.c | 845 | ||||
-rw-r--r-- | src/ipcpd/unicast/dir/dht.h | 17 | ||||
-rw-r--r-- | src/ipcpd/unicast/dir/ops.h | 18 | ||||
-rw-r--r-- | src/ipcpd/unicast/dir/tests/dht_test.c | 37 |
4 files changed, 508 insertions, 409 deletions
diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c index ba4b897e..1742267b 100644 --- a/src/ipcpd/unicast/dir/dht.c +++ b/src/ipcpd/unicast/dir/dht.c @@ -211,8 +211,8 @@ struct cmd { }; struct dir_ops dht_dir_ops = { - .init = dht_init, - .fini = dht_fini, + .create = dht_create, + .destroy = dht_destroy, .bootstrap = dht_bootstrap, .reg = dht_reg, .unreg = dht_unreg, @@ -220,7 +220,7 @@ struct dir_ops dht_dir_ops = { .wait_running = dht_wait_running }; -struct { +struct dht { size_t alpha; size_t b; size_t k; @@ -256,13 +256,15 @@ struct { struct tpm * tpm; pthread_t worker; -} dht; +}; struct join_info { + struct dht * dht; uint64_t addr; }; struct packet_info { + struct dht * dht; struct shm_du_buff * sdb; }; @@ -280,49 +282,53 @@ static uint8_t * dht_dup_key(const uint8_t * key, return dup; } -static enum dht_state dht_get_state(void) +static enum dht_state dht_get_state(struct dht * dht) { enum dht_state state; - pthread_mutex_lock(&dht.mtx); + pthread_mutex_lock(&dht->mtx); - state = dht.state; + state = dht->state; - pthread_mutex_unlock(&dht.mtx); + pthread_mutex_unlock(&dht->mtx); return state; } -static int dht_set_state(enum dht_state state) +static int dht_set_state(struct dht * dht, + enum dht_state state) { - pthread_mutex_lock(&dht.mtx); + pthread_mutex_lock(&dht->mtx); - if (state == DHT_JOINING && dht.state != DHT_INIT) { - pthread_mutex_unlock(&dht.mtx); + if (state == DHT_JOINING && dht->state != DHT_INIT) { + pthread_mutex_unlock(&dht->mtx); return -1; } - dht.state = state; + dht->state = state; - pthread_cond_broadcast(&dht.cond); + pthread_cond_broadcast(&dht->cond); - pthread_mutex_unlock(&dht.mtx); + pthread_mutex_unlock(&dht->mtx); return 0; } -int dht_wait_running() +int dht_wait_running(void * dir) { - int ret = 0; + struct dht * dht; + int ret = 0; - pthread_mutex_lock(&dht.mtx); + dht = (struct dht *) dir; - pthread_cleanup_push(__cleanup_mutex_unlock, &dht.mtx); + pthread_mutex_lock(&dht->mtx); - while (dht.state == DHT_JOINING) - pthread_cond_wait(&dht.cond, &dht.mtx); + pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); - if (dht.state != DHT_RUNNING) + while (dht->state == DHT_JOINING) + pthread_cond_wait(&dht->cond, &dht->mtx); + + if (dht->state != DHT_RUNNING) ret = -1; pthread_cleanup_pop(true); @@ -346,7 +352,8 @@ static uint8_t * create_id(size_t len) return id; } -static void kad_req_create(kad_msg_t * msg, +static void kad_req_create(struct dht * dht, + kad_msg_t * msg, uint64_t addr) { struct kad_req * req; @@ -369,9 +376,9 @@ static void kad_req_create(kad_msg_t * msg, req->code = msg->code; req->key = NULL; - pthread_rwlock_rdlock(&dht.lock); - b = dht.b; - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_rdlock(&dht->lock); + b = dht->b; + pthread_rwlock_unlock(&dht->lock); if (msg->has_key) { req->key = dht_dup_key(msg->key.data, b); @@ -402,11 +409,11 @@ static void kad_req_create(kad_msg_t * msg, pthread_condattr_destroy(&cattr); - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); - list_add(&req->next, &dht.requests); + list_add(&req->next, &dht->requests); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); } static void cancel_req_destroy(void * o) @@ -564,11 +571,12 @@ static struct bucket * iter_bucket(struct bucket * b, return iter_bucket(b->children[(byte & mask)], id); } -static struct bucket * dht_get_bucket(const uint8_t * id) +static struct bucket * dht_get_bucket(struct dht * dht, + const uint8_t * id) { - assert(dht.buckets); + assert(dht->buckets); - return iter_bucket(dht.buckets, id); + return iter_bucket(dht->buckets, id); } /* @@ -603,7 +611,8 @@ static size_t list_add_sorted(struct list_head * l, return 1; } -static size_t dht_contact_list(struct list_head * l, +static size_t dht_contact_list(struct dht * dht, + struct list_head * l, const uint8_t * key) { struct list_head * p; @@ -613,52 +622,55 @@ static size_t dht_contact_list(struct list_head * l, struct timespec t; assert(l); + assert(dht); assert(key); assert(list_is_empty(l)); clock_gettime(CLOCK_REALTIME_COARSE, &t); - b = dht_get_bucket(key); + b = dht_get_bucket(dht, key); if (b == NULL) return 0; b->t_refr = t.tv_sec + KAD_T_REFR; - if (b->n_contacts == dht.k || b->parent == NULL) { + if (b->n_contacts == dht->k || b->parent == NULL) { list_for_each(p, &b->contacts) { struct contact * c; c = list_entry(p, struct contact, next); - c = contact_create(c->id, dht.b, c->addr); + c = contact_create(c->id, dht->b, c->addr); if (list_add_sorted(l, c, key) == 1) - if (++len == dht.k) + if (++len == dht->k) break; } } else { struct bucket * d = b->parent; - for (i = 0; i < (1L << KAD_BETA) && len < dht.k; ++i) { + for (i = 0; i < (1L << KAD_BETA) && len < dht->k; ++i) { list_for_each(p, &d->children[i]->contacts) { struct contact * c; c = list_entry(p, struct contact, next); - c = contact_create(c->id, dht.b, c->addr); + c = contact_create(c->id, dht->b, c->addr); if (c == NULL) continue; if (list_add_sorted(l, c, key) == 1) - if (++len == dht.k) + if (++len == dht->k) break; } } } - assert(len == dht.k || b->parent == NULL); + assert(len == dht->k || b->parent == NULL); return len; } -static struct lookup * lookup_create(const uint8_t * id) +static struct lookup * lookup_create(struct dht * dht, + const uint8_t * id) { struct lookup * lu; pthread_condattr_t cattr; + assert(dht); assert(id); lu = malloc(sizeof(*lu)); @@ -671,7 +683,7 @@ static struct lookup * lookup_create(const uint8_t * id) lu->state = LU_INIT; lu->addrs = NULL; lu->n_addrs = 0; - lu->key = dht_dup_key(id, dht.b); + lu->key = dht_dup_key(id, dht->b); if (lu->key == NULL) goto fail_id; @@ -688,13 +700,13 @@ static struct lookup * lookup_create(const uint8_t * id) pthread_condattr_destroy(&cattr); - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); - list_add(&lu->next, &dht.lookups); + list_add(&lu->next, &dht->lookups); - lu->n_contacts = dht_contact_list(&lu->contacts, id); + lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return lu; @@ -773,7 +785,8 @@ static void lookup_destroy(struct lookup * lu) pthread_cleanup_pop(true); } -static void lookup_update(struct lookup * lu, +static void lookup_update(struct dht * dht, + struct lookup * lu, kad_msg_t * msg) { struct list_head * p = NULL; @@ -786,7 +799,7 @@ static void lookup_update(struct lookup * lu, assert(lu); assert(msg); - if (dht_get_state() != DHT_RUNNING) + if (dht_get_state(dht) != DHT_RUNNING) return; pthread_mutex_lock(&lu->lock); @@ -822,16 +835,16 @@ static void lookup_update(struct lookup * lu, pthread_cleanup_push(__cleanup_mutex_unlock, &lu->lock); while (lu->state == LU_INIT) { - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); pthread_cond_wait(&lu->cond, &lu->lock); - pthread_rwlock_rdlock(&dht.lock); + pthread_rwlock_rdlock(&dht->lock); } pthread_cleanup_pop(false); for (n = 0; n < msg->n_contacts; ++n) { c = contact_create(msg->contacts[n]->id.data, - dht.b, msg->contacts[n]->addr); + dht->b, msg->contacts[n]->addr); if (c == NULL) continue; @@ -840,7 +853,7 @@ static void lookup_update(struct lookup * lu, list_for_each(p, &lu->contacts) { struct contact * e; e = list_entry(p, struct contact, next); - if (!memcmp(e->id, c->id, dht.b)) { + if (!memcmp(e->id, c->id, dht->b)) { contact_destroy(c); c = NULL; break; @@ -855,11 +868,11 @@ static void lookup_update(struct lookup * lu, if (c == NULL) continue; - if (lu->n_contacts < dht.k) { + if (lu->n_contacts < dht->k) { list_add_tail(&c->next, p); ++lu->n_contacts; mod = true; - } else if (pos == dht.k) { + } else if (pos == dht->k) { contact_destroy(c); } else { struct contact * d; @@ -1007,12 +1020,15 @@ static enum lookup_state lookup_wait(struct lookup * lu) return state; } -static struct kad_req * dht_find_request(kad_msg_t * msg) +static struct kad_req * dht_find_request(struct dht * dht, + kad_msg_t * msg) { struct list_head * p; + + assert(dht); assert(msg); - list_for_each(p, &dht.requests) { + list_for_each(p, &dht->requests) { struct kad_req * r = list_entry(p, struct kad_req, next); if (r->cookie == msg->cookie) return r; @@ -1021,15 +1037,17 @@ static struct kad_req * dht_find_request(kad_msg_t * msg) return NULL; } -static struct lookup * dht_find_lookup(uint32_t cookie) +static struct lookup * dht_find_lookup(struct dht * dht, + uint32_t cookie) { struct list_head * p; struct list_head * p2; struct list_head * h2; + assert(dht); assert(cookie > 0); - list_for_each(p, &dht.lookups) { + list_for_each(p, &dht->lookups) { struct lookup * l = list_entry(p, struct lookup, next); pthread_mutex_lock(&l->lock); list_for_each_safe(p2, h2, &l->cookies) { @@ -1076,18 +1094,20 @@ static void val_destroy(struct val * v) free(v); } -static struct ref_entry * ref_entry_create(const uint8_t * key) +static struct ref_entry * ref_entry_create(struct dht * dht, + const uint8_t * key) { struct ref_entry * e; struct timespec t; + assert(dht); assert(key); e = malloc(sizeof(*e)); if (e == NULL) return NULL; - e->key = dht_dup_key(key, dht.b); + e->key = dht_dup_key(key, dht->b); if (e->key == NULL) { free(e); return NULL; @@ -1095,7 +1115,7 @@ static struct ref_entry * ref_entry_create(const uint8_t * key) clock_gettime(CLOCK_REALTIME_COARSE, &t); - e->t_rep = t.tv_sec + dht.t_repub; + e->t_rep = t.tv_sec + dht->t_repub; return e; } @@ -1106,10 +1126,12 @@ static void ref_entry_destroy(struct ref_entry * e) free(e); } -static struct dht_entry * dht_entry_create(const uint8_t * key) +static struct dht_entry * dht_entry_create(struct dht * dht, + const uint8_t * key) { struct dht_entry * e; + assert(dht); assert(key); e = malloc(sizeof(*e)); @@ -1121,7 +1143,7 @@ static struct dht_entry * dht_entry_create(const uint8_t * key) e->n_vals = 0; - e->key = dht_dup_key(key, dht.b); + e->key = dht_dup_key(key, dht->b); if (e->key == NULL) { free(e); return NULL; @@ -1204,7 +1226,8 @@ static void dht_entry_del_addr(struct dht_entry * e, } } -static uint64_t dht_entry_get_addr(struct dht_entry * e) +static uint64_t dht_entry_get_addr(struct dht * dht, + struct dht_entry * e) { struct list_head * p; @@ -1213,7 +1236,7 @@ static uint64_t dht_entry_get_addr(struct dht_entry * e) list_for_each(p, &e->vals) { struct val * v = list_entry(p, struct val, next); - if (v->addr != dht.addr) + if (v->addr != dht->addr) return v->addr; } @@ -1221,12 +1244,14 @@ static uint64_t dht_entry_get_addr(struct dht_entry * e) } /* Forward declaration. */ -static struct lookup * kad_lookup(const uint8_t * key, +static struct lookup * kad_lookup(struct dht * dht, + const uint8_t * key, enum kad_code code); /* Build a refresh list. */ -static void bucket_refresh(struct bucket * b, +static void bucket_refresh(struct dht * dht, + struct bucket * b, time_t t, struct list_head * r) { @@ -1234,7 +1259,7 @@ static void bucket_refresh(struct bucket * b, if (*b->children != NULL) for (i = 0; i < (1L << KAD_BETA); ++i) - bucket_refresh(b->children[i], t, r); + bucket_refresh(dht, b->children[i], t, r); if (b->n_contacts == 0) return; @@ -1243,7 +1268,7 @@ static void bucket_refresh(struct bucket * b, struct contact * c; struct contact * d; c = list_first_entry(&b->contacts, struct contact, next); - d = contact_create(c->id, dht.b, c->addr); + d = contact_create(c->id, dht->b, c->addr); if (c != NULL) list_add(&d->next, r); return; @@ -1377,7 +1402,8 @@ static int split_bucket(struct bucket * b) } /* Locked externally to mandate update as (final) part of join transaction. */ -static int dht_update_bucket(const uint8_t * id, +static int dht_update_bucket(struct dht * dht, + const uint8_t * id, uint64_t addr) { struct list_head * p; @@ -1385,11 +1411,13 @@ static int dht_update_bucket(const uint8_t * id, struct bucket * b; struct contact * c; - b = dht_get_bucket(id); + assert(dht); + + b = dht_get_bucket(dht, id); if (b == NULL) return -1; - c = contact_create(id, dht.b, addr); + c = contact_create(id, dht->b, addr); if (c == NULL) return -1; @@ -1402,8 +1430,8 @@ static int dht_update_bucket(const uint8_t * id, } } - if (b->n_contacts == dht.k) { - if (bucket_has_id(b, dht.id)) { + if (b->n_contacts == dht->k) { + if (bucket_has_id(b, dht->id)) { list_add_tail(&c->next, &b->contacts); ++b->n_contacts; if (split_bucket(b)) { @@ -1411,7 +1439,7 @@ static int dht_update_bucket(const uint8_t * id, contact_destroy(c); --b->n_contacts; } - } else if (b->n_alts == dht.k) { + } else if (b->n_alts == dht->k) { struct contact * d; d = list_first_entry(&b->alts, struct contact, next); list_del(&d->next); @@ -1429,7 +1457,8 @@ static int dht_update_bucket(const uint8_t * id, return 0; } -static int send_msg(kad_msg_t * msg, +static int send_msg(struct dht * dht, + kad_msg_t * msg, uint64_t addr) { #ifndef __DHT_TEST__ @@ -1441,25 +1470,25 @@ static int send_msg(kad_msg_t * msg, if (msg->code == KAD_RESPONSE) retr = KAD_RESP_RETR; - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); - if (dht.id != NULL) { + if (dht->id != NULL) { msg->has_s_id = true; - msg->s_id.data = dht.id; - msg->s_id.len = dht.b; + msg->s_id.data = dht->id; + msg->s_id.len = dht->b; } - msg->s_addr = dht.addr; + msg->s_addr = dht->addr; if (msg->code < KAD_STORE) { - msg->cookie = bmp_allocate(dht.cookies); - if (!bmp_is_id_valid(dht.cookies, msg->cookie)) { - pthread_rwlock_unlock(&dht.lock); + msg->cookie = bmp_allocate(dht->cookies); + if (!bmp_is_id_valid(dht->cookies, msg->cookie)) { + pthread_rwlock_unlock(&dht->lock); goto fail_bmp_alloc; } } - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); #ifndef __DHT_TEST__ len = kad_msg__get_packed_size(msg); @@ -1472,7 +1501,7 @@ static int send_msg(kad_msg_t * msg, kad_msg__pack(msg, shm_du_buff_head(sdb)); - if (dt_write_packet(addr, QOS_CUBE_BE, dht.eid, sdb) == 0) + if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0) break; ipcp_sdb_release(sdb); @@ -1488,51 +1517,53 @@ static int send_msg(kad_msg_t * msg, (void) retr; #endif /* __DHT_TEST__ */ - if (msg->code < KAD_STORE && dht_get_state() != DHT_SHUTDOWN) - kad_req_create(msg, addr); + if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN) + kad_req_create(dht, msg, addr); return msg->cookie; #ifndef __DHT_TEST__ fail_msg: - pthread_rwlock_wrlock(&dht.lock); - bmp_release(dht.cookies, msg->cookie); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); + bmp_release(dht->cookies, msg->cookie); + pthread_rwlock_unlock(&dht->lock); #endif /* !__DHT_TEST__ */ fail_bmp_alloc: return -1; } -static struct dht_entry * dht_find_entry(const uint8_t * key) +static struct dht_entry * dht_find_entry(struct dht * dht, + const uint8_t * key) { struct list_head * p; - list_for_each(p, &dht.entries) { + list_for_each(p, &dht->entries) { struct dht_entry * e = list_entry(p, struct dht_entry, next); - if (!memcmp(key, e->key, dht.b)) + if (!memcmp(key, e->key, dht->b)) return e; } return NULL; } -static int kad_add(const kad_contact_msg_t * contacts, +static int kad_add(struct dht * dht, + const kad_contact_msg_t * contacts, ssize_t n, time_t exp) { struct dht_entry * e; - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); while (n-- > 0) { - if (contacts[n].id.len != dht.b) + if (contacts[n].id.len != dht->b) log_warn("Bad key length in contact data."); - e = dht_find_entry(contacts[n].id.data); + e = dht_find_entry(dht, contacts[n].id.data); if (e != NULL) { if (dht_entry_add_addr(e, contacts[n].addr, exp)) goto fail; } else { - e = dht_entry_create(contacts[n].id.data); + e = dht_entry_create(dht, contacts[n].id.data); if (e == NULL) goto fail; @@ -1541,39 +1572,42 @@ static int kad_add(const kad_contact_msg_t * contacts, goto fail; } - list_add(&e->next, &dht.entries); + list_add(&e->next, &dht->entries); } } - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return 0; fail: - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return -ENOMEM; } -static int wait_resp(kad_msg_t * msg, +static int wait_resp(struct dht * dht, + kad_msg_t * msg, time_t timeo) { struct kad_req * req; + assert(dht); assert(msg); - pthread_rwlock_rdlock(&dht.lock); + pthread_rwlock_rdlock(&dht->lock); - req = dht_find_request(msg); + req = dht_find_request(dht, msg); if (req == NULL) { - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return -EPERM; } - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return kad_req_wait(req, timeo); } -static int kad_store(const uint8_t * key, +static int kad_store(struct dht * dht, + const uint8_t * key, uint64_t addr, uint64_t r_addr, time_t ttl) @@ -1585,11 +1619,11 @@ static int kad_store(const uint8_t * key, cmsg.id.data = (uint8_t *) key; cmsg.addr = addr; - pthread_rwlock_rdlock(&dht.lock); + pthread_rwlock_rdlock(&dht->lock); - cmsg.id.len = dht.b; + cmsg.id.len = dht->b; - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); cmsgp[0] = &cmsg; @@ -1599,37 +1633,39 @@ static int kad_store(const uint8_t * key, msg.n_contacts = 1; msg.contacts = cmsgp; - if (send_msg(&msg, r_addr) < 0) + if (send_msg(dht, &msg, r_addr) < 0) return -1; return 0; } -static ssize_t kad_find(struct lookup * lu, +static ssize_t kad_find(struct dht * dht, + struct lookup * lu, const uint64_t * addrs, enum kad_code code) { kad_msg_t msg = KAD_MSG__INIT; ssize_t sent = 0; + assert(dht); assert(lu->key); msg.code = code; msg.has_key = true; msg.key.data = (uint8_t *) lu->key; - msg.key.len = dht.b; + msg.key.len = dht->b; while (*addrs != 0) { struct cookie_el * c; int ret; - if (*addrs == dht.addr) { + if (*addrs == dht->addr) { ++addrs; continue; } - ret = send_msg(&msg, *addrs); + ret = send_msg(dht, &msg, *addrs); if (ret < 0) break; @@ -1652,36 +1688,38 @@ static ssize_t kad_find(struct lookup * lu, return sent; } -static void lookup_detach(struct lookup * lu) +static void lookup_detach(struct dht * dht, + struct lookup * lu) { - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); list_del(&lu->next); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); } -static struct lookup * kad_lookup( const uint8_t * id, +static struct lookup * kad_lookup(struct dht * dht, + const uint8_t * id, enum kad_code code) { uint64_t addrs[KAD_ALPHA + 1]; enum lookup_state state; struct lookup * lu; - lu = lookup_create(id); + lu = lookup_create(dht, id); if (lu == NULL) return NULL; lookup_new_addrs(lu, addrs); if (addrs[0] == 0) { - lookup_detach(lu); + lookup_detach(dht, lu); lookup_destroy(lu); return NULL; } - if (kad_find(lu, addrs, code) == 0) { - lookup_detach(lu); + if (kad_find(dht, lu, addrs, code) == 0) { + lookup_detach(dht, lu); return lu; } @@ -1692,10 +1730,10 @@ static struct lookup * kad_lookup( const uint8_t * id, if (addrs[0] == 0) break; - kad_find(lu, addrs, code); + kad_find(dht, lu, addrs, code); break; case LU_DESTROY: - lookup_detach(lu); + lookup_detach(dht, lu); lookup_set_state(lu, LU_NULL); return NULL; default: @@ -1705,12 +1743,13 @@ static struct lookup * kad_lookup( const uint8_t * id, assert(state == LU_COMPLETE); - lookup_detach(lu); + lookup_detach(dht, lu); return lu; } -static void kad_publish(const uint8_t * key, +static void kad_publish(struct dht * dht, + const uint8_t * key, uint64_t addr, time_t exp) { @@ -1721,20 +1760,21 @@ static void kad_publish(const uint8_t * key, time_t t_expire; + assert(dht); assert(key); - pthread_rwlock_rdlock(&dht.lock); + pthread_rwlock_rdlock(&dht->lock); - k = dht.k; - t_expire = dht.t_expire; + k = dht->k; + t_expire = dht->t_expire; - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); addrs = malloc(k * sizeof(*addrs)); if (addrs == NULL) return; - lu = kad_lookup(key, KAD_FIND_NODE); + lu = kad_lookup(dht, key, KAD_FIND_NODE); if (lu == NULL) { free(addrs); return; @@ -1743,14 +1783,14 @@ static void kad_publish(const uint8_t * key, n = lookup_contact_addrs(lu, addrs); while (n-- > 0) { - if (addrs[n] == dht.addr) { + if (addrs[n] == dht->addr) { kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT; msg.id.data = (uint8_t *) key; - msg.id.len = dht.b; + msg.id.len = dht->b; msg.addr = addr; - kad_add(&msg, 1, exp); + kad_add(dht, &msg, 1, exp); } else { - if (kad_store(key, addr, addrs[n], t_expire)) + if (kad_store(dht, key, addr, addrs[n], t_expire)) log_warn("Failed to send store message."); } } @@ -1760,7 +1800,8 @@ static void kad_publish(const uint8_t * key, free(addrs); } -static int kad_join(uint64_t addr) +static int kad_join(struct dht * dht, + uint64_t addr) { kad_msg_t msg = KAD_MSG__INIT; @@ -1776,43 +1817,44 @@ static int kad_join(uint64_t addr) msg.t_refresh = KAD_T_REFR; msg.t_replicate = KAD_T_REPL; - pthread_rwlock_rdlock(&dht.lock); + pthread_rwlock_rdlock(&dht->lock); - msg.b = dht.b; + msg.b = dht->b; - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); - if (send_msg(&msg, addr) < 0) + if (send_msg(dht, &msg, addr) < 0) return -1; - if (wait_resp(&msg, KAD_T_JOIN) < 0) + if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) return -1; - dht.id = create_id(dht.b); - if (dht.id == NULL) + dht->id = create_id(dht->b); + if (dht->id == NULL) return -1; - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); - dht_update_bucket(dht.id, dht.addr); + dht_update_bucket(dht, dht->id, dht->addr); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return 0; } -static void dht_dead_peer(uint8_t * key, +static void dht_dead_peer(struct dht * dht, + uint8_t * key, uint64_t addr) { struct list_head * p; struct list_head * h; struct bucket * b; - b = dht_get_bucket(key); + b = dht_get_bucket(dht, key); list_for_each_safe(p, h, &b->contacts) { struct contact * c = list_entry(p, struct contact, next); - if (b->n_contacts + b->n_alts <= dht.k) { + if (b->n_contacts + b->n_alts <= dht->k) { ++c->fails; return; } @@ -1825,7 +1867,7 @@ static void dht_dead_peer(uint8_t * key, } } - while (b->n_contacts < dht.k && b->n_alts > 0) { + while (b->n_contacts < dht->k && b->n_alts > 0) { struct contact * c; c = list_first_entry(&b->alts, struct contact, next); list_del(&c->next); @@ -1835,27 +1877,29 @@ static void dht_dead_peer(uint8_t * key, } } -static int dht_del(const uint8_t * key, +static int dht_del(struct dht * dht, + const uint8_t * key, uint64_t addr) { struct dht_entry * e; - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); - e = dht_find_entry(key); + e = dht_find_entry(dht, key); if (e == NULL) { - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return -EPERM; } dht_entry_del_addr(e, addr); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return 0; } -static buffer_t dht_retrieve(const uint8_t * key) +static buffer_t dht_retrieve(struct dht * dht, + const uint8_t * key) { struct dht_entry * e; struct list_head * p; @@ -1863,9 +1907,9 @@ static buffer_t dht_retrieve(const uint8_t * key) uint64_t * pos; size_t addrs = 0; - pthread_rwlock_rdlock(&dht.lock); + pthread_rwlock_rdlock(&dht->lock); - e = dht_find_entry(key); + e = dht_find_entry(dht, key); if (e == NULL) goto fail; @@ -1873,7 +1917,7 @@ static buffer_t dht_retrieve(const uint8_t * key) if (buf.len == 0) goto fail; - pos = malloc(sizeof(dht.addr) * buf.len); + pos = malloc(sizeof(dht->addr) * buf.len); if (pos == NULL) goto fail; @@ -1886,18 +1930,19 @@ static buffer_t dht_retrieve(const uint8_t * key) break; } - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return buf; fail: - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); buf.len = 0; return buf; } -static ssize_t dht_get_contacts(const uint8_t * key, +static ssize_t dht_get_contacts(struct dht * dht, + const uint8_t * key, kad_contact_msg_t *** msgs) { struct list_head l; @@ -1908,18 +1953,18 @@ static ssize_t dht_get_contacts(const uint8_t * key, list_head_init(&l); - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); - len = dht_contact_list(&l, key); + len = dht_contact_list(dht, &l, key); if (len == 0) { - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); *msgs = NULL; return 0; } *msgs = malloc(len * sizeof(**msgs)); if (*msgs == NULL) { - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return 0; } @@ -1927,7 +1972,7 @@ static ssize_t dht_get_contacts(const uint8_t * key, struct contact * c = list_entry(p, struct contact, next); (*msgs)[i] = malloc(sizeof(***msgs)); if ((*msgs)[i] == NULL) { - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); while (i > 0) free(*msgs[--i]); free(*msgs); @@ -1938,13 +1983,13 @@ static ssize_t dht_get_contacts(const uint8_t * key, kad_contact_msg__init((*msgs)[i]); (*msgs)[i]->id.data = c->id; - (*msgs)[i]->id.len = dht.b; + (*msgs)[i]->id.len = dht->b; (*msgs)[i++]->addr = c->addr; list_del(&c->next); free(c); } - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return i; } @@ -1960,6 +2005,7 @@ static time_t gcd(time_t a, static void * work(void * o) { + struct dht * dht; struct timespec now; struct list_head * p; struct list_head * h; @@ -1967,46 +2013,46 @@ static void * work(void * o) time_t intv; struct lookup * lu; - (void) o; + dht = (struct dht *) o; - pthread_rwlock_rdlock(&dht.lock); + pthread_rwlock_rdlock(&dht->lock); - intv = gcd(dht.t_expire, dht.t_repub); + intv = gcd(dht->t_expire, dht->t_repub); intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); list_head_init(&reflist); while (true) { clock_gettime(CLOCK_REALTIME_COARSE, &now); - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); /* Republish registered hashes. */ - list_for_each(p, &dht.refs) { + list_for_each(p, &dht->refs) { struct ref_entry * e; uint8_t * key; uint64_t addr; time_t t_expire; e = list_entry(p, struct ref_entry, next); if (now.tv_sec > e->t_rep) { - key = dht_dup_key(e->key, dht.b); + key = dht_dup_key(e->key, dht->b); if (key == NULL) continue; - addr = dht.addr; - t_expire = dht.t_expire; - e->t_rep = now.tv_sec + dht.t_repub; + addr = dht->addr; + t_expire = dht->t_expire; + e->t_rep = now.tv_sec + dht->t_repub; - pthread_rwlock_unlock(&dht.lock); - kad_publish(key, addr, t_expire); - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); + kad_publish(dht, key, addr, t_expire); + pthread_rwlock_wrlock(&dht->lock); free(key); } } /* Remove stale entries and republish if necessary. */ - list_for_each_safe(p, h, &dht.entries) { + list_for_each_safe(p, h, &dht->entries) { struct list_head * p1; struct list_head * h1; struct dht_entry * e; @@ -2024,39 +2070,39 @@ static void * work(void * o) } if (now.tv_sec > v->t_rep) { - key = dht_dup_key(e->key, dht.b); + key = dht_dup_key(e->key, dht->b); addr = v->addr; - t_expire = dht.t_expire = now.tv_sec; - v->t_rep = now.tv_sec + dht.t_replic; - pthread_rwlock_unlock(&dht.lock); - kad_publish(key, addr, t_expire); - pthread_rwlock_wrlock(&dht.lock); + t_expire = dht->t_expire = now.tv_sec; + v->t_rep = now.tv_sec + dht->t_replic; + pthread_rwlock_unlock(&dht->lock); + kad_publish(dht, key, addr, t_expire); + pthread_rwlock_wrlock(&dht->lock); free(key); } } } /* Check the requests list for unresponsive nodes. */ - list_for_each_safe(p, h, &dht.requests) { + list_for_each_safe(p, h, &dht->requests) { struct kad_req * r; r = list_entry(p, struct kad_req, next); if (now.tv_sec > r->t_exp) { list_del(&r->next); - bmp_release(dht.cookies, r->cookie); - dht_dead_peer(r->key, r->addr); + bmp_release(dht->cookies, r->cookie); + dht_dead_peer(dht, r->key, r->addr); kad_req_destroy(r); } } /* Refresh unaccessed buckets. */ - bucket_refresh(dht.buckets, now.tv_sec, &reflist); + bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); list_for_each_safe(p, h, &reflist) { struct contact * c; c = list_entry(p, struct contact, next); - lu = kad_lookup(c->id, KAD_FIND_NODE); + lu = kad_lookup(dht, c->id, KAD_FIND_NODE); if (lu != NULL) lookup_destroy(lu); list_del(&c->next); @@ -2069,9 +2115,11 @@ static void * work(void * o) return (void *) 0; } -static int kad_handle_join_resp(struct kad_req * req, +static int kad_handle_join_resp(struct dht * dht, + struct kad_req * req, kad_msg_t * msg) { + assert(dht); assert(req); assert(msg); @@ -2087,11 +2135,11 @@ static int kad_handle_join_resp(struct kad_req * req, return -1; } - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); - dht.buckets = bucket_create(); - if (dht.buckets == NULL) { - pthread_rwlock_unlock(&dht.lock); + dht->buckets = bucket_create(); + if (dht->buckets == NULL) { + pthread_rwlock_unlock(&dht->lock); return -1; } @@ -2105,80 +2153,84 @@ static int kad_handle_join_resp(struct kad_req * req, if (msg->t_refresh != KAD_T_REFR) log_warn("Different kademlia refresh time detected."); - dht.k = msg->k; - dht.b = msg->b; - dht.t_expire = msg->t_expire; - dht.t_repub = MAX(1, dht.t_expire - 10); + dht->k = msg->k; + dht->b = msg->b; + dht->t_expire = msg->t_expire; + dht->t_repub = MAX(1, dht->t_expire - 10); - if (pthread_create(&dht.worker, NULL, work, NULL)) { - bucket_destroy(dht.buckets); - pthread_rwlock_unlock(&dht.lock); + if (pthread_create(&dht->worker, NULL, work, dht)) { + bucket_destroy(dht->buckets); + pthread_rwlock_unlock(&dht->lock); return -1; } kad_req_respond(req); - dht_update_bucket(msg->s_id.data, msg->s_addr); + dht_update_bucket(dht, msg->s_id.data, msg->s_addr); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); log_dbg("Enrollment of DHT completed."); return 0; } -static int kad_handle_find_resp(struct kad_req * req, +static int kad_handle_find_resp(struct dht * dht, + struct kad_req * req, kad_msg_t * msg) { struct lookup * lu; + assert(dht); assert(req); assert(msg); - pthread_rwlock_rdlock(&dht.lock); + pthread_rwlock_rdlock(&dht->lock); - lu = dht_find_lookup(req->cookie); + lu = dht_find_lookup(dht, req->cookie); if (lu == NULL) { - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return -1; } - lookup_update(lu, msg); + lookup_update(dht, lu, msg); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return 0; } -static void kad_handle_response(kad_msg_t * msg) +static void kad_handle_response(struct dht * dht, + kad_msg_t * msg) { struct kad_req * req; + assert(dht); assert(msg); - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); - req = dht_find_request(msg); + req = dht_find_request(dht, msg); if (req == NULL) { - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return; } - bmp_release(dht.cookies, req->cookie); + bmp_release(dht->cookies, req->cookie); list_del(&req->next); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); switch(req->code) { case KAD_JOIN: - if (kad_handle_join_resp(req, msg)) + if (kad_handle_join_resp(dht, req, msg)) log_err("Enrollment of DHT failed."); break; case KAD_FIND_VALUE: case KAD_FIND_NODE: - if (dht_get_state() != DHT_RUNNING) + if (dht_get_state(dht) != DHT_RUNNING) break; - kad_handle_find_resp(req, msg); + kad_handle_find_resp(dht, req, msg); break; default: break; @@ -2187,154 +2239,176 @@ static void kad_handle_response(kad_msg_t * msg) kad_req_destroy(req); } -int dht_bootstrap() +int dht_bootstrap(void * dir) { - pthread_rwlock_wrlock(&dht.lock); + struct dht * dht; -#ifndef __DHT_TEST__ - dht.b = hash_len(ipcpi.dir_hash_algo); -#else - dht.b = DHT_TEST_KEY_LEN; -#endif - dht.t_expire = 86400; /* 1 day */ - dht.t_repub = dht.t_expire - 10; - dht.k = KAD_K; + dht = (struct dht *) dir; + + assert(dht); - dht.id = create_id(dht.b); - if (dht.id == NULL) + pthread_rwlock_wrlock(&dht->lock); + + dht->id = create_id(dht->b); + if (dht->id == NULL) goto fail_id; - dht.buckets = bucket_create(); - if (dht.buckets == NULL) + dht->buckets = bucket_create(); + if (dht->buckets == NULL) goto fail_buckets; - dht.buckets->depth = 0; - dht.buckets->mask = 0; + dht->buckets->depth = 0; + dht->buckets->mask = 0; +#ifndef __DHT_TEST__ + dht->b = hash_len(ipcpi.dir_hash_algo); +#else + dht->b = DHT_TEST_KEY_LEN; +#endif + dht->t_expire = 86400; /* 1 day */ + dht->t_repub = dht->t_expire - 10; + dht->k = KAD_K; - if (pthread_create(&dht.worker, NULL, work, NULL)) + if (pthread_create(&dht->worker, NULL, work, dht)) goto fail_pthread_create; - dht.state = DHT_RUNNING; + dht->state = DHT_RUNNING; - dht_update_bucket(dht.id, dht.addr); + dht_update_bucket(dht, dht->id, dht->addr); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return 0; fail_pthread_create: - bucket_destroy(dht.buckets); - dht.buckets = NULL; + bucket_destroy(dht->buckets); + dht->buckets = NULL; fail_buckets: - free(dht.id); - dht.id = NULL; + free(dht->id); + dht->id = NULL; fail_id: - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return -1; } -static struct ref_entry * ref_entry_get(const uint8_t * key) +static struct ref_entry * ref_entry_get(struct dht * dht, + const uint8_t * key) { struct list_head * p; - list_for_each(p, &dht.refs) { + list_for_each(p, &dht->refs) { struct ref_entry * r = list_entry(p, struct ref_entry, next); - if (!memcmp(key, r->key, dht. b) ) + if (!memcmp(key, r->key, dht-> b) ) return r; } return NULL; } -int dht_reg(const uint8_t * key) +int dht_reg(void * dir, + const uint8_t * key) { + struct dht * dht; struct ref_entry * e; uint64_t addr; time_t t_expire; + dht = (struct dht *) dir; + + assert(dht); assert(key); - assert(dht.addr != 0); + assert(dht->addr != 0); - if (dht_wait_running()) + if (dht_wait_running(dht)) return -1; - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); - if (ref_entry_get(key) != NULL) { + if (ref_entry_get(dht, key) != NULL) { log_dbg("Name already registered."); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return 0; } - e = ref_entry_create(key); + e = ref_entry_create(dht, key); if (e == NULL) { - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return -ENOMEM; } - list_add(&e->next, &dht.refs); + list_add(&e->next, &dht->refs); - t_expire = dht.t_expire; - addr = dht.addr; + t_expire = dht->t_expire; + addr = dht->addr; - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); - kad_publish(key, addr, t_expire); + kad_publish(dht, key, addr, t_expire); return 0; } -int dht_unreg(const uint8_t * key) +int dht_unreg(void * dir, + const uint8_t * key) { + struct dht * dht; struct list_head * p; struct list_head * h; + dht = (struct dht *) dir; + + assert(dht); assert(key); - if (dht_get_state() != DHT_RUNNING) + if (dht_get_state(dht) != DHT_RUNNING) return -1; - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); - list_for_each_safe(p, h, &dht.refs) { + list_for_each_safe(p, h, &dht->refs) { struct ref_entry * r = list_entry(p, struct ref_entry, next); - if (!memcmp(key, r->key, dht. b) ) { + if (!memcmp(key, r->key, dht-> b) ) { list_del(&r->next); ref_entry_destroy(r); } } - dht_del(key, dht.addr); + dht_del(dht, key, dht->addr); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); return 0; } -uint64_t dht_query(const uint8_t * key) +uint64_t dht_query(void * dir, + const uint8_t * key) { + struct dht * dht; struct dht_entry * e; struct lookup * lu; uint64_t addrs[KAD_K]; size_t n; + dht = (struct dht *) dir; + + assert(dht); + addrs[0] = 0; - if (dht_wait_running()) + if (dht_wait_running(dht)) return 0; - pthread_rwlock_rdlock(&dht.lock); + pthread_rwlock_rdlock(&dht->lock); - e = dht_find_entry(key); + e = dht_find_entry(dht, key); if (e != NULL) - addrs[0] = dht_entry_get_addr(e); + addrs[0] = dht_entry_get_addr(dht, e); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); if (addrs[0] != 0) return addrs[0]; - lu = kad_lookup(key, KAD_FIND_VALUE); + lu = kad_lookup(dht, key, KAD_FIND_VALUE); if (lu == NULL) return 0; @@ -2347,7 +2421,7 @@ uint64_t dht_query(const uint8_t * key) lookup_destroy(lu); /* Current behaviour is anycast and return the first peer address. */ - if (addrs[0] != dht.addr) + if (addrs[0] != dht->addr) return addrs[0]; if (n > 1) @@ -2358,7 +2432,9 @@ uint64_t dht_query(const uint8_t * key) static void * dht_handle_packet(void * o) { - (void) o; + struct dht * dht = (struct dht *) o; + + assert(dht); while (true) { kad_msg_t * msg; @@ -2371,14 +2447,14 @@ static void * dht_handle_packet(void * o) size_t t_expire; struct cmd * cmd; - pthread_mutex_lock(&dht.mtx); + pthread_mutex_lock(&dht->mtx); - pthread_cleanup_push(__cleanup_mutex_unlock, &dht.mtx); + pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); - while (list_is_empty(&dht.cmds)) - pthread_cond_wait(&dht.cond, &dht.mtx); + while (list_is_empty(&dht->cmds)) + pthread_cond_wait(&dht->cond, &dht->mtx); - cmd = list_last_entry(&dht.cmds, struct cmd, next); + cmd = list_last_entry(&dht->cmds, struct cmd, next); list_del(&cmd->next); pthread_cleanup_pop(true); @@ -2396,18 +2472,18 @@ static void * dht_handle_packet(void * o) continue; } - if (msg->code != KAD_RESPONSE && dht_wait_running()) { + if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { kad_msg__free_unpacked(msg, NULL); log_dbg("Got a request message when not running."); continue; } - pthread_rwlock_rdlock(&dht.lock); + pthread_rwlock_rdlock(&dht->lock); - b = dht.b; - t_expire = dht.t_expire; + b = dht->b; + t_expire = dht->t_expire; - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); if (msg->has_key && msg->key.len != b) { kad_msg__free_unpacked(msg, NULL); @@ -2422,7 +2498,7 @@ static void * dht_handle_packet(void * o) continue; } - tpm_dec(dht.tpm); + tpm_dec(dht->tpm); addr = msg->s_addr; @@ -2465,7 +2541,7 @@ static void * dht_handle_packet(void * o) resp_msg.t_replicate = KAD_T_REPL; break; case KAD_FIND_VALUE: - buf = dht_retrieve(msg->key.data); + buf = dht_retrieve(dht, msg->key.data); if (buf.len != 0) { resp_msg.n_addrs = buf.len; resp_msg.addrs = (uint64_t *) buf.data; @@ -2475,7 +2551,7 @@ static void * dht_handle_packet(void * o) case KAD_FIND_NODE: /* Return k closest contacts. */ resp_msg.n_contacts = - dht_get_contacts(msg->key.data, &cmsgs); + dht_get_contacts(dht, msg->key.data, &cmsgs); resp_msg.contacts = cmsgs; break; case KAD_STORE: @@ -2489,11 +2565,11 @@ static void * dht_handle_packet(void * o) break; } - kad_add(*msg->contacts, msg->n_contacts, + kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire); break; case KAD_RESPONSE: - kad_handle_response(msg); + kad_handle_response(dht, msg); break; default: assert(false); @@ -2501,19 +2577,19 @@ static void * dht_handle_packet(void * o) } if (msg->code != KAD_JOIN) { - pthread_rwlock_wrlock(&dht.lock); - if (dht_get_state() == DHT_JOINING && - dht.buckets == NULL) { - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); + if (dht_get_state(dht) == DHT_JOINING && + dht->buckets == NULL) { + pthread_rwlock_unlock(&dht->lock); goto finish; } - if (dht_update_bucket(msg->s_id.data, addr)) + if (dht_update_bucket(dht, msg->s_id.data, addr)) log_warn("Failed to update bucket."); - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); } - if (msg->code < KAD_STORE && send_msg(&resp_msg, addr) < 0) + if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0) log_warn("Failed to send response."); finish: @@ -2523,7 +2599,7 @@ static void * dht_handle_packet(void * o) free(resp_msg.addrs); if (resp_msg.n_contacts == 0) { - tpm_inc(dht.tpm); + tpm_inc(dht->tpm); continue; } @@ -2532,22 +2608,19 @@ static void * dht_handle_packet(void * o) NULL); free(resp_msg.contacts); - tpm_inc(dht.tpm); + tpm_inc(dht->tpm); } return (void *) 0; } -static void dht_post_packet(void * o, +static void dht_post_packet(void * comp, struct shm_du_buff * sdb) { struct cmd * cmd; + struct dht * dht = (struct dht *) comp; - (void) o; - - assert(o == &dht); - - if (dht_get_state() == DHT_SHUTDOWN) { + if (dht_get_state(dht) == DHT_SHUTDOWN) { #ifndef __DHT_TEST__ ipcp_sdb_release(sdb); #endif @@ -2562,34 +2635,39 @@ static void dht_post_packet(void * o, cmd->sdb = sdb; - pthread_mutex_lock(&dht.mtx); + pthread_mutex_lock(&dht->mtx); - list_add(&cmd->next, &dht.cmds); + list_add(&cmd->next, &dht->cmds); - pthread_cond_signal(&dht.cond); + pthread_cond_signal(&dht->cond); - pthread_mutex_unlock(&dht.mtx); + pthread_mutex_unlock(&dht->mtx); } -void dht_fini() +void dht_destroy(void * dir) { + struct dht * dht; struct list_head * p; struct list_head * h; + dht = (struct dht *) dir; + if (dht == NULL) + return; + #ifndef __DHT_TEST__ - tpm_stop(dht.tpm); + tpm_stop(dht->tpm); - tpm_destroy(dht.tpm); + tpm_destroy(dht->tpm); #endif - if (dht_get_state() == DHT_RUNNING) { - dht_set_state(DHT_SHUTDOWN); - pthread_cancel(dht.worker); - pthread_join(dht.worker, NULL); + if (dht_get_state(dht) == DHT_RUNNING) { + dht_set_state(dht, DHT_SHUTDOWN); + pthread_cancel(dht->worker); + pthread_join(dht->worker, NULL); } - pthread_rwlock_wrlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); - list_for_each_safe(p, h, &dht.cmds) { + list_for_each_safe(p, h, &dht->cmds) { struct cmd * c = list_entry(p, struct cmd, next); list_del(&c->next); #ifndef __DHT_TEST__ @@ -2598,42 +2676,44 @@ void dht_fini() free(c); } - list_for_each_safe(p, h, &dht.entries) { + list_for_each_safe(p, h, &dht->entries) { struct dht_entry * e = list_entry(p, struct dht_entry, next); list_del(&e->next); dht_entry_destroy(e); } - list_for_each_safe(p, h, &dht.requests) { + list_for_each_safe(p, h, &dht->requests) { struct kad_req * r = list_entry(p, struct kad_req, next); list_del(&r->next); kad_req_destroy(r); } - list_for_each_safe(p, h, &dht.refs) { + list_for_each_safe(p, h, &dht->refs) { struct ref_entry * e = list_entry(p, struct ref_entry, next); list_del(&e->next); ref_entry_destroy(e); } - list_for_each_safe(p, h, &dht.lookups) { + list_for_each_safe(p, h, &dht->lookups) { struct lookup * l = list_entry(p, struct lookup, next); list_del(&l->next); lookup_destroy(l); } - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); + + if (dht->buckets != NULL) + bucket_destroy(dht->buckets); - if (dht.buckets != NULL) - bucket_destroy(dht.buckets); + bmp_destroy(dht->cookies); - bmp_destroy(dht.cookies); + pthread_mutex_destroy(&dht->mtx); - pthread_mutex_destroy(&dht.mtx); + pthread_rwlock_destroy(&dht->lock); - pthread_rwlock_destroy(&dht.lock); + free(dht->id); - free(dht.id); + free(dht); } static void * join_thr(void * o) @@ -2644,14 +2724,14 @@ static void * join_thr(void * o) assert(info); - while (kad_join(info->addr)) { - if (dht_get_state() == DHT_SHUTDOWN) { + while (kad_join(info->dht, info->addr)) { + if (dht_get_state(info->dht) == DHT_SHUTDOWN) { log_dbg("DHT enrollment aborted."); goto finish; } if (retr++ == KAD_JOIN_RETR) { - dht_set_state(DHT_INIT); + dht_set_state(info->dht, DHT_INIT); log_warn("DHT enrollment attempt failed."); goto finish; } @@ -2659,9 +2739,9 @@ static void * join_thr(void * o) sleep(KAD_JOIN_INTV); } - dht_set_state(DHT_RUNNING); + dht_set_state(info->dht, DHT_RUNNING); - lu = kad_lookup(dht.id, KAD_FIND_NODE); + lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE); if (lu != NULL) lookup_destroy(lu); @@ -2675,7 +2755,7 @@ static void handle_event(void * self, int event, const void * o) { - (void) self; + struct dht * dht = (struct dht *) self; if (event == NOTIFY_DT_CONN_ADD) { pthread_t thr; @@ -2686,18 +2766,19 @@ static void handle_event(void * self, /* Give the pff some time to update for the new link. */ nanosleep(&slack, NULL); - switch(dht_get_state()) { + switch(dht_get_state(dht)) { case DHT_INIT: inf = malloc(sizeof(*inf)); if (inf == NULL) break; + inf->dht = dht; inf->addr = c->conn_info.addr; - if (dht_set_state(DHT_JOINING) == 0 || - dht_wait_running()) { + if (dht_set_state(dht, DHT_JOINING) == 0 || + dht_wait_running(dht)) { if (pthread_create(&thr, NULL, join_thr, inf)) { - dht_set_state(DHT_INIT); + dht_set_state(dht, DHT_INIT); free(inf); return; } @@ -2711,7 +2792,7 @@ static void handle_event(void * self, * FIXME: this lookup for effiency reasons * causes a SEGV when stressed with rapid * enrollments. - * lu = kad_lookup(dht, dht.id, KAD_FIND_NODE); + * lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); * if (lu != NULL) * lookup_destroy(lu); */ @@ -2722,65 +2803,73 @@ static void handle_event(void * self, } } -int dht_init() +void * dht_create(void) { - dht.buckets = NULL; + struct dht * dht; - list_head_init(&dht.entries); - list_head_init(&dht.requests); - list_head_init(&dht.refs); - list_head_init(&dht.lookups); - list_head_init(&dht.cmds); + dht = malloc(sizeof(*dht)); + if (dht == NULL) + goto fail_malloc; + + dht->buckets = NULL; - if (pthread_rwlock_init(&dht.lock, NULL)) + list_head_init(&dht->entries); + list_head_init(&dht->requests); + list_head_init(&dht->refs); + list_head_init(&dht->lookups); + list_head_init(&dht->cmds); + + if (pthread_rwlock_init(&dht->lock, NULL)) goto fail_rwlock; - if (pthread_mutex_init(&dht.mtx, NULL)) + if (pthread_mutex_init(&dht->mtx, NULL)) goto fail_mutex; - if (pthread_cond_init(&dht.cond, NULL)) + if (pthread_cond_init(&dht->cond, NULL)) goto fail_cond; - dht.cookies = bmp_create(DHT_MAX_REQS, 1); - if (dht.cookies == NULL) + dht->cookies = bmp_create(DHT_MAX_REQS, 1); + if (dht->cookies == NULL) goto fail_bmp; - dht.b = 0; - dht.id = NULL; + dht->b = 0; + dht->id = NULL; #ifndef __DHT_TEST__ - dht.addr = ipcpi.dt_addr; - dht.tpm = tpm_create(2, 1, dht_handle_packet, NULL); - if (dht.tpm == NULL) + dht->addr = ipcpi.dt_addr; + dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); + if (dht->tpm == NULL) goto fail_tpm_create; - if (tpm_start(dht.tpm)) + if (tpm_start(dht->tpm)) goto fail_tpm_start; - dht.eid = dt_reg_comp(&dht, &dht_post_packet, DHT); - if ((int) dht.eid < 0) + dht->eid = dt_reg_comp(dht, &dht_post_packet, DHT); + if ((int) dht->eid < 0) goto fail_tpm_start; - notifier_reg(handle_event, NULL); + notifier_reg(handle_event, dht); #else (void) handle_event; (void) dht_handle_packet; (void) dht_post_packet; #endif - dht.state = DHT_INIT; + dht->state = DHT_INIT; - return 0; + return (void *) dht; #ifndef __DHT_TEST__ fail_tpm_start: - tpm_destroy(dht.tpm); + tpm_destroy(dht->tpm); fail_tpm_create: - bmp_destroy(dht.cookies); + bmp_destroy(dht->cookies); #endif fail_bmp: - pthread_cond_destroy(&dht.cond); + pthread_cond_destroy(&dht->cond); fail_cond: - pthread_mutex_destroy(&dht.mtx); + pthread_mutex_destroy(&dht->mtx); fail_mutex: - pthread_rwlock_destroy(&dht.lock); + pthread_rwlock_destroy(&dht->lock); fail_rwlock: - return -1; + free(dht); + fail_malloc: + return NULL; } diff --git a/src/ipcpd/unicast/dir/dht.h b/src/ipcpd/unicast/dir/dht.h index c34cf1c4..a6e9c2c8 100644 --- a/src/ipcpd/unicast/dir/dht.h +++ b/src/ipcpd/unicast/dir/dht.h @@ -30,19 +30,22 @@ #include <stdint.h> #include <sys/types.h> -int dht_init(void); +void * dht_create(void); -int dht_bootstrap(void); +void dht_destroy(void * dir); -void dht_fini(void); +int dht_bootstrap(void * dir); -int dht_reg(const uint8_t * key); +int dht_reg(void * dir, + const uint8_t * key); -int dht_unreg(const uint8_t * key); +int dht_unreg(void * dir, + const uint8_t * key); -uint64_t dht_query(const uint8_t * key); +uint64_t dht_query(void * dir, + const uint8_t * key); -int dht_wait_running(void); +int dht_wait_running(void * dir); extern struct dir_ops dht_dir_ops; diff --git a/src/ipcpd/unicast/dir/ops.h b/src/ipcpd/unicast/dir/ops.h index 7eabb680..e74324da 100644 --- a/src/ipcpd/unicast/dir/ops.h +++ b/src/ipcpd/unicast/dir/ops.h @@ -23,20 +23,24 @@ #ifndef OUROBOROS_IPCPD_UNICAST_DIR_OPS_H #define OUROBOROS_IPCPD_UNICAST_DIR_OPS_H + struct dir_ops { - int (* init)(void); + void * (* create)(void); - void (* fini)(void); + void (* destroy)(void * dir); - int (* bootstrap)(void); + int (* bootstrap)(void * dir); - int (* reg)(const uint8_t * hash); + int (* reg)(void * dir, + const uint8_t * hash); - int (* unreg)(const uint8_t * hash); + int (* unreg)(void * dir, + const uint8_t * hash); - uint64_t (* query)(const uint8_t * hash); + uint64_t (* query)(void * dir, + const uint8_t * hash); - int (* wait_running)(void); + int (* wait_running)(void * dir); }; #endif /* OUROBOROS_IPCPD_UNICAST_DIR_OPS_H */ diff --git a/src/ipcpd/unicast/dir/tests/dht_test.c b/src/ipcpd/unicast/dir/tests/dht_test.c index 70773ea7..3f4c3b87 100644 --- a/src/ipcpd/unicast/dir/tests/dht_test.c +++ b/src/ipcpd/unicast/dir/tests/dht_test.c @@ -23,7 +23,6 @@ #define __DHT_TEST__ #define DHT_TEST_KEY_LEN 32 - #include "dht.c" #include <pthread.h> @@ -31,45 +30,49 @@ #include <stdlib.h> #include <stdio.h> -#define CONTACTS 1000 +#define CONTACTS 1000 int dht_test(int argc, char ** argv) { + struct dht * dht; uint8_t key[DHT_TEST_KEY_LEN]; size_t i; (void) argc; (void) argv; - if (dht_init() < 0) { + dht = dht_create(); + if (dht == NULL) { printf("Failed to create dht.\n"); return -1; } - dht_fini(); + dht_destroy(dht); - if (dht_init() < 0) { + dht = dht_create(); + if (dht == NULL) { printf("Failed to re-create dht.\n"); return -1; } - if (dht_bootstrap()) { + if (dht_bootstrap(dht)) { printf("Failed to bootstrap dht.\n"); - dht_fini(); + dht_destroy(dht); return -1; } - dht_fini(); + dht_destroy(dht); - if (dht_init() < 0) { + dht = dht_create(); + if (dht == NULL) { printf("Failed to re-create dht.\n"); return -1; } - if (dht_bootstrap()) { + if (dht_bootstrap(dht)) { printf("Failed to bootstrap dht.\n"); - dht_fini(); + dht_destroy(dht); return -1; } @@ -77,17 +80,17 @@ int dht_test(int argc, uint64_t addr; random_buffer(&addr, sizeof(addr)); random_buffer(key, DHT_TEST_KEY_LEN); - pthread_rwlock_wrlock(&dht.lock); - if (dht_update_bucket(key, addr)) { - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_wrlock(&dht->lock); + if (dht_update_bucket(dht, key, addr)) { + pthread_rwlock_unlock(&dht->lock); printf("Failed to update bucket.\n"); - dht_fini(); + dht_destroy(dht); return -1; } - pthread_rwlock_unlock(&dht.lock); + pthread_rwlock_unlock(&dht->lock); } - dht_fini(); + dht_destroy(dht); return 0; } |