diff options
Diffstat (limited to 'src/ipcpd/unicast/dir')
| -rw-r--r-- | src/ipcpd/unicast/dir/dht.c | 845 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dir/dht.h | 17 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dir/ops.h | 18 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dir/tests/dht_test.c | 37 | 
4 files changed, 508 insertions, 409 deletions
| diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c index ba4b897e..1742267b 100644 --- a/src/ipcpd/unicast/dir/dht.c +++ b/src/ipcpd/unicast/dir/dht.c @@ -211,8 +211,8 @@ struct cmd {  };  struct dir_ops dht_dir_ops = { -        .init         = dht_init, -        .fini         = dht_fini, +        .create       = dht_create, +        .destroy      = dht_destroy,          .bootstrap    = dht_bootstrap,          .reg          = dht_reg,          .unreg        = dht_unreg, @@ -220,7 +220,7 @@ struct dir_ops dht_dir_ops = {          .wait_running = dht_wait_running  }; -struct { +struct dht {          size_t           alpha;          size_t           b;          size_t           k; @@ -256,13 +256,15 @@ struct {          struct tpm *     tpm;          pthread_t        worker; -} dht; +};  struct join_info { +        struct dht * dht;          uint64_t     addr;  };  struct packet_info { +        struct dht *         dht;          struct shm_du_buff * sdb;  }; @@ -280,49 +282,53 @@ static uint8_t * dht_dup_key(const uint8_t * key,          return dup;  } -static enum dht_state dht_get_state(void) +static enum dht_state dht_get_state(struct dht * dht)  {          enum dht_state state; -        pthread_mutex_lock(&dht.mtx); +        pthread_mutex_lock(&dht->mtx); -        state = dht.state; +        state = dht->state; -        pthread_mutex_unlock(&dht.mtx); +        pthread_mutex_unlock(&dht->mtx);          return state;  } -static int dht_set_state(enum dht_state state) +static int dht_set_state(struct dht *   dht, +                         enum dht_state state)  { -        pthread_mutex_lock(&dht.mtx); +        pthread_mutex_lock(&dht->mtx); -        if (state == DHT_JOINING && dht.state != DHT_INIT) { -                 pthread_mutex_unlock(&dht.mtx); +        if (state == DHT_JOINING && dht->state != DHT_INIT) { +                 pthread_mutex_unlock(&dht->mtx);                   return -1;          } -        dht.state = state; +        dht->state = state; -        pthread_cond_broadcast(&dht.cond); +        pthread_cond_broadcast(&dht->cond); -        pthread_mutex_unlock(&dht.mtx); +        pthread_mutex_unlock(&dht->mtx);          return 0;  } -int dht_wait_running() +int dht_wait_running(void * dir)  { -        int ret = 0; +        struct dht * dht; +        int          ret = 0; -        pthread_mutex_lock(&dht.mtx); +        dht = (struct dht *) dir; -        pthread_cleanup_push(__cleanup_mutex_unlock, &dht.mtx); +        pthread_mutex_lock(&dht->mtx); -        while (dht.state == DHT_JOINING) -                pthread_cond_wait(&dht.cond, &dht.mtx); +        pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); -        if (dht.state != DHT_RUNNING) +        while (dht->state == DHT_JOINING) +                pthread_cond_wait(&dht->cond, &dht->mtx); + +        if (dht->state != DHT_RUNNING)                  ret = -1;          pthread_cleanup_pop(true); @@ -346,7 +352,8 @@ static uint8_t * create_id(size_t len)          return id;  } -static void kad_req_create(kad_msg_t *  msg, +static void kad_req_create(struct dht * dht, +                           kad_msg_t *  msg,                             uint64_t     addr)  {          struct kad_req *   req; @@ -369,9 +376,9 @@ static void kad_req_create(kad_msg_t *  msg,          req->code   = msg->code;          req->key    = NULL; -        pthread_rwlock_rdlock(&dht.lock); -        b = dht.b; -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_rdlock(&dht->lock); +        b = dht->b; +        pthread_rwlock_unlock(&dht->lock);          if (msg->has_key) {                  req->key = dht_dup_key(msg->key.data, b); @@ -402,11 +409,11 @@ static void kad_req_create(kad_msg_t *  msg,          pthread_condattr_destroy(&cattr); -        pthread_rwlock_wrlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock); -        list_add(&req->next, &dht.requests); +        list_add(&req->next, &dht->requests); -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);  }  static void cancel_req_destroy(void * o) @@ -564,11 +571,12 @@ static struct bucket * iter_bucket(struct bucket * b,          return iter_bucket(b->children[(byte & mask)], id);  } -static struct bucket * dht_get_bucket(const uint8_t * id) +static struct bucket * dht_get_bucket(struct dht *    dht, +                                      const uint8_t * id)  { -        assert(dht.buckets); +        assert(dht->buckets); -        return iter_bucket(dht.buckets, id); +        return iter_bucket(dht->buckets, id);  }  /* @@ -603,7 +611,8 @@ static size_t list_add_sorted(struct list_head * l,          return 1;  } -static size_t dht_contact_list(struct list_head * l, +static size_t dht_contact_list(struct dht *       dht, +                               struct list_head * l,                                 const uint8_t *    key)  {          struct list_head * p; @@ -613,52 +622,55 @@ static size_t dht_contact_list(struct list_head * l,          struct timespec    t;          assert(l); +        assert(dht);          assert(key);          assert(list_is_empty(l));          clock_gettime(CLOCK_REALTIME_COARSE, &t); -        b = dht_get_bucket(key); +        b = dht_get_bucket(dht, key);          if (b == NULL)                  return 0;          b->t_refr = t.tv_sec + KAD_T_REFR; -        if (b->n_contacts == dht.k || b->parent == NULL) { +        if (b->n_contacts == dht->k || b->parent == NULL) {                  list_for_each(p, &b->contacts) {                          struct contact * c;                          c = list_entry(p, struct contact, next); -                        c = contact_create(c->id, dht.b, c->addr); +                        c = contact_create(c->id, dht->b, c->addr);                          if (list_add_sorted(l, c, key) == 1) -                                if (++len == dht.k) +                                if (++len == dht->k)                                          break;                  }          } else {                  struct bucket * d = b->parent; -                for (i = 0; i < (1L << KAD_BETA) && len < dht.k; ++i) { +                for (i = 0; i < (1L << KAD_BETA) && len < dht->k; ++i) {                          list_for_each(p, &d->children[i]->contacts) {                                  struct contact * c;                                  c = list_entry(p, struct contact, next); -                                c = contact_create(c->id, dht.b, c->addr); +                                c = contact_create(c->id, dht->b, c->addr);                                  if (c == NULL)                                          continue;                                  if (list_add_sorted(l, c, key) == 1) -                                        if (++len == dht.k) +                                        if (++len == dht->k)                                                  break;                          }                  }          } -        assert(len == dht.k || b->parent == NULL); +        assert(len == dht->k || b->parent == NULL);          return len;  } -static struct lookup * lookup_create(const uint8_t * id) +static struct lookup * lookup_create(struct dht *    dht, +                                     const uint8_t * id)  {          struct lookup *    lu;          pthread_condattr_t cattr; +        assert(dht);          assert(id);          lu = malloc(sizeof(*lu)); @@ -671,7 +683,7 @@ static struct lookup * lookup_create(const uint8_t * id)          lu->state   = LU_INIT;          lu->addrs   = NULL;          lu->n_addrs = 0; -        lu->key     = dht_dup_key(id, dht.b); +        lu->key     = dht_dup_key(id, dht->b);          if (lu->key == NULL)                  goto fail_id; @@ -688,13 +700,13 @@ static struct lookup * lookup_create(const uint8_t * id)          pthread_condattr_destroy(&cattr); -        pthread_rwlock_wrlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock); -        list_add(&lu->next, &dht.lookups); +        list_add(&lu->next, &dht->lookups); -        lu->n_contacts = dht_contact_list(&lu->contacts, id); +        lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          return lu; @@ -773,7 +785,8 @@ static void lookup_destroy(struct lookup * lu)          pthread_cleanup_pop(true);  } -static void lookup_update(struct lookup * lu, +static void lookup_update(struct dht *    dht, +                          struct lookup * lu,                            kad_msg_t *     msg)  {          struct list_head * p = NULL; @@ -786,7 +799,7 @@ static void lookup_update(struct lookup * lu,          assert(lu);          assert(msg); -        if (dht_get_state() != DHT_RUNNING) +        if (dht_get_state(dht) != DHT_RUNNING)                  return;          pthread_mutex_lock(&lu->lock); @@ -822,16 +835,16 @@ static void lookup_update(struct lookup * lu,          pthread_cleanup_push(__cleanup_mutex_unlock, &lu->lock);          while (lu->state == LU_INIT) { -                pthread_rwlock_unlock(&dht.lock); +                pthread_rwlock_unlock(&dht->lock);                  pthread_cond_wait(&lu->cond, &lu->lock); -                pthread_rwlock_rdlock(&dht.lock); +                pthread_rwlock_rdlock(&dht->lock);          }          pthread_cleanup_pop(false);          for (n = 0; n < msg->n_contacts; ++n) {                  c = contact_create(msg->contacts[n]->id.data, -                                   dht.b, msg->contacts[n]->addr); +                                   dht->b, msg->contacts[n]->addr);                  if (c == NULL)                          continue; @@ -840,7 +853,7 @@ static void lookup_update(struct lookup * lu,                  list_for_each(p, &lu->contacts) {                          struct contact * e;                          e = list_entry(p, struct contact, next); -                        if (!memcmp(e->id, c->id, dht.b)) { +                        if (!memcmp(e->id, c->id, dht->b)) {                                  contact_destroy(c);                                  c = NULL;                                  break; @@ -855,11 +868,11 @@ static void lookup_update(struct lookup * lu,                  if (c == NULL)                          continue; -                if (lu->n_contacts < dht.k) { +                if (lu->n_contacts < dht->k) {                          list_add_tail(&c->next, p);                          ++lu->n_contacts;                          mod = true; -                } else if (pos == dht.k) { +                } else if (pos == dht->k) {                          contact_destroy(c);                  } else {                          struct contact * d; @@ -1007,12 +1020,15 @@ static enum lookup_state lookup_wait(struct lookup * lu)          return state;  } -static struct kad_req * dht_find_request(kad_msg_t *  msg) +static struct kad_req * dht_find_request(struct dht * dht, +                                         kad_msg_t *  msg)  {          struct list_head * p; + +        assert(dht);          assert(msg); -        list_for_each(p, &dht.requests) { +        list_for_each(p, &dht->requests) {                  struct kad_req * r = list_entry(p, struct kad_req, next);                  if (r->cookie == msg->cookie)                          return r; @@ -1021,15 +1037,17 @@ static struct kad_req * dht_find_request(kad_msg_t *  msg)          return NULL;  } -static struct lookup * dht_find_lookup(uint32_t        cookie) +static struct lookup * dht_find_lookup(struct dht *    dht, +                                       uint32_t        cookie)  {          struct list_head * p;          struct list_head * p2;          struct list_head * h2; +        assert(dht);          assert(cookie > 0); -        list_for_each(p, &dht.lookups) { +        list_for_each(p, &dht->lookups) {                  struct lookup * l = list_entry(p, struct lookup, next);                  pthread_mutex_lock(&l->lock);                  list_for_each_safe(p2, h2, &l->cookies) { @@ -1076,18 +1094,20 @@ static void val_destroy(struct val * v)          free(v);  } -static struct ref_entry * ref_entry_create(const uint8_t * key) +static struct ref_entry * ref_entry_create(struct dht *    dht, +                                           const uint8_t * key)  {          struct ref_entry * e;          struct timespec    t; +        assert(dht);          assert(key);          e = malloc(sizeof(*e));          if (e == NULL)                  return NULL; -        e->key = dht_dup_key(key, dht.b); +        e->key = dht_dup_key(key, dht->b);          if (e->key == NULL) {                  free(e);                  return NULL; @@ -1095,7 +1115,7 @@ static struct ref_entry * ref_entry_create(const uint8_t * key)          clock_gettime(CLOCK_REALTIME_COARSE, &t); -        e->t_rep = t.tv_sec + dht.t_repub; +        e->t_rep = t.tv_sec + dht->t_repub;          return e;  } @@ -1106,10 +1126,12 @@ static void ref_entry_destroy(struct ref_entry * e)          free(e);  } -static struct dht_entry * dht_entry_create(const uint8_t * key) +static struct dht_entry * dht_entry_create(struct dht *    dht, +                                           const uint8_t * key)  {          struct dht_entry * e; +        assert(dht);          assert(key);          e = malloc(sizeof(*e)); @@ -1121,7 +1143,7 @@ static struct dht_entry * dht_entry_create(const uint8_t * key)          e->n_vals = 0; -        e->key = dht_dup_key(key, dht.b); +        e->key = dht_dup_key(key, dht->b);          if (e->key == NULL) {                  free(e);                  return NULL; @@ -1204,7 +1226,8 @@ static void dht_entry_del_addr(struct dht_entry * e,          }  } -static uint64_t dht_entry_get_addr(struct dht_entry * e) +static uint64_t dht_entry_get_addr(struct dht *       dht, +                                   struct dht_entry * e)  {          struct list_head * p; @@ -1213,7 +1236,7 @@ static uint64_t dht_entry_get_addr(struct dht_entry * e)          list_for_each(p, &e->vals) {                  struct val * v = list_entry(p, struct val, next); -                if (v->addr != dht.addr) +                if (v->addr != dht->addr)                          return v->addr;          } @@ -1221,12 +1244,14 @@ static uint64_t dht_entry_get_addr(struct dht_entry * e)  }  /* Forward declaration. */ -static struct lookup * kad_lookup(const uint8_t * key, +static struct lookup * kad_lookup(struct dht *    dht, +                                  const uint8_t * key,                                    enum kad_code   code);  /* Build a refresh list. */ -static void bucket_refresh(struct bucket *    b, +static void bucket_refresh(struct dht *       dht, +                           struct bucket *    b,                             time_t             t,                             struct list_head * r)  { @@ -1234,7 +1259,7 @@ static void bucket_refresh(struct bucket *    b,          if (*b->children != NULL)                  for (i = 0; i < (1L << KAD_BETA); ++i) -                        bucket_refresh(b->children[i], t, r); +                        bucket_refresh(dht, b->children[i], t, r);          if (b->n_contacts == 0)                  return; @@ -1243,7 +1268,7 @@ static void bucket_refresh(struct bucket *    b,                  struct contact * c;                  struct contact * d;                  c = list_first_entry(&b->contacts, struct contact, next); -                d = contact_create(c->id, dht.b, c->addr); +                d = contact_create(c->id, dht->b, c->addr);                  if (c != NULL)                          list_add(&d->next, r);                  return; @@ -1377,7 +1402,8 @@ static int split_bucket(struct bucket * b)  }  /* Locked externally to mandate update as (final) part of join transaction. */ -static int dht_update_bucket(const uint8_t * id, +static int dht_update_bucket(struct dht *    dht, +                             const uint8_t * id,                               uint64_t        addr)  {          struct list_head * p; @@ -1385,11 +1411,13 @@ static int dht_update_bucket(const uint8_t * id,          struct bucket *    b;          struct contact *   c; -        b = dht_get_bucket(id); +        assert(dht); + +        b = dht_get_bucket(dht, id);          if (b == NULL)                  return -1; -        c = contact_create(id, dht.b, addr); +        c = contact_create(id, dht->b, addr);          if (c == NULL)                  return -1; @@ -1402,8 +1430,8 @@ static int dht_update_bucket(const uint8_t * id,                  }          } -        if (b->n_contacts == dht.k) { -                if (bucket_has_id(b, dht.id)) { +        if (b->n_contacts == dht->k) { +                if (bucket_has_id(b, dht->id)) {                          list_add_tail(&c->next, &b->contacts);                          ++b->n_contacts;                          if (split_bucket(b)) { @@ -1411,7 +1439,7 @@ static int dht_update_bucket(const uint8_t * id,                                  contact_destroy(c);                                  --b->n_contacts;                          } -                } else if (b->n_alts == dht.k) { +                } else if (b->n_alts == dht->k) {                          struct contact * d;                          d = list_first_entry(&b->alts, struct contact, next);                          list_del(&d->next); @@ -1429,7 +1457,8 @@ static int dht_update_bucket(const uint8_t * id,          return 0;  } -static int send_msg(kad_msg_t *  msg, +static int send_msg(struct dht * dht, +                    kad_msg_t *  msg,                      uint64_t     addr)  {  #ifndef __DHT_TEST__ @@ -1441,25 +1470,25 @@ static int send_msg(kad_msg_t *  msg,          if (msg->code == KAD_RESPONSE)                  retr = KAD_RESP_RETR; -        pthread_rwlock_wrlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock); -        if (dht.id != NULL) { +        if (dht->id != NULL) {                  msg->has_s_id = true; -                msg->s_id.data = dht.id; -                msg->s_id.len  = dht.b; +                msg->s_id.data = dht->id; +                msg->s_id.len  = dht->b;          } -        msg->s_addr = dht.addr; +        msg->s_addr = dht->addr;          if (msg->code < KAD_STORE) { -                msg->cookie = bmp_allocate(dht.cookies); -                if (!bmp_is_id_valid(dht.cookies, msg->cookie)) { -                        pthread_rwlock_unlock(&dht.lock); +                msg->cookie = bmp_allocate(dht->cookies); +                if (!bmp_is_id_valid(dht->cookies, msg->cookie)) { +                        pthread_rwlock_unlock(&dht->lock);                          goto fail_bmp_alloc;                  }          } -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);  #ifndef __DHT_TEST__          len = kad_msg__get_packed_size(msg); @@ -1472,7 +1501,7 @@ static int send_msg(kad_msg_t *  msg,                  kad_msg__pack(msg, shm_du_buff_head(sdb)); -                if (dt_write_packet(addr, QOS_CUBE_BE, dht.eid, sdb) == 0) +                if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0)                          break;                  ipcp_sdb_release(sdb); @@ -1488,51 +1517,53 @@ static int send_msg(kad_msg_t *  msg,          (void) retr;  #endif /* __DHT_TEST__ */ -        if (msg->code < KAD_STORE && dht_get_state() != DHT_SHUTDOWN) -                kad_req_create(msg, addr); +        if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN) +                kad_req_create(dht, msg, addr);          return msg->cookie;  #ifndef __DHT_TEST__   fail_msg: -        pthread_rwlock_wrlock(&dht.lock); -        bmp_release(dht.cookies, msg->cookie); -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock); +        bmp_release(dht->cookies, msg->cookie); +        pthread_rwlock_unlock(&dht->lock);  #endif /* !__DHT_TEST__ */   fail_bmp_alloc:          return -1;  } -static struct dht_entry * dht_find_entry(const uint8_t * key) +static struct dht_entry * dht_find_entry(struct dht *    dht, +                                         const uint8_t * key)  {          struct list_head * p; -        list_for_each(p, &dht.entries) { +        list_for_each(p, &dht->entries) {                  struct dht_entry * e = list_entry(p, struct dht_entry, next); -                if (!memcmp(key, e->key, dht.b)) +                if (!memcmp(key, e->key, dht->b))                          return e;          }          return NULL;  } -static int kad_add(const kad_contact_msg_t * contacts, +static int kad_add(struct dht *              dht, +                   const kad_contact_msg_t * contacts,                     ssize_t                   n,                     time_t                    exp)  {          struct dht_entry * e; -        pthread_rwlock_wrlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock);          while (n-- > 0) { -                if (contacts[n].id.len != dht.b) +                if (contacts[n].id.len != dht->b)                          log_warn("Bad key length in contact data."); -                e = dht_find_entry(contacts[n].id.data); +                e = dht_find_entry(dht, contacts[n].id.data);                  if (e != NULL) {                          if (dht_entry_add_addr(e, contacts[n].addr, exp))                                  goto fail;                  } else { -                        e = dht_entry_create(contacts[n].id.data); +                        e = dht_entry_create(dht, contacts[n].id.data);                          if (e == NULL)                                  goto fail; @@ -1541,39 +1572,42 @@ static int kad_add(const kad_contact_msg_t * contacts,                                  goto fail;                          } -                        list_add(&e->next, &dht.entries); +                        list_add(&e->next, &dht->entries);                  }          } -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          return 0;   fail: -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          return -ENOMEM;  } -static int wait_resp(kad_msg_t *  msg, +static int wait_resp(struct dht * dht, +                     kad_msg_t *  msg,                       time_t       timeo)  {          struct kad_req * req; +        assert(dht);          assert(msg); -        pthread_rwlock_rdlock(&dht.lock); +        pthread_rwlock_rdlock(&dht->lock); -        req = dht_find_request(msg); +        req = dht_find_request(dht, msg);          if (req == NULL) { -                pthread_rwlock_unlock(&dht.lock); +                pthread_rwlock_unlock(&dht->lock);                  return -EPERM;          } -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          return kad_req_wait(req, timeo);  } -static int kad_store(const uint8_t * key, +static int kad_store(struct dht *    dht, +                     const uint8_t * key,                       uint64_t        addr,                       uint64_t        r_addr,                       time_t          ttl) @@ -1585,11 +1619,11 @@ static int kad_store(const uint8_t * key,          cmsg.id.data = (uint8_t *) key;          cmsg.addr    = addr; -        pthread_rwlock_rdlock(&dht.lock); +        pthread_rwlock_rdlock(&dht->lock); -        cmsg.id.len  = dht.b; +        cmsg.id.len  = dht->b; -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          cmsgp[0] = &cmsg; @@ -1599,37 +1633,39 @@ static int kad_store(const uint8_t * key,          msg.n_contacts   = 1;          msg.contacts     = cmsgp; -        if (send_msg(&msg, r_addr) < 0) +        if (send_msg(dht, &msg, r_addr) < 0)                  return -1;          return 0;  } -static ssize_t kad_find(struct lookup *  lu, +static ssize_t kad_find(struct dht *     dht, +                        struct lookup *  lu,                          const uint64_t * addrs,                          enum kad_code    code)  {          kad_msg_t msg  = KAD_MSG__INIT;          ssize_t   sent = 0; +        assert(dht);          assert(lu->key);          msg.code = code;          msg.has_key       = true;          msg.key.data      = (uint8_t *) lu->key; -        msg.key.len       = dht.b; +        msg.key.len       = dht->b;          while (*addrs != 0) {                  struct cookie_el * c;                  int ret; -                if (*addrs == dht.addr) { +                if (*addrs == dht->addr) {                          ++addrs;                          continue;                  } -                ret = send_msg(&msg, *addrs); +                ret = send_msg(dht, &msg, *addrs);                  if (ret < 0)                          break; @@ -1652,36 +1688,38 @@ static ssize_t kad_find(struct lookup *  lu,          return sent;  } -static void lookup_detach(struct lookup * lu) +static void lookup_detach(struct dht *    dht, +                          struct lookup * lu)  { -        pthread_rwlock_wrlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock);          list_del(&lu->next); -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);  } -static struct lookup * kad_lookup( const uint8_t * id, +static struct lookup * kad_lookup(struct dht *    dht, +                                  const uint8_t * id,                                    enum kad_code   code)  {          uint64_t          addrs[KAD_ALPHA + 1];          enum lookup_state state;          struct lookup *   lu; -        lu = lookup_create(id); +        lu = lookup_create(dht, id);          if (lu == NULL)                  return NULL;          lookup_new_addrs(lu, addrs);          if (addrs[0] == 0) { -                lookup_detach(lu); +                lookup_detach(dht, lu);                  lookup_destroy(lu);                  return NULL;          } -        if (kad_find(lu, addrs, code) == 0) { -                lookup_detach(lu); +        if (kad_find(dht, lu, addrs, code) == 0) { +                lookup_detach(dht, lu);                  return lu;          } @@ -1692,10 +1730,10 @@ static struct lookup * kad_lookup( const uint8_t * id,                          if (addrs[0] == 0)                                  break; -                        kad_find(lu, addrs, code); +                        kad_find(dht, lu, addrs, code);                          break;                  case LU_DESTROY: -                        lookup_detach(lu); +                        lookup_detach(dht, lu);                          lookup_set_state(lu, LU_NULL);                          return NULL;                  default: @@ -1705,12 +1743,13 @@ static struct lookup * kad_lookup( const uint8_t * id,          assert(state == LU_COMPLETE); -        lookup_detach(lu); +        lookup_detach(dht, lu);          return lu;  } -static void kad_publish(const uint8_t * key, +static void kad_publish(struct dht *    dht, +                        const uint8_t * key,                          uint64_t        addr,                          time_t          exp)  { @@ -1721,20 +1760,21 @@ static void kad_publish(const uint8_t * key,          time_t          t_expire; +        assert(dht);          assert(key); -        pthread_rwlock_rdlock(&dht.lock); +        pthread_rwlock_rdlock(&dht->lock); -        k        = dht.k; -        t_expire = dht.t_expire; +        k        = dht->k; +        t_expire = dht->t_expire; -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          addrs = malloc(k * sizeof(*addrs));          if (addrs == NULL)                  return; -        lu = kad_lookup(key, KAD_FIND_NODE); +        lu = kad_lookup(dht, key, KAD_FIND_NODE);          if (lu == NULL) {                  free(addrs);                  return; @@ -1743,14 +1783,14 @@ static void kad_publish(const uint8_t * key,          n = lookup_contact_addrs(lu, addrs);          while (n-- > 0) { -                if (addrs[n] == dht.addr) { +                if (addrs[n] == dht->addr) {                          kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT;                          msg.id.data = (uint8_t *) key; -                        msg.id.len  = dht.b; +                        msg.id.len  = dht->b;                          msg.addr    = addr; -                        kad_add(&msg, 1, exp); +                        kad_add(dht, &msg, 1, exp);                  } else { -                        if (kad_store(key, addr, addrs[n], t_expire)) +                        if (kad_store(dht, key, addr, addrs[n], t_expire))                                  log_warn("Failed to send store message.");                  }          } @@ -1760,7 +1800,8 @@ static void kad_publish(const uint8_t * key,          free(addrs);  } -static int kad_join(uint64_t     addr) +static int kad_join(struct dht * dht, +                    uint64_t     addr)  {          kad_msg_t       msg = KAD_MSG__INIT; @@ -1776,43 +1817,44 @@ static int kad_join(uint64_t     addr)          msg.t_refresh       = KAD_T_REFR;          msg.t_replicate     = KAD_T_REPL; -        pthread_rwlock_rdlock(&dht.lock); +        pthread_rwlock_rdlock(&dht->lock); -        msg.b               = dht.b; +        msg.b               = dht->b; -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock); -        if (send_msg(&msg, addr) < 0) +        if (send_msg(dht, &msg, addr) < 0)                  return -1; -        if (wait_resp(&msg, KAD_T_JOIN) < 0) +        if (wait_resp(dht, &msg, KAD_T_JOIN) < 0)                  return -1; -        dht.id = create_id(dht.b); -        if (dht.id == NULL) +        dht->id = create_id(dht->b); +        if (dht->id == NULL)                  return -1; -        pthread_rwlock_wrlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock); -        dht_update_bucket(dht.id, dht.addr); +        dht_update_bucket(dht, dht->id, dht->addr); -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          return 0;  } -static void dht_dead_peer(uint8_t *    key, +static void dht_dead_peer(struct dht * dht, +                          uint8_t *    key,                            uint64_t     addr)  {          struct list_head * p;          struct list_head * h;          struct bucket *    b; -        b = dht_get_bucket(key); +        b = dht_get_bucket(dht, key);          list_for_each_safe(p, h, &b->contacts) {                  struct contact * c = list_entry(p, struct contact, next); -                if (b->n_contacts + b->n_alts <= dht.k) { +                if (b->n_contacts + b->n_alts <= dht->k) {                          ++c->fails;                          return;                  } @@ -1825,7 +1867,7 @@ static void dht_dead_peer(uint8_t *    key,                  }          } -        while (b->n_contacts < dht.k && b->n_alts > 0) { +        while (b->n_contacts < dht->k && b->n_alts > 0) {                  struct contact * c;                  c = list_first_entry(&b->alts, struct contact, next);                  list_del(&c->next); @@ -1835,27 +1877,29 @@ static void dht_dead_peer(uint8_t *    key,          }  } -static int dht_del(const uint8_t * key, +static int dht_del(struct dht *    dht, +                   const uint8_t * key,                     uint64_t        addr)  {          struct dht_entry * e; -        pthread_rwlock_wrlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock); -        e = dht_find_entry(key); +        e = dht_find_entry(dht, key);          if (e == NULL) { -                pthread_rwlock_unlock(&dht.lock); +                pthread_rwlock_unlock(&dht->lock);                  return -EPERM;          }          dht_entry_del_addr(e, addr); -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          return 0;  } -static buffer_t dht_retrieve(const uint8_t * key) +static buffer_t dht_retrieve(struct dht *    dht, +                             const uint8_t * key)  {          struct dht_entry * e;          struct list_head * p; @@ -1863,9 +1907,9 @@ static buffer_t dht_retrieve(const uint8_t * key)          uint64_t *         pos;          size_t             addrs = 0; -        pthread_rwlock_rdlock(&dht.lock); +        pthread_rwlock_rdlock(&dht->lock); -        e = dht_find_entry(key); +        e = dht_find_entry(dht, key);          if (e == NULL)                  goto fail; @@ -1873,7 +1917,7 @@ static buffer_t dht_retrieve(const uint8_t * key)          if (buf.len == 0)                  goto fail; -        pos = malloc(sizeof(dht.addr) * buf.len); +        pos = malloc(sizeof(dht->addr) * buf.len);          if (pos == NULL)                  goto fail; @@ -1886,18 +1930,19 @@ static buffer_t dht_retrieve(const uint8_t * key)                          break;          } -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          return buf;   fail: -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          buf.len = 0;          return buf;  } -static ssize_t dht_get_contacts(const uint8_t *       key, +static ssize_t dht_get_contacts(struct dht *          dht, +                                const uint8_t *       key,                                  kad_contact_msg_t *** msgs)  {          struct list_head   l; @@ -1908,18 +1953,18 @@ static ssize_t dht_get_contacts(const uint8_t *       key,          list_head_init(&l); -        pthread_rwlock_wrlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock); -        len = dht_contact_list(&l, key); +        len = dht_contact_list(dht, &l, key);          if (len == 0) { -                pthread_rwlock_unlock(&dht.lock); +                pthread_rwlock_unlock(&dht->lock);                  *msgs = NULL;                  return 0;          }          *msgs = malloc(len * sizeof(**msgs));          if (*msgs == NULL) { -                pthread_rwlock_unlock(&dht.lock); +                pthread_rwlock_unlock(&dht->lock);                  return 0;          } @@ -1927,7 +1972,7 @@ static ssize_t dht_get_contacts(const uint8_t *       key,                  struct contact * c = list_entry(p, struct contact, next);                  (*msgs)[i] = malloc(sizeof(***msgs));                  if ((*msgs)[i] == NULL) { -                        pthread_rwlock_unlock(&dht.lock); +                        pthread_rwlock_unlock(&dht->lock);                          while (i > 0)                                  free(*msgs[--i]);                          free(*msgs); @@ -1938,13 +1983,13 @@ static ssize_t dht_get_contacts(const uint8_t *       key,                  kad_contact_msg__init((*msgs)[i]);                  (*msgs)[i]->id.data = c->id; -                (*msgs)[i]->id.len  = dht.b; +                (*msgs)[i]->id.len  = dht->b;                  (*msgs)[i++]->addr  = c->addr;                  list_del(&c->next);                  free(c);          } -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          return i;  } @@ -1960,6 +2005,7 @@ static time_t gcd(time_t a,  static void * work(void * o)  { +        struct dht *       dht;          struct timespec    now;          struct list_head * p;          struct list_head * h; @@ -1967,46 +2013,46 @@ static void * work(void * o)          time_t             intv;          struct lookup *    lu; -        (void) o; +        dht = (struct dht *) o; -        pthread_rwlock_rdlock(&dht.lock); +        pthread_rwlock_rdlock(&dht->lock); -        intv = gcd(dht.t_expire, dht.t_repub); +        intv = gcd(dht->t_expire, dht->t_repub);          intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          list_head_init(&reflist);          while (true) {                  clock_gettime(CLOCK_REALTIME_COARSE, &now); -                pthread_rwlock_wrlock(&dht.lock); +                pthread_rwlock_wrlock(&dht->lock);                  /* Republish registered hashes. */ -                list_for_each(p, &dht.refs) { +                list_for_each(p, &dht->refs) {                          struct ref_entry * e;                          uint8_t *          key;                          uint64_t           addr;                          time_t             t_expire;                          e = list_entry(p, struct ref_entry, next);                          if (now.tv_sec > e->t_rep) { -                                key = dht_dup_key(e->key, dht.b); +                                key = dht_dup_key(e->key, dht->b);                                  if (key == NULL)                                          continue; -                                addr = dht.addr; -                                t_expire = dht.t_expire; -                                e->t_rep = now.tv_sec + dht.t_repub; +                                addr = dht->addr; +                                t_expire = dht->t_expire; +                                e->t_rep = now.tv_sec + dht->t_repub; -                                pthread_rwlock_unlock(&dht.lock); -                                kad_publish(key, addr, t_expire); -                                pthread_rwlock_wrlock(&dht.lock); +                                pthread_rwlock_unlock(&dht->lock); +                                kad_publish(dht, key, addr, t_expire); +                                pthread_rwlock_wrlock(&dht->lock);                                  free(key);                          }                  }                  /* Remove stale entries and republish if necessary. */ -                list_for_each_safe(p, h, &dht.entries) { +                list_for_each_safe(p, h, &dht->entries) {                          struct list_head * p1;                          struct list_head * h1;                          struct dht_entry * e; @@ -2024,39 +2070,39 @@ static void * work(void * o)                                  }                                  if (now.tv_sec > v->t_rep) { -                                        key  = dht_dup_key(e->key, dht.b); +                                        key  = dht_dup_key(e->key, dht->b);                                          addr = v->addr; -                                        t_expire = dht.t_expire = now.tv_sec; -                                        v->t_rep = now.tv_sec + dht.t_replic; -                                        pthread_rwlock_unlock(&dht.lock); -                                        kad_publish(key, addr, t_expire); -                                        pthread_rwlock_wrlock(&dht.lock); +                                        t_expire = dht->t_expire = now.tv_sec; +                                        v->t_rep = now.tv_sec + dht->t_replic; +                                        pthread_rwlock_unlock(&dht->lock); +                                        kad_publish(dht, key, addr, t_expire); +                                        pthread_rwlock_wrlock(&dht->lock);                                          free(key);                                  }                          }                  }                  /* Check the requests list for unresponsive nodes. */ -                list_for_each_safe(p, h, &dht.requests) { +                list_for_each_safe(p, h, &dht->requests) {                          struct kad_req * r;                          r = list_entry(p, struct kad_req, next);                          if (now.tv_sec > r->t_exp) {                                  list_del(&r->next); -                                bmp_release(dht.cookies, r->cookie); -                                dht_dead_peer(r->key, r->addr); +                                bmp_release(dht->cookies, r->cookie); +                                dht_dead_peer(dht, r->key, r->addr);                                  kad_req_destroy(r);                          }                  }                  /* Refresh unaccessed buckets. */ -                bucket_refresh(dht.buckets, now.tv_sec, &reflist); +                bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); -                pthread_rwlock_unlock(&dht.lock); +                pthread_rwlock_unlock(&dht->lock);                  list_for_each_safe(p, h, &reflist) {                          struct contact * c;                          c = list_entry(p, struct contact, next); -                        lu = kad_lookup(c->id, KAD_FIND_NODE); +                        lu = kad_lookup(dht, c->id, KAD_FIND_NODE);                          if (lu != NULL)                                  lookup_destroy(lu);                          list_del(&c->next); @@ -2069,9 +2115,11 @@ static void * work(void * o)          return (void *) 0;  } -static int kad_handle_join_resp(struct kad_req * req, +static int kad_handle_join_resp(struct dht *     dht, +                                struct kad_req * req,                                  kad_msg_t *      msg)  { +        assert(dht);          assert(req);          assert(msg); @@ -2087,11 +2135,11 @@ static int kad_handle_join_resp(struct kad_req * req,                  return -1;          } -        pthread_rwlock_wrlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock); -        dht.buckets = bucket_create(); -        if (dht.buckets == NULL) { -                pthread_rwlock_unlock(&dht.lock); +        dht->buckets = bucket_create(); +        if (dht->buckets == NULL) { +                pthread_rwlock_unlock(&dht->lock);                  return -1;          } @@ -2105,80 +2153,84 @@ static int kad_handle_join_resp(struct kad_req * req,          if (msg->t_refresh != KAD_T_REFR)                  log_warn("Different kademlia refresh time detected."); -        dht.k        = msg->k; -        dht.b        = msg->b; -        dht.t_expire = msg->t_expire; -        dht.t_repub  = MAX(1, dht.t_expire - 10); +        dht->k        = msg->k; +        dht->b        = msg->b; +        dht->t_expire = msg->t_expire; +        dht->t_repub  = MAX(1, dht->t_expire - 10); -        if (pthread_create(&dht.worker, NULL, work, NULL)) { -                bucket_destroy(dht.buckets); -                pthread_rwlock_unlock(&dht.lock); +        if (pthread_create(&dht->worker, NULL, work, dht)) { +                bucket_destroy(dht->buckets); +                pthread_rwlock_unlock(&dht->lock);                  return -1;          }          kad_req_respond(req); -        dht_update_bucket(msg->s_id.data, msg->s_addr); +        dht_update_bucket(dht, msg->s_id.data, msg->s_addr); -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          log_dbg("Enrollment of DHT completed.");          return 0;  } -static int kad_handle_find_resp(struct kad_req * req, +static int kad_handle_find_resp(struct dht *     dht, +                                struct kad_req * req,                                  kad_msg_t *      msg)  {          struct lookup * lu; +        assert(dht);          assert(req);          assert(msg); -        pthread_rwlock_rdlock(&dht.lock); +        pthread_rwlock_rdlock(&dht->lock); -        lu = dht_find_lookup(req->cookie); +        lu = dht_find_lookup(dht, req->cookie);          if (lu == NULL) { -                pthread_rwlock_unlock(&dht.lock); +                pthread_rwlock_unlock(&dht->lock);                  return -1;          } -        lookup_update(lu, msg); +        lookup_update(dht, lu, msg); -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          return 0;  } -static void kad_handle_response(kad_msg_t *  msg) +static void kad_handle_response(struct dht * dht, +                                kad_msg_t *  msg)  {          struct kad_req * req; +        assert(dht);          assert(msg); -        pthread_rwlock_wrlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock); -        req = dht_find_request(msg); +        req = dht_find_request(dht, msg);          if (req == NULL) { -                pthread_rwlock_unlock(&dht.lock); +                pthread_rwlock_unlock(&dht->lock);                  return;          } -        bmp_release(dht.cookies, req->cookie); +        bmp_release(dht->cookies, req->cookie);          list_del(&req->next); -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          switch(req->code) {          case KAD_JOIN: -                if (kad_handle_join_resp(req, msg)) +                if (kad_handle_join_resp(dht, req, msg))                          log_err("Enrollment of DHT failed.");                  break;          case KAD_FIND_VALUE:          case KAD_FIND_NODE: -                if (dht_get_state() != DHT_RUNNING) +                if (dht_get_state(dht) != DHT_RUNNING)                          break; -                kad_handle_find_resp(req, msg); +                kad_handle_find_resp(dht, req, msg);                  break;          default:                  break; @@ -2187,154 +2239,176 @@ static void kad_handle_response(kad_msg_t *  msg)          kad_req_destroy(req);  } -int dht_bootstrap() +int dht_bootstrap(void * dir)  { -        pthread_rwlock_wrlock(&dht.lock); +        struct dht * dht; -#ifndef __DHT_TEST__ -        dht.b        = hash_len(ipcpi.dir_hash_algo); -#else -        dht.b        = DHT_TEST_KEY_LEN; -#endif -        dht.t_expire = 86400; /* 1 day */ -        dht.t_repub  = dht.t_expire - 10; -        dht.k        = KAD_K; +        dht = (struct dht *) dir; + +        assert(dht); -        dht.id = create_id(dht.b); -        if (dht.id == NULL) +        pthread_rwlock_wrlock(&dht->lock); + +        dht->id = create_id(dht->b); +        if (dht->id == NULL)                  goto fail_id; -        dht.buckets = bucket_create(); -        if (dht.buckets == NULL) +        dht->buckets = bucket_create(); +        if (dht->buckets == NULL)                  goto fail_buckets; -        dht.buckets->depth = 0; -        dht.buckets->mask  = 0; +        dht->buckets->depth = 0; +        dht->buckets->mask  = 0; +#ifndef __DHT_TEST__ +        dht->b        = hash_len(ipcpi.dir_hash_algo); +#else +        dht->b        = DHT_TEST_KEY_LEN; +#endif +        dht->t_expire = 86400; /* 1 day */ +        dht->t_repub  = dht->t_expire - 10; +        dht->k        = KAD_K; -        if (pthread_create(&dht.worker, NULL, work, NULL)) +        if (pthread_create(&dht->worker, NULL, work, dht))                  goto fail_pthread_create; -        dht.state = DHT_RUNNING; +        dht->state = DHT_RUNNING; -        dht_update_bucket(dht.id, dht.addr); +        dht_update_bucket(dht, dht->id, dht->addr); -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          return 0;   fail_pthread_create: -        bucket_destroy(dht.buckets); -        dht.buckets = NULL; +        bucket_destroy(dht->buckets); +        dht->buckets = NULL;   fail_buckets: -        free(dht.id); -        dht.id = NULL; +        free(dht->id); +        dht->id = NULL;   fail_id: -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          return -1;  } -static struct ref_entry * ref_entry_get(const uint8_t * key) +static struct ref_entry * ref_entry_get(struct dht *    dht, +                                        const uint8_t * key)  {          struct list_head * p; -        list_for_each(p, &dht.refs) { +        list_for_each(p, &dht->refs) {                  struct ref_entry * r = list_entry(p, struct ref_entry, next); -                if (!memcmp(key, r->key, dht. b) ) +                if (!memcmp(key, r->key, dht-> b) )                          return r;          }          return NULL;  } -int dht_reg(const uint8_t * key) +int dht_reg(void *          dir, +            const uint8_t * key)  { +        struct dht *       dht;          struct ref_entry * e;          uint64_t           addr;          time_t             t_expire; +        dht = (struct dht *) dir; + +        assert(dht);          assert(key); -        assert(dht.addr != 0); +        assert(dht->addr != 0); -        if (dht_wait_running()) +        if (dht_wait_running(dht))                  return -1; -        pthread_rwlock_wrlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock); -        if (ref_entry_get(key) != NULL) { +        if (ref_entry_get(dht, key) != NULL) {                  log_dbg("Name already registered."); -                pthread_rwlock_unlock(&dht.lock); +                pthread_rwlock_unlock(&dht->lock);                  return 0;          } -        e = ref_entry_create(key); +        e = ref_entry_create(dht, key);          if (e == NULL) { -                pthread_rwlock_unlock(&dht.lock); +                pthread_rwlock_unlock(&dht->lock);                  return -ENOMEM;          } -        list_add(&e->next, &dht.refs); +        list_add(&e->next, &dht->refs); -        t_expire = dht.t_expire; -        addr = dht.addr; +        t_expire = dht->t_expire; +        addr = dht->addr; -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock); -        kad_publish(key, addr, t_expire); +        kad_publish(dht, key, addr, t_expire);          return 0;  } -int dht_unreg(const uint8_t * key) +int dht_unreg(void *          dir, +              const uint8_t * key)  { +        struct dht *       dht;          struct list_head * p;          struct list_head * h; +        dht = (struct dht *) dir; + +        assert(dht);          assert(key); -        if (dht_get_state() != DHT_RUNNING) +        if (dht_get_state(dht) != DHT_RUNNING)                  return -1; -        pthread_rwlock_wrlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock); -        list_for_each_safe(p, h, &dht.refs) { +        list_for_each_safe(p, h, &dht->refs) {                  struct ref_entry * r = list_entry(p, struct ref_entry, next); -                if (!memcmp(key, r->key, dht. b) ) { +                if (!memcmp(key, r->key, dht-> b) ) {                          list_del(&r->next);                          ref_entry_destroy(r);                  }          } -        dht_del(key, dht.addr); +        dht_del(dht, key, dht->addr); -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          return 0;  } -uint64_t dht_query(const uint8_t * key) +uint64_t dht_query(void *          dir, +                   const uint8_t * key)  { +        struct dht *       dht;          struct dht_entry * e;          struct lookup *    lu;          uint64_t           addrs[KAD_K];          size_t             n; +        dht = (struct dht *) dir; + +        assert(dht); +          addrs[0] = 0; -        if (dht_wait_running()) +        if (dht_wait_running(dht))                  return 0; -        pthread_rwlock_rdlock(&dht.lock); +        pthread_rwlock_rdlock(&dht->lock); -        e = dht_find_entry(key); +        e = dht_find_entry(dht, key);          if (e != NULL) -                addrs[0] = dht_entry_get_addr(e); +                addrs[0] = dht_entry_get_addr(dht, e); -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock);          if (addrs[0] != 0)                  return addrs[0]; -        lu = kad_lookup(key, KAD_FIND_VALUE); +        lu = kad_lookup(dht, key, KAD_FIND_VALUE);          if (lu == NULL)                  return 0; @@ -2347,7 +2421,7 @@ uint64_t dht_query(const uint8_t * key)          lookup_destroy(lu);          /* Current behaviour is anycast and return the first peer address. */ -        if (addrs[0] != dht.addr) +        if (addrs[0] != dht->addr)                  return addrs[0];          if (n > 1) @@ -2358,7 +2432,9 @@ uint64_t dht_query(const uint8_t * key)  static void * dht_handle_packet(void * o)  { -        (void) o; +        struct dht * dht = (struct dht *) o; + +        assert(dht);          while (true) {                  kad_msg_t *          msg; @@ -2371,14 +2447,14 @@ static void * dht_handle_packet(void * o)                  size_t               t_expire;                  struct cmd *         cmd; -                pthread_mutex_lock(&dht.mtx); +                pthread_mutex_lock(&dht->mtx); -                pthread_cleanup_push(__cleanup_mutex_unlock, &dht.mtx); +                pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); -                while (list_is_empty(&dht.cmds)) -                        pthread_cond_wait(&dht.cond, &dht.mtx); +                while (list_is_empty(&dht->cmds)) +                        pthread_cond_wait(&dht->cond, &dht->mtx); -                cmd = list_last_entry(&dht.cmds, struct cmd, next); +                cmd = list_last_entry(&dht->cmds, struct cmd, next);                  list_del(&cmd->next);                  pthread_cleanup_pop(true); @@ -2396,18 +2472,18 @@ static void * dht_handle_packet(void * o)                          continue;                  } -                if (msg->code != KAD_RESPONSE && dht_wait_running()) { +                if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {                          kad_msg__free_unpacked(msg, NULL);                          log_dbg("Got a request message when not running.");                          continue;                  } -                pthread_rwlock_rdlock(&dht.lock); +                pthread_rwlock_rdlock(&dht->lock); -                b        = dht.b; -                t_expire = dht.t_expire; +                b        = dht->b; +                t_expire = dht->t_expire; -                pthread_rwlock_unlock(&dht.lock); +                pthread_rwlock_unlock(&dht->lock);                  if (msg->has_key && msg->key.len != b) {                          kad_msg__free_unpacked(msg, NULL); @@ -2422,7 +2498,7 @@ static void * dht_handle_packet(void * o)                          continue;                  } -                tpm_dec(dht.tpm); +                tpm_dec(dht->tpm);                  addr = msg->s_addr; @@ -2465,7 +2541,7 @@ static void * dht_handle_packet(void * o)                          resp_msg.t_replicate     = KAD_T_REPL;                          break;                  case KAD_FIND_VALUE: -                        buf = dht_retrieve(msg->key.data); +                        buf = dht_retrieve(dht, msg->key.data);                          if (buf.len != 0) {                                  resp_msg.n_addrs = buf.len;                                  resp_msg.addrs   = (uint64_t *) buf.data; @@ -2475,7 +2551,7 @@ static void * dht_handle_packet(void * o)                  case KAD_FIND_NODE:                          /* Return k closest contacts. */                          resp_msg.n_contacts = -                                dht_get_contacts(msg->key.data, &cmsgs); +                                dht_get_contacts(dht, msg->key.data, &cmsgs);                          resp_msg.contacts = cmsgs;                          break;                  case KAD_STORE: @@ -2489,11 +2565,11 @@ static void * dht_handle_packet(void * o)                                  break;                          } -                        kad_add(*msg->contacts, msg->n_contacts, +                        kad_add(dht, *msg->contacts, msg->n_contacts,                                  msg->t_expire);                          break;                  case KAD_RESPONSE: -                        kad_handle_response(msg); +                        kad_handle_response(dht, msg);                          break;                  default:                          assert(false); @@ -2501,19 +2577,19 @@ static void * dht_handle_packet(void * o)                  }                  if (msg->code != KAD_JOIN) { -                        pthread_rwlock_wrlock(&dht.lock); -                        if (dht_get_state() == DHT_JOINING && -                            dht.buckets == NULL) { -                                pthread_rwlock_unlock(&dht.lock); +                        pthread_rwlock_wrlock(&dht->lock); +                        if (dht_get_state(dht) == DHT_JOINING && +                            dht->buckets == NULL) { +                                pthread_rwlock_unlock(&dht->lock);                                  goto finish;                          } -                        if (dht_update_bucket(msg->s_id.data, addr)) +                        if (dht_update_bucket(dht, msg->s_id.data, addr))                                  log_warn("Failed to update bucket."); -                        pthread_rwlock_unlock(&dht.lock); +                        pthread_rwlock_unlock(&dht->lock);                  } -                if (msg->code < KAD_STORE && send_msg(&resp_msg, addr) < 0) +                if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0)                                  log_warn("Failed to send response.");   finish: @@ -2523,7 +2599,7 @@ static void * dht_handle_packet(void * o)                          free(resp_msg.addrs);                  if (resp_msg.n_contacts == 0) { -                        tpm_inc(dht.tpm); +                        tpm_inc(dht->tpm);                          continue;                  } @@ -2532,22 +2608,19 @@ static void * dht_handle_packet(void * o)                                                         NULL);                  free(resp_msg.contacts); -                tpm_inc(dht.tpm); +                tpm_inc(dht->tpm);          }          return (void *) 0;  } -static void dht_post_packet(void *               o, +static void dht_post_packet(void *               comp,                              struct shm_du_buff * sdb)  {          struct cmd * cmd; +        struct dht * dht = (struct dht *) comp; -        (void) o; - -        assert(o == &dht); - -        if (dht_get_state() == DHT_SHUTDOWN) { +        if (dht_get_state(dht) == DHT_SHUTDOWN) {  #ifndef __DHT_TEST__                  ipcp_sdb_release(sdb);  #endif @@ -2562,34 +2635,39 @@ static void dht_post_packet(void *               o,          cmd->sdb = sdb; -        pthread_mutex_lock(&dht.mtx); +        pthread_mutex_lock(&dht->mtx); -        list_add(&cmd->next, &dht.cmds); +        list_add(&cmd->next, &dht->cmds); -        pthread_cond_signal(&dht.cond); +        pthread_cond_signal(&dht->cond); -        pthread_mutex_unlock(&dht.mtx); +        pthread_mutex_unlock(&dht->mtx);  } -void dht_fini() +void dht_destroy(void * dir)  { +        struct dht *       dht;          struct list_head * p;          struct list_head * h; +        dht = (struct dht *) dir; +        if (dht == NULL) +                return; +  #ifndef __DHT_TEST__ -        tpm_stop(dht.tpm); +        tpm_stop(dht->tpm); -        tpm_destroy(dht.tpm); +        tpm_destroy(dht->tpm);  #endif -        if (dht_get_state() == DHT_RUNNING) { -                dht_set_state(DHT_SHUTDOWN); -                pthread_cancel(dht.worker); -                pthread_join(dht.worker, NULL); +        if (dht_get_state(dht) == DHT_RUNNING) { +                dht_set_state(dht, DHT_SHUTDOWN); +                pthread_cancel(dht->worker); +                pthread_join(dht->worker, NULL);          } -        pthread_rwlock_wrlock(&dht.lock); +        pthread_rwlock_wrlock(&dht->lock); -        list_for_each_safe(p, h, &dht.cmds) { +        list_for_each_safe(p, h, &dht->cmds) {                  struct cmd * c = list_entry(p, struct cmd, next);                  list_del(&c->next);  #ifndef __DHT_TEST__ @@ -2598,42 +2676,44 @@ void dht_fini()                  free(c);          } -        list_for_each_safe(p, h, &dht.entries) { +        list_for_each_safe(p, h, &dht->entries) {                  struct dht_entry * e = list_entry(p, struct dht_entry, next);                  list_del(&e->next);                  dht_entry_destroy(e);          } -        list_for_each_safe(p, h, &dht.requests) { +        list_for_each_safe(p, h, &dht->requests) {                  struct kad_req * r = list_entry(p, struct kad_req, next);                  list_del(&r->next);                  kad_req_destroy(r);          } -        list_for_each_safe(p, h, &dht.refs) { +        list_for_each_safe(p, h, &dht->refs) {                  struct ref_entry * e = list_entry(p, struct ref_entry, next);                  list_del(&e->next);                  ref_entry_destroy(e);          } -        list_for_each_safe(p, h, &dht.lookups) { +        list_for_each_safe(p, h, &dht->lookups) {                  struct lookup * l = list_entry(p, struct lookup, next);                  list_del(&l->next);                  lookup_destroy(l);          } -        pthread_rwlock_unlock(&dht.lock); +        pthread_rwlock_unlock(&dht->lock); + +        if (dht->buckets != NULL) +                bucket_destroy(dht->buckets); -        if (dht.buckets != NULL) -                bucket_destroy(dht.buckets); +        bmp_destroy(dht->cookies); -        bmp_destroy(dht.cookies); +        pthread_mutex_destroy(&dht->mtx); -        pthread_mutex_destroy(&dht.mtx); +        pthread_rwlock_destroy(&dht->lock); -        pthread_rwlock_destroy(&dht.lock); +        free(dht->id); -        free(dht.id); +        free(dht);  }  static void * join_thr(void * o) @@ -2644,14 +2724,14 @@ static void * join_thr(void * o)          assert(info); -        while (kad_join(info->addr)) { -                if (dht_get_state() == DHT_SHUTDOWN) { +        while (kad_join(info->dht, info->addr)) { +                if (dht_get_state(info->dht) == DHT_SHUTDOWN) {                          log_dbg("DHT enrollment aborted.");                          goto finish;                  }                  if (retr++ == KAD_JOIN_RETR) { -                        dht_set_state(DHT_INIT); +                        dht_set_state(info->dht, DHT_INIT);                          log_warn("DHT enrollment attempt failed.");                          goto finish;                  } @@ -2659,9 +2739,9 @@ static void * join_thr(void * o)                  sleep(KAD_JOIN_INTV);          } -        dht_set_state(DHT_RUNNING); +        dht_set_state(info->dht, DHT_RUNNING); -        lu = kad_lookup(dht.id, KAD_FIND_NODE); +        lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE);          if (lu != NULL)                  lookup_destroy(lu); @@ -2675,7 +2755,7 @@ static void handle_event(void *       self,                           int          event,                           const void * o)  { -        (void) self; +        struct dht * dht = (struct dht *) self;          if (event == NOTIFY_DT_CONN_ADD) {                  pthread_t          thr; @@ -2686,18 +2766,19 @@ static void handle_event(void *       self,                  /* Give the pff some time to update for the new link. */                  nanosleep(&slack, NULL); -                switch(dht_get_state()) { +                switch(dht_get_state(dht)) {                  case DHT_INIT:                          inf = malloc(sizeof(*inf));                          if (inf == NULL)                                  break; +                        inf->dht  = dht;                          inf->addr = c->conn_info.addr; -                        if (dht_set_state(DHT_JOINING) == 0 || -                            dht_wait_running()) { +                        if (dht_set_state(dht, DHT_JOINING) == 0 || +                            dht_wait_running(dht)) {                                  if (pthread_create(&thr, NULL, join_thr, inf)) { -                                        dht_set_state(DHT_INIT); +                                        dht_set_state(dht, DHT_INIT);                                          free(inf);                                          return;                                  } @@ -2711,7 +2792,7 @@ static void handle_event(void *       self,                           * FIXME: this lookup for effiency reasons                           * causes a SEGV when stressed with rapid                           * enrollments. -                         * lu = kad_lookup(dht, dht.id, KAD_FIND_NODE); +                         * lu = kad_lookup(dht, dht->id, KAD_FIND_NODE);                           * if (lu != NULL)                           *         lookup_destroy(lu);                           */ @@ -2722,65 +2803,73 @@ static void handle_event(void *       self,          }  } -int dht_init() +void * dht_create(void)  { -        dht.buckets = NULL; +        struct dht * dht; -        list_head_init(&dht.entries); -        list_head_init(&dht.requests); -        list_head_init(&dht.refs); -        list_head_init(&dht.lookups); -        list_head_init(&dht.cmds); +        dht = malloc(sizeof(*dht)); +        if (dht == NULL) +                goto fail_malloc; + +        dht->buckets = NULL; -        if (pthread_rwlock_init(&dht.lock, NULL)) +        list_head_init(&dht->entries); +        list_head_init(&dht->requests); +        list_head_init(&dht->refs); +        list_head_init(&dht->lookups); +        list_head_init(&dht->cmds); + +        if (pthread_rwlock_init(&dht->lock, NULL))                  goto fail_rwlock; -        if (pthread_mutex_init(&dht.mtx, NULL)) +        if (pthread_mutex_init(&dht->mtx, NULL))                  goto fail_mutex; -        if (pthread_cond_init(&dht.cond, NULL)) +        if (pthread_cond_init(&dht->cond, NULL))                  goto fail_cond; -        dht.cookies = bmp_create(DHT_MAX_REQS, 1); -        if (dht.cookies == NULL) +        dht->cookies = bmp_create(DHT_MAX_REQS, 1); +        if (dht->cookies == NULL)                  goto fail_bmp; -        dht.b    = 0; -        dht.id   = NULL; +        dht->b    = 0; +        dht->id   = NULL;  #ifndef __DHT_TEST__ -        dht.addr = ipcpi.dt_addr; -        dht.tpm = tpm_create(2, 1, dht_handle_packet, NULL); -        if (dht.tpm == NULL) +        dht->addr = ipcpi.dt_addr; +        dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); +        if (dht->tpm == NULL)                  goto fail_tpm_create; -        if (tpm_start(dht.tpm)) +        if (tpm_start(dht->tpm))                  goto fail_tpm_start; -        dht.eid   = dt_reg_comp(&dht, &dht_post_packet, DHT); -        if ((int) dht.eid < 0) +        dht->eid   = dt_reg_comp(dht, &dht_post_packet, DHT); +        if ((int) dht->eid < 0)                  goto fail_tpm_start; -        notifier_reg(handle_event, NULL); +        notifier_reg(handle_event, dht);  #else          (void) handle_event;          (void) dht_handle_packet;          (void) dht_post_packet;  #endif -        dht.state = DHT_INIT; +        dht->state = DHT_INIT; -        return 0; +        return (void *) dht;  #ifndef __DHT_TEST__   fail_tpm_start: -        tpm_destroy(dht.tpm); +        tpm_destroy(dht->tpm);   fail_tpm_create: -        bmp_destroy(dht.cookies); +        bmp_destroy(dht->cookies);  #endif   fail_bmp: -        pthread_cond_destroy(&dht.cond); +        pthread_cond_destroy(&dht->cond);   fail_cond: -        pthread_mutex_destroy(&dht.mtx); +        pthread_mutex_destroy(&dht->mtx);   fail_mutex: -        pthread_rwlock_destroy(&dht.lock); +        pthread_rwlock_destroy(&dht->lock);   fail_rwlock: -        return -1; +        free(dht); + fail_malloc: +        return NULL;  } diff --git a/src/ipcpd/unicast/dir/dht.h b/src/ipcpd/unicast/dir/dht.h index c34cf1c4..a6e9c2c8 100644 --- a/src/ipcpd/unicast/dir/dht.h +++ b/src/ipcpd/unicast/dir/dht.h @@ -30,19 +30,22 @@  #include <stdint.h>  #include <sys/types.h> -int          dht_init(void); +void *   dht_create(void); -int          dht_bootstrap(void); +void     dht_destroy(void * dir); -void         dht_fini(void); +int      dht_bootstrap(void * dir); -int          dht_reg(const uint8_t * key); +int      dht_reg(void *          dir, +                 const uint8_t * key); -int          dht_unreg(const uint8_t * key); +int      dht_unreg(void *          dir, +                   const uint8_t * key); -uint64_t     dht_query(const uint8_t * key); +uint64_t dht_query(void *          dir, +                   const uint8_t * key); -int          dht_wait_running(void); +int      dht_wait_running(void * dir);  extern struct dir_ops dht_dir_ops; diff --git a/src/ipcpd/unicast/dir/ops.h b/src/ipcpd/unicast/dir/ops.h index 7eabb680..e74324da 100644 --- a/src/ipcpd/unicast/dir/ops.h +++ b/src/ipcpd/unicast/dir/ops.h @@ -23,20 +23,24 @@  #ifndef OUROBOROS_IPCPD_UNICAST_DIR_OPS_H  #define OUROBOROS_IPCPD_UNICAST_DIR_OPS_H +  struct dir_ops { -        int      (* init)(void); +        void *   (* create)(void); -        void     (* fini)(void); +        void     (* destroy)(void * dir); -        int      (* bootstrap)(void); +        int      (* bootstrap)(void * dir); -        int      (* reg)(const uint8_t * hash); +        int      (* reg)(void * dir, +                         const uint8_t * hash); -        int      (* unreg)(const uint8_t * hash); +        int      (* unreg)(void * dir, +                           const uint8_t * hash); -        uint64_t (* query)(const uint8_t * hash); +        uint64_t (* query)(void * dir, +                           const uint8_t * hash); -        int      (* wait_running)(void); +        int      (* wait_running)(void * dir);  };  #endif /* OUROBOROS_IPCPD_UNICAST_DIR_OPS_H */ diff --git a/src/ipcpd/unicast/dir/tests/dht_test.c b/src/ipcpd/unicast/dir/tests/dht_test.c index 70773ea7..3f4c3b87 100644 --- a/src/ipcpd/unicast/dir/tests/dht_test.c +++ b/src/ipcpd/unicast/dir/tests/dht_test.c @@ -23,7 +23,6 @@  #define __DHT_TEST__  #define DHT_TEST_KEY_LEN  32 -  #include "dht.c"  #include <pthread.h> @@ -31,45 +30,49 @@  #include <stdlib.h>  #include <stdio.h> -#define CONTACTS 1000 +#define CONTACTS          1000  int dht_test(int     argc,               char ** argv)  { +        struct dht * dht;          uint8_t      key[DHT_TEST_KEY_LEN];          size_t       i;          (void) argc;          (void) argv; -        if (dht_init() < 0) { +        dht = dht_create(); +        if (dht == NULL) {                  printf("Failed to create dht.\n");                  return -1;          } -        dht_fini(); +        dht_destroy(dht); -        if (dht_init() < 0) { +        dht = dht_create(); +        if (dht == NULL) {                  printf("Failed to re-create dht.\n");                  return -1;          } -        if (dht_bootstrap()) { +        if (dht_bootstrap(dht)) {                  printf("Failed to bootstrap dht.\n"); -                dht_fini(); +                dht_destroy(dht);                  return -1;          } -        dht_fini(); +        dht_destroy(dht); -        if (dht_init() < 0) { +        dht = dht_create(); +        if (dht == NULL) {                  printf("Failed to re-create dht.\n");                  return -1;          } -        if (dht_bootstrap()) { +        if (dht_bootstrap(dht)) {                  printf("Failed to bootstrap dht.\n"); -                dht_fini(); +                dht_destroy(dht);                  return -1;          } @@ -77,17 +80,17 @@ int dht_test(int     argc,                  uint64_t addr;                  random_buffer(&addr, sizeof(addr));                  random_buffer(key, DHT_TEST_KEY_LEN); -                pthread_rwlock_wrlock(&dht.lock); -                if (dht_update_bucket(key, addr)) { -                        pthread_rwlock_unlock(&dht.lock); +                pthread_rwlock_wrlock(&dht->lock); +                if (dht_update_bucket(dht, key, addr)) { +                        pthread_rwlock_unlock(&dht->lock);                          printf("Failed to update bucket.\n"); -                        dht_fini(); +                        dht_destroy(dht);                          return -1;                  } -                pthread_rwlock_unlock(&dht.lock); +                pthread_rwlock_unlock(&dht->lock);          } -        dht_fini(); +        dht_destroy(dht);          return 0;  } | 
