diff options
Diffstat (limited to 'src/ipcpd/unicast/dir')
| -rw-r--r-- | src/ipcpd/unicast/dir/dht.c | 4923 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dir/dht.h | 19 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dir/dht.proto | 58 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dir/kademlia.proto | 45 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dir/ops.h | 20 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dir/pol.h | 2 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dir/tests/CMakeLists.txt | 11 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dir/tests/dht_test.c | 1923 | 
8 files changed, 5008 insertions, 1993 deletions
| diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c index 1742267b..6b06def9 100644 --- a/src/ipcpd/unicast/dir/dht.c +++ b/src/ipcpd/unicast/dir/dht.c @@ -1,10 +1,9 @@  /* - * Ouroboros - Copyright (C) 2016 - 2021 + * Ouroboros - Copyright (C) 2016 - 2024   *   * Distributed Hash Table based on Kademlia   *   *    Dimitri Staessens <dimitri@ouroboros.rocks> - *    Sander Vrijders   <sander@ouroboros.rocks>   *   * This library is free software; you can redistribute it and/or   * modify it under the terms of the GNU Lesser General Public License @@ -20,10 +19,12 @@   * Foundation, Inc., http://www.fsf.org/about/contact/.   */ -#if defined(__linux__) || defined(__CYGWIN__) -#define _DEFAULT_SOURCE -#else -#define _POSIX_C_SOURCE 200112L +#if !defined (__DHT_TEST__) +  #if defined(__linux__) || defined(__CYGWIN__) +    #define _DEFAULT_SOURCE +  #else +    #define _POSIX_C_SOURCE 200112L +  #endif  #endif  #include "config.h" @@ -31,19 +32,21 @@  #define DHT              "dht"  #define OUROBOROS_PREFIX DHT +#include <ouroboros/endian.h>  #include <ouroboros/hash.h>  #include <ouroboros/ipcp-dev.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/errno.h>  #include <ouroboros/logs.h>  #include <ouroboros/list.h> -#include <ouroboros/notifier.h>  #include <ouroboros/random.h> -#include <ouroboros/time_utils.h> +#include <ouroboros/rib.h> +#include <ouroboros/time.h>  #include <ouroboros/tpm.h>  #include <ouroboros/utils.h>  #include <ouroboros/pthread.h> +#include "addr-auth.h"  #include "common/connmgr.h"  #include "dht.h"  #include "dt.h" @@ -56,144 +59,154 @@  #include <inttypes.h>  #include <limits.h> -#include "kademlia.pb-c.h" -typedef KadMsg kad_msg_t; -typedef KadContactMsg kad_contact_msg_t; +#include "dht.pb-c.h" +typedef DhtMsg              dht_msg_t; +typedef DhtContactMsg       dht_contact_msg_t; +typedef DhtStoreMsg         dht_store_msg_t; +typedef DhtFindReqMsg       dht_find_req_msg_t; +typedef DhtFindNodeRspMsg   dht_find_node_rsp_msg_t; +typedef DhtFindValueRspMsg  dht_find_value_rsp_msg_t; +typedef ProtobufCBinaryData binary_data_t;  #ifndef CLOCK_REALTIME_COARSE  #define CLOCK_REALTIME_COARSE CLOCK_REALTIME  #endif -#define DHT_MAX_REQS  2048 /* KAD recommends rnd(), bmp can be changed.    */ -#define KAD_ALPHA     3    /* Parallel factor, proven optimal value.       */ -#define KAD_K         8    /* Replication factor, MDHT value.              */ -#define KAD_T_REPL    900  /* Replication time, tied to k. MDHT value.     */ -#define KAD_T_REFR    900  /* Refresh time stale bucket, MDHT value.       */ -#define KAD_T_JOIN    8    /* Response time to wait for a join.            */ -#define KAD_T_RESP    5    /* Response time to wait for a response.        */ -#define KAD_R_PING    2    /* Ping retries before declaring peer dead.     */ -#define KAD_QUEER     15   /* Time to declare peer questionable.           */ -#define KAD_BETA      8    /* Bucket split factor, must be 1, 2, 4 or 8.   */ -#define KAD_RESP_RETR 6    /* Number of retries on sending a response.     */ -#define KAD_JOIN_RETR 8    /* Number of retries sending a join.            */ -#define KAD_JOIN_INTV 1    /* Time (seconds) between join retries.         */ +#define DHT_MAX_REQS  128  /* KAD recommends rnd(), bmp can be changed.    */ +#define DHT_WARN_REQS 100  /* Warn if number of requests exceeds this.     */ +#define DHT_MAX_VALS  8    /* Max number of values to return for a key.    */ +#define DHT_T_CACHE   60   /* Max cache time for values (s)                */ +#define DHT_T_RESP    2    /* Response time to wait for a response (s).    */ +#define DHT_N_REPUB   5    /* Republish if expiry within n replications.   */ +#define DHT_R_PING    2    /* Ping retries before declaring peer dead.     */ +#define DHT_QUEER     15   /* Time to declare peer questionable.           */ +#define DHT_BETA      8    /* Bucket split factor, must be 1, 2, 4 or 8.   */ +#define DHT_RESP_RETR 6    /* Number of retries on sending a response.     */  #define HANDLE_TIMEO  1000 /* Timeout for dht_handle_packet tpm check (ms) */ -#define DHT_RETR_ADDR 1    /* Number of addresses to return on retrieve    */ +#define DHT_INVALID   0    /* Invalid cookie value.                        */ -enum dht_state { -        DHT_INIT = 0, -        DHT_SHUTDOWN, -        DHT_JOINING, -        DHT_RUNNING, -}; +#define KEY_FMT "K<" HASH_FMT64 ">" +#define KEY_VAL(key) HASH_VAL64(key) -enum kad_code { -        KAD_JOIN = 0, -        KAD_FIND_NODE, -        KAD_FIND_VALUE, -        /* Messages without a response below. */ -        KAD_STORE, -        KAD_RESPONSE -}; +#define VAL_FMT "V<" HASH_FMT64 ">" +#define VAL_VAL(val) HASH_VAL64((val).data) -enum kad_req_state { -        REQ_NULL = 0, -        REQ_INIT, -        REQ_PENDING, -        REQ_RESPONSE, -        REQ_DONE, -        REQ_DESTROY -}; +#define KV_FMT "<" HASH_FMT64 ", " HASH_FMT64 ">" +#define KV_VAL(key, val) HASH_VAL64(key), HASH_VAL64((val).data) -enum lookup_state { -        LU_NULL = 0, -        LU_INIT, -        LU_PENDING, -        LU_UPDATE, -        LU_COMPLETE, -        LU_DESTROY -}; +#define PEER_FMT "[" HASH_FMT64 "|" ADDR_FMT32 "]" +#define PEER_VAL(id, addr) HASH_VAL64(id), ADDR_VAL32(&(addr)) -struct kad_req { -        struct list_head   next; +#define DHT_CODE(msg) dht_code_str[(msg)->code] -        uint32_t           cookie; -        enum kad_code      code; -        uint8_t *          key; -        uint64_t           addr; +#define TX_HDR_FMT "%s --> " PEER_FMT +#define TX_HDR_VAL(msg, id, addr) DHT_CODE(msg), PEER_VAL(id, addr) -        enum kad_req_state state; -        pthread_cond_t     cond; -        pthread_mutex_t    lock; +#define RX_HDR_FMT "%s <-- " PEER_FMT +#define RX_HDR_VAL(msg) DHT_CODE(msg), \ +        PEER_VAL(msg->src->id.data, msg->src->addr) -        time_t             t_exp; +#define CK_FMT "|" HASH_FMT64 "|" +#define CK_VAL(cookie) HASH_VAL64(&(cookie)) + +#define IS_REQUEST(code) \ +        (code == DHT_FIND_NODE_REQ || code == DHT_FIND_VALUE_REQ) + +enum dht_code { +        DHT_STORE, +        DHT_FIND_NODE_REQ, +        DHT_FIND_NODE_RSP, +        DHT_FIND_VALUE_REQ, +        DHT_FIND_VALUE_RSP  }; -struct cookie_el { -        struct list_head next; +const char * dht_code_str[] = { +        "DHT_STORE", +        "DHT_FIND_NODE_REQ", +        "DHT_FIND_NODE_RSP", +        "DHT_FIND_VALUE_REQ", +        "DHT_FIND_VALUE_RSP" +}; -        uint32_t         cookie; +enum dht_state { +        DHT_NULL = 0, +        DHT_INIT, +        DHT_RUNNING  }; -struct lookup { -        struct list_head  next; +struct val_entry { +        struct list_head next; -        struct list_head  cookies; +        buffer_t         val; -        uint8_t *         key; +        time_t           t_exp;   /* Expiry time           */ +        time_t           t_repl;  /* Last replication time */ +}; + +struct dht_entry { +        struct list_head next; -        struct list_head  contacts; -        size_t            n_contacts; +        uint8_t *        key; -        uint64_t *        addrs; -        size_t            n_addrs; +        struct { +                struct list_head list; +                size_t           len; +        } vals;  /* We don't own these, only replicate */ -        enum lookup_state state; -        pthread_cond_t    cond; -        pthread_mutex_t   lock; +        struct { +                struct list_head list; +                size_t           len; +        } lvals; /* We own these, must be republished  */  }; -struct val { +struct contact {          struct list_head next; +        uint8_t *        id;          uint64_t         addr; -        time_t           t_exp; -        time_t           t_rep; +        size_t           fails; +        time_t           t_seen;  }; -struct ref_entry { +struct peer_entry {          struct list_head next; -        uint8_t *        key; +        uint64_t         cookie; +        uint8_t *        id; +        uint64_t         addr; +        enum dht_code    code; -        time_t           t_rep; +        time_t           t_sent;  }; -struct dht_entry { +struct dht_req {          struct list_head next;          uint8_t *        key; -        size_t           n_vals; -        struct list_head vals; -}; - -struct contact { -        struct list_head next; +        time_t           t_exp; -        uint8_t *        id; -        uint64_t         addr; +        struct { +                struct list_head list; +                size_t           len; +        } peers; -        size_t           fails; -        time_t           t_seen; +        struct { +                struct list_head list; +                size_t           len; +        } cache;  };  struct bucket { -        struct list_head contacts; -        size_t           n_contacts; +        struct { +                struct list_head list; +                size_t           len; +        } contacts; -        struct list_head alts; -        size_t           n_alts; +        struct { +                struct list_head list; +                size_t           len; +        } alts;          time_t           t_refr; @@ -201,1081 +214,1464 @@ struct bucket {          uint8_t          mask;          struct bucket *  parent; -        struct bucket *  children[1L << KAD_BETA]; +        struct bucket *  children[1L << DHT_BETA];  };  struct cmd { -        struct list_head     next; - -        struct shm_du_buff * sdb; +        struct list_head next; +        buffer_t         cbuf;  };  struct dir_ops dht_dir_ops = { -        .create       = dht_create, -        .destroy      = dht_destroy, -        .bootstrap    = dht_bootstrap, -        .reg          = dht_reg, -        .unreg        = dht_unreg, -        .query        = dht_query, -        .wait_running = dht_wait_running +        .init  = (int (*)(void *)) dht_init, +        .fini  = dht_fini, +        .start = dht_start, +        .stop  = dht_stop, +        .reg   = dht_reg, +        .unreg = dht_unreg, +        .query = dht_query  }; -struct dht { -        size_t           alpha; -        size_t           b; -        size_t           k; +struct { +        struct { /* Kademlia parameters */ +                uint32_t alpha;     /* Number of concurrent requests   */ +                size_t   k;         /* Number of replicas to store     */ +                time_t   t_expire;  /* Expiry time for values (s)      */ +                time_t   t_refresh; /* Refresh time for contacts (s)   */ +                time_t   t_repl;    /* Replication time for values (s) */ +        }; -        time_t           t_expire; -        time_t           t_refresh; -        time_t           t_replic; -        time_t           t_repub; +        buffer_t       id; -        uint8_t *        id; -        uint64_t         addr; +        time_t         t0;    /* Creation time               */ +        uint64_t       addr;  /* Our own address             */ +        uint64_t       peer;  /* Enrollment peer address     */ +        uint64_t       magic; /* Magic cookie for retransmit */ -        struct bucket *  buckets; +        uint64_t       eid;   /* Entity ID                   */ -        struct list_head entries; +        struct tpm *   tpm; +        pthread_t      worker; -        struct list_head refs; +        enum dht_state state; -        struct list_head lookups; +        struct { +                struct { +                        struct bucket * root; +                } contacts; + +                struct { +                        struct list_head list; +                        size_t           len; +                        size_t           vals; +                        size_t           lvals; +                } kv; + +                pthread_rwlock_t lock; +        } db; + +        struct { +                struct list_head list; +                size_t           len; +                pthread_cond_t   cond; +                pthread_mutex_t  mtx; +        } reqs; + +        struct { +                struct list_head list; +                pthread_cond_t   cond; +                pthread_mutex_t  mtx; +        } cmds; +} dht; + + +/* DHT RIB */ + +static const char * dht_dir[] = { +        "database", +        "stats", +        NULL +}; -        struct list_head requests; -        struct bmp *     cookies; +const char * dht_stats = \ +        "DHT: " HASH_FMT64 "\n" +        "  Created: %s\n" +        "  Address: " ADDR_FMT32 "\n" +        "  Kademlia parameters:\n" +        "     Number of concurrent requests (alpha): %10zu\n" +        "     Number of replicas (k):                %10zu\n" +        "     Expiry time for values (s):            %10ld\n" +        "     Refresh time for contacts (s):         %10ld\n" +        "     Replication time for values (s):       %10ld\n" +        "  Number of keys:                           %10zu\n" +        "  Number of local values:                   %10zu\n" +        "  Number of non-local values:               %10zu\n"; -        enum dht_state   state; -        struct list_head cmds; -        pthread_cond_t   cond; -        pthread_mutex_t  mtx; +static int dht_rib_statfile(char * buf, +                            size_t len) +{ +        struct tm * tm; +        char        tmstr[RIB_TM_STRLEN]; +        size_t      keys; +        size_t      vals; +        size_t      lvals; -        pthread_rwlock_t lock; +        assert(buf != NULL); +        assert(len > 0); -        uint64_t         eid; +        pthread_rwlock_rdlock(&dht.db.lock); -        struct tpm *     tpm; +        keys  = dht.db.kv.len; +        lvals = dht.db.kv.lvals; +        vals  = dht.db.kv.vals; -        pthread_t        worker; -}; +        pthread_rwlock_unlock(&dht.db.lock); -struct join_info { -        struct dht * dht; -        uint64_t     addr; -}; +        tm = gmtime(&dht.t0); +        strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); -struct packet_info { -        struct dht *         dht; -        struct shm_du_buff * sdb; -}; +        snprintf(buf, len, dht_stats, +                 HASH_VAL64(dht.id.data), +                 tmstr, +                 ADDR_VAL32(&dht.addr), +                 dht.alpha, dht.k, +                 dht.t_expire, dht.t_refresh, dht.t_repl, +                 keys, vals, lvals); + +        return strlen(buf); +} -static uint8_t * dht_dup_key(const uint8_t * key, -                             size_t          len) +static size_t dht_db_file_len(void)  { -        uint8_t * dup; +        size_t sz; +        size_t vals; -        dup = malloc(sizeof(*dup) * len); -        if (dup == NULL) -                return NULL; +        sz = 18; /* DHT database + 2 * \n */ -        memcpy(dup, key, len); +        pthread_rwlock_rdlock(&dht.db.lock); -        return dup; -} +        if (dht.db.kv.len == 0) { +                pthread_rwlock_unlock(&dht.db.lock); +                sz += 14; /* No entries */ +                return sz; +        } -static enum dht_state dht_get_state(struct dht * dht) -{ -        enum dht_state state; +        sz += 39 * 3 + 1; /* tally + extra newline */ +        sz += dht.db.kv.len * (25 + 19 + 23 + 1); -        pthread_mutex_lock(&dht->mtx); +        vals = dht.db.kv.vals + dht.db.kv.lvals; -        state = dht->state; +        sz += vals * (48 + 2 * RIB_TM_STRLEN); -        pthread_mutex_unlock(&dht->mtx); +        pthread_rwlock_unlock(&dht.db.lock); -        return state; +        return sz;  } -static int dht_set_state(struct dht *   dht, -                         enum dht_state state) +static int dht_rib_dbfile(char * buf, +                          size_t len)  { -        pthread_mutex_lock(&dht->mtx); +        struct tm * tm; +        char        tmstr[RIB_TM_STRLEN]; +        char        exstr[RIB_TM_STRLEN]; +        size_t      i = 0; +        struct      list_head * p; -        if (state == DHT_JOINING && dht->state != DHT_INIT) { -                 pthread_mutex_unlock(&dht->mtx); -                 return -1; +        assert(buf != NULL); +        assert(len > 0); + +        pthread_rwlock_rdlock(&dht.db.lock); + +        if (dht.db.kv.len == 0) { +                i += snprintf(buf, len, "  No entries.\n"); +                pthread_rwlock_unlock(&dht.db.lock); +                return i;          } -        dht->state = state; +        i += snprintf(buf + i, len - i, "DHT database:\n\n"); +        i += snprintf(buf + i, len - i, +                      "Number of keys:             %10zu\n" +                      "Number of local values:     %10zu\n" +                      "Number of non-local values: %10zu\n\n", +                      dht.db.kv.len, dht.db.kv.vals, dht.db.kv.lvals); -        pthread_cond_broadcast(&dht->cond); +        list_for_each(p, &dht.db.kv.list) { +                struct dht_entry * e = list_entry(p, struct dht_entry, next); +                struct list_head * h; -        pthread_mutex_unlock(&dht->mtx); +                i += snprintf(buf + i, len - i, "Key: " KEY_FMT "\n", +                              KEY_VAL(e->key)); +                i += snprintf(buf + i, len - i, "  Local entries:\n"); -        return 0; -} +                list_for_each(h, &e->vals.list) { +                        struct val_entry * v; -int dht_wait_running(void * dir) -{ -        struct dht * dht; -        int          ret = 0; +                        v = list_entry(h, struct val_entry, next); -        dht = (struct dht *) dir; +                        tm = gmtime(&v->t_repl); +                        strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); -        pthread_mutex_lock(&dht->mtx); +                        tm = gmtime(&v->t_exp); +                        strftime(exstr, sizeof(exstr), RIB_TM_FORMAT, tm); -        pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); +                        i += snprintf(buf + i, len - i, +                                "    " VAL_FMT +                                ", t_replicated=%.*s, t_expire=%.*s\n", +                                VAL_VAL(v->val), +                                RIB_TM_STRLEN, tmstr, +                                RIB_TM_STRLEN, exstr); +                } -        while (dht->state == DHT_JOINING) -                pthread_cond_wait(&dht->cond, &dht->mtx); +                i += snprintf(buf + i, len - i, "\n"); -        if (dht->state != DHT_RUNNING) -                ret = -1; +                i += snprintf(buf + i, len - i, "  Non-local entries:\n"); -        pthread_cleanup_pop(true); +                list_for_each(h, &e->lvals.list) { +                        struct val_entry * v; + +                        v= list_entry(h, struct val_entry, next); + +                        tm = gmtime(&v->t_repl); +                        strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); + +                        tm = gmtime(&v->t_exp); +                        strftime(exstr, sizeof(exstr), RIB_TM_FORMAT, tm); -        return ret; +                        i += snprintf(buf + i, len - i, +                                "    " VAL_FMT +                                ", t_replicated=%.*s, t_expire=%.*s\n", +                                VAL_VAL(v->val), +                                RIB_TM_STRLEN, tmstr, +                                RIB_TM_STRLEN, exstr); + +                } +        } + +        pthread_rwlock_unlock(&dht.db.lock); + +        printf("DHT RIB DB file generated (%zu bytes).\n", i); + +        return i;  } -static uint8_t * create_id(size_t len) +static int dht_rib_read(const char * path, +                        char *       buf, +                        size_t       len)  { -        uint8_t * id; +        char * entry; -        id = malloc(len); -        if (id == NULL) -                return NULL; +        entry = strstr(path, RIB_SEPARATOR) + 1; -        if (random_buffer(id, len) < 0) { -                free(id); -                return NULL; +        if (strcmp(entry, "database") == 0) { +                return dht_rib_dbfile(buf, len); +        } else if (strcmp(entry, "stats") == 0) { +                return dht_rib_statfile(buf, len);          } -        return id; +        return 0;  } -static void kad_req_create(struct dht * dht, -                           kad_msg_t *  msg, -                           uint64_t     addr) +static int dht_rib_readdir(char *** buf)  { -        struct kad_req *   req; -        pthread_condattr_t cattr; -        struct timespec    t; -        size_t             b; +        int i = 0; -        req = malloc(sizeof(*req)); -        if (req == NULL) -                return; +        while (dht_dir[i++] != NULL); -        list_head_init(&req->next); +        *buf = malloc(sizeof(**buf) * i); +        if (*buf == NULL) +                goto fail_buf; -        clock_gettime(CLOCK_REALTIME_COARSE, &t); +        i = 0; -        req->t_exp  = t.tv_sec + KAD_T_RESP; -        req->addr   = addr; -        req->state  = REQ_INIT; -        req->cookie = msg->cookie; -        req->code   = msg->code; -        req->key    = NULL; - -        pthread_rwlock_rdlock(&dht->lock); -        b = dht->b; -        pthread_rwlock_unlock(&dht->lock); - -        if (msg->has_key) { -                req->key = dht_dup_key(msg->key.data, b); -                if (req->key == NULL) { -                        free(req); -                        return; -                } +        while (dht_dir[i] != NULL) { +                (*buf)[i] = strdup(dht_dir[i]); +                if ((*buf)[i] == NULL) +                        goto fail_dup; +                i++;          } -        if (pthread_mutex_init(&req->lock, NULL)) { -                free(req->key); -                free(req); -                return; -        } +        return i; + fail_dup: +        freepp(char, *buf, i); + fail_buf: +        return -ENOMEM; +} -        pthread_condattr_init(&cattr); -#ifndef __APPLE__ -        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif +static int dht_rib_getattr(const char *      path, +                           struct rib_attr * attr) +{ +        struct timespec now; +        char *          entry; -        if (pthread_cond_init(&req->cond, &cattr)) { -                pthread_condattr_destroy(&cattr); -                pthread_mutex_destroy(&req->lock); -                free(req->key); -                free(req); -                return; -        } +        clock_gettime(CLOCK_REALTIME_COARSE, &now); -        pthread_condattr_destroy(&cattr); +        attr->mtime = now.tv_sec; -        pthread_rwlock_wrlock(&dht->lock); +        entry = strstr(path, RIB_SEPARATOR) + 1; -        list_add(&req->next, &dht->requests); +        if (strcmp(entry, "database") == 0) { +                attr->size = dht_db_file_len(); +        } else if (strcmp(entry, "stats") == 0) { +                attr->size =  545; +        } -        pthread_rwlock_unlock(&dht->lock); +        return 0;  } -static void cancel_req_destroy(void * o) +static struct rib_ops r_ops = { +        .read    = dht_rib_read, +        .readdir = dht_rib_readdir, +        .getattr = dht_rib_getattr +}; + +/* Helper functions */ + +static uint8_t * generate_id(void)  { -        struct kad_req * req = (struct kad_req *) o; +        uint8_t * id; -        pthread_mutex_unlock(&req->lock); +        if(dht.id.len < sizeof(uint64_t)) { +                log_err("DHT ID length is too short (%zu < %zu).", +                        dht.id.len, sizeof(uint64_t)); +                return NULL; +        } -        pthread_cond_destroy(&req->cond); -        pthread_mutex_destroy(&req->lock); +        id = malloc(dht.id.len); +        if (id == NULL) { +                log_err("Failed to malloc ID."); +                goto fail_id; +        } -        if (req->key != NULL) -                free(req->key); +        if (random_buffer(id, dht.id.len) < 0) { +                log_err("Failed to generate random ID."); +                goto fail_rnd; +        } -        free(req); +        return id; + fail_rnd: +        free(id); + fail_id: +        return NULL;  } -static void kad_req_destroy(struct kad_req * req) +static uint64_t generate_cookie(void)  { -        assert(req); +        uint64_t cookie = DHT_INVALID; -        pthread_mutex_lock(&req->lock); +        while (cookie == DHT_INVALID) +                random_buffer((uint8_t *) &cookie, sizeof(cookie)); -        switch (req->state) { -        case REQ_DESTROY: -                pthread_mutex_unlock(&req->lock); -                return; -        case REQ_PENDING: -                req->state = REQ_DESTROY; -                pthread_cond_signal(&req->cond); -                break; -        case REQ_INIT: -        case REQ_DONE: -                req->state = REQ_NULL; -                break; -        case REQ_RESPONSE: -        case REQ_NULL: -        default: -                break; -        } - -        pthread_cleanup_push(cancel_req_destroy, req); +        return cookie; +} -        while (req->state != REQ_NULL && req->state != REQ_DONE) -                pthread_cond_wait(&req->cond, &req->lock); +/* + * If someone builds a network where the n (n > k) closest nodes all + * have IDs starting with the same 64 bits: by all means, change this. + */ +static uint64_t dist(const uint8_t * src, +                     const uint8_t * dst) +{ +        assert(dht.id.len >= sizeof(uint64_t)); -        pthread_cleanup_pop(true); +        return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst));  } -static int kad_req_wait(struct kad_req * req, -                        time_t           t) +#define IS_CLOSER(x, y) (dist((x), dht.id.data) < dist((y), dht.id.data)) + +static int addr_to_buf(const uint64_t addr, +                       buffer_t *     buf)  { -        struct timespec timeo = {t, 0}; -        struct timespec abs; -        int ret = 0; +        size_t len; +        uint64_t _addr; -        assert(req); +        len = sizeof(addr); +        _addr = hton64(addr); -        clock_gettime(PTHREAD_COND_CLOCK, &abs); +        assert(buf != NULL); -        ts_add(&abs, &timeo, &abs); +        buf->data = malloc(len); +        if (buf->data == NULL) +                goto fail_malloc; -        pthread_mutex_lock(&req->lock); +        buf->len = sizeof(_addr); +        memcpy(buf->data, &_addr, sizeof(_addr)); -        req->state = REQ_PENDING; +        return 0; + fail_malloc: +        return -ENOMEM; +} -        pthread_cleanup_push(__cleanup_mutex_unlock, &req->lock); +static int buf_to_addr(const buffer_t buf, +                       uint64_t *     addr) +{ +        assert(addr != NULL); +        assert(buf.data != NULL); -        while (req->state == REQ_PENDING && ret != -ETIMEDOUT) -                ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs); +        if (buf.len != sizeof(*addr)) +                return - EINVAL; -        switch(req->state) { -        case REQ_DESTROY: -                ret = -1; -                req->state = REQ_NULL; -                pthread_cond_signal(&req->cond); -                break; -        case REQ_PENDING: /* ETIMEDOUT */ -        case REQ_RESPONSE: -                req->state = REQ_DONE; -                pthread_cond_broadcast(&req->cond); -                break; -        default: -                break; -        } +        *addr = ntoh64(*((uint64_t *) buf.data)); -        pthread_cleanup_pop(true); +        if (*addr == dht.addr) +                *addr = INVALID_ADDR; -        return ret; +        return 0;  } -static void kad_req_respond(struct kad_req * req) +static uint8_t * dht_dup_key(const uint8_t * key)  { -        pthread_mutex_lock(&req->lock); +        uint8_t * dup; + +        assert(key != NULL); +        assert(dht.id.len != 0); + +        dup = malloc(dht.id.len); +        if (dup == NULL) +                return NULL; -        req->state = REQ_RESPONSE; -        pthread_cond_signal(&req->cond); +        memcpy(dup, key, dht.id.len); -        pthread_mutex_unlock(&req->lock); +        return dup;  } -static struct contact * contact_create(const uint8_t * id, -                                       size_t          len, -                                       uint64_t        addr) +/* DHT */ + +static struct val_entry * val_entry_create(const buffer_t val, +                                           time_t         exp)  { -        struct contact * c; -        struct timespec  t; +        struct val_entry * e; +        struct timespec    now; -        c = malloc(sizeof(*c)); -        if (c == NULL) -                return NULL; +        assert(val.data != NULL); +        assert(val.len > 0); -        list_head_init(&c->next); +        clock_gettime(CLOCK_REALTIME_COARSE, &now); -        clock_gettime(CLOCK_REALTIME_COARSE, &t); +#ifndef __DHT_TEST_ALLOW_EXPIRED__ +        if (exp < now.tv_sec) +                return NULL; /* Refuse to add expired values */ +#endif +        e = malloc(sizeof(*e)); +        if (e == NULL) +                goto fail_entry; -        c->addr   = addr; -        c->fails  = 0; -        c->t_seen = t.tv_sec; -        c->id     = dht_dup_key(id, len); -        if (c->id == NULL) { -                free(c); -                return NULL; -        } +        list_head_init(&e->next); -        return c; +        e->val.len = val.len; +        e->val.data = malloc(val.len); +        if (e->val.data == NULL) +                goto fail_val; + +        memcpy(e->val.data, val.data, val.len); + +        e->t_repl  = 0; +        e->t_exp   = exp; + +        return e; + + fail_val: +        free(e); + fail_entry: +        return NULL;  } -static void contact_destroy(struct contact * c) +static void val_entry_destroy(struct val_entry * v)  { -        if (c != NULL) -                free(c->id); +        assert(v->val.data != NULL); -        free(c); +        freebuf(v->val); +        free(v);  } -static struct bucket * iter_bucket(struct bucket * b, -                                   const uint8_t * id) +static struct dht_entry * dht_entry_create(const uint8_t * key)  { -        uint8_t byte; -        uint8_t mask; +        struct dht_entry * e; -        assert(b); +        assert(key != NULL); -        if (b->children[0] == NULL) -                return b; +        e = malloc(sizeof(*e)); +        if (e == NULL) +                goto fail_entry; -        byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; +        list_head_init(&e->next); +        list_head_init(&e->vals.list); +        list_head_init(&e->lvals.list); -        mask = ((1L << KAD_BETA) - 1) & 0xFF; +        e->vals.len = 0; +        e->lvals.len = 0; -        byte >>= (CHAR_BIT - KAD_BETA) - -                (((b->depth) * KAD_BETA) & (CHAR_BIT - 1)); +        e->key = dht_dup_key(key); +        if (e->key == NULL) +                goto fail_key; -        return iter_bucket(b->children[(byte & mask)], id); +        return e; + fail_key: +        free(e); + fail_entry: +        return NULL;  } -static struct bucket * dht_get_bucket(struct dht *    dht, -                                      const uint8_t * id) +static void dht_entry_destroy(struct dht_entry * e)  { -        assert(dht->buckets); +        struct list_head * p; +        struct list_head * h; + +        assert(e != NULL); + +        list_for_each_safe(p, h, &e->vals.list) { +                struct val_entry * v = list_entry(p, struct val_entry, next); +                list_del(&v->next); +                val_entry_destroy(v); +                --e->vals.len; +                --dht.db.kv.vals; +        } + +        list_for_each_safe(p, h, &e->lvals.list) { +                struct val_entry * v = list_entry(p, struct val_entry, next); +                list_del(&v->next); +                val_entry_destroy(v); +                --e->lvals.len; +                --dht.db.kv.lvals; +        } + +        free(e->key); -        return iter_bucket(dht->buckets, id); +        assert(e->vals.len == 0 && e->lvals.len == 0); + +        free(e);  } -/* - * If someone builds a network where the n (n > k) closest nodes all - * have IDs starting with the same 64 bits: by all means, change this. - */ -static uint64_t dist(const uint8_t * src, -                     const uint8_t * dst) +static struct val_entry * dht_entry_get_lval(const struct dht_entry * e, +                                             const buffer_t           val)  { -        return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst)); +        struct list_head * p; + +        assert(e != NULL); +        assert(val.data != NULL); +        assert(val.len > 0); + +        list_for_each(p, &e->lvals.list) { +                struct val_entry * v = list_entry(p, struct val_entry, next); +                if (bufcmp(&v->val, &val) == 0) +                        return v; +        } + +        return NULL;  } -static size_t list_add_sorted(struct list_head * l, -                              struct contact *   c, -                              const uint8_t *    key) +static struct val_entry * dht_entry_get_val(const struct dht_entry * e, +                                            const buffer_t           val)  {          struct list_head * p; -        assert(l); -        assert(c); -        assert(key); -        assert(c->id); +        assert(e != NULL); +        assert(val.data != NULL); +        assert(val.len > 0); -        list_for_each(p, l) { -                struct contact * e = list_entry(p, struct contact, next); -                if (dist(c->id, key) > dist(e->id, key)) -                        break; -        } +        list_for_each(p, &e->vals.list) { +                struct val_entry * v = list_entry(p, struct val_entry, next); +                if (bufcmp(&v->val, &val) == 0) +                        return v; -        list_add_tail(&c->next, p); +        } -        return 1; +        return NULL;  } -static size_t dht_contact_list(struct dht *       dht, -                               struct list_head * l, -                               const uint8_t *    key) +static int dht_entry_update_val(struct dht_entry * e, +                                buffer_t           val, +                                time_t             exp)  { -        struct list_head * p; -        struct bucket *    b; -        size_t             len = 0; -        size_t             i; -        struct timespec    t; +        struct val_entry * v; +        struct timespec    now; -        assert(l); -        assert(dht); -        assert(key); -        assert(list_is_empty(l)); +        assert(e != NULL); +        assert(val.data != NULL); +        assert(val.len > 0); -        clock_gettime(CLOCK_REALTIME_COARSE, &t); +        clock_gettime(CLOCK_REALTIME_COARSE, &now); -        b = dht_get_bucket(dht, key); -        if (b == NULL) -                return 0; +        if (exp < now.tv_sec) +                return -EINVAL; /* Refuse to add expired values */ -        b->t_refr = t.tv_sec + KAD_T_REFR; +        if (dht_entry_get_lval(e, val) != NULL) { +                log_dbg(KV_FMT " Val already in lvals.", KV_VAL(e->key, val)); +                return 0; /* Refuse to add local values */ +        } -        if (b->n_contacts == dht->k || b->parent == NULL) { -                list_for_each(p, &b->contacts) { -                        struct contact * c; -                        c = list_entry(p, struct contact, next); -                        c = contact_create(c->id, dht->b, c->addr); -                        if (list_add_sorted(l, c, key) == 1) -                                if (++len == dht->k) -                                        break; -                } -        } else { -                struct bucket * d = b->parent; -                for (i = 0; i < (1L << KAD_BETA) && len < dht->k; ++i) { -                        list_for_each(p, &d->children[i]->contacts) { -                                struct contact * c; -                                c = list_entry(p, struct contact, next); -                                c = contact_create(c->id, dht->b, c->addr); -                                if (c == NULL) -                                        continue; -                                if (list_add_sorted(l, c, key) == 1) -                                        if (++len == dht->k) -                                                break; -                        } -                } +        v = dht_entry_get_val(e, val); +        if (v == NULL) { +                v = val_entry_create(val, exp); +                if (v == NULL) +                        return -ENOMEM; + +                list_add_tail(&v->next, &e->vals.list); +                ++e->vals.len; +                ++dht.db.kv.vals; + +                return 0;          } -        assert(len == dht->k || b->parent == NULL); +        if (v->t_exp < exp) +                v->t_exp  = exp; -        return len; +        return 0;  } -static struct lookup * lookup_create(struct dht *    dht, -                                     const uint8_t * id) +static int dht_entry_update_lval(struct dht_entry * e, +                                 buffer_t           val)  { -        struct lookup *    lu; -        pthread_condattr_t cattr; +        struct val_entry * v; +        struct timespec    now; -        assert(dht); -        assert(id); +        assert(e != NULL); +        assert(val.data != NULL); +        assert(val.len > 0); -        lu = malloc(sizeof(*lu)); -        if (lu == NULL) -                goto fail_malloc; +        clock_gettime(CLOCK_REALTIME_COARSE, &now); -        list_head_init(&lu->contacts); -        list_head_init(&lu->cookies); +        v = dht_entry_get_lval(e, val); +        if (v == NULL) { +                log_dbg(KV_FMT " Adding lval.", KV_VAL(e->key, val)); +                v = val_entry_create(val, now.tv_sec + dht.t_expire); +                if (v == NULL) +                        return -ENOMEM; -        lu->state   = LU_INIT; -        lu->addrs   = NULL; -        lu->n_addrs = 0; -        lu->key     = dht_dup_key(id, dht->b); -        if (lu->key == NULL) -                goto fail_id; +                list_add_tail(&v->next, &e->lvals.list); +                ++e->lvals.len; +                ++dht.db.kv.lvals; -        if (pthread_mutex_init(&lu->lock, NULL)) -                goto fail_mutex; +                return 0; +        } -        pthread_condattr_init(&cattr); -#ifndef __APPLE__ -        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif +        return 0; +} -        if (pthread_cond_init(&lu->cond, &cattr)) -                goto fail_cond; +static int dht_entry_remove_lval(struct dht_entry * e, +                                 buffer_t           val) +{ +        struct val_entry * v; -        pthread_condattr_destroy(&cattr); +        assert(e != NULL); +        assert(val.data != NULL); +        assert(val.len > 0); -        pthread_rwlock_wrlock(&dht->lock); +        v = dht_entry_get_lval(e, val); +        if (v == NULL) +                return -ENOENT; -        list_add(&lu->next, &dht->lookups); +        log_dbg(KV_FMT " Removing lval.", KV_VAL(e->key, val)); -        lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); +        list_del(&v->next); +        val_entry_destroy(v); +        --e->lvals.len; +        --dht.db.kv.lvals; -        pthread_rwlock_unlock(&dht->lock); +        return 0; +} -        return lu; +#define IS_EXPIRED(v, now) ((now)->tv_sec > (v)->t_exp) +static void dht_entry_remove_expired_vals(struct dht_entry * e) +{ +        struct list_head * p; +        struct list_head * h; +        struct timespec    now; + +        assert(e != NULL); + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        list_for_each_safe(p, h, &e->vals.list) { +                struct val_entry * v = list_entry(p, struct val_entry, next); +                if (!IS_EXPIRED(v, &now)) +                        continue; + +                log_dbg(KV_FMT " Value expired." , KV_VAL(e->key, v->val)); +                list_del(&v->next); +                val_entry_destroy(v); +                --e->vals.len; +                --dht.db.kv.vals; +        } +} + +static struct dht_entry * __dht_kv_find_entry(const uint8_t * key) +{ +        struct list_head * p; + +        assert(key != NULL); + +        list_for_each(p, &dht.db.kv.list) { +                struct dht_entry * e = list_entry(p, struct dht_entry, next); +                if (!memcmp(key, e->key, dht.id.len)) +                        return e; +        } - fail_cond: -        pthread_condattr_destroy(&cattr); -        pthread_mutex_destroy(&lu->lock); - fail_mutex: -        free(lu->key); - fail_id: -        free(lu); - fail_malloc:          return NULL;  } -static void cancel_lookup_destroy(void * o) +static void dht_kv_remove_expired_entries(void)  { -        struct lookup *    lu;          struct list_head * p;          struct list_head * h; +        struct timespec    now; -        lu = (struct lookup *) o; +        clock_gettime(CLOCK_REALTIME_COARSE, &now); -        if (lu->key != NULL) -                free(lu->key); -        if (lu->addrs != NULL) -                free(lu->addrs); +        pthread_rwlock_wrlock(&dht.db.lock); -        list_for_each_safe(p, h, &lu->contacts) { -                struct contact * c = list_entry(p, struct contact, next); -                list_del(&c->next); -                contact_destroy(c); +        list_for_each_safe(p, h, &dht.db.kv.list) { +                struct dht_entry * e = list_entry(p, struct dht_entry, next); +                dht_entry_remove_expired_vals(e); +                if (e->lvals.len > 0 || e->vals.len > 0) +                        continue; + +                log_dbg(KEY_FMT " Entry removed. ", KEY_VAL(e->key)); +                list_del(&e->next); +                dht_entry_destroy(e); +                --dht.db.kv.len;          } -        list_for_each_safe(p, h, &lu->cookies) { -                struct cookie_el * c = list_entry(p, struct cookie_el, next); -                list_del(&c->next); +        pthread_rwlock_unlock(&dht.db.lock); +} + + +static struct contact * contact_create(const uint8_t * id, +                                       uint64_t        addr) +{ +        struct contact * c; +        struct timespec  t; + +        c = malloc(sizeof(*c)); +        if (c == NULL) +                return NULL; + +        list_head_init(&c->next); + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); + +        c->addr   = addr; +        c->fails  = 0; +        c->t_seen = t.tv_sec; +        c->id     = dht_dup_key(id); +        if (c->id == NULL) {                  free(c); +                return NULL;          } -        pthread_mutex_unlock(&lu->lock); +        return c; +} -        pthread_mutex_destroy(&lu->lock); +static void contact_destroy(struct contact * c) +{ +        assert(c != NULL); +        assert(list_is_empty(&c->next)); -        free(lu); +        free(c->id); +        free(c);  } -static void lookup_destroy(struct lookup * lu) +static struct dht_req * dht_req_create(const uint8_t * key)  { -        assert(lu); +        struct dht_req * req; +        struct timespec  now; -        pthread_mutex_lock(&lu->lock); +        assert(key != NULL); -        switch (lu->state) { -        case LU_DESTROY: -                pthread_mutex_unlock(&lu->lock); -                return; -        case LU_PENDING: -                lu->state = LU_DESTROY; -                pthread_cond_broadcast(&lu->cond); -                break; -        case LU_INIT: -        case LU_UPDATE: -        case LU_COMPLETE: -                lu->state = LU_NULL; -                break; -        case LU_NULL: -        default: -                break; -        } +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        req = malloc(sizeof(*req)); +        if (req == NULL) +                goto fail_malloc; -        pthread_cleanup_push(cancel_lookup_destroy, lu); +        list_head_init(&req->next); -        while (lu->state != LU_NULL) -                pthread_cond_wait(&lu->cond, &lu->lock); +        req->t_exp = now.tv_sec + DHT_T_RESP; -        pthread_cleanup_pop(true); +        list_head_init(&req->peers.list); +        req->peers.len = 0; + +        req->key = dht_dup_key(key); +        if (req->key == NULL) +                goto fail_dup_key; + +        list_head_init(&req->cache.list); +        req->cache.len = 0; + +        return req; + + fail_dup_key: +        free(req); + fail_malloc: +        return NULL;  } -static void lookup_update(struct dht *    dht, -                          struct lookup * lu, -                          kad_msg_t *     msg) +static void dht_req_destroy(struct dht_req * req)  { -        struct list_head * p = NULL; +        struct list_head * p;          struct list_head * h; -        struct contact *   c = NULL; -        size_t             n; -        size_t             pos = 0; -        bool               mod = false; -        assert(lu); -        assert(msg); - -        if (dht_get_state(dht) != DHT_RUNNING) -                return; - -        pthread_mutex_lock(&lu->lock); +        assert(req); +        assert(req->key); -        list_for_each_safe(p, h, &lu->cookies) { -                struct cookie_el * e = list_entry(p, struct cookie_el, next); -                if (e->cookie == msg->cookie) { -                        list_del(&e->next); -                        free(e); -                        break; -                } +        list_for_each_safe(p, h, &req->peers.list) { +                struct peer_entry * e = list_entry(p, struct peer_entry, next); +                list_del(&e->next); +                free(e->id); +                free(e); +                --req->peers.len;          } -        if (lu->state == LU_COMPLETE) { -                pthread_mutex_unlock(&lu->lock); -                return; +        list_for_each_safe(p, h, &req->cache.list) { +                struct val_entry * e = list_entry(p, struct val_entry, next); +                list_del(&e->next); +                val_entry_destroy(e); +                --req->cache.len;          } -        if (msg->n_addrs > 0) { -                if (lu->addrs == NULL) { -                        lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs); -                        for (n = 0; n < msg->n_addrs; ++n) -                                lu->addrs[n] = msg->addrs[n]; -                        lu->n_addrs = msg->n_addrs; -                } +        free(req->key); -                lu->state = LU_COMPLETE; -                pthread_cond_broadcast(&lu->cond); -                pthread_mutex_unlock(&lu->lock); -                return; -        } +        assert(req->peers.len == 0); -        pthread_cleanup_push(__cleanup_mutex_unlock, &lu->lock); +        free(req); +} -        while (lu->state == LU_INIT) { -                pthread_rwlock_unlock(&dht->lock); -                pthread_cond_wait(&lu->cond, &lu->lock); -                pthread_rwlock_rdlock(&dht->lock); -        } +static struct peer_entry * dht_req_get_peer(struct dht_req *    req, +                                            struct peer_entry * e) +{ +        struct list_head * p; -        pthread_cleanup_pop(false); +        list_for_each(p, &req->peers.list) { +                struct peer_entry * x = list_entry(p, struct peer_entry, next); +                if (x->addr == e->addr) +                        return x; +        } -        for (n = 0; n < msg->n_contacts; ++n) { -                c = contact_create(msg->contacts[n]->id.data, -                                   dht->b, msg->contacts[n]->addr); -                if (c == NULL) -                        continue; +        return NULL; +} -                pos = 0; +#define IS_MAGIC(peer) ((peer)->cookie == dht.magic) +void dht_req_add_peer(struct dht_req * req, +                      struct peer_entry * e) +{ +        struct peer_entry * x; /* existing */ +        struct list_head *  p; /* iterator */ +        size_t              pos = 0; -                list_for_each(p, &lu->contacts) { -                        struct contact * e; -                        e = list_entry(p, struct contact, next); -                        if (!memcmp(e->id, c->id, dht->b)) { -                                contact_destroy(c); -                                c = NULL; -                                break; -                        } +        assert(req   != NULL); +        assert(e     != NULL); +        assert(e->id != NULL); -                        if (dist(c->id, lu->key) > dist(e->id, lu->key)) -                                break; +        /* +         * Dedupe messages to the same peer, unless +         *   1) The previous request was FIND_NODE and now it's FIND_VALUE +         *   2) We urgently need contacts from emergency peer (magic cookie) +         */ +        x = dht_req_get_peer(req, e); +        if (x != NULL && x->code >= e->code && !IS_MAGIC(e)) +                goto skip; +        /* Find how this contact ranks in distance to the key */ +        list_for_each(p, &req->peers.list) { +                struct peer_entry * y = list_entry(p, struct peer_entry, next); +                if (IS_CLOSER(y->id, e->id)) {                          pos++; -                } - -                if (c == NULL)                          continue; - -                if (lu->n_contacts < dht->k) { -                        list_add_tail(&c->next, p); -                        ++lu->n_contacts; -                        mod = true; -                } else if (pos == dht->k) { -                        contact_destroy(c); -                } else { -                        struct contact * d; -                        list_add_tail(&c->next, p); -                        d = list_last_entry(&lu->contacts, -                                            struct contact, next); -                        list_del(&d->next); -                        assert(lu->contacts.prv != &d->next); -                        contact_destroy(d); -                        mod = true;                  } +                break;          } -        if (list_is_empty(&lu->cookies) && !mod) -                lu->state = LU_COMPLETE; -        else -                lu->state = LU_UPDATE; +        /* Add a new peer to this request if we need to */ +        if (pos < dht.alpha || !IS_MAGIC(e)) { +                x = malloc(sizeof(*x)); +                if (x == NULL) { +                        log_err("Failed to malloc peer entry."); +                        goto skip; +                } -        pthread_cond_broadcast(&lu->cond); -        pthread_mutex_unlock(&lu->lock); -        return; +                x->cookie = e->cookie; +                x->addr   = e->addr; +                x->code   = e->code; +                x->t_sent = e->t_sent; +                x->id     = dht_dup_key(e->id); +                if (x->id == NULL) { +                        log_err("Failed to dup peer ID."); +                        free(x); +                        goto skip; +                } + +                if (IS_MAGIC(e)) +                        list_add(&x->next, p); +                else +                        list_add_tail(&x->next, p); +                ++req->peers.len; +                return; +        } + skip: +        list_del(&e->next); +        free(e->id); +        free(e);  } -static ssize_t lookup_get_addrs(struct lookup * lu, -                                uint64_t *      addrs) +static size_t dht_req_add_peers(struct dht_req *   req, +                                struct list_head * pl)  { -        ssize_t n; - -        assert(lu); - -        pthread_mutex_lock(&lu->lock); +        struct list_head *  p; +        struct list_head *  h; +        size_t              n = 0; -        for (n = 0; (size_t) n < lu->n_addrs; ++n) -                addrs[n] = lu->addrs[n]; +        assert(req != NULL); +        assert(pl  != NULL); -        assert((size_t) n == lu->n_addrs); - -        pthread_mutex_unlock(&lu->lock); +        list_for_each_safe(p, h, pl) { +                struct peer_entry * e = list_entry(p, struct peer_entry, next); +                dht_req_add_peer(req, e); +        }          return n;  } -static ssize_t lookup_contact_addrs(struct lookup * lu, -                                    uint64_t *      addrs) +static bool dht_req_has_peer(struct dht_req * req, +                             uint64_t         cookie)  {          struct list_head * p; -        ssize_t            n = 0; - -        assert(lu); -        assert(addrs); -        pthread_mutex_lock(&lu->lock); +        assert(req != NULL); -        list_for_each(p, &lu->contacts) { -                struct contact * c = list_entry(p, struct contact, next); -                addrs[n] = c->addr; -                n++; +        list_for_each(p, &req->peers.list) { +                struct peer_entry * e = list_entry(p, struct peer_entry, next); +                if (e->cookie == cookie) +                        return true;          } -        pthread_mutex_unlock(&lu->lock); - -        return n; +        return false;  } -static void lookup_new_addrs(struct lookup * lu, -                             uint64_t *      addrs) +static void peer_list_destroy(struct list_head * pl)  {          struct list_head * p; -        size_t             n = 0; +        struct list_head * h; -        assert(lu); -        assert(addrs); +        assert(pl != NULL); -        pthread_mutex_lock(&lu->lock); +        list_for_each_safe(p, h, pl) { +                struct peer_entry * e = list_entry(p, struct peer_entry, next); +                list_del(&e->next); +                free(e->id); +                free(e); +        } +} -        /* Uses fails to check if the contact has been contacted. */ -        list_for_each(p, &lu->contacts) { -                struct contact * c = list_entry(p, struct contact, next); -                if (c->fails == 0) { -                        c->fails = 1; -                        addrs[n] = c->addr; -                        n++; -                } +static int dht_kv_create_peer_list(struct list_head * cl, +                                   struct list_head * pl, +                                   enum dht_code      code) +{ +        struct list_head *  p; +        struct list_head *  h; +        struct timespec     now; +        size_t              len; + +        assert(cl != NULL); +        assert(pl != NULL); +        assert(list_is_empty(pl)); -                if (n == KAD_ALPHA) +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        len = 0; + +        list_for_each_safe(p, h, cl) { +                struct contact * c = list_entry(p, struct contact, next); +                struct peer_entry * e; +                if (len++ == dht.alpha)                          break; -        } -        assert(n <= KAD_ALPHA); +                e = malloc(sizeof(*e)); +                if (e == NULL) +                        return -ENOMEM; -        addrs[n] = 0; +                e->cookie = generate_cookie(); +                e->code   = code; +                e->addr   = c->addr; +                e->t_sent = now.tv_sec; -        pthread_mutex_unlock(&lu->lock); -} +                e->id = c->id; -static void lookup_set_state(struct lookup *   lu, -                             enum lookup_state state) -{ -        pthread_mutex_lock(&lu->lock); +                list_add_tail(&e->next, pl); -        lu->state = state; -        pthread_cond_broadcast(&lu->cond); +                list_del(&c->next); +                c->id = NULL; /* we stole the id */ +                contact_destroy(c); +        } -        pthread_mutex_unlock(&lu->lock); +        return 0;  } -static void cancel_lookup_wait(void * o) +static struct dht_req * __dht_kv_req_get_req(const uint8_t * key)  { -        struct lookup * lu = (struct lookup *) o; -        lu->state = LU_NULL; -        pthread_mutex_unlock(&lu->lock); -        lookup_destroy(lu); +        struct list_head * p; + +        list_for_each(p, &dht.reqs.list) { +                struct dht_req * r = list_entry(p, struct dht_req, next); +                if (memcmp(r->key, key, dht.id.len) == 0) +                        return r; +        } + +        return NULL;  } -static enum lookup_state lookup_wait(struct lookup * lu) +static struct dht_req * __dht_kv_get_req_cache(const uint8_t * key)  { -        struct timespec   timeo = {KAD_T_RESP, 0}; -        struct timespec   abs; -        enum lookup_state state; -        int               ret = 0; +        struct dht_req * req; -        clock_gettime(PTHREAD_COND_CLOCK, &abs); +        assert(key != NULL); -        ts_add(&abs, &timeo, &abs); +        req = __dht_kv_req_get_req(key); +        if (req == NULL) +                return NULL; -        pthread_mutex_lock(&lu->lock); +        if (req->cache.len == 0) +                return NULL; -        if (lu->state == LU_INIT || lu->state == LU_UPDATE) -                lu->state = LU_PENDING; +        return req; +} -        pthread_cleanup_push(cancel_lookup_wait, lu); +static void __dht_kv_req_remove(const uint8_t * key) +{ +        struct dht_req * req; -        while (lu->state == LU_PENDING && ret != -ETIMEDOUT) -                ret = -pthread_cond_timedwait(&lu->cond, &lu->lock, &abs); +        assert(key != NULL); -        pthread_cleanup_pop(false); +        req = __dht_kv_req_get_req(key); +        if (req == NULL) +                return; -        if (ret == -ETIMEDOUT) -                lu->state = LU_COMPLETE; +        list_del(&req->next); +        --dht.reqs.len; -        state = lu->state; +        dht_req_destroy(req); +} -        pthread_mutex_unlock(&lu->lock); +static struct dht_req * __dht_kv_get_req_peer(const uint8_t * key, +                                              uint64_t        cookie) +{ +        struct dht_req * req; -        return state; +        assert(key != NULL); + +        req = __dht_kv_req_get_req(key); +        if (req == NULL) +                return NULL; + +        if (!dht_req_has_peer(req, cookie)) +                return NULL; + +        return req;  } -static struct kad_req * dht_find_request(struct dht * dht, -                                         kad_msg_t *  msg) +static bool dht_kv_has_req(const uint8_t * key, +                           uint64_t        cookie)  { -        struct list_head * p; +        bool found; -        assert(dht); -        assert(msg); +        pthread_mutex_lock(&dht.reqs.mtx); -        list_for_each(p, &dht->requests) { -                struct kad_req * r = list_entry(p, struct kad_req, next); -                if (r->cookie == msg->cookie) -                        return r; -        } +        found = __dht_kv_get_req_peer(key, cookie) != NULL; -        return NULL; +        pthread_mutex_unlock(&dht.reqs.mtx); + +        return found;  } -static struct lookup * dht_find_lookup(struct dht *    dht, -                                       uint32_t        cookie) +/* + * This will filter the peer list for addresses that still need to be + * contacted. + */ +static int dht_kv_update_req(const uint8_t *    key, +                             struct list_head * pl)  { -        struct list_head * p; -        struct list_head * p2; -        struct list_head * h2; - -        assert(dht); -        assert(cookie > 0); - -        list_for_each(p, &dht->lookups) { -                struct lookup * l = list_entry(p, struct lookup, next); -                pthread_mutex_lock(&l->lock); -                list_for_each_safe(p2, h2, &l->cookies) { -                        struct cookie_el * e; -                        e = list_entry(p2, struct cookie_el, next); -                        if (e->cookie == cookie) { -                                list_del(&e->next); -                                free(e); -                                pthread_mutex_unlock(&l->lock); -                                return l; -                        } +        struct dht_req * req; +        struct timespec  now; + +        assert(key != NULL); +        assert(pl != NULL); +        assert(!list_is_empty(pl)); + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        pthread_mutex_lock(&dht.reqs.mtx); + +        req = __dht_kv_req_get_req(key); +        if (req == NULL) { +                if (dht.reqs.len == DHT_MAX_REQS) { +                        log_err(KEY_FMT " Max reqs reached (%zu).", +                                KEY_VAL(key), dht.reqs.len); +                        peer_list_destroy(pl); +                        goto fail_req; +                } +                req = dht_req_create(key); +                if (req == NULL) { +                        log_err(KEY_FMT "Failed to create req.", KEY_VAL(key)); +                        goto fail_req;                  } -                pthread_mutex_unlock(&l->lock); +                list_add_tail(&req->next, &dht.reqs.list); +                ++dht.reqs.len;          } -        return NULL; +        if (req->cache.len > 0) /* Already have values */ +                peer_list_destroy(pl); + +        dht_req_add_peers(req, pl); +        req->t_exp = now.tv_sec + DHT_T_RESP; + +        if (dht.reqs.len > DHT_WARN_REQS) { +                log_warn("Number of outstanding requests (%zu) exceeds %u.", +                         dht.reqs.len, DHT_WARN_REQS); +        } + +        pthread_mutex_unlock(&dht.reqs.mtx); + +        return 0; + fail_req: +        pthread_mutex_unlock(&dht.reqs.mtx); +        return -1;  } -static struct val * val_create(uint64_t addr, -                               time_t   exp) +static int dht_kv_respond_req(uint8_t *       key, +                              binary_data_t * vals, +                              size_t          len)  { -        struct val *    v; -        struct timespec t; +        struct dht_req * req; +        struct timespec  now; +        size_t i; -        v = malloc(sizeof(*v)); -        if (v == NULL) -                return NULL; +        assert(key != NULL); +        assert(vals != NULL); +        assert(len > 0); -        list_head_init(&v->next); -        v->addr = addr; +        clock_gettime(CLOCK_REALTIME_COARSE, &now); -        clock_gettime(CLOCK_REALTIME_COARSE, &t); +        pthread_mutex_lock(&dht.reqs.mtx); -        v->t_exp = t.tv_sec + exp; -        v->t_rep = t.tv_sec + KAD_T_REPL; +        req = __dht_kv_req_get_req(key); +        if (req == NULL) { +                log_dbg(KEY_FMT " Failed to find req.", KEY_VAL(key)); +                goto fail_req; +        } + +        for (i = 0; i < len; ++i) { +                struct val_entry * e; +                buffer_t val; +                val.data = vals[i].data; +                val.len = vals[i].len; +                e = val_entry_create(val, now.tv_sec + DHT_T_CACHE); +                if (e == NULL) { +                        log_err(" Failed to create val_entry."); +                        continue; +                } -        return v; -} +                list_add_tail(&e->next, &req->cache.list); +                ++req->cache.len; +        } -static void val_destroy(struct val * v) -{ -        assert(v); +        pthread_cond_broadcast(&dht.reqs.cond); -        free(v); +        pthread_mutex_unlock(&dht.reqs.mtx); + +        return 0; + fail_req: +        pthread_mutex_unlock(&dht.reqs.mtx); +        return -1;  } -static struct ref_entry * ref_entry_create(struct dht *    dht, -                                           const uint8_t * key) +static ssize_t dht_kv_wait_req(const uint8_t * key, +                               buffer_t **     vals)  { -        struct ref_entry * e; +        struct list_head * p; +        struct dht_req *   req;          struct timespec    t; +#ifdef __DHT_TEST__ +        struct timespec    intv = TIMESPEC_INIT_MS(10); +#else +        struct timespec    intv = TIMESPEC_INIT_S(DHT_T_RESP); +#endif +        size_t             max; +        size_t             i = 0; +        int                ret = 0; -        assert(dht); -        assert(key); +        assert(key != NULL); +        assert(vals != NULL); -        e = malloc(sizeof(*e)); -        if (e == NULL) -                return NULL; +        clock_gettime(PTHREAD_COND_CLOCK, &t); -        e->key = dht_dup_key(key, dht->b); -        if (e->key == NULL) { -                free(e); -                return NULL; +        ts_add(&t, &intv, &t); + +        pthread_mutex_lock(&dht.reqs.mtx); + +        pthread_cleanup_push(__cleanup_mutex_unlock, &dht.reqs.mtx); + +        while ((req = __dht_kv_get_req_cache(key)) == NULL) { +                ret = pthread_cond_timedwait(&dht.reqs.cond, &dht.reqs.mtx, &t); +                if (ret == ETIMEDOUT) +                        break;          } -        clock_gettime(CLOCK_REALTIME_COARSE, &t); +        pthread_cleanup_pop(false); -        e->t_rep = t.tv_sec + dht->t_repub; +        if (ret == ETIMEDOUT) { +                log_warn(KEY_FMT " Req timed out.", KEY_VAL(key)); +                __dht_kv_req_remove(key); +                goto timedout; +        } -        return e; -} +        max = MIN(req->cache.len, DHT_MAX_VALS); +        if (max == 0) +                goto no_vals; -static void ref_entry_destroy(struct ref_entry * e) -{ -        free(e->key); -        free(e); +        *vals = malloc(max * sizeof(**vals)); +        if (*vals == NULL) { +                log_err(KEY_FMT "Failed to malloc val buffer.", KEY_VAL(key)); +                goto fail_vals; +        } + +        memset(*vals, 0, max * sizeof(**vals)); + +        list_for_each(p, &req->cache.list) { +                struct val_entry * v; +                if (i == max) +                        break; /* We have enough values */ +                v = list_entry(p, struct val_entry, next); +                (*vals)[i].data = malloc(v->val.len); +                if ((*vals)[i].data == NULL) +                        goto fail_val_data; + +                (*vals)[i].len = v->val.len; +                memcpy((*vals)[i++].data, v->val.data, v->val.len); +        } + +        pthread_mutex_unlock(&dht.reqs.mtx); + +        return i; + no_vals: +        pthread_mutex_unlock(&dht.reqs.mtx); +        *vals = NULL; +        return 0; + fail_val_data: +        freebufs(*vals, i); + fail_vals: +        pthread_mutex_unlock(&dht.reqs.mtx); +        return -ENOMEM; + timedout: +        pthread_mutex_unlock(&dht.reqs.mtx); +        return -ETIMEDOUT;  } -static struct dht_entry * dht_entry_create(struct dht *    dht, -                                           const uint8_t * key) +static struct bucket * iter_bucket(struct bucket * b, +                                   const uint8_t * id)  { -        struct dht_entry * e; +        uint8_t byte; +        uint8_t mask; -        assert(dht); -        assert(key); +        assert(b != NULL); -        e = malloc(sizeof(*e)); -        if (e == NULL) -                return NULL; +        if (b->children[0] == NULL) +                return b; -        list_head_init(&e->next); -        list_head_init(&e->vals); +        byte = id[(b->depth * DHT_BETA) / CHAR_BIT]; -        e->n_vals = 0; +        mask = ((1L << DHT_BETA) - 1) & 0xFF; -        e->key = dht_dup_key(key, dht->b); -        if (e->key == NULL) { -                free(e); -                return NULL; -        } +        byte >>= (CHAR_BIT - DHT_BETA) - +                (((b->depth) * DHT_BETA) & (CHAR_BIT - 1)); -        return e; +        return iter_bucket(b->children[(byte & mask)], id);  } -static void dht_entry_destroy(struct dht_entry * e) +static struct bucket * __dht_kv_get_bucket(const uint8_t * id) +{ +        assert(dht.db.contacts.root != NULL); + +        return iter_bucket(dht.db.contacts.root, id); +} + +static void contact_list_add(struct list_head * l, +                             struct contact *   c)  {          struct list_head * p; -        struct list_head * h; -        assert(e); +        assert(l != NULL); +        assert(c != NULL); -        list_for_each_safe(p, h, &e->vals) { -                struct val * v = list_entry(p, struct val, next); -                list_del(&v->next); -                val_destroy(v); +        list_for_each(p, l) { +                struct contact * e = list_entry(p, struct contact, next); +                if (IS_CLOSER(e->id, c->id)) +                        continue;          } -        free(e->key); - -        free(e); +        list_add_tail(&c->next, p);  } -static int dht_entry_add_addr(struct dht_entry * e, -                              uint64_t           addr, -                              time_t             exp) +static ssize_t dht_kv_contact_list(const uint8_t *    key, +                                   struct list_head * l, +                                   size_t             max)  {          struct list_head * p; -        struct val * val; -        struct timespec t; +        struct bucket *    b; +        struct timespec    t; +        size_t             i; +        size_t             len = 0; + +        assert(l   != NULL); +        assert(key != NULL); +        assert(list_is_empty(l));          clock_gettime(CLOCK_REALTIME_COARSE, &t); -        list_for_each(p, &e->vals) { -                struct val * v = list_entry(p, struct val, next); -                if (v->addr == addr) { -                        if (v->t_exp < t.tv_sec + exp) { -                                v->t_exp = t.tv_sec + exp; -                                v->t_rep = t.tv_sec + KAD_T_REPL; -                        } +        max = MIN(max, dht.k); -                        return 0; -                } +        pthread_rwlock_rdlock(&dht.db.lock); + +        b = __dht_kv_get_bucket(key); +        if (b == NULL) { +                log_err(KEY_FMT " Failed to get bucket.", KEY_VAL(key)); +                goto fail_bucket;          } -        val = val_create(addr, exp); -        if (val == NULL) -                return -ENOMEM; +        b->t_refr = t.tv_sec + dht.t_refresh; -        list_add(&val->next, &e->vals); -        ++e->n_vals; +        if (b->contacts.len == dht.k || b->parent == NULL) { +                list_for_each(p, &b->contacts.list) { +                        struct contact * c; +                        struct contact * d; +                        c = list_entry(p, struct contact, next); +                        if (c->addr == dht.addr) +                                continue; +                        d = contact_create(c->id, c->addr); +                        if (d == NULL) +                                continue; +                        contact_list_add(l, d); +                        if (++len == max) +                                break; +                } +        } else { +                struct bucket * d = b->parent; +                for (i = 0; i < (1L << DHT_BETA) && len < dht.k; ++i) { +                        list_for_each(p, &d->children[i]->contacts.list) { +                                struct contact * c; +                                struct contact * d; +                                c = list_entry(p, struct contact, next); +                                if (c->addr == dht.addr) +                                        continue; +                                d = contact_create(c->id, c->addr); +                                if (d == NULL) +                                        continue; +                                contact_list_add(l, d); +                                if (++len == max) +                                        break; +                        } +                } +        } -        return 0; -} +        pthread_rwlock_unlock(&dht.db.lock); +        return len; + fail_bucket: +        pthread_rwlock_unlock(&dht.db.lock); +        return -1; +} -static void dht_entry_del_addr(struct dht_entry * e, -                               uint64_t           addr) +static void contact_list_destroy(struct list_head * l)  {          struct list_head * p;          struct list_head * h; -        assert(e); - -        list_for_each_safe(p, h, &e->vals) { -                struct val * v = list_entry(p, struct val, next); -                if (v->addr == addr) { -                        list_del(&v->next); -                        val_destroy(v); -                        --e->n_vals; -                } -        } +        assert(l != NULL); -        if (e->n_vals == 0) { -                list_del(&e->next); -                dht_entry_destroy(e); +        list_for_each_safe(p, h, l) { +                struct contact * c = list_entry(p, struct contact, next); +                list_del(&c->next); +                contact_destroy(c);          }  } -static uint64_t dht_entry_get_addr(struct dht *       dht, -                                   struct dht_entry * e) +static ssize_t dht_kv_get_contacts(const uint8_t *       key, +                                   dht_contact_msg_t *** msgs)  { +        struct list_head   cl;          struct list_head * p; +        struct list_head * h; +        size_t             len; +        size_t             i = 0; -        assert(e); -        assert(!list_is_empty(&e->vals)); +        assert(key != NULL); +        assert(msgs != NULL); -        list_for_each(p, &e->vals) { -                struct val * v = list_entry(p, struct val, next); -                if (v->addr != dht->addr) -                        return v->addr; +        list_head_init(&cl); + +        len = dht_kv_contact_list(key, &cl, dht.k); +        if (len == 0) { +                *msgs = NULL; +                return 0;          } -        return 0; -} +        *msgs = malloc(len * sizeof(**msgs)); +        if (*msgs == NULL) +                goto fail_msgs; -/* Forward declaration. */ -static struct lookup * kad_lookup(struct dht *    dht, -                                  const uint8_t * key, -                                  enum kad_code   code); +        list_for_each_safe(p, h, &cl) { +                struct contact * c; +                (*msgs)[i] = malloc(sizeof(***msgs)); +                if ((*msgs)[i] == NULL) +                        goto fail_contact; +                dht_contact_msg__init((*msgs)[i]); +                c = list_entry(p, struct contact, next); +                list_del(&c->next); +                (*msgs)[i]->id.data = c->id; +                (*msgs)[i]->id.len  = dht.id.len; +                (*msgs)[i++]->addr  = c->addr; +                free(c); +        } + +        return i; + fail_contact: +        while (i-- > 0) +                dht_contact_msg__free_unpacked((*msgs)[i], NULL); +        free(*msgs); +        *msgs = NULL; + fail_msgs: +        contact_list_destroy(&cl); +        return -ENOMEM; +}  /* Build a refresh list. */ -static void bucket_refresh(struct dht *       dht, -                           struct bucket *    b, -                           time_t             t, -                           struct list_head * r) +static void __dht_kv_bucket_refresh_list(struct bucket *    b, +                                         time_t             t, +                                         struct list_head * r)  { -        size_t i; +        struct contact * c; +        struct contact * d; -        if (*b->children != NULL) -                for (i = 0; i < (1L << KAD_BETA); ++i) -                        bucket_refresh(dht, b->children[i], t, r); +        assert(b != NULL); -        if (b->n_contacts == 0) +        if (t < b->t_refr)                  return; -        if (t > b->t_refr) { -                struct contact * c; -                struct contact * d; -                c = list_first_entry(&b->contacts, struct contact, next); -                d = contact_create(c->id, dht->b, c->addr); -                if (c != NULL) -                        list_add(&d->next, r); +        if (*b->children != NULL) { +                size_t i; +                for (i = 0; i < (1L << DHT_BETA); ++i) +                        __dht_kv_bucket_refresh_list(b->children[i], t, r); +        } + +        if (b->contacts.len == 0)                  return; + +        c = list_first_entry(&b->contacts.list, struct contact, next); +        if (t > c->t_seen + dht.t_refresh) { +                d = contact_create(c->id, c->addr); +                if (d != NULL) +                        list_add(&d->next, r);          }  } -  static struct bucket * bucket_create(void)  {          struct bucket * b; @@ -1286,20 +1682,21 @@ static struct bucket * bucket_create(void)          if (b == NULL)                  return NULL; -        list_head_init(&b->contacts); -        b->n_contacts = 0; +        list_head_init(&b->contacts.list); +        b->contacts.len = 0; -        list_head_init(&b->alts); -        b->n_alts = 0; +        list_head_init(&b->alts.list); +        b->alts.len = 0;          clock_gettime(CLOCK_REALTIME_COARSE, &t); -        b->t_refr = t.tv_sec + KAD_T_REFR; +        b->t_refr = t.tv_sec + dht.t_refresh; -        for (i = 0; i < (1L << KAD_BETA); ++i) +        for (i = 0; i < (1L << DHT_BETA); ++i)                  b->children[i]  = NULL;          b->parent = NULL;          b->depth = 0; +        b->mask  = 0;          return b;  } @@ -1310,24 +1707,24 @@ static void bucket_destroy(struct bucket * b)          struct list_head * h;          size_t             i; -        assert(b); +        assert(b != NULL); -        for (i = 0; i < (1L << KAD_BETA); ++i) +        for (i = 0; i < (1L << DHT_BETA); ++i)                  if (b->children[i] != NULL)                          bucket_destroy(b->children[i]); -        list_for_each_safe(p, h, &b->contacts) { +        list_for_each_safe(p, h, &b->contacts.list) {                  struct contact * c = list_entry(p, struct contact, next);                  list_del(&c->next);                  contact_destroy(c); -                --b->n_contacts; +                --b->contacts.len;          } -        list_for_each_safe(p, h, &b->alts) { +        list_for_each_safe(p, h, &b->alts.list) {                  struct contact * c = list_entry(p, struct contact, next);                  list_del(&c->next);                  contact_destroy(c); -                --b->n_contacts; +                --b->alts.len;          }          free(b); @@ -1342,1534 +1739,2314 @@ static bool bucket_has_id(struct bucket * b,          if (b->depth == 0)                  return true; -        byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; +        byte = id[(b->depth * DHT_BETA) / CHAR_BIT]; -        mask = ((1L << KAD_BETA) - 1) & 0xFF; +        mask = ((1L << DHT_BETA) - 1) & 0xFF; -        byte >>= (CHAR_BIT - KAD_BETA) - -                (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1)); +        byte >>= (CHAR_BIT - DHT_BETA) - +                (((b->depth - 1) * DHT_BETA) & (CHAR_BIT - 1));          return ((byte & mask) == b->mask);  } -static int split_bucket(struct bucket * b) +static int move_contacts(struct bucket * b, +                         struct bucket * c)  {          struct list_head * p;          struct list_head * h; +        struct contact *   d; + +        assert(b != NULL); +        assert(c != NULL); + +        list_for_each_safe(p, h, &b->contacts.list) { +                d = list_entry(p, struct contact, next); +                if (bucket_has_id(c, d->id)) { +                        list_del(&d->next); +                        --b->contacts.len; +                        list_add_tail(&d->next, &c->contacts.list); +                        ++c->contacts.len; +                } +        } + +        return 0; +} + +static int split_bucket(struct bucket * b) +{          uint8_t mask = 0;          size_t i; -        size_t c; +        size_t b_len;          assert(b); -        assert(b->n_alts == 0); -        assert(b->n_contacts); +        assert(b->alts.len == 0); +        assert(b->contacts.len != 0);          assert(b->children[0] == NULL); -        c = b->n_contacts; +        b_len = b->contacts.len; -        for (i = 0; i < (1L << KAD_BETA); ++i) { +        for (i = 0; i < (1L << DHT_BETA); ++i) {                  b->children[i] = bucket_create(); -                if (b->children[i] == NULL) { -                        size_t j; -                        for (j = 0; j < i; ++j) -                                bucket_destroy(b->children[j]); -                        return -1; -                } +                if (b->children[i] == NULL) +                        goto fail_child;                  b->children[i]->depth  = b->depth + 1;                  b->children[i]->mask   = mask;                  b->children[i]->parent = b; -                list_for_each_safe(p, h, &b->contacts) { -                        struct contact * c; -                        c = list_entry(p, struct contact, next); -                        if (bucket_has_id(b->children[i], c->id)) { -                                list_del(&c->next); -                                --b->n_contacts; -                                list_add(&c->next, &b->children[i]->contacts); -                                ++b->children[i]->n_contacts; -                        } -                } +                move_contacts(b, b->children[i]);                  mask++;          } -        for (i = 0; i < (1L << KAD_BETA); ++i) -                if (b->children[i]->n_contacts == c) +        for (i = 0; i < (1L << DHT_BETA); ++i) +                if (b->children[i]->contacts.len == b_len)                          split_bucket(b->children[i]);          return 0; + fail_child: +        while (i-- > 0) +                bucket_destroy(b->children[i]); +        return -1;  } -/* Locked externally to mandate update as (final) part of join transaction. */ -static int dht_update_bucket(struct dht *    dht, -                             const uint8_t * id, -                             uint64_t        addr) +static int dht_kv_update_contacts(const uint8_t * id, +                                  uint64_t        addr)  {          struct list_head * p;          struct list_head * h;          struct bucket *    b;          struct contact *   c; -        assert(dht); +        assert(id != NULL); +        assert(addr != INVALID_ADDR); -        b = dht_get_bucket(dht, id); -        if (b == NULL) -                return -1; +        pthread_rwlock_wrlock(&dht.db.lock); -        c = contact_create(id, dht->b, addr); -        if (c == NULL) -                return -1; +        b = __dht_kv_get_bucket(id); +        if (b == NULL) { +                log_err(PEER_FMT " Failed to get bucket.", PEER_VAL(id, addr)); +                        goto fail_update; +        } + +        c = contact_create(id, addr); +        if (c == NULL) { +                log_err(PEER_FMT " Failed to create contact.", +                        PEER_VAL(id, addr)); +                goto fail_update; +        } -        list_for_each_safe(p, h, &b->contacts) { +        list_for_each_safe(p, h, &b->contacts.list) {                  struct contact * d = list_entry(p, struct contact, next);                  if (d->addr == addr) {                          list_del(&d->next);                          contact_destroy(d); -                        --b->n_contacts; +                        --b->contacts.len;                  }          } -        if (b->n_contacts == dht->k) { -                if (bucket_has_id(b, dht->id)) { -                        list_add_tail(&c->next, &b->contacts); -                        ++b->n_contacts; +        if (b->contacts.len == dht.k) { +                if (bucket_has_id(b, dht.id.data)) { +                        list_add_tail(&c->next, &b->contacts.list); +                        ++b->contacts.len;                          if (split_bucket(b)) {                                  list_del(&c->next);                                  contact_destroy(c); -                                --b->n_contacts; +                                --b->contacts.len;                          } -                } else if (b->n_alts == dht->k) { +                } else if (b->alts.len == dht.k) {                          struct contact * d; -                        d = list_first_entry(&b->alts, struct contact, next); +                        d = list_first_entry(&b->alts.list, +                                struct contact, next);                          list_del(&d->next);                          contact_destroy(d); -                        list_add_tail(&c->next, &b->alts); +                        list_add_tail(&c->next, &b->alts.list); +                        ++b->alts.len;                  } else { -                        list_add_tail(&c->next, &b->alts); -                        ++b->n_alts; +                        list_add_tail(&c->next, &b->alts.list); +                        ++b->alts.len;                  }          } else { -                list_add_tail(&c->next, &b->contacts); -                ++b->n_contacts; +                list_add_tail(&c->next, &b->contacts.list); +                ++b->contacts.len;          } +        pthread_rwlock_unlock(&dht.db.lock); +          return 0; + fail_update: +        pthread_rwlock_unlock(&dht.db.lock); +        return -1;  } -static int send_msg(struct dht * dht, -                    kad_msg_t *  msg, -                    uint64_t     addr) +static time_t gcd(time_t a, +                  time_t b)  { -#ifndef __DHT_TEST__ -        struct shm_du_buff * sdb; -        size_t               len; -#endif -        int                  retr = 0; +        if (a == 0) +                return b; -        if (msg->code == KAD_RESPONSE) -                retr = KAD_RESP_RETR; +        return gcd(b % a, a); +} -        pthread_rwlock_wrlock(&dht->lock); +static dht_contact_msg_t * dht_kv_src_contact_msg(void) +{ +        dht_contact_msg_t * src; -        if (dht->id != NULL) { -                msg->has_s_id = true; -                msg->s_id.data = dht->id; -                msg->s_id.len  = dht->b; -        } +        src = malloc(sizeof(*src)); +        if (src == NULL) +                goto fail_malloc; -        msg->s_addr = dht->addr; +        dht_contact_msg__init(src); -        if (msg->code < KAD_STORE) { -                msg->cookie = bmp_allocate(dht->cookies); -                if (!bmp_is_id_valid(dht->cookies, msg->cookie)) { -                        pthread_rwlock_unlock(&dht->lock); -                        goto fail_bmp_alloc; -                } -        } +        src->id.data = dht_dup_key(dht.id.data); +        if (src->id.data == NULL) +                goto fail_id; -        pthread_rwlock_unlock(&dht->lock); +        src->id.len  = dht.id.len; +        src->addr    = dht.addr; -#ifndef __DHT_TEST__ -        len = kad_msg__get_packed_size(msg); -        if (len == 0) -                goto fail_msg; +        return src; + fail_id: +        dht_contact_msg__free_unpacked(src, NULL); + fail_malloc: +        return NULL; +} -        while (true) { -                if (ipcp_sdb_reserve(&sdb, len)) -                        goto fail_msg; +static dht_msg_t * dht_kv_find_req_msg(const uint8_t * key, +                                       enum dht_code   code) +{ +        dht_msg_t * msg; -                kad_msg__pack(msg, shm_du_buff_head(sdb)); +        assert(key != NULL); -                if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0) -                        break; +        msg = malloc(sizeof(*msg)); +        if (msg == NULL) +                goto fail_malloc; -                ipcp_sdb_release(sdb); +        dht_msg__init(msg); +        msg->code = code; -                sleep(1); +        msg->src = dht_kv_src_contact_msg(); +        if (msg->src == NULL) +                goto fail_msg; -                if (--retr < 0) -                        goto fail_msg; -        } +        msg->find = malloc(sizeof(*msg->find)); +        if (msg->find == NULL) +                goto fail_msg; -#else -        (void) addr; -        (void) retr; -#endif /* __DHT_TEST__ */ +        dht_find_req_msg__init(msg->find); -        if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN) -                kad_req_create(dht, msg, addr); +        msg->find->key.data = dht_dup_key(key); +        if (msg->find->key.data == NULL) +                goto fail_msg; + +        msg->find->key.len = dht.id.len; +        msg->find->cookie  = DHT_INVALID; + +        return msg; -        return msg->cookie; -#ifndef __DHT_TEST__   fail_msg: -        pthread_rwlock_wrlock(&dht->lock); -        bmp_release(dht->cookies, msg->cookie); -        pthread_rwlock_unlock(&dht->lock); -#endif /* !__DHT_TEST__ */ - fail_bmp_alloc: -        return -1; +        dht_msg__free_unpacked(msg, NULL); + fail_malloc: +        return NULL;  } -static struct dht_entry * dht_find_entry(struct dht *    dht, -                                         const uint8_t * key) +static dht_msg_t * dht_kv_find_node_req_msg(const uint8_t * key)  { -        struct list_head * p; +        return dht_kv_find_req_msg(key, DHT_FIND_NODE_REQ); +} -        list_for_each(p, &dht->entries) { -                struct dht_entry * e = list_entry(p, struct dht_entry, next); -                if (!memcmp(key, e->key, dht->b)) -                        return e; +static dht_msg_t * dht_kv_find_value_req_msg(const uint8_t * key) +{ +        return dht_kv_find_req_msg(key, DHT_FIND_VALUE_REQ); +} + +static dht_msg_t * dht_kv_find_node_rsp_msg(uint8_t *             key, +                                            uint64_t              cookie, +                                            dht_contact_msg_t *** contacts, +                                            size_t                len) +{ +        dht_msg_t * msg; + +        msg = malloc(sizeof(*msg)); +        if (msg == NULL) +                goto fail_malloc; + +        dht_msg__init(msg); +        msg->code = DHT_FIND_NODE_RSP; + +        msg->src = dht_kv_src_contact_msg(); +        if (msg->src == NULL) +                goto fail_msg; + +        msg->node = malloc(sizeof(*msg->node)); +        if (msg->node == NULL) +                goto fail_msg; + +        dht_find_node_rsp_msg__init(msg->node); + +        msg->node->key.data = dht_dup_key(key); +        if (msg->node->key.data == NULL) +                goto fail_msg; + +        msg->node->cookie     = cookie; +        msg->node->key.len    = dht.id.len; +        msg->node->n_contacts = len; +        if (len != 0) { /* Steal the ptr */ +                msg->node->contacts = *contacts; +                *contacts = NULL;          } +        return msg; + + fail_msg: +        dht_msg__free_unpacked(msg, NULL); + fail_malloc:          return NULL;  } -static int kad_add(struct dht *              dht, -                   const kad_contact_msg_t * contacts, -                   ssize_t                   n, -                   time_t                    exp) +static dht_msg_t * dht_kv_find_value_rsp_msg(uint8_t *             key, +                                             uint64_t              cookie, +                                             dht_contact_msg_t *** contacts, +                                             size_t                n_contacts, +                                             buffer_t **           vals, +                                             size_t                n_vals)  { -        struct dht_entry * e; +        dht_msg_t * msg; -        pthread_rwlock_wrlock(&dht->lock); +        msg = dht_kv_find_node_rsp_msg(key, cookie, contacts, n_contacts); +        if (msg == NULL) +                goto fail_node_rsp; -        while (n-- > 0) { -                if (contacts[n].id.len != dht->b) -                        log_warn("Bad key length in contact data."); +        msg->code = DHT_FIND_VALUE_RSP; -                e = dht_find_entry(dht, contacts[n].id.data); -                if (e != NULL) { -                        if (dht_entry_add_addr(e, contacts[n].addr, exp)) -                                goto fail; -                } else { -                        e = dht_entry_create(dht, contacts[n].id.data); -                        if (e == NULL) -                                goto fail; +        msg->val = malloc(sizeof(*msg->val)); +        if (msg->val == NULL) +                goto fail_msg; -                        if (dht_entry_add_addr(e, contacts[n].addr, exp)) { -                                dht_entry_destroy(e); -                                goto fail; -                        } +        dht_find_value_rsp_msg__init(msg->val); -                        list_add(&e->next, &dht->entries); -                } -        } +        msg->val->n_values = n_vals; +        if (n_vals != 0)  /* Steal the ptr */ +                msg->val->values = (binary_data_t *) *vals; -        pthread_rwlock_unlock(&dht->lock); -        return 0; +        return msg; - fail: -        pthread_rwlock_unlock(&dht->lock); -        return -ENOMEM; + fail_msg: +        dht_msg__free_unpacked(msg, NULL); + fail_node_rsp: +        return NULL;  } -static int wait_resp(struct dht * dht, -                     kad_msg_t *  msg, -                     time_t       timeo) +static dht_msg_t * dht_kv_store_msg(const uint8_t * key, +                                    const buffer_t  val, +                                    time_t          exp)  { -        struct kad_req * req; +        dht_msg_t * msg; + +        assert(key != NULL); +        assert(val.data != NULL); +        assert(val.len > 0); -        assert(dht); -        assert(msg); +        msg = malloc(sizeof(*msg)); +        if (msg == NULL) +                goto fail_malloc; -        pthread_rwlock_rdlock(&dht->lock); +        dht_msg__init(msg); -        req = dht_find_request(dht, msg); -        if (req == NULL) { -                pthread_rwlock_unlock(&dht->lock); -                return -EPERM; -        } +        msg->code = DHT_STORE; -        pthread_rwlock_unlock(&dht->lock); +        msg->src = dht_kv_src_contact_msg(); +        if (msg->src == NULL) +                goto fail_msg; + +        msg->store = malloc(sizeof(*msg->store)); +        if (msg->store == NULL) +                goto fail_msg; -        return kad_req_wait(req, timeo); +        dht_store_msg__init(msg->store); + +        msg->store->key.data = dht_dup_key(key); +        if (msg->store->key.data == NULL) +                goto fail_msg; + +        msg->store->key.len = dht.id.len; +        msg->store->val.data = malloc(val.len); +        if (msg->store->val.data == NULL) +                goto fail_msg; + +        memcpy(msg->store->val.data, val.data, val.len); + +        msg->store->val.len = val.len; +        msg->store->exp = exp; + +        return msg; + + fail_msg: +        dht_msg__free_unpacked(msg, NULL); + fail_malloc: +        return NULL;  } -static int kad_store(struct dht *    dht, -                     const uint8_t * key, -                     uint64_t        addr, -                     uint64_t        r_addr, -                     time_t          ttl) +static ssize_t dht_kv_retrieve(const uint8_t * key, +                               buffer_t **     vals)  { -        kad_msg_t msg = KAD_MSG__INIT; -        kad_contact_msg_t cmsg = KAD_CONTACT_MSG__INIT; -        kad_contact_msg_t * cmsgp[1]; +        struct dht_entry * e; +        struct list_head * p; +        size_t             n; +        size_t             i; -        cmsg.id.data = (uint8_t *) key; -        cmsg.addr    = addr; +        assert(key  != NULL); -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_rdlock(&dht.db.lock); -        cmsg.id.len  = dht->b; +        e = __dht_kv_find_entry(key); +        if (e == NULL) +                goto no_vals; -        pthread_rwlock_unlock(&dht->lock); +        n = MIN(DHT_MAX_VALS, e->vals.len + e->lvals.len); +        if (n == 0) +                goto no_vals; -        cmsgp[0] = &cmsg; +        *vals = malloc(n * sizeof(**vals)); +        if (*vals == NULL) +                goto fail_vals; -        msg.code         = KAD_STORE; -        msg.has_t_expire = true; -        msg.t_expire     = ttl; -        msg.n_contacts   = 1; -        msg.contacts     = cmsgp; +        memset(*vals, 0, n * sizeof(**vals)); -        if (send_msg(dht, &msg, r_addr) < 0) -                return -1; +        i = 0; + +        list_for_each(p, &e->vals.list) { +                struct val_entry * v; +                if (i == n) +                        break; /* We have enough values */ +                v = list_entry(p, struct val_entry, next); +                (*vals)[i].data = malloc(v->val.len); +                if ((*vals)[i].data == NULL) +                        goto fail_val_data; + +                (*vals)[i].len = v->val.len; +                memcpy((*vals)[i++].data, v->val.data, v->val.len); +        } + +        list_for_each(p, &e->lvals.list) { +                struct val_entry * v; +                if (i == n) +                        break; /* We have enough values */ +                v = list_entry(p, struct val_entry, next); +                (*vals)[i].data = malloc(v->val.len); +                if ((*vals)[i].data == NULL) +                        goto fail_val_data; + +                (*vals)[i].len = v->val.len; +                memcpy((*vals)[i++].data, v->val.data, v->val.len); +        } + +        pthread_rwlock_unlock(&dht.db.lock); + +        return (ssize_t) i; + fail_val_data: +        pthread_rwlock_unlock(&dht.db.lock); +        freebufs(*vals, i); +        *vals = NULL; +        return -ENOMEM; + fail_vals: +        pthread_rwlock_unlock(&dht.db.lock); +        return -ENOMEM; + no_vals: +        pthread_rwlock_unlock(&dht.db.lock); +        *vals = NULL;          return 0;  } -static ssize_t kad_find(struct dht *     dht, -                        struct lookup *  lu, -                        const uint64_t * addrs, -                        enum kad_code    code) +static void __cleanup_dht_msg(void * msg) +{ +        dht_msg__free_unpacked((dht_msg_t *) msg, NULL); +} + +#ifdef DEBUG_PROTO_DHT +static void dht_kv_debug_msg(dht_msg_t * msg) +{ +        struct tm *   tm; +        char          tmstr[RIB_TM_STRLEN]; +        time_t        stamp; +        size_t        i; + +        if (msg == NULL) +                return; + +        pthread_cleanup_push(__cleanup_dht_msg, msg); + +        switch (msg->code) { +        case DHT_STORE: +                log_proto("  key: " HASH_FMT64 " [%zu bytes]", +                          HASH_VAL64(msg->store->key.data), +                          msg->store->key.len); +                log_proto("  val: " HASH_FMT64 " [%zu bytes]", +                          HASH_VAL64(msg->store->val.data), +                          msg->store->val.len); +                stamp = msg->store->exp; +                tm = gmtime(&stamp); +                strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); +                log_proto("  exp: %s.", tmstr); +                break; +        case DHT_FIND_NODE_REQ: +                /* FALLTHRU */ +        case DHT_FIND_VALUE_REQ: +                log_proto("  cookie: " HASH_FMT64, +                          HASH_VAL64(&msg->find->cookie)); +                log_proto("  key:    " HASH_FMT64 " [%zu bytes]", +                          HASH_VAL64(msg->find->key.data), +                          msg->find->key.len); +                break; +        case DHT_FIND_VALUE_RSP: +                log_proto("  cookie: " HASH_FMT64, +                          HASH_VAL64(&msg->node->cookie)); +                log_proto("  key:    " HASH_FMT64 " [%zu bytes]", +                          HASH_VAL64(msg->node->key.data), +                          msg->node->key.len); +                log_proto("  values: [%zd]", msg->val->n_values); +                for (i = 0; i < msg->val->n_values; i++) +                        log_proto("    " HASH_FMT64 " [%zu bytes]", +                                  HASH_VAL64(msg->val->values[i].data), +                                  msg->val->values[i].len); +                log_proto("  contacts: [%zd]", msg->node->n_contacts); +                for (i = 0; i < msg->node->n_contacts; i++) { +                        dht_contact_msg_t * c = msg->node->contacts[i]; +                        log_proto("    " PEER_FMT, +                                  PEER_VAL(c->id.data, c->addr)); +                } +                break; +        case DHT_FIND_NODE_RSP: +                log_proto("  cookie: " HASH_FMT64, +                        HASH_VAL64(&msg->node->cookie)); +                log_proto("  key:    " HASH_FMT64 " [%zu bytes]", +                          HASH_VAL64(msg->node->key.data), msg->node->key.len); +                log_proto("  contacts: [%zd]", msg->node->n_contacts); +                for (i = 0; i < msg->node->n_contacts; i++) { +                        dht_contact_msg_t * c = msg->node->contacts[i]; +                        log_proto("    " PEER_FMT, +                                  PEER_VAL(c->id.data, c->addr)); +                } + +                break; +        default: +                break; +        } + +        pthread_cleanup_pop(false); +} + +static void dht_kv_debug_msg_snd(dht_msg_t * msg, +                                 uint8_t *   id, +                                 uint64_t    addr)  { -        kad_msg_t msg  = KAD_MSG__INIT; -        ssize_t   sent = 0; +        if (msg == NULL) +                return; -        assert(dht); -        assert(lu->key); +        log_proto(TX_HDR_FMT ".", TX_HDR_VAL(msg, id, addr)); -        msg.code = code; +        dht_kv_debug_msg(msg); +} -        msg.has_key       = true; -        msg.key.data      = (uint8_t *) lu->key; -        msg.key.len       = dht->b; +static void dht_kv_debug_msg_rcv(dht_msg_t * msg) +{ +        if (msg == NULL) +                return; -        while (*addrs != 0) { -                struct cookie_el * c; -                int ret; +        log_proto(RX_HDR_FMT ".", RX_HDR_VAL(msg)); -                if (*addrs == dht->addr) { -                        ++addrs; -                        continue; -                } +        dht_kv_debug_msg(msg); +} +#endif -                ret = send_msg(dht, &msg, *addrs); -                if (ret < 0) -                        break; +#ifndef __DHT_TEST__ +static int dht_send_msg(dht_msg_t * msg, +                        uint64_t    addr) +{ +        size_t               len; +        struct shm_du_buff * sdb; -                c = malloc(sizeof(*c)); -                if (c == NULL) -                        break; +        if (msg == NULL) +                return 0; -                c->cookie = (uint32_t) ret; +        assert(addr != INVALID_ADDR && addr != dht.addr); -                pthread_mutex_lock(&lu->lock); +        len = dht_msg__get_packed_size(msg); +        if (len == 0) { +                log_warn("%s failed to pack.", DHT_CODE(msg)); +                goto fail_msg; +        } -                list_add_tail(&c->next, &lu->cookies); +        if (ipcp_sdb_reserve(&sdb, len)) { +                log_warn("%s failed to get sdb.", DHT_CODE(msg)); +                goto fail_msg; +        } -                pthread_mutex_unlock(&lu->lock); +        dht_msg__pack(msg, shm_du_buff_head(sdb)); -                ++sent; -                ++addrs; +        if (dt_write_packet(addr, QOS_CUBE_BE, dht.eid, sdb) < 0) { +                log_warn("%s write failed", DHT_CODE(msg)); +                goto fail_send;          } -        return sent; +        return 0; + fail_send: +        ipcp_sdb_release(sdb); + fail_msg: +        return -1;  } +#else /* funtion for testing  */ +static int dht_send_msg(dht_msg_t * msg, +                        uint64_t    addr) +{ +        buffer_t buf; + +        assert(msg != NULL); +        assert(addr != INVALID_ADDR && addr != dht.addr); -static void lookup_detach(struct dht *    dht, -                          struct lookup * lu) +        buf.len = dht_msg__get_packed_size(msg); +        if (buf.len == 0) { +                log_warn("%s failed to pack.", DHT_CODE(msg)); +                goto fail_msg; +        } + +        buf.data = malloc(buf.len); +        if (buf.data == NULL) { +                log_warn("%s failed to malloc buf.", DHT_CODE(msg)); +                goto fail_msg; +        } + +        dht_msg__pack(msg, buf.data); + +        if (sink_send_msg(&buf, addr) < 0) { +                log_warn("%s write failed", DHT_CODE(msg)); +                goto fail_send; +        } + +        return 0; + fail_send: +        freebuf(buf); + fail_msg: +        return -1; +} +#endif /* __DHT_TEST__ */ + +static void __cleanup_peer_list(void * pl)  { -        pthread_rwlock_wrlock(&dht->lock); +        struct list_head * p; +        struct list_head * h; -        list_del(&lu->next); +        assert(pl != NULL); -        pthread_rwlock_unlock(&dht->lock); +        list_for_each_safe(p, h, (struct list_head *) pl) { +                struct peer_entry * e = list_entry(p, struct peer_entry, next); +                list_del(&e->next); +                free(e->id); +                free(e); +        }  } -static struct lookup * kad_lookup(struct dht *    dht, -                                  const uint8_t * id, -                                  enum kad_code   code) + +static int dht_kv_send_msgs(dht_msg_t *        msg, +                            struct list_head * pl)  { -        uint64_t          addrs[KAD_ALPHA + 1]; -        enum lookup_state state; -        struct lookup *   lu; +        struct list_head * p; +        struct list_head * h; -        lu = lookup_create(dht, id); -        if (lu == NULL) -                return NULL; +        pthread_cleanup_push(__cleanup_dht_msg, msg); +        pthread_cleanup_push(__cleanup_peer_list, pl); -        lookup_new_addrs(lu, addrs); +        list_for_each_safe(p, h, pl) { +                struct peer_entry * e = list_entry(p, struct peer_entry, next); +                if (IS_REQUEST(msg->code)) { +                        msg->find->cookie = e->cookie; +                        assert(msg->find->cookie != DHT_INVALID); +                } +                if (dht_send_msg(msg, e->addr) < 0) +                        continue; -        if (addrs[0] == 0) { -                lookup_detach(dht, lu); -                lookup_destroy(lu); -                return NULL; +#ifdef DEBUG_PROTO_DHT +                dht_kv_debug_msg_snd(msg, e->id, e->addr); +#endif +                list_del(&e->next); +                free(e->id); +                free(e);          } -        if (kad_find(dht, lu, addrs, code) == 0) { -                lookup_detach(dht, lu); -                return lu; +        pthread_cleanup_pop(false); +        pthread_cleanup_pop(false); + +        return list_is_empty(pl) ? 0 : -1; +} + +static int dht_kv_get_peer_list_for_msg(dht_msg_t *        msg, +                                        struct list_head * pl) +{ +        struct list_head   cl;  /* contact list       */ +        uint8_t *          key; /* key in the request */ +        size_t             max; + +        assert(msg != NULL); + +        assert(list_is_empty(pl)); + +        max = msg->code == DHT_STORE ? dht.k : dht.alpha; + +        switch (msg->code) { +        case DHT_FIND_NODE_REQ: +                /* FALLTHRU */ +        case DHT_FIND_VALUE_REQ: +                key = msg->find->key.data; +                break; +        case DHT_STORE: +                key = msg->store->key.data; +                break; +        default: +                log_err("Invalid DHT msg code (%d).", msg->code); +                return -1;          } -        while ((state = lookup_wait(lu)) != LU_COMPLETE) { -                switch (state) { -                case LU_UPDATE: -                        lookup_new_addrs(lu, addrs); -                        if (addrs[0] == 0) -                                break; +        list_head_init(&cl); -                        kad_find(dht, lu, addrs, code); -                        break; -                case LU_DESTROY: -                        lookup_detach(dht, lu); -                        lookup_set_state(lu, LU_NULL); -                        return NULL; -                default: -                        break; -                } +        if (dht_kv_contact_list(key, &cl, max) < 0) { +                log_err(KEY_FMT " Failed to get contact list.", KEY_VAL(key)); +                goto fail_contacts;          } -        assert(state == LU_COMPLETE); +        if (list_is_empty(&cl)) { +                log_warn(KEY_FMT " No available contacts.", KEY_VAL(key)); +                goto fail_contacts; +        } -        lookup_detach(dht, lu); +        if (dht_kv_create_peer_list(&cl, pl, msg->code) < 0) { +                log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key)); +                goto fail_peers; +        } -        return lu; +        contact_list_destroy(&cl); +        return 0; + fail_peers: +        contact_list_destroy(&cl); + fail_contacts: +        return -1;  } -static void kad_publish(struct dht *    dht, -                        const uint8_t * key, -                        uint64_t        addr, -                        time_t          exp) +static int dht_kv_store_remote(const uint8_t * key, +                               const buffer_t  val, +                               time_t          exp)  { -        struct lookup * lu; -        uint64_t      * addrs; -        ssize_t         n; -        size_t          k; -        time_t          t_expire; +        dht_msg_t *      msg; +        struct timespec  now; +        struct list_head pl; +        assert(key != NULL); +        assert(val.data != NULL); +        assert(val.len > 0); -        assert(dht); -        assert(key); +        clock_gettime(CLOCK_REALTIME_COARSE, &now); -        pthread_rwlock_rdlock(&dht->lock); +        msg = dht_kv_store_msg(key, val, exp); +        if (msg == NULL) { +                log_err(KV_FMT " Failed to create %s.", +                        KV_VAL(key, val), dht_code_str[DHT_STORE]); +                goto fail_msg; +        } -        k        = dht->k; -        t_expire = dht->t_expire; +        list_head_init(&pl); -        pthread_rwlock_unlock(&dht->lock); +        if (dht_kv_get_peer_list_for_msg(msg, &pl) < 0) { +                log_dbg(KV_FMT " Failed to get peer list.", KV_VAL(key, val)); +                goto fail_peer_list; +        } -        addrs = malloc(k * sizeof(*addrs)); -        if (addrs == NULL) -                return; +        if (dht_kv_send_msgs(msg, &pl) < 0) { +                log_warn(KV_FMT " Failed to send any %s msg.", +                         KV_VAL(key, val), DHT_CODE(msg)); +                goto fail_msgs; +        } -        lu = kad_lookup(dht, key, KAD_FIND_NODE); -        if (lu == NULL) { -                free(addrs); -                return; +        dht_msg__free_unpacked(msg, NULL); + +        return 0; + fail_msgs: +        peer_list_destroy(&pl); + fail_peer_list: +        dht_msg__free_unpacked(msg, NULL); + fail_msg: +        return -1; +} + +/* recursive lookup, start with pl NULL */ +static int dht_kv_query_contacts(const uint8_t *    key, +                                 struct list_head * pl) +{ +        struct list_head p; + +        dht_msg_t * msg; + +        assert(key != NULL); + +        msg = dht_kv_find_node_req_msg(key); +        if (msg == NULL) { +                log_err(KEY_FMT " Failed to create %s msg.", +                        KEY_VAL(key), dht_code_str[DHT_FIND_NODE_REQ]); +                goto fail_msg;          } -        n = lookup_contact_addrs(lu, addrs); +        if (pl == NULL) { +                list_head_init(&p); +                pl = &p; +        } -        while (n-- > 0) { -                if (addrs[n] == dht->addr) { -                        kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT; -                        msg.id.data = (uint8_t *) key; -                        msg.id.len  = dht->b; -                        msg.addr    = addr; -                        kad_add(dht, &msg, 1, exp); -                } else { -                        if (kad_store(dht, key, addr, addrs[n], t_expire)) -                                log_warn("Failed to send store message."); -                } +        if (list_is_empty(pl) && dht_kv_get_peer_list_for_msg(msg, pl) < 0) { +                log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key)); +                goto fail_peer_list; +        } + +        if (dht_kv_update_req(key, pl) < 0) { +                log_warn(KEY_FMT " Failed to update req.", KEY_VAL(key)); +                goto fail_update;          } -        lookup_destroy(lu); +        if (dht_kv_send_msgs(msg, pl)) { +                log_warn(KEY_FMT " Failed to send any %s msg.", +                         KEY_VAL(key), DHT_CODE(msg)); +                goto fail_update; +        } + +        dht_msg__free_unpacked(msg, NULL); -        free(addrs); +        return 0; + fail_update: +        peer_list_destroy(pl); + fail_peer_list: +        dht_msg__free_unpacked(msg, NULL); + fail_msg: +        return -1;  } -static int kad_join(struct dht * dht, -                    uint64_t     addr) +/* recursive lookup, start with pl NULL */ +static ssize_t dht_kv_query_remote(const uint8_t *    key, +                                   buffer_t **        vals, +                                   struct list_head * pl)  { -        kad_msg_t       msg = KAD_MSG__INIT; +        struct list_head p; +        dht_msg_t *      msg; + +        assert(key != NULL); -        msg.code = KAD_JOIN; +        msg = dht_kv_find_value_req_msg(key); +        if (msg == NULL) { +                log_err(KEY_FMT " Failed to create value req.", KEY_VAL(key)); +                goto fail_msg; +        } -        msg.has_alpha       = true; -        msg.has_b           = true; -        msg.has_k           = true; -        msg.has_t_refresh   = true; -        msg.has_t_replicate = true; -        msg.alpha           = KAD_ALPHA; -        msg.k               = KAD_K; -        msg.t_refresh       = KAD_T_REFR; -        msg.t_replicate     = KAD_T_REPL; +        if (pl == NULL) { +                list_head_init(&p); +                pl = &p; +        } -        pthread_rwlock_rdlock(&dht->lock); +        if (list_is_empty(pl) && dht_kv_get_peer_list_for_msg(msg, pl) < 0) { +                log_warn(KEY_FMT " Failed to get peer list.", KEY_VAL(key)); +                goto fail_peer_list; +        } -        msg.b               = dht->b; +        if (dht_kv_update_req(key, pl) < 0) { +                log_err(KEY_FMT " Failed to update request.", KEY_VAL(key)); +                goto fail_update; +        } -        pthread_rwlock_unlock(&dht->lock); +        if (dht_kv_send_msgs(msg, pl)) { +                log_warn(KEY_FMT " Failed to send %s msg.", +                         KEY_VAL(key), DHT_CODE(msg)); +                goto fail_update; +        } -        if (send_msg(dht, &msg, addr) < 0) -                return -1; +        dht_msg__free_unpacked(msg, NULL); -        if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) -                return -1; +        if (vals == NULL) /* recursive lookup, already waiting */ +                return 0; -        dht->id = create_id(dht->b); -        if (dht->id == NULL) -                return -1; +        return dht_kv_wait_req(key, vals); + fail_update: +        peer_list_destroy(pl); + fail_peer_list: +        dht_msg__free_unpacked(msg, NULL); + fail_msg: +        return -1; +} -        pthread_rwlock_wrlock(&dht->lock); +static void __add_dht_kv_entry(struct dht_entry * e) +{ +        struct list_head * p; -        dht_update_bucket(dht, dht->id, dht->addr); +        assert(e != NULL); -        pthread_rwlock_unlock(&dht->lock); +        list_for_each(p, &dht.db.kv.list) { +                struct dht_entry * d = list_entry(p, struct dht_entry, next); +                if (IS_CLOSER(d->key, e->key)) +                        continue; +                break; +        } -        return 0; +        list_add_tail(&e->next, p); +        ++dht.db.kv.len;  } -static void dht_dead_peer(struct dht * dht, -                          uint8_t *    key, -                          uint64_t     addr) +/* incoming store message */ +static int dht_kv_store(const uint8_t * key, +                        const buffer_t  val, +                        time_t          exp)  { -        struct list_head * p; -        struct list_head * h; -        struct bucket *    b; +        struct dht_entry * e; +        bool               new = false; -        b = dht_get_bucket(dht, key); +        assert(key != NULL); +        assert(val.data != NULL); +        assert(val.len > 0); -        list_for_each_safe(p, h, &b->contacts) { -                struct contact * c = list_entry(p, struct contact, next); -                if (b->n_contacts + b->n_alts <= dht->k) { -                        ++c->fails; -                        return; -                } +        pthread_rwlock_wrlock(&dht.db.lock); -                if (c->addr == addr) { -                        list_del(&c->next); -                        contact_destroy(c); -                        --b->n_contacts; -                        break; -                } +        e = __dht_kv_find_entry(key); +        if (e == NULL) { +                log_dbg(KV_FMT " Adding entry (store).", KV_VAL(key, val)); +                e = dht_entry_create(key); +                if (e == NULL) +                        goto fail; + +                new = true; + +                __add_dht_kv_entry(e);          } -        while (b->n_contacts < dht->k && b->n_alts > 0) { -                struct contact * c; -                c = list_first_entry(&b->alts, struct contact, next); -                list_del(&c->next); -                --b->n_alts; -                list_add(&c->next, &b->contacts); -                ++b->n_contacts; +        if (dht_entry_update_val(e, val, exp) < 0) +                goto fail_add; + +        pthread_rwlock_unlock(&dht.db.lock); + +        return 0; + fail_add: +        if (new) { +                list_del(&e->next); +                dht_entry_destroy(e); +                --dht.db.kv.len;          } + fail: +        pthread_rwlock_unlock(&dht.db.lock); +        return -1;  } -static int dht_del(struct dht *    dht, -                   const uint8_t * key, -                   uint64_t        addr) +static int dht_kv_publish(const uint8_t * key, +                          const buffer_t  val)  {          struct dht_entry * e; +        struct timespec    now; +        bool               new = false; + +        assert(key != NULL); +        assert(val.data != NULL); +        assert(val.len > 0); -        pthread_rwlock_wrlock(&dht->lock); +        clock_gettime(CLOCK_REALTIME_COARSE, &now); -        e = dht_find_entry(dht, key); +        pthread_rwlock_wrlock(&dht.db.lock); + +        e = __dht_kv_find_entry(key);          if (e == NULL) { -                pthread_rwlock_unlock(&dht->lock); -                return -EPERM; +                log_dbg(KV_FMT " Adding entry (publish).", KV_VAL(key, val)); +                e = dht_entry_create(key); +                if (e == NULL) +                        goto fail; + +                __add_dht_kv_entry(e); +                new = true;          } -        dht_entry_del_addr(e, addr); +        if (dht_entry_update_lval(e, val) < 0) +                goto fail_add; + +        pthread_rwlock_unlock(&dht.db.lock); -        pthread_rwlock_unlock(&dht->lock); +        dht_kv_store_remote(key, val, now.tv_sec + dht.t_expire);          return 0; + fail_add: +        if (new) { +                list_del(&e->next); +                dht_entry_destroy(e); +                --dht.db.kv.len; +        } + fail: +        pthread_rwlock_unlock(&dht.db.lock); +        return -1;  } -static buffer_t dht_retrieve(struct dht *    dht, -                             const uint8_t * key) +static int dht_kv_unpublish(const uint8_t * key, +                            const buffer_t  val)  {          struct dht_entry * e; -        struct list_head * p; -        buffer_t           buf; -        uint64_t *         pos; -        size_t             addrs = 0; +        int                rc; + +        assert(key != NULL); -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.db.lock); -        e = dht_find_entry(dht, key); +        e = __dht_kv_find_entry(key);          if (e == NULL) -                goto fail; +                goto no_entry; -        buf.len = MIN(DHT_RETR_ADDR, e->n_vals); -        if (buf.len == 0) -                goto fail; +        rc = dht_entry_remove_lval(e, val); -        pos = malloc(sizeof(dht->addr) * buf.len); -        if (pos == NULL) -                goto fail; +        pthread_rwlock_unlock(&dht.db.lock); -        buf.data = (uint8_t *) pos; +        return rc; + no_entry: +        pthread_rwlock_unlock(&dht.db.lock); +        return -ENOENT; -        list_for_each(p, &e->vals) { -                struct val * v = list_entry(p, struct val, next); -                *pos++ = v->addr; -                if (++addrs >= buf.len) -                        break; +} + +/* message validation */ +static int dht_kv_validate_store_msg(const dht_store_msg_t * store) +{ +        if (store == NULL) { +                log_warn("Store in msg is NULL."); +                return -EINVAL; +        } + +        if (store->key.data == NULL || store->key.len == 0) { +                log_warn("Invalid key in DHT store msg."); +                return -EINVAL;          } -        pthread_rwlock_unlock(&dht->lock); +        if (store->key.len != dht.id.len) { +                log_warn("Invalid key length in DHT store msg."); +                return -EINVAL; +        } -        return buf; +        if (store->val.data == NULL || store->val.len == 0) { +                log_warn("Invalid value in DHT store msg."); +                return -EINVAL; +        } - fail: -        pthread_rwlock_unlock(&dht->lock); -        buf.len = 0; +        return 0; +} + +static int validate_find_req_msg(const dht_find_req_msg_t * req) +{ +        if (req == NULL) { +                log_warn("Request in msg is NULL."); +                return -EINVAL; +        } -        return buf; +        if (req->key.data == NULL || req->key.len == 0) { +                log_warn("Find request without key."); +                return -EINVAL; +        } + +        if (req->key.len != dht.id.len) { +                log_warn("Invalid key length in request msg."); +                return -EINVAL; +        } + +        return 0;  } -static ssize_t dht_get_contacts(struct dht *          dht, -                                const uint8_t *       key, -                                kad_contact_msg_t *** msgs) +static int validate_node_rsp_msg(const dht_find_node_rsp_msg_t * rsp)  { -        struct list_head   l; -        struct list_head * p; -        struct list_head * h; -        size_t             len; -        size_t             i = 0; +        if (rsp == NULL) { +                log_warn("Node rsp in msg is NULL."); +                return -EINVAL; +        } -        list_head_init(&l); +        if (rsp->key.data == NULL) { +                log_warn("Invalid key in DHT response msg."); +                return -EINVAL; +        } -        pthread_rwlock_wrlock(&dht->lock); +        if (rsp->key.len != dht.id.len) { +                log_warn("Invalid key length in DHT response msg."); +                return -EINVAL; +        } -        len = dht_contact_list(dht, &l, key); -        if (len == 0) { -                pthread_rwlock_unlock(&dht->lock); -                *msgs = NULL; +        if (!dht_kv_has_req(rsp->key.data, rsp->cookie)) { +                log_warn(KEY_FMT " No request " CK_FMT  ".", +                         KEY_VAL(rsp->key.data), CK_VAL(rsp->cookie)); + +                return -EINVAL; +        } + +        return 0; +} + +static int validate_value_rsp_msg(const dht_find_value_rsp_msg_t * rsp) +{ +        if (rsp == NULL) { +                log_warn("Invalid DHT find value response msg."); +                return -EINVAL; +        } + +        if (rsp->values == NULL && rsp->n_values > 0) { +                log_dbg("No values in DHT response msg.");                  return 0;          } -        *msgs = malloc(len * sizeof(**msgs)); -        if (*msgs == NULL) { -                pthread_rwlock_unlock(&dht->lock); +        if (rsp->n_values == 0 && rsp->values != NULL) { +                log_dbg("DHT response did not set values NULL.");                  return 0;          } -        list_for_each_safe(p, h, &l) { -                struct contact * c = list_entry(p, struct contact, next); -                (*msgs)[i] = malloc(sizeof(***msgs)); -                if ((*msgs)[i] == NULL) { -                        pthread_rwlock_unlock(&dht->lock); -                        while (i > 0) -                                free(*msgs[--i]); -                        free(*msgs); -                        *msgs = NULL; -                        return 0; -                } +        return 0; +} -                kad_contact_msg__init((*msgs)[i]); +static int dht_kv_validate_msg(dht_msg_t * msg) +{ -                (*msgs)[i]->id.data = c->id; -                (*msgs)[i]->id.len  = dht->b; -                (*msgs)[i++]->addr  = c->addr; -                list_del(&c->next); -                free(c); +        assert(msg != NULL); + +        if (msg->src->id.len != dht.id.len) { +                log_warn("%s Invalid source contact ID.", DHT_CODE(msg)); +                return -EINVAL;          } -        pthread_rwlock_unlock(&dht->lock); +        if (msg->src->addr == INVALID_ADDR) { +                log_warn("%s Invalid source address.", DHT_CODE(msg)); +                return -EINVAL; +        } -        return i; +        switch (msg->code) { +        case DHT_FIND_VALUE_REQ: +                /* FALLTHRU */ +        case DHT_FIND_NODE_REQ: +                if (validate_find_req_msg(msg->find) < 0) +                        return -EINVAL; +                break; +        case DHT_FIND_VALUE_RSP: +                if (validate_value_rsp_msg(msg->val) < 0) +                        return -EINVAL; +                /* FALLTHRU */ +        case DHT_FIND_NODE_RSP: +                if (validate_node_rsp_msg(msg->node) < 0) +                        return -EINVAL; +                break; +        case DHT_STORE: +                if (dht_kv_validate_store_msg(msg->store) < 0) +                        return -EINVAL; +                break; +        default: +                log_warn("Invalid DHT msg code (%d).", msg->code); +                return -ENOENT; +        } + +        return 0;  } -static time_t gcd(time_t a, -                  time_t b) +static void do_dht_kv_store(const dht_store_msg_t * store)  { -        if (a == 0) -                return b; +        struct tm * tm; +        char        tmstr[RIB_TM_STRLEN]; +        buffer_t    val; +        uint8_t *   key; +        time_t      exp; -        return gcd(b % a, a); +        assert(store != NULL); + +        val.data = store->val.data; +        val.len  = store->val.len; +        key      = store->key.data; +        exp      = store->exp; + +        if (dht_kv_store(store->key.data, val, store->exp) < 0) { +                log_err(KV_FMT " Failed to store.", KV_VAL(key, val)); +                return; +        } + +        tm = gmtime(&exp); +        strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); +        log_dbg(KV_FMT " Stored value until %s.", KV_VAL(key, val), tmstr);  } -static void * work(void * o) +static dht_msg_t * do_dht_kv_find_node_req(const dht_find_req_msg_t * req)  { -        struct dht *       dht; -        struct timespec    now; -        struct list_head * p; -        struct list_head * h; -        struct list_head   reflist; -        time_t             intv; -        struct lookup *    lu; +        dht_contact_msg_t ** contacts; +        dht_msg_t *          rsp; +        uint8_t *            key; +        uint64_t             cookie; +        ssize_t              len; -        dht = (struct dht *) o; +        assert(req  != NULL); -        pthread_rwlock_rdlock(&dht->lock); +        key    = req->key.data; +        cookie = req->cookie; -        intv = gcd(dht->t_expire, dht->t_repub); -        intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; +        len = dht_kv_get_contacts(key, &contacts); +        if (len < 0) { +                log_warn(KEY_FMT " Failed to get contacts.", KEY_VAL(key)); +                goto fail_contacts; +        } -        pthread_rwlock_unlock(&dht->lock); +        rsp = dht_kv_find_node_rsp_msg(key, cookie, &contacts, len); +        if (rsp == NULL) { +                log_err(KEY_FMT " Failed to create %s.", KEY_VAL(key), +                        dht_code_str[DHT_FIND_NODE_RSP]); +                goto fail_msg; +        } -        list_head_init(&reflist); +        assert(rsp->code == DHT_FIND_NODE_RSP); -        while (true) { -                clock_gettime(CLOCK_REALTIME_COARSE, &now); - -                pthread_rwlock_wrlock(&dht->lock); - -                /* Republish registered hashes. */ -                list_for_each(p, &dht->refs) { -                        struct ref_entry * e; -                        uint8_t *          key; -                        uint64_t           addr; -                        time_t             t_expire; -                        e = list_entry(p, struct ref_entry, next); -                        if (now.tv_sec > e->t_rep) { -                                key = dht_dup_key(e->key, dht->b); -                                if (key == NULL) -                                        continue; -                                addr = dht->addr; -                                t_expire = dht->t_expire; -                                e->t_rep = now.tv_sec + dht->t_repub; - -                                pthread_rwlock_unlock(&dht->lock); -                                kad_publish(dht, key, addr, t_expire); -                                pthread_rwlock_wrlock(&dht->lock); -                                free(key); -                        } -                } +        log_info(KEY_FMT " Responding with %zd contacts", KEY_VAL(key), len); -                /* Remove stale entries and republish if necessary. */ -                list_for_each_safe(p, h, &dht->entries) { -                        struct list_head * p1; -                        struct list_head * h1; -                        struct dht_entry * e; -                        uint8_t *          key; -                        time_t             t_expire; -                        e = list_entry (p, struct dht_entry, next); -                        list_for_each_safe(p1, h1, &e->vals) { -                                struct val * v; -                                uint64_t     addr; -                                v = list_entry(p1, struct val, next); -                                if (now.tv_sec > v->t_exp) { -                                        list_del(&v->next); -                                        val_destroy(v); -                                        continue; -                                } - -                                if (now.tv_sec > v->t_rep) { -                                        key  = dht_dup_key(e->key, dht->b); -                                        addr = v->addr; -                                        t_expire = dht->t_expire = now.tv_sec; -                                        v->t_rep = now.tv_sec + dht->t_replic; -                                        pthread_rwlock_unlock(&dht->lock); -                                        kad_publish(dht, key, addr, t_expire); -                                        pthread_rwlock_wrlock(&dht->lock); -                                        free(key); -                                } -                        } -                } +        return rsp; + fail_msg: +        while (len-- > 0) +                dht_contact_msg__free_unpacked(contacts[len], NULL); +        free(contacts); + fail_contacts: +        return NULL; +} -                /* Check the requests list for unresponsive nodes. */ -                list_for_each_safe(p, h, &dht->requests) { -                        struct kad_req * r; -                        r = list_entry(p, struct kad_req, next); -                        if (now.tv_sec > r->t_exp) { -                                list_del(&r->next); -                                bmp_release(dht->cookies, r->cookie); -                                dht_dead_peer(dht, r->key, r->addr); -                                kad_req_destroy(r); -                        } -                } +static void dht_kv_process_node_rsp(dht_contact_msg_t ** contacts, +                                    size_t               len, +                                    struct list_head *   pl, +                                    enum dht_code        code) +{ +        struct timespec now; +        size_t          i; -                /* Refresh unaccessed buckets. */ -                bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); +        assert(contacts != NULL); +        assert(len > 0); +        assert(pl != NULL); +        assert(list_is_empty(pl)); -                pthread_rwlock_unlock(&dht->lock); +        clock_gettime(CLOCK_REALTIME_COARSE, &now); -                list_for_each_safe(p, h, &reflist) { -                        struct contact * c; -                        c = list_entry(p, struct contact, next); -                        lu = kad_lookup(dht, c->id, KAD_FIND_NODE); -                        if (lu != NULL) -                                lookup_destroy(lu); -                        list_del(&c->next); -                        contact_destroy(c); +        for (i = 0; i < len; i++) { +                dht_contact_msg_t * c = contacts[i]; +                struct peer_entry * e; +                if (c->addr == dht.addr) +                        continue; + +                if (dht_kv_update_contacts(c->id.data, c->addr) < 0) +                        log_warn(PEER_FMT " Failed to update contacts.", +                                 PEER_VAL(c->id.data, c->addr)); + +                e = malloc(sizeof(*e)); +                if (e == NULL) { +                        log_err(PEER_FMT " Failed to malloc entry.", +                                PEER_VAL(c->id.data, c->addr)); +                        continue;                  } -                sleep(intv); -        } +                e->id = dht_dup_key(c->id.data); +                if (e->id == NULL) { +                        log_warn(PEER_FMT " Failed to duplicate id.", +                                 PEER_VAL(c->id.data, c->addr)); +                        free(e); +                        continue; +                } -        return (void *) 0; +                e->cookie = generate_cookie(); +                e->code   = code; +                e->addr   = c->addr; +                e->t_sent = now.tv_sec; + +                list_add_tail(&e->next, pl); +        }  } -static int kad_handle_join_resp(struct dht *     dht, -                                struct kad_req * req, -                                kad_msg_t *      msg) +static dht_msg_t * do_dht_kv_find_value_req(const dht_find_req_msg_t * req)  { -        assert(dht); -        assert(req); -        assert(msg); +        dht_contact_msg_t ** contacts; +        ssize_t              n_contacts; +        buffer_t *           vals; +        ssize_t              n_vals; +        dht_msg_t *          rsp; +        uint8_t *            key; +        uint64_t             cookie; -        /* We might send version numbers later to warn of updates if needed. */ -        if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire && -              msg->has_t_refresh && msg->has_t_replicate)) { -                log_warn("Join refused by remote."); -                return -1; +        assert(req != NULL); + +        key    = req->key.data; +        cookie = req->cookie; + +        n_contacts = dht_kv_get_contacts(key, &contacts); +        if (n_contacts < 0) { +                log_warn(KEY_FMT " Failed to get contacts.", KEY_VAL(key)); +                goto fail_contacts;          } -        if (msg->b < sizeof(uint64_t)) { -                log_err("Hash sizes less than 8 bytes unsupported."); -                return -1; +        assert(n_contacts > 0 || contacts == NULL); + +        n_vals = dht_kv_retrieve(key, &vals); +        if (n_vals < 0) { +                log_dbg(KEY_FMT " Failed to get values.", KEY_VAL(key)); +                goto fail_vals;          } -        pthread_rwlock_wrlock(&dht->lock); +        if (n_vals == 0) +                log_dbg(KEY_FMT " No values found.", KEY_VAL(key)); -        dht->buckets = bucket_create(); -        if (dht->buckets == NULL) { -                pthread_rwlock_unlock(&dht->lock); -                return -1; +        rsp = dht_kv_find_value_rsp_msg(key, cookie, &contacts, n_contacts, +                                        &vals, n_vals); +        if (rsp == NULL) { +                log_err(KEY_FMT " Failed to create %s msg.", +                        KEY_VAL(key), dht_code_str[DHT_FIND_VALUE_RSP]); +                goto fail_msg;          } -        /* Likely corrupt packet. The member will refuse, we might here too. */ -        if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) -                log_warn("Different kademlia parameters detected."); +        log_info(KEY_FMT " Responding with %zd contacts, %zd values.", +                 KEY_VAL(req->key.data), n_contacts, n_vals); -        if (msg->t_replicate != KAD_T_REPL) -                log_warn("Different kademlia replication time detected."); +        return rsp; -        if (msg->t_refresh != KAD_T_REFR) -                log_warn("Different kademlia refresh time detected."); + fail_msg: +        freebufs(vals, n_vals); + fail_vals: +        while (n_contacts-- > 0) +                dht_contact_msg__free_unpacked(contacts[n_contacts], NULL); +        free(contacts); + fail_contacts: +        return NULL; +} -        dht->k        = msg->k; -        dht->b        = msg->b; -        dht->t_expire = msg->t_expire; -        dht->t_repub  = MAX(1, dht->t_expire - 10); +static void do_dht_kv_find_node_rsp(const dht_find_node_rsp_msg_t * rsp) +{ +        struct list_head pl; -        if (pthread_create(&dht->worker, NULL, work, dht)) { -                bucket_destroy(dht->buckets); -                pthread_rwlock_unlock(&dht->lock); -                return -1; -        } +        assert(rsp != NULL); + +        list_head_init(&pl); -        kad_req_respond(req); +        dht_kv_process_node_rsp(rsp->contacts, rsp->n_contacts, &pl, +                                DHT_FIND_NODE_REQ); -        dht_update_bucket(dht, msg->s_id.data, msg->s_addr); +        if (list_is_empty(&pl)) +                goto no_contacts; -        pthread_rwlock_unlock(&dht->lock); +        if (dht_kv_update_req(rsp->key.data, &pl) < 0) { +                log_err(KEY_FMT " Failed to update request.", +                        KEY_VAL(rsp->key.data)); +                goto fail_update; +        } -        log_dbg("Enrollment of DHT completed."); +        dht_kv_query_contacts(rsp->key.data, &pl); -        return 0; +        return; + + fail_update: +        peer_list_destroy(&pl); + no_contacts: +        return;  } -static int kad_handle_find_resp(struct dht *     dht, -                                struct kad_req * req, -                                kad_msg_t *      msg) +static void do_dht_kv_find_value_rsp(const dht_find_node_rsp_msg_t  * node, +                                     const dht_find_value_rsp_msg_t * val)  { -        struct lookup * lu; +        struct list_head pl; +        uint8_t *        key; -        assert(dht); -        assert(req); -        assert(msg); +        assert(node != NULL); +        assert(val != NULL); -        pthread_rwlock_rdlock(&dht->lock); +        list_head_init(&pl); -        lu = dht_find_lookup(dht, req->cookie); -        if (lu == NULL) { -                pthread_rwlock_unlock(&dht->lock); -                return -1; +        key = node->key.data; + +        dht_kv_process_node_rsp(node->contacts, node->n_contacts, &pl, +                                DHT_FIND_VALUE_REQ); + +        if (val->n_values > 0) { +                log_dbg(KEY_FMT " %zd new values received.", +                        KEY_VAL(key), val->n_values); +                if (dht_kv_respond_req(key, val->values, val->n_values) < 0) +                        log_warn(KEY_FMT " Failed to respond to request.", +                                 KEY_VAL(key)); +                peer_list_destroy(&pl); +                return; /* done! */          } -        lookup_update(dht, lu, msg); +        if (list_is_empty(&pl)) +                goto no_contacts; + +        if (dht_kv_update_req(key, &pl) < 0) { +                log_err(KEY_FMT " Failed to update request.", KEY_VAL(key)); +                goto fail_update; +        } -        pthread_rwlock_unlock(&dht->lock); +        dht_kv_query_remote(key, NULL, &pl); -        return 0; +        return; + fail_update: +        peer_list_destroy(&pl); + no_contacts: +        return;  } -static void kad_handle_response(struct dht * dht, -                                kad_msg_t *  msg) +static dht_msg_t * dht_wait_for_dht_msg(void)  { -        struct kad_req * req; +        dht_msg_t *  msg; +        struct cmd * cmd; -        assert(dht); -        assert(msg); +        pthread_mutex_lock(&dht.cmds.mtx); -        pthread_rwlock_wrlock(&dht->lock); +        pthread_cleanup_push(__cleanup_mutex_unlock, &dht.cmds.mtx); -        req = dht_find_request(dht, msg); -        if (req == NULL) { -                pthread_rwlock_unlock(&dht->lock); +        while (list_is_empty(&dht.cmds.list)) +                pthread_cond_wait(&dht.cmds.cond, &dht.cmds.mtx); + +        cmd = list_last_entry(&dht.cmds.list, struct cmd, next); +        list_del(&cmd->next); + +        pthread_cleanup_pop(true); + +        msg = dht_msg__unpack(NULL, cmd->cbuf.len, cmd->cbuf.data); +        if (msg == NULL) +                log_warn("Failed to unpack DHT msg."); + +        freebuf(cmd->cbuf); +        free(cmd); + +        return msg; +} + +static void do_dht_msg(dht_msg_t * msg) +{ +        dht_msg_t * rsp = NULL; +        uint8_t *   id; +        uint64_t    addr; + +#ifdef DEBUG_PROTO_DHT +        dht_kv_debug_msg_rcv(msg); +#endif +        if (dht_kv_validate_msg(msg) == -EINVAL) { +                log_warn("%s Validation failed.", DHT_CODE(msg)); +                dht_msg__free_unpacked(msg, NULL);                  return;          } -        bmp_release(dht->cookies, req->cookie); -        list_del(&req->next); +        id =   msg->src->id.data; +        addr = msg->src->addr; + +        if (dht_kv_update_contacts(id, addr) < 0) +                log_warn(PEER_FMT " Failed to update contact from msg src.", +                         PEER_VAL(id, addr)); -        pthread_rwlock_unlock(&dht->lock); +        pthread_cleanup_push(__cleanup_dht_msg, msg); -        switch(req->code) { -        case KAD_JOIN: -                if (kad_handle_join_resp(dht, req, msg)) -                        log_err("Enrollment of DHT failed."); +        switch(msg->code) { +        case DHT_FIND_VALUE_REQ: +                rsp = do_dht_kv_find_value_req(msg->find);                  break; -        case KAD_FIND_VALUE: -        case KAD_FIND_NODE: -                if (dht_get_state(dht) != DHT_RUNNING) -                        break; -                kad_handle_find_resp(dht, req, msg); +        case DHT_FIND_NODE_REQ: +                rsp = do_dht_kv_find_node_req(msg->find);                  break; -        default: +        case DHT_STORE: +                do_dht_kv_store(msg->store); +                break; +        case DHT_FIND_NODE_RSP: +                do_dht_kv_find_node_rsp(msg->node); +                break; +        case DHT_FIND_VALUE_RSP: +                do_dht_kv_find_value_rsp(msg->node, msg->val);                  break; +        default: +                assert(false); /* already validated */          } -        kad_req_destroy(req); +        pthread_cleanup_pop(true); + +        if (rsp == NULL) +                return; + +        pthread_cleanup_push(__cleanup_dht_msg, rsp); + +        dht_send_msg(rsp, addr); + +        pthread_cleanup_pop(true); /* free rsp */  } -int dht_bootstrap(void * dir) +static void * dht_handle_packet(void * o)  { -        struct dht * dht; +        (void) o; -        dht = (struct dht *) dir; +        while (true) { +                dht_msg_t * msg; -        assert(dht); +                msg = dht_wait_for_dht_msg(); +                if (msg == NULL) +                        continue; -        pthread_rwlock_wrlock(&dht->lock); +                tpm_begin_work(dht.tpm); -        dht->id = create_id(dht->b); -        if (dht->id == NULL) -                goto fail_id; +                do_dht_msg(msg); -        dht->buckets = bucket_create(); -        if (dht->buckets == NULL) -                goto fail_buckets; +                tpm_end_work(dht.tpm); +        } -        dht->buckets->depth = 0; -        dht->buckets->mask  = 0; +        return (void *) 0; +}  #ifndef __DHT_TEST__ -        dht->b        = hash_len(ipcpi.dir_hash_algo); -#else -        dht->b        = DHT_TEST_KEY_LEN; +static void dht_post_packet(void *               comp, +                            struct shm_du_buff * sdb) +{ +        struct cmd * cmd; + +        (void) comp; + +        cmd = malloc(sizeof(*cmd)); +        if (cmd == NULL) { +                log_err("Command malloc failed."); +                goto fail_cmd; +        } + +        cmd->cbuf.data = malloc(shm_du_buff_len(sdb)); +        if (cmd->cbuf.data == NULL) { +                log_err("Command buffer malloc failed."); +                goto fail_buf; +        } + +        cmd->cbuf.len = shm_du_buff_len(sdb); + +        memcpy(cmd->cbuf.data, shm_du_buff_head(sdb), cmd->cbuf.len); + +        ipcp_sdb_release(sdb); + +        pthread_mutex_lock(&dht.cmds.mtx); + +        list_add(&cmd->next, &dht.cmds.list); + +        pthread_cond_signal(&dht.cmds.cond); + +        pthread_mutex_unlock(&dht.cmds.mtx); + +        return; + + fail_buf: +        free(cmd); + fail_cmd: +        ipcp_sdb_release(sdb); +        return; +}  #endif -        dht->t_expire = 86400; /* 1 day */ -        dht->t_repub  = dht->t_expire - 10; -        dht->k        = KAD_K; -        if (pthread_create(&dht->worker, NULL, work, dht)) -                goto fail_pthread_create; +int dht_reg(const uint8_t * key) +{ +        buffer_t val; -        dht->state = DHT_RUNNING; +        if (addr_to_buf(dht.addr, &val) < 0) { +                log_err("Failed to convert address to buffer."); +                goto fail_a2b; +        } -        dht_update_bucket(dht, dht->id, dht->addr); +        if (dht_kv_publish(key, val)) { +                log_err(KV_FMT " Failed to publish.", KV_VAL(key, val)); +                goto fail_publish; +        } -        pthread_rwlock_unlock(&dht->lock); +        freebuf(val);          return 0; - - fail_pthread_create: -        bucket_destroy(dht->buckets); -        dht->buckets = NULL; - fail_buckets: -        free(dht->id); -        dht->id = NULL; - fail_id: -        pthread_rwlock_unlock(&dht->lock); + fail_publish: +        freebuf(val); + fail_a2b:          return -1;  } -static struct ref_entry * ref_entry_get(struct dht *    dht, -                                        const uint8_t * key) +int dht_unreg(const uint8_t * key)  { -        struct list_head * p; +        buffer_t val; -        list_for_each(p, &dht->refs) { -                struct ref_entry * r = list_entry(p, struct ref_entry, next); -                if (!memcmp(key, r->key, dht-> b) ) -                        return r; +        if (addr_to_buf(dht.addr, &val) < 0) { +                log_err("Failed to convert address to buffer."); +                goto fail_a2b;          } -        return NULL; +        if (dht_kv_unpublish(key, val)) { +                log_err(KV_FMT " Failed to unpublish.", KV_VAL(key, val)); +                goto fail_unpublish; +        } + +        freebuf(val); + +        return 0; + fail_unpublish: +        freebuf(val); + fail_a2b: +        return -ENOMEM;  } -int dht_reg(void *          dir, -            const uint8_t * key) +uint64_t dht_query(const uint8_t * key)  { -        struct dht *       dht; -        struct ref_entry * e; -        uint64_t           addr; -        time_t             t_expire; +        buffer_t *       vals; +        ssize_t          n; +        uint64_t         addr; -        dht = (struct dht *) dir; +        n = dht_kv_retrieve(key, &vals); +        if (n < 0) { +                log_err(KEY_FMT " Failed to query db.", KEY_VAL(key)); +                goto fail_vals; +        } -        assert(dht); -        assert(key); -        assert(dht->addr != 0); +        if (n == 0) { +                assert(vals == NULL); -        if (dht_wait_running(dht)) -                return -1; +                log_dbg(KEY_FMT " No local values.", KEY_VAL(key)); +                n = dht_kv_query_remote(key, &vals, NULL); +                if (n < 0) { +                        log_warn(KEY_FMT " Failed to query DHT.", KEY_VAL(key)); +                        goto fail_vals; +                } +                if (n == 0) { +                        log_dbg(KEY_FMT " No values.", KEY_VAL(key)); +                        goto no_vals; +                } +        } -        pthread_rwlock_wrlock(&dht->lock); +        if (buf_to_addr(vals[0], &addr) < 0) { +                log_err(VAL_FMT " Failed addr conversion.", VAL_VAL(vals[0])); +                goto fail_b2a; +        } -        if (ref_entry_get(dht, key) != NULL) { -                log_dbg("Name already registered."); -                pthread_rwlock_unlock(&dht->lock); -                return 0; +        if (n > 1 && addr == INVALID_ADDR && buf_to_addr(vals[1], &addr) < 0) { +                log_err(VAL_FMT " Failed addr conversion.", VAL_VAL(vals[1])); +                goto fail_b2a;          } -        e = ref_entry_create(dht, key); +        freebufs(vals, n); + +        return addr; + fail_b2a: +        freebufs(vals, n); +        return INVALID_ADDR; + no_vals: +        free(vals); + fail_vals: +        return INVALID_ADDR; +} + +static int emergency_peer(struct list_head * pl) +{ +        struct peer_entry * e; +        struct timespec     now; + +        assert(pl != NULL); +        assert(list_is_empty(pl)); + +        if (dht.peer == INVALID_ADDR) +                return -1; + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        e = malloc(sizeof(*e));          if (e == NULL) { -                pthread_rwlock_unlock(&dht->lock); -                return -ENOMEM; +                log_err("Failed to malloc emergency peer entry."); +                goto fail_malloc;          } -        list_add(&e->next, &dht->refs); - -        t_expire = dht->t_expire; -        addr = dht->addr; +        e->id = dht_dup_key(dht.id.data); +        if (e->id == NULL) { +                log_err("Failed to duplicate DHT ID for emergency peer."); +                goto fail_id; +        } -        pthread_rwlock_unlock(&dht->lock); +        e->addr   = dht.peer; +        e->cookie = dht.magic; +        e->code   = DHT_FIND_NODE_REQ; +        e->t_sent = now.tv_sec; -        kad_publish(dht, key, addr, t_expire); +        list_add_tail(&e->next, pl);          return 0; + fail_id: +        free(e); + fail_malloc: +        return -ENOMEM;  } -int dht_unreg(void *          dir, -              const uint8_t * key) +static int dht_kv_seed_bootstrap_peer(void)  { -        struct dht *       dht; -        struct list_head * p; -        struct list_head * h; +        struct list_head pl; -        dht = (struct dht *) dir; +        list_head_init(&pl); -        assert(dht); -        assert(key); +        if (dht.peer == INVALID_ADDR) { +                log_dbg("No-one to contact."); +                return 0; +        } -        if (dht_get_state(dht) != DHT_RUNNING) -                return -1; +        if (emergency_peer(&pl) < 0) { +                log_err("Could not create emergency peer."); +                goto fail_peer; +        } -        pthread_rwlock_wrlock(&dht->lock); +        log_dbg("Pinging emergency peer " ADDR_FMT32 ".", +                ADDR_VAL32(&dht.peer)); -        list_for_each_safe(p, h, &dht->refs) { -                struct ref_entry * r = list_entry(p, struct ref_entry, next); -                if (!memcmp(key, r->key, dht-> b) ) { -                        list_del(&r->next); -                        ref_entry_destroy(r); -                } +        if (dht_kv_query_contacts(dht.id.data, &pl) < 0) { +                log_warn("Failed to bootstrap peer."); +                goto fail_query;          } -        dht_del(dht, key, dht->addr); - -        pthread_rwlock_unlock(&dht->lock); +        peer_list_destroy(&pl);          return 0; + fail_query: +        peer_list_destroy(&pl); + fail_peer: +        return -EAGAIN;  } -uint64_t dht_query(void *          dir, -                   const uint8_t * key) +static void dht_kv_check_contacts(void)  { -        struct dht *       dht; -        struct dht_entry * e; -        struct lookup *    lu; -        uint64_t           addrs[KAD_K]; -        size_t             n; +        struct list_head cl; +        struct list_head pl; -        dht = (struct dht *) dir; +        list_head_init(&cl); -        assert(dht); +        dht_kv_contact_list(dht.id.data, &cl, dht.k); -        addrs[0] = 0; +        if (!list_is_empty(&cl)) +                goto success; -        if (dht_wait_running(dht)) -                return 0; +        contact_list_destroy(&cl); -        pthread_rwlock_rdlock(&dht->lock); +        list_head_init(&pl); -        e = dht_find_entry(dht, key); -        if (e != NULL) -                addrs[0] = dht_entry_get_addr(dht, e); +        if (dht.peer == INVALID_ADDR) { +                log_dbg("No-one to contact."); +                return; +        } -        pthread_rwlock_unlock(&dht->lock); +        if (emergency_peer(&pl) < 0) { +                log_err("Could not create emergency peer."); +                goto fail_peer; +        } -        if (addrs[0] != 0) -                return addrs[0]; +        log_dbg("No contacts found, using emergency peer " ADDR_FMT32 ".", +                ADDR_VAL32(&dht.peer)); -        lu = kad_lookup(dht, key, KAD_FIND_VALUE); -        if (lu == NULL) -                return 0; +        dht_kv_query_contacts(dht.id.data, &pl); -        n = lookup_get_addrs(lu, addrs); -        if (n == 0) { -                lookup_destroy(lu); -                return 0; +        peer_list_destroy(&pl); + +        return; + success: +        contact_list_destroy(&cl); +        return; + fail_peer: +        return; +} + +static void dht_kv_remove_expired_reqs(void) +{ +        struct list_head * p; +        struct list_head * h; +        struct timespec    now; + +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        pthread_mutex_lock(&dht.reqs.mtx); + +        list_for_each_safe(p, h, &dht.reqs.list) { +                struct dht_req * e; +                e = list_entry(p, struct dht_req, next); +                if (IS_EXPIRED(e, &now)) { +                        log_dbg(KEY_FMT " Removing expired request.", +                                KEY_VAL(e->key)); +                        list_del(&e->next); +                        dht_req_destroy(e); +                        --dht.reqs.len; +                }          } -        lookup_destroy(lu); +        pthread_mutex_unlock(&dht.reqs.mtx); +} -        /* Current behaviour is anycast and return the first peer address. */ -        if (addrs[0] != dht->addr) -                return addrs[0]; +static void value_list_destroy(struct list_head * vl) +{ +        struct list_head * p; +        struct list_head * h; -        if (n > 1) -                return addrs[1]; +        assert(vl != NULL); -        return 0; +        list_for_each_safe(p, h, vl) { +                struct val_entry * v = list_entry(p, struct val_entry, next); +                list_del(&v->next); +                val_entry_destroy(v); +        }  } -static void * dht_handle_packet(void * o) +#define MUST_REPLICATE(v, now) ((now)->tv_sec > (v)->t_repl + dht.t_repl) +#define MUST_REPUBLISH(v, now) /* Close to expiry deadline */ \ +        (((v)->t_exp - (now)->tv_sec) < (DHT_N_REPUB * dht.t_repl)) +static void dht_entry_get_repl_lists(const struct dht_entry * e, +                                     struct list_head *       repl, +                                     struct list_head *       rebl, +                                     struct timespec *        now)  { -        struct dht * dht = (struct dht *) o; +        struct list_head * p; +        struct val_entry * n; -        assert(dht); +        list_for_each(p, &e->vals.list) { +                struct val_entry * v = list_entry(p, struct val_entry, next); +                if (MUST_REPLICATE(v, now) && !IS_EXPIRED(v, now)) { +                        n = val_entry_create(v->val, v->t_exp); +                        if (n == NULL) +                                continue; -        while (true) { -                kad_msg_t *          msg; -                kad_contact_msg_t ** cmsgs; -                kad_msg_t            resp_msg = KAD_MSG__INIT; -                uint64_t             addr; -                buffer_t             buf; -                size_t               i; -                size_t               b; -                size_t               t_expire; -                struct cmd *         cmd; +                        list_add_tail(&n->next, repl); +                } +        } -                pthread_mutex_lock(&dht->mtx); +        list_for_each(p, &e->lvals.list) { +                struct val_entry * v = list_entry(p, struct val_entry, next); +                if (MUST_REPLICATE(v, now) && MUST_REPUBLISH(v, now)) { +                        /* Add expire time here, to allow creating val_entry */ +                        n = val_entry_create(v->val, now->tv_sec + dht.t_expire); +                        if (n == NULL) +                                continue; -                pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); +                        list_add_tail(&n->next, rebl); +                } +        } +} -                while (list_is_empty(&dht->cmds)) -                        pthread_cond_wait(&dht->cond, &dht->mtx); +static int dht_kv_next_values(uint8_t *          key, +                              struct list_head * repl, +                              struct list_head * rebl) +{ +        struct timespec    now; +        struct list_head * p; +        struct list_head * h; +        struct dht_entry * e = NULL; -                cmd = list_last_entry(&dht->cmds, struct cmd, next); -                list_del(&cmd->next); +        assert(key != NULL); +        assert(repl != NULL); +        assert(rebl != NULL); -                pthread_cleanup_pop(true); +        clock_gettime(CLOCK_REALTIME_COARSE, &now); -                i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); +        assert(list_is_empty(repl)); +        assert(list_is_empty(rebl)); -                msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); -#ifndef __DHT_TEST__ -                ipcp_sdb_release(cmd->sdb); -#endif -                free(cmd); +        pthread_rwlock_rdlock(&dht.db.lock); -                if (msg == NULL) { -                        log_err("Failed to unpack message."); -                        continue; -                } +        if (dht.db.kv.len == 0) +                goto no_entries; -                if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { -                        kad_msg__free_unpacked(msg, NULL); -                        log_dbg("Got a request message when not running."); -                        continue; -                } +        list_for_each_safe(p, h, &dht.db.kv.list) { +                e = list_entry(p, struct dht_entry, next); +                if (IS_CLOSER(e->key, key)) +                        continue;  /* Already processed */ +        } -                pthread_rwlock_rdlock(&dht->lock); +        if (e != NULL) { +                memcpy(key, e->key, dht.id.len); +                dht_entry_get_repl_lists(e, repl, rebl, &now); +        } + no_entries: +        pthread_rwlock_unlock(&dht.db.lock); -                b        = dht->b; -                t_expire = dht->t_expire; +        return list_is_empty(repl) && list_is_empty(rebl) ? -ENOENT : 0; +} -                pthread_rwlock_unlock(&dht->lock); +static void dht_kv_replicate_value(const uint8_t *         key, +                                   struct val_entry *      v, +                                   const struct timespec * now) +{ +        assert(MUST_REPLICATE(v, now)); -                if (msg->has_key && msg->key.len != b) { -                        kad_msg__free_unpacked(msg, NULL); -                        log_warn("Bad key in message."); -                        continue; -                } +        (void) now; -                if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { -                        kad_msg__free_unpacked(msg, NULL); -                        log_warn("Bad source ID in message of type %d.", -                                 msg->code); -                        continue; -                } +        if (dht_kv_store_remote(key, v->val, v->t_exp) == 0) { +                log_dbg(KV_FMT " Replicated.", KV_VAL(key, v->val)); +                return; +        } -                tpm_dec(dht->tpm); +        log_dbg(KV_FMT " Replication failed.", KV_VAL(key, v->val)); -                addr = msg->s_addr; +        list_del(&v->next); +        val_entry_destroy(v); +} -                resp_msg.code   = KAD_RESPONSE; -                resp_msg.cookie = msg->cookie; +static void dht_kv_republish_value(const uint8_t *  key, +                            struct val_entry *      v, +                            const struct timespec * now) +{ +        assert(MUST_REPLICATE(v, now)); -                switch(msg->code) { -                case KAD_JOIN: -                        /* Refuse enrollee on check fails. */ -                        if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { -                                log_warn("Parameter mismatch. " -                                         "DHT enrolment refused."); -                                break; -                        } +        if (MUST_REPUBLISH(v, now)) +                assert(v->t_exp >= now->tv_sec + dht.t_expire); -                        if (msg->t_replicate != KAD_T_REPL) { -                                log_warn("Replication time mismatch. " -                                         "DHT enrolment refused."); +        if (dht_kv_store_remote(key, v->val, v->t_exp) == 0) { +                log_dbg(KV_FMT " Republished.", KV_VAL(key, v->val)); +                return; +        } -                                break; -                        } +        if (MUST_REPUBLISH(v, now)) +                log_warn(KV_FMT " Republish failed.", KV_VAL(key, v->val)); +        else +                log_dbg(KV_FMT " Replication failed.", KV_VAL(key, v->val)); -                        if (msg->t_refresh != KAD_T_REFR) { -                                log_warn("Refresh time mismatch. " -                                         "DHT enrolment refused."); -                                break; -                        } +        list_del(&v->next); +        val_entry_destroy(v); +} -                        resp_msg.has_alpha       = true; -                        resp_msg.has_b           = true; -                        resp_msg.has_k           = true; -                        resp_msg.has_t_expire    = true; -                        resp_msg.has_t_refresh   = true; -                        resp_msg.has_t_replicate = true; -                        resp_msg.alpha           = KAD_ALPHA; -                        resp_msg.b               = b; -                        resp_msg.k               = KAD_K; -                        resp_msg.t_expire        = t_expire; -                        resp_msg.t_refresh       = KAD_T_REFR; -                        resp_msg.t_replicate     = KAD_T_REPL; -                        break; -                case KAD_FIND_VALUE: -                        buf = dht_retrieve(dht, msg->key.data); -                        if (buf.len != 0) { -                                resp_msg.n_addrs = buf.len; -                                resp_msg.addrs   = (uint64_t *) buf.data; -                                break; -                        } -                        /* FALLTHRU */ -                case KAD_FIND_NODE: -                        /* Return k closest contacts. */ -                        resp_msg.n_contacts = -                                dht_get_contacts(dht, msg->key.data, &cmsgs); -                        resp_msg.contacts = cmsgs; -                        break; -                case KAD_STORE: -                        if (msg->n_contacts < 1) { -                                log_warn("No contacts in store message."); -                                break; -                        } +static void dht_kv_update_replication_times(const uint8_t *         key, +                                            struct list_head *      repl, +                                            struct list_head *      rebl, +                                            const struct timespec * now) +{ +        struct dht_entry * e; +        struct list_head * p; +        struct list_head * h; +        struct val_entry * v; -                        if (!msg->has_t_expire) { -                                log_warn("No expiry time in store message."); -                                break; -                        } +        assert(key != NULL); +        assert(repl != NULL); +        assert(rebl != NULL); +        assert(now != NULL); -                        kad_add(dht, *msg->contacts, msg->n_contacts, -                                msg->t_expire); -                        break; -                case KAD_RESPONSE: -                        kad_handle_response(dht, msg); -                        break; -                default: -                        assert(false); -                        break; -                } +        pthread_rwlock_wrlock(&dht.db.lock); -                if (msg->code != KAD_JOIN) { -                        pthread_rwlock_wrlock(&dht->lock); -                        if (dht_get_state(dht) == DHT_JOINING && -                            dht->buckets == NULL) { -                                pthread_rwlock_unlock(&dht->lock); -                                goto finish; -                        } +        e = __dht_kv_find_entry(key); +        if (e == NULL) { +                pthread_rwlock_unlock(&dht.db.lock); +                return; +        } -                        if (dht_update_bucket(dht, msg->s_id.data, addr)) -                                log_warn("Failed to update bucket."); -                        pthread_rwlock_unlock(&dht->lock); +        list_for_each_safe(p, h, repl) { +                struct val_entry * x; +                v = list_entry(p, struct val_entry, next); +                x = dht_entry_get_val(e, v->val); +                if (x == NULL) { +                        log_err(KV_FMT " Not in vals.", KV_VAL(key, v->val)); +                        continue;                  } -                if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0) -                                log_warn("Failed to send response."); - - finish: -                kad_msg__free_unpacked(msg, NULL); +                x->t_repl = now->tv_sec; -                if (resp_msg.n_addrs > 0) -                        free(resp_msg.addrs); +                list_del(&v->next); +                val_entry_destroy(v); +        } -                if (resp_msg.n_contacts == 0) { -                        tpm_inc(dht->tpm); +        list_for_each_safe(p, h, rebl) { +                struct val_entry * x; +                v = list_entry(p, struct val_entry, next); +                x = dht_entry_get_lval(e, v->val); +                if (x == NULL) { +                        log_err(KV_FMT " Not in lvals.", KV_VAL(key, v->val));                          continue;                  } -                for (i = 0; i < resp_msg.n_contacts; ++i) -                        kad_contact_msg__free_unpacked(resp_msg.contacts[i], -                                                       NULL); -                free(resp_msg.contacts); +                x->t_repl = now->tv_sec; +                if (v->t_exp > x->t_exp) { +                        x->t_exp = v->t_exp; /* update expiration time */ +                } -                tpm_inc(dht->tpm); +                list_del(&v->next); +                val_entry_destroy(v);          } -        return (void *) 0; +        pthread_rwlock_unlock(&dht.db.lock);  } -static void dht_post_packet(void *               comp, -                            struct shm_du_buff * sdb) +static void __cleanup_value_list(void * o)  { -        struct cmd * cmd; -        struct dht * dht = (struct dht *) comp; +        return value_list_destroy((struct list_head *) o); +} -        if (dht_get_state(dht) == DHT_SHUTDOWN) { -#ifndef __DHT_TEST__ -                ipcp_sdb_release(sdb); -#endif -                return; +static void dht_kv_replicate_values(const uint8_t *    key, +                                    struct list_head * repl, +                                    struct list_head * rebl) +{ +        struct timespec    now; +        struct list_head * p; +        struct list_head * h; + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        pthread_cleanup_push(__cleanup_value_list, repl); +        pthread_cleanup_push(__cleanup_value_list, rebl); + +        list_for_each_safe(p, h, repl) { +                struct val_entry * v; +                v = list_entry(p, struct val_entry, next); +                dht_kv_replicate_value(key, v, &now);          } -        cmd = malloc(sizeof(*cmd)); -        if (cmd == NULL) { -                log_err("Command failed. Out of memory."); +        list_for_each_safe(p, h, rebl) { +                struct val_entry * v; +                v = list_entry(p, struct val_entry, next); +                dht_kv_republish_value(key, v, &now); +        } + +        pthread_cleanup_pop(false); +        pthread_cleanup_pop(false); + +        /* removes non-replicated items from the list */ +        dht_kv_update_replication_times(key, repl, rebl, &now); + +        if (list_is_empty(repl) && list_is_empty(rebl)) +                return; + +        log_warn(KEY_FMT " Failed to update replication times.", KEY_VAL(key)); +} + +static void dht_kv_replicate(void) +{ +        struct list_head repl; /* list of values to replicate       */ +        struct list_head rebl; /* list of local values to republish */ +        uint8_t *        key; + +        key = dht_dup_key(dht.id.data); /* dist == 0 */ +        if (key == NULL) { +                log_err("Replicate: Failed to duplicate DHT ID.");                  return;          } -        cmd->sdb = sdb; +        list_head_init(&repl); +        list_head_init(&rebl); -        pthread_mutex_lock(&dht->mtx); +        pthread_cleanup_push(free, key); -        list_add(&cmd->next, &dht->cmds); +        while (dht_kv_next_values(key, &repl, &rebl) == 0) { +                dht_kv_replicate_values(key, &repl, &rebl); +                if (!list_is_empty(&repl)) { +                        log_warn(KEY_FMT " Replication items left.", +                                 KEY_VAL(key)); +                        value_list_destroy(&repl); +                } -        pthread_cond_signal(&dht->cond); +                if (!list_is_empty(&rebl)) { +                        log_warn(KEY_FMT " Republish items left.", +                                 KEY_VAL(key)); +                        value_list_destroy(&rebl); +                } +        } -        pthread_mutex_unlock(&dht->mtx); +        pthread_cleanup_pop(true);  } -void dht_destroy(void * dir) +static void dht_kv_refresh_contacts(void)  { -        struct dht *       dht;          struct list_head * p;          struct list_head * h; +        struct list_head   rl; /* refresh list */ +        struct timespec    now; -        dht = (struct dht *) dir; -        if (dht == NULL) -                return; +        list_head_init(&rl); -#ifndef __DHT_TEST__ -        tpm_stop(dht->tpm); +        clock_gettime(CLOCK_REALTIME_COARSE, &now); -        tpm_destroy(dht->tpm); -#endif -        if (dht_get_state(dht) == DHT_RUNNING) { -                dht_set_state(dht, DHT_SHUTDOWN); -                pthread_cancel(dht->worker); -                pthread_join(dht->worker, NULL); -        } +        pthread_rwlock_rdlock(&dht.db.lock); -        pthread_rwlock_wrlock(&dht->lock); +        __dht_kv_bucket_refresh_list(dht.db.contacts.root, now.tv_sec, &rl); -        list_for_each_safe(p, h, &dht->cmds) { -                struct cmd * c = list_entry(p, struct cmd, next); +        pthread_rwlock_unlock(&dht.db.lock); + +        list_for_each_safe(p, h, &rl) { +                struct contact * c; +                c = list_entry(p, struct contact, next); +                log_dbg(PEER_FMT " Refreshing contact.", +                        PEER_VAL(c->id, c->addr)); +                dht_kv_query_contacts(c->id, NULL);                  list_del(&c->next); -#ifndef __DHT_TEST__ -                ipcp_sdb_release(c->sdb); -#endif -                free(c); +                contact_destroy(c);          } -        list_for_each_safe(p, h, &dht->entries) { -                struct dht_entry * e = list_entry(p, struct dht_entry, next); -                list_del(&e->next); -                dht_entry_destroy(e); -        } +        assert(list_is_empty(&rl)); +} -        list_for_each_safe(p, h, &dht->requests) { -                struct kad_req * r = list_entry(p, struct kad_req, next); -                list_del(&r->next); -                kad_req_destroy(r); -        } +static void (*tasks[])(void) = { +        dht_kv_check_contacts, +        dht_kv_remove_expired_entries, +        dht_kv_remove_expired_reqs, +        dht_kv_replicate, +        dht_kv_refresh_contacts, +        NULL +}; -        list_for_each_safe(p, h, &dht->refs) { -                struct ref_entry * e = list_entry(p, struct ref_entry, next); -                list_del(&e->next); -                ref_entry_destroy(e); +static void * work(void * o) +{ +        struct timespec now = TIMESPEC_INIT_MS(1); +        time_t          intv; +        size_t          n; /* number of tasks */ + +        n = sizeof(tasks) / sizeof(tasks[0]) - 1; /* last is NULL */ + +        (void) o; + +        while (dht_kv_seed_bootstrap_peer() == -EAGAIN) { +                ts_add(&now, &now, &now); /* exponential backoff */ +                if (now.tv_sec > 1)       /* cap at 1 second     */ +                        now.tv_sec = 1; +                nanosleep(&now, NULL);          } -        list_for_each_safe(p, h, &dht->lookups) { -                struct lookup * l = list_entry(p, struct lookup, next); -                list_del(&l->next); -                lookup_destroy(l); +        intv = gcd(dht.t_expire, (dht.t_expire - DHT_N_REPUB * dht.t_repl)); +        intv = gcd(intv, gcd(dht.t_repl, dht.t_refresh)) / 2; +        intv = MAX(1, intv / n); + +        log_dbg("DHT worker starting %ld seconds interval.", intv * n); + +        while (true) { +                int i = 0; +                while (tasks[i] != NULL) { +                        tasks[i++](); +                        sleep(intv); +                }          } -        pthread_rwlock_unlock(&dht->lock); +        return (void *) 0; +} -        if (dht->buckets != NULL) -                bucket_destroy(dht->buckets); +int dht_start(void) +{ +        dht.state = DHT_RUNNING; -        bmp_destroy(dht->cookies); +        if (tpm_start(dht.tpm)) +                goto fail_tpm_start; -        pthread_mutex_destroy(&dht->mtx); +#ifndef __DHT_TEST__ +        if (pthread_create(&dht.worker, NULL, work, NULL)) { +                log_err("Failed to create DHT worker thread."); +                goto fail_worker; +        } -        pthread_rwlock_destroy(&dht->lock); +        dht.eid = dt_reg_comp(&dht, &dht_post_packet, DHT); +        if ((int) dht.eid < 0) { +                log_err("Failed to register DHT component."); +                goto fail_reg; +        } +#else +        (void) work; +#endif +        return 0; +#ifndef __DHT_TEST__ + fail_reg: +        pthread_cancel(dht.worker); +        pthread_join(dht.worker, NULL); + fail_worker: +        tpm_stop(dht.tpm); +#endif + fail_tpm_start: +        dht.state = DHT_INIT; +        return -1; +} + +void dht_stop(void) +{ +        assert(dht.state == DHT_RUNNING); -        free(dht->id); +#ifndef __DHT_TEST__ +        dt_unreg_comp(dht.eid); + +        pthread_cancel(dht.worker); +        pthread_join(dht.worker, NULL); +#endif +        tpm_stop(dht.tpm); -        free(dht); +        dht.state = DHT_INIT;  } -static void * join_thr(void * o) +int dht_init(struct dir_dht_config * conf)  { -        struct join_info * info = (struct join_info *) o; -        struct lookup *    lu; -        size_t             retr = 0; +        struct timespec now; +        pthread_condattr_t cattr; -        assert(info); +        assert(conf != NULL); -        while (kad_join(info->dht, info->addr)) { -                if (dht_get_state(info->dht) == DHT_SHUTDOWN) { -                        log_dbg("DHT enrollment aborted."); -                        goto finish; -                } +        clock_gettime(CLOCK_REALTIME_COARSE, &now); -                if (retr++ == KAD_JOIN_RETR) { -                        dht_set_state(info->dht, DHT_INIT); -                        log_warn("DHT enrollment attempt failed."); -                        goto finish; -                } +#ifndef __DHT_TEST__ +        dht.id.len    = ipcp_dir_hash_len(); +        dht.addr      = addr_auth_address(); +#else +        dht.id.len    = DHT_TEST_KEY_LEN; +        dht.addr      = DHT_TEST_ADDR; +#endif +        dht.t0        = now.tv_sec; +        dht.alpha     = conf->params.alpha; +        dht.k         = conf->params.k; +        dht.t_expire  = conf->params.t_expire; +        dht.t_refresh = conf->params.t_refresh; +        dht.t_repl    = conf->params.t_replicate; +        dht.peer      = conf->peer; + +        dht.magic = generate_cookie(); + +        /* Send my address on enrollment */ +        conf->peer    = dht.addr; + +        dht.id.data = generate_id(); +        if (dht.id.data == NULL) { +                log_err("Failed to create DHT ID."); +                goto fail_id; +        } + +        list_head_init(&dht.cmds.list); -                sleep(KAD_JOIN_INTV); +        if (pthread_mutex_init(&dht.cmds.mtx, NULL)) { +                log_err("Failed to initialize command mutex."); +                goto fail_cmds_mutex;          } -        dht_set_state(info->dht, DHT_RUNNING); +        if (pthread_cond_init(&dht.cmds.cond, NULL)) { +                log_err("Failed to initialize command condvar."); +                goto fail_cmds_cond; +        } -        lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE); -        if (lu != NULL) -                lookup_destroy(lu); +        list_head_init(&dht.reqs.list); +        dht.reqs.len = 0; - finish: -        free(info); +        if (pthread_mutex_init(&dht.reqs.mtx, NULL)) { +                log_err("Failed to initialize request mutex."); +                goto fail_reqs_mutex; +        } -        return (void *) 0; -} +        if (pthread_condattr_init(&cattr)) { +                log_err("Failed to initialize request condvar attributes."); +                goto fail_cattr; +        } +#ifndef __APPLE__ +        if (pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK)) { +                log_err("Failed to set request condvar clock."); +                goto fail_cattr; +        } +#endif +        if (pthread_cond_init(&dht.reqs.cond, &cattr)) { +                log_err("Failed to initialize request condvar."); +                goto fail_reqs_cond; +        } -static void handle_event(void *       self, -                         int          event, -                         const void * o) -{ -        struct dht * dht = (struct dht *) self; +        list_head_init(&dht.db.kv.list); +        dht.db.kv.len   = 0; +        dht.db.kv.vals  = 0; +        dht.db.kv.lvals = 0; -        if (event == NOTIFY_DT_CONN_ADD) { -                pthread_t          thr; -                struct join_info * inf; -                struct conn *      c     = (struct conn *) o; -                struct timespec    slack = {0, DHT_ENROLL_SLACK * MILLION}; +        if (pthread_rwlock_init(&dht.db.lock, NULL)) { +                log_err("Failed to initialize store rwlock."); +                goto fail_rwlock; +        } -                /* Give the pff some time to update for the new link. */ -                nanosleep(&slack, NULL); +        dht.db.contacts.root = bucket_create(); +        if (dht.db.contacts.root == NULL) { +                log_err("Failed to create DHT buckets."); +                goto fail_buckets; +        } -                switch(dht_get_state(dht)) { -                case DHT_INIT: -                        inf = malloc(sizeof(*inf)); -                        if (inf == NULL) -                                break; +        if (rib_reg(DHT, &r_ops) < 0) { +                log_err("Failed to register DHT RIB operations."); +                goto fail_rib_reg; +        } -                        inf->dht  = dht; -                        inf->addr = c->conn_info.addr; - -                        if (dht_set_state(dht, DHT_JOINING) == 0 || -                            dht_wait_running(dht)) { -                                if (pthread_create(&thr, NULL, join_thr, inf)) { -                                        dht_set_state(dht, DHT_INIT); -                                        free(inf); -                                        return; -                                } -                                pthread_detach(thr); -                        } else { -                                free(inf); -                        } -                        break; -                case DHT_RUNNING: -                        /* -                         * FIXME: this lookup for effiency reasons -                         * causes a SEGV when stressed with rapid -                         * enrollments. -                         * lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); -                         * if (lu != NULL) -                         *         lookup_destroy(lu); -                         */ -                        break; -                default: -                        break; -                } +        dht.tpm = tpm_create(2, 1, dht_handle_packet, NULL); +        if (dht.tpm == NULL) { +                log_err("Failed to create TPM for DHT."); +                goto fail_tpm_create;          } + +        if (dht_kv_update_contacts(dht.id.data, dht.addr) < 0) +                log_warn("Failed to update contacts with DHT ID."); + +        pthread_condattr_destroy(&cattr); +#ifndef __DHT_TEST__ +        log_info("DHT initialized."); +        log_dbg("  ID: " HASH_FMT64 " [%zu bytes].", +                HASH_VAL64(dht.id.data), dht.id.len); +        log_dbg("  address: " ADDR_FMT32 ".", ADDR_VAL32(&dht.addr)); +        log_dbg("  peer: " ADDR_FMT32 ".", ADDR_VAL32(&dht.peer)); +        log_dbg("  magic cookie: " HASH_FMT64 ".", HASH_VAL64(&dht.magic)); +        log_info("  parameters: alpha=%u, k=%zu, t_expire=%ld, " +                "t_refresh=%ld, t_replicate=%ld.", +                dht.alpha, dht.k, dht.t_expire, dht.t_refresh, dht.t_repl); +#endif +        dht.state = DHT_INIT; + +        return 0; + + fail_tpm_create: +        rib_unreg(DHT); + fail_rib_reg: +        bucket_destroy(dht.db.contacts.root); + fail_buckets: +        pthread_rwlock_destroy(&dht.db.lock); + fail_rwlock: +        pthread_cond_destroy(&dht.reqs.cond); + fail_reqs_cond: +        pthread_condattr_destroy(&cattr); + fail_cattr: +        pthread_mutex_destroy(&dht.reqs.mtx); + fail_reqs_mutex: +        pthread_cond_destroy(&dht.cmds.cond); + fail_cmds_cond: +        pthread_mutex_destroy(&dht.cmds.mtx); + fail_cmds_mutex: +        freebuf(dht.id); + fail_id: +        return -1;  } -void * dht_create(void) +void dht_fini(void)  { -        struct dht * dht; +        struct list_head * p; +        struct list_head * h; -        dht = malloc(sizeof(*dht)); -        if (dht == NULL) -                goto fail_malloc; +        rib_unreg(DHT); -        dht->buckets = NULL; +        tpm_destroy(dht.tpm); -        list_head_init(&dht->entries); -        list_head_init(&dht->requests); -        list_head_init(&dht->refs); -        list_head_init(&dht->lookups); -        list_head_init(&dht->cmds); +        pthread_mutex_lock(&dht.cmds.mtx); -        if (pthread_rwlock_init(&dht->lock, NULL)) -                goto fail_rwlock; +        list_for_each_safe(p, h, &dht.cmds.list) { +                struct cmd * c = list_entry(p, struct cmd, next); +                list_del(&c->next); +                freebuf(c->cbuf); +                free(c); +        } -        if (pthread_mutex_init(&dht->mtx, NULL)) -                goto fail_mutex; +        pthread_mutex_unlock(&dht.cmds.mtx); -        if (pthread_cond_init(&dht->cond, NULL)) -                goto fail_cond; +        pthread_cond_destroy(&dht.cmds.cond); +        pthread_mutex_destroy(&dht.cmds.mtx); -        dht->cookies = bmp_create(DHT_MAX_REQS, 1); -        if (dht->cookies == NULL) -                goto fail_bmp; +        pthread_mutex_lock(&dht.reqs.mtx); -        dht->b    = 0; -        dht->id   = NULL; -#ifndef __DHT_TEST__ -        dht->addr = ipcpi.dt_addr; -        dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); -        if (dht->tpm == NULL) -                goto fail_tpm_create; +        list_for_each_safe(p, h, &dht.reqs.list) { +                struct dht_req * r = list_entry(p, struct dht_req, next); +                list_del(&r->next); +                dht_req_destroy(r); +                dht.reqs.len--; +        } -        if (tpm_start(dht->tpm)) -                goto fail_tpm_start; +        pthread_mutex_unlock(&dht.reqs.mtx); -        dht->eid   = dt_reg_comp(dht, &dht_post_packet, DHT); -        if ((int) dht->eid < 0) -                goto fail_tpm_start; +        pthread_cond_destroy(&dht.reqs.cond); +        pthread_mutex_destroy(&dht.reqs.mtx); -        notifier_reg(handle_event, dht); -#else -        (void) handle_event; -        (void) dht_handle_packet; -        (void) dht_post_packet; -#endif -        dht->state = DHT_INIT; +        pthread_rwlock_wrlock(&dht.db.lock); -        return (void *) dht; -#ifndef __DHT_TEST__ - fail_tpm_start: -        tpm_destroy(dht->tpm); - fail_tpm_create: -        bmp_destroy(dht->cookies); -#endif - fail_bmp: -        pthread_cond_destroy(&dht->cond); - fail_cond: -        pthread_mutex_destroy(&dht->mtx); - fail_mutex: -        pthread_rwlock_destroy(&dht->lock); - fail_rwlock: -        free(dht); - fail_malloc: -        return NULL; +        list_for_each_safe(p, h, &dht.db.kv.list) { +                struct dht_entry * e = list_entry(p, struct dht_entry, next); +                list_del(&e->next); +                dht_entry_destroy(e); +                dht.db.kv.len--; +        } + +        if (dht.db.contacts.root != NULL) +                bucket_destroy(dht.db.contacts.root); + +        pthread_rwlock_unlock(&dht.db.lock); + +        pthread_rwlock_destroy(&dht.db.lock); + +        assert(dht.db.kv.len == 0); +        assert(dht.db.kv.vals == 0); +        assert(dht.db.kv.lvals == 0); +        assert(dht.reqs.len == 0); + +        freebuf(dht.id);  } diff --git a/src/ipcpd/unicast/dir/dht.h b/src/ipcpd/unicast/dir/dht.h index a6e9c2c8..852a5130 100644 --- a/src/ipcpd/unicast/dir/dht.h +++ b/src/ipcpd/unicast/dir/dht.h @@ -1,5 +1,5 @@  /* - * Ouroboros - Copyright (C) 2016 - 2021 + * Ouroboros - Copyright (C) 2016 - 2024   *   * Distributed Hash Table based on Kademlia   * @@ -30,22 +30,19 @@  #include <stdint.h>  #include <sys/types.h> -void *   dht_create(void); +int      dht_init(struct dir_dht_config * conf); -void     dht_destroy(void * dir); +void     dht_fini(void); -int      dht_bootstrap(void * dir); +int      dht_start(void); -int      dht_reg(void *          dir, -                 const uint8_t * key); +void     dht_stop(void); -int      dht_unreg(void *          dir, -                   const uint8_t * key); +int      dht_reg(const uint8_t * key); -uint64_t dht_query(void *          dir, -                   const uint8_t * key); +int      dht_unreg(const uint8_t * key); -int      dht_wait_running(void * dir); +uint64_t dht_query(const uint8_t * key);  extern struct dir_ops dht_dir_ops; diff --git a/src/ipcpd/unicast/dir/dht.proto b/src/ipcpd/unicast/dir/dht.proto new file mode 100644 index 00000000..ea74805f --- /dev/null +++ b/src/ipcpd/unicast/dir/dht.proto @@ -0,0 +1,58 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * DHT protocol, based on Kademlia + * + *    Dimitri Staessens <dimitri@ouroboros.rocks> + *    Sander Vrijders   <sander@ouroboros.rocks> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +syntax = "proto2"; + +message dht_contact_msg { +        required bytes  id   = 1; +        required uint64 addr = 2; +} + +message dht_find_req_msg { +        required uint64 cookie = 1; +        required bytes key     = 2; +} + +message dht_find_node_rsp_msg { +        required uint64          cookie   = 1; +        required bytes           key      = 2; +        repeated dht_contact_msg contacts = 3; +} + +message dht_find_value_rsp_msg { +        repeated bytes values = 1; +} + +message dht_store_msg { +        required bytes  key = 1; +        required bytes  val = 2; +        required uint32 exp = 3; +} + +message dht_msg { +        required uint32                 code  = 1; +        required dht_contact_msg        src   = 2; +        optional dht_store_msg          store = 3; +        optional dht_find_req_msg       find  = 4; +        optional dht_find_node_rsp_msg  node  = 5; +        optional dht_find_value_rsp_msg val   = 6; +} diff --git a/src/ipcpd/unicast/dir/kademlia.proto b/src/ipcpd/unicast/dir/kademlia.proto deleted file mode 100644 index 58f5e787..00000000 --- a/src/ipcpd/unicast/dir/kademlia.proto +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2021 - * - * KAD protocol - * - *    Dimitri Staessens <dimitri@ouroboros.rocks> - *    Sander Vrijders   <sander@ouroboros.rocks> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -syntax = "proto2"; - -message kad_contact_msg { -        required bytes  id   = 1; -        required uint64 addr = 2; -}; - -message kad_msg { -        required uint32 code              =  1; -        required uint32 cookie            =  2; -        required uint64 s_addr            =  3; -        optional bytes  s_id              =  4; -        optional bytes  key               =  5; -        repeated uint64 addrs             =  6; -        repeated kad_contact_msg contacts =  7; -        // enrolment parameters -        optional uint32 alpha             =  8; -        optional uint32 b                 =  9; -        optional uint32 k                 = 10; -        optional uint32 t_expire          = 11; -        optional uint32 t_refresh         = 12; -        optional uint32 t_replicate       = 13; -};
\ No newline at end of file diff --git a/src/ipcpd/unicast/dir/ops.h b/src/ipcpd/unicast/dir/ops.h index e74324da..8c6e5eb5 100644 --- a/src/ipcpd/unicast/dir/ops.h +++ b/src/ipcpd/unicast/dir/ops.h @@ -1,5 +1,5 @@  /* - * Ouroboros - Copyright (C) 2016 - 2021 + * Ouroboros - Copyright (C) 2016 - 2024   *   * Directory policy ops   * @@ -23,24 +23,20 @@  #ifndef OUROBOROS_IPCPD_UNICAST_DIR_OPS_H  #define OUROBOROS_IPCPD_UNICAST_DIR_OPS_H -  struct dir_ops { -        void *   (* create)(void); +        int      (* init)(void * config); -        void     (* destroy)(void * dir); +        void     (* fini)(void); -        int      (* bootstrap)(void * dir); +        int      (* start)(void); -        int      (* reg)(void * dir, -                         const uint8_t * hash); +        void     (* stop)(void); -        int      (* unreg)(void * dir, -                           const uint8_t * hash); +        int      (* reg)(const uint8_t * hash); -        uint64_t (* query)(void * dir, -                           const uint8_t * hash); +        int      (* unreg)(const uint8_t * hash); -        int      (* wait_running)(void * dir); +        uint64_t (* query)(const uint8_t * hash);  };  #endif /* OUROBOROS_IPCPD_UNICAST_DIR_OPS_H */ diff --git a/src/ipcpd/unicast/dir/pol.h b/src/ipcpd/unicast/dir/pol.h index 3aa2d59f..eae4b2e7 100644 --- a/src/ipcpd/unicast/dir/pol.h +++ b/src/ipcpd/unicast/dir/pol.h @@ -1,5 +1,5 @@  /* - * Ouroboros - Copyright (C) 2016 - 2021 + * Ouroboros - Copyright (C) 2016 - 2024   *   * Directory policies   * diff --git a/src/ipcpd/unicast/dir/tests/CMakeLists.txt b/src/ipcpd/unicast/dir/tests/CMakeLists.txt index 482711d5..f62ed993 100644 --- a/src/ipcpd/unicast/dir/tests/CMakeLists.txt +++ b/src/ipcpd/unicast/dir/tests/CMakeLists.txt @@ -20,16 +20,19 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c    dht_test.c    ) -protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS ../kademlia.proto) - +protobuf_generate_c(DHT_PROTO_SRCS KAD_PROTO_HDRS ../dht.proto)  add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests} -  ${KAD_PROTO_SRCS}) +  ${DHT_PROTO_SRCS})  target_link_libraries(${PARENT_DIR}_test ouroboros-common)  add_dependencies(check ${PARENT_DIR}_test)  set(tests_to_run ${${PARENT_DIR}_tests}) -remove(tests_to_run test_suite.c) +if(CMAKE_VERSION VERSION_LESS "3.29.0") +  remove(tests_to_run test_suite.c) +else () +  list(POP_FRONT tests_to_run) +endif()  foreach (test ${tests_to_run})    get_filename_component(test_name ${test} NAME_WE) diff --git a/src/ipcpd/unicast/dir/tests/dht_test.c b/src/ipcpd/unicast/dir/tests/dht_test.c index 3f4c3b87..cb6b0f9f 100644 --- a/src/ipcpd/unicast/dir/tests/dht_test.c +++ b/src/ipcpd/unicast/dir/tests/dht_test.c @@ -1,10 +1,9 @@  /* - * Ouroboros - Copyright (C) 2016 - 2021 + * Ouroboros - Copyright (C) 2016 - 2024   *   * Unit tests of the DHT   *   *    Dimitri Staessens <dimitri@ouroboros.rocks> - *    Sander Vrijders   <sander@ouroboros.rocks>   *   * This program is free software; you can redistribute it and/or modify   * it under the terms of the GNU General Public License version 2 as @@ -21,76 +20,1906 @@   */  #define __DHT_TEST__ -#define DHT_TEST_KEY_LEN  32 -#include "dht.c" +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include <ouroboros/test.h> +#include <ouroboros/list.h> +#include <ouroboros/utils.h> -#include <pthread.h> +#include "dht.pb-c.h" + +#include <assert.h> +#include <inttypes.h>  #include <time.h>  #include <stdlib.h>  #include <stdio.h> -#define CONTACTS          1000 +#define DHT_MAX_RAND_SIZE 64 +#define DHT_TEST_KEY_LEN  32 +#define DHT_TEST_ADDR     0x1234567890abcdefULL -int dht_test(int     argc, -             char ** argv) +/* forward declare for use in the dht code */ +/* Packet sink for DHT tests */ +struct { +        bool   enabled; + +        struct list_head list; +        size_t len; +} sink; + +struct message { +        struct   list_head next; +        void *   msg; +        uint64_t dst; +}; + +static int sink_send_msg(buffer_t * pkt, +                         uint64_t  addr)  { -        struct dht * dht; -        uint8_t      key[DHT_TEST_KEY_LEN]; -        size_t       i; +        struct message *   m; -        (void) argc; -        (void) argv; +        assert(pkt  != NULL); +        assert(addr != 0); + +        assert(!list_is_empty(&sink.list) || sink.len == 0); + +        if (!sink.enabled) +                goto finish; + +        m = malloc(sizeof(*m)); +        if (m == NULL) { +                printf("Failed to malloc message."); +                goto fail_malloc; +        } + +        m->msg = dht_msg__unpack(NULL, pkt->len, pkt->data); +        if (m->msg == NULL) +                goto fail_unpack; + +        m->dst = addr; + +        list_add_tail(&m->next, &sink.list); + +        ++sink.len; + finish: +        freebuf(*pkt); + +        return 0; + fail_unpack: +        free(m); + fail_malloc: +        freebuf(*pkt); +        return -1; +} + +#include "dht.c" + +/* Test helpers */ + +static void sink_init(void) +{ +        list_head_init(&sink.list); +        sink.len = 0; +        sink.enabled = true; +} + +static void sink_clear(void) +{ +        struct list_head * p; +        struct list_head * h; + +        list_for_each_safe(p, h, &sink.list) { +                struct message * m = list_entry(p, struct message, next); +                list_del(&m->next); +                dht_msg__free_unpacked((dht_msg_t *) m->msg, NULL); +                free(m); +                --sink.len; +        } + +        assert(list_is_empty(&sink.list)); +} + +static void sink_fini(void) +{ +        sink_clear(); + +        assert(list_is_empty(&sink.list) || sink.len != 0); +} + +static dht_msg_t * sink_read(void) +{ +        struct message * m; +        dht_msg_t *      msg; + +        assert(!list_is_empty(&sink.list) || sink.len == 0); + +        if (list_is_empty(&sink.list)) +                return NULL; + +        m = list_first_entry(&sink.list, struct message, next); + +        --sink.len; + +        list_del(&m->next); + +        msg = m->msg; + +        free(m); + +        return (dht_msg_t *) msg; +} + +static const buffer_t test_val = { +        .data = (uint8_t *) "test_value", +        .len = 10 +}; + +static const buffer_t test_val2 = { +        .data = (uint8_t *) "test_value_2", +        .len = 12 +}; + +static int random_value_len(buffer_t * b) +{ +        assert(b != NULL); +        assert(b->len > 0 && b->len <= DHT_MAX_RAND_SIZE); + +        b->data = malloc(b->len); +        if (b->data == NULL) +                goto fail_malloc; + +        random_buffer(b->data, b->len); + +        return 0; + + fail_malloc: +        return -ENOMEM; +} + +static int random_value(buffer_t * b) +{ +        assert(b != NULL); + +        b->len = rand() % DHT_MAX_RAND_SIZE + 1; + +        return random_value_len(b); +} + +static int fill_dht_with_contacts(size_t n) +{ +        size_t    i; +        uint8_t * id; + +        for (i = 0; i < n; i++) { +                uint64_t addr = generate_cookie(); +                id = generate_id(); +                if (id == NULL) +                        goto fail_id; + +                if (dht_kv_update_contacts(id, addr) < 0) +                        goto fail_update; +                free(id); +        } + +        return 0; + + fail_update: +        free(id); + fail_id: +        return -1; +} + +static int fill_store_with_random_values(const uint8_t * key, +                                         size_t          len, +                                         size_t          n_values) +{ +        buffer_t        val; +        struct timespec now; +        size_t          i; +        uint8_t *       _key; + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        for (i = 0; i < n_values; ++i) { +                if (key != NULL) +                        _key = (uint8_t *) key; +                else { +                        _key = generate_id(); +                        if (_key == NULL) +                                goto fail_key; +                } + +                if (len == 0) +                        val.len = rand() % DHT_MAX_RAND_SIZE + 1; +                else +                        val.len = len; + +                if (random_value_len(&val) < 0) +                        goto fail_value; + +                if (dht_kv_store(_key, val, now.tv_sec + 10) < 0) +                        goto fail_store; + +                freebuf(val); +                if (key == NULL) +                        free(_key); +        } + +        return 0; -        dht = dht_create(); -        if (dht == NULL) { + fail_store: +        freebuf(val); + fail_value: +        free(_key); + fail_key: +        return -1; +} + +static int random_contact_list(dht_contact_msg_t *** contacts, +                               size_t                max) +{ +        size_t i; + +        assert(contacts != NULL); +        assert(max > 0); + +        *contacts = malloc(max * sizeof(**contacts)); +        if (*contacts == NULL) +                goto fail_malloc; + +        for (i = 0; i < max; i++) { +                (*contacts)[i] = malloc(sizeof(*(*contacts)[i])); +                if ((*contacts)[i] == NULL) +                        goto fail_contacts; + +                dht_contact_msg__init((*contacts)[i]); + +                (*contacts)[i]->id.data = generate_id(); +                if ((*contacts)[i]->id.data == NULL) +                        goto fail_contact; + +                (*contacts)[i]->id.len = dht.id.len; +                (*contacts)[i]->addr = generate_cookie(); +        } + +        return 0; + + fail_contact: +        dht_contact_msg__free_unpacked((*contacts)[i], NULL); + fail_contacts: +        while (i-- > 0) +                free((*contacts)[i]); +        free(*contacts); + fail_malloc: +        return -ENOMEM; +} + +static void clear_contacts(dht_contact_msg_t ** contacts, +                           size_t               len) +{ +        size_t i; + +        assert(contacts != NULL); +        if (*contacts == NULL) +                return; + +        for (i = 0; i < len; ++i) +                dht_contact_msg__free_unpacked((contacts)[i], NULL); + +        free(*contacts); +        *contacts = NULL; +} + +/* Start of actual tests */ +static struct dir_dht_config test_dht_config = { +        .params = { +                .alpha       = 3, +                .k           = 8, +                .t_expire    = 86400, +                .t_refresh   = 900, +                .t_replicate = 900 +        } +}; + +static int test_dht_init_fini(void) +{ +        TEST_START(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_start_stop(void) +{ +        TEST_START(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        if (dht_start() < 0) { +                printf("Failed to start dht.\n"); +                goto fail_start; +        } + +        dht_stop(); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_start: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_val_entry_create_destroy(void) +{ +        struct val_entry * e; +        struct timespec    now; + +        TEST_START(); + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        if (dht_init(&test_dht_config) < 0) {                  printf("Failed to create dht.\n"); -                return -1; +                goto fail_init;          } -        dht_destroy(dht); +        e = val_entry_create(test_val, now.tv_sec + 10); +        if (e == NULL) { +                printf("Failed to create val entry.\n"); +                goto fail_entry; +        } + +        val_entry_destroy(e); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; -        dht = dht_create(); -        if (dht == NULL) { -                printf("Failed to re-create dht.\n"); -                return -1; + fail_entry: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_entry_create_destroy(void) +{ +        struct dht_entry * e; + +        TEST_START(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init;          } -        if (dht_bootstrap(dht)) { -                printf("Failed to bootstrap dht.\n"); -                dht_destroy(dht); -                return -1; +        e = dht_entry_create(dht.id.data); +        if (e == NULL) { +                printf("Failed to create dht entry.\n"); +                goto fail_entry;          } -        dht_destroy(dht); +        dht_entry_destroy(e); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_entry: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_entry_update_get_val(void) +{ +        struct dht_entry * e; +        struct val_entry * v; +        struct timespec    now; + +        TEST_START(); + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        e = dht_entry_create(dht.id.data); +        if (e == NULL) { +                printf("Failed to create dht entry.\n"); +                goto fail_entry; +        } + +        if (dht_entry_get_val(e, test_val) != NULL) { +                printf("Found value in empty dht entry.\n"); +                goto fail_get; +        } + +        if (dht_entry_update_val(e, test_val, now.tv_sec + 10) < 0) { +                printf("Failed to update dht entry value.\n"); +                goto fail_get; +        } + +        if (dht_entry_get_val(e, test_val2) != NULL) { +                printf("Found value in dht entry with different key.\n"); +                goto fail_get; +        } + +        v = dht_entry_get_val(e, test_val); +        if (v == NULL) { +                printf("Failed to get value from dht entry.\n"); +                goto fail_get; +        } + +        if (v->val.len != test_val.len) { +                printf("Length in dht entry does not match expected.\n"); +                goto fail_get; +        } + +        if(memcmp(v->val.data, test_val.data, test_val.len) != 0) { +                printf("Data in dht entry does not match expected.\n"); +                goto fail_get; +        } -        dht = dht_create(); -        if (dht == NULL) { -                printf("Failed to re-create dht.\n"); -                return -1; +        if (dht_entry_update_val(e, test_val, now.tv_sec + 15) < 0) { +                printf("Failed to update exsting dht entry value.\n"); +                goto fail_get;          } -        if (dht_bootstrap(dht)) { -                printf("Failed to bootstrap dht.\n"); -                dht_destroy(dht); -                return -1; +        if (v->t_exp != now.tv_sec + 15) { +                printf("Expiration time in dht entry value not updated.\n"); +                goto fail_get;          } -        for (i = 0; i < CONTACTS; ++i) { -                uint64_t addr; -                random_buffer(&addr, sizeof(addr)); -                random_buffer(key, DHT_TEST_KEY_LEN); -                pthread_rwlock_wrlock(&dht->lock); -                if (dht_update_bucket(dht, key, addr)) { -                        pthread_rwlock_unlock(&dht->lock); -                        printf("Failed to update bucket.\n"); -                        dht_destroy(dht); -                        return -1; +        if (dht_entry_update_val(e, test_val, now.tv_sec + 5) < 0) { +                printf("Failed to update existing dht entry value (5).\n"); +                goto fail_get; +        } + +        if (v->t_exp != now.tv_sec + 15) { +                printf("Expiration time in dht entry shortened.\n"); +                goto fail_get; +        } + +        if (dht_entry_get_val(e, test_val) != v) { +                printf("Wrong value in dht entry found after update.\n"); +                goto fail_get; +        } + +        dht_entry_destroy(e); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_get: +        dht_entry_destroy(e); + fail_entry: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_entry_update_get_lval(void) +{ +        struct dht_entry * e; +        struct val_entry * v; +        struct timespec    now; + +        TEST_START(); + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        e = dht_entry_create(dht.id.data); +        if (e == NULL) { +                printf("Failed to create dht entry.\n"); +                goto fail_entry; +        } + +        if (dht_entry_get_lval(e, test_val) != NULL) { +                printf("Found value in empty dht entry.\n"); +                goto fail_get; +        } + +        if (dht_entry_update_lval(e, test_val) < 0) { +                printf("Failed to update dht entry value.\n"); +                goto fail_get; +        } + +        v = dht_entry_get_lval(e, test_val); +        if (v== NULL) { +                printf("Failed to get value from dht entry.\n"); +                goto fail_get; +        } + +        if (dht_entry_get_lval(e, test_val2) != NULL) { +                printf("Found value in dht entry in vals.\n"); +                goto fail_get; +        } + +        if (v->val.len != test_val.len) { +                printf("Length in dht entry does not match expected.\n"); +                goto fail_get; +        } + +        if(memcmp(v->val.data, test_val.data, test_val.len) != 0) { +                printf("Data in dht entry does not match expected.\n"); +                goto fail_get; +        } + +        if (dht_entry_update_lval(e, test_val) < 0) { +                printf("Failed to update existing dht entry value.\n"); +                goto fail_get; +        } + +        if (dht_entry_get_lval(e, test_val) != v) { +                printf("Wrong value in dht entry found after update.\n"); +                goto fail_get; +        } + +        dht_entry_destroy(e); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_get: +        dht_entry_destroy(e); + fail_entry: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_kv_contact_create_destroy(void) +{ +        struct contact * c; + +        TEST_START(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        c = contact_create(dht.id.data, dht.addr); +        if (c == NULL) { +                printf("Failed to create contact.\n"); +                goto fail_contact; +        } + +        contact_destroy(c); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_contact: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_kv_update_bucket(void) +{ +        TEST_START(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        if (fill_dht_with_contacts(1000) < 0) { +                printf("Failed to fill bucket with contacts.\n"); +                goto fail_update; +        } + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_update: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_kv_contact_list(void) +{ +        struct list_head cl; +        ssize_t          len; +        ssize_t          items; + +        TEST_START(); + +        list_head_init(&cl); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        items = 5; + +        if (fill_dht_with_contacts(items) < 0) { +                printf("Failed to fill bucket with contacts.\n"); +                goto fail_fill; +        } + +        len = dht_kv_contact_list(dht.id.data, &cl, dht.k); +        if (len < 0) { +                printf("Failed to get contact list.\n"); +                goto fail_fill; +        } + +        if (len != items) { +                printf("Failed to get contacts (%zu != %zu).\n", len, items); +                goto fail_contact_list; +        } + +        contact_list_destroy(&cl); + +        items = 100; + +        if (fill_dht_with_contacts(items) < 0) { +                printf("Failed to fill bucket with contacts.\n"); +                goto fail_fill; +        } + +        len = dht_kv_contact_list(dht.id.data, &cl, items); +        if (len < 0) { +                printf("Failed to get contact list.\n"); +                goto fail_fill; +        } + +        if ((size_t) len < dht.k) { +                printf("Failed to get contacts (%zu < %zu).\n", len, dht.k); +                goto fail_contact_list; +        } + +        contact_list_destroy(&cl); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_contact_list: +        contact_list_destroy(&cl); + fail_fill: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_kv_get_values(void) +{ +        buffer_t * vals; +        ssize_t    len; +        size_t     n = sizeof(uint64_t); + +        TEST_START(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        if (fill_store_with_random_values(dht.id.data, n, 3) < 0) { +                printf("Failed to fill store with random values.\n"); +                goto fail_fill; +        } + +        len = dht_kv_retrieve(dht.id.data, &vals); +        if (len < 0) { +                printf("Failed to get values from store.\n"); +                goto fail_fill; +        } + +        if (len != 3) { +                printf("Failed to get %ld values (%zu).\n", 3L, len); +                goto fail_get_values; +        } + +        freebufs(vals, len); + +        if (fill_store_with_random_values(dht.id.data, n, 20) < 0) { +                printf("Failed to fill store with random values.\n"); +                goto fail_fill; +        } + +        len = dht_kv_retrieve(dht.id.data, &vals); +        if (len < 0) { +                printf("Failed to get values from store.\n"); +                goto fail_fill; +        } + +        if (len != DHT_MAX_VALS) { +                printf("Failed to get %d values.\n", DHT_MAX_VALS); +                goto fail_get_values; +        } + +        freebufs(vals, len); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_get_values: +        freebufs(vals, len); + fail_fill: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_kv_find_node_req_msg(void) +{ +        dht_msg_t * msg; +        dht_msg_t * upk; +        size_t      len; +        uint8_t *   buf; + +        TEST_START(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        msg = dht_kv_find_node_req_msg(dht.id.data); +        if (msg == NULL) { +                printf("Failed to get find node request message.\n"); +                goto fail_msg; +        } + +        if (msg->code != DHT_FIND_NODE_REQ) { +                printf("Wrong code in find_node_req message (%s != %s).\n", +                        dht_code_str[msg->code], +                        dht_code_str[DHT_FIND_NODE_REQ]); +                goto fail_msg; +        } + +        len = dht_msg__get_packed_size(msg); +        if (len == 0) { +                printf("Failed to get packed length of find_node_req.\n"); +                goto fail_msg; +        } + +        buf = malloc(len); +        if (buf == NULL) { +                printf("Failed to malloc find_node_req buf.\n"); +                goto fail_msg; +        } + +        if (dht_msg__pack(msg, buf) != len) { +                printf("Failed to pack find_node_req message.\n"); +                goto fail_pack; +        } + +        upk = dht_msg__unpack(NULL, len, buf); +        if (upk == NULL) { +                printf("Failed to unpack find_value_req message.\n"); +                goto fail_unpack; +        } + +        free(buf); +        dht_msg__free_unpacked(msg, NULL); +        dht_msg__free_unpacked(upk, NULL); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_unpack: +        dht_msg__free_unpacked(msg, NULL); + fail_pack: +        free(buf); + fail_msg: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_kv_find_node_rsp_msg(void) +{ +        dht_contact_msg_t ** contacts; +        dht_msg_t *          msg; +        dht_msg_t *          upk; +        size_t               len; +        uint8_t *            buf; + +        TEST_START(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        msg = dht_kv_find_node_rsp_msg(dht.id.data, 0, &contacts, 0); +        if (msg == NULL) { +                printf("Failed to get find node response message.\n"); +                goto fail_msg; +        } + +        if (msg->code != DHT_FIND_NODE_RSP) { +                printf("Wrong code in find_node_rsp message (%s != %s).\n", +                       dht_code_str[msg->code], +                       dht_code_str[DHT_FIND_NODE_RSP]); +                goto fail_msg; +        } + +        len = dht_msg__get_packed_size(msg); +        if (len == 0) { +                printf("Failed to get packed length of find_node_rsp.\n"); +                goto fail_msg; +        } + +        buf = malloc(len); +        if (buf == NULL) { +                printf("Failed to malloc find_node_rsp buf.\n"); +                goto fail_msg; +        } + +        if (dht_msg__pack(msg, buf) != len) { +                printf("Failed to pack find_node_rsp message.\n"); +                goto fail_pack; +        } + +        upk = dht_msg__unpack(NULL, len, buf); +        if (upk == NULL) { +                printf("Failed to unpack find_node_rsp message.\n"); +                goto fail_unpack; +        } + +        free(buf); +        dht_msg__free_unpacked(msg, NULL); +        dht_msg__free_unpacked(upk, NULL); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_unpack: +        dht_msg__free_unpacked(msg, NULL); + fail_pack: +        free(buf); + fail_msg: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_kv_find_node_rsp_msg_contacts(void) +{ +        dht_contact_msg_t ** contacts; +        dht_msg_t *          msg; +        dht_msg_t *          upk; +        uint8_t *            buf; +        size_t               len; +        ssize_t              n; + +        TEST_START(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        if (fill_dht_with_contacts(100) < 0) { +                printf("Failed to fill bucket with contacts.\n"); +                goto fail_fill; +        } + +        n = dht_kv_get_contacts(dht.id.data, &contacts); +        if (n < 0) { +                printf("Failed to get contacts.\n"); +                goto fail_fill; +        } + +        if ((size_t) n < dht.k) { +                printf("Failed to get enough contacts (%zu < %zu).\n", n, dht.k); +                goto fail_fill; +        } + +        msg = dht_kv_find_node_rsp_msg(dht.id.data, 0, &contacts, n); +        if (msg == NULL) { +                printf("Failed to build find node response message.\n"); +                goto fail_msg; +        } + +        len = dht_msg__get_packed_size(msg); +        if (len == 0) { +                printf("Failed to get packed length of find_node_rsp.\n"); +                goto fail_msg; +        } + +        buf = malloc(len); +        if (buf == NULL) { +                printf("Failed to malloc find_node_rsp buf.\n"); +                goto fail_msg; +        } + +        if (dht_msg__pack(msg, buf) != len) { +                printf("Failed to pack find_node_rsp message.\n"); +                goto fail_pack; +        } + +        upk = dht_msg__unpack(NULL, len, buf); +        if (upk == NULL) { +                printf("Failed to unpack find_node_rsp message.\n"); +                goto fail_unpack; +        } + +        free(buf); +        dht_msg__free_unpacked(msg, NULL); +        dht_msg__free_unpacked(upk, NULL); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_unpack: +        dht_msg__free_unpacked(msg, NULL); + fail_pack: +        free(buf); + fail_msg: +        clear_contacts(contacts, n); + fail_fill: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_kv_find_value_req_msg(void) +{ +        dht_msg_t * msg; +        dht_msg_t * upk; +        size_t      len; +        uint8_t *   buf; + +        TEST_START(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        msg = dht_kv_find_value_req_msg(dht.id.data); +        if (msg == NULL) { +                printf("Failed to build find value request message.\n"); +                goto fail_msg; +        } + +        if (msg->code != DHT_FIND_VALUE_REQ) { +                printf("Wrong code in find_value_req message (%s != %s).\n", +                       dht_code_str[msg->code], +                       dht_code_str[DHT_FIND_VALUE_REQ]); +                goto fail_msg; +        } + +        len = dht_msg__get_packed_size(msg); +        if (len == 0) { +                printf("Failed to get packed length of find_value_req.\n"); +                goto fail_msg; +        } + +        buf = malloc(len); +        if (buf == NULL) { +                printf("Failed to malloc find_node_req buf.\n"); +                goto fail_msg; +        } + +        if (dht_msg__pack(msg, buf) != len) { +                printf("Failed to pack find_value_req message.\n"); +                goto fail_pack; +        } + +        upk = dht_msg__unpack(NULL, len, buf); +        if (upk == NULL) { +                printf("Failed to unpack find_value_req message.\n"); +                goto fail_unpack; +        } + +        free(buf); +        dht_msg__free_unpacked(msg, NULL); +        dht_msg__free_unpacked(upk, NULL); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_unpack: +        dht_msg__free_unpacked(msg, NULL); + fail_pack: +        free(buf); + fail_msg: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_kv_find_value_rsp_msg(void) +{ +        dht_msg_t * msg; +        dht_msg_t * upk; +        size_t      len; +        uint8_t *   buf; + +        TEST_START(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        msg = dht_kv_find_value_rsp_msg(dht.id.data, 0, NULL, 0, NULL, 0); +        if (msg == NULL) { +                printf("Failed to build find value response message.\n"); +                goto fail_msg; +        } + +        if (msg->code != DHT_FIND_VALUE_RSP) { +                printf("Wrong code in find_value_rsp message (%s != %s).\n", +                       dht_code_str[msg->code], +                       dht_code_str[DHT_FIND_VALUE_RSP]); +                goto fail_msg; +        } + +        len = dht_msg__get_packed_size(msg); +        if (len == 0) { +                printf("Failed to get packed length of find_value_rsp.\n"); +                goto fail_msg; +        } + +        buf = malloc(len); +        if (buf == NULL) { +                printf("Failed to malloc find_value_rsp buf.\n"); +                goto fail_msg; +        } + +        if (dht_msg__pack(msg, buf) != len) { +                printf("Failed to pack find_value_rsp message.\n"); +                goto fail_pack; +        } + +        upk = dht_msg__unpack(NULL, len, buf); +        if (upk == NULL) { +                printf("Failed to unpack find_value_rsp message.\n"); +                goto fail_unpack; +        } + +        free(buf); +        dht_msg__free_unpacked(msg, NULL); +        dht_msg__free_unpacked(upk, NULL); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_unpack: +        dht_msg__free_unpacked(msg, NULL); + fail_pack: +        free(buf); + fail_msg: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_kv_find_value_rsp_msg_contacts(void) +{ +        dht_msg_t *          msg; +        dht_msg_t *          upk; +        size_t               len; +        uint8_t *            buf; +        dht_contact_msg_t ** contacts; +        ssize_t              n; + +        TEST_START(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        if (fill_dht_with_contacts(100) < 0) { +                printf("Failed to fill bucket with contacts.\n"); +                goto fail_fill; +        } + +        n = dht_kv_get_contacts(dht.id.data, &contacts); +        if (n < 0) { +                printf("Failed to get contacts.\n"); +                goto fail_fill; +        } + +        if ((size_t) n < dht.k) { +                printf("Failed to get enough contacts (%zu < %zu).\n", n, dht.k); +                goto fail_fill; +        } + +        msg = dht_kv_find_value_rsp_msg(dht.id.data, 0, &contacts, n, NULL, 0); +        if (msg == NULL) { +                printf("Failed to build find value response message.\n"); +                goto fail_msg; +        } + +        len = dht_msg__get_packed_size(msg); +        if (len == 0) { +                printf("Failed to get packed length of find_value_rsp.\n"); +                goto fail_msg; +        } + +        buf = malloc(len); +        if (buf == NULL) { +                printf("Failed to malloc find_value_rsp buf.\n"); +                goto fail_msg; +        } + +        if (dht_msg__pack(msg, buf) != len) { +                printf("Failed to pack find_value_rsp message.\n"); +                goto fail_pack; +        } + +        upk = dht_msg__unpack(NULL, len, buf); +        if (upk == NULL) { +                printf("Failed to unpack find_value_rsp message.\n"); +                goto fail_unpack; +        } + +        free(buf); +        dht_msg__free_unpacked(msg, NULL); +        dht_msg__free_unpacked(upk, NULL); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_unpack: +        dht_msg__free_unpacked(msg, NULL); + fail_pack: +        free(buf); + fail_msg: +        clear_contacts(contacts, n); + fail_fill: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_kv_find_value_rsp_msg_values(void) +{ +        dht_msg_t * msg; +        dht_msg_t * upk; +        size_t      len; +        uint8_t *   buf; +        buffer_t *  values; +        size_t      i; +        uint64_t    ck; + +        TEST_START(); + +        ck = generate_cookie(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        values = malloc(sizeof(*values) * 8); +        if (values == NULL) { +                printf("Failed to malloc values.\n"); +                goto fail_values; +        } + +        for (i = 0; i < 8; i++) { +                if (random_value(&values[i]) < 0) { +                        printf("Failed to create random value.\n"); +                        goto fail_fill;                  } -                pthread_rwlock_unlock(&dht->lock);          } -        dht_destroy(dht); +        msg = dht_kv_find_value_rsp_msg(dht.id.data, ck, NULL, 0, &values, 8); +        if (msg == NULL) { +                printf("Failed to build find value response message.\n"); +                goto fail_msg; +        } -        return 0; +        values = NULL; /* msg owns the values now */ + +        len = dht_msg__get_packed_size(msg); +        if (len == 0) { +                printf("Failed to get packed length of find_value_rsp.\n"); +                goto fail_msg; +        } + +        buf = malloc(len); +        if (buf == NULL) { +                printf("Failed to malloc find_value_rsp buf.\n"); +                goto fail_msg; +        } + +        if (dht_msg__pack(msg, buf) != len) { +                printf("Failed to pack find_value_rsp message.\n"); +                goto fail_pack; +        } + +        upk = dht_msg__unpack(NULL, len, buf); +        if (upk == NULL) { +                printf("Failed to unpack find_value_rsp message.\n"); +                goto fail_unpack; +        } + +        if (upk->code != DHT_FIND_VALUE_RSP) { +                printf("Wrong code in find_value_rsp message (%s != %s).\n", +                       dht_code_str[upk->code], +                       dht_code_str[DHT_FIND_VALUE_RSP]); +                goto fail_unpack; +        } + +        if (upk->val == NULL) { +                printf("No values in find_value_rsp message.\n"); +                goto fail_unpack; +        } + +        if (upk->val->n_values != 8) { +                printf("Not enough values in find_value_rsp (%zu != %lu).\n", +                       upk->val->n_values, 8UL); +                goto fail_unpack; +        } + +        free(buf); +        dht_msg__free_unpacked(msg, NULL); +        dht_msg__free_unpacked(upk, NULL); + +        free(values); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_unpack: +        dht_msg__free_unpacked(msg, NULL); + fail_pack: +        free(buf); + fail_msg: + fail_fill: +        while((i--) > 0) +                freebuf(values[i]); +        free(values); + fail_values: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_kv_store_msg(void) +{ +        dht_msg_t *     msg; +        size_t          len; +        uint8_t *       buf; +        struct timespec now; + +        TEST_START(); + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        msg = dht_kv_store_msg(dht.id.data, test_val, now.tv_sec + 10); +        if (msg == NULL) { +                printf("Failed to get store message.\n"); +                goto fail_msg; +        } + +        if (msg->code != DHT_STORE) { +                printf("Wrong code in store message (%s != %s).\n", +                       dht_code_str[msg->code], +                       dht_code_str[DHT_STORE]); +                goto fail_store_msg; +        } + +        if (dht_kv_validate_msg(msg) < 0) { +                printf("Failed to validate store message.\n"); +                goto fail_store_msg; +        } + +        len = dht_msg__get_packed_size(msg); +        if (len == 0) { +                printf("Failed to get packed msg length.\n"); +                goto fail_msg; +        } + +        buf = malloc(len); +        if (buf == NULL) { +                printf("Failed to malloc store msg buf.\n"); +                goto fail_msg; +        } + +        if (dht_msg__pack(msg, buf) != len) { +                printf("Failed to pack store message.\n"); +                goto fail_pack; +        } + +        free(buf); + +        dht_msg__free_unpacked(msg, NULL); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_pack: +        free(buf); + fail_store_msg: +        dht_msg__free_unpacked(msg, NULL); + fail_msg: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_kv_query_contacts_req_rsp(void) +{ +        dht_msg_t *          req; +        dht_msg_t *          rsp; +        dht_contact_msg_t ** contacts; +        size_t               len = 2; + +        uint8_t * key; + +        TEST_START(); + +        sink_init(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        if (fill_dht_with_contacts(1) < 0) { +                printf("Failed to fill bucket with contacts.\n"); +                goto fail_prep; +        } + +        key = generate_id(); +        if (key == NULL) { +                printf("Failed to generate key.\n"); +                goto fail_prep; +        } + +        if (dht_kv_query_contacts(key, NULL) < 0) { +                printf("Failed to query contacts.\n"); +                goto fail_query; +        } + +        req = sink_read(); +        if (req == NULL) { +                printf("Failed to read request from sink.\n"); +                goto fail_query; +        } + +        if (dht_kv_validate_msg(req) < 0) { +                printf("Failed to validate find node req.\n"); +                goto fail_val_req; +        } + +        if (random_contact_list(&contacts, len) < 0) { +                printf("Failed to create random contact.\n"); +                goto fail_val_req; +        } + +        rsp = dht_kv_find_node_rsp_msg(key, req->find->cookie, &contacts, len); +        if (rsp == NULL) { +                printf("Failed to create find node response message.\n"); +                goto fail_rsp; +        } + +        memcpy(rsp->src->id.data, dht.id.data, dht.id.len); +        rsp->src->addr = generate_cookie(); + +        if (dht_kv_validate_msg(rsp) < 0) { +                printf("Failed to validate find node response message.\n"); +                goto fail_val_rsp; +        } + +        do_dht_kv_find_node_rsp(rsp->node); + +        /* dht_contact_msg__free_unpacked(contacts[0], NULL); set to NULL */ + +        free(contacts); + +        dht_msg__free_unpacked(rsp, NULL); + +        free(key); + +        dht_msg__free_unpacked(req, NULL); + +        sink_fini(); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_val_rsp: +        dht_msg__free_unpacked(rsp, NULL); + fail_rsp: +        while (len-- > 0) +                dht_contact_msg__free_unpacked(contacts[len], NULL); +        free(contacts); + fail_val_req: +        dht_msg__free_unpacked(req, NULL); + fail_query: +        free(key); + fail_prep: +        dht_fini(); + fail_init: +        sink_fini(); +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_req_create_destroy(void) +{ +        struct dht_req * req; + +        TEST_START(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        req = dht_req_create(dht.id.data); +        if (req == NULL) { +                printf("Failed to create kad request.\n"); +                goto fail_req; +        } + +        dht_req_destroy(req); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_req: +        dht_fini(); + fail_init: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_reg_unreg(void) +{ +        TEST_START(); + +        sink_init(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        if (dht_reg(dht.id.data) < 0) { +                printf("Failed to register own id.\n"); +                goto fail_reg; +        } + +        if (sink.len != 0) { +                printf("Packet sent without contacts!"); +                goto fail_msg; +        } + +        if (dht_unreg(dht.id.data) < 0) { +                printf("Failed to unregister own id.\n"); +                goto fail_msg; +        } + +        dht_fini(); + +        sink_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_msg: +        dht_unreg(dht.id.data); + fail_reg: +        dht_fini(); + fail_init: +        sink_fini(); +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_reg_unreg_contacts(void) +{ +        dht_msg_t * msg; + +        TEST_START(); + +        sink_init(); + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        if (fill_dht_with_contacts(4) < 0) { +                printf("Failed to fill bucket with contacts.\n"); +                goto fail_reg; +        } + +        if (dht_reg(dht.id.data) < 0) { +                printf("Failed to register own id.\n"); +                goto fail_reg; +        } + +        if (sink.len != dht.alpha) { +                printf("Packet sent to too few contacts!\n"); +                goto fail_msg; +        } + +        msg = sink_read(); +        if (msg == NULL) { +                printf("Failed to read message from sink.\n"); +                goto fail_msg; +        } + +        if (msg->code != DHT_STORE) { +                printf("Wrong code in dht reg message (%s != %s).\n", +                       dht_code_str[msg->code], +                       dht_code_str[DHT_STORE]); +                goto fail_validation; +        } + +        if (dht_kv_validate_msg(msg) < 0) { +                printf("Failed to validate dht message.\n"); +                goto fail_validation; +        } + +        if (dht_unreg(dht.id.data) < 0) { +                printf("Failed to unregister own id.\n"); +                goto fail_validation; +        } + +        dht_msg__free_unpacked(msg, NULL); + +        dht_fini(); + +        sink_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_validation: +        dht_msg__free_unpacked(msg, NULL); + fail_msg: +        sink_clear(); +        dht_unreg(dht.id.data); + fail_reg: +        dht_fini(); + fail_init: +        sink_fini(); +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_reg_query_local(void) +{ +        struct timespec now; +        buffer_t test_addr; + +        TEST_START(); + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        if (addr_to_buf(1234321, &test_addr) < 0) { +                printf("Failed to convert test address to buffer.\n"); +                goto fail_buf; +        } + +        if (dht_init(&test_dht_config) < 0) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        if (dht_reg(dht.id.data) < 0) { +                printf("Failed to register own id.\n"); +                goto fail_reg; +        } + +        if (dht_query(dht.id.data) == dht.addr) { +                printf("Succeeded to query own id.\n"); +                goto fail_get; +        } + +        if (dht_kv_store(dht.id.data, test_addr, now.tv_sec + 5) < 0) { +                printf("Failed to publish value.\n"); +                goto fail_get; +        } + +        if (dht_query(dht.id.data) != 1234321) { +                printf("Failed to return remote addr.\n"); +                goto fail_get; +        } + +        if (dht_unreg(dht.id.data) < 0) { +                printf("Failed to unregister own id.\n"); +                goto fail_get; +        } + +        freebuf(test_addr); + +        dht_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_get: +        dht_unreg(dht.id.data); + fail_reg: +        dht_fini(); + fail_init: +        freebuf(test_addr); + fail_buf: +        TEST_FAIL(); +        return TEST_RC_FAIL; +} + +static int test_dht_query(void) +{ +        uint8_t *             key; +        struct dir_dht_config cfg; + +        TEST_START(); + +        sink_init(); + +        cfg = test_dht_config; +        cfg.peer = generate_cookie(); + +        if (dht_init(&cfg)) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        key = generate_id(); +        if (key == NULL) { +                printf("Failed to generate key.\n"); +                goto fail_key; +        } + +        if (dht_query(key) != INVALID_ADDR) { +                printf("Succeeded to get address without contacts.\n"); +                goto fail_get; +        } + +        if (sink.len != 0) { +                printf("Packet sent without contacts!"); +                goto fail_test; +        } + +        free(key); + +        dht_fini(); + +        sink_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + + fail_test: +        sink_clear(); + fail_get: +        free(key); + fail_key: +        dht_fini(); + fail_init: +        sink_fini(); +        return TEST_RC_FAIL; +} + +static int test_dht_query_contacts(void) +{ +        dht_msg_t *           msg; +        uint8_t *             key; +        struct dir_dht_config cfg; + + +        TEST_START(); + +        sink_init(); + +        cfg = test_dht_config; +        cfg.peer = generate_cookie(); + +        if (dht_init(&cfg)) { +                printf("Failed to create dht.\n"); +                goto fail_init; +        } + +        if (fill_dht_with_contacts(10) < 0) { +                printf("Failed to fill with contacts!"); +                goto fail_contacts; +        } + +        key = generate_id(); +        if (key == NULL) { +                printf("Failed to generate key."); +                goto fail_contacts; +        } + +        if (dht_query(key) != INVALID_ADDR) { +                printf("Succeeded to get address for random id.\n"); +                goto fail_query; +        } + +        msg = sink_read(); +        if (msg == NULL) { +                printf("Failed to read message.!\n"); +                goto fail_read; +        } + +        if (dht_kv_validate_msg(msg) < 0) { +                printf("Failed to validate dht message.\n"); +                goto fail_msg; +        } + +        if (msg->code != DHT_FIND_VALUE_REQ) { +                printf("Failed to validate dht message.\n"); +                goto fail_msg; +        } + +        dht_msg__free_unpacked(msg, NULL); + +        free(key); + +        sink_clear(); + +        dht_fini(); + +        sink_fini(); + +        TEST_SUCCESS(); + +        return TEST_RC_SUCCESS; + fail_msg: +        dht_msg__free_unpacked(msg, NULL); + fail_read: +        sink_clear(); + fail_query: +        free(key); + fail_contacts: +        dht_fini(); + fail_init: +        sink_fini(); +        return TEST_RC_FAIL; +} + +int dht_test(int     argc, +             char ** argv) +{ +        int rc = 0; + +        (void) argc; +        (void) argv; + +        rc |= test_dht_init_fini(); +        rc |= test_dht_start_stop(); +        rc |= test_val_entry_create_destroy(); +        rc |= test_dht_entry_create_destroy(); +        rc |= test_dht_entry_update_get_val(); +        rc |= test_dht_entry_update_get_lval(); +        rc |= test_dht_kv_contact_create_destroy(); +        rc |= test_dht_kv_contact_list(); +        rc |= test_dht_kv_update_bucket(); +        rc |= test_dht_kv_get_values(); +        rc |= test_dht_kv_find_node_req_msg(); +        rc |= test_dht_kv_find_node_rsp_msg(); +        rc |= test_dht_kv_find_node_rsp_msg_contacts(); +        rc |= test_dht_kv_query_contacts_req_rsp(); +        rc |= test_dht_kv_find_value_req_msg(); +        rc |= test_dht_kv_find_value_rsp_msg(); +        rc |= test_dht_kv_find_value_rsp_msg_contacts(); +        rc |= test_dht_kv_find_value_rsp_msg_values(); +        rc |= test_dht_kv_store_msg(); +        rc |= test_dht_req_create_destroy(); +        rc |= test_dht_reg_unreg(); +        rc |= test_dht_reg_unreg_contacts(); +        rc |= test_dht_reg_query_local(); +        rc |= test_dht_query(); +        rc |= test_dht_query_contacts(); + +        return rc;  } | 
