summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/dht.c
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2021-12-04 18:26:58 +0100
committerSander Vrijders <sander@ouroboros.rocks>2021-12-06 17:52:16 +0100
commit9422e6be94ac1007e8115a920379fd545055e531 (patch)
tree31075ad5ee851ef4625e3cafbd821e591e817997 /src/ipcpd/unicast/dht.c
parent11d2ecc140486949c8d81e984137263ca48d5799 (diff)
downloadouroboros-9422e6be94ac1007e8115a920379fd545055e531.tar.gz
ouroboros-9422e6be94ac1007e8115a920379fd545055e531.zip
ipcpd: Move DHT to stack
This makes the DHT a single directory implementation and moves it to the stack (init/fini instead of create/destroy). This is a step towards making it a directory policy, in line with our other policy implementations. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/ipcpd/unicast/dht.c')
-rw-r--r--src/ipcpd/unicast/dht.c819
1 files changed, 376 insertions, 443 deletions
diff --git a/src/ipcpd/unicast/dht.c b/src/ipcpd/unicast/dht.c
index 2b668f9f..f7cb89f2 100644
--- a/src/ipcpd/unicast/dht.c
+++ b/src/ipcpd/unicast/dht.c
@@ -47,6 +47,7 @@
#include "common/connmgr.h"
#include "dht.h"
#include "dt.h"
+#include "ipcp.h"
#include <stdlib.h>
#include <string.h>
@@ -208,7 +209,7 @@ struct cmd {
struct shm_du_buff * sdb;
};
-struct dht {
+struct {
size_t alpha;
size_t b;
size_t k;
@@ -244,15 +245,13 @@ struct dht {
struct tpm * tpm;
pthread_t worker;
-};
+} dht;
struct join_info {
- struct dht * dht;
uint64_t addr;
};
struct packet_info {
- struct dht * dht;
struct shm_du_buff * sdb;
};
@@ -270,50 +269,49 @@ static uint8_t * dht_dup_key(const uint8_t * key,
return dup;
}
-static enum dht_state dht_get_state(struct dht * dht)
+static enum dht_state dht_get_state(void)
{
enum dht_state state;
- pthread_mutex_lock(&dht->mtx);
+ pthread_mutex_lock(&dht.mtx);
- state = dht->state;
+ state = dht.state;
- pthread_mutex_unlock(&dht->mtx);
+ pthread_mutex_unlock(&dht.mtx);
return state;
}
-static int dht_set_state(struct dht * dht,
- enum dht_state state)
+static int dht_set_state(enum dht_state state)
{
- pthread_mutex_lock(&dht->mtx);
+ pthread_mutex_lock(&dht.mtx);
- if (state == DHT_JOINING && dht->state != DHT_INIT) {
- pthread_mutex_unlock(&dht->mtx);
+ if (state == DHT_JOINING && dht.state != DHT_INIT) {
+ pthread_mutex_unlock(&dht.mtx);
return -1;
}
- dht->state = state;
+ dht.state = state;
- pthread_cond_broadcast(&dht->cond);
+ pthread_cond_broadcast(&dht.cond);
- pthread_mutex_unlock(&dht->mtx);
+ pthread_mutex_unlock(&dht.mtx);
return 0;
}
-int dht_wait_running(struct dht * dht)
+int dht_wait_running()
{
int ret = 0;
- pthread_mutex_lock(&dht->mtx);
+ pthread_mutex_lock(&dht.mtx);
- pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx);
+ pthread_cleanup_push(__cleanup_mutex_unlock, &dht.mtx);
- while (dht->state == DHT_JOINING)
- pthread_cond_wait(&dht->cond, &dht->mtx);
+ while (dht.state == DHT_JOINING)
+ pthread_cond_wait(&dht.cond, &dht.mtx);
- if (dht->state != DHT_RUNNING)
+ if (dht.state != DHT_RUNNING)
ret = -1;
pthread_cleanup_pop(true);
@@ -337,8 +335,7 @@ static uint8_t * create_id(size_t len)
return id;
}
-static void kad_req_create(struct dht * dht,
- kad_msg_t * msg,
+static void kad_req_create(kad_msg_t * msg,
uint64_t addr)
{
struct kad_req * req;
@@ -361,9 +358,9 @@ static void kad_req_create(struct dht * dht,
req->code = msg->code;
req->key = NULL;
- pthread_rwlock_rdlock(&dht->lock);
- b = dht->b;
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_rdlock(&dht.lock);
+ b = dht.b;
+ pthread_rwlock_unlock(&dht.lock);
if (msg->has_key) {
req->key = dht_dup_key(msg->key.data, b);
@@ -394,11 +391,11 @@ static void kad_req_create(struct dht * dht,
pthread_condattr_destroy(&cattr);
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
- list_add(&req->next, &dht->requests);
+ list_add(&req->next, &dht.requests);
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
}
static void cancel_req_destroy(void * o)
@@ -556,12 +553,11 @@ static struct bucket * iter_bucket(struct bucket * b,
return iter_bucket(b->children[(byte & mask)], id);
}
-static struct bucket * dht_get_bucket(struct dht * dht,
- const uint8_t * id)
+static struct bucket * dht_get_bucket(const uint8_t * id)
{
- assert(dht->buckets);
+ assert(dht.buckets);
- return iter_bucket(dht->buckets, id);
+ return iter_bucket(dht.buckets, id);
}
/*
@@ -596,8 +592,7 @@ static size_t list_add_sorted(struct list_head * l,
return 1;
}
-static size_t dht_contact_list(struct dht * dht,
- struct list_head * l,
+static size_t dht_contact_list(struct list_head * l,
const uint8_t * key)
{
struct list_head * p;
@@ -607,55 +602,52 @@ static size_t dht_contact_list(struct dht * dht,
struct timespec t;
assert(l);
- assert(dht);
assert(key);
assert(list_is_empty(l));
clock_gettime(CLOCK_REALTIME_COARSE, &t);
- b = dht_get_bucket(dht, key);
+ b = dht_get_bucket(key);
if (b == NULL)
return 0;
b->t_refr = t.tv_sec + KAD_T_REFR;
- if (b->n_contacts == dht->k || b->parent == NULL) {
+ if (b->n_contacts == dht.k || b->parent == NULL) {
list_for_each(p, &b->contacts) {
struct contact * c;
c = list_entry(p, struct contact, next);
- c = contact_create(c->id, dht->b, c->addr);
+ c = contact_create(c->id, dht.b, c->addr);
if (list_add_sorted(l, c, key) == 1)
- if (++len == dht->k)
+ if (++len == dht.k)
break;
}
} else {
struct bucket * d = b->parent;
- for (i = 0; i < (1L << KAD_BETA) && len < dht->k; ++i) {
+ for (i = 0; i < (1L << KAD_BETA) && len < dht.k; ++i) {
list_for_each(p, &d->children[i]->contacts) {
struct contact * c;
c = list_entry(p, struct contact, next);
- c = contact_create(c->id, dht->b, c->addr);
+ c = contact_create(c->id, dht.b, c->addr);
if (c == NULL)
continue;
if (list_add_sorted(l, c, key) == 1)
- if (++len == dht->k)
+ if (++len == dht.k)
break;
}
}
}
- assert(len == dht->k || b->parent == NULL);
+ assert(len == dht.k || b->parent == NULL);
return len;
}
-static struct lookup * lookup_create(struct dht * dht,
- const uint8_t * id)
+static struct lookup * lookup_create(const uint8_t * id)
{
struct lookup * lu;
pthread_condattr_t cattr;
- assert(dht);
assert(id);
lu = malloc(sizeof(*lu));
@@ -668,7 +660,7 @@ static struct lookup * lookup_create(struct dht * dht,
lu->state = LU_INIT;
lu->addrs = NULL;
lu->n_addrs = 0;
- lu->key = dht_dup_key(id, dht->b);
+ lu->key = dht_dup_key(id, dht.b);
if (lu->key == NULL)
goto fail_id;
@@ -685,13 +677,13 @@ static struct lookup * lookup_create(struct dht * dht,
pthread_condattr_destroy(&cattr);
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
- list_add(&lu->next, &dht->lookups);
+ list_add(&lu->next, &dht.lookups);
- lu->n_contacts = dht_contact_list(dht, &lu->contacts, id);
+ lu->n_contacts = dht_contact_list(&lu->contacts, id);
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return lu;
@@ -770,8 +762,7 @@ static void lookup_destroy(struct lookup * lu)
pthread_cleanup_pop(true);
}
-static void lookup_update(struct dht * dht,
- struct lookup * lu,
+static void lookup_update(struct lookup * lu,
kad_msg_t * msg)
{
struct list_head * p = NULL;
@@ -784,7 +775,7 @@ static void lookup_update(struct dht * dht,
assert(lu);
assert(msg);
- if (dht_get_state(dht) != DHT_RUNNING)
+ if (dht_get_state() != DHT_RUNNING)
return;
pthread_mutex_lock(&lu->lock);
@@ -820,16 +811,16 @@ static void lookup_update(struct dht * dht,
pthread_cleanup_push(__cleanup_mutex_unlock, &lu->lock);
while (lu->state == LU_INIT) {
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
pthread_cond_wait(&lu->cond, &lu->lock);
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_rdlock(&dht.lock);
}
pthread_cleanup_pop(false);
for (n = 0; n < msg->n_contacts; ++n) {
c = contact_create(msg->contacts[n]->id.data,
- dht->b, msg->contacts[n]->addr);
+ dht.b, msg->contacts[n]->addr);
if (c == NULL)
continue;
@@ -838,7 +829,7 @@ static void lookup_update(struct dht * dht,
list_for_each(p, &lu->contacts) {
struct contact * e;
e = list_entry(p, struct contact, next);
- if (!memcmp(e->id, c->id, dht->b)) {
+ if (!memcmp(e->id, c->id, dht.b)) {
contact_destroy(c);
c = NULL;
break;
@@ -853,11 +844,11 @@ static void lookup_update(struct dht * dht,
if (c == NULL)
continue;
- if (lu->n_contacts < dht->k) {
+ if (lu->n_contacts < dht.k) {
list_add_tail(&c->next, p);
++lu->n_contacts;
mod = true;
- } else if (pos == dht->k) {
+ } else if (pos == dht.k) {
contact_destroy(c);
} else {
struct contact * d;
@@ -1005,15 +996,12 @@ static enum lookup_state lookup_wait(struct lookup * lu)
return state;
}
-static struct kad_req * dht_find_request(struct dht * dht,
- kad_msg_t * msg)
+static struct kad_req * dht_find_request(kad_msg_t * msg)
{
struct list_head * p;
-
- assert(dht);
assert(msg);
- list_for_each(p, &dht->requests) {
+ list_for_each(p, &dht.requests) {
struct kad_req * r = list_entry(p, struct kad_req, next);
if (r->cookie == msg->cookie)
return r;
@@ -1022,17 +1010,15 @@ static struct kad_req * dht_find_request(struct dht * dht,
return NULL;
}
-static struct lookup * dht_find_lookup(struct dht * dht,
- uint32_t cookie)
+static struct lookup * dht_find_lookup(uint32_t cookie)
{
struct list_head * p;
struct list_head * p2;
struct list_head * h2;
- assert(dht);
assert(cookie > 0);
- list_for_each(p, &dht->lookups) {
+ list_for_each(p, &dht.lookups) {
struct lookup * l = list_entry(p, struct lookup, next);
pthread_mutex_lock(&l->lock);
list_for_each_safe(p2, h2, &l->cookies) {
@@ -1079,20 +1065,18 @@ static void val_destroy(struct val * v)
free(v);
}
-static struct ref_entry * ref_entry_create(struct dht * dht,
- const uint8_t * key)
+static struct ref_entry * ref_entry_create(const uint8_t * key)
{
struct ref_entry * e;
struct timespec t;
- assert(dht);
assert(key);
e = malloc(sizeof(*e));
if (e == NULL)
return NULL;
- e->key = dht_dup_key(key, dht->b);
+ e->key = dht_dup_key(key, dht.b);
if (e->key == NULL) {
free(e);
return NULL;
@@ -1100,7 +1084,7 @@ static struct ref_entry * ref_entry_create(struct dht * dht,
clock_gettime(CLOCK_REALTIME_COARSE, &t);
- e->t_rep = t.tv_sec + dht->t_repub;
+ e->t_rep = t.tv_sec + dht.t_repub;
return e;
}
@@ -1111,12 +1095,10 @@ static void ref_entry_destroy(struct ref_entry * e)
free(e);
}
-static struct dht_entry * dht_entry_create(struct dht * dht,
- const uint8_t * key)
+static struct dht_entry * dht_entry_create(const uint8_t * key)
{
struct dht_entry * e;
- assert(dht);
assert(key);
e = malloc(sizeof(*e));
@@ -1128,7 +1110,7 @@ static struct dht_entry * dht_entry_create(struct dht * dht,
e->n_vals = 0;
- e->key = dht_dup_key(key, dht->b);
+ e->key = dht_dup_key(key, dht.b);
if (e->key == NULL) {
free(e);
return NULL;
@@ -1211,8 +1193,7 @@ static void dht_entry_del_addr(struct dht_entry * e,
}
}
-static uint64_t dht_entry_get_addr(struct dht * dht,
- struct dht_entry * e)
+static uint64_t dht_entry_get_addr(struct dht_entry * e)
{
struct list_head * p;
@@ -1221,7 +1202,7 @@ static uint64_t dht_entry_get_addr(struct dht * dht,
list_for_each(p, &e->vals) {
struct val * v = list_entry(p, struct val, next);
- if (v->addr != dht->addr)
+ if (v->addr != dht.addr)
return v->addr;
}
@@ -1229,14 +1210,12 @@ static uint64_t dht_entry_get_addr(struct dht * dht,
}
/* Forward declaration. */
-static struct lookup * kad_lookup(struct dht * dht,
- const uint8_t * key,
+static struct lookup * kad_lookup(const uint8_t * key,
enum kad_code code);
/* Build a refresh list. */
-static void bucket_refresh(struct dht * dht,
- struct bucket * b,
+static void bucket_refresh(struct bucket * b,
time_t t,
struct list_head * r)
{
@@ -1244,7 +1223,7 @@ static void bucket_refresh(struct dht * dht,
if (*b->children != NULL)
for (i = 0; i < (1L << KAD_BETA); ++i)
- bucket_refresh(dht, b->children[i], t, r);
+ bucket_refresh(b->children[i], t, r);
if (b->n_contacts == 0)
return;
@@ -1253,7 +1232,7 @@ static void bucket_refresh(struct dht * dht,
struct contact * c;
struct contact * d;
c = list_first_entry(&b->contacts, struct contact, next);
- d = contact_create(c->id, dht->b, c->addr);
+ d = contact_create(c->id, dht.b, c->addr);
if (c != NULL)
list_add(&d->next, r);
return;
@@ -1387,8 +1366,7 @@ static int split_bucket(struct bucket * b)
}
/* Locked externally to mandate update as (final) part of join transaction. */
-static int dht_update_bucket(struct dht * dht,
- const uint8_t * id,
+static int dht_update_bucket(const uint8_t * id,
uint64_t addr)
{
struct list_head * p;
@@ -1396,13 +1374,11 @@ static int dht_update_bucket(struct dht * dht,
struct bucket * b;
struct contact * c;
- assert(dht);
-
- b = dht_get_bucket(dht, id);
+ b = dht_get_bucket(id);
if (b == NULL)
return -1;
- c = contact_create(id, dht->b, addr);
+ c = contact_create(id, dht.b, addr);
if (c == NULL)
return -1;
@@ -1415,8 +1391,8 @@ static int dht_update_bucket(struct dht * dht,
}
}
- if (b->n_contacts == dht->k) {
- if (bucket_has_id(b, dht->id)) {
+ if (b->n_contacts == dht.k) {
+ if (bucket_has_id(b, dht.id)) {
list_add_tail(&c->next, &b->contacts);
++b->n_contacts;
if (split_bucket(b)) {
@@ -1424,7 +1400,7 @@ static int dht_update_bucket(struct dht * dht,
contact_destroy(c);
--b->n_contacts;
}
- } else if (b->n_alts == dht->k) {
+ } else if (b->n_alts == dht.k) {
struct contact * d;
d = list_first_entry(&b->alts, struct contact, next);
list_del(&d->next);
@@ -1442,8 +1418,7 @@ static int dht_update_bucket(struct dht * dht,
return 0;
}
-static int send_msg(struct dht * dht,
- kad_msg_t * msg,
+static int send_msg(kad_msg_t * msg,
uint64_t addr)
{
#ifndef __DHT_TEST__
@@ -1455,25 +1430,25 @@ static int send_msg(struct dht * dht,
if (msg->code == KAD_RESPONSE)
retr = KAD_RESP_RETR;
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
- if (dht->id != NULL) {
+ if (dht.id != NULL) {
msg->has_s_id = true;
- msg->s_id.data = dht->id;
- msg->s_id.len = dht->b;
+ msg->s_id.data = dht.id;
+ msg->s_id.len = dht.b;
}
- msg->s_addr = dht->addr;
+ msg->s_addr = dht.addr;
if (msg->code < KAD_STORE) {
- msg->cookie = bmp_allocate(dht->cookies);
- if (!bmp_is_id_valid(dht->cookies, msg->cookie)) {
- pthread_rwlock_unlock(&dht->lock);
+ msg->cookie = bmp_allocate(dht.cookies);
+ if (!bmp_is_id_valid(dht.cookies, msg->cookie)) {
+ pthread_rwlock_unlock(&dht.lock);
goto fail_bmp_alloc;
}
}
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
#ifndef __DHT_TEST__
len = kad_msg__get_packed_size(msg);
@@ -1486,7 +1461,7 @@ static int send_msg(struct dht * dht,
kad_msg__pack(msg, shm_du_buff_head(sdb));
- if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0)
+ if (dt_write_packet(addr, QOS_CUBE_BE, dht.eid, sdb) == 0)
break;
ipcp_sdb_release(sdb);
@@ -1502,53 +1477,51 @@ static int send_msg(struct dht * dht,
(void) retr;
#endif /* __DHT_TEST__ */
- if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN)
- kad_req_create(dht, msg, addr);
+ if (msg->code < KAD_STORE && dht_get_state() != DHT_SHUTDOWN)
+ kad_req_create(msg, addr);
return msg->cookie;
#ifndef __DHT_TEST__
fail_msg:
- pthread_rwlock_wrlock(&dht->lock);
- bmp_release(dht->cookies, msg->cookie);
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
+ bmp_release(dht.cookies, msg->cookie);
+ pthread_rwlock_unlock(&dht.lock);
#endif /* !__DHT_TEST__ */
fail_bmp_alloc:
return -1;
}
-static struct dht_entry * dht_find_entry(struct dht * dht,
- const uint8_t * key)
+static struct dht_entry * dht_find_entry(const uint8_t * key)
{
struct list_head * p;
- list_for_each(p, &dht->entries) {
+ list_for_each(p, &dht.entries) {
struct dht_entry * e = list_entry(p, struct dht_entry, next);
- if (!memcmp(key, e->key, dht->b))
+ if (!memcmp(key, e->key, dht.b))
return e;
}
return NULL;
}
-static int kad_add(struct dht * dht,
- const kad_contact_msg_t * contacts,
+static int kad_add(const kad_contact_msg_t * contacts,
ssize_t n,
time_t exp)
{
struct dht_entry * e;
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
while (n-- > 0) {
- if (contacts[n].id.len != dht->b)
+ if (contacts[n].id.len != dht.b)
log_warn("Bad key length in contact data.");
- e = dht_find_entry(dht, contacts[n].id.data);
+ e = dht_find_entry(contacts[n].id.data);
if (e != NULL) {
if (dht_entry_add_addr(e, contacts[n].addr, exp))
goto fail;
} else {
- e = dht_entry_create(dht, contacts[n].id.data);
+ e = dht_entry_create(contacts[n].id.data);
if (e == NULL)
goto fail;
@@ -1557,42 +1530,39 @@ static int kad_add(struct dht * dht,
goto fail;
}
- list_add(&e->next, &dht->entries);
+ list_add(&e->next, &dht.entries);
}
}
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return 0;
fail:
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return -ENOMEM;
}
-static int wait_resp(struct dht * dht,
- kad_msg_t * msg,
+static int wait_resp(kad_msg_t * msg,
time_t timeo)
{
struct kad_req * req;
- assert(dht);
assert(msg);
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_rdlock(&dht.lock);
- req = dht_find_request(dht, msg);
+ req = dht_find_request(msg);
if (req == NULL) {
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return -EPERM;
}
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return kad_req_wait(req, timeo);
}
-static int kad_store(struct dht * dht,
- const uint8_t * key,
+static int kad_store(const uint8_t * key,
uint64_t addr,
uint64_t r_addr,
time_t ttl)
@@ -1604,11 +1574,11 @@ static int kad_store(struct dht * dht,
cmsg.id.data = (uint8_t *) key;
cmsg.addr = addr;
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_rdlock(&dht.lock);
- cmsg.id.len = dht->b;
+ cmsg.id.len = dht.b;
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
cmsgp[0] = &cmsg;
@@ -1618,39 +1588,37 @@ static int kad_store(struct dht * dht,
msg.n_contacts = 1;
msg.contacts = cmsgp;
- if (send_msg(dht, &msg, r_addr) < 0)
+ if (send_msg(&msg, r_addr) < 0)
return -1;
return 0;
}
-static ssize_t kad_find(struct dht * dht,
- struct lookup * lu,
+static ssize_t kad_find(struct lookup * lu,
const uint64_t * addrs,
enum kad_code code)
{
kad_msg_t msg = KAD_MSG__INIT;
ssize_t sent = 0;
- assert(dht);
assert(lu->key);
msg.code = code;
msg.has_key = true;
msg.key.data = (uint8_t *) lu->key;
- msg.key.len = dht->b;
+ msg.key.len = dht.b;
while (*addrs != 0) {
struct cookie_el * c;
int ret;
- if (*addrs == dht->addr) {
+ if (*addrs == dht.addr) {
++addrs;
continue;
}
- ret = send_msg(dht, &msg, *addrs);
+ ret = send_msg(&msg, *addrs);
if (ret < 0)
break;
@@ -1673,38 +1641,36 @@ static ssize_t kad_find(struct dht * dht,
return sent;
}
-static void lookup_detach(struct dht * dht,
- struct lookup * lu)
+static void lookup_detach(struct lookup * lu)
{
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
list_del(&lu->next);
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
}
-static struct lookup * kad_lookup(struct dht * dht,
- const uint8_t * id,
+static struct lookup * kad_lookup( const uint8_t * id,
enum kad_code code)
{
uint64_t addrs[KAD_ALPHA + 1];
enum lookup_state state;
struct lookup * lu;
- lu = lookup_create(dht, id);
+ lu = lookup_create(id);
if (lu == NULL)
return NULL;
lookup_new_addrs(lu, addrs);
if (addrs[0] == 0) {
- lookup_detach(dht, lu);
+ lookup_detach(lu);
lookup_destroy(lu);
return NULL;
}
- if (kad_find(dht, lu, addrs, code) == 0) {
- lookup_detach(dht, lu);
+ if (kad_find(lu, addrs, code) == 0) {
+ lookup_detach(lu);
return lu;
}
@@ -1715,10 +1681,10 @@ static struct lookup * kad_lookup(struct dht * dht,
if (addrs[0] == 0)
break;
- kad_find(dht, lu, addrs, code);
+ kad_find(lu, addrs, code);
break;
case LU_DESTROY:
- lookup_detach(dht, lu);
+ lookup_detach(lu);
lookup_set_state(lu, LU_NULL);
return NULL;
default:
@@ -1728,13 +1694,12 @@ static struct lookup * kad_lookup(struct dht * dht,
assert(state == LU_COMPLETE);
- lookup_detach(dht, lu);
+ lookup_detach(lu);
return lu;
}
-static void kad_publish(struct dht * dht,
- const uint8_t * key,
+static void kad_publish(const uint8_t * key,
uint64_t addr,
time_t exp)
{
@@ -1745,21 +1710,20 @@ static void kad_publish(struct dht * dht,
time_t t_expire;
- assert(dht);
assert(key);
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_rdlock(&dht.lock);
- k = dht->k;
- t_expire = dht->t_expire;
+ k = dht.k;
+ t_expire = dht.t_expire;
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
addrs = malloc(k * sizeof(*addrs));
if (addrs == NULL)
return;
- lu = kad_lookup(dht, key, KAD_FIND_NODE);
+ lu = kad_lookup(key, KAD_FIND_NODE);
if (lu == NULL) {
free(addrs);
return;
@@ -1768,14 +1732,14 @@ static void kad_publish(struct dht * dht,
n = lookup_contact_addrs(lu, addrs);
while (n-- > 0) {
- if (addrs[n] == dht->addr) {
+ if (addrs[n] == dht.addr) {
kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT;
msg.id.data = (uint8_t *) key;
- msg.id.len = dht->b;
+ msg.id.len = dht.b;
msg.addr = addr;
- kad_add(dht, &msg, 1, exp);
+ kad_add(&msg, 1, exp);
} else {
- if (kad_store(dht, key, addr, addrs[n], t_expire))
+ if (kad_store(key, addr, addrs[n], t_expire))
log_warn("Failed to send store message.");
}
}
@@ -1785,8 +1749,7 @@ static void kad_publish(struct dht * dht,
free(addrs);
}
-static int kad_join(struct dht * dht,
- uint64_t addr)
+static int kad_join(uint64_t addr)
{
kad_msg_t msg = KAD_MSG__INIT;
@@ -1802,44 +1765,43 @@ static int kad_join(struct dht * dht,
msg.t_refresh = KAD_T_REFR;
msg.t_replicate = KAD_T_REPL;
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_rdlock(&dht.lock);
- msg.b = dht->b;
+ msg.b = dht.b;
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
- if (send_msg(dht, &msg, addr) < 0)
+ if (send_msg(&msg, addr) < 0)
return -1;
- if (wait_resp(dht, &msg, KAD_T_JOIN) < 0)
+ if (wait_resp(&msg, KAD_T_JOIN) < 0)
return -1;
- dht->id = create_id(dht->b);
- if (dht->id == NULL)
+ dht.id = create_id(dht.b);
+ if (dht.id == NULL)
return -1;
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
- dht_update_bucket(dht, dht->id, dht->addr);
+ dht_update_bucket(dht.id, dht.addr);
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return 0;
}
-static void dht_dead_peer(struct dht * dht,
- uint8_t * key,
+static void dht_dead_peer(uint8_t * key,
uint64_t addr)
{
struct list_head * p;
struct list_head * h;
struct bucket * b;
- b = dht_get_bucket(dht, key);
+ b = dht_get_bucket(key);
list_for_each_safe(p, h, &b->contacts) {
struct contact * c = list_entry(p, struct contact, next);
- if (b->n_contacts + b->n_alts <= dht->k) {
+ if (b->n_contacts + b->n_alts <= dht.k) {
++c->fails;
return;
}
@@ -1852,7 +1814,7 @@ static void dht_dead_peer(struct dht * dht,
}
}
- while (b->n_contacts < dht->k && b->n_alts > 0) {
+ while (b->n_contacts < dht.k && b->n_alts > 0) {
struct contact * c;
c = list_first_entry(&b->alts, struct contact, next);
list_del(&c->next);
@@ -1862,29 +1824,27 @@ static void dht_dead_peer(struct dht * dht,
}
}
-static int dht_del(struct dht * dht,
- const uint8_t * key,
+static int dht_del(const uint8_t * key,
uint64_t addr)
{
struct dht_entry * e;
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
- e = dht_find_entry(dht, key);
+ e = dht_find_entry(key);
if (e == NULL) {
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return -EPERM;
}
dht_entry_del_addr(e, addr);
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return 0;
}
-static buffer_t dht_retrieve(struct dht * dht,
- const uint8_t * key)
+static buffer_t dht_retrieve(const uint8_t * key)
{
struct dht_entry * e;
struct list_head * p;
@@ -1892,9 +1852,9 @@ static buffer_t dht_retrieve(struct dht * dht,
uint64_t * pos;
size_t addrs = 0;
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_rdlock(&dht.lock);
- e = dht_find_entry(dht, key);
+ e = dht_find_entry(key);
if (e == NULL)
goto fail;
@@ -1902,7 +1862,7 @@ static buffer_t dht_retrieve(struct dht * dht,
if (buf.len == 0)
goto fail;
- pos = malloc(sizeof(dht->addr) * buf.len);
+ pos = malloc(sizeof(dht.addr) * buf.len);
if (pos == NULL)
goto fail;
@@ -1915,19 +1875,18 @@ static buffer_t dht_retrieve(struct dht * dht,
break;
}
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return buf;
fail:
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
buf.len = 0;
return buf;
}
-static ssize_t dht_get_contacts(struct dht * dht,
- const uint8_t * key,
+static ssize_t dht_get_contacts(const uint8_t * key,
kad_contact_msg_t *** msgs)
{
struct list_head l;
@@ -1938,18 +1897,18 @@ static ssize_t dht_get_contacts(struct dht * dht,
list_head_init(&l);
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
- len = dht_contact_list(dht, &l, key);
+ len = dht_contact_list(&l, key);
if (len == 0) {
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
*msgs = NULL;
return 0;
}
*msgs = malloc(len * sizeof(**msgs));
if (*msgs == NULL) {
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return 0;
}
@@ -1957,7 +1916,7 @@ static ssize_t dht_get_contacts(struct dht * dht,
struct contact * c = list_entry(p, struct contact, next);
(*msgs)[i] = malloc(sizeof(***msgs));
if ((*msgs)[i] == NULL) {
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
while (i > 0)
free(*msgs[--i]);
free(*msgs);
@@ -1968,13 +1927,13 @@ static ssize_t dht_get_contacts(struct dht * dht,
kad_contact_msg__init((*msgs)[i]);
(*msgs)[i]->id.data = c->id;
- (*msgs)[i]->id.len = dht->b;
+ (*msgs)[i]->id.len = dht.b;
(*msgs)[i++]->addr = c->addr;
list_del(&c->next);
free(c);
}
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return i;
}
@@ -1990,7 +1949,6 @@ static time_t gcd(time_t a,
static void * work(void * o)
{
- struct dht * dht;
struct timespec now;
struct list_head * p;
struct list_head * h;
@@ -1998,46 +1956,46 @@ static void * work(void * o)
time_t intv;
struct lookup * lu;
- dht = (struct dht *) o;
+ (void) o;
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_rdlock(&dht.lock);
- intv = gcd(dht->t_expire, dht->t_repub);
+ intv = gcd(dht.t_expire, dht.t_repub);
intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2;
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
list_head_init(&reflist);
while (true) {
clock_gettime(CLOCK_REALTIME_COARSE, &now);
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
/* Republish registered hashes. */
- list_for_each(p, &dht->refs) {
+ list_for_each(p, &dht.refs) {
struct ref_entry * e;
uint8_t * key;
uint64_t addr;
time_t t_expire;
e = list_entry(p, struct ref_entry, next);
if (now.tv_sec > e->t_rep) {
- key = dht_dup_key(e->key, dht->b);
+ key = dht_dup_key(e->key, dht.b);
if (key == NULL)
continue;
- addr = dht->addr;
- t_expire = dht->t_expire;
- e->t_rep = now.tv_sec + dht->t_repub;
+ addr = dht.addr;
+ t_expire = dht.t_expire;
+ e->t_rep = now.tv_sec + dht.t_repub;
- pthread_rwlock_unlock(&dht->lock);
- kad_publish(dht, key, addr, t_expire);
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
+ kad_publish(key, addr, t_expire);
+ pthread_rwlock_wrlock(&dht.lock);
free(key);
}
}
/* Remove stale entries and republish if necessary. */
- list_for_each_safe(p, h, &dht->entries) {
+ list_for_each_safe(p, h, &dht.entries) {
struct list_head * p1;
struct list_head * h1;
struct dht_entry * e;
@@ -2055,39 +2013,39 @@ static void * work(void * o)
}
if (now.tv_sec > v->t_rep) {
- key = dht_dup_key(e->key, dht->b);
+ key = dht_dup_key(e->key, dht.b);
addr = v->addr;
- t_expire = dht->t_expire = now.tv_sec;
- v->t_rep = now.tv_sec + dht->t_replic;
- pthread_rwlock_unlock(&dht->lock);
- kad_publish(dht, key, addr, t_expire);
- pthread_rwlock_wrlock(&dht->lock);
+ t_expire = dht.t_expire = now.tv_sec;
+ v->t_rep = now.tv_sec + dht.t_replic;
+ pthread_rwlock_unlock(&dht.lock);
+ kad_publish(key, addr, t_expire);
+ pthread_rwlock_wrlock(&dht.lock);
free(key);
}
}
}
/* Check the requests list for unresponsive nodes. */
- list_for_each_safe(p, h, &dht->requests) {
+ list_for_each_safe(p, h, &dht.requests) {
struct kad_req * r;
r = list_entry(p, struct kad_req, next);
if (now.tv_sec > r->t_exp) {
list_del(&r->next);
- bmp_release(dht->cookies, r->cookie);
- dht_dead_peer(dht, r->key, r->addr);
+ bmp_release(dht.cookies, r->cookie);
+ dht_dead_peer(r->key, r->addr);
kad_req_destroy(r);
}
}
/* Refresh unaccessed buckets. */
- bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist);
+ bucket_refresh(dht.buckets, now.tv_sec, &reflist);
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
list_for_each_safe(p, h, &reflist) {
struct contact * c;
c = list_entry(p, struct contact, next);
- lu = kad_lookup(dht, c->id, KAD_FIND_NODE);
+ lu = kad_lookup(c->id, KAD_FIND_NODE);
if (lu != NULL)
lookup_destroy(lu);
list_del(&c->next);
@@ -2100,11 +2058,9 @@ static void * work(void * o)
return (void *) 0;
}
-static int kad_handle_join_resp(struct dht * dht,
- struct kad_req * req,
+static int kad_handle_join_resp(struct kad_req * req,
kad_msg_t * msg)
{
- assert(dht);
assert(req);
assert(msg);
@@ -2120,11 +2076,11 @@ static int kad_handle_join_resp(struct dht * dht,
return -1;
}
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
- dht->buckets = bucket_create();
- if (dht->buckets == NULL) {
- pthread_rwlock_unlock(&dht->lock);
+ dht.buckets = bucket_create();
+ if (dht.buckets == NULL) {
+ pthread_rwlock_unlock(&dht.lock);
return -1;
}
@@ -2138,84 +2094,80 @@ static int kad_handle_join_resp(struct dht * dht,
if (msg->t_refresh != KAD_T_REFR)
log_warn("Different kademlia refresh time detected.");
- dht->k = msg->k;
- dht->b = msg->b;
- dht->t_expire = msg->t_expire;
- dht->t_repub = MAX(1, dht->t_expire - 10);
+ dht.k = msg->k;
+ dht.b = msg->b;
+ dht.t_expire = msg->t_expire;
+ dht.t_repub = MAX(1, dht.t_expire - 10);
- if (pthread_create(&dht->worker, NULL, work, dht)) {
- bucket_destroy(dht->buckets);
- pthread_rwlock_unlock(&dht->lock);
+ if (pthread_create(&dht.worker, NULL, work, NULL)) {
+ bucket_destroy(dht.buckets);
+ pthread_rwlock_unlock(&dht.lock);
return -1;
}
kad_req_respond(req);
- dht_update_bucket(dht, msg->s_id.data, msg->s_addr);
+ dht_update_bucket(msg->s_id.data, msg->s_addr);
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
log_dbg("Enrollment of DHT completed.");
return 0;
}
-static int kad_handle_find_resp(struct dht * dht,
- struct kad_req * req,
+static int kad_handle_find_resp(struct kad_req * req,
kad_msg_t * msg)
{
struct lookup * lu;
- assert(dht);
assert(req);
assert(msg);
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_rdlock(&dht.lock);
- lu = dht_find_lookup(dht, req->cookie);
+ lu = dht_find_lookup(req->cookie);
if (lu == NULL) {
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return -1;
}
- lookup_update(dht, lu, msg);
+ lookup_update(lu, msg);
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return 0;
}
-static void kad_handle_response(struct dht * dht,
- kad_msg_t * msg)
+static void kad_handle_response(kad_msg_t * msg)
{
struct kad_req * req;
- assert(dht);
assert(msg);
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
- req = dht_find_request(dht, msg);
+ req = dht_find_request(msg);
if (req == NULL) {
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return;
}
- bmp_release(dht->cookies, req->cookie);
+ bmp_release(dht.cookies, req->cookie);
list_del(&req->next);
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
switch(req->code) {
case KAD_JOIN:
- if (kad_handle_join_resp(dht, req, msg))
+ if (kad_handle_join_resp(req, msg))
log_err("Enrollment of DHT failed.");
break;
case KAD_FIND_VALUE:
case KAD_FIND_NODE:
- if (dht_get_state(dht) != DHT_RUNNING)
+ if (dht_get_state() != DHT_RUNNING)
break;
- kad_handle_find_resp(dht, req, msg);
+ kad_handle_find_resp(req, msg);
break;
default:
break;
@@ -2224,137 +2176,131 @@ static void kad_handle_response(struct dht * dht,
kad_req_destroy(req);
}
-int dht_bootstrap(struct dht * dht,
- size_t b,
- time_t t_expire)
+int dht_bootstrap()
{
- assert(dht);
+ pthread_rwlock_wrlock(&dht.lock);
- pthread_rwlock_wrlock(&dht->lock);
+#ifndef __DHT_TEST__
+ dht.b = hash_len(ipcpi.dir_hash_algo);
+#else
+ dht.b = DHT_TEST_KEY_LEN;
+#endif
+ dht.t_expire = 86400; /* 1 day */
+ dht.t_repub = dht.t_expire - 10;
+ dht.k = KAD_K;
- dht->id = create_id(b);
- if (dht->id == NULL)
+ dht.id = create_id(dht.b);
+ if (dht.id == NULL)
goto fail_id;
- dht->buckets = bucket_create();
- if (dht->buckets == NULL)
+ dht.buckets = bucket_create();
+ if (dht.buckets == NULL)
goto fail_buckets;
- dht->buckets->depth = 0;
- dht->buckets->mask = 0;
-
- dht->b = b / CHAR_BIT;
- dht->t_expire = MAX(2, t_expire);
- dht->t_repub = MAX(1, t_expire - 10);
- dht->k = KAD_K;
+ dht.buckets->depth = 0;
+ dht.buckets->mask = 0;
- if (pthread_create(&dht->worker, NULL, work, dht))
+ if (pthread_create(&dht.worker, NULL, work, NULL))
goto fail_pthread_create;
- dht->state = DHT_RUNNING;
+ dht.state = DHT_RUNNING;
- dht_update_bucket(dht, dht->id, dht->addr);
+ dht_update_bucket(dht.id, dht.addr);
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return 0;
fail_pthread_create:
- bucket_destroy(dht->buckets);
- dht->buckets = NULL;
+ bucket_destroy(dht.buckets);
+ dht.buckets = NULL;
fail_buckets:
- free(dht->id);
- dht->id = NULL;
+ free(dht.id);
+ dht.id = NULL;
fail_id:
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return -1;
}
-static struct ref_entry * ref_entry_get(struct dht * dht,
- const uint8_t * key)
+static struct ref_entry * ref_entry_get(const uint8_t * key)
{
struct list_head * p;
- list_for_each(p, &dht->refs) {
+ list_for_each(p, &dht.refs) {
struct ref_entry * r = list_entry(p, struct ref_entry, next);
- if (!memcmp(key, r->key, dht-> b) )
+ if (!memcmp(key, r->key, dht. b) )
return r;
}
return NULL;
}
-int dht_reg(struct dht * dht,
- const uint8_t * key)
+int dht_reg(const uint8_t * key)
{
struct ref_entry * e;
uint64_t addr;
time_t t_expire;
- assert(dht);
assert(key);
- assert(dht->addr != 0);
+ assert(dht.addr != 0);
- if (dht_wait_running(dht))
+ if (dht_wait_running())
return -1;
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
- if (ref_entry_get(dht, key) != NULL) {
+ if (ref_entry_get(key) != NULL) {
log_dbg("Name already registered.");
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return 0;
}
- e = ref_entry_create(dht, key);
+ e = ref_entry_create(key);
if (e == NULL) {
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return -ENOMEM;
}
- list_add(&e->next, &dht->refs);
+ list_add(&e->next, &dht.refs);
- t_expire = dht->t_expire;
- addr = dht->addr;
+ t_expire = dht.t_expire;
+ addr = dht.addr;
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
- kad_publish(dht, key, addr, t_expire);
+ kad_publish(key, addr, t_expire);
return 0;
}
-int dht_unreg(struct dht * dht,
- const uint8_t * key)
+int dht_unreg(const uint8_t * key)
{
struct list_head * p;
struct list_head * h;
- assert(dht);
assert(key);
- if (dht_get_state(dht) != DHT_RUNNING)
+ if (dht_get_state() != DHT_RUNNING)
return -1;
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
- list_for_each_safe(p, h, &dht->refs) {
+ list_for_each_safe(p, h, &dht.refs) {
struct ref_entry * r = list_entry(p, struct ref_entry, next);
- if (!memcmp(key, r->key, dht-> b) ) {
+ if (!memcmp(key, r->key, dht. b) ) {
list_del(&r->next);
ref_entry_destroy(r);
}
}
- dht_del(dht, key, dht->addr);
+ dht_del(key, dht.addr);
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
return 0;
}
-uint64_t dht_query(struct dht * dht,
- const uint8_t * key)
+uint64_t dht_query(const uint8_t * key)
{
struct dht_entry * e;
struct lookup * lu;
@@ -2363,21 +2309,21 @@ uint64_t dht_query(struct dht * dht,
addrs[0] = 0;
- if (dht_wait_running(dht))
+ if (dht_wait_running())
return 0;
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_rdlock(&dht.lock);
- e = dht_find_entry(dht, key);
+ e = dht_find_entry(key);
if (e != NULL)
- addrs[0] = dht_entry_get_addr(dht, e);
+ addrs[0] = dht_entry_get_addr(e);
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
if (addrs[0] != 0)
return addrs[0];
- lu = kad_lookup(dht, key, KAD_FIND_VALUE);
+ lu = kad_lookup(key, KAD_FIND_VALUE);
if (lu == NULL)
return 0;
@@ -2390,7 +2336,7 @@ uint64_t dht_query(struct dht * dht,
lookup_destroy(lu);
/* Current behaviour is anycast and return the first peer address. */
- if (addrs[0] != dht->addr)
+ if (addrs[0] != dht.addr)
return addrs[0];
if (n > 1)
@@ -2401,9 +2347,7 @@ uint64_t dht_query(struct dht * dht,
static void * dht_handle_packet(void * o)
{
- struct dht * dht = (struct dht *) o;
-
- assert(dht);
+ (void) o;
while (true) {
kad_msg_t * msg;
@@ -2416,14 +2360,14 @@ static void * dht_handle_packet(void * o)
size_t t_expire;
struct cmd * cmd;
- pthread_mutex_lock(&dht->mtx);
+ pthread_mutex_lock(&dht.mtx);
- pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx);
+ pthread_cleanup_push(__cleanup_mutex_unlock, &dht.mtx);
- while (list_is_empty(&dht->cmds))
- pthread_cond_wait(&dht->cond, &dht->mtx);
+ while (list_is_empty(&dht.cmds))
+ pthread_cond_wait(&dht.cond, &dht.mtx);
- cmd = list_last_entry(&dht->cmds, struct cmd, next);
+ cmd = list_last_entry(&dht.cmds, struct cmd, next);
list_del(&cmd->next);
pthread_cleanup_pop(true);
@@ -2441,18 +2385,18 @@ static void * dht_handle_packet(void * o)
continue;
}
- if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
+ if (msg->code != KAD_RESPONSE && dht_wait_running()) {
kad_msg__free_unpacked(msg, NULL);
log_dbg("Got a request message when not running.");
continue;
}
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_rdlock(&dht.lock);
- b = dht->b;
- t_expire = dht->t_expire;
+ b = dht.b;
+ t_expire = dht.t_expire;
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
if (msg->has_key && msg->key.len != b) {
kad_msg__free_unpacked(msg, NULL);
@@ -2467,7 +2411,7 @@ static void * dht_handle_packet(void * o)
continue;
}
- tpm_dec(dht->tpm);
+ tpm_dec(dht.tpm);
addr = msg->s_addr;
@@ -2510,7 +2454,7 @@ static void * dht_handle_packet(void * o)
resp_msg.t_replicate = KAD_T_REPL;
break;
case KAD_FIND_VALUE:
- buf = dht_retrieve(dht, msg->key.data);
+ buf = dht_retrieve(msg->key.data);
if (buf.len != 0) {
resp_msg.n_addrs = buf.len;
resp_msg.addrs = (uint64_t *) buf.data;
@@ -2520,7 +2464,7 @@ static void * dht_handle_packet(void * o)
case KAD_FIND_NODE:
/* Return k closest contacts. */
resp_msg.n_contacts =
- dht_get_contacts(dht, msg->key.data, &cmsgs);
+ dht_get_contacts(msg->key.data, &cmsgs);
resp_msg.contacts = cmsgs;
break;
case KAD_STORE:
@@ -2534,11 +2478,11 @@ static void * dht_handle_packet(void * o)
break;
}
- kad_add(dht, *msg->contacts, msg->n_contacts,
+ kad_add(*msg->contacts, msg->n_contacts,
msg->t_expire);
break;
case KAD_RESPONSE:
- kad_handle_response(dht, msg);
+ kad_handle_response(msg);
break;
default:
assert(false);
@@ -2546,19 +2490,19 @@ static void * dht_handle_packet(void * o)
}
if (msg->code != KAD_JOIN) {
- pthread_rwlock_wrlock(&dht->lock);
- if (dht_get_state(dht) == DHT_JOINING &&
- dht->buckets == NULL) {
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
+ if (dht_get_state() == DHT_JOINING &&
+ dht.buckets == NULL) {
+ pthread_rwlock_unlock(&dht.lock);
goto finish;
}
- if (dht_update_bucket(dht, msg->s_id.data, addr))
+ if (dht_update_bucket(msg->s_id.data, addr))
log_warn("Failed to update bucket.");
- pthread_rwlock_unlock(&dht->lock);
+ pthread_rwlock_unlock(&dht.lock);
}
- if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0)
+ if (msg->code < KAD_STORE && send_msg(&resp_msg, addr) < 0)
log_warn("Failed to send response.");
finish:
@@ -2568,7 +2512,7 @@ static void * dht_handle_packet(void * o)
free(resp_msg.addrs);
if (resp_msg.n_contacts == 0) {
- tpm_inc(dht->tpm);
+ tpm_inc(dht.tpm);
continue;
}
@@ -2577,19 +2521,22 @@ static void * dht_handle_packet(void * o)
NULL);
free(resp_msg.contacts);
- tpm_inc(dht->tpm);
+ tpm_inc(dht.tpm);
}
return (void *) 0;
}
-static void dht_post_packet(void * comp,
+static void dht_post_packet(void * o,
struct shm_du_buff * sdb)
{
struct cmd * cmd;
- struct dht * dht = (struct dht *) comp;
- if (dht_get_state(dht) == DHT_SHUTDOWN) {
+ (void) o;
+
+ assert(o == &dht);
+
+ if (dht_get_state() == DHT_SHUTDOWN) {
#ifndef __DHT_TEST__
ipcp_sdb_release(sdb);
#endif
@@ -2604,37 +2551,34 @@ static void dht_post_packet(void * comp,
cmd->sdb = sdb;
- pthread_mutex_lock(&dht->mtx);
+ pthread_mutex_lock(&dht.mtx);
- list_add(&cmd->next, &dht->cmds);
+ list_add(&cmd->next, &dht.cmds);
- pthread_cond_signal(&dht->cond);
+ pthread_cond_signal(&dht.cond);
- pthread_mutex_unlock(&dht->mtx);
+ pthread_mutex_unlock(&dht.mtx);
}
-void dht_destroy(struct dht * dht)
+void dht_fini()
{
struct list_head * p;
struct list_head * h;
- if (dht == NULL)
- return;
-
#ifndef __DHT_TEST__
- tpm_stop(dht->tpm);
+ tpm_stop(dht.tpm);
- tpm_destroy(dht->tpm);
+ tpm_destroy(dht.tpm);
#endif
- if (dht_get_state(dht) == DHT_RUNNING) {
- dht_set_state(dht, DHT_SHUTDOWN);
- pthread_cancel(dht->worker);
- pthread_join(dht->worker, NULL);
+ if (dht_get_state() == DHT_RUNNING) {
+ dht_set_state(DHT_SHUTDOWN);
+ pthread_cancel(dht.worker);
+ pthread_join(dht.worker, NULL);
}
- pthread_rwlock_wrlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht.lock);
- list_for_each_safe(p, h, &dht->cmds) {
+ list_for_each_safe(p, h, &dht.cmds) {
struct cmd * c = list_entry(p, struct cmd, next);
list_del(&c->next);
#ifndef __DHT_TEST__
@@ -2643,44 +2587,42 @@ void dht_destroy(struct dht * dht)
free(c);
}
- list_for_each_safe(p, h, &dht->entries) {
+ list_for_each_safe(p, h, &dht.entries) {
struct dht_entry * e = list_entry(p, struct dht_entry, next);
list_del(&e->next);
dht_entry_destroy(e);
}
- list_for_each_safe(p, h, &dht->requests) {
+ list_for_each_safe(p, h, &dht.requests) {
struct kad_req * r = list_entry(p, struct kad_req, next);
list_del(&r->next);
kad_req_destroy(r);
}
- list_for_each_safe(p, h, &dht->refs) {
+ list_for_each_safe(p, h, &dht.refs) {
struct ref_entry * e = list_entry(p, struct ref_entry, next);
list_del(&e->next);
ref_entry_destroy(e);
}
- list_for_each_safe(p, h, &dht->lookups) {
+ list_for_each_safe(p, h, &dht.lookups) {
struct lookup * l = list_entry(p, struct lookup, next);
list_del(&l->next);
lookup_destroy(l);
}
- pthread_rwlock_unlock(&dht->lock);
-
- if (dht->buckets != NULL)
- bucket_destroy(dht->buckets);
+ pthread_rwlock_unlock(&dht.lock);
- bmp_destroy(dht->cookies);
+ if (dht.buckets != NULL)
+ bucket_destroy(dht.buckets);
- pthread_mutex_destroy(&dht->mtx);
+ bmp_destroy(dht.cookies);
- pthread_rwlock_destroy(&dht->lock);
+ pthread_mutex_destroy(&dht.mtx);
- free(dht->id);
+ pthread_rwlock_destroy(&dht.lock);
- free(dht);
+ free(dht.id);
}
static void * join_thr(void * o)
@@ -2691,14 +2633,14 @@ static void * join_thr(void * o)
assert(info);
- while (kad_join(info->dht, info->addr)) {
- if (dht_get_state(info->dht) == DHT_SHUTDOWN) {
+ while (kad_join(info->addr)) {
+ if (dht_get_state() == DHT_SHUTDOWN) {
log_dbg("DHT enrollment aborted.");
goto finish;
}
if (retr++ == KAD_JOIN_RETR) {
- dht_set_state(info->dht, DHT_INIT);
+ dht_set_state(DHT_INIT);
log_warn("DHT enrollment attempt failed.");
goto finish;
}
@@ -2706,9 +2648,9 @@ static void * join_thr(void * o)
sleep(KAD_JOIN_INTV);
}
- dht_set_state(info->dht, DHT_RUNNING);
+ dht_set_state(DHT_RUNNING);
- lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE);
+ lu = kad_lookup(dht.id, KAD_FIND_NODE);
if (lu != NULL)
lookup_destroy(lu);
@@ -2722,7 +2664,7 @@ static void handle_event(void * self,
int event,
const void * o)
{
- struct dht * dht = (struct dht *) self;
+ (void) self;
if (event == NOTIFY_DT_CONN_ADD) {
pthread_t thr;
@@ -2733,19 +2675,18 @@ static void handle_event(void * self,
/* Give the pff some time to update for the new link. */
nanosleep(&slack, NULL);
- switch(dht_get_state(dht)) {
+ switch(dht_get_state()) {
case DHT_INIT:
inf = malloc(sizeof(*inf));
if (inf == NULL)
break;
- inf->dht = dht;
inf->addr = c->conn_info.addr;
- if (dht_set_state(dht, DHT_JOINING) == 0 ||
- dht_wait_running(dht)) {
+ if (dht_set_state(DHT_JOINING) == 0 ||
+ dht_wait_running()) {
if (pthread_create(&thr, NULL, join_thr, inf)) {
- dht_set_state(dht, DHT_INIT);
+ dht_set_state(DHT_INIT);
free(inf);
return;
}
@@ -2759,7 +2700,7 @@ static void handle_event(void * self,
* FIXME: this lookup for effiency reasons
* causes a SEGV when stressed with rapid
* enrollments.
- * lu = kad_lookup(dht, dht->id, KAD_FIND_NODE);
+ * lu = kad_lookup(dht, dht.id, KAD_FIND_NODE);
* if (lu != NULL)
* lookup_destroy(lu);
*/
@@ -2770,73 +2711,65 @@ static void handle_event(void * self,
}
}
-struct dht * dht_create(uint64_t addr)
+int dht_init()
{
- struct dht * dht;
-
- dht = malloc(sizeof(*dht));
- if (dht == NULL)
- goto fail_malloc;
+ dht.buckets = NULL;
- dht->buckets = NULL;
+ list_head_init(&dht.entries);
+ list_head_init(&dht.requests);
+ list_head_init(&dht.refs);
+ list_head_init(&dht.lookups);
+ list_head_init(&dht.cmds);
- list_head_init(&dht->entries);
- list_head_init(&dht->requests);
- list_head_init(&dht->refs);
- list_head_init(&dht->lookups);
- list_head_init(&dht->cmds);
-
- if (pthread_rwlock_init(&dht->lock, NULL))
+ if (pthread_rwlock_init(&dht.lock, NULL))
goto fail_rwlock;
- if (pthread_mutex_init(&dht->mtx, NULL))
+ if (pthread_mutex_init(&dht.mtx, NULL))
goto fail_mutex;
- if (pthread_cond_init(&dht->cond, NULL))
+ if (pthread_cond_init(&dht.cond, NULL))
goto fail_cond;
- dht->cookies = bmp_create(DHT_MAX_REQS, 1);
- if (dht->cookies == NULL)
+ dht.cookies = bmp_create(DHT_MAX_REQS, 1);
+ if (dht.cookies == NULL)
goto fail_bmp;
- dht->b = 0;
- dht->addr = addr;
- dht->id = NULL;
+ dht.b = 0;
+ dht.id = NULL;
#ifndef __DHT_TEST__
- dht->tpm = tpm_create(2, 1, dht_handle_packet, dht);
- if (dht->tpm == NULL)
+ dht.addr = ipcpi.dt_addr;
+ dht.tpm = tpm_create(2, 1, dht_handle_packet, NULL);
+ if (dht.tpm == NULL)
goto fail_tpm_create;
- if (tpm_start(dht->tpm))
+ if (tpm_start(dht.tpm))
goto fail_tpm_start;
- dht->eid = dt_reg_comp(dht, &dht_post_packet, DHT);
- if ((int) dht->eid < 0)
+ dht.eid = dt_reg_comp(&dht, &dht_post_packet, DHT);
+ if ((int) dht.eid < 0)
goto fail_tpm_start;
- notifier_reg(handle_event, dht);
+ notifier_reg(handle_event, NULL);
#else
(void) handle_event;
(void) dht_handle_packet;
(void) dht_post_packet;
#endif
- dht->state = DHT_INIT;
+ dht.state = DHT_INIT;
- return dht;
+ return 0;
#ifndef __DHT_TEST__
fail_tpm_start:
- tpm_destroy(dht->tpm);
+ tpm_destroy(dht.tpm);
fail_tpm_create:
- bmp_destroy(dht->cookies);
+ bmp_destroy(dht.cookies);
#endif
fail_bmp:
- pthread_cond_destroy(&dht->cond);
+ pthread_cond_destroy(&dht.cond);
fail_cond:
- pthread_mutex_destroy(&dht->mtx);
+ pthread_mutex_destroy(&dht.mtx);
fail_mutex:
- pthread_rwlock_destroy(&dht->lock);
+ pthread_rwlock_destroy(&dht.lock);
fail_rwlock:
- free(dht);
- fail_malloc:
- return NULL;
+ return -1;
}