diff options
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/CMakeLists.txt | 6 | ||||
| -rw-r--r-- | src/ipcpd/normal/dht.c | 2383 | ||||
| -rw-r--r-- | src/ipcpd/normal/dht.h | 54 | ||||
| -rw-r--r-- | src/ipcpd/normal/dir.c | 167 | ||||
| -rw-r--r-- | src/ipcpd/normal/dir.h | 10 | ||||
| -rw-r--r-- | src/ipcpd/normal/dt.c | 14 | ||||
| -rw-r--r-- | src/ipcpd/normal/dt.h | 2 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.c | 60 | ||||
| -rw-r--r-- | src/ipcpd/normal/kademlia.proto | 46 | ||||
| -rw-r--r-- | src/ipcpd/normal/main.c | 28 | ||||
| -rw-r--r-- | src/ipcpd/normal/pol/flat.c | 4 | ||||
| -rw-r--r-- | src/ipcpd/normal/ribconfig.h | 2 | ||||
| -rw-r--r-- | src/ipcpd/normal/ribmgr.c | 3 | ||||
| -rw-r--r-- | src/ipcpd/normal/sdu_sched.c | 68 | ||||
| -rw-r--r-- | src/ipcpd/normal/tests/CMakeLists.txt | 37 | ||||
| -rw-r--r-- | src/ipcpd/normal/tests/dht_test.c | 99 | 
16 files changed, 2799 insertions, 184 deletions
| diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 336b0e8f..8c2d4efc 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -15,6 +15,8 @@ include_directories(${CMAKE_BINARY_DIR}/include)  set(IPCP_NORMAL_TARGET ipcpd-normal CACHE STRING "IPCP_NORMAL_TARGET")  protobuf_generate_c(FLOW_ALLOC_SRCS FLOW_ALLOC_HDRS flow_alloc.proto) +protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS kademlia.proto) +  # Add GPB sources of policies last  protobuf_generate_c(FSO_SRCS FSO_HDRS pol/fso.proto) @@ -22,6 +24,7 @@ set(SOURCE_FILES    # Add source files here    addr_auth.c    connmgr.c +  dht.c    dir.c    dt.c    dt_pci.c @@ -42,7 +45,7 @@ set(SOURCE_FILES    )  add_executable(ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES} -  ${FLOW_ALLOC_SRCS} ${FSO_SRCS}) +  ${FLOW_ALLOC_SRCS} ${FSO_SRCS} ${KAD_PROTO_SRCS})  target_link_libraries(ipcpd-normal LINK_PUBLIC ouroboros)  include(AddCompileFlags) @@ -53,3 +56,4 @@ endif (CMAKE_BUILD_TYPE MATCHES Debug)  install(TARGETS ipcpd-normal RUNTIME DESTINATION sbin)  add_subdirectory(pol/tests) +add_subdirectory(tests) diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c new file mode 100644 index 00000000..74618658 --- /dev/null +++ b/src/ipcpd/normal/dht.c @@ -0,0 +1,2383 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Distributed Hash Table based on Kademlia + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#define OUROBOROS_PREFIX "dht" + +#include <ouroboros/config.h> +#include <ouroboros/hash.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/list.h> +#include <ouroboros/random.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/utils.h> + +#include "dht.h" +#include "dt.h" + +#include <pthread.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <inttypes.h> + +#include "kademlia.pb-c.h" +typedef KadMsg kad_msg_t; +typedef KadContactMsg kad_contact_msg_t; + +#define DHT_MAX_REQS  2048 /* KAD recommends rnd(), bmp can be changed.  */ +#define KAD_ALPHA     3    /* Parallel factor, proven optimal value.     */ +#define KAD_K         8    /* Replication factor, MDHT value.            */ +#define KAD_T_REPL    900  /* Replication time, tied to k. MDHT value.   */ +#define KAD_T_REFR    900  /* Refresh time stale bucket, MDHT value.     */ +#define KAD_T_JOIN    6    /* Response time to wait for a join.          */ +#define KAD_T_RESP    2    /* Response time to wait for a response.      */ +#define KAD_R_PING    2    /* Ping retries before declaring peer dead.   */ +#define KAD_QUEER     15   /* Time to declare peer questionable.         */ +#define KAD_BETA      8    /* Bucket split factor, must be 1, 2, 4 or 8. */ +#define KAD_RESP_RETR 6    /* Number of retries on sending a response.   */ + +enum dht_state { +        DHT_INIT = 0, +        DHT_RUNNING, +        DHT_SHUTDOWN, +}; + +enum kad_code { +        KAD_JOIN = 0, +        KAD_FIND_NODE, +        KAD_FIND_VALUE, +        /* Messages without a response below. */ +        KAD_STORE, +        KAD_RESPONSE +}; + +enum kad_req_state { +        REQ_NULL = 0, +        REQ_INIT, +        REQ_PENDING, +        REQ_RESPONSE, +        REQ_DONE, +        REQ_DESTROY +}; + +enum lookup_state { +        LU_NULL = 0, +        LU_INIT, +        LU_PENDING, +        LU_UPDATE, +        LU_COMPLETE, +        LU_DONE, +        LU_DESTROY +}; + +struct kad_req { +        struct list_head   next; + +        uint32_t           cookie; +        enum kad_code      code; +        uint8_t *          key; +        uint64_t           addr; + +        enum kad_req_state state; +        pthread_cond_t     cond; +        pthread_mutex_t    lock; + +        time_t             t_exp; +}; + +struct lookup { +        struct list_head  next; + +        uint8_t *         key; + +        struct list_head  contacts; +        size_t            n_contacts; + +        uint64_t *        addrs; +        size_t            n_addrs; + +        enum lookup_state state; +        pthread_cond_t    cond; +        pthread_mutex_t   lock; +}; + +struct val { +        struct list_head next; + +        uint64_t         addr; + +        time_t           t_exp; +        time_t           t_rep; +}; + +struct ref_entry { +        struct list_head next; + +        uint8_t *        key; + +        time_t           t_rep; +}; + +struct dht_entry { +        struct list_head next; + +        uint8_t *        key; +        size_t           n_vals; +        struct list_head vals; +}; + +struct contact { +        struct list_head next; + +        uint8_t *        id; +        uint64_t         addr; + +        size_t           fails; +        time_t           t_seen; +}; + +struct bucket { +        struct list_head contacts; +        size_t           n_contacts; + +        struct list_head alts; +        size_t           n_alts; + +        time_t           t_refr; + +        size_t           depth; +        uint8_t          mask; + +        struct bucket *  parent; +        struct bucket *  children[1L << KAD_BETA]; +}; + +struct dht { +        size_t           alpha; +        size_t           b; +        size_t           k; + +        time_t           t_expire; +        time_t           t_refresh; +        time_t           t_replic; +        time_t           t_repub; + +        uint8_t *        id; +        uint64_t         addr; + +        struct bucket *  buckets; + +        struct list_head entries; + +        struct list_head refs; + +        struct list_head lookups; + +        struct list_head requests; +        struct bmp *     cookies; + +        enum dht_state   state; +        pthread_mutex_t  mtx; + +        pthread_rwlock_t lock; + +        int              fd; + +        pthread_t        worker; +}; + +static uint8_t * dht_dup_key(const uint8_t * key, +                             size_t          len) +{ +        uint8_t * dup; + +        dup = malloc(sizeof(*dup) * len); +        if (dup == NULL) +                return NULL; + +        memcpy(dup, key, len); + +        return dup; +} + +static enum dht_state dht_get_state(struct dht * dht) +{ +        enum dht_state state; + +        pthread_mutex_lock(&dht->mtx); + +        state = dht->state; + +        pthread_mutex_unlock(&dht->mtx); + +        return state; +} + +static void dht_set_state(struct dht *   dht, +                          enum dht_state state) +{ +        pthread_mutex_lock(&dht->mtx); + +        dht->state = state; + +        pthread_mutex_unlock(&dht->mtx); +} + +static uint8_t * create_id(size_t len) +{ +        uint8_t * id; + +        id = malloc(len); +        if (id == NULL) +                return NULL; + +        if (random_buffer(id, len) < 0) { +                free(id); +                return NULL; +        } + +        return id; +} + +static struct kad_req * kad_req_create(struct dht * dht, +                                       kad_msg_t *  msg, +                                       uint64_t     addr) +{ +        struct kad_req *   req; +        pthread_condattr_t cattr; +        struct timespec    t; + +        req = malloc(sizeof(*req)); +        if (req == NULL) +                return NULL; + +        list_head_init(&req->next); + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); + +        req->t_exp  = t.tv_sec + KAD_T_RESP; +        req->addr   = addr; +        req->state  = REQ_INIT; +        req->cookie = msg->cookie; +        req->code   = msg->code; +        req->key    = NULL; + +        if (msg->has_key) { +                req->key = dht_dup_key(msg->key.data, dht->b); +                if (req->key == NULL) { +                        free(req); +                        return NULL; +                } +        } + +        if (pthread_mutex_init(&req->lock, NULL)) { +                free(req->key); +                free(req); +                return NULL; +        } + +        pthread_condattr_init(&cattr); +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + +        if (pthread_cond_init(&req->cond, &cattr)) { +                pthread_condattr_destroy(&cattr); +                pthread_mutex_destroy(&req->lock); +                free(req->key); +                free(req); +                return NULL; +        } + +        pthread_condattr_destroy(&cattr); + +        return req; +} + +static void kad_req_destroy(struct kad_req * req) +{ +        assert(req); + +        if (req->key != NULL) +                free(req->key); + +        pthread_mutex_lock(&req->lock); + +        switch (req->state) { +        case REQ_DESTROY: +                pthread_mutex_unlock(&req->lock); +                return; +        case REQ_PENDING: +                req->state = REQ_DESTROY; +                pthread_cond_signal(&req->cond); +                break; +        case REQ_INIT: +        case REQ_DONE: +                req->state = REQ_NULL; +                break; +        case REQ_RESPONSE: +        case REQ_NULL: +        default: +                break; +        } + +        while (req->state != REQ_NULL) +                pthread_cond_wait(&req->cond, &req->lock); + +        pthread_mutex_unlock(&req->lock); + +        pthread_cond_destroy(&req->cond); +        pthread_mutex_destroy(&req->lock); + +        free(req); +} + +static int kad_req_wait(struct kad_req * req, +                        time_t           t) +{ +        struct timespec timeo = {t, 0}; +        struct timespec abs; +        int ret = 0; + +        assert(req); + +        clock_gettime(PTHREAD_COND_CLOCK, &abs); + +        ts_add(&abs, &timeo, &abs); + +        pthread_mutex_lock(&req->lock); + +        req->state = REQ_PENDING; + +        while (req->state == REQ_PENDING && ret != -ETIMEDOUT) +                ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs); + +        switch(req->state) { +        case REQ_DESTROY: +                ret = -1; +                req->state = REQ_NULL; +                pthread_cond_signal(&req->cond); +                break; +        case REQ_PENDING: /* ETIMEDOUT */ +        case REQ_RESPONSE: +                req->state = REQ_DONE; +                pthread_cond_signal(&req->cond); +                break; +        default: +                break; +        } + +        pthread_mutex_unlock(&req->lock); + +        return ret; +} + +static void kad_req_respond(struct kad_req * req) +{ +        pthread_mutex_lock(&req->lock); + +        req->state = REQ_RESPONSE; +        pthread_cond_signal(&req->cond); + +        pthread_mutex_unlock(&req->lock); +} + +static struct contact * contact_create(const uint8_t * id, +                                       size_t          len, +                                       uint64_t        addr) +{ +        struct contact * c; +        struct timespec  t; + +        c = malloc(sizeof(*c)); +        if (c == NULL) +                return NULL; + +        list_head_init(&c->next); + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); + +        c->addr   = addr; +        c->fails  = 0; +        c->t_seen = t.tv_sec; +        c->id     = dht_dup_key(id, len); +        if (c->id == NULL) { +                free(c); +                return NULL; +        } + +        return c; +} + +static void contact_destroy(struct contact * c) +{ +        if (c != NULL) +                free(c->id); + +        free(c); +} + +static struct bucket * iter_bucket(struct bucket * b, +                                   const uint8_t * id) +{ +        uint8_t byte; +        uint8_t mask; + +        assert(b); + +        if (b->children[0] == NULL) +                return b; + +        byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + +        mask = ((1L << KAD_BETA) - 1) & 0xFF; + +        byte >>= (CHAR_BIT - KAD_BETA) - +                (((b->depth) * KAD_BETA) & (CHAR_BIT - 1)); + +        return iter_bucket(b->children[(byte & mask)], id); +} + +static struct bucket * dht_get_bucket(struct dht *    dht, +                                      const uint8_t * id) +{ +        assert(dht->buckets); + +        return iter_bucket(dht->buckets, id); +} + +/* + * If someone builds a network where the n (n > k) closest nodes all + * have IDs starting with the same 64 bits: by all means, change this. + */ +static uint64_t dist(const uint8_t * src, +                     const uint8_t * dst) +{ +        return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst)); +} + +static size_t list_add_sorted(struct list_head * l, +                              struct contact *   c, +                              const uint8_t *    key) +{ +        struct list_head * p; + +        assert(l); +        assert(c); +        assert(key); +        assert(c->id); + +        list_for_each(p, l) { +                struct contact * e = list_entry(p, struct contact, next); +                if (dist(c->id, key) > dist(e->id, key)) +                        break; +        } + +        list_add_tail(&c->next, p); + +        return 1; +} + +static size_t dht_contact_list(struct dht *       dht, +                               struct list_head * l, +                               const uint8_t *    key) +{ +        struct list_head * p; +        struct bucket *    b; +        size_t             len = 0; +        size_t             i; +        struct timespec    t; + +        assert(l); +        assert(dht); +        assert(key); +        assert(list_is_empty(l)); + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); + +        b = dht_get_bucket(dht, key); +        if (b == NULL) +                return 0; + +        b->t_refr = t.tv_sec + KAD_T_REFR; + +        if (b->n_contacts == dht->k || b->parent == NULL) { +                list_for_each(p, &b->contacts) { +                        struct contact * c; +                        c = list_entry(p, struct contact, next); +                        c = contact_create(c->id, dht->b, c->addr); +                        if (list_add_sorted(l, c, key) == 1) +                                if (++len > dht->k) +                                        break; +                } +        } else { +                struct bucket * d = b->parent; +                for (i = 0; i < (1L << KAD_BETA); ++i) { +                        list_for_each(p, &d->children[i]->contacts) { +                                struct contact * c; +                                c = list_entry(p, struct contact, next); +                                c = contact_create(c->id, dht->b, c->addr); +                                if (c == NULL) +                                        continue; +                                if (list_add_sorted(l, c, key) == 1) +                                        if (++len > dht->k) +                                                break; +                        } +                } +        } + +        assert(len == dht->k || b->parent == NULL); + +        return len; +} + +static struct lookup * lookup_create(struct dht *    dht, +                                     const uint8_t * id) +{ +        struct lookup *    lu; +        pthread_condattr_t cattr; + +        assert(dht); +        assert(id); + +        lu = malloc(sizeof(*lu)); +        if (lu == NULL) +                goto fail_malloc; + +        list_head_init(&lu->contacts); + +        lu->state   = LU_INIT; +        lu->addrs   = NULL; +        lu->n_addrs = 0; +        lu->key     = dht_dup_key(id, dht->b); +        if (lu->key == NULL) +                goto fail_id; + +        if (pthread_mutex_init(&lu->lock, NULL)) +                goto fail_mutex; + +        pthread_condattr_init(&cattr); +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + +        if (pthread_cond_init(&lu->cond, &cattr)) +                goto fail_cond; + +        pthread_condattr_destroy(&cattr); + +        pthread_rwlock_wrlock(&dht->lock); + +        list_add(&lu->next, &dht->lookups); + +        lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); + +        pthread_rwlock_unlock(&dht->lock); + +        return lu; + + fail_cond: +        pthread_condattr_destroy(&cattr); +        pthread_mutex_destroy(&lu->lock); + fail_mutex: +        free(lu->key); + fail_id: +        free(lu); + fail_malloc: +        return NULL; +} + +static void lookup_destroy(struct lookup * lu) +{ +        struct list_head * p; +        struct list_head * h; + +        assert(lu); + +        pthread_mutex_lock(&lu->lock); + +        switch (lu->state) { +        case LU_DESTROY: +                pthread_mutex_unlock(&lu->lock); +                return; +        case LU_PENDING: +                lu->state = LU_DESTROY; +                pthread_cond_signal(&lu->cond); +                break; +        case LU_INIT: +        case LU_DONE: +        case LU_UPDATE: +        case LU_COMPLETE: +                lu->state = REQ_NULL; +                break; +        case LU_NULL: +        default: +                break; +        } + +        while (lu->state != LU_NULL) +                pthread_cond_wait(&lu->cond, &lu->lock); + +        if (lu->key != NULL) +                free(lu->key); +        if (lu->addrs != NULL) +                free(lu->addrs); + +        list_for_each_safe(p, h, &lu->contacts) { +                struct contact * c = list_entry(p, struct contact, next); +                list_del(&c->next); +                contact_destroy(c); +        } + +        pthread_mutex_unlock(&lu->lock); + +        pthread_cond_destroy(&lu->cond); +        pthread_mutex_destroy(&lu->lock); + +        free(lu); +} + +static void lookup_update(struct dht *    dht, +                          struct lookup * lu, +                          kad_msg_t *     msg) +{ +        struct list_head * p = NULL; +        struct contact *   c = NULL; +        size_t             n; +        size_t             pos = 0; + +        assert(lu); +        assert(msg); + +        if (dht_get_state(dht) != DHT_RUNNING) +                return; + +        pthread_mutex_lock(&lu->lock); + +        if (msg->n_addrs > 0) { +                if (lu->addrs == NULL) { +                        lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs); +                        for (n = 0; n < msg->n_addrs; ++n) +                                lu->addrs[n] = msg->addrs[n]; +                        lu->n_addrs = msg->n_addrs; +                } +                lu->state = LU_COMPLETE; +                pthread_cond_signal(&lu->cond); +                pthread_mutex_unlock(&lu->lock); +                return; +        } + +        while (lu->state == LU_INIT) +                pthread_cond_wait(&lu->cond, &lu->lock); + +        for (n = 0; n < msg->n_contacts; ++n) { +                c = contact_create(msg->contacts[n]->id.data, +                                   dht->b, msg->contacts[n]->addr); +                if (c == NULL) +                        continue; + +                list_for_each(p, &lu->contacts) { +                        struct contact * e; +                        e = list_entry(p, struct contact, next); +                        if (!memcmp(e->id, c->id, dht->b)) { +                                contact_destroy(c); +                                c = NULL; +                                break; +                        } + +                        if (dist(c->id, lu->key) > dist(e->id, lu->key)) +                                break; +                        pos++; +                } + +                if (c == NULL) +                        continue; + +                if (lu->n_contacts < dht->k) { +                        list_add_tail(&c->next, p); +                        ++lu->n_contacts; +                } else if (pos == dht->k) { +                        contact_destroy(c); +                        continue; +                } else { +                        struct contact * d; +                        list_add_tail(&c->next, p); +                        d = list_last_entry(&lu->contacts, struct contact, next); +                        list_del(&d->next); +                        contact_destroy(d); +                } +        } + +        lu->state = LU_UPDATE; +        pthread_cond_signal(&lu->cond); +        pthread_mutex_unlock(&lu->lock); +        return; +} + +static ssize_t lookup_get_addrs(struct lookup * lu, +                                uint64_t *      addrs) +{ +        ssize_t n; + +        assert(lu); + +        pthread_mutex_lock(&lu->lock); + +        for (n = 0; (size_t) n < lu->n_addrs; ++n) +                addrs[n] = lu->addrs[n]; + +        assert((size_t) n == lu->n_addrs); + +        pthread_mutex_unlock(&lu->lock); + +        return n; +} + +static ssize_t lookup_contact_addrs(struct lookup * lu, +                                    uint64_t *      addrs) +{ +        struct list_head * p; +        ssize_t            n = 0; + +        assert(lu); +        assert(addrs); + +        pthread_mutex_lock(&lu->lock); + +        list_for_each(p, &lu->contacts) { +                struct contact * c = list_entry(p, struct contact, next); +                addrs[n] = c->addr; +                n++; +        } + +        pthread_mutex_unlock(&lu->lock); + +        return n; +} + +static void lookup_new_addrs(struct lookup * lu, +                                uint64_t *      addrs) +{ +        struct list_head * p; +        size_t             n = 0; + +        assert(lu); +        assert(addrs); + +        pthread_mutex_lock(&lu->lock); + +        /* Uses fails to check if the contact has been contacted. */ +        list_for_each(p, &lu->contacts) { +                struct contact * c = list_entry(p, struct contact, next); +                if (c->fails == 0) { +                        c->fails = 1; +                        addrs[n] = c->addr; +                        n++; +                } + +                if (n == KAD_ALPHA) +                        break; +        } + +        assert(n <= KAD_ALPHA); + +        addrs[n] = 0; + +        if (n == 0) +                lu->state = LU_DONE; + +        pthread_mutex_unlock(&lu->lock); +} + +static enum lookup_state lookup_wait(struct lookup * lu) +{ +        enum lookup_state state; + +        pthread_mutex_lock(&lu->lock); + +        lu->state = LU_PENDING; +        pthread_cond_signal(&lu->cond); + +        pthread_cleanup_push((void (*)(void *)) lookup_destroy, (void *) lu); + +        while (lu->state == LU_PENDING) +                pthread_cond_wait(&lu->cond, &lu->lock); + +        pthread_cleanup_pop(false); + +        state = lu->state; + +        pthread_mutex_unlock(&lu->lock); + +        return state; +} + +static struct kad_req * dht_find_request(struct dht * dht, +                                         kad_msg_t *  msg) +{ +        struct list_head * p; + +        assert(dht); +        assert(msg); + +        list_for_each(p, &dht->requests) { +                struct kad_req * r = list_entry(p, struct kad_req, next); +                if (r->cookie == msg->cookie) +                        return r; +        } + +        return NULL; +} + +static struct lookup * dht_find_lookup(struct dht *    dht, +                                       const uint8_t * key) +{ +        struct list_head * p; + +        assert(dht); +        assert(key); + +        list_for_each(p, &dht->lookups) { +                struct lookup * l = list_entry(p, struct lookup, next); +                if (!memcmp(l->key, key, dht->b)) +                        return l; +        } + +        return NULL; +} + +static struct val * val_create(uint64_t addr, +                               time_t   exp) +{ +        struct val *    v; +        struct timespec t; + +        v = malloc(sizeof(*v)); +        if (v == NULL) +                return NULL; + +        list_head_init(&v->next); +        v->addr = addr; + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); + +        v->t_exp = t.tv_sec + exp; +        v->t_rep = t.tv_sec + KAD_T_REPL; + +        return v; +} + +static void val_destroy(struct val * v) +{ +        assert(v); + +        free(v); +} + +static struct ref_entry * ref_entry_create(struct dht *    dht, +                                           const uint8_t * key) +{ +        struct ref_entry * e; +        struct timespec    t; + +        assert(dht); +        assert(key); + +        e = malloc(sizeof(*e)); +        if (e == NULL) +                return NULL; + +        e->key = dht_dup_key(key, dht->b); +        if (e->key == NULL) { +                free(e); +                return NULL; +        } + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); + +        e->t_rep = t.tv_sec + dht->t_repub; + +        return e; +} + +static void ref_entry_destroy(struct ref_entry * e) +{ +        free(e->key); +        free(e); +} + +static struct dht_entry * dht_entry_create(struct dht *    dht, +                                           const uint8_t * key) +{ +        struct dht_entry * e; + +        assert(dht); +        assert(key); + +        e = malloc(sizeof(*e)); +        if (e == NULL) +                return NULL; + +        list_head_init(&e->next); +        list_head_init(&e->vals); + +        e->n_vals = 0; + +        e->key = dht_dup_key(key, dht->b); +        if (e->key == NULL) { +                free(e); +                return NULL; +        } + +        return e; +} + +static void dht_entry_destroy(struct dht_entry * e) +{ +        struct list_head * p; +        struct list_head * h; + +        assert(e); + +        list_for_each_safe(p, h, &e->vals) { +                struct val * v = list_entry(p, struct val, next); +                list_del(&v->next); +                val_destroy(v); +        } + +        free(e->key); + +        free(e); +} + +static int dht_entry_add_addr(struct dht_entry * e, +                              uint64_t           addr, +                              time_t             exp) +{ +        struct list_head * p; +        struct val * val; +        struct timespec t; + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); + +        list_for_each(p, &e->vals) { +                struct val * v = list_entry(p, struct val, next); +                if (v->addr == addr) { +                        if (v->t_exp < t.tv_sec + exp) { +                                v->t_exp = t.tv_sec + exp; +                                v->t_rep = t.tv_sec + KAD_T_REPL; +                        } + +                        return 0; +                } +        } + +        val = val_create(addr, exp); +        if (val == NULL) +                return -ENOMEM; + +        list_add(&val->next, &e->vals); +        ++e->n_vals; + +        return 0; +} + + +static void dht_entry_del_addr(struct dht_entry * e, +                               uint64_t           addr) +{ +        struct list_head * p; +        struct list_head * h; + +        assert(e); + +        list_for_each_safe(p, h, &e->vals) { +                struct val * v = list_entry(p, struct val, next); +                if (v->addr == addr) { +                        list_del(&v->next); +                        val_destroy(v); +                        --e->n_vals; +                } +        } + +        if (e->n_vals == 0) { +                list_del(&e->next); +                dht_entry_destroy(e); +        } +} + +static uint64_t dht_entry_get_addr(struct dht *       dht, +                                   struct dht_entry * e) +{ +        struct list_head * p; + +        assert(e); +        assert(!list_is_empty(&e->vals)); + +        list_for_each(p, &e->vals) { +                struct val * v = list_entry(p, struct val, next); +                if (v->addr != dht->addr) +                        return v->addr; +        } + +        return 0; +} + +/* Forward declaration. */ +static struct lookup * kad_lookup(struct dht *    dht, +                                  const uint8_t * key, +                                  enum kad_code   code); + + +/* Build a refresh list. */ +static void bucket_refresh(struct dht *       dht, +                           struct bucket *    b, +                           time_t             t, +                           struct list_head * r) +{ +        size_t i; + +        if (*b->children != NULL) +                for (i = 0; i < (1L << KAD_BETA); ++i) +                        bucket_refresh(dht, b->children[i], t, r); + +        if (b->n_contacts == 0) +                return; + +        if (t > b->t_refr) { +                struct contact * c; +                struct contact * d; +                c = list_first_entry(&b->contacts, struct contact, next); +                d = contact_create(c->id, dht->b, c->addr); +                if (c != NULL) +                        list_add(&d->next, r); +                return; +        } +} + + +static struct bucket * bucket_create(void) +{ +        struct bucket * b; +        struct timespec t; +        size_t          i; + +        b = malloc(sizeof(*b)); +        if (b == NULL) +                return NULL; + +        list_head_init(&b->contacts); +        b->n_contacts = 0; + +        list_head_init(&b->alts); +        b->n_alts = 0; + +        clock_gettime(CLOCK_REALTIME_COARSE, &t); +        b->t_refr = t.tv_sec + KAD_T_REFR; + +        for (i = 0; i < (1L << KAD_BETA); ++i) +                b->children[i]  = NULL; + +        b->parent = NULL; +        b->depth = 0; + +        return b; +} + +static void bucket_destroy(struct bucket * b) +{ +        struct list_head * p; +        struct list_head * h; +        size_t             i; + +        assert(b); + +        for (i = 0; i < (1L << KAD_BETA); ++i) +                if (b->children[i] != NULL) +                        bucket_destroy(b->children[i]); + +        list_for_each_safe(p, h, &b->contacts) { +                struct contact * c = list_entry(p, struct contact, next); +                list_del(&c->next); +                contact_destroy(c); +                --b->n_contacts; +        } + +        list_for_each_safe(p, h, &b->alts) { +                struct contact * c = list_entry(p, struct contact, next); +                list_del(&c->next); +                contact_destroy(c); +                --b->n_contacts; +        } + +        free(b); +} + +static bool bucket_has_id(struct bucket * b, +                          const uint8_t * id) +{ +        uint8_t mask; +        uint8_t byte; + +        if (b->depth == 0) +                return true; + +        byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + +        mask = ((1L << KAD_BETA) - 1) & 0xFF; + +        byte >>= (CHAR_BIT - KAD_BETA) - +                (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1)); + +        return ((byte & mask) == b->mask); +} + +static int split_bucket(struct bucket * b) +{ +        struct list_head * p; +        struct list_head * h; +        uint8_t mask = 0; +        size_t i; +        size_t c; + +        assert(b); +        assert(b->n_alts == 0); +        assert(b->n_contacts); +        assert(b->children[0] == NULL); + +        c = b->n_contacts; + +        for (i = 0; i < (1L << KAD_BETA); ++i) { +                b->children[i] = bucket_create(); +                if (b->children[i] == NULL) { +                        size_t j; +                        for (j = 0; j < i; ++j) +                                bucket_destroy(b->children[j]); +                        return -1; +                } + +                b->children[i]->depth  = b->depth + 1; +                b->children[i]->mask   = mask; +                b->children[i]->parent = b; + +                list_for_each_safe(p, h, &b->contacts) { +                        struct contact * c; +                        c = list_entry(p, struct contact, next); +                        if (bucket_has_id(b->children[i], c->id)) { +                                list_del(&c->next); +                                --b->n_contacts; +                                list_add(&c->next, &b->children[i]->contacts); +                                ++b->children[i]->n_contacts; +                        } +                } + +                mask++; +        } + +        for (i = 0; i < (1L << KAD_BETA); ++i) +                if (b->children[i]->n_contacts == c) +                        split_bucket(b->children[i]); + +        return 0; +} + +/* Locked externally to mandate update as (final) part of join transaction. */ +static int dht_update_bucket(struct dht *    dht, +                             const uint8_t * id, +                             uint64_t        addr) +{ +        struct list_head * p; +        struct list_head * h; +        struct bucket *    b; +        struct contact *   c; + +        assert(dht); + +        b = dht_get_bucket(dht, id); +        if (b == NULL) +                return -1; + +        c = contact_create(id, dht->b, addr); +        if (c == NULL) +                return -1; + +        list_for_each_safe(p, h, &b->contacts) { +                struct contact * d = list_entry(p, struct contact, next); +                if (d->addr == addr) { +                        list_del(&d->next); +                        contact_destroy(d); +                        --b->n_contacts; +                } +        } + +        if (b->n_contacts == dht->k) { +                if (bucket_has_id(b, dht->id)) { +                        list_add_tail(&c->next, &b->contacts); +                        ++b->n_contacts; +                        if (split_bucket(b)) { +                                list_del(&c->next); +                                contact_destroy(c); +                                --b->n_contacts; +                        } +                } else if (b->n_alts == dht->k) { +                        struct contact * d; +                        d = list_first_entry(&b->alts, struct contact, next); +                        list_del(&d->next); +                        contact_destroy(d); +                        list_add_tail(&c->next, &b->alts); +                } else { +                        list_add_tail(&c->next, &b->alts); +                        ++b->n_alts; +                } +        } else { +                list_add_tail(&c->next, &b->contacts); +                ++b->n_contacts; +        } + +        return 0; +} + +static int send_msg(struct dht * dht, +                    kad_msg_t *  msg, +                    uint64_t     addr) +{ +        struct shm_du_buff * sdb; +        struct kad_req *     req; +        size_t               len; +        int                  retr = 0; + +        if (msg->code == KAD_RESPONSE) +                retr = KAD_RESP_RETR; + +        pthread_rwlock_wrlock(&dht->lock); + +        if (dht->id != NULL) { +                msg->has_s_id = true; +                msg->s_id.data = dht->id; +                msg->s_id.len  = dht->b; +        } + +        msg->s_addr = dht->addr; + +        if (msg->code < KAD_STORE) { +                msg->cookie = bmp_allocate(dht->cookies); +                if (!bmp_is_id_valid(dht->cookies, msg->cookie)) +                        goto fail_bmp_alloc; +        } + +        len = kad_msg__get_packed_size(msg); +        if (len == 0) +                goto fail_msg; + +        if (ipcp_sdb_reserve(&sdb, len)) +                goto fail_msg; + +        kad_msg__pack(msg, shm_du_buff_head(sdb)); + +#ifndef __DHT_TEST__ +        while (retr >= 0) { +                if (dt_write_sdu(addr, QOS_CUBE_BE, dht->fd, sdb)) +                        retr--; +                else +                        break; +                sleep(1); +        } + +        if (retr < 0) +                goto fail_write; +#else +        (void) addr; +        (void) retr; +        ipcp_sdb_release(sdb); +#endif /* __DHT_TEST__ */ + +        if (msg->code < KAD_STORE && dht->state != DHT_SHUTDOWN) { +                req = kad_req_create(dht, msg, addr); +                if (req != NULL) +                        list_add(&req->next, &dht->requests); +        } + +        pthread_rwlock_unlock(&dht->lock); + +        return 0; + +#ifndef __DHT_TEST__ + fail_write: +        ipcp_sdb_release(sdb); +#endif + fail_msg: +        bmp_release(dht->cookies, msg->cookie); + fail_bmp_alloc: +        pthread_rwlock_unlock(&dht->lock); +        return -1; +} + +static struct dht_entry * dht_find_entry(struct dht *    dht, +                                         const uint8_t * key) +{ +        struct list_head * p; + +        list_for_each(p, &dht->entries) { +                struct dht_entry * e = list_entry(p, struct dht_entry, next); +                if (!memcmp(key, e->key, dht->b)) +                        return e; +        } + +        return NULL; +} + +static int kad_add(struct dht *              dht, +                   const kad_contact_msg_t * contacts, +                   ssize_t                   n, +                   time_t                    exp) +{ +        struct dht_entry * e; + +        pthread_rwlock_wrlock(&dht->lock); + +        while (n-- > 0) { +                if (contacts[n].id.len != dht->b) +                        log_warn("Bad key length in contact data."); + +                e = dht_find_entry(dht, contacts[n].id.data); +                if (e != NULL) { +                        if (dht_entry_add_addr(e, contacts[n].addr, exp)) +                                goto fail; +                } else { +                        e = dht_entry_create(dht, contacts[n].id.data); +                        if (e == NULL) +                                goto fail; + +                        if (dht_entry_add_addr(e, contacts[n].addr, exp)) { +                                dht_entry_destroy(e); +                                goto fail; +                        } + +                        list_add(&e->next, &dht->entries); +                } +        } + +        pthread_rwlock_unlock(&dht->lock); +        return 0; + + fail: +        pthread_rwlock_unlock(&dht->lock); +        return -ENOMEM; +} + +static int wait_resp(struct dht * dht, +                     kad_msg_t *  msg, +                     time_t       timeo) +{ +        struct kad_req * req; + +        assert(dht); +        assert(msg); + +        pthread_rwlock_rdlock(&dht->lock); + +        req = dht_find_request(dht, msg); +        if (req == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return -EPERM; +        } + +        pthread_rwlock_unlock(&dht->lock); + +        return kad_req_wait(req, timeo); +} + +static int kad_store(struct dht *    dht, +                     const uint8_t * key, +                     uint64_t        addr, +                     uint64_t        r_addr, +                     time_t          ttl) +{ +        kad_msg_t msg = KAD_MSG__INIT; +        kad_contact_msg_t cmsg = KAD_CONTACT_MSG__INIT; +        kad_contact_msg_t * cmsgp[1]; + +        cmsg.id.data = (uint8_t *) key; +        cmsg.id.len  = dht->b; +        cmsg.addr    = addr; + +        cmsgp[0] = &cmsg; + +        msg.code         = KAD_STORE; +        msg.has_t_expire = true; +        msg.t_expire     = ttl; +        msg.n_contacts   = 1; +        msg.contacts     = cmsgp; + +        if (send_msg(dht, &msg, r_addr)) +                return -1; + +        return 0; +} + +static ssize_t kad_find(struct dht *     dht, +                        const uint8_t *  key, +                        const uint64_t * addrs, +                        enum kad_code    code) +{ +        kad_msg_t msg  = KAD_MSG__INIT; +        ssize_t   sent = 0; + +        assert(dht); +        assert(key); + +        msg.code = code; + +        msg.has_key       = true; +        msg.key.data      = (uint8_t *) key; +        msg.key.len       = dht->b; + +        while (*addrs != 0) { +                if (*addrs != dht->addr) { +                        send_msg(dht, &msg, *addrs); +                        sent++; +                } +                ++addrs; +        } + +        return sent; +} + +static void lookup_set_state(struct lookup *   lu, +                             enum lookup_state state) +{ +        pthread_mutex_lock(&lu->lock); + +        lu->state = state; +        pthread_cond_signal(&lu->cond); + +        pthread_mutex_unlock(&lu->lock); +} + +static struct lookup * kad_lookup(struct dht *    dht, +                                  const uint8_t * id, +                                  enum kad_code   code) +{ +        uint64_t          addrs[KAD_ALPHA + 1]; +        enum lookup_state state; +        struct lookup *   lu; + +        lu = lookup_create(dht, id); +        if (lu == NULL) +                return NULL; + +        lookup_new_addrs(lu, addrs); + +        if (addrs[0] == 0) { +                pthread_rwlock_wrlock(&dht->lock); +                list_del(&lu->next); +                pthread_rwlock_unlock(&dht->lock); +                lookup_destroy(lu); +                return NULL; +        } + +        if (kad_find(dht, id, addrs, code) == 0) { +                pthread_rwlock_wrlock(&dht->lock); +                list_del(&lu->next); +                pthread_rwlock_unlock(&dht->lock); +                lu->state = LU_COMPLETE; +                return lu; +        } + +        while ((state = lookup_wait(lu)) != LU_COMPLETE) { +                switch (state) { +                case LU_UPDATE: +                        lookup_new_addrs(lu, addrs); +                        if (addrs[0] == 0) { +                                pthread_rwlock_wrlock(&dht->lock); +                                list_del(&lu->next); +                                pthread_rwlock_unlock(&dht->lock); +                                return lu; +                        } + +                        kad_find(dht, id, addrs, code); +                        break; +                case LU_DESTROY: +                        pthread_rwlock_wrlock(&dht->lock); +                        list_del(&lu->next); +                        pthread_rwlock_unlock(&dht->lock); +                        lookup_set_state(lu, LU_NULL); +                        return NULL; +                default: +                        break; +                }; +        } + +        assert(state = LU_COMPLETE); + +        pthread_rwlock_wrlock(&dht->lock); +        list_del(&lu->next); +        pthread_rwlock_unlock(&dht->lock); + +        return lu; +} + +static void kad_publish(struct dht *    dht, +                        const uint8_t * key, +                        uint64_t        addr, +                        time_t          exp) +{ +        struct lookup * lu; +        uint64_t        addrs[KAD_K]; +        ssize_t         n; + +        assert(dht); +        assert(key); + +        lu = kad_lookup(dht, key, KAD_FIND_NODE); +        if (lu == NULL) +                return; + +        n = lookup_contact_addrs(lu, addrs); + +        while (n-- > 0) { +                if (addrs[n] == dht->addr) { +                        kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT; +                        msg.id.data = (uint8_t *) key; +                        msg.id.len  = dht->b; +                        msg.addr    = addr; +                        kad_add(dht, &msg, 1, exp); +                } else { +                        if (kad_store(dht, key, addr, addrs[n], dht->t_expire)) +                                log_warn("Failed to send store message."); +                } +        } + +        lookup_destroy(lu); +} + +static int kad_join(struct dht * dht, +                    uint64_t     addr) +{ +        kad_msg_t       msg = KAD_MSG__INIT; +        struct lookup * lu; + +        msg.code = KAD_JOIN; + +        msg.has_alpha       = true; +        msg.has_b           = true; +        msg.has_k           = true; +        msg.has_t_refresh   = true; +        msg.has_t_replicate = true; +        msg.alpha           = KAD_ALPHA; +        msg.b               = dht->b; +        msg.k               = KAD_K; +        msg.t_refresh       = KAD_T_REFR; +        msg.t_replicate     = KAD_T_REPL; + +        if (send_msg(dht, &msg, addr)) +                return -1; + +        if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) +                return -1; + +        dht->id = create_id(dht->b); +        if (dht->id == NULL) +                return -1; + +        pthread_rwlock_wrlock(&dht->lock); + +        dht_update_bucket(dht, dht->id, dht->addr); + +        pthread_rwlock_unlock(&dht->lock); + +        lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); +        if (lu != NULL) +                lookup_destroy(lu); + +        return 0; +} + +static void dht_dead_peer(struct dht * dht, +                          uint8_t *    key, +                          uint64_t     addr) +{ +        struct list_head * p; +        struct list_head * h; +        struct bucket *    b; + +        b = dht_get_bucket(dht, key); + +        list_for_each_safe(p, h, &b->contacts) { +                struct contact * c = list_entry(p, struct contact, next); +                if (b->n_contacts + b->n_alts <= dht->k) { +                        ++c->fails; +                        return; +                } + +                if (c->addr == addr) { +                        list_del(&c->next); +                        contact_destroy(c); +                        --b->n_contacts; +                        break; +                } +        } + +        while (b->n_contacts < dht->k && b->n_alts > 0) { +                struct contact * c; +                c = list_first_entry(&b->alts, struct contact, next); +                list_del(&c->next); +                --b->n_alts; +                list_add(&c->next, &b->contacts); +                ++b->n_contacts; +        } +} + +static int dht_del(struct dht *    dht, +                   const uint8_t * key, +                   uint64_t        addr) +{ +        struct dht_entry * e; + +        pthread_rwlock_wrlock(&dht->lock); + +        e = dht_find_entry(dht, key); +        if (e == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return -EPERM; +        } + +        dht_entry_del_addr(e, addr); + +        pthread_rwlock_unlock(&dht->lock); + +        return 0; +} + +static buffer_t dht_retrieve(struct dht *    dht, +                             const uint8_t * key) +{ +        struct dht_entry * e; +        struct list_head * p; +        buffer_t           buf; +        uint64_t *         pos; + +        buf.len = 0; + +        pthread_rwlock_rdlock(&dht->lock); + +        e = dht_find_entry(dht, key); +        if (e == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return buf; +        } + +        buf.data = malloc(sizeof(dht->addr) * e->n_vals); +        if (buf.data == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return buf; +        } + +        buf.len = e->n_vals; + +        pos = (uint64_t *) buf.data;; + +        list_for_each(p, &e->vals) { +                struct val * v = list_entry(p, struct val, next); +                *pos++ = v->addr; +        } + +        pthread_rwlock_unlock(&dht->lock); + +        return buf; +} + +static ssize_t dht_get_contacts(struct dht *          dht, +                                const uint8_t *       key, +                                kad_contact_msg_t *** msgs) +{ +        struct list_head   l; +        struct list_head * p; +        struct list_head * h; +        size_t             len; +        size_t             i = 0; + +        list_head_init(&l); + +        pthread_rwlock_rdlock(&dht->lock); + +        len = dht_contact_list(dht, &l, key); +        if (len == 0) +                return 0; + +        *msgs = malloc(len * sizeof(**msgs)); +        if (*msgs == NULL) +                return 0; + +        list_for_each_safe(p, h, &l) { +                struct contact * c = list_entry(p, struct contact, next); +                (*msgs)[i] = malloc(sizeof(***msgs)); +                if ((*msgs)[i] == NULL) { +                        pthread_rwlock_unlock(&dht->lock); +                        while (i > 0) +                                free(*msgs[--i]); +                        free(*msgs); +                        return 0; +                } + +                kad_contact_msg__init((*msgs)[i]); + +                (*msgs)[i]->id.data = c->id; +                (*msgs)[i]->id.len  = dht->b; +                (*msgs)[i++]->addr  = c->addr; +                list_del(&c->next); +                free(c); +        } + +        pthread_rwlock_unlock(&dht->lock); + +        return i; +} + +static time_t gcd(time_t a, +                  time_t b) +{ +        if (a == 0) +                return b; + +        return gcd(b % a, a); +} + +static void * work(void * o) +{ +        struct dht *       dht; +        struct timespec    now; +        struct list_head * p; +        struct list_head * h; +        struct list_head   reflist; +        time_t             intv; +        struct lookup *    lu; + +        dht = (struct dht *) o; + +        intv = gcd(dht->t_expire, dht->t_repub); +        intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; + +        list_head_init(&reflist); + +        while (true) { +                clock_gettime(CLOCK_REALTIME_COARSE, &now); + +                pthread_rwlock_wrlock(&dht->lock); + +                /* Republish registered hashes. */ +                list_for_each_safe(p, h, &dht->refs) { +                        struct ref_entry * e; +                        e = list_entry(p, struct ref_entry, next); +                        if (now.tv_sec > e->t_rep) { +                                kad_publish(dht, e->key, dht->addr, +                                            dht->t_expire); +                                e->t_rep = now.tv_sec + dht->t_repub; +                        } +                } + +                /* Remove stale entries and republish if necessary. */ +                list_for_each_safe(p, h, &dht->entries) { +                        struct list_head * p1; +                        struct list_head * h1; +                        struct dht_entry * e; +                        e = list_entry (p, struct dht_entry, next); +                        list_for_each_safe(p1, h1, &e->vals) { +                                struct val * v; +                                v = list_entry(p1, struct val, next); +                                if (now.tv_sec > v->t_exp) { +                                        list_del(&v->next); +                                        val_destroy(v); +                                 } + +                                if (now.tv_sec > v->t_rep) { +                                        kad_publish(dht, e->key, v->addr, +                                                    dht->t_expire - now.tv_sec); +                                        v->t_rep = now.tv_sec + dht->t_replic; +                                } +                        } +                } + +                /* Check the requests list for unresponsive nodes. */ +                list_for_each_safe(p, h, &dht->requests) { +                        struct kad_req * r; +                        r = list_entry(p, struct kad_req, next); +                        if (now.tv_sec > r->t_exp) { +                                list_del(&r->next); +                                bmp_release(dht->cookies, r->cookie); +                                dht_dead_peer(dht, r->key, r->addr); +                                kad_req_destroy(r); +                        } +                } + +                /* Refresh unaccessed buckets. */ +                bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); + +                pthread_rwlock_unlock(&dht->lock); + +                list_for_each_safe(p, h, &reflist) { +                        struct contact * c; +                        c = list_entry(p, struct contact, next); +                        lu = kad_lookup(dht, c->id, KAD_FIND_NODE); +                        if (lu != NULL) +                                lookup_destroy(lu); +                        list_del(&c->next); +                        contact_destroy(c); +                } + +                sleep(intv); +        } + +        return (void *) 0; +} + +static int kad_handle_join_resp(struct dht *     dht, +                                struct kad_req * req, +                                kad_msg_t *      msg) +{ +        assert(dht); +        assert(req); +        assert(msg); + +        /* We might send version numbers later to warn of updates if needed. */ +        if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire && +              msg->has_t_refresh && msg->has_t_replicate)) { +                log_warn("Join refused by remote."); +                return -1; +        } + +        if (msg->b < sizeof(uint64_t)) { +                log_err("Hash sizes less than 8 bytes unsupported."); +                return -1; +        } + +        pthread_rwlock_wrlock(&dht->lock); + +        dht->buckets = bucket_create(); +        if (dht->buckets == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return -1; +        } + +        /* Likely corrupt packet. The member will refuse, we might here too. */ +        if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) +                log_warn("Different kademlia parameters detected."); + +        if (msg->t_replicate != KAD_T_REPL) +                log_warn("Different kademlia replication time detected."); + +        if (msg->t_refresh != KAD_T_REFR) +                log_warn("Different kademlia refresh time detected."); + +        dht->k        = msg->k; +        dht->b        = msg->b; +        dht->t_expire = msg->t_expire; +        dht->t_repub  = MAX(1, dht->t_expire - 10); + +        if (pthread_create(&dht->worker, NULL, work, dht)) { +                bucket_destroy(dht->buckets); +                pthread_rwlock_unlock(&dht->lock); +                return -1; +        } + +        dht->state = DHT_RUNNING; + +        kad_req_respond(req); + +        dht_update_bucket(dht, msg->s_id.data, msg->s_addr); + +        pthread_rwlock_unlock(&dht->lock); + +        log_dbg("Enrollment of DHT completed."); + +        return 0; +} + +static int kad_handle_find_resp(struct dht *     dht, +                                struct kad_req * req, +                                kad_msg_t *      msg) +{ +        struct lookup * lu; + +        assert(dht); +        assert(req); +        assert(msg); + +        pthread_rwlock_rdlock(&dht->lock); + +        lu = dht_find_lookup(dht, req->key); +        if (lu == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return -1; +        } + +        lookup_update(dht, lu, msg); + +        pthread_rwlock_unlock(&dht->lock); + +        return 0; +} + +static void kad_handle_response(struct dht * dht, +                                kad_msg_t *  msg) +{ +        struct kad_req * req; + +        assert(dht); +        assert(msg); + +        pthread_rwlock_wrlock(&dht->lock); + +        req = dht_find_request(dht, msg); +        if (req == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return; +        } + +        bmp_release(dht->cookies, req->cookie); +        list_del(&req->next); + +        pthread_rwlock_unlock(&dht->lock); + +        switch(req->code) { +        case KAD_JOIN: +                if (kad_handle_join_resp(dht, req, msg)) +                        log_err("Enrollment of DHT failed."); +                break; +        case KAD_FIND_VALUE: +        case KAD_FIND_NODE: +                if (dht_get_state(dht) != DHT_RUNNING) +                        return; +                kad_handle_find_resp(dht, req, msg); +                break; +        default: +                break; +        } + +        kad_req_destroy(req); +} + +int dht_bootstrap(struct dht * dht, +                  size_t       b, +                  time_t       t_expire) +{ +        assert(dht); + +        pthread_rwlock_wrlock(&dht->lock); + +        dht->id = create_id(b); +        if (dht->id == NULL) +                goto fail_id; + +        dht->buckets = bucket_create(); +        if (dht->buckets == NULL) +                goto fail_buckets; + +        dht->buckets->depth = 0; +        dht->buckets->mask  = 0; + +        dht->b        = b / CHAR_BIT; +        dht->t_expire = MAX(2, t_expire); +        dht->t_repub  = MAX(1, t_expire - 10); +        dht->k        = KAD_K; + +        if (pthread_create(&dht->worker, NULL, work, dht)) +                goto fail_pthread_create; + +        dht->state = DHT_RUNNING; + +        dht_update_bucket(dht, dht->id, dht->addr); + +        pthread_rwlock_unlock(&dht->lock); + +        return 0; + + fail_pthread_create: +        bucket_destroy(dht->buckets); +        dht->buckets = NULL; + fail_buckets: +        free(dht->id); +        dht->id = NULL; + fail_id: +        pthread_rwlock_unlock(&dht->lock); +        return -1; +} + +int dht_enroll(struct dht * dht, +               uint64_t     addr) +{ +        assert(dht); + +        return kad_join(dht, addr); +} + +int dht_reg(struct dht *    dht, +            const uint8_t * key) +{ +        struct ref_entry * e; + +        assert(dht); +        assert(key); +        assert(dht->addr != 0); + +        if (dht_get_state(dht) != DHT_RUNNING) +                return -1; + +        e = ref_entry_create(dht, key); +        if (e == NULL) +                return -ENOMEM; + +        pthread_rwlock_wrlock(&dht->lock); + +        list_add(&e->next, &dht->refs); + +        pthread_rwlock_unlock(&dht->lock); + +        kad_publish(dht, key, dht->addr, dht->t_expire); + +        return 0; +} + +int dht_unreg(struct dht *    dht, +              const uint8_t * key) +{ +        struct list_head * p; +        struct list_head * h; + +        assert(dht); +        assert(key); + +        if (dht_get_state(dht) != DHT_RUNNING) +                return -1; + +        pthread_rwlock_wrlock(&dht->lock); + +        list_for_each_safe(p, h, &dht->refs) { +                struct ref_entry * r = list_entry(p, struct ref_entry, next); +                if (!memcmp(key, r->key, dht-> b) ) { +                        list_del(&r->next); +                        ref_entry_destroy(r); +                } +        } + +        dht_del(dht, key, dht->addr); + +        pthread_rwlock_unlock(&dht->lock); + +        return 0; +} + +uint64_t dht_query(struct dht *    dht, +                   const uint8_t * key) +{ +        struct dht_entry * e; +        struct lookup *    lu; +        uint64_t           addrs[KAD_K]; +        size_t             n; + +        addrs[0] = 0; + +        pthread_rwlock_rdlock(&dht->lock); + +        e = dht_find_entry(dht, key); +        if (e != NULL) +                addrs[0] = dht_entry_get_addr(dht, e); + +        pthread_rwlock_unlock(&dht->lock); + +        if (addrs[0] != 0 && addrs[0] != dht->addr) +                return addrs[0]; + +        lu = kad_lookup(dht, key, KAD_FIND_VALUE); +        if (lu == NULL) +                return 0; + +        n = lookup_get_addrs(lu, addrs); +        if (n == 0) { +                lookup_destroy(lu); +                return 0; +        } + +        lookup_destroy(lu); + +        /* Current behaviour is anycast and return the first peer address. */ +        if (addrs[0] != dht->addr) +                return addrs[0]; + +        if (n > 1) +                return addrs[1]; + +        return 0; +} + +void dht_post_sdu(void *               ae, +                  struct shm_du_buff * sdb) +{ +        struct dht *         dht; +        kad_msg_t *          msg; +        kad_contact_msg_t ** cmsgs; +        kad_msg_t            resp_msg = KAD_MSG__INIT; +        uint64_t             addr; +        buffer_t             buf; +        size_t               i; + +        assert(ae); +        assert(sdb); + +        memset(&buf, 0, sizeof(buf)); + +        dht = (struct dht *) ae; + +        msg = kad_msg__unpack(NULL, +                              shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), +                              shm_du_buff_head(sdb)); + +        ipcp_sdb_release(sdb); + +        if (msg == NULL) { +                log_err("Failed to unpack message."); +                return; +        } + +        if (msg->has_key && msg->key.len != dht->b) { +                kad_msg__free_unpacked(msg, NULL); +                log_warn("Bad key in message."); +                return; +        } + +        if (msg->has_s_id && !msg->has_b && msg->s_id.len != dht->b) { +                kad_msg__free_unpacked(msg, NULL); +                log_warn("Bad source ID in message of type %d.", msg->code); +                return; +        } + +        if (msg->code != KAD_RESPONSE && dht_get_state(dht) != DHT_RUNNING) { +                kad_msg__free_unpacked(msg, NULL); +                return; +        } + +        addr = msg->s_addr; + +        resp_msg.code   = KAD_RESPONSE; +        resp_msg.cookie = msg->cookie; + +        switch(msg->code) { +        case KAD_JOIN: +                /* Refuse enrollee on check fails. */ +                if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { +                        log_warn("Parameter mismatch. " +                                 "DHT enrolment refused."); +                        break; +                } + +                if (msg->t_replicate != KAD_T_REPL) { +                        log_warn("Replication time mismatch. " +                                 "DHT enrolment refused."); + +                        break; +                } + +                if (msg->t_refresh != KAD_T_REFR) { +                        log_warn("Refresh time mismatch. " +                                 "DHT enrolment refused."); +                        break; +                } + +                resp_msg.has_alpha       = true; +                resp_msg.has_b           = true; +                resp_msg.has_k           = true; +                resp_msg.has_t_expire    = true; +                resp_msg.has_t_refresh   = true; +                resp_msg.has_t_replicate = true; +                resp_msg.alpha           = KAD_ALPHA; +                resp_msg.b               = dht->b; +                resp_msg.k               = KAD_K; +                resp_msg.t_expire        = dht->t_expire; +                resp_msg.t_refresh       = KAD_T_REFR; +                resp_msg.t_replicate     = KAD_T_REPL; +                break; +        case KAD_FIND_VALUE: +                buf = dht_retrieve(dht, msg->key.data); +                if (buf.len != 0) { +                        resp_msg.n_addrs = buf.len; +                        resp_msg.addrs   = (uint64_t *) buf.data; +                        break; +                } +                /* FALLTHRU */ +        case KAD_FIND_NODE: +                /* Return k closest contacts. */ +                resp_msg.n_contacts = +                        dht_get_contacts(dht, msg->key.data, &cmsgs); +                resp_msg.contacts = cmsgs; +                break; +        case KAD_STORE: +                if (msg->n_contacts < 1) { +                        log_warn("No contacts in store message."); +                        break; +                } + +                if (!msg->has_t_expire) { +                        log_warn("No expiry time in store message."); +                        break; +                } + +                kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire); +                break; +        case KAD_RESPONSE: +                kad_handle_response(dht, msg); +                break; +        default: +                assert(false); +                break; +        } + +        if (msg->code != KAD_JOIN) { +                pthread_rwlock_wrlock(&dht->lock); +                if (dht_update_bucket(dht, msg->s_id.data, addr)) +                        log_warn("Failed to update bucket."); +                pthread_rwlock_unlock(&dht->lock); +        } + +        if (msg->code < KAD_STORE) { +                if (send_msg(dht, &resp_msg, addr)) +                        log_warn("Failed to send response."); +        } + +        kad_msg__free_unpacked(msg, NULL); + +        if (resp_msg.n_addrs > 0) +                free(resp_msg.addrs); + +        if (resp_msg.n_contacts == 0) +                return; + +        for (i = 0; i < resp_msg.n_contacts; ++i) +                kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); +        free(resp_msg.contacts); +} + +void dht_destroy(struct dht * dht) +{ +        struct list_head * p; +        struct list_head * h; + +        if (dht == NULL) +                return; + +        if (dht_get_state(dht) == DHT_RUNNING) +                dht_set_state(dht, DHT_SHUTDOWN); + +        pthread_rwlock_wrlock(&dht->lock); + +        list_for_each_safe(p, h, &dht->entries) { +                struct dht_entry * e = list_entry(p, struct dht_entry, next); +                list_del(&e->next); +                dht_entry_destroy(e); +        } + +        list_for_each_safe(p, h, &dht->requests) { +                struct kad_req * r = list_entry(p, struct kad_req, next); +                list_del(&r->next); +                free(r); +        } + +        list_for_each_safe(p, h, &dht->refs) { +                struct ref_entry * e = list_entry(p, struct ref_entry, next); +                list_del(&e->next); +                ref_entry_destroy(e); +        } + +        list_for_each_safe(p, h, &dht->lookups) { +                struct lookup * l = list_entry(p, struct lookup, next); +                list_del(&l->next); +                lookup_destroy(l); +        } + +        pthread_rwlock_unlock(&dht->lock); + +        if (dht_get_state(dht) == DHT_SHUTDOWN) { +                pthread_cancel(dht->worker); +                pthread_join(dht->worker, NULL); +        } + +        if (dht->buckets != NULL) +                bucket_destroy(dht->buckets); + +        bmp_destroy(dht->cookies); + +        pthread_mutex_destroy(&dht->mtx); + +        pthread_rwlock_destroy(&dht->lock); + +        free(dht->id); + +        free(dht); +} + +struct dht * dht_create(uint64_t addr) +{ +        struct dht * dht; + +        dht = malloc(sizeof(*dht)); +        if (dht == NULL) +                goto fail_malloc; + +        dht->buckets = NULL; + +        list_head_init(&dht->entries); +        list_head_init(&dht->requests); +        list_head_init(&dht->refs); +        list_head_init(&dht->lookups); + +        if (pthread_rwlock_init(&dht->lock, NULL)) +                goto fail_rwlock; + +        if (pthread_mutex_init(&dht->mtx, NULL)) +                goto fail_mutex; + +        dht->cookies = bmp_create(DHT_MAX_REQS, 1); +        if (dht->cookies == NULL) +                goto fail_bmp; + +        dht->b    = 0; +        dht->addr = addr; +        dht->id   = NULL; +#ifndef __DHT_TEST__ +        dht->fd   = dt_reg_ae(dht, &dht_post_sdu); +#endif /* __DHT_TEST__ */ + +        dht->state = DHT_INIT; + +        return dht; + + fail_bmp: +        pthread_mutex_destroy(&dht->mtx); + fail_mutex: +        pthread_rwlock_destroy(&dht->lock); + fail_rwlock: +        free(dht); + fail_malloc: +        return NULL; +} diff --git a/src/ipcpd/normal/dht.h b/src/ipcpd/normal/dht.h new file mode 100644 index 00000000..5d7fc894 --- /dev/null +++ b/src/ipcpd/normal/dht.h @@ -0,0 +1,54 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Distributed Hash Table based on Kademlia + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_DHT_H +#define OUROBOROS_IPCPD_NORMAL_DHT_H + +#include <ouroboros/ipcp-dev.h> + +#include <stdint.h> +#include <sys/types.h> + +struct dht; + +struct dht * dht_create(uint64_t addr); + +int          dht_bootstrap(struct dht * dht, +                           size_t       b, +                           time_t       t_expire); + +int          dht_enroll(struct dht * dht, +                        uint64_t     addr); + +void         dht_destroy(struct dht * dht); + +int          dht_reg(struct dht *    dht, +                     const uint8_t * key); + +int          dht_unreg(struct dht *    dht, +                       const uint8_t * key); + +uint64_t     dht_query(struct dht *    dht, +                       const uint8_t * key); + +#endif /* OUROBOROS_IPCPD_NORMAL_DHT_H */ diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c index 5ea8a300..69b7e90e 100644 --- a/src/ipcpd/normal/dir.c +++ b/src/ipcpd/normal/dir.c @@ -20,129 +20,134 @@   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */ +#define OUROBOROS_PREFIX "directory" +  #include <ouroboros/config.h> +#include <ouroboros/endian.h>  #include <ouroboros/errno.h> +#include <ouroboros/logs.h>  #include <ouroboros/rib.h> +#include <ouroboros/utils.h>  #include "dir.h" +#include "dht.h"  #include "ipcp.h"  #include "ribconfig.h"  #include <stdlib.h>  #include <string.h>  #include <assert.h> +#include <inttypes.h> -static char dir_path[RIB_MAX_PATH_LEN + 1]; +#define KAD_B (hash_len(ipcpi.dir_hash_algo) * CHAR_BIT) +#define ENROL_RETR 6 +#define ENROL_INTV 1 -static void dir_path_reset(void) { -        dir_path[strlen(DIR_PATH)]= '\0'; -        assert(strcmp(DIR_PATH, dir_path) == 0); -} +struct dht * dht; -int dir_init(void) +static uint64_t find_peer_addr(void)  { -        /* FIXME: set ribmgr dissemination here */ -        if (rib_add(RIB_ROOT, DIR_NAME)) -                return -1; +        ssize_t  i; +        char ** members; +        ssize_t n_members; +        size_t  reset; +        char    path[RIB_MAX_PATH_LEN + 1]; -        strcpy(dir_path, DIR_PATH); +        strcpy(path, MEMBERS_PATH); -        return 0; -} +        reset = strlen(path); -int dir_fini(void) -{ -        /* FIXME: remove ribmgr dissemination here*/ +        n_members = rib_children(path, &members); +        if (n_members == 1) { +                freepp(ssize_t, members, n_members); +                return 0; +        } + +        for (i = 0; i < n_members; ++i) { +                uint64_t addr; +                rib_path_append(path, members[i]); +                if (rib_read(path, &addr, sizeof(addr)) != sizeof(addr)) { +                        log_err("Failed to read address from RIB."); +                        freepp(ssize_t, members, n_members); +                        return ipcpi.dt_addr; +                } + +                if (addr != ipcpi.dt_addr) { +                        freepp(ssize_t, members, n_members); +                        return addr; +                } + +                path[reset] = '\0'; +        } -        dir_path_reset(); -        rib_del(dir_path); +        freepp(ssize_t, members, n_members);          return 0;  } -int dir_reg(const uint8_t * hash) +int dir_init()  { -        char hashstr[ipcp_dir_hash_strlen() + 1]; -        int ret; - -        assert(hash); +        uint64_t addr; -        dir_path_reset(); - -        ipcp_hash_str(hashstr, hash); - -        ret = rib_add(dir_path, hashstr); -        if (ret == -ENOMEM) -                 return -ENOMEM; - -        rib_path_append(dir_path, hashstr); +        dht = dht_create(ipcpi.dt_addr); +        if (dht == NULL) +                return -ENOMEM; -        ret = rib_add(dir_path, ipcpi.name); -        if (ret == -EPERM) +        addr = find_peer_addr(); +        if (addr == ipcpi.dt_addr) { +                log_err("Failed to get peer address."); +                dht_destroy(dht);                  return -EPERM; -        if (ret == -ENOMEM) { -                if (rib_children(dir_path, NULL) == 0) -                        rib_del(dir_path); -                return -ENOMEM;          } -        return 0; -} - -int dir_unreg(const uint8_t * hash) -{ -        char hashstr[ipcp_dir_hash_strlen() + 1]; -        size_t len; - -        assert(hash); +        if (addr != 0) { +                size_t retr = 0; +                log_dbg("Enrolling directory with peer %" PRIu64 ".", addr); +                /* NOTE: we could try other members if dht_enroll times out. */ +                while (dht_enroll(dht, addr)) { +                        if (retr++ == ENROL_RETR) { +                                dht_destroy(dht); +                                return -EPERM; +                        } -        dir_path_reset(); +                        log_dbg("Directory enrollment failed, retrying..."); +                        sleep(ENROL_INTV); +                } -        ipcp_hash_str(hashstr, hash); +                log_dbg("Directory enrolled."); -        rib_path_append(dir_path, hashstr); - -        if (!rib_has(dir_path))                  return 0; +        } -        len = strlen(dir_path); - -        rib_path_append(dir_path, ipcpi.name); - -        rib_del(dir_path); +        log_dbg("Bootstrapping directory."); -        dir_path[len] = '\0'; +        /* TODO: get parameters for bootstrap from IRM tool. */ +        if (dht_bootstrap(dht, KAD_B, 86400)) { +                dht_destroy(dht); +                return -ENOMEM; +        } -        if (rib_children(dir_path, NULL) == 0) -                rib_del(dir_path); +        log_dbg("Directory bootstrapped.");          return 0;  } -int dir_query(const uint8_t * hash) +void dir_fini(void)  { -        char hashstr[ipcp_dir_hash_strlen() + 1]; -        size_t len; - -        dir_path_reset(); - -        ipcp_hash_str(hashstr, hash); - -        rib_path_append(dir_path, hashstr); - -        if (!rib_has(dir_path)) -                return -1; - -        /* FIXME: assert after local IPCP is deprecated */ -        len = strlen(dir_path); +        dht_destroy(dht); +} -        rib_path_append(dir_path, ipcpi.name); +int dir_reg(const uint8_t * hash) +{ +        return dht_reg(dht, hash); +} -        if (rib_has(dir_path)) { -                dir_path[len] = '\0'; -                if (rib_children(dir_path, NULL) == 1) -                        return -1; -        } +int dir_unreg(const uint8_t * hash) +{ +        return dht_unreg(dht, hash); +} -        return 0; +uint64_t dir_query(const uint8_t * hash) +{ +        return dht_query(dht, hash);  } diff --git a/src/ipcpd/normal/dir.h b/src/ipcpd/normal/dir.h index 1b28a5c0..4091a3e8 100644 --- a/src/ipcpd/normal/dir.h +++ b/src/ipcpd/normal/dir.h @@ -23,14 +23,14 @@  #ifndef OUROBOROS_IPCPD_NORMAL_DIR_H  #define OUROBOROS_IPCPD_NORMAL_DIR_H -int dir_init(void); +int      dir_init(void); -int dir_fini(void); +void     dir_fini(void); -int dir_reg(const uint8_t * hash); +int      dir_reg(const uint8_t * hash); -int dir_unreg(const uint8_t * hash); +int      dir_unreg(const uint8_t * hash); -int dir_query(const uint8_t * hash); +uint64_t dir_query(const uint8_t * hash);  #endif /* OUROBOROS_IPCPD_NORMAL_DIR_H */ diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index 1867c13b..5fcc5865 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -50,7 +50,7 @@  #include <assert.h>  struct ae_info { -        int    (*post_sdu)(void * ae, struct shm_du_buff * sdb); +        void   (* post_sdu)(void * ae, struct shm_du_buff * sdb);          void * ae;  }; @@ -131,11 +131,14 @@ static int sdu_handler(int                  fd,                          return 0;                  } -                if (dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb)) { +                if (dt.aes[dt_pci.fd].post_sdu == NULL) { +                        log_err("No registered AE on fd %d.", dt_pci.fd);                          ipcp_sdb_release(sdb); -                        return -1; +                        return -EPERM;                  } +                dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb); +                  return 0;          } @@ -295,7 +298,7 @@ void dt_stop(void)  }  int dt_reg_ae(void * ae, -              int (* func)(void * func, struct shm_du_buff *)) +              void (* func)(void * func, struct shm_du_buff *))  {          int res_fd; @@ -330,10 +333,11 @@ int dt_write_sdu(uint64_t             dst_addr,          struct dt_pci dt_pci;          assert(sdb); +        assert(dst_addr != ipcpi.dt_addr);          fd = pff_nhop(dt.pff[qc], dst_addr);          if (fd < 0) { -                log_err("Could not get nhop for addr %" PRIu64 ".", dst_addr); +                log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);                  return -1;          } diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h index 0e1a8cc3..15ef51f0 100644 --- a/src/ipcpd/normal/dt.h +++ b/src/ipcpd/normal/dt.h @@ -38,7 +38,7 @@ int  dt_start(void);  void dt_stop(void);  int  dt_reg_ae(void * ae, -               int (* func)(void * ae, struct shm_du_buff * sdb)); +               void (* func)(void * ae, struct shm_du_buff * sdb));  int  dt_write_sdu(uint64_t             dst_addr,                    qoscube_t            qc, diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 40a680c3..704f4f16 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -30,6 +30,7 @@  #include <ouroboros/dev.h>  #include <ouroboros/ipcp-dev.h> +#include "dir.h"  #include "dt_pci.h"  #include "fa.h"  #include "sdu_sched.h" @@ -79,8 +80,8 @@ static void destroy_conn(int fd)          fa.r_addr[fd] = INVALID_ADDR;  } -static int fa_post_sdu(void *               ae, -                       struct shm_du_buff * sdb) +static void fa_post_sdu(void *               ae, +                        struct shm_du_buff * sdb)  {          struct timespec    ts  = {0, TIMEOUT * 1000};          struct timespec    abstime; @@ -98,9 +99,12 @@ static int fa_post_sdu(void *               ae,                                       shm_du_buff_tail(sdb) -                                       shm_du_buff_head(sdb),                                       shm_du_buff_head(sdb)); + +        ipcp_sdb_release(sdb); +          if (msg == NULL) {                  log_err("Failed to unpack flow alloc message."); -                return -1; +                return;          }          switch (msg->code) { @@ -113,7 +117,7 @@ static int fa_post_sdu(void *               ae,                          log_err("Bad flow request.");                          pthread_mutex_unlock(&ipcpi.alloc_lock);                          flow_alloc_msg__free_unpacked(msg, NULL); -                        return -1; +                        return;                  }                  while (ipcpi.alloc_id != -1 && @@ -128,7 +132,7 @@ static int fa_post_sdu(void *               ae,                          log_dbg("Won't allocate over non-operational IPCP.");                          pthread_mutex_unlock(&ipcpi.alloc_lock);                          flow_alloc_msg__free_unpacked(msg, NULL); -                        return -1; +                        return;                  }                  assert(ipcpi.alloc_id == -1); @@ -141,7 +145,7 @@ static int fa_post_sdu(void *               ae,                          pthread_mutex_unlock(&ipcpi.alloc_lock);                          flow_alloc_msg__free_unpacked(msg, NULL);                          log_err("Failed to get fd for flow."); -                        return -1; +                        return;                  }                  pthread_rwlock_wrlock(&fa.flows_lock); @@ -173,13 +177,10 @@ static int fa_post_sdu(void *               ae,          default:                  log_err("Got an unknown flow allocation message.");                  flow_alloc_msg__free_unpacked(msg, NULL); -                return -1; +                return;          }          flow_alloc_msg__free_unpacked(msg, NULL); -        ipcp_sdb_release(sdb); - -        return 0;  }  int fa_init(void) @@ -240,47 +241,10 @@ int fa_alloc(int             fd,               qoscube_t       qc)  {          flow_alloc_msg_t     msg = FLOW_ALLOC_MSG__INIT; -        char                 path[RIB_MAX_PATH_LEN + 1];          uint64_t             addr; -        ssize_t              ch; -        ssize_t              i; -        char **              children; -        char                 hashstr[ipcp_dir_hash_strlen() + 1]; -        char *               dst_ipcp = NULL;          struct shm_du_buff * sdb; -        ipcp_hash_str(hashstr, dst); - -        assert(strlen(hashstr) + strlen(DIR_PATH) + 1 -               < RIB_MAX_PATH_LEN); - -        strcpy(path, DIR_PATH); - -        rib_path_append(path, hashstr); - -        ch = rib_children(path, &children); -        if (ch <= 0) -                return -1; - -        for (i = 0; i < ch; ++i) -                if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0) -                        dst_ipcp = children[i]; -                else -                        free(children[i]); - -        free(children); - -        if (dst_ipcp == NULL) -                return -1; - -        strcpy(path, MEMBERS_PATH); - -        rib_path_append(path, dst_ipcp); - -        free(dst_ipcp); - -        if (rib_read(path, &addr, sizeof(addr)) != sizeof(addr)) -                return -1; +        addr = dir_query(dst);          msg.code         = FLOW_ALLOC_CODE__FLOW_REQ;          msg.has_hash     = true; diff --git a/src/ipcpd/normal/kademlia.proto b/src/ipcpd/normal/kademlia.proto new file mode 100644 index 00000000..0b7e8beb --- /dev/null +++ b/src/ipcpd/normal/kademlia.proto @@ -0,0 +1,46 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * KAD protocol + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + *    Sander Vrijders   <sander.vrijders@intec.ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +syntax = "proto2"; + +message kad_contact_msg { +        required bytes  id   = 1; +        required uint64 addr = 2; +}; + +message kad_msg { +        required uint32 code              =  1; +        required uint32 cookie            =  2; +        required uint64 s_addr            =  3; +        optional bytes  s_id              =  4; +        optional bytes  key               =  5; +        repeated uint64 addrs             =  6; +        repeated kad_contact_msg contacts =  7; +        // enrolment parameters +        optional uint32 alpha             =  8; +        optional uint32 b                 =  9; +        optional uint32 k                 = 10; +        optional uint32 t_expire          = 11; +        optional uint32 t_refresh         = 12; +        optional uint32 t_replicate       = 13; +};
\ No newline at end of file diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 8c28de78..f94c15de 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -116,11 +116,6 @@ static int boot_components(void)          log_dbg("Starting ribmgr."); -        if (dir_init()) { -                log_err("Failed to initialize directory."); -                goto fail_dir; -        } -          if (ribmgr_init()) {                  log_err("Failed to initialize RIB manager.");                  goto fail_ribmgr; @@ -148,6 +143,11 @@ static int boot_components(void)                  goto fail_fa_start;          } +        if (dir_init()) { +                log_err("Failed to initialize directory."); +                goto fail_dir; +        } +          if (enroll_start()) {                  log_err("Failed to start enroll.");                  goto fail_enroll_start; @@ -166,6 +166,8 @@ static int boot_components(void)          ipcp_set_state(IPCP_INIT);          enroll_stop();   fail_enroll_start: +        dir_fini(); + fail_dir:          fa_stop();   fail_fa_start:          dt_stop(); @@ -176,8 +178,6 @@ static int boot_components(void)   fail_dt:          ribmgr_fini();   fail_ribmgr: -        dir_fini(); - fail_dir:          addr_auth_fini();   fail_addr_auth:          free(ipcpi.dif_name); @@ -191,6 +191,8 @@ void shutdown_components(void)          enroll_stop(); +        dir_fini(); +          fa_stop();          dt_stop(); @@ -201,8 +203,6 @@ void shutdown_components(void)          ribmgr_fini(); -        dir_fini(); -          addr_auth_fini();          free(ipcpi.dif_name); @@ -227,10 +227,9 @@ static int normal_ipcp_enroll(const char *      dst,                  return -1;          } -        log_dbg("Enrolled with " HASH_FMT, HASH_VAL(dst)); +        log_dbg("Enrolled with %s.", dst);          info->dir_hash_algo = ipcpi.dir_hash_algo; -          strcpy(info->dif_name, ipcpi.dif_name);          return 0; @@ -347,12 +346,17 @@ static int normal_ipcp_bootstrap(const struct ipcp_config * conf)          return 0;  } +static int normal_ipcp_query(const uint8_t * dst) +{ +        return dir_query(dst) ? 0 : -1; +} +  static struct ipcp_ops normal_ops = {          .ipcp_bootstrap       = normal_ipcp_bootstrap,          .ipcp_enroll          = normal_ipcp_enroll,          .ipcp_reg             = dir_reg,          .ipcp_unreg           = dir_unreg, -        .ipcp_query           = dir_query, +        .ipcp_query           = normal_ipcp_query,          .ipcp_flow_alloc      = fa_alloc,          .ipcp_flow_alloc_resp = fa_alloc_resp,          .ipcp_flow_dealloc    = fa_dealloc diff --git a/src/ipcpd/normal/pol/flat.c b/src/ipcpd/normal/pol/flat.c index e709da7c..0907cf7a 100644 --- a/src/ipcpd/normal/pol/flat.c +++ b/src/ipcpd/normal/pol/flat.c @@ -56,7 +56,7 @@ static int addr_taken(char *  name,          char path[RIB_MAX_PATH_LEN + 1];          size_t reset; -        strcpy(path, "/" MEMBERS_NAME); +        strcpy(path, MEMBERS_PATH);          reset = strlen(path); @@ -102,7 +102,7 @@ uint64_t flat_address(void)          char ** members;          ssize_t n_members; -        strcpy(path, "/" MEMBERS_NAME); +        strcpy(path, MEMBERS_PATH);          if (!rib_has(path)) {                  log_err("Could not read members from RIB."); diff --git a/src/ipcpd/normal/ribconfig.h b/src/ipcpd/normal/ribconfig.h index 31c79fbe..db1ff1bb 100644 --- a/src/ipcpd/normal/ribconfig.h +++ b/src/ipcpd/normal/ribconfig.h @@ -29,9 +29,7 @@  #define DLR          "/"  #define BOOT_NAME    "boot"  #define MEMBERS_NAME "members" -#define DIR_NAME     "directory"  #define ROUTING_NAME "fsdb" -#define DIR_PATH     DLR DIR_NAME  #define BOOT_PATH    DLR BOOT_NAME  #define MEMBERS_PATH DLR MEMBERS_NAME  #define ROUTING_PATH DLR ROUTING_NAME diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 266a628d..3beb917c 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -299,9 +299,8 @@ static void * sync_rib(void *o)                          rib_path_append(path, children[--ch]);                          free(children[ch]); -                        /* Only sync fsdb, members and directory */ +                        /* Only sync fsdb and members */                          if (strcmp(path, MEMBERS_PATH) == 0 -                            || strcmp(path, DIR_PATH) == 0                              || strcmp(path, ROUTING_PATH) == 0)                                  ribmgr_sync(path);                  } diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c index 63259430..a4b9e074 100644 --- a/src/ipcpd/normal/sdu_sched.c +++ b/src/ipcpd/normal/sdu_sched.c @@ -36,11 +36,19 @@  struct sdu_sched {          flow_set_t * set[QOS_CUBE_MAX]; -        fqueue_t *   fqs[QOS_CUBE_MAX];          next_sdu_t   callback; -        pthread_t    sdu_reader; +        pthread_t    sdu_readers[IPCP_SCHED_THREADS];  }; +static void cleanup_reader(void * o) +{ +        int         i; +        fqueue_t ** fqs = (fqueue_t **) o; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                fqueue_destroy(fqs[i]); +} +  static void * sdu_reader(void * o)  {          struct sdu_sched *   sched; @@ -49,14 +57,27 @@ static void * sdu_reader(void * o)          int                  fd;          int                  i = 0;          int                  ret; +        fqueue_t *           fqs[QOS_CUBE_MAX];          sched = (struct sdu_sched *) o; +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                fqs[i] = fqueue_create(); +                if (fqs[i] == NULL) { +                        int j; +                        for (j = 0; j < i; ++j) +                                fqueue_destroy(fqs[j]); +                        return (void *) -1; +                } +        } + +        pthread_cleanup_push(cleanup_reader, fqs); +          while (true) {                  /* FIXME: replace with scheduling policy call */                  i = (i + 1) % QOS_CUBE_MAX; -                ret = flow_event_wait(sched->set[i], sched->fqs[i], &timeout); +                ret = flow_event_wait(sched->set[i], fqs[i], &timeout);                  if (ret == -ETIMEDOUT)                          continue; @@ -65,7 +86,7 @@ static void * sdu_reader(void * o)                          continue;                  } -                while ((fd = fqueue_next(sched->fqs[i])) >= 0) { +                while ((fd = fqueue_next(fqs[i])) >= 0) {                          if (ipcp_flow_read(fd, &sdb)) {                                  log_warn("Failed to read SDU from fd %d.", fd);                                  continue; @@ -78,6 +99,8 @@ static void * sdu_reader(void * o)                  }          } +        pthread_cleanup_pop(true); +          return (void *) 0;  } @@ -89,7 +112,7 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)          sdu_sched = malloc(sizeof(*sdu_sched));          if (sdu_sched == NULL) -                return NULL; +                goto fail_malloc;          sdu_sched->callback = callback; @@ -98,31 +121,27 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)                  if (sdu_sched->set[i] == NULL) {                          for (j = 0; j < i; ++j)                                  flow_set_destroy(sdu_sched->set[j]); -                        goto fail_sdu_sched; +                        goto fail_flow_set;                  }          } -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                sdu_sched->fqs[i] = fqueue_create(); -                if (sdu_sched->fqs[i] == NULL) { -                        for (j = 0; j < i; ++j) -                                fqueue_destroy(sdu_sched->fqs[j]); +        for (i = 0; i < IPCP_SCHED_THREADS; ++i) { +                if (pthread_create(&sdu_sched->sdu_readers[i], NULL, +                                   sdu_reader, sdu_sched)) { +                        int j; +                        for (j = 0; j < i; ++j) { +                                pthread_cancel(sdu_sched->sdu_readers[j]); +                                pthread_join(sdu_sched->sdu_readers[j], NULL); +                        }                          goto fail_flow_set;                  }          } -        pthread_create(&sdu_sched->sdu_reader, -                       NULL, -                       sdu_reader, -                       (void *) sdu_sched); -          return sdu_sched;   fail_flow_set: -        for (i = 0; i < QOS_CUBE_MAX; ++i) -                flow_set_destroy(sdu_sched->set[i]); - fail_sdu_sched:           free(sdu_sched); + fail_malloc:           return NULL;  } @@ -132,14 +151,13 @@ void sdu_sched_destroy(struct sdu_sched * sdu_sched)          assert(sdu_sched); -        pthread_cancel(sdu_sched->sdu_reader); - -        pthread_join(sdu_sched->sdu_reader, NULL); +        for (i = 0; i < IPCP_SCHED_THREADS; ++i) { +                pthread_cancel(sdu_sched->sdu_readers[i]); +                pthread_join(sdu_sched->sdu_readers[i], NULL); +        } -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                fqueue_destroy(sdu_sched->fqs[i]); +        for (i = 0; i < QOS_CUBE_MAX; ++i)                  flow_set_destroy(sdu_sched->set[i]); -        }          free(sdu_sched);  } diff --git a/src/ipcpd/normal/tests/CMakeLists.txt b/src/ipcpd/normal/tests/CMakeLists.txt new file mode 100644 index 00000000..d975caf6 --- /dev/null +++ b/src/ipcpd/normal/tests/CMakeLists.txt @@ -0,0 +1,37 @@ +get_filename_component(CURRENT_SOURCE_PARENT_DIR +  ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(CURRENT_BINARY_PARENT_DIR +  ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +include_directories(${CURRENT_SOURCE_PARENT_DIR}) +include_directories(${CURRENT_BINARY_PARENT_DIR}) + +include_directories(${CMAKE_SOURCE_DIR}/include) +include_directories(${CMAKE_BINARY_DIR}/include) + +get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) + +create_test_sourcelist(${PARENT_DIR}_tests test_suite.c +  # Add new tests here +  dht_test.c +) + +set_source_files_properties(${KAD_PROTO_SRCS} PROPERTIES GENERATED TRUE) + +add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests} +  ${KAD_PROTO_SRCS}) +target_link_libraries(${PARENT_DIR}_test ouroboros) + +add_dependencies(check ${PARENT_DIR}_test) + +set(tests_to_run ${${PARENT_DIR}_tests}) +remove(tests_to_run test_suite.c) + +foreach (test ${tests_to_run}) +  get_filename_component(test_name ${test} NAME_WE) +  add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name}) +endforeach (test) diff --git a/src/ipcpd/normal/tests/dht_test.c b/src/ipcpd/normal/tests/dht_test.c new file mode 100644 index 00000000..861ae10a --- /dev/null +++ b/src/ipcpd/normal/tests/dht_test.c @@ -0,0 +1,99 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Unit tests of the DHT AE + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define __DHT_TEST__ + +#include "dht.c" + +#include <pthread.h> +#include <time.h> +#include <stdlib.h> +#include <stdio.h> + +#define KEY_LEN  32 + +#define EXP      86400 +#define CONTACTS 1000 + +int dht_test(int     argc, +             char ** argv) +{ +        struct dht * dht; +        uint64_t     addr = 0x0D1F; +        uint8_t      key[KEY_LEN]; +        size_t       i; + +        (void) argc; +        (void) argv; + +        dht = dht_create(addr); +        if (dht == NULL) { +                printf("Failed to create dht.\n"); +                return -1; +        } + +        dht_destroy(dht); + +        dht = dht_create(addr); +        if (dht == NULL) { +                printf("Failed to re-create dht.\n"); +                return -1; +        } + +        if (dht_bootstrap(dht, KEY_LEN, EXP)) { +                printf("Failed to bootstrap dht.\n"); +                dht_destroy(dht); +                return -1; +        } + +        dht_destroy(dht); + +        dht = dht_create(addr); +        if (dht == NULL) { +                printf("Failed to re-create dht.\n"); +                return -1; +        } + +        if (dht_bootstrap(dht, KEY_LEN, EXP)) { +                printf("Failed to bootstrap dht.\n"); +                dht_destroy(dht); +                return -1; +        } + +        for (i = 0; i < CONTACTS; ++i) { +                uint64_t addr; +                random_buffer(&addr, sizeof(addr)); +                random_buffer(key, KEY_LEN); +                pthread_rwlock_wrlock(&dht->lock); +                if (dht_update_bucket(dht, key, addr)) { +                        pthread_rwlock_unlock(&dht->lock); +                        printf("Failed to update bucket.\n"); +                        dht_destroy(dht); +                        return -1; +                } +                pthread_rwlock_unlock(&dht->lock); +        } + +        dht_destroy(dht); + +        return 0; +} | 
