diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-07-18 11:07:43 +0000 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-07-18 11:07:43 +0000 |
commit | aff9a39ade6ec39be0b0de7c360a66ed4c6a1e1b (patch) | |
tree | adf6a907d87f11668ccdf0f2df83efd93068f292 /src/ipcpd/normal/dht.c | |
parent | a295fd7b24c86f071061aa15e7c82c2463b001b5 (diff) | |
parent | 6e739b09bef860a4830328630ea07622bdd79d79 (diff) | |
download | ouroboros-aff9a39ade6ec39be0b0de7c360a66ed4c6a1e1b.tar.gz ouroboros-aff9a39ade6ec39be0b0de7c360a66ed4c6a1e1b.zip |
Merged in dstaesse/ouroboros/be-dht (pull request #529)
ipcpd: Add DHT as directory in normal IPCP
Diffstat (limited to 'src/ipcpd/normal/dht.c')
-rw-r--r-- | src/ipcpd/normal/dht.c | 2369 |
1 files changed, 2369 insertions, 0 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c new file mode 100644 index 00000000..0b00e2f5 --- /dev/null +++ b/src/ipcpd/normal/dht.c @@ -0,0 +1,2369 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Distributed Hash Table based on Kademlia + * + * Dimitri Staessens <dimitri.staessens@ugent.be> + * Sander Vrijders <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#define OUROBOROS_PREFIX "dht" + +#include <ouroboros/config.h> +#include <ouroboros/hash.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/list.h> +#include <ouroboros/random.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/utils.h> + +#include "dht.h" +#include "dt.h" + +#include <pthread.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <inttypes.h> + +#include "kademlia.pb-c.h" +typedef KadMsg kad_msg_t; +typedef KadContactMsg kad_contact_msg_t; + +#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ +#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ +#define KAD_K 8 /* Replication factor, MDHT value. */ +#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ +#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ +#define KAD_T_JOIN 6 /* Response time to wait for a join. */ +#define KAD_T_RESP 2 /* Response time to wait for a response. */ +#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ +#define KAD_QUEER 15 /* Time to declare peer questionable. */ +#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ + +enum dht_state { + DHT_INIT = 0, + DHT_RUNNING, + DHT_SHUTDOWN, +}; + +enum kad_code { + KAD_JOIN = 0, + KAD_FIND_NODE, + KAD_FIND_VALUE, + /* Messages without a response below. */ + KAD_STORE, + KAD_RESPONSE +}; + +enum kad_req_state { + REQ_NULL = 0, + REQ_INIT, + REQ_PENDING, + REQ_RESPONSE, + REQ_DONE, + REQ_DESTROY +}; + +enum lookup_state { + LU_NULL = 0, + LU_INIT, + LU_PENDING, + LU_UPDATE, + LU_COMPLETE, + LU_DONE, + LU_DESTROY +}; + +struct kad_req { + struct list_head next; + + uint32_t cookie; + enum kad_code code; + uint8_t * key; + uint64_t addr; + + enum kad_req_state state; + pthread_cond_t cond; + pthread_mutex_t lock; + + time_t t_exp; +}; + +struct lookup { + struct list_head next; + + uint8_t * key; + + struct list_head contacts; + size_t n_contacts; + + uint64_t * addrs; + size_t n_addrs; + + enum lookup_state state; + pthread_cond_t cond; + pthread_mutex_t lock; +}; + +struct val { + struct list_head next; + + uint64_t addr; + + time_t t_exp; + time_t t_rep; +}; + +struct ref_entry { + struct list_head next; + + uint8_t * key; + + time_t t_rep; +}; + +struct dht_entry { + struct list_head next; + + uint8_t * key; + size_t n_vals; + struct list_head vals; +}; + +struct contact { + struct list_head next; + + uint8_t * id; + uint64_t addr; + + size_t fails; + time_t t_seen; +}; + +struct bucket { + struct list_head contacts; + size_t n_contacts; + + struct list_head alts; + size_t n_alts; + + time_t t_refr; + + size_t depth; + uint8_t mask; + + struct bucket * parent; + struct bucket * children[1L << KAD_BETA]; +}; + +struct dht { + size_t alpha; + size_t b; + size_t k; + + time_t t_expire; + time_t t_refresh; + time_t t_replic; + time_t t_repub; + + uint8_t * id; + uint64_t addr; + + struct bucket * buckets; + + struct list_head entries; + + struct list_head refs; + + struct list_head lookups; + + struct list_head requests; + struct bmp * cookies; + + enum dht_state state; + pthread_mutex_t mtx; + + pthread_rwlock_t lock; + + int fd; + + pthread_t worker; +}; + +static uint8_t * dht_dup_key(const uint8_t * key, + size_t len) +{ + uint8_t * dup; + + dup = malloc(sizeof(*dup) * len); + if (dup == NULL) + return NULL; + + memcpy(dup, key, len); + + return dup; +} + +static enum dht_state dht_get_state(struct dht * dht) +{ + enum dht_state state; + + pthread_mutex_lock(&dht->mtx); + + state = dht->state; + + pthread_mutex_unlock(&dht->mtx); + + return state; +} + +static void dht_set_state(struct dht * dht, + enum dht_state state) +{ + pthread_mutex_lock(&dht->mtx); + + dht->state = state; + + pthread_mutex_unlock(&dht->mtx); +} + +static uint8_t * create_id(size_t len) +{ + uint8_t * id; + + id = malloc(len); + if (id == NULL) + return NULL; + + if (random_buffer(id, len) < 0) { + free(id); + return NULL; + } + + return id; +} + +static struct kad_req * kad_req_create(struct dht * dht, + kad_msg_t * msg, + uint64_t addr) +{ + struct kad_req * req; + pthread_condattr_t cattr; + struct timespec t; + + req = malloc(sizeof(*req)); + if (req == NULL) + return NULL; + + list_head_init(&req->next); + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + req->t_exp = t.tv_sec + KAD_T_RESP; + req->addr = addr; + req->state = REQ_INIT; + req->cookie = msg->cookie; + req->code = msg->code; + req->key = NULL; + + if (msg->has_key) { + req->key = dht_dup_key(msg->key.data, dht->b); + if (req->key == NULL) { + free(req); + return NULL; + } + } + + if (pthread_mutex_init(&req->lock, NULL)) { + free(req->key); + free(req); + return NULL; + } + + pthread_condattr_init(&cattr); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + + if (pthread_cond_init(&req->cond, &cattr)) { + pthread_condattr_destroy(&cattr); + pthread_mutex_destroy(&req->lock); + free(req->key); + free(req); + return NULL; + } + + pthread_condattr_destroy(&cattr); + + return req; +} + +static void kad_req_destroy(struct kad_req * req) +{ + assert(req); + + if (req->key != NULL) + free(req->key); + + pthread_mutex_lock(&req->lock); + + switch (req->state) { + case REQ_DESTROY: + pthread_mutex_unlock(&req->lock); + return; + case REQ_PENDING: + req->state = REQ_DESTROY; + pthread_cond_signal(&req->cond); + break; + case REQ_INIT: + case REQ_DONE: + req->state = REQ_NULL; + break; + case REQ_RESPONSE: + case REQ_NULL: + default: + break; + } + + while (req->state != REQ_NULL) + pthread_cond_wait(&req->cond, &req->lock); + + pthread_mutex_unlock(&req->lock); + + pthread_cond_destroy(&req->cond); + pthread_mutex_destroy(&req->lock); + + free(req); +} + +static int kad_req_wait(struct kad_req * req, + time_t t) +{ + struct timespec timeo = {t, 0}; + struct timespec abs; + int ret = 0; + + assert(req); + + clock_gettime(PTHREAD_COND_CLOCK, &abs); + + ts_add(&abs, &timeo, &abs); + + pthread_mutex_lock(&req->lock); + + req->state = REQ_PENDING; + + while (req->state == REQ_PENDING && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs); + + switch(req->state) { + case REQ_DESTROY: + ret = -1; + req->state = REQ_NULL; + pthread_cond_signal(&req->cond); + break; + case REQ_PENDING: /* ETIMEDOUT */ + case REQ_RESPONSE: + req->state = REQ_DONE; + pthread_cond_signal(&req->cond); + break; + default: + break; + } + + pthread_mutex_unlock(&req->lock); + + return ret; +} + +static void kad_req_respond(struct kad_req * req) +{ + pthread_mutex_lock(&req->lock); + + req->state = REQ_RESPONSE; + pthread_cond_signal(&req->cond); + + pthread_mutex_unlock(&req->lock); +} + +static struct contact * contact_create(const uint8_t * id, + size_t len, + uint64_t addr) +{ + struct contact * c; + struct timespec t; + + c = malloc(sizeof(*c)); + if (c == NULL) + return NULL; + + list_head_init(&c->next); + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + c->addr = addr; + c->fails = 0; + c->t_seen = t.tv_sec; + c->id = dht_dup_key(id, len); + if (c->id == NULL) { + free(c); + return NULL; + } + + return c; +} + +static void contact_destroy(struct contact * c) +{ + if (c != NULL) + free(c->id); + + free(c); +} + +static struct bucket * iter_bucket(struct bucket * b, + const uint8_t * id) +{ + uint8_t byte; + uint8_t mask; + + assert(b); + + if (b->children[0] == NULL) + return b; + + byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + + mask = ((1L << KAD_BETA) - 1) & 0xFF; + + byte >>= (CHAR_BIT - KAD_BETA) - + (((b->depth) * KAD_BETA) & (CHAR_BIT - 1)); + + return iter_bucket(b->children[(byte & mask)], id); +} + +static struct bucket * dht_get_bucket(struct dht * dht, + const uint8_t * id) +{ + assert(dht->buckets); + + return iter_bucket(dht->buckets, id); +} + +/* + * If someone builds a network where the n (n > k) closest nodes all + * have IDs starting with the same 64 bits: by all means, change this. + */ +static uint64_t dist(const uint8_t * src, + const uint8_t * dst) +{ + return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst)); +} + +static size_t list_add_sorted(struct list_head * l, + struct contact * c, + const uint8_t * key) +{ + struct list_head * p; + + assert(l); + assert(c); + assert(key); + assert(c->id); + + list_for_each(p, l) { + struct contact * e = list_entry(p, struct contact, next); + if (dist(c->id, key) > dist(e->id, key)) + break; + } + + list_add_tail(&c->next, p); + + return 1; +} + +static size_t dht_contact_list(struct dht * dht, + struct list_head * l, + const uint8_t * key) +{ + struct list_head * p; + struct bucket * b; + size_t len = 0; + size_t i; + struct timespec t; + + assert(l); + assert(dht); + assert(key); + assert(list_is_empty(l)); + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + b = dht_get_bucket(dht, key); + if (b == NULL) + return 0; + + b->t_refr = t.tv_sec + KAD_T_REFR; + + if (b->n_contacts == dht->k || b->parent == NULL) { + list_for_each(p, &b->contacts) { + struct contact * c; + c = list_entry(p, struct contact, next); + c = contact_create(c->id, dht->b, c->addr); + if (list_add_sorted(l, c, key) == 1) + if (++len > dht->k) + break; + } + } else { + struct bucket * d = b->parent; + for (i = 0; i < (1L << KAD_BETA); ++i) { + list_for_each(p, &d->children[i]->contacts) { + struct contact * c; + c = list_entry(p, struct contact, next); + c = contact_create(c->id, dht->b, c->addr); + if (c == NULL) + continue; + if (list_add_sorted(l, c, key) == 1) + if (++len > dht->k) + break; + } + } + } + + assert(len == dht->k || b->parent == NULL); + + return len; +} + +static struct lookup * lookup_create(struct dht * dht, + const uint8_t * id) +{ + struct lookup * lu; + pthread_condattr_t cattr; + + assert(dht); + assert(id); + + lu = malloc(sizeof(*lu)); + if (lu == NULL) + goto fail_malloc; + + list_head_init(&lu->contacts); + + lu->state = LU_INIT; + lu->addrs = NULL; + lu->n_addrs = 0; + lu->key = dht_dup_key(id, dht->b); + if (lu->key == NULL) + goto fail_id; + + if (pthread_mutex_init(&lu->lock, NULL)) + goto fail_mutex; + + pthread_condattr_init(&cattr); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + + if (pthread_cond_init(&lu->cond, &cattr)) + goto fail_cond; + + pthread_condattr_destroy(&cattr); + + pthread_rwlock_wrlock(&dht->lock); + + list_add(&lu->next, &dht->lookups); + + lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); + + pthread_rwlock_unlock(&dht->lock); + + return lu; + + fail_cond: + pthread_condattr_destroy(&cattr); + pthread_mutex_destroy(&lu->lock); + fail_mutex: + free(lu->key); + fail_id: + free(lu); + fail_malloc: + return NULL; +} + +static void lookup_destroy(struct lookup * lu) +{ + struct list_head * p; + struct list_head * h; + + assert(lu); + + pthread_mutex_lock(&lu->lock); + + switch (lu->state) { + case LU_DESTROY: + pthread_mutex_unlock(&lu->lock); + return; + case LU_PENDING: + lu->state = LU_DESTROY; + pthread_cond_signal(&lu->cond); + break; + case LU_INIT: + case LU_DONE: + case LU_UPDATE: + case LU_COMPLETE: + lu->state = REQ_NULL; + break; + case LU_NULL: + default: + break; + } + + while (lu->state != LU_NULL) + pthread_cond_wait(&lu->cond, &lu->lock); + + if (lu->key != NULL) + free(lu->key); + if (lu->addrs != NULL) + free(lu->addrs); + + list_for_each_safe(p, h, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + } + + pthread_mutex_unlock(&lu->lock); + + pthread_cond_destroy(&lu->cond); + pthread_mutex_destroy(&lu->lock); + + free(lu); +} + +static void lookup_update(struct dht * dht, + struct lookup * lu, + kad_msg_t * msg) +{ + struct list_head * p = NULL; + struct contact * c = NULL; + size_t n; + size_t pos = 0; + + assert(lu); + assert(msg); + + if (dht_get_state(dht) != DHT_RUNNING) + return; + + pthread_mutex_lock(&lu->lock); + + if (msg->n_addrs > 0) { + if (lu->addrs == NULL) { + lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs); + for (n = 0; n < msg->n_addrs; ++n) + lu->addrs[n] = msg->addrs[n]; + lu->n_addrs = msg->n_addrs; + } + lu->state = LU_COMPLETE; + pthread_cond_signal(&lu->cond); + pthread_mutex_unlock(&lu->lock); + return; + } + + while (lu->state == LU_INIT) + pthread_cond_wait(&lu->cond, &lu->lock); + + for (n = 0; n < msg->n_contacts; ++n) { + c = contact_create(msg->contacts[n]->id.data, + dht->b, msg->contacts[n]->addr); + if (c == NULL) + continue; + + list_for_each(p, &lu->contacts) { + struct contact * e; + e = list_entry(p, struct contact, next); + if (!memcmp(e->id, c->id, dht->b)) { + contact_destroy(c); + goto finish_node; + } + + if (dist(c->id, lu->key) > dist(e->id, lu->key)) + break; + pos++; + } + + } + + if (pos == dht->k) { + contact_destroy(c); + goto finish_node; + } else { + struct contact * d; + d = list_last_entry(&lu->contacts, struct contact, next); + list_del(&d->next); + list_add_tail(&c->next, p); + contact_destroy(d); + } + + finish_node: + lu->state = LU_UPDATE; + pthread_cond_signal(&lu->cond); + pthread_mutex_unlock(&lu->lock); + return; +} + +static ssize_t lookup_get_addrs(struct lookup * lu, + uint64_t * addrs) +{ + ssize_t n; + + assert(lu); + + pthread_mutex_lock(&lu->lock); + + for (n = 0; (size_t) n < lu->n_addrs; ++n) + addrs[n] = lu->addrs[n]; + + assert((size_t) n == lu->n_addrs); + + pthread_mutex_unlock(&lu->lock); + + return n; +} + +static ssize_t lookup_contact_addrs(struct lookup * lu, + uint64_t * addrs) +{ + struct list_head * p; + ssize_t n = 0; + + assert(lu); + assert(addrs); + + pthread_mutex_lock(&lu->lock); + + list_for_each(p, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + addrs[n] = c->addr; + n++; + } + + pthread_mutex_unlock(&lu->lock); + + return n; +} + +static ssize_t lookup_new_addrs(struct lookup * lu, + uint64_t * addrs) +{ + struct list_head * p; + ssize_t n = 0; + + assert(lu); + assert(addrs); + + pthread_mutex_lock(&lu->lock); + + /* Uses fails to check if the contact has been contacted. */ + list_for_each(p, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + if (c->fails == 0) { + c->fails = 1; + addrs[n] = c->addr; + n++; + } + + if (n == KAD_ALPHA) + break; + } + + if (n == 0) + lu->state = LU_DONE; + + pthread_mutex_unlock(&lu->lock); + + assert(n <= KAD_ALPHA); + + return n; +} + +static enum lookup_state lookup_wait(struct lookup * lu) +{ + enum lookup_state state; + + pthread_mutex_lock(&lu->lock); + + lu->state = LU_PENDING; + pthread_cond_signal(&lu->cond); + + pthread_cleanup_push((void (*)(void *)) lookup_destroy, (void *) lu); + + while (lu->state == LU_PENDING) + pthread_cond_wait(&lu->cond, &lu->lock); + + pthread_cleanup_pop(false); + + if (lu->state == LU_DESTROY) { + lu->state = LU_NULL; + pthread_cond_signal(&lu->cond); + pthread_mutex_unlock(&lu->lock); + return -1; + } + + state = lu->state; + + pthread_mutex_unlock(&lu->lock); + + return state; +} + +static struct kad_req * dht_find_request(struct dht * dht, + kad_msg_t * msg) +{ + struct list_head * p; + + assert(dht); + assert(msg); + + list_for_each(p, &dht->requests) { + struct kad_req * r = list_entry(p, struct kad_req, next); + if (r->cookie == msg->cookie) + return r; + } + + return NULL; +} + +static struct lookup * dht_find_lookup(struct dht * dht, + const uint8_t * key) +{ + struct list_head * p; + + assert(dht); + assert(key); + + list_for_each(p, &dht->lookups) { + struct lookup * l = list_entry(p, struct lookup, next); + if (!memcmp(l->key, key, dht->b)) + return l; + } + + return NULL; +} + +static struct val * val_create(uint64_t addr, + time_t exp) +{ + struct val * v; + struct timespec t; + + v = malloc(sizeof(*v)); + if (v == NULL) + return NULL; + + list_head_init(&v->next); + v->addr = addr; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + v->t_exp = t.tv_sec + exp; + v->t_rep = t.tv_sec + KAD_T_REPL; + + return v; +} + +static void val_destroy(struct val * v) +{ + assert(v); + + free(v); +} + +static struct ref_entry * ref_entry_create(struct dht * dht, + const uint8_t * key) +{ + struct ref_entry * e; + struct timespec t; + + assert(dht); + assert(key); + + e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + e->key = dht_dup_key(key, dht->b); + if (e->key == NULL) { + free(e); + return NULL; + } + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + e->t_rep = t.tv_sec + dht->t_repub; + + return e; +} + +static void ref_entry_destroy(struct ref_entry * e) +{ + free(e->key); + free(e); +} + +static struct dht_entry * dht_entry_create(struct dht * dht, + const uint8_t * key) +{ + struct dht_entry * e; + + assert(dht); + assert(key); + + e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + list_head_init(&e->next); + list_head_init(&e->vals); + + e->n_vals = 0; + + e->key = dht_dup_key(key, dht->b); + if (e->key == NULL) { + free(e); + return NULL; + } + + return e; +} + +static void dht_entry_destroy(struct dht_entry * e) +{ + struct list_head * p; + struct list_head * h; + + assert(e); + + list_for_each_safe(p, h, &e->vals) { + struct val * v = list_entry(p, struct val, next); + list_del(&v->next); + val_destroy(v); + } + + free(e->key); + + free(e); +} + +static int dht_entry_add_addr(struct dht_entry * e, + uint64_t addr, + time_t exp) +{ + struct list_head * p; + struct val * val; + struct timespec t; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + list_for_each(p, &e->vals) { + struct val * v = list_entry(p, struct val, next); + if (v->addr == addr) { + if (v->t_exp < t.tv_sec + exp) { + v->t_exp = t.tv_sec + exp; + v->t_rep = t.tv_sec + KAD_T_REPL; + } + + return 0; + } + } + + val = val_create(addr, exp); + if (val == NULL) + return -ENOMEM; + + list_add(&val->next, &e->vals); + ++e->n_vals; + + return 0; +} + + +static void dht_entry_del_addr(struct dht_entry * e, + uint64_t addr) +{ + struct list_head * p; + struct list_head * h; + + assert(e); + + list_for_each_safe(p, h, &e->vals) { + struct val * v = list_entry(p, struct val, next); + if (v->addr == addr) { + list_del(&v->next); + val_destroy(v); + --e->n_vals; + } + } + + if (e->n_vals == 0) { + list_del(&e->next); + dht_entry_destroy(e); + } +} + +static uint64_t dht_entry_get_addr(struct dht * dht, + struct dht_entry * e) +{ + struct list_head * p; + + assert(e); + assert(!list_is_empty(&e->vals)); + + list_for_each(p, &e->vals) { + struct val * v = list_entry(p, struct val, next); + if (v->addr != dht->addr) + return v->addr; + } + + return 0; +} + +/* Forward declaration. */ +static struct lookup * kad_lookup(struct dht * dht, + const uint8_t * key, + enum kad_code code); + + +/* Build a refresh list. */ +static void bucket_refresh(struct dht * dht, + struct bucket * b, + time_t t, + struct list_head * r) +{ + size_t i; + + if (*b->children != NULL) + for (i = 0; i < (1L << KAD_BETA); ++i) + bucket_refresh(dht, b->children[i], t, r); + + if (b->n_contacts == 0) + return; + + if (t > b->t_refr) { + struct contact * c; + struct contact * d; + c = list_first_entry(&b->contacts, struct contact, next); + d = contact_create(c->id, dht->b, c->addr); + if (c != NULL) + list_add(&d->next, r); + return; + } +} + + +static struct bucket * bucket_create(void) +{ + struct bucket * b; + struct timespec t; + size_t i; + + b = malloc(sizeof(*b)); + if (b == NULL) + return NULL; + + list_head_init(&b->contacts); + b->n_contacts = 0; + + list_head_init(&b->alts); + b->n_alts = 0; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + b->t_refr = t.tv_sec + KAD_T_REFR; + + for (i = 0; i < (1L << KAD_BETA); ++i) + b->children[i] = NULL; + + b->parent = NULL; + b->depth = 0; + + return b; +} + +static void bucket_destroy(struct bucket * b) +{ + struct list_head * p; + struct list_head * h; + size_t i; + + assert(b); + + for (i = 0; i < (1L << KAD_BETA); ++i) + if (b->children[i] != NULL) + bucket_destroy(b->children[i]); + + list_for_each_safe(p, h, &b->contacts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + } + + list_for_each_safe(p, h, &b->alts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + } + + free(b); +} + +static bool bucket_has_id(struct bucket * b, + const uint8_t * id) +{ + uint8_t mask; + uint8_t byte; + + if (b->depth == 0) + return true; + + byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + + mask = ((1L << KAD_BETA) - 1) & 0xFF; + + byte >>= (CHAR_BIT - KAD_BETA) - + (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1)); + + return ((byte & mask) == b->mask); +} + +static int split_bucket(struct bucket * b) +{ + struct list_head * p; + struct list_head * h; + uint8_t mask = 0; + size_t i; + size_t c; + + assert(b); + assert(b->n_alts == 0); + assert(b->n_contacts); + assert(b->children[0] == NULL); + + c = b->n_contacts; + + for (i = 0; i < (1L << KAD_BETA); ++i) { + b->children[i] = bucket_create(); + if (b->children[i] == NULL) { + size_t j; + for (j = 0; j < i; ++j) + bucket_destroy(b->children[j]); + return -1; + } + + b->children[i]->depth = b->depth + 1; + b->children[i]->mask = mask; + b->children[i]->parent = b; + + list_for_each_safe(p, h, &b->contacts) { + struct contact * c; + c = list_entry(p, struct contact, next); + if (bucket_has_id(b->children[i], c->id)) { + list_del(&c->next); + --b->n_contacts; + list_add(&c->next, &b->children[i]->contacts); + ++b->children[i]->n_contacts; + } + } + + mask++; + } + + for (i = 0; i < (1L << KAD_BETA); ++i) + if (b->children[i]->n_contacts == c) + split_bucket(b->children[i]); + + return 0; +} + +/* Locked externally to mandate update as (final) part of join transaction. */ +static int dht_update_bucket(struct dht * dht, + const uint8_t * id, + uint64_t addr) +{ + struct list_head * p; + struct list_head * h; + struct bucket * b; + struct contact * c; + + assert(dht); + + b = dht_get_bucket(dht, id); + if (b == NULL) + return -1; + + c = contact_create(id, dht->b, addr); + if (c == NULL) + return -1; + + list_for_each_safe(p, h, &b->contacts) { + struct contact * d = list_entry(p, struct contact, next); + if (d->addr == addr) { + list_del(&d->next); + contact_destroy(d); + --b->n_contacts; + } + } + + if (b->n_contacts == dht->k) { + if (bucket_has_id(b, dht->id)) { + list_add_tail(&c->next, &b->contacts); + ++b->n_contacts; + if (split_bucket(b)) { + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + } + } else if (b->n_alts == dht->k) { + struct contact * d; + d = list_first_entry(&b->alts, struct contact, next); + list_del(&d->next); + contact_destroy(d); + list_add_tail(&c->next, &b->alts); + } else { + list_add_tail(&c->next, &b->alts); + ++b->n_alts; + } + } else { + list_add_tail(&c->next, &b->contacts); + ++b->n_contacts; + } + + return 0; +} + +static int send_msg(struct dht * dht, + kad_msg_t * msg, + uint64_t addr) +{ + struct shm_du_buff * sdb; + struct kad_req * req; + size_t len; + + pthread_rwlock_wrlock(&dht->lock); + + if (dht->id != NULL) { + msg->has_s_id = true; + msg->s_id.data = dht->id; + msg->s_id.len = dht->b; + } + + msg->s_addr = dht->addr; + + if (msg->code < KAD_STORE) { + msg->cookie = bmp_allocate(dht->cookies); + if (!bmp_is_id_valid(dht->cookies, msg->cookie)) + goto fail_bmp_alloc; + } + + len = kad_msg__get_packed_size(msg); + if (len == 0) + goto fail_msg; + + if (ipcp_sdb_reserve(&sdb, len)) + goto fail_msg; + + kad_msg__pack(msg, shm_du_buff_head(sdb)); + +#ifndef __DHT_TEST__ + if (dt_write_sdu(addr, QOS_CUBE_BE, dht->fd, sdb)) + goto fail_write; +#else + (void) addr; + ipcp_sdb_release(sdb); +#endif /* __DHT_TEST__ */ + + if (msg->code < KAD_STORE) { + req = kad_req_create(dht, msg, addr); + if (req != NULL) + list_add(&req->next, &dht->requests); + } + + pthread_rwlock_unlock(&dht->lock); + + return 0; + +#ifndef __DHT_TEST__ + fail_write: + ipcp_sdb_release(sdb); +#endif + fail_msg: + bmp_release(dht->cookies, msg->cookie); + fail_bmp_alloc: + pthread_rwlock_unlock(&dht->lock); + return -1; +} + +static struct dht_entry * dht_find_entry(struct dht * dht, + const uint8_t * key) +{ + struct list_head * p; + + list_for_each(p, &dht->entries) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + if (!memcmp(key, e->key, dht->b)) + return e; + } + + return NULL; +} + +static int kad_add(struct dht * dht, + const kad_contact_msg_t * contacts, + ssize_t n, + time_t exp) +{ + struct dht_entry * e; + + pthread_rwlock_wrlock(&dht->lock); + + while (--n >= 0) { + if (contacts[n].id.len != dht->b) + log_warn("Bad key length in contact data."); + + e = dht_find_entry(dht, contacts[n].id.data); + if (e != NULL) { + if (dht_entry_add_addr(e, contacts[n].addr, exp)) + goto fail; + } else { + e = dht_entry_create(dht, contacts[n].id.data); + if (e == NULL) + goto fail; + + if (dht_entry_add_addr(e, contacts[n].addr, exp)) { + dht_entry_destroy(e); + goto fail; + } + + list_add(&e->next, &dht->entries); + } + } + + pthread_rwlock_unlock(&dht->lock); + return 0; + + fail: + pthread_rwlock_unlock(&dht->lock); + return -ENOMEM; +} + +static int wait_resp(struct dht * dht, + kad_msg_t * msg, + time_t timeo) +{ + struct kad_req * req; + + assert(dht); + assert(msg); + + pthread_rwlock_rdlock(&dht->lock); + + req = dht_find_request(dht, msg); + if (req == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -EPERM; + } + + pthread_rwlock_unlock(&dht->lock); + + return kad_req_wait(req, timeo); +} + +static int kad_store(struct dht * dht, + const uint8_t * key, + uint64_t addr, + uint64_t r_addr, + time_t ttl) +{ + kad_msg_t msg = KAD_MSG__INIT; + kad_contact_msg_t cmsg = KAD_CONTACT_MSG__INIT; + kad_contact_msg_t * cmsgp[1]; + + cmsg.id.data = (uint8_t *) key; + cmsg.id.len = dht->b; + cmsg.addr = addr; + + cmsgp[0] = &cmsg; + + msg.code = KAD_STORE; + msg.has_t_expire = true; + msg.t_expire = ttl; + msg.n_contacts = 1; + msg.contacts = cmsgp; + + if (send_msg(dht, &msg, r_addr)) + return -1; + + return 0; +} + +static ssize_t kad_find(struct dht * dht, + const uint8_t * key, + const uint64_t * addrs, + enum kad_code code) +{ + kad_msg_t msg = KAD_MSG__INIT; + ssize_t sent = 0; + + assert(dht); + assert(key); + + msg.code = code; + + msg.has_key = true; + msg.key.data = (uint8_t *) key; + msg.key.len = dht->b; + + while (*addrs != 0) { + if (*addrs != dht->addr) { + send_msg(dht, &msg, *addrs); + sent++; + } + ++addrs; + } + + return sent; +} + +static void lookup_set_state(struct lookup * lu, + enum lookup_state state) +{ + pthread_mutex_lock(&lu->lock); + + lu->state = state; + + pthread_mutex_unlock(&lu->lock); +} + +static struct lookup * kad_lookup(struct dht * dht, + const uint8_t * id, + enum kad_code code) +{ + uint64_t addrs[KAD_ALPHA + 1]; + enum lookup_state state; + struct lookup * lu; + + lu = lookup_create(dht, id); + if (lu == NULL) + return NULL; + + addrs[lookup_new_addrs(lu, addrs)] = 0; + + if (addrs[0] == 0) { + pthread_rwlock_wrlock(&dht->lock); + list_del(&lu->next); + pthread_rwlock_unlock(&dht->lock); + lookup_destroy(lu); + return NULL; + } + + if (kad_find(dht, id, addrs, code) == 0) { + pthread_rwlock_wrlock(&dht->lock); + list_del(&lu->next); + pthread_rwlock_unlock(&dht->lock); + lu->state = LU_COMPLETE; + return lu; + } + + while ((state = lookup_wait(lu)) != LU_COMPLETE) { + switch (state) { + case LU_UPDATE: + addrs[lookup_new_addrs(lu, addrs)] = 0; + if (addrs[0] == 0) { + pthread_rwlock_wrlock(&dht->lock); + list_del(&lu->next); + pthread_rwlock_unlock(&dht->lock); + return lu; + } + + kad_find(dht, id, addrs, code); + break; + case LU_DESTROY: + lookup_set_state(lu, LU_NULL); + return NULL; + default: + break; + }; + } + + assert(state = LU_COMPLETE); + + pthread_rwlock_wrlock(&dht->lock); + list_del(&lu->next); + pthread_rwlock_unlock(&dht->lock); + + return lu; +} + +static void kad_publish(struct dht * dht, + const uint8_t * key, + uint64_t addr, + time_t exp) +{ + struct lookup * lu; + uint64_t addrs[KAD_K]; + ssize_t n; + + assert(dht); + assert(key); + + lu = kad_lookup(dht, key, KAD_FIND_NODE); + if (lu == NULL) + return; + + n = lookup_contact_addrs(lu, addrs); + + while (--n > 0) { + if (addrs[n] == dht->addr) { + kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT; + msg.id.data = (uint8_t *) key; + msg.id.len = dht->b; + msg.addr = addr; + kad_add(dht, &msg, 1, exp); + } else { + if (kad_store(dht, key, addr, addrs[n], dht->t_expire)) + log_warn("Failed to send store message."); + } + } + + lookup_destroy(lu); +} + +static int kad_join(struct dht * dht, + uint64_t addr) +{ + kad_msg_t msg = KAD_MSG__INIT; + struct lookup * lu; + + msg.code = KAD_JOIN; + + msg.has_alpha = true; + msg.has_b = true; + msg.has_k = true; + msg.has_t_refresh = true; + msg.has_t_replicate = true; + msg.alpha = KAD_ALPHA; + msg.b = dht->b; + msg.k = KAD_K; + msg.t_refresh = KAD_T_REFR; + msg.t_replicate = KAD_T_REPL; + + if (send_msg(dht, &msg, addr)) + return -1; + + if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) + return -1; + + dht->id = create_id(dht->b); + if (dht->id == NULL) + return -1; + + pthread_rwlock_wrlock(&dht->lock); + + dht_update_bucket(dht, dht->id, dht->addr); + + pthread_rwlock_unlock(&dht->lock); + + lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); + if (lu == NULL) + log_warn("Join response not yet added."); + else + lookup_destroy(lu); + + return 0; +} + +static void dht_dead_peer(struct dht * dht, + uint8_t * key, + uint64_t addr) +{ + struct list_head * p; + struct list_head * h; + struct bucket * b; + + b = dht_get_bucket(dht, key); + + list_for_each_safe(p, h, &b->contacts) { + struct contact * c = list_entry(p, struct contact, next); + if (b->n_contacts + b->n_alts <= dht->k) { + ++c->fails; + return; + } + + if (c->addr == addr) { + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + break; + } + } + + while (b->n_contacts < dht->k && b->n_alts > 0) { + struct contact * c; + c = list_first_entry(&b->alts, struct contact, next); + list_del(&c->next); + --b->n_alts; + list_add(&c->next, &b->contacts); + ++b->n_contacts; + } +} + +static int dht_del(struct dht * dht, + const uint8_t * key, + uint64_t addr) +{ + struct dht_entry * e; + + pthread_rwlock_wrlock(&dht->lock); + + e = dht_find_entry(dht, key); + if (e == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -EPERM; + } + + dht_entry_del_addr(e, addr); + + pthread_rwlock_unlock(&dht->lock); + + return 0; +} + +static buffer_t dht_retrieve(struct dht * dht, + const uint8_t * key) +{ + struct dht_entry * e; + struct list_head * p; + buffer_t buf; + uint64_t * pos; + + buf.len = 0; + + pthread_rwlock_rdlock(&dht->lock); + + e = dht_find_entry(dht, key); + if (e == NULL) { + pthread_rwlock_unlock(&dht->lock); + return buf; + } + + buf.data = malloc(sizeof(dht->addr) * e->n_vals); + if (buf.data == NULL) { + pthread_rwlock_unlock(&dht->lock); + return buf; + } + + buf.len = e->n_vals; + + pos = (uint64_t *) buf.data;; + + list_for_each(p, &e->vals) { + struct val * v = list_entry(p, struct val, next); + *pos++ = v->addr; + } + + pthread_rwlock_unlock(&dht->lock); + + return buf; +} + +static ssize_t dht_get_contacts(struct dht * dht, + const uint8_t * key, + kad_contact_msg_t *** msgs) +{ + struct list_head l; + struct list_head * p; + struct list_head * h; + size_t len; + size_t i = 0; + + list_head_init(&l); + + pthread_rwlock_rdlock(&dht->lock); + + len = dht_contact_list(dht, &l, key); + if (len == 0) + return 0; + + *msgs = malloc(len * sizeof(**msgs)); + if (*msgs == NULL) + return 0; + + list_for_each_safe(p, h, &l) { + struct contact * c = list_entry(p, struct contact, next); + (*msgs)[i] = malloc(sizeof(***msgs)); + if ((*msgs)[i] == NULL) { + pthread_rwlock_unlock(&dht->lock); + while (i > 0) + free(*msgs[--i]); + free(*msgs); + return 0; + } + + kad_contact_msg__init((*msgs)[i]); + + (*msgs)[i]->id.data = c->id; + (*msgs)[i]->id.len = dht->b; + (*msgs)[i++]->addr = c->addr; + list_del(&c->next); + free(c); + } + + pthread_rwlock_unlock(&dht->lock); + + return i; +} + +static time_t gcd(time_t a, + time_t b) +{ + if (a == 0) + return b; + + return gcd(b % a, a); +} + +static void * work(void * o) +{ + struct dht * dht; + struct timespec now; + struct list_head * p; + struct list_head * h; + struct list_head reflist; + time_t intv; + struct lookup * lu; + + dht = (struct dht *) o; + + intv = gcd(dht->t_expire, dht->t_repub); + intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; + + list_head_init(&reflist); + + while (true) { + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_wrlock(&dht->lock); + + /* Republish registered hashes. */ + list_for_each_safe(p, h, &dht->refs) { + struct ref_entry * e; + e = list_entry(p, struct ref_entry, next); + if (now.tv_sec > e->t_rep) { + kad_publish(dht, e->key, dht->addr, + dht->t_expire); + e->t_rep = now.tv_sec + dht->t_repub; + } + } + + /* Remove stale entries and republish if necessary. */ + list_for_each_safe(p, h, &dht->entries) { + struct list_head * p1; + struct list_head * h1; + struct dht_entry * e; + e = list_entry (p, struct dht_entry, next); + list_for_each_safe(p1, h1, &e->vals) { + struct val * v; + v = list_entry(p1, struct val, next); + if (now.tv_sec > v->t_exp) { + list_del(&v->next); + val_destroy(v); + } + + if (now.tv_sec > v->t_rep) { + kad_publish(dht, e->key, v->addr, + dht->t_expire - now.tv_sec); + v->t_rep = now.tv_sec + dht->t_replic; + } + } + } + + /* Check the requests list for unresponsive nodes. */ + list_for_each_safe(p, h, &dht->requests) { + struct kad_req * r; + r = list_entry(p, struct kad_req, next); + if (now.tv_sec > r->t_exp) { + list_del(&r->next); + bmp_release(dht->cookies, r->cookie); + dht_dead_peer(dht, r->key, r->addr); + kad_req_destroy(r); + } + } + + /* Refresh unaccessed buckets. */ + bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); + + pthread_rwlock_unlock(&dht->lock); + + list_for_each_safe(p, h, &reflist) { + struct contact * c; + c = list_entry(p, struct contact, next); + lu = kad_lookup(dht, c->id, KAD_FIND_NODE); + if (lu != NULL) + lookup_destroy(lu); + list_del(&c->next); + contact_destroy(c); + } + + sleep(intv); + } + + return (void *) 0; +} + +static int kad_handle_join_resp(struct dht * dht, + struct kad_req * req, + kad_msg_t * msg) +{ + assert(dht); + assert(req); + assert(msg); + + /* We might send version numbers later to warn of updates if needed. */ + if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire && + msg->has_t_refresh && msg->has_t_replicate)) { + log_warn("Join refused by remote."); + return -1; + } + + if (msg->b < sizeof(uint64_t)) { + log_err("Hash sizes less than 8 bytes unsupported."); + return -1; + } + + pthread_rwlock_wrlock(&dht->lock); + + dht->buckets = bucket_create(); + if (dht->buckets == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -1; + } + + /* Likely corrupt packet. The member will refuse, we might here too. */ + if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) + log_warn("Different kademlia parameters detected."); + + if (msg->t_replicate != KAD_T_REPL) + log_warn("Different kademlia replication time detected."); + + if (msg->t_refresh != KAD_T_REFR) + log_warn("Different kademlia refresh time detected."); + + dht->k = msg->k; + dht->b = msg->b; + dht->t_expire = msg->t_expire; + dht->t_repub = MAX(1, dht->t_expire - 10); + + if (pthread_create(&dht->worker, NULL, work, dht)) { + bucket_destroy(dht->buckets); + pthread_rwlock_unlock(&dht->lock); + return -1; + } + + dht->state = DHT_RUNNING; + + kad_req_respond(req); + + dht_update_bucket(dht, msg->s_id.data, msg->s_addr); + + pthread_rwlock_unlock(&dht->lock); + + log_dbg("Enrollment of DHT completed."); + + return 0; +} + +static int kad_handle_find_resp(struct dht * dht, + struct kad_req * req, + kad_msg_t * msg) +{ + struct lookup * lu; + + assert(dht); + assert(req); + assert(msg); + + pthread_rwlock_rdlock(&dht->lock); + + lu = dht_find_lookup(dht, req->key); + if (lu == NULL) { + log_dbg("Response for unknown lookup."); + pthread_rwlock_unlock(&dht->lock); + return -1; + } + + lookup_update(dht, lu, msg); + + pthread_rwlock_unlock(&dht->lock); + + return 0; +} + +static void kad_handle_response(struct dht * dht, + kad_msg_t * msg) +{ + struct kad_req * req; + + assert(dht); + assert(msg); + + pthread_rwlock_wrlock(&dht->lock); + + req = dht_find_request(dht, msg); + if (req == NULL) { + pthread_rwlock_unlock(&dht->lock); + return; + } + + bmp_release(dht->cookies, req->cookie); + list_del(&req->next); + + pthread_rwlock_unlock(&dht->lock); + + switch(req->code) { + case KAD_JOIN: + if (kad_handle_join_resp(dht, req, msg)) + log_err("Enrollment of DHT failed."); + break; + case KAD_FIND_VALUE: + case KAD_FIND_NODE: + if (dht_get_state(dht) != DHT_RUNNING) + return; + if (kad_handle_find_resp(dht, req, msg)) + log_dbg("Invalid or outdated response."); + break; + default: + break; + } + + kad_req_destroy(req); +} + +int dht_bootstrap(struct dht * dht, + size_t b, + time_t t_expire) +{ + assert(dht); + + pthread_rwlock_wrlock(&dht->lock); + + dht->id = create_id(b); + if (dht->id == NULL) + goto fail_id; + + dht->buckets = bucket_create(); + if (dht->buckets == NULL) + goto fail_buckets; + + dht->buckets->depth = 0; + dht->buckets->mask = 0; + + dht->b = b / CHAR_BIT; + dht->t_expire = MAX(2, t_expire); + dht->t_repub = MAX(1, t_expire - 10); + dht->k = KAD_K; + + if (pthread_create(&dht->worker, NULL, work, dht)) + goto fail_pthread_create; + + dht->state = DHT_RUNNING; + + dht_update_bucket(dht, dht->id, dht->addr); + + pthread_rwlock_unlock(&dht->lock); + + return 0; + + fail_pthread_create: + bucket_destroy(dht->buckets); + dht->buckets = NULL; + fail_buckets: + free(dht->id); + dht->id = NULL; + fail_id: + pthread_rwlock_unlock(&dht->lock); + return -1; +} + +int dht_enroll(struct dht * dht, + uint64_t addr) +{ + assert(dht); + + return kad_join(dht, addr); +} + +int dht_reg(struct dht * dht, + const uint8_t * key) +{ + struct ref_entry * e; + + assert(dht); + assert(key); + assert(dht->addr != 0); + + if (dht_get_state(dht) != DHT_RUNNING) + return -1; + + e = ref_entry_create(dht, key); + if (e == NULL) + return -ENOMEM; + + pthread_rwlock_wrlock(&dht->lock); + + list_add(&e->next, &dht->refs); + + pthread_rwlock_unlock(&dht->lock); + + kad_publish(dht, key, dht->addr, dht->t_expire); + + return 0; +} + +int dht_unreg(struct dht * dht, + const uint8_t * key) +{ + struct list_head * p; + struct list_head * h; + + assert(dht); + assert(key); + + if (dht_get_state(dht) != DHT_RUNNING) + return -1; + + pthread_rwlock_wrlock(&dht->lock); + + list_for_each_safe(p, h, &dht->refs) { + struct ref_entry * r = list_entry(p, struct ref_entry, next); + if (!memcmp(key, r->key, dht-> b) ) { + list_del(&r->next); + ref_entry_destroy(r); + } + } + + dht_del(dht, key, dht->addr); + + pthread_rwlock_unlock(&dht->lock); + + return 0; +} + +uint64_t dht_query(struct dht * dht, + const uint8_t * key) +{ + struct dht_entry * e; + struct lookup * lu; + uint64_t addrs[KAD_K]; + size_t n; + + addrs[0] = 0; + + pthread_rwlock_rdlock(&dht->lock); + + e = dht_find_entry(dht, key); + if (e != NULL) + addrs[0] = dht_entry_get_addr(dht, e); + + pthread_rwlock_unlock(&dht->lock); + + if (addrs[0] != 0 && addrs[0] != dht->addr) + return addrs[0]; + + lu = kad_lookup(dht, key, KAD_FIND_VALUE); + if (lu == NULL) + return 0; + + n = lookup_get_addrs(lu, addrs); + if (n == 0) { + lookup_destroy(lu); + return 0; + } + + lookup_destroy(lu); + + /* Current behaviour is anycast and return the first peer address. */ + if (addrs[0] != dht->addr) + return addrs[0]; + + if (n > 1) + return addrs[1]; + + return 0; +} + +void dht_post_sdu(void * ae, + struct shm_du_buff * sdb) +{ + struct dht * dht; + kad_msg_t * msg; + kad_contact_msg_t ** cmsgs; + kad_msg_t resp_msg = KAD_MSG__INIT; + uint64_t addr; + buffer_t buf; + size_t i; + + assert(ae); + assert(sdb); + + memset(&buf, 0, sizeof(buf)); + + dht = (struct dht *) ae; + + msg = kad_msg__unpack(NULL, + shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), + shm_du_buff_head(sdb)); + + ipcp_sdb_release(sdb); + + if (msg == NULL) { + log_err("Failed to unpack message."); + return; + } + + if (msg->has_key && msg->key.len != dht->b) { + kad_msg__free_unpacked(msg, NULL); + log_warn("Bad key in message."); + return; + } + + if (msg->has_s_id && !msg->has_b && msg->s_id.len != dht->b) { + kad_msg__free_unpacked(msg, NULL); + log_warn("Bad source ID in message of type %d.", msg->code); + return; + } + + if (msg->code != KAD_RESPONSE && dht_get_state(dht) != DHT_RUNNING) { + kad_msg__free_unpacked(msg, NULL); + return; + } + + addr = msg->s_addr; + + resp_msg.code = KAD_RESPONSE; + resp_msg.cookie = msg->cookie; + + switch(msg->code) { + case KAD_JOIN: + /* Refuse enrollee on check fails. */ + if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { + log_warn("Parameter mismatch. " + "DHT enrolment refused."); + break; + } + + if (msg->t_replicate != KAD_T_REPL) { + log_warn("Replication time mismatch. " + "DHT enrolment refused."); + + break; + } + + if (msg->t_refresh != KAD_T_REFR) { + log_warn("Refresh time mismatch. " + "DHT enrolment refused."); + break; + } + + resp_msg.has_alpha = true; + resp_msg.has_b = true; + resp_msg.has_k = true; + resp_msg.has_t_expire = true; + resp_msg.has_t_refresh = true; + resp_msg.has_t_replicate = true; + resp_msg.alpha = KAD_ALPHA; + resp_msg.b = dht->b; + resp_msg.k = KAD_K; + resp_msg.t_expire = dht->t_expire; + resp_msg.t_refresh = KAD_T_REFR; + resp_msg.t_replicate = KAD_T_REPL; + break; + case KAD_FIND_VALUE: + buf = dht_retrieve(dht, msg->key.data); + if (buf.len != 0) { + resp_msg.n_addrs = buf.len; + resp_msg.addrs = (uint64_t *) buf.data; + break; + } + /* FALLTHRU */ + case KAD_FIND_NODE: + /* Return k closest contacts. */ + resp_msg.n_contacts = + dht_get_contacts(dht, msg->key.data, &cmsgs); + resp_msg.contacts = cmsgs; + break; + case KAD_STORE: + if (msg->n_contacts < 1) { + log_warn("No contacts in store message."); + break; + } + + if (!msg->has_t_expire) { + log_warn("No expiry time in store message."); + break; + } + + kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire); + break; + case KAD_RESPONSE: + kad_handle_response(dht, msg); + break; + default: + assert(false); + break; + } + + if (msg->code != KAD_JOIN) { + pthread_rwlock_wrlock(&dht->lock); + if (dht_update_bucket(dht, msg->s_id.data, addr)) + log_warn("Failed to update bucket."); + pthread_rwlock_unlock(&dht->lock); + } + + if (msg->code < KAD_STORE) + send_msg(dht, &resp_msg, addr); + + kad_msg__free_unpacked(msg, NULL); + + if (resp_msg.n_addrs > 0) + free(resp_msg.addrs); + + if (resp_msg.n_contacts == 0) + return; + + for (i = 0; i < resp_msg.n_contacts; ++i) + kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); + free(resp_msg.contacts); +} + +void dht_destroy(struct dht * dht) +{ + struct list_head * p; + struct list_head * h; + + if (dht == NULL) + return; + + if (dht_get_state(dht) == DHT_RUNNING) + dht_set_state(dht, DHT_SHUTDOWN); + + pthread_rwlock_wrlock(&dht->lock); + + list_for_each_safe(p, h, &dht->entries) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + list_del(&e->next); + dht_entry_destroy(e); + } + + list_for_each_safe(p, h, &dht->requests) { + struct kad_req * r = list_entry(p, struct kad_req, next); + list_del(&r->next); + free(r); + } + + list_for_each_safe(p, h, &dht->refs) { + struct ref_entry * e = list_entry(p, struct ref_entry, next); + list_del(&e->next); + ref_entry_destroy(e); + } + + list_for_each_safe(p, h, &dht->lookups) { + struct lookup * l = list_entry(p, struct lookup, next); + list_del(&l->next); + lookup_destroy(l); + } + + pthread_rwlock_unlock(&dht->lock); + + if (dht_get_state(dht) == DHT_SHUTDOWN) { + pthread_cancel(dht->worker); + pthread_join(dht->worker, NULL); + } + + if (dht->buckets != NULL) + bucket_destroy(dht->buckets); + + bmp_destroy(dht->cookies); + + pthread_mutex_destroy(&dht->mtx); + + pthread_rwlock_destroy(&dht->lock); + + free(dht->id); + + free(dht); +} + +struct dht * dht_create(uint64_t addr) +{ + struct dht * dht; + + dht = malloc(sizeof(*dht)); + if (dht == NULL) + goto fail_malloc; + + dht->buckets = NULL; + + list_head_init(&dht->entries); + list_head_init(&dht->requests); + list_head_init(&dht->refs); + list_head_init(&dht->lookups); + + if (pthread_rwlock_init(&dht->lock, NULL)) + goto fail_rwlock; + + if (pthread_mutex_init(&dht->mtx, NULL)) + goto fail_mutex; + + dht->cookies = bmp_create(DHT_MAX_REQS, 1); + if (dht->cookies == NULL) + goto fail_bmp; + + dht->b = 0; + dht->addr = addr; + dht->id = NULL; +#ifndef __DHT_TEST__ + dht->fd = dt_reg_ae(dht, &dht_post_sdu); +#endif /* __DHT_TEST__ */ + + dht->state = DHT_INIT; + + return dht; + + fail_bmp: + pthread_mutex_destroy(&dht->mtx); + fail_mutex: + pthread_rwlock_destroy(&dht->lock); + fail_rwlock: + free(dht); + fail_malloc: + return NULL; +} |