From fa1af6aaed6a46acd0af1600f4c63e79fcf9ff84 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Wed, 6 Aug 2025 12:29:02 +0200 Subject: ipcpd: Update DHT for unicast layer This is a rewrite of the DHT for name-to-address resolution in the unicast layer. It is now integrated as a proper directory policy. The dir_wait_running function is removed, instead the a DHT peer is passed on during IPCP enrolment. Each DHT request/response gets a random 64-bit ID ('cookie'). DHT messages to the same peer are deduped, except in the case when the DHT is low on contacts. In that case, it will contact the per it received at enrolment for more contacts. To combat packet loss, these messages are not deduped by means of a 'magic cookie', chosen at random when the DHT starts. The DHT parameters (Kademlia) can be set using the configfile or the IRM command line tools: if DIRECTORY_POLICY == DHT [dht_alpha (default: 3)] [dht_k (default: 8)] [dht_t_expire (default: 86400)] [dht_t_refresh (default: 900)] [dht_t_replicate (default: 900)] This commit also adds support for a protocol debug level (PP). Protocol debugging for the DHT can be enabled using the DEBUG_PROTO_DHT build flag. The DHT has the following message types: DHT_STORE, sent to k peers. Not acknowledged. DHT_STORE --> [2861814146dbf9b5|ed:d9:e2:c4]. key: bcc236ab6ec69e65 [32 bytes] val: 00000000c4e2d9ed [8 bytes] exp: 2025-08-03 17:29:44 (UTC). DHT_FIND_NODE_REQ, sent to 'alpha' peers, with a corresponding response. This is used to update the peer routing table to iteratively look for the nodes with IDs closest to the requested key. DHT_FIND_NODE_REQ --> [a62f92abffb451c4|ed:d9:e2:c4]. cookie: 2d4b7acef8308210 key: a62f92abffb451c4 [32 bytes] DHT_FIND_NODE_RSP <-- [2861814146dbf9b5|ed:d9:e2:c4]. cookie: 2d4b7acef8308210 key: a62f92abffb451c4 [32 bytes] contacts: [1] [a62f92abffb451c4|9f:0d:c1:fb] DHT_FIND_VALUE_REQ, sent to 'k' peers, with a corresponding response. Used to find a value for a key. 
Will also send its closest known peers in the response. DHT_FIND_VALUE_REQ --> [2861814146dbf9b5|ed:d9:e2:c4]. cookie: 80a1adcb09a2ff0a key: 42dee3b0415b4f69 [32 bytes] DHT_FIND_VALUE_RSP <-- [2861814146dbf9b5|ed:d9:e2:c4]. cookie: 80a1adcb09a2ff0a key: 42dee3b0415b4f69 [32 bytes] values: [1] 00000000c4e2d9ed [8 bytes] contacts: [1] [a62f92abffb451c4|9f:0d:c1:fb] Also removes ubuntu 20 from appveyor config as it is not supported anymore. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/ipcpd/unicast/dir/dht.c | 5127 ++++++++++++++++++++++++++----------------- 1 file changed, 3134 insertions(+), 1993 deletions(-) (limited to 'src/ipcpd/unicast/dir/dht.c') diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c index da39e567..c7205505 100644 --- a/src/ipcpd/unicast/dir/dht.c +++ b/src/ipcpd/unicast/dir/dht.c @@ -4,7 +4,6 @@ * Distributed Hash Table based on Kademlia * * Dimitri Staessens - * Sander Vrijders * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -20,10 +19,12 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. 
*/ -#if defined(__linux__) || defined(__CYGWIN__) -#define _DEFAULT_SOURCE -#else -#define _POSIX_C_SOURCE 200112L +#if !defined (__DHT_TEST__) + #if defined(__linux__) || defined(__CYGWIN__) + #define _DEFAULT_SOURCE + #else + #define _POSIX_C_SOURCE 200112L + #endif #endif #include "config.h" @@ -40,6 +41,7 @@ #include #include #include +#include #include #include #include @@ -59,143 +61,153 @@ #include #include "dht.pb-c.h" -typedef DhtMsg dht_msg_t; -typedef DhtContactMsg dht_contact_msg_t; +typedef DhtMsg dht_msg_t; +typedef DhtContactMsg dht_contact_msg_t; +typedef DhtStoreMsg dht_store_msg_t; +typedef DhtFindReqMsg dht_find_req_msg_t; +typedef DhtFindNodeRspMsg dht_find_node_rsp_msg_t; +typedef DhtFindValueRspMsg dht_find_value_rsp_msg_t; +typedef ProtobufCBinaryData binary_data_t; #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ -#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ -#define KAD_K 8 /* Replication factor, MDHT value. */ -#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ -#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ -#define KAD_T_JOIN 8 /* Response time to wait for a join. */ -#define KAD_T_RESP 5 /* Response time to wait for a response. */ -#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ -#define KAD_QUEER 15 /* Time to declare peer questionable. */ -#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ -#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ -#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ -#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ +#define DHT_MAX_REQS 128 /* KAD recommends rnd(), bmp can be changed. */ +#define DHT_WARN_REQS 100 /* Warn if number of requests exceeds this. */ +#define DHT_MAX_VALS 8 /* Max number of values to return for a key. 
*/ +#define DHT_T_CACHE 60 /* Max cache time for values (s) */ +#define DHT_T_RESP 2 /* Response time to wait for a response (s). */ +#define DHT_N_REPUB 5 /* Republish if expiry within n replications. */ +#define DHT_R_PING 2 /* Ping retries before declaring peer dead. */ +#define DHT_QUEER 15 /* Time to declare peer questionable. */ +#define DHT_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ +#define DHT_RESP_RETR 6 /* Number of retries on sending a response. */ #define HANDLE_TIMEO 1000 /* Timeout for dht_handle_packet tpm check (ms) */ -#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ +#define DHT_INVALID 0 /* Invalid cookie value. */ -enum dht_state { - DHT_INIT = 0, - DHT_SHUTDOWN, - DHT_JOINING, - DHT_RUNNING, -}; +#define KEY_FMT "K<" HASH_FMT64 ">" +#define KEY_VAL(key) HASH_VAL64(key) -enum kad_code { - KAD_JOIN = 0, - KAD_FIND_NODE, - KAD_FIND_VALUE, - /* Messages without a response below. */ - KAD_STORE, - KAD_RESPONSE -}; +#define VAL_FMT "V<" HASH_FMT64 ">" +#define VAL_VAL(val) HASH_VAL64((val).data) -enum kad_req_state { - REQ_NULL = 0, - REQ_INIT, - REQ_PENDING, - REQ_RESPONSE, - REQ_DONE, - REQ_DESTROY -}; +#define KV_FMT "<" HASH_FMT64 ", " HASH_FMT64 ">" +#define KV_VAL(key, val) HASH_VAL64(key), HASH_VAL64((val).data) -enum lookup_state { - LU_NULL = 0, - LU_INIT, - LU_PENDING, - LU_UPDATE, - LU_COMPLETE, - LU_DESTROY -}; +#define PEER_FMT "[" HASH_FMT64 "|" ADDR_FMT32 "]" +#define PEER_VAL(id, addr) HASH_VAL64(id), ADDR_VAL32(&(addr)) + +#define DHT_CODE(msg) dht_code_str[(msg)->code] + +#define TX_HDR_FMT "%s --> " PEER_FMT +#define TX_HDR_VAL(msg, id, addr) DHT_CODE(msg), PEER_VAL(id, addr) -struct kad_req { - struct list_head next; +#define RX_HDR_FMT "%s <-- " PEER_FMT +#define RX_HDR_VAL(msg) DHT_CODE(msg), \ + PEER_VAL(msg->src->id.data, msg->src->addr) - uint32_t cookie; - enum kad_code code; - uint8_t * key; - uint64_t addr; +#define CK_FMT "|" HASH_FMT64 "|" +#define CK_VAL(cookie) HASH_VAL64(&(cookie)) 
- enum kad_req_state state; - pthread_cond_t cond; - pthread_mutex_t lock; +#define IS_REQUEST(code) \ + (code == DHT_FIND_NODE_REQ || code == DHT_FIND_VALUE_REQ) - time_t t_exp; +enum dht_code { + DHT_STORE, + DHT_FIND_NODE_REQ, + DHT_FIND_NODE_RSP, + DHT_FIND_VALUE_REQ, + DHT_FIND_VALUE_RSP }; -struct cookie_el { - struct list_head next; +const char * dht_code_str[] = { + "DHT_STORE", + "DHT_FIND_NODE_REQ", + "DHT_FIND_NODE_RSP", + "DHT_FIND_VALUE_REQ", + "DHT_FIND_VALUE_RSP" +}; - uint32_t cookie; +enum dht_state { + DHT_NULL = 0, + DHT_INIT, + DHT_RUNNING }; -struct lookup { - struct list_head next; +struct val_entry { + struct list_head next; + + buffer_t val; - struct list_head cookies; + time_t t_exp; /* Expiry time */ + time_t t_repl; /* Last replication time */ +}; - uint8_t * key; +struct dht_entry { + struct list_head next; - struct list_head contacts; - size_t n_contacts; + uint8_t * key; - uint64_t * addrs; - size_t n_addrs; + struct { + struct list_head list; + size_t len; + } vals; /* We don't own these, only replicate */ - enum lookup_state state; - pthread_cond_t cond; - pthread_mutex_t lock; + struct { + struct list_head list; + size_t len; + } lvals; /* We own these, must be republished */ }; -struct val { +struct contact { struct list_head next; + uint8_t * id; uint64_t addr; - time_t t_exp; - time_t t_rep; + size_t fails; + time_t t_seen; }; -struct ref_entry { +struct peer_entry { struct list_head next; - uint8_t * key; + uint64_t cookie; + uint8_t * id; + uint64_t addr; + enum dht_code code; - time_t t_rep; + time_t t_sent; }; -struct dht_entry { +struct dht_req { struct list_head next; uint8_t * key; - size_t n_vals; - struct list_head vals; -}; - -struct contact { - struct list_head next; + time_t t_exp; - uint8_t * id; - uint64_t addr; + struct { + struct list_head list; + size_t len; + } peers; - size_t fails; - time_t t_seen; + struct { + struct list_head list; + size_t len; + } cache; }; struct bucket { - struct list_head contacts; - 
size_t n_contacts; + struct { + struct list_head list; + size_t len; + } contacts; - struct list_head alts; - size_t n_alts; + struct { + struct list_head list; + size_t len; + } alts; time_t t_refr; @@ -203,395 +215,354 @@ struct bucket { uint8_t mask; struct bucket * parent; - struct bucket * children[1L << KAD_BETA]; + struct bucket * children[1L << DHT_BETA]; }; struct cmd { - struct list_head next; - - struct shm_du_buff * sdb; + struct list_head next; + buffer_t cbuf; }; struct dir_ops dht_dir_ops = { - .create = dht_create, - .destroy = dht_destroy, - .bootstrap = dht_bootstrap, - .reg = dht_reg, - .unreg = dht_unreg, - .query = dht_query, - .wait_running = dht_wait_running + .init = (int (*)(void *)) dht_init, + .fini = dht_fini, + .start = dht_start, + .stop = dht_stop, + .reg = dht_reg, + .unreg = dht_unreg, + .query = dht_query }; -struct dht { - size_t alpha; - size_t b; - size_t k; - - time_t t_expire; - time_t t_refresh; - time_t t_replic; - time_t t_repub; - - uint8_t * id; - uint64_t addr; - - struct bucket * buckets; - - struct list_head entries; +struct { + struct { /* Kademlia parameters */ + uint32_t alpha; /* Number of concurrent requests */ + size_t k; /* Number of replicas to store */ + time_t t_expire; /* Expiry time for values (s) */ + time_t t_refresh; /* Refresh time for contacts (s) */ + time_t t_repl; /* Replication time for values (s) */ + }; - struct list_head refs; + buffer_t id; - struct list_head lookups; + time_t t0; /* Creation time */ + uint64_t addr; /* Our own address */ + uint64_t peer; /* Enrollment peer address */ + uint64_t magic; /* Magic cookie for retransmit */ - struct list_head requests; - struct bmp * cookies; + uint64_t eid; /* Entity ID */ - enum dht_state state; - struct list_head cmds; - pthread_cond_t cond; - pthread_mutex_t mtx; + struct tpm * tpm; + pthread_t worker; - pthread_rwlock_t lock; - - uint64_t eid; - - struct tpm * tpm; - - pthread_t worker; -}; + enum dht_state state; -struct join_info { - struct 
dht * dht; - uint64_t addr; + struct { + struct { + struct bucket * root; + } contacts; + + struct { + struct list_head list; + size_t len; + size_t vals; + size_t lvals; + } kv; + + pthread_rwlock_t lock; + } db; + + struct { + struct list_head list; + size_t len; + pthread_cond_t cond; + pthread_mutex_t mtx; + } reqs; + + struct { + struct list_head list; + pthread_cond_t cond; + pthread_mutex_t mtx; + } cmds; +} dht; + + +/* DHT RIB */ + +static const char * dht_dir[] = { + "database", + "stats", + NULL }; -struct packet_info { - struct dht * dht; - struct shm_du_buff * sdb; -}; +const char * dht_stats = \ + "DHT: " HASH_FMT64 "\n" + " Created: %s\n" + " Address: " ADDR_FMT32 "\n" + " Kademlia parameters:\n" + " Number of concurrent requests (alpha): %10zu\n" + " Number of replicas (k): %10zu\n" + " Expiry time for values (s): %10ld\n" + " Refresh time for contacts (s): %10ld\n" + " Replication time for values (s): %10ld\n" + " Number of keys: %10zu\n" + " Number of local values: %10zu\n" + " Number of non-local values: %10zu\n"; -static uint8_t * dht_dup_key(const uint8_t * key, - size_t len) +static int dht_rib_statfile(char * buf, + size_t len) { - uint8_t * dup; + struct tm * tm; + char tmstr[RIB_TM_STRLEN]; + size_t keys; + size_t vals; + size_t lvals; - dup = malloc(sizeof(*dup) * len); - if (dup == NULL) - return NULL; + assert(buf != NULL); + assert(len > 0); - memcpy(dup, key, len); - - return dup; -} + pthread_rwlock_rdlock(&dht.db.lock); -static enum dht_state dht_get_state(struct dht * dht) -{ - enum dht_state state; + keys = dht.db.kv.len; + lvals = dht.db.kv.lvals; + vals = dht.db.kv.vals; - pthread_mutex_lock(&dht->mtx); + pthread_rwlock_unlock(&dht.db.lock); - state = dht->state; + tm = gmtime(&dht.t0); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); - pthread_mutex_unlock(&dht->mtx); + snprintf(buf, len, dht_stats, + HASH_VAL64(dht.id.data), + tmstr, + ADDR_VAL32(&dht.addr), + dht.alpha, dht.k, + dht.t_expire, dht.t_refresh, dht.t_repl, + 
keys, vals, lvals); - return state; + return strlen(buf); } -static int dht_set_state(struct dht * dht, - enum dht_state state) +static size_t dht_db_file_len(void) { - pthread_mutex_lock(&dht->mtx); - - if (state == DHT_JOINING && dht->state != DHT_INIT) { - pthread_mutex_unlock(&dht->mtx); - return -1; - } - - dht->state = state; - - pthread_cond_broadcast(&dht->cond); - - pthread_mutex_unlock(&dht->mtx); + size_t sz; + size_t vals; - return 0; -} - -int dht_wait_running(void * dir) -{ - struct dht * dht; - int ret = 0; + sz = 18; /* DHT database + 2 * \n */ - dht = (struct dht *) dir; + pthread_rwlock_rdlock(&dht.db.lock); - pthread_mutex_lock(&dht->mtx); + if (dht.db.kv.len == 0) { + pthread_rwlock_unlock(&dht.db.lock); + sz += 14; /* No entries */ + return sz; + } - pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); + sz += 39 * 3 + 1; /* tally + extra newline */ + sz += dht.db.kv.len * (25 + 19 + 23 + 1); - while (dht->state == DHT_JOINING) - pthread_cond_wait(&dht->cond, &dht->mtx); + vals = dht.db.kv.vals + dht.db.kv.lvals; - if (dht->state != DHT_RUNNING) - ret = -1; + sz += vals * (48 + 2 * RIB_TM_STRLEN); - pthread_cleanup_pop(true); + pthread_rwlock_unlock(&dht.db.lock); - return ret; + return sz; } -static uint8_t * create_id(size_t len) +static int dht_rib_dbfile(char * buf, + size_t len) { - uint8_t * id; + struct tm * tm; + char tmstr[RIB_TM_STRLEN]; + char exstr[RIB_TM_STRLEN]; + size_t i = 0; + struct list_head * p; - id = malloc(len); - if (id == NULL) - return NULL; + assert(buf != NULL); + assert(len > 0); - if (random_buffer(id, len) < 0) { - free(id); - return NULL; + pthread_rwlock_rdlock(&dht.db.lock); + + if (dht.db.kv.len == 0) { + i += snprintf(buf, len, " No entries.\n"); + pthread_rwlock_unlock(&dht.db.lock); + return i; } - return id; -} + i += snprintf(buf + i, len - i, "DHT database:\n\n"); + i += snprintf(buf + i, len - i, + "Number of keys: %10zu\n" + "Number of local values: %10zu\n" + "Number of non-local values: 
%10zu\n\n", + dht.db.kv.len, dht.db.kv.vals, dht.db.kv.lvals); -static void kad_req_create(struct dht * dht, - dht_msg_t * msg, - uint64_t addr) -{ - struct kad_req * req; - pthread_condattr_t cattr; - struct timespec t; - size_t b; + list_for_each(p, &dht.db.kv.list) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + struct list_head * h; - clock_gettime(CLOCK_REALTIME_COARSE, &t); + i += snprintf(buf + i, len - i, "Key: " KEY_FMT "\n", + KEY_VAL(e->key)); + i += snprintf(buf + i, len - i, " Local entries:\n"); - req = malloc(sizeof(*req)); - if (req == NULL) - goto fail_malloc; + list_for_each(h, &e->vals.list) { + struct val_entry * v; - list_head_init(&req->next); + v = list_entry(h, struct val_entry, next); - req->t_exp = t.tv_sec + KAD_T_RESP; - req->addr = addr; - req->state = REQ_INIT; - req->cookie = msg->cookie; - req->code = msg->code; - req->key = NULL; + tm = gmtime(&v->t_repl); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); - pthread_rwlock_rdlock(&dht->lock); - b = dht->b; - pthread_rwlock_unlock(&dht->lock); + tm = gmtime(&v->t_exp); + strftime(exstr, sizeof(exstr), RIB_TM_FORMAT, tm); - if (msg->has_key) { - req->key = dht_dup_key(msg->key.data, b); - if (req->key == NULL) - goto fail_dup_key; - } + i += snprintf(buf + i, len - i, + " " VAL_FMT + ", t_replicated=%.*s, t_expire=%.*s\n", + VAL_VAL(v->val), + RIB_TM_STRLEN, tmstr, + RIB_TM_STRLEN, exstr); + } - if (pthread_mutex_init(&req->lock, NULL)) - goto fail_mutex; + i += snprintf(buf + i, len - i, "\n"); + i += snprintf(buf + i, len - i, " Non-local entries:\n"); - if (pthread_condattr_init(&cattr)) - goto fail_condattr; -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif + list_for_each(h, &e->lvals.list) { + struct val_entry * v; - if (pthread_cond_init(&req->cond, &cattr)) - goto fail_cond_init; + v= list_entry(h, struct val_entry, next); - pthread_condattr_destroy(&cattr); + tm = gmtime(&v->t_repl); + strftime(tmstr, sizeof(tmstr), 
RIB_TM_FORMAT, tm); - pthread_rwlock_wrlock(&dht->lock); + tm = gmtime(&v->t_exp); + strftime(exstr, sizeof(exstr), RIB_TM_FORMAT, tm); - list_add(&req->next, &dht->requests); + i += snprintf(buf + i, len - i, + " " VAL_FMT + ", t_replicated=%.*s, t_expire=%.*s\n", + VAL_VAL(v->val), + RIB_TM_STRLEN, tmstr, + RIB_TM_STRLEN, exstr); - pthread_rwlock_unlock(&dht->lock); + } + } - return; + pthread_rwlock_unlock(&dht.db.lock); - fail_cond_init: - pthread_condattr_destroy(&cattr); - fail_condattr: - pthread_mutex_destroy(&req->lock); - fail_mutex: - free(req->key); - fail_dup_key: - free(req); - fail_malloc: - return; + printf("DHT RIB DB file generated (%zu bytes).\n", i); + + return i; } -static void cancel_req_destroy(void * o) +static int dht_rib_read(const char * path, + char * buf, + size_t len) { - struct kad_req * req = (struct kad_req *) o; - - pthread_mutex_unlock(&req->lock); + char * entry; - pthread_cond_destroy(&req->cond); - pthread_mutex_destroy(&req->lock); + entry = strstr(path, RIB_SEPARATOR) + 1; - if (req->key != NULL) - free(req->key); + if (strcmp(entry, "database") == 0) { + return dht_rib_dbfile(buf, len); + } else if (strcmp(entry, "stats") == 0) { + return dht_rib_statfile(buf, len); + } - free(req); + return 0; } -static void kad_req_destroy(struct kad_req * req) +static int dht_rib_readdir(char *** buf) { - struct timespec t; - struct timespec intv = TIMESPEC_INIT_S(20); + int i = 0; - clock_gettime(PTHREAD_COND_CLOCK, &t); - ts_add(&t, &intv, &t); + while (dht_dir[i++] != NULL); - assert(req); + *buf = malloc(sizeof(**buf) * i); + if (*buf == NULL) + goto fail_buf; - pthread_mutex_lock(&req->lock); + i = 0; - switch (req->state) { - case REQ_DESTROY: - pthread_mutex_unlock(&req->lock); - return; - case REQ_PENDING: - req->state = REQ_DESTROY; - pthread_cond_broadcast(&req->cond); - break; - case REQ_INIT: - case REQ_DONE: - req->state = REQ_NULL; - break; - case REQ_RESPONSE: - case REQ_NULL: - default: - break; + while (dht_dir[i] != 
NULL) { + (*buf)[i] = strdup(dht_dir[i]); + if ((*buf)[i] == NULL) + goto fail_dup; + i++; } - pthread_cleanup_push(cancel_req_destroy, req); - - while (req->state != REQ_NULL && req->state != REQ_DONE) - pthread_cond_timedwait(&req->cond, &req->lock, &t); - - pthread_cleanup_pop(true); + return i; + fail_dup: + freepp(char, *buf, i); + fail_buf: + return -ENOMEM; } -static int kad_req_wait(struct kad_req * req, - time_t t) +static int dht_rib_getattr(const char * path, + struct rib_attr * attr) { - struct timespec timeo = TIMESPEC_INIT_S(0); - struct timespec abs; - int ret = 0; - - assert(req); - - timeo.tv_sec = t; - - clock_gettime(PTHREAD_COND_CLOCK, &abs); - - ts_add(&abs, &timeo, &abs); + struct timespec now; + char * entry; - pthread_mutex_lock(&req->lock); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - req->state = REQ_PENDING; + attr->mtime = now.tv_sec; - pthread_cleanup_push(__cleanup_mutex_unlock, &req->lock); + entry = strstr(path, RIB_SEPARATOR) + 1; - while (req->state == REQ_PENDING && ret != -ETIMEDOUT) - ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs); - - switch(req->state) { - case REQ_DESTROY: - ret = -1; - req->state = REQ_NULL; - pthread_cond_broadcast(&req->cond); - break; - case REQ_PENDING: /* ETIMEDOUT */ - case REQ_RESPONSE: - req->state = REQ_DONE; - pthread_cond_broadcast(&req->cond); - break; - default: - break; + if (strcmp(entry, "database") == 0) { + attr->size = dht_db_file_len(); + } else if (strcmp(entry, "stats") == 0) { + attr->size = 545; } - pthread_cleanup_pop(true); - - return ret; + return 0; } -static void kad_req_respond(struct kad_req * req) -{ - pthread_mutex_lock(&req->lock); - - req->state = REQ_RESPONSE; - pthread_cond_broadcast(&req->cond); +static struct rib_ops r_ops = { + .read = dht_rib_read, + .readdir = dht_rib_readdir, + .getattr = dht_rib_getattr +}; - pthread_mutex_unlock(&req->lock); -} +/* Helper functions */ -static struct contact * contact_create(const uint8_t * id, - size_t len, - 
uint64_t addr) +static uint8_t * generate_id(void) { - struct contact * c; - struct timespec t; + uint8_t * id; - c = malloc(sizeof(*c)); - if (c == NULL) + if(dht.id.len < sizeof(uint64_t)) { + log_err("DHT ID length is too short (%zu < %zu).", + dht.id.len, sizeof(uint64_t)); return NULL; + }; - list_head_init(&c->next); - - clock_gettime(CLOCK_REALTIME_COARSE, &t); - - c->addr = addr; - c->fails = 0; - c->t_seen = t.tv_sec; - c->id = dht_dup_key(id, len); - if (c->id == NULL) { - free(c); - return NULL; + id = malloc(dht.id.len); + if (id == NULL) { + log_err("Failed to malloc ID."); + goto fail_id; } - return c; -} - -static void contact_destroy(struct contact * c) -{ - if (c != NULL) - free(c->id); + if (random_buffer(id, dht.id.len) < 0) { + log_err("Failed to generate random ID."); + goto fail_rnd; + } - free(c); + return id; + fail_rnd: + free(id); + fail_id: + return NULL; } -static struct bucket * iter_bucket(struct bucket * b, - const uint8_t * id) +static uint64_t generate_cookie(void) { - uint8_t byte; - uint8_t mask; - - assert(b); - - if (b->children[0] == NULL) - return b; - - byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; - - mask = ((1L << KAD_BETA) - 1) & 0xFF; - - byte >>= (CHAR_BIT - KAD_BETA) - - (((b->depth) * KAD_BETA) & (CHAR_BIT - 1)); + uint64_t cookie = DHT_INVALID; - return iter_bucket(b->children[(byte & mask)], id); -} - -static struct bucket * dht_get_bucket(struct dht * dht, - const uint8_t * id) -{ - assert(dht->buckets); + while (cookie == DHT_INVALID) + random_buffer((uint8_t *) &cookie, sizeof(cookie)); - return iter_bucket(dht->buckets, id); + return cookie; } /* @@ -601,719 +572,1129 @@ static struct bucket * dht_get_bucket(struct dht * dht, static uint64_t dist(const uint8_t * src, const uint8_t * dst) { + assert(dht.id.len >= sizeof(uint64_t)); + return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst)); } -static size_t list_add_sorted(struct list_head * l, - struct contact * c, - const uint8_t * key) +#define IS_CLOSER(x, y) 
(dist((x), dht.id.data) < dist((y), dht.id.data)) + +static int addr_to_buf(const uint64_t addr, + buffer_t * buf) { - struct list_head * p; + size_t len; + uint64_t _addr; - assert(l); - assert(c); - assert(key); - assert(c->id); + len = sizeof(addr); + _addr = hton64(addr); - list_for_each(p, l) { - struct contact * e = list_entry(p, struct contact, next); - if (dist(c->id, key) > dist(e->id, key)) - break; - } + assert(buf != NULL); - list_add_tail(&c->next, p); + buf->data = malloc(len); + if (buf->data == NULL) + goto fail_malloc; + + buf->len = sizeof(_addr); + memcpy(buf->data, &_addr, sizeof(_addr)); - return 1; + return 0; + fail_malloc: + return -ENOMEM; } -static size_t dht_contact_list(struct dht * dht, - struct list_head * l, - const uint8_t * key) +static int buf_to_addr(const buffer_t buf, + uint64_t * addr) { - struct list_head * p; - struct bucket * b; - size_t len = 0; - size_t i; - struct timespec t; + assert(addr != NULL); + assert(buf.data != NULL); - assert(l); - assert(dht); - assert(key); - assert(list_is_empty(l)); + if (buf.len != sizeof(*addr)) + return - EINVAL; - clock_gettime(CLOCK_REALTIME_COARSE, &t); + *addr = ntoh64(*((uint64_t *) buf.data)); - b = dht_get_bucket(dht, key); - if (b == NULL) - return 0; + if (*addr == dht.addr) + *addr = INVALID_ADDR; - b->t_refr = t.tv_sec + KAD_T_REFR; + return 0; +} - if (b->n_contacts == dht->k || b->parent == NULL) { - list_for_each(p, &b->contacts) { - struct contact * c; - c = list_entry(p, struct contact, next); - c = contact_create(c->id, dht->b, c->addr); - if (list_add_sorted(l, c, key) == 1) - if (++len == dht->k) - break; - } - } else { - struct bucket * d = b->parent; - for (i = 0; i < (1L << KAD_BETA) && len < dht->k; ++i) { - list_for_each(p, &d->children[i]->contacts) { - struct contact * c; - c = list_entry(p, struct contact, next); - c = contact_create(c->id, dht->b, c->addr); - if (c == NULL) - continue; - if (list_add_sorted(l, c, key) == 1) - if (++len == dht->k) - break; - } - 
} - } +static uint8_t * dht_dup_key(const uint8_t * key) +{ + uint8_t * dup; - assert(len == dht->k || b->parent == NULL); + assert(key != NULL); + assert(dht.id.len != 0); - return len; -} + dup = malloc(dht.id.len); + if (dup == NULL) + return NULL; -static struct lookup * lookup_create(struct dht * dht, - const uint8_t * id) -{ - struct lookup * lu; - pthread_condattr_t cattr; + memcpy(dup, key, dht.id.len); - assert(dht); - assert(id); + return dup; +} - lu = malloc(sizeof(*lu)); - if (lu == NULL) - goto fail_malloc; +/* DHT */ - list_head_init(&lu->contacts); - list_head_init(&lu->cookies); +static struct val_entry * val_entry_create(const buffer_t val, + time_t exp) +{ + struct val_entry * e; + struct timespec now; - lu->state = LU_INIT; - lu->addrs = NULL; - lu->n_addrs = 0; - lu->key = dht_dup_key(id, dht->b); - if (lu->key == NULL) - goto fail_id; + assert(val.data != NULL); + assert(val.len > 0); - if (pthread_mutex_init(&lu->lock, NULL)) - goto fail_mutex; + clock_gettime(CLOCK_REALTIME_COARSE, &now); - pthread_condattr_init(&cattr); -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#ifndef __DHT_TEST_ALLOW_EXPIRED__ + if (exp < now.tv_sec) + return NULL; /* Refuse to add expired values */ #endif + e = malloc(sizeof(*e)); + if (e == NULL) + goto fail_entry; - if (pthread_cond_init(&lu->cond, &cattr)) - goto fail_cond; + list_head_init(&e->next); - pthread_condattr_destroy(&cattr); + e->val.len = val.len; + e->val.data = malloc(val.len); + if (e->val.data == NULL) + goto fail_val; - pthread_rwlock_wrlock(&dht->lock); + memcpy(e->val.data, val.data, val.len); - list_add(&lu->next, &dht->lookups); + e->t_repl = 0; + e->t_exp = exp; - lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); + return e; - pthread_rwlock_unlock(&dht->lock); + fail_val: + free(e); + fail_entry: + return NULL; +} - return lu; +static void val_entry_destroy(struct val_entry * v) +{ + assert(v->val.data != NULL); - fail_cond: - 
pthread_condattr_destroy(&cattr); - pthread_mutex_destroy(&lu->lock); - fail_mutex: - free(lu->key); - fail_id: - free(lu); - fail_malloc: - return NULL; + freebuf(v->val); + free(v); } -static void cancel_lookup_destroy(void * o) +static struct dht_entry * dht_entry_create(const uint8_t * key) { - struct lookup * lu; - struct list_head * p; - struct list_head * h; + struct dht_entry * e; - lu = (struct lookup *) o; + assert(key != NULL); - if (lu->key != NULL) - free(lu->key); - if (lu->addrs != NULL) - free(lu->addrs); + e = malloc(sizeof(*e)); + if (e == NULL) + goto fail_entry; - list_for_each_safe(p, h, &lu->contacts) { - struct contact * c = list_entry(p, struct contact, next); - list_del(&c->next); - contact_destroy(c); - } + list_head_init(&e->next); + list_head_init(&e->vals.list); + list_head_init(&e->lvals.list); - list_for_each_safe(p, h, &lu->cookies) { - struct cookie_el * c = list_entry(p, struct cookie_el, next); - list_del(&c->next); - free(c); - } + e->vals.len = 0; + e->lvals.len = 0; - pthread_mutex_unlock(&lu->lock); + e->key = dht_dup_key(key); + if (e->key == NULL) + goto fail_key; - pthread_mutex_destroy(&lu->lock); - - free(lu); + return e; + fail_key: + free(e); + fail_entry: + return NULL; } -static void lookup_destroy(struct lookup * lu) +static void dht_entry_destroy(struct dht_entry * e) { - assert(lu); + struct list_head * p; + struct list_head * h; - pthread_mutex_lock(&lu->lock); + assert(e != NULL); - switch (lu->state) { - case LU_DESTROY: - pthread_mutex_unlock(&lu->lock); - return; - case LU_PENDING: - lu->state = LU_DESTROY; - pthread_cond_broadcast(&lu->cond); - break; - case LU_INIT: - case LU_UPDATE: - case LU_COMPLETE: - lu->state = LU_NULL; - break; - case LU_NULL: - default: - break; + list_for_each_safe(p, h, &e->vals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + list_del(&v->next); + val_entry_destroy(v); + --e->vals.len; + --dht.db.kv.vals; + } + + list_for_each_safe(p, h, &e->lvals.list) { 
+ struct val_entry * v = list_entry(p, struct val_entry, next); + list_del(&v->next); + val_entry_destroy(v); + --e->lvals.len; + --dht.db.kv.lvals; } - pthread_cleanup_push(cancel_lookup_destroy, lu); + free(e->key); - while (lu->state != LU_NULL) - pthread_cond_wait(&lu->cond, &lu->lock); + assert(e->vals.len == 0 && e->lvals.len == 0); - pthread_cleanup_pop(true); + free(e); } -static void lookup_update(struct dht * dht, - struct lookup * lu, - dht_msg_t * msg) +static struct val_entry * dht_entry_get_lval(const struct dht_entry * e, + const buffer_t val) { - struct list_head * p = NULL; - struct list_head * h; - struct contact * c = NULL; - size_t n; - size_t pos = 0; - bool mod = false; - - assert(lu); - assert(msg); - - if (dht_get_state(dht) != DHT_RUNNING) - return; + struct list_head * p; - pthread_mutex_lock(&lu->lock); + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); - list_for_each_safe(p, h, &lu->cookies) { - struct cookie_el * e = list_entry(p, struct cookie_el, next); - if (e->cookie == msg->cookie) { - list_del(&e->next); - free(e); - break; - } + list_for_each(p, &e->lvals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (bufcmp(&v->val, &val) == 0) + return v; } - if (lu->state == LU_COMPLETE) { - pthread_mutex_unlock(&lu->lock); - return; - } + return NULL; +} - if (msg->n_addrs > 0) { - if (lu->addrs == NULL) { - lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs); - for (n = 0; n < msg->n_addrs; ++n) - lu->addrs[n] = msg->addrs[n]; - lu->n_addrs = msg->n_addrs; - } +static struct val_entry * dht_entry_get_val(const struct dht_entry * e, + const buffer_t val) +{ + struct list_head * p; - lu->state = LU_COMPLETE; - pthread_cond_broadcast(&lu->cond); - pthread_mutex_unlock(&lu->lock); - return; - } + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); - pthread_cleanup_push(__cleanup_mutex_unlock, &lu->lock); + list_for_each(p, &e->vals.list) { + struct val_entry * v = 
list_entry(p, struct val_entry, next); + if (bufcmp(&v->val, &val) == 0) + return v; - while (lu->state == LU_INIT) { - pthread_rwlock_unlock(&dht->lock); - pthread_cond_wait(&lu->cond, &lu->lock); - pthread_rwlock_rdlock(&dht->lock); } - pthread_cleanup_pop(false); + return NULL; +} - for (n = 0; n < msg->n_contacts; ++n) { - c = contact_create(msg->contacts[n]->id.data, - dht->b, msg->contacts[n]->addr); - if (c == NULL) - continue; +static int dht_entry_update_val(struct dht_entry * e, + buffer_t val, + time_t exp) +{ + struct val_entry * v; + struct timespec now; - pos = 0; + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); - list_for_each(p, &lu->contacts) { - struct contact * e; - e = list_entry(p, struct contact, next); - if (!memcmp(e->id, c->id, dht->b)) { - contact_destroy(c); - c = NULL; - break; - } + clock_gettime(CLOCK_REALTIME_COARSE, &now); - if (dist(c->id, lu->key) > dist(e->id, lu->key)) - break; + if (exp < now.tv_sec) + return -EINVAL; /* Refuse to add expired values */ - pos++; - } + if (dht_entry_get_lval(e, val) != NULL) { + log_dbg(KV_FMT " Val already in lvals.", KV_VAL(e->key, val)); + return 0; /* Refuse to add local values */ + } - if (c == NULL) - continue; + v = dht_entry_get_val(e, val); + if (v == NULL) { + v = val_entry_create(val, exp); + if (v == NULL) + return -ENOMEM; - if (lu->n_contacts < dht->k) { - list_add_tail(&c->next, p); - ++lu->n_contacts; - mod = true; - } else if (pos == dht->k) { - contact_destroy(c); - } else { - struct contact * d; - d = list_last_entry(&lu->contacts, - struct contact, next); - list_add_tail(&c->next, p); - list_del(&d->next); - contact_destroy(d); - mod = true; - } + list_add_tail(&v->next, &e->vals.list); + ++e->vals.len; + ++dht.db.kv.vals; + + return 0; } - if (list_is_empty(&lu->cookies) && !mod) - lu->state = LU_COMPLETE; - else - lu->state = LU_UPDATE; + if (v->t_exp < exp) + v->t_exp = exp; - pthread_cond_broadcast(&lu->cond); - pthread_mutex_unlock(&lu->lock); - 
return; + return 0; } -static ssize_t lookup_get_addrs(struct lookup * lu, - uint64_t * addrs) +static int dht_entry_update_lval(struct dht_entry * e, + buffer_t val) { - ssize_t n; + struct val_entry * v; + struct timespec now; - assert(lu); + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); - pthread_mutex_lock(&lu->lock); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - for (n = 0; (size_t) n < lu->n_addrs; ++n) - addrs[n] = lu->addrs[n]; + v = dht_entry_get_lval(e, val); + if (v == NULL) { + log_dbg(KV_FMT " Adding lval.", KV_VAL(e->key, val)); + v = val_entry_create(val, now.tv_sec + dht.t_expire); + if (v == NULL) + return -ENOMEM; - assert((size_t) n == lu->n_addrs); + list_add_tail(&v->next, &e->lvals.list); + ++e->lvals.len; + ++dht.db.kv.lvals; - pthread_mutex_unlock(&lu->lock); + return 0; + } - return n; + return 0; } -static ssize_t lookup_contact_addrs(struct lookup * lu, - uint64_t * addrs) +static int dht_entry_remove_lval(struct dht_entry * e, + buffer_t val) { - struct list_head * p; - ssize_t n = 0; + struct val_entry * v; - assert(lu); - assert(addrs); + assert(e != NULL); + assert(val.data != NULL); + assert(val.len > 0); - pthread_mutex_lock(&lu->lock); + v = dht_entry_get_lval(e, val); + if (v == NULL) + return -ENOENT; - list_for_each(p, &lu->contacts) { - struct contact * c = list_entry(p, struct contact, next); - addrs[n] = c->addr; - n++; - } + log_dbg(KV_FMT " Removing lval.", KV_VAL(e->key, val)); - pthread_mutex_unlock(&lu->lock); + list_del(&v->next); + val_entry_destroy(v); + --e->lvals.len; + --dht.db.kv.lvals; - return n; + return 0; } -static void lookup_new_addrs(struct lookup * lu, - uint64_t * addrs) +#define IS_EXPIRED(v, now) ((now)->tv_sec > (v)->t_exp) +static void dht_entry_remove_expired_vals(struct dht_entry * e) { struct list_head * p; - size_t n = 0; + struct list_head * h; + struct timespec now; - assert(lu); - assert(addrs); + assert(e != NULL); - pthread_mutex_lock(&lu->lock); + 
clock_gettime(CLOCK_REALTIME_COARSE, &now); - /* Uses fails to check if the contact has been contacted. */ - list_for_each(p, &lu->contacts) { - struct contact * c = list_entry(p, struct contact, next); - if (c->fails == 0) { - c->fails = 1; - addrs[n] = c->addr; - n++; - } + list_for_each_safe(p, h, &e->vals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (!IS_EXPIRED(v, &now)) + continue; - if (n == KAD_ALPHA) - break; + log_dbg(KV_FMT " Value expired." , KV_VAL(e->key, v->val)); + list_del(&v->next); + val_entry_destroy(v); + --e->vals.len; + --dht.db.kv.vals; } - - assert(n <= KAD_ALPHA); - - addrs[n] = 0; - - pthread_mutex_unlock(&lu->lock); } -static void lookup_set_state(struct lookup * lu, - enum lookup_state state) +static struct dht_entry * __dht_kv_find_entry(const uint8_t * key) { - pthread_mutex_lock(&lu->lock); + struct list_head * p; - lu->state = state; - pthread_cond_broadcast(&lu->cond); + assert(key != NULL); - pthread_mutex_unlock(&lu->lock); -} + list_for_each(p, &dht.db.kv.list) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + if (!memcmp(key, e->key, dht.id.len)) + return e; + } -static void cancel_lookup_wait(void * o) -{ - struct lookup * lu = (struct lookup *) o; - lu->state = LU_NULL; - pthread_mutex_unlock(&lu->lock); - lookup_destroy(lu); + return NULL; } -static enum lookup_state lookup_wait(struct lookup * lu) +static void dht_kv_remove_expired_entries(void) { - struct timespec timeo = TIMESPEC_INIT_S(KAD_T_RESP); - struct timespec abs; - enum lookup_state state; - int ret = 0; - - clock_gettime(PTHREAD_COND_CLOCK, &abs); - - ts_add(&abs, &timeo, &abs); - - pthread_mutex_lock(&lu->lock); + struct list_head * p; + struct list_head * h; + struct timespec now; - if (lu->state == LU_INIT || lu->state == LU_UPDATE) - lu->state = LU_PENDING; + clock_gettime(CLOCK_REALTIME_COARSE, &now); - pthread_cleanup_push(cancel_lookup_wait, lu); + pthread_rwlock_wrlock(&dht.db.lock); - while (lu->state == 
LU_PENDING && ret != -ETIMEDOUT) - ret = -pthread_cond_timedwait(&lu->cond, &lu->lock, &abs); + list_for_each_safe(p, h, &dht.db.kv.list) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + dht_entry_remove_expired_vals(e); + if (e->lvals.len > 0 || e->vals.len > 0) + continue; - pthread_cleanup_pop(false); + log_dbg(KEY_FMT " Entry removed. ", KEY_VAL(e->key)); + list_del(&e->next); + dht_entry_destroy(e); + --dht.db.kv.len; + } - if (ret == -ETIMEDOUT) - lu->state = LU_COMPLETE; + pthread_rwlock_unlock(&dht.db.lock); +} - state = lu->state; - pthread_mutex_unlock(&lu->lock); +static struct contact * contact_create(const uint8_t * id, + uint64_t addr) +{ + struct contact * c; + struct timespec t; - return state; -} + c = malloc(sizeof(*c)); + if (c == NULL) + return NULL; -static struct kad_req * dht_find_request(struct dht * dht, - dht_msg_t * msg) -{ - struct list_head * p; + list_head_init(&c->next); - assert(dht); - assert(msg); + clock_gettime(CLOCK_REALTIME_COARSE, &t); - list_for_each(p, &dht->requests) { - struct kad_req * r = list_entry(p, struct kad_req, next); - if (r->cookie == msg->cookie) - return r; + c->addr = addr; + c->fails = 0; + c->t_seen = t.tv_sec; + c->id = dht_dup_key(id); + if (c->id == NULL) { + free(c); + return NULL; } - return NULL; + return c; } -static struct lookup * dht_find_lookup(struct dht * dht, - uint32_t cookie) +static void contact_destroy(struct contact * c) { - struct list_head * p; - struct list_head * p2; - struct list_head * h2; - - assert(dht); - assert(cookie > 0); - - list_for_each(p, &dht->lookups) { - struct lookup * l = list_entry(p, struct lookup, next); - pthread_mutex_lock(&l->lock); - list_for_each_safe(p2, h2, &l->cookies) { - struct cookie_el * e; - e = list_entry(p2, struct cookie_el, next); - if (e->cookie == cookie) { - list_del(&e->next); - free(e); - pthread_mutex_unlock(&l->lock); - return l; - } - } - pthread_mutex_unlock(&l->lock); - } + assert(c != NULL); + 
assert(list_is_empty(&c->next)); - return NULL; + free(c->id); + free(c); } -static struct val * val_create(uint64_t addr, - time_t exp) +static struct dht_req * dht_req_create(const uint8_t * key) { - struct val * v; - struct timespec t; + struct dht_req * req; + struct timespec now; - v = malloc(sizeof(*v)); - if (v == NULL) - return NULL; + assert(key != NULL); - list_head_init(&v->next); - v->addr = addr; + clock_gettime(PTHREAD_COND_CLOCK, &now); - clock_gettime(CLOCK_REALTIME_COARSE, &t); + req = malloc(sizeof(*req)); + if (req == NULL) + goto fail_malloc; - v->t_exp = t.tv_sec + exp; - v->t_rep = t.tv_sec + KAD_T_REPL; + list_head_init(&req->next); - return v; -} + req->t_exp = now.tv_sec + DHT_T_RESP; -static void val_destroy(struct val * v) -{ - assert(v); + list_head_init(&req->peers.list); + req->peers.len = 0; - free(v); + req->key = dht_dup_key(key); + if (req->key == NULL) + goto fail_dup_key; + + list_head_init(&req->cache.list); + req->cache.len = 0; + + return req; + + fail_dup_key: + free(req); + fail_malloc: + return NULL; } -static struct ref_entry * ref_entry_create(struct dht * dht, - const uint8_t * key) +static void dht_req_destroy(struct dht_req * req) { - struct ref_entry * e; - struct timespec t; - - assert(dht); - assert(key); + struct list_head * p; + struct list_head * h; - e = malloc(sizeof(*e)); - if (e == NULL) - return NULL; + assert(req); + assert(req->key); - e->key = dht_dup_key(key, dht->b); - if (e->key == NULL) { + list_for_each_safe(p, h, &req->peers.list) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + list_del(&e->next); + free(e->id); free(e); - return NULL; + --req->peers.len; } - clock_gettime(CLOCK_REALTIME_COARSE, &t); + list_for_each_safe(p, h, &req->cache.list) { + struct val_entry * e = list_entry(p, struct val_entry, next); + list_del(&e->next); + val_entry_destroy(e); + --req->cache.len; + } + + free(req->key); - e->t_rep = t.tv_sec + dht->t_repub; + assert(req->peers.len == 0); - return e; 
+ free(req); } -static void ref_entry_destroy(struct ref_entry * e) +static struct peer_entry * dht_req_get_peer(struct dht_req * req, + struct peer_entry * e) { - free(e->key); - free(e); + struct list_head * p; + + list_for_each(p, &req->peers.list) { + struct peer_entry * x = list_entry(p, struct peer_entry, next); + if (x->addr == e->addr) + return x; + } + + return NULL; } -static struct dht_entry * dht_entry_create(struct dht * dht, - const uint8_t * key) +#define IS_MAGIC(peer) ((peer)->cookie == dht.magic) +void dht_req_add_peer(struct dht_req * req, + struct peer_entry * e) { - struct dht_entry * e; + struct peer_entry * x; /* existing */ + struct list_head * p; /* iterator */ + size_t pos = 0; - assert(dht); - assert(key); + assert(req != NULL); + assert(e != NULL); + assert(e->id != NULL); - e = malloc(sizeof(*e)); - if (e == NULL) - return NULL; + /* + * Dedupe messages to the same peer, unless + * 1) The previous request was FIND_NODE and now it's FIND_VALUE + * 2) We urgently need contacts from emergency peer (magic cookie) + */ + x = dht_req_get_peer(req, e); + if (x != NULL && x->code >= e->code && !IS_MAGIC(e)) + goto skip; - list_head_init(&e->next); - list_head_init(&e->vals); + /* Find how this contact ranks in distance to the key */ + list_for_each(p, &req->peers.list) { + struct peer_entry * y = list_entry(p, struct peer_entry, next); + if (IS_CLOSER(y->id, e->id)) { + pos++; + continue; + } + break; + } - e->n_vals = 0; + /* Add a new peer to this request if we need to */ + if (pos < dht.alpha || !IS_MAGIC(e)) { + x = malloc(sizeof(*x)); + if (x == NULL) { + log_err("Failed to malloc peer entry."); + goto skip; + } - e->key = dht_dup_key(key, dht->b); - if (e->key == NULL) { - free(e); - return NULL; - } + x->cookie = e->cookie; + x->addr = e->addr; + x->code = e->code; + x->t_sent = e->t_sent; + x->id = dht_dup_key(e->id); + if (x->id == NULL) { + log_err("Failed to dup peer ID."); + free(x); + goto skip; + } - return e; + if (IS_MAGIC(e)) + 
list_add(&x->next, p); + else + list_add_tail(&x->next, p); + ++req->peers.len; + return; + } + skip: + list_del(&e->next); + free(e->id); + free(e); } -static void dht_entry_destroy(struct dht_entry * e) +static size_t dht_req_add_peers(struct dht_req * req, + struct list_head * pl) { - struct list_head * p; - struct list_head * h; + struct list_head * p; + struct list_head * h; + size_t n = 0; - assert(e); + assert(req != NULL); + assert(pl != NULL); - list_for_each_safe(p, h, &e->vals) { - struct val * v = list_entry(p, struct val, next); - list_del(&v->next); - val_destroy(v); + list_for_each_safe(p, h, pl) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + dht_req_add_peer(req, e); } - free(e->key); - - free(e); + return n; } -static int dht_entry_add_addr(struct dht_entry * e, - uint64_t addr, - time_t exp) +static bool dht_req_has_peer(struct dht_req * req, + uint64_t cookie) { struct list_head * p; - struct val * val; - struct timespec t; - clock_gettime(CLOCK_REALTIME_COARSE, &t); - - list_for_each(p, &e->vals) { - struct val * v = list_entry(p, struct val, next); - if (v->addr == addr) { - if (v->t_exp < t.tv_sec + exp) { - v->t_exp = t.tv_sec + exp; - v->t_rep = t.tv_sec + KAD_T_REPL; - } + assert(req != NULL); - return 0; - } + list_for_each(p, &req->peers.list) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + if (e->cookie == cookie) + return true; } - val = val_create(addr, exp); - if (val == NULL) - return -ENOMEM; - - list_add(&val->next, &e->vals); - ++e->n_vals; - - return 0; + return false; } - -static void dht_entry_del_addr(struct dht_entry * e, - uint64_t addr) +static void peer_list_destroy(struct list_head * pl) { struct list_head * p; struct list_head * h; - assert(e); + assert(pl != NULL); - list_for_each_safe(p, h, &e->vals) { - struct val * v = list_entry(p, struct val, next); - if (v->addr == addr) { - list_del(&v->next); - val_destroy(v); - --e->n_vals; - } - } - - if (e->n_vals == 0) { + 
list_for_each_safe(p, h, pl) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); list_del(&e->next); - dht_entry_destroy(e); + free(e->id); + free(e); } } -static uint64_t dht_entry_get_addr(struct dht * dht, - struct dht_entry * e) +static int dht_kv_create_peer_list(struct list_head * cl, + struct list_head * pl, + enum dht_code code) { - struct list_head * p; + struct list_head * p; + struct list_head * h; + struct timespec now; + size_t len; - assert(e); - assert(!list_is_empty(&e->vals)); + assert(cl != NULL); + assert(pl != NULL); + assert(list_is_empty(pl)); - list_for_each(p, &e->vals) { - struct val * v = list_entry(p, struct val, next); - if (v->addr != dht->addr) - return v->addr; - } + clock_gettime(CLOCK_REALTIME_COARSE, &now); - return 0; -} + len = 0; -/* Forward declaration. */ -static struct lookup * kad_lookup(struct dht * dht, - const uint8_t * key, - enum kad_code code); + list_for_each_safe(p, h, cl) { + struct contact * c = list_entry(p, struct contact, next); + struct peer_entry * e; + if (len++ == dht.alpha) + break; + e = malloc(sizeof(*e)); + if (e == NULL) + return -ENOMEM; -/* Build a refresh list. 
*/ -static void bucket_refresh(struct dht * dht, - struct bucket * b, - time_t t, - struct list_head * r) -{ - size_t i; + e->cookie = generate_cookie(); + e->code = code; + e->addr = c->addr; + e->t_sent = now.tv_sec; - if (*b->children != NULL) - for (i = 0; i < (1L << KAD_BETA); ++i) - bucket_refresh(dht, b->children[i], t, r); + e->id = c->id; - if (b->n_contacts == 0) - return; + list_add_tail(&e->next, pl); - if (t > b->t_refr) { - struct contact * c; - struct contact * d; - c = list_first_entry(&b->contacts, struct contact, next); - d = contact_create(c->id, dht->b, c->addr); - if (d != NULL) - list_add(&d->next, r); - return; + list_del(&c->next); + c->id = NULL; /* we stole the id */ + contact_destroy(c); } + + return 0; } +static struct dht_req * __dht_kv_req_get_req(const uint8_t * key) +{ + struct list_head * p; -static struct bucket * bucket_create(void) + list_for_each(p, &dht.reqs.list) { + struct dht_req * r = list_entry(p, struct dht_req, next); + if (memcmp(r->key, key, dht.id.len) == 0) + return r; + } + + return NULL; +} + +static struct dht_req * __dht_kv_get_req_cache(const uint8_t * key) { - struct bucket * b; - struct timespec t; - size_t i; + struct dht_req * req; - b = malloc(sizeof(*b)); - if (b == NULL) + assert(key != NULL); + + req = __dht_kv_req_get_req(key); + if (req == NULL) return NULL; - list_head_init(&b->contacts); - b->n_contacts = 0; + if (req->cache.len == 0) + return NULL; - list_head_init(&b->alts); - b->n_alts = 0; + return req; +} - clock_gettime(CLOCK_REALTIME_COARSE, &t); - b->t_refr = t.tv_sec + KAD_T_REFR; +static void __dht_kv_req_remove(const uint8_t * key) +{ + struct dht_req * req; - for (i = 0; i < (1L << KAD_BETA); ++i) - b->children[i] = NULL; + assert(key != NULL); + + req = __dht_kv_req_get_req(key); + if (req == NULL) + return; + + list_del(&req->next); + --dht.reqs.len; + + dht_req_destroy(req); +} + +static struct dht_req * __dht_kv_get_req_peer(const uint8_t * key, + uint64_t cookie) +{ + struct dht_req 
* req; + + assert(key != NULL); + + req = __dht_kv_req_get_req(key); + if (req == NULL) + return NULL; + + if (!dht_req_has_peer(req, cookie)) + return NULL; + + return req; +} + +static bool dht_kv_has_req(const uint8_t * key, + uint64_t cookie) +{ + bool found; + + pthread_mutex_lock(&dht.reqs.mtx); + + found = __dht_kv_get_req_peer(key, cookie) != NULL; + + pthread_mutex_unlock(&dht.reqs.mtx); + + return found; +} + +/* + * This will filter the peer list for addresses that still need to be + * contacted. + */ +static int dht_kv_update_req(const uint8_t * key, + struct list_head * pl) +{ + struct dht_req * req; + struct timespec now; + + assert(key != NULL); + assert(pl != NULL); + assert(!list_is_empty(pl)); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_mutex_lock(&dht.reqs.mtx); + + req = __dht_kv_req_get_req(key); + if (req == NULL) { + if (dht.reqs.len == DHT_MAX_REQS) { + log_err(KEY_FMT " Max reqs reached (%zu).", + KEY_VAL(key), dht.reqs.len); + peer_list_destroy(pl); + goto fail_req; + } + req = dht_req_create(key); + if (req == NULL) { + log_err(KEY_FMT "Failed to create req.", KEY_VAL(key)); + goto fail_req; + } + list_add_tail(&req->next, &dht.reqs.list); + ++dht.reqs.len; + } + + if (req->cache.len > 0) /* Already have values */ + peer_list_destroy(pl); + + dht_req_add_peers(req, pl); + req->t_exp = now.tv_sec + DHT_T_RESP; + + if (dht.reqs.len > DHT_WARN_REQS) { + log_warn("Number of outstanding requests (%zu) exceeds %u.", + dht.reqs.len, DHT_WARN_REQS); + } + + pthread_mutex_unlock(&dht.reqs.mtx); + + return 0; + fail_req: + pthread_mutex_unlock(&dht.reqs.mtx); + return -1; +} + +static int dht_kv_respond_req(uint8_t * key, + binary_data_t * vals, + size_t len) +{ + struct dht_req * req; + struct timespec now; + size_t i; + + assert(key != NULL); + assert(vals != NULL); + assert(len > 0); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_mutex_lock(&dht.reqs.mtx); + + req = __dht_kv_req_get_req(key); + if (req == NULL) { + 
log_warn(KEY_FMT " Failed to find req.", KEY_VAL(key)); + goto fail_req; + } + + for (i = 0; i < len; ++i) { + struct val_entry * e; + buffer_t val; + val.data = vals[i].data; + val.len = vals[i].len; + e = val_entry_create(val, now.tv_sec + DHT_T_CACHE); + if (e == NULL) { + log_err(" Failed to create val_entry."); + continue; + } + + list_add_tail(&e->next, &req->cache.list); + ++req->cache.len; + } + + pthread_cond_broadcast(&dht.reqs.cond); + + pthread_mutex_unlock(&dht.reqs.mtx); + fail_req: + pthread_mutex_unlock(&dht.reqs.mtx); + return -1; +} + +static ssize_t dht_kv_wait_req(const uint8_t * key, + buffer_t ** vals) +{ + struct list_head * p; + struct dht_req * req; + struct timespec t; +#ifdef __DHT_TEST__ + struct timespec intv = TIMESPEC_INIT_MS(10); +#else + struct timespec intv = TIMESPEC_INIT_S(DHT_T_RESP); +#endif + size_t max; + size_t i = 0; + int ret = 0; + + assert(key != NULL); + assert(vals != NULL); + + clock_gettime(PTHREAD_COND_CLOCK, &t); + + ts_add(&t, &intv, &t); + + pthread_mutex_lock(&dht.reqs.mtx); + + pthread_cleanup_push(__cleanup_mutex_unlock, &dht.reqs.mtx); + + while ((req = __dht_kv_get_req_cache(key)) == NULL) { + ret = pthread_cond_timedwait(&dht.reqs.cond, &dht.reqs.mtx, &t); + if (ret == ETIMEDOUT) + break; + } + + pthread_cleanup_pop(false); + + if (ret == ETIMEDOUT) { + log_warn(KEY_FMT " Req timed out.", KEY_VAL(key)); + __dht_kv_req_remove(key); + goto timedout; + } + + max = MIN(req->cache.len, DHT_MAX_VALS); + if (max == 0) + goto no_vals; + + *vals = malloc(max * sizeof(**vals)); + if (*vals == NULL) { + log_err(KEY_FMT "Failed to malloc val buffer.", KEY_VAL(key)); + goto fail_vals; + } + + memset(*vals, 0, max * sizeof(**vals)); + + list_for_each(p, &req->cache.list) { + struct val_entry * v; + if (i == max) + break; /* We have enough values */ + v = list_entry(p, struct val_entry, next); + (*vals)[i].data = malloc(v->val.len); + if ((*vals)[i].data == NULL) + goto fail_val_data; + + (*vals)[i].len = v->val.len; + 
memcpy((*vals)[i++].data, v->val.data, v->val.len); + } + + pthread_mutex_unlock(&dht.reqs.mtx); + + return i; + no_vals: + pthread_mutex_unlock(&dht.reqs.mtx); + return 0; + fail_val_data: + freebufs(*vals, i); + fail_vals: + pthread_mutex_unlock(&dht.reqs.mtx); + return -ENOMEM; + timedout: + pthread_mutex_unlock(&dht.reqs.mtx); + return -ETIMEDOUT; +} + +static struct bucket * iter_bucket(struct bucket * b, + const uint8_t * id) +{ + uint8_t byte; + uint8_t mask; + + assert(b != NULL); + + if (b->children[0] == NULL) + return b; + + byte = id[(b->depth * DHT_BETA) / CHAR_BIT]; + + mask = ((1L << DHT_BETA) - 1) & 0xFF; + + byte >>= (CHAR_BIT - DHT_BETA) - + (((b->depth) * DHT_BETA) & (CHAR_BIT - 1)); + + return iter_bucket(b->children[(byte & mask)], id); +} + +static struct bucket * __dht_kv_get_bucket(const uint8_t * id) +{ + assert(dht.db.contacts.root != NULL); + + return iter_bucket(dht.db.contacts.root, id); +} + +static void contact_list_add(struct list_head * l, + struct contact * c) +{ + struct list_head * p; + + assert(l != NULL); + assert(c != NULL); + + list_for_each(p, l) { + struct contact * e = list_entry(p, struct contact, next); + if (IS_CLOSER(e->id, c->id)) + continue; + } + + list_add_tail(&c->next, p); +} + +static ssize_t dht_kv_contact_list(const uint8_t * key, + struct list_head * l, + size_t max) +{ + struct list_head * p; + struct bucket * b; + struct timespec t; + size_t i; + size_t len = 0; + + assert(l != NULL); + assert(key != NULL); + assert(list_is_empty(l)); + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + max = MIN(max, dht.k); + + pthread_rwlock_rdlock(&dht.db.lock); + + b = __dht_kv_get_bucket(key); + if (b == NULL) { + log_err(KEY_FMT " Failed to get bucket.", KEY_VAL(key)); + goto fail_bucket; + } + + b->t_refr = t.tv_sec + dht.t_refresh; + + if (b->contacts.len == dht.k || b->parent == NULL) { + list_for_each(p, &b->contacts.list) { + struct contact * c; + struct contact * d; + c = list_entry(p, struct contact, next); + if 
(c->addr == dht.addr) + continue; + d = contact_create(c->id, c->addr); + if (d == NULL) + continue; + contact_list_add(l, d); + if (++len == max) + break; + } + } else { + struct bucket * d = b->parent; + for (i = 0; i < (1L << DHT_BETA) && len < dht.k; ++i) { + list_for_each(p, &d->children[i]->contacts.list) { + struct contact * c; + struct contact * d; + c = list_entry(p, struct contact, next); + if (c->addr == dht.addr) + continue; + d = contact_create(c->id, c->addr); + if (d == NULL) + continue; + contact_list_add(l, d); + if (++len == max) + break; + } + } + } + + pthread_rwlock_unlock(&dht.db.lock); + + return len; + fail_bucket: + pthread_rwlock_unlock(&dht.db.lock); + return -1; +} + +static void contact_list_destroy(struct list_head * l) +{ + struct list_head * p; + struct list_head * h; + + assert(l != NULL); + + list_for_each_safe(p, h, l) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + } +} + +static ssize_t dht_kv_get_contacts(const uint8_t * key, + dht_contact_msg_t *** msgs) +{ + struct list_head cl; + struct list_head * p; + struct list_head * h; + size_t len; + size_t i = 0; + + assert(key != NULL); + assert(msgs != NULL); + + list_head_init(&cl); + + len = dht_kv_contact_list(key, &cl, dht.k); + if (len == 0) { + *msgs = NULL; + return 0; + } + + *msgs = malloc(len * sizeof(**msgs)); + if (*msgs == NULL) + goto fail_msgs; + + list_for_each_safe(p, h, &cl) { + struct contact * c; + (*msgs)[i] = malloc(sizeof(***msgs)); + if ((*msgs)[i] == NULL) + goto fail_contact; + + dht_contact_msg__init((*msgs)[i]); + c = list_entry(p, struct contact, next); + list_del(&c->next); + (*msgs)[i]->id.data = c->id; + (*msgs)[i]->id.len = dht.id.len; + (*msgs)[i++]->addr = c->addr; + free(c); + } + + return i; + fail_contact: + while (i-- > 0) + dht_contact_msg__free_unpacked((*msgs)[i], NULL); + free(*msgs); + *msgs = NULL; + fail_msgs: + contact_list_destroy(&cl); + return -ENOMEM; +} + +/* Build a 
refresh list. */ +static void __dht_kv_bucket_refresh_list(struct bucket * b, + time_t t, + struct list_head * r) +{ + struct contact * c; + struct contact * d; + + assert(b != NULL); + + if (t < b->t_refr) + return; + + if (*b->children != NULL) { + size_t i; + for (i = 0; i < (1L << DHT_BETA); ++i) + __dht_kv_bucket_refresh_list(b->children[i], t, r); + } + + if (b->contacts.len == 0) + return; + + c = list_first_entry(&b->contacts.list, struct contact, next); + if (t > c->t_seen + dht.t_refresh) { + d = contact_create(c->id, c->addr); + if (d != NULL) + list_add(&d->next, r); + } +} + +static struct bucket * bucket_create(void) +{ + struct bucket * b; + struct timespec t; + size_t i; + + b = malloc(sizeof(*b)); + if (b == NULL) + return NULL; + + list_head_init(&b->contacts.list); + b->contacts.len = 0; + + list_head_init(&b->alts.list); + b->alts.len = 0; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + b->t_refr = t.tv_sec + dht.t_refresh; + + for (i = 0; i < (1L << DHT_BETA); ++i) + b->children[i] = NULL; b->parent = NULL; b->depth = 0; + b->mask = 0; return b; } @@ -1324,24 +1705,24 @@ static void bucket_destroy(struct bucket * b) struct list_head * h; size_t i; - assert(b); + assert(b != NULL); - for (i = 0; i < (1L << KAD_BETA); ++i) + for (i = 0; i < (1L << DHT_BETA); ++i) if (b->children[i] != NULL) bucket_destroy(b->children[i]); - list_for_each_safe(p, h, &b->contacts) { + list_for_each_safe(p, h, &b->contacts.list) { struct contact * c = list_entry(p, struct contact, next); list_del(&c->next); contact_destroy(c); - --b->n_contacts; + --b->contacts.len; } - list_for_each_safe(p, h, &b->alts) { + list_for_each_safe(p, h, &b->alts.list) { struct contact * c = list_entry(p, struct contact, next); list_del(&c->next); contact_destroy(c); - --b->n_contacts; + --b->alts.len; } free(b); @@ -1356,1537 +1737,2297 @@ static bool bucket_has_id(struct bucket * b, if (b->depth == 0) return true; - byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + byte = id[(b->depth * 
DHT_BETA) / CHAR_BIT]; - mask = ((1L << KAD_BETA) - 1) & 0xFF; + mask = ((1L << DHT_BETA) - 1) & 0xFF; - byte >>= (CHAR_BIT - KAD_BETA) - - (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1)); + byte >>= (CHAR_BIT - DHT_BETA) - + (((b->depth - 1) * DHT_BETA) & (CHAR_BIT - 1)); return ((byte & mask) == b->mask); } -static int split_bucket(struct bucket * b) +static int move_contacts(struct bucket * b, + struct bucket * c) { struct list_head * p; struct list_head * h; + struct contact * d; + + assert(b != NULL); + assert(c != NULL); + + list_for_each_safe(p, h, &b->contacts.list) { + d = list_entry(p, struct contact, next); + if (bucket_has_id(c, d->id)) { + list_del(&d->next); + --b->contacts.len; + list_add_tail(&d->next, &c->contacts.list); + ++c->contacts.len; + } + } + + return 0; +} + +static int split_bucket(struct bucket * b) +{ uint8_t mask = 0; size_t i; - size_t c; + size_t b_len; assert(b); - assert(b->n_alts == 0); - assert(b->n_contacts); + assert(b->alts.len == 0); + assert(b->contacts.len != 0); assert(b->children[0] == NULL); - c = b->n_contacts; + b_len = b->contacts.len; - for (i = 0; i < (1L << KAD_BETA); ++i) { + for (i = 0; i < (1L << DHT_BETA); ++i) { b->children[i] = bucket_create(); - if (b->children[i] == NULL) { - size_t j; - for (j = 0; j < i; ++j) - bucket_destroy(b->children[j]); - return -1; - } + if (b->children[i] == NULL) + goto fail_child; b->children[i]->depth = b->depth + 1; b->children[i]->mask = mask; b->children[i]->parent = b; - list_for_each_safe(p, h, &b->contacts) { - struct contact * c; - c = list_entry(p, struct contact, next); - if (bucket_has_id(b->children[i], c->id)) { - list_del(&c->next); - --b->n_contacts; - list_add(&c->next, &b->children[i]->contacts); - ++b->children[i]->n_contacts; - } - } + move_contacts(b, b->children[i]); mask++; } - for (i = 0; i < (1L << KAD_BETA); ++i) - if (b->children[i]->n_contacts == c) + for (i = 0; i < (1L << DHT_BETA); ++i) + if (b->children[i]->contacts.len == b_len) 
split_bucket(b->children[i]); return 0; + fail_child: + while (i-- > 0) + bucket_destroy(b->children[i]); + return -1; } -/* Locked externally to mandate update as (final) part of join transaction. */ -static int dht_update_bucket(struct dht * dht, - const uint8_t * id, - uint64_t addr) +static int dht_kv_update_contacts(const uint8_t * id, + uint64_t addr) { struct list_head * p; struct list_head * h; struct bucket * b; struct contact * c; - assert(dht); + assert(id != NULL); + assert(addr != INVALID_ADDR); - b = dht_get_bucket(dht, id); - if (b == NULL) - return -1; + pthread_rwlock_wrlock(&dht.db.lock); - c = contact_create(id, dht->b, addr); - if (c == NULL) - return -1; + b = __dht_kv_get_bucket(id); + if (b == NULL) { + log_err(PEER_FMT " Failed to get bucket.", PEER_VAL(id, addr)); + goto fail_update; + } + + c = contact_create(id, addr); + if (c == NULL) { + log_err(PEER_FMT " Failed to create contact.", + PEER_VAL(id, addr)); + goto fail_update; + } - list_for_each_safe(p, h, &b->contacts) { + list_for_each_safe(p, h, &b->contacts.list) { struct contact * d = list_entry(p, struct contact, next); if (d->addr == addr) { list_del(&d->next); contact_destroy(d); - --b->n_contacts; + --b->contacts.len; } } - if (b->n_contacts == dht->k) { - if (bucket_has_id(b, dht->id)) { - list_add_tail(&c->next, &b->contacts); - ++b->n_contacts; + if (b->contacts.len == dht.k) { + if (bucket_has_id(b, dht.id.data)) { + list_add_tail(&c->next, &b->contacts.list); + ++b->contacts.len; if (split_bucket(b)) { list_del(&c->next); contact_destroy(c); - --b->n_contacts; + --b->contacts.len; } - } else if (b->n_alts == dht->k) { + } else if (b->alts.len == dht.k) { struct contact * d; - d = list_first_entry(&b->alts, struct contact, next); + d = list_first_entry(&b->alts.list, + struct contact, next); list_del(&d->next); contact_destroy(d); - list_add_tail(&c->next, &b->alts); + list_add_tail(&c->next, &b->alts.list); + ++b->alts.len; } else { - list_add_tail(&c->next, &b->alts); - 
++b->n_alts; + list_add_tail(&c->next, &b->alts.list); + ++b->alts.len; } } else { - list_add_tail(&c->next, &b->contacts); - ++b->n_contacts; + list_add_tail(&c->next, &b->contacts.list); + ++b->contacts.len; } + pthread_rwlock_unlock(&dht.db.lock); + return 0; + fail_update: + pthread_rwlock_unlock(&dht.db.lock); + return -1; } -static int send_msg(struct dht * dht, - dht_msg_t * msg, - uint64_t addr) +static time_t gcd(time_t a, + time_t b) { -#ifndef __DHT_TEST__ - struct shm_du_buff * sdb; - size_t len; -#endif - int retr = 0; + if (a == 0) + return b; + + return gcd(b % a, a); +} + +static dht_contact_msg_t * dht_kv_src_contact_msg(void) +{ + dht_contact_msg_t * src; + + src = malloc(sizeof(*src)); + if (src == NULL) + goto fail_malloc; + + dht_contact_msg__init(src); + + src->id.data = dht_dup_key(dht.id.data); + if (src->id.data == NULL) + goto fail_id; + + src->id.len = dht.id.len; + src->addr = dht.addr; + + return src; + fail_id: + dht_contact_msg__free_unpacked(src, NULL); + fail_malloc: + return NULL; +} + +static dht_msg_t * dht_kv_find_req_msg(const uint8_t * key, + enum dht_code code) +{ + dht_msg_t * msg; + + assert(key != NULL); + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + dht_msg__init(msg); + msg->code = code; + + msg->src = dht_kv_src_contact_msg(); + if (msg->src == NULL) + goto fail_msg; + + msg->find = malloc(sizeof(*msg->find)); + if (msg->find == NULL) + goto fail_msg; + + dht_find_req_msg__init(msg->find); + + msg->find->key.data = dht_dup_key(key); + if (msg->find->key.data == NULL) + goto fail_msg; + + msg->find->key.len = dht.id.len; + msg->find->cookie = DHT_INVALID; + + return msg; - if (msg->code == KAD_RESPONSE) - retr = KAD_RESP_RETR; + fail_msg: + dht_msg__free_unpacked(msg, NULL); + fail_malloc: + return NULL; +} + +static dht_msg_t * dht_kv_find_node_req_msg(const uint8_t * key) +{ + return dht_kv_find_req_msg(key, DHT_FIND_NODE_REQ); +} + +static dht_msg_t * dht_kv_find_value_req_msg(const uint8_t 
* key) +{ + return dht_kv_find_req_msg(key, DHT_FIND_VALUE_REQ); +} + +static dht_msg_t * dht_kv_find_node_rsp_msg(uint8_t * key, + uint64_t cookie, + dht_contact_msg_t *** contacts, + size_t len) +{ + dht_msg_t * msg; - pthread_rwlock_wrlock(&dht->lock); + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + dht_msg__init(msg); + msg->code = DHT_FIND_NODE_RSP; + + msg->src = dht_kv_src_contact_msg(); + if (msg->src == NULL) + goto fail_msg; + + msg->node = malloc(sizeof(*msg->node)); + if (msg->node == NULL) + goto fail_msg; - if (dht->id != NULL) { - msg->has_s_id = true; - msg->s_id.data = dht->id; - msg->s_id.len = dht->b; + dht_find_node_rsp_msg__init(msg->node); + + msg->node->key.data = dht_dup_key(key); + if (msg->node->key.data == NULL) + goto fail_msg; + + msg->node->cookie = cookie; + msg->node->key.len = dht.id.len; + msg->node->n_contacts = len; + if (len != 0) { /* Steal the ptr */ + msg->node->contacts = *contacts; + *contacts = NULL; } - msg->s_addr = dht->addr; + return msg; + + fail_msg: + dht_msg__free_unpacked(msg, NULL); + fail_malloc: + return NULL; +} + +static dht_msg_t * dht_kv_find_value_rsp_msg(uint8_t * key, + uint64_t cookie, + dht_contact_msg_t *** contacts, + size_t n_contacts, + buffer_t ** vals, + size_t n_vals) +{ + dht_msg_t * msg; + + msg = dht_kv_find_node_rsp_msg(key, cookie, contacts, n_contacts); + if (msg == NULL) + goto fail_node_rsp; + + msg->code = DHT_FIND_VALUE_RSP; + + msg->val = malloc(sizeof(*msg->val)); + if (msg->val == NULL) + goto fail_msg; + + dht_find_value_rsp_msg__init(msg->val); + + msg->val->n_values = n_vals; + if (n_vals != 0) /* Steal the ptr */ + msg->val->values = (binary_data_t *) *vals; + + return msg; + + fail_msg: + dht_msg__free_unpacked(msg, NULL); + fail_node_rsp: + return NULL; +} + +static dht_msg_t * dht_kv_store_msg(const uint8_t * key, + const buffer_t val, + time_t exp) +{ + dht_msg_t * msg; + + assert(key != NULL); + assert(val.data != NULL); + assert(val.len > 0); + + 
msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + dht_msg__init(msg); + + msg->code = DHT_STORE; + + msg->src = dht_kv_src_contact_msg(); + if (msg->src == NULL) + goto fail_msg; - if (msg->code < KAD_STORE) { - msg->cookie = bmp_allocate(dht->cookies); - if (!bmp_is_id_valid(dht->cookies, msg->cookie)) { - pthread_rwlock_unlock(&dht->lock); - goto fail_bmp_alloc; + msg->store = malloc(sizeof(*msg->store)); + if (msg->store == NULL) + goto fail_msg; + + dht_store_msg__init(msg->store); + + msg->store->key.data = dht_dup_key(key); + if (msg->store->key.data == NULL) + goto fail_msg; + + msg->store->key.len = dht.id.len; + msg->store->val.data = malloc(val.len); + if (msg->store->val.data == NULL) + goto fail_msg; + + memcpy(msg->store->val.data, val.data, val.len); + + msg->store->val.len = val.len; + msg->store->exp = exp; + + return msg; + + fail_msg: + dht_msg__free_unpacked(msg, NULL); + fail_malloc: + return NULL; +} + +static ssize_t dht_kv_retrieve(const uint8_t * key, + buffer_t ** vals) +{ + struct dht_entry * e; + struct list_head * p; + size_t n; + size_t i; + + assert(key != NULL); + + pthread_rwlock_rdlock(&dht.db.lock); + + e = __dht_kv_find_entry(key); + if (e == NULL) + goto no_vals; + + n = MIN(DHT_MAX_VALS, e->vals.len + e->lvals.len); + if (n == 0) + goto no_vals; + + *vals = malloc(n * sizeof(**vals)); + if (*vals == NULL) + goto fail_vals; + + memset(*vals, 0, n * sizeof(**vals)); + + i = 0; + + list_for_each(p, &e->vals.list) { + struct val_entry * v; + if (i == n) + break; /* We have enough values */ + v = list_entry(p, struct val_entry, next); + (*vals)[i].data = malloc(v->val.len); + if ((*vals)[i].data == NULL) + goto fail_val_data; + + (*vals)[i].len = v->val.len; + memcpy((*vals)[i++].data, v->val.data, v->val.len); + } + + list_for_each(p, &e->lvals.list) { + struct val_entry * v; + if (i == n) + break; /* We have enough values */ + v = list_entry(p, struct val_entry, next); + (*vals)[i].data = malloc(v->val.len); + 
if ((*vals)[i].data == NULL) + goto fail_val_data; + + (*vals)[i].len = v->val.len; + memcpy((*vals)[i++].data, v->val.data, v->val.len); + } + + pthread_rwlock_unlock(&dht.db.lock); + + return (ssize_t) i; + + fail_val_data: + pthread_rwlock_unlock(&dht.db.lock); + freebufs(*vals, i); + *vals = NULL; + return -ENOMEM; + fail_vals: + pthread_rwlock_unlock(&dht.db.lock); + return -ENOMEM; + no_vals: + pthread_rwlock_unlock(&dht.db.lock); + *vals = NULL; + return 0; +} + +static void __cleanup_dht_msg(void * msg) +{ + dht_msg__free_unpacked((dht_msg_t *) msg, NULL); +} + +#ifdef DEBUG_PROTO_DHT +static void dht_kv_debug_msg(dht_msg_t * msg) +{ + struct tm * tm; + char tmstr[RIB_TM_STRLEN]; + time_t stamp; + size_t i; + + if (msg == NULL) + return; + + pthread_cleanup_push(__cleanup_dht_msg, msg); + + switch (msg->code) { + case DHT_STORE: + log_proto(" key: " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->store->key.data), + msg->store->key.len); + log_proto(" val: " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->store->val.data), + msg->store->val.len); + stamp = msg->store->exp; + tm = gmtime(&stamp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); + log_proto(" exp: %s.", tmstr); + break; + case DHT_FIND_NODE_REQ: + /* FALLTHRU */ + case DHT_FIND_VALUE_REQ: + log_proto(" cookie: " HASH_FMT64, + HASH_VAL64(&msg->find->cookie)); + log_proto(" key: " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->find->key.data), + msg->find->key.len); + break; + case DHT_FIND_VALUE_RSP: + log_proto(" cookie: " HASH_FMT64, + HASH_VAL64(&msg->node->cookie)); + log_proto(" key: " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->node->key.data), + msg->node->key.len); + log_proto(" values: [%zd]", msg->val->n_values); + for (i = 0; i < msg->val->n_values; i++) + log_proto(" " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->val->values[i].data), + msg->val->values[i].len); + log_proto(" contacts: [%zd]", msg->node->n_contacts); + for (i = 0; i < msg->node->n_contacts; i++) { + dht_contact_msg_t 
* c = msg->node->contacts[i]; + log_proto(" " PEER_FMT, + PEER_VAL(c->id.data, c->addr)); } + break; + case DHT_FIND_NODE_RSP: + log_proto(" cookie: " HASH_FMT64, + HASH_VAL64(&msg->node->cookie)); + log_proto(" key: " HASH_FMT64 " [%zu bytes]", + HASH_VAL64(msg->node->key.data), msg->node->key.len); + log_proto(" contacts: [%zd]", msg->node->n_contacts); + for (i = 0; i < msg->node->n_contacts; i++) { + dht_contact_msg_t * c = msg->node->contacts[i]; + log_proto(" " PEER_FMT, + PEER_VAL(c->id.data, c->addr)); + } + + break; + default: + break; } - pthread_rwlock_unlock(&dht->lock); + pthread_cleanup_pop(false); +} + +static void dht_kv_debug_msg_snd(dht_msg_t * msg, + uint8_t * id, + uint64_t addr) +{ + if (msg == NULL) + return; + + log_proto(TX_HDR_FMT ".", TX_HDR_VAL(msg, id, addr)); + + dht_kv_debug_msg(msg); +} + +static void dht_kv_debug_msg_rcv(dht_msg_t * msg) +{ + if (msg == NULL) + return; + + log_proto(RX_HDR_FMT ".", RX_HDR_VAL(msg)); + + dht_kv_debug_msg(msg); +} +#endif #ifndef __DHT_TEST__ +static int dht_send_msg(dht_msg_t * msg, + uint64_t addr) +{ + size_t len; + struct shm_du_buff * sdb; + + if (msg == NULL) + return 0; + + assert(addr != INVALID_ADDR && addr != dht.addr); + len = dht_msg__get_packed_size(msg); - if (len == 0) + if (len == 0) { + log_warn("%s failed to pack.", DHT_CODE(msg)); goto fail_msg; + } - while (true) { - if (ipcp_sdb_reserve(&sdb, len)) - goto fail_msg; + if (ipcp_sdb_reserve(&sdb, len)) { + log_warn("%s failed to get sdb.", DHT_CODE(msg)); + goto fail_msg; + } - dht_msg__pack(msg, shm_du_buff_head(sdb)); + dht_msg__pack(msg, shm_du_buff_head(sdb)); - if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0) - break; + if (dt_write_packet(addr, QOS_CUBE_BE, dht.eid, sdb) < 0) { + log_warn("%s write failed", DHT_CODE(msg)); + goto fail_send; + } - ipcp_sdb_release(sdb); + return 0; + fail_send: + ipcp_sdb_release(sdb); + fail_msg: + return -1; +} +#else /* function for testing */ +static int dht_send_msg(dht_msg_t * 
msg, + uint64_t addr) +{ + buffer_t buf; - sleep(1); + assert(msg != NULL); + assert(addr != INVALID_ADDR && addr != dht.addr); - if (--retr < 0) - goto fail_msg; + buf.len = dht_msg__get_packed_size(msg); + if (buf.len == 0) { + log_warn("%s failed to pack.", DHT_CODE(msg)); + goto fail_msg; } -#else - (void) addr; - (void) retr; + buf.data = malloc(buf.len); + if (buf.data == NULL) { + log_warn("%s failed to malloc buf.", DHT_CODE(msg)); + goto fail_msg; + } + + dht_msg__pack(msg, buf.data); + + if (sink_send_msg(&buf, addr) < 0) { + log_warn("%s write failed", DHT_CODE(msg)); + goto fail_send; + } + + return 0; + fail_send: + freebuf(buf); + fail_msg: + return -1; +} #endif /* __DHT_TEST__ */ - if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN) - kad_req_create(dht, msg, addr); +static void __cleanup_peer_list(void * pl) +{ + struct list_head * p; + struct list_head * h; - return msg->cookie; -#ifndef __DHT_TEST__ + assert(pl != NULL); + + list_for_each_safe(p, h, (struct list_head *) pl) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + list_del(&e->next); + free(e->id); + free(e); + } +} + + +static int dht_kv_send_msgs(dht_msg_t * msg, + struct list_head * pl) +{ + struct list_head * p; + struct list_head * h; + + pthread_cleanup_push(__cleanup_dht_msg, msg); + pthread_cleanup_push(__cleanup_peer_list, pl); + + list_for_each_safe(p, h, pl) { + struct peer_entry * e = list_entry(p, struct peer_entry, next); + if (IS_REQUEST(msg->code)) { + msg->find->cookie = e->cookie; + assert(msg->find->cookie != DHT_INVALID); + } + if (dht_send_msg(msg, e->addr) < 0) + continue; + +#ifdef DEBUG_PROTO_DHT + dht_kv_debug_msg_snd(msg, e->id, e->addr); +#endif + list_del(&e->next); + free(e->id); + free(e); + } + + pthread_cleanup_pop(false); + pthread_cleanup_pop(false); + + return list_is_empty(pl) ? 
0 : -1; +} + +static int dht_kv_get_peer_list_for_msg(dht_msg_t * msg, + struct list_head * pl) +{ + struct list_head cl; /* contact list */ + uint8_t * key; /* key in the request */ + size_t max; + + assert(msg != NULL); + + assert(list_is_empty(pl)); + + max = msg->code == DHT_STORE ? dht.k : dht.alpha; + + switch (msg->code) { + case DHT_FIND_NODE_REQ: + /* FALLTHRU */ + case DHT_FIND_VALUE_REQ: + key = msg->find->key.data; + break; + case DHT_STORE: + key = msg->store->key.data; + break; + default: + log_err("Invalid DHT msg code (%d).", msg->code); + return -1; + } + + list_head_init(&cl); + + if (dht_kv_contact_list(key, &cl, max) < 0) { + log_err(KEY_FMT " Failed to get contact list.", KEY_VAL(key)); + goto fail_contacts; + } + + if (list_is_empty(&cl)) { + log_warn(KEY_FMT " No available contacts.", KEY_VAL(key)); + goto fail_contacts; + } + + if (dht_kv_create_peer_list(&cl, pl, msg->code) < 0) { + log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key)); + goto fail_peers; + } + + contact_list_destroy(&cl); + return 0; + fail_peers: + contact_list_destroy(&cl); + fail_contacts: + return -1; +} + +static int dht_kv_store_remote(const uint8_t * key, + const buffer_t val, + time_t exp) +{ + dht_msg_t * msg; + struct timespec now; + struct list_head pl; + + assert(key != NULL); + assert(val.data != NULL); + assert(val.len > 0); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + msg = dht_kv_store_msg(key, val, exp); + if (msg == NULL) { + log_err(KV_FMT " Failed to create %s.", + KV_VAL(key, val), dht_code_str[DHT_STORE]); + goto fail_msg; + } + + list_head_init(&pl); + + if (dht_kv_get_peer_list_for_msg(msg, &pl) < 0) { + log_dbg(KV_FMT " Failed to get peer list.", KV_VAL(key, val)); + goto fail_peer_list; + } + + if (dht_kv_send_msgs(msg, &pl) < 0) { + log_warn(KV_FMT " Failed to send any %s msg.", + KV_VAL(key, val), DHT_CODE(msg)); + goto fail_msgs; + } + + dht_msg__free_unpacked(msg, NULL); + + return 0; + fail_msgs: + peer_list_destroy(&pl); + 
fail_peer_list: + dht_msg__free_unpacked(msg, NULL); + fail_msg: + return -1; +} + +/* recursive lookup, start with pl NULL */ +static int dht_kv_query_contacts(const uint8_t * key, + struct list_head * pl) +{ + struct list_head p; + + dht_msg_t * msg; + + assert(key != NULL); + + msg = dht_kv_find_node_req_msg(key); + if (msg == NULL) { + log_err(KEY_FMT " Failed to create %s msg.", + KEY_VAL(key), dht_code_str[DHT_FIND_NODE_REQ]); + goto fail_msg; + } + + if (pl == NULL) { + list_head_init(&p); + pl = &p; + } + + if (list_is_empty(pl) && dht_kv_get_peer_list_for_msg(msg, pl) < 0) { + log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key)); + goto fail_peer_list; + } + + if (dht_kv_update_req(key, pl) < 0) { + log_warn(KEY_FMT " Failed to update req.", KEY_VAL(key)); + goto fail_update; + } + + if (dht_kv_send_msgs(msg, pl)) { + log_warn(KEY_FMT " Failed to send any %s msg.", + KEY_VAL(key), DHT_CODE(msg)); + goto fail_update; + } + + dht_msg__free_unpacked(msg, NULL); + + return 0; + fail_update: + peer_list_destroy(pl); + fail_peer_list: + dht_msg__free_unpacked(msg, NULL); fail_msg: - pthread_rwlock_wrlock(&dht->lock); - bmp_release(dht->cookies, msg->cookie); - pthread_rwlock_unlock(&dht->lock); -#endif /* !__DHT_TEST__ */ - fail_bmp_alloc: return -1; } -static struct dht_entry * dht_find_entry(struct dht * dht, - const uint8_t * key) +/* recursive lookup, start with pl NULL */ +static ssize_t dht_kv_query_remote(const uint8_t * key, + buffer_t ** vals, + struct list_head * pl) +{ + struct list_head p; + dht_msg_t * msg; + + assert(key != NULL); + + msg = dht_kv_find_value_req_msg(key); + if (msg == NULL) { + log_err(KEY_FMT " Failed to create value req.", KEY_VAL(key)); + goto fail_msg; + } + + if (pl == NULL) { + list_head_init(&p); + pl = &p; + } + + if (list_is_empty(pl) && dht_kv_get_peer_list_for_msg(msg, pl) < 0) { + log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key)); + goto fail_peer_list; + } + + if (dht_kv_update_req(key, pl) < 0) { + 
log_err(KEY_FMT " Failed to update request.", KEY_VAL(key)); + goto fail_update; + } + + if (dht_kv_send_msgs(msg, pl)) { + log_warn(KEY_FMT " Failed to send %s msg.", + KEY_VAL(key), DHT_CODE(msg)); + goto fail_update; + } + + dht_msg__free_unpacked(msg, NULL); + + if (vals == NULL) /* recursive lookup, already waiting */ + return 0; + + return dht_kv_wait_req(key, vals); + fail_update: + peer_list_destroy(pl); + fail_peer_list: + dht_msg__free_unpacked(msg, NULL); + fail_msg: + return -1; +} + +static void __add_dht_kv_entry(struct dht_entry * e) +{ + struct list_head * p; + + assert(e != NULL); + + list_for_each(p, &dht.db.kv.list) { + struct dht_entry * d = list_entry(p, struct dht_entry, next); + if (IS_CLOSER(d->key, e->key)) + continue; + break; + } + + list_add_tail(&e->next, p); + ++dht.db.kv.len; +} + +/* incoming store message */ +static int dht_kv_store(const uint8_t * key, + const buffer_t val, + time_t exp) +{ + struct dht_entry * e; + bool new = false; + + assert(key != NULL); + assert(val.data != NULL); + assert(val.len > 0); + + pthread_rwlock_wrlock(&dht.db.lock); + + e = __dht_kv_find_entry(key); + if (e == NULL) { + log_dbg(KV_FMT " Adding entry (store).", KV_VAL(key, val)); + e = dht_entry_create(key); + if (e == NULL) + goto fail; + + new = true; + + __add_dht_kv_entry(e); + } + + if (dht_entry_update_val(e, val, exp) < 0) + goto fail_add; + + pthread_rwlock_unlock(&dht.db.lock); + + return 0; + fail_add: + if (new) { + list_del(&e->next); + dht_entry_destroy(e); + --dht.db.kv.len; + } + fail: + pthread_rwlock_unlock(&dht.db.lock); + return -1; +} + +static int dht_kv_publish(const uint8_t * key, + const buffer_t val) +{ + struct dht_entry * e; + struct timespec now; + bool new = false; + + assert(key != NULL); + assert(val.data != NULL); + assert(val.len > 0); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_wrlock(&dht.db.lock); + + e = __dht_kv_find_entry(key); + if (e == NULL) { + log_dbg(KV_FMT " Adding entry 
(publish).", KV_VAL(key, val)); + e = dht_entry_create(key); + if (e == NULL) + goto fail; + + __add_dht_kv_entry(e); + new = true; + } + + if (dht_entry_update_lval(e, val) < 0) + goto fail_add; + + pthread_rwlock_unlock(&dht.db.lock); + + dht_kv_store_remote(key, val, now.tv_sec + dht.t_expire); + + return 0; + fail_add: + if (new) { + list_del(&e->next); + dht_entry_destroy(e); + --dht.db.kv.len; + } + fail: + pthread_rwlock_unlock(&dht.db.lock); + return -1; +} + +static int dht_kv_unpublish(const uint8_t * key, + const buffer_t val) +{ + struct dht_entry * e; + int rc; + + assert(key != NULL); + + pthread_rwlock_wrlock(&dht.db.lock); + + e = __dht_kv_find_entry(key); + if (e == NULL) + goto no_entry; + + rc = dht_entry_remove_lval(e, val); + + pthread_rwlock_unlock(&dht.db.lock); + + return rc; + no_entry: + pthread_rwlock_unlock(&dht.db.lock); + return -ENOENT; + +} + +/* message validation */ +static int dht_kv_validate_store_msg(const dht_store_msg_t * store) +{ + if (store == NULL) { + log_warn("Store in msg is NULL."); + return -EINVAL; + } + + if (store->key.data == NULL || store->key.len == 0) { + log_warn("Invalid key in DHT store msg."); + return -EINVAL; + } + + if (store->key.len != dht.id.len) { + log_warn("Invalid key length in DHT store msg."); + return -EINVAL; + } + + if (store->val.data == NULL || store->val.len == 0) { + log_warn("Invalid value in DHT store msg."); + return -EINVAL; + } + + return 0; +} + +static int validate_find_req_msg(const dht_find_req_msg_t * req) { - struct list_head * p; + if (req == NULL) { + log_warn("Request in msg is NULL."); + return -EINVAL; + } - list_for_each(p, &dht->entries) { - struct dht_entry * e = list_entry(p, struct dht_entry, next); - if (!memcmp(key, e->key, dht->b)) - return e; + if (req->key.data == NULL || req->key.len == 0) { + log_warn("Find request without key."); + return -EINVAL; } - return NULL; + if (req->key.len != dht.id.len) { + log_warn("Invalid key length in request msg."); + return 
-EINVAL; + } + + return 0; } -static int kad_add(struct dht * dht, - const dht_contact_msg_t * contacts, - ssize_t n, - time_t exp) +static int validate_node_rsp_msg(const dht_find_node_rsp_msg_t * rsp) { - struct dht_entry * e; - - pthread_rwlock_wrlock(&dht->lock); + if (rsp == NULL) { + log_warn("Node rsp in msg is NULL."); + return -EINVAL; + } - while (n-- > 0) { - if (contacts[n].id.len != dht->b) - log_warn("Bad key length in contact data."); + if (rsp->key.data == NULL) { + log_warn("Invalid key in DHT response msg."); + return -EINVAL; + } - e = dht_find_entry(dht, contacts[n].id.data); - if (e != NULL) { - if (dht_entry_add_addr(e, contacts[n].addr, exp)) - goto fail; - } else { - e = dht_entry_create(dht, contacts[n].id.data); - if (e == NULL) - goto fail; + if (rsp->key.len != dht.id.len) { + log_warn("Invalid key length in DHT response msg."); + return -EINVAL; + } - if (dht_entry_add_addr(e, contacts[n].addr, exp)) { - dht_entry_destroy(e); - goto fail; - } + if (!dht_kv_has_req(rsp->key.data, rsp->cookie)) { + log_warn(KEY_FMT " No request " CK_FMT ".", + KEY_VAL(rsp->key.data), CK_VAL(rsp->cookie)); - list_add(&e->next, &dht->entries); - } + return -EINVAL; } - pthread_rwlock_unlock(&dht->lock); return 0; - - fail: - pthread_rwlock_unlock(&dht->lock); - return -ENOMEM; } -static int wait_resp(struct dht * dht, - dht_msg_t * msg, - time_t timeo) +static int validate_value_rsp_msg(const dht_find_value_rsp_msg_t * rsp) { - struct kad_req * req; - - assert(dht); - assert(msg); - - pthread_rwlock_rdlock(&dht->lock); + if (rsp == NULL) { + log_warn("Invalid DHT find value response msg."); + return -EINVAL; + } - req = dht_find_request(dht, msg); - if (req == NULL) { - pthread_rwlock_unlock(&dht->lock); - return -EPERM; + if (rsp->values == NULL && rsp->n_values > 0) { + log_dbg("No values in DHT response msg."); + return 0; } - pthread_rwlock_unlock(&dht->lock); + if (rsp->n_values == 0 && rsp->values != NULL) { + log_dbg("DHT response did not set values 
NULL."); + return 0; + } - return kad_req_wait(req, timeo); + return 0; } -static int kad_store(struct dht * dht, - const uint8_t * key, - uint64_t addr, - uint64_t r_addr, - time_t ttl) +static int dht_kv_validate_msg(dht_msg_t * msg) { - dht_msg_t msg = DHT_MSG__INIT; - dht_contact_msg_t cmsg = DHT_CONTACT_MSG__INIT; - dht_contact_msg_t * cmsgp[1]; - - cmsg.id.data = (uint8_t *) key; - cmsg.addr = addr; - - pthread_rwlock_rdlock(&dht->lock); - - cmsg.id.len = dht->b; - pthread_rwlock_unlock(&dht->lock); + assert(msg != NULL); - cmsgp[0] = &cmsg; + if (msg->src->id.len != dht.id.len) { + log_warn("%s Invalid source contact ID.", DHT_CODE(msg)); + return -EINVAL; + } - msg.code = KAD_STORE; - msg.has_t_expire = true; - msg.t_expire = ttl; - msg.n_contacts = 1; - msg.contacts = cmsgp; + if (msg->src->addr == INVALID_ADDR) { + log_warn("%s Invalid source address.", DHT_CODE(msg)); + return -EINVAL; + } - if (send_msg(dht, &msg, r_addr) < 0) - return -1; + switch (msg->code) { + case DHT_FIND_VALUE_REQ: + /* FALLTHRU */ + case DHT_FIND_NODE_REQ: + if (validate_find_req_msg(msg->find) < 0) + return -EINVAL; + break; + case DHT_FIND_VALUE_RSP: + if (validate_value_rsp_msg(msg->val) < 0) + return -EINVAL; + /* FALLTHRU */ + case DHT_FIND_NODE_RSP: + if (validate_node_rsp_msg(msg->node) < 0) + return -EINVAL; + break; + case DHT_STORE: + if (dht_kv_validate_store_msg(msg->store) < 0) + return -EINVAL; + break; + default: + log_warn("Invalid DHT msg code (%d).", msg->code); + return -ENOENT; + } return 0; } -static ssize_t kad_find(struct dht * dht, - struct lookup * lu, - const uint64_t * addrs, - enum kad_code code) +static void do_dht_kv_store(const dht_store_msg_t * store) { - dht_msg_t msg = DHT_MSG__INIT; - ssize_t sent = 0; + struct tm * tm; + char tmstr[RIB_TM_STRLEN]; + buffer_t val; + uint8_t * key; + time_t exp; - assert(dht); - assert(lu->key); + assert(store != NULL); - msg.code = code; + val.data = store->val.data; + val.len = store->val.len; + key = 
store->key.data; + exp = store->exp; - msg.has_key = true; - msg.key.data = (uint8_t *) lu->key; - msg.key.len = dht->b; - - while (*addrs != 0) { - struct cookie_el * c; - int ret; - - if (*addrs == dht->addr) { - ++addrs; - continue; - } - - ret = send_msg(dht, &msg, *addrs); - if (ret < 0) - break; - - c = malloc(sizeof(*c)); - if (c == NULL) - break; + if (dht_kv_store(store->key.data, val, store->exp) < 0) { + log_err(KV_FMT " Failed to store.", KV_VAL(key, val)); + return; + } - c->cookie = (uint32_t) ret; + tm = gmtime(&exp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); + log_info(KV_FMT " Stored value until %s.", KV_VAL(key, val), tmstr); +} - pthread_mutex_lock(&lu->lock); +static dht_msg_t * do_dht_kv_find_node_req(const dht_find_req_msg_t * req) +{ + dht_contact_msg_t ** contacts; + dht_msg_t * rsp; + uint8_t * key; + uint64_t cookie; + ssize_t len; - list_add_tail(&c->next, &lu->cookies); + assert(req != NULL); - pthread_mutex_unlock(&lu->lock); + key = req->key.data; + cookie = req->cookie; - ++sent; - ++addrs; + len = dht_kv_get_contacts(key, &contacts); + if (len < 0) { + log_warn(KEY_FMT " Failed to get contacts.", KEY_VAL(key)); + goto fail_contacts; } - return sent; -} + rsp = dht_kv_find_node_rsp_msg(key, cookie, &contacts, len); + if (rsp == NULL) { + log_err(KEY_FMT " Failed to create %s.", KEY_VAL(key), + dht_code_str[DHT_FIND_NODE_RSP]); + goto fail_msg; + } -static void lookup_detach(struct dht * dht, - struct lookup * lu) -{ - pthread_rwlock_wrlock(&dht->lock); + assert(rsp->code == DHT_FIND_NODE_RSP); - list_del(&lu->next); + log_info(KEY_FMT " Responding with %zd contacts", KEY_VAL(key), len); - pthread_rwlock_unlock(&dht->lock); + return rsp; + fail_msg: + while (len-- > 0) + dht_contact_msg__free_unpacked(contacts[len], NULL); + free(contacts); + fail_contacts: + return NULL; } -static struct lookup * kad_lookup(struct dht * dht, - const uint8_t * id, - enum kad_code code) +static void dht_kv_process_node_rsp(dht_contact_msg_t 
** contacts, + size_t len, + struct list_head * pl, + enum dht_code code) { - uint64_t addrs[KAD_ALPHA + 1]; - enum lookup_state state; - struct lookup * lu; - - lu = lookup_create(dht, id); - if (lu == NULL) - return NULL; + struct timespec now; + size_t i; - lookup_new_addrs(lu, addrs); + assert(contacts != NULL); + assert(len > 0); + assert(pl != NULL); + assert(list_is_empty(pl)); - if (addrs[0] == 0) { - lookup_detach(dht, lu); - lookup_destroy(lu); - return NULL; - } + clock_gettime(CLOCK_REALTIME_COARSE, &now); - if (kad_find(dht, lu, addrs, code) == 0) { - lookup_detach(dht, lu); - return lu; - } + for (i = 0; i < len; i++) { + dht_contact_msg_t * c = contacts[i]; + struct peer_entry * e; + if (c->addr == dht.addr) + continue; - while ((state = lookup_wait(lu)) != LU_COMPLETE) { - switch (state) { - case LU_UPDATE: - lookup_new_addrs(lu, addrs); - if (addrs[0] == 0) - break; + if (dht_kv_update_contacts(c->id.data, c->addr) < 0) + log_warn(PEER_FMT " Failed to update contacts.", + PEER_VAL(c->id.data, c->addr)); - kad_find(dht, lu, addrs, code); - break; - case LU_DESTROY: - lookup_detach(dht, lu); - lookup_set_state(lu, LU_NULL); - return NULL; - default: - break; + e = malloc(sizeof(*e)); + if (e == NULL) { + log_err(PEER_FMT " Failed to malloc entry.", + PEER_VAL(c->id.data, c->addr)); + continue; } - } - assert(state == LU_COMPLETE); + e->id = dht_dup_key(c->id.data); + if (e->id == NULL) { + log_warn(PEER_FMT " Failed to duplicate id.", + PEER_VAL(c->id.data, c->addr)); + free(e); + continue; + } - lookup_detach(dht, lu); + e->cookie = generate_cookie(); + e->code = code; + e->addr = c->addr; + e->t_sent = now.tv_sec; - return lu; + list_add_tail(&e->next, pl); + } } -static void kad_publish(struct dht * dht, - const uint8_t * key, - uint64_t addr, - time_t exp) +static dht_msg_t * do_dht_kv_find_value_req(const dht_find_req_msg_t * req) { - struct lookup * lu; - uint64_t * addrs; - ssize_t n; - size_t k; - time_t t_expire; - + dht_contact_msg_t ** 
contacts; + ssize_t n_contacts; + buffer_t * vals; + ssize_t n_vals; + dht_msg_t * rsp; + uint8_t * key; + uint64_t cookie; - assert(dht); - assert(key); + assert(req != NULL); - pthread_rwlock_rdlock(&dht->lock); + key = req->key.data; + cookie = req->cookie; - k = dht->k; - t_expire = dht->t_expire; - - pthread_rwlock_unlock(&dht->lock); + n_contacts = dht_kv_get_contacts(key, &contacts); + if (n_contacts < 0) { + log_warn(KEY_FMT " Failed to get contacts.", KEY_VAL(key)); + goto fail_contacts; + } - addrs = malloc(k * sizeof(*addrs)); - if (addrs == NULL) - return; + assert(n_contacts > 0 || contacts == NULL); - lu = kad_lookup(dht, key, KAD_FIND_NODE); - if (lu == NULL) { - free(addrs); - return; + n_vals = dht_kv_retrieve(key, &vals); + if (n_vals < 0) { + log_dbg(KEY_FMT " Failed to get values.", KEY_VAL(key)); + goto fail_vals; } - n = lookup_contact_addrs(lu, addrs); + if (n_vals == 0) + log_dbg(KEY_FMT " No values found.", KEY_VAL(key)); - while (n-- > 0) { - if (addrs[n] == dht->addr) { - dht_contact_msg_t msg = DHT_CONTACT_MSG__INIT; - msg.id.data = (uint8_t *) key; - msg.id.len = dht->b; - msg.addr = addr; - kad_add(dht, &msg, 1, exp); - } else { - if (kad_store(dht, key, addr, addrs[n], t_expire)) - log_warn("Failed to send store message."); - } + rsp = dht_kv_find_value_rsp_msg(key, cookie, &contacts, n_contacts, + &vals, n_vals); + if (rsp == NULL) { + log_err(KEY_FMT " Failed to create %s msg.", + KEY_VAL(key), dht_code_str[DHT_FIND_VALUE_RSP]); + goto fail_msg; } - lookup_destroy(lu); + log_info(KEY_FMT " Responding with %zd contacts, %zd values.", + KEY_VAL(req->key.data), n_contacts, n_vals); + + return rsp; - free(addrs); + fail_msg: + freebufs(vals, n_vals); + fail_vals: + while (n_contacts-- > 0) + dht_contact_msg__free_unpacked(contacts[n_contacts], NULL); + free(contacts); + fail_contacts: + return NULL; } -static int kad_join(struct dht * dht, - uint64_t addr) +static void do_dht_kv_find_node_rsp(const dht_find_node_rsp_msg_t * rsp) { - 
dht_msg_t msg = DHT_MSG__INIT; - - msg.code = KAD_JOIN; - - msg.has_alpha = true; - msg.has_b = true; - msg.has_k = true; - msg.has_t_refresh = true; - msg.has_t_replicate = true; - msg.alpha = KAD_ALPHA; - msg.k = KAD_K; - msg.t_refresh = KAD_T_REFR; - msg.t_replicate = KAD_T_REPL; + struct list_head pl; - pthread_rwlock_rdlock(&dht->lock); + assert(rsp != NULL); - msg.b = dht->b; + list_head_init(&pl); - pthread_rwlock_unlock(&dht->lock); + dht_kv_process_node_rsp(rsp->contacts, rsp->n_contacts, &pl, + DHT_FIND_NODE_REQ); - if (send_msg(dht, &msg, addr) < 0) - return -1; - - if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) - return -1; - - dht->id = create_id(dht->b); - if (dht->id == NULL) - return -1; + if (list_is_empty(&pl)) + goto no_contacts; - pthread_rwlock_wrlock(&dht->lock); + if (dht_kv_update_req(rsp->key.data, &pl) < 0) { + log_err(KEY_FMT " Failed to update request.", + KEY_VAL(rsp->key.data)); + goto fail_update; + } - dht_update_bucket(dht, dht->id, dht->addr); + dht_kv_query_contacts(rsp->key.data, &pl); - pthread_rwlock_unlock(&dht->lock); + return; - return 0; + fail_update: + peer_list_destroy(&pl); + no_contacts: + return; } -static void dht_dead_peer(struct dht * dht, - uint8_t * key, - uint64_t addr) +static void do_dht_kv_find_value_rsp(const dht_find_node_rsp_msg_t * node, + const dht_find_value_rsp_msg_t * val) { - struct list_head * p; - struct list_head * h; - struct bucket * b; + struct list_head pl; + uint8_t * key; - b = dht_get_bucket(dht, key); + assert(node != NULL); + assert(val != NULL); - list_for_each_safe(p, h, &b->contacts) { - struct contact * c = list_entry(p, struct contact, next); - if (b->n_contacts + b->n_alts <= dht->k) { - ++c->fails; - return; - } + list_head_init(&pl); - if (c->addr == addr) { - list_del(&c->next); - contact_destroy(c); - --b->n_contacts; - break; - } - } + key = node->key.data; - while (b->n_contacts < dht->k && b->n_alts > 0) { - struct contact * c; - c = list_first_entry(&b->alts, struct contact, 
next); - list_del(&c->next); - --b->n_alts; - list_add(&c->next, &b->contacts); - ++b->n_contacts; + dht_kv_process_node_rsp(node->contacts, node->n_contacts, &pl, + DHT_FIND_VALUE_REQ); + + if (val->n_values > 0) { + log_dbg(KEY_FMT " %zd new values received.", + KEY_VAL(key), val->n_values); + dht_kv_respond_req(key, val->values, val->n_values); + peer_list_destroy(&pl); + return; /* done! */ } -} -static int dht_del(struct dht * dht, - const uint8_t * key, - uint64_t addr) -{ - struct dht_entry * e; + if (list_is_empty(&pl)) + goto no_contacts; - e = dht_find_entry(dht, key); - if (e == NULL) { - return -EPERM; + if (dht_kv_update_req(key, &pl) < 0) { + log_err(KEY_FMT " Failed to update request.", KEY_VAL(key)); + goto fail_update; } - dht_entry_del_addr(e, addr); + dht_kv_query_remote(key, NULL, &pl); - return 0; + return; + fail_update: + peer_list_destroy(&pl); + no_contacts: + return; } -static buffer_t dht_retrieve(struct dht * dht, - const uint8_t * key) +static dht_msg_t * dht_wait_for_dht_msg(void) { - struct dht_entry * e; - struct list_head * p; - buffer_t buf; - uint64_t * pos; - size_t addrs = 0; - - pthread_rwlock_rdlock(&dht->lock); + dht_msg_t * msg; + struct cmd * cmd; - e = dht_find_entry(dht, key); - if (e == NULL) - goto fail; + pthread_mutex_lock(&dht.cmds.mtx); - buf.len = MIN(DHT_RETR_ADDR, e->n_vals); - if (buf.len == 0) - goto fail; + pthread_cleanup_push(__cleanup_mutex_unlock, &dht.cmds.mtx); - pos = malloc(sizeof(dht->addr) * buf.len); - if (pos == NULL) - goto fail; + while (list_is_empty(&dht.cmds.list)) + pthread_cond_wait(&dht.cmds.cond, &dht.cmds.mtx); - buf.data = (uint8_t *) pos; + cmd = list_last_entry(&dht.cmds.list, struct cmd, next); + list_del(&cmd->next); - list_for_each(p, &e->vals) { - struct val * v = list_entry(p, struct val, next); - *pos++ = v->addr; - if (++addrs >= buf.len) - break; - } + pthread_cleanup_pop(true); - pthread_rwlock_unlock(&dht->lock); + msg = dht_msg__unpack(NULL, cmd->cbuf.len, cmd->cbuf.data); + 
if (msg == NULL) + log_warn("Failed to unpack DHT msg."); - return buf; + freebuf(cmd->cbuf); + free(cmd); - fail: - pthread_rwlock_unlock(&dht->lock); - buf.len = 0; - buf.data = NULL; - return buf; + return msg; } -static ssize_t dht_get_contacts(struct dht * dht, - const uint8_t * key, - dht_contact_msg_t *** msgs) +static void do_dht_msg(dht_msg_t * msg) { - struct list_head l; - struct list_head * p; - struct list_head * h; - size_t len; - size_t i = 0; - - list_head_init(&l); + dht_msg_t * rsp = NULL; + uint8_t * id; + uint64_t addr; - pthread_rwlock_wrlock(&dht->lock); - - len = dht_contact_list(dht, &l, key); - if (len == 0) { - pthread_rwlock_unlock(&dht->lock); - *msgs = NULL; - return 0; +#ifdef DEBUG_PROTO_DHT + dht_kv_debug_msg_rcv(msg); +#endif + if (dht_kv_validate_msg(msg) == -EINVAL) { + log_warn("%s Validation failed.", DHT_CODE(msg)); + dht_msg__free_unpacked(msg, NULL); + return; } - *msgs = malloc(len * sizeof(**msgs)); - if (*msgs == NULL) { - pthread_rwlock_unlock(&dht->lock); - return 0; - } + id = msg->src->id.data; + addr = msg->src->addr; - list_for_each_safe(p, h, &l) { - struct contact * c = list_entry(p, struct contact, next); - (*msgs)[i] = malloc(sizeof(***msgs)); - if ((*msgs)[i] == NULL) { - pthread_rwlock_unlock(&dht->lock); - while (i > 0) - free(*msgs[--i]); - free(*msgs); - *msgs = NULL; - return 0; - } + if (dht_kv_update_contacts(id, addr) < 0) + log_warn(PEER_FMT " Failed to update contact from msg src.", + PEER_VAL(id, addr)); - dht_contact_msg__init((*msgs)[i]); + pthread_cleanup_push(__cleanup_dht_msg, msg); - (*msgs)[i]->id.data = c->id; - (*msgs)[i]->id.len = dht->b; - (*msgs)[i++]->addr = c->addr; - list_del(&c->next); - free(c); + switch(msg->code) { + case DHT_FIND_VALUE_REQ: + rsp = do_dht_kv_find_value_req(msg->find); + break; + case DHT_FIND_NODE_REQ: + rsp = do_dht_kv_find_node_req(msg->find); + break; + case DHT_STORE: + do_dht_kv_store(msg->store); + break; + case DHT_FIND_NODE_RSP: + 
do_dht_kv_find_node_rsp(msg->node); + break; + case DHT_FIND_VALUE_RSP: + do_dht_kv_find_value_rsp(msg->node, msg->val); + break; + default: + assert(false); /* already validated */ } - pthread_rwlock_unlock(&dht->lock); + pthread_cleanup_pop(true); - return i; -} + if (rsp == NULL) + return; -static time_t gcd(time_t a, - time_t b) -{ - if (a == 0) - return b; + pthread_cleanup_push(__cleanup_dht_msg, rsp); - return gcd(b % a, a); + dht_send_msg(rsp, addr); + + pthread_cleanup_pop(true); /* free rsp */ } -static void * work(void * o) +static void * dht_handle_packet(void * o) { - struct dht * dht; - struct timespec now; - struct list_head * p; - struct list_head * h; - struct list_head reflist; - time_t intv; - struct lookup * lu; - - dht = (struct dht *) o; - - pthread_rwlock_rdlock(&dht->lock); - - intv = gcd(dht->t_expire, dht->t_repub); - intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; - - pthread_rwlock_unlock(&dht->lock); - - list_head_init(&reflist); + (void) o; while (true) { - clock_gettime(CLOCK_REALTIME_COARSE, &now); - - pthread_rwlock_wrlock(&dht->lock); - - /* Republish registered hashes. */ - list_for_each(p, &dht->refs) { - struct ref_entry * e; - uint8_t * key; - uint64_t addr; - time_t t_expire; - e = list_entry(p, struct ref_entry, next); - if (now.tv_sec > e->t_rep) { - key = dht_dup_key(e->key, dht->b); - if (key == NULL) - continue; - addr = dht->addr; - t_expire = dht->t_expire; - e->t_rep = now.tv_sec + dht->t_repub; - - pthread_rwlock_unlock(&dht->lock); - kad_publish(dht, key, addr, t_expire); - pthread_rwlock_wrlock(&dht->lock); - free(key); - } - } - - /* Remove stale entries and republish if necessary. 
*/ - list_for_each_safe(p, h, &dht->entries) { - struct list_head * p1; - struct list_head * h1; - struct dht_entry * e; - uint8_t * key; - time_t t_expire; - e = list_entry (p, struct dht_entry, next); - list_for_each_safe(p1, h1, &e->vals) { - struct val * v; - uint64_t addr; - v = list_entry(p1, struct val, next); - if (now.tv_sec > v->t_exp) { - list_del(&v->next); - val_destroy(v); - continue; - } - - if (now.tv_sec > v->t_rep) { - key = dht_dup_key(e->key, dht->b); - addr = v->addr; - t_expire = dht->t_expire = now.tv_sec; - v->t_rep = now.tv_sec + dht->t_replic; - pthread_rwlock_unlock(&dht->lock); - kad_publish(dht, key, addr, t_expire); - pthread_rwlock_wrlock(&dht->lock); - free(key); - } - } - } - - /* Check the requests list for unresponsive nodes. */ - list_for_each_safe(p, h, &dht->requests) { - struct kad_req * r; - r = list_entry(p, struct kad_req, next); - if (now.tv_sec > r->t_exp) { - list_del(&r->next); - bmp_release(dht->cookies, r->cookie); - dht_dead_peer(dht, r->key, r->addr); - kad_req_destroy(r); - } - } + dht_msg_t * msg; - /* Refresh unaccessed buckets. */ - bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); + msg = dht_wait_for_dht_msg(); + if (msg == NULL) + continue; - pthread_rwlock_unlock(&dht->lock); + tpm_begin_work(dht.tpm); - list_for_each_safe(p, h, &reflist) { - struct contact * c; - c = list_entry(p, struct contact, next); - lu = kad_lookup(dht, c->id, KAD_FIND_NODE); - if (lu != NULL) - lookup_destroy(lu); - list_del(&c->next); - contact_destroy(c); - } + do_dht_msg(msg); - sleep(intv); + tpm_end_work(dht.tpm); } return (void *) 0; } - -static int kad_handle_join_resp(struct dht * dht, - struct kad_req * req, - dht_msg_t * msg) +#ifndef __DHT_TEST__ +static void dht_post_packet(void * comp, + struct shm_du_buff * sdb) { - assert(dht); - assert(req); - assert(msg); + struct cmd * cmd; - /* We might send version numbers later to warn of updates if needed. 
*/ - if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire && - msg->has_t_refresh && msg->has_t_replicate)) { - log_warn("Join refused by remote."); - return -1; + (void) comp; + + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Command malloc failed."); + goto fail_cmd; } - if (msg->b < sizeof(uint64_t)) { - log_err("Hash sizes less than 8 bytes unsupported."); - return -1; + cmd->cbuf.data = malloc(shm_du_buff_len(sdb)); + if (cmd->cbuf.data == NULL) { + log_err("Command buffer malloc failed."); + goto fail_buf; } - pthread_rwlock_wrlock(&dht->lock); + cmd->cbuf.len = shm_du_buff_len(sdb); - dht->buckets = bucket_create(); - if (dht->buckets == NULL) { - pthread_rwlock_unlock(&dht->lock); - return -1; - } + memcpy(cmd->cbuf.data, shm_du_buff_head(sdb), cmd->cbuf.len); - /* Likely corrupt packet. The member will refuse, we might here too. */ - if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) - log_warn("Different kademlia parameters detected."); + ipcp_sdb_release(sdb); - if (msg->t_replicate != KAD_T_REPL) - log_warn("Different kademlia replication time detected."); + pthread_mutex_lock(&dht.cmds.mtx); - if (msg->t_refresh != KAD_T_REFR) - log_warn("Different kademlia refresh time detected."); + list_add(&cmd->next, &dht.cmds.list); - dht->k = msg->k; - dht->b = msg->b; - dht->t_expire = msg->t_expire; - dht->t_repub = MAX(1, dht->t_expire - 10); + pthread_cond_signal(&dht.cmds.cond); - if (pthread_create(&dht->worker, NULL, work, dht)) { - bucket_destroy(dht->buckets); - pthread_rwlock_unlock(&dht->lock); - return -1; - } + pthread_mutex_unlock(&dht.cmds.mtx); + + return; + + fail_buf: + free(cmd); + fail_cmd: + ipcp_sdb_release(sdb); + return; +} +#endif - kad_req_respond(req); +int dht_reg(const uint8_t * key) +{ + buffer_t val; - dht_update_bucket(dht, msg->s_id.data, msg->s_addr); + if (addr_to_buf(dht.addr, &val) < 0) { + log_err("Failed to convert address to buffer."); + goto fail_a2b; + } - pthread_rwlock_unlock(&dht->lock); + 
if (dht_kv_publish(key, val)) { + log_err(KV_FMT " Failed to publish.", KV_VAL(key, val)); + goto fail_publish; + } - log_dbg("Enrollment of DHT completed."); + freebuf(val); return 0; + fail_publish: + freebuf(val); + fail_a2b: + return -1; } -static int kad_handle_find_resp(struct dht * dht, - struct kad_req * req, - dht_msg_t * msg) +int dht_unreg(const uint8_t * key) { - struct lookup * lu; - - assert(dht); - assert(req); - assert(msg); + buffer_t val; - pthread_rwlock_rdlock(&dht->lock); - - lu = dht_find_lookup(dht, req->cookie); - if (lu == NULL) { - pthread_rwlock_unlock(&dht->lock); - return -1; + if (addr_to_buf(dht.addr, &val) < 0) { + log_err("Failed to convert address to buffer."); + goto fail_a2b; } - lookup_update(dht, lu, msg); + if (dht_kv_unpublish(key, val)) { + log_err(KV_FMT " Failed to unpublish.", KV_VAL(key, val)); + goto fail_unpublish; + } - pthread_rwlock_unlock(&dht->lock); + freebuf(val); return 0; + fail_unpublish: + freebuf(val); + fail_a2b: + return -ENOMEM; } -static void kad_handle_response(struct dht * dht, - dht_msg_t * msg) +uint64_t dht_query(const uint8_t * key) { - struct kad_req * req; - - assert(dht); - assert(msg); - - pthread_rwlock_wrlock(&dht->lock); + buffer_t * vals; + ssize_t n; + uint64_t addr; - req = dht_find_request(dht, msg); - if (req == NULL) { - pthread_rwlock_unlock(&dht->lock); - return; + n = dht_kv_retrieve(key, &vals); + if (n < 0) { + log_err(KEY_FMT " Failed to query db.", KEY_VAL(key)); + goto fail_vals; } - bmp_release(dht->cookies, req->cookie); - list_del(&req->next); + if (n == 0) { + log_dbg(KEY_FMT " No local values.", KEY_VAL(key)); + n = dht_kv_query_remote(key, &vals, NULL); + if (n < 0) { + log_warn(KEY_FMT " Failed to query DHT.", KEY_VAL(key)); + goto fail_vals; + } + if (n == 0) { + log_dbg(KEY_FMT " No values.", KEY_VAL(key)); + goto no_vals; + } + } - pthread_rwlock_unlock(&dht->lock); + if (buf_to_addr(vals[0], &addr) < 0) { + log_err(VAL_FMT " Failed addr conversion.", 
VAL_VAL(vals[0])); + goto fail_b2a; + } - switch(req->code) { - case KAD_JOIN: - if (kad_handle_join_resp(dht, req, msg)) - log_err("Enrollment of DHT failed."); - break; - case KAD_FIND_VALUE: - case KAD_FIND_NODE: - if (dht_get_state(dht) != DHT_RUNNING) - break; - kad_handle_find_resp(dht, req, msg); - break; - default: - break; + if (n > 1 && addr == INVALID_ADDR && buf_to_addr(vals[1], &addr) < 0) { + log_err(VAL_FMT " Failed addr conversion.", VAL_VAL(vals[1])); + goto fail_b2a; } - kad_req_destroy(req); + freebufs(vals, n); + + return addr; + fail_b2a: + freebufs(vals, n); + return INVALID_ADDR; + no_vals: + free(vals); + fail_vals: + return INVALID_ADDR; } -int dht_bootstrap(void * dir) +static int emergency_peer(struct list_head * pl) { - struct dht * dht; + struct peer_entry * e; + struct timespec now; - dht = (struct dht *) dir; + assert(pl != NULL); + assert(list_is_empty(pl)); - assert(dht); + if (dht.peer == INVALID_ADDR) + return -1; - pthread_rwlock_wrlock(&dht->lock); + clock_gettime(CLOCK_REALTIME_COARSE, &now); -#ifndef __DHT_TEST__ - dht->b = ipcp_dir_hash_len(); -#else - dht->b = DHT_TEST_KEY_LEN; -#endif + e = malloc(sizeof(*e)); + if (e == NULL) { + log_err("Failed to malloc emergency peer entry."); + goto fail_malloc; + } - dht->id = create_id(dht->b); - if (dht->id == NULL) + e->id = dht_dup_key(dht.id.data); + if (e->id == NULL) { + log_err("Failed to duplicate DHT ID for emergency peer."); goto fail_id; + } - dht->buckets = bucket_create(); - if (dht->buckets == NULL) - goto fail_buckets; - - dht->buckets->depth = 0; - dht->buckets->mask = 0; - - dht->t_expire = 86400; /* 1 day */ - dht->t_repub = dht->t_expire - 10; - dht->k = KAD_K; + e->addr = dht.peer; + e->cookie = dht.magic; + e->code = DHT_FIND_NODE_REQ; + e->t_sent = now.tv_sec; - if (pthread_create(&dht->worker, NULL, work, dht)) - goto fail_pthread_create; + list_add_tail(&e->next, pl); - dht->state = DHT_RUNNING; + return 0; + fail_id: + free(e); + fail_malloc: + return 
-ENOMEM; +} - dht_update_bucket(dht, dht->id, dht->addr); +static int dht_kv_seed_bootstrap_peer(void) +{ + struct list_head pl; - pthread_rwlock_unlock(&dht->lock); + list_head_init(&pl); - return 0; + if (dht.peer == INVALID_ADDR) { + log_dbg("No-one to contact."); + return 0; + } - fail_pthread_create: - bucket_destroy(dht->buckets); - dht->buckets = NULL; - fail_buckets: - free(dht->id); - dht->id = NULL; - fail_id: - pthread_rwlock_unlock(&dht->lock); - return -1; -} + if (emergency_peer(&pl) < 0) { + log_err("Could not create emergency peer."); + goto fail_peer; + } -static struct ref_entry * ref_entry_get(struct dht * dht, - const uint8_t * key) -{ - struct list_head * p; + log_dbg("Pinging emergency peer " ADDR_FMT32 ".", + ADDR_VAL32(&dht.peer)); - list_for_each(p, &dht->refs) { - struct ref_entry * r = list_entry(p, struct ref_entry, next); - if (!memcmp(key, r->key, dht-> b) ) - return r; + if (dht_kv_query_contacts(dht.id.data, &pl) < 0) { + log_warn("Failed to bootstrap peer."); + goto fail_query; } - return NULL; + peer_list_destroy(&pl); + + return 0; + fail_query: + peer_list_destroy(&pl); + fail_peer: + return -EAGAIN; } -int dht_reg(void * dir, - const uint8_t * key) +static void dht_kv_check_contacts(void) { - struct dht * dht; - struct ref_entry * e; - uint64_t addr; - time_t t_expire; + struct list_head cl; + struct list_head pl; - dht = (struct dht *) dir; + list_head_init(&cl); - assert(dht); - assert(key); - assert(dht->addr != 0); + dht_kv_contact_list(dht.id.data, &cl, dht.k); - if (dht_wait_running(dht)) - return -1; + if (!list_is_empty(&cl)) + goto success; - pthread_rwlock_wrlock(&dht->lock); + contact_list_destroy(&cl); - if (ref_entry_get(dht, key) != NULL) { - log_dbg("Name already registered."); - pthread_rwlock_unlock(&dht->lock); - return 0; - } + list_head_init(&pl); - e = ref_entry_create(dht, key); - if (e == NULL) { - pthread_rwlock_unlock(&dht->lock); - return -ENOMEM; + if (dht.peer == INVALID_ADDR) { + log_dbg("No-one to 
contact."); + return; } - list_add(&e->next, &dht->refs); + if (emergency_peer(&pl) < 0) { + log_err("Could not create emergency peer."); + goto fail_peer; + } - t_expire = dht->t_expire; - addr = dht->addr; + log_dbg("No contacts found, using emergency peer " ADDR_FMT32 ".", + ADDR_VAL32(&dht.peer)); - pthread_rwlock_unlock(&dht->lock); + dht_kv_query_contacts(dht.id.data, &pl); - kad_publish(dht, key, addr, t_expire); + peer_list_destroy(&pl); - return 0; + return; + success: + contact_list_destroy(&cl); + return; + fail_peer: + return; } -int dht_unreg(void * dir, - const uint8_t * key) +static void dht_kv_remove_expired_reqs(void) { - struct dht * dht; struct list_head * p; struct list_head * h; + struct timespec now; - dht = (struct dht *) dir; - - assert(dht); - assert(key); - - if (dht_get_state(dht) != DHT_RUNNING) - return -1; + clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_rwlock_wrlock(&dht->lock); + pthread_mutex_lock(&dht.reqs.mtx); - list_for_each_safe(p, h, &dht->refs) { - struct ref_entry * r = list_entry(p, struct ref_entry, next); - if (!memcmp(key, r->key, dht-> b) ) { - list_del(&r->next); - ref_entry_destroy(r); + list_for_each_safe(p, h, &dht.reqs.list) { + struct dht_req * e; + e = list_entry(p, struct dht_req, next); + if (IS_EXPIRED(e, &now)) { + log_dbg(KEY_FMT " Removing expired request.", + KEY_VAL(e->key)); + list_del(&e->next); + dht_req_destroy(e); + --dht.reqs.len; } } - dht_del(dht, key, dht->addr); + pthread_mutex_unlock(&dht.reqs.mtx); +} + +static void value_list_destroy(struct list_head * vl) +{ + struct list_head * p; + struct list_head * h; - pthread_rwlock_unlock(&dht->lock); + assert(vl != NULL); - return 0; + list_for_each_safe(p, h, vl) { + struct val_entry * v = list_entry(p, struct val_entry, next); + list_del(&v->next); + val_entry_destroy(v); + } } -uint64_t dht_query(void * dir, - const uint8_t * key) +#define MUST_REPLICATE(v, now) ((now)->tv_sec > (v)->t_repl + dht.t_repl) +#define MUST_REPUBLISH(v, now) /* 
Close to expiry deadline */ \ + (((v)->t_exp - (now)->tv_sec) < (DHT_N_REPUB * dht.t_repl)) +static void dht_entry_get_repl_lists(const struct dht_entry * e, + struct list_head * repl, + struct list_head * rebl, + struct timespec * now) { - struct dht * dht; - struct dht_entry * e; - struct lookup * lu; - uint64_t addrs[KAD_K]; - size_t n; + struct list_head * p; + struct val_entry * n; - dht = (struct dht *) dir; + list_for_each(p, &e->vals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (MUST_REPLICATE(v, now) && !IS_EXPIRED(v, now)) { + n = val_entry_create(v->val, v->t_exp); + if (n == NULL) + continue; - assert(dht); + list_add_tail(&n->next, repl); + } + } - addrs[0] = 0; + list_for_each(p, &e->lvals.list) { + struct val_entry * v = list_entry(p, struct val_entry, next); + if (MUST_REPLICATE(v, now) && MUST_REPUBLISH(v, now)) { + /* Add expire time here, to allow creating val_entry */ + n = val_entry_create(v->val, now->tv_sec + dht.t_expire); + if (n == NULL) + continue; - if (dht_wait_running(dht)) - return 0; + list_add_tail(&n->next, rebl); + } + } +} + +static int dht_kv_next_values(uint8_t * key, + struct list_head * repl, + struct list_head * rebl) +{ + struct timespec now; + struct list_head * p; + struct list_head * h; + struct dht_entry * e = NULL; - pthread_rwlock_rdlock(&dht->lock); + assert(key != NULL); + assert(repl != NULL); + assert(rebl != NULL); - e = dht_find_entry(dht, key); - if (e != NULL) - addrs[0] = dht_entry_get_addr(dht, e); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - pthread_rwlock_unlock(&dht->lock); + assert(list_is_empty(repl)); + assert(list_is_empty(rebl)); - if (addrs[0] != 0) - return addrs[0]; + pthread_rwlock_rdlock(&dht.db.lock); - lu = kad_lookup(dht, key, KAD_FIND_VALUE); - if (lu == NULL) - return 0; + if (dht.db.kv.len == 0) + goto no_entries; - n = lookup_get_addrs(lu, addrs); - if (n == 0) { - lookup_destroy(lu); - return 0; + list_for_each_safe(p, h, &dht.db.kv.list) { + e = 
list_entry(p, struct dht_entry, next); + if (IS_CLOSER(e->key, key)) + continue; /* Already processed */ + } + + if (e != NULL) { + memcpy(key, e->key, dht.id.len); + dht_entry_get_repl_lists(e, repl, rebl, &now); } + no_entries: + pthread_rwlock_unlock(&dht.db.lock); - lookup_destroy(lu); + return list_is_empty(repl) && list_is_empty(rebl) ? -ENOENT : 0; +} - /* Current behaviour is anycast and return the first peer address. */ - if (addrs[0] != dht->addr) - return addrs[0]; +static void dht_kv_replicate_value(const uint8_t * key, + struct val_entry * v, + const struct timespec * now) +{ + assert(MUST_REPLICATE(v, now)); - if (n > 1) - return addrs[1]; + (void) now; - return 0; + if (dht_kv_store_remote(key, v->val, v->t_exp) == 0) { + log_dbg(KV_FMT " Replicated.", KV_VAL(key, v->val)); + return; + } + + log_dbg(KV_FMT " Replication failed.", KV_VAL(key, v->val)); + + list_del(&v->next); + val_entry_destroy(v); } -static void * dht_handle_packet(void * o) +static void dht_kv_republish_value(const uint8_t * key, + struct val_entry * v, + const struct timespec * now) { - struct dht * dht = (struct dht *) o; + assert(MUST_REPLICATE(v, now)); - assert(dht); + if (MUST_REPUBLISH(v, now)) + assert(v->t_exp >= now->tv_sec + dht.t_expire); - while (true) { - dht_msg_t * msg; - dht_contact_msg_t ** cmsgs; - dht_msg_t resp_msg = DHT_MSG__INIT; - uint64_t addr; - buffer_t buf; - size_t i; - size_t b; - size_t t_expire; - struct cmd * cmd; - - pthread_mutex_lock(&dht->mtx); + if (dht_kv_store_remote(key, v->val, v->t_exp) == 0) { + log_dbg(KV_FMT " Republished.", KV_VAL(key, v->val)); + return; + } - pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); + if (MUST_REPUBLISH(v, now)) + log_warn(KV_FMT " Republish failed.", KV_VAL(key, v->val)); + else + log_dbg(KV_FMT " Replication failed.", KV_VAL(key, v->val)); - while (list_is_empty(&dht->cmds)) - pthread_cond_wait(&dht->cond, &dht->mtx); + list_del(&v->next); + val_entry_destroy(v); +} - cmd = 
list_last_entry(&dht->cmds, struct cmd, next); - list_del(&cmd->next); +static void dht_kv_update_replication_times(const uint8_t * key, + struct list_head * repl, + struct list_head * rebl, + const struct timespec * now) +{ + struct dht_entry * e; + struct list_head * p; + struct list_head * h; + struct val_entry * v; - pthread_cleanup_pop(true); + assert(key != NULL); + assert(repl != NULL); + assert(rebl != NULL); + assert(now != NULL); - i = shm_du_buff_len(cmd->sdb); + pthread_rwlock_wrlock(&dht.db.lock); - msg = dht_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); -#ifndef __DHT_TEST__ - ipcp_sdb_release(cmd->sdb); -#endif - free(cmd); + e = __dht_kv_find_entry(key); + if (e == NULL) { + pthread_rwlock_unlock(&dht.db.lock); + return; + } - if (msg == NULL) { - log_err("Failed to unpack message."); + list_for_each_safe(p, h, repl) { + struct val_entry * x; + v = list_entry(p, struct val_entry, next); + x = dht_entry_get_val(e, v->val); + if (x == NULL) { + log_err(KV_FMT " Not in vals.", KV_VAL(key, v->val)); continue; } - if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { - dht_msg__free_unpacked(msg, NULL); - log_dbg("Got a request message when not running."); + x->t_repl = now->tv_sec; + + list_del(&v->next); + val_entry_destroy(v); + } + + list_for_each_safe(p, h, rebl) { + struct val_entry * x; + v = list_entry(p, struct val_entry, next); + x = dht_entry_get_lval(e, v->val); + if (x == NULL) { + log_err(KV_FMT " Not in lvals.", KV_VAL(key, v->val)); continue; } - pthread_rwlock_rdlock(&dht->lock); + x->t_repl = now->tv_sec; + if (v->t_exp > x->t_exp) { + x->t_exp = v->t_exp; /* update expiration time */ + } - b = dht->b; - t_expire = dht->t_expire; + list_del(&v->next); + val_entry_destroy(v); + } - pthread_rwlock_unlock(&dht->lock); + pthread_rwlock_unlock(&dht.db.lock); +} - if (msg->has_key && msg->key.len != b) { - dht_msg__free_unpacked(msg, NULL); - log_warn("Bad key in message."); - continue; - } +static void dht_kv_replicate_values(const 
uint8_t * key, + struct list_head * repl, + struct list_head * rebl) +{ + struct timespec now; + struct list_head * p; + struct list_head * h; - if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { - dht_msg__free_unpacked(msg, NULL); - log_warn("Bad source ID in message of type %d.", - msg->code); - continue; - } + clock_gettime(CLOCK_REALTIME_COARSE, &now); - tpm_begin_work(dht->tpm); + list_for_each_safe(p, h, repl) { + struct val_entry * v; + v = list_entry(p, struct val_entry, next); + dht_kv_replicate_value(key, v, &now); + } - addr = msg->s_addr; + list_for_each_safe(p, h, rebl) { + struct val_entry * v; + v = list_entry(p, struct val_entry, next); + dht_kv_republish_value(key, v, &now); + } - resp_msg.code = KAD_RESPONSE; - resp_msg.cookie = msg->cookie; + /* removes non-replicated items from the list */ + dht_kv_update_replication_times(key, repl, rebl, &now); - switch(msg->code) { - case KAD_JOIN: - /* Refuse enrollee on check fails. */ - if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { - log_warn("Parameter mismatch. " - "DHT enrolment refused."); - break; - } + if (list_is_empty(repl) && list_is_empty(rebl)) + return; - if (msg->t_replicate != KAD_T_REPL) { - log_warn("Replication time mismatch. " - "DHT enrolment refused."); + log_warn(KEY_FMT " Failed to update replication times.", KEY_VAL(key)); +} - break; - } +static void dht_kv_replicate(void) +{ + struct list_head repl; /* list of values to replicate */ + struct list_head rebl; /* list of local values to republish */ + uint8_t * key; - if (msg->t_refresh != KAD_T_REFR) { - log_warn("Refresh time mismatch. 
" - "DHT enrolment refused."); - break; - } + key = dht_dup_key(dht.id.data); /* dist == 0 */ + if (key == NULL) { + log_err("Replicate: Failed to duplicate DHT ID."); + return; + } - resp_msg.has_alpha = true; - resp_msg.has_b = true; - resp_msg.has_k = true; - resp_msg.has_t_expire = true; - resp_msg.has_t_refresh = true; - resp_msg.has_t_replicate = true; - resp_msg.alpha = KAD_ALPHA; - resp_msg.b = b; - resp_msg.k = KAD_K; - resp_msg.t_expire = t_expire; - resp_msg.t_refresh = KAD_T_REFR; - resp_msg.t_replicate = KAD_T_REPL; - break; - case KAD_FIND_VALUE: - buf = dht_retrieve(dht, msg->key.data); - if (buf.len != 0) { - resp_msg.n_addrs = buf.len; - resp_msg.addrs = (uint64_t *) buf.data; - break; - } - /* FALLTHRU */ - case KAD_FIND_NODE: - /* Return k closest contacts. */ - resp_msg.n_contacts = - dht_get_contacts(dht, msg->key.data, &cmsgs); - resp_msg.contacts = cmsgs; - break; - case KAD_STORE: - if (msg->n_contacts < 1) { - log_warn("No contacts in store message."); - break; - } + list_head_init(&repl); + list_head_init(&rebl); - if (!msg->has_t_expire) { - log_warn("No expiry time in store message."); - break; - } + while (dht_kv_next_values(key, &repl, &rebl) == 0) { + dht_kv_replicate_values(key, &repl, &rebl); + if (!list_is_empty(&repl)) { + log_warn(KEY_FMT " Replication items left.", + KEY_VAL(key)); + value_list_destroy(&repl); + } - kad_add(dht, *msg->contacts, msg->n_contacts, - msg->t_expire); - break; - case KAD_RESPONSE: - kad_handle_response(dht, msg); - break; - default: - assert(false); - break; + if (!list_is_empty(&rebl)) { + log_warn(KEY_FMT " Republish items left.", + KEY_VAL(key)); + value_list_destroy(&rebl); } + } - if (msg->code != KAD_JOIN) { - pthread_rwlock_wrlock(&dht->lock); - if (dht_get_state(dht) == DHT_JOINING && - dht->buckets == NULL) { - pthread_rwlock_unlock(&dht->lock); - goto finish; - } + free(key); +} - if (dht_update_bucket(dht, msg->s_id.data, addr)) - log_warn("Failed to update bucket."); - 
pthread_rwlock_unlock(&dht->lock); - } +static void dht_kv_refresh_contacts(void) +{ + struct list_head * p; + struct list_head * h; + struct list_head rl; /* refresh list */ + struct timespec now; - if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0) - log_warn("Failed to send response."); + list_head_init(&rl); - finish: - dht_msg__free_unpacked(msg, NULL); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - if (resp_msg.n_addrs > 0) - free(resp_msg.addrs); + pthread_rwlock_rdlock(&dht.db.lock); - if (resp_msg.n_contacts == 0) { - tpm_end_work(dht->tpm); - continue; - } + __dht_kv_bucket_refresh_list(dht.db.contacts.root, now.tv_sec, &rl); - for (i = 0; i < resp_msg.n_contacts; ++i) - dht_contact_msg__free_unpacked(resp_msg.contacts[i], - NULL); - free(resp_msg.contacts); + pthread_rwlock_unlock(&dht.db.lock); - tpm_end_work(dht->tpm); + list_for_each_safe(p, h, &rl) { + struct contact * c; + c = list_entry(p, struct contact, next); + log_dbg(PEER_FMT " Refreshing contact.", + PEER_VAL(c->id, c->addr)); + dht_kv_query_contacts(c->id, NULL); + list_del(&c->next); + contact_destroy(c); } - return (void *) 0; + assert(list_is_empty(&rl)); } -static void dht_post_packet(void * comp, - struct shm_du_buff * sdb) +static void (*tasks[])(void) = { + dht_kv_check_contacts, + dht_kv_remove_expired_entries, + dht_kv_remove_expired_reqs, + dht_kv_replicate, + dht_kv_refresh_contacts, + NULL +}; + +static void * work(void * o) { - struct cmd * cmd; - struct dht * dht = (struct dht *) comp; + struct timespec now = TIMESPEC_INIT_MS(1); + time_t intv; + size_t n; /* number of tasks */ - if (dht_get_state(dht) == DHT_SHUTDOWN) { -#ifndef __DHT_TEST__ - ipcp_sdb_release(sdb); -#endif - return; - } + n = sizeof(tasks) / sizeof(tasks[0]) - 1; /* last is NULL */ - cmd = malloc(sizeof(*cmd)); - if (cmd == NULL) { - log_err("Command failed. 
Out of memory."); - return; - } + (void) o; - cmd->sdb = sdb; + while (dht_kv_seed_bootstrap_peer() == -EAGAIN) { + ts_add(&now, &now, &now); /* exponential backoff */ + if (now.tv_sec > 1) /* cap at 1 second */ + now.tv_sec = 1; + nanosleep(&now, NULL); + } - pthread_mutex_lock(&dht->mtx); + intv = gcd(dht.t_expire, (dht.t_expire - DHT_N_REPUB * dht.t_repl)); + intv = gcd(intv, gcd(dht.t_repl, dht.t_refresh)) / 2; + intv = MAX(1, intv / n); - list_add(&cmd->next, &dht->cmds); + log_dbg("DHT worker starting %ld seconds interval.", intv * n); - pthread_cond_signal(&dht->cond); + while (true) { + int i = 0; + while (tasks[i] != NULL) { + tasks[i++](); + sleep(intv); + } + } - pthread_mutex_unlock(&dht->mtx); + return (void *) 0; } -void dht_destroy(void * dir) +int dht_start(void) { - struct dht * dht; - struct list_head * p; - struct list_head * h; + dht.state = DHT_RUNNING; - dht = (struct dht *) dir; - if (dht == NULL) - return; + if (tpm_start(dht.tpm)) + goto fail_tpm_start; #ifndef __DHT_TEST__ - tpm_stop(dht->tpm); + if (pthread_create(&dht.worker, NULL, work, NULL)) { + log_err("Failed to create DHT worker thread."); + goto fail_worker; + } - tpm_destroy(dht->tpm); -#endif - if (dht_get_state(dht) == DHT_RUNNING) { - dht_set_state(dht, DHT_SHUTDOWN); - pthread_cancel(dht->worker); - pthread_join(dht->worker, NULL); + dht.eid = dt_reg_comp(&dht, &dht_post_packet, DHT); + if ((int) dht.eid < 0) { + log_err("Failed to register DHT component."); + goto fail_reg; } +#else + (void) work; +#endif + return 0; +#ifndef __DHT_TEST__ + fail_reg: + pthread_cancel(dht.worker); + pthread_join(dht.worker, NULL); + fail_worker: + tpm_stop(dht.tpm); +#endif + fail_tpm_start: + dht.state = DHT_INIT; + return -1; +} - pthread_rwlock_wrlock(&dht->lock); +void dht_stop(void) +{ + assert(dht.state == DHT_RUNNING); - list_for_each_safe(p, h, &dht->cmds) { - struct cmd * c = list_entry(p, struct cmd, next); - list_del(&c->next); #ifndef __DHT_TEST__ - ipcp_sdb_release(c->sdb); + 
dt_unreg_comp(dht.eid); + + pthread_cancel(dht.worker); + pthread_join(dht.worker, NULL); #endif - free(c); - } + tpm_stop(dht.tpm); - list_for_each_safe(p, h, &dht->entries) { - struct dht_entry * e = list_entry(p, struct dht_entry, next); - list_del(&e->next); - dht_entry_destroy(e); - } + dht.state = DHT_INIT; +} - list_for_each_safe(p, h, &dht->requests) { - struct kad_req * r = list_entry(p, struct kad_req, next); - list_del(&r->next); - kad_req_destroy(r); - } +int dht_init(struct dir_dht_config * conf) +{ + struct timespec now; + pthread_condattr_t cattr; - list_for_each_safe(p, h, &dht->refs) { - struct ref_entry * e = list_entry(p, struct ref_entry, next); - list_del(&e->next); - ref_entry_destroy(e); - } + assert(conf != NULL); - list_for_each_safe(p, h, &dht->lookups) { - struct lookup * l = list_entry(p, struct lookup, next); - list_del(&l->next); - lookup_destroy(l); - } + clock_gettime(CLOCK_REALTIME_COARSE, &now); - pthread_rwlock_unlock(&dht->lock); +#ifndef __DHT_TEST__ + dht.id.len = ipcp_dir_hash_len(); + dht.addr = addr_auth_address(); +#else + dht.id.len = DHT_TEST_KEY_LEN; + dht.addr = DHT_TEST_ADDR; +#endif + dht.t0 = now.tv_sec; + dht.alpha = conf->params.alpha; + dht.k = conf->params.k; + dht.t_expire = conf->params.t_expire; + dht.t_refresh = conf->params.t_refresh; + dht.t_repl = conf->params.t_replicate; + dht.peer = conf->peer; + + dht.magic = generate_cookie(); + + /* Send my address on enrollment */ + conf->peer = dht.addr; + + dht.id.data = generate_id(); + if (dht.id.data == NULL) { + log_err("Failed to create DHT ID."); + goto fail_id; + } - if (dht->buckets != NULL) - bucket_destroy(dht->buckets); + list_head_init(&dht.cmds.list); - bmp_destroy(dht->cookies); + if (pthread_mutex_init(&dht.cmds.mtx, NULL)) { + log_err("Failed to initialize command mutex."); + goto fail_cmds_mutex; + } - pthread_mutex_destroy(&dht->mtx); + if (pthread_cond_init(&dht.cmds.cond, NULL)) { + log_err("Failed to initialize command condvar."); + goto 
fail_cmds_cond; + } - pthread_rwlock_destroy(&dht->lock); + list_head_init(&dht.reqs.list); + dht.reqs.len = 0; - free(dht->id); + if (pthread_mutex_init(&dht.reqs.mtx, NULL)) { + log_err("Failed to initialize request mutex."); + goto fail_reqs_mutex; + } - free(dht); -} + if (pthread_condattr_init(&cattr)) { + log_err("Failed to initialize request condvar attributes."); + goto fail_cattr; + } +#ifndef __APPLE__ + if (pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK)) { + log_err("Failed to set request condvar clock."); + goto fail_cattr; + } +#endif + if (pthread_cond_init(&dht.reqs.cond, &cattr)) { + log_err("Failed to initialize request condvar."); + goto fail_reqs_cond; + } -static void * join_thr(void * o) -{ - struct join_info * info = (struct join_info *) o; - struct lookup * lu; - size_t retr = 0; + list_head_init(&dht.db.kv.list); + dht.db.kv.len = 0; + dht.db.kv.vals = 0; + dht.db.kv.lvals = 0; - assert(info); + if (pthread_rwlock_init(&dht.db.lock, NULL)) { + log_err("Failed to initialize store rwlock."); + goto fail_rwlock; + } - while (kad_join(info->dht, info->addr)) { - if (dht_get_state(info->dht) == DHT_SHUTDOWN) { - log_dbg("DHT enrollment aborted."); - goto finish; - } + dht.db.contacts.root = bucket_create(); + if (dht.db.contacts.root == NULL) { + log_err("Failed to create DHT buckets."); + goto fail_buckets; + } - if (retr++ == KAD_JOIN_RETR) { - dht_set_state(info->dht, DHT_INIT); - log_warn("DHT enrollment attempt failed."); - goto finish; - } + if (rib_reg(DHT, &r_ops) < 0) { + log_err("Failed to register DHT RIB operations."); + goto fail_rib_reg; + } - sleep(KAD_JOIN_INTV); + dht.tpm = tpm_create(2, 1, dht_handle_packet, NULL); + if (dht.tpm == NULL) { + log_err("Failed to create TPM for DHT."); + goto fail_tpm_create; } - dht_set_state(info->dht, DHT_RUNNING); + if (dht_kv_update_contacts(dht.id.data, dht.addr) < 0) + log_warn("Failed to update contacts with DHT ID."); - lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE); - if 
(lu != NULL) - lookup_destroy(lu); + pthread_condattr_destroy(&cattr); +#ifndef __DHT_TEST__ + log_info("DHT initialized."); + log_dbg(" ID: " HASH_FMT64 " [%zu bytes].", + HASH_VAL64(dht.id.data), dht.id.len); + log_dbg(" address: " ADDR_FMT32 ".", ADDR_VAL32(&dht.addr)); + log_dbg(" peer: " ADDR_FMT32 ".", ADDR_VAL32(&dht.peer)); + log_dbg(" magic cookie: " HASH_FMT64 ".", HASH_VAL64(&dht.magic)); + log_info(" parameters: alpha=%u, k=%zu, t_expire=%ld, " + "t_refresh=%ld, t_replicate=%ld.", + dht.alpha, dht.k, dht.t_expire, dht.t_refresh, dht.t_repl); +#endif + dht.state = DHT_INIT; - finish: - free(info); + return 0; - return (void *) 0; + fail_tpm_create: + rib_unreg(DHT); + fail_rib_reg: + bucket_destroy(dht.db.contacts.root); + fail_buckets: + pthread_rwlock_destroy(&dht.db.lock); + fail_rwlock: + pthread_cond_destroy(&dht.reqs.cond); + fail_reqs_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + pthread_mutex_destroy(&dht.reqs.mtx); + fail_reqs_mutex: + pthread_cond_destroy(&dht.cmds.cond); + fail_cmds_cond: + pthread_mutex_destroy(&dht.cmds.mtx); + fail_cmds_mutex: + freebuf(dht.id); + fail_id: + return -1; } -static void handle_event(void * self, - int event, - const void * o) +void dht_fini(void) { - struct dht * dht = (struct dht *) self; + struct list_head * p; + struct list_head * h; - if (event == NOTIFY_DT_CONN_ADD) { - pthread_t thr; - struct join_info * inf; - struct conn * c = (struct conn *) o; - struct timespec slack = TIMESPEC_INIT_MS(DHT_ENROLL_SLACK); + rib_unreg(DHT); - /* Give the pff some time to update for the new link. 
*/ - nanosleep(&slack, NULL); + tpm_destroy(dht.tpm); - switch(dht_get_state(dht)) { - case DHT_INIT: - inf = malloc(sizeof(*inf)); - if (inf == NULL) - break; + pthread_mutex_lock(&dht.cmds.mtx); - inf->dht = dht; - inf->addr = c->conn_info.addr; - - if (dht_set_state(dht, DHT_JOINING) == 0 || - dht_wait_running(dht)) { - if (pthread_create(&thr, NULL, join_thr, inf)) { - dht_set_state(dht, DHT_INIT); - free(inf); - return; - } - pthread_detach(thr); - } else { - free(inf); - } - break; - case DHT_RUNNING: - /* - * FIXME: this lookup for effiency reasons - * causes a SEGV when stressed with rapid - * enrollments. - * lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); - * if (lu != NULL) - * lookup_destroy(lu); - */ - break; - default: - break; - } + list_for_each_safe(p, h, &dht.cmds.list) { + struct cmd * c = list_entry(p, struct cmd, next); + list_del(&c->next); + freebuf(c->cbuf); + free(c); } -} - -void * dht_create(void) -{ - struct dht * dht; - dht = malloc(sizeof(*dht)); - if (dht == NULL) - goto fail_malloc; + pthread_mutex_unlock(&dht.cmds.mtx); - dht->buckets = NULL; + pthread_cond_destroy(&dht.cmds.cond); + pthread_mutex_destroy(&dht.cmds.mtx); - list_head_init(&dht->entries); - list_head_init(&dht->requests); - list_head_init(&dht->refs); - list_head_init(&dht->lookups); - list_head_init(&dht->cmds); + pthread_mutex_lock(&dht.reqs.mtx); - if (pthread_rwlock_init(&dht->lock, NULL)) - goto fail_rwlock; + list_for_each_safe(p, h, &dht.reqs.list) { + struct dht_req * r = list_entry(p, struct dht_req, next); + list_del(&r->next); + dht_req_destroy(r); + dht.reqs.len--; + } - if (pthread_mutex_init(&dht->mtx, NULL)) - goto fail_mutex; + pthread_mutex_unlock(&dht.reqs.mtx); - if (pthread_cond_init(&dht->cond, NULL)) - goto fail_cond; + pthread_cond_destroy(&dht.reqs.cond); + pthread_mutex_destroy(&dht.reqs.mtx); - dht->cookies = bmp_create(DHT_MAX_REQS, 1); - if (dht->cookies == NULL) - goto fail_bmp; + pthread_rwlock_wrlock(&dht.db.lock); - dht->b = 0; - dht->id 
= NULL; -#ifndef __DHT_TEST__ - dht->addr = addr_auth_address(); - if (dht->addr == INVALID_ADDR) - goto fail_bmp; + list_for_each_safe(p, h, &dht.db.kv.list) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + list_del(&e->next); + dht_entry_destroy(e); + dht.db.kv.len--; + } - dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); - if (dht->tpm == NULL) - goto fail_tpm_create; + if (dht.db.contacts.root != NULL) + bucket_destroy(dht.db.contacts.root); - if (tpm_start(dht->tpm)) - goto fail_tpm_start; + pthread_rwlock_unlock(&dht.db.lock); - dht->eid = dt_reg_comp(dht, &dht_post_packet, DHT); - if ((int) dht->eid < 0) - goto fail_tpm_start; + pthread_rwlock_destroy(&dht.db.lock); - if (notifier_reg(handle_event, dht)) - goto fail_notifier_reg; -#else - (void) handle_event; - (void) dht_handle_packet; - (void) dht_post_packet; -#endif - dht->state = DHT_INIT; + assert(dht.db.kv.len == 0); + assert(dht.db.kv.vals == 0); + assert(dht.db.kv.lvals == 0); + assert(dht.reqs.len == 0); - return (void *) dht; -#ifndef __DHT_TEST__ - fail_notifier_reg: - tpm_stop(dht->tpm); - fail_tpm_start: - tpm_destroy(dht->tpm); - fail_tpm_create: - bmp_destroy(dht->cookies); -#endif - fail_bmp: - pthread_cond_destroy(&dht->cond); - fail_cond: - pthread_mutex_destroy(&dht->mtx); - fail_mutex: - pthread_rwlock_destroy(&dht->lock); - fail_rwlock: - free(dht); - fail_malloc: - return NULL; + freebuf(dht.id); } -- cgit v1.2.3