summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/dht.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/normal/dht.c')
-rw-r--r--src/ipcpd/normal/dht.c115
1 files changed, 85 insertions, 30 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index be5411da..548ae03a 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -304,17 +304,18 @@ static uint8_t * create_id(size_t len)
return id;
}
-static struct kad_req * kad_req_create(struct dht * dht,
- kad_msg_t * msg,
- uint64_t addr)
+static void kad_req_create(struct dht * dht,
+ kad_msg_t * msg,
+ uint64_t addr)
{
struct kad_req * req;
pthread_condattr_t cattr;
struct timespec t;
+ size_t b;
req = malloc(sizeof(*req));
if (req == NULL)
- return NULL;
+ return;
list_head_init(&req->next);
@@ -327,18 +328,22 @@ static struct kad_req * kad_req_create(struct dht * dht,
req->code = msg->code;
req->key = NULL;
+ pthread_rwlock_rdlock(&dht->lock);
+ b = dht->b;
+ pthread_rwlock_unlock(&dht->lock);
+
if (msg->has_key) {
- req->key = dht_dup_key(msg->key.data, dht->b);
+ req->key = dht_dup_key(msg->key.data, b);
if (req->key == NULL) {
free(req);
- return NULL;
+ return;
}
}
if (pthread_mutex_init(&req->lock, NULL)) {
free(req->key);
free(req);
- return NULL;
+ return;
}
pthread_condattr_init(&cattr);
@@ -351,12 +356,16 @@ static struct kad_req * kad_req_create(struct dht * dht,
pthread_mutex_destroy(&req->lock);
free(req->key);
free(req);
- return NULL;
+ return;
}
pthread_condattr_destroy(&cattr);
- return req;
+ pthread_rwlock_wrlock(&dht->lock);
+
+ list_add(&req->next, &dht->requests);
+
+ pthread_rwlock_unlock(&dht->lock);
}
static void kad_req_destroy(struct kad_req * req)
@@ -1357,7 +1366,6 @@ static int send_msg(struct dht * dht,
uint64_t addr)
{
struct shm_du_buff * sdb;
- struct kad_req * req;
size_t len;
int retr = 0;
@@ -1376,10 +1384,14 @@ static int send_msg(struct dht * dht,
if (msg->code < KAD_STORE) {
msg->cookie = bmp_allocate(dht->cookies);
- if (!bmp_is_id_valid(dht->cookies, msg->cookie))
+ if (!bmp_is_id_valid(dht->cookies, msg->cookie)) {
+ pthread_rwlock_unlock(&dht->lock);
goto fail_bmp_alloc;
+ }
}
+ pthread_rwlock_unlock(&dht->lock);
+
len = kad_msg__get_packed_size(msg);
if (len == 0)
goto fail_msg;
@@ -1406,13 +1418,8 @@ static int send_msg(struct dht * dht,
ipcp_sdb_release(sdb);
#endif /* __DHT_TEST__ */
- if (msg->code < KAD_STORE && dht->state != DHT_SHUTDOWN) {
- req = kad_req_create(dht, msg, addr);
- if (req != NULL)
- list_add(&req->next, &dht->requests);
- }
-
- pthread_rwlock_unlock(&dht->lock);
+ if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN)
+ kad_req_create(dht, msg, addr);
return 0;
@@ -1423,7 +1430,6 @@ static int send_msg(struct dht * dht,
fail_msg:
bmp_release(dht->cookies, msg->cookie);
fail_bmp_alloc:
- pthread_rwlock_unlock(&dht->lock);
return -1;
}
@@ -1513,9 +1519,14 @@ static int kad_store(struct dht * dht,
kad_contact_msg_t * cmsgp[1];
cmsg.id.data = (uint8_t *) key;
- cmsg.id.len = dht->b;
cmsg.addr = addr;
+ pthread_rwlock_rdlock(&dht->lock);
+
+ cmsg.id.len = dht->b;
+
+ pthread_rwlock_unlock(&dht->lock);
+
cmsgp[0] = &cmsg;
msg.code = KAD_STORE;
@@ -1632,15 +1643,32 @@ static void kad_publish(struct dht * dht,
time_t exp)
{
struct lookup * lu;
- uint64_t addrs[KAD_K];
+ uint64_t * addrs;
ssize_t n;
+ size_t k;
+ time_t t_expire;
+
assert(dht);
assert(key);
+ pthread_rwlock_rdlock(&dht->lock);
+
+ k = dht->k;
+ t_expire = dht->t_expire;
+
+ pthread_rwlock_unlock(&dht->lock);
+
+
+ addrs = malloc(k * sizeof(*addrs));
+ if (addrs == NULL)
+ return;
+
lu = kad_lookup(dht, key, KAD_FIND_NODE);
- if (lu == NULL)
+ if (lu == NULL) {
+ free(addrs);
return;
+ }
n = lookup_contact_addrs(lu, addrs);
@@ -1652,12 +1680,14 @@ static void kad_publish(struct dht * dht,
msg.addr = addr;
kad_add(dht, &msg, 1, exp);
} else {
- if (kad_store(dht, key, addr, addrs[n], dht->t_expire))
+ if (kad_store(dht, key, addr, addrs[n], t_expire))
log_warn("Failed to send store message.");
}
}
lookup_destroy(lu);
+
+ free(addrs);
}
static int kad_join(struct dht * dht,
@@ -1879,13 +1909,24 @@ static void * work(void * o)
pthread_rwlock_wrlock(&dht->lock);
/* Republish registered hashes. */
- list_for_each_safe(p, h, &dht->refs) {
+ list_for_each(p, &dht->refs) {
struct ref_entry * e;
+ uint8_t * key;
+ uint64_t addr;
+ time_t t_expire;
e = list_entry(p, struct ref_entry, next);
if (now.tv_sec > e->t_rep) {
- kad_publish(dht, e->key, dht->addr,
- dht->t_expire);
+ key = dht_dup_key(e->key, dht->b);
+ if (key == NULL)
+ continue;
+ addr = dht->addr;
+ t_expire = dht->t_expire;
e->t_rep = now.tv_sec + dht->t_repub;
+
+ pthread_rwlock_unlock(&dht->lock);
+ kad_publish(dht, key, addr, t_expire);
+ pthread_rwlock_wrlock(&dht->lock);
+ free(key);
}
}
@@ -1894,19 +1935,28 @@ static void * work(void * o)
struct list_head * p1;
struct list_head * h1;
struct dht_entry * e;
+ uint8_t * key;
+ time_t t_expire;
e = list_entry (p, struct dht_entry, next);
list_for_each_safe(p1, h1, &e->vals) {
struct val * v;
+ uint64_t addr;
v = list_entry(p1, struct val, next);
if (now.tv_sec > v->t_exp) {
list_del(&v->next);
val_destroy(v);
+ continue;
}
if (now.tv_sec > v->t_rep) {
- kad_publish(dht, e->key, v->addr,
- dht->t_expire - now.tv_sec);
+ key = dht_dup_key(e->key, dht->b);
+ addr = v->addr;
+ t_expire = dht->t_expire = now.tv_sec;
v->t_rep = now.tv_sec + dht->t_replic;
+ pthread_rwlock_unlock(&dht->lock);
+ kad_publish(dht, key, addr, t_expire);
+ pthread_rwlock_wrlock(&dht->lock);
+ free(key);
}
}
}
@@ -1964,7 +2014,7 @@ static int kad_handle_join_resp(struct dht * dht,
return -1;
}
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht->lock);
dht->buckets = bucket_create();
if (dht->buckets == NULL) {
@@ -2116,6 +2166,8 @@ int dht_reg(struct dht * dht,
const uint8_t * key)
{
struct ref_entry * e;
+ uint64_t addr;
+ time_t t_expire;
assert(dht);
assert(key);
@@ -2132,9 +2184,12 @@ int dht_reg(struct dht * dht,
list_add(&e->next, &dht->refs);
+ t_expire = dht->t_expire;
+ addr = dht->addr;
+
pthread_rwlock_unlock(&dht->lock);
- kad_publish(dht, key, dht->addr, dht->t_expire);
+ kad_publish(dht, key, >addr, t_expire);
return 0;
}