diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2021-12-04 18:26:58 +0100 | 
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2021-12-06 17:52:16 +0100 | 
| commit | 9422e6be94ac1007e8115a920379fd545055e531 (patch) | |
| tree | 31075ad5ee851ef4625e3cafbd821e591e817997 /src/ipcpd | |
| parent | 11d2ecc140486949c8d81e984137263ca48d5799 (diff) | |
| download | ouroboros-9422e6be94ac1007e8115a920379fd545055e531.tar.gz ouroboros-9422e6be94ac1007e8115a920379fd545055e531.zip | |
ipcpd: Move DHT to stack
This makes the DHT a single directory implementation and moves it to
the stack (init/fini instead of create/destroy). This is a step
towards making it a directory policy, in line with our other policy
implementations.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/unicast/dht.c | 819 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dht.h | 21 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dir.c | 24 | ||||
| -rw-r--r-- | src/ipcpd/unicast/tests/dht_test.c | 44 | 
4 files changed, 410 insertions, 498 deletions
| diff --git a/src/ipcpd/unicast/dht.c b/src/ipcpd/unicast/dht.c index 2b668f9f..f7cb89f2 100644 --- a/src/ipcpd/unicast/dht.c +++ b/src/ipcpd/unicast/dht.c @@ -47,6 +47,7 @@  #include "common/connmgr.h"  #include "dht.h"  #include "dt.h" +#include "ipcp.h"  #include <stdlib.h>  #include <string.h> @@ -208,7 +209,7 @@ struct cmd {          struct shm_du_buff * sdb;  }; -struct dht { +struct {          size_t           alpha;          size_t           b;          size_t           k; @@ -244,15 +245,13 @@ struct dht {          struct tpm *     tpm;          pthread_t        worker; -}; +} dht;  struct join_info { -        struct dht * dht;          uint64_t     addr;  };  struct packet_info { -        struct dht *         dht;          struct shm_du_buff * sdb;  }; @@ -270,50 +269,49 @@ static uint8_t * dht_dup_key(const uint8_t * key,          return dup;  } -static enum dht_state dht_get_state(struct dht * dht) +static enum dht_state dht_get_state(void)  {          enum dht_state state; -        pthread_mutex_lock(&dht->mtx); +        pthread_mutex_lock(&dht.mtx); -        state = dht->state; +        state = dht.state; -        pthread_mutex_unlock(&dht->mtx); +        pthread_mutex_unlock(&dht.mtx);          return state;  } -static int dht_set_state(struct dht *   dht, -                         enum dht_state state) +static int dht_set_state(enum dht_state state)  { -        pthread_mutex_lock(&dht->mtx); +        pthread_mutex_lock(&dht.mtx); -        if (state == DHT_JOINING && dht->state != DHT_INIT) { -                 pthread_mutex_unlock(&dht->mtx); +        if (state == DHT_JOINING && dht.state != DHT_INIT) { +                 pthread_mutex_unlock(&dht.mtx);                   return -1;          } -        dht->state = state; +        dht.state = state; -        pthread_cond_broadcast(&dht->cond); +        pthread_cond_broadcast(&dht.cond); -        pthread_mutex_unlock(&dht->mtx); +        pthread_mutex_unlock(&dht.mtx);          return 0;  } -int dht_wait_running(struct dht * dht) +int dht_wait_running()  {          int ret = 0; -        pthread_mutex_lock(&dht->mtx); +        pthread_mutex_lock(&dht.mtx); -        pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); +        pthread_cleanup_push(__cleanup_mutex_unlock, &dht.mtx); -        while (dht->state == DHT_JOINING) -                pthread_cond_wait(&dht->cond, &dht->mtx); +        while (dht.state == DHT_JOINING) +                pthread_cond_wait(&dht.cond, &dht.mtx); -        if (dht->state != DHT_RUNNING) +        if (dht.state != DHT_RUNNING)                  ret = -1;          pthread_cleanup_pop(true); @@ -337,8 +335,7 @@ static uint8_t * create_id(size_t len)          return id;  } -static void kad_req_create(struct dht * dht, -                           kad_msg_t *  msg, +static void kad_req_create(kad_msg_t *  msg,                             uint64_t     addr)  {          struct kad_req *   req; @@ -361,9 +358,9 @@ static void kad_req_create(struct dht * dht,          req->code   = msg->code;          req->key    = NULL; -        pthread_rwlock_rdlock(&dht->lock); -        b = dht->b; -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_rdlock(&dht.lock); +        b = dht.b; +        pthread_rwlock_unlock(&dht.lock);          if (msg->has_key) {                  req->key = dht_dup_key(msg->key.data, b); @@ -394,11 +391,11 @@ static void kad_req_create(struct dht * dht,          pthread_condattr_destroy(&cattr); -        pthread_rwlock_wrlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock); -        list_add(&req->next, &dht->requests); +        list_add(&req->next, &dht.requests); -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);  }  static void cancel_req_destroy(void * o) @@ -556,12 +553,11 @@ static struct bucket * iter_bucket(struct bucket * b,          return iter_bucket(b->children[(byte & mask)], id);  } -static struct bucket * dht_get_bucket(struct dht *    dht, -                                      const uint8_t * id) +static struct bucket * dht_get_bucket(const uint8_t * id)  { -        assert(dht->buckets); +        assert(dht.buckets); -        return iter_bucket(dht->buckets, id); +        return iter_bucket(dht.buckets, id);  }  /* @@ -596,8 +592,7 @@ static size_t list_add_sorted(struct list_head * l,          return 1;  } -static size_t dht_contact_list(struct dht *       dht, -                               struct list_head * l, +static size_t dht_contact_list(struct list_head * l,                                 const uint8_t *    key)  {          struct list_head * p; @@ -607,55 +602,52 @@ static size_t dht_contact_list(struct dht *       dht,          struct timespec    t;          assert(l); -        assert(dht);          assert(key);          assert(list_is_empty(l));          clock_gettime(CLOCK_REALTIME_COARSE, &t); -        b = dht_get_bucket(dht, key); +        b = dht_get_bucket(key);          if (b == NULL)                  return 0;          b->t_refr = t.tv_sec + KAD_T_REFR; -        if (b->n_contacts == dht->k || b->parent == NULL) { +        if (b->n_contacts == dht.k || b->parent == NULL) {                  list_for_each(p, &b->contacts) {                          struct contact * c;                          c = list_entry(p, struct contact, next); -                        c = contact_create(c->id, dht->b, c->addr); +                        c = contact_create(c->id, dht.b, c->addr);                          if (list_add_sorted(l, c, key) == 1) -                                if (++len == dht->k) +                                if (++len == dht.k)                                          break;                  }          } else {                  struct bucket * d = b->parent; -                for (i = 0; i < (1L << KAD_BETA) && len < dht->k; ++i) { +                for (i = 0; i < (1L << KAD_BETA) && len < dht.k; ++i) {                          list_for_each(p, &d->children[i]->contacts) {                                  struct contact * c;                                  c = list_entry(p, struct contact, next); -                                c = contact_create(c->id, dht->b, c->addr); +                                c = contact_create(c->id, dht.b, c->addr);                                  if (c == NULL)                                          continue;                                  if (list_add_sorted(l, c, key) == 1) -                                        if (++len == dht->k) +                                        if (++len == dht.k)                                                  break;                          }                  }          } -        assert(len == dht->k || b->parent == NULL); +        assert(len == dht.k || b->parent == NULL);          return len;  } -static struct lookup * lookup_create(struct dht *    dht, -                                     const uint8_t * id) +static struct lookup * lookup_create(const uint8_t * id)  {          struct lookup *    lu;          pthread_condattr_t cattr; -        assert(dht);          assert(id);          lu = malloc(sizeof(*lu)); @@ -668,7 +660,7 @@ static struct lookup * lookup_create(struct dht *    dht,          lu->state   = LU_INIT;          lu->addrs   = NULL;          lu->n_addrs = 0; -        lu->key     = dht_dup_key(id, dht->b); +        lu->key     = dht_dup_key(id, dht.b);          if (lu->key == NULL)                  goto fail_id; @@ -685,13 +677,13 @@ static struct lookup * lookup_create(struct dht *    dht,          pthread_condattr_destroy(&cattr); -        pthread_rwlock_wrlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock); -        list_add(&lu->next, &dht->lookups); +        list_add(&lu->next, &dht.lookups); -        lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); +        lu->n_contacts = dht_contact_list(&lu->contacts, id); -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          return lu; @@ -770,8 +762,7 @@ static void lookup_destroy(struct lookup * lu)          pthread_cleanup_pop(true);  } -static void lookup_update(struct dht *    dht, -                          struct lookup * lu, +static void lookup_update(struct lookup * lu,                            kad_msg_t *     msg)  {          struct list_head * p = NULL; @@ -784,7 +775,7 @@ static void lookup_update(struct dht *    dht,          assert(lu);          assert(msg); -        if (dht_get_state(dht) != DHT_RUNNING) +        if (dht_get_state() != DHT_RUNNING)                  return;          pthread_mutex_lock(&lu->lock); @@ -820,16 +811,16 @@ static void lookup_update(struct dht *    dht,          pthread_cleanup_push(__cleanup_mutex_unlock, &lu->lock);          while (lu->state == LU_INIT) { -                pthread_rwlock_unlock(&dht->lock); +                pthread_rwlock_unlock(&dht.lock);                  pthread_cond_wait(&lu->cond, &lu->lock); -                pthread_rwlock_rdlock(&dht->lock); +                pthread_rwlock_rdlock(&dht.lock);          }          pthread_cleanup_pop(false);          for (n = 0; n < msg->n_contacts; ++n) {                  c = contact_create(msg->contacts[n]->id.data, -                                   dht->b, msg->contacts[n]->addr); +                                   dht.b, msg->contacts[n]->addr);                  if (c == NULL)                          continue; @@ -838,7 +829,7 @@ static void lookup_update(struct dht *    dht,                  list_for_each(p, &lu->contacts) {                          struct contact * e;                          e = list_entry(p, struct contact, next); -                        if (!memcmp(e->id, c->id, dht->b)) { +                        if (!memcmp(e->id, c->id, dht.b)) {                                  contact_destroy(c);                                  c = NULL;                                  break; @@ -853,11 +844,11 @@ static void lookup_update(struct dht *    dht,                  if (c == NULL)                          continue; -                if (lu->n_contacts < dht->k) { +                if (lu->n_contacts < dht.k) {                          list_add_tail(&c->next, p);                          ++lu->n_contacts;                          mod = true; -                } else if (pos == dht->k) { +                } else if (pos == dht.k) {                          contact_destroy(c);                  } else {                          struct contact * d; @@ -1005,15 +996,12 @@ static enum lookup_state lookup_wait(struct lookup * lu)          return state;  } -static struct kad_req * dht_find_request(struct dht * dht, -                                         kad_msg_t *  msg) +static struct kad_req * dht_find_request(kad_msg_t *  msg)  {          struct list_head * p; - -        assert(dht);          assert(msg); -        list_for_each(p, &dht->requests) { +        list_for_each(p, &dht.requests) {                  struct kad_req * r = list_entry(p, struct kad_req, next);                  if (r->cookie == msg->cookie)                          return r; @@ -1022,17 +1010,15 @@ static struct kad_req * dht_find_request(struct dht * dht,          return NULL;  } -static struct lookup * dht_find_lookup(struct dht *    dht, -                                       uint32_t        cookie) +static struct lookup * dht_find_lookup(uint32_t        cookie)  {          struct list_head * p;          struct list_head * p2;          struct list_head * h2; -        assert(dht);          assert(cookie > 0); -        list_for_each(p, &dht->lookups) { +        list_for_each(p, &dht.lookups) {                  struct lookup * l = list_entry(p, struct lookup, next);                  pthread_mutex_lock(&l->lock);                  list_for_each_safe(p2, h2, &l->cookies) { @@ -1079,20 +1065,18 @@ static void val_destroy(struct val * v)          free(v);  } -static struct ref_entry * ref_entry_create(struct dht *    dht, -                                           const uint8_t * key) +static struct ref_entry * ref_entry_create(const uint8_t * key)  {          struct ref_entry * e;          struct timespec    t; -        assert(dht);          assert(key);          e = malloc(sizeof(*e));          if (e == NULL)                  return NULL; -        e->key = dht_dup_key(key, dht->b); +        e->key = dht_dup_key(key, dht.b);          if (e->key == NULL) {                  free(e);                  return NULL; @@ -1100,7 +1084,7 @@ static struct ref_entry * ref_entry_create(struct dht *    dht,          clock_gettime(CLOCK_REALTIME_COARSE, &t); -        e->t_rep = t.tv_sec + dht->t_repub; +        e->t_rep = t.tv_sec + dht.t_repub;          return e;  } @@ -1111,12 +1095,10 @@ static void ref_entry_destroy(struct ref_entry * e)          free(e);  } -static struct dht_entry * dht_entry_create(struct dht *    dht, -                                           const uint8_t * key) +static struct dht_entry * dht_entry_create(const uint8_t * key)  {          struct dht_entry * e; -        assert(dht);          assert(key);          e = malloc(sizeof(*e)); @@ -1128,7 +1110,7 @@ static struct dht_entry * dht_entry_create(struct dht *    dht,          e->n_vals = 0; -        e->key = dht_dup_key(key, dht->b); +        e->key = dht_dup_key(key, dht.b);          if (e->key == NULL) {                  free(e);                  return NULL; @@ -1211,8 +1193,7 @@ static void dht_entry_del_addr(struct dht_entry * e,          }  } -static uint64_t dht_entry_get_addr(struct dht *       dht, -                                   struct dht_entry * e) +static uint64_t dht_entry_get_addr(struct dht_entry * e)  {          struct list_head * p; @@ -1221,7 +1202,7 @@ static uint64_t dht_entry_get_addr(struct dht *       dht,          list_for_each(p, &e->vals) {                  struct val * v = list_entry(p, struct val, next); -                if (v->addr != dht->addr) +                if (v->addr != dht.addr)                          return v->addr;          } @@ -1229,14 +1210,12 @@ static uint64_t dht_entry_get_addr(struct dht *       dht,  }  /* Forward declaration. */ -static struct lookup * kad_lookup(struct dht *    dht, -                                  const uint8_t * key, +static struct lookup * kad_lookup(const uint8_t * key,                                    enum kad_code   code);  /* Build a refresh list. */ -static void bucket_refresh(struct dht *       dht, -                           struct bucket *    b, +static void bucket_refresh(struct bucket *    b,                             time_t             t,                             struct list_head * r)  { @@ -1244,7 +1223,7 @@ static void bucket_refresh(struct dht *       dht,          if (*b->children != NULL)                  for (i = 0; i < (1L << KAD_BETA); ++i) -                        bucket_refresh(dht, b->children[i], t, r); +                        bucket_refresh(b->children[i], t, r);          if (b->n_contacts == 0)                  return; @@ -1253,7 +1232,7 @@ static void bucket_refresh(struct dht *       dht,                  struct contact * c;                  struct contact * d;                  c = list_first_entry(&b->contacts, struct contact, next); -                d = contact_create(c->id, dht->b, c->addr); +                d = contact_create(c->id, dht.b, c->addr);                  if (c != NULL)                          list_add(&d->next, r);                  return; @@ -1387,8 +1366,7 @@ static int split_bucket(struct bucket * b)  }  /* Locked externally to mandate update as (final) part of join transaction. */ -static int dht_update_bucket(struct dht *    dht, -                             const uint8_t * id, +static int dht_update_bucket(const uint8_t * id,                               uint64_t        addr)  {          struct list_head * p; @@ -1396,13 +1374,11 @@ static int dht_update_bucket(struct dht *    dht,          struct bucket *    b;          struct contact *   c; -        assert(dht); - -        b = dht_get_bucket(dht, id); +        b = dht_get_bucket(id);          if (b == NULL)                  return -1; -        c = contact_create(id, dht->b, addr); +        c = contact_create(id, dht.b, addr);          if (c == NULL)                  return -1; @@ -1415,8 +1391,8 @@ static int dht_update_bucket(struct dht *    dht,                  }          } -        if (b->n_contacts == dht->k) { -                if (bucket_has_id(b, dht->id)) { +        if (b->n_contacts == dht.k) { +                if (bucket_has_id(b, dht.id)) {                          list_add_tail(&c->next, &b->contacts);                          ++b->n_contacts;                          if (split_bucket(b)) { @@ -1424,7 +1400,7 @@ static int dht_update_bucket(struct dht *    dht,                                  contact_destroy(c);                                  --b->n_contacts;                          } -                } else if (b->n_alts == dht->k) { +                } else if (b->n_alts == dht.k) {                          struct contact * d;                          d = list_first_entry(&b->alts, struct contact, next);                          list_del(&d->next); @@ -1442,8 +1418,7 @@ static int dht_update_bucket(struct dht *    dht,          return 0;  } -static int send_msg(struct dht * dht, -                    kad_msg_t *  msg, +static int send_msg(kad_msg_t *  msg,                      uint64_t     addr)  {  #ifndef __DHT_TEST__ @@ -1455,25 +1430,25 @@ static int send_msg(struct dht * dht,          if (msg->code == KAD_RESPONSE)                  retr = KAD_RESP_RETR; -        pthread_rwlock_wrlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock); -        if (dht->id != NULL) { +        if (dht.id != NULL) {                  msg->has_s_id = true; -                msg->s_id.data = dht->id; -                msg->s_id.len  = dht->b; +                msg->s_id.data = dht.id; +                msg->s_id.len  = dht.b;          } -        msg->s_addr = dht->addr; +        msg->s_addr = dht.addr;          if (msg->code < KAD_STORE) { -                msg->cookie = bmp_allocate(dht->cookies); -                if (!bmp_is_id_valid(dht->cookies, msg->cookie)) { -                        pthread_rwlock_unlock(&dht->lock); +                msg->cookie = bmp_allocate(dht.cookies); +                if (!bmp_is_id_valid(dht.cookies, msg->cookie)) { +                        pthread_rwlock_unlock(&dht.lock);                          goto fail_bmp_alloc;                  }          } -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);  #ifndef __DHT_TEST__          len = kad_msg__get_packed_size(msg); @@ -1486,7 +1461,7 @@ static int send_msg(struct dht * dht,                  kad_msg__pack(msg, shm_du_buff_head(sdb)); -                if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0) +                if (dt_write_packet(addr, QOS_CUBE_BE, dht.eid, sdb) == 0)                          break;                  ipcp_sdb_release(sdb); @@ -1502,53 +1477,51 @@ static int send_msg(struct dht * dht,          (void) retr;  #endif /* __DHT_TEST__ */ -        if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN) -                kad_req_create(dht, msg, addr); +        if (msg->code < KAD_STORE && dht_get_state() != DHT_SHUTDOWN) +                kad_req_create(msg, addr);          return msg->cookie;  #ifndef __DHT_TEST__   fail_msg: -        pthread_rwlock_wrlock(&dht->lock); -        bmp_release(dht->cookies, msg->cookie); -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock); +        bmp_release(dht.cookies, msg->cookie); +        pthread_rwlock_unlock(&dht.lock);  #endif /* !__DHT_TEST__ */   fail_bmp_alloc:          return -1;  } -static struct dht_entry * dht_find_entry(struct dht *    dht, -                                         const uint8_t * key) +static struct dht_entry * dht_find_entry(const uint8_t * key)  {          struct list_head * p; -        list_for_each(p, &dht->entries) { +        list_for_each(p, &dht.entries) {                  struct dht_entry * e = list_entry(p, struct dht_entry, next); -                if (!memcmp(key, e->key, dht->b)) +                if (!memcmp(key, e->key, dht.b))                          return e;          }          return NULL;  } -static int kad_add(struct dht *              dht, -                   const kad_contact_msg_t * contacts, +static int kad_add(const kad_contact_msg_t * contacts,                     ssize_t                   n,                     time_t                    exp)  {          struct dht_entry * e; -        pthread_rwlock_wrlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock);          while (n-- > 0) { -                if (contacts[n].id.len != dht->b) +                if (contacts[n].id.len != dht.b)                          log_warn("Bad key length in contact data."); -                e = dht_find_entry(dht, contacts[n].id.data); +                e = dht_find_entry(contacts[n].id.data);                  if (e != NULL) {                          if (dht_entry_add_addr(e, contacts[n].addr, exp))                                  goto fail;                  } else { -                        e = dht_entry_create(dht, contacts[n].id.data); +                        e = dht_entry_create(contacts[n].id.data);                          if (e == NULL)                                  goto fail; @@ -1557,42 +1530,39 @@ static int kad_add(struct dht *              dht,                                  goto fail;                          } -                        list_add(&e->next, &dht->entries); +                        list_add(&e->next, &dht.entries);                  }          } -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          return 0;   fail: -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          return -ENOMEM;  } -static int wait_resp(struct dht * dht, -                     kad_msg_t *  msg, +static int wait_resp(kad_msg_t *  msg,                       time_t       timeo)  {          struct kad_req * req; -        assert(dht);          assert(msg); -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_rdlock(&dht.lock); -        req = dht_find_request(dht, msg); +        req = dht_find_request(msg);          if (req == NULL) { -                pthread_rwlock_unlock(&dht->lock); +                pthread_rwlock_unlock(&dht.lock);                  return -EPERM;          } -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          return kad_req_wait(req, timeo);  } -static int kad_store(struct dht *    dht, -                     const uint8_t * key, +static int kad_store(const uint8_t * key,                       uint64_t        addr,                       uint64_t        r_addr,                       time_t          ttl) @@ -1604,11 +1574,11 @@ static int kad_store(struct dht *    dht,          cmsg.id.data = (uint8_t *) key;          cmsg.addr    = addr; -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_rdlock(&dht.lock); -        cmsg.id.len  = dht->b; +        cmsg.id.len  = dht.b; -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          cmsgp[0] = &cmsg; @@ -1618,39 +1588,37 @@ static int kad_store(struct dht *    dht,          msg.n_contacts   = 1;          msg.contacts     = cmsgp; -        if (send_msg(dht, &msg, r_addr) < 0) +        if (send_msg(&msg, r_addr) < 0)                  return -1;          return 0;  } -static ssize_t kad_find(struct dht *     dht, -                        struct lookup *  lu, +static ssize_t kad_find(struct lookup *  lu,                          const uint64_t * addrs,                          enum kad_code    code)  {          kad_msg_t msg  = KAD_MSG__INIT;          ssize_t   sent = 0; -        assert(dht);          assert(lu->key);          msg.code = code;          msg.has_key       = true;          msg.key.data      = (uint8_t *) lu->key; -        msg.key.len       = dht->b; +        msg.key.len       = dht.b;          while (*addrs != 0) {                  struct cookie_el * c;                  int ret; -                if (*addrs == dht->addr) { +                if (*addrs == dht.addr) {                          ++addrs;                          continue;                  } -                ret = send_msg(dht, &msg, *addrs); +                ret = send_msg(&msg, *addrs);                  if (ret < 0)                          break; @@ -1673,38 +1641,36 @@ static ssize_t kad_find(struct dht *     dht,          return sent;  } -static void lookup_detach(struct dht *    dht, -                          struct lookup * lu) +static void lookup_detach(struct lookup * lu)  { -        pthread_rwlock_wrlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock);          list_del(&lu->next); -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);  } -static struct lookup * kad_lookup(struct dht *    dht, -                                  const uint8_t * id, +static struct lookup * kad_lookup( const uint8_t * id,                                    enum kad_code   code)  {          uint64_t          addrs[KAD_ALPHA + 1];          enum lookup_state state;          struct lookup *   lu; -        lu = lookup_create(dht, id); +        lu = lookup_create(id);          if (lu == NULL)                  return NULL;          lookup_new_addrs(lu, addrs);          if (addrs[0] == 0) { -                lookup_detach(dht, lu); +                lookup_detach(lu);                  lookup_destroy(lu);                  return NULL;          } -        if (kad_find(dht, lu, addrs, code) == 0) { -                lookup_detach(dht, lu); +        if (kad_find(lu, addrs, code) == 0) { +                lookup_detach(lu);                  return lu;          } @@ -1715,10 +1681,10 @@ static struct lookup * kad_lookup(struct dht *    dht,                          if (addrs[0] == 0)                                  break; -                        kad_find(dht, lu, addrs, code); +                        kad_find(lu, addrs, code);                          break;                  case LU_DESTROY: -                        lookup_detach(dht, lu); +                        lookup_detach(lu);                          lookup_set_state(lu, LU_NULL);                          return NULL;                  default: @@ -1728,13 +1694,12 @@ static struct lookup * kad_lookup(struct dht *    dht,          assert(state == LU_COMPLETE); -        lookup_detach(dht, lu); +        lookup_detach(lu);          return lu;  } -static void kad_publish(struct dht *    dht, -                        const uint8_t * key, +static void kad_publish(const uint8_t * key,                          uint64_t        addr,                          time_t          exp)  { @@ -1745,21 +1710,20 @@ static void kad_publish(struct dht *    dht,          time_t          t_expire; -        assert(dht);          assert(key); -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_rdlock(&dht.lock); -        k        = dht->k; -        t_expire = dht->t_expire; +        k        = dht.k; +        t_expire = dht.t_expire; -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          addrs = malloc(k * sizeof(*addrs));          if (addrs == NULL)                  return; -        lu = kad_lookup(dht, key, KAD_FIND_NODE); +        lu = kad_lookup(key, KAD_FIND_NODE);          if (lu == NULL) {                  free(addrs);                  return; @@ -1768,14 +1732,14 @@ static void kad_publish(struct dht *    dht,          n = lookup_contact_addrs(lu, addrs);          while (n-- > 0) { -                if (addrs[n] == dht->addr) { +                if (addrs[n] == dht.addr) {                          kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT;                          msg.id.data = (uint8_t *) key; -                        msg.id.len  = dht->b; +                        msg.id.len  = dht.b;                          msg.addr    = addr; -                        kad_add(dht, &msg, 1, exp); +                        kad_add(&msg, 1, exp);                  } else { -                        if (kad_store(dht, key, addr, addrs[n], t_expire)) +                        if (kad_store(key, addr, addrs[n], t_expire))                                  log_warn("Failed to send store message.");                  }          } @@ -1785,8 +1749,7 @@ static void kad_publish(struct dht *    dht,          free(addrs);  } -static int kad_join(struct dht * dht, -                    uint64_t     addr) +static int kad_join(uint64_t     addr)  {          kad_msg_t       msg = KAD_MSG__INIT; @@ -1802,44 +1765,43 @@ static int kad_join(struct dht * dht,          msg.t_refresh       = KAD_T_REFR;          msg.t_replicate     = KAD_T_REPL; -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_rdlock(&dht.lock); -        msg.b               = dht->b; +        msg.b               = dht.b; -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock); -        if (send_msg(dht, &msg, addr) < 0) +        if (send_msg(&msg, addr) < 0)                  return -1; -        if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) +        if (wait_resp(&msg, KAD_T_JOIN) < 0)                  return -1; -        dht->id = create_id(dht->b); -        if (dht->id == NULL) +        dht.id = create_id(dht.b); +        if (dht.id == NULL)                  return -1; -        pthread_rwlock_wrlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock); -        dht_update_bucket(dht, dht->id, dht->addr); +        dht_update_bucket(dht.id, dht.addr); -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          return 0;  } -static void dht_dead_peer(struct dht * dht, -                          uint8_t *    key, +static void dht_dead_peer(uint8_t *    key,                            uint64_t     addr)  {          struct list_head * p;          struct list_head * h;          struct bucket *    b; -        b = dht_get_bucket(dht, key); +        b = dht_get_bucket(key);          list_for_each_safe(p, h, &b->contacts) {                  struct contact * c = list_entry(p, struct contact, next); -                if (b->n_contacts + b->n_alts <= dht->k) { +                if (b->n_contacts + b->n_alts <= dht.k) {                          ++c->fails;                          return;                  } @@ -1852,7 +1814,7 @@ static void dht_dead_peer(struct dht * dht,                  }          } -        while (b->n_contacts < dht->k && b->n_alts > 0) { +        while (b->n_contacts < dht.k && b->n_alts > 0) {                  struct contact * c;                  c = list_first_entry(&b->alts, struct contact, next);                  list_del(&c->next); @@ -1862,29 +1824,27 @@ static void dht_dead_peer(struct dht * dht,          }  } -static int dht_del(struct dht *    dht, -                   const uint8_t * key, +static int dht_del(const uint8_t * key,                     uint64_t        addr)  {          struct dht_entry * e; -        pthread_rwlock_wrlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock); -        e = dht_find_entry(dht, key); +        e = dht_find_entry(key);          if (e == NULL) { -                pthread_rwlock_unlock(&dht->lock); +                pthread_rwlock_unlock(&dht.lock);                  return -EPERM;          }          dht_entry_del_addr(e, addr); -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          return 0;  } -static buffer_t dht_retrieve(struct dht *    dht, -                             const uint8_t * key) +static buffer_t dht_retrieve(const uint8_t * key)  {          struct dht_entry * e;          struct list_head * p; @@ -1892,9 +1852,9 @@ static buffer_t dht_retrieve(struct dht *    dht,          uint64_t *         pos;          size_t             addrs = 0; -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_rdlock(&dht.lock); -        e = dht_find_entry(dht, key); +        e = dht_find_entry(key);          if (e == NULL)                  goto fail; @@ -1902,7 +1862,7 @@ static buffer_t dht_retrieve(struct dht *    dht,          if (buf.len == 0)                  goto fail; -        pos = malloc(sizeof(dht->addr) * buf.len); +        pos = malloc(sizeof(dht.addr) * buf.len);          if (pos == NULL)                  goto fail; @@ -1915,19 +1875,18 @@ static buffer_t dht_retrieve(struct dht *    dht,                          break;          } -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          return buf;   fail: -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          buf.len = 0;          return buf;  } -static ssize_t dht_get_contacts(struct dht *          dht, -                                const uint8_t *       key, +static ssize_t dht_get_contacts(const uint8_t *       key,                                  kad_contact_msg_t *** msgs)  {          struct list_head   l; @@ -1938,18 +1897,18 @@ static ssize_t dht_get_contacts(struct dht *          dht,          list_head_init(&l); -        pthread_rwlock_wrlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock); -        len = dht_contact_list(dht, &l, key); +        len = dht_contact_list(&l, key);          if (len == 0) { -                pthread_rwlock_unlock(&dht->lock); +                pthread_rwlock_unlock(&dht.lock);                  *msgs = NULL;                  return 0;          }          *msgs = malloc(len * sizeof(**msgs));          if (*msgs == NULL) { -                pthread_rwlock_unlock(&dht->lock); +                pthread_rwlock_unlock(&dht.lock);                  return 0;          } @@ -1957,7 +1916,7 @@ static ssize_t dht_get_contacts(struct dht *          dht,                  struct contact * c = list_entry(p, struct contact, next);                  (*msgs)[i] = malloc(sizeof(***msgs));                  if ((*msgs)[i] == NULL) { -                        pthread_rwlock_unlock(&dht->lock); +                        pthread_rwlock_unlock(&dht.lock);                          while (i > 0)                                  free(*msgs[--i]);                          free(*msgs); @@ -1968,13 +1927,13 @@ static ssize_t dht_get_contacts(struct dht *          dht,                  kad_contact_msg__init((*msgs)[i]);                  (*msgs)[i]->id.data = c->id; -                (*msgs)[i]->id.len  = dht->b; +                (*msgs)[i]->id.len  = dht.b;                  (*msgs)[i++]->addr  = c->addr;                  list_del(&c->next);                  free(c);          } -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          return i;  } @@ -1990,7 +1949,6 @@ static time_t gcd(time_t a,  static void * work(void * o)  { -        struct dht *       dht;          struct timespec    now;          struct list_head * p;          struct list_head * h; @@ -1998,46 +1956,46 @@ static void * work(void * o)          time_t             intv;          struct lookup *    lu; -        dht = (struct dht *) o; +        (void) o; -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_rdlock(&dht.lock); -        intv = gcd(dht->t_expire, dht->t_repub); +        intv = gcd(dht.t_expire, dht.t_repub);          intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          list_head_init(&reflist);          while (true) {                  clock_gettime(CLOCK_REALTIME_COARSE, &now); -                pthread_rwlock_wrlock(&dht->lock); +                pthread_rwlock_wrlock(&dht.lock);                  /* Republish registered hashes. */ -                list_for_each(p, &dht->refs) { +                list_for_each(p, &dht.refs) {                          struct ref_entry * e;                          uint8_t *          key;                          uint64_t           addr;                          time_t             t_expire;                          e = list_entry(p, struct ref_entry, next);                          if (now.tv_sec > e->t_rep) { -                                key = dht_dup_key(e->key, dht->b); +                                key = dht_dup_key(e->key, dht.b);                                  if (key == NULL)                                          continue; -                                addr = dht->addr; -                                t_expire = dht->t_expire; -                                e->t_rep = now.tv_sec + dht->t_repub; +                                addr = dht.addr; +                                t_expire = dht.t_expire; +                                e->t_rep = now.tv_sec + dht.t_repub; -                                pthread_rwlock_unlock(&dht->lock); -                                kad_publish(dht, key, addr, t_expire); -                                pthread_rwlock_wrlock(&dht->lock); +                                pthread_rwlock_unlock(&dht.lock); +                                kad_publish(key, addr, t_expire); +                                pthread_rwlock_wrlock(&dht.lock);                                  free(key);                          }                  }                  /* Remove stale entries and republish if necessary. */ -                list_for_each_safe(p, h, &dht->entries) { +                list_for_each_safe(p, h, &dht.entries) {                          struct list_head * p1;                          struct list_head * h1;                          struct dht_entry * e; @@ -2055,39 +2013,39 @@ static void * work(void * o)                                  }                                  if (now.tv_sec > v->t_rep) { -                                        key  = dht_dup_key(e->key, dht->b); +                                        key  = dht_dup_key(e->key, dht.b);                                          addr = v->addr; -                                        t_expire = dht->t_expire = now.tv_sec; -                                        v->t_rep = now.tv_sec + dht->t_replic; -                                        pthread_rwlock_unlock(&dht->lock); -                                        kad_publish(dht, key, addr, t_expire); -                                        pthread_rwlock_wrlock(&dht->lock); +                                        t_expire = dht.t_expire = now.tv_sec; +                                        v->t_rep = now.tv_sec + dht.t_replic; +                                        pthread_rwlock_unlock(&dht.lock); +                                        kad_publish(key, addr, t_expire); +                                        pthread_rwlock_wrlock(&dht.lock);                                          free(key);                                  }                          }                  }                  /* Check the requests list for unresponsive nodes. */ -                list_for_each_safe(p, h, &dht->requests) { +                list_for_each_safe(p, h, &dht.requests) {                          struct kad_req * r;                          r = list_entry(p, struct kad_req, next);                          if (now.tv_sec > r->t_exp) {                                  list_del(&r->next); -                                bmp_release(dht->cookies, r->cookie); -                                dht_dead_peer(dht, r->key, r->addr); +                                bmp_release(dht.cookies, r->cookie); +                                dht_dead_peer(r->key, r->addr);                                  kad_req_destroy(r);                          }                  }                  /* Refresh unaccessed buckets. */ -                bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); +                bucket_refresh(dht.buckets, now.tv_sec, &reflist); -                pthread_rwlock_unlock(&dht->lock); +                pthread_rwlock_unlock(&dht.lock);                  list_for_each_safe(p, h, &reflist) {                          struct contact * c;                          c = list_entry(p, struct contact, next); -                        lu = kad_lookup(dht, c->id, KAD_FIND_NODE); +                        lu = kad_lookup(c->id, KAD_FIND_NODE);                          if (lu != NULL)                                  lookup_destroy(lu);                          list_del(&c->next); @@ -2100,11 +2058,9 @@ static void * work(void * o)          return (void *) 0;  } -static int kad_handle_join_resp(struct dht *     dht, -                                struct kad_req * req, +static int kad_handle_join_resp(struct kad_req * req,                                  kad_msg_t *      msg)  { -        assert(dht);          assert(req);          assert(msg); @@ -2120,11 +2076,11 @@ static int kad_handle_join_resp(struct dht *     dht,                  return -1;          } -        pthread_rwlock_wrlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock); -        dht->buckets = bucket_create(); -        if (dht->buckets == NULL) { -                pthread_rwlock_unlock(&dht->lock); +        dht.buckets = bucket_create(); +        if (dht.buckets == NULL) { +                pthread_rwlock_unlock(&dht.lock);                  return -1;          } @@ -2138,84 +2094,80 @@ static int kad_handle_join_resp(struct dht *     dht,          if (msg->t_refresh != KAD_T_REFR)                  log_warn("Different kademlia refresh time detected."); -        dht->k        = msg->k; -        dht->b        = msg->b; -        dht->t_expire = msg->t_expire; -        dht->t_repub  = MAX(1, dht->t_expire - 10); +        dht.k        = msg->k; +        dht.b        = msg->b; +        dht.t_expire = msg->t_expire; +        dht.t_repub  = MAX(1, dht.t_expire - 10); -        if (pthread_create(&dht->worker, NULL, work, dht)) { -                bucket_destroy(dht->buckets); -                pthread_rwlock_unlock(&dht->lock); +        if (pthread_create(&dht.worker, NULL, work, NULL)) { +                bucket_destroy(dht.buckets); +                pthread_rwlock_unlock(&dht.lock);                  return -1;          }          kad_req_respond(req); -        dht_update_bucket(dht, msg->s_id.data, msg->s_addr); +        dht_update_bucket(msg->s_id.data, msg->s_addr); -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          log_dbg("Enrollment of DHT completed.");          return 0;  } -static int kad_handle_find_resp(struct dht *     dht, -                                struct kad_req * req, +static int kad_handle_find_resp(struct kad_req * req,                                  kad_msg_t *      msg)  {          struct lookup * lu; -        assert(dht);          assert(req);          assert(msg); -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_rdlock(&dht.lock); -        lu = dht_find_lookup(dht, req->cookie); +        lu = dht_find_lookup(req->cookie);          if (lu == NULL) { -                pthread_rwlock_unlock(&dht->lock); +                pthread_rwlock_unlock(&dht.lock);                  return -1;          } -        lookup_update(dht, lu, msg); +        lookup_update(lu, msg); -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          return 0;  } -static void kad_handle_response(struct dht * dht, -                                kad_msg_t *  msg) +static void kad_handle_response(kad_msg_t *  msg)  {          struct kad_req * req; -        assert(dht);          assert(msg); -        pthread_rwlock_wrlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock); -        req = dht_find_request(dht, msg); +        req = dht_find_request(msg);          if (req == NULL) { -                pthread_rwlock_unlock(&dht->lock); +                pthread_rwlock_unlock(&dht.lock);                  return;          } -        bmp_release(dht->cookies, req->cookie); +        bmp_release(dht.cookies, req->cookie);          list_del(&req->next); -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          switch(req->code) {          case KAD_JOIN: -                if (kad_handle_join_resp(dht, req, msg)) +                if (kad_handle_join_resp(req, msg))                          log_err("Enrollment of DHT failed.");                  break;          case KAD_FIND_VALUE:          case KAD_FIND_NODE: -                if (dht_get_state(dht) != DHT_RUNNING) +                if (dht_get_state() != DHT_RUNNING)                          break; -                kad_handle_find_resp(dht, req, msg); +                kad_handle_find_resp(req, msg);                  break;          default:                  break; @@ -2224,137 +2176,131 @@ static void kad_handle_response(struct dht * dht,          kad_req_destroy(req);  } -int dht_bootstrap(struct dht * dht, -                  size_t       b, -                  time_t       t_expire) +int dht_bootstrap()  { -        assert(dht); +        pthread_rwlock_wrlock(&dht.lock); -        pthread_rwlock_wrlock(&dht->lock); +#ifndef __DHT_TEST__ +        dht.b        = hash_len(ipcpi.dir_hash_algo); +#else +        dht.b        = DHT_TEST_KEY_LEN; +#endif +        dht.t_expire = 86400; /* 1 day */ +        dht.t_repub  = dht.t_expire - 10; +        dht.k        = KAD_K; -        dht->id = create_id(b); -        if (dht->id == NULL) +        dht.id = create_id(dht.b); +        if (dht.id == NULL)                  goto fail_id; -        dht->buckets = bucket_create(); -        if (dht->buckets == NULL) +        dht.buckets = bucket_create(); +        if (dht.buckets == NULL)                  goto fail_buckets; -        dht->buckets->depth = 0; -        dht->buckets->mask  = 0; - -        dht->b        = b / CHAR_BIT; -        dht->t_expire = MAX(2, t_expire); -        dht->t_repub  = MAX(1, t_expire - 10); -        dht->k        = KAD_K; +        dht.buckets->depth = 0; +        dht.buckets->mask  = 0; -        if (pthread_create(&dht->worker, NULL, work, dht)) +        if (pthread_create(&dht.worker, NULL, work, NULL))                  goto fail_pthread_create; -        dht->state = DHT_RUNNING; +        dht.state = DHT_RUNNING; -        dht_update_bucket(dht, dht->id, dht->addr); +        dht_update_bucket(dht.id, dht.addr); -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          return 0;   fail_pthread_create: -        bucket_destroy(dht->buckets); -        dht->buckets = NULL; +        bucket_destroy(dht.buckets); +        dht.buckets = NULL;   fail_buckets: -        free(dht->id); -        dht->id = NULL; +        free(dht.id); +        dht.id = NULL;   fail_id: -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          return -1;  } -static struct ref_entry * ref_entry_get(struct dht *    dht, -                                        const uint8_t * key) +static struct ref_entry * ref_entry_get(const uint8_t * key)  {          struct list_head * p; -        list_for_each(p, &dht->refs) { +        list_for_each(p, &dht.refs) {                  struct ref_entry * r = list_entry(p, struct ref_entry, next); -                if (!memcmp(key, r->key, dht-> b) ) +                if (!memcmp(key, r->key, dht. b) )                          return r;          }          return NULL;  } -int dht_reg(struct dht *    dht, -            const uint8_t * key) +int dht_reg(const uint8_t * key)  {          struct ref_entry * e;          uint64_t           addr;          time_t             t_expire; -        assert(dht);          assert(key); -        assert(dht->addr != 0); +        assert(dht.addr != 0); -        if (dht_wait_running(dht)) +        if (dht_wait_running())                  return -1; -        pthread_rwlock_wrlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock); -        if (ref_entry_get(dht, key) != NULL) { +        if (ref_entry_get(key) != NULL) {                  log_dbg("Name already registered."); -                pthread_rwlock_unlock(&dht->lock); +                pthread_rwlock_unlock(&dht.lock);                  return 0;          } -        e = ref_entry_create(dht, key); +        e = ref_entry_create(key);          if (e == NULL) { -                pthread_rwlock_unlock(&dht->lock); +                pthread_rwlock_unlock(&dht.lock);                  return -ENOMEM;          } -        list_add(&e->next, &dht->refs); +        list_add(&e->next, &dht.refs); -        t_expire = dht->t_expire; -        addr = dht->addr; +        t_expire = dht.t_expire; +        addr = dht.addr; -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock); -        kad_publish(dht, key, addr, t_expire); +        kad_publish(key, addr, t_expire);          return 0;  } -int dht_unreg(struct dht *    dht, -              const uint8_t * key) +int dht_unreg(const uint8_t * key)  {          struct list_head * p;          struct list_head * h; -        assert(dht);          assert(key); -        if (dht_get_state(dht) != DHT_RUNNING) +        if (dht_get_state() != DHT_RUNNING)                  return -1; -        pthread_rwlock_wrlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock); -        list_for_each_safe(p, h, &dht->refs) { +        list_for_each_safe(p, h, &dht.refs) {                  struct ref_entry * r = list_entry(p, struct ref_entry, next); -                if (!memcmp(key, r->key, dht-> b) ) { +                if (!memcmp(key, r->key, dht. b) ) {                          list_del(&r->next);                          ref_entry_destroy(r);                  }          } -        dht_del(dht, key, dht->addr); +        dht_del(key, dht.addr); -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          return 0;  } -uint64_t dht_query(struct dht *    dht, -                   const uint8_t * key) +uint64_t dht_query(const uint8_t * key)  {          struct dht_entry * e;          struct lookup *    lu; @@ -2363,21 +2309,21 @@ uint64_t dht_query(struct dht *    dht,          addrs[0] = 0; -        if (dht_wait_running(dht)) +        if (dht_wait_running())                  return 0; -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_rdlock(&dht.lock); -        e = dht_find_entry(dht, key); +        e = dht_find_entry(key);          if (e != NULL) -                addrs[0] = dht_entry_get_addr(dht, e); +                addrs[0] = dht_entry_get_addr(e); -        pthread_rwlock_unlock(&dht->lock); +        pthread_rwlock_unlock(&dht.lock);          if (addrs[0] != 0)                  return addrs[0]; -        lu = kad_lookup(dht, key, KAD_FIND_VALUE); +        lu = kad_lookup(key, KAD_FIND_VALUE);          if (lu == NULL)                  return 0; @@ -2390,7 +2336,7 @@ uint64_t dht_query(struct dht *    dht,          lookup_destroy(lu);          /* Current behaviour is anycast and return the first peer address. */ -        if (addrs[0] != dht->addr) +        if (addrs[0] != dht.addr)                  return addrs[0];          if (n > 1) @@ -2401,9 +2347,7 @@ uint64_t dht_query(struct dht *    dht,  static void * dht_handle_packet(void * o)  { -        struct dht * dht = (struct dht *) o; - -        assert(dht); +        (void) o;          while (true) {                  kad_msg_t *          msg; @@ -2416,14 +2360,14 @@ static void * dht_handle_packet(void * o)                  size_t               t_expire;                  struct cmd *         cmd; -                pthread_mutex_lock(&dht->mtx); +                pthread_mutex_lock(&dht.mtx); -                pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); +                pthread_cleanup_push(__cleanup_mutex_unlock, &dht.mtx); -                while (list_is_empty(&dht->cmds)) -                        pthread_cond_wait(&dht->cond, &dht->mtx); +                while (list_is_empty(&dht.cmds)) +                        pthread_cond_wait(&dht.cond, &dht.mtx); -                cmd = list_last_entry(&dht->cmds, struct cmd, next); +                cmd = list_last_entry(&dht.cmds, struct cmd, next);                  list_del(&cmd->next);                  pthread_cleanup_pop(true); @@ -2441,18 +2385,18 @@ static void * dht_handle_packet(void * o)                          continue;                  } -                if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { +                if (msg->code != KAD_RESPONSE && dht_wait_running()) {                          kad_msg__free_unpacked(msg, NULL);                          log_dbg("Got a request message when not running.");                          continue;                  } -                pthread_rwlock_rdlock(&dht->lock); +                pthread_rwlock_rdlock(&dht.lock); -                b        = dht->b; -                t_expire = dht->t_expire; +                b        = dht.b; +                t_expire = dht.t_expire; -                pthread_rwlock_unlock(&dht->lock); +                pthread_rwlock_unlock(&dht.lock);                  if (msg->has_key && msg->key.len != b) {                          kad_msg__free_unpacked(msg, NULL); @@ -2467,7 +2411,7 @@ static void * dht_handle_packet(void * o)                          continue;                  } -                tpm_dec(dht->tpm); +                tpm_dec(dht.tpm);                  addr = msg->s_addr; @@ -2510,7 +2454,7 @@ static void * dht_handle_packet(void * o)                          resp_msg.t_replicate     = KAD_T_REPL;                          break;                  case KAD_FIND_VALUE: -                        buf = dht_retrieve(dht, msg->key.data); +                        buf = dht_retrieve(msg->key.data);                          if (buf.len != 0) {                                  resp_msg.n_addrs = buf.len;                                  resp_msg.addrs   = (uint64_t *) buf.data; @@ -2520,7 +2464,7 @@ static void * dht_handle_packet(void * o)                  case KAD_FIND_NODE:                          /* Return k closest contacts. */                          resp_msg.n_contacts = -                                dht_get_contacts(dht, msg->key.data, &cmsgs); +                                dht_get_contacts(msg->key.data, &cmsgs);                          resp_msg.contacts = cmsgs;                          break;                  case KAD_STORE: @@ -2534,11 +2478,11 @@ static void * dht_handle_packet(void * o)                                  break;                          } -                        kad_add(dht, *msg->contacts, msg->n_contacts, +                        kad_add(*msg->contacts, msg->n_contacts,                                  msg->t_expire);                          break;                  case KAD_RESPONSE: -                        kad_handle_response(dht, msg); +                        kad_handle_response(msg);                          break;                  default:                          assert(false); @@ -2546,19 +2490,19 @@ static void * dht_handle_packet(void * o)                  }                  if (msg->code != KAD_JOIN) { -                        pthread_rwlock_wrlock(&dht->lock); -                        if (dht_get_state(dht) == DHT_JOINING && -                            dht->buckets == NULL) { -                                pthread_rwlock_unlock(&dht->lock); +                        pthread_rwlock_wrlock(&dht.lock); +                        if (dht_get_state() == DHT_JOINING && +                            dht.buckets == NULL) { +                                pthread_rwlock_unlock(&dht.lock);                                  goto finish;                          } -                        if (dht_update_bucket(dht, msg->s_id.data, addr)) +                        if (dht_update_bucket(msg->s_id.data, addr))                                  log_warn("Failed to update bucket."); -                        pthread_rwlock_unlock(&dht->lock); +                        pthread_rwlock_unlock(&dht.lock);                  } -                if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0) +                if (msg->code < KAD_STORE && send_msg(&resp_msg, addr) < 0)                                  log_warn("Failed to send response.");   finish: @@ -2568,7 +2512,7 @@ static void * dht_handle_packet(void * o)                          free(resp_msg.addrs);                  if (resp_msg.n_contacts == 0) { -                        tpm_inc(dht->tpm); +                        tpm_inc(dht.tpm);                          continue;                  } @@ -2577,19 +2521,22 @@ static void * dht_handle_packet(void * o)                                                         NULL);                  free(resp_msg.contacts); -                tpm_inc(dht->tpm); +                tpm_inc(dht.tpm);          }          return (void *) 0;  } -static void dht_post_packet(void *               comp, +static void dht_post_packet(void *               o,                              struct shm_du_buff * sdb)  {          struct cmd * cmd; -        struct dht * dht = (struct dht *) comp; -        if (dht_get_state(dht) == DHT_SHUTDOWN) { +        (void) o; + +        assert(o == &dht); + +        if (dht_get_state() == DHT_SHUTDOWN) {  #ifndef __DHT_TEST__                  ipcp_sdb_release(sdb);  #endif @@ -2604,37 +2551,34 @@ static void dht_post_packet(void *               comp,          cmd->sdb = sdb; -        pthread_mutex_lock(&dht->mtx); +        pthread_mutex_lock(&dht.mtx); -        list_add(&cmd->next, &dht->cmds); +        list_add(&cmd->next, &dht.cmds); -        pthread_cond_signal(&dht->cond); +        pthread_cond_signal(&dht.cond); -        pthread_mutex_unlock(&dht->mtx); +        pthread_mutex_unlock(&dht.mtx);  } -void dht_destroy(struct dht * dht) +void dht_fini()  {          struct list_head * p;          struct list_head * h; -        if (dht == NULL) -                return; -  #ifndef __DHT_TEST__ -        tpm_stop(dht->tpm); +        tpm_stop(dht.tpm); -        tpm_destroy(dht->tpm); +        tpm_destroy(dht.tpm);  #endif -        if (dht_get_state(dht) == DHT_RUNNING) { -                dht_set_state(dht, DHT_SHUTDOWN); -                pthread_cancel(dht->worker); -                pthread_join(dht->worker, NULL); +        if (dht_get_state() == DHT_RUNNING) { +                dht_set_state(DHT_SHUTDOWN); +                pthread_cancel(dht.worker); +                pthread_join(dht.worker, NULL);          } -        pthread_rwlock_wrlock(&dht->lock); +        pthread_rwlock_wrlock(&dht.lock); -        list_for_each_safe(p, h, &dht->cmds) { +        list_for_each_safe(p, h, &dht.cmds) {                  struct cmd * c = list_entry(p, struct cmd, next);                  list_del(&c->next);  #ifndef __DHT_TEST__ @@ -2643,44 +2587,42 @@ void dht_destroy(struct dht * dht)                  free(c);          } -        list_for_each_safe(p, h, &dht->entries) { +        list_for_each_safe(p, h, &dht.entries) {                  struct dht_entry * e = list_entry(p, struct dht_entry, next);                  list_del(&e->next);                  dht_entry_destroy(e);          } -        list_for_each_safe(p, h, &dht->requests) { +        list_for_each_safe(p, h, &dht.requests) {                  struct kad_req * r = list_entry(p, struct kad_req, next);                  list_del(&r->next);                  kad_req_destroy(r);          } -        list_for_each_safe(p, h, &dht->refs) { +        list_for_each_safe(p, h, &dht.refs) {                  struct ref_entry * e = list_entry(p, struct ref_entry, next);                  list_del(&e->next);                  ref_entry_destroy(e);          } -        list_for_each_safe(p, h, &dht->lookups) { +        list_for_each_safe(p, h, &dht.lookups) {                  struct lookup * l = list_entry(p, struct lookup, next);                  list_del(&l->next);                  lookup_destroy(l);          } -        pthread_rwlock_unlock(&dht->lock); - -        if (dht->buckets != NULL) -                bucket_destroy(dht->buckets); +        pthread_rwlock_unlock(&dht.lock); -        bmp_destroy(dht->cookies); +        if (dht.buckets != NULL) +                bucket_destroy(dht.buckets); -        pthread_mutex_destroy(&dht->mtx); +        bmp_destroy(dht.cookies); -        pthread_rwlock_destroy(&dht->lock); +        pthread_mutex_destroy(&dht.mtx); -        free(dht->id); +        pthread_rwlock_destroy(&dht.lock); -        free(dht); +        free(dht.id);  }  static void * join_thr(void * o) @@ -2691,14 +2633,14 @@ static void * join_thr(void * o)          assert(info); -        while (kad_join(info->dht, info->addr)) { -                if (dht_get_state(info->dht) == DHT_SHUTDOWN) { +        while (kad_join(info->addr)) { +                if (dht_get_state() == DHT_SHUTDOWN) {                          log_dbg("DHT enrollment aborted.");                          goto finish;                  }                  if (retr++ == KAD_JOIN_RETR) { -                        dht_set_state(info->dht, DHT_INIT); +                        dht_set_state(DHT_INIT);                          log_warn("DHT enrollment attempt failed.");                          goto finish;                  } @@ -2706,9 +2648,9 @@ static void * join_thr(void * o)                  sleep(KAD_JOIN_INTV);          } -        dht_set_state(info->dht, DHT_RUNNING); +        dht_set_state(DHT_RUNNING); -        lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE); +        lu = kad_lookup(dht.id, KAD_FIND_NODE);          if (lu != NULL)                  lookup_destroy(lu); @@ -2722,7 +2664,7 @@ static void handle_event(void *       self,                           int          event,                           const void * o)  { -        struct dht * dht = (struct dht *) self; +        (void) self;          if (event == NOTIFY_DT_CONN_ADD) {                  pthread_t          thr; @@ -2733,19 +2675,18 @@ static void handle_event(void *       self,                  /* Give the pff some time to update for the new link. */                  nanosleep(&slack, NULL); -                switch(dht_get_state(dht)) { +                switch(dht_get_state()) {                  case DHT_INIT:                          inf = malloc(sizeof(*inf));                          if (inf == NULL)                                  break; -                        inf->dht  = dht;                          inf->addr = c->conn_info.addr; -                        if (dht_set_state(dht, DHT_JOINING) == 0 || -                            dht_wait_running(dht)) { +                        if (dht_set_state(DHT_JOINING) == 0 || +                            dht_wait_running()) {                                  if (pthread_create(&thr, NULL, join_thr, inf)) { -                                        dht_set_state(dht, DHT_INIT); +                                        dht_set_state(DHT_INIT);                                          free(inf);                                          return;                                  } @@ -2759,7 +2700,7 @@ static void handle_event(void *       self,                           * FIXME: this lookup for effiency reasons                           * causes a SEGV when stressed with rapid                           * enrollments. -                         * lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); +                         * lu = kad_lookup(dht, dht.id, KAD_FIND_NODE);                           * if (lu != NULL)                           *         lookup_destroy(lu);                           */ @@ -2770,73 +2711,65 @@ static void handle_event(void *       self,          }  } -struct dht * dht_create(uint64_t addr) +int dht_init()  { -        struct dht * dht; - -        dht = malloc(sizeof(*dht)); -        if (dht == NULL) -                goto fail_malloc; +        dht.buckets = NULL; -        dht->buckets = NULL; +        list_head_init(&dht.entries); +        list_head_init(&dht.requests); +        list_head_init(&dht.refs); +        list_head_init(&dht.lookups); +        list_head_init(&dht.cmds); -        list_head_init(&dht->entries); -        list_head_init(&dht->requests); -        list_head_init(&dht->refs); -        list_head_init(&dht->lookups); -        list_head_init(&dht->cmds); - -        if (pthread_rwlock_init(&dht->lock, NULL)) +        if (pthread_rwlock_init(&dht.lock, NULL))                  goto fail_rwlock; -        if (pthread_mutex_init(&dht->mtx, NULL)) +        if (pthread_mutex_init(&dht.mtx, NULL))                  goto fail_mutex; -        if (pthread_cond_init(&dht->cond, NULL)) +        if (pthread_cond_init(&dht.cond, NULL))                  goto fail_cond; -        dht->cookies = bmp_create(DHT_MAX_REQS, 1); -        if (dht->cookies == NULL) +        dht.cookies = bmp_create(DHT_MAX_REQS, 1); +        if (dht.cookies == NULL)                  goto fail_bmp; -        dht->b    = 0; -        dht->addr = addr; -        dht->id   = NULL; +        dht.b    = 0; +        dht.id   = NULL;  #ifndef __DHT_TEST__ -        dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); -        if (dht->tpm == NULL) +        dht.addr = ipcpi.dt_addr; +        dht.tpm = tpm_create(2, 1, dht_handle_packet, NULL); +        if (dht.tpm == NULL)                  goto fail_tpm_create; -        if (tpm_start(dht->tpm)) +        if (tpm_start(dht.tpm))                  goto fail_tpm_start; -        dht->eid   = dt_reg_comp(dht, &dht_post_packet, DHT); -        if ((int) dht->eid < 0) +        dht.eid   = dt_reg_comp(&dht, &dht_post_packet, DHT); +        if ((int) dht.eid < 0)                  goto fail_tpm_start; -        notifier_reg(handle_event, dht); +        notifier_reg(handle_event, NULL);  #else          (void) handle_event;          (void) dht_handle_packet;          (void) dht_post_packet;  #endif -        dht->state = DHT_INIT; +        dht.state = DHT_INIT; -        return dht; +        return 0;  #ifndef __DHT_TEST__   fail_tpm_start: -        tpm_destroy(dht->tpm); +        tpm_destroy(dht.tpm);   fail_tpm_create: -        bmp_destroy(dht->cookies); +        bmp_destroy(dht.cookies);  #endif   fail_bmp: -        pthread_cond_destroy(&dht->cond); +        pthread_cond_destroy(&dht.cond);   fail_cond: -        pthread_mutex_destroy(&dht->mtx); +        pthread_mutex_destroy(&dht.mtx);   fail_mutex: -        pthread_rwlock_destroy(&dht->lock); +        pthread_rwlock_destroy(&dht.lock);   fail_rwlock: -        free(dht); - fail_malloc: -        return NULL; +        return -1;  } diff --git a/src/ipcpd/unicast/dht.h b/src/ipcpd/unicast/dht.h index df394714..29ab7ee5 100644 --- a/src/ipcpd/unicast/dht.h +++ b/src/ipcpd/unicast/dht.h @@ -28,25 +28,18 @@  #include <stdint.h>  #include <sys/types.h> -struct dht; +int          dht_init(void); -struct dht * dht_create(uint64_t addr); +int          dht_bootstrap(void); -int          dht_bootstrap(struct dht * dht, -                           size_t       b, -                           time_t       t_expire); +void         dht_fini(void); -void         dht_destroy(struct dht * dht); +int          dht_reg(const uint8_t * key); -int          dht_reg(struct dht *    dht, -                     const uint8_t * key); +int          dht_unreg(const uint8_t * key); -int          dht_unreg(struct dht *    dht, -                       const uint8_t * key); +uint64_t     dht_query(const uint8_t * key); -uint64_t     dht_query(struct dht *    dht, -                       const uint8_t * key); - -int          dht_wait_running(struct dht * dht); +int          dht_wait_running(void);  #endif /* OUROBOROS_IPCPD_UNICAST_DHT_H */ diff --git a/src/ipcpd/unicast/dir.c b/src/ipcpd/unicast/dir.c index a30908b8..d27cabfa 100644 --- a/src/ipcpd/unicast/dir.c +++ b/src/ipcpd/unicast/dir.c @@ -35,7 +35,6 @@  #include "dir.h"  #include "dht.h" -#include "ipcp.h"  #include <stdlib.h>  #include <string.h> @@ -43,15 +42,9 @@  #include <inttypes.h>  #include <limits.h> -#define KAD_B (hash_len(ipcpi.dir_hash_algo) * CHAR_BIT) - -struct ipcp icpci; -struct dht * dht; -  int dir_init(void)  { -        dht = dht_create(ipcpi.dt_addr); -        if (dht == NULL) +        if (dht_init() < 0)                  return -ENOMEM;          return 0; @@ -59,15 +52,14 @@ int dir_init(void)  void dir_fini(void)  { -        dht_destroy(dht); +        dht_fini();  }  int dir_bootstrap(void) {          log_dbg("Bootstrapping directory."); -        /* TODO: get parameters for bootstrap from IRM tool. */ -        if (dht_bootstrap(dht, KAD_B, 86400)) { -                dht_destroy(dht); +        if (dht_bootstrap()) { +                dht_fini();                  return -ENOMEM;          } @@ -78,22 +70,22 @@ int dir_bootstrap(void) {  int dir_reg(const uint8_t * hash)  { -        return dht_reg(dht, hash); +        return dht_reg(hash);  }  int dir_unreg(const uint8_t * hash)  { -        return dht_unreg(dht, hash); +        return dht_unreg(hash);  }  uint64_t dir_query(const uint8_t * hash)  { -        return dht_query(dht, hash); +        return dht_query(hash);  }  int dir_wait_running(void)  { -        if (dht_wait_running(dht)) { +        if (dht_wait_running()) {                  log_warn("Directory did not bootstrap.");                  return -1;          } diff --git a/src/ipcpd/unicast/tests/dht_test.c b/src/ipcpd/unicast/tests/dht_test.c index 552af75c..70773ea7 100644 --- a/src/ipcpd/unicast/tests/dht_test.c +++ b/src/ipcpd/unicast/tests/dht_test.c @@ -21,6 +21,8 @@   */  #define __DHT_TEST__ +#define DHT_TEST_KEY_LEN  32 +  #include "dht.c" @@ -29,71 +31,63 @@  #include <stdlib.h>  #include <stdio.h> -#define KEY_LEN  32 - -#define EXP      86400  #define CONTACTS 1000  int dht_test(int     argc,               char ** argv)  { -        struct dht * dht; -        uint64_t     addr = 0x0D1F; -        uint8_t      key[KEY_LEN]; +        uint8_t      key[DHT_TEST_KEY_LEN];          size_t       i;          (void) argc;          (void) argv; -        dht = dht_create(addr); -        if (dht == NULL) { +        if (dht_init() < 0) {                  printf("Failed to create dht.\n");                  return -1;          } -        dht_destroy(dht); +        dht_fini(); -        dht = dht_create(addr); -        if (dht == NULL) { +        if (dht_init() < 0) {                  printf("Failed to re-create dht.\n");                  return -1;          } -        if (dht_bootstrap(dht, KEY_LEN, EXP)) { +        if (dht_bootstrap()) {                  printf("Failed to bootstrap dht.\n"); -                dht_destroy(dht); +                dht_fini();                  return -1;          } -        dht_destroy(dht); +        dht_fini(); -        dht = dht_create(addr); -        if (dht == NULL) { +        if (dht_init() < 0) {                  printf("Failed to re-create dht.\n");                  return -1;          } -        if (dht_bootstrap(dht, KEY_LEN, EXP)) { +        if (dht_bootstrap()) {                  printf("Failed to bootstrap dht.\n"); -                dht_destroy(dht); +                dht_fini();                  return -1;          }          for (i = 0; i < CONTACTS; ++i) {                  uint64_t addr;                  random_buffer(&addr, sizeof(addr)); -                random_buffer(key, KEY_LEN); -                pthread_rwlock_wrlock(&dht->lock); -                if (dht_update_bucket(dht, key, addr)) { -                        pthread_rwlock_unlock(&dht->lock); +                random_buffer(key, DHT_TEST_KEY_LEN); +                pthread_rwlock_wrlock(&dht.lock); +                if (dht_update_bucket(key, addr)) { +                        pthread_rwlock_unlock(&dht.lock);                          printf("Failed to update bucket.\n"); -                        dht_destroy(dht); +                        dht_fini();                          return -1;                  } -                pthread_rwlock_unlock(&dht->lock); +                pthread_rwlock_unlock(&dht.lock);          } -        dht_destroy(dht); +        dht_fini();          return 0;  } | 
