summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/dir
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/dir')
-rw-r--r--src/ipcpd/unicast/dir/dht.c4052
-rw-r--r--src/ipcpd/unicast/dir/dht.h49
-rw-r--r--src/ipcpd/unicast/dir/dht.proto58
-rw-r--r--src/ipcpd/unicast/dir/ops.h42
-rw-r--r--src/ipcpd/unicast/dir/pol.h23
-rw-r--r--src/ipcpd/unicast/dir/tests/CMakeLists.txt40
-rw-r--r--src/ipcpd/unicast/dir/tests/dht_test.c1925
7 files changed, 6189 insertions, 0 deletions
diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c
new file mode 100644
index 00000000..6b06def9
--- /dev/null
+++ b/src/ipcpd/unicast/dir/dht.c
@@ -0,0 +1,4052 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Distributed Hash Table based on Kademlia
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if !defined (__DHT_TEST__)
+ #if defined(__linux__) || defined(__CYGWIN__)
+ #define _DEFAULT_SOURCE
+ #else
+ #define _POSIX_C_SOURCE 200112L
+ #endif
+#endif
+
+#include "config.h"
+
+#define DHT "dht"
+#define OUROBOROS_PREFIX DHT
+
+#include <ouroboros/endian.h>
+#include <ouroboros/hash.h>
+#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/list.h>
+#include <ouroboros/random.h>
+#include <ouroboros/rib.h>
+#include <ouroboros/time.h>
+#include <ouroboros/tpm.h>
+#include <ouroboros/utils.h>
+#include <ouroboros/pthread.h>
+
+#include "addr-auth.h"
+#include "common/connmgr.h"
+#include "dht.h"
+#include "dt.h"
+#include "ipcp.h"
+#include "ops.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <inttypes.h>
+#include <limits.h>
+
+#include "dht.pb-c.h"
+typedef DhtMsg dht_msg_t;
+typedef DhtContactMsg dht_contact_msg_t;
+typedef DhtStoreMsg dht_store_msg_t;
+typedef DhtFindReqMsg dht_find_req_msg_t;
+typedef DhtFindNodeRspMsg dht_find_node_rsp_msg_t;
+typedef DhtFindValueRspMsg dht_find_value_rsp_msg_t;
+typedef ProtobufCBinaryData binary_data_t;
+
+#ifndef CLOCK_REALTIME_COARSE
+#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
+#endif
+
+#define DHT_MAX_REQS 128 /* KAD recommends rnd(), bmp can be changed. */
+#define DHT_WARN_REQS 100 /* Warn if number of requests exceeds this. */
+#define DHT_MAX_VALS 8 /* Max number of values to return for a key. */
+#define DHT_T_CACHE 60 /* Max cache time for values (s) */
+#define DHT_T_RESP 2 /* Response time to wait for a response (s). */
+#define DHT_N_REPUB 5 /* Republish if expiry within n replications. */
+#define DHT_R_PING 2 /* Ping retries before declaring peer dead. */
+#define DHT_QUEER 15 /* Time to declare peer questionable. */
+#define DHT_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */
+#define DHT_RESP_RETR 6 /* Number of retries on sending a response. */
+#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_packet tpm check (ms) */
+#define DHT_INVALID 0 /* Invalid cookie value. */
+
+#define KEY_FMT "K<" HASH_FMT64 ">"
+#define KEY_VAL(key) HASH_VAL64(key)
+
+#define VAL_FMT "V<" HASH_FMT64 ">"
+#define VAL_VAL(val) HASH_VAL64((val).data)
+
+#define KV_FMT "<" HASH_FMT64 ", " HASH_FMT64 ">"
+#define KV_VAL(key, val) HASH_VAL64(key), HASH_VAL64((val).data)
+
+#define PEER_FMT "[" HASH_FMT64 "|" ADDR_FMT32 "]"
+#define PEER_VAL(id, addr) HASH_VAL64(id), ADDR_VAL32(&(addr))
+
+#define DHT_CODE(msg) dht_code_str[(msg)->code]
+
+#define TX_HDR_FMT "%s --> " PEER_FMT
+#define TX_HDR_VAL(msg, id, addr) DHT_CODE(msg), PEER_VAL(id, addr)
+
+#define RX_HDR_FMT "%s <-- " PEER_FMT
+#define RX_HDR_VAL(msg) DHT_CODE(msg), \
+ PEER_VAL(msg->src->id.data, msg->src->addr)
+
+#define CK_FMT "|" HASH_FMT64 "|"
+#define CK_VAL(cookie) HASH_VAL64(&(cookie))
+
+#define IS_REQUEST(code) \
+ (code == DHT_FIND_NODE_REQ || code == DHT_FIND_VALUE_REQ)
+
+enum dht_code {
+ DHT_STORE,
+ DHT_FIND_NODE_REQ,
+ DHT_FIND_NODE_RSP,
+ DHT_FIND_VALUE_REQ,
+ DHT_FIND_VALUE_RSP
+};
+
+const char * dht_code_str[] = {
+ "DHT_STORE",
+ "DHT_FIND_NODE_REQ",
+ "DHT_FIND_NODE_RSP",
+ "DHT_FIND_VALUE_REQ",
+ "DHT_FIND_VALUE_RSP"
+};
+
+enum dht_state {
+ DHT_NULL = 0,
+ DHT_INIT,
+ DHT_RUNNING
+};
+
+struct val_entry {
+ struct list_head next;
+
+ buffer_t val;
+
+ time_t t_exp; /* Expiry time */
+ time_t t_repl; /* Last replication time */
+};
+
+struct dht_entry {
+ struct list_head next;
+
+ uint8_t * key;
+
+ struct {
+ struct list_head list;
+ size_t len;
+ } vals; /* We don't own these, only replicate */
+
+ struct {
+ struct list_head list;
+ size_t len;
+ } lvals; /* We own these, must be republished */
+};
+
+struct contact {
+ struct list_head next;
+
+ uint8_t * id;
+ uint64_t addr;
+
+ size_t fails;
+ time_t t_seen;
+};
+
+struct peer_entry {
+ struct list_head next;
+
+ uint64_t cookie;
+ uint8_t * id;
+ uint64_t addr;
+ enum dht_code code;
+
+ time_t t_sent;
+};
+
+struct dht_req {
+ struct list_head next;
+
+ uint8_t * key;
+ time_t t_exp;
+
+ struct {
+ struct list_head list;
+ size_t len;
+ } peers;
+
+ struct {
+ struct list_head list;
+ size_t len;
+ } cache;
+};
+
+struct bucket {
+ struct {
+ struct list_head list;
+ size_t len;
+ } contacts;
+
+ struct {
+ struct list_head list;
+ size_t len;
+ } alts;
+
+ time_t t_refr;
+
+ size_t depth;
+ uint8_t mask;
+
+ struct bucket * parent;
+ struct bucket * children[1L << DHT_BETA];
+};
+
+struct cmd {
+ struct list_head next;
+ buffer_t cbuf;
+};
+
+struct dir_ops dht_dir_ops = {
+ .init = (int (*)(void *)) dht_init,
+ .fini = dht_fini,
+ .start = dht_start,
+ .stop = dht_stop,
+ .reg = dht_reg,
+ .unreg = dht_unreg,
+ .query = dht_query
+};
+
+struct {
+ struct { /* Kademlia parameters */
+ uint32_t alpha; /* Number of concurrent requests */
+ size_t k; /* Number of replicas to store */
+ time_t t_expire; /* Expiry time for values (s) */
+ time_t t_refresh; /* Refresh time for contacts (s) */
+ time_t t_repl; /* Replication time for values (s) */
+ };
+
+ buffer_t id;
+
+ time_t t0; /* Creation time */
+ uint64_t addr; /* Our own address */
+ uint64_t peer; /* Enrollment peer address */
+ uint64_t magic; /* Magic cookie for retransmit */
+
+ uint64_t eid; /* Entity ID */
+
+ struct tpm * tpm;
+ pthread_t worker;
+
+ enum dht_state state;
+
+ struct {
+ struct {
+ struct bucket * root;
+ } contacts;
+
+ struct {
+ struct list_head list;
+ size_t len;
+ size_t vals;
+ size_t lvals;
+ } kv;
+
+ pthread_rwlock_t lock;
+ } db;
+
+ struct {
+ struct list_head list;
+ size_t len;
+ pthread_cond_t cond;
+ pthread_mutex_t mtx;
+ } reqs;
+
+ struct {
+ struct list_head list;
+ pthread_cond_t cond;
+ pthread_mutex_t mtx;
+ } cmds;
+} dht;
+
+
+/* DHT RIB */
+
+static const char * dht_dir[] = {
+ "database",
+ "stats",
+ NULL
+};
+
+const char * dht_stats = \
+ "DHT: " HASH_FMT64 "\n"
+ " Created: %s\n"
+ " Address: " ADDR_FMT32 "\n"
+ " Kademlia parameters:\n"
+ " Number of concurrent requests (alpha): %10zu\n"
+ " Number of replicas (k): %10zu\n"
+ " Expiry time for values (s): %10ld\n"
+ " Refresh time for contacts (s): %10ld\n"
+ " Replication time for values (s): %10ld\n"
+ " Number of keys: %10zu\n"
+ " Number of local values: %10zu\n"
+ " Number of non-local values: %10zu\n";
+
+static int dht_rib_statfile(char * buf,
+ size_t len)
+{
+ struct tm * tm;
+ char tmstr[RIB_TM_STRLEN];
+ size_t keys;
+ size_t vals;
+ size_t lvals;
+
+ assert(buf != NULL);
+ assert(len > 0);
+
+ pthread_rwlock_rdlock(&dht.db.lock);
+
+ keys = dht.db.kv.len;
+ lvals = dht.db.kv.lvals;
+ vals = dht.db.kv.vals;
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ tm = gmtime(&dht.t0);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
+
+ snprintf(buf, len, dht_stats,
+ HASH_VAL64(dht.id.data),
+ tmstr,
+ ADDR_VAL32(&dht.addr),
+ dht.alpha, dht.k,
+ dht.t_expire, dht.t_refresh, dht.t_repl,
+ keys, vals, lvals);
+
+ return strlen(buf);
+}
+
+static size_t dht_db_file_len(void)
+{
+ size_t sz;
+ size_t vals;
+
+ sz = 18; /* DHT database + 2 * \n */
+
+ pthread_rwlock_rdlock(&dht.db.lock);
+
+ if (dht.db.kv.len == 0) {
+ pthread_rwlock_unlock(&dht.db.lock);
+ sz += 14; /* No entries */
+ return sz;
+ }
+
+ sz += 39 * 3 + 1; /* tally + extra newline */
+ sz += dht.db.kv.len * (25 + 19 + 23 + 1);
+
+ vals = dht.db.kv.vals + dht.db.kv.lvals;
+
+ sz += vals * (48 + 2 * RIB_TM_STRLEN);
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ return sz;
+}
+
+static int dht_rib_dbfile(char * buf,
+ size_t len)
+{
+ struct tm * tm;
+ char tmstr[RIB_TM_STRLEN];
+ char exstr[RIB_TM_STRLEN];
+ size_t i = 0;
+ struct list_head * p;
+
+ assert(buf != NULL);
+ assert(len > 0);
+
+ pthread_rwlock_rdlock(&dht.db.lock);
+
+ if (dht.db.kv.len == 0) {
+ i += snprintf(buf, len, " No entries.\n");
+ pthread_rwlock_unlock(&dht.db.lock);
+ return i;
+ }
+
+ i += snprintf(buf + i, len - i, "DHT database:\n\n");
+ i += snprintf(buf + i, len - i,
+ "Number of keys: %10zu\n"
+ "Number of local values: %10zu\n"
+ "Number of non-local values: %10zu\n\n",
+ dht.db.kv.len, dht.db.kv.vals, dht.db.kv.lvals);
+
+ list_for_each(p, &dht.db.kv.list) {
+ struct dht_entry * e = list_entry(p, struct dht_entry, next);
+ struct list_head * h;
+
+ i += snprintf(buf + i, len - i, "Key: " KEY_FMT "\n",
+ KEY_VAL(e->key));
+ i += snprintf(buf + i, len - i, " Local entries:\n");
+
+ list_for_each(h, &e->vals.list) {
+ struct val_entry * v;
+
+ v = list_entry(h, struct val_entry, next);
+
+ tm = gmtime(&v->t_repl);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
+
+ tm = gmtime(&v->t_exp);
+ strftime(exstr, sizeof(exstr), RIB_TM_FORMAT, tm);
+
+ i += snprintf(buf + i, len - i,
+ " " VAL_FMT
+ ", t_replicated=%.*s, t_expire=%.*s\n",
+ VAL_VAL(v->val),
+ RIB_TM_STRLEN, tmstr,
+ RIB_TM_STRLEN, exstr);
+ }
+
+ i += snprintf(buf + i, len - i, "\n");
+
+ i += snprintf(buf + i, len - i, " Non-local entries:\n");
+
+ list_for_each(h, &e->lvals.list) {
+ struct val_entry * v;
+
+ v= list_entry(h, struct val_entry, next);
+
+ tm = gmtime(&v->t_repl);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
+
+ tm = gmtime(&v->t_exp);
+ strftime(exstr, sizeof(exstr), RIB_TM_FORMAT, tm);
+
+ i += snprintf(buf + i, len - i,
+ " " VAL_FMT
+ ", t_replicated=%.*s, t_expire=%.*s\n",
+ VAL_VAL(v->val),
+ RIB_TM_STRLEN, tmstr,
+ RIB_TM_STRLEN, exstr);
+
+ }
+ }
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ printf("DHT RIB DB file generated (%zu bytes).\n", i);
+
+ return i;
+}
+
+static int dht_rib_read(const char * path,
+ char * buf,
+ size_t len)
+{
+ char * entry;
+
+ entry = strstr(path, RIB_SEPARATOR) + 1;
+
+ if (strcmp(entry, "database") == 0) {
+ return dht_rib_dbfile(buf, len);
+ } else if (strcmp(entry, "stats") == 0) {
+ return dht_rib_statfile(buf, len);
+ }
+
+ return 0;
+}
+
+static int dht_rib_readdir(char *** buf)
+{
+ int i = 0;
+
+ while (dht_dir[i++] != NULL);
+
+ *buf = malloc(sizeof(**buf) * i);
+ if (*buf == NULL)
+ goto fail_buf;
+
+ i = 0;
+
+ while (dht_dir[i] != NULL) {
+ (*buf)[i] = strdup(dht_dir[i]);
+ if ((*buf)[i] == NULL)
+ goto fail_dup;
+ i++;
+ }
+
+ return i;
+ fail_dup:
+ freepp(char, *buf, i);
+ fail_buf:
+ return -ENOMEM;
+}
+
+static int dht_rib_getattr(const char * path,
+ struct rib_attr * attr)
+{
+ struct timespec now;
+ char * entry;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ attr->mtime = now.tv_sec;
+
+ entry = strstr(path, RIB_SEPARATOR) + 1;
+
+ if (strcmp(entry, "database") == 0) {
+ attr->size = dht_db_file_len();
+ } else if (strcmp(entry, "stats") == 0) {
+ attr->size = 545;
+ }
+
+ return 0;
+}
+
+static struct rib_ops r_ops = {
+ .read = dht_rib_read,
+ .readdir = dht_rib_readdir,
+ .getattr = dht_rib_getattr
+};
+
+/* Helper functions */
+
+static uint8_t * generate_id(void)
+{
+ uint8_t * id;
+
+ if(dht.id.len < sizeof(uint64_t)) {
+ log_err("DHT ID length is too short (%zu < %zu).",
+ dht.id.len, sizeof(uint64_t));
+ return NULL;
+ }
+
+ id = malloc(dht.id.len);
+ if (id == NULL) {
+ log_err("Failed to malloc ID.");
+ goto fail_id;
+ }
+
+ if (random_buffer(id, dht.id.len) < 0) {
+ log_err("Failed to generate random ID.");
+ goto fail_rnd;
+ }
+
+ return id;
+ fail_rnd:
+ free(id);
+ fail_id:
+ return NULL;
+}
+
+static uint64_t generate_cookie(void)
+{
+ uint64_t cookie = DHT_INVALID;
+
+ while (cookie == DHT_INVALID)
+ random_buffer((uint8_t *) &cookie, sizeof(cookie));
+
+ return cookie;
+}
+
+/*
+ * If someone builds a network where the n (n > k) closest nodes all
+ * have IDs starting with the same 64 bits: by all means, change this.
+ */
+static uint64_t dist(const uint8_t * src,
+ const uint8_t * dst)
+{
+ assert(dht.id.len >= sizeof(uint64_t));
+
+ return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst));
+}
+
+#define IS_CLOSER(x, y) (dist((x), dht.id.data) < dist((y), dht.id.data))
+
+static int addr_to_buf(const uint64_t addr,
+ buffer_t * buf)
+{
+ size_t len;
+ uint64_t _addr;
+
+ len = sizeof(addr);
+ _addr = hton64(addr);
+
+ assert(buf != NULL);
+
+ buf->data = malloc(len);
+ if (buf->data == NULL)
+ goto fail_malloc;
+
+ buf->len = sizeof(_addr);
+ memcpy(buf->data, &_addr, sizeof(_addr));
+
+ return 0;
+ fail_malloc:
+ return -ENOMEM;
+}
+
+static int buf_to_addr(const buffer_t buf,
+ uint64_t * addr)
+{
+ assert(addr != NULL);
+ assert(buf.data != NULL);
+
+ if (buf.len != sizeof(*addr))
+ return - EINVAL;
+
+ *addr = ntoh64(*((uint64_t *) buf.data));
+
+ if (*addr == dht.addr)
+ *addr = INVALID_ADDR;
+
+ return 0;
+}
+
+static uint8_t * dht_dup_key(const uint8_t * key)
+{
+ uint8_t * dup;
+
+ assert(key != NULL);
+ assert(dht.id.len != 0);
+
+ dup = malloc(dht.id.len);
+ if (dup == NULL)
+ return NULL;
+
+ memcpy(dup, key, dht.id.len);
+
+ return dup;
+}
+
+/* DHT */
+
+static struct val_entry * val_entry_create(const buffer_t val,
+ time_t exp)
+{
+ struct val_entry * e;
+ struct timespec now;
+
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+#ifndef __DHT_TEST_ALLOW_EXPIRED__
+ if (exp < now.tv_sec)
+ return NULL; /* Refuse to add expired values */
+#endif
+ e = malloc(sizeof(*e));
+ if (e == NULL)
+ goto fail_entry;
+
+ list_head_init(&e->next);
+
+ e->val.len = val.len;
+ e->val.data = malloc(val.len);
+ if (e->val.data == NULL)
+ goto fail_val;
+
+ memcpy(e->val.data, val.data, val.len);
+
+ e->t_repl = 0;
+ e->t_exp = exp;
+
+ return e;
+
+ fail_val:
+ free(e);
+ fail_entry:
+ return NULL;
+}
+
+static void val_entry_destroy(struct val_entry * v)
+{
+ assert(v->val.data != NULL);
+
+ freebuf(v->val);
+ free(v);
+}
+
+static struct dht_entry * dht_entry_create(const uint8_t * key)
+{
+ struct dht_entry * e;
+
+ assert(key != NULL);
+
+ e = malloc(sizeof(*e));
+ if (e == NULL)
+ goto fail_entry;
+
+ list_head_init(&e->next);
+ list_head_init(&e->vals.list);
+ list_head_init(&e->lvals.list);
+
+ e->vals.len = 0;
+ e->lvals.len = 0;
+
+ e->key = dht_dup_key(key);
+ if (e->key == NULL)
+ goto fail_key;
+
+ return e;
+ fail_key:
+ free(e);
+ fail_entry:
+ return NULL;
+}
+
+static void dht_entry_destroy(struct dht_entry * e)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ assert(e != NULL);
+
+ list_for_each_safe(p, h, &e->vals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ list_del(&v->next);
+ val_entry_destroy(v);
+ --e->vals.len;
+ --dht.db.kv.vals;
+ }
+
+ list_for_each_safe(p, h, &e->lvals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ list_del(&v->next);
+ val_entry_destroy(v);
+ --e->lvals.len;
+ --dht.db.kv.lvals;
+ }
+
+ free(e->key);
+
+ assert(e->vals.len == 0 && e->lvals.len == 0);
+
+ free(e);
+}
+
+static struct val_entry * dht_entry_get_lval(const struct dht_entry * e,
+ const buffer_t val)
+{
+ struct list_head * p;
+
+ assert(e != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ list_for_each(p, &e->lvals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ if (bufcmp(&v->val, &val) == 0)
+ return v;
+ }
+
+ return NULL;
+}
+
+static struct val_entry * dht_entry_get_val(const struct dht_entry * e,
+ const buffer_t val)
+{
+ struct list_head * p;
+
+ assert(e != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ list_for_each(p, &e->vals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ if (bufcmp(&v->val, &val) == 0)
+ return v;
+
+ }
+
+ return NULL;
+}
+
+static int dht_entry_update_val(struct dht_entry * e,
+ buffer_t val,
+ time_t exp)
+{
+ struct val_entry * v;
+ struct timespec now;
+
+ assert(e != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ if (exp < now.tv_sec)
+ return -EINVAL; /* Refuse to add expired values */
+
+ if (dht_entry_get_lval(e, val) != NULL) {
+ log_dbg(KV_FMT " Val already in lvals.", KV_VAL(e->key, val));
+ return 0; /* Refuse to add local values */
+ }
+
+ v = dht_entry_get_val(e, val);
+ if (v == NULL) {
+ v = val_entry_create(val, exp);
+ if (v == NULL)
+ return -ENOMEM;
+
+ list_add_tail(&v->next, &e->vals.list);
+ ++e->vals.len;
+ ++dht.db.kv.vals;
+
+ return 0;
+ }
+
+ if (v->t_exp < exp)
+ v->t_exp = exp;
+
+ return 0;
+}
+
+static int dht_entry_update_lval(struct dht_entry * e,
+ buffer_t val)
+{
+ struct val_entry * v;
+ struct timespec now;
+
+ assert(e != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ v = dht_entry_get_lval(e, val);
+ if (v == NULL) {
+ log_dbg(KV_FMT " Adding lval.", KV_VAL(e->key, val));
+ v = val_entry_create(val, now.tv_sec + dht.t_expire);
+ if (v == NULL)
+ return -ENOMEM;
+
+ list_add_tail(&v->next, &e->lvals.list);
+ ++e->lvals.len;
+ ++dht.db.kv.lvals;
+
+ return 0;
+ }
+
+ return 0;
+}
+
+static int dht_entry_remove_lval(struct dht_entry * e,
+ buffer_t val)
+{
+ struct val_entry * v;
+
+ assert(e != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ v = dht_entry_get_lval(e, val);
+ if (v == NULL)
+ return -ENOENT;
+
+ log_dbg(KV_FMT " Removing lval.", KV_VAL(e->key, val));
+
+ list_del(&v->next);
+ val_entry_destroy(v);
+ --e->lvals.len;
+ --dht.db.kv.lvals;
+
+ return 0;
+}
+
+#define IS_EXPIRED(v, now) ((now)->tv_sec > (v)->t_exp)
+static void dht_entry_remove_expired_vals(struct dht_entry * e)
+{
+ struct list_head * p;
+ struct list_head * h;
+ struct timespec now;
+
+ assert(e != NULL);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ list_for_each_safe(p, h, &e->vals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ if (!IS_EXPIRED(v, &now))
+ continue;
+
+ log_dbg(KV_FMT " Value expired." , KV_VAL(e->key, v->val));
+ list_del(&v->next);
+ val_entry_destroy(v);
+ --e->vals.len;
+ --dht.db.kv.vals;
+ }
+}
+
+static struct dht_entry * __dht_kv_find_entry(const uint8_t * key)
+{
+ struct list_head * p;
+
+ assert(key != NULL);
+
+ list_for_each(p, &dht.db.kv.list) {
+ struct dht_entry * e = list_entry(p, struct dht_entry, next);
+ if (!memcmp(key, e->key, dht.id.len))
+ return e;
+ }
+
+ return NULL;
+}
+
+static void dht_kv_remove_expired_entries(void)
+{
+ struct list_head * p;
+ struct list_head * h;
+ struct timespec now;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_rwlock_wrlock(&dht.db.lock);
+
+ list_for_each_safe(p, h, &dht.db.kv.list) {
+ struct dht_entry * e = list_entry(p, struct dht_entry, next);
+ dht_entry_remove_expired_vals(e);
+ if (e->lvals.len > 0 || e->vals.len > 0)
+ continue;
+
+ log_dbg(KEY_FMT " Entry removed. ", KEY_VAL(e->key));
+ list_del(&e->next);
+ dht_entry_destroy(e);
+ --dht.db.kv.len;
+ }
+
+ pthread_rwlock_unlock(&dht.db.lock);
+}
+
+
+static struct contact * contact_create(const uint8_t * id,
+ uint64_t addr)
+{
+ struct contact * c;
+ struct timespec t;
+
+ c = malloc(sizeof(*c));
+ if (c == NULL)
+ return NULL;
+
+ list_head_init(&c->next);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+ c->addr = addr;
+ c->fails = 0;
+ c->t_seen = t.tv_sec;
+ c->id = dht_dup_key(id);
+ if (c->id == NULL) {
+ free(c);
+ return NULL;
+ }
+
+ return c;
+}
+
+static void contact_destroy(struct contact * c)
+{
+ assert(c != NULL);
+ assert(list_is_empty(&c->next));
+
+ free(c->id);
+ free(c);
+}
+
+static struct dht_req * dht_req_create(const uint8_t * key)
+{
+ struct dht_req * req;
+ struct timespec now;
+
+ assert(key != NULL);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ req = malloc(sizeof(*req));
+ if (req == NULL)
+ goto fail_malloc;
+
+ list_head_init(&req->next);
+
+ req->t_exp = now.tv_sec + DHT_T_RESP;
+
+ list_head_init(&req->peers.list);
+ req->peers.len = 0;
+
+ req->key = dht_dup_key(key);
+ if (req->key == NULL)
+ goto fail_dup_key;
+
+ list_head_init(&req->cache.list);
+ req->cache.len = 0;
+
+ return req;
+
+ fail_dup_key:
+ free(req);
+ fail_malloc:
+ return NULL;
+}
+
+static void dht_req_destroy(struct dht_req * req)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ assert(req);
+ assert(req->key);
+
+ list_for_each_safe(p, h, &req->peers.list) {
+ struct peer_entry * e = list_entry(p, struct peer_entry, next);
+ list_del(&e->next);
+ free(e->id);
+ free(e);
+ --req->peers.len;
+ }
+
+ list_for_each_safe(p, h, &req->cache.list) {
+ struct val_entry * e = list_entry(p, struct val_entry, next);
+ list_del(&e->next);
+ val_entry_destroy(e);
+ --req->cache.len;
+ }
+
+ free(req->key);
+
+ assert(req->peers.len == 0);
+
+ free(req);
+}
+
+static struct peer_entry * dht_req_get_peer(struct dht_req * req,
+ struct peer_entry * e)
+{
+ struct list_head * p;
+
+ list_for_each(p, &req->peers.list) {
+ struct peer_entry * x = list_entry(p, struct peer_entry, next);
+ if (x->addr == e->addr)
+ return x;
+ }
+
+ return NULL;
+}
+
+#define IS_MAGIC(peer) ((peer)->cookie == dht.magic)
+void dht_req_add_peer(struct dht_req * req,
+ struct peer_entry * e)
+{
+ struct peer_entry * x; /* existing */
+ struct list_head * p; /* iterator */
+ size_t pos = 0;
+
+ assert(req != NULL);
+ assert(e != NULL);
+ assert(e->id != NULL);
+
+ /*
+ * Dedupe messages to the same peer, unless
+ * 1) The previous request was FIND_NODE and now it's FIND_VALUE
+ * 2) We urgently need contacts from emergency peer (magic cookie)
+ */
+ x = dht_req_get_peer(req, e);
+ if (x != NULL && x->code >= e->code && !IS_MAGIC(e))
+ goto skip;
+
+ /* Find how this contact ranks in distance to the key */
+ list_for_each(p, &req->peers.list) {
+ struct peer_entry * y = list_entry(p, struct peer_entry, next);
+ if (IS_CLOSER(y->id, e->id)) {
+ pos++;
+ continue;
+ }
+ break;
+ }
+
+ /* Add a new peer to this request if we need to */
+ if (pos < dht.alpha || !IS_MAGIC(e)) {
+ x = malloc(sizeof(*x));
+ if (x == NULL) {
+ log_err("Failed to malloc peer entry.");
+ goto skip;
+ }
+
+ x->cookie = e->cookie;
+ x->addr = e->addr;
+ x->code = e->code;
+ x->t_sent = e->t_sent;
+ x->id = dht_dup_key(e->id);
+ if (x->id == NULL) {
+ log_err("Failed to dup peer ID.");
+ free(x);
+ goto skip;
+ }
+
+ if (IS_MAGIC(e))
+ list_add(&x->next, p);
+ else
+ list_add_tail(&x->next, p);
+ ++req->peers.len;
+ return;
+ }
+ skip:
+ list_del(&e->next);
+ free(e->id);
+ free(e);
+}
+
+static size_t dht_req_add_peers(struct dht_req * req,
+ struct list_head * pl)
+{
+ struct list_head * p;
+ struct list_head * h;
+ size_t n = 0;
+
+ assert(req != NULL);
+ assert(pl != NULL);
+
+ list_for_each_safe(p, h, pl) {
+ struct peer_entry * e = list_entry(p, struct peer_entry, next);
+ dht_req_add_peer(req, e);
+ }
+
+ return n;
+}
+
+static bool dht_req_has_peer(struct dht_req * req,
+ uint64_t cookie)
+{
+ struct list_head * p;
+
+ assert(req != NULL);
+
+ list_for_each(p, &req->peers.list) {
+ struct peer_entry * e = list_entry(p, struct peer_entry, next);
+ if (e->cookie == cookie)
+ return true;
+ }
+
+ return false;
+}
+
+static void peer_list_destroy(struct list_head * pl)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ assert(pl != NULL);
+
+ list_for_each_safe(p, h, pl) {
+ struct peer_entry * e = list_entry(p, struct peer_entry, next);
+ list_del(&e->next);
+ free(e->id);
+ free(e);
+ }
+}
+
+static int dht_kv_create_peer_list(struct list_head * cl,
+ struct list_head * pl,
+ enum dht_code code)
+{
+ struct list_head * p;
+ struct list_head * h;
+ struct timespec now;
+ size_t len;
+
+ assert(cl != NULL);
+ assert(pl != NULL);
+ assert(list_is_empty(pl));
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ len = 0;
+
+ list_for_each_safe(p, h, cl) {
+ struct contact * c = list_entry(p, struct contact, next);
+ struct peer_entry * e;
+ if (len++ == dht.alpha)
+ break;
+
+ e = malloc(sizeof(*e));
+ if (e == NULL)
+ return -ENOMEM;
+
+ e->cookie = generate_cookie();
+ e->code = code;
+ e->addr = c->addr;
+ e->t_sent = now.tv_sec;
+
+ e->id = c->id;
+
+ list_add_tail(&e->next, pl);
+
+ list_del(&c->next);
+ c->id = NULL; /* we stole the id */
+ contact_destroy(c);
+ }
+
+ return 0;
+}
+
+static struct dht_req * __dht_kv_req_get_req(const uint8_t * key)
+{
+ struct list_head * p;
+
+ list_for_each(p, &dht.reqs.list) {
+ struct dht_req * r = list_entry(p, struct dht_req, next);
+ if (memcmp(r->key, key, dht.id.len) == 0)
+ return r;
+ }
+
+ return NULL;
+}
+
+static struct dht_req * __dht_kv_get_req_cache(const uint8_t * key)
+{
+ struct dht_req * req;
+
+ assert(key != NULL);
+
+ req = __dht_kv_req_get_req(key);
+ if (req == NULL)
+ return NULL;
+
+ if (req->cache.len == 0)
+ return NULL;
+
+ return req;
+}
+
+static void __dht_kv_req_remove(const uint8_t * key)
+{
+ struct dht_req * req;
+
+ assert(key != NULL);
+
+ req = __dht_kv_req_get_req(key);
+ if (req == NULL)
+ return;
+
+ list_del(&req->next);
+ --dht.reqs.len;
+
+ dht_req_destroy(req);
+}
+
+static struct dht_req * __dht_kv_get_req_peer(const uint8_t * key,
+ uint64_t cookie)
+{
+ struct dht_req * req;
+
+ assert(key != NULL);
+
+ req = __dht_kv_req_get_req(key);
+ if (req == NULL)
+ return NULL;
+
+ if (!dht_req_has_peer(req, cookie))
+ return NULL;
+
+ return req;
+}
+
+static bool dht_kv_has_req(const uint8_t * key,
+ uint64_t cookie)
+{
+ bool found;
+
+ pthread_mutex_lock(&dht.reqs.mtx);
+
+ found = __dht_kv_get_req_peer(key, cookie) != NULL;
+
+ pthread_mutex_unlock(&dht.reqs.mtx);
+
+ return found;
+}
+
+/*
+ * This will filter the peer list for addresses that still need to be
+ * contacted.
+ */
+static int dht_kv_update_req(const uint8_t * key,
+ struct list_head * pl)
+{
+ struct dht_req * req;
+ struct timespec now;
+
+ assert(key != NULL);
+ assert(pl != NULL);
+ assert(!list_is_empty(pl));
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_mutex_lock(&dht.reqs.mtx);
+
+ req = __dht_kv_req_get_req(key);
+ if (req == NULL) {
+ if (dht.reqs.len == DHT_MAX_REQS) {
+ log_err(KEY_FMT " Max reqs reached (%zu).",
+ KEY_VAL(key), dht.reqs.len);
+ peer_list_destroy(pl);
+ goto fail_req;
+ }
+ req = dht_req_create(key);
+ if (req == NULL) {
+ log_err(KEY_FMT "Failed to create req.", KEY_VAL(key));
+ goto fail_req;
+ }
+ list_add_tail(&req->next, &dht.reqs.list);
+ ++dht.reqs.len;
+ }
+
+ if (req->cache.len > 0) /* Already have values */
+ peer_list_destroy(pl);
+
+ dht_req_add_peers(req, pl);
+ req->t_exp = now.tv_sec + DHT_T_RESP;
+
+ if (dht.reqs.len > DHT_WARN_REQS) {
+ log_warn("Number of outstanding requests (%zu) exceeds %u.",
+ dht.reqs.len, DHT_WARN_REQS);
+ }
+
+ pthread_mutex_unlock(&dht.reqs.mtx);
+
+ return 0;
+ fail_req:
+ pthread_mutex_unlock(&dht.reqs.mtx);
+ return -1;
+}
+
+static int dht_kv_respond_req(uint8_t * key,
+ binary_data_t * vals,
+ size_t len)
+{
+ struct dht_req * req;
+ struct timespec now;
+ size_t i;
+
+ assert(key != NULL);
+ assert(vals != NULL);
+ assert(len > 0);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_mutex_lock(&dht.reqs.mtx);
+
+ req = __dht_kv_req_get_req(key);
+ if (req == NULL) {
+ log_dbg(KEY_FMT " Failed to find req.", KEY_VAL(key));
+ goto fail_req;
+ }
+
+ for (i = 0; i < len; ++i) {
+ struct val_entry * e;
+ buffer_t val;
+ val.data = vals[i].data;
+ val.len = vals[i].len;
+ e = val_entry_create(val, now.tv_sec + DHT_T_CACHE);
+ if (e == NULL) {
+ log_err(" Failed to create val_entry.");
+ continue;
+ }
+
+ list_add_tail(&e->next, &req->cache.list);
+ ++req->cache.len;
+ }
+
+ pthread_cond_broadcast(&dht.reqs.cond);
+
+ pthread_mutex_unlock(&dht.reqs.mtx);
+
+ return 0;
+ fail_req:
+ pthread_mutex_unlock(&dht.reqs.mtx);
+ return -1;
+}
+
+static ssize_t dht_kv_wait_req(const uint8_t * key,
+ buffer_t ** vals)
+{
+ struct list_head * p;
+ struct dht_req * req;
+ struct timespec t;
+#ifdef __DHT_TEST__
+ struct timespec intv = TIMESPEC_INIT_MS(10);
+#else
+ struct timespec intv = TIMESPEC_INIT_S(DHT_T_RESP);
+#endif
+ size_t max;
+ size_t i = 0;
+ int ret = 0;
+
+ assert(key != NULL);
+ assert(vals != NULL);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &t);
+
+ ts_add(&t, &intv, &t);
+
+ pthread_mutex_lock(&dht.reqs.mtx);
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, &dht.reqs.mtx);
+
+ while ((req = __dht_kv_get_req_cache(key)) == NULL) {
+ ret = pthread_cond_timedwait(&dht.reqs.cond, &dht.reqs.mtx, &t);
+ if (ret == ETIMEDOUT)
+ break;
+ }
+
+ pthread_cleanup_pop(false);
+
+ if (ret == ETIMEDOUT) {
+ log_warn(KEY_FMT " Req timed out.", KEY_VAL(key));
+ __dht_kv_req_remove(key);
+ goto timedout;
+ }
+
+ max = MIN(req->cache.len, DHT_MAX_VALS);
+ if (max == 0)
+ goto no_vals;
+
+ *vals = malloc(max * sizeof(**vals));
+ if (*vals == NULL) {
+ log_err(KEY_FMT "Failed to malloc val buffer.", KEY_VAL(key));
+ goto fail_vals;
+ }
+
+ memset(*vals, 0, max * sizeof(**vals));
+
+ list_for_each(p, &req->cache.list) {
+ struct val_entry * v;
+ if (i == max)
+ break; /* We have enough values */
+ v = list_entry(p, struct val_entry, next);
+ (*vals)[i].data = malloc(v->val.len);
+ if ((*vals)[i].data == NULL)
+ goto fail_val_data;
+
+ (*vals)[i].len = v->val.len;
+ memcpy((*vals)[i++].data, v->val.data, v->val.len);
+ }
+
+ pthread_mutex_unlock(&dht.reqs.mtx);
+
+ return i;
+ no_vals:
+ pthread_mutex_unlock(&dht.reqs.mtx);
+ *vals = NULL;
+ return 0;
+ fail_val_data:
+ freebufs(*vals, i);
+ fail_vals:
+ pthread_mutex_unlock(&dht.reqs.mtx);
+ return -ENOMEM;
+ timedout:
+ pthread_mutex_unlock(&dht.reqs.mtx);
+ return -ETIMEDOUT;
+}
+
+static struct bucket * iter_bucket(struct bucket * b,
+ const uint8_t * id)
+{
+ uint8_t byte;
+ uint8_t mask;
+
+ assert(b != NULL);
+
+ if (b->children[0] == NULL)
+ return b;
+
+ byte = id[(b->depth * DHT_BETA) / CHAR_BIT];
+
+ mask = ((1L << DHT_BETA) - 1) & 0xFF;
+
+ byte >>= (CHAR_BIT - DHT_BETA) -
+ (((b->depth) * DHT_BETA) & (CHAR_BIT - 1));
+
+ return iter_bucket(b->children[(byte & mask)], id);
+}
+
+static struct bucket * __dht_kv_get_bucket(const uint8_t * id)
+{
+ assert(dht.db.contacts.root != NULL);
+
+ return iter_bucket(dht.db.contacts.root, id);
+}
+
+static void contact_list_add(struct list_head * l,
+ struct contact * c)
+{
+ struct list_head * p;
+
+ assert(l != NULL);
+ assert(c != NULL);
+
+ list_for_each(p, l) {
+ struct contact * e = list_entry(p, struct contact, next);
+ if (IS_CLOSER(e->id, c->id))
+ continue;
+ }
+
+ list_add_tail(&c->next, p);
+}
+
+static ssize_t dht_kv_contact_list(const uint8_t * key,
+ struct list_head * l,
+ size_t max)
+{
+ struct list_head * p;
+ struct bucket * b;
+ struct timespec t;
+ size_t i;
+ size_t len = 0;
+
+ assert(l != NULL);
+ assert(key != NULL);
+ assert(list_is_empty(l));
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+ max = MIN(max, dht.k);
+
+ pthread_rwlock_rdlock(&dht.db.lock);
+
+ b = __dht_kv_get_bucket(key);
+ if (b == NULL) {
+ log_err(KEY_FMT " Failed to get bucket.", KEY_VAL(key));
+ goto fail_bucket;
+ }
+
+ b->t_refr = t.tv_sec + dht.t_refresh;
+
+ if (b->contacts.len == dht.k || b->parent == NULL) {
+ list_for_each(p, &b->contacts.list) {
+ struct contact * c;
+ struct contact * d;
+ c = list_entry(p, struct contact, next);
+ if (c->addr == dht.addr)
+ continue;
+ d = contact_create(c->id, c->addr);
+ if (d == NULL)
+ continue;
+ contact_list_add(l, d);
+ if (++len == max)
+ break;
+ }
+ } else {
+ struct bucket * d = b->parent;
+ for (i = 0; i < (1L << DHT_BETA) && len < dht.k; ++i) {
+ list_for_each(p, &d->children[i]->contacts.list) {
+ struct contact * c;
+ struct contact * d;
+ c = list_entry(p, struct contact, next);
+ if (c->addr == dht.addr)
+ continue;
+ d = contact_create(c->id, c->addr);
+ if (d == NULL)
+ continue;
+ contact_list_add(l, d);
+ if (++len == max)
+ break;
+ }
+ }
+ }
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ return len;
+ fail_bucket:
+ pthread_rwlock_unlock(&dht.db.lock);
+ return -1;
+}
+
+static void contact_list_destroy(struct list_head * l)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ assert(l != NULL);
+
+ list_for_each_safe(p, h, l) {
+ struct contact * c = list_entry(p, struct contact, next);
+ list_del(&c->next);
+ contact_destroy(c);
+ }
+}
+
+static ssize_t dht_kv_get_contacts(const uint8_t * key,
+ dht_contact_msg_t *** msgs)
+{
+ struct list_head cl;
+ struct list_head * p;
+ struct list_head * h;
+ size_t len;
+ size_t i = 0;
+
+ assert(key != NULL);
+ assert(msgs != NULL);
+
+ list_head_init(&cl);
+
+ len = dht_kv_contact_list(key, &cl, dht.k);
+ if (len == 0) {
+ *msgs = NULL;
+ return 0;
+ }
+
+ *msgs = malloc(len * sizeof(**msgs));
+ if (*msgs == NULL)
+ goto fail_msgs;
+
+ list_for_each_safe(p, h, &cl) {
+ struct contact * c;
+ (*msgs)[i] = malloc(sizeof(***msgs));
+ if ((*msgs)[i] == NULL)
+ goto fail_contact;
+
+ dht_contact_msg__init((*msgs)[i]);
+ c = list_entry(p, struct contact, next);
+ list_del(&c->next);
+ (*msgs)[i]->id.data = c->id;
+ (*msgs)[i]->id.len = dht.id.len;
+ (*msgs)[i++]->addr = c->addr;
+ free(c);
+ }
+
+ return i;
+ fail_contact:
+ while (i-- > 0)
+ dht_contact_msg__free_unpacked((*msgs)[i], NULL);
+ free(*msgs);
+ *msgs = NULL;
+ fail_msgs:
+ contact_list_destroy(&cl);
+ return -ENOMEM;
+}
+
+/* Build a refresh list. */
+static void __dht_kv_bucket_refresh_list(struct bucket * b,
+ time_t t,
+ struct list_head * r)
+{
+ struct contact * c;
+ struct contact * d;
+
+ assert(b != NULL);
+
+ if (t < b->t_refr)
+ return;
+
+ if (*b->children != NULL) {
+ size_t i;
+ for (i = 0; i < (1L << DHT_BETA); ++i)
+ __dht_kv_bucket_refresh_list(b->children[i], t, r);
+ }
+
+ if (b->contacts.len == 0)
+ return;
+
+ c = list_first_entry(&b->contacts.list, struct contact, next);
+ if (t > c->t_seen + dht.t_refresh) {
+ d = contact_create(c->id, c->addr);
+ if (d != NULL)
+ list_add(&d->next, r);
+ }
+}
+
+static struct bucket * bucket_create(void)
+{
+ struct bucket * b;
+ struct timespec t;
+ size_t i;
+
+ b = malloc(sizeof(*b));
+ if (b == NULL)
+ return NULL;
+
+ list_head_init(&b->contacts.list);
+ b->contacts.len = 0;
+
+ list_head_init(&b->alts.list);
+ b->alts.len = 0;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &t);
+ b->t_refr = t.tv_sec + dht.t_refresh;
+
+ for (i = 0; i < (1L << DHT_BETA); ++i)
+ b->children[i] = NULL;
+
+ b->parent = NULL;
+ b->depth = 0;
+ b->mask = 0;
+
+ return b;
+}
+
+static void bucket_destroy(struct bucket * b)
+{
+ struct list_head * p;
+ struct list_head * h;
+ size_t i;
+
+ assert(b != NULL);
+
+ for (i = 0; i < (1L << DHT_BETA); ++i)
+ if (b->children[i] != NULL)
+ bucket_destroy(b->children[i]);
+
+ list_for_each_safe(p, h, &b->contacts.list) {
+ struct contact * c = list_entry(p, struct contact, next);
+ list_del(&c->next);
+ contact_destroy(c);
+ --b->contacts.len;
+ }
+
+ list_for_each_safe(p, h, &b->alts.list) {
+ struct contact * c = list_entry(p, struct contact, next);
+ list_del(&c->next);
+ contact_destroy(c);
+ --b->alts.len;
+ }
+
+ free(b);
+}
+
+static bool bucket_has_id(struct bucket * b,
+ const uint8_t * id)
+{
+ uint8_t mask;
+ uint8_t byte;
+
+ if (b->depth == 0)
+ return true;
+
+ byte = id[(b->depth * DHT_BETA) / CHAR_BIT];
+
+ mask = ((1L << DHT_BETA) - 1) & 0xFF;
+
+ byte >>= (CHAR_BIT - DHT_BETA) -
+ (((b->depth - 1) * DHT_BETA) & (CHAR_BIT - 1));
+
+ return ((byte & mask) == b->mask);
+}
+
+static int move_contacts(struct bucket * b,
+ struct bucket * c)
+{
+ struct list_head * p;
+ struct list_head * h;
+ struct contact * d;
+
+ assert(b != NULL);
+ assert(c != NULL);
+
+ list_for_each_safe(p, h, &b->contacts.list) {
+ d = list_entry(p, struct contact, next);
+ if (bucket_has_id(c, d->id)) {
+ list_del(&d->next);
+ --b->contacts.len;
+ list_add_tail(&d->next, &c->contacts.list);
+ ++c->contacts.len;
+ }
+ }
+
+ return 0;
+}
+
+static int split_bucket(struct bucket * b)
+{
+ uint8_t mask = 0;
+ size_t i;
+ size_t b_len;
+
+ assert(b);
+ assert(b->alts.len == 0);
+ assert(b->contacts.len != 0);
+ assert(b->children[0] == NULL);
+
+ b_len = b->contacts.len;
+
+ for (i = 0; i < (1L << DHT_BETA); ++i) {
+ b->children[i] = bucket_create();
+ if (b->children[i] == NULL)
+ goto fail_child;
+
+ b->children[i]->depth = b->depth + 1;
+ b->children[i]->mask = mask;
+ b->children[i]->parent = b;
+
+ move_contacts(b, b->children[i]);
+
+ mask++;
+ }
+
+ for (i = 0; i < (1L << DHT_BETA); ++i)
+ if (b->children[i]->contacts.len == b_len)
+ split_bucket(b->children[i]);
+
+ return 0;
+ fail_child:
+ while (i-- > 0)
+ bucket_destroy(b->children[i]);
+ return -1;
+}
+
+static int dht_kv_update_contacts(const uint8_t * id,
+ uint64_t addr)
+{
+ struct list_head * p;
+ struct list_head * h;
+ struct bucket * b;
+ struct contact * c;
+
+ assert(id != NULL);
+ assert(addr != INVALID_ADDR);
+
+ pthread_rwlock_wrlock(&dht.db.lock);
+
+ b = __dht_kv_get_bucket(id);
+ if (b == NULL) {
+ log_err(PEER_FMT " Failed to get bucket.", PEER_VAL(id, addr));
+ goto fail_update;
+ }
+
+ c = contact_create(id, addr);
+ if (c == NULL) {
+ log_err(PEER_FMT " Failed to create contact.",
+ PEER_VAL(id, addr));
+ goto fail_update;
+ }
+
+ list_for_each_safe(p, h, &b->contacts.list) {
+ struct contact * d = list_entry(p, struct contact, next);
+ if (d->addr == addr) {
+ list_del(&d->next);
+ contact_destroy(d);
+ --b->contacts.len;
+ }
+ }
+
+ if (b->contacts.len == dht.k) {
+ if (bucket_has_id(b, dht.id.data)) {
+ list_add_tail(&c->next, &b->contacts.list);
+ ++b->contacts.len;
+ if (split_bucket(b)) {
+ list_del(&c->next);
+ contact_destroy(c);
+ --b->contacts.len;
+ }
+ } else if (b->alts.len == dht.k) {
+ struct contact * d;
+ d = list_first_entry(&b->alts.list,
+ struct contact, next);
+ list_del(&d->next);
+ contact_destroy(d);
+ list_add_tail(&c->next, &b->alts.list);
+ ++b->alts.len;
+ } else {
+ list_add_tail(&c->next, &b->alts.list);
+ ++b->alts.len;
+ }
+ } else {
+ list_add_tail(&c->next, &b->contacts.list);
+ ++b->contacts.len;
+ }
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ return 0;
+ fail_update:
+ pthread_rwlock_unlock(&dht.db.lock);
+ return -1;
+}
+
+static time_t gcd(time_t a,
+ time_t b)
+{
+ if (a == 0)
+ return b;
+
+ return gcd(b % a, a);
+}
+
+static dht_contact_msg_t * dht_kv_src_contact_msg(void)
+{
+ dht_contact_msg_t * src;
+
+ src = malloc(sizeof(*src));
+ if (src == NULL)
+ goto fail_malloc;
+
+ dht_contact_msg__init(src);
+
+ src->id.data = dht_dup_key(dht.id.data);
+ if (src->id.data == NULL)
+ goto fail_id;
+
+ src->id.len = dht.id.len;
+ src->addr = dht.addr;
+
+ return src;
+ fail_id:
+ dht_contact_msg__free_unpacked(src, NULL);
+ fail_malloc:
+ return NULL;
+}
+
+static dht_msg_t * dht_kv_find_req_msg(const uint8_t * key,
+ enum dht_code code)
+{
+ dht_msg_t * msg;
+
+ assert(key != NULL);
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ dht_msg__init(msg);
+ msg->code = code;
+
+ msg->src = dht_kv_src_contact_msg();
+ if (msg->src == NULL)
+ goto fail_msg;
+
+ msg->find = malloc(sizeof(*msg->find));
+ if (msg->find == NULL)
+ goto fail_msg;
+
+ dht_find_req_msg__init(msg->find);
+
+ msg->find->key.data = dht_dup_key(key);
+ if (msg->find->key.data == NULL)
+ goto fail_msg;
+
+ msg->find->key.len = dht.id.len;
+ msg->find->cookie = DHT_INVALID;
+
+ return msg;
+
+ fail_msg:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return NULL;
+}
+
+static dht_msg_t * dht_kv_find_node_req_msg(const uint8_t * key)
+{
+ return dht_kv_find_req_msg(key, DHT_FIND_NODE_REQ);
+}
+
+static dht_msg_t * dht_kv_find_value_req_msg(const uint8_t * key)
+{
+ return dht_kv_find_req_msg(key, DHT_FIND_VALUE_REQ);
+}
+
+static dht_msg_t * dht_kv_find_node_rsp_msg(uint8_t * key,
+ uint64_t cookie,
+ dht_contact_msg_t *** contacts,
+ size_t len)
+{
+ dht_msg_t * msg;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ dht_msg__init(msg);
+ msg->code = DHT_FIND_NODE_RSP;
+
+ msg->src = dht_kv_src_contact_msg();
+ if (msg->src == NULL)
+ goto fail_msg;
+
+ msg->node = malloc(sizeof(*msg->node));
+ if (msg->node == NULL)
+ goto fail_msg;
+
+ dht_find_node_rsp_msg__init(msg->node);
+
+ msg->node->key.data = dht_dup_key(key);
+ if (msg->node->key.data == NULL)
+ goto fail_msg;
+
+ msg->node->cookie = cookie;
+ msg->node->key.len = dht.id.len;
+ msg->node->n_contacts = len;
+ if (len != 0) { /* Steal the ptr */
+ msg->node->contacts = *contacts;
+ *contacts = NULL;
+ }
+
+ return msg;
+
+ fail_msg:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return NULL;
+}
+
+static dht_msg_t * dht_kv_find_value_rsp_msg(uint8_t * key,
+ uint64_t cookie,
+ dht_contact_msg_t *** contacts,
+ size_t n_contacts,
+ buffer_t ** vals,
+ size_t n_vals)
+{
+ dht_msg_t * msg;
+
+ msg = dht_kv_find_node_rsp_msg(key, cookie, contacts, n_contacts);
+ if (msg == NULL)
+ goto fail_node_rsp;
+
+ msg->code = DHT_FIND_VALUE_RSP;
+
+ msg->val = malloc(sizeof(*msg->val));
+ if (msg->val == NULL)
+ goto fail_msg;
+
+ dht_find_value_rsp_msg__init(msg->val);
+
+ msg->val->n_values = n_vals;
+ if (n_vals != 0) /* Steal the ptr */
+ msg->val->values = (binary_data_t *) *vals;
+
+ return msg;
+
+ fail_msg:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_node_rsp:
+ return NULL;
+}
+
+static dht_msg_t * dht_kv_store_msg(const uint8_t * key,
+ const buffer_t val,
+ time_t exp)
+{
+ dht_msg_t * msg;
+
+ assert(key != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ dht_msg__init(msg);
+
+ msg->code = DHT_STORE;
+
+ msg->src = dht_kv_src_contact_msg();
+ if (msg->src == NULL)
+ goto fail_msg;
+
+ msg->store = malloc(sizeof(*msg->store));
+ if (msg->store == NULL)
+ goto fail_msg;
+
+ dht_store_msg__init(msg->store);
+
+ msg->store->key.data = dht_dup_key(key);
+ if (msg->store->key.data == NULL)
+ goto fail_msg;
+
+ msg->store->key.len = dht.id.len;
+ msg->store->val.data = malloc(val.len);
+ if (msg->store->val.data == NULL)
+ goto fail_msg;
+
+ memcpy(msg->store->val.data, val.data, val.len);
+
+ msg->store->val.len = val.len;
+ msg->store->exp = exp;
+
+ return msg;
+
+ fail_msg:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return NULL;
+}
+
+static ssize_t dht_kv_retrieve(const uint8_t * key,
+ buffer_t ** vals)
+{
+ struct dht_entry * e;
+ struct list_head * p;
+ size_t n;
+ size_t i;
+
+ assert(key != NULL);
+
+ pthread_rwlock_rdlock(&dht.db.lock);
+
+ e = __dht_kv_find_entry(key);
+ if (e == NULL)
+ goto no_vals;
+
+ n = MIN(DHT_MAX_VALS, e->vals.len + e->lvals.len);
+ if (n == 0)
+ goto no_vals;
+
+ *vals = malloc(n * sizeof(**vals));
+ if (*vals == NULL)
+ goto fail_vals;
+
+ memset(*vals, 0, n * sizeof(**vals));
+
+ i = 0;
+
+ list_for_each(p, &e->vals.list) {
+ struct val_entry * v;
+ if (i == n)
+ break; /* We have enough values */
+ v = list_entry(p, struct val_entry, next);
+ (*vals)[i].data = malloc(v->val.len);
+ if ((*vals)[i].data == NULL)
+ goto fail_val_data;
+
+ (*vals)[i].len = v->val.len;
+ memcpy((*vals)[i++].data, v->val.data, v->val.len);
+ }
+
+ list_for_each(p, &e->lvals.list) {
+ struct val_entry * v;
+ if (i == n)
+ break; /* We have enough values */
+ v = list_entry(p, struct val_entry, next);
+ (*vals)[i].data = malloc(v->val.len);
+ if ((*vals)[i].data == NULL)
+ goto fail_val_data;
+
+ (*vals)[i].len = v->val.len;
+ memcpy((*vals)[i++].data, v->val.data, v->val.len);
+ }
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ return (ssize_t) i;
+
+ fail_val_data:
+ pthread_rwlock_unlock(&dht.db.lock);
+ freebufs(*vals, i);
+ *vals = NULL;
+ return -ENOMEM;
+ fail_vals:
+ pthread_rwlock_unlock(&dht.db.lock);
+ return -ENOMEM;
+ no_vals:
+ pthread_rwlock_unlock(&dht.db.lock);
+ *vals = NULL;
+ return 0;
+}
+
+static void __cleanup_dht_msg(void * msg)
+{
+ dht_msg__free_unpacked((dht_msg_t *) msg, NULL);
+}
+
+#ifdef DEBUG_PROTO_DHT
+static void dht_kv_debug_msg(dht_msg_t * msg)
+{
+ struct tm * tm;
+ char tmstr[RIB_TM_STRLEN];
+ time_t stamp;
+ size_t i;
+
+ if (msg == NULL)
+ return;
+
+ pthread_cleanup_push(__cleanup_dht_msg, msg);
+
+ switch (msg->code) {
+ case DHT_STORE:
+ log_proto(" key: " HASH_FMT64 " [%zu bytes]",
+ HASH_VAL64(msg->store->key.data),
+ msg->store->key.len);
+ log_proto(" val: " HASH_FMT64 " [%zu bytes]",
+ HASH_VAL64(msg->store->val.data),
+ msg->store->val.len);
+ stamp = msg->store->exp;
+ tm = gmtime(&stamp);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
+ log_proto(" exp: %s.", tmstr);
+ break;
+ case DHT_FIND_NODE_REQ:
+ /* FALLTHRU */
+ case DHT_FIND_VALUE_REQ:
+ log_proto(" cookie: " HASH_FMT64,
+ HASH_VAL64(&msg->find->cookie));
+ log_proto(" key: " HASH_FMT64 " [%zu bytes]",
+ HASH_VAL64(msg->find->key.data),
+ msg->find->key.len);
+ break;
+ case DHT_FIND_VALUE_RSP:
+ log_proto(" cookie: " HASH_FMT64,
+ HASH_VAL64(&msg->node->cookie));
+ log_proto(" key: " HASH_FMT64 " [%zu bytes]",
+ HASH_VAL64(msg->node->key.data),
+ msg->node->key.len);
+ log_proto(" values: [%zd]", msg->val->n_values);
+ for (i = 0; i < msg->val->n_values; i++)
+ log_proto(" " HASH_FMT64 " [%zu bytes]",
+ HASH_VAL64(msg->val->values[i].data),
+ msg->val->values[i].len);
+ log_proto(" contacts: [%zd]", msg->node->n_contacts);
+ for (i = 0; i < msg->node->n_contacts; i++) {
+ dht_contact_msg_t * c = msg->node->contacts[i];
+ log_proto(" " PEER_FMT,
+ PEER_VAL(c->id.data, c->addr));
+ }
+ break;
+ case DHT_FIND_NODE_RSP:
+ log_proto(" cookie: " HASH_FMT64,
+ HASH_VAL64(&msg->node->cookie));
+ log_proto(" key: " HASH_FMT64 " [%zu bytes]",
+ HASH_VAL64(msg->node->key.data), msg->node->key.len);
+ log_proto(" contacts: [%zd]", msg->node->n_contacts);
+ for (i = 0; i < msg->node->n_contacts; i++) {
+ dht_contact_msg_t * c = msg->node->contacts[i];
+ log_proto(" " PEER_FMT,
+ PEER_VAL(c->id.data, c->addr));
+ }
+
+ break;
+ default:
+ break;
+ }
+
+ pthread_cleanup_pop(false);
+}
+
+static void dht_kv_debug_msg_snd(dht_msg_t * msg,
+ uint8_t * id,
+ uint64_t addr)
+{
+ if (msg == NULL)
+ return;
+
+ log_proto(TX_HDR_FMT ".", TX_HDR_VAL(msg, id, addr));
+
+ dht_kv_debug_msg(msg);
+}
+
+static void dht_kv_debug_msg_rcv(dht_msg_t * msg)
+{
+ if (msg == NULL)
+ return;
+
+ log_proto(RX_HDR_FMT ".", RX_HDR_VAL(msg));
+
+ dht_kv_debug_msg(msg);
+}
+#endif
+
+#ifndef __DHT_TEST__
+static int dht_send_msg(dht_msg_t * msg,
+ uint64_t addr)
+{
+ size_t len;
+ struct shm_du_buff * sdb;
+
+ if (msg == NULL)
+ return 0;
+
+ assert(addr != INVALID_ADDR && addr != dht.addr);
+
+ len = dht_msg__get_packed_size(msg);
+ if (len == 0) {
+ log_warn("%s failed to pack.", DHT_CODE(msg));
+ goto fail_msg;
+ }
+
+ if (ipcp_sdb_reserve(&sdb, len)) {
+ log_warn("%s failed to get sdb.", DHT_CODE(msg));
+ goto fail_msg;
+ }
+
+ dht_msg__pack(msg, shm_du_buff_head(sdb));
+
+ if (dt_write_packet(addr, QOS_CUBE_BE, dht.eid, sdb) < 0) {
+ log_warn("%s write failed", DHT_CODE(msg));
+ goto fail_send;
+ }
+
+ return 0;
+ fail_send:
+ ipcp_sdb_release(sdb);
+ fail_msg:
+ return -1;
+}
+#else /* funtion for testing */
+static int dht_send_msg(dht_msg_t * msg,
+ uint64_t addr)
+{
+ buffer_t buf;
+
+ assert(msg != NULL);
+ assert(addr != INVALID_ADDR && addr != dht.addr);
+
+ buf.len = dht_msg__get_packed_size(msg);
+ if (buf.len == 0) {
+ log_warn("%s failed to pack.", DHT_CODE(msg));
+ goto fail_msg;
+ }
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL) {
+ log_warn("%s failed to malloc buf.", DHT_CODE(msg));
+ goto fail_msg;
+ }
+
+ dht_msg__pack(msg, buf.data);
+
+ if (sink_send_msg(&buf, addr) < 0) {
+ log_warn("%s write failed", DHT_CODE(msg));
+ goto fail_send;
+ }
+
+ return 0;
+ fail_send:
+ freebuf(buf);
+ fail_msg:
+ return -1;
+}
+#endif /* __DHT_TEST__ */
+
+static void __cleanup_peer_list(void * pl)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ assert(pl != NULL);
+
+ list_for_each_safe(p, h, (struct list_head *) pl) {
+ struct peer_entry * e = list_entry(p, struct peer_entry, next);
+ list_del(&e->next);
+ free(e->id);
+ free(e);
+ }
+}
+
+
+static int dht_kv_send_msgs(dht_msg_t * msg,
+ struct list_head * pl)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ pthread_cleanup_push(__cleanup_dht_msg, msg);
+ pthread_cleanup_push(__cleanup_peer_list, pl);
+
+ list_for_each_safe(p, h, pl) {
+ struct peer_entry * e = list_entry(p, struct peer_entry, next);
+ if (IS_REQUEST(msg->code)) {
+ msg->find->cookie = e->cookie;
+ assert(msg->find->cookie != DHT_INVALID);
+ }
+ if (dht_send_msg(msg, e->addr) < 0)
+ continue;
+
+#ifdef DEBUG_PROTO_DHT
+ dht_kv_debug_msg_snd(msg, e->id, e->addr);
+#endif
+ list_del(&e->next);
+ free(e->id);
+ free(e);
+ }
+
+ pthread_cleanup_pop(false);
+ pthread_cleanup_pop(false);
+
+ return list_is_empty(pl) ? 0 : -1;
+}
+
+static int dht_kv_get_peer_list_for_msg(dht_msg_t * msg,
+ struct list_head * pl)
+{
+ struct list_head cl; /* contact list */
+ uint8_t * key; /* key in the request */
+ size_t max;
+
+ assert(msg != NULL);
+
+ assert(list_is_empty(pl));
+
+ max = msg->code == DHT_STORE ? dht.k : dht.alpha;
+
+ switch (msg->code) {
+ case DHT_FIND_NODE_REQ:
+ /* FALLTHRU */
+ case DHT_FIND_VALUE_REQ:
+ key = msg->find->key.data;
+ break;
+ case DHT_STORE:
+ key = msg->store->key.data;
+ break;
+ default:
+ log_err("Invalid DHT msg code (%d).", msg->code);
+ return -1;
+ }
+
+ list_head_init(&cl);
+
+ if (dht_kv_contact_list(key, &cl, max) < 0) {
+ log_err(KEY_FMT " Failed to get contact list.", KEY_VAL(key));
+ goto fail_contacts;
+ }
+
+ if (list_is_empty(&cl)) {
+ log_warn(KEY_FMT " No available contacts.", KEY_VAL(key));
+ goto fail_contacts;
+ }
+
+ if (dht_kv_create_peer_list(&cl, pl, msg->code) < 0) {
+ log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key));
+ goto fail_peers;
+ }
+
+ contact_list_destroy(&cl);
+ return 0;
+ fail_peers:
+ contact_list_destroy(&cl);
+ fail_contacts:
+ return -1;
+}
+
+static int dht_kv_store_remote(const uint8_t * key,
+ const buffer_t val,
+ time_t exp)
+{
+ dht_msg_t * msg;
+ struct timespec now;
+ struct list_head pl;
+
+ assert(key != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ msg = dht_kv_store_msg(key, val, exp);
+ if (msg == NULL) {
+ log_err(KV_FMT " Failed to create %s.",
+ KV_VAL(key, val), dht_code_str[DHT_STORE]);
+ goto fail_msg;
+ }
+
+ list_head_init(&pl);
+
+ if (dht_kv_get_peer_list_for_msg(msg, &pl) < 0) {
+ log_dbg(KV_FMT " Failed to get peer list.", KV_VAL(key, val));
+ goto fail_peer_list;
+ }
+
+ if (dht_kv_send_msgs(msg, &pl) < 0) {
+ log_warn(KV_FMT " Failed to send any %s msg.",
+ KV_VAL(key, val), DHT_CODE(msg));
+ goto fail_msgs;
+ }
+
+ dht_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msgs:
+ peer_list_destroy(&pl);
+ fail_peer_list:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ return -1;
+}
+
+/* recursive lookup, start with pl NULL */
+static int dht_kv_query_contacts(const uint8_t * key,
+ struct list_head * pl)
+{
+ struct list_head p;
+
+ dht_msg_t * msg;
+
+ assert(key != NULL);
+
+ msg = dht_kv_find_node_req_msg(key);
+ if (msg == NULL) {
+ log_err(KEY_FMT " Failed to create %s msg.",
+ KEY_VAL(key), dht_code_str[DHT_FIND_NODE_REQ]);
+ goto fail_msg;
+ }
+
+ if (pl == NULL) {
+ list_head_init(&p);
+ pl = &p;
+ }
+
+ if (list_is_empty(pl) && dht_kv_get_peer_list_for_msg(msg, pl) < 0) {
+ log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key));
+ goto fail_peer_list;
+ }
+
+ if (dht_kv_update_req(key, pl) < 0) {
+ log_warn(KEY_FMT " Failed to update req.", KEY_VAL(key));
+ goto fail_update;
+ }
+
+ if (dht_kv_send_msgs(msg, pl)) {
+ log_warn(KEY_FMT " Failed to send any %s msg.",
+ KEY_VAL(key), DHT_CODE(msg));
+ goto fail_update;
+ }
+
+ dht_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_update:
+ peer_list_destroy(pl);
+ fail_peer_list:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ return -1;
+}
+
+/* recursive lookup, start with pl NULL */
+static ssize_t dht_kv_query_remote(const uint8_t * key,
+ buffer_t ** vals,
+ struct list_head * pl)
+{
+ struct list_head p;
+ dht_msg_t * msg;
+
+ assert(key != NULL);
+
+ msg = dht_kv_find_value_req_msg(key);
+ if (msg == NULL) {
+ log_err(KEY_FMT " Failed to create value req.", KEY_VAL(key));
+ goto fail_msg;
+ }
+
+ if (pl == NULL) {
+ list_head_init(&p);
+ pl = &p;
+ }
+
+ if (list_is_empty(pl) && dht_kv_get_peer_list_for_msg(msg, pl) < 0) {
+ log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key));
+ goto fail_peer_list;
+ }
+
+ if (dht_kv_update_req(key, pl) < 0) {
+ log_err(KEY_FMT " Failed to update request.", KEY_VAL(key));
+ goto fail_update;
+ }
+
+ if (dht_kv_send_msgs(msg, pl)) {
+ log_warn(KEY_FMT " Failed to send %s msg.",
+ KEY_VAL(key), DHT_CODE(msg));
+ goto fail_update;
+ }
+
+ dht_msg__free_unpacked(msg, NULL);
+
+ if (vals == NULL) /* recursive lookup, already waiting */
+ return 0;
+
+ return dht_kv_wait_req(key, vals);
+ fail_update:
+ peer_list_destroy(pl);
+ fail_peer_list:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ return -1;
+}
+
+static void __add_dht_kv_entry(struct dht_entry * e)
+{
+ struct list_head * p;
+
+ assert(e != NULL);
+
+ list_for_each(p, &dht.db.kv.list) {
+ struct dht_entry * d = list_entry(p, struct dht_entry, next);
+ if (IS_CLOSER(d->key, e->key))
+ continue;
+ break;
+ }
+
+ list_add_tail(&e->next, p);
+ ++dht.db.kv.len;
+}
+
+/* incoming store message */
+static int dht_kv_store(const uint8_t * key,
+ const buffer_t val,
+ time_t exp)
+{
+ struct dht_entry * e;
+ bool new = false;
+
+ assert(key != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ pthread_rwlock_wrlock(&dht.db.lock);
+
+ e = __dht_kv_find_entry(key);
+ if (e == NULL) {
+ log_dbg(KV_FMT " Adding entry (store).", KV_VAL(key, val));
+ e = dht_entry_create(key);
+ if (e == NULL)
+ goto fail;
+
+ new = true;
+
+ __add_dht_kv_entry(e);
+ }
+
+ if (dht_entry_update_val(e, val, exp) < 0)
+ goto fail_add;
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ return 0;
+ fail_add:
+ if (new) {
+ list_del(&e->next);
+ dht_entry_destroy(e);
+ --dht.db.kv.len;
+ }
+ fail:
+ pthread_rwlock_unlock(&dht.db.lock);
+ return -1;
+}
+
+static int dht_kv_publish(const uint8_t * key,
+ const buffer_t val)
+{
+ struct dht_entry * e;
+ struct timespec now;
+ bool new = false;
+
+ assert(key != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_rwlock_wrlock(&dht.db.lock);
+
+ e = __dht_kv_find_entry(key);
+ if (e == NULL) {
+ log_dbg(KV_FMT " Adding entry (publish).", KV_VAL(key, val));
+ e = dht_entry_create(key);
+ if (e == NULL)
+ goto fail;
+
+ __add_dht_kv_entry(e);
+ new = true;
+ }
+
+ if (dht_entry_update_lval(e, val) < 0)
+ goto fail_add;
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ dht_kv_store_remote(key, val, now.tv_sec + dht.t_expire);
+
+ return 0;
+ fail_add:
+ if (new) {
+ list_del(&e->next);
+ dht_entry_destroy(e);
+ --dht.db.kv.len;
+ }
+ fail:
+ pthread_rwlock_unlock(&dht.db.lock);
+ return -1;
+}
+
+static int dht_kv_unpublish(const uint8_t * key,
+ const buffer_t val)
+{
+ struct dht_entry * e;
+ int rc;
+
+ assert(key != NULL);
+
+ pthread_rwlock_wrlock(&dht.db.lock);
+
+ e = __dht_kv_find_entry(key);
+ if (e == NULL)
+ goto no_entry;
+
+ rc = dht_entry_remove_lval(e, val);
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ return rc;
+ no_entry:
+ pthread_rwlock_unlock(&dht.db.lock);
+ return -ENOENT;
+
+}
+
+/* message validation */
+static int dht_kv_validate_store_msg(const dht_store_msg_t * store)
+{
+ if (store == NULL) {
+ log_warn("Store in msg is NULL.");
+ return -EINVAL;
+ }
+
+ if (store->key.data == NULL || store->key.len == 0) {
+ log_warn("Invalid key in DHT store msg.");
+ return -EINVAL;
+ }
+
+ if (store->key.len != dht.id.len) {
+ log_warn("Invalid key length in DHT store msg.");
+ return -EINVAL;
+ }
+
+ if (store->val.data == NULL || store->val.len == 0) {
+ log_warn("Invalid value in DHT store msg.");
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+static int validate_find_req_msg(const dht_find_req_msg_t * req)
+{
+ if (req == NULL) {
+ log_warn("Request in msg is NULL.");
+ return -EINVAL;
+ }
+
+ if (req->key.data == NULL || req->key.len == 0) {
+ log_warn("Find request without key.");
+ return -EINVAL;
+ }
+
+ if (req->key.len != dht.id.len) {
+ log_warn("Invalid key length in request msg.");
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+static int validate_node_rsp_msg(const dht_find_node_rsp_msg_t * rsp)
+{
+ if (rsp == NULL) {
+ log_warn("Node rsp in msg is NULL.");
+ return -EINVAL;
+ }
+
+ if (rsp->key.data == NULL) {
+ log_warn("Invalid key in DHT response msg.");
+ return -EINVAL;
+ }
+
+ if (rsp->key.len != dht.id.len) {
+ log_warn("Invalid key length in DHT response msg.");
+ return -EINVAL;
+ }
+
+ if (!dht_kv_has_req(rsp->key.data, rsp->cookie)) {
+ log_warn(KEY_FMT " No request " CK_FMT ".",
+ KEY_VAL(rsp->key.data), CK_VAL(rsp->cookie));
+
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+static int validate_value_rsp_msg(const dht_find_value_rsp_msg_t * rsp)
+{
+ if (rsp == NULL) {
+ log_warn("Invalid DHT find value response msg.");
+ return -EINVAL;
+ }
+
+ if (rsp->values == NULL && rsp->n_values > 0) {
+ log_dbg("No values in DHT response msg.");
+ return 0;
+ }
+
+ if (rsp->n_values == 0 && rsp->values != NULL) {
+ log_dbg("DHT response did not set values NULL.");
+ return 0;
+ }
+
+ return 0;
+}
+
+static int dht_kv_validate_msg(dht_msg_t * msg)
+{
+
+ assert(msg != NULL);
+
+ if (msg->src->id.len != dht.id.len) {
+ log_warn("%s Invalid source contact ID.", DHT_CODE(msg));
+ return -EINVAL;
+ }
+
+ if (msg->src->addr == INVALID_ADDR) {
+ log_warn("%s Invalid source address.", DHT_CODE(msg));
+ return -EINVAL;
+ }
+
+ switch (msg->code) {
+ case DHT_FIND_VALUE_REQ:
+ /* FALLTHRU */
+ case DHT_FIND_NODE_REQ:
+ if (validate_find_req_msg(msg->find) < 0)
+ return -EINVAL;
+ break;
+ case DHT_FIND_VALUE_RSP:
+ if (validate_value_rsp_msg(msg->val) < 0)
+ return -EINVAL;
+ /* FALLTHRU */
+ case DHT_FIND_NODE_RSP:
+ if (validate_node_rsp_msg(msg->node) < 0)
+ return -EINVAL;
+ break;
+ case DHT_STORE:
+ if (dht_kv_validate_store_msg(msg->store) < 0)
+ return -EINVAL;
+ break;
+ default:
+ log_warn("Invalid DHT msg code (%d).", msg->code);
+ return -ENOENT;
+ }
+
+ return 0;
+}
+
+static void do_dht_kv_store(const dht_store_msg_t * store)
+{
+ struct tm * tm;
+ char tmstr[RIB_TM_STRLEN];
+ buffer_t val;
+ uint8_t * key;
+ time_t exp;
+
+ assert(store != NULL);
+
+ val.data = store->val.data;
+ val.len = store->val.len;
+ key = store->key.data;
+ exp = store->exp;
+
+ if (dht_kv_store(store->key.data, val, store->exp) < 0) {
+ log_err(KV_FMT " Failed to store.", KV_VAL(key, val));
+ return;
+ }
+
+ tm = gmtime(&exp);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
+ log_dbg(KV_FMT " Stored value until %s.", KV_VAL(key, val), tmstr);
+}
+
+static dht_msg_t * do_dht_kv_find_node_req(const dht_find_req_msg_t * req)
+{
+ dht_contact_msg_t ** contacts;
+ dht_msg_t * rsp;
+ uint8_t * key;
+ uint64_t cookie;
+ ssize_t len;
+
+ assert(req != NULL);
+
+ key = req->key.data;
+ cookie = req->cookie;
+
+ len = dht_kv_get_contacts(key, &contacts);
+ if (len < 0) {
+ log_warn(KEY_FMT " Failed to get contacts.", KEY_VAL(key));
+ goto fail_contacts;
+ }
+
+ rsp = dht_kv_find_node_rsp_msg(key, cookie, &contacts, len);
+ if (rsp == NULL) {
+ log_err(KEY_FMT " Failed to create %s.", KEY_VAL(key),
+ dht_code_str[DHT_FIND_NODE_RSP]);
+ goto fail_msg;
+ }
+
+ assert(rsp->code == DHT_FIND_NODE_RSP);
+
+ log_info(KEY_FMT " Responding with %zd contacts", KEY_VAL(key), len);
+
+ return rsp;
+ fail_msg:
+ while (len-- > 0)
+ dht_contact_msg__free_unpacked(contacts[len], NULL);
+ free(contacts);
+ fail_contacts:
+ return NULL;
+}
+
+static void dht_kv_process_node_rsp(dht_contact_msg_t ** contacts,
+ size_t len,
+ struct list_head * pl,
+ enum dht_code code)
+{
+ struct timespec now;
+ size_t i;
+
+ assert(contacts != NULL);
+ assert(len > 0);
+ assert(pl != NULL);
+ assert(list_is_empty(pl));
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ for (i = 0; i < len; i++) {
+ dht_contact_msg_t * c = contacts[i];
+ struct peer_entry * e;
+ if (c->addr == dht.addr)
+ continue;
+
+ if (dht_kv_update_contacts(c->id.data, c->addr) < 0)
+ log_warn(PEER_FMT " Failed to update contacts.",
+ PEER_VAL(c->id.data, c->addr));
+
+ e = malloc(sizeof(*e));
+ if (e == NULL) {
+ log_err(PEER_FMT " Failed to malloc entry.",
+ PEER_VAL(c->id.data, c->addr));
+ continue;
+ }
+
+ e->id = dht_dup_key(c->id.data);
+ if (e->id == NULL) {
+ log_warn(PEER_FMT " Failed to duplicate id.",
+ PEER_VAL(c->id.data, c->addr));
+ free(e);
+ continue;
+ }
+
+ e->cookie = generate_cookie();
+ e->code = code;
+ e->addr = c->addr;
+ e->t_sent = now.tv_sec;
+
+ list_add_tail(&e->next, pl);
+ }
+}
+
+static dht_msg_t * do_dht_kv_find_value_req(const dht_find_req_msg_t * req)
+{
+ dht_contact_msg_t ** contacts;
+ ssize_t n_contacts;
+ buffer_t * vals;
+ ssize_t n_vals;
+ dht_msg_t * rsp;
+ uint8_t * key;
+ uint64_t cookie;
+
+ assert(req != NULL);
+
+ key = req->key.data;
+ cookie = req->cookie;
+
+ n_contacts = dht_kv_get_contacts(key, &contacts);
+ if (n_contacts < 0) {
+ log_warn(KEY_FMT " Failed to get contacts.", KEY_VAL(key));
+ goto fail_contacts;
+ }
+
+ assert(n_contacts > 0 || contacts == NULL);
+
+ n_vals = dht_kv_retrieve(key, &vals);
+ if (n_vals < 0) {
+ log_dbg(KEY_FMT " Failed to get values.", KEY_VAL(key));
+ goto fail_vals;
+ }
+
+ if (n_vals == 0)
+ log_dbg(KEY_FMT " No values found.", KEY_VAL(key));
+
+ rsp = dht_kv_find_value_rsp_msg(key, cookie, &contacts, n_contacts,
+ &vals, n_vals);
+ if (rsp == NULL) {
+ log_err(KEY_FMT " Failed to create %s msg.",
+ KEY_VAL(key), dht_code_str[DHT_FIND_VALUE_RSP]);
+ goto fail_msg;
+ }
+
+ log_info(KEY_FMT " Responding with %zd contacts, %zd values.",
+ KEY_VAL(req->key.data), n_contacts, n_vals);
+
+ return rsp;
+
+ fail_msg:
+ freebufs(vals, n_vals);
+ fail_vals:
+ while (n_contacts-- > 0)
+ dht_contact_msg__free_unpacked(contacts[n_contacts], NULL);
+ free(contacts);
+ fail_contacts:
+ return NULL;
+}
+
+static void do_dht_kv_find_node_rsp(const dht_find_node_rsp_msg_t * rsp)
+{
+ struct list_head pl;
+
+ assert(rsp != NULL);
+
+ list_head_init(&pl);
+
+ dht_kv_process_node_rsp(rsp->contacts, rsp->n_contacts, &pl,
+ DHT_FIND_NODE_REQ);
+
+ if (list_is_empty(&pl))
+ goto no_contacts;
+
+ if (dht_kv_update_req(rsp->key.data, &pl) < 0) {
+ log_err(KEY_FMT " Failed to update request.",
+ KEY_VAL(rsp->key.data));
+ goto fail_update;
+ }
+
+ dht_kv_query_contacts(rsp->key.data, &pl);
+
+ return;
+
+ fail_update:
+ peer_list_destroy(&pl);
+ no_contacts:
+ return;
+}
+
+static void do_dht_kv_find_value_rsp(const dht_find_node_rsp_msg_t * node,
+ const dht_find_value_rsp_msg_t * val)
+{
+ struct list_head pl;
+ uint8_t * key;
+
+ assert(node != NULL);
+ assert(val != NULL);
+
+ list_head_init(&pl);
+
+ key = node->key.data;
+
+ dht_kv_process_node_rsp(node->contacts, node->n_contacts, &pl,
+ DHT_FIND_VALUE_REQ);
+
+ if (val->n_values > 0) {
+ log_dbg(KEY_FMT " %zd new values received.",
+ KEY_VAL(key), val->n_values);
+ if (dht_kv_respond_req(key, val->values, val->n_values) < 0)
+ log_warn(KEY_FMT " Failed to respond to request.",
+ KEY_VAL(key));
+ peer_list_destroy(&pl);
+ return; /* done! */
+ }
+
+ if (list_is_empty(&pl))
+ goto no_contacts;
+
+ if (dht_kv_update_req(key, &pl) < 0) {
+ log_err(KEY_FMT " Failed to update request.", KEY_VAL(key));
+ goto fail_update;
+ }
+
+ dht_kv_query_remote(key, NULL, &pl);
+
+ return;
+ fail_update:
+ peer_list_destroy(&pl);
+ no_contacts:
+ return;
+}
+
+static dht_msg_t * dht_wait_for_dht_msg(void)
+{
+ dht_msg_t * msg;
+ struct cmd * cmd;
+
+ pthread_mutex_lock(&dht.cmds.mtx);
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, &dht.cmds.mtx);
+
+ while (list_is_empty(&dht.cmds.list))
+ pthread_cond_wait(&dht.cmds.cond, &dht.cmds.mtx);
+
+ cmd = list_last_entry(&dht.cmds.list, struct cmd, next);
+ list_del(&cmd->next);
+
+ pthread_cleanup_pop(true);
+
+ msg = dht_msg__unpack(NULL, cmd->cbuf.len, cmd->cbuf.data);
+ if (msg == NULL)
+ log_warn("Failed to unpack DHT msg.");
+
+ freebuf(cmd->cbuf);
+ free(cmd);
+
+ return msg;
+}
+
+static void do_dht_msg(dht_msg_t * msg)
+{
+ dht_msg_t * rsp = NULL;
+ uint8_t * id;
+ uint64_t addr;
+
+#ifdef DEBUG_PROTO_DHT
+ dht_kv_debug_msg_rcv(msg);
+#endif
+ if (dht_kv_validate_msg(msg) == -EINVAL) {
+ log_warn("%s Validation failed.", DHT_CODE(msg));
+ dht_msg__free_unpacked(msg, NULL);
+ return;
+ }
+
+ id = msg->src->id.data;
+ addr = msg->src->addr;
+
+ if (dht_kv_update_contacts(id, addr) < 0)
+ log_warn(PEER_FMT " Failed to update contact from msg src.",
+ PEER_VAL(id, addr));
+
+ pthread_cleanup_push(__cleanup_dht_msg, msg);
+
+ switch(msg->code) {
+ case DHT_FIND_VALUE_REQ:
+ rsp = do_dht_kv_find_value_req(msg->find);
+ break;
+ case DHT_FIND_NODE_REQ:
+ rsp = do_dht_kv_find_node_req(msg->find);
+ break;
+ case DHT_STORE:
+ do_dht_kv_store(msg->store);
+ break;
+ case DHT_FIND_NODE_RSP:
+ do_dht_kv_find_node_rsp(msg->node);
+ break;
+ case DHT_FIND_VALUE_RSP:
+ do_dht_kv_find_value_rsp(msg->node, msg->val);
+ break;
+ default:
+ assert(false); /* already validated */
+ }
+
+ pthread_cleanup_pop(true);
+
+ if (rsp == NULL)
+ return;
+
+ pthread_cleanup_push(__cleanup_dht_msg, rsp);
+
+ dht_send_msg(rsp, addr);
+
+ pthread_cleanup_pop(true); /* free rsp */
+}
+
+static void * dht_handle_packet(void * o)
+{
+ (void) o;
+
+ while (true) {
+ dht_msg_t * msg;
+
+ msg = dht_wait_for_dht_msg();
+ if (msg == NULL)
+ continue;
+
+ tpm_begin_work(dht.tpm);
+
+ do_dht_msg(msg);
+
+ tpm_end_work(dht.tpm);
+ }
+
+ return (void *) 0;
+}
+#ifndef __DHT_TEST__
+static void dht_post_packet(void * comp,
+ struct shm_du_buff * sdb)
+{
+ struct cmd * cmd;
+
+ (void) comp;
+
+ cmd = malloc(sizeof(*cmd));
+ if (cmd == NULL) {
+ log_err("Command malloc failed.");
+ goto fail_cmd;
+ }
+
+ cmd->cbuf.data = malloc(shm_du_buff_len(sdb));
+ if (cmd->cbuf.data == NULL) {
+ log_err("Command buffer malloc failed.");
+ goto fail_buf;
+ }
+
+ cmd->cbuf.len = shm_du_buff_len(sdb);
+
+ memcpy(cmd->cbuf.data, shm_du_buff_head(sdb), cmd->cbuf.len);
+
+ ipcp_sdb_release(sdb);
+
+ pthread_mutex_lock(&dht.cmds.mtx);
+
+ list_add(&cmd->next, &dht.cmds.list);
+
+ pthread_cond_signal(&dht.cmds.cond);
+
+ pthread_mutex_unlock(&dht.cmds.mtx);
+
+ return;
+
+ fail_buf:
+ free(cmd);
+ fail_cmd:
+ ipcp_sdb_release(sdb);
+ return;
+}
+#endif
+
+int dht_reg(const uint8_t * key)
+{
+ buffer_t val;
+
+ if (addr_to_buf(dht.addr, &val) < 0) {
+ log_err("Failed to convert address to buffer.");
+ goto fail_a2b;
+ }
+
+ if (dht_kv_publish(key, val)) {
+ log_err(KV_FMT " Failed to publish.", KV_VAL(key, val));
+ goto fail_publish;
+ }
+
+ freebuf(val);
+
+ return 0;
+ fail_publish:
+ freebuf(val);
+ fail_a2b:
+ return -1;
+}
+
+int dht_unreg(const uint8_t * key)
+{
+ buffer_t val;
+
+ if (addr_to_buf(dht.addr, &val) < 0) {
+ log_err("Failed to convert address to buffer.");
+ goto fail_a2b;
+ }
+
+ if (dht_kv_unpublish(key, val)) {
+ log_err(KV_FMT " Failed to unpublish.", KV_VAL(key, val));
+ goto fail_unpublish;
+ }
+
+ freebuf(val);
+
+ return 0;
+ fail_unpublish:
+ freebuf(val);
+ fail_a2b:
+ return -ENOMEM;
+}
+
+uint64_t dht_query(const uint8_t * key)
+{
+ buffer_t * vals;
+ ssize_t n;
+ uint64_t addr;
+
+ n = dht_kv_retrieve(key, &vals);
+ if (n < 0) {
+ log_err(KEY_FMT " Failed to query db.", KEY_VAL(key));
+ goto fail_vals;
+ }
+
+ if (n == 0) {
+ assert(vals == NULL);
+
+ log_dbg(KEY_FMT " No local values.", KEY_VAL(key));
+ n = dht_kv_query_remote(key, &vals, NULL);
+ if (n < 0) {
+ log_warn(KEY_FMT " Failed to query DHT.", KEY_VAL(key));
+ goto fail_vals;
+ }
+ if (n == 0) {
+ log_dbg(KEY_FMT " No values.", KEY_VAL(key));
+ goto no_vals;
+ }
+ }
+
+ if (buf_to_addr(vals[0], &addr) < 0) {
+ log_err(VAL_FMT " Failed addr conversion.", VAL_VAL(vals[0]));
+ goto fail_b2a;
+ }
+
+ if (n > 1 && addr == INVALID_ADDR && buf_to_addr(vals[1], &addr) < 0) {
+ log_err(VAL_FMT " Failed addr conversion.", VAL_VAL(vals[1]));
+ goto fail_b2a;
+ }
+
+ freebufs(vals, n);
+
+ return addr;
+ fail_b2a:
+ freebufs(vals, n);
+ return INVALID_ADDR;
+ no_vals:
+ free(vals);
+ fail_vals:
+ return INVALID_ADDR;
+}
+
+static int emergency_peer(struct list_head * pl)
+{
+ struct peer_entry * e;
+ struct timespec now;
+
+ assert(pl != NULL);
+ assert(list_is_empty(pl));
+
+ if (dht.peer == INVALID_ADDR)
+ return -1;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ e = malloc(sizeof(*e));
+ if (e == NULL) {
+ log_err("Failed to malloc emergency peer entry.");
+ goto fail_malloc;
+ }
+
+ e->id = dht_dup_key(dht.id.data);
+ if (e->id == NULL) {
+ log_err("Failed to duplicate DHT ID for emergency peer.");
+ goto fail_id;
+ }
+
+ e->addr = dht.peer;
+ e->cookie = dht.magic;
+ e->code = DHT_FIND_NODE_REQ;
+ e->t_sent = now.tv_sec;
+
+ list_add_tail(&e->next, pl);
+
+ return 0;
+ fail_id:
+ free(e);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+static int dht_kv_seed_bootstrap_peer(void)
+{
+ struct list_head pl;
+
+ list_head_init(&pl);
+
+ if (dht.peer == INVALID_ADDR) {
+ log_dbg("No-one to contact.");
+ return 0;
+ }
+
+ if (emergency_peer(&pl) < 0) {
+ log_err("Could not create emergency peer.");
+ goto fail_peer;
+ }
+
+ log_dbg("Pinging emergency peer " ADDR_FMT32 ".",
+ ADDR_VAL32(&dht.peer));
+
+ if (dht_kv_query_contacts(dht.id.data, &pl) < 0) {
+ log_warn("Failed to bootstrap peer.");
+ goto fail_query;
+ }
+
+ peer_list_destroy(&pl);
+
+ return 0;
+ fail_query:
+ peer_list_destroy(&pl);
+ fail_peer:
+ return -EAGAIN;
+}
+
+static void dht_kv_check_contacts(void)
+{
+ struct list_head cl;
+ struct list_head pl;
+
+ list_head_init(&cl);
+
+ dht_kv_contact_list(dht.id.data, &cl, dht.k);
+
+ if (!list_is_empty(&cl))
+ goto success;
+
+ contact_list_destroy(&cl);
+
+ list_head_init(&pl);
+
+ if (dht.peer == INVALID_ADDR) {
+ log_dbg("No-one to contact.");
+ return;
+ }
+
+ if (emergency_peer(&pl) < 0) {
+ log_err("Could not create emergency peer.");
+ goto fail_peer;
+ }
+
+ log_dbg("No contacts found, using emergency peer " ADDR_FMT32 ".",
+ ADDR_VAL32(&dht.peer));
+
+ dht_kv_query_contacts(dht.id.data, &pl);
+
+ peer_list_destroy(&pl);
+
+ return;
+ success:
+ contact_list_destroy(&cl);
+ return;
+ fail_peer:
+ return;
+}
+
+static void dht_kv_remove_expired_reqs(void)
+{
+ struct list_head * p;
+ struct list_head * h;
+ struct timespec now;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_mutex_lock(&dht.reqs.mtx);
+
+ list_for_each_safe(p, h, &dht.reqs.list) {
+ struct dht_req * e;
+ e = list_entry(p, struct dht_req, next);
+ if (IS_EXPIRED(e, &now)) {
+ log_dbg(KEY_FMT " Removing expired request.",
+ KEY_VAL(e->key));
+ list_del(&e->next);
+ dht_req_destroy(e);
+ --dht.reqs.len;
+ }
+ }
+
+ pthread_mutex_unlock(&dht.reqs.mtx);
+}
+
+static void value_list_destroy(struct list_head * vl)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ assert(vl != NULL);
+
+ list_for_each_safe(p, h, vl) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ list_del(&v->next);
+ val_entry_destroy(v);
+ }
+}
+
+#define MUST_REPLICATE(v, now) ((now)->tv_sec > (v)->t_repl + dht.t_repl)
+#define MUST_REPUBLISH(v, now) /* Close to expiry deadline */ \
+ (((v)->t_exp - (now)->tv_sec) < (DHT_N_REPUB * dht.t_repl))
+static void dht_entry_get_repl_lists(const struct dht_entry * e,
+ struct list_head * repl,
+ struct list_head * rebl,
+ struct timespec * now)
+{
+ struct list_head * p;
+ struct val_entry * n;
+
+ list_for_each(p, &e->vals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ if (MUST_REPLICATE(v, now) && !IS_EXPIRED(v, now)) {
+ n = val_entry_create(v->val, v->t_exp);
+ if (n == NULL)
+ continue;
+
+ list_add_tail(&n->next, repl);
+ }
+ }
+
+ list_for_each(p, &e->lvals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ if (MUST_REPLICATE(v, now) && MUST_REPUBLISH(v, now)) {
+ /* Add expire time here, to allow creating val_entry */
+ n = val_entry_create(v->val, now->tv_sec + dht.t_expire);
+ if (n == NULL)
+ continue;
+
+ list_add_tail(&n->next, rebl);
+ }
+ }
+}
+
+static int dht_kv_next_values(uint8_t * key,
+ struct list_head * repl,
+ struct list_head * rebl)
+{
+ struct timespec now;
+ struct list_head * p;
+ struct list_head * h;
+ struct dht_entry * e = NULL;
+
+ assert(key != NULL);
+ assert(repl != NULL);
+ assert(rebl != NULL);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ assert(list_is_empty(repl));
+ assert(list_is_empty(rebl));
+
+ pthread_rwlock_rdlock(&dht.db.lock);
+
+ if (dht.db.kv.len == 0)
+ goto no_entries;
+
+ list_for_each_safe(p, h, &dht.db.kv.list) {
+ e = list_entry(p, struct dht_entry, next);
+ if (IS_CLOSER(e->key, key))
+ continue; /* Already processed */
+ }
+
+ if (e != NULL) {
+ memcpy(key, e->key, dht.id.len);
+ dht_entry_get_repl_lists(e, repl, rebl, &now);
+ }
+ no_entries:
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ return list_is_empty(repl) && list_is_empty(rebl) ? -ENOENT : 0;
+}
+
+static void dht_kv_replicate_value(const uint8_t * key,
+ struct val_entry * v,
+ const struct timespec * now)
+{
+ assert(MUST_REPLICATE(v, now));
+
+ (void) now;
+
+ if (dht_kv_store_remote(key, v->val, v->t_exp) == 0) {
+ log_dbg(KV_FMT " Replicated.", KV_VAL(key, v->val));
+ return;
+ }
+
+ log_dbg(KV_FMT " Replication failed.", KV_VAL(key, v->val));
+
+ list_del(&v->next);
+ val_entry_destroy(v);
+}
+
+static void dht_kv_republish_value(const uint8_t * key,
+ struct val_entry * v,
+ const struct timespec * now)
+{
+ assert(MUST_REPLICATE(v, now));
+
+ if (MUST_REPUBLISH(v, now))
+ assert(v->t_exp >= now->tv_sec + dht.t_expire);
+
+ if (dht_kv_store_remote(key, v->val, v->t_exp) == 0) {
+ log_dbg(KV_FMT " Republished.", KV_VAL(key, v->val));
+ return;
+ }
+
+ if (MUST_REPUBLISH(v, now))
+ log_warn(KV_FMT " Republish failed.", KV_VAL(key, v->val));
+ else
+ log_dbg(KV_FMT " Replication failed.", KV_VAL(key, v->val));
+
+ list_del(&v->next);
+ val_entry_destroy(v);
+}
+
+static void dht_kv_update_replication_times(const uint8_t * key,
+ struct list_head * repl,
+ struct list_head * rebl,
+ const struct timespec * now)
+{
+ struct dht_entry * e;
+ struct list_head * p;
+ struct list_head * h;
+ struct val_entry * v;
+
+ assert(key != NULL);
+ assert(repl != NULL);
+ assert(rebl != NULL);
+ assert(now != NULL);
+
+ pthread_rwlock_wrlock(&dht.db.lock);
+
+ e = __dht_kv_find_entry(key);
+ if (e == NULL) {
+ pthread_rwlock_unlock(&dht.db.lock);
+ return;
+ }
+
+ list_for_each_safe(p, h, repl) {
+ struct val_entry * x;
+ v = list_entry(p, struct val_entry, next);
+ x = dht_entry_get_val(e, v->val);
+ if (x == NULL) {
+ log_err(KV_FMT " Not in vals.", KV_VAL(key, v->val));
+ continue;
+ }
+
+ x->t_repl = now->tv_sec;
+
+ list_del(&v->next);
+ val_entry_destroy(v);
+ }
+
+ list_for_each_safe(p, h, rebl) {
+ struct val_entry * x;
+ v = list_entry(p, struct val_entry, next);
+ x = dht_entry_get_lval(e, v->val);
+ if (x == NULL) {
+ log_err(KV_FMT " Not in lvals.", KV_VAL(key, v->val));
+ continue;
+ }
+
+ x->t_repl = now->tv_sec;
+ if (v->t_exp > x->t_exp) {
+ x->t_exp = v->t_exp; /* update expiration time */
+ }
+
+ list_del(&v->next);
+ val_entry_destroy(v);
+ }
+
+ pthread_rwlock_unlock(&dht.db.lock);
+}
+
+static void __cleanup_value_list(void * o)
+{
+ return value_list_destroy((struct list_head *) o);
+}
+
+static void dht_kv_replicate_values(const uint8_t * key,
+ struct list_head * repl,
+ struct list_head * rebl)
+{
+ struct timespec now;
+ struct list_head * p;
+ struct list_head * h;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_cleanup_push(__cleanup_value_list, repl);
+ pthread_cleanup_push(__cleanup_value_list, rebl);
+
+ list_for_each_safe(p, h, repl) {
+ struct val_entry * v;
+ v = list_entry(p, struct val_entry, next);
+ dht_kv_replicate_value(key, v, &now);
+ }
+
+ list_for_each_safe(p, h, rebl) {
+ struct val_entry * v;
+ v = list_entry(p, struct val_entry, next);
+ dht_kv_republish_value(key, v, &now);
+ }
+
+ pthread_cleanup_pop(false);
+ pthread_cleanup_pop(false);
+
+ /* removes non-replicated items from the list */
+ dht_kv_update_replication_times(key, repl, rebl, &now);
+
+ if (list_is_empty(repl) && list_is_empty(rebl))
+ return;
+
+ log_warn(KEY_FMT " Failed to update replication times.", KEY_VAL(key));
+}
+
+static void dht_kv_replicate(void)
+{
+ struct list_head repl; /* list of values to replicate */
+ struct list_head rebl; /* list of local values to republish */
+ uint8_t * key;
+
+ key = dht_dup_key(dht.id.data); /* dist == 0 */
+ if (key == NULL) {
+ log_err("Replicate: Failed to duplicate DHT ID.");
+ return;
+ }
+
+ list_head_init(&repl);
+ list_head_init(&rebl);
+
+ pthread_cleanup_push(free, key);
+
+ while (dht_kv_next_values(key, &repl, &rebl) == 0) {
+ dht_kv_replicate_values(key, &repl, &rebl);
+ if (!list_is_empty(&repl)) {
+ log_warn(KEY_FMT " Replication items left.",
+ KEY_VAL(key));
+ value_list_destroy(&repl);
+ }
+
+ if (!list_is_empty(&rebl)) {
+ log_warn(KEY_FMT " Republish items left.",
+ KEY_VAL(key));
+ value_list_destroy(&rebl);
+ }
+ }
+
+ pthread_cleanup_pop(true);
+}
+
+static void dht_kv_refresh_contacts(void)
+{
+ struct list_head * p;
+ struct list_head * h;
+ struct list_head rl; /* refresh list */
+ struct timespec now;
+
+ list_head_init(&rl);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_rwlock_rdlock(&dht.db.lock);
+
+ __dht_kv_bucket_refresh_list(dht.db.contacts.root, now.tv_sec, &rl);
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ list_for_each_safe(p, h, &rl) {
+ struct contact * c;
+ c = list_entry(p, struct contact, next);
+ log_dbg(PEER_FMT " Refreshing contact.",
+ PEER_VAL(c->id, c->addr));
+ dht_kv_query_contacts(c->id, NULL);
+ list_del(&c->next);
+ contact_destroy(c);
+ }
+
+ assert(list_is_empty(&rl));
+}
+
+static void (*tasks[])(void) = {
+ dht_kv_check_contacts,
+ dht_kv_remove_expired_entries,
+ dht_kv_remove_expired_reqs,
+ dht_kv_replicate,
+ dht_kv_refresh_contacts,
+ NULL
+};
+
+static void * work(void * o)
+{
+ struct timespec now = TIMESPEC_INIT_MS(1);
+ time_t intv;
+ size_t n; /* number of tasks */
+
+ n = sizeof(tasks) / sizeof(tasks[0]) - 1; /* last is NULL */
+
+ (void) o;
+
+ while (dht_kv_seed_bootstrap_peer() == -EAGAIN) {
+ ts_add(&now, &now, &now); /* exponential backoff */
+ if (now.tv_sec > 1) /* cap at 1 second */
+ now.tv_sec = 1;
+ nanosleep(&now, NULL);
+ }
+
+ intv = gcd(dht.t_expire, (dht.t_expire - DHT_N_REPUB * dht.t_repl));
+ intv = gcd(intv, gcd(dht.t_repl, dht.t_refresh)) / 2;
+ intv = MAX(1, intv / n);
+
+ log_dbg("DHT worker starting %ld seconds interval.", intv * n);
+
+ while (true) {
+ int i = 0;
+ while (tasks[i] != NULL) {
+ tasks[i++]();
+ sleep(intv);
+ }
+ }
+
+ return (void *) 0;
+}
+
+int dht_start(void)
+{
+ dht.state = DHT_RUNNING;
+
+ if (tpm_start(dht.tpm))
+ goto fail_tpm_start;
+
+#ifndef __DHT_TEST__
+ if (pthread_create(&dht.worker, NULL, work, NULL)) {
+ log_err("Failed to create DHT worker thread.");
+ goto fail_worker;
+ }
+
+ dht.eid = dt_reg_comp(&dht, &dht_post_packet, DHT);
+ if ((int) dht.eid < 0) {
+ log_err("Failed to register DHT component.");
+ goto fail_reg;
+ }
+#else
+ (void) work;
+#endif
+ return 0;
+#ifndef __DHT_TEST__
+ fail_reg:
+ pthread_cancel(dht.worker);
+ pthread_join(dht.worker, NULL);
+ fail_worker:
+ tpm_stop(dht.tpm);
+#endif
+ fail_tpm_start:
+ dht.state = DHT_INIT;
+ return -1;
+}
+
+void dht_stop(void)
+{
+ assert(dht.state == DHT_RUNNING);
+
+#ifndef __DHT_TEST__
+ dt_unreg_comp(dht.eid);
+
+ pthread_cancel(dht.worker);
+ pthread_join(dht.worker, NULL);
+#endif
+ tpm_stop(dht.tpm);
+
+ dht.state = DHT_INIT;
+}
+
+int dht_init(struct dir_dht_config * conf)
+{
+ struct timespec now;
+ pthread_condattr_t cattr;
+
+ assert(conf != NULL);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+#ifndef __DHT_TEST__
+ dht.id.len = ipcp_dir_hash_len();
+ dht.addr = addr_auth_address();
+#else
+ dht.id.len = DHT_TEST_KEY_LEN;
+ dht.addr = DHT_TEST_ADDR;
+#endif
+ dht.t0 = now.tv_sec;
+ dht.alpha = conf->params.alpha;
+ dht.k = conf->params.k;
+ dht.t_expire = conf->params.t_expire;
+ dht.t_refresh = conf->params.t_refresh;
+ dht.t_repl = conf->params.t_replicate;
+ dht.peer = conf->peer;
+
+ dht.magic = generate_cookie();
+
+ /* Send my address on enrollment */
+ conf->peer = dht.addr;
+
+ dht.id.data = generate_id();
+ if (dht.id.data == NULL) {
+ log_err("Failed to create DHT ID.");
+ goto fail_id;
+ }
+
+ list_head_init(&dht.cmds.list);
+
+ if (pthread_mutex_init(&dht.cmds.mtx, NULL)) {
+ log_err("Failed to initialize command mutex.");
+ goto fail_cmds_mutex;
+ }
+
+ if (pthread_cond_init(&dht.cmds.cond, NULL)) {
+ log_err("Failed to initialize command condvar.");
+ goto fail_cmds_cond;
+ }
+
+ list_head_init(&dht.reqs.list);
+ dht.reqs.len = 0;
+
+ if (pthread_mutex_init(&dht.reqs.mtx, NULL)) {
+ log_err("Failed to initialize request mutex.");
+ goto fail_reqs_mutex;
+ }
+
+ if (pthread_condattr_init(&cattr)) {
+ log_err("Failed to initialize request condvar attributes.");
+ goto fail_cattr;
+ }
+#ifndef __APPLE__
+ if (pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK)) {
+ log_err("Failed to set request condvar clock.");
+ goto fail_cattr;
+ }
+#endif
+ if (pthread_cond_init(&dht.reqs.cond, &cattr)) {
+ log_err("Failed to initialize request condvar.");
+ goto fail_reqs_cond;
+ }
+
+ list_head_init(&dht.db.kv.list);
+ dht.db.kv.len = 0;
+ dht.db.kv.vals = 0;
+ dht.db.kv.lvals = 0;
+
+ if (pthread_rwlock_init(&dht.db.lock, NULL)) {
+ log_err("Failed to initialize store rwlock.");
+ goto fail_rwlock;
+ }
+
+ dht.db.contacts.root = bucket_create();
+ if (dht.db.contacts.root == NULL) {
+ log_err("Failed to create DHT buckets.");
+ goto fail_buckets;
+ }
+
+ if (rib_reg(DHT, &r_ops) < 0) {
+ log_err("Failed to register DHT RIB operations.");
+ goto fail_rib_reg;
+ }
+
+ dht.tpm = tpm_create(2, 1, dht_handle_packet, NULL);
+ if (dht.tpm == NULL) {
+ log_err("Failed to create TPM for DHT.");
+ goto fail_tpm_create;
+ }
+
+ if (dht_kv_update_contacts(dht.id.data, dht.addr) < 0)
+ log_warn("Failed to update contacts with DHT ID.");
+
+ pthread_condattr_destroy(&cattr);
+#ifndef __DHT_TEST__
+ log_info("DHT initialized.");
+ log_dbg(" ID: " HASH_FMT64 " [%zu bytes].",
+ HASH_VAL64(dht.id.data), dht.id.len);
+ log_dbg(" address: " ADDR_FMT32 ".", ADDR_VAL32(&dht.addr));
+ log_dbg(" peer: " ADDR_FMT32 ".", ADDR_VAL32(&dht.peer));
+ log_dbg(" magic cookie: " HASH_FMT64 ".", HASH_VAL64(&dht.magic));
+ log_info(" parameters: alpha=%u, k=%zu, t_expire=%ld, "
+ "t_refresh=%ld, t_replicate=%ld.",
+ dht.alpha, dht.k, dht.t_expire, dht.t_refresh, dht.t_repl);
+#endif
+ dht.state = DHT_INIT;
+
+ return 0;
+
+ fail_tpm_create:
+ rib_unreg(DHT);
+ fail_rib_reg:
+ bucket_destroy(dht.db.contacts.root);
+ fail_buckets:
+ pthread_rwlock_destroy(&dht.db.lock);
+ fail_rwlock:
+ pthread_cond_destroy(&dht.reqs.cond);
+ fail_reqs_cond:
+ pthread_condattr_destroy(&cattr);
+ fail_cattr:
+ pthread_mutex_destroy(&dht.reqs.mtx);
+ fail_reqs_mutex:
+ pthread_cond_destroy(&dht.cmds.cond);
+ fail_cmds_cond:
+ pthread_mutex_destroy(&dht.cmds.mtx);
+ fail_cmds_mutex:
+ freebuf(dht.id);
+ fail_id:
+ return -1;
+}
+
+void dht_fini(void)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ rib_unreg(DHT);
+
+ tpm_destroy(dht.tpm);
+
+ pthread_mutex_lock(&dht.cmds.mtx);
+
+ list_for_each_safe(p, h, &dht.cmds.list) {
+ struct cmd * c = list_entry(p, struct cmd, next);
+ list_del(&c->next);
+ freebuf(c->cbuf);
+ free(c);
+ }
+
+ pthread_mutex_unlock(&dht.cmds.mtx);
+
+ pthread_cond_destroy(&dht.cmds.cond);
+ pthread_mutex_destroy(&dht.cmds.mtx);
+
+ pthread_mutex_lock(&dht.reqs.mtx);
+
+ list_for_each_safe(p, h, &dht.reqs.list) {
+ struct dht_req * r = list_entry(p, struct dht_req, next);
+ list_del(&r->next);
+ dht_req_destroy(r);
+ dht.reqs.len--;
+ }
+
+ pthread_mutex_unlock(&dht.reqs.mtx);
+
+ pthread_cond_destroy(&dht.reqs.cond);
+ pthread_mutex_destroy(&dht.reqs.mtx);
+
+ pthread_rwlock_wrlock(&dht.db.lock);
+
+ list_for_each_safe(p, h, &dht.db.kv.list) {
+ struct dht_entry * e = list_entry(p, struct dht_entry, next);
+ list_del(&e->next);
+ dht_entry_destroy(e);
+ dht.db.kv.len--;
+ }
+
+ if (dht.db.contacts.root != NULL)
+ bucket_destroy(dht.db.contacts.root);
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ pthread_rwlock_destroy(&dht.db.lock);
+
+ assert(dht.db.kv.len == 0);
+ assert(dht.db.kv.vals == 0);
+ assert(dht.db.kv.lvals == 0);
+ assert(dht.reqs.len == 0);
+
+ freebuf(dht.id);
+}
diff --git a/src/ipcpd/unicast/dir/dht.h b/src/ipcpd/unicast/dir/dht.h
new file mode 100644
index 00000000..852a5130
--- /dev/null
+++ b/src/ipcpd/unicast/dir/dht.h
@@ -0,0 +1,49 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Distributed Hash Table based on Kademlia
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_DIR_DHT_H
+#define OUROBOROS_IPCPD_UNICAST_DIR_DHT_H
+
+#include <ouroboros/ipcp-dev.h>
+
+#include "ops.h"
+
+#include <stdint.h>
+#include <sys/types.h>
+
+int dht_init(struct dir_dht_config * conf);
+
+void dht_fini(void);
+
+int dht_start(void);
+
+void dht_stop(void);
+
+int dht_reg(const uint8_t * key);
+
+int dht_unreg(const uint8_t * key);
+
+uint64_t dht_query(const uint8_t * key);
+
+extern struct dir_ops dht_dir_ops;
+
+#endif /* OUROBOROS_IPCPD_UNICAST_DIR_DHT_H */
diff --git a/src/ipcpd/unicast/dir/dht.proto b/src/ipcpd/unicast/dir/dht.proto
new file mode 100644
index 00000000..ea74805f
--- /dev/null
+++ b/src/ipcpd/unicast/dir/dht.proto
@@ -0,0 +1,58 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * DHT protocol, based on Kademlia
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+syntax = "proto2";
+
+message dht_contact_msg {
+ required bytes id = 1;
+ required uint64 addr = 2;
+}
+
+message dht_find_req_msg {
+ required uint64 cookie = 1;
+ required bytes key = 2;
+}
+
+message dht_find_node_rsp_msg {
+ required uint64 cookie = 1;
+ required bytes key = 2;
+ repeated dht_contact_msg contacts = 3;
+}
+
+message dht_find_value_rsp_msg {
+ repeated bytes values = 1;
+}
+
+message dht_store_msg {
+ required bytes key = 1;
+ required bytes val = 2;
+ required uint32 exp = 3;
+}
+
+message dht_msg {
+ required uint32 code = 1;
+ required dht_contact_msg src = 2;
+ optional dht_store_msg store = 3;
+ optional dht_find_req_msg find = 4;
+ optional dht_find_node_rsp_msg node = 5;
+ optional dht_find_value_rsp_msg val = 6;
+}
diff --git a/src/ipcpd/unicast/dir/ops.h b/src/ipcpd/unicast/dir/ops.h
new file mode 100644
index 00000000..8c6e5eb5
--- /dev/null
+++ b/src/ipcpd/unicast/dir/ops.h
@@ -0,0 +1,42 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Directory policy ops
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_DIR_OPS_H
+#define OUROBOROS_IPCPD_UNICAST_DIR_OPS_H
+
+struct dir_ops {
+ int (* init)(void * config);
+
+ void (* fini)(void);
+
+ int (* start)(void);
+
+ void (* stop)(void);
+
+ int (* reg)(const uint8_t * hash);
+
+ int (* unreg)(const uint8_t * hash);
+
+ uint64_t (* query)(const uint8_t * hash);
+};
+
+#endif /* OUROBOROS_IPCPD_UNICAST_DIR_OPS_H */
diff --git a/src/ipcpd/unicast/dir/pol.h b/src/ipcpd/unicast/dir/pol.h
new file mode 100644
index 00000000..eae4b2e7
--- /dev/null
+++ b/src/ipcpd/unicast/dir/pol.h
@@ -0,0 +1,23 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Directory policies
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#include "dht.h"
diff --git a/src/ipcpd/unicast/dir/tests/CMakeLists.txt b/src/ipcpd/unicast/dir/tests/CMakeLists.txt
new file mode 100644
index 00000000..897f1ec2
--- /dev/null
+++ b/src/ipcpd/unicast/dir/tests/CMakeLists.txt
@@ -0,0 +1,40 @@
+get_filename_component(CURRENT_SOURCE_PARENT_DIR
+ ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
+get_filename_component(CURRENT_BINARY_PARENT_DIR
+ ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY)
+
+include_directories(${CMAKE_CURRENT_SOURCE_DIR})
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+include_directories(${CURRENT_SOURCE_PARENT_DIR})
+include_directories(${CURRENT_BINARY_PARENT_DIR})
+
+include_directories(${CMAKE_SOURCE_DIR}/include)
+include_directories(${CMAKE_BINARY_DIR}/include)
+
+get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
+get_filename_component(PARENT_DIR ${PARENT_PATH} NAME)
+
+create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
+ # Add new tests here
+ dht_test.c
+ )
+
+protobuf_generate_c(DHT_PROTO_SRCS KAD_PROTO_HDRS ../dht.proto)
+add_executable(${PARENT_DIR}_test ${${PARENT_DIR}_tests}
+ ${DHT_PROTO_SRCS})
+target_link_libraries(${PARENT_DIR}_test ouroboros-common)
+
+add_dependencies(check ${PARENT_DIR}_test)
+
+set(tests_to_run ${${PARENT_DIR}_tests})
+if(CMAKE_VERSION VERSION_LESS "3.29.0")
+ remove(tests_to_run test_suite.c)
+else ()
+ list(POP_FRONT tests_to_run)
+endif()
+
+foreach (test ${tests_to_run})
+ get_filename_component(test_name ${test} NAME_WE)
+ add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name})
+endforeach (test)
diff --git a/src/ipcpd/unicast/dir/tests/dht_test.c b/src/ipcpd/unicast/dir/tests/dht_test.c
new file mode 100644
index 00000000..cb6b0f9f
--- /dev/null
+++ b/src/ipcpd/unicast/dir/tests/dht_test.c
@@ -0,0 +1,1925 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Unit tests of the DHT
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define __DHT_TEST__
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include <ouroboros/test.h>
+#include <ouroboros/list.h>
+#include <ouroboros/utils.h>
+
+#include "dht.pb-c.h"
+
+#include <assert.h>
+#include <inttypes.h>
+#include <time.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#define DHT_MAX_RAND_SIZE 64
+#define DHT_TEST_KEY_LEN 32
+#define DHT_TEST_ADDR 0x1234567890abcdefULL
+
+/* forward declare for use in the dht code */
+/* Packet sink for DHT tests */
+struct {
+ bool enabled;
+
+ struct list_head list;
+ size_t len;
+} sink;
+
+struct message {
+ struct list_head next;
+ void * msg;
+ uint64_t dst;
+};
+
+static int sink_send_msg(buffer_t * pkt,
+ uint64_t addr)
+{
+ struct message * m;
+
+ assert(pkt != NULL);
+ assert(addr != 0);
+
+ assert(!list_is_empty(&sink.list) || sink.len == 0);
+
+ if (!sink.enabled)
+ goto finish;
+
+ m = malloc(sizeof(*m));
+ if (m == NULL) {
+ printf("Failed to malloc message.");
+ goto fail_malloc;
+ }
+
+ m->msg = dht_msg__unpack(NULL, pkt->len, pkt->data);
+ if (m->msg == NULL)
+ goto fail_unpack;
+
+ m->dst = addr;
+
+ list_add_tail(&m->next, &sink.list);
+
+ ++sink.len;
+ finish:
+ freebuf(*pkt);
+
+ return 0;
+ fail_unpack:
+ free(m);
+ fail_malloc:
+ freebuf(*pkt);
+ return -1;
+}
+
+#include "dht.c"
+
+/* Test helpers */
+
+static void sink_init(void)
+{
+ list_head_init(&sink.list);
+ sink.len = 0;
+ sink.enabled = true;
+}
+
+static void sink_clear(void)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ list_for_each_safe(p, h, &sink.list) {
+ struct message * m = list_entry(p, struct message, next);
+ list_del(&m->next);
+ dht_msg__free_unpacked((dht_msg_t *) m->msg, NULL);
+ free(m);
+ --sink.len;
+ }
+
+ assert(list_is_empty(&sink.list));
+}
+
+static void sink_fini(void)
+{
+ sink_clear();
+
+ assert(list_is_empty(&sink.list) || sink.len != 0);
+}
+
+static dht_msg_t * sink_read(void)
+{
+ struct message * m;
+ dht_msg_t * msg;
+
+ assert(!list_is_empty(&sink.list) || sink.len == 0);
+
+ if (list_is_empty(&sink.list))
+ return NULL;
+
+ m = list_first_entry(&sink.list, struct message, next);
+
+ --sink.len;
+
+ list_del(&m->next);
+
+ msg = m->msg;
+
+ free(m);
+
+ return (dht_msg_t *) msg;
+}
+
+static const buffer_t test_val = {
+ .data = (uint8_t *) "test_value",
+ .len = 10
+};
+
+static const buffer_t test_val2 = {
+ .data = (uint8_t *) "test_value_2",
+ .len = 12
+};
+
+static int random_value_len(buffer_t * b)
+{
+ assert(b != NULL);
+ assert(b->len > 0 && b->len <= DHT_MAX_RAND_SIZE);
+
+ b->data = malloc(b->len);
+ if (b->data == NULL)
+ goto fail_malloc;
+
+ random_buffer(b->data, b->len);
+
+ return 0;
+
+ fail_malloc:
+ return -ENOMEM;
+}
+
+static int random_value(buffer_t * b)
+{
+ assert(b != NULL);
+
+ b->len = rand() % DHT_MAX_RAND_SIZE + 1;
+
+ return random_value_len(b);
+}
+
+static int fill_dht_with_contacts(size_t n)
+{
+ size_t i;
+ uint8_t * id;
+
+ for (i = 0; i < n; i++) {
+ uint64_t addr = generate_cookie();
+ id = generate_id();
+ if (id == NULL)
+ goto fail_id;
+
+ if (dht_kv_update_contacts(id, addr) < 0)
+ goto fail_update;
+ free(id);
+ }
+
+ return 0;
+
+ fail_update:
+ free(id);
+ fail_id:
+ return -1;
+}
+
+static int fill_store_with_random_values(const uint8_t * key,
+ size_t len,
+ size_t n_values)
+{
+ buffer_t val;
+ struct timespec now;
+ size_t i;
+ uint8_t * _key;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ for (i = 0; i < n_values; ++i) {
+ if (key != NULL)
+ _key = (uint8_t *) key;
+ else {
+ _key = generate_id();
+ if (_key == NULL)
+ goto fail_key;
+ }
+
+ if (len == 0)
+ val.len = rand() % DHT_MAX_RAND_SIZE + 1;
+ else
+ val.len = len;
+
+ if (random_value_len(&val) < 0)
+ goto fail_value;
+
+ if (dht_kv_store(_key, val, now.tv_sec + 10) < 0)
+ goto fail_store;
+
+ freebuf(val);
+ if (key == NULL)
+ free(_key);
+ }
+
+ return 0;
+
+ fail_store:
+ freebuf(val);
+ fail_value:
+ free(_key);
+ fail_key:
+ return -1;
+}
+
+static int random_contact_list(dht_contact_msg_t *** contacts,
+ size_t max)
+{
+ size_t i;
+
+ assert(contacts != NULL);
+ assert(max > 0);
+
+ *contacts = malloc(max * sizeof(**contacts));
+ if (*contacts == NULL)
+ goto fail_malloc;
+
+ for (i = 0; i < max; i++) {
+ (*contacts)[i] = malloc(sizeof(*(*contacts)[i]));
+ if ((*contacts)[i] == NULL)
+ goto fail_contacts;
+
+ dht_contact_msg__init((*contacts)[i]);
+
+ (*contacts)[i]->id.data = generate_id();
+ if ((*contacts)[i]->id.data == NULL)
+ goto fail_contact;
+
+ (*contacts)[i]->id.len = dht.id.len;
+ (*contacts)[i]->addr = generate_cookie();
+ }
+
+ return 0;
+
+ fail_contact:
+ dht_contact_msg__free_unpacked((*contacts)[i], NULL);
+ fail_contacts:
+ while (i-- > 0)
+ free((*contacts)[i]);
+ free(*contacts);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+static void clear_contacts(dht_contact_msg_t ** contacts,
+ size_t len)
+{
+ size_t i;
+
+ assert(contacts != NULL);
+ if (*contacts == NULL)
+ return;
+
+ for (i = 0; i < len; ++i)
+ dht_contact_msg__free_unpacked((contacts)[i], NULL);
+
+ free(*contacts);
+ *contacts = NULL;
+}
+
+/* Start of actual tests */
+static struct dir_dht_config test_dht_config = {
+ .params = {
+ .alpha = 3,
+ .k = 8,
+ .t_expire = 86400,
+ .t_refresh = 900,
+ .t_replicate = 900
+ }
+};
+
+static int test_dht_init_fini(void)
+{
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_start_stop(void)
+{
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (dht_start() < 0) {
+ printf("Failed to start dht.\n");
+ goto fail_start;
+ }
+
+ dht_stop();
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_start:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_val_entry_create_destroy(void)
+{
+ struct val_entry * e;
+ struct timespec now;
+
+ TEST_START();
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ e = val_entry_create(test_val, now.tv_sec + 10);
+ if (e == NULL) {
+ printf("Failed to create val entry.\n");
+ goto fail_entry;
+ }
+
+ val_entry_destroy(e);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_entry:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_entry_create_destroy(void)
+{
+ struct dht_entry * e;
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ e = dht_entry_create(dht.id.data);
+ if (e == NULL) {
+ printf("Failed to create dht entry.\n");
+ goto fail_entry;
+ }
+
+ dht_entry_destroy(e);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_entry:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_entry_update_get_val(void)
+{
+ struct dht_entry * e;
+ struct val_entry * v;
+ struct timespec now;
+
+ TEST_START();
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ e = dht_entry_create(dht.id.data);
+ if (e == NULL) {
+ printf("Failed to create dht entry.\n");
+ goto fail_entry;
+ }
+
+ if (dht_entry_get_val(e, test_val) != NULL) {
+ printf("Found value in empty dht entry.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_update_val(e, test_val, now.tv_sec + 10) < 0) {
+ printf("Failed to update dht entry value.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_get_val(e, test_val2) != NULL) {
+ printf("Found value in dht entry with different key.\n");
+ goto fail_get;
+ }
+
+ v = dht_entry_get_val(e, test_val);
+ if (v == NULL) {
+ printf("Failed to get value from dht entry.\n");
+ goto fail_get;
+ }
+
+ if (v->val.len != test_val.len) {
+ printf("Length in dht entry does not match expected.\n");
+ goto fail_get;
+ }
+
+ if(memcmp(v->val.data, test_val.data, test_val.len) != 0) {
+ printf("Data in dht entry does not match expected.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_update_val(e, test_val, now.tv_sec + 15) < 0) {
+ printf("Failed to update exsting dht entry value.\n");
+ goto fail_get;
+ }
+
+ if (v->t_exp != now.tv_sec + 15) {
+ printf("Expiration time in dht entry value not updated.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_update_val(e, test_val, now.tv_sec + 5) < 0) {
+ printf("Failed to update existing dht entry value (5).\n");
+ goto fail_get;
+ }
+
+ if (v->t_exp != now.tv_sec + 15) {
+ printf("Expiration time in dht entry shortened.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_get_val(e, test_val) != v) {
+ printf("Wrong value in dht entry found after update.\n");
+ goto fail_get;
+ }
+
+ dht_entry_destroy(e);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_get:
+ dht_entry_destroy(e);
+ fail_entry:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_entry_update_get_lval(void)
+{
+ struct dht_entry * e;
+ struct val_entry * v;
+ struct timespec now;
+
+ TEST_START();
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ e = dht_entry_create(dht.id.data);
+ if (e == NULL) {
+ printf("Failed to create dht entry.\n");
+ goto fail_entry;
+ }
+
+ if (dht_entry_get_lval(e, test_val) != NULL) {
+ printf("Found value in empty dht entry.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_update_lval(e, test_val) < 0) {
+ printf("Failed to update dht entry value.\n");
+ goto fail_get;
+ }
+
+ v = dht_entry_get_lval(e, test_val);
+ if (v== NULL) {
+ printf("Failed to get value from dht entry.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_get_lval(e, test_val2) != NULL) {
+ printf("Found value in dht entry in vals.\n");
+ goto fail_get;
+ }
+
+ if (v->val.len != test_val.len) {
+ printf("Length in dht entry does not match expected.\n");
+ goto fail_get;
+ }
+
+ if(memcmp(v->val.data, test_val.data, test_val.len) != 0) {
+ printf("Data in dht entry does not match expected.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_update_lval(e, test_val) < 0) {
+ printf("Failed to update existing dht entry value.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_get_lval(e, test_val) != v) {
+ printf("Wrong value in dht entry found after update.\n");
+ goto fail_get;
+ }
+
+ dht_entry_destroy(e);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_get:
+ dht_entry_destroy(e);
+ fail_entry:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_contact_create_destroy(void)
+{
+ struct contact * c;
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ c = contact_create(dht.id.data, dht.addr);
+ if (c == NULL) {
+ printf("Failed to create contact.\n");
+ goto fail_contact;
+ }
+
+ contact_destroy(c);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_contact:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_update_bucket(void)
+{
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (fill_dht_with_contacts(1000) < 0) {
+ printf("Failed to fill bucket with contacts.\n");
+ goto fail_update;
+ }
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_update:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_contact_list(void)
+{
+ struct list_head cl;
+ ssize_t len;
+ ssize_t items;
+
+ TEST_START();
+
+ list_head_init(&cl);
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ items = 5;
+
+ if (fill_dht_with_contacts(items) < 0) {
+ printf("Failed to fill bucket with contacts.\n");
+ goto fail_fill;
+ }
+
+ len = dht_kv_contact_list(dht.id.data, &cl, dht.k);
+ if (len < 0) {
+ printf("Failed to get contact list.\n");
+ goto fail_fill;
+ }
+
+ if (len != items) {
+ printf("Failed to get contacts (%zu != %zu).\n", len, items);
+ goto fail_contact_list;
+ }
+
+ contact_list_destroy(&cl);
+
+ items = 100;
+
+ if (fill_dht_with_contacts(items) < 0) {
+ printf("Failed to fill bucket with contacts.\n");
+ goto fail_fill;
+ }
+
+ len = dht_kv_contact_list(dht.id.data, &cl, items);
+ if (len < 0) {
+ printf("Failed to get contact list.\n");
+ goto fail_fill;
+ }
+
+ if ((size_t) len < dht.k) {
+ printf("Failed to get contacts (%zu < %zu).\n", len, dht.k);
+ goto fail_contact_list;
+ }
+
+ contact_list_destroy(&cl);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_contact_list:
+ contact_list_destroy(&cl);
+ fail_fill:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_get_values(void)
+{
+ buffer_t * vals;
+ ssize_t len;
+ size_t n = sizeof(uint64_t);
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (fill_store_with_random_values(dht.id.data, n, 3) < 0) {
+ printf("Failed to fill store with random values.\n");
+ goto fail_fill;
+ }
+
+ len = dht_kv_retrieve(dht.id.data, &vals);
+ if (len < 0) {
+ printf("Failed to get values from store.\n");
+ goto fail_fill;
+ }
+
+ if (len != 3) {
+ printf("Failed to get %ld values (%zu).\n", 3L, len);
+ goto fail_get_values;
+ }
+
+ freebufs(vals, len);
+
+ if (fill_store_with_random_values(dht.id.data, n, 20) < 0) {
+ printf("Failed to fill store with random values.\n");
+ goto fail_fill;
+ }
+
+ len = dht_kv_retrieve(dht.id.data, &vals);
+ if (len < 0) {
+ printf("Failed to get values from store.\n");
+ goto fail_fill;
+ }
+
+ if (len != DHT_MAX_VALS) {
+ printf("Failed to get %d values.\n", DHT_MAX_VALS);
+ goto fail_get_values;
+ }
+
+ freebufs(vals, len);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_get_values:
+ freebufs(vals, len);
+ fail_fill:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_find_node_req_msg(void)
+{
+ dht_msg_t * msg;
+ dht_msg_t * upk;
+ size_t len;
+ uint8_t * buf;
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ msg = dht_kv_find_node_req_msg(dht.id.data);
+ if (msg == NULL) {
+ printf("Failed to get find node request message.\n");
+ goto fail_msg;
+ }
+
+ if (msg->code != DHT_FIND_NODE_REQ) {
+ printf("Wrong code in find_node_req message (%s != %s).\n",
+ dht_code_str[msg->code],
+ dht_code_str[DHT_FIND_NODE_REQ]);
+ goto fail_msg;
+ }
+
+ len = dht_msg__get_packed_size(msg);
+ if (len == 0) {
+ printf("Failed to get packed length of find_node_req.\n");
+ goto fail_msg;
+ }
+
+ buf = malloc(len);
+ if (buf == NULL) {
+ printf("Failed to malloc find_node_req buf.\n");
+ goto fail_msg;
+ }
+
+ if (dht_msg__pack(msg, buf) != len) {
+ printf("Failed to pack find_node_req message.\n");
+ goto fail_pack;
+ }
+
+ upk = dht_msg__unpack(NULL, len, buf);
+ if (upk == NULL) {
+ printf("Failed to unpack find_value_req message.\n");
+ goto fail_unpack;
+ }
+
+ free(buf);
+ dht_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(upk, NULL);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_unpack:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+ free(buf);
+ fail_msg:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_find_node_rsp_msg(void)
+{
+ dht_contact_msg_t ** contacts;
+ dht_msg_t * msg;
+ dht_msg_t * upk;
+ size_t len;
+ uint8_t * buf;
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ msg = dht_kv_find_node_rsp_msg(dht.id.data, 0, &contacts, 0);
+ if (msg == NULL) {
+ printf("Failed to get find node response message.\n");
+ goto fail_msg;
+ }
+
+ if (msg->code != DHT_FIND_NODE_RSP) {
+ printf("Wrong code in find_node_rsp message (%s != %s).\n",
+ dht_code_str[msg->code],
+ dht_code_str[DHT_FIND_NODE_RSP]);
+ goto fail_msg;
+ }
+
+ len = dht_msg__get_packed_size(msg);
+ if (len == 0) {
+ printf("Failed to get packed length of find_node_rsp.\n");
+ goto fail_msg;
+ }
+
+ buf = malloc(len);
+ if (buf == NULL) {
+ printf("Failed to malloc find_node_rsp buf.\n");
+ goto fail_msg;
+ }
+
+ if (dht_msg__pack(msg, buf) != len) {
+ printf("Failed to pack find_node_rsp message.\n");
+ goto fail_pack;
+ }
+
+ upk = dht_msg__unpack(NULL, len, buf);
+ if (upk == NULL) {
+ printf("Failed to unpack find_node_rsp message.\n");
+ goto fail_unpack;
+ }
+
+ free(buf);
+ dht_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(upk, NULL);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_unpack:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+ free(buf);
+ fail_msg:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_find_node_rsp_msg_contacts(void)
+{
+ dht_contact_msg_t ** contacts;
+ dht_msg_t * msg;
+ dht_msg_t * upk;
+ uint8_t * buf;
+ size_t len;
+ ssize_t n;
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (fill_dht_with_contacts(100) < 0) {
+ printf("Failed to fill bucket with contacts.\n");
+ goto fail_fill;
+ }
+
+ n = dht_kv_get_contacts(dht.id.data, &contacts);
+ if (n < 0) {
+ printf("Failed to get contacts.\n");
+ goto fail_fill;
+ }
+
+ if ((size_t) n < dht.k) {
+ printf("Failed to get enough contacts (%zu < %zu).\n", n, dht.k);
+ goto fail_fill;
+ }
+
+ msg = dht_kv_find_node_rsp_msg(dht.id.data, 0, &contacts, n);
+ if (msg == NULL) {
+ printf("Failed to build find node response message.\n");
+ goto fail_msg;
+ }
+
+ len = dht_msg__get_packed_size(msg);
+ if (len == 0) {
+ printf("Failed to get packed length of find_node_rsp.\n");
+ goto fail_msg;
+ }
+
+ buf = malloc(len);
+ if (buf == NULL) {
+ printf("Failed to malloc find_node_rsp buf.\n");
+ goto fail_msg;
+ }
+
+ if (dht_msg__pack(msg, buf) != len) {
+ printf("Failed to pack find_node_rsp message.\n");
+ goto fail_pack;
+ }
+
+ upk = dht_msg__unpack(NULL, len, buf);
+ if (upk == NULL) {
+ printf("Failed to unpack find_node_rsp message.\n");
+ goto fail_unpack;
+ }
+
+ free(buf);
+ dht_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(upk, NULL);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_unpack:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+ free(buf);
+ fail_msg:
+ clear_contacts(contacts, n);
+ fail_fill:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_find_value_req_msg(void)
+{
+ dht_msg_t * msg;
+ dht_msg_t * upk;
+ size_t len;
+ uint8_t * buf;
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ msg = dht_kv_find_value_req_msg(dht.id.data);
+ if (msg == NULL) {
+ printf("Failed to build find value request message.\n");
+ goto fail_msg;
+ }
+
+ if (msg->code != DHT_FIND_VALUE_REQ) {
+ printf("Wrong code in find_value_req message (%s != %s).\n",
+ dht_code_str[msg->code],
+ dht_code_str[DHT_FIND_VALUE_REQ]);
+ goto fail_msg;
+ }
+
+ len = dht_msg__get_packed_size(msg);
+ if (len == 0) {
+ printf("Failed to get packed length of find_value_req.\n");
+ goto fail_msg;
+ }
+
+ buf = malloc(len);
+ if (buf == NULL) {
+ printf("Failed to malloc find_node_req buf.\n");
+ goto fail_msg;
+ }
+
+ if (dht_msg__pack(msg, buf) != len) {
+ printf("Failed to pack find_value_req message.\n");
+ goto fail_pack;
+ }
+
+ upk = dht_msg__unpack(NULL, len, buf);
+ if (upk == NULL) {
+ printf("Failed to unpack find_value_req message.\n");
+ goto fail_unpack;
+ }
+
+ free(buf);
+ dht_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(upk, NULL);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_unpack:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+ free(buf);
+ fail_msg:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_find_value_rsp_msg(void)
+{
+ dht_msg_t * msg;
+ dht_msg_t * upk;
+ size_t len;
+ uint8_t * buf;
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ msg = dht_kv_find_value_rsp_msg(dht.id.data, 0, NULL, 0, NULL, 0);
+ if (msg == NULL) {
+ printf("Failed to build find value response message.\n");
+ goto fail_msg;
+ }
+
+ if (msg->code != DHT_FIND_VALUE_RSP) {
+ printf("Wrong code in find_value_rsp message (%s != %s).\n",
+ dht_code_str[msg->code],
+ dht_code_str[DHT_FIND_VALUE_RSP]);
+ goto fail_msg;
+ }
+
+ len = dht_msg__get_packed_size(msg);
+ if (len == 0) {
+ printf("Failed to get packed length of find_value_rsp.\n");
+ goto fail_msg;
+ }
+
+ buf = malloc(len);
+ if (buf == NULL) {
+ printf("Failed to malloc find_value_rsp buf.\n");
+ goto fail_msg;
+ }
+
+ if (dht_msg__pack(msg, buf) != len) {
+ printf("Failed to pack find_value_rsp message.\n");
+ goto fail_pack;
+ }
+
+ upk = dht_msg__unpack(NULL, len, buf);
+ if (upk == NULL) {
+ printf("Failed to unpack find_value_rsp message.\n");
+ goto fail_unpack;
+ }
+
+ free(buf);
+ dht_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(upk, NULL);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_unpack:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+ free(buf);
+ fail_msg:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_find_value_rsp_msg_contacts(void)
+{
+ dht_msg_t * msg;
+ dht_msg_t * upk;
+ size_t len;
+ uint8_t * buf;
+ dht_contact_msg_t ** contacts;
+ ssize_t n;
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (fill_dht_with_contacts(100) < 0) {
+ printf("Failed to fill bucket with contacts.\n");
+ goto fail_fill;
+ }
+
+ n = dht_kv_get_contacts(dht.id.data, &contacts);
+ if (n < 0) {
+ printf("Failed to get contacts.\n");
+ goto fail_fill;
+ }
+
+ if ((size_t) n < dht.k) {
+ printf("Failed to get enough contacts (%zu < %zu).\n", n, dht.k);
+ goto fail_fill;
+ }
+
+ msg = dht_kv_find_value_rsp_msg(dht.id.data, 0, &contacts, n, NULL, 0);
+ if (msg == NULL) {
+ printf("Failed to build find value response message.\n");
+ goto fail_msg;
+ }
+
+ len = dht_msg__get_packed_size(msg);
+ if (len == 0) {
+ printf("Failed to get packed length of find_value_rsp.\n");
+ goto fail_msg;
+ }
+
+ buf = malloc(len);
+ if (buf == NULL) {
+ printf("Failed to malloc find_value_rsp buf.\n");
+ goto fail_msg;
+ }
+
+ if (dht_msg__pack(msg, buf) != len) {
+ printf("Failed to pack find_value_rsp message.\n");
+ goto fail_pack;
+ }
+
+ upk = dht_msg__unpack(NULL, len, buf);
+ if (upk == NULL) {
+ printf("Failed to unpack find_value_rsp message.\n");
+ goto fail_unpack;
+ }
+
+ free(buf);
+ dht_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(upk, NULL);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_unpack:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+ free(buf);
+ fail_msg:
+ clear_contacts(contacts, n);
+ fail_fill:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_find_value_rsp_msg_values(void)
+{
+ dht_msg_t * msg;
+ dht_msg_t * upk;
+ size_t len;
+ uint8_t * buf;
+ buffer_t * values;
+ size_t i;
+ uint64_t ck;
+
+ TEST_START();
+
+ ck = generate_cookie();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ values = malloc(sizeof(*values) * 8);
+ if (values == NULL) {
+ printf("Failed to malloc values.\n");
+ goto fail_values;
+ }
+
+ for (i = 0; i < 8; i++) {
+ if (random_value(&values[i]) < 0) {
+ printf("Failed to create random value.\n");
+ goto fail_fill;
+ }
+ }
+
+ msg = dht_kv_find_value_rsp_msg(dht.id.data, ck, NULL, 0, &values, 8);
+ if (msg == NULL) {
+ printf("Failed to build find value response message.\n");
+ goto fail_msg;
+ }
+
+ values = NULL; /* msg owns the values now */
+
+ len = dht_msg__get_packed_size(msg);
+ if (len == 0) {
+ printf("Failed to get packed length of find_value_rsp.\n");
+ goto fail_msg;
+ }
+
+ buf = malloc(len);
+ if (buf == NULL) {
+ printf("Failed to malloc find_value_rsp buf.\n");
+ goto fail_msg;
+ }
+
+ if (dht_msg__pack(msg, buf) != len) {
+ printf("Failed to pack find_value_rsp message.\n");
+ goto fail_pack;
+ }
+
+ upk = dht_msg__unpack(NULL, len, buf);
+ if (upk == NULL) {
+ printf("Failed to unpack find_value_rsp message.\n");
+ goto fail_unpack;
+ }
+
+ if (upk->code != DHT_FIND_VALUE_RSP) {
+ printf("Wrong code in find_value_rsp message (%s != %s).\n",
+ dht_code_str[upk->code],
+ dht_code_str[DHT_FIND_VALUE_RSP]);
+ goto fail_unpack;
+ }
+
+ if (upk->val == NULL) {
+ printf("No values in find_value_rsp message.\n");
+ goto fail_unpack;
+ }
+
+ if (upk->val->n_values != 8) {
+ printf("Not enough values in find_value_rsp (%zu != %lu).\n",
+ upk->val->n_values, 8UL);
+ goto fail_unpack;
+ }
+
+ free(buf);
+ dht_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(upk, NULL);
+
+ free(values);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_unpack:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+ free(buf);
+ fail_msg:
+ fail_fill:
+ while((i--) > 0)
+ freebuf(values[i]);
+ free(values);
+ fail_values:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_store_msg(void)
+{
+ dht_msg_t * msg;
+ size_t len;
+ uint8_t * buf;
+ struct timespec now;
+
+ TEST_START();
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ msg = dht_kv_store_msg(dht.id.data, test_val, now.tv_sec + 10);
+ if (msg == NULL) {
+ printf("Failed to get store message.\n");
+ goto fail_msg;
+ }
+
+ if (msg->code != DHT_STORE) {
+ printf("Wrong code in store message (%s != %s).\n",
+ dht_code_str[msg->code],
+ dht_code_str[DHT_STORE]);
+ goto fail_store_msg;
+ }
+
+ if (dht_kv_validate_msg(msg) < 0) {
+ printf("Failed to validate store message.\n");
+ goto fail_store_msg;
+ }
+
+ len = dht_msg__get_packed_size(msg);
+ if (len == 0) {
+ printf("Failed to get packed msg length.\n");
+ goto fail_msg;
+ }
+
+ buf = malloc(len);
+ if (buf == NULL) {
+ printf("Failed to malloc store msg buf.\n");
+ goto fail_msg;
+ }
+
+ if (dht_msg__pack(msg, buf) != len) {
+ printf("Failed to pack store message.\n");
+ goto fail_pack;
+ }
+
+ free(buf);
+
+ dht_msg__free_unpacked(msg, NULL);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_pack:
+ free(buf);
+ fail_store_msg:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_kv_query_contacts_req_rsp(void)
+{
+ dht_msg_t * req;
+ dht_msg_t * rsp;
+ dht_contact_msg_t ** contacts;
+ size_t len = 2;
+
+ uint8_t * key;
+
+ TEST_START();
+
+ sink_init();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (fill_dht_with_contacts(1) < 0) {
+ printf("Failed to fill bucket with contacts.\n");
+ goto fail_prep;
+ }
+
+ key = generate_id();
+ if (key == NULL) {
+ printf("Failed to generate key.\n");
+ goto fail_prep;
+ }
+
+ if (dht_kv_query_contacts(key, NULL) < 0) {
+ printf("Failed to query contacts.\n");
+ goto fail_query;
+ }
+
+ req = sink_read();
+ if (req == NULL) {
+ printf("Failed to read request from sink.\n");
+ goto fail_query;
+ }
+
+ if (dht_kv_validate_msg(req) < 0) {
+ printf("Failed to validate find node req.\n");
+ goto fail_val_req;
+ }
+
+ if (random_contact_list(&contacts, len) < 0) {
+ printf("Failed to create random contact.\n");
+ goto fail_val_req;
+ }
+
+ rsp = dht_kv_find_node_rsp_msg(key, req->find->cookie, &contacts, len);
+ if (rsp == NULL) {
+ printf("Failed to create find node response message.\n");
+ goto fail_rsp;
+ }
+
+ memcpy(rsp->src->id.data, dht.id.data, dht.id.len);
+ rsp->src->addr = generate_cookie();
+
+ if (dht_kv_validate_msg(rsp) < 0) {
+ printf("Failed to validate find node response message.\n");
+ goto fail_val_rsp;
+ }
+
+ do_dht_kv_find_node_rsp(rsp->node);
+
+ /* dht_contact_msg__free_unpacked(contacts[0], NULL); set to NULL */
+
+ free(contacts);
+
+ dht_msg__free_unpacked(rsp, NULL);
+
+ free(key);
+
+ dht_msg__free_unpacked(req, NULL);
+
+ sink_fini();
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_val_rsp:
+ dht_msg__free_unpacked(rsp, NULL);
+ fail_rsp:
+ while (len-- > 0)
+ dht_contact_msg__free_unpacked(contacts[len], NULL);
+ free(contacts);
+ fail_val_req:
+ dht_msg__free_unpacked(req, NULL);
+ fail_query:
+ free(key);
+ fail_prep:
+ dht_fini();
+ fail_init:
+ sink_fini();
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_req_create_destroy(void)
+{
+ struct dht_req * req;
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ req = dht_req_create(dht.id.data);
+ if (req == NULL) {
+ printf("Failed to create kad request.\n");
+ goto fail_req;
+ }
+
+ dht_req_destroy(req);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_req:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_reg_unreg(void)
+{
+ TEST_START();
+
+ sink_init();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (dht_reg(dht.id.data) < 0) {
+ printf("Failed to register own id.\n");
+ goto fail_reg;
+ }
+
+ if (sink.len != 0) {
+ printf("Packet sent without contacts!");
+ goto fail_msg;
+ }
+
+ if (dht_unreg(dht.id.data) < 0) {
+ printf("Failed to unregister own id.\n");
+ goto fail_msg;
+ }
+
+ dht_fini();
+
+ sink_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_msg:
+ dht_unreg(dht.id.data);
+ fail_reg:
+ dht_fini();
+ fail_init:
+ sink_fini();
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_reg_unreg_contacts(void)
+{
+ dht_msg_t * msg;
+
+ TEST_START();
+
+ sink_init();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (fill_dht_with_contacts(4) < 0) {
+ printf("Failed to fill bucket with contacts.\n");
+ goto fail_reg;
+ }
+
+ if (dht_reg(dht.id.data) < 0) {
+ printf("Failed to register own id.\n");
+ goto fail_reg;
+ }
+
+ if (sink.len != dht.alpha) {
+ printf("Packet sent to too few contacts!\n");
+ goto fail_msg;
+ }
+
+ msg = sink_read();
+ if (msg == NULL) {
+ printf("Failed to read message from sink.\n");
+ goto fail_msg;
+ }
+
+ if (msg->code != DHT_STORE) {
+ printf("Wrong code in dht reg message (%s != %s).\n",
+ dht_code_str[msg->code],
+ dht_code_str[DHT_STORE]);
+ goto fail_validation;
+ }
+
+ if (dht_kv_validate_msg(msg) < 0) {
+ printf("Failed to validate dht message.\n");
+ goto fail_validation;
+ }
+
+ if (dht_unreg(dht.id.data) < 0) {
+ printf("Failed to unregister own id.\n");
+ goto fail_validation;
+ }
+
+ dht_msg__free_unpacked(msg, NULL);
+
+ dht_fini();
+
+ sink_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_validation:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ sink_clear();
+ dht_unreg(dht.id.data);
+ fail_reg:
+ dht_fini();
+ fail_init:
+ sink_fini();
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_reg_query_local(void)
+{
+ struct timespec now;
+ buffer_t test_addr;
+
+ TEST_START();
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ if (addr_to_buf(1234321, &test_addr) < 0) {
+ printf("Failed to convert test address to buffer.\n");
+ goto fail_buf;
+ }
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (dht_reg(dht.id.data) < 0) {
+ printf("Failed to register own id.\n");
+ goto fail_reg;
+ }
+
+ if (dht_query(dht.id.data) == dht.addr) {
+ printf("Succeeded to query own id.\n");
+ goto fail_get;
+ }
+
+ if (dht_kv_store(dht.id.data, test_addr, now.tv_sec + 5) < 0) {
+ printf("Failed to publish value.\n");
+ goto fail_get;
+ }
+
+ if (dht_query(dht.id.data) != 1234321) {
+ printf("Failed to return remote addr.\n");
+ goto fail_get;
+ }
+
+ if (dht_unreg(dht.id.data) < 0) {
+ printf("Failed to unregister own id.\n");
+ goto fail_get;
+ }
+
+ freebuf(test_addr);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_get:
+ dht_unreg(dht.id.data);
+ fail_reg:
+ dht_fini();
+ fail_init:
+ freebuf(test_addr);
+ fail_buf:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_query(void)
+{
+ uint8_t * key;
+ struct dir_dht_config cfg;
+
+ TEST_START();
+
+ sink_init();
+
+ cfg = test_dht_config;
+ cfg.peer = generate_cookie();
+
+ if (dht_init(&cfg)) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ key = generate_id();
+ if (key == NULL) {
+ printf("Failed to generate key.\n");
+ goto fail_key;
+ }
+
+ if (dht_query(key) != INVALID_ADDR) {
+ printf("Succeeded to get address without contacts.\n");
+ goto fail_get;
+ }
+
+ if (sink.len != 0) {
+ printf("Packet sent without contacts!");
+ goto fail_test;
+ }
+
+ free(key);
+
+ dht_fini();
+
+ sink_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_test:
+ sink_clear();
+ fail_get:
+ free(key);
+ fail_key:
+ dht_fini();
+ fail_init:
+ sink_fini();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_query_contacts(void)
+{
+ dht_msg_t * msg;
+ uint8_t * key;
+ struct dir_dht_config cfg;
+
+
+ TEST_START();
+
+ sink_init();
+
+ cfg = test_dht_config;
+ cfg.peer = generate_cookie();
+
+ if (dht_init(&cfg)) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (fill_dht_with_contacts(10) < 0) {
+ printf("Failed to fill with contacts!");
+ goto fail_contacts;
+ }
+
+ key = generate_id();
+ if (key == NULL) {
+ printf("Failed to generate key.");
+ goto fail_contacts;
+ }
+
+ if (dht_query(key) != INVALID_ADDR) {
+ printf("Succeeded to get address for random id.\n");
+ goto fail_query;
+ }
+
+ msg = sink_read();
+ if (msg == NULL) {
+ printf("Failed to read message.!\n");
+ goto fail_read;
+ }
+
+ if (dht_kv_validate_msg(msg) < 0) {
+ printf("Failed to validate dht message.\n");
+ goto fail_msg;
+ }
+
+ if (msg->code != DHT_FIND_VALUE_REQ) {
+ printf("Failed to validate dht message.\n");
+ goto fail_msg;
+ }
+
+ dht_msg__free_unpacked(msg, NULL);
+
+ free(key);
+
+ sink_clear();
+
+ dht_fini();
+
+ sink_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_msg:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_read:
+ sink_clear();
+ fail_query:
+ free(key);
+ fail_contacts:
+ dht_fini();
+ fail_init:
+ sink_fini();
+ return TEST_RC_FAIL;
+}
+
+int dht_test(int argc,
+ char ** argv)
+{
+ int rc = 0;
+
+ (void) argc;
+ (void) argv;
+
+ rc |= test_dht_init_fini();
+ rc |= test_dht_start_stop();
+ rc |= test_val_entry_create_destroy();
+ rc |= test_dht_entry_create_destroy();
+ rc |= test_dht_entry_update_get_val();
+ rc |= test_dht_entry_update_get_lval();
+ rc |= test_dht_kv_contact_create_destroy();
+ rc |= test_dht_kv_contact_list();
+ rc |= test_dht_kv_update_bucket();
+ rc |= test_dht_kv_get_values();
+ rc |= test_dht_kv_find_node_req_msg();
+ rc |= test_dht_kv_find_node_rsp_msg();
+ rc |= test_dht_kv_find_node_rsp_msg_contacts();
+ rc |= test_dht_kv_query_contacts_req_rsp();
+ rc |= test_dht_kv_find_value_req_msg();
+ rc |= test_dht_kv_find_value_rsp_msg();
+ rc |= test_dht_kv_find_value_rsp_msg_contacts();
+ rc |= test_dht_kv_find_value_rsp_msg_values();
+ rc |= test_dht_kv_store_msg();
+ rc |= test_dht_req_create_destroy();
+ rc |= test_dht_reg_unreg();
+ rc |= test_dht_reg_unreg_contacts();
+ rc |= test_dht_reg_query_local();
+ rc |= test_dht_query();
+ rc |= test_dht_query_contacts();
+
+ return rc;
+}