summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/dir
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2025-08-06 12:29:02 +0200
committerSander Vrijders <sander@ouroboros.rocks>2025-08-06 12:34:15 +0200
commitfa1af6aaed6a46acd0af1600f4c63e79fcf9ff84 (patch)
tree6c386340dab2af965f9ccfc9b5b6f6e97326a586 /src/ipcpd/unicast/dir
parenta5f6ab5af03d9be6f3412d4dff67748908799e21 (diff)
downloadouroboros-fa1af6aaed6a46acd0af1600f4c63e79fcf9ff84.tar.gz
ouroboros-fa1af6aaed6a46acd0af1600f4c63e79fcf9ff84.zip
ipcpd: Update DHT for unicast layer
This is a rewrite of the DHT for name-to-address resolution in the unicast layer. It is now integrated as a proper directory policy. The dir_wait_running function is removed; instead, a DHT peer is passed on during IPCP enrolment. Each DHT request/response gets a random 64-bit ID ('cookie'). DHT messages to the same peer are deduped, except in the case when the DHT is low on contacts. In that case, it will contact the peer it received at enrolment for more contacts. To combat packet loss, these messages are not deduped by means of a 'magic cookie', chosen at random when the DHT starts. The DHT parameters (Kademlia) can be set using the configfile or the IRM command line tools: if DIRECTORY_POLICY == DHT [dht_alpha <search factor> (default: 3)] [dht_k <replication factor> (default: 8)] [dht_t_expire <expiration (s)> (default: 86400)] [dht_t_refresh <contact refresh (s)> (default: 900)] [dht_t_replicate <replication (s)> (default: 900)] This commit also adds support for a protocol debug level (PP). Protocol debugging for the DHT can be enabled using the DEBUG_PROTO_DHT build flag. The DHT has the following message types: DHT_STORE, sent to k peers. Not acknowledged. DHT_STORE --> [2861814146dbf9b5|ed:d9:e2:c4]. key: bcc236ab6ec69e65 [32 bytes] val: 00000000c4e2d9ed [8 bytes] exp: 2025-08-03 17:29:44 (UTC). DHT_FIND_NODE_REQ, sent to 'alpha' peers, with a corresponding response. This is used to update the peer routing table to iteratively look for the nodes with IDs closest to the requested key. DHT_FIND_NODE_REQ --> [a62f92abffb451c4|ed:d9:e2:c4]. cookie: 2d4b7acef8308210 key: a62f92abffb451c4 [32 bytes] DHT_FIND_NODE_RSP <-- [2861814146dbf9b5|ed:d9:e2:c4]. cookie: 2d4b7acef8308210 key: a62f92abffb451c4 [32 bytes] contacts: [1] [a62f92abffb451c4|9f:0d:c1:fb] DHT_FIND_VALUE_REQ, sent to 'k' peers, with a corresponding response. Used to find a value for a key. Will also send its closest known peers in the response. 
DHT_FIND_VALUE_REQ --> [2861814146dbf9b5|ed:d9:e2:c4]. cookie: 80a1adcb09a2ff0a key: 42dee3b0415b4f69 [32 bytes] DHT_FIND_VALUE_RSP <-- [2861814146dbf9b5|ed:d9:e2:c4]. cookie: 80a1adcb09a2ff0a key: 42dee3b0415b4f69 [32 bytes] values: [1] 00000000c4e2d9ed [8 bytes] contacts: [1] [a62f92abffb451c4|9f:0d:c1:fb] Also removes ubuntu 20 from appveyor config as it is not supported anymore. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/ipcpd/unicast/dir')
-rw-r--r--src/ipcpd/unicast/dir/dht.c4899
-rw-r--r--src/ipcpd/unicast/dir/dht.h17
-rw-r--r--src/ipcpd/unicast/dir/dht.proto41
-rw-r--r--src/ipcpd/unicast/dir/ops.h18
-rw-r--r--src/ipcpd/unicast/dir/tests/CMakeLists.txt3
-rw-r--r--src/ipcpd/unicast/dir/tests/dht_test.c1914
6 files changed, 4932 insertions, 1960 deletions
diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c
index da39e567..c7205505 100644
--- a/src/ipcpd/unicast/dir/dht.c
+++ b/src/ipcpd/unicast/dir/dht.c
@@ -4,7 +4,6 @@
* Distributed Hash Table based on Kademlia
*
* Dimitri Staessens <dimitri@ouroboros.rocks>
- * Sander Vrijders <sander@ouroboros.rocks>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@@ -20,10 +19,12 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
-#if defined(__linux__) || defined(__CYGWIN__)
-#define _DEFAULT_SOURCE
-#else
-#define _POSIX_C_SOURCE 200112L
+#if !defined (__DHT_TEST__)
+ #if defined(__linux__) || defined(__CYGWIN__)
+ #define _DEFAULT_SOURCE
+ #else
+ #define _POSIX_C_SOURCE 200112L
+ #endif
#endif
#include "config.h"
@@ -40,6 +41,7 @@
#include <ouroboros/list.h>
#include <ouroboros/notifier.h>
#include <ouroboros/random.h>
+#include <ouroboros/rib.h>
#include <ouroboros/time.h>
#include <ouroboros/tpm.h>
#include <ouroboros/utils.h>
@@ -59,143 +61,153 @@
#include <limits.h>
#include "dht.pb-c.h"
-typedef DhtMsg dht_msg_t;
-typedef DhtContactMsg dht_contact_msg_t;
+typedef DhtMsg dht_msg_t;
+typedef DhtContactMsg dht_contact_msg_t;
+typedef DhtStoreMsg dht_store_msg_t;
+typedef DhtFindReqMsg dht_find_req_msg_t;
+typedef DhtFindNodeRspMsg dht_find_node_rsp_msg_t;
+typedef DhtFindValueRspMsg dht_find_value_rsp_msg_t;
+typedef ProtobufCBinaryData binary_data_t;
#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
#endif
-#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */
-#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */
-#define KAD_K 8 /* Replication factor, MDHT value. */
-#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */
-#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */
-#define KAD_T_JOIN 8 /* Response time to wait for a join. */
-#define KAD_T_RESP 5 /* Response time to wait for a response. */
-#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */
-#define KAD_QUEER 15 /* Time to declare peer questionable. */
-#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */
-#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */
-#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */
-#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */
+#define DHT_MAX_REQS 128 /* KAD recommends rnd(), bmp can be changed. */
+#define DHT_WARN_REQS 100 /* Warn if number of requests exceeds this. */
+#define DHT_MAX_VALS 8 /* Max number of values to return for a key. */
+#define DHT_T_CACHE 60 /* Max cache time for values (s) */
+#define DHT_T_RESP 2 /* Response time to wait for a response (s). */
+#define DHT_N_REPUB 5 /* Republish if expiry within n replications. */
+#define DHT_R_PING 2 /* Ping retries before declaring peer dead. */
+#define DHT_QUEER 15 /* Time to declare peer questionable. */
+#define DHT_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */
+#define DHT_RESP_RETR 6 /* Number of retries on sending a response. */
#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_packet tpm check (ms) */
-#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */
+#define DHT_INVALID 0 /* Invalid cookie value. */
-enum dht_state {
- DHT_INIT = 0,
- DHT_SHUTDOWN,
- DHT_JOINING,
- DHT_RUNNING,
-};
+#define KEY_FMT "K<" HASH_FMT64 ">"
+#define KEY_VAL(key) HASH_VAL64(key)
-enum kad_code {
- KAD_JOIN = 0,
- KAD_FIND_NODE,
- KAD_FIND_VALUE,
- /* Messages without a response below. */
- KAD_STORE,
- KAD_RESPONSE
-};
+#define VAL_FMT "V<" HASH_FMT64 ">"
+#define VAL_VAL(val) HASH_VAL64((val).data)
-enum kad_req_state {
- REQ_NULL = 0,
- REQ_INIT,
- REQ_PENDING,
- REQ_RESPONSE,
- REQ_DONE,
- REQ_DESTROY
-};
+#define KV_FMT "<" HASH_FMT64 ", " HASH_FMT64 ">"
+#define KV_VAL(key, val) HASH_VAL64(key), HASH_VAL64((val).data)
-enum lookup_state {
- LU_NULL = 0,
- LU_INIT,
- LU_PENDING,
- LU_UPDATE,
- LU_COMPLETE,
- LU_DESTROY
-};
+#define PEER_FMT "[" HASH_FMT64 "|" ADDR_FMT32 "]"
+#define PEER_VAL(id, addr) HASH_VAL64(id), ADDR_VAL32(&(addr))
-struct kad_req {
- struct list_head next;
+#define DHT_CODE(msg) dht_code_str[(msg)->code]
- uint32_t cookie;
- enum kad_code code;
- uint8_t * key;
- uint64_t addr;
+#define TX_HDR_FMT "%s --> " PEER_FMT
+#define TX_HDR_VAL(msg, id, addr) DHT_CODE(msg), PEER_VAL(id, addr)
- enum kad_req_state state;
- pthread_cond_t cond;
- pthread_mutex_t lock;
+#define RX_HDR_FMT "%s <-- " PEER_FMT
+#define RX_HDR_VAL(msg) DHT_CODE(msg), \
+ PEER_VAL(msg->src->id.data, msg->src->addr)
- time_t t_exp;
+#define CK_FMT "|" HASH_FMT64 "|"
+#define CK_VAL(cookie) HASH_VAL64(&(cookie))
+
+#define IS_REQUEST(code) \
+ (code == DHT_FIND_NODE_REQ || code == DHT_FIND_VALUE_REQ)
+
+enum dht_code {
+ DHT_STORE,
+ DHT_FIND_NODE_REQ,
+ DHT_FIND_NODE_RSP,
+ DHT_FIND_VALUE_REQ,
+ DHT_FIND_VALUE_RSP
};
-struct cookie_el {
- struct list_head next;
+const char * dht_code_str[] = {
+ "DHT_STORE",
+ "DHT_FIND_NODE_REQ",
+ "DHT_FIND_NODE_RSP",
+ "DHT_FIND_VALUE_REQ",
+ "DHT_FIND_VALUE_RSP"
+};
- uint32_t cookie;
+enum dht_state {
+ DHT_NULL = 0,
+ DHT_INIT,
+ DHT_RUNNING
};
-struct lookup {
- struct list_head next;
+struct val_entry {
+ struct list_head next;
- struct list_head cookies;
+ buffer_t val;
- uint8_t * key;
+ time_t t_exp; /* Expiry time */
+ time_t t_repl; /* Last replication time */
+};
+
+struct dht_entry {
+ struct list_head next;
- struct list_head contacts;
- size_t n_contacts;
+ uint8_t * key;
- uint64_t * addrs;
- size_t n_addrs;
+ struct {
+ struct list_head list;
+ size_t len;
+ } vals; /* We don't own these, only replicate */
- enum lookup_state state;
- pthread_cond_t cond;
- pthread_mutex_t lock;
+ struct {
+ struct list_head list;
+ size_t len;
+ } lvals; /* We own these, must be republished */
};
-struct val {
+struct contact {
struct list_head next;
+ uint8_t * id;
uint64_t addr;
- time_t t_exp;
- time_t t_rep;
+ size_t fails;
+ time_t t_seen;
};
-struct ref_entry {
+struct peer_entry {
struct list_head next;
- uint8_t * key;
+ uint64_t cookie;
+ uint8_t * id;
+ uint64_t addr;
+ enum dht_code code;
- time_t t_rep;
+ time_t t_sent;
};
-struct dht_entry {
+struct dht_req {
struct list_head next;
uint8_t * key;
- size_t n_vals;
- struct list_head vals;
-};
-
-struct contact {
- struct list_head next;
+ time_t t_exp;
- uint8_t * id;
- uint64_t addr;
+ struct {
+ struct list_head list;
+ size_t len;
+ } peers;
- size_t fails;
- time_t t_seen;
+ struct {
+ struct list_head list;
+ size_t len;
+ } cache;
};
struct bucket {
- struct list_head contacts;
- size_t n_contacts;
+ struct {
+ struct list_head list;
+ size_t len;
+ } contacts;
- struct list_head alts;
- size_t n_alts;
+ struct {
+ struct list_head list;
+ size_t len;
+ } alts;
time_t t_refr;
@@ -203,1093 +215,1461 @@ struct bucket {
uint8_t mask;
struct bucket * parent;
- struct bucket * children[1L << KAD_BETA];
+ struct bucket * children[1L << DHT_BETA];
};
struct cmd {
- struct list_head next;
-
- struct shm_du_buff * sdb;
+ struct list_head next;
+ buffer_t cbuf;
};
struct dir_ops dht_dir_ops = {
- .create = dht_create,
- .destroy = dht_destroy,
- .bootstrap = dht_bootstrap,
- .reg = dht_reg,
- .unreg = dht_unreg,
- .query = dht_query,
- .wait_running = dht_wait_running
+ .init = (int (*)(void *)) dht_init,
+ .fini = dht_fini,
+ .start = dht_start,
+ .stop = dht_stop,
+ .reg = dht_reg,
+ .unreg = dht_unreg,
+ .query = dht_query
};
-struct dht {
- size_t alpha;
- size_t b;
- size_t k;
+struct {
+ struct { /* Kademlia parameters */
+ uint32_t alpha; /* Number of concurrent requests */
+ size_t k; /* Number of replicas to store */
+ time_t t_expire; /* Expiry time for values (s) */
+ time_t t_refresh; /* Refresh time for contacts (s) */
+ time_t t_repl; /* Replication time for values (s) */
+ };
- time_t t_expire;
- time_t t_refresh;
- time_t t_replic;
- time_t t_repub;
+ buffer_t id;
- uint8_t * id;
- uint64_t addr;
+ time_t t0; /* Creation time */
+ uint64_t addr; /* Our own address */
+ uint64_t peer; /* Enrollment peer address */
+ uint64_t magic; /* Magic cookie for retransmit */
- struct bucket * buckets;
+ uint64_t eid; /* Entity ID */
- struct list_head entries;
+ struct tpm * tpm;
+ pthread_t worker;
- struct list_head refs;
+ enum dht_state state;
- struct list_head lookups;
+ struct {
+ struct {
+ struct bucket * root;
+ } contacts;
+
+ struct {
+ struct list_head list;
+ size_t len;
+ size_t vals;
+ size_t lvals;
+ } kv;
+
+ pthread_rwlock_t lock;
+ } db;
+
+ struct {
+ struct list_head list;
+ size_t len;
+ pthread_cond_t cond;
+ pthread_mutex_t mtx;
+ } reqs;
+
+ struct {
+ struct list_head list;
+ pthread_cond_t cond;
+ pthread_mutex_t mtx;
+ } cmds;
+} dht;
+
+
+/* DHT RIB */
+
+static const char * dht_dir[] = {
+ "database",
+ "stats",
+ NULL
+};
- struct list_head requests;
- struct bmp * cookies;
+const char * dht_stats = \
+ "DHT: " HASH_FMT64 "\n"
+ " Created: %s\n"
+ " Address: " ADDR_FMT32 "\n"
+ " Kademlia parameters:\n"
+ " Number of concurrent requests (alpha): %10zu\n"
+ " Number of replicas (k): %10zu\n"
+ " Expiry time for values (s): %10ld\n"
+ " Refresh time for contacts (s): %10ld\n"
+ " Replication time for values (s): %10ld\n"
+ " Number of keys: %10zu\n"
+ " Number of local values: %10zu\n"
+ " Number of non-local values: %10zu\n";
- enum dht_state state;
- struct list_head cmds;
- pthread_cond_t cond;
- pthread_mutex_t mtx;
+static int dht_rib_statfile(char * buf,
+ size_t len)
+{
+ struct tm * tm;
+ char tmstr[RIB_TM_STRLEN];
+ size_t keys;
+ size_t vals;
+ size_t lvals;
- pthread_rwlock_t lock;
+ assert(buf != NULL);
+ assert(len > 0);
- uint64_t eid;
+ pthread_rwlock_rdlock(&dht.db.lock);
- struct tpm * tpm;
+ keys = dht.db.kv.len;
+ lvals = dht.db.kv.lvals;
+ vals = dht.db.kv.vals;
- pthread_t worker;
-};
+ pthread_rwlock_unlock(&dht.db.lock);
-struct join_info {
- struct dht * dht;
- uint64_t addr;
-};
+ tm = gmtime(&dht.t0);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
-struct packet_info {
- struct dht * dht;
- struct shm_du_buff * sdb;
-};
+ snprintf(buf, len, dht_stats,
+ HASH_VAL64(dht.id.data),
+ tmstr,
+ ADDR_VAL32(&dht.addr),
+ dht.alpha, dht.k,
+ dht.t_expire, dht.t_refresh, dht.t_repl,
+ keys, vals, lvals);
+
+ return strlen(buf);
+}
-static uint8_t * dht_dup_key(const uint8_t * key,
- size_t len)
+static size_t dht_db_file_len(void)
{
- uint8_t * dup;
+ size_t sz;
+ size_t vals;
- dup = malloc(sizeof(*dup) * len);
- if (dup == NULL)
- return NULL;
+ sz = 18; /* DHT database + 2 * \n */
- memcpy(dup, key, len);
+ pthread_rwlock_rdlock(&dht.db.lock);
- return dup;
-}
+ if (dht.db.kv.len == 0) {
+ pthread_rwlock_unlock(&dht.db.lock);
+ sz += 14; /* No entries */
+ return sz;
+ }
-static enum dht_state dht_get_state(struct dht * dht)
-{
- enum dht_state state;
+ sz += 39 * 3 + 1; /* tally + extra newline */
+ sz += dht.db.kv.len * (25 + 19 + 23 + 1);
- pthread_mutex_lock(&dht->mtx);
+ vals = dht.db.kv.vals + dht.db.kv.lvals;
- state = dht->state;
+ sz += vals * (48 + 2 * RIB_TM_STRLEN);
- pthread_mutex_unlock(&dht->mtx);
+ pthread_rwlock_unlock(&dht.db.lock);
- return state;
+ return sz;
}
-static int dht_set_state(struct dht * dht,
- enum dht_state state)
+static int dht_rib_dbfile(char * buf,
+ size_t len)
{
- pthread_mutex_lock(&dht->mtx);
+ struct tm * tm;
+ char tmstr[RIB_TM_STRLEN];
+ char exstr[RIB_TM_STRLEN];
+ size_t i = 0;
+ struct list_head * p;
- if (state == DHT_JOINING && dht->state != DHT_INIT) {
- pthread_mutex_unlock(&dht->mtx);
- return -1;
+ assert(buf != NULL);
+ assert(len > 0);
+
+ pthread_rwlock_rdlock(&dht.db.lock);
+
+ if (dht.db.kv.len == 0) {
+ i += snprintf(buf, len, " No entries.\n");
+ pthread_rwlock_unlock(&dht.db.lock);
+ return i;
}
- dht->state = state;
+ i += snprintf(buf + i, len - i, "DHT database:\n\n");
+ i += snprintf(buf + i, len - i,
+ "Number of keys: %10zu\n"
+ "Number of local values: %10zu\n"
+ "Number of non-local values: %10zu\n\n",
+ dht.db.kv.len, dht.db.kv.vals, dht.db.kv.lvals);
+
+ list_for_each(p, &dht.db.kv.list) {
+ struct dht_entry * e = list_entry(p, struct dht_entry, next);
+ struct list_head * h;
+
+ i += snprintf(buf + i, len - i, "Key: " KEY_FMT "\n",
+ KEY_VAL(e->key));
+ i += snprintf(buf + i, len - i, " Local entries:\n");
+
+ list_for_each(h, &e->vals.list) {
+ struct val_entry * v;
+
+ v = list_entry(h, struct val_entry, next);
+
+ tm = gmtime(&v->t_repl);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
+
+ tm = gmtime(&v->t_exp);
+ strftime(exstr, sizeof(exstr), RIB_TM_FORMAT, tm);
+
+ i += snprintf(buf + i, len - i,
+ " " VAL_FMT
+ ", t_replicated=%.*s, t_expire=%.*s\n",
+ VAL_VAL(v->val),
+ RIB_TM_STRLEN, tmstr,
+ RIB_TM_STRLEN, exstr);
+ }
- pthread_cond_broadcast(&dht->cond);
+ i += snprintf(buf + i, len - i, "\n");
- pthread_mutex_unlock(&dht->mtx);
+ i += snprintf(buf + i, len - i, " Non-local entries:\n");
+
+ list_for_each(h, &e->lvals.list) {
+ struct val_entry * v;
+
+ v= list_entry(h, struct val_entry, next);
+
+ tm = gmtime(&v->t_repl);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
+
+ tm = gmtime(&v->t_exp);
+ strftime(exstr, sizeof(exstr), RIB_TM_FORMAT, tm);
+
+ i += snprintf(buf + i, len - i,
+ " " VAL_FMT
+ ", t_replicated=%.*s, t_expire=%.*s\n",
+ VAL_VAL(v->val),
+ RIB_TM_STRLEN, tmstr,
+ RIB_TM_STRLEN, exstr);
+
+ }
+ }
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ printf("DHT RIB DB file generated (%zu bytes).\n", i);
+
+ return i;
+}
+
+static int dht_rib_read(const char * path,
+ char * buf,
+ size_t len)
+{
+ char * entry;
+
+ entry = strstr(path, RIB_SEPARATOR) + 1;
+
+ if (strcmp(entry, "database") == 0) {
+ return dht_rib_dbfile(buf, len);
+ } else if (strcmp(entry, "stats") == 0) {
+ return dht_rib_statfile(buf, len);
+ }
return 0;
}
-int dht_wait_running(void * dir)
+static int dht_rib_readdir(char *** buf)
{
- struct dht * dht;
- int ret = 0;
+ int i = 0;
+
+ while (dht_dir[i++] != NULL);
+
+ *buf = malloc(sizeof(**buf) * i);
+ if (*buf == NULL)
+ goto fail_buf;
+
+ i = 0;
+
+ while (dht_dir[i] != NULL) {
+ (*buf)[i] = strdup(dht_dir[i]);
+ if ((*buf)[i] == NULL)
+ goto fail_dup;
+ i++;
+ }
- dht = (struct dht *) dir;
+ return i;
+ fail_dup:
+ freepp(char, *buf, i);
+ fail_buf:
+ return -ENOMEM;
+}
- pthread_mutex_lock(&dht->mtx);
+static int dht_rib_getattr(const char * path,
+ struct rib_attr * attr)
+{
+ struct timespec now;
+ char * entry;
- pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- while (dht->state == DHT_JOINING)
- pthread_cond_wait(&dht->cond, &dht->mtx);
+ attr->mtime = now.tv_sec;
- if (dht->state != DHT_RUNNING)
- ret = -1;
+ entry = strstr(path, RIB_SEPARATOR) + 1;
- pthread_cleanup_pop(true);
+ if (strcmp(entry, "database") == 0) {
+ attr->size = dht_db_file_len();
+ } else if (strcmp(entry, "stats") == 0) {
+ attr->size = 545;
+ }
- return ret;
+ return 0;
}
-static uint8_t * create_id(size_t len)
+static struct rib_ops r_ops = {
+ .read = dht_rib_read,
+ .readdir = dht_rib_readdir,
+ .getattr = dht_rib_getattr
+};
+
+/* Helper functions */
+
+static uint8_t * generate_id(void)
{
uint8_t * id;
- id = malloc(len);
- if (id == NULL)
+ if(dht.id.len < sizeof(uint64_t)) {
+ log_err("DHT ID length is too short (%zu < %zu).",
+ dht.id.len, sizeof(uint64_t));
return NULL;
+ };
- if (random_buffer(id, len) < 0) {
- free(id);
- return NULL;
+ id = malloc(dht.id.len);
+ if (id == NULL) {
+ log_err("Failed to malloc ID.");
+ goto fail_id;
+ }
+
+ if (random_buffer(id, dht.id.len) < 0) {
+ log_err("Failed to generate random ID.");
+ goto fail_rnd;
}
return id;
+ fail_rnd:
+ free(id);
+ fail_id:
+ return NULL;
}
-static void kad_req_create(struct dht * dht,
- dht_msg_t * msg,
- uint64_t addr)
+static uint64_t generate_cookie(void)
{
- struct kad_req * req;
- pthread_condattr_t cattr;
- struct timespec t;
- size_t b;
+ uint64_t cookie = DHT_INVALID;
- clock_gettime(CLOCK_REALTIME_COARSE, &t);
+ while (cookie == DHT_INVALID)
+ random_buffer((uint8_t *) &cookie, sizeof(cookie));
- req = malloc(sizeof(*req));
- if (req == NULL)
- goto fail_malloc;
+ return cookie;
+}
- list_head_init(&req->next);
+/*
+ * If someone builds a network where the n (n > k) closest nodes all
+ * have IDs starting with the same 64 bits: by all means, change this.
+ */
+static uint64_t dist(const uint8_t * src,
+ const uint8_t * dst)
+{
+ assert(dht.id.len >= sizeof(uint64_t));
- req->t_exp = t.tv_sec + KAD_T_RESP;
- req->addr = addr;
- req->state = REQ_INIT;
- req->cookie = msg->cookie;
- req->code = msg->code;
- req->key = NULL;
+ return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst));
+}
- pthread_rwlock_rdlock(&dht->lock);
- b = dht->b;
- pthread_rwlock_unlock(&dht->lock);
+#define IS_CLOSER(x, y) (dist((x), dht.id.data) < dist((y), dht.id.data))
- if (msg->has_key) {
- req->key = dht_dup_key(msg->key.data, b);
- if (req->key == NULL)
- goto fail_dup_key;
- }
+static int addr_to_buf(const uint64_t addr,
+ buffer_t * buf)
+{
+ size_t len;
+ uint64_t _addr;
- if (pthread_mutex_init(&req->lock, NULL))
- goto fail_mutex;
+ len = sizeof(addr);
+ _addr = hton64(addr);
+ assert(buf != NULL);
- if (pthread_condattr_init(&cattr))
- goto fail_condattr;
-#ifndef __APPLE__
- pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
-#endif
+ buf->data = malloc(len);
+ if (buf->data == NULL)
+ goto fail_malloc;
- if (pthread_cond_init(&req->cond, &cattr))
- goto fail_cond_init;
+ buf->len = sizeof(_addr);
+ memcpy(buf->data, &_addr, sizeof(_addr));
- pthread_condattr_destroy(&cattr);
+ return 0;
+ fail_malloc:
+ return -ENOMEM;
+}
- pthread_rwlock_wrlock(&dht->lock);
+static int buf_to_addr(const buffer_t buf,
+ uint64_t * addr)
+{
+ assert(addr != NULL);
+ assert(buf.data != NULL);
- list_add(&req->next, &dht->requests);
+ if (buf.len != sizeof(*addr))
+ return - EINVAL;
- pthread_rwlock_unlock(&dht->lock);
+ *addr = ntoh64(*((uint64_t *) buf.data));
- return;
+ if (*addr == dht.addr)
+ *addr = INVALID_ADDR;
- fail_cond_init:
- pthread_condattr_destroy(&cattr);
- fail_condattr:
- pthread_mutex_destroy(&req->lock);
- fail_mutex:
- free(req->key);
- fail_dup_key:
- free(req);
- fail_malloc:
- return;
+ return 0;
}
-static void cancel_req_destroy(void * o)
+static uint8_t * dht_dup_key(const uint8_t * key)
{
- struct kad_req * req = (struct kad_req *) o;
+ uint8_t * dup;
- pthread_mutex_unlock(&req->lock);
+ assert(key != NULL);
+ assert(dht.id.len != 0);
- pthread_cond_destroy(&req->cond);
- pthread_mutex_destroy(&req->lock);
+ dup = malloc(dht.id.len);
+ if (dup == NULL)
+ return NULL;
- if (req->key != NULL)
- free(req->key);
+ memcpy(dup, key, dht.id.len);
- free(req);
+ return dup;
}
-static void kad_req_destroy(struct kad_req * req)
+/* DHT */
+
+static struct val_entry * val_entry_create(const buffer_t val,
+ time_t exp)
{
- struct timespec t;
- struct timespec intv = TIMESPEC_INIT_S(20);
+ struct val_entry * e;
+ struct timespec now;
- clock_gettime(PTHREAD_COND_CLOCK, &t);
- ts_add(&t, &intv, &t);
+ assert(val.data != NULL);
+ assert(val.len > 0);
- assert(req);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- pthread_mutex_lock(&req->lock);
+#ifndef __DHT_TEST_ALLOW_EXPIRED__
+ if (exp < now.tv_sec)
+ return NULL; /* Refuse to add expired values */
+#endif
+ e = malloc(sizeof(*e));
+ if (e == NULL)
+ goto fail_entry;
- switch (req->state) {
- case REQ_DESTROY:
- pthread_mutex_unlock(&req->lock);
- return;
- case REQ_PENDING:
- req->state = REQ_DESTROY;
- pthread_cond_broadcast(&req->cond);
- break;
- case REQ_INIT:
- case REQ_DONE:
- req->state = REQ_NULL;
- break;
- case REQ_RESPONSE:
- case REQ_NULL:
- default:
- break;
- }
+ list_head_init(&e->next);
- pthread_cleanup_push(cancel_req_destroy, req);
+ e->val.len = val.len;
+ e->val.data = malloc(val.len);
+ if (e->val.data == NULL)
+ goto fail_val;
- while (req->state != REQ_NULL && req->state != REQ_DONE)
- pthread_cond_timedwait(&req->cond, &req->lock, &t);
+ memcpy(e->val.data, val.data, val.len);
- pthread_cleanup_pop(true);
+ e->t_repl = 0;
+ e->t_exp = exp;
+
+ return e;
+
+ fail_val:
+ free(e);
+ fail_entry:
+ return NULL;
}
-static int kad_req_wait(struct kad_req * req,
- time_t t)
+static void val_entry_destroy(struct val_entry * v)
{
- struct timespec timeo = TIMESPEC_INIT_S(0);
- struct timespec abs;
- int ret = 0;
+ assert(v->val.data != NULL);
- assert(req);
+ freebuf(v->val);
+ free(v);
+}
- timeo.tv_sec = t;
+static struct dht_entry * dht_entry_create(const uint8_t * key)
+{
+ struct dht_entry * e;
- clock_gettime(PTHREAD_COND_CLOCK, &abs);
+ assert(key != NULL);
- ts_add(&abs, &timeo, &abs);
+ e = malloc(sizeof(*e));
+ if (e == NULL)
+ goto fail_entry;
- pthread_mutex_lock(&req->lock);
+ list_head_init(&e->next);
+ list_head_init(&e->vals.list);
+ list_head_init(&e->lvals.list);
- req->state = REQ_PENDING;
+ e->vals.len = 0;
+ e->lvals.len = 0;
- pthread_cleanup_push(__cleanup_mutex_unlock, &req->lock);
+ e->key = dht_dup_key(key);
+ if (e->key == NULL)
+ goto fail_key;
- while (req->state == REQ_PENDING && ret != -ETIMEDOUT)
- ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs);
+ return e;
+ fail_key:
+ free(e);
+ fail_entry:
+ return NULL;
+}
- switch(req->state) {
- case REQ_DESTROY:
- ret = -1;
- req->state = REQ_NULL;
- pthread_cond_broadcast(&req->cond);
- break;
- case REQ_PENDING: /* ETIMEDOUT */
- case REQ_RESPONSE:
- req->state = REQ_DONE;
- pthread_cond_broadcast(&req->cond);
- break;
- default:
- break;
+static void dht_entry_destroy(struct dht_entry * e)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ assert(e != NULL);
+
+ list_for_each_safe(p, h, &e->vals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ list_del(&v->next);
+ val_entry_destroy(v);
+ --e->vals.len;
+ --dht.db.kv.vals;
}
- pthread_cleanup_pop(true);
+ list_for_each_safe(p, h, &e->lvals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ list_del(&v->next);
+ val_entry_destroy(v);
+ --e->lvals.len;
+ --dht.db.kv.lvals;
+ }
+
+ free(e->key);
- return ret;
+ assert(e->vals.len == 0 && e->lvals.len == 0);
+
+ free(e);
}
-static void kad_req_respond(struct kad_req * req)
+static struct val_entry * dht_entry_get_lval(const struct dht_entry * e,
+ const buffer_t val)
{
- pthread_mutex_lock(&req->lock);
+ struct list_head * p;
- req->state = REQ_RESPONSE;
- pthread_cond_broadcast(&req->cond);
+ assert(e != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ list_for_each(p, &e->lvals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ if (bufcmp(&v->val, &val) == 0)
+ return v;
+ }
- pthread_mutex_unlock(&req->lock);
+ return NULL;
}
-static struct contact * contact_create(const uint8_t * id,
- size_t len,
- uint64_t addr)
+static struct val_entry * dht_entry_get_val(const struct dht_entry * e,
+ const buffer_t val)
{
- struct contact * c;
- struct timespec t;
-
- c = malloc(sizeof(*c));
- if (c == NULL)
- return NULL;
+ struct list_head * p;
- list_head_init(&c->next);
+ assert(e != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
- clock_gettime(CLOCK_REALTIME_COARSE, &t);
+ list_for_each(p, &e->vals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ if (bufcmp(&v->val, &val) == 0)
+ return v;
- c->addr = addr;
- c->fails = 0;
- c->t_seen = t.tv_sec;
- c->id = dht_dup_key(id, len);
- if (c->id == NULL) {
- free(c);
- return NULL;
}
- return c;
+ return NULL;
}
-static void contact_destroy(struct contact * c)
+static int dht_entry_update_val(struct dht_entry * e,
+ buffer_t val,
+ time_t exp)
{
- if (c != NULL)
- free(c->id);
+ struct val_entry * v;
+ struct timespec now;
- free(c);
+ assert(e != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ if (exp < now.tv_sec)
+ return -EINVAL; /* Refuse to add expired values */
+
+ if (dht_entry_get_lval(e, val) != NULL) {
+ log_dbg(KV_FMT " Val already in lvals.", KV_VAL(e->key, val));
+ return 0; /* Refuse to add local values */
+ }
+
+ v = dht_entry_get_val(e, val);
+ if (v == NULL) {
+ v = val_entry_create(val, exp);
+ if (v == NULL)
+ return -ENOMEM;
+
+ list_add_tail(&v->next, &e->vals.list);
+ ++e->vals.len;
+ ++dht.db.kv.vals;
+
+ return 0;
+ }
+
+ if (v->t_exp < exp)
+ v->t_exp = exp;
+
+ return 0;
}
-static struct bucket * iter_bucket(struct bucket * b,
- const uint8_t * id)
+static int dht_entry_update_lval(struct dht_entry * e,
+ buffer_t val)
{
- uint8_t byte;
- uint8_t mask;
+ struct val_entry * v;
+ struct timespec now;
- assert(b);
+ assert(e != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
- if (b->children[0] == NULL)
- return b;
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- byte = id[(b->depth * KAD_BETA) / CHAR_BIT];
+ v = dht_entry_get_lval(e, val);
+ if (v == NULL) {
+ log_dbg(KV_FMT " Adding lval.", KV_VAL(e->key, val));
+ v = val_entry_create(val, now.tv_sec + dht.t_expire);
+ if (v == NULL)
+ return -ENOMEM;
- mask = ((1L << KAD_BETA) - 1) & 0xFF;
+ list_add_tail(&v->next, &e->lvals.list);
+ ++e->lvals.len;
+ ++dht.db.kv.lvals;
- byte >>= (CHAR_BIT - KAD_BETA) -
- (((b->depth) * KAD_BETA) & (CHAR_BIT - 1));
+ return 0;
+ }
- return iter_bucket(b->children[(byte & mask)], id);
+ return 0;
}
-static struct bucket * dht_get_bucket(struct dht * dht,
- const uint8_t * id)
+static int dht_entry_remove_lval(struct dht_entry * e,
+ buffer_t val)
{
- assert(dht->buckets);
+ struct val_entry * v;
+
+ assert(e != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ v = dht_entry_get_lval(e, val);
+ if (v == NULL)
+ return -ENOENT;
+
+ log_dbg(KV_FMT " Removing lval.", KV_VAL(e->key, val));
- return iter_bucket(dht->buckets, id);
+ list_del(&v->next);
+ val_entry_destroy(v);
+ --e->lvals.len;
+ --dht.db.kv.lvals;
+
+ return 0;
}
-/*
- * If someone builds a network where the n (n > k) closest nodes all
- * have IDs starting with the same 64 bits: by all means, change this.
- */
-static uint64_t dist(const uint8_t * src,
- const uint8_t * dst)
+#define IS_EXPIRED(v, now) ((now)->tv_sec > (v)->t_exp)
+static void dht_entry_remove_expired_vals(struct dht_entry * e)
{
- return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst));
+ struct list_head * p;
+ struct list_head * h;
+ struct timespec now;
+
+ assert(e != NULL);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ list_for_each_safe(p, h, &e->vals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ if (!IS_EXPIRED(v, &now))
+ continue;
+
+ log_dbg(KV_FMT " Value expired." , KV_VAL(e->key, v->val));
+ list_del(&v->next);
+ val_entry_destroy(v);
+ --e->vals.len;
+ --dht.db.kv.vals;
+ }
}
-static size_t list_add_sorted(struct list_head * l,
- struct contact * c,
- const uint8_t * key)
+static struct dht_entry * __dht_kv_find_entry(const uint8_t * key)
{
struct list_head * p;
- assert(l);
- assert(c);
- assert(key);
- assert(c->id);
+ assert(key != NULL);
- list_for_each(p, l) {
- struct contact * e = list_entry(p, struct contact, next);
- if (dist(c->id, key) > dist(e->id, key))
- break;
+ list_for_each(p, &dht.db.kv.list) {
+ struct dht_entry * e = list_entry(p, struct dht_entry, next);
+ if (!memcmp(key, e->key, dht.id.len))
+ return e;
}
- list_add_tail(&c->next, p);
-
- return 1;
+ return NULL;
}
-static size_t dht_contact_list(struct dht * dht,
- struct list_head * l,
- const uint8_t * key)
+static void dht_kv_remove_expired_entries(void)
{
struct list_head * p;
- struct bucket * b;
- size_t len = 0;
- size_t i;
- struct timespec t;
-
- assert(l);
- assert(dht);
- assert(key);
- assert(list_is_empty(l));
+ struct list_head * h;
+ struct timespec now;
- clock_gettime(CLOCK_REALTIME_COARSE, &t);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- b = dht_get_bucket(dht, key);
- if (b == NULL)
- return 0;
+ pthread_rwlock_wrlock(&dht.db.lock);
- b->t_refr = t.tv_sec + KAD_T_REFR;
+ list_for_each_safe(p, h, &dht.db.kv.list) {
+ struct dht_entry * e = list_entry(p, struct dht_entry, next);
+ dht_entry_remove_expired_vals(e);
+ if (e->lvals.len > 0 || e->vals.len > 0)
+ continue;
- if (b->n_contacts == dht->k || b->parent == NULL) {
- list_for_each(p, &b->contacts) {
- struct contact * c;
- c = list_entry(p, struct contact, next);
- c = contact_create(c->id, dht->b, c->addr);
- if (list_add_sorted(l, c, key) == 1)
- if (++len == dht->k)
- break;
- }
- } else {
- struct bucket * d = b->parent;
- for (i = 0; i < (1L << KAD_BETA) && len < dht->k; ++i) {
- list_for_each(p, &d->children[i]->contacts) {
- struct contact * c;
- c = list_entry(p, struct contact, next);
- c = contact_create(c->id, dht->b, c->addr);
- if (c == NULL)
- continue;
- if (list_add_sorted(l, c, key) == 1)
- if (++len == dht->k)
- break;
- }
- }
+ log_dbg(KEY_FMT " Entry removed. ", KEY_VAL(e->key));
+ list_del(&e->next);
+ dht_entry_destroy(e);
+ --dht.db.kv.len;
}
- assert(len == dht->k || b->parent == NULL);
-
- return len;
+ pthread_rwlock_unlock(&dht.db.lock);
}
-static struct lookup * lookup_create(struct dht * dht,
- const uint8_t * id)
+
+static struct contact * contact_create(const uint8_t * id,
+ uint64_t addr)
{
- struct lookup * lu;
- pthread_condattr_t cattr;
+ struct contact * c;
+ struct timespec t;
- assert(dht);
- assert(id);
+ c = malloc(sizeof(*c));
+ if (c == NULL)
+ return NULL;
- lu = malloc(sizeof(*lu));
- if (lu == NULL)
- goto fail_malloc;
+ list_head_init(&c->next);
- list_head_init(&lu->contacts);
- list_head_init(&lu->cookies);
+ clock_gettime(CLOCK_REALTIME_COARSE, &t);
- lu->state = LU_INIT;
- lu->addrs = NULL;
- lu->n_addrs = 0;
- lu->key = dht_dup_key(id, dht->b);
- if (lu->key == NULL)
- goto fail_id;
+ c->addr = addr;
+ c->fails = 0;
+ c->t_seen = t.tv_sec;
+ c->id = dht_dup_key(id);
+ if (c->id == NULL) {
+ free(c);
+ return NULL;
+ }
- if (pthread_mutex_init(&lu->lock, NULL))
- goto fail_mutex;
+ return c;
+}
- pthread_condattr_init(&cattr);
-#ifndef __APPLE__
- pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
-#endif
+static void contact_destroy(struct contact * c)
+{
+ assert(c != NULL);
+ assert(list_is_empty(&c->next));
+
+ free(c->id);
+ free(c);
+}
- if (pthread_cond_init(&lu->cond, &cattr))
- goto fail_cond;
+static struct dht_req * dht_req_create(const uint8_t * key)
+{
+ struct dht_req * req;
+ struct timespec now;
- pthread_condattr_destroy(&cattr);
+ assert(key != NULL);
- pthread_rwlock_wrlock(&dht->lock);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
- list_add(&lu->next, &dht->lookups);
+ req = malloc(sizeof(*req));
+ if (req == NULL)
+ goto fail_malloc;
- lu->n_contacts = dht_contact_list(dht, &lu->contacts, id);
+ list_head_init(&req->next);
- pthread_rwlock_unlock(&dht->lock);
+ req->t_exp = now.tv_sec + DHT_T_RESP;
- return lu;
+ list_head_init(&req->peers.list);
+ req->peers.len = 0;
- fail_cond:
- pthread_condattr_destroy(&cattr);
- pthread_mutex_destroy(&lu->lock);
- fail_mutex:
- free(lu->key);
- fail_id:
- free(lu);
+ req->key = dht_dup_key(key);
+ if (req->key == NULL)
+ goto fail_dup_key;
+
+ list_head_init(&req->cache.list);
+ req->cache.len = 0;
+
+ return req;
+
+ fail_dup_key:
+ free(req);
fail_malloc:
return NULL;
}
-static void cancel_lookup_destroy(void * o)
+static void dht_req_destroy(struct dht_req * req)
{
- struct lookup * lu;
struct list_head * p;
struct list_head * h;
- lu = (struct lookup *) o;
-
- if (lu->key != NULL)
- free(lu->key);
- if (lu->addrs != NULL)
- free(lu->addrs);
+ assert(req);
+ assert(req->key);
- list_for_each_safe(p, h, &lu->contacts) {
- struct contact * c = list_entry(p, struct contact, next);
- list_del(&c->next);
- contact_destroy(c);
+ list_for_each_safe(p, h, &req->peers.list) {
+ struct peer_entry * e = list_entry(p, struct peer_entry, next);
+ list_del(&e->next);
+ free(e->id);
+ free(e);
+ --req->peers.len;
}
- list_for_each_safe(p, h, &lu->cookies) {
- struct cookie_el * c = list_entry(p, struct cookie_el, next);
- list_del(&c->next);
- free(c);
+ list_for_each_safe(p, h, &req->cache.list) {
+ struct val_entry * e = list_entry(p, struct val_entry, next);
+ list_del(&e->next);
+ val_entry_destroy(e);
+ --req->cache.len;
}
- pthread_mutex_unlock(&lu->lock);
+ free(req->key);
- pthread_mutex_destroy(&lu->lock);
+ assert(req->peers.len == 0);
- free(lu);
+ free(req);
}
-static void lookup_destroy(struct lookup * lu)
+static struct peer_entry * dht_req_get_peer(struct dht_req * req,
+ struct peer_entry * e)
{
- assert(lu);
-
- pthread_mutex_lock(&lu->lock);
+ struct list_head * p;
- switch (lu->state) {
- case LU_DESTROY:
- pthread_mutex_unlock(&lu->lock);
- return;
- case LU_PENDING:
- lu->state = LU_DESTROY;
- pthread_cond_broadcast(&lu->cond);
- break;
- case LU_INIT:
- case LU_UPDATE:
- case LU_COMPLETE:
- lu->state = LU_NULL;
- break;
- case LU_NULL:
- default:
- break;
+ list_for_each(p, &req->peers.list) {
+ struct peer_entry * x = list_entry(p, struct peer_entry, next);
+ if (x->addr == e->addr)
+ return x;
}
- pthread_cleanup_push(cancel_lookup_destroy, lu);
-
- while (lu->state != LU_NULL)
- pthread_cond_wait(&lu->cond, &lu->lock);
-
- pthread_cleanup_pop(true);
+ return NULL;
}
-static void lookup_update(struct dht * dht,
- struct lookup * lu,
- dht_msg_t * msg)
+#define IS_MAGIC(peer) ((peer)->cookie == dht.magic)
+void dht_req_add_peer(struct dht_req * req,
+ struct peer_entry * e)
{
- struct list_head * p = NULL;
- struct list_head * h;
- struct contact * c = NULL;
- size_t n;
- size_t pos = 0;
- bool mod = false;
+ struct peer_entry * x; /* existing */
+ struct list_head * p; /* iterator */
+ size_t pos = 0;
- assert(lu);
- assert(msg);
+ assert(req != NULL);
+ assert(e != NULL);
+ assert(e->id != NULL);
- if (dht_get_state(dht) != DHT_RUNNING)
- return;
+ /*
+ * Dedupe messages to the same peer, unless
+ * 1) The previous request was FIND_NODE and now it's FIND_VALUE
+ * 2) We urgently need contacts from emergency peer (magic cookie)
+ */
+ x = dht_req_get_peer(req, e);
+ if (x != NULL && x->code >= e->code && !IS_MAGIC(e))
+ goto skip;
- pthread_mutex_lock(&lu->lock);
-
- list_for_each_safe(p, h, &lu->cookies) {
- struct cookie_el * e = list_entry(p, struct cookie_el, next);
- if (e->cookie == msg->cookie) {
- list_del(&e->next);
- free(e);
- break;
+ /* Find how this contact ranks in distance to the key */
+ list_for_each(p, &req->peers.list) {
+ struct peer_entry * y = list_entry(p, struct peer_entry, next);
+ if (IS_CLOSER(y->id, e->id)) {
+ pos++;
+ continue;
}
+ break;
}
- if (lu->state == LU_COMPLETE) {
- pthread_mutex_unlock(&lu->lock);
- return;
- }
+ /* Add a new peer to this request if we need to */
+ if (pos < dht.alpha || !IS_MAGIC(e)) {
+ x = malloc(sizeof(*x));
+ if (x == NULL) {
+ log_err("Failed to malloc peer entry.");
+ goto skip;
+ }
- if (msg->n_addrs > 0) {
- if (lu->addrs == NULL) {
- lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs);
- for (n = 0; n < msg->n_addrs; ++n)
- lu->addrs[n] = msg->addrs[n];
- lu->n_addrs = msg->n_addrs;
+ x->cookie = e->cookie;
+ x->addr = e->addr;
+ x->code = e->code;
+ x->t_sent = e->t_sent;
+ x->id = dht_dup_key(e->id);
+ if (x->id == NULL) {
+ log_err("Failed to dup peer ID.");
+ free(x);
+ goto skip;
}
- lu->state = LU_COMPLETE;
- pthread_cond_broadcast(&lu->cond);
- pthread_mutex_unlock(&lu->lock);
+ if (IS_MAGIC(e))
+ list_add(&x->next, p);
+ else
+ list_add_tail(&x->next, p);
+ ++req->peers.len;
return;
}
+ skip:
+ list_del(&e->next);
+ free(e->id);
+ free(e);
+}
+
+static size_t dht_req_add_peers(struct dht_req * req,
+ struct list_head * pl)
+{
+ struct list_head * p;
+ struct list_head * h;
+ size_t n = 0;
- pthread_cleanup_push(__cleanup_mutex_unlock, &lu->lock);
+ assert(req != NULL);
+ assert(pl != NULL);
- while (lu->state == LU_INIT) {
- pthread_rwlock_unlock(&dht->lock);
- pthread_cond_wait(&lu->cond, &lu->lock);
- pthread_rwlock_rdlock(&dht->lock);
+ list_for_each_safe(p, h, pl) {
+ struct peer_entry * e = list_entry(p, struct peer_entry, next);
+ dht_req_add_peer(req, e);
}
- pthread_cleanup_pop(false);
+ return n;
+}
- for (n = 0; n < msg->n_contacts; ++n) {
- c = contact_create(msg->contacts[n]->id.data,
- dht->b, msg->contacts[n]->addr);
- if (c == NULL)
- continue;
+static bool dht_req_has_peer(struct dht_req * req,
+ uint64_t cookie)
+{
+ struct list_head * p;
- pos = 0;
+ assert(req != NULL);
- list_for_each(p, &lu->contacts) {
- struct contact * e;
- e = list_entry(p, struct contact, next);
- if (!memcmp(e->id, c->id, dht->b)) {
- contact_destroy(c);
- c = NULL;
- break;
- }
+ list_for_each(p, &req->peers.list) {
+ struct peer_entry * e = list_entry(p, struct peer_entry, next);
+ if (e->cookie == cookie)
+ return true;
+ }
- if (dist(c->id, lu->key) > dist(e->id, lu->key))
- break;
+ return false;
+}
- pos++;
- }
+static void peer_list_destroy(struct list_head * pl)
+{
+ struct list_head * p;
+ struct list_head * h;
- if (c == NULL)
- continue;
+ assert(pl != NULL);
- if (lu->n_contacts < dht->k) {
- list_add_tail(&c->next, p);
- ++lu->n_contacts;
- mod = true;
- } else if (pos == dht->k) {
- contact_destroy(c);
- } else {
- struct contact * d;
- d = list_last_entry(&lu->contacts,
- struct contact, next);
- list_add_tail(&c->next, p);
- list_del(&d->next);
- contact_destroy(d);
- mod = true;
- }
+ list_for_each_safe(p, h, pl) {
+ struct peer_entry * e = list_entry(p, struct peer_entry, next);
+ list_del(&e->next);
+ free(e->id);
+ free(e);
}
-
- if (list_is_empty(&lu->cookies) && !mod)
- lu->state = LU_COMPLETE;
- else
- lu->state = LU_UPDATE;
-
- pthread_cond_broadcast(&lu->cond);
- pthread_mutex_unlock(&lu->lock);
- return;
}
-static ssize_t lookup_get_addrs(struct lookup * lu,
- uint64_t * addrs)
+static int dht_kv_create_peer_list(struct list_head * cl,
+ struct list_head * pl,
+ enum dht_code code)
{
- ssize_t n;
-
- assert(lu);
+ struct list_head * p;
+ struct list_head * h;
+ struct timespec now;
+ size_t len;
- pthread_mutex_lock(&lu->lock);
+ assert(cl != NULL);
+ assert(pl != NULL);
+ assert(list_is_empty(pl));
- for (n = 0; (size_t) n < lu->n_addrs; ++n)
- addrs[n] = lu->addrs[n];
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- assert((size_t) n == lu->n_addrs);
+ len = 0;
- pthread_mutex_unlock(&lu->lock);
+ list_for_each_safe(p, h, cl) {
+ struct contact * c = list_entry(p, struct contact, next);
+ struct peer_entry * e;
+ if (len++ == dht.alpha)
+ break;
- return n;
-}
+ e = malloc(sizeof(*e));
+ if (e == NULL)
+ return -ENOMEM;
-static ssize_t lookup_contact_addrs(struct lookup * lu,
- uint64_t * addrs)
-{
- struct list_head * p;
- ssize_t n = 0;
+ e->cookie = generate_cookie();
+ e->code = code;
+ e->addr = c->addr;
+ e->t_sent = now.tv_sec;
- assert(lu);
- assert(addrs);
+ e->id = c->id;
- pthread_mutex_lock(&lu->lock);
+ list_add_tail(&e->next, pl);
- list_for_each(p, &lu->contacts) {
- struct contact * c = list_entry(p, struct contact, next);
- addrs[n] = c->addr;
- n++;
+ list_del(&c->next);
+ c->id = NULL; /* we stole the id */
+ contact_destroy(c);
}
- pthread_mutex_unlock(&lu->lock);
-
- return n;
+ return 0;
}
-static void lookup_new_addrs(struct lookup * lu,
- uint64_t * addrs)
+static struct dht_req * __dht_kv_req_get_req(const uint8_t * key)
{
struct list_head * p;
- size_t n = 0;
- assert(lu);
- assert(addrs);
+ list_for_each(p, &dht.reqs.list) {
+ struct dht_req * r = list_entry(p, struct dht_req, next);
+ if (memcmp(r->key, key, dht.id.len) == 0)
+ return r;
+ }
- pthread_mutex_lock(&lu->lock);
+ return NULL;
+}
- /* Uses fails to check if the contact has been contacted. */
- list_for_each(p, &lu->contacts) {
- struct contact * c = list_entry(p, struct contact, next);
- if (c->fails == 0) {
- c->fails = 1;
- addrs[n] = c->addr;
- n++;
- }
+static struct dht_req * __dht_kv_get_req_cache(const uint8_t * key)
+{
+ struct dht_req * req;
- if (n == KAD_ALPHA)
- break;
- }
+ assert(key != NULL);
- assert(n <= KAD_ALPHA);
+ req = __dht_kv_req_get_req(key);
+ if (req == NULL)
+ return NULL;
- addrs[n] = 0;
+ if (req->cache.len == 0)
+ return NULL;
- pthread_mutex_unlock(&lu->lock);
+ return req;
}
-static void lookup_set_state(struct lookup * lu,
- enum lookup_state state)
+static void __dht_kv_req_remove(const uint8_t * key)
{
- pthread_mutex_lock(&lu->lock);
+ struct dht_req * req;
- lu->state = state;
- pthread_cond_broadcast(&lu->cond);
+ assert(key != NULL);
- pthread_mutex_unlock(&lu->lock);
-}
+ req = __dht_kv_req_get_req(key);
+ if (req == NULL)
+ return;
-static void cancel_lookup_wait(void * o)
-{
- struct lookup * lu = (struct lookup *) o;
- lu->state = LU_NULL;
- pthread_mutex_unlock(&lu->lock);
- lookup_destroy(lu);
+ list_del(&req->next);
+ --dht.reqs.len;
+
+ dht_req_destroy(req);
}
-static enum lookup_state lookup_wait(struct lookup * lu)
+static struct dht_req * __dht_kv_get_req_peer(const uint8_t * key,
+ uint64_t cookie)
{
- struct timespec timeo = TIMESPEC_INIT_S(KAD_T_RESP);
- struct timespec abs;
- enum lookup_state state;
- int ret = 0;
-
- clock_gettime(PTHREAD_COND_CLOCK, &abs);
+ struct dht_req * req;
- ts_add(&abs, &timeo, &abs);
+ assert(key != NULL);
- pthread_mutex_lock(&lu->lock);
-
- if (lu->state == LU_INIT || lu->state == LU_UPDATE)
- lu->state = LU_PENDING;
+ req = __dht_kv_req_get_req(key);
+ if (req == NULL)
+ return NULL;
- pthread_cleanup_push(cancel_lookup_wait, lu);
+ if (!dht_req_has_peer(req, cookie))
+ return NULL;
- while (lu->state == LU_PENDING && ret != -ETIMEDOUT)
- ret = -pthread_cond_timedwait(&lu->cond, &lu->lock, &abs);
+ return req;
+}
- pthread_cleanup_pop(false);
+static bool dht_kv_has_req(const uint8_t * key,
+ uint64_t cookie)
+{
+ bool found;
- if (ret == -ETIMEDOUT)
- lu->state = LU_COMPLETE;
+ pthread_mutex_lock(&dht.reqs.mtx);
- state = lu->state;
+ found = __dht_kv_get_req_peer(key, cookie) != NULL;
- pthread_mutex_unlock(&lu->lock);
+ pthread_mutex_unlock(&dht.reqs.mtx);
- return state;
+ return found;
}
-static struct kad_req * dht_find_request(struct dht * dht,
- dht_msg_t * msg)
+/*
+ * This will filter the peer list for addresses that still need to be
+ * contacted.
+ */
+static int dht_kv_update_req(const uint8_t * key,
+ struct list_head * pl)
{
- struct list_head * p;
+ struct dht_req * req;
+ struct timespec now;
- assert(dht);
- assert(msg);
+ assert(key != NULL);
+ assert(pl != NULL);
+ assert(!list_is_empty(pl));
- list_for_each(p, &dht->requests) {
- struct kad_req * r = list_entry(p, struct kad_req, next);
- if (r->cookie == msg->cookie)
- return r;
- }
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
- return NULL;
-}
+ pthread_mutex_lock(&dht.reqs.mtx);
-static struct lookup * dht_find_lookup(struct dht * dht,
- uint32_t cookie)
-{
- struct list_head * p;
- struct list_head * p2;
- struct list_head * h2;
-
- assert(dht);
- assert(cookie > 0);
-
- list_for_each(p, &dht->lookups) {
- struct lookup * l = list_entry(p, struct lookup, next);
- pthread_mutex_lock(&l->lock);
- list_for_each_safe(p2, h2, &l->cookies) {
- struct cookie_el * e;
- e = list_entry(p2, struct cookie_el, next);
- if (e->cookie == cookie) {
- list_del(&e->next);
- free(e);
- pthread_mutex_unlock(&l->lock);
- return l;
- }
+ req = __dht_kv_req_get_req(key);
+ if (req == NULL) {
+ if (dht.reqs.len == DHT_MAX_REQS) {
+ log_err(KEY_FMT " Max reqs reached (%zu).",
+ KEY_VAL(key), dht.reqs.len);
+ peer_list_destroy(pl);
+ goto fail_req;
}
- pthread_mutex_unlock(&l->lock);
+ req = dht_req_create(key);
+ if (req == NULL) {
+ log_err(KEY_FMT "Failed to create req.", KEY_VAL(key));
+ goto fail_req;
+ }
+ list_add_tail(&req->next, &dht.reqs.list);
+ ++dht.reqs.len;
}
- return NULL;
+ if (req->cache.len > 0) /* Already have values */
+ peer_list_destroy(pl);
+
+ dht_req_add_peers(req, pl);
+ req->t_exp = now.tv_sec + DHT_T_RESP;
+
+ if (dht.reqs.len > DHT_WARN_REQS) {
+ log_warn("Number of outstanding requests (%zu) exceeds %u.",
+ dht.reqs.len, DHT_WARN_REQS);
+ }
+
+ pthread_mutex_unlock(&dht.reqs.mtx);
+
+ return 0;
+ fail_req:
+ pthread_mutex_unlock(&dht.reqs.mtx);
+ return -1;
}
-static struct val * val_create(uint64_t addr,
- time_t exp)
+static int dht_kv_respond_req(uint8_t * key,
+ binary_data_t * vals,
+ size_t len)
{
- struct val * v;
- struct timespec t;
+ struct dht_req * req;
+ struct timespec now;
+ size_t i;
- v = malloc(sizeof(*v));
- if (v == NULL)
- return NULL;
+ assert(key != NULL);
+ assert(vals != NULL);
+ assert(len > 0);
- list_head_init(&v->next);
- v->addr = addr;
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- clock_gettime(CLOCK_REALTIME_COARSE, &t);
+ pthread_mutex_lock(&dht.reqs.mtx);
- v->t_exp = t.tv_sec + exp;
- v->t_rep = t.tv_sec + KAD_T_REPL;
+ req = __dht_kv_req_get_req(key);
+ if (req == NULL) {
+ log_warn(KEY_FMT " Failed to find req.", KEY_VAL(key));
+ goto fail_req;
+ }
+
+ for (i = 0; i < len; ++i) {
+ struct val_entry * e;
+ buffer_t val;
+ val.data = vals[i].data;
+ val.len = vals[i].len;
+ e = val_entry_create(val, now.tv_sec + DHT_T_CACHE);
+ if (e == NULL) {
+ log_err(" Failed to create val_entry.");
+ continue;
+ }
- return v;
-}
+ list_add_tail(&e->next, &req->cache.list);
+ ++req->cache.len;
+ }
-static void val_destroy(struct val * v)
-{
- assert(v);
+ pthread_cond_broadcast(&dht.reqs.cond);
- free(v);
+ pthread_mutex_unlock(&dht.reqs.mtx);
+ fail_req:
+ pthread_mutex_unlock(&dht.reqs.mtx);
+ return -1;
}
-static struct ref_entry * ref_entry_create(struct dht * dht,
- const uint8_t * key)
+static ssize_t dht_kv_wait_req(const uint8_t * key,
+ buffer_t ** vals)
{
- struct ref_entry * e;
+ struct list_head * p;
+ struct dht_req * req;
struct timespec t;
+#ifdef __DHT_TEST__
+ struct timespec intv = TIMESPEC_INIT_MS(10);
+#else
+ struct timespec intv = TIMESPEC_INIT_S(DHT_T_RESP);
+#endif
+ size_t max;
+ size_t i = 0;
+ int ret = 0;
- assert(dht);
- assert(key);
+ assert(key != NULL);
+ assert(vals != NULL);
- e = malloc(sizeof(*e));
- if (e == NULL)
- return NULL;
+ clock_gettime(PTHREAD_COND_CLOCK, &t);
- e->key = dht_dup_key(key, dht->b);
- if (e->key == NULL) {
- free(e);
- return NULL;
+ ts_add(&t, &intv, &t);
+
+ pthread_mutex_lock(&dht.reqs.mtx);
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, &dht.reqs.mtx);
+
+ while ((req = __dht_kv_get_req_cache(key)) == NULL) {
+ ret = pthread_cond_timedwait(&dht.reqs.cond, &dht.reqs.mtx, &t);
+ if (ret == ETIMEDOUT)
+ break;
}
- clock_gettime(CLOCK_REALTIME_COARSE, &t);
+ pthread_cleanup_pop(false);
- e->t_rep = t.tv_sec + dht->t_repub;
+ if (ret == ETIMEDOUT) {
+ log_warn(KEY_FMT " Req timed out.", KEY_VAL(key));
+ __dht_kv_req_remove(key);
+ goto timedout;
+ }
- return e;
-}
+ max = MIN(req->cache.len, DHT_MAX_VALS);
+ if (max == 0)
+ goto no_vals;
-static void ref_entry_destroy(struct ref_entry * e)
-{
- free(e->key);
- free(e);
+ *vals = malloc(max * sizeof(**vals));
+ if (*vals == NULL) {
+ log_err(KEY_FMT "Failed to malloc val buffer.", KEY_VAL(key));
+ goto fail_vals;
+ }
+
+ memset(*vals, 0, max * sizeof(**vals));
+
+ list_for_each(p, &req->cache.list) {
+ struct val_entry * v;
+ if (i == max)
+ break; /* We have enough values */
+ v = list_entry(p, struct val_entry, next);
+ (*vals)[i].data = malloc(v->val.len);
+ if ((*vals)[i].data == NULL)
+ goto fail_val_data;
+
+ (*vals)[i].len = v->val.len;
+ memcpy((*vals)[i++].data, v->val.data, v->val.len);
+ }
+
+ pthread_mutex_unlock(&dht.reqs.mtx);
+
+ return i;
+ no_vals:
+ pthread_mutex_unlock(&dht.reqs.mtx);
+ return 0;
+ fail_val_data:
+ freebufs(*vals, i);
+ fail_vals:
+ pthread_mutex_unlock(&dht.reqs.mtx);
+ return -ENOMEM;
+ timedout:
+ pthread_mutex_unlock(&dht.reqs.mtx);
+ return -ETIMEDOUT;
}
-static struct dht_entry * dht_entry_create(struct dht * dht,
- const uint8_t * key)
+static struct bucket * iter_bucket(struct bucket * b,
+ const uint8_t * id)
{
- struct dht_entry * e;
+ uint8_t byte;
+ uint8_t mask;
- assert(dht);
- assert(key);
+ assert(b != NULL);
- e = malloc(sizeof(*e));
- if (e == NULL)
- return NULL;
+ if (b->children[0] == NULL)
+ return b;
- list_head_init(&e->next);
- list_head_init(&e->vals);
+ byte = id[(b->depth * DHT_BETA) / CHAR_BIT];
- e->n_vals = 0;
+ mask = ((1L << DHT_BETA) - 1) & 0xFF;
- e->key = dht_dup_key(key, dht->b);
- if (e->key == NULL) {
- free(e);
- return NULL;
- }
+ byte >>= (CHAR_BIT - DHT_BETA) -
+ (((b->depth) * DHT_BETA) & (CHAR_BIT - 1));
- return e;
+ return iter_bucket(b->children[(byte & mask)], id);
}
-static void dht_entry_destroy(struct dht_entry * e)
+static struct bucket * __dht_kv_get_bucket(const uint8_t * id)
+{
+ assert(dht.db.contacts.root != NULL);
+
+ return iter_bucket(dht.db.contacts.root, id);
+}
+
+static void contact_list_add(struct list_head * l,
+ struct contact * c)
{
struct list_head * p;
- struct list_head * h;
- assert(e);
+ assert(l != NULL);
+ assert(c != NULL);
- list_for_each_safe(p, h, &e->vals) {
- struct val * v = list_entry(p, struct val, next);
- list_del(&v->next);
- val_destroy(v);
+ list_for_each(p, l) {
+ struct contact * e = list_entry(p, struct contact, next);
+ if (IS_CLOSER(e->id, c->id))
+ continue;
}
- free(e->key);
-
- free(e);
+ list_add_tail(&c->next, p);
}
-static int dht_entry_add_addr(struct dht_entry * e,
- uint64_t addr,
- time_t exp)
+static ssize_t dht_kv_contact_list(const uint8_t * key,
+ struct list_head * l,
+ size_t max)
{
struct list_head * p;
- struct val * val;
- struct timespec t;
+ struct bucket * b;
+ struct timespec t;
+ size_t i;
+ size_t len = 0;
+
+ assert(l != NULL);
+ assert(key != NULL);
+ assert(list_is_empty(l));
clock_gettime(CLOCK_REALTIME_COARSE, &t);
- list_for_each(p, &e->vals) {
- struct val * v = list_entry(p, struct val, next);
- if (v->addr == addr) {
- if (v->t_exp < t.tv_sec + exp) {
- v->t_exp = t.tv_sec + exp;
- v->t_rep = t.tv_sec + KAD_T_REPL;
- }
+ max = MIN(max, dht.k);
- return 0;
- }
+ pthread_rwlock_rdlock(&dht.db.lock);
+
+ b = __dht_kv_get_bucket(key);
+ if (b == NULL) {
+ log_err(KEY_FMT " Failed to get bucket.", KEY_VAL(key));
+ goto fail_bucket;
}
- val = val_create(addr, exp);
- if (val == NULL)
- return -ENOMEM;
+ b->t_refr = t.tv_sec + dht.t_refresh;
- list_add(&val->next, &e->vals);
- ++e->n_vals;
+ if (b->contacts.len == dht.k || b->parent == NULL) {
+ list_for_each(p, &b->contacts.list) {
+ struct contact * c;
+ struct contact * d;
+ c = list_entry(p, struct contact, next);
+ if (c->addr == dht.addr)
+ continue;
+ d = contact_create(c->id, c->addr);
+ if (d == NULL)
+ continue;
+ contact_list_add(l, d);
+ if (++len == max)
+ break;
+ }
+ } else {
+ struct bucket * d = b->parent;
+ for (i = 0; i < (1L << DHT_BETA) && len < dht.k; ++i) {
+ list_for_each(p, &d->children[i]->contacts.list) {
+ struct contact * c;
+ struct contact * d;
+ c = list_entry(p, struct contact, next);
+ if (c->addr == dht.addr)
+ continue;
+ d = contact_create(c->id, c->addr);
+ if (d == NULL)
+ continue;
+ contact_list_add(l, d);
+ if (++len == max)
+ break;
+ }
+ }
+ }
- return 0;
-}
+ pthread_rwlock_unlock(&dht.db.lock);
+ return len;
+ fail_bucket:
+ pthread_rwlock_unlock(&dht.db.lock);
+ return -1;
+}
-static void dht_entry_del_addr(struct dht_entry * e,
- uint64_t addr)
+static void contact_list_destroy(struct list_head * l)
{
struct list_head * p;
struct list_head * h;
- assert(e);
-
- list_for_each_safe(p, h, &e->vals) {
- struct val * v = list_entry(p, struct val, next);
- if (v->addr == addr) {
- list_del(&v->next);
- val_destroy(v);
- --e->n_vals;
- }
- }
+ assert(l != NULL);
- if (e->n_vals == 0) {
- list_del(&e->next);
- dht_entry_destroy(e);
+ list_for_each_safe(p, h, l) {
+ struct contact * c = list_entry(p, struct contact, next);
+ list_del(&c->next);
+ contact_destroy(c);
}
}
-static uint64_t dht_entry_get_addr(struct dht * dht,
- struct dht_entry * e)
+static ssize_t dht_kv_get_contacts(const uint8_t * key,
+ dht_contact_msg_t *** msgs)
{
+ struct list_head cl;
struct list_head * p;
+ struct list_head * h;
+ size_t len;
+ size_t i = 0;
+
+ assert(key != NULL);
+ assert(msgs != NULL);
- assert(e);
- assert(!list_is_empty(&e->vals));
+ list_head_init(&cl);
- list_for_each(p, &e->vals) {
- struct val * v = list_entry(p, struct val, next);
- if (v->addr != dht->addr)
- return v->addr;
+ len = dht_kv_contact_list(key, &cl, dht.k);
+ if (len == 0) {
+ *msgs = NULL;
+ return 0;
}
- return 0;
-}
+ *msgs = malloc(len * sizeof(**msgs));
+ if (*msgs == NULL)
+ goto fail_msgs;
-/* Forward declaration. */
-static struct lookup * kad_lookup(struct dht * dht,
- const uint8_t * key,
- enum kad_code code);
+ list_for_each_safe(p, h, &cl) {
+ struct contact * c;
+ (*msgs)[i] = malloc(sizeof(***msgs));
+ if ((*msgs)[i] == NULL)
+ goto fail_contact;
+ dht_contact_msg__init((*msgs)[i]);
+ c = list_entry(p, struct contact, next);
+ list_del(&c->next);
+ (*msgs)[i]->id.data = c->id;
+ (*msgs)[i]->id.len = dht.id.len;
+ (*msgs)[i++]->addr = c->addr;
+ free(c);
+ }
+
+ return i;
+ fail_contact:
+ while (i-- > 0)
+ dht_contact_msg__free_unpacked((*msgs)[i], NULL);
+ free(*msgs);
+ *msgs = NULL;
+ fail_msgs:
+ contact_list_destroy(&cl);
+ return -ENOMEM;
+}
/* Build a refresh list. */
-static void bucket_refresh(struct dht * dht,
- struct bucket * b,
- time_t t,
- struct list_head * r)
+static void __dht_kv_bucket_refresh_list(struct bucket * b,
+ time_t t,
+ struct list_head * r)
{
- size_t i;
+ struct contact * c;
+ struct contact * d;
- if (*b->children != NULL)
- for (i = 0; i < (1L << KAD_BETA); ++i)
- bucket_refresh(dht, b->children[i], t, r);
+ assert(b != NULL);
- if (b->n_contacts == 0)
+ if (t < b->t_refr)
return;
- if (t > b->t_refr) {
- struct contact * c;
- struct contact * d;
- c = list_first_entry(&b->contacts, struct contact, next);
- d = contact_create(c->id, dht->b, c->addr);
+ if (*b->children != NULL) {
+ size_t i;
+ for (i = 0; i < (1L << DHT_BETA); ++i)
+ __dht_kv_bucket_refresh_list(b->children[i], t, r);
+ }
+
+ if (b->contacts.len == 0)
+ return;
+
+ c = list_first_entry(&b->contacts.list, struct contact, next);
+ if (t > c->t_seen + dht.t_refresh) {
+ d = contact_create(c->id, c->addr);
if (d != NULL)
list_add(&d->next, r);
- return;
}
}
-
static struct bucket * bucket_create(void)
{
struct bucket * b;
@@ -1300,20 +1680,21 @@ static struct bucket * bucket_create(void)
if (b == NULL)
return NULL;
- list_head_init(&b->contacts);
- b->n_contacts = 0;
+ list_head_init(&b->contacts.list);
+ b->contacts.len = 0;
- list_head_init(&b->alts);
- b->n_alts = 0;
+ list_head_init(&b->alts.list);
+ b->alts.len = 0;
clock_gettime(CLOCK_REALTIME_COARSE, &t);
- b->t_refr = t.tv_sec + KAD_T_REFR;
+ b->t_refr = t.tv_sec + dht.t_refresh;
- for (i = 0; i < (1L << KAD_BETA); ++i)
+ for (i = 0; i < (1L << DHT_BETA); ++i)
b->children[i] = NULL;
b->parent = NULL;
b->depth = 0;
+ b->mask = 0;
return b;
}
@@ -1324,24 +1705,24 @@ static void bucket_destroy(struct bucket * b)
struct list_head * h;
size_t i;
- assert(b);
+ assert(b != NULL);
- for (i = 0; i < (1L << KAD_BETA); ++i)
+ for (i = 0; i < (1L << DHT_BETA); ++i)
if (b->children[i] != NULL)
bucket_destroy(b->children[i]);
- list_for_each_safe(p, h, &b->contacts) {
+ list_for_each_safe(p, h, &b->contacts.list) {
struct contact * c = list_entry(p, struct contact, next);
list_del(&c->next);
contact_destroy(c);
- --b->n_contacts;
+ --b->contacts.len;
}
- list_for_each_safe(p, h, &b->alts) {
+ list_for_each_safe(p, h, &b->alts.list) {
struct contact * c = list_entry(p, struct contact, next);
list_del(&c->next);
contact_destroy(c);
- --b->n_contacts;
+ --b->alts.len;
}
free(b);
@@ -1356,1537 +1737,2297 @@ static bool bucket_has_id(struct bucket * b,
if (b->depth == 0)
return true;
- byte = id[(b->depth * KAD_BETA) / CHAR_BIT];
+ byte = id[(b->depth * DHT_BETA) / CHAR_BIT];
- mask = ((1L << KAD_BETA) - 1) & 0xFF;
+ mask = ((1L << DHT_BETA) - 1) & 0xFF;
- byte >>= (CHAR_BIT - KAD_BETA) -
- (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1));
+ byte >>= (CHAR_BIT - DHT_BETA) -
+ (((b->depth - 1) * DHT_BETA) & (CHAR_BIT - 1));
return ((byte & mask) == b->mask);
}
-static int split_bucket(struct bucket * b)
+static int move_contacts(struct bucket * b,
+ struct bucket * c)
{
struct list_head * p;
struct list_head * h;
+ struct contact * d;
+
+ assert(b != NULL);
+ assert(c != NULL);
+
+ list_for_each_safe(p, h, &b->contacts.list) {
+ d = list_entry(p, struct contact, next);
+ if (bucket_has_id(c, d->id)) {
+ list_del(&d->next);
+ --b->contacts.len;
+ list_add_tail(&d->next, &c->contacts.list);
+ ++c->contacts.len;
+ }
+ }
+
+ return 0;
+}
+
+static int split_bucket(struct bucket * b)
+{
uint8_t mask = 0;
size_t i;
- size_t c;
+ size_t b_len;
assert(b);
- assert(b->n_alts == 0);
- assert(b->n_contacts);
+ assert(b->alts.len == 0);
+ assert(b->contacts.len != 0);
assert(b->children[0] == NULL);
- c = b->n_contacts;
+ b_len = b->contacts.len;
- for (i = 0; i < (1L << KAD_BETA); ++i) {
+ for (i = 0; i < (1L << DHT_BETA); ++i) {
b->children[i] = bucket_create();
- if (b->children[i] == NULL) {
- size_t j;
- for (j = 0; j < i; ++j)
- bucket_destroy(b->children[j]);
- return -1;
- }
+ if (b->children[i] == NULL)
+ goto fail_child;
b->children[i]->depth = b->depth + 1;
b->children[i]->mask = mask;
b->children[i]->parent = b;
- list_for_each_safe(p, h, &b->contacts) {
- struct contact * c;
- c = list_entry(p, struct contact, next);
- if (bucket_has_id(b->children[i], c->id)) {
- list_del(&c->next);
- --b->n_contacts;
- list_add(&c->next, &b->children[i]->contacts);
- ++b->children[i]->n_contacts;
- }
- }
+ move_contacts(b, b->children[i]);
mask++;
}
- for (i = 0; i < (1L << KAD_BETA); ++i)
- if (b->children[i]->n_contacts == c)
+ for (i = 0; i < (1L << DHT_BETA); ++i)
+ if (b->children[i]->contacts.len == b_len)
split_bucket(b->children[i]);
return 0;
+ fail_child:
+ while (i-- > 0)
+ bucket_destroy(b->children[i]);
+ return -1;
}
-/* Locked externally to mandate update as (final) part of join transaction. */
-static int dht_update_bucket(struct dht * dht,
- const uint8_t * id,
- uint64_t addr)
+static int dht_kv_update_contacts(const uint8_t * id,
+ uint64_t addr)
{
struct list_head * p;
struct list_head * h;
struct bucket * b;
struct contact * c;
- assert(dht);
+ assert(id != NULL);
+ assert(addr != INVALID_ADDR);
- b = dht_get_bucket(dht, id);
- if (b == NULL)
- return -1;
+ pthread_rwlock_wrlock(&dht.db.lock);
- c = contact_create(id, dht->b, addr);
- if (c == NULL)
- return -1;
+ b = __dht_kv_get_bucket(id);
+ if (b == NULL) {
+ log_err(PEER_FMT " Failed to get bucket.", PEER_VAL(id, addr));
+ goto fail_update;
+ }
+
+ c = contact_create(id, addr);
+ if (c == NULL) {
+ log_err(PEER_FMT " Failed to create contact.",
+ PEER_VAL(id, addr));
+ goto fail_update;
+ }
- list_for_each_safe(p, h, &b->contacts) {
+ list_for_each_safe(p, h, &b->contacts.list) {
struct contact * d = list_entry(p, struct contact, next);
if (d->addr == addr) {
list_del(&d->next);
contact_destroy(d);
- --b->n_contacts;
+ --b->contacts.len;
}
}
- if (b->n_contacts == dht->k) {
- if (bucket_has_id(b, dht->id)) {
- list_add_tail(&c->next, &b->contacts);
- ++b->n_contacts;
+ if (b->contacts.len == dht.k) {
+ if (bucket_has_id(b, dht.id.data)) {
+ list_add_tail(&c->next, &b->contacts.list);
+ ++b->contacts.len;
if (split_bucket(b)) {
list_del(&c->next);
contact_destroy(c);
- --b->n_contacts;
+ --b->contacts.len;
}
- } else if (b->n_alts == dht->k) {
+ } else if (b->alts.len == dht.k) {
struct contact * d;
- d = list_first_entry(&b->alts, struct contact, next);
+ d = list_first_entry(&b->alts.list,
+ struct contact, next);
list_del(&d->next);
contact_destroy(d);
- list_add_tail(&c->next, &b->alts);
+ list_add_tail(&c->next, &b->alts.list);
+ ++b->alts.len;
} else {
- list_add_tail(&c->next, &b->alts);
- ++b->n_alts;
+ list_add_tail(&c->next, &b->alts.list);
+ ++b->alts.len;
}
} else {
- list_add_tail(&c->next, &b->contacts);
- ++b->n_contacts;
+ list_add_tail(&c->next, &b->contacts.list);
+ ++b->contacts.len;
}
+ pthread_rwlock_unlock(&dht.db.lock);
+
return 0;
+ fail_update:
+ pthread_rwlock_unlock(&dht.db.lock);
+ return -1;
}
-static int send_msg(struct dht * dht,
- dht_msg_t * msg,
- uint64_t addr)
+static time_t gcd(time_t a,
+ time_t b)
{
-#ifndef __DHT_TEST__
- struct shm_du_buff * sdb;
- size_t len;
-#endif
- int retr = 0;
+ if (a == 0)
+ return b;
- if (msg->code == KAD_RESPONSE)
- retr = KAD_RESP_RETR;
+ return gcd(b % a, a);
+}
- pthread_rwlock_wrlock(&dht->lock);
+static dht_contact_msg_t * dht_kv_src_contact_msg(void)
+{
+ dht_contact_msg_t * src;
- if (dht->id != NULL) {
- msg->has_s_id = true;
- msg->s_id.data = dht->id;
- msg->s_id.len = dht->b;
- }
+ src = malloc(sizeof(*src));
+ if (src == NULL)
+ goto fail_malloc;
- msg->s_addr = dht->addr;
+ dht_contact_msg__init(src);
- if (msg->code < KAD_STORE) {
- msg->cookie = bmp_allocate(dht->cookies);
- if (!bmp_is_id_valid(dht->cookies, msg->cookie)) {
- pthread_rwlock_unlock(&dht->lock);
- goto fail_bmp_alloc;
- }
- }
+ src->id.data = dht_dup_key(dht.id.data);
+ if (src->id.data == NULL)
+ goto fail_id;
- pthread_rwlock_unlock(&dht->lock);
+ src->id.len = dht.id.len;
+ src->addr = dht.addr;
-#ifndef __DHT_TEST__
- len = dht_msg__get_packed_size(msg);
- if (len == 0)
- goto fail_msg;
+ return src;
+ fail_id:
+ dht_contact_msg__free_unpacked(src, NULL);
+ fail_malloc:
+ return NULL;
+}
- while (true) {
- if (ipcp_sdb_reserve(&sdb, len))
- goto fail_msg;
+static dht_msg_t * dht_kv_find_req_msg(const uint8_t * key,
+ enum dht_code code)
+{
+ dht_msg_t * msg;
- dht_msg__pack(msg, shm_du_buff_head(sdb));
+ assert(key != NULL);
- if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0)
- break;
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
- ipcp_sdb_release(sdb);
+ dht_msg__init(msg);
+ msg->code = code;
- sleep(1);
+ msg->src = dht_kv_src_contact_msg();
+ if (msg->src == NULL)
+ goto fail_msg;
- if (--retr < 0)
- goto fail_msg;
- }
+ msg->find = malloc(sizeof(*msg->find));
+ if (msg->find == NULL)
+ goto fail_msg;
-#else
- (void) addr;
- (void) retr;
-#endif /* __DHT_TEST__ */
+ dht_find_req_msg__init(msg->find);
- if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN)
- kad_req_create(dht, msg, addr);
+ msg->find->key.data = dht_dup_key(key);
+ if (msg->find->key.data == NULL)
+ goto fail_msg;
+
+ msg->find->key.len = dht.id.len;
+ msg->find->cookie = DHT_INVALID;
+
+ return msg;
- return msg->cookie;
-#ifndef __DHT_TEST__
fail_msg:
- pthread_rwlock_wrlock(&dht->lock);
- bmp_release(dht->cookies, msg->cookie);
- pthread_rwlock_unlock(&dht->lock);
-#endif /* !__DHT_TEST__ */
- fail_bmp_alloc:
- return -1;
+ dht_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return NULL;
}
-static struct dht_entry * dht_find_entry(struct dht * dht,
- const uint8_t * key)
+static dht_msg_t * dht_kv_find_node_req_msg(const uint8_t * key)
{
- struct list_head * p;
+ return dht_kv_find_req_msg(key, DHT_FIND_NODE_REQ);
+}
- list_for_each(p, &dht->entries) {
- struct dht_entry * e = list_entry(p, struct dht_entry, next);
- if (!memcmp(key, e->key, dht->b))
- return e;
+static dht_msg_t * dht_kv_find_value_req_msg(const uint8_t * key)
+{
+ return dht_kv_find_req_msg(key, DHT_FIND_VALUE_REQ);
+}
+
+static dht_msg_t * dht_kv_find_node_rsp_msg(uint8_t * key,
+ uint64_t cookie,
+ dht_contact_msg_t *** contacts,
+ size_t len)
+{
+ dht_msg_t * msg;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ dht_msg__init(msg);
+ msg->code = DHT_FIND_NODE_RSP;
+
+ msg->src = dht_kv_src_contact_msg();
+ if (msg->src == NULL)
+ goto fail_msg;
+
+ msg->node = malloc(sizeof(*msg->node));
+ if (msg->node == NULL)
+ goto fail_msg;
+
+ dht_find_node_rsp_msg__init(msg->node);
+
+ msg->node->key.data = dht_dup_key(key);
+ if (msg->node->key.data == NULL)
+ goto fail_msg;
+
+ msg->node->cookie = cookie;
+ msg->node->key.len = dht.id.len;
+ msg->node->n_contacts = len;
+ if (len != 0) { /* Steal the ptr */
+ msg->node->contacts = *contacts;
+ *contacts = NULL;
}
+ return msg;
+
+ fail_msg:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_malloc:
return NULL;
}
-static int kad_add(struct dht * dht,
- const dht_contact_msg_t * contacts,
- ssize_t n,
- time_t exp)
+static dht_msg_t * dht_kv_find_value_rsp_msg(uint8_t * key,
+ uint64_t cookie,
+ dht_contact_msg_t *** contacts,
+ size_t n_contacts,
+ buffer_t ** vals,
+ size_t n_vals)
{
- struct dht_entry * e;
+ dht_msg_t * msg;
- pthread_rwlock_wrlock(&dht->lock);
+ msg = dht_kv_find_node_rsp_msg(key, cookie, contacts, n_contacts);
+ if (msg == NULL)
+ goto fail_node_rsp;
- while (n-- > 0) {
- if (contacts[n].id.len != dht->b)
- log_warn("Bad key length in contact data.");
+ msg->code = DHT_FIND_VALUE_RSP;
- e = dht_find_entry(dht, contacts[n].id.data);
- if (e != NULL) {
- if (dht_entry_add_addr(e, contacts[n].addr, exp))
- goto fail;
- } else {
- e = dht_entry_create(dht, contacts[n].id.data);
- if (e == NULL)
- goto fail;
+ msg->val = malloc(sizeof(*msg->val));
+ if (msg->val == NULL)
+ goto fail_msg;
- if (dht_entry_add_addr(e, contacts[n].addr, exp)) {
- dht_entry_destroy(e);
- goto fail;
- }
+ dht_find_value_rsp_msg__init(msg->val);
- list_add(&e->next, &dht->entries);
- }
- }
+ msg->val->n_values = n_vals;
+ if (n_vals != 0) /* Steal the ptr */
+ msg->val->values = (binary_data_t *) *vals;
- pthread_rwlock_unlock(&dht->lock);
- return 0;
+ return msg;
- fail:
- pthread_rwlock_unlock(&dht->lock);
- return -ENOMEM;
+ fail_msg:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_node_rsp:
+ return NULL;
}
-static int wait_resp(struct dht * dht,
- dht_msg_t * msg,
- time_t timeo)
+static dht_msg_t * dht_kv_store_msg(const uint8_t * key,
+ const buffer_t val,
+ time_t exp)
{
- struct kad_req * req;
+ dht_msg_t * msg;
+
+ assert(key != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
- assert(dht);
- assert(msg);
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
- pthread_rwlock_rdlock(&dht->lock);
+ dht_msg__init(msg);
- req = dht_find_request(dht, msg);
- if (req == NULL) {
- pthread_rwlock_unlock(&dht->lock);
- return -EPERM;
- }
+ msg->code = DHT_STORE;
+
+ msg->src = dht_kv_src_contact_msg();
+ if (msg->src == NULL)
+ goto fail_msg;
+
+ msg->store = malloc(sizeof(*msg->store));
+ if (msg->store == NULL)
+ goto fail_msg;
+
+ dht_store_msg__init(msg->store);
+
+ msg->store->key.data = dht_dup_key(key);
+ if (msg->store->key.data == NULL)
+ goto fail_msg;
+
+ msg->store->key.len = dht.id.len;
+ msg->store->val.data = malloc(val.len);
+ if (msg->store->val.data == NULL)
+ goto fail_msg;
+
+ memcpy(msg->store->val.data, val.data, val.len);
+
+ msg->store->val.len = val.len;
+ msg->store->exp = exp;
- pthread_rwlock_unlock(&dht->lock);
+ return msg;
- return kad_req_wait(req, timeo);
+ fail_msg:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return NULL;
}
-static int kad_store(struct dht * dht,
- const uint8_t * key,
- uint64_t addr,
- uint64_t r_addr,
- time_t ttl)
+static ssize_t dht_kv_retrieve(const uint8_t * key,
+ buffer_t ** vals)
{
- dht_msg_t msg = DHT_MSG__INIT;
- dht_contact_msg_t cmsg = DHT_CONTACT_MSG__INIT;
- dht_contact_msg_t * cmsgp[1];
+ struct dht_entry * e;
+ struct list_head * p;
+ size_t n;
+ size_t i;
- cmsg.id.data = (uint8_t *) key;
- cmsg.addr = addr;
+ assert(key != NULL);
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_rdlock(&dht.db.lock);
- cmsg.id.len = dht->b;
+ e = __dht_kv_find_entry(key);
+ if (e == NULL)
+ goto no_vals;
- pthread_rwlock_unlock(&dht->lock);
+ n = MIN(DHT_MAX_VALS, e->vals.len + e->lvals.len);
+ if (n == 0)
+ goto no_vals;
- cmsgp[0] = &cmsg;
+ *vals = malloc(n * sizeof(**vals));
+ if (*vals == NULL)
+ goto fail_vals;
- msg.code = KAD_STORE;
- msg.has_t_expire = true;
- msg.t_expire = ttl;
- msg.n_contacts = 1;
- msg.contacts = cmsgp;
+ memset(*vals, 0, n * sizeof(**vals));
- if (send_msg(dht, &msg, r_addr) < 0)
- return -1;
+ i = 0;
+
+ list_for_each(p, &e->vals.list) {
+ struct val_entry * v;
+ if (i == n)
+ break; /* We have enough values */
+ v = list_entry(p, struct val_entry, next);
+ (*vals)[i].data = malloc(v->val.len);
+ if ((*vals)[i].data == NULL)
+ goto fail_val_data;
+
+ (*vals)[i].len = v->val.len;
+ memcpy((*vals)[i++].data, v->val.data, v->val.len);
+ }
+
+ list_for_each(p, &e->lvals.list) {
+ struct val_entry * v;
+ if (i == n)
+ break; /* We have enough values */
+ v = list_entry(p, struct val_entry, next);
+ (*vals)[i].data = malloc(v->val.len);
+ if ((*vals)[i].data == NULL)
+ goto fail_val_data;
+
+ (*vals)[i].len = v->val.len;
+ memcpy((*vals)[i++].data, v->val.data, v->val.len);
+ }
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ return (ssize_t) i;
+ fail_val_data:
+ pthread_rwlock_unlock(&dht.db.lock);
+ freebufs(*vals, i);
+ *vals = NULL;
+ return -ENOMEM;
+ fail_vals:
+ pthread_rwlock_unlock(&dht.db.lock);
+ return -ENOMEM;
+ no_vals:
+ pthread_rwlock_unlock(&dht.db.lock);
+ *vals = NULL;
return 0;
}
-static ssize_t kad_find(struct dht * dht,
- struct lookup * lu,
- const uint64_t * addrs,
- enum kad_code code)
+static void __cleanup_dht_msg(void * msg)
+{
+ dht_msg__free_unpacked((dht_msg_t *) msg, NULL);
+}
+
+#ifdef DEBUG_PROTO_DHT
+static void dht_kv_debug_msg(dht_msg_t * msg)
{
- dht_msg_t msg = DHT_MSG__INIT;
- ssize_t sent = 0;
+ struct tm * tm;
+ char tmstr[RIB_TM_STRLEN];
+ time_t stamp;
+ size_t i;
- assert(dht);
- assert(lu->key);
+ if (msg == NULL)
+ return;
- msg.code = code;
+ pthread_cleanup_push(__cleanup_dht_msg, msg);
+
+ switch (msg->code) {
+ case DHT_STORE:
+ log_proto(" key: " HASH_FMT64 " [%zu bytes]",
+ HASH_VAL64(msg->store->key.data),
+ msg->store->key.len);
+ log_proto(" val: " HASH_FMT64 " [%zu bytes]",
+ HASH_VAL64(msg->store->val.data),
+ msg->store->val.len);
+ stamp = msg->store->exp;
+ tm = gmtime(&stamp);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
+ log_proto(" exp: %s.", tmstr);
+ break;
+ case DHT_FIND_NODE_REQ:
+ /* FALLTHRU */
+ case DHT_FIND_VALUE_REQ:
+ log_proto(" cookie: " HASH_FMT64,
+ HASH_VAL64(&msg->find->cookie));
+ log_proto(" key: " HASH_FMT64 " [%zu bytes]",
+ HASH_VAL64(msg->find->key.data),
+ msg->find->key.len);
+ break;
+ case DHT_FIND_VALUE_RSP:
+ log_proto(" cookie: " HASH_FMT64,
+ HASH_VAL64(&msg->node->cookie));
+ log_proto(" key: " HASH_FMT64 " [%zu bytes]",
+ HASH_VAL64(msg->node->key.data),
+ msg->node->key.len);
+ log_proto(" values: [%zd]", msg->val->n_values);
+ for (i = 0; i < msg->val->n_values; i++)
+ log_proto(" " HASH_FMT64 " [%zu bytes]",
+ HASH_VAL64(msg->val->values[i].data),
+ msg->val->values[i].len);
+ log_proto(" contacts: [%zd]", msg->node->n_contacts);
+ for (i = 0; i < msg->node->n_contacts; i++) {
+ dht_contact_msg_t * c = msg->node->contacts[i];
+ log_proto(" " PEER_FMT,
+ PEER_VAL(c->id.data, c->addr));
+ }
+ break;
+ case DHT_FIND_NODE_RSP:
+ log_proto(" cookie: " HASH_FMT64,
+ HASH_VAL64(&msg->node->cookie));
+ log_proto(" key: " HASH_FMT64 " [%zu bytes]",
+ HASH_VAL64(msg->node->key.data), msg->node->key.len);
+ log_proto(" contacts: [%zd]", msg->node->n_contacts);
+ for (i = 0; i < msg->node->n_contacts; i++) {
+ dht_contact_msg_t * c = msg->node->contacts[i];
+ log_proto(" " PEER_FMT,
+ PEER_VAL(c->id.data, c->addr));
+ }
- msg.has_key = true;
- msg.key.data = (uint8_t *) lu->key;
- msg.key.len = dht->b;
+ break;
+ default:
+ break;
+ }
- while (*addrs != 0) {
- struct cookie_el * c;
- int ret;
+ pthread_cleanup_pop(false);
+}
- if (*addrs == dht->addr) {
- ++addrs;
- continue;
- }
+static void dht_kv_debug_msg_snd(dht_msg_t * msg,
+ uint8_t * id,
+ uint64_t addr)
+{
+ if (msg == NULL)
+ return;
- ret = send_msg(dht, &msg, *addrs);
- if (ret < 0)
- break;
+ log_proto(TX_HDR_FMT ".", TX_HDR_VAL(msg, id, addr));
- c = malloc(sizeof(*c));
- if (c == NULL)
- break;
+ dht_kv_debug_msg(msg);
+}
+
+static void dht_kv_debug_msg_rcv(dht_msg_t * msg)
+{
+ if (msg == NULL)
+ return;
+
+ log_proto(RX_HDR_FMT ".", RX_HDR_VAL(msg));
+
+ dht_kv_debug_msg(msg);
+}
+#endif
+
+#ifndef __DHT_TEST__
+static int dht_send_msg(dht_msg_t * msg,
+ uint64_t addr)
+{
+ size_t len;
+ struct shm_du_buff * sdb;
+
+ if (msg == NULL)
+ return 0;
+
+ assert(addr != INVALID_ADDR && addr != dht.addr);
+
+ len = dht_msg__get_packed_size(msg);
+ if (len == 0) {
+ log_warn("%s failed to pack.", DHT_CODE(msg));
+ goto fail_msg;
+ }
+
+ if (ipcp_sdb_reserve(&sdb, len)) {
+ log_warn("%s failed to get sdb.", DHT_CODE(msg));
+ goto fail_msg;
+ }
+
+ dht_msg__pack(msg, shm_du_buff_head(sdb));
+
+ if (dt_write_packet(addr, QOS_CUBE_BE, dht.eid, sdb) < 0) {
+ log_warn("%s write failed", DHT_CODE(msg));
+ goto fail_send;
+ }
- c->cookie = (uint32_t) ret;
+ return 0;
+ fail_send:
+ ipcp_sdb_release(sdb);
+ fail_msg:
+ return -1;
+}
+#else /* function for testing */
+static int dht_send_msg(dht_msg_t * msg,
+ uint64_t addr)
+{
+ buffer_t buf;
+
+ assert(msg != NULL);
+ assert(addr != INVALID_ADDR && addr != dht.addr);
- pthread_mutex_lock(&lu->lock);
+ buf.len = dht_msg__get_packed_size(msg);
+ if (buf.len == 0) {
+ log_warn("%s failed to pack.", DHT_CODE(msg));
+ goto fail_msg;
+ }
- list_add_tail(&c->next, &lu->cookies);
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL) {
+ log_warn("%s failed to malloc buf.", DHT_CODE(msg));
+ goto fail_msg;
+ }
- pthread_mutex_unlock(&lu->lock);
+ dht_msg__pack(msg, buf.data);
- ++sent;
- ++addrs;
+ if (sink_send_msg(&buf, addr) < 0) {
+ log_warn("%s write failed", DHT_CODE(msg));
+ goto fail_send;
}
- return sent;
+ return 0;
+ fail_send:
+ freebuf(buf);
+ fail_msg:
+ return -1;
}
+#endif /* __DHT_TEST__ */
-static void lookup_detach(struct dht * dht,
- struct lookup * lu)
+static void __cleanup_peer_list(void * pl)
{
- pthread_rwlock_wrlock(&dht->lock);
+ struct list_head * p;
+ struct list_head * h;
- list_del(&lu->next);
+ assert(pl != NULL);
- pthread_rwlock_unlock(&dht->lock);
+ list_for_each_safe(p, h, (struct list_head *) pl) {
+ struct peer_entry * e = list_entry(p, struct peer_entry, next);
+ list_del(&e->next);
+ free(e->id);
+ free(e);
+ }
}
-static struct lookup * kad_lookup(struct dht * dht,
- const uint8_t * id,
- enum kad_code code)
+
+static int dht_kv_send_msgs(dht_msg_t * msg,
+ struct list_head * pl)
{
- uint64_t addrs[KAD_ALPHA + 1];
- enum lookup_state state;
- struct lookup * lu;
+ struct list_head * p;
+ struct list_head * h;
- lu = lookup_create(dht, id);
- if (lu == NULL)
- return NULL;
+ pthread_cleanup_push(__cleanup_dht_msg, msg);
+ pthread_cleanup_push(__cleanup_peer_list, pl);
- lookup_new_addrs(lu, addrs);
+ list_for_each_safe(p, h, pl) {
+ struct peer_entry * e = list_entry(p, struct peer_entry, next);
+ if (IS_REQUEST(msg->code)) {
+ msg->find->cookie = e->cookie;
+ assert(msg->find->cookie != DHT_INVALID);
+ }
+ if (dht_send_msg(msg, e->addr) < 0)
+ continue;
- if (addrs[0] == 0) {
- lookup_detach(dht, lu);
- lookup_destroy(lu);
- return NULL;
+#ifdef DEBUG_PROTO_DHT
+ dht_kv_debug_msg_snd(msg, e->id, e->addr);
+#endif
+ list_del(&e->next);
+ free(e->id);
+ free(e);
}
- if (kad_find(dht, lu, addrs, code) == 0) {
- lookup_detach(dht, lu);
- return lu;
+ pthread_cleanup_pop(false);
+ pthread_cleanup_pop(false);
+
+ return list_is_empty(pl) ? 0 : -1;
+}
+
+static int dht_kv_get_peer_list_for_msg(dht_msg_t * msg,
+ struct list_head * pl)
+{
+ struct list_head cl; /* contact list */
+ uint8_t * key; /* key in the request */
+ size_t max;
+
+ assert(msg != NULL);
+
+ assert(list_is_empty(pl));
+
+ max = msg->code == DHT_STORE ? dht.k : dht.alpha;
+
+ switch (msg->code) {
+ case DHT_FIND_NODE_REQ:
+ /* FALLTHRU */
+ case DHT_FIND_VALUE_REQ:
+ key = msg->find->key.data;
+ break;
+ case DHT_STORE:
+ key = msg->store->key.data;
+ break;
+ default:
+ log_err("Invalid DHT msg code (%d).", msg->code);
+ return -1;
}
- while ((state = lookup_wait(lu)) != LU_COMPLETE) {
- switch (state) {
- case LU_UPDATE:
- lookup_new_addrs(lu, addrs);
- if (addrs[0] == 0)
- break;
+ list_head_init(&cl);
- kad_find(dht, lu, addrs, code);
- break;
- case LU_DESTROY:
- lookup_detach(dht, lu);
- lookup_set_state(lu, LU_NULL);
- return NULL;
- default:
- break;
- }
+ if (dht_kv_contact_list(key, &cl, max) < 0) {
+ log_err(KEY_FMT " Failed to get contact list.", KEY_VAL(key));
+ goto fail_contacts;
}
- assert(state == LU_COMPLETE);
+ if (list_is_empty(&cl)) {
+ log_warn(KEY_FMT " No available contacts.", KEY_VAL(key));
+ goto fail_contacts;
+ }
- lookup_detach(dht, lu);
+ if (dht_kv_create_peer_list(&cl, pl, msg->code) < 0) {
+ log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key));
+ goto fail_peers;
+ }
- return lu;
+ contact_list_destroy(&cl);
+ return 0;
+ fail_peers:
+ contact_list_destroy(&cl);
+ fail_contacts:
+ return -1;
}
-static void kad_publish(struct dht * dht,
- const uint8_t * key,
- uint64_t addr,
- time_t exp)
+static int dht_kv_store_remote(const uint8_t * key,
+ const buffer_t val,
+ time_t exp)
{
- struct lookup * lu;
- uint64_t * addrs;
- ssize_t n;
- size_t k;
- time_t t_expire;
+ dht_msg_t * msg;
+ struct timespec now;
+ struct list_head pl;
+ assert(key != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
- assert(dht);
- assert(key);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- pthread_rwlock_rdlock(&dht->lock);
+ msg = dht_kv_store_msg(key, val, exp);
+ if (msg == NULL) {
+ log_err(KV_FMT " Failed to create %s.",
+ KV_VAL(key, val), dht_code_str[DHT_STORE]);
+ goto fail_msg;
+ }
- k = dht->k;
- t_expire = dht->t_expire;
+ list_head_init(&pl);
- pthread_rwlock_unlock(&dht->lock);
+ if (dht_kv_get_peer_list_for_msg(msg, &pl) < 0) {
+ log_dbg(KV_FMT " Failed to get peer list.", KV_VAL(key, val));
+ goto fail_peer_list;
+ }
- addrs = malloc(k * sizeof(*addrs));
- if (addrs == NULL)
- return;
+ if (dht_kv_send_msgs(msg, &pl) < 0) {
+ log_warn(KV_FMT " Failed to send any %s msg.",
+ KV_VAL(key, val), DHT_CODE(msg));
+ goto fail_msgs;
+ }
- lu = kad_lookup(dht, key, KAD_FIND_NODE);
- if (lu == NULL) {
- free(addrs);
- return;
+ dht_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msgs:
+ peer_list_destroy(&pl);
+ fail_peer_list:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ return -1;
+}
+
+/* recursive lookup, start with pl NULL */
+static int dht_kv_query_contacts(const uint8_t * key,
+ struct list_head * pl)
+{
+ struct list_head p;
+
+ dht_msg_t * msg;
+
+ assert(key != NULL);
+
+ msg = dht_kv_find_node_req_msg(key);
+ if (msg == NULL) {
+ log_err(KEY_FMT " Failed to create %s msg.",
+ KEY_VAL(key), dht_code_str[DHT_FIND_NODE_REQ]);
+ goto fail_msg;
}
- n = lookup_contact_addrs(lu, addrs);
+ if (pl == NULL) {
+ list_head_init(&p);
+ pl = &p;
+ }
- while (n-- > 0) {
- if (addrs[n] == dht->addr) {
- dht_contact_msg_t msg = DHT_CONTACT_MSG__INIT;
- msg.id.data = (uint8_t *) key;
- msg.id.len = dht->b;
- msg.addr = addr;
- kad_add(dht, &msg, 1, exp);
- } else {
- if (kad_store(dht, key, addr, addrs[n], t_expire))
- log_warn("Failed to send store message.");
- }
+ if (list_is_empty(pl) && dht_kv_get_peer_list_for_msg(msg, pl) < 0) {
+ log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key));
+ goto fail_peer_list;
}
- lookup_destroy(lu);
+ if (dht_kv_update_req(key, pl) < 0) {
+ log_warn(KEY_FMT " Failed to update req.", KEY_VAL(key));
+ goto fail_update;
+ }
- free(addrs);
+ if (dht_kv_send_msgs(msg, pl)) {
+ log_warn(KEY_FMT " Failed to send any %s msg.",
+ KEY_VAL(key), DHT_CODE(msg));
+ goto fail_update;
+ }
+
+ dht_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_update:
+ peer_list_destroy(pl);
+ fail_peer_list:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ return -1;
}
-static int kad_join(struct dht * dht,
- uint64_t addr)
+/* recursive lookup, start with pl NULL */
+static ssize_t dht_kv_query_remote(const uint8_t * key,
+ buffer_t ** vals,
+ struct list_head * pl)
{
- dht_msg_t msg = DHT_MSG__INIT;
+ struct list_head p;
+ dht_msg_t * msg;
- msg.code = KAD_JOIN;
+ assert(key != NULL);
- msg.has_alpha = true;
- msg.has_b = true;
- msg.has_k = true;
- msg.has_t_refresh = true;
- msg.has_t_replicate = true;
- msg.alpha = KAD_ALPHA;
- msg.k = KAD_K;
- msg.t_refresh = KAD_T_REFR;
- msg.t_replicate = KAD_T_REPL;
+ msg = dht_kv_find_value_req_msg(key);
+ if (msg == NULL) {
+ log_err(KEY_FMT " Failed to create value req.", KEY_VAL(key));
+ goto fail_msg;
+ }
- pthread_rwlock_rdlock(&dht->lock);
+ if (pl == NULL) {
+ list_head_init(&p);
+ pl = &p;
+ }
- msg.b = dht->b;
+ if (list_is_empty(pl) && dht_kv_get_peer_list_for_msg(msg, pl) < 0) {
+ log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key));
+ goto fail_peer_list;
+ }
- pthread_rwlock_unlock(&dht->lock);
+ if (dht_kv_update_req(key, pl) < 0) {
+ log_err(KEY_FMT " Failed to update request.", KEY_VAL(key));
+ goto fail_update;
+ }
- if (send_msg(dht, &msg, addr) < 0)
- return -1;
+ if (dht_kv_send_msgs(msg, pl)) {
+ log_warn(KEY_FMT " Failed to send %s msg.",
+ KEY_VAL(key), DHT_CODE(msg));
+ goto fail_update;
+ }
- if (wait_resp(dht, &msg, KAD_T_JOIN) < 0)
- return -1;
+ dht_msg__free_unpacked(msg, NULL);
- dht->id = create_id(dht->b);
- if (dht->id == NULL)
- return -1;
+ if (vals == NULL) /* recursive lookup, already waiting */
+ return 0;
- pthread_rwlock_wrlock(&dht->lock);
+ return dht_kv_wait_req(key, vals);
+ fail_update:
+ peer_list_destroy(pl);
+ fail_peer_list:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ return -1;
+}
- dht_update_bucket(dht, dht->id, dht->addr);
+static void __add_dht_kv_entry(struct dht_entry * e)
+{
+ struct list_head * p;
- pthread_rwlock_unlock(&dht->lock);
+ assert(e != NULL);
- return 0;
+ list_for_each(p, &dht.db.kv.list) {
+ struct dht_entry * d = list_entry(p, struct dht_entry, next);
+ if (IS_CLOSER(d->key, e->key))
+ continue;
+ break;
+ }
+
+ list_add_tail(&e->next, p);
+ ++dht.db.kv.len;
}
-static void dht_dead_peer(struct dht * dht,
- uint8_t * key,
- uint64_t addr)
+/* incoming store message */
+static int dht_kv_store(const uint8_t * key,
+ const buffer_t val,
+ time_t exp)
{
- struct list_head * p;
- struct list_head * h;
- struct bucket * b;
+ struct dht_entry * e;
+ bool new = false;
- b = dht_get_bucket(dht, key);
+ assert(key != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
- list_for_each_safe(p, h, &b->contacts) {
- struct contact * c = list_entry(p, struct contact, next);
- if (b->n_contacts + b->n_alts <= dht->k) {
- ++c->fails;
- return;
- }
+ pthread_rwlock_wrlock(&dht.db.lock);
- if (c->addr == addr) {
- list_del(&c->next);
- contact_destroy(c);
- --b->n_contacts;
- break;
- }
+ e = __dht_kv_find_entry(key);
+ if (e == NULL) {
+ log_dbg(KV_FMT " Adding entry (store).", KV_VAL(key, val));
+ e = dht_entry_create(key);
+ if (e == NULL)
+ goto fail;
+
+ new = true;
+
+ __add_dht_kv_entry(e);
}
- while (b->n_contacts < dht->k && b->n_alts > 0) {
- struct contact * c;
- c = list_first_entry(&b->alts, struct contact, next);
- list_del(&c->next);
- --b->n_alts;
- list_add(&c->next, &b->contacts);
- ++b->n_contacts;
+ if (dht_entry_update_val(e, val, exp) < 0)
+ goto fail_add;
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ return 0;
+ fail_add:
+ if (new) {
+ list_del(&e->next);
+ dht_entry_destroy(e);
+ --dht.db.kv.len;
}
+ fail:
+ pthread_rwlock_unlock(&dht.db.lock);
+ return -1;
}
-static int dht_del(struct dht * dht,
- const uint8_t * key,
- uint64_t addr)
+static int dht_kv_publish(const uint8_t * key,
+ const buffer_t val)
{
struct dht_entry * e;
+ struct timespec now;
+ bool new = false;
+
+ assert(key != NULL);
+ assert(val.data != NULL);
+ assert(val.len > 0);
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_rwlock_wrlock(&dht.db.lock);
- e = dht_find_entry(dht, key);
+ e = __dht_kv_find_entry(key);
if (e == NULL) {
- return -EPERM;
+ log_dbg(KV_FMT " Adding entry (publish).", KV_VAL(key, val));
+ e = dht_entry_create(key);
+ if (e == NULL)
+ goto fail;
+
+ __add_dht_kv_entry(e);
+ new = true;
}
- dht_entry_del_addr(e, addr);
+ if (dht_entry_update_lval(e, val) < 0)
+ goto fail_add;
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ dht_kv_store_remote(key, val, now.tv_sec + dht.t_expire);
return 0;
+ fail_add:
+ if (new) {
+ list_del(&e->next);
+ dht_entry_destroy(e);
+ --dht.db.kv.len;
+ }
+ fail:
+ pthread_rwlock_unlock(&dht.db.lock);
+ return -1;
}
-static buffer_t dht_retrieve(struct dht * dht,
- const uint8_t * key)
+static int dht_kv_unpublish(const uint8_t * key,
+ const buffer_t val)
{
struct dht_entry * e;
- struct list_head * p;
- buffer_t buf;
- uint64_t * pos;
- size_t addrs = 0;
+ int rc;
- pthread_rwlock_rdlock(&dht->lock);
+ assert(key != NULL);
- e = dht_find_entry(dht, key);
+ pthread_rwlock_wrlock(&dht.db.lock);
+
+ e = __dht_kv_find_entry(key);
if (e == NULL)
- goto fail;
+ goto no_entry;
- buf.len = MIN(DHT_RETR_ADDR, e->n_vals);
- if (buf.len == 0)
- goto fail;
+ rc = dht_entry_remove_lval(e, val);
- pos = malloc(sizeof(dht->addr) * buf.len);
- if (pos == NULL)
- goto fail;
+ pthread_rwlock_unlock(&dht.db.lock);
- buf.data = (uint8_t *) pos;
+ return rc;
+ no_entry:
+ pthread_rwlock_unlock(&dht.db.lock);
+ return -ENOENT;
- list_for_each(p, &e->vals) {
- struct val * v = list_entry(p, struct val, next);
- *pos++ = v->addr;
- if (++addrs >= buf.len)
- break;
+}
+
+/* message validation */
+static int dht_kv_validate_store_msg(const dht_store_msg_t * store)
+{
+ if (store == NULL) {
+ log_warn("Store in msg is NULL.");
+ return -EINVAL;
}
- pthread_rwlock_unlock(&dht->lock);
+ if (store->key.data == NULL || store->key.len == 0) {
+ log_warn("Invalid key in DHT store msg.");
+ return -EINVAL;
+ }
- return buf;
+ if (store->key.len != dht.id.len) {
+ log_warn("Invalid key length in DHT store msg.");
+ return -EINVAL;
+ }
- fail:
- pthread_rwlock_unlock(&dht->lock);
- buf.len = 0;
- buf.data = NULL;
- return buf;
+ if (store->val.data == NULL || store->val.len == 0) {
+ log_warn("Invalid value in DHT store msg.");
+ return -EINVAL;
+ }
+
+ return 0;
}
-static ssize_t dht_get_contacts(struct dht * dht,
- const uint8_t * key,
- dht_contact_msg_t *** msgs)
+static int validate_find_req_msg(const dht_find_req_msg_t * req)
{
- struct list_head l;
- struct list_head * p;
- struct list_head * h;
- size_t len;
- size_t i = 0;
+ if (req == NULL) {
+ log_warn("Request in msg is NULL.");
+ return -EINVAL;
+ }
- list_head_init(&l);
+ if (req->key.data == NULL || req->key.len == 0) {
+ log_warn("Find request without key.");
+ return -EINVAL;
+ }
- pthread_rwlock_wrlock(&dht->lock);
+ if (req->key.len != dht.id.len) {
+ log_warn("Invalid key length in request msg.");
+ return -EINVAL;
+ }
- len = dht_contact_list(dht, &l, key);
- if (len == 0) {
- pthread_rwlock_unlock(&dht->lock);
- *msgs = NULL;
+ return 0;
+}
+
+static int validate_node_rsp_msg(const dht_find_node_rsp_msg_t * rsp)
+{
+ if (rsp == NULL) {
+ log_warn("Node rsp in msg is NULL.");
+ return -EINVAL;
+ }
+
+ if (rsp->key.data == NULL) {
+ log_warn("Invalid key in DHT response msg.");
+ return -EINVAL;
+ }
+
+ if (rsp->key.len != dht.id.len) {
+ log_warn("Invalid key length in DHT response msg.");
+ return -EINVAL;
+ }
+
+ if (!dht_kv_has_req(rsp->key.data, rsp->cookie)) {
+ log_warn(KEY_FMT " No request " CK_FMT ".",
+ KEY_VAL(rsp->key.data), CK_VAL(rsp->cookie));
+
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+static int validate_value_rsp_msg(const dht_find_value_rsp_msg_t * rsp)
+{
+ if (rsp == NULL) {
+ log_warn("Invalid DHT find value response msg.");
+ return -EINVAL;
+ }
+
+ if (rsp->values == NULL && rsp->n_values > 0) {
+ log_dbg("No values in DHT response msg.");
return 0;
}
- *msgs = malloc(len * sizeof(**msgs));
- if (*msgs == NULL) {
- pthread_rwlock_unlock(&dht->lock);
+ if (rsp->n_values == 0 && rsp->values != NULL) {
+ log_dbg("DHT response did not set values NULL.");
return 0;
}
- list_for_each_safe(p, h, &l) {
- struct contact * c = list_entry(p, struct contact, next);
- (*msgs)[i] = malloc(sizeof(***msgs));
- if ((*msgs)[i] == NULL) {
- pthread_rwlock_unlock(&dht->lock);
- while (i > 0)
- free(*msgs[--i]);
- free(*msgs);
- *msgs = NULL;
- return 0;
- }
+ return 0;
+}
- dht_contact_msg__init((*msgs)[i]);
+static int dht_kv_validate_msg(dht_msg_t * msg)
+{
- (*msgs)[i]->id.data = c->id;
- (*msgs)[i]->id.len = dht->b;
- (*msgs)[i++]->addr = c->addr;
- list_del(&c->next);
- free(c);
+ assert(msg != NULL);
+
+ if (msg->src->id.len != dht.id.len) {
+ log_warn("%s Invalid source contact ID.", DHT_CODE(msg));
+ return -EINVAL;
}
- pthread_rwlock_unlock(&dht->lock);
+ if (msg->src->addr == INVALID_ADDR) {
+ log_warn("%s Invalid source address.", DHT_CODE(msg));
+ return -EINVAL;
+ }
- return i;
+ switch (msg->code) {
+ case DHT_FIND_VALUE_REQ:
+ /* FALLTHRU */
+ case DHT_FIND_NODE_REQ:
+ if (validate_find_req_msg(msg->find) < 0)
+ return -EINVAL;
+ break;
+ case DHT_FIND_VALUE_RSP:
+ if (validate_value_rsp_msg(msg->val) < 0)
+ return -EINVAL;
+ /* FALLTHRU */
+ case DHT_FIND_NODE_RSP:
+ if (validate_node_rsp_msg(msg->node) < 0)
+ return -EINVAL;
+ break;
+ case DHT_STORE:
+ if (dht_kv_validate_store_msg(msg->store) < 0)
+ return -EINVAL;
+ break;
+ default:
+ log_warn("Invalid DHT msg code (%d).", msg->code);
+ return -ENOENT;
+ }
+
+ return 0;
}
-static time_t gcd(time_t a,
- time_t b)
+static void do_dht_kv_store(const dht_store_msg_t * store)
{
- if (a == 0)
- return b;
+ struct tm * tm;
+ char tmstr[RIB_TM_STRLEN];
+ buffer_t val;
+ uint8_t * key;
+ time_t exp;
- return gcd(b % a, a);
+ assert(store != NULL);
+
+ val.data = store->val.data;
+ val.len = store->val.len;
+ key = store->key.data;
+ exp = store->exp;
+
+ if (dht_kv_store(store->key.data, val, store->exp) < 0) {
+ log_err(KV_FMT " Failed to store.", KV_VAL(key, val));
+ return;
+ }
+
+ tm = gmtime(&exp);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
+ log_info(KV_FMT " Stored value until %s.", KV_VAL(key, val), tmstr);
}
-static void * work(void * o)
+static dht_msg_t * do_dht_kv_find_node_req(const dht_find_req_msg_t * req)
{
- struct dht * dht;
- struct timespec now;
- struct list_head * p;
- struct list_head * h;
- struct list_head reflist;
- time_t intv;
- struct lookup * lu;
+ dht_contact_msg_t ** contacts;
+ dht_msg_t * rsp;
+ uint8_t * key;
+ uint64_t cookie;
+ ssize_t len;
- dht = (struct dht *) o;
+ assert(req != NULL);
- pthread_rwlock_rdlock(&dht->lock);
+ key = req->key.data;
+ cookie = req->cookie;
- intv = gcd(dht->t_expire, dht->t_repub);
- intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2;
+ len = dht_kv_get_contacts(key, &contacts);
+ if (len < 0) {
+ log_warn(KEY_FMT " Failed to get contacts.", KEY_VAL(key));
+ goto fail_contacts;
+ }
- pthread_rwlock_unlock(&dht->lock);
+ rsp = dht_kv_find_node_rsp_msg(key, cookie, &contacts, len);
+ if (rsp == NULL) {
+ log_err(KEY_FMT " Failed to create %s.", KEY_VAL(key),
+ dht_code_str[DHT_FIND_NODE_RSP]);
+ goto fail_msg;
+ }
- list_head_init(&reflist);
+ assert(rsp->code == DHT_FIND_NODE_RSP);
- while (true) {
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
-
- pthread_rwlock_wrlock(&dht->lock);
-
- /* Republish registered hashes. */
- list_for_each(p, &dht->refs) {
- struct ref_entry * e;
- uint8_t * key;
- uint64_t addr;
- time_t t_expire;
- e = list_entry(p, struct ref_entry, next);
- if (now.tv_sec > e->t_rep) {
- key = dht_dup_key(e->key, dht->b);
- if (key == NULL)
- continue;
- addr = dht->addr;
- t_expire = dht->t_expire;
- e->t_rep = now.tv_sec + dht->t_repub;
-
- pthread_rwlock_unlock(&dht->lock);
- kad_publish(dht, key, addr, t_expire);
- pthread_rwlock_wrlock(&dht->lock);
- free(key);
- }
- }
+ log_info(KEY_FMT " Responding with %zd contacts", KEY_VAL(key), len);
- /* Remove stale entries and republish if necessary. */
- list_for_each_safe(p, h, &dht->entries) {
- struct list_head * p1;
- struct list_head * h1;
- struct dht_entry * e;
- uint8_t * key;
- time_t t_expire;
- e = list_entry (p, struct dht_entry, next);
- list_for_each_safe(p1, h1, &e->vals) {
- struct val * v;
- uint64_t addr;
- v = list_entry(p1, struct val, next);
- if (now.tv_sec > v->t_exp) {
- list_del(&v->next);
- val_destroy(v);
- continue;
- }
-
- if (now.tv_sec > v->t_rep) {
- key = dht_dup_key(e->key, dht->b);
- addr = v->addr;
- t_expire = dht->t_expire = now.tv_sec;
- v->t_rep = now.tv_sec + dht->t_replic;
- pthread_rwlock_unlock(&dht->lock);
- kad_publish(dht, key, addr, t_expire);
- pthread_rwlock_wrlock(&dht->lock);
- free(key);
- }
- }
- }
+ return rsp;
+ fail_msg:
+ while (len-- > 0)
+ dht_contact_msg__free_unpacked(contacts[len], NULL);
+ free(contacts);
+ fail_contacts:
+ return NULL;
+}
- /* Check the requests list for unresponsive nodes. */
- list_for_each_safe(p, h, &dht->requests) {
- struct kad_req * r;
- r = list_entry(p, struct kad_req, next);
- if (now.tv_sec > r->t_exp) {
- list_del(&r->next);
- bmp_release(dht->cookies, r->cookie);
- dht_dead_peer(dht, r->key, r->addr);
- kad_req_destroy(r);
- }
- }
+static void dht_kv_process_node_rsp(dht_contact_msg_t ** contacts,
+ size_t len,
+ struct list_head * pl,
+ enum dht_code code)
+{
+ struct timespec now;
+ size_t i;
- /* Refresh unaccessed buckets. */
- bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist);
+ assert(contacts != NULL);
+ assert(len > 0);
+ assert(pl != NULL);
+ assert(list_is_empty(pl));
- pthread_rwlock_unlock(&dht->lock);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- list_for_each_safe(p, h, &reflist) {
- struct contact * c;
- c = list_entry(p, struct contact, next);
- lu = kad_lookup(dht, c->id, KAD_FIND_NODE);
- if (lu != NULL)
- lookup_destroy(lu);
- list_del(&c->next);
- contact_destroy(c);
+ for (i = 0; i < len; i++) {
+ dht_contact_msg_t * c = contacts[i];
+ struct peer_entry * e;
+ if (c->addr == dht.addr)
+ continue;
+
+ if (dht_kv_update_contacts(c->id.data, c->addr) < 0)
+ log_warn(PEER_FMT " Failed to update contacts.",
+ PEER_VAL(c->id.data, c->addr));
+
+ e = malloc(sizeof(*e));
+ if (e == NULL) {
+ log_err(PEER_FMT " Failed to malloc entry.",
+ PEER_VAL(c->id.data, c->addr));
+ continue;
}
- sleep(intv);
- }
+ e->id = dht_dup_key(c->id.data);
+ if (e->id == NULL) {
+ log_warn(PEER_FMT " Failed to duplicate id.",
+ PEER_VAL(c->id.data, c->addr));
+ free(e);
+ continue;
+ }
- return (void *) 0;
+ e->cookie = generate_cookie();
+ e->code = code;
+ e->addr = c->addr;
+ e->t_sent = now.tv_sec;
+
+ list_add_tail(&e->next, pl);
+ }
}
-static int kad_handle_join_resp(struct dht * dht,
- struct kad_req * req,
- dht_msg_t * msg)
+static dht_msg_t * do_dht_kv_find_value_req(const dht_find_req_msg_t * req)
{
- assert(dht);
- assert(req);
- assert(msg);
+ dht_contact_msg_t ** contacts;
+ ssize_t n_contacts;
+ buffer_t * vals;
+ ssize_t n_vals;
+ dht_msg_t * rsp;
+ uint8_t * key;
+ uint64_t cookie;
- /* We might send version numbers later to warn of updates if needed. */
- if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire &&
- msg->has_t_refresh && msg->has_t_replicate)) {
- log_warn("Join refused by remote.");
- return -1;
+ assert(req != NULL);
+
+ key = req->key.data;
+ cookie = req->cookie;
+
+ n_contacts = dht_kv_get_contacts(key, &contacts);
+ if (n_contacts < 0) {
+ log_warn(KEY_FMT " Failed to get contacts.", KEY_VAL(key));
+ goto fail_contacts;
}
- if (msg->b < sizeof(uint64_t)) {
- log_err("Hash sizes less than 8 bytes unsupported.");
- return -1;
+ assert(n_contacts > 0 || contacts == NULL);
+
+ n_vals = dht_kv_retrieve(key, &vals);
+ if (n_vals < 0) {
+ log_dbg(KEY_FMT " Failed to get values.", KEY_VAL(key));
+ goto fail_vals;
}
- pthread_rwlock_wrlock(&dht->lock);
+ if (n_vals == 0)
+ log_dbg(KEY_FMT " No values found.", KEY_VAL(key));
- dht->buckets = bucket_create();
- if (dht->buckets == NULL) {
- pthread_rwlock_unlock(&dht->lock);
- return -1;
+ rsp = dht_kv_find_value_rsp_msg(key, cookie, &contacts, n_contacts,
+ &vals, n_vals);
+ if (rsp == NULL) {
+ log_err(KEY_FMT " Failed to create %s msg.",
+ KEY_VAL(key), dht_code_str[DHT_FIND_VALUE_RSP]);
+ goto fail_msg;
}
- /* Likely corrupt packet. The member will refuse, we might here too. */
- if (msg->alpha != KAD_ALPHA || msg->k != KAD_K)
- log_warn("Different kademlia parameters detected.");
+ log_info(KEY_FMT " Responding with %zd contacts, %zd values.",
+ KEY_VAL(req->key.data), n_contacts, n_vals);
- if (msg->t_replicate != KAD_T_REPL)
- log_warn("Different kademlia replication time detected.");
+ return rsp;
- if (msg->t_refresh != KAD_T_REFR)
- log_warn("Different kademlia refresh time detected.");
+ fail_msg:
+ freebufs(vals, n_vals);
+ fail_vals:
+ while (n_contacts-- > 0)
+ dht_contact_msg__free_unpacked(contacts[n_contacts], NULL);
+ free(contacts);
+ fail_contacts:
+ return NULL;
+}
- dht->k = msg->k;
- dht->b = msg->b;
- dht->t_expire = msg->t_expire;
- dht->t_repub = MAX(1, dht->t_expire - 10);
+static void do_dht_kv_find_node_rsp(const dht_find_node_rsp_msg_t * rsp)
+{
+ struct list_head pl;
- if (pthread_create(&dht->worker, NULL, work, dht)) {
- bucket_destroy(dht->buckets);
- pthread_rwlock_unlock(&dht->lock);
- return -1;
- }
+ assert(rsp != NULL);
- kad_req_respond(req);
+ list_head_init(&pl);
- dht_update_bucket(dht, msg->s_id.data, msg->s_addr);
+ dht_kv_process_node_rsp(rsp->contacts, rsp->n_contacts, &pl,
+ DHT_FIND_NODE_REQ);
- pthread_rwlock_unlock(&dht->lock);
+ if (list_is_empty(&pl))
+ goto no_contacts;
- log_dbg("Enrollment of DHT completed.");
+ if (dht_kv_update_req(rsp->key.data, &pl) < 0) {
+ log_err(KEY_FMT " Failed to update request.",
+ KEY_VAL(rsp->key.data));
+ goto fail_update;
+ }
- return 0;
+ dht_kv_query_contacts(rsp->key.data, &pl);
+
+ return;
+
+ fail_update:
+ peer_list_destroy(&pl);
+ no_contacts:
+ return;
}
-static int kad_handle_find_resp(struct dht * dht,
- struct kad_req * req,
- dht_msg_t * msg)
+static void do_dht_kv_find_value_rsp(const dht_find_node_rsp_msg_t * node,
+ const dht_find_value_rsp_msg_t * val)
{
- struct lookup * lu;
+ struct list_head pl;
+ uint8_t * key;
- assert(dht);
- assert(req);
- assert(msg);
+ assert(node != NULL);
+ assert(val != NULL);
- pthread_rwlock_rdlock(&dht->lock);
+ list_head_init(&pl);
- lu = dht_find_lookup(dht, req->cookie);
- if (lu == NULL) {
- pthread_rwlock_unlock(&dht->lock);
- return -1;
+ key = node->key.data;
+
+ dht_kv_process_node_rsp(node->contacts, node->n_contacts, &pl,
+ DHT_FIND_VALUE_REQ);
+
+ if (val->n_values > 0) {
+ log_dbg(KEY_FMT " %zd new values received.",
+ KEY_VAL(key), val->n_values);
+ dht_kv_respond_req(key, val->values, val->n_values);
+ peer_list_destroy(&pl);
+ return; /* done! */
}
- lookup_update(dht, lu, msg);
+ if (list_is_empty(&pl))
+ goto no_contacts;
- pthread_rwlock_unlock(&dht->lock);
+ if (dht_kv_update_req(key, &pl) < 0) {
+ log_err(KEY_FMT " Failed to update request.", KEY_VAL(key));
+ goto fail_update;
+ }
- return 0;
+ dht_kv_query_remote(key, NULL, &pl);
+
+ return;
+ fail_update:
+ peer_list_destroy(&pl);
+ no_contacts:
+ return;
}
-static void kad_handle_response(struct dht * dht,
- dht_msg_t * msg)
+static dht_msg_t * dht_wait_for_dht_msg(void)
{
- struct kad_req * req;
+ dht_msg_t * msg;
+ struct cmd * cmd;
- assert(dht);
- assert(msg);
+ pthread_mutex_lock(&dht.cmds.mtx);
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_cleanup_push(__cleanup_mutex_unlock, &dht.cmds.mtx);
- req = dht_find_request(dht, msg);
- if (req == NULL) {
- pthread_rwlock_unlock(&dht->lock);
+ while (list_is_empty(&dht.cmds.list))
+ pthread_cond_wait(&dht.cmds.cond, &dht.cmds.mtx);
+
+ cmd = list_last_entry(&dht.cmds.list, struct cmd, next);
+ list_del(&cmd->next);
+
+ pthread_cleanup_pop(true);
+
+ msg = dht_msg__unpack(NULL, cmd->cbuf.len, cmd->cbuf.data);
+ if (msg == NULL)
+ log_warn("Failed to unpack DHT msg.");
+
+ freebuf(cmd->cbuf);
+ free(cmd);
+
+ return msg;
+}
+
+static void do_dht_msg(dht_msg_t * msg)
+{
+ dht_msg_t * rsp = NULL;
+ uint8_t * id;
+ uint64_t addr;
+
+#ifdef DEBUG_PROTO_DHT
+ dht_kv_debug_msg_rcv(msg);
+#endif
+ if (dht_kv_validate_msg(msg) == -EINVAL) {
+ log_warn("%s Validation failed.", DHT_CODE(msg));
+ dht_msg__free_unpacked(msg, NULL);
return;
}
- bmp_release(dht->cookies, req->cookie);
- list_del(&req->next);
+ id = msg->src->id.data;
+ addr = msg->src->addr;
- pthread_rwlock_unlock(&dht->lock);
+ if (dht_kv_update_contacts(id, addr) < 0)
+ log_warn(PEER_FMT " Failed to update contact from msg src.",
+ PEER_VAL(id, addr));
- switch(req->code) {
- case KAD_JOIN:
- if (kad_handle_join_resp(dht, req, msg))
- log_err("Enrollment of DHT failed.");
+ pthread_cleanup_push(__cleanup_dht_msg, msg);
+
+ switch(msg->code) {
+ case DHT_FIND_VALUE_REQ:
+ rsp = do_dht_kv_find_value_req(msg->find);
break;
- case KAD_FIND_VALUE:
- case KAD_FIND_NODE:
- if (dht_get_state(dht) != DHT_RUNNING)
- break;
- kad_handle_find_resp(dht, req, msg);
+ case DHT_FIND_NODE_REQ:
+ rsp = do_dht_kv_find_node_req(msg->find);
break;
- default:
+ case DHT_STORE:
+ do_dht_kv_store(msg->store);
break;
+ case DHT_FIND_NODE_RSP:
+ do_dht_kv_find_node_rsp(msg->node);
+ break;
+ case DHT_FIND_VALUE_RSP:
+ do_dht_kv_find_value_rsp(msg->node, msg->val);
+ break;
+ default:
+ assert(false); /* already validated */
}
- kad_req_destroy(req);
+ pthread_cleanup_pop(true);
+
+ if (rsp == NULL)
+ return;
+
+ pthread_cleanup_push(__cleanup_dht_msg, rsp);
+
+ dht_send_msg(rsp, addr);
+
+ pthread_cleanup_pop(true); /* free rsp */
}
-int dht_bootstrap(void * dir)
+static void * dht_handle_packet(void * o)
{
- struct dht * dht;
+ (void) o;
- dht = (struct dht *) dir;
+ while (true) {
+ dht_msg_t * msg;
- assert(dht);
+ msg = dht_wait_for_dht_msg();
+ if (msg == NULL)
+ continue;
+
+ tpm_begin_work(dht.tpm);
- pthread_rwlock_wrlock(&dht->lock);
+ do_dht_msg(msg);
+ tpm_end_work(dht.tpm);
+ }
+
+ return (void *) 0;
+}
#ifndef __DHT_TEST__
- dht->b = ipcp_dir_hash_len();
-#else
- dht->b = DHT_TEST_KEY_LEN;
-#endif
+static void dht_post_packet(void * comp,
+ struct shm_du_buff * sdb)
+{
+ struct cmd * cmd;
- dht->id = create_id(dht->b);
- if (dht->id == NULL)
- goto fail_id;
+ (void) comp;
- dht->buckets = bucket_create();
- if (dht->buckets == NULL)
- goto fail_buckets;
+ cmd = malloc(sizeof(*cmd));
+ if (cmd == NULL) {
+ log_err("Command malloc failed.");
+ goto fail_cmd;
+ }
- dht->buckets->depth = 0;
- dht->buckets->mask = 0;
+ cmd->cbuf.data = malloc(shm_du_buff_len(sdb));
+ if (cmd->cbuf.data == NULL) {
+ log_err("Command buffer malloc failed.");
+ goto fail_buf;
+ }
- dht->t_expire = 86400; /* 1 day */
- dht->t_repub = dht->t_expire - 10;
- dht->k = KAD_K;
+ cmd->cbuf.len = shm_du_buff_len(sdb);
- if (pthread_create(&dht->worker, NULL, work, dht))
- goto fail_pthread_create;
+ memcpy(cmd->cbuf.data, shm_du_buff_head(sdb), cmd->cbuf.len);
- dht->state = DHT_RUNNING;
+ ipcp_sdb_release(sdb);
- dht_update_bucket(dht, dht->id, dht->addr);
+ pthread_mutex_lock(&dht.cmds.mtx);
- pthread_rwlock_unlock(&dht->lock);
+ list_add(&cmd->next, &dht.cmds.list);
- return 0;
+ pthread_cond_signal(&dht.cmds.cond);
- fail_pthread_create:
- bucket_destroy(dht->buckets);
- dht->buckets = NULL;
- fail_buckets:
- free(dht->id);
- dht->id = NULL;
- fail_id:
- pthread_rwlock_unlock(&dht->lock);
- return -1;
+ pthread_mutex_unlock(&dht.cmds.mtx);
+
+ return;
+
+ fail_buf:
+ free(cmd);
+ fail_cmd:
+ ipcp_sdb_release(sdb);
+ return;
}
+#endif
-static struct ref_entry * ref_entry_get(struct dht * dht,
- const uint8_t * key)
+int dht_reg(const uint8_t * key)
{
- struct list_head * p;
+ buffer_t val;
- list_for_each(p, &dht->refs) {
- struct ref_entry * r = list_entry(p, struct ref_entry, next);
- if (!memcmp(key, r->key, dht-> b) )
- return r;
+ if (addr_to_buf(dht.addr, &val) < 0) {
+ log_err("Failed to convert address to buffer.");
+ goto fail_a2b;
}
- return NULL;
+ if (dht_kv_publish(key, val)) {
+ log_err(KV_FMT " Failed to publish.", KV_VAL(key, val));
+ goto fail_publish;
+ }
+
+ freebuf(val);
+
+ return 0;
+ fail_publish:
+ freebuf(val);
+ fail_a2b:
+ return -1;
}
-int dht_reg(void * dir,
- const uint8_t * key)
+int dht_unreg(const uint8_t * key)
{
- struct dht * dht;
- struct ref_entry * e;
- uint64_t addr;
- time_t t_expire;
+ buffer_t val;
- dht = (struct dht *) dir;
+ if (addr_to_buf(dht.addr, &val) < 0) {
+ log_err("Failed to convert address to buffer.");
+ goto fail_a2b;
+ }
- assert(dht);
- assert(key);
- assert(dht->addr != 0);
+ if (dht_kv_unpublish(key, val)) {
+ log_err(KV_FMT " Failed to unpublish.", KV_VAL(key, val));
+ goto fail_unpublish;
+ }
- if (dht_wait_running(dht))
- return -1;
+ freebuf(val);
- pthread_rwlock_wrlock(&dht->lock);
+ return 0;
+ fail_unpublish:
+ freebuf(val);
+ fail_a2b:
+ return -ENOMEM;
+}
- if (ref_entry_get(dht, key) != NULL) {
- log_dbg("Name already registered.");
- pthread_rwlock_unlock(&dht->lock);
- return 0;
- }
+uint64_t dht_query(const uint8_t * key)
+{
+ buffer_t * vals;
+ ssize_t n;
+ uint64_t addr;
- e = ref_entry_create(dht, key);
- if (e == NULL) {
- pthread_rwlock_unlock(&dht->lock);
- return -ENOMEM;
+ n = dht_kv_retrieve(key, &vals);
+ if (n < 0) {
+ log_err(KEY_FMT " Failed to query db.", KEY_VAL(key));
+ goto fail_vals;
}
- list_add(&e->next, &dht->refs);
+ if (n == 0) {
+ log_dbg(KEY_FMT " No local values.", KEY_VAL(key));
+ n = dht_kv_query_remote(key, &vals, NULL);
+ if (n < 0) {
+ log_warn(KEY_FMT " Failed to query DHT.", KEY_VAL(key));
+ goto fail_vals;
+ }
+ if (n == 0) {
+ log_dbg(KEY_FMT " No values.", KEY_VAL(key));
+ goto no_vals;
+ }
+ }
- t_expire = dht->t_expire;
- addr = dht->addr;
+ if (buf_to_addr(vals[0], &addr) < 0) {
+ log_err(VAL_FMT " Failed addr conversion.", VAL_VAL(vals[0]));
+ goto fail_b2a;
+ }
- pthread_rwlock_unlock(&dht->lock);
+ if (n > 1 && addr == INVALID_ADDR && buf_to_addr(vals[1], &addr) < 0) {
+ log_err(VAL_FMT " Failed addr conversion.", VAL_VAL(vals[1]));
+ goto fail_b2a;
+ }
- kad_publish(dht, key, addr, t_expire);
+ freebufs(vals, n);
- return 0;
+ return addr;
+ fail_b2a:
+ freebufs(vals, n);
+ return INVALID_ADDR;
+ no_vals:
+ free(vals);
+ fail_vals:
+ return INVALID_ADDR;
}
-int dht_unreg(void * dir,
- const uint8_t * key)
+static int emergency_peer(struct list_head * pl)
{
- struct dht * dht;
- struct list_head * p;
- struct list_head * h;
-
- dht = (struct dht *) dir;
+ struct peer_entry * e;
+ struct timespec now;
- assert(dht);
- assert(key);
+ assert(pl != NULL);
+ assert(list_is_empty(pl));
- if (dht_get_state(dht) != DHT_RUNNING)
+ if (dht.peer == INVALID_ADDR)
return -1;
- pthread_rwlock_wrlock(&dht->lock);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- list_for_each_safe(p, h, &dht->refs) {
- struct ref_entry * r = list_entry(p, struct ref_entry, next);
- if (!memcmp(key, r->key, dht-> b) ) {
- list_del(&r->next);
- ref_entry_destroy(r);
- }
+ e = malloc(sizeof(*e));
+ if (e == NULL) {
+ log_err("Failed to malloc emergency peer entry.");
+ goto fail_malloc;
}
- dht_del(dht, key, dht->addr);
+ e->id = dht_dup_key(dht.id.data);
+ if (e->id == NULL) {
+ log_err("Failed to duplicate DHT ID for emergency peer.");
+ goto fail_id;
+ }
+
+ e->addr = dht.peer;
+ e->cookie = dht.magic;
+ e->code = DHT_FIND_NODE_REQ;
+ e->t_sent = now.tv_sec;
- pthread_rwlock_unlock(&dht->lock);
+ list_add_tail(&e->next, pl);
return 0;
+ fail_id:
+ free(e);
+ fail_malloc:
+ return -ENOMEM;
}
-uint64_t dht_query(void * dir,
- const uint8_t * key)
+static int dht_kv_seed_bootstrap_peer(void)
{
- struct dht * dht;
- struct dht_entry * e;
- struct lookup * lu;
- uint64_t addrs[KAD_K];
- size_t n;
+ struct list_head pl;
- dht = (struct dht *) dir;
+ list_head_init(&pl);
- assert(dht);
+ if (dht.peer == INVALID_ADDR) {
+ log_dbg("No-one to contact.");
+ return 0;
+ }
- addrs[0] = 0;
+ if (emergency_peer(&pl) < 0) {
+ log_err("Could not create emergency peer.");
+ goto fail_peer;
+ }
- if (dht_wait_running(dht))
- return 0;
+ log_dbg("Pinging emergency peer " ADDR_FMT32 ".",
+ ADDR_VAL32(&dht.peer));
- pthread_rwlock_rdlock(&dht->lock);
+ if (dht_kv_query_contacts(dht.id.data, &pl) < 0) {
+ log_warn("Failed to bootstrap peer.");
+ goto fail_query;
+ }
- e = dht_find_entry(dht, key);
- if (e != NULL)
- addrs[0] = dht_entry_get_addr(dht, e);
+ peer_list_destroy(&pl);
- pthread_rwlock_unlock(&dht->lock);
+ return 0;
+ fail_query:
+ peer_list_destroy(&pl);
+ fail_peer:
+ return -EAGAIN;
+}
- if (addrs[0] != 0)
- return addrs[0];
+static void dht_kv_check_contacts(void)
+{
+ struct list_head cl;
+ struct list_head pl;
- lu = kad_lookup(dht, key, KAD_FIND_VALUE);
- if (lu == NULL)
- return 0;
+ list_head_init(&cl);
- n = lookup_get_addrs(lu, addrs);
- if (n == 0) {
- lookup_destroy(lu);
- return 0;
+ dht_kv_contact_list(dht.id.data, &cl, dht.k);
+
+ if (!list_is_empty(&cl))
+ goto success;
+
+ contact_list_destroy(&cl);
+
+ list_head_init(&pl);
+
+ if (dht.peer == INVALID_ADDR) {
+ log_dbg("No-one to contact.");
+ return;
}
- lookup_destroy(lu);
+ if (emergency_peer(&pl) < 0) {
+ log_err("Could not create emergency peer.");
+ goto fail_peer;
+ }
- /* Current behaviour is anycast and return the first peer address. */
- if (addrs[0] != dht->addr)
- return addrs[0];
+ log_dbg("No contacts found, using emergency peer " ADDR_FMT32 ".",
+ ADDR_VAL32(&dht.peer));
- if (n > 1)
- return addrs[1];
+ dht_kv_query_contacts(dht.id.data, &pl);
- return 0;
+ peer_list_destroy(&pl);
+
+ return;
+ success:
+ contact_list_destroy(&cl);
+ return;
+ fail_peer:
+ return;
}
-static void * dht_handle_packet(void * o)
+static void dht_kv_remove_expired_reqs(void)
{
- struct dht * dht = (struct dht *) o;
+ struct list_head * p;
+ struct list_head * h;
+ struct timespec now;
- assert(dht);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
- while (true) {
- dht_msg_t * msg;
- dht_contact_msg_t ** cmsgs;
- dht_msg_t resp_msg = DHT_MSG__INIT;
- uint64_t addr;
- buffer_t buf;
- size_t i;
- size_t b;
- size_t t_expire;
- struct cmd * cmd;
+ pthread_mutex_lock(&dht.reqs.mtx);
- pthread_mutex_lock(&dht->mtx);
+ list_for_each_safe(p, h, &dht.reqs.list) {
+ struct dht_req * e;
+ e = list_entry(p, struct dht_req, next);
+ if (IS_EXPIRED(e, &now)) {
+ log_dbg(KEY_FMT " Removing expired request.",
+ KEY_VAL(e->key));
+ list_del(&e->next);
+ dht_req_destroy(e);
+ --dht.reqs.len;
+ }
+ }
- pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx);
+ pthread_mutex_unlock(&dht.reqs.mtx);
+}
- while (list_is_empty(&dht->cmds))
- pthread_cond_wait(&dht->cond, &dht->mtx);
+static void value_list_destroy(struct list_head * vl)
+{
+ struct list_head * p;
+ struct list_head * h;
- cmd = list_last_entry(&dht->cmds, struct cmd, next);
- list_del(&cmd->next);
+ assert(vl != NULL);
- pthread_cleanup_pop(true);
+ list_for_each_safe(p, h, vl) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ list_del(&v->next);
+ val_entry_destroy(v);
+ }
+}
- i = shm_du_buff_len(cmd->sdb);
+#define MUST_REPLICATE(v, now) ((now)->tv_sec > (v)->t_repl + dht.t_repl)
+#define MUST_REPUBLISH(v, now) /* Close to expiry deadline */ \
+ (((v)->t_exp - (now)->tv_sec) < (DHT_N_REPUB * dht.t_repl))
+static void dht_entry_get_repl_lists(const struct dht_entry * e,
+ struct list_head * repl,
+ struct list_head * rebl,
+ struct timespec * now)
+{
+ struct list_head * p;
+ struct val_entry * n;
- msg = dht_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb));
-#ifndef __DHT_TEST__
- ipcp_sdb_release(cmd->sdb);
-#endif
- free(cmd);
+ list_for_each(p, &e->vals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ if (MUST_REPLICATE(v, now) && !IS_EXPIRED(v, now)) {
+ n = val_entry_create(v->val, v->t_exp);
+ if (n == NULL)
+ continue;
- if (msg == NULL) {
- log_err("Failed to unpack message.");
- continue;
+ list_add_tail(&n->next, repl);
}
+ }
- if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
- dht_msg__free_unpacked(msg, NULL);
- log_dbg("Got a request message when not running.");
- continue;
+ list_for_each(p, &e->lvals.list) {
+ struct val_entry * v = list_entry(p, struct val_entry, next);
+ if (MUST_REPLICATE(v, now) && MUST_REPUBLISH(v, now)) {
+ /* Add expire time here, to allow creating val_entry */
+ n = val_entry_create(v->val, now->tv_sec + dht.t_expire);
+ if (n == NULL)
+ continue;
+
+ list_add_tail(&n->next, rebl);
}
+ }
+}
- pthread_rwlock_rdlock(&dht->lock);
+static int dht_kv_next_values(uint8_t * key,
+ struct list_head * repl,
+ struct list_head * rebl)
+{
+ struct timespec now;
+ struct list_head * p;
+ struct list_head * h;
+ struct dht_entry * e = NULL;
- b = dht->b;
- t_expire = dht->t_expire;
+ assert(key != NULL);
+ assert(repl != NULL);
+ assert(rebl != NULL);
- pthread_rwlock_unlock(&dht->lock);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- if (msg->has_key && msg->key.len != b) {
- dht_msg__free_unpacked(msg, NULL);
- log_warn("Bad key in message.");
- continue;
- }
+ assert(list_is_empty(repl));
+ assert(list_is_empty(rebl));
- if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) {
- dht_msg__free_unpacked(msg, NULL);
- log_warn("Bad source ID in message of type %d.",
- msg->code);
- continue;
- }
+ pthread_rwlock_rdlock(&dht.db.lock);
- tpm_begin_work(dht->tpm);
+ if (dht.db.kv.len == 0)
+ goto no_entries;
- addr = msg->s_addr;
+ list_for_each_safe(p, h, &dht.db.kv.list) {
+ e = list_entry(p, struct dht_entry, next);
+ if (IS_CLOSER(e->key, key))
+ continue; /* Already processed */
+ }
- resp_msg.code = KAD_RESPONSE;
- resp_msg.cookie = msg->cookie;
+ if (e != NULL) {
+ memcpy(key, e->key, dht.id.len);
+ dht_entry_get_repl_lists(e, repl, rebl, &now);
+ }
+ no_entries:
+ pthread_rwlock_unlock(&dht.db.lock);
- switch(msg->code) {
- case KAD_JOIN:
- /* Refuse enrollee on check fails. */
- if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) {
- log_warn("Parameter mismatch. "
- "DHT enrolment refused.");
- break;
- }
+ return list_is_empty(repl) && list_is_empty(rebl) ? -ENOENT : 0;
+}
- if (msg->t_replicate != KAD_T_REPL) {
- log_warn("Replication time mismatch. "
- "DHT enrolment refused.");
+static void dht_kv_replicate_value(const uint8_t * key,
+ struct val_entry * v,
+ const struct timespec * now)
+{
+ assert(MUST_REPLICATE(v, now));
- break;
- }
+ (void) now;
- if (msg->t_refresh != KAD_T_REFR) {
- log_warn("Refresh time mismatch. "
- "DHT enrolment refused.");
- break;
- }
+ if (dht_kv_store_remote(key, v->val, v->t_exp) == 0) {
+ log_dbg(KV_FMT " Replicated.", KV_VAL(key, v->val));
+ return;
+ }
- resp_msg.has_alpha = true;
- resp_msg.has_b = true;
- resp_msg.has_k = true;
- resp_msg.has_t_expire = true;
- resp_msg.has_t_refresh = true;
- resp_msg.has_t_replicate = true;
- resp_msg.alpha = KAD_ALPHA;
- resp_msg.b = b;
- resp_msg.k = KAD_K;
- resp_msg.t_expire = t_expire;
- resp_msg.t_refresh = KAD_T_REFR;
- resp_msg.t_replicate = KAD_T_REPL;
- break;
- case KAD_FIND_VALUE:
- buf = dht_retrieve(dht, msg->key.data);
- if (buf.len != 0) {
- resp_msg.n_addrs = buf.len;
- resp_msg.addrs = (uint64_t *) buf.data;
- break;
- }
- /* FALLTHRU */
- case KAD_FIND_NODE:
- /* Return k closest contacts. */
- resp_msg.n_contacts =
- dht_get_contacts(dht, msg->key.data, &cmsgs);
- resp_msg.contacts = cmsgs;
- break;
- case KAD_STORE:
- if (msg->n_contacts < 1) {
- log_warn("No contacts in store message.");
- break;
- }
+ log_dbg(KV_FMT " Replication failed.", KV_VAL(key, v->val));
- if (!msg->has_t_expire) {
- log_warn("No expiry time in store message.");
- break;
- }
+ list_del(&v->next);
+ val_entry_destroy(v);
+}
- kad_add(dht, *msg->contacts, msg->n_contacts,
- msg->t_expire);
- break;
- case KAD_RESPONSE:
- kad_handle_response(dht, msg);
- break;
- default:
- assert(false);
- break;
- }
+static void dht_kv_republish_value(const uint8_t * key,
+ struct val_entry * v,
+ const struct timespec * now)
+{
+ assert(MUST_REPLICATE(v, now));
- if (msg->code != KAD_JOIN) {
- pthread_rwlock_wrlock(&dht->lock);
- if (dht_get_state(dht) == DHT_JOINING &&
- dht->buckets == NULL) {
- pthread_rwlock_unlock(&dht->lock);
- goto finish;
- }
+ if (MUST_REPUBLISH(v, now))
+ assert(v->t_exp >= now->tv_sec + dht.t_expire);
- if (dht_update_bucket(dht, msg->s_id.data, addr))
- log_warn("Failed to update bucket.");
- pthread_rwlock_unlock(&dht->lock);
- }
+ if (dht_kv_store_remote(key, v->val, v->t_exp) == 0) {
+ log_dbg(KV_FMT " Republished.", KV_VAL(key, v->val));
+ return;
+ }
- if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0)
- log_warn("Failed to send response.");
+ if (MUST_REPUBLISH(v, now))
+ log_warn(KV_FMT " Republish failed.", KV_VAL(key, v->val));
+ else
+ log_dbg(KV_FMT " Replication failed.", KV_VAL(key, v->val));
- finish:
- dht_msg__free_unpacked(msg, NULL);
+ list_del(&v->next);
+ val_entry_destroy(v);
+}
+
+static void dht_kv_update_replication_times(const uint8_t * key,
+ struct list_head * repl,
+ struct list_head * rebl,
+ const struct timespec * now)
+{
+ struct dht_entry * e;
+ struct list_head * p;
+ struct list_head * h;
+ struct val_entry * v;
- if (resp_msg.n_addrs > 0)
- free(resp_msg.addrs);
+ assert(key != NULL);
+ assert(repl != NULL);
+ assert(rebl != NULL);
+ assert(now != NULL);
- if (resp_msg.n_contacts == 0) {
- tpm_end_work(dht->tpm);
+ pthread_rwlock_wrlock(&dht.db.lock);
+
+ e = __dht_kv_find_entry(key);
+ if (e == NULL) {
+ pthread_rwlock_unlock(&dht.db.lock);
+ return;
+ }
+
+ list_for_each_safe(p, h, repl) {
+ struct val_entry * x;
+ v = list_entry(p, struct val_entry, next);
+ x = dht_entry_get_val(e, v->val);
+ if (x == NULL) {
+ log_err(KV_FMT " Not in vals.", KV_VAL(key, v->val));
continue;
}
- for (i = 0; i < resp_msg.n_contacts; ++i)
- dht_contact_msg__free_unpacked(resp_msg.contacts[i],
- NULL);
- free(resp_msg.contacts);
+ x->t_repl = now->tv_sec;
- tpm_end_work(dht->tpm);
+ list_del(&v->next);
+ val_entry_destroy(v);
}
- return (void *) 0;
+ list_for_each_safe(p, h, rebl) {
+ struct val_entry * x;
+ v = list_entry(p, struct val_entry, next);
+ x = dht_entry_get_lval(e, v->val);
+ if (x == NULL) {
+ log_err(KV_FMT " Not in lvals.", KV_VAL(key, v->val));
+ continue;
+ }
+
+ x->t_repl = now->tv_sec;
+ if (v->t_exp > x->t_exp) {
+ x->t_exp = v->t_exp; /* update expiration time */
+ }
+
+ list_del(&v->next);
+ val_entry_destroy(v);
+ }
+
+ pthread_rwlock_unlock(&dht.db.lock);
}
-static void dht_post_packet(void * comp,
- struct shm_du_buff * sdb)
+static void dht_kv_replicate_values(const uint8_t * key,
+ struct list_head * repl,
+ struct list_head * rebl)
{
- struct cmd * cmd;
- struct dht * dht = (struct dht *) comp;
+ struct timespec now;
+ struct list_head * p;
+ struct list_head * h;
- if (dht_get_state(dht) == DHT_SHUTDOWN) {
-#ifndef __DHT_TEST__
- ipcp_sdb_release(sdb);
-#endif
- return;
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ list_for_each_safe(p, h, repl) {
+ struct val_entry * v;
+ v = list_entry(p, struct val_entry, next);
+ dht_kv_replicate_value(key, v, &now);
}
- cmd = malloc(sizeof(*cmd));
- if (cmd == NULL) {
- log_err("Command failed. Out of memory.");
- return;
+ list_for_each_safe(p, h, rebl) {
+ struct val_entry * v;
+ v = list_entry(p, struct val_entry, next);
+ dht_kv_republish_value(key, v, &now);
}
- cmd->sdb = sdb;
+ /* removes non-replicated items from the list */
+ dht_kv_update_replication_times(key, repl, rebl, &now);
+
+ if (list_is_empty(repl) && list_is_empty(rebl))
+ return;
+
+ log_warn(KEY_FMT " Failed to update replication times.", KEY_VAL(key));
+}
+
+static void dht_kv_replicate(void)
+{
+ struct list_head repl; /* list of values to replicate */
+ struct list_head rebl; /* list of local values to republish */
+ uint8_t * key;
+
+ key = dht_dup_key(dht.id.data); /* dist == 0 */
+ if (key == NULL) {
+ log_err("Replicate: Failed to duplicate DHT ID.");
+ return;
+ }
- pthread_mutex_lock(&dht->mtx);
+ list_head_init(&repl);
+ list_head_init(&rebl);
- list_add(&cmd->next, &dht->cmds);
+ while (dht_kv_next_values(key, &repl, &rebl) == 0) {
+ dht_kv_replicate_values(key, &repl, &rebl);
+ if (!list_is_empty(&repl)) {
+ log_warn(KEY_FMT " Replication items left.",
+ KEY_VAL(key));
+ value_list_destroy(&repl);
+ }
- pthread_cond_signal(&dht->cond);
+ if (!list_is_empty(&rebl)) {
+ log_warn(KEY_FMT " Republish items left.",
+ KEY_VAL(key));
+ value_list_destroy(&rebl);
+ }
+ }
- pthread_mutex_unlock(&dht->mtx);
+ free(key);
}
-void dht_destroy(void * dir)
+static void dht_kv_refresh_contacts(void)
{
- struct dht * dht;
struct list_head * p;
struct list_head * h;
+ struct list_head rl; /* refresh list */
+ struct timespec now;
- dht = (struct dht *) dir;
- if (dht == NULL)
- return;
+ list_head_init(&rl);
-#ifndef __DHT_TEST__
- tpm_stop(dht->tpm);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- tpm_destroy(dht->tpm);
-#endif
- if (dht_get_state(dht) == DHT_RUNNING) {
- dht_set_state(dht, DHT_SHUTDOWN);
- pthread_cancel(dht->worker);
- pthread_join(dht->worker, NULL);
- }
+ pthread_rwlock_rdlock(&dht.db.lock);
- pthread_rwlock_wrlock(&dht->lock);
+ __dht_kv_bucket_refresh_list(dht.db.contacts.root, now.tv_sec, &rl);
- list_for_each_safe(p, h, &dht->cmds) {
- struct cmd * c = list_entry(p, struct cmd, next);
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ list_for_each_safe(p, h, &rl) {
+ struct contact * c;
+ c = list_entry(p, struct contact, next);
+ log_dbg(PEER_FMT " Refreshing contact.",
+ PEER_VAL(c->id, c->addr));
+ dht_kv_query_contacts(c->id, NULL);
list_del(&c->next);
-#ifndef __DHT_TEST__
- ipcp_sdb_release(c->sdb);
-#endif
- free(c);
+ contact_destroy(c);
}
- list_for_each_safe(p, h, &dht->entries) {
- struct dht_entry * e = list_entry(p, struct dht_entry, next);
- list_del(&e->next);
- dht_entry_destroy(e);
- }
+ assert(list_is_empty(&rl));
+}
- list_for_each_safe(p, h, &dht->requests) {
- struct kad_req * r = list_entry(p, struct kad_req, next);
- list_del(&r->next);
- kad_req_destroy(r);
- }
+static void (*tasks[])(void) = {
+ dht_kv_check_contacts,
+ dht_kv_remove_expired_entries,
+ dht_kv_remove_expired_reqs,
+ dht_kv_replicate,
+ dht_kv_refresh_contacts,
+ NULL
+};
- list_for_each_safe(p, h, &dht->refs) {
- struct ref_entry * e = list_entry(p, struct ref_entry, next);
- list_del(&e->next);
- ref_entry_destroy(e);
+static void * work(void * o)
+{
+ struct timespec now = TIMESPEC_INIT_MS(1);
+ time_t intv;
+ size_t n; /* number of tasks */
+
+ n = sizeof(tasks) / sizeof(tasks[0]) - 1; /* last is NULL */
+
+ (void) o;
+
+ while (dht_kv_seed_bootstrap_peer() == -EAGAIN) {
+ ts_add(&now, &now, &now); /* exponential backoff */
+ if (now.tv_sec > 1) /* cap at 1 second */
+ now.tv_sec = 1;
+ nanosleep(&now, NULL);
}
- list_for_each_safe(p, h, &dht->lookups) {
- struct lookup * l = list_entry(p, struct lookup, next);
- list_del(&l->next);
- lookup_destroy(l);
+ intv = gcd(dht.t_expire, (dht.t_expire - DHT_N_REPUB * dht.t_repl));
+ intv = gcd(intv, gcd(dht.t_repl, dht.t_refresh)) / 2;
+ intv = MAX(1, intv / n);
+
+ log_dbg("DHT worker starting %ld seconds interval.", intv * n);
+
+ while (true) {
+ int i = 0;
+ while (tasks[i] != NULL) {
+ tasks[i++]();
+ sleep(intv);
+ }
}
- pthread_rwlock_unlock(&dht->lock);
+ return (void *) 0;
+}
- if (dht->buckets != NULL)
- bucket_destroy(dht->buckets);
+int dht_start(void)
+{
+ dht.state = DHT_RUNNING;
- bmp_destroy(dht->cookies);
+ if (tpm_start(dht.tpm))
+ goto fail_tpm_start;
- pthread_mutex_destroy(&dht->mtx);
+#ifndef __DHT_TEST__
+ if (pthread_create(&dht.worker, NULL, work, NULL)) {
+ log_err("Failed to create DHT worker thread.");
+ goto fail_worker;
+ }
- pthread_rwlock_destroy(&dht->lock);
+ dht.eid = dt_reg_comp(&dht, &dht_post_packet, DHT);
+ if ((int) dht.eid < 0) {
+ log_err("Failed to register DHT component.");
+ goto fail_reg;
+ }
+#else
+ (void) work;
+#endif
+ return 0;
+#ifndef __DHT_TEST__
+ fail_reg:
+ pthread_cancel(dht.worker);
+ pthread_join(dht.worker, NULL);
+ fail_worker:
+ tpm_stop(dht.tpm);
+#endif
+ fail_tpm_start:
+ dht.state = DHT_INIT;
+ return -1;
+}
+
+void dht_stop(void)
+{
+ assert(dht.state == DHT_RUNNING);
- free(dht->id);
+#ifndef __DHT_TEST__
+ dt_unreg_comp(dht.eid);
- free(dht);
+ pthread_cancel(dht.worker);
+ pthread_join(dht.worker, NULL);
+#endif
+ tpm_stop(dht.tpm);
+
+ dht.state = DHT_INIT;
}
-static void * join_thr(void * o)
+int dht_init(struct dir_dht_config * conf)
{
- struct join_info * info = (struct join_info *) o;
- struct lookup * lu;
- size_t retr = 0;
+ struct timespec now;
+ pthread_condattr_t cattr;
- assert(info);
+ assert(conf != NULL);
- while (kad_join(info->dht, info->addr)) {
- if (dht_get_state(info->dht) == DHT_SHUTDOWN) {
- log_dbg("DHT enrollment aborted.");
- goto finish;
- }
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- if (retr++ == KAD_JOIN_RETR) {
- dht_set_state(info->dht, DHT_INIT);
- log_warn("DHT enrollment attempt failed.");
- goto finish;
- }
+#ifndef __DHT_TEST__
+ dht.id.len = ipcp_dir_hash_len();
+ dht.addr = addr_auth_address();
+#else
+ dht.id.len = DHT_TEST_KEY_LEN;
+ dht.addr = DHT_TEST_ADDR;
+#endif
+ dht.t0 = now.tv_sec;
+ dht.alpha = conf->params.alpha;
+ dht.k = conf->params.k;
+ dht.t_expire = conf->params.t_expire;
+ dht.t_refresh = conf->params.t_refresh;
+ dht.t_repl = conf->params.t_replicate;
+ dht.peer = conf->peer;
+
+ dht.magic = generate_cookie();
+
+ /* Send my address on enrollment */
+ conf->peer = dht.addr;
+
+ dht.id.data = generate_id();
+ if (dht.id.data == NULL) {
+ log_err("Failed to create DHT ID.");
+ goto fail_id;
+ }
- sleep(KAD_JOIN_INTV);
+ list_head_init(&dht.cmds.list);
+
+ if (pthread_mutex_init(&dht.cmds.mtx, NULL)) {
+ log_err("Failed to initialize command mutex.");
+ goto fail_cmds_mutex;
}
- dht_set_state(info->dht, DHT_RUNNING);
+ if (pthread_cond_init(&dht.cmds.cond, NULL)) {
+ log_err("Failed to initialize command condvar.");
+ goto fail_cmds_cond;
+ }
- lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE);
- if (lu != NULL)
- lookup_destroy(lu);
+ list_head_init(&dht.reqs.list);
+ dht.reqs.len = 0;
- finish:
- free(info);
+ if (pthread_mutex_init(&dht.reqs.mtx, NULL)) {
+ log_err("Failed to initialize request mutex.");
+ goto fail_reqs_mutex;
+ }
- return (void *) 0;
-}
+ if (pthread_condattr_init(&cattr)) {
+ log_err("Failed to initialize request condvar attributes.");
+ goto fail_cattr;
+ }
+#ifndef __APPLE__
+ if (pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK)) {
+ log_err("Failed to set request condvar clock.");
+ goto fail_cattr;
+ }
+#endif
+ if (pthread_cond_init(&dht.reqs.cond, &cattr)) {
+ log_err("Failed to initialize request condvar.");
+ goto fail_reqs_cond;
+ }
-static void handle_event(void * self,
- int event,
- const void * o)
-{
- struct dht * dht = (struct dht *) self;
+ list_head_init(&dht.db.kv.list);
+ dht.db.kv.len = 0;
+ dht.db.kv.vals = 0;
+ dht.db.kv.lvals = 0;
- if (event == NOTIFY_DT_CONN_ADD) {
- pthread_t thr;
- struct join_info * inf;
- struct conn * c = (struct conn *) o;
- struct timespec slack = TIMESPEC_INIT_MS(DHT_ENROLL_SLACK);
+ if (pthread_rwlock_init(&dht.db.lock, NULL)) {
+ log_err("Failed to initialize store rwlock.");
+ goto fail_rwlock;
+ }
- /* Give the pff some time to update for the new link. */
- nanosleep(&slack, NULL);
+ dht.db.contacts.root = bucket_create();
+ if (dht.db.contacts.root == NULL) {
+ log_err("Failed to create DHT buckets.");
+ goto fail_buckets;
+ }
- switch(dht_get_state(dht)) {
- case DHT_INIT:
- inf = malloc(sizeof(*inf));
- if (inf == NULL)
- break;
+ if (rib_reg(DHT, &r_ops) < 0) {
+ log_err("Failed to register DHT RIB operations.");
+ goto fail_rib_reg;
+ }
- inf->dht = dht;
- inf->addr = c->conn_info.addr;
-
- if (dht_set_state(dht, DHT_JOINING) == 0 ||
- dht_wait_running(dht)) {
- if (pthread_create(&thr, NULL, join_thr, inf)) {
- dht_set_state(dht, DHT_INIT);
- free(inf);
- return;
- }
- pthread_detach(thr);
- } else {
- free(inf);
- }
- break;
- case DHT_RUNNING:
- /*
- * FIXME: this lookup for effiency reasons
- * causes a SEGV when stressed with rapid
- * enrollments.
- * lu = kad_lookup(dht, dht->id, KAD_FIND_NODE);
- * if (lu != NULL)
- * lookup_destroy(lu);
- */
- break;
- default:
- break;
- }
+ dht.tpm = tpm_create(2, 1, dht_handle_packet, NULL);
+ if (dht.tpm == NULL) {
+ log_err("Failed to create TPM for DHT.");
+ goto fail_tpm_create;
}
+
+ if (dht_kv_update_contacts(dht.id.data, dht.addr) < 0)
+ log_warn("Failed to update contacts with DHT ID.");
+
+ pthread_condattr_destroy(&cattr);
+#ifndef __DHT_TEST__
+ log_info("DHT initialized.");
+ log_dbg(" ID: " HASH_FMT64 " [%zu bytes].",
+ HASH_VAL64(dht.id.data), dht.id.len);
+ log_dbg(" address: " ADDR_FMT32 ".", ADDR_VAL32(&dht.addr));
+ log_dbg(" peer: " ADDR_FMT32 ".", ADDR_VAL32(&dht.peer));
+ log_dbg(" magic cookie: " HASH_FMT64 ".", HASH_VAL64(&dht.magic));
+ log_info(" parameters: alpha=%u, k=%zu, t_expire=%ld, "
+ "t_refresh=%ld, t_replicate=%ld.",
+ dht.alpha, dht.k, dht.t_expire, dht.t_refresh, dht.t_repl);
+#endif
+ dht.state = DHT_INIT;
+
+ return 0;
+
+ fail_tpm_create:
+ rib_unreg(DHT);
+ fail_rib_reg:
+ bucket_destroy(dht.db.contacts.root);
+ fail_buckets:
+ pthread_rwlock_destroy(&dht.db.lock);
+ fail_rwlock:
+ pthread_cond_destroy(&dht.reqs.cond);
+ fail_reqs_cond:
+ pthread_condattr_destroy(&cattr);
+ fail_cattr:
+ pthread_mutex_destroy(&dht.reqs.mtx);
+ fail_reqs_mutex:
+ pthread_cond_destroy(&dht.cmds.cond);
+ fail_cmds_cond:
+ pthread_mutex_destroy(&dht.cmds.mtx);
+ fail_cmds_mutex:
+ freebuf(dht.id);
+ fail_id:
+ return -1;
}
-void * dht_create(void)
+void dht_fini(void)
{
- struct dht * dht;
+ struct list_head * p;
+ struct list_head * h;
- dht = malloc(sizeof(*dht));
- if (dht == NULL)
- goto fail_malloc;
+ rib_unreg(DHT);
- dht->buckets = NULL;
+ tpm_destroy(dht.tpm);
- list_head_init(&dht->entries);
- list_head_init(&dht->requests);
- list_head_init(&dht->refs);
- list_head_init(&dht->lookups);
- list_head_init(&dht->cmds);
+ pthread_mutex_lock(&dht.cmds.mtx);
- if (pthread_rwlock_init(&dht->lock, NULL))
- goto fail_rwlock;
+ list_for_each_safe(p, h, &dht.cmds.list) {
+ struct cmd * c = list_entry(p, struct cmd, next);
+ list_del(&c->next);
+ freebuf(c->cbuf);
+ free(c);
+ }
- if (pthread_mutex_init(&dht->mtx, NULL))
- goto fail_mutex;
+ pthread_mutex_unlock(&dht.cmds.mtx);
- if (pthread_cond_init(&dht->cond, NULL))
- goto fail_cond;
+ pthread_cond_destroy(&dht.cmds.cond);
+ pthread_mutex_destroy(&dht.cmds.mtx);
- dht->cookies = bmp_create(DHT_MAX_REQS, 1);
- if (dht->cookies == NULL)
- goto fail_bmp;
+ pthread_mutex_lock(&dht.reqs.mtx);
- dht->b = 0;
- dht->id = NULL;
-#ifndef __DHT_TEST__
- dht->addr = addr_auth_address();
- if (dht->addr == INVALID_ADDR)
- goto fail_bmp;
+ list_for_each_safe(p, h, &dht.reqs.list) {
+ struct dht_req * r = list_entry(p, struct dht_req, next);
+ list_del(&r->next);
+ dht_req_destroy(r);
+ dht.reqs.len--;
+ }
- dht->tpm = tpm_create(2, 1, dht_handle_packet, dht);
- if (dht->tpm == NULL)
- goto fail_tpm_create;
+ pthread_mutex_unlock(&dht.reqs.mtx);
- if (tpm_start(dht->tpm))
- goto fail_tpm_start;
+ pthread_cond_destroy(&dht.reqs.cond);
+ pthread_mutex_destroy(&dht.reqs.mtx);
- dht->eid = dt_reg_comp(dht, &dht_post_packet, DHT);
- if ((int) dht->eid < 0)
- goto fail_tpm_start;
+ pthread_rwlock_wrlock(&dht.db.lock);
- if (notifier_reg(handle_event, dht))
- goto fail_notifier_reg;
-#else
- (void) handle_event;
- (void) dht_handle_packet;
- (void) dht_post_packet;
-#endif
- dht->state = DHT_INIT;
+ list_for_each_safe(p, h, &dht.db.kv.list) {
+ struct dht_entry * e = list_entry(p, struct dht_entry, next);
+ list_del(&e->next);
+ dht_entry_destroy(e);
+ dht.db.kv.len--;
+ }
- return (void *) dht;
-#ifndef __DHT_TEST__
- fail_notifier_reg:
- tpm_stop(dht->tpm);
- fail_tpm_start:
- tpm_destroy(dht->tpm);
- fail_tpm_create:
- bmp_destroy(dht->cookies);
-#endif
- fail_bmp:
- pthread_cond_destroy(&dht->cond);
- fail_cond:
- pthread_mutex_destroy(&dht->mtx);
- fail_mutex:
- pthread_rwlock_destroy(&dht->lock);
- fail_rwlock:
- free(dht);
- fail_malloc:
- return NULL;
+ if (dht.db.contacts.root != NULL)
+ bucket_destroy(dht.db.contacts.root);
+
+ pthread_rwlock_unlock(&dht.db.lock);
+
+ pthread_rwlock_destroy(&dht.db.lock);
+
+ assert(dht.db.kv.len == 0);
+ assert(dht.db.kv.vals == 0);
+ assert(dht.db.kv.lvals == 0);
+ assert(dht.reqs.len == 0);
+
+ freebuf(dht.id);
}
diff --git a/src/ipcpd/unicast/dir/dht.h b/src/ipcpd/unicast/dir/dht.h
index 311c6b23..852a5130 100644
--- a/src/ipcpd/unicast/dir/dht.h
+++ b/src/ipcpd/unicast/dir/dht.h
@@ -30,22 +30,19 @@
#include <stdint.h>
#include <sys/types.h>
-void * dht_create(void);
+int dht_init(struct dir_dht_config * conf);
-void dht_destroy(void * dir);
+void dht_fini(void);
-int dht_bootstrap(void * dir);
+int dht_start(void);
-int dht_reg(void * dir,
- const uint8_t * key);
+void dht_stop(void);
-int dht_unreg(void * dir,
- const uint8_t * key);
+int dht_reg(const uint8_t * key);
-uint64_t dht_query(void * dir,
- const uint8_t * key);
+int dht_unreg(const uint8_t * key);
-int dht_wait_running(void * dir);
+uint64_t dht_query(const uint8_t * key);
extern struct dir_ops dht_dir_ops;
diff --git a/src/ipcpd/unicast/dir/dht.proto b/src/ipcpd/unicast/dir/dht.proto
index 4c5b06db..ea74805f 100644
--- a/src/ipcpd/unicast/dir/dht.proto
+++ b/src/ipcpd/unicast/dir/dht.proto
@@ -27,19 +27,32 @@ message dht_contact_msg {
required uint64 addr = 2;
}
+message dht_find_req_msg {
+ required uint64 cookie = 1;
+ required bytes key = 2;
+}
+
+message dht_find_node_rsp_msg {
+ required uint64 cookie = 1;
+ required bytes key = 2;
+ repeated dht_contact_msg contacts = 3;
+}
+
+message dht_find_value_rsp_msg {
+ repeated bytes values = 1;
+}
+
+message dht_store_msg {
+ required bytes key = 1;
+ required bytes val = 2;
+ required uint32 exp = 3;
+}
+
message dht_msg {
- required uint32 code = 1;
- required uint32 cookie = 2;
- required uint64 s_addr = 3;
- optional bytes s_id = 4;
- optional bytes key = 5;
- repeated uint64 addrs = 6;
- repeated dht_contact_msg contacts = 7;
- // enrolment parameters
- optional uint32 alpha = 8;
- optional uint32 b = 9;
- optional uint32 k = 10;
- optional uint32 t_expire = 11;
- optional uint32 t_refresh = 12;
- optional uint32 t_replicate = 13;
+ required uint32 code = 1;
+ required dht_contact_msg src = 2;
+ optional dht_store_msg store = 3;
+ optional dht_find_req_msg find = 4;
+ optional dht_find_node_rsp_msg node = 5;
+ optional dht_find_value_rsp_msg val = 6;
}
diff --git a/src/ipcpd/unicast/dir/ops.h b/src/ipcpd/unicast/dir/ops.h
index 6ff61ce6..8c6e5eb5 100644
--- a/src/ipcpd/unicast/dir/ops.h
+++ b/src/ipcpd/unicast/dir/ops.h
@@ -23,24 +23,20 @@
#ifndef OUROBOROS_IPCPD_UNICAST_DIR_OPS_H
#define OUROBOROS_IPCPD_UNICAST_DIR_OPS_H
-
struct dir_ops {
- void * (* create)(void);
+ int (* init)(void * config);
- void (* destroy)(void * dir);
+ void (* fini)(void);
- int (* bootstrap)(void * dir);
+ int (* start)(void);
- int (* reg)(void * dir,
- const uint8_t * hash);
+ void (* stop)(void);
- int (* unreg)(void * dir,
- const uint8_t * hash);
+ int (* reg)(const uint8_t * hash);
- uint64_t (* query)(void * dir,
- const uint8_t * hash);
+ int (* unreg)(const uint8_t * hash);
- int (* wait_running)(void * dir);
+ uint64_t (* query)(const uint8_t * hash);
};
#endif /* OUROBOROS_IPCPD_UNICAST_DIR_OPS_H */
diff --git a/src/ipcpd/unicast/dir/tests/CMakeLists.txt b/src/ipcpd/unicast/dir/tests/CMakeLists.txt
index f62ed993..41a18c27 100644
--- a/src/ipcpd/unicast/dir/tests/CMakeLists.txt
+++ b/src/ipcpd/unicast/dir/tests/CMakeLists.txt
@@ -15,6 +15,9 @@ include_directories(${CMAKE_BINARY_DIR}/include)
get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
get_filename_component(PARENT_DIR ${PARENT_PATH} NAME)
+set(DEBUG_PROTO_DHT FALSE CACHE BOOL
+ "Add DHT protocol message output to debug logging")
+
create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
# Add new tests here
dht_test.c
diff --git a/src/ipcpd/unicast/dir/tests/dht_test.c b/src/ipcpd/unicast/dir/tests/dht_test.c
index bea2c3e7..72e8f0df 100644
--- a/src/ipcpd/unicast/dir/tests/dht_test.c
+++ b/src/ipcpd/unicast/dir/tests/dht_test.c
@@ -4,7 +4,6 @@
* Unit tests of the DHT
*
* Dimitri Staessens <dimitri@ouroboros.rocks>
- * Sander Vrijders <sander@ouroboros.rocks>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -21,76 +20,1899 @@
*/
#define __DHT_TEST__
-#define DHT_TEST_KEY_LEN 32
-#include "dht.c"
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include <ouroboros/test.h>
+#include <ouroboros/list.h>
+#include <ouroboros/utils.h>
-#include <pthread.h>
+#include "dht.pb-c.h"
+
+#include <assert.h>
+#include <inttypes.h>
#include <time.h>
#include <stdlib.h>
#include <stdio.h>
-#define CONTACTS 1000
+#define DHT_MAX_RAND_SIZE 64
+#define DHT_TEST_KEY_LEN 32
+#define DHT_TEST_ADDR 0x1234567890abcdefULL
-int dht_test(int argc,
- char ** argv)
+/* forward declare for use in the dht code */
+/* Packet sink for DHT tests */
+struct {
+ bool enabled;
+
+ struct list_head list;
+ size_t len;
+} sink;
+
+struct message {
+ struct list_head next;
+ void * msg;
+ uint64_t dst;
+};
+
+static int sink_send_msg(buffer_t * pkt,
+ uint64_t addr)
{
- struct dht * dht;
- uint8_t key[DHT_TEST_KEY_LEN];
- size_t i;
+ struct message * m;
- (void) argc;
- (void) argv;
+ assert(pkt != NULL);
+ assert(addr != 0);
+
+ assert(!list_is_empty(&sink.list) || sink.len == 0);
+
+ if (!sink.enabled)
+ goto finish;
+
+ m = malloc(sizeof(*m));
+ if (m == NULL) {
+ printf("Failed to malloc message.");
+ goto fail_malloc;
+ }
+
+ m->msg = dht_msg__unpack(NULL, pkt->len, pkt->data);
+ if (m->msg == NULL)
+ goto fail_unpack;
+
+ m->dst = addr;
+
+ list_add_tail(&m->next, &sink.list);
+
+ ++sink.len;
+ finish:
+ freebuf(*pkt);
+
+ return 0;
+ fail_unpack:
+ free(m);
+ fail_malloc:
+ freebuf(*pkt);
+ return -1;
+}
+
+#include "dht.c"
+
+/* Test helpers */
+
+/* Reset the test packet sink to an empty, capturing state. */
+static void sink_init(void)
+{
+ list_head_init(&sink.list);
+ sink.len = 0;
+ sink.enabled = true;
+}
+
+/* Drain the sink: free every captured (unpacked) message and its wrapper. */
+static void sink_clear(void)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ list_for_each_safe(p, h, &sink.list) {
+ struct message * m = list_entry(p, struct message, next);
+ list_del(&m->next);
+ /* m->msg was produced by dht_msg__unpack() in sink_send_msg() */
+ dht_msg__free_unpacked((dht_msg_t *) m->msg, NULL);
+ free(m);
+ --sink.len;
+ }
+
+ assert(list_is_empty(&sink.list));
+}
+
+/* Tear down the sink at end of a test. */
+static void sink_fini(void)
+{
+ sink_clear();
+
+ /* invariant: a non-empty list implies a non-zero count.
+ * NOTE(review): after sink_clear() both should be empty/zero; this
+ * assert only checks the weaker invariant — confirm intent. */
+ assert(list_is_empty(&sink.list) || sink.len != 0);
+}
+
+/*
+ * Pop the oldest captured message from the sink (FIFO order).
+ * Returns NULL when the sink is empty. Ownership of the returned
+ * dht_msg_t transfers to the caller (free with dht_msg__free_unpacked).
+ */
+static dht_msg_t * sink_read(void)
+{
+ struct message * m;
+ dht_msg_t * msg;
+
+ /* invariant: non-empty list implies non-zero count */
+ assert(!list_is_empty(&sink.list) || sink.len == 0);
+
+ if (list_is_empty(&sink.list))
+ return NULL;
+
+ m = list_first_entry(&sink.list, struct message, next);
+
+ --sink.len;
+
+ list_del(&m->next);
+
+ msg = m->msg;
+
+ free(m);
+
+ return (dht_msg_t *) msg;
+}
+
+static const buffer_t test_val = {
+ .data = (uint8_t *) "test_value",
+ .len = 10
+};
+
+static const buffer_t test_val2 = {
+ .data = (uint8_t *) "test_value_2",
+ .len = 12
+};
+
+static struct dir_dht_config test_dht_config = default_dht_config;
+
+/*
+ * Fill b->data with b->len random bytes; b->len must already be set
+ * (1..DHT_MAX_RAND_SIZE). Allocates b->data; caller frees via freebuf().
+ * Returns 0 on success, -ENOMEM on allocation failure.
+ */
+static int random_value_len(buffer_t * b)
+{
+ assert(b != NULL);
+ assert(b->len > 0 && b->len <= DHT_MAX_RAND_SIZE);
+
+ b->data = malloc(b->len);
+ if (b->data == NULL)
+ goto fail_malloc;
+
+ random_buffer(b->data, b->len);
+
+ return 0;
+
+ fail_malloc:
+ return -ENOMEM;
+}
+
+/* Fill b with a random value of random length (1..DHT_MAX_RAND_SIZE). */
+static int random_value(buffer_t * b)
+{
+ assert(b != NULL);
+
+ b->len = rand() % DHT_MAX_RAND_SIZE + 1;
+
+ return random_value_len(b);
+}
+
+/*
+ * Insert n contacts with random IDs and random addresses into the
+ * DHT contact buckets. Returns 0 on success, -1 on failure.
+ */
+static int fill_dht_with_contacts(size_t n)
+{
+ size_t i;
+ uint8_t * id;
+
+ for (i = 0; i < n; i++) {
+ /* generate_cookie() doubles as a random 64-bit address here */
+ uint64_t addr = generate_cookie();
+ id = generate_id();
+ if (id == NULL)
+ goto fail_id;
+
+ if (dht_kv_update_contacts(id, addr) < 0)
+ goto fail_update;
+ free(id);
+ }
+
+ return 0;
+
+ fail_update:
+ free(id);
+ fail_id:
+ return -1;
+}
+
+/*
+ * Store n_values random values in the DHT key-value store.
+ * key: fixed key for all values, or NULL to use a fresh random key each time.
+ * len: fixed value length, or 0 for a random length per value.
+ * Values expire 10 seconds from now. Returns 0 on success, -1 on failure.
+ */
+static int fill_store_with_random_values(const uint8_t * key,
+ size_t len,
+ size_t n_values)
+{
+ buffer_t val;
+ struct timespec now;
+ size_t i;
+ uint8_t * _key;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ for (i = 0; i < n_values; ++i) {
+ if (key != NULL)
+ _key = (uint8_t *) key;
+ else {
+ _key = generate_id();
+ if (_key == NULL)
+ goto fail_key;
+ }
+
+ if (len == 0)
+ val.len = rand() % DHT_MAX_RAND_SIZE + 1;
+ else
+ val.len = len;
+
+ if (random_value_len(&val) < 0)
+ goto fail_value;
+
+ if (dht_kv_store(_key, val, now.tv_sec + 10) < 0)
+ goto fail_store;
+
+ freebuf(val);
+ if (key == NULL)
+ free(_key);
+ }
+
+ return 0;
+
+ fail_store:
+ freebuf(val);
+ fail_value:
+ /* NOTE(review): when key != NULL, _key aliases the caller's buffer,
+ * so this free() would free caller-owned memory — confirm/guard. */
+ free(_key);
+ fail_key:
+ return -1;
+}
+
+/*
+ * Allocate an array of max protobuf contact messages, each with a
+ * random ID (dht.id.len bytes) and a random address.
+ * Returns 0 on success, -ENOMEM on failure.
+ */
+static int random_contact_list(dht_contact_msg_t *** contacts,
+ size_t max)
+{
+ size_t i;
+
+ assert(contacts != NULL);
+ assert(max > 0);
+
+ *contacts = malloc(max * sizeof(**contacts));
+ if (*contacts == NULL)
+ goto fail_malloc;
+
+ for (i = 0; i < max; i++) {
+ (*contacts)[i] = malloc(sizeof(*(*contacts)[i]));
+ if ((*contacts)[i] == NULL)
+ goto fail_contacts;
+
+ dht_contact_msg__init((*contacts)[i]);
+
+ (*contacts)[i]->id.data = generate_id();
+ if ((*contacts)[i]->id.data == NULL)
+ goto fail_contact;
+
+ (*contacts)[i]->id.len = dht.id.len;
+ (*contacts)[i]->addr = generate_cookie();
+ }
+
+ return 0;
+
+ fail_contact:
+ dht_contact_msg__free_unpacked((*contacts)[i], NULL);
+ fail_contacts:
+ /* NOTE(review): this unwind frees the contact structs but not the
+ * id.data of contacts created in earlier iterations — possible leak
+ * on the error path; confirm. */
+ while (i-- > 0)
+ free((*contacts)[i]);
+ free(*contacts);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+/*
+ * Free an array of len contact messages and NULL-out the first slot.
+ * NOTE(review): the loop frees contacts[i] while the tail frees
+ * *contacts (== contacts[0]) again — this looks like a double free
+ * when len > 0, or the loop was meant to index (*contacts)[i];
+ * verify against the callers.
+ */
+static void clear_contacts(dht_contact_msg_t ** contacts,
+ size_t len)
+{
+ size_t i;
+
+ assert(contacts != NULL);
+ if (*contacts == NULL)
+ return;
+
+ for (i = 0; i < len; ++i)
+ dht_contact_msg__free_unpacked((contacts)[i], NULL);
+
+ free(*contacts);
+ *contacts = NULL;
+}
+
+/* Start of actual tests */
+
+/* Smoke test: a DHT can be initialized and finalized without starting it. */
+static int test_dht_init_fini(void)
+{
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_start_stop(void)
+{
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (dht_start() < 0) {
+ printf("Failed to start dht.\n");
+ goto fail_start;
+ }
+
+ dht_stop();
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
- dht = dht_create();
- if (dht == NULL) {
+ fail_start:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_val_entry_create_destroy(void)
+{
+ struct val_entry * e;
+ struct timespec now;
+
+ TEST_START();
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ if (dht_init(&test_dht_config) < 0) {
printf("Failed to create dht.\n");
- return -1;
+ goto fail_init;
}
- dht_destroy(dht);
+ e = val_entry_create(test_val, now.tv_sec + 10);
+ if (e == NULL) {
+ printf("Failed to create val entry.\n");
+ goto fail_entry;
+ }
+
+ val_entry_destroy(e);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
- dht = dht_create();
- if (dht == NULL) {
- printf("Failed to re-create dht.\n");
- return -1;
+ fail_entry:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_entry_create_destroy(void)
+{
+ struct dht_entry * e;
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
}
- if (dht_bootstrap(dht)) {
- printf("Failed to bootstrap dht.\n");
- dht_destroy(dht);
- return -1;
+ e = dht_entry_create(dht.id.data);
+ if (e == NULL) {
+ printf("Failed to create dht entry.\n");
+ goto fail_entry;
}
- dht_destroy(dht);
+ dht_entry_destroy(e);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_entry:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_entry_update_get_val(void)
+{
+ struct dht_entry * e;
+ struct val_entry * v;
+ struct timespec now;
+
+ TEST_START();
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ e = dht_entry_create(dht.id.data);
+ if (e == NULL) {
+ printf("Failed to create dht entry.\n");
+ goto fail_entry;
+ }
+
+ if (dht_entry_get_val(e, test_val) != NULL) {
+ printf("Found value in empty dht entry.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_update_val(e, test_val, now.tv_sec + 10) < 0) {
+ printf("Failed to update dht entry value.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_get_val(e, test_val2) != NULL) {
+ printf("Found value in dht entry with different key.\n");
+ goto fail_get;
+ }
+
+ v = dht_entry_get_val(e, test_val);
+ if (v == NULL) {
+ printf("Failed to get value from dht entry.\n");
+ goto fail_get;
+ }
+
+ if (v->val.len != test_val.len) {
+ printf("Length in dht entry does not match expected.\n");
+ goto fail_get;
+ }
+
+ if(memcmp(v->val.data, test_val.data, test_val.len) != 0) {
+ printf("Data in dht entry does not match expected.\n");
+ goto fail_get;
+ }
- dht = dht_create();
- if (dht == NULL) {
- printf("Failed to re-create dht.\n");
- return -1;
+ if (dht_entry_update_val(e, test_val, now.tv_sec + 15) < 0) {
+ printf("Failed to update exsting dht entry value.\n");
+ goto fail_get;
}
- if (dht_bootstrap(dht)) {
- printf("Failed to bootstrap dht.\n");
- dht_destroy(dht);
- return -1;
+ if (v->t_exp != now.tv_sec + 15) {
+ printf("Expiration time in dht entry value not updated.\n");
+ goto fail_get;
}
- for (i = 0; i < CONTACTS; ++i) {
- uint64_t addr;
- random_buffer(&addr, sizeof(addr));
- random_buffer(key, DHT_TEST_KEY_LEN);
- pthread_rwlock_wrlock(&dht->lock);
- if (dht_update_bucket(dht, key, addr)) {
- pthread_rwlock_unlock(&dht->lock);
- printf("Failed to update bucket.\n");
- dht_destroy(dht);
- return -1;
+ if (dht_entry_update_val(e, test_val, now.tv_sec + 5) < 0) {
+ printf("Failed to update existing dht entry value (5).\n");
+ goto fail_get;
+ }
+
+ if (v->t_exp != now.tv_sec + 15) {
+ printf("Expiration time in dht entry shortened.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_get_val(e, test_val) != v) {
+ printf("Wrong value in dht entry found after update.\n");
+ goto fail_get;
+ }
+
+ dht_entry_destroy(e);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_get:
+ dht_entry_destroy(e);
+ fail_entry:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_dht_entry_update_get_lval(void)
+{
+ struct dht_entry * e;
+ struct val_entry * v;
+ struct timespec now;
+
+ TEST_START();
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ e = dht_entry_create(dht.id.data);
+ if (e == NULL) {
+ printf("Failed to create dht entry.\n");
+ goto fail_entry;
+ }
+
+ if (dht_entry_get_lval(e, test_val) != NULL) {
+ printf("Found value in empty dht entry.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_update_lval(e, test_val) < 0) {
+ printf("Failed to update dht entry value.\n");
+ goto fail_get;
+ }
+
+ v = dht_entry_get_lval(e, test_val);
+ if (v== NULL) {
+ printf("Failed to get value from dht entry.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_get_lval(e, test_val2) != NULL) {
+ printf("Found value in dht entry in vals.\n");
+ goto fail_get;
+ }
+
+ if (v->val.len != test_val.len) {
+ printf("Length in dht entry does not match expected.\n");
+ goto fail_get;
+ }
+
+ if(memcmp(v->val.data, test_val.data, test_val.len) != 0) {
+ printf("Data in dht entry does not match expected.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_update_lval(e, test_val) < 0) {
+ printf("Failed to update existing dht entry value.\n");
+ goto fail_get;
+ }
+
+ if (dht_entry_get_lval(e, test_val) != v) {
+ printf("Wrong value in dht entry found after update.\n");
+ goto fail_get;
+ }
+
+ dht_entry_destroy(e);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_get:
+ dht_entry_destroy(e);
+ fail_entry:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/* Test that a contact can be created from the local ID/address and freed. */
+static int test_dht_kv_contact_create_destroy(void)
+{
+ struct contact * c;
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ c = contact_create(dht.id.data, dht.addr);
+ if (c == NULL) {
+ printf("Failed to create contact.\n");
+ goto fail_contact;
+ }
+
+ contact_destroy(c);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_contact:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/* Stress the contact buckets: inserting 1000 random contacts must succeed. */
+static int test_dht_kv_update_bucket(void)
+{
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (fill_dht_with_contacts(1000) < 0) {
+ printf("Failed to fill bucket with contacts.\n");
+ goto fail_update;
+ }
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_update:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/*
+ * Test contact listing: with fewer contacts than k, all are returned;
+ * after adding many more, at least k closest contacts are returned.
+ * NOTE(review): printf uses %zu for ssize_t values — %zd would match.
+ */
+static int test_dht_kv_contact_list(void)
+{
+ struct list_head cl;
+ ssize_t len;
+ ssize_t items;
+
+ TEST_START();
+
+ list_head_init(&cl);
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ items = 5;
+
+ if (fill_dht_with_contacts(items) < 0) {
+ printf("Failed to fill bucket with contacts.\n");
+ goto fail_fill;
+ }
+
+ len = dht_kv_contact_list(dht.id.data, &cl, dht.k);
+ if (len < 0) {
+ printf("Failed to get contact list.\n");
+ goto fail_fill;
+ }
+
+ if (len != items) {
+ printf("Failed to get contacts (%zu != %zu).\n", len, items);
+ goto fail_contact_list;
+ }
+
+ contact_list_destroy(&cl);
+
+ items = 100;
+
+ if (fill_dht_with_contacts(items) < 0) {
+ printf("Failed to fill bucket with contacts.\n");
+ goto fail_fill;
+ }
+
+ len = dht_kv_contact_list(dht.id.data, &cl, items);
+ if (len < 0) {
+ printf("Failed to get contact list.\n");
+ goto fail_fill;
+ }
+
+ /* with 100+ contacts in the buckets, expect at least k results */
+ if ((size_t) len < dht.k) {
+ printf("Failed to get contacts (%zu < %zu).\n", len, dht.k);
+ goto fail_contact_list;
+ }
+
+ contact_list_destroy(&cl);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_contact_list:
+ contact_list_destroy(&cl);
+ fail_fill:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/*
+ * Test value retrieval: 3 stored values come back as 3; with 20+ stored
+ * under the same key, retrieval is capped at DHT_MAX_VALS.
+ */
+static int test_dht_kv_get_values(void)
+{
+ buffer_t * vals;
+ ssize_t len;
+ size_t n = sizeof(uint64_t);
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (fill_store_with_random_values(dht.id.data, n, 3) < 0) {
+ printf("Failed to fill store with random values.\n");
+ goto fail_fill;
+ }
+
+ len = dht_kv_retrieve(dht.id.data, &vals);
+ if (len < 0) {
+ printf("Failed to get values from store.\n");
+ goto fail_fill;
+ }
+
+ if (len != 3) {
+ printf("Failed to get %ld values (%zu).\n", 3L, len);
+ goto fail_get_values;
+ }
+
+ freebufs(vals, len);
+
+ if (fill_store_with_random_values(dht.id.data, n, 20) < 0) {
+ printf("Failed to fill store with random values.\n");
+ goto fail_fill;
+ }
+
+ len = dht_kv_retrieve(dht.id.data, &vals);
+ if (len < 0) {
+ printf("Failed to get values from store.\n");
+ goto fail_fill;
+ }
+
+ /* retrieval is capped at DHT_MAX_VALS per key */
+ if (len != DHT_MAX_VALS) {
+ printf("Failed to get %d values.\n", DHT_MAX_VALS);
+ goto fail_get_values;
+ }
+
+ freebufs(vals, len);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_get_values:
+ freebufs(vals, len);
+ fail_fill:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/*
+ * Test the FIND_NODE request message: build, check code, pack and
+ * unpack round-trip.
+ * NOTE(review): on pack failure, msg is not freed (goto fail_pack
+ * skips the free_unpacked) — possible leak on the error path; also
+ * one error string says "find_value_req" for an unpack of
+ * find_node_req — confirm.
+ */
+static int test_dht_kv_find_node_req_msg(void)
+{
+ dht_msg_t * msg;
+ dht_msg_t * upk;
+ size_t len;
+ uint8_t * buf;
+
+ TEST_START();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ msg = dht_kv_find_node_req_msg(dht.id.data);
+ if (msg == NULL) {
+ printf("Failed to get find node request message.\n");
+ goto fail_msg;
+ }
+
+ if (msg->code != DHT_FIND_NODE_REQ) {
+ printf("Wrong code in find_node_req message (%s != %s).\n",
+ dht_code_str[msg->code],
+ dht_code_str[DHT_FIND_NODE_REQ]);
+ goto fail_msg;
+ }
+
+ len = dht_msg__get_packed_size(msg);
+ if (len == 0) {
+ printf("Failed to get packed length of find_node_req.\n");
+ goto fail_msg;
+ }
+
+ buf = malloc(len);
+ if (buf == NULL) {
+ printf("Failed to malloc find_node_req buf.\n");
+ goto fail_msg;
+ }
+
+ if (dht_msg__pack(msg, buf) != len) {
+ printf("Failed to pack find_node_req message.\n");
+ goto fail_pack;
+ }
+
+ upk = dht_msg__unpack(NULL, len, buf);
+ if (upk == NULL) {
+ printf("Failed to unpack find_value_req message.\n");
+ goto fail_unpack;
+ }
+
+ free(buf);
+ dht_msg__free_unpacked(msg, NULL);
+ dht_msg__free_unpacked(upk, NULL);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_unpack:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_pack:
+ free(buf);
+ fail_msg:
+ dht_fini();
+ fail_init:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/*
+ * Pack/unpack round trip for an empty DHT_FIND_NODE_RSP message.
+ * Fixed: msg was leaked when the code check, packed-size check or
+ * buffer allocation failed (goto fail_msg skipped the free).
+ */
+static int test_dht_kv_find_node_rsp_msg(void)
+{
+        dht_contact_msg_t ** contacts;
+        dht_msg_t *          msg;
+        dht_msg_t *          upk;
+        size_t               len;
+        uint8_t *            buf;
+
+        TEST_START();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        /* NOTE(review): contacts is uninitialized; assumed unread by
+         * the callee for a 0-length contact list -- confirm. */
+        msg = dht_kv_find_node_rsp_msg(dht.id.data, 0, &contacts, 0);
+        if (msg == NULL) {
+                printf("Failed to get find node response message.\n");
+                goto fail_msg;
+        }
+
+        if (msg->code != DHT_FIND_NODE_RSP) {
+                printf("Wrong code in find_node_rsp message (%s != %s).\n",
+                       dht_code_str[msg->code],
+                       dht_code_str[DHT_FIND_NODE_RSP]);
+                goto fail_check;
+        }
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed length of find_node_rsp.\n");
+                goto fail_check;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc find_node_rsp buf.\n");
+                goto fail_check;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack find_node_rsp message.\n");
+                goto fail_pack;
+        }
+
+        upk = dht_msg__unpack(NULL, len, buf);
+        if (upk == NULL) {
+                printf("Failed to unpack find_node_rsp message.\n");
+                goto fail_pack;
+        }
+
+        free(buf);
+        dht_msg__free_unpacked(msg, NULL);
+        dht_msg__free_unpacked(upk, NULL);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_pack:
+        free(buf);
+ fail_check:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+/*
+ * Pack/unpack round trip for a DHT_FIND_NODE_RSP carrying contacts.
+ * Fixed: 'n' is ssize_t, so print it with %zd, not %zu (mismatched
+ * conversion specifier is undefined behavior); contacts leaked when
+ * fewer than k were returned (goto fail_fill skipped
+ * clear_contacts); msg leaked on the packed-size, allocation and
+ * pack failure paths.
+ */
+static int test_dht_kv_find_node_rsp_msg_contacts(void)
+{
+        dht_contact_msg_t ** contacts;
+        dht_msg_t *          msg;
+        dht_msg_t *          upk;
+        uint8_t *            buf;
+        size_t               len;
+        ssize_t              n;
+
+        TEST_START();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        if (fill_dht_with_contacts(100) < 0) {
+                printf("Failed to fill bucket with contacts.\n");
+                goto fail_fill;
+        }
+
+        n = dht_kv_get_contacts(dht.id.data, &contacts);
+        if (n < 0) {
+                printf("Failed to get contacts.\n");
+                goto fail_fill;
+        }
+
+        if ((size_t) n < dht.k) {
+                printf("Failed to get enough contacts (%zd < %zu).\n",
+                       n, dht.k);
+                goto fail_msg; /* contacts were allocated: clear them */
+        }
+
+        msg = dht_kv_find_node_rsp_msg(dht.id.data, 0, &contacts, n);
+        if (msg == NULL) {
+                printf("Failed to build find node response message.\n");
+                goto fail_msg;
+        }
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed length of find_node_rsp.\n");
+                goto fail_len;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc find_node_rsp buf.\n");
+                goto fail_len;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack find_node_rsp message.\n");
+                goto fail_pack;
+        }
+
+        upk = dht_msg__unpack(NULL, len, buf);
+        if (upk == NULL) {
+                printf("Failed to unpack find_node_rsp message.\n");
+                goto fail_pack;
+        }
+
+        free(buf);
+        dht_msg__free_unpacked(msg, NULL);
+        dht_msg__free_unpacked(upk, NULL);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_pack:
+        free(buf);
+ fail_len:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+        /* NOTE(review): assumes contact entries consumed by the rsp
+         * msg are set to NULL, so clear_contacts() is safe here --
+         * confirm against dht_kv_find_node_rsp_msg. */
+        clear_contacts(contacts, n);
+ fail_fill:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+/*
+ * Pack/unpack round trip for a DHT_FIND_VALUE_REQ message.
+ * Fixed: msg was leaked when the code check, packed-size check or
+ * buffer allocation failed (goto fail_msg skipped the free); the
+ * malloc failure message said "find_node_req" instead of
+ * "find_value_req".
+ */
+static int test_dht_kv_find_value_req_msg(void)
+{
+        dht_msg_t * msg;
+        dht_msg_t * upk;
+        size_t      len;
+        uint8_t *   buf;
+
+        TEST_START();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        msg = dht_kv_find_value_req_msg(dht.id.data);
+        if (msg == NULL) {
+                printf("Failed to build find value request message.\n");
+                goto fail_msg;
+        }
+
+        if (msg->code != DHT_FIND_VALUE_REQ) {
+                printf("Wrong code in find_value_req message (%s != %s).\n",
+                       dht_code_str[msg->code],
+                       dht_code_str[DHT_FIND_VALUE_REQ]);
+                goto fail_check;
+        }
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed length of find_value_req.\n");
+                goto fail_check;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc find_value_req buf.\n");
+                goto fail_check;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack find_value_req message.\n");
+                goto fail_pack;
+        }
+
+        upk = dht_msg__unpack(NULL, len, buf);
+        if (upk == NULL) {
+                printf("Failed to unpack find_value_req message.\n");
+                goto fail_pack;
+        }
+
+        free(buf);
+        dht_msg__free_unpacked(msg, NULL);
+        dht_msg__free_unpacked(upk, NULL);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_pack:
+        free(buf);
+ fail_check:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+/*
+ * Pack/unpack round trip for an empty DHT_FIND_VALUE_RSP message.
+ * Fixed: msg was leaked when the code check, packed-size check or
+ * buffer allocation failed (goto fail_msg skipped the free).
+ */
+static int test_dht_kv_find_value_rsp_msg(void)
+{
+        dht_msg_t * msg;
+        dht_msg_t * upk;
+        size_t      len;
+        uint8_t *   buf;
+
+        TEST_START();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        /* No contacts and no values: an empty response. */
+        msg = dht_kv_find_value_rsp_msg(dht.id.data, 0, NULL, 0, NULL, 0);
+        if (msg == NULL) {
+                printf("Failed to build find value response message.\n");
+                goto fail_msg;
+        }
+
+        if (msg->code != DHT_FIND_VALUE_RSP) {
+                printf("Wrong code in find_value_rsp message (%s != %s).\n",
+                       dht_code_str[msg->code],
+                       dht_code_str[DHT_FIND_VALUE_RSP]);
+                goto fail_check;
+        }
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed length of find_value_rsp.\n");
+                goto fail_check;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc find_value_rsp buf.\n");
+                goto fail_check;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack find_value_rsp message.\n");
+                goto fail_pack;
+        }
+
+        upk = dht_msg__unpack(NULL, len, buf);
+        if (upk == NULL) {
+                printf("Failed to unpack find_value_rsp message.\n");
+                goto fail_pack;
+        }
+
+        free(buf);
+        dht_msg__free_unpacked(msg, NULL);
+        dht_msg__free_unpacked(upk, NULL);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_pack:
+        free(buf);
+ fail_check:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+/*
+ * Pack/unpack round trip for a DHT_FIND_VALUE_RSP carrying contacts.
+ * Fixed: 'n' is ssize_t, so print it with %zd, not %zu (mismatched
+ * conversion specifier is undefined behavior); contacts leaked when
+ * fewer than k were returned (goto fail_fill skipped
+ * clear_contacts); msg leaked on the packed-size, allocation and
+ * pack failure paths.
+ */
+static int test_dht_kv_find_value_rsp_msg_contacts(void)
+{
+        dht_msg_t *          msg;
+        dht_msg_t *          upk;
+        size_t               len;
+        uint8_t *            buf;
+        dht_contact_msg_t ** contacts;
+        ssize_t              n;
+
+        TEST_START();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        if (fill_dht_with_contacts(100) < 0) {
+                printf("Failed to fill bucket with contacts.\n");
+                goto fail_fill;
+        }
+
+        n = dht_kv_get_contacts(dht.id.data, &contacts);
+        if (n < 0) {
+                printf("Failed to get contacts.\n");
+                goto fail_fill;
+        }
+
+        if ((size_t) n < dht.k) {
+                printf("Failed to get enough contacts (%zd < %zu).\n",
+                       n, dht.k);
+                goto fail_msg; /* contacts were allocated: clear them */
+        }
+
+        msg = dht_kv_find_value_rsp_msg(dht.id.data, 0, &contacts, n, NULL, 0);
+        if (msg == NULL) {
+                printf("Failed to build find value response message.\n");
+                goto fail_msg;
+        }
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed length of find_value_rsp.\n");
+                goto fail_len;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc find_value_rsp buf.\n");
+                goto fail_len;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack find_value_rsp message.\n");
+                goto fail_pack;
+        }
+
+        upk = dht_msg__unpack(NULL, len, buf);
+        if (upk == NULL) {
+                printf("Failed to unpack find_value_rsp message.\n");
+                goto fail_pack;
+        }
+
+        free(buf);
+        dht_msg__free_unpacked(msg, NULL);
+        dht_msg__free_unpacked(upk, NULL);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_pack:
+        free(buf);
+ fail_len:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+        /* NOTE(review): assumes contact entries consumed by the rsp
+         * msg are set to NULL, so clear_contacts() is safe here --
+         * confirm against dht_kv_find_value_rsp_msg. */
+        clear_contacts(contacts, n);
+ fail_fill:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+/*
+ * Pack/unpack round trip for a DHT_FIND_VALUE_RSP carrying 8 random
+ * values; checks that the unpacked message echoes the code and the
+ * value count.
+ * Fixed: once ownership of 'values' moved to the message
+ * (values = NULL), every later failure reached the cleanup loop and
+ * dereferenced a NULL pointer; 'upk' leaked when the post-unpack
+ * checks failed; 'msg' leaked on the packed-size and allocation
+ * failure paths.
+ */
+static int test_dht_kv_find_value_rsp_msg_values(void)
+{
+        dht_msg_t * msg;
+        dht_msg_t * upk;
+        size_t      len;
+        uint8_t *   buf;
+        buffer_t *  values;
+        size_t      i;
+        uint64_t    ck;
+
+        TEST_START();
+
+        ck = generate_cookie();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        values = malloc(sizeof(*values) * 8);
+        if (values == NULL) {
+                printf("Failed to malloc values.\n");
+                goto fail_values;
+        }
+
+        for (i = 0; i < 8; i++) {
+                if (random_value(&values[i]) < 0) {
+                        printf("Failed to create random value.\n");
+                        goto fail_fill;
}
- pthread_rwlock_unlock(&dht->lock);
}
- dht_destroy(dht);
+        msg = dht_kv_find_value_rsp_msg(dht.id.data, ck, NULL, 0, &values, 8);
+        if (msg == NULL) {
+                printf("Failed to build find value response message.\n");
+                goto fail_msg;
+        }
- return 0;
+        values = NULL; /* msg owns the values now */
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed length of find_value_rsp.\n");
+                goto fail_len;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc find_value_rsp buf.\n");
+                goto fail_len;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack find_value_rsp message.\n");
+                goto fail_pack;
+        }
+
+        upk = dht_msg__unpack(NULL, len, buf);
+        if (upk == NULL) {
+                printf("Failed to unpack find_value_rsp message.\n");
+                goto fail_pack;
+        }
+
+        if (upk->code != DHT_FIND_VALUE_RSP) {
+                printf("Wrong code in find_value_rsp message (%s != %s).\n",
+                       dht_code_str[upk->code],
+                       dht_code_str[DHT_FIND_VALUE_RSP]);
+                goto fail_check;
+        }
+
+        if (upk->val == NULL) {
+                printf("No values in find_value_rsp message.\n");
+                goto fail_check;
+        }
+
+        if (upk->val->n_values != 8) {
+                printf("Not enough values in find_value_rsp (%zu != %zu).\n",
+                       upk->val->n_values, 8UL);
+                goto fail_check;
+        }
+
+        free(buf);
+        dht_msg__free_unpacked(msg, NULL);
+        dht_msg__free_unpacked(upk, NULL);
+
+        free(values); /* NULL here: the entries were freed with msg */
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_check:
+        dht_msg__free_unpacked(upk, NULL);
+ fail_pack:
+        free(buf);
+ fail_len:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ fail_fill:
+        if (values != NULL) {
+                while (i-- > 0)
+                        freebuf(values[i]);
+        }
+        free(values);
+ fail_values:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+/*
+ * Build, validate and pack a DHT_STORE message.
+ * Fixed: msg leaked when the packed-size check or the buffer
+ * allocation failed (goto fail_msg skipped the fail_store_msg free
+ * that the other error branches already use).
+ */
+static int test_dht_kv_store_msg(void)
+{
+        dht_msg_t *     msg;
+        size_t          len;
+        uint8_t *       buf;
+        struct timespec now;
+
+        TEST_START();
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        /* Expiry 10 s in the future so validation sees a live entry. */
+        msg = dht_kv_store_msg(dht.id.data, test_val, now.tv_sec + 10);
+        if (msg == NULL) {
+                printf("Failed to get store message.\n");
+                goto fail_msg;
+        }
+
+        if (msg->code != DHT_STORE) {
+                printf("Wrong code in store message (%s != %s).\n",
+                       dht_code_str[msg->code],
+                       dht_code_str[DHT_STORE]);
+                goto fail_store_msg;
+        }
+
+        if (dht_kv_validate_msg(msg) < 0) {
+                printf("Failed to validate store message.\n");
+                goto fail_store_msg;
+        }
+
+        len = dht_msg__get_packed_size(msg);
+        if (len == 0) {
+                printf("Failed to get packed msg length.\n");
+                goto fail_store_msg;
+        }
+
+        buf = malloc(len);
+        if (buf == NULL) {
+                printf("Failed to malloc store msg buf.\n");
+                goto fail_store_msg;
+        }
+
+        if (dht_msg__pack(msg, buf) != len) {
+                printf("Failed to pack store message.\n");
+                goto fail_pack;
+        }
+
+        free(buf);
+
+        dht_msg__free_unpacked(msg, NULL);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_pack:
+        free(buf);
+ fail_store_msg:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+/*
+ * Round-trip test for contact queries: dht_kv_query_contacts() must
+ * emit a find-node request (captured by the packet sink), and a
+ * hand-built find-node response echoing the request cookie is then
+ * fed back into the DHT via do_dht_kv_find_node_rsp().
+ */
+static int test_dht_kv_query_contacts_req_rsp(void)
+{
+ dht_msg_t * req;
+ dht_msg_t * rsp;
+ dht_contact_msg_t ** contacts;
+ size_t len = 2;
+
+ uint8_t * key;
+
+ TEST_START();
+
+ sink_init();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ /* The DHT needs at least one contact to send the request to. */
+ if (fill_dht_with_contacts(1) < 0) {
+ printf("Failed to fill bucket with contacts.\n");
+ goto fail_prep;
+ }
+
+ key = generate_id();
+ if (key == NULL) {
+ printf("Failed to generate key.\n");
+ goto fail_prep;
+ }
+
+ if (dht_kv_query_contacts(key, NULL) < 0) {
+ printf("Failed to query contacts.\n");
+ goto fail_query;
+ }
+
+ /* The query must have produced an outgoing request packet. */
+ req = sink_read();
+ if (req == NULL) {
+ printf("Failed to read request from sink.\n");
+ goto fail_query;
+ }
+
+ if (dht_kv_validate_msg(req) < 0) {
+ printf("Failed to validate find node req.\n");
+ goto fail_val_req;
+ }
+
+ if (random_contact_list(&contacts, len) < 0) {
+ printf("Failed to create random contact.\n");
+ goto fail_val_req;
+ }
+
+ /* The response must echo the cookie of the captured request. */
+ rsp = dht_kv_find_node_rsp_msg(key, req->find->cookie, &contacts, len);
+ if (rsp == NULL) {
+ printf("Failed to create find node response message.\n");
+ goto fail_rsp;
+ }
+
+ /* Make the response appear to come from this DHT's own id, with a
+ random source address. */
+ memcpy(rsp->src->id.data, dht.id.data, dht.id.len);
+ rsp->src->addr = generate_cookie();
+
+ if (dht_kv_validate_msg(rsp) < 0) {
+ printf("Failed to validate find node response message.\n");
+ goto fail_val_rsp;
+ }
+
+ do_dht_kv_find_node_rsp(rsp->node);
+
+ /* dht_contact_msg__free_unpacked(contacts[0], NULL); set to NULL */
+
+ /* NOTE(review): assumes the contact entries were consumed (and set
+ to NULL) when building rsp, so only the array is freed -- confirm. */
+ free(contacts);
+
+ dht_msg__free_unpacked(rsp, NULL);
+
+ free(key);
+
+ dht_msg__free_unpacked(req, NULL);
+
+ sink_fini();
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_val_rsp:
+ dht_msg__free_unpacked(rsp, NULL);
+ fail_rsp:
+ while (len-- > 0)
+ dht_contact_msg__free_unpacked(contacts[len], NULL);
+ free(contacts);
+ fail_val_req:
+ dht_msg__free_unpacked(req, NULL);
+ fail_query:
+ free(key);
+ fail_prep:
+ dht_fini();
+ fail_init:
+ sink_fini();
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/* Create and immediately destroy a single DHT request object. */
+static int test_dht_req_create_destroy(void)
+{
+        struct dht_req * r;
+
+        TEST_START();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        /* A request keyed on our own id suffices for this check. */
+        r = dht_req_create(dht.id.data);
+        if (r == NULL) {
+                printf("Failed to create kad request.\n");
+                goto fail_req;
+        }
+
+        dht_req_destroy(r);
+
+        dht_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_req:
+        dht_fini();
+ fail_init:
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+/*
+ * Register and unregister our own id; with no contacts in the DHT no
+ * DHT_STORE packet may be sent (the sink must stay empty).
+ * Fixed: missing newline in the "Packet sent" error message.
+ */
+static int test_dht_reg_unreg(void)
+{
+        TEST_START();
+
+        sink_init();
+
+        if (dht_init(&test_dht_config) < 0) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        if (dht_reg(dht.id.data) < 0) {
+                printf("Failed to register own id.\n");
+                goto fail_reg;
+        }
+
+        if (sink.len != 0) {
+                printf("Packet sent without contacts!\n");
+                goto fail_msg;
+        }
+
+        if (dht_unreg(dht.id.data) < 0) {
+                printf("Failed to unregister own id.\n");
+                goto fail_msg;
+        }
+
+        dht_fini();
+
+        sink_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_msg:
+        dht_unreg(dht.id.data);
+ fail_reg:
+        dht_fini();
+ fail_init:
+        sink_fini();
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+/*
+ * Register our own id on a DHT that has contacts: a valid DHT_STORE
+ * packet must be fanned out (captured by the packet sink).
+ */
+static int test_dht_reg_unreg_contacts(void)
+{
+ dht_msg_t * msg;
+
+ TEST_START();
+
+ sink_init();
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (fill_dht_with_contacts(4) < 0) {
+ printf("Failed to fill bucket with contacts.\n");
+ goto fail_reg;
+ }
+
+ if (dht_reg(dht.id.data) < 0) {
+ printf("Failed to register own id.\n");
+ goto fail_reg;
+ }
+
+ /* NOTE(review): registration is expected to fan out to exactly
+ dht.alpha contacts -- confirm against dht_reg(). */
+ if (sink.len != dht.alpha) {
+ printf("Packet sent to too few contacts!\n");
+ goto fail_msg;
+ }
+
+ msg = sink_read();
+ if (msg == NULL) {
+ printf("Failed to read message from sink.\n");
+ goto fail_msg;
+ }
+
+ if (msg->code != DHT_STORE) {
+ printf("Wrong code in dht reg message (%s != %s).\n",
+ dht_code_str[msg->code],
+ dht_code_str[DHT_STORE]);
+ goto fail_validation;
+ }
+
+ if (dht_kv_validate_msg(msg) < 0) {
+ printf("Failed to validate dht message.\n");
+ goto fail_validation;
+ }
+
+ if (dht_unreg(dht.id.data) < 0) {
+ printf("Failed to unregister own id.\n");
+ goto fail_validation;
+ }
+
+ dht_msg__free_unpacked(msg, NULL);
+
+ dht_fini();
+
+ sink_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_validation:
+ dht_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ /* Drop any remaining captured packets before tearing down. */
+ sink_clear();
+ dht_unreg(dht.id.data);
+ fail_reg:
+ dht_fini();
+ fail_init:
+ sink_fini();
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/*
+ * Local register/store/query round trip without any network peers:
+ * a value stored locally must be resolvable by dht_query().
+ */
+static int test_dht_reg_query_local(void)
+{
+ struct timespec now;
+ buffer_t test_addr;
+
+ TEST_START();
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ if (addr_to_buf(1234321, &test_addr) < 0) {
+ printf("Failed to convert test address to buffer.\n");
+ goto fail_buf;
+ }
+
+ if (dht_init(&test_dht_config) < 0) {
+ printf("Failed to create dht.\n");
+ goto fail_init;
+ }
+
+ if (dht_reg(dht.id.data) < 0) {
+ printf("Failed to register own id.\n");
+ goto fail_reg;
+ }
+
+ /* NOTE(review): registering alone must apparently not resolve to
+ our own address -- confirm intended dht_query() semantics. */
+ if (dht_query(dht.id.data) == dht.addr) {
+ printf("Succeeded to query own id.\n");
+ goto fail_get;
+ }
+
+ /* Store an explicit value with a 5 s expiry... */
+ if (dht_kv_store(dht.id.data, test_addr, now.tv_sec + 5) < 0) {
+ printf("Failed to publish value.\n");
+ goto fail_get;
+ }
+
+ /* ...which a local query must now return. */
+ if (dht_query(dht.id.data) != 1234321) {
+ printf("Failed to return remote addr.\n");
+ goto fail_get;
+ }
+
+ if (dht_unreg(dht.id.data) < 0) {
+ printf("Failed to unregister own id.\n");
+ goto fail_get;
+ }
+
+ freebuf(test_addr);
+
+ dht_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+
+ fail_get:
+ dht_unreg(dht.id.data);
+ fail_reg:
+ dht_fini();
+ fail_init:
+ freebuf(test_addr);
+ fail_buf:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/*
+ * Query a random key on a DHT without contacts: the lookup must fail
+ * with INVALID_ADDR and no packet may be sent.
+ * Fixed: the failure path did not call TEST_FAIL() (every other test
+ * in this file does) and the "Packet sent" message lacked a newline.
+ */
+static int test_dht_query(void)
+{
+        uint8_t *             key;
+        struct dir_dht_config cfg;
+
+        TEST_START();
+
+        sink_init();
+
+        /* A (fake) enrolment peer so the DHT believes it is joined. */
+        cfg = default_dht_config;
+        cfg.peer = generate_cookie();
+
+        if (dht_init(&cfg)) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        key = generate_id();
+        if (key == NULL) {
+                printf("Failed to generate key.\n");
+                goto fail_key;
+        }
+
+        if (dht_query(key) != INVALID_ADDR) {
+                printf("Succeeded to get address without contacts.\n");
+                goto fail_get;
+        }
+
+        if (sink.len != 0) {
+                printf("Packet sent without contacts!\n");
+                goto fail_test;
+        }
+
+        free(key);
+
+        dht_fini();
+
+        sink_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+
+ fail_test:
+        sink_clear();
+ fail_get:
+        free(key);
+ fail_key:
+        dht_fini();
+ fail_init:
+        sink_fini();
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+/*
+ * Query a random key on a DHT that has contacts: the lookup must
+ * still fail locally, but a DHT_FIND_VALUE_REQ must have gone out
+ * (captured by the packet sink).
+ * Fixed: the failure path did not call TEST_FAIL(); a ".!" typo and
+ * missing newlines in error messages; the wrong-code branch printed
+ * the same text as the validation branch.
+ */
+static int test_dht_query_contacts(void)
+{
+        dht_msg_t *           msg;
+        uint8_t *             key;
+        struct dir_dht_config cfg;
+
+        TEST_START();
+
+        sink_init();
+
+        /* A (fake) enrolment peer so the DHT believes it is joined. */
+        cfg = default_dht_config;
+        cfg.peer = generate_cookie();
+
+        if (dht_init(&cfg)) {
+                printf("Failed to create dht.\n");
+                goto fail_init;
+        }
+
+        if (fill_dht_with_contacts(10) < 0) {
+                printf("Failed to fill with contacts!\n");
+                goto fail_contacts;
+        }
+
+        key = generate_id();
+        if (key == NULL) {
+                printf("Failed to generate key.\n");
+                goto fail_contacts;
+        }
+
+        if (dht_query(key) != INVALID_ADDR) {
+                printf("Succeeded to get address for random id.\n");
+                goto fail_query;
+        }
+
+        msg = sink_read();
+        if (msg == NULL) {
+                printf("Failed to read message!\n");
+                goto fail_read;
+        }
+
+        if (dht_kv_validate_msg(msg) < 0) {
+                printf("Failed to validate dht message.\n");
+                goto fail_msg;
+        }
+
+        if (msg->code != DHT_FIND_VALUE_REQ) {
+                printf("Wrong code in dht query message (%s != %s).\n",
+                       dht_code_str[msg->code],
+                       dht_code_str[DHT_FIND_VALUE_REQ]);
+                goto fail_msg;
+        }
+
+        dht_msg__free_unpacked(msg, NULL);
+
+        free(key);
+
+        sink_clear();
+
+        dht_fini();
+
+        sink_fini();
+
+        TEST_SUCCESS();
+
+        return TEST_RC_SUCCESS;
+ fail_msg:
+        dht_msg__free_unpacked(msg, NULL);
+ fail_read:
+        sink_clear();
+ fail_query:
+        free(key);
+ fail_contacts:
+        dht_fini();
+ fail_init:
+        sink_fini();
+        TEST_FAIL();
+        return TEST_RC_FAIL;
+}
+
+int dht_test(int argc,
+             char ** argv)
+{
+        /* Run every DHT unit test in order; OR the results together
+         * so any single failure fails the whole suite. */
+        static int (* const tests[])(void) = {
+                test_dht_init_fini,
+                test_dht_start_stop,
+                test_val_entry_create_destroy,
+                test_dht_entry_create_destroy,
+                test_dht_entry_update_get_val,
+                test_dht_entry_update_get_lval,
+                test_dht_kv_contact_create_destroy,
+                test_dht_kv_contact_list,
+                test_dht_kv_update_bucket,
+                test_dht_kv_get_values,
+                test_dht_kv_find_node_req_msg,
+                test_dht_kv_find_node_rsp_msg,
+                test_dht_kv_find_node_rsp_msg_contacts,
+                test_dht_kv_query_contacts_req_rsp,
+                test_dht_kv_find_value_req_msg,
+                test_dht_kv_find_value_rsp_msg,
+                test_dht_kv_find_value_rsp_msg_contacts,
+                test_dht_kv_find_value_rsp_msg_values,
+                test_dht_kv_store_msg,
+                test_dht_req_create_destroy,
+                test_dht_reg_unreg,
+                test_dht_reg_unreg_contacts,
+                test_dht_reg_query_local,
+                test_dht_query,
+                test_dht_query_contacts
+        };
+        size_t i;
+        int    rc = 0;
+
+        (void) argc;
+        (void) argv;
+
+        for (i = 0; i < sizeof(tests) / sizeof(tests[0]); i++)
+                rc |= tests[i]();
+
+        return rc;
}