summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-07-13 09:43:09 +0200
committerdimitri staessens <dimitri.staessens@ugent.be>2017-07-18 13:03:05 +0200
commit6e739b09bef860a4830328630ea07622bdd79d79 (patch)
tree205ea90bd2f59a0a707c7b4a14df2a54fd7b4a50
parent0bcb3ab0804bbfd31d056c08548cb40591598f4b (diff)
downloadouroboros-6e739b09bef860a4830328630ea07622bdd79d79.tar.gz
ouroboros-6e739b09bef860a4830328630ea07622bdd79d79.zip
ipcpd: Add DHT as directory in normal IPCP
This implements a Distributed Hash Table (DHT) based on the Kademlia protocol, with default parameters set as used in the BitTorrent Mainline DHT. This initial implementation is almost feature complete, except for some things to be done after a testing period: caching and stale peer bumping, and setting the expiration timeout via the IRM tool.
-rw-r--r--include/ouroboros/config.h.in5
-rw-r--r--src/ipcpd/ipcp.c1
-rw-r--r--src/ipcpd/normal/CMakeLists.txt6
-rw-r--r--src/ipcpd/normal/dht.c2369
-rw-r--r--src/ipcpd/normal/dht.h54
-rw-r--r--src/ipcpd/normal/dir.c167
-rw-r--r--src/ipcpd/normal/dir.h10
-rw-r--r--src/ipcpd/normal/dt.c14
-rw-r--r--src/ipcpd/normal/dt.h2
-rw-r--r--src/ipcpd/normal/fa.c61
-rw-r--r--src/ipcpd/normal/kademlia.proto46
-rw-r--r--src/ipcpd/normal/main.c28
-rw-r--r--src/ipcpd/normal/pol/flat.c4
-rw-r--r--src/ipcpd/normal/ribconfig.h2
-rw-r--r--src/ipcpd/normal/ribmgr.c3
-rw-r--r--src/ipcpd/normal/tests/CMakeLists.txt37
-rw-r--r--src/ipcpd/normal/tests/dht_test.c99
17 files changed, 2745 insertions, 163 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in
index bae2d89e..1b8e0db6 100644
--- a/include/ouroboros/config.h.in
+++ b/include/ouroboros/config.h.in
@@ -3,7 +3,8 @@
*
* Configuration information
*
- * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@@ -68,7 +69,7 @@
#define IRMD_REQ_ARR_TIMEOUT 500
#define IRMD_FLOW_TIMEOUT 5000
#define IPCP_ACCEPT_TIMEOUT 100
-#define SOCKET_TIMEOUT 4000
+#define SOCKET_TIMEOUT 10000
#define CDAP_REPLY_TIMEOUT 1000
#define ENROLL_TIMEOUT 2000
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 4aec7567..84e339ab 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -260,7 +260,6 @@ static void * ipcp_main_loop(void * o)
ret_msg.result = ipcpi.ops->ipcp_enroll(msg->dst_name,
&info);
-
if (ret_msg.result == 0) {
ret_msg.dif_info = &dif_info;
dif_info.dir_hash_algo = info.dir_hash_algo;
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 336b0e8f..8c2d4efc 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -15,6 +15,8 @@ include_directories(${CMAKE_BINARY_DIR}/include)
set(IPCP_NORMAL_TARGET ipcpd-normal CACHE STRING "IPCP_NORMAL_TARGET")
protobuf_generate_c(FLOW_ALLOC_SRCS FLOW_ALLOC_HDRS flow_alloc.proto)
+protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS kademlia.proto)
+
# Add GPB sources of policies last
protobuf_generate_c(FSO_SRCS FSO_HDRS pol/fso.proto)
@@ -22,6 +24,7 @@ set(SOURCE_FILES
# Add source files here
addr_auth.c
connmgr.c
+ dht.c
dir.c
dt.c
dt_pci.c
@@ -42,7 +45,7 @@ set(SOURCE_FILES
)
add_executable(ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES}
- ${FLOW_ALLOC_SRCS} ${FSO_SRCS})
+ ${FLOW_ALLOC_SRCS} ${FSO_SRCS} ${KAD_PROTO_SRCS})
target_link_libraries(ipcpd-normal LINK_PUBLIC ouroboros)
include(AddCompileFlags)
@@ -53,3 +56,4 @@ endif (CMAKE_BUILD_TYPE MATCHES Debug)
install(TARGETS ipcpd-normal RUNTIME DESTINATION sbin)
add_subdirectory(pol/tests)
+add_subdirectory(tests)
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
new file mode 100644
index 00000000..0b00e2f5
--- /dev/null
+++ b/src/ipcpd/normal/dht.c
@@ -0,0 +1,2369 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Distributed Hash Table based on Kademlia
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#define OUROBOROS_PREFIX "dht"
+
+#include <ouroboros/config.h>
+#include <ouroboros/hash.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/list.h>
+#include <ouroboros/random.h>
+#include <ouroboros/time_utils.h>
+#include <ouroboros/utils.h>
+
+#include "dht.h"
+#include "dt.h"
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <inttypes.h>
+
+#include "kademlia.pb-c.h"
+typedef KadMsg kad_msg_t;
+typedef KadContactMsg kad_contact_msg_t;
+
+#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */
+#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */
+#define KAD_K 8 /* Replication factor, MDHT value. */
+#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */
+#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */
+#define KAD_T_JOIN 6 /* Response time to wait for a join. */
+#define KAD_T_RESP 2 /* Response time to wait for a response. */
+#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */
+#define KAD_QUEER 15 /* Time to declare peer questionable. */
+#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */
+
+/*
+ * State of a DHT instance. Accessed through dht_get_state() and
+ * dht_set_state(), which take dht->mtx.
+ */
+enum dht_state {
+ DHT_INIT = 0,
+ DHT_RUNNING,
+ DHT_SHUTDOWN,
+};
+
+/*
+ * Message codes. The order matters: send_msg() allocates a cookie and
+ * creates a struct kad_req for every code that sorts before KAD_STORE.
+ */
+enum kad_code {
+ KAD_JOIN = 0,
+ KAD_FIND_NODE,
+ KAD_FIND_VALUE,
+ /* Messages without a response below. */
+ KAD_STORE,
+ KAD_RESPONSE
+};
+
+/*
+ * State of a struct kad_req. kad_req_create() starts in REQ_INIT,
+ * kad_req_wait() sets REQ_PENDING, kad_req_respond() sets REQ_RESPONSE
+ * and kad_req_destroy() uses REQ_DESTROY / REQ_NULL to hand over with a
+ * thread that is still waiting.
+ */
+enum kad_req_state {
+ REQ_NULL = 0,
+ REQ_INIT,
+ REQ_PENDING,
+ REQ_RESPONSE,
+ REQ_DONE,
+ REQ_DESTROY
+};
+
+/*
+ * State of a struct lookup. lookup_create() starts in LU_INIT,
+ * lookup_wait() sets LU_PENDING, lookup_update() sets LU_UPDATE or
+ * LU_COMPLETE, lookup_new_addrs() sets LU_DONE when it finds no contact
+ * left to query, and lookup_destroy() uses LU_DESTROY / LU_NULL.
+ */
+enum lookup_state {
+ LU_NULL = 0,
+ LU_INIT,
+ LU_PENDING,
+ LU_UPDATE,
+ LU_COMPLETE,
+ LU_DONE,
+ LU_DESTROY
+};
+
+/*
+ * A message sent to a peer for which a response is expected.
+ * cookie and code are copied from the message, key is a private copy of
+ * the message key (NULL if the message had none) and t_exp is the
+ * creation time plus KAD_T_RESP seconds. state is guarded by lock and
+ * changes are signalled on cond.
+ */
+struct kad_req {
+ struct list_head next;
+
+ uint32_t cookie;
+ enum kad_code code;
+ uint8_t * key;
+ uint64_t addr;
+
+ enum kad_req_state state;
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
+
+ time_t t_exp;
+};
+
+/*
+ * Bookkeeping for one lookup of key. contacts holds private copies of
+ * the contacts gathered so far (seeded by dht_contact_list(), refined by
+ * lookup_update()); addrs holds the addresses copied from a response
+ * that carried any. state is guarded by lock and signalled on cond.
+ */
+struct lookup {
+ struct list_head next;
+
+ uint8_t * key;
+
+ struct list_head contacts;
+ size_t n_contacts;
+
+ uint64_t * addrs;
+ size_t n_addrs;
+
+ enum lookup_state state;
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
+};
+
+/*
+ * One address stored under a key. val_create() sets t_exp to now + exp
+ * and t_rep to now + KAD_T_REPL, both in seconds of CLOCK_REALTIME_COARSE.
+ */
+struct val {
+ struct list_head next;
+
+ uint64_t addr;
+
+ time_t t_exp;
+ time_t t_rep;
+};
+
+/*
+ * A key kept on dht->refs together with a time t_rep, which
+ * ref_entry_create() sets to now + dht->t_repub.
+ */
+struct ref_entry {
+ struct list_head next;
+
+ uint8_t * key;
+
+ time_t t_rep;
+};
+
+/* A key and the list of values (struct val) stored for it. */
+struct dht_entry {
+ struct list_head next;
+
+ uint8_t * key;
+ size_t n_vals;
+ struct list_head vals;
+};
+
+/*
+ * A peer, known by id and address. t_seen is the creation time in
+ * seconds. Inside a lookup, lookup_new_addrs() reuses fails as a
+ * "already queried" marker.
+ */
+struct contact {
+ struct list_head next;
+
+ uint8_t * id;
+ uint64_t addr;
+
+ size_t fails;
+ time_t t_seen;
+};
+
+/*
+ * Node of the routing tree. A bucket whose children[0] is NULL is a
+ * leaf (see iter_bucket()); split_bucket() gives it 1 << KAD_BETA
+ * children, each with depth + 1 and its own mask value. alts holds the
+ * contacts that dht_update_bucket() could not fit into contacts.
+ */
+struct bucket {
+ struct list_head contacts;
+ size_t n_contacts;
+
+ struct list_head alts;
+ size_t n_alts;
+
+ time_t t_refr;
+
+ size_t depth;
+ uint8_t mask;
+
+ struct bucket * parent;
+ struct bucket * children[1L << KAD_BETA];
+};
+
+/*
+ * A DHT instance. b is the length in bytes of ids and keys (it is the
+ * length passed to every dht_dup_key() and memcmp() on them). state is
+ * guarded by mtx; the functions in this file take lock around accesses
+ * to the bucket tree, the entries, lookups and requests lists and the
+ * cookie bitmap.
+ */
+struct dht {
+ size_t alpha;
+ size_t b;
+ size_t k;
+
+ time_t t_expire;
+ time_t t_refresh;
+ time_t t_replic;
+ time_t t_repub;
+
+ uint8_t * id;
+ uint64_t addr;
+
+ struct bucket * buckets;
+
+ struct list_head entries;
+
+ struct list_head refs;
+
+ struct list_head lookups;
+
+ struct list_head requests;
+ struct bmp * cookies;
+
+ enum dht_state state;
+ pthread_mutex_t mtx;
+
+ pthread_rwlock_t lock;
+
+ int fd;
+
+ pthread_t worker;
+};
+
+/*
+ * Returns a heap-allocated copy of the first len bytes of key, or NULL
+ * if the allocation fails. The caller owns the copy.
+ */
+static uint8_t * dht_dup_key(const uint8_t * key,
+                             size_t          len)
+{
+        uint8_t * copy = malloc(sizeof(*copy) * len);
+
+        if (copy != NULL)
+                memcpy(copy, key, len);
+
+        return copy;
+}
+
+/* Returns a snapshot of dht->state, read while holding dht->mtx. */
+static enum dht_state dht_get_state(struct dht * dht)
+{
+        enum dht_state cur;
+
+        pthread_mutex_lock(&dht->mtx);
+        cur = dht->state;
+        pthread_mutex_unlock(&dht->mtx);
+
+        return cur;
+}
+
+/* Stores state in dht->state while holding dht->mtx. */
+static void dht_set_state(struct dht *   dht,
+                          enum dht_state state)
+{
+        pthread_mutex_lock(&dht->mtx);
+        dht->state = state;
+        pthread_mutex_unlock(&dht->mtx);
+}
+
+/*
+ * Allocates len bytes and fills them with random_buffer(). Returns NULL
+ * if the allocation or the random fill fails; the caller owns the result.
+ */
+static uint8_t * create_id(size_t len)
+{
+        uint8_t * buf = malloc(len);
+
+        if (buf == NULL)
+                return NULL;
+
+        if (random_buffer(buf, len) >= 0)
+                return buf;
+
+        free(buf);
+
+        return NULL;
+}
+
+/*
+ * Builds the request record for a message that was sent to addr.
+ * Cookie and code are taken from msg, the key (if any) is copied, and
+ * the request expires KAD_T_RESP seconds from now. Returns NULL if any
+ * allocation or pthread initialisation fails.
+ */
+static struct kad_req * kad_req_create(struct dht * dht,
+                                       kad_msg_t *  msg,
+                                       uint64_t     addr)
+{
+        struct kad_req *   r;
+        pthread_condattr_t attr;
+        struct timespec    now;
+
+        r = malloc(sizeof(*r));
+        if (r == NULL)
+                goto fail_malloc;
+
+        list_head_init(&r->next);
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+        r->t_exp  = now.tv_sec + KAD_T_RESP;
+        r->addr   = addr;
+        r->state  = REQ_INIT;
+        r->cookie = msg->cookie;
+        r->code   = msg->code;
+        r->key    = NULL;
+
+        if (msg->has_key) {
+                r->key = dht_dup_key(msg->key.data, dht->b);
+                if (r->key == NULL)
+                        goto fail_key;
+        }
+
+        if (pthread_mutex_init(&r->lock, NULL))
+                goto fail_mutex;
+
+        pthread_condattr_init(&attr);
+#ifndef __APPLE__
+        pthread_condattr_setclock(&attr, PTHREAD_COND_CLOCK);
+#endif
+
+        if (pthread_cond_init(&r->cond, &attr))
+                goto fail_cond;
+
+        pthread_condattr_destroy(&attr);
+
+        return r;
+
+ fail_cond:
+        pthread_condattr_destroy(&attr);
+        pthread_mutex_destroy(&r->lock);
+ fail_mutex:
+        free(r->key); /* NULL when the message carried no key. */
+ fail_key:
+        free(r);
+ fail_malloc:
+        return NULL;
+}
+
+/*
+ * Tears down a request. If a thread is blocked in kad_req_wait(), it is
+ * woken with REQ_DESTROY and this function waits until that thread has
+ * set the state to REQ_NULL before freeing anything.
+ *
+ * If the request is already in REQ_DESTROY another caller owns the
+ * teardown, so this call returns without touching the request. The key
+ * is therefore only freed at the very end, together with the request:
+ * freeing it up front would free it twice on that path.
+ */
+static void kad_req_destroy(struct kad_req * req)
+{
+        assert(req);
+
+        pthread_mutex_lock(&req->lock);
+
+        switch (req->state) {
+        case REQ_DESTROY:
+                pthread_mutex_unlock(&req->lock);
+                return;
+        case REQ_PENDING:
+                req->state = REQ_DESTROY;
+                pthread_cond_signal(&req->cond);
+                break;
+        case REQ_INIT:
+        case REQ_DONE:
+                req->state = REQ_NULL;
+                break;
+        case REQ_RESPONSE:
+        case REQ_NULL:
+        default:
+                break;
+        }
+
+        while (req->state != REQ_NULL)
+                pthread_cond_wait(&req->cond, &req->lock);
+
+        pthread_mutex_unlock(&req->lock);
+
+        pthread_cond_destroy(&req->cond);
+        pthread_mutex_destroy(&req->lock);
+
+        free(req->key);
+        free(req);
+}
+
+/*
+ * Marks the request REQ_PENDING and blocks for at most t seconds until
+ * another thread changes its state.
+ *
+ * Returns -1 if the request was destroyed while waiting; otherwise the
+ * negated result of the last pthread_cond_timedwait() call, i.e. 0 when
+ * woken normally and -ETIMEDOUT on timeout.
+ */
+static int kad_req_wait(struct kad_req * req,
+ time_t t)
+{
+ struct timespec timeo = {t, 0};
+ struct timespec abs;
+ int ret = 0;
+
+ assert(req);
+
+ /* Absolute deadline on the clock the condvar was created with. */
+ clock_gettime(PTHREAD_COND_CLOCK, &abs);
+
+ ts_add(&abs, &timeo, &abs);
+
+ pthread_mutex_lock(&req->lock);
+
+ req->state = REQ_PENDING;
+
+ while (req->state == REQ_PENDING && ret != -ETIMEDOUT)
+ ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs);
+
+ switch(req->state) {
+ case REQ_DESTROY:
+ /* Hand the request back to kad_req_destroy(), which waits for REQ_NULL. */
+ ret = -1;
+ req->state = REQ_NULL;
+ pthread_cond_signal(&req->cond);
+ break;
+ case REQ_PENDING: /* ETIMEDOUT */
+ case REQ_RESPONSE:
+ req->state = REQ_DONE;
+ pthread_cond_signal(&req->cond);
+ break;
+ default:
+ break;
+ }
+
+ pthread_mutex_unlock(&req->lock);
+
+ return ret;
+}
+
+/* Sets the request to REQ_RESPONSE and signals its condition variable. */
+static void kad_req_respond(struct kad_req * req)
+{
+ pthread_mutex_lock(&req->lock);
+
+ req->state = REQ_RESPONSE;
+ pthread_cond_signal(&req->cond);
+
+ pthread_mutex_unlock(&req->lock);
+}
+
+/*
+ * Creates a contact for the peer at addr with a private copy of the
+ * first len bytes of id. The contact starts with zero fails and t_seen
+ * set to the current time. Returns NULL on allocation failure.
+ */
+static struct contact * contact_create(const uint8_t * id,
+                                       size_t          len,
+                                       uint64_t        addr)
+{
+        struct timespec  now;
+        struct contact * ct = malloc(sizeof(*ct));
+
+        if (ct == NULL)
+                return NULL;
+
+        ct->id = dht_dup_key(id, len);
+        if (ct->id == NULL) {
+                free(ct);
+                return NULL;
+        }
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+        list_head_init(&ct->next);
+
+        ct->addr   = addr;
+        ct->fails  = 0;
+        ct->t_seen = now.tv_sec;
+
+        return ct;
+}
+
+/* Frees a contact and its id. A NULL contact is a no-op. */
+static void contact_destroy(struct contact * c)
+{
+        if (c == NULL)
+                return;
+
+        free(c->id);
+        free(c);
+}
+
+/*
+ * Walks down the bucket tree from b and returns the leaf that id maps
+ * to. At each level the KAD_BETA bits of id selected by the bucket's
+ * depth pick the child to descend into; a bucket without children is a
+ * leaf.
+ */
+static struct bucket * iter_bucket(struct bucket * b,
+                                   const uint8_t * id)
+{
+        const uint8_t mask = ((1L << KAD_BETA) - 1) & 0xFF;
+
+        assert(b);
+
+        while (b->children[0] != NULL) {
+                uint8_t byte = id[(b->depth * KAD_BETA) / CHAR_BIT];
+
+                byte >>= (CHAR_BIT - KAD_BETA) -
+                        (((b->depth) * KAD_BETA) & (CHAR_BIT - 1));
+
+                b = b->children[(byte & mask)];
+
+                assert(b);
+        }
+
+        return b;
+}
+
+/* Returns the leaf bucket of dht's routing tree that id maps to. */
+static struct bucket * dht_get_bucket(struct dht * dht,
+ const uint8_t * id)
+{
+ assert(dht->buckets);
+
+ return iter_bucket(dht->buckets, id);
+}
+
+/*
+ * If someone builds a network where the n (n > k) closest nodes all
+ * have IDs starting with the same 64 bits: by all means, change this.
+ *
+ * XOR distance over the first 64 bits of two ids, interpreted as a
+ * big-endian number. Both buffers must hold at least 8 bytes. The bytes
+ * are copied out with memcpy() because the ids are plain byte arrays:
+ * reading them through a uint64_t pointer would break strict aliasing
+ * and may be a misaligned access.
+ */
+static uint64_t dist(const uint8_t * src,
+                     const uint8_t * dst)
+{
+        uint64_t s;
+        uint64_t d;
+
+        memcpy(&s, src, sizeof(s));
+        memcpy(&d, dst, sizeof(d));
+
+        return betoh64(s ^ d);
+}
+
+/*
+ * Inserts contact c into list l, in front of the first element whose
+ * distance to key is smaller than that of c (or at the tail if there is
+ * none). Always returns 1, the number of contacts added.
+ */
+static size_t list_add_sorted(struct list_head * l,
+                              struct contact *   c,
+                              const uint8_t *    key)
+{
+        struct list_head * pos;
+        uint64_t           d;
+
+        assert(l);
+        assert(c);
+        assert(key);
+        assert(c->id);
+
+        d = dist(c->id, key);
+
+        list_for_each(pos, l) {
+                struct contact * cur = list_entry(pos, struct contact, next);
+                if (d > dist(cur->id, key))
+                        break;
+        }
+
+        list_add_tail(&c->next, pos);
+
+        return 1;
+}
+
+/*
+ * Fills the empty list l with copies of at most dht->k contacts for key
+ * and returns how many were added. The copies are owned by the caller.
+ *
+ * The contacts come from the leaf bucket that key maps to when that
+ * bucket is full or is the root; otherwise they are gathered from all
+ * children of the bucket's parent. The bucket's refresh time is pushed
+ * KAD_T_REFR seconds into the future. Contacts whose copy cannot be
+ * allocated are skipped.
+ */
+static size_t dht_contact_list(struct dht *       dht,
+                               struct list_head * l,
+                               const uint8_t *    key)
+{
+        struct list_head * p;
+        struct bucket *    b;
+        size_t             len = 0;
+        size_t             i;
+        struct timespec    t;
+
+        assert(l);
+        assert(dht);
+        assert(key);
+        assert(list_is_empty(l));
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+        b = dht_get_bucket(dht, key);
+        if (b == NULL)
+                return 0;
+
+        b->t_refr = t.tv_sec + KAD_T_REFR;
+
+        if (b->n_contacts == dht->k || b->parent == NULL) {
+                list_for_each(p, &b->contacts) {
+                        struct contact * c;
+                        c = list_entry(p, struct contact, next);
+                        c = contact_create(c->id, dht->b, c->addr);
+                        if (c == NULL)
+                                continue;
+                        len += list_add_sorted(l, c, key);
+                        if (len == dht->k)
+                                break;
+                }
+        } else {
+                struct bucket * d = b->parent;
+                /* Stop visiting siblings as soon as k contacts are found. */
+                for (i = 0; i < (1L << KAD_BETA) && len < dht->k; ++i) {
+                        list_for_each(p, &d->children[i]->contacts) {
+                                struct contact * c;
+                                c = list_entry(p, struct contact, next);
+                                c = contact_create(c->id, dht->b, c->addr);
+                                if (c == NULL)
+                                        continue;
+                                len += list_add_sorted(l, c, key);
+                                if (len == dht->k)
+                                        break;
+                        }
+                }
+        }
+
+        assert(len == dht->k || b->parent == NULL);
+
+        return len;
+}
+
+/*
+ * Allocates a lookup for id in state LU_INIT, with a private copy of
+ * the id as key. Under the dht write lock the lookup is linked into
+ * dht->lookups and its contact list is seeded with dht_contact_list().
+ * Returns NULL if an allocation or pthread initialisation fails.
+ */
+static struct lookup * lookup_create(struct dht * dht,
+ const uint8_t * id)
+{
+ struct lookup * lu;
+ pthread_condattr_t cattr;
+
+ assert(dht);
+ assert(id);
+
+ lu = malloc(sizeof(*lu));
+ if (lu == NULL)
+ goto fail_malloc;
+
+ list_head_init(&lu->contacts);
+
+ lu->state = LU_INIT;
+ lu->addrs = NULL;
+ lu->n_addrs = 0;
+ lu->key = dht_dup_key(id, dht->b);
+ if (lu->key == NULL)
+ goto fail_id;
+
+ if (pthread_mutex_init(&lu->lock, NULL))
+ goto fail_mutex;
+
+ pthread_condattr_init(&cattr);
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+
+ if (pthread_cond_init(&lu->cond, &cattr))
+ goto fail_cond;
+
+ pthread_condattr_destroy(&cattr);
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ list_add(&lu->next, &dht->lookups);
+
+ lu->n_contacts = dht_contact_list(dht, &lu->contacts, id);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ return lu;
+
+ /* Unwind in reverse order of acquisition. */
+ fail_cond:
+ pthread_condattr_destroy(&cattr);
+ pthread_mutex_destroy(&lu->lock);
+ fail_mutex:
+ free(lu->key);
+ fail_id:
+ free(lu);
+ fail_malloc:
+ return NULL;
+}
+
+/*
+ * Tears down a lookup and frees its key, addresses and contacts. If a
+ * thread is blocked in lookup_wait(), it is woken with LU_DESTROY and
+ * this function waits until that thread has set the state to LU_NULL.
+ * If the lookup is already in LU_DESTROY another caller owns the
+ * teardown and this call returns immediately.
+ *
+ * The lookup is not unlinked from dht->lookups here.
+ */
+static void lookup_destroy(struct lookup * lu)
+{
+        struct list_head * p;
+        struct list_head * h;
+
+        assert(lu);
+
+        pthread_mutex_lock(&lu->lock);
+
+        switch (lu->state) {
+        case LU_DESTROY:
+                pthread_mutex_unlock(&lu->lock);
+                return;
+        case LU_PENDING:
+                lu->state = LU_DESTROY;
+                pthread_cond_signal(&lu->cond);
+                break;
+        case LU_INIT:
+        case LU_DONE:
+        case LU_UPDATE:
+        case LU_COMPLETE:
+                lu->state = LU_NULL;
+                break;
+        case LU_NULL:
+        default:
+                break;
+        }
+
+        while (lu->state != LU_NULL)
+                pthread_cond_wait(&lu->cond, &lu->lock);
+
+        free(lu->key);
+        free(lu->addrs);
+
+        list_for_each_safe(p, h, &lu->contacts) {
+                struct contact * c = list_entry(p, struct contact, next);
+                list_del(&c->next);
+                contact_destroy(c);
+        }
+
+        pthread_mutex_unlock(&lu->lock);
+
+        pthread_cond_destroy(&lu->cond);
+        pthread_mutex_destroy(&lu->lock);
+
+        free(lu);
+}
+
+/*
+ * Feeds a response message into a running lookup.
+ *
+ * If the message carries addresses, they are copied into the lookup
+ * (unless it already has some) and the lookup becomes LU_COMPLETE.
+ *
+ * Otherwise each contact in the message is merged into the lookup's
+ * contact list, which is kept in the order used by list_add_sorted()
+ * and capped at dht->k entries: contacts that are already present, that
+ * have an id of the wrong length, or that would land past the cap are
+ * dropped; when the list is full, inserting a contact evicts the last
+ * one. The lookup then becomes LU_UPDATE. Nothing is done unless the
+ * DHT is running.
+ */
+static void lookup_update(struct dht *    dht,
+                          struct lookup * lu,
+                          kad_msg_t *     msg)
+{
+        struct list_head * p;
+        struct contact *   c;
+        size_t             n;
+        size_t             pos;
+
+        assert(lu);
+        assert(msg);
+
+        if (dht_get_state(dht) != DHT_RUNNING)
+                return;
+
+        pthread_mutex_lock(&lu->lock);
+
+        if (msg->n_addrs > 0) {
+                if (lu->addrs == NULL) {
+                        lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs);
+                        /* On failure the lookup completes without addresses. */
+                        if (lu->addrs != NULL) {
+                                for (n = 0; n < msg->n_addrs; ++n)
+                                        lu->addrs[n] = msg->addrs[n];
+                                lu->n_addrs = msg->n_addrs;
+                        }
+                }
+
+                lu->state = LU_COMPLETE;
+                pthread_cond_signal(&lu->cond);
+                pthread_mutex_unlock(&lu->lock);
+                return;
+        }
+
+        /* Wait until the lookup's owner has entered lookup_wait(). */
+        while (lu->state == LU_INIT)
+                pthread_cond_wait(&lu->cond, &lu->lock);
+
+        for (n = 0; n < msg->n_contacts; ++n) {
+                /* The id is copied and compared over dht->b bytes. */
+                if (msg->contacts[n]->id.len != dht->b)
+                        continue;
+
+                c = contact_create(msg->contacts[n]->id.data,
+                                   dht->b, msg->contacts[n]->addr);
+                if (c == NULL)
+                        continue;
+
+                pos = 0;
+
+                list_for_each(p, &lu->contacts) {
+                        struct contact * e;
+                        e = list_entry(p, struct contact, next);
+                        if (!memcmp(e->id, c->id, dht->b)) {
+                                contact_destroy(c);
+                                c = NULL;
+                                break;
+                        }
+
+                        if (dist(c->id, lu->key) > dist(e->id, lu->key))
+                                break;
+
+                        pos++;
+                }
+
+                if (c == NULL) /* Already known. */
+                        continue;
+
+                if (lu->n_contacts < dht->k) {
+                        list_add_tail(&c->next, p);
+                        ++lu->n_contacts;
+                } else if (pos >= dht->k) {
+                        contact_destroy(c);
+                } else {
+                        struct contact * d;
+                        /* Insert first: p may be the entry to evict. */
+                        list_add_tail(&c->next, p);
+                        d = list_last_entry(&lu->contacts,
+                                            struct contact, next);
+                        list_del(&d->next);
+                        contact_destroy(d);
+                }
+        }
+
+        lu->state = LU_UPDATE;
+        pthread_cond_signal(&lu->cond);
+        pthread_mutex_unlock(&lu->lock);
+}
+
+/*
+ * Copies the addresses stored in the lookup into addrs and returns how
+ * many were copied. addrs must have room for lu->n_addrs entries.
+ */
+static ssize_t lookup_get_addrs(struct lookup * lu,
+                                uint64_t *      addrs)
+{
+        ssize_t cnt = 0;
+
+        assert(lu);
+
+        pthread_mutex_lock(&lu->lock);
+
+        while ((size_t) cnt < lu->n_addrs) {
+                addrs[cnt] = lu->addrs[cnt];
+                ++cnt;
+        }
+
+        assert((size_t) cnt == lu->n_addrs);
+
+        pthread_mutex_unlock(&lu->lock);
+
+        return cnt;
+}
+
+/*
+ * Writes the address of every contact in the lookup into addrs, in list
+ * order, and returns the count. addrs must have room for all of them.
+ */
+static ssize_t lookup_contact_addrs(struct lookup * lu,
+                                    uint64_t *      addrs)
+{
+        struct list_head * pos;
+        ssize_t            cnt = 0;
+
+        assert(lu);
+        assert(addrs);
+
+        pthread_mutex_lock(&lu->lock);
+
+        list_for_each(pos, &lu->contacts)
+                addrs[cnt++] = list_entry(pos, struct contact, next)->addr;
+
+        pthread_mutex_unlock(&lu->lock);
+
+        return cnt;
+}
+
+/*
+ * Collects the addresses of up to KAD_ALPHA contacts in the lookup that
+ * have not been handed out before, marks them as handed out, and
+ * returns the count. addrs must have room for KAD_ALPHA entries. When no
+ * such contact is left the lookup state is set to LU_DONE.
+ */
+static ssize_t lookup_new_addrs(struct lookup * lu,
+ uint64_t * addrs)
+{
+ struct list_head * p;
+ ssize_t n = 0;
+
+ assert(lu);
+ assert(addrs);
+
+ pthread_mutex_lock(&lu->lock);
+
+ /* Uses fails to check if the contact has been contacted. */
+ list_for_each(p, &lu->contacts) {
+ struct contact * c = list_entry(p, struct contact, next);
+ if (c->fails == 0) {
+ c->fails = 1;
+ addrs[n] = c->addr;
+ n++;
+ }
+
+ if (n == KAD_ALPHA)
+ break;
+ }
+
+ if (n == 0)
+ lu->state = LU_DONE;
+
+ pthread_mutex_unlock(&lu->lock);
+
+ assert(n <= KAD_ALPHA);
+
+ return n;
+}
+
+/*
+ * Sets the lookup to LU_PENDING, signals its condition variable (this
+ * releases a lookup_update() that is waiting for the lookup to leave
+ * LU_INIT) and blocks until the state changes. Returns the new state.
+ *
+ * If the lookup was destroyed while waiting, the state is set to
+ * LU_NULL to release lookup_destroy() and -1 is returned.
+ *
+ * NOTE(review): -1 is not a member of enum lookup_state, so the
+ * LU_DESTROY case in kad_lookup() does not match this return value -
+ * confirm which of the two is intended.
+ */
+static enum lookup_state lookup_wait(struct lookup * lu)
+{
+ enum lookup_state state;
+
+ pthread_mutex_lock(&lu->lock);
+
+ lu->state = LU_PENDING;
+ pthread_cond_signal(&lu->cond);
+
+ /* Cleanup handler in case the thread is cancelled while waiting. */
+ pthread_cleanup_push((void (*)(void *)) lookup_destroy, (void *) lu);
+
+ while (lu->state == LU_PENDING)
+ pthread_cond_wait(&lu->cond, &lu->lock);
+
+ pthread_cleanup_pop(false);
+
+ if (lu->state == LU_DESTROY) {
+ lu->state = LU_NULL;
+ pthread_cond_signal(&lu->cond);
+ pthread_mutex_unlock(&lu->lock);
+ return -1;
+ }
+
+ state = lu->state;
+
+ pthread_mutex_unlock(&lu->lock);
+
+ return state;
+}
+
+/*
+ * Returns the first request on dht->requests whose cookie equals
+ * msg->cookie, or NULL if there is none. Takes no lock itself.
+ */
+static struct kad_req * dht_find_request(struct dht * dht,
+                                         kad_msg_t *  msg)
+{
+        struct list_head * pos;
+        struct kad_req *   match = NULL;
+
+        assert(dht);
+        assert(msg);
+
+        list_for_each(pos, &dht->requests) {
+                struct kad_req * cur = list_entry(pos, struct kad_req, next);
+                if (cur->cookie != msg->cookie)
+                        continue;
+
+                match = cur;
+                break;
+        }
+
+        return match;
+}
+
+/*
+ * Returns the first lookup on dht->lookups whose key equals key over
+ * dht->b bytes, or NULL if there is none. Takes no lock itself.
+ */
+static struct lookup * dht_find_lookup(struct dht *    dht,
+                                       const uint8_t * key)
+{
+        struct list_head * pos;
+        struct lookup *    match = NULL;
+
+        assert(dht);
+        assert(key);
+
+        list_for_each(pos, &dht->lookups) {
+                struct lookup * cur = list_entry(pos, struct lookup, next);
+                if (memcmp(cur->key, key, dht->b) != 0)
+                        continue;
+
+                match = cur;
+                break;
+        }
+
+        return match;
+}
+
+/*
+ * Creates a value for addr that expires exp seconds from now and is due
+ * for replication KAD_T_REPL seconds from now. Returns NULL on
+ * allocation failure.
+ */
+static struct val * val_create(uint64_t addr,
+                               time_t   exp)
+{
+        struct timespec now;
+        struct val *    val = malloc(sizeof(*val));
+
+        if (val == NULL)
+                return NULL;
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+        list_head_init(&val->next);
+
+        val->addr  = addr;
+        val->t_exp = now.tv_sec + exp;
+        val->t_rep = now.tv_sec + KAD_T_REPL;
+
+        return val;
+}
+
+/* Frees a value. The caller must have unlinked it from its list. */
+static void val_destroy(struct val * v)
+{
+ assert(v);
+
+ free(v);
+}
+
+/*
+ * Creates a reference entry holding a private copy of key, with t_rep
+ * set dht->t_repub seconds from now. The list node is initialised so
+ * the entry is safe to unlink even before it is added to a list, like
+ * the other objects created in this file. Returns NULL on allocation
+ * failure.
+ */
+static struct ref_entry * ref_entry_create(struct dht *    dht,
+                                           const uint8_t * key)
+{
+        struct ref_entry * e;
+        struct timespec    t;
+
+        assert(dht);
+        assert(key);
+
+        e = malloc(sizeof(*e));
+        if (e == NULL)
+                return NULL;
+
+        list_head_init(&e->next);
+
+        e->key = dht_dup_key(key, dht->b);
+        if (e->key == NULL) {
+                free(e);
+                return NULL;
+        }
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+        e->t_rep = t.tv_sec + dht->t_repub;
+
+        return e;
+}
+
+/* Frees a reference entry and its key. e must not be NULL. */
+static void ref_entry_destroy(struct ref_entry * e)
+{
+ free(e->key);
+ free(e);
+}
+
+/*
+ * Creates an entry with a private copy of key and an empty value list.
+ * Returns NULL on allocation failure.
+ */
+static struct dht_entry * dht_entry_create(struct dht *    dht,
+                                           const uint8_t * key)
+{
+        struct dht_entry * ent;
+
+        assert(dht);
+        assert(key);
+
+        ent = malloc(sizeof(*ent));
+        if (ent == NULL)
+                return NULL;
+
+        ent->key = dht_dup_key(key, dht->b);
+        if (ent->key == NULL) {
+                free(ent);
+                return NULL;
+        }
+
+        ent->n_vals = 0;
+
+        list_head_init(&ent->next);
+        list_head_init(&ent->vals);
+
+        return ent;
+}
+
+/*
+ * Frees an entry together with its key and all of its values. The entry
+ * itself is not unlinked from any list here.
+ */
+static void dht_entry_destroy(struct dht_entry * e)
+{
+        assert(e);
+
+        while (!list_is_empty(&e->vals)) {
+                struct val * v;
+                v = list_first_entry(&e->vals, struct val, next);
+                list_del(&v->next);
+                val_destroy(v);
+        }
+
+        free(e->key);
+
+        free(e);
+}
+
+/*
+ * Stores addr under entry e with a lifetime of exp seconds.
+ *
+ * If addr is already present, its expiry is only ever extended, never
+ * shortened; extending it also resets the replication time. Otherwise a
+ * new value is added. Returns 0 on success, -ENOMEM if a new value
+ * cannot be allocated.
+ */
+static int dht_entry_add_addr(struct dht_entry * e,
+ uint64_t addr,
+ time_t exp)
+{
+ struct list_head * p;
+ struct val * val;
+ struct timespec t;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+ list_for_each(p, &e->vals) {
+ struct val * v = list_entry(p, struct val, next);
+ if (v->addr == addr) {
+ if (v->t_exp < t.tv_sec + exp) {
+ v->t_exp = t.tv_sec + exp;
+ v->t_rep = t.tv_sec + KAD_T_REPL;
+ }
+
+ return 0;
+ }
+ }
+
+ val = val_create(addr, exp);
+ if (val == NULL)
+ return -ENOMEM;
+
+ list_add(&val->next, &e->vals);
+ ++e->n_vals;
+
+ return 0;
+}
+
+
+/*
+ * Removes every value equal to addr from entry e. When no values
+ * remain, e is unlinked from its list and destroyed, so the caller must
+ * not use e after this call.
+ */
+static void dht_entry_del_addr(struct dht_entry * e,
+ uint64_t addr)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ assert(e);
+
+ list_for_each_safe(p, h, &e->vals) {
+ struct val * v = list_entry(p, struct val, next);
+ if (v->addr == addr) {
+ list_del(&v->next);
+ val_destroy(v);
+ --e->n_vals;
+ }
+ }
+
+ if (e->n_vals == 0) {
+ list_del(&e->next);
+ dht_entry_destroy(e);
+ }
+}
+
+/*
+ * Returns the first address stored in e that is not this DHT's own
+ * address, or 0 if every stored address is our own.
+ */
+static uint64_t dht_entry_get_addr(struct dht *       dht,
+                                   struct dht_entry * e)
+{
+        struct list_head * pos;
+        uint64_t           found = 0;
+
+        assert(e);
+        assert(!list_is_empty(&e->vals));
+
+        list_for_each(pos, &e->vals) {
+                struct val * cur = list_entry(pos, struct val, next);
+                if (cur->addr == dht->addr)
+                        continue;
+
+                found = cur->addr;
+                break;
+        }
+
+        return found;
+}
+
+/* Forward declaration. */
+static struct lookup * kad_lookup(struct dht * dht,
+ const uint8_t * key,
+ enum kad_code code);
+
+
+/*
+ * Build a refresh list.
+ *
+ * Recursively visits b and all buckets below it. For every non-empty
+ * bucket whose refresh time lies before t, a copy of its first contact
+ * is added to list r; the caller owns the copies. A bucket whose copy
+ * cannot be allocated is skipped.
+ */
+static void bucket_refresh(struct dht *       dht,
+                           struct bucket *    b,
+                           time_t             t,
+                           struct list_head * r)
+{
+        size_t i;
+
+        if (*b->children != NULL)
+                for (i = 0; i < (1L << KAD_BETA); ++i)
+                        bucket_refresh(dht, b->children[i], t, r);
+
+        if (b->n_contacts == 0)
+                return;
+
+        if (t > b->t_refr) {
+                struct contact * c;
+                struct contact * d;
+                c = list_first_entry(&b->contacts, struct contact, next);
+                d = contact_create(c->id, dht->b, c->addr);
+                if (d != NULL)
+                        list_add(&d->next, r);
+        }
+}
+
+
+/*
+ * Creates an empty root-level bucket: no contacts, no alternates, no
+ * parent, no children, depth 0, and a refresh time KAD_T_REFR seconds
+ * from now. The mask field is left for the caller to set. Returns NULL
+ * on allocation failure.
+ */
+static struct bucket * bucket_create(void)
+{
+        struct timespec now;
+        struct bucket * bkt;
+        size_t          idx = 0;
+
+        bkt = malloc(sizeof(*bkt));
+        if (bkt == NULL)
+                return NULL;
+
+        bkt->parent = NULL;
+        bkt->depth  = 0;
+
+        while (idx < (1L << KAD_BETA))
+                bkt->children[idx++] = NULL;
+
+        list_head_init(&bkt->contacts);
+        list_head_init(&bkt->alts);
+
+        bkt->n_contacts = 0;
+        bkt->n_alts     = 0;
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+        bkt->t_refr = now.tv_sec + KAD_T_REFR;
+
+        return bkt;
+}
+
+/*
+ * Recursively frees b, all buckets below it, and every contact and
+ * alternate contact they hold. Each counter is decremented alongside
+ * the list it belongs to.
+ */
+static void bucket_destroy(struct bucket * b)
+{
+        struct list_head * p;
+        struct list_head * h;
+        size_t             i;
+
+        assert(b);
+
+        for (i = 0; i < (1L << KAD_BETA); ++i)
+                if (b->children[i] != NULL)
+                        bucket_destroy(b->children[i]);
+
+        list_for_each_safe(p, h, &b->contacts) {
+                struct contact * c = list_entry(p, struct contact, next);
+                list_del(&c->next);
+                contact_destroy(c);
+                --b->n_contacts;
+        }
+
+        list_for_each_safe(p, h, &b->alts) {
+                struct contact * c = list_entry(p, struct contact, next);
+                list_del(&c->next);
+                contact_destroy(c);
+                --b->n_alts;
+        }
+
+        free(b);
+}
+
+/*
+ * Tests whether id falls into bucket b by comparing KAD_BETA bits of id
+ * with b->mask. The root bucket (depth 0) contains every id.
+ *
+ * NOTE(review): the byte index is computed from b->depth while the
+ * shift is computed from b->depth - 1, whereas iter_bucket() selects a
+ * child using the parent's depth for both - confirm that the two
+ * functions agree on which bits of the id select a bucket.
+ */
+static bool bucket_has_id(struct bucket * b,
+ const uint8_t * id)
+{
+ uint8_t mask;
+ uint8_t byte;
+
+ if (b->depth == 0)
+ return true;
+
+ byte = id[(b->depth * KAD_BETA) / CHAR_BIT];
+
+ /* Low KAD_BETA bits set. */
+ mask = ((1L << KAD_BETA) - 1) & 0xFF;
+
+ byte >>= (CHAR_BIT - KAD_BETA) -
+ (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1));
+
+ return ((byte & mask) == b->mask);
+}
+
+/*
+ * Splits leaf bucket b into 1 << KAD_BETA children and moves each
+ * contact into the child that bucket_has_id() selects for it. A child
+ * that receives all of the contacts is split again.
+ *
+ * All children are allocated before any contact is moved, so on
+ * allocation failure b is left exactly as it was (still a leaf, with
+ * all of its contacts) and -1 is returned. Returns 0 otherwise.
+ */
+static int split_bucket(struct bucket * b)
+{
+        struct list_head * p;
+        struct list_head * h;
+        size_t             i;
+        size_t             n;
+
+        assert(b);
+        assert(b->n_alts == 0);
+        assert(b->n_contacts);
+        assert(b->children[0] == NULL);
+
+        n = b->n_contacts;
+
+        for (i = 0; i < (1L << KAD_BETA); ++i) {
+                b->children[i] = bucket_create();
+                if (b->children[i] == NULL) {
+                        /* Roll back, leaving no dangling child pointers. */
+                        while (i-- > 0) {
+                                bucket_destroy(b->children[i]);
+                                b->children[i] = NULL;
+                        }
+                        return -1;
+                }
+
+                b->children[i]->depth  = b->depth + 1;
+                b->children[i]->mask   = (uint8_t) i;
+                b->children[i]->parent = b;
+        }
+
+        for (i = 0; i < (1L << KAD_BETA); ++i) {
+                list_for_each_safe(p, h, &b->contacts) {
+                        struct contact * c;
+                        c = list_entry(p, struct contact, next);
+                        if (!bucket_has_id(b->children[i], c->id))
+                                continue;
+
+                        list_del(&c->next);
+                        --b->n_contacts;
+                        list_add(&c->next, &b->children[i]->contacts);
+                        ++b->children[i]->n_contacts;
+                }
+        }
+
+        /*
+         * Best effort: if a nested split fails, that child simply stays
+         * an over-full leaf.
+         */
+        for (i = 0; i < (1L << KAD_BETA); ++i)
+                if (b->children[i]->n_contacts == n)
+                        split_bucket(b->children[i]);
+
+        return 0;
+}
+
+/*
+ * Locked externally to mandate update as (final) part of join transaction.
+ *
+ * Records the peer (id, addr) in the leaf bucket that id maps to. Any
+ * existing contact with the same address is replaced. When the bucket
+ * already holds dht->k contacts, it is split if it also covers our own
+ * id; otherwise the peer goes onto the bucket's alternates list, which
+ * is itself capped at dht->k by dropping its first entry.
+ *
+ * Returns -1 if no bucket is found or the contact cannot be allocated,
+ * 0 otherwise (including when a split fails and the peer is dropped).
+ */
+static int dht_update_bucket(struct dht * dht,
+ const uint8_t * id,
+ uint64_t addr)
+{
+ struct list_head * p;
+ struct list_head * h;
+ struct bucket * b;
+ struct contact * c;
+
+ assert(dht);
+
+ b = dht_get_bucket(dht, id);
+ if (b == NULL)
+ return -1;
+
+ c = contact_create(id, dht->b, addr);
+ if (c == NULL)
+ return -1;
+
+ /* Drop stale contacts for this address; c replaces them. */
+ list_for_each_safe(p, h, &b->contacts) {
+ struct contact * d = list_entry(p, struct contact, next);
+ if (d->addr == addr) {
+ list_del(&d->next);
+ contact_destroy(d);
+ --b->n_contacts;
+ }
+ }
+
+ if (b->n_contacts == dht->k) {
+ if (bucket_has_id(b, dht->id)) {
+ /* Temporarily over-fill the bucket, then split it. */
+ list_add_tail(&c->next, &b->contacts);
+ ++b->n_contacts;
+ if (split_bucket(b)) {
+ list_del(&c->next);
+ contact_destroy(c);
+ --b->n_contacts;
+ }
+ } else if (b->n_alts == dht->k) {
+ struct contact * d;
+ d = list_first_entry(&b->alts, struct contact, next);
+ list_del(&d->next);
+ contact_destroy(d);
+ list_add_tail(&c->next, &b->alts);
+ } else {
+ list_add_tail(&c->next, &b->alts);
+ ++b->n_alts;
+ }
+ } else {
+ list_add_tail(&c->next, &b->contacts);
+ ++b->n_contacts;
+ }
+
+ return 0;
+}
+
+/*
+ * Stamps msg with our id (once we have one) and address, serialises it
+ * and sends it to addr, all under the dht write lock.
+ *
+ * For message codes that expect a response (those before KAD_STORE) a
+ * cookie is allocated into msg->cookie and, after sending, a request
+ * record is added to dht->requests. If that record cannot be allocated
+ * the message has already gone out and 0 is still returned; a later
+ * wait_resp() will then not find the request.
+ *
+ * The cookie is only released on failure if this call allocated it:
+ * messages without a response never own one, and releasing whatever
+ * value msg->cookie happens to hold could free a cookie that belongs to
+ * a request in flight.
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+static int send_msg(struct dht * dht,
+                    kad_msg_t *  msg,
+                    uint64_t     addr)
+{
+        struct shm_du_buff * sdb;
+        struct kad_req *     req;
+        size_t               len;
+        bool                 has_cookie;
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        if (dht->id != NULL) {
+                msg->has_s_id = true;
+                msg->s_id.data = dht->id;
+                msg->s_id.len = dht->b;
+        }
+
+        msg->s_addr = dht->addr;
+
+        has_cookie = msg->code < KAD_STORE;
+
+        if (has_cookie) {
+                msg->cookie = bmp_allocate(dht->cookies);
+                if (!bmp_is_id_valid(dht->cookies, msg->cookie))
+                        goto fail_bmp_alloc;
+        }
+
+        len = kad_msg__get_packed_size(msg);
+        if (len == 0)
+                goto fail_msg;
+
+        if (ipcp_sdb_reserve(&sdb, len))
+                goto fail_msg;
+
+        kad_msg__pack(msg, shm_du_buff_head(sdb));
+
+#ifndef __DHT_TEST__
+        if (dt_write_sdu(addr, QOS_CUBE_BE, dht->fd, sdb))
+                goto fail_write;
+#else
+        (void) addr;
+        ipcp_sdb_release(sdb);
+#endif /* __DHT_TEST__ */
+
+        if (has_cookie) {
+                req = kad_req_create(dht, msg, addr);
+                if (req != NULL)
+                        list_add(&req->next, &dht->requests);
+        }
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        return 0;
+
+#ifndef __DHT_TEST__
+ fail_write:
+        ipcp_sdb_release(sdb);
+#endif
+ fail_msg:
+        if (has_cookie)
+                bmp_release(dht->cookies, msg->cookie);
+ fail_bmp_alloc:
+        pthread_rwlock_unlock(&dht->lock);
+        return -1;
+}
+
+/*
+ * Returns the first entry on dht->entries whose key equals key over
+ * dht->b bytes, or NULL if there is none. Takes no lock itself.
+ */
+static struct dht_entry * dht_find_entry(struct dht *    dht,
+                                         const uint8_t * key)
+{
+        struct list_head * pos;
+        struct dht_entry * match = NULL;
+
+        list_for_each(pos, &dht->entries) {
+                struct dht_entry * cur;
+                cur = list_entry(pos, struct dht_entry, next);
+                if (memcmp(key, cur->key, dht->b) != 0)
+                        continue;
+
+                match = cur;
+                break;
+        }
+
+        return match;
+}
+
+/*
+ * Stores the n (key, address) pairs in contacts in the local table,
+ * each with a lifetime of exp seconds, creating entries for keys that
+ * are not present yet. Runs under the dht write lock.
+ *
+ * A pair whose key is not exactly dht->b bytes long is logged and
+ * skipped: every key operation reads dht->b bytes, so using it would
+ * read past a shorter buffer.
+ *
+ * Returns 0 on success and -ENOMEM if an allocation fails; pairs that
+ * were processed before the failure stay stored.
+ */
+static int kad_add(struct dht *              dht,
+                   const kad_contact_msg_t * contacts,
+                   ssize_t                   n,
+                   time_t                    exp)
+{
+        struct dht_entry * e;
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        while (--n >= 0) {
+                if (contacts[n].id.len != dht->b) {
+                        log_warn("Bad key length in contact data.");
+                        continue;
+                }
+
+                e = dht_find_entry(dht, contacts[n].id.data);
+                if (e != NULL) {
+                        if (dht_entry_add_addr(e, contacts[n].addr, exp))
+                                goto fail;
+                } else {
+                        e = dht_entry_create(dht, contacts[n].id.data);
+                        if (e == NULL)
+                                goto fail;
+
+                        if (dht_entry_add_addr(e, contacts[n].addr, exp)) {
+                                dht_entry_destroy(e);
+                                goto fail;
+                        }
+
+                        list_add(&e->next, &dht->entries);
+                }
+        }
+
+        pthread_rwlock_unlock(&dht->lock);
+        return 0;
+
+ fail:
+        pthread_rwlock_unlock(&dht->lock);
+        return -ENOMEM;
+}
+
+/*
+ * Waits up to timeo seconds for the response to msg. The request is
+ * looked up by msg->cookie under the dht read lock. Returns -EPERM if
+ * no such request exists, otherwise the result of kad_req_wait().
+ *
+ * NOTE(review): the request pointer is used after the read lock has
+ * been released - confirm nothing can destroy it in between.
+ */
+static int wait_resp(struct dht * dht,
+ kad_msg_t * msg,
+ time_t timeo)
+{
+ struct kad_req * req;
+
+ assert(dht);
+ assert(msg);
+
+ pthread_rwlock_rdlock(&dht->lock);
+
+ req = dht_find_request(dht, msg);
+ if (req == NULL) {
+ pthread_rwlock_unlock(&dht->lock);
+ return -EPERM;
+ }
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ return kad_req_wait(req, timeo);
+}
+
+/*
+ * Sends a KAD_STORE message to the peer at r_addr, asking it to store
+ * addr under key with an expiry of ttl. The key is referenced, not
+ * copied. Returns 0 if the message was sent, -1 otherwise.
+ */
+static int kad_store(struct dht *    dht,
+                     const uint8_t * key,
+                     uint64_t        addr,
+                     uint64_t        r_addr,
+                     time_t          ttl)
+{
+        kad_msg_t           msg     = KAD_MSG__INIT;
+        kad_contact_msg_t   contact = KAD_CONTACT_MSG__INIT;
+        kad_contact_msg_t * list[1];
+
+        contact.id.data = (uint8_t *) key;
+        contact.id.len  = dht->b;
+        contact.addr    = addr;
+
+        list[0] = &contact;
+
+        msg.code         = KAD_STORE;
+        msg.has_t_expire = true;
+        msg.t_expire     = ttl;
+        msg.n_contacts   = 1;
+        msg.contacts     = list;
+
+        return send_msg(dht, &msg, r_addr) ? -1 : 0;
+}
+
+/*
+ * Sends a find message with the given code for key to every address in
+ * addrs, a list terminated by a 0 entry. Our own address is skipped.
+ *
+ * Returns the number of messages that were actually sent; a message
+ * for which send_msg() fails is not counted, so that a return value of
+ * 0 means no response can be expected.
+ */
+static ssize_t kad_find(struct dht *     dht,
+                        const uint8_t *  key,
+                        const uint64_t * addrs,
+                        enum kad_code    code)
+{
+        kad_msg_t msg  = KAD_MSG__INIT;
+        ssize_t   sent = 0;
+
+        assert(dht);
+        assert(key);
+
+        msg.code = code;
+
+        msg.has_key  = true;
+        msg.key.data = (uint8_t *) key;
+        msg.key.len  = dht->b;
+
+        while (*addrs != 0) {
+                if (*addrs != dht->addr &&
+                    send_msg(dht, &msg, *addrs) == 0)
+                        sent++;
+
+                ++addrs;
+        }
+
+        return sent;
+}
+
+/*
+ * Sets the lookup state while holding the lookup's lock. The condition
+ * variable is not signalled.
+ */
+static void lookup_set_state(struct lookup * lu,
+ enum lookup_state state)
+{
+ pthread_mutex_lock(&lu->lock);
+
+ lu->state = state;
+
+ pthread_mutex_unlock(&lu->lock);
+}
+
+/*
+ * Runs a lookup for id using find messages with the given code.
+ *
+ * Each round queries up to KAD_ALPHA contacts that have not been
+ * queried yet and then waits for lookup_update() to report progress.
+ * The lookup ends when a response completes it, when no unqueried
+ * contacts remain, or when no message could be sent at all.
+ *
+ * On success the lookup has been unlinked from dht->lookups and is
+ * returned; the caller reads the result from it and must call
+ * lookup_destroy(). Returns NULL if the lookup cannot be created, if
+ * there is no contact to start from, or if it was destroyed meanwhile.
+ */
+static struct lookup * kad_lookup(struct dht *    dht,
+                                  const uint8_t * id,
+                                  enum kad_code   code)
+{
+        uint64_t          addrs[KAD_ALPHA + 1];
+        enum lookup_state state;
+        struct lookup *   lu;
+
+        lu = lookup_create(dht, id);
+        if (lu == NULL)
+                return NULL;
+
+        /* The address list is terminated by a 0 entry. */
+        addrs[lookup_new_addrs(lu, addrs)] = 0;
+
+        if (addrs[0] == 0) {
+                pthread_rwlock_wrlock(&dht->lock);
+                list_del(&lu->next);
+                pthread_rwlock_unlock(&dht->lock);
+                lookup_destroy(lu);
+                return NULL;
+        }
+
+        if (kad_find(dht, id, addrs, code) == 0) {
+                pthread_rwlock_wrlock(&dht->lock);
+                list_del(&lu->next);
+                pthread_rwlock_unlock(&dht->lock);
+                lookup_set_state(lu, LU_COMPLETE);
+                return lu;
+        }
+
+        while ((state = lookup_wait(lu)) != LU_COMPLETE) {
+                switch (state) {
+                case LU_UPDATE:
+                        addrs[lookup_new_addrs(lu, addrs)] = 0;
+                        if (addrs[0] == 0) {
+                                pthread_rwlock_wrlock(&dht->lock);
+                                list_del(&lu->next);
+                                pthread_rwlock_unlock(&dht->lock);
+                                return lu;
+                        }
+
+                        kad_find(dht, id, addrs, code);
+                        break;
+                case LU_DESTROY:
+                        lookup_set_state(lu, LU_NULL);
+                        return NULL;
+                default:
+                        break;
+                }
+        }
+
+        assert(state == LU_COMPLETE);
+
+        pthread_rwlock_wrlock(&dht->lock);
+        list_del(&lu->next);
+        pthread_rwlock_unlock(&dht->lock);
+
+        return lu;
+}
+
+/*
+ * Publish the mapping key -> addr on the nodes found by a KAD_FIND_NODE
+ * lookup for key.
+ *
+ * If our own address is among the contacts, the value is stored locally
+ * through kad_add with expiry exp; every other contact receives a
+ * KAD_STORE message. Failures to send are logged and otherwise ignored.
+ */
+static void kad_publish(struct dht *    dht,
+                        const uint8_t * key,
+                        uint64_t        addr,
+                        time_t          exp)
+{
+        struct lookup * lu;
+        uint64_t        addrs[KAD_K];
+        ssize_t         n;
+
+        assert(dht);
+        assert(key);
+
+        lu = kad_lookup(dht, key, KAD_FIND_NODE);
+        if (lu == NULL)
+                return;
+
+        n = lookup_contact_addrs(lu, addrs);
+
+        /* Visit indices n - 1 .. 0; "--n > 0" skipped addrs[0]. */
+        while (n-- > 0) {
+                if (addrs[n] == dht->addr) {
+                        kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT;
+                        msg.id.data = (uint8_t *) key;
+                        msg.id.len  = dht->b;
+                        msg.addr    = addr;
+                        kad_add(dht, &msg, 1, exp);
+                } else {
+                        /*
+                         * NOTE(review): remote stores use dht->t_expire
+                         * rather than exp - confirm this is intended.
+                         */
+                        if (kad_store(dht, key, addr, addrs[n], dht->t_expire))
+                                log_warn("Failed to send store message.");
+                }
+        }
+
+        lookup_destroy(lu);
+}
+
+/*
+ * Enroll this DHT instance through the member at addr.
+ *
+ * A KAD_JOIN message advertising our compile-time parameters is sent and
+ * the response is awaited for at most KAD_T_JOIN. On success a fresh node
+ * id is created, our own contact is put in the routing table and a
+ * KAD_FIND_NODE lookup for our own id is run.
+ *
+ * Returns 0 on success, -1 if sending, waiting or id creation fails.
+ */
+static int kad_join(struct dht * dht,
+                    uint64_t     addr)
+{
+        kad_msg_t       join = KAD_MSG__INIT;
+        struct lookup * self;
+
+        join.code = KAD_JOIN;
+
+        join.has_alpha       = true;
+        join.alpha           = KAD_ALPHA;
+        join.has_b           = true;
+        join.b               = dht->b;
+        join.has_k           = true;
+        join.k               = KAD_K;
+        join.has_t_refresh   = true;
+        join.t_refresh       = KAD_T_REFR;
+        join.has_t_replicate = true;
+        join.t_replicate     = KAD_T_REPL;
+
+        if (send_msg(dht, &join, addr) ||
+            wait_resp(dht, &join, KAD_T_JOIN) < 0)
+                return -1;
+
+        dht->id = create_id(dht->b);
+        if (dht->id == NULL)
+                return -1;
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        dht_update_bucket(dht, dht->id, dht->addr);
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        /* Look up our own id. */
+        self = kad_lookup(dht, dht->id, KAD_FIND_NODE);
+        if (self != NULL)
+                lookup_destroy(self);
+        else
+                log_warn("Join response not yet added.");
+
+        return 0;
+}
+
+/*
+ * Handle an unresponsive peer, identified by its address addr, in the
+ * bucket that covers key.
+ *
+ * While the bucket holds no more than k contacts and alternates in total,
+ * the peer is kept and only its failure counter is incremented. Otherwise
+ * the peer is removed and alternates are promoted until the bucket holds
+ * k contacts again or runs out of alternates.
+ *
+ * The worker thread calls this with dht->lock held for writing.
+ */
+static void dht_dead_peer(struct dht * dht,
+                          uint8_t *    key,
+                          uint64_t     addr)
+{
+        struct list_head * p;
+        struct list_head * h;
+        struct bucket *    b;
+
+        b = dht_get_bucket(dht, key);
+
+        list_for_each_safe(p, h, &b->contacts) {
+                struct contact * c = list_entry(p, struct contact, next);
+
+                /* Only the peer that failed is affected, not the list head. */
+                if (c->addr != addr)
+                        continue;
+
+                if (b->n_contacts + b->n_alts <= dht->k) {
+                        ++c->fails;
+                        return;
+                }
+
+                list_del(&c->next);
+                contact_destroy(c);
+                --b->n_contacts;
+                break;
+        }
+
+        while (b->n_contacts < dht->k && b->n_alts > 0) {
+                struct contact * c;
+                c = list_first_entry(&b->alts, struct contact, next);
+                list_del(&c->next);
+                --b->n_alts;
+                list_add(&c->next, &b->contacts);
+                ++b->n_contacts;
+        }
+}
+
+/*
+ * Remove addr from the values stored under key.
+ *
+ * Takes dht->lock for writing itself, so the caller must not hold it.
+ *
+ * Returns 0 on success, -EPERM when there is no entry for key.
+ */
+static int dht_del(struct dht *    dht,
+                   const uint8_t * key,
+                   uint64_t        addr)
+{
+        struct dht_entry * entry;
+        int                ret = -EPERM;
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        entry = dht_find_entry(dht, key);
+        if (entry != NULL) {
+                dht_entry_del_addr(entry, addr);
+                ret = 0;
+        }
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        return ret;
+}
+
+/*
+ * Collect the addresses stored under key.
+ *
+ * Returns a buffer whose data is a malloc'ed array of uint64_t addresses
+ * and whose len is the NUMBER of addresses (not a byte count). The caller
+ * owns data and must free it. When the key is unknown, has no values, or
+ * allocation fails, the result is { len = 0, data = NULL }.
+ */
+static buffer_t dht_retrieve(struct dht *    dht,
+                             const uint8_t * key)
+{
+        struct dht_entry * e;
+        struct list_head * p;
+        buffer_t           buf;
+        uint64_t *         pos;
+
+        /* Never hand back an indeterminate pointer. */
+        buf.len  = 0;
+        buf.data = NULL;
+
+        pthread_rwlock_rdlock(&dht->lock);
+
+        e = dht_find_entry(dht, key);
+        if (e == NULL || e->n_vals == 0)
+                goto finish;
+
+        buf.data = malloc(sizeof(dht->addr) * e->n_vals);
+        if (buf.data == NULL)
+                goto finish;
+
+        buf.len = e->n_vals;
+
+        pos = (uint64_t *) buf.data;
+
+        list_for_each(p, &e->vals) {
+                struct val * v = list_entry(p, struct val, next);
+                *pos++ = v->addr;
+        }
+
+ finish:
+        pthread_rwlock_unlock(&dht->lock);
+
+        return buf;
+}
+
+/*
+ * Build an array of contact messages for the contacts that
+ * dht_contact_list returns for key.
+ *
+ * On success *msgs points to a malloc'ed array of malloc'ed messages and
+ * the number of messages is returned. Each message takes over the id
+ * buffer of the contact it was built from. The caller releases the
+ * messages (dht_post_sdu uses kad_contact_msg__free_unpacked) and then
+ * the array.
+ *
+ * Returns 0 with *msgs set to NULL when there are no contacts or when
+ * memory runs out; nothing is leaked and the lock is released on every
+ * path.
+ */
+static ssize_t dht_get_contacts(struct dht *          dht,
+                                const uint8_t *       key,
+                                kad_contact_msg_t *** msgs)
+{
+        struct list_head   l;
+        struct list_head * p;
+        struct list_head * h;
+        size_t             len;
+        size_t             i = 0;
+
+        list_head_init(&l);
+
+        *msgs = NULL;
+
+        pthread_rwlock_rdlock(&dht->lock);
+
+        len = dht_contact_list(dht, &l, key);
+        if (len == 0) {
+                pthread_rwlock_unlock(&dht->lock);
+                return 0;
+        }
+
+        *msgs = malloc(len * sizeof(**msgs));
+        if (*msgs == NULL)
+                goto fail;
+
+        list_for_each_safe(p, h, &l) {
+                struct contact * c = list_entry(p, struct contact, next);
+                (*msgs)[i] = malloc(sizeof(***msgs));
+                if ((*msgs)[i] == NULL)
+                        goto fail;
+
+                kad_contact_msg__init((*msgs)[i]);
+
+                /* The message takes ownership of the contact's id. */
+                (*msgs)[i]->id.data = c->id;
+                (*msgs)[i]->id.len  = dht->b;
+                (*msgs)[i++]->addr  = c->addr;
+                list_del(&c->next);
+                free(c);
+        }
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        return i;
+
+ fail:
+        pthread_rwlock_unlock(&dht->lock);
+
+        /* (*msgs)[i], not *msgs[i]: index the array, then dereference. */
+        while (i > 0) {
+                --i;
+                free((*msgs)[i]->id.data);
+                free((*msgs)[i]);
+        }
+
+        free(*msgs);
+        *msgs = NULL;
+
+        /* Contacts not yet converted still own their id. */
+        list_for_each_safe(p, h, &l) {
+                struct contact * c = list_entry(p, struct contact, next);
+                list_del(&c->next);
+                contact_destroy(c);
+        }
+
+        return 0;
+}
+
+/*
+ * Greatest common divisor of a and b (Euclid's algorithm).
+ * gcd(0, b) is b and gcd(a, 0) is a.
+ */
+static time_t gcd(time_t a,
+                  time_t b)
+{
+        while (a != 0) {
+                time_t rem = b % a;
+                b = a;
+                a = rem;
+        }
+
+        return b;
+}
+
+/*
+ * Worker thread of the DHT; o is the struct dht it serves.
+ *
+ * Each round, with dht->lock held for writing, it
+ *  - republishes our own registered keys whose republish time has passed,
+ *  - drops expired values and replicates values that are due,
+ *  - times out unanswered requests and reports their peers as dead,
+ *  - collects contacts of buckets that need a refresh.
+ * After releasing the lock it runs a KAD_FIND_NODE lookup for every
+ * collected contact and then sleeps for intv seconds.
+ *
+ * The loop never terminates by itself; dht_destroy cancels the thread.
+ */
+static void * work(void * o)
+{
+        struct dht *       dht;
+        struct timespec    now;
+        struct list_head * p;
+        struct list_head * h;
+        struct list_head   reflist;
+        time_t             intv;
+        struct lookup *    lu;
+
+        dht = (struct dht *) o;
+
+        /* Wake up at half the largest interval that divides all timers. */
+        intv = gcd(dht->t_expire, dht->t_repub);
+        intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2;
+        if (intv < 1)
+                intv = 1; /* sleep(0) would turn this loop into a spin. */
+
+        list_head_init(&reflist);
+
+        while (true) {
+                clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+                pthread_rwlock_wrlock(&dht->lock);
+
+                /*
+                 * NOTE(review): kad_publish runs a lookup, and kad_lookup
+                 * takes dht->lock for writing, while this thread already
+                 * holds it. Consider collecting the work here and
+                 * publishing after the unlock below.
+                 */
+
+                /* Republish registered hashes. */
+                list_for_each_safe(p, h, &dht->refs) {
+                        struct ref_entry * e;
+                        e = list_entry(p, struct ref_entry, next);
+                        if (now.tv_sec > e->t_rep) {
+                                kad_publish(dht, e->key, dht->addr,
+                                            dht->t_expire);
+                                e->t_rep = now.tv_sec + dht->t_repub;
+                        }
+                }
+
+                /* Remove stale entries and republish if necessary. */
+                list_for_each_safe(p, h, &dht->entries) {
+                        struct list_head * p1;
+                        struct list_head * h1;
+                        struct dht_entry * e;
+                        e = list_entry (p, struct dht_entry, next);
+                        list_for_each_safe(p1, h1, &e->vals) {
+                                struct val * v;
+                                v = list_entry(p1, struct val, next);
+                                if (now.tv_sec > v->t_exp) {
+                                        list_del(&v->next);
+                                        val_destroy(v);
+                                        /* Keep the count dht_retrieve uses. */
+                                        --e->n_vals;
+                                        /* v is gone: do not touch it again. */
+                                        continue;
+                                }
+
+                                if (now.tv_sec > v->t_rep) {
+                                        /* Pass on the remaining lifetime. */
+                                        kad_publish(dht, e->key, v->addr,
+                                                    v->t_exp - now.tv_sec);
+                                        v->t_rep = now.tv_sec + dht->t_replic;
+                                }
+                        }
+                }
+
+                /* Check the requests list for unresponsive nodes. */
+                list_for_each_safe(p, h, &dht->requests) {
+                        struct kad_req * r;
+                        r = list_entry(p, struct kad_req, next);
+                        if (now.tv_sec > r->t_exp) {
+                                list_del(&r->next);
+                                bmp_release(dht->cookies, r->cookie);
+                                dht_dead_peer(dht, r->key, r->addr);
+                                kad_req_destroy(r);
+                        }
+                }
+
+                /* Refresh unaccessed buckets. */
+                bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist);
+
+                pthread_rwlock_unlock(&dht->lock);
+
+                list_for_each_safe(p, h, &reflist) {
+                        struct contact * c;
+                        c = list_entry(p, struct contact, next);
+                        lu = kad_lookup(dht, c->id, KAD_FIND_NODE);
+                        if (lu != NULL)
+                                lookup_destroy(lu);
+                        list_del(&c->next);
+                        contact_destroy(c);
+                }
+
+                sleep(intv);
+        }
+
+        return (void *) 0;
+}
+
+/*
+ * Process the response to our KAD_JOIN request.
+ *
+ * A response that lacks any of the enrolment parameters means the remote
+ * refused us. Otherwise the parameters are adopted, the routing table is
+ * created, the worker thread is started, the state becomes DHT_RUNNING,
+ * the waiter on req is released and the responder is added as a contact.
+ *
+ * Returns 0 on success, -1 on refusal, bad parameters or resource failure.
+ */
+static int kad_handle_join_resp(struct dht *     dht,
+                                struct kad_req * req,
+                                kad_msg_t *      msg)
+{
+        assert(dht);
+        assert(req);
+        assert(msg);
+
+        /* We might send version numbers later to warn of updates if needed. */
+        if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire &&
+              msg->has_t_refresh && msg->has_t_replicate)) {
+                log_warn("Join refused by remote.");
+                return -1;
+        }
+
+        if (msg->b < sizeof(uint64_t)) {
+                log_err("Hash sizes less than 8 bytes unsupported.");
+                return -1;
+        }
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        dht->buckets = bucket_create();
+        if (dht->buckets == NULL) {
+                pthread_rwlock_unlock(&dht->lock);
+                return -1;
+        }
+
+        /* Likely corrupt packet. The member will refuse, we might here too. */
+        if (msg->alpha != KAD_ALPHA || msg->k != KAD_K)
+                log_warn("Different kademlia parameters detected.");
+
+        if (msg->t_replicate != KAD_T_REPL)
+                log_warn("Different kademlia replication time detected.");
+
+        if (msg->t_refresh != KAD_T_REFR)
+                log_warn("Different kademlia refresh time detected.");
+
+        dht->k        = msg->k;
+        dht->b        = msg->b;
+        dht->t_expire = msg->t_expire;
+        dht->t_repub  = MAX(1, dht->t_expire - 10);
+
+        if (pthread_create(&dht->worker, NULL, work, dht)) {
+                bucket_destroy(dht->buckets);
+                /* dht_destroy frees non-NULL buckets: avoid a double free. */
+                dht->buckets = NULL;
+                pthread_rwlock_unlock(&dht->lock);
+                return -1;
+        }
+
+        dht->state = DHT_RUNNING;
+
+        kad_req_respond(req);
+
+        dht_update_bucket(dht, msg->s_id.data, msg->s_addr);
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        log_dbg("Enrollment of DHT completed.");
+
+        return 0;
+}
+
+/*
+ * Feed the response msg into the lookup that belongs to the key of req.
+ *
+ * Returns 0 when a matching lookup was found and updated, -1 when no
+ * lookup for that key is known.
+ */
+static int kad_handle_find_resp(struct dht *     dht,
+                                struct kad_req * req,
+                                kad_msg_t *      msg)
+{
+        struct lookup * target;
+        int             ret = 0;
+
+        assert(dht);
+        assert(req);
+        assert(msg);
+
+        pthread_rwlock_rdlock(&dht->lock);
+
+        target = dht_find_lookup(dht, req->key);
+        if (target != NULL) {
+                lookup_update(dht, target, msg);
+        } else {
+                log_dbg("Response for unknown lookup.");
+                ret = -1;
+        }
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        return ret;
+}
+
+/*
+ * Dispatch a KAD_RESPONSE message to the handler of the request it
+ * answers.
+ *
+ * The request is looked up with dht_find_request; if found, its cookie
+ * is released and it is removed from the request list under the write
+ * lock. The request is destroyed before returning on every path, since
+ * it is no longer reachable through the list.
+ */
+static void kad_handle_response(struct dht * dht,
+                                kad_msg_t *  msg)
+{
+        struct kad_req * req;
+
+        assert(dht);
+        assert(msg);
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        req = dht_find_request(dht, msg);
+        if (req == NULL) {
+                pthread_rwlock_unlock(&dht->lock);
+                return;
+        }
+
+        bmp_release(dht->cookies, req->cookie);
+        list_del(&req->next);
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        switch(req->code) {
+        case KAD_JOIN:
+                if (kad_handle_join_resp(dht, req, msg))
+                        log_err("Enrollment of DHT failed.");
+                break;
+        case KAD_FIND_VALUE:
+        case KAD_FIND_NODE:
+                /* break, not return: req must still be destroyed below. */
+                if (dht_get_state(dht) != DHT_RUNNING)
+                        break;
+                if (kad_handle_find_resp(dht, req, msg))
+                        log_dbg("Invalid or outdated response.");
+                break;
+        default:
+                break;
+        }
+
+        kad_req_destroy(req);
+}
+
+/*
+ * Start this instance as the first node of a new DHT.
+ *
+ * b is the key length in bits (it is stored as b / CHAR_BIT) and t_expire
+ * the lifetime of stored values; the expiry is clamped to at least 2 and
+ * the republish interval to at least 1. A node id and the routing table
+ * are created, the worker thread is started and our own contact is added.
+ *
+ * Returns 0 on success, -1 on failure, in which case the id and the
+ * routing table are released again.
+ */
+int dht_bootstrap(struct dht * dht,
+ size_t b,
+ time_t t_expire)
+{
+ assert(dht);
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ /*
+ * NOTE(review): create_id gets b here, while kad_join passes dht->b,
+ * which is b / CHAR_BIT. Confirm which unit create_id expects.
+ */
+ dht->id = create_id(b);
+ if (dht->id == NULL)
+ goto fail_id;
+
+ dht->buckets = bucket_create();
+ if (dht->buckets == NULL)
+ goto fail_buckets;
+
+ dht->buckets->depth = 0;
+ dht->buckets->mask = 0;
+
+ dht->b = b / CHAR_BIT;
+ dht->t_expire = MAX(2, t_expire);
+ dht->t_repub = MAX(1, t_expire - 10);
+ dht->k = KAD_K;
+
+ /* The worker is created while the write lock is still held. */
+ if (pthread_create(&dht->worker, NULL, work, dht))
+ goto fail_pthread_create;
+
+ dht->state = DHT_RUNNING;
+
+ dht_update_bucket(dht, dht->id, dht->addr);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ return 0;
+
+ fail_pthread_create:
+ bucket_destroy(dht->buckets);
+ dht->buckets = NULL;
+ fail_buckets:
+ free(dht->id);
+ dht->id = NULL;
+ fail_id:
+ pthread_rwlock_unlock(&dht->lock);
+ return -1;
+}
+
+/*
+ * Enroll this instance in an existing DHT through the member at addr.
+ * Thin wrapper: returns whatever kad_join returns.
+ */
+int dht_enroll(struct dht * dht,
+ uint64_t addr)
+{
+ assert(dht);
+
+ return kad_join(dht, addr);
+}
+
+/*
+ * Register key as reachable at our own address.
+ *
+ * A ref entry for key is added to dht->refs and the key is published
+ * right away with kad_publish.
+ *
+ * Returns 0 on success, -1 when the DHT is not running, -ENOMEM when the
+ * ref entry cannot be created.
+ */
+int dht_reg(struct dht * dht,
+ const uint8_t * key)
+{
+ struct ref_entry * e;
+
+ assert(dht);
+ assert(key);
+ assert(dht->addr != 0);
+
+ if (dht_get_state(dht) != DHT_RUNNING)
+ return -1;
+
+ e = ref_entry_create(dht, key);
+ if (e == NULL)
+ return -ENOMEM;
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ list_add(&e->next, &dht->refs);
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ /* Publish outside the lock. */
+ kad_publish(dht, key, dht->addr, dht->t_expire);
+
+ return 0;
+}
+
+/*
+ * Undo dht_reg for key: drop every matching ref entry and remove our own
+ * address from the values stored locally under key.
+ *
+ * Returns 0 on success (also when key was not registered), -1 when the
+ * DHT is not running.
+ */
+int dht_unreg(struct dht *    dht,
+              const uint8_t * key)
+{
+        struct list_head * p;
+        struct list_head * h;
+
+        assert(dht);
+        assert(key);
+
+        if (dht_get_state(dht) != DHT_RUNNING)
+                return -1;
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        list_for_each_safe(p, h, &dht->refs) {
+                struct ref_entry * r = list_entry(p, struct ref_entry, next);
+                if (!memcmp(key, r->key, dht->b)) {
+                        list_del(&r->next);
+                        ref_entry_destroy(r);
+                }
+        }
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        /*
+         * dht_del takes dht->lock for writing itself, so it must be called
+         * after the lock is released; re-acquiring a write lock the thread
+         * already holds either deadlocks or fails with EDEADLK.
+         */
+        dht_del(dht, key, dht->addr);
+
+        return 0;
+}
+
+/*
+ * Resolve key to the address of a peer that registered it.
+ *
+ * The local store is consulted first; a locally known address that is
+ * not our own is returned directly. Otherwise a KAD_FIND_VALUE lookup is
+ * run and the first returned address that is not our own wins (only the
+ * first two results are considered).
+ *
+ * Returns the address, or 0 when nothing was found.
+ */
+uint64_t dht_query(struct dht *    dht,
+                   const uint8_t * key)
+{
+        struct dht_entry * entry;
+        struct lookup *    res;
+        uint64_t           addrs[KAD_K];
+        size_t             n;
+
+        addrs[0] = 0;
+
+        pthread_rwlock_rdlock(&dht->lock);
+
+        entry = dht_find_entry(dht, key);
+        if (entry != NULL)
+                addrs[0] = dht_entry_get_addr(dht, entry);
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        if (addrs[0] != 0 && addrs[0] != dht->addr)
+                return addrs[0];
+
+        res = kad_lookup(dht, key, KAD_FIND_VALUE);
+        if (res == NULL)
+                return 0;
+
+        n = lookup_get_addrs(res, addrs);
+
+        /* The addresses are copied out; the lookup is no longer needed. */
+        lookup_destroy(res);
+
+        if (n == 0)
+                return 0;
+
+        /* Current behaviour is anycast and return the first peer address. */
+        if (addrs[0] != dht->addr)
+                return addrs[0];
+
+        return n > 1 ? addrs[1] : 0;
+}
+
+/*
+ * Entry point for DHT packets; registered with dt_reg_ae in dht_create.
+ *
+ * ae is the struct dht the packet is for and sdb the packet, which is
+ * released as soon as the message is unpacked. Requests (KAD_JOIN,
+ * KAD_FIND_VALUE, KAD_FIND_NODE) are answered with a KAD_RESPONSE that
+ * carries the request's cookie; KAD_STORE adds the received contacts to
+ * the local store; KAD_RESPONSE is passed to kad_handle_response.
+ *
+ * The message comes from the network: malformed or unknown messages are
+ * logged and dropped.
+ */
+void dht_post_sdu(void *               ae,
+                  struct shm_du_buff * sdb)
+{
+        struct dht *         dht;
+        kad_msg_t *          msg;
+        kad_contact_msg_t ** cmsgs    = NULL;
+        kad_msg_t            resp_msg = KAD_MSG__INIT;
+        uint64_t             addr;
+        buffer_t             buf;
+        size_t               i;
+
+        assert(ae);
+        assert(sdb);
+
+        memset(&buf, 0, sizeof(buf));
+
+        dht = (struct dht *) ae;
+
+        msg = kad_msg__unpack(NULL,
+                              shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
+                              shm_du_buff_head(sdb));
+
+        ipcp_sdb_release(sdb);
+
+        if (msg == NULL) {
+                log_err("Failed to unpack message.");
+                return;
+        }
+
+        if (msg->has_key && msg->key.len != dht->b) {
+                kad_msg__free_unpacked(msg, NULL);
+                log_warn("Bad key in message.");
+                return;
+        }
+
+        if (msg->has_s_id && !msg->has_b && msg->s_id.len != dht->b) {
+                /* Log first: msg->code must not be read after the free. */
+                log_warn("Bad source ID in message of type %d.", msg->code);
+                kad_msg__free_unpacked(msg, NULL);
+                return;
+        }
+
+        if (msg->code != KAD_RESPONSE && dht_get_state(dht) != DHT_RUNNING) {
+                kad_msg__free_unpacked(msg, NULL);
+                return;
+        }
+
+        /* key is optional on the wire, but a find request needs one. */
+        if ((msg->code == KAD_FIND_VALUE || msg->code == KAD_FIND_NODE) &&
+            !msg->has_key) {
+                log_warn("Bad key in message.");
+                kad_msg__free_unpacked(msg, NULL);
+                return;
+        }
+
+        addr = msg->s_addr;
+
+        resp_msg.code   = KAD_RESPONSE;
+        resp_msg.cookie = msg->cookie;
+
+        switch(msg->code) {
+        case KAD_JOIN:
+                /* Refuse enrollee on check fails. */
+                if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) {
+                        log_warn("Parameter mismatch. "
+                                 "DHT enrolment refused.");
+                        break;
+                }
+
+                if (msg->t_replicate != KAD_T_REPL) {
+                        log_warn("Replication time mismatch. "
+                                 "DHT enrolment refused.");
+
+                        break;
+                }
+
+                if (msg->t_refresh != KAD_T_REFR) {
+                        log_warn("Refresh time mismatch. "
+                                 "DHT enrolment refused.");
+                        break;
+                }
+
+                /* A response without these fields is read as a refusal. */
+                resp_msg.has_alpha       = true;
+                resp_msg.has_b           = true;
+                resp_msg.has_k           = true;
+                resp_msg.has_t_expire    = true;
+                resp_msg.has_t_refresh   = true;
+                resp_msg.has_t_replicate = true;
+                resp_msg.alpha           = KAD_ALPHA;
+                resp_msg.b               = dht->b;
+                resp_msg.k               = KAD_K;
+                resp_msg.t_expire        = dht->t_expire;
+                resp_msg.t_refresh       = KAD_T_REFR;
+                resp_msg.t_replicate     = KAD_T_REPL;
+                break;
+        case KAD_FIND_VALUE:
+                buf = dht_retrieve(dht, msg->key.data);
+                if (buf.len != 0) {
+                        resp_msg.n_addrs = buf.len;
+                        resp_msg.addrs   = (uint64_t *) buf.data;
+                        break;
+                }
+                /* FALLTHRU */
+        case KAD_FIND_NODE:
+                /* Return k closest contacts. */
+                resp_msg.n_contacts =
+                        dht_get_contacts(dht, msg->key.data, &cmsgs);
+                resp_msg.contacts = cmsgs;
+                break;
+        case KAD_STORE:
+                if (msg->n_contacts < 1) {
+                        log_warn("No contacts in store message.");
+                        break;
+                }
+
+                if (!msg->has_t_expire) {
+                        log_warn("No expiry time in store message.");
+                        break;
+                }
+
+                kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire);
+                break;
+        case KAD_RESPONSE:
+                kad_handle_response(dht, msg);
+                break;
+        default:
+                /* Unknown code from a peer: drop it instead of aborting. */
+                log_warn("Unknown message of type %d.", msg->code);
+                kad_msg__free_unpacked(msg, NULL);
+                return;
+        }
+
+        /* s_id is optional: only learn the sender when it told us its id. */
+        if (msg->code != KAD_JOIN && msg->has_s_id) {
+                pthread_rwlock_wrlock(&dht->lock);
+                if (dht_update_bucket(dht, msg->s_id.data, addr))
+                        log_warn("Failed to update bucket.");
+                pthread_rwlock_unlock(&dht->lock);
+        }
+
+        /* Codes below KAD_STORE are requests that expect a response. */
+        if (msg->code < KAD_STORE)
+                send_msg(dht, &resp_msg, addr);
+
+        kad_msg__free_unpacked(msg, NULL);
+
+        if (resp_msg.n_addrs > 0)
+                free(resp_msg.addrs);
+
+        if (resp_msg.n_contacts == 0)
+                return;
+
+        for (i = 0; i < resp_msg.n_contacts; ++i)
+                kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL);
+        free(resp_msg.contacts);
+}
+
+/*
+ * Tear down a DHT instance created with dht_create. NULL is accepted.
+ *
+ * A running instance is moved to DHT_SHUTDOWN; stored entries, pending
+ * requests, registrations and lookups are released under the write lock;
+ * the worker thread is cancelled and joined; finally the routing table,
+ * cookie bitmap, locks, node id and the instance itself are freed.
+ */
+void dht_destroy(struct dht * dht)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ if (dht == NULL)
+ return;
+
+ if (dht_get_state(dht) == DHT_RUNNING)
+ dht_set_state(dht, DHT_SHUTDOWN);
+
+ pthread_rwlock_wrlock(&dht->lock);
+
+ list_for_each_safe(p, h, &dht->entries) {
+ struct dht_entry * e = list_entry(p, struct dht_entry, next);
+ list_del(&e->next);
+ dht_entry_destroy(e);
+ }
+
+ /*
+ * NOTE(review): requests are released with free() here, whereas the
+ * worker and kad_handle_response use kad_req_destroy() - confirm that
+ * nothing owned by the request is leaked.
+ */
+ list_for_each_safe(p, h, &dht->requests) {
+ struct kad_req * r = list_entry(p, struct kad_req, next);
+ list_del(&r->next);
+ free(r);
+ }
+
+ list_for_each_safe(p, h, &dht->refs) {
+ struct ref_entry * e = list_entry(p, struct ref_entry, next);
+ list_del(&e->next);
+ ref_entry_destroy(e);
+ }
+
+ list_for_each_safe(p, h, &dht->lookups) {
+ struct lookup * l = list_entry(p, struct lookup, next);
+ list_del(&l->next);
+ lookup_destroy(l);
+ }
+
+ pthread_rwlock_unlock(&dht->lock);
+
+ /*
+ * The worker is only stopped after the lists above were emptied.
+ * NOTE(review): it may still be running in between - confirm this
+ * ordering is safe.
+ */
+ if (dht_get_state(dht) == DHT_SHUTDOWN) {
+ pthread_cancel(dht->worker);
+ pthread_join(dht->worker, NULL);
+ }
+
+ if (dht->buckets != NULL)
+ bucket_destroy(dht->buckets);
+
+ bmp_destroy(dht->cookies);
+
+ pthread_mutex_destroy(&dht->mtx);
+
+ pthread_rwlock_destroy(&dht->lock);
+
+ free(dht->id);
+
+ free(dht);
+}
+
+/*
+ * Allocate and initialise a DHT instance for the node with address addr.
+ *
+ * The instance starts in state DHT_INIT without node id or routing
+ * table; those are set up by dht_bootstrap or dht_enroll. Unless built
+ * with __DHT_TEST__, dht_post_sdu is registered with dt_reg_ae.
+ *
+ * Returns the new instance, or NULL when a resource cannot be obtained.
+ */
+struct dht * dht_create(uint64_t addr)
+{
+        struct dht * inst;
+
+        inst = malloc(sizeof(*inst));
+        if (inst == NULL)
+                return NULL;
+
+        inst->buckets = NULL;
+        inst->id      = NULL;
+        inst->b       = 0;
+        inst->addr    = addr;
+
+        list_head_init(&inst->entries);
+        list_head_init(&inst->requests);
+        list_head_init(&inst->refs);
+        list_head_init(&inst->lookups);
+
+        if (pthread_rwlock_init(&inst->lock, NULL))
+                goto err_free;
+
+        if (pthread_mutex_init(&inst->mtx, NULL))
+                goto err_rwlock;
+
+        inst->cookies = bmp_create(DHT_MAX_REQS, 1);
+        if (inst->cookies == NULL)
+                goto err_mutex;
+
+#ifndef __DHT_TEST__
+        inst->fd = dt_reg_ae(inst, &dht_post_sdu);
+#endif /* __DHT_TEST__ */
+
+        inst->state = DHT_INIT;
+
+        return inst;
+
+ err_mutex:
+        pthread_mutex_destroy(&inst->mtx);
+ err_rwlock:
+        pthread_rwlock_destroy(&inst->lock);
+ err_free:
+        free(inst);
+        return NULL;
+}
diff --git a/src/ipcpd/normal/dht.h b/src/ipcpd/normal/dht.h
new file mode 100644
index 00000000..5d7fc894
--- /dev/null
+++ b/src/ipcpd/normal/dht.h
@@ -0,0 +1,54 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Distributed Hash Table based on Kademlia
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_DHT_H
+#define OUROBOROS_IPCPD_NORMAL_DHT_H
+
+#include <ouroboros/ipcp-dev.h>
+
+#include <stdint.h>
+#include <sys/types.h>
+
+/* Opaque DHT instance; defined in dht.c. */
+struct dht;
+
+/*
+ * Create a DHT instance for the node with address addr.
+ * Returns NULL on failure. Release with dht_destroy.
+ */
+struct dht * dht_create(uint64_t addr);
+
+/*
+ * Start a new DHT with this instance as its first node. b is the key
+ * length in bits, t_expire the lifetime of stored values.
+ * Returns 0 on success, -1 on failure.
+ */
+int dht_bootstrap(struct dht * dht,
+ size_t b,
+ time_t t_expire);
+
+/*
+ * Join an existing DHT through the member at address addr.
+ * Returns 0 on success, -1 on failure.
+ */
+int dht_enroll(struct dht * dht,
+ uint64_t addr);
+
+/* Destroy an instance obtained from dht_create. NULL is accepted. */
+void dht_destroy(struct dht * dht);
+
+/*
+ * Register key as reachable at this node's address and publish it.
+ * Returns 0 on success, -1 if the DHT is not running, -ENOMEM if out of
+ * memory.
+ */
+int dht_reg(struct dht * dht,
+ const uint8_t * key);
+
+/*
+ * Withdraw a registration made with dht_reg.
+ * Returns 0 on success, -1 if the DHT is not running.
+ */
+int dht_unreg(struct dht * dht,
+ const uint8_t * key);
+
+/*
+ * Resolve key to the address of a peer that registered it.
+ * Returns 0 when no such address was found.
+ */
+uint64_t dht_query(struct dht * dht,
+ const uint8_t * key);
+
+#endif /* OUROBOROS_IPCPD_NORMAL_DHT_H */
diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c
index 5ea8a300..697c02da 100644
--- a/src/ipcpd/normal/dir.c
+++ b/src/ipcpd/normal/dir.c
@@ -20,129 +20,130 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
+#define OUROBOROS_PREFIX "directory"
+
#include <ouroboros/config.h>
+#include <ouroboros/endian.h>
#include <ouroboros/errno.h>
+#include <ouroboros/logs.h>
#include <ouroboros/rib.h>
+#include <ouroboros/utils.h>
#include "dir.h"
+#include "dht.h"
#include "ipcp.h"
#include "ribconfig.h"
#include <stdlib.h>
#include <string.h>
#include <assert.h>
+#include <inttypes.h>
-static char dir_path[RIB_MAX_PATH_LEN + 1];
+#define KAD_B (hash_len(ipcpi.dir_hash_algo) * CHAR_BIT)
+#define ENROL_RETR 6
+#define ENROL_INTV 1
-static void dir_path_reset(void) {
- dir_path[strlen(DIR_PATH)]= '\0';
- assert(strcmp(DIR_PATH, dir_path) == 0);
-}
+struct dht * dht;
-int dir_init(void)
+static uint64_t find_peer_addr(void)
{
- /* FIXME: set ribmgr dissemination here */
- if (rib_add(RIB_ROOT, DIR_NAME))
- return -1;
+ ssize_t i;
+ char ** members;
+ ssize_t n_members;
+ size_t reset;
+ char path[RIB_MAX_PATH_LEN + 1];
- strcpy(dir_path, DIR_PATH);
+ strcpy(path, MEMBERS_PATH);
- return 0;
-}
+ reset = strlen(path);
-int dir_fini(void)
-{
- /* FIXME: remove ribmgr dissemination here*/
+ n_members = rib_children(path, &members);
+ if (n_members == 1) {
+ freepp(ssize_t, members, n_members);
+ return 0;
+ }
+
+ for (i = 0; i < n_members; ++i) {
+ uint64_t addr;
+ rib_path_append(path, members[i]);
+ if (rib_read(path, &addr, sizeof(addr)) != sizeof(addr)) {
+ log_err("Failed to read address from RIB.");
+ freepp(ssize_t, members, n_members);
+ return ipcpi.dt_addr;
+ }
+
+ if (addr != ipcpi.dt_addr) {
+ freepp(ssize_t, members, n_members);
+ return addr;
+ }
+
+ path[reset] = '\0';
+ }
- dir_path_reset();
- rib_del(dir_path);
+ freepp(ssize_t, members, n_members);
return 0;
}
-int dir_reg(const uint8_t * hash)
+int dir_init()
{
- char hashstr[ipcp_dir_hash_strlen() + 1];
- int ret;
-
- assert(hash);
-
- dir_path_reset();
+ uint64_t addr;
- ipcp_hash_str(hashstr, hash);
-
- ret = rib_add(dir_path, hashstr);
- if (ret == -ENOMEM)
- return -ENOMEM;
-
- rib_path_append(dir_path, hashstr);
+ dht = dht_create(ipcpi.dt_addr);
+ if (dht == NULL)
+ return -ENOMEM;
- ret = rib_add(dir_path, ipcpi.name);
- if (ret == -EPERM)
+ addr = find_peer_addr();
+ if (addr == ipcpi.dt_addr) {
+ log_err("Failed to get peer address.");
+ dht_destroy(dht);
return -EPERM;
- if (ret == -ENOMEM) {
- if (rib_children(dir_path, NULL) == 0)
- rib_del(dir_path);
- return -ENOMEM;
}
- return 0;
-}
-
-int dir_unreg(const uint8_t * hash)
-{
- char hashstr[ipcp_dir_hash_strlen() + 1];
- size_t len;
-
- assert(hash);
-
- dir_path_reset();
+ if (addr != 0) {
+ size_t retr = 0;
+ log_dbg("Enrolling directory with peer %" PRIu64 ".", addr);
+ /* NOTE: we could try other members if dht_enroll times out. */
+ while (dht_enroll(dht, addr)) {
+ if (retr++ == ENROL_RETR) {
+ dht_destroy(dht);
+ return -EPERM;
+ }
- ipcp_hash_str(hashstr, hash);
+ log_dbg("Directory enrollment failed, retrying...");
+ sleep(ENROL_INTV);
+ }
- rib_path_append(dir_path, hashstr);
-
- if (!rib_has(dir_path))
return 0;
+ }
- len = strlen(dir_path);
-
- rib_path_append(dir_path, ipcpi.name);
-
- rib_del(dir_path);
-
- dir_path[len] = '\0';
+ log_dbg("Bootstrapping DHT.");
- if (rib_children(dir_path, NULL) == 0)
- rib_del(dir_path);
+ /* TODO: get parameters for bootstrap from IRM tool. */
+ if (dht_bootstrap(dht, KAD_B, 86400)) {
+ dht_destroy(dht);
+ return -ENOMEM;
+ }
return 0;
}
-int dir_query(const uint8_t * hash)
+void dir_fini(void)
{
- char hashstr[ipcp_dir_hash_strlen() + 1];
- size_t len;
-
- dir_path_reset();
-
- ipcp_hash_str(hashstr, hash);
-
- rib_path_append(dir_path, hashstr);
-
- if (!rib_has(dir_path))
- return -1;
-
- /* FIXME: assert after local IPCP is deprecated */
- len = strlen(dir_path);
+ dht_destroy(dht);
+}
- rib_path_append(dir_path, ipcpi.name);
+int dir_reg(const uint8_t * hash)
+{
+ return dht_reg(dht, hash);
+}
- if (rib_has(dir_path)) {
- dir_path[len] = '\0';
- if (rib_children(dir_path, NULL) == 1)
- return -1;
- }
+int dir_unreg(const uint8_t * hash)
+{
+ return dht_unreg(dht, hash);
+}
- return 0;
+uint64_t dir_query(const uint8_t * hash)
+{
+ return dht_query(dht, hash);
}
diff --git a/src/ipcpd/normal/dir.h b/src/ipcpd/normal/dir.h
index 1b28a5c0..4091a3e8 100644
--- a/src/ipcpd/normal/dir.h
+++ b/src/ipcpd/normal/dir.h
@@ -23,14 +23,14 @@
#ifndef OUROBOROS_IPCPD_NORMAL_DIR_H
#define OUROBOROS_IPCPD_NORMAL_DIR_H
-int dir_init(void);
+int dir_init(void);
-int dir_fini(void);
+void dir_fini(void);
-int dir_reg(const uint8_t * hash);
+int dir_reg(const uint8_t * hash);
-int dir_unreg(const uint8_t * hash);
+int dir_unreg(const uint8_t * hash);
-int dir_query(const uint8_t * hash);
+uint64_t dir_query(const uint8_t * hash);
#endif /* OUROBOROS_IPCPD_NORMAL_DIR_H */
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
index 1867c13b..5fcc5865 100644
--- a/src/ipcpd/normal/dt.c
+++ b/src/ipcpd/normal/dt.c
@@ -50,7 +50,7 @@
#include <assert.h>
struct ae_info {
- int (*post_sdu)(void * ae, struct shm_du_buff * sdb);
+ void (* post_sdu)(void * ae, struct shm_du_buff * sdb);
void * ae;
};
@@ -131,11 +131,14 @@ static int sdu_handler(int fd,
return 0;
}
- if (dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb)) {
+ if (dt.aes[dt_pci.fd].post_sdu == NULL) {
+ log_err("No registered AE on fd %d.", dt_pci.fd);
ipcp_sdb_release(sdb);
- return -1;
+ return -EPERM;
}
+ dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb);
+
return 0;
}
@@ -295,7 +298,7 @@ void dt_stop(void)
}
int dt_reg_ae(void * ae,
- int (* func)(void * func, struct shm_du_buff *))
+ void (* func)(void * func, struct shm_du_buff *))
{
int res_fd;
@@ -330,10 +333,11 @@ int dt_write_sdu(uint64_t dst_addr,
struct dt_pci dt_pci;
assert(sdb);
+ assert(dst_addr != ipcpi.dt_addr);
fd = pff_nhop(dt.pff[qc], dst_addr);
if (fd < 0) {
- log_err("Could not get nhop for addr %" PRIu64 ".", dst_addr);
+ log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
return -1;
}
diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h
index 0e1a8cc3..15ef51f0 100644
--- a/src/ipcpd/normal/dt.h
+++ b/src/ipcpd/normal/dt.h
@@ -38,7 +38,7 @@ int dt_start(void);
void dt_stop(void);
int dt_reg_ae(void * ae,
- int (* func)(void * ae, struct shm_du_buff * sdb));
+ void (* func)(void * ae, struct shm_du_buff * sdb));
int dt_write_sdu(uint64_t dst_addr,
qoscube_t qc,
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
index 26ee9037..6e880067 100644
--- a/src/ipcpd/normal/fa.c
+++ b/src/ipcpd/normal/fa.c
@@ -30,6 +30,7 @@
#include <ouroboros/dev.h>
#include <ouroboros/ipcp-dev.h>
+#include "dir.h"
#include "dt_pci.h"
#include "fa.h"
#include "sdu_sched.h"
@@ -79,8 +80,8 @@ static void destroy_conn(int fd)
fa.r_addr[fd] = INVALID_ADDR;
}
-static int fa_post_sdu(void * ae,
- struct shm_du_buff * sdb)
+static void fa_post_sdu(void * ae,
+ struct shm_du_buff * sdb)
{
struct timespec ts = {0, TIMEOUT * 1000};
int fd;
@@ -99,7 +100,8 @@ static int fa_post_sdu(void * ae,
shm_du_buff_head(sdb));
if (msg == NULL) {
log_err("Failed to unpack flow alloc message.");
- return -1;
+ ipcp_sdb_release(sdb);
+ return;
}
switch (msg->code) {
@@ -110,7 +112,8 @@ static int fa_post_sdu(void * ae,
log_err("Bad flow request.");
pthread_mutex_unlock(&ipcpi.alloc_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
+ ipcp_sdb_release(sdb);
+ return;
}
while (ipcpi.alloc_id != -1 &&
@@ -123,7 +126,8 @@ static int fa_post_sdu(void * ae,
log_dbg("Won't allocate over non-operational IPCP.");
pthread_mutex_unlock(&ipcpi.alloc_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
+ ipcp_sdb_release(sdb);
+ return;
}
assert(ipcpi.alloc_id == -1);
@@ -136,7 +140,8 @@ static int fa_post_sdu(void * ae,
pthread_mutex_unlock(&ipcpi.alloc_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
log_err("Failed to get fd for flow.");
- return -1;
+ ipcp_sdb_release(sdb);
+ return;
}
pthread_rwlock_wrlock(&fa.flows_lock);
@@ -168,13 +173,12 @@ static int fa_post_sdu(void * ae,
default:
log_err("Got an unknown flow allocation message.");
flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
+ ipcp_sdb_release(sdb);
+ return;
}
flow_alloc_msg__free_unpacked(msg, NULL);
ipcp_sdb_release(sdb);
-
- return 0;
}
int fa_init(void)
@@ -235,47 +239,10 @@ int fa_alloc(int fd,
qoscube_t qc)
{
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
- char path[RIB_MAX_PATH_LEN + 1];
uint64_t addr;
- ssize_t ch;
- ssize_t i;
- char ** children;
- char hashstr[ipcp_dir_hash_strlen() + 1];
- char * dst_ipcp = NULL;
struct shm_du_buff * sdb;
- ipcp_hash_str(hashstr, dst);
-
- assert(strlen(hashstr) + strlen(DIR_PATH) + 1
- < RIB_MAX_PATH_LEN);
-
- strcpy(path, DIR_PATH);
-
- rib_path_append(path, hashstr);
-
- ch = rib_children(path, &children);
- if (ch <= 0)
- return -1;
-
- for (i = 0; i < ch; ++i)
- if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0)
- dst_ipcp = children[i];
- else
- free(children[i]);
-
- free(children);
-
- if (dst_ipcp == NULL)
- return -1;
-
- strcpy(path, MEMBERS_PATH);
-
- rib_path_append(path, dst_ipcp);
-
- free(dst_ipcp);
-
- if (rib_read(path, &addr, sizeof(addr)) != sizeof(addr))
- return -1;
+ addr = dir_query(dst);
msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
msg.has_hash = true;
diff --git a/src/ipcpd/normal/kademlia.proto b/src/ipcpd/normal/kademlia.proto
new file mode 100644
index 00000000..0b7e8beb
--- /dev/null
+++ b/src/ipcpd/normal/kademlia.proto
@@ -0,0 +1,46 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * KAD protocol
+ *
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+syntax = "proto2";
+
+// An (id, address) pair: a DHT contact, or in a store message the
+// key and the address to store under it.
+message kad_contact_msg {
+ required bytes id = 1;
+ required uint64 addr = 2;
+};
+
+// A single message of the KAD protocol.
+message kad_msg {
+ // message type (join, find node, find value, store, response)
+ required uint32 code = 1;
+ // copied from a request into its response
+ required uint32 cookie = 2;
+ // address of the sender; responses are sent to it
+ required uint64 s_addr = 3;
+ // id of the sender, if it has one
+ optional bytes s_id = 4;
+ // key being looked up in a find request
+ optional bytes key = 5;
+ // addresses returned for a find value request
+ repeated uint64 addrs = 6;
+ // contacts returned for a find request, or entries to store
+ repeated kad_contact_msg contacts = 7;
+ // enrolment parameters
+ optional uint32 alpha = 8;
+ optional uint32 b = 9;
+ optional uint32 k = 10;
+ // also carries the expiry of a store message
+ optional uint32 t_expire = 11;
+ optional uint32 t_refresh = 12;
+ optional uint32 t_replicate = 13;
+}; \ No newline at end of file
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index 8c28de78..f94c15de 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -116,11 +116,6 @@ static int boot_components(void)
log_dbg("Starting ribmgr.");
- if (dir_init()) {
- log_err("Failed to initialize directory.");
- goto fail_dir;
- }
-
if (ribmgr_init()) {
log_err("Failed to initialize RIB manager.");
goto fail_ribmgr;
@@ -148,6 +143,11 @@ static int boot_components(void)
goto fail_fa_start;
}
+ if (dir_init()) {
+ log_err("Failed to initialize directory.");
+ goto fail_dir;
+ }
+
if (enroll_start()) {
log_err("Failed to start enroll.");
goto fail_enroll_start;
@@ -166,6 +166,8 @@ static int boot_components(void)
ipcp_set_state(IPCP_INIT);
enroll_stop();
fail_enroll_start:
+ dir_fini();
+ fail_dir:
fa_stop();
fail_fa_start:
dt_stop();
@@ -176,8 +178,6 @@ static int boot_components(void)
fail_dt:
ribmgr_fini();
fail_ribmgr:
- dir_fini();
- fail_dir:
addr_auth_fini();
fail_addr_auth:
free(ipcpi.dif_name);
@@ -191,6 +191,8 @@ void shutdown_components(void)
enroll_stop();
+ dir_fini();
+
fa_stop();
dt_stop();
@@ -201,8 +203,6 @@ void shutdown_components(void)
ribmgr_fini();
- dir_fini();
-
addr_auth_fini();
free(ipcpi.dif_name);
@@ -227,10 +227,9 @@ static int normal_ipcp_enroll(const char * dst,
return -1;
}
- log_dbg("Enrolled with " HASH_FMT, HASH_VAL(dst));
+ log_dbg("Enrolled with %s.", dst);
info->dir_hash_algo = ipcpi.dir_hash_algo;
-
strcpy(info->dif_name, ipcpi.dif_name);
return 0;
@@ -347,12 +346,17 @@ static int normal_ipcp_bootstrap(const struct ipcp_config * conf)
return 0;
}
+static int normal_ipcp_query(const uint8_t * dst)
+{
+ return dir_query(dst) ? 0 : -1;
+}
+
static struct ipcp_ops normal_ops = {
.ipcp_bootstrap = normal_ipcp_bootstrap,
.ipcp_enroll = normal_ipcp_enroll,
.ipcp_reg = dir_reg,
.ipcp_unreg = dir_unreg,
- .ipcp_query = dir_query,
+ .ipcp_query = normal_ipcp_query,
.ipcp_flow_alloc = fa_alloc,
.ipcp_flow_alloc_resp = fa_alloc_resp,
.ipcp_flow_dealloc = fa_dealloc
diff --git a/src/ipcpd/normal/pol/flat.c b/src/ipcpd/normal/pol/flat.c
index e709da7c..0907cf7a 100644
--- a/src/ipcpd/normal/pol/flat.c
+++ b/src/ipcpd/normal/pol/flat.c
@@ -56,7 +56,7 @@ static int addr_taken(char * name,
char path[RIB_MAX_PATH_LEN + 1];
size_t reset;
- strcpy(path, "/" MEMBERS_NAME);
+ strcpy(path, MEMBERS_PATH);
reset = strlen(path);
@@ -102,7 +102,7 @@ uint64_t flat_address(void)
char ** members;
ssize_t n_members;
- strcpy(path, "/" MEMBERS_NAME);
+ strcpy(path, MEMBERS_PATH);
if (!rib_has(path)) {
log_err("Could not read members from RIB.");
diff --git a/src/ipcpd/normal/ribconfig.h b/src/ipcpd/normal/ribconfig.h
index 31c79fbe..db1ff1bb 100644
--- a/src/ipcpd/normal/ribconfig.h
+++ b/src/ipcpd/normal/ribconfig.h
@@ -29,9 +29,7 @@
#define DLR "/"
#define BOOT_NAME "boot"
#define MEMBERS_NAME "members"
-#define DIR_NAME "directory"
#define ROUTING_NAME "fsdb"
-#define DIR_PATH DLR DIR_NAME
#define BOOT_PATH DLR BOOT_NAME
#define MEMBERS_PATH DLR MEMBERS_NAME
#define ROUTING_PATH DLR ROUTING_NAME
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index 266a628d..3beb917c 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -299,9 +299,8 @@ static void * sync_rib(void *o)
rib_path_append(path, children[--ch]);
free(children[ch]);
- /* Only sync fsdb, members and directory */
+ /* Only sync fsdb and members */
if (strcmp(path, MEMBERS_PATH) == 0
- || strcmp(path, DIR_PATH) == 0
|| strcmp(path, ROUTING_PATH) == 0)
ribmgr_sync(path);
}
diff --git a/src/ipcpd/normal/tests/CMakeLists.txt b/src/ipcpd/normal/tests/CMakeLists.txt
new file mode 100644
index 00000000..d975caf6
--- /dev/null
+++ b/src/ipcpd/normal/tests/CMakeLists.txt
@@ -0,0 +1,37 @@
+# Unit tests for the sources in the parent directory. The tests include
+# the sources under test directly, so the parent source and binary
+# directories are added to the include path.
+get_filename_component(CURRENT_SOURCE_PARENT_DIR
+  ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
+get_filename_component(CURRENT_BINARY_PARENT_DIR
+  ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY)
+
+include_directories(${CMAKE_CURRENT_SOURCE_DIR})
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+include_directories(${CURRENT_SOURCE_PARENT_DIR})
+include_directories(${CURRENT_BINARY_PARENT_DIR})
+
+include_directories(${CMAKE_SOURCE_DIR}/include)
+include_directories(${CMAKE_BINARY_DIR}/include)
+
+# The name of the parent directory is used as prefix for the test
+# executable and for the variable holding the test sources.
+get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
+get_filename_component(PARENT_DIR ${PARENT_PATH} NAME)
+
+# Generates the test driver test_suite.c and stores the driver plus all
+# listed tests in ${PARENT_DIR}_tests.
+create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
+  # Add new tests here
+  dht_test.c
+)
+
+# KAD_PROTO_SRCS is not set in this file; it is marked GENERATED so CMake
+# does not require the files to exist at configure time.
+set_source_files_properties(${KAD_PROTO_SRCS} PROPERTIES GENERATED TRUE)
+
+add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}
+  ${KAD_PROTO_SRCS})
+target_link_libraries(${PARENT_DIR}_test ouroboros)
+
+# The "check" target is defined outside this file.
+add_dependencies(check ${PARENT_DIR}_test)
+
+# Register every test except the generated driver itself.
+# list(REMOVE_ITEM) replaces the deprecated remove() command.
+set(tests_to_run ${${PARENT_DIR}_tests})
+list(REMOVE_ITEM tests_to_run test_suite.c)
+
+foreach (test ${tests_to_run})
+  get_filename_component(test_name ${test} NAME_WE)
+  add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name})
+endforeach (test)
diff --git a/src/ipcpd/normal/tests/dht_test.c b/src/ipcpd/normal/tests/dht_test.c
new file mode 100644
index 00000000..861ae10a
--- /dev/null
+++ b/src/ipcpd/normal/tests/dht_test.c
@@ -0,0 +1,99 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Unit tests of the DHT AE
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define __DHT_TEST__
+
+#include "dht.c"
+
+#include <pthread.h>
+#include <time.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+/* Size of the key buffer; also the second argument to dht_bootstrap(). */
+#define KEY_LEN 32
+
+/*
+ * Third argument to dht_bootstrap().
+ * NOTE(review): presumably an expiry time in seconds (one day) - confirm.
+ */
+#define EXP 86400
+/* Number of dht_update_bucket() calls made by the test loop. */
+#define CONTACTS 1000
+
+/*
+ * Exercises the DHT life cycle in three stages:
+ *   1. create and destroy;
+ *   2. create, bootstrap and destroy;
+ *   3. create, bootstrap, call dht_update_bucket() CONTACTS times with a
+ *      key and address filled by random_buffer(), then destroy.
+ *
+ * argc and argv are unused.
+ *
+ * Returns 0 when every stage succeeds, -1 on the first failure (after
+ * printing a message and destroying the DHT if one exists).
+ */
+int dht_test(int     argc,
+             char ** argv)
+{
+        struct dht * dht;
+        uint64_t     addr = 0x0D1F;
+        uint8_t      key[KEY_LEN];
+        size_t       i;
+
+        (void) argc;
+        (void) argv;
+
+        /* Stage 1: create and destroy. */
+        dht = dht_create(addr);
+        if (dht == NULL) {
+                printf("Failed to create dht.\n");
+                return -1;
+        }
+
+        dht_destroy(dht);
+
+        /* Stage 2: create, bootstrap and destroy. */
+        dht = dht_create(addr);
+        if (dht == NULL) {
+                printf("Failed to re-create dht.\n");
+                return -1;
+        }
+
+        if (dht_bootstrap(dht, KEY_LEN, EXP)) {
+                printf("Failed to bootstrap dht.\n");
+                goto fail;
+        }
+
+        dht_destroy(dht);
+
+        /* Stage 3: create, bootstrap and update the buckets. */
+        dht = dht_create(addr);
+        if (dht == NULL) {
+                printf("Failed to re-create dht.\n");
+                return -1;
+        }
+
+        if (dht_bootstrap(dht, KEY_LEN, EXP)) {
+                printf("Failed to bootstrap dht.\n");
+                goto fail;
+        }
+
+        for (i = 0; i < CONTACTS; ++i) {
+                /* Named differently from addr above to avoid shadowing. */
+                uint64_t peer;
+
+                random_buffer(&peer, sizeof(peer));
+                random_buffer(key, KEY_LEN);
+
+                /* dht_update_bucket() is called with the write lock held. */
+                pthread_rwlock_wrlock(&dht->lock);
+                if (dht_update_bucket(dht, key, peer)) {
+                        pthread_rwlock_unlock(&dht->lock);
+                        printf("Failed to update bucket.\n");
+                        goto fail;
+                }
+                pthread_rwlock_unlock(&dht->lock);
+        }
+
+        dht_destroy(dht);
+
+        return 0;
+
+ fail:
+        dht_destroy(dht);
+        return -1;
+}