summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ipcpd/normal/dht.c136
-rw-r--r--src/ipcpd/normal/fa.c6
2 files changed, 88 insertions, 54 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index 9296130a..d2ea9985 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -60,13 +60,13 @@ typedef KadContactMsg kad_contact_msg_t;
#define KAD_K 8 /* Replication factor, MDHT value. */
#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */
#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */
-#define KAD_T_JOIN 6 /* Response time to wait for a join. */
-#define KAD_T_RESP 2 /* Response time to wait for a response. */
+#define KAD_T_JOIN 8 /* Response time to wait for a join. */
+#define KAD_T_RESP 5 /* Response time to wait for a response. */
#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */
#define KAD_QUEER 15 /* Time to declare peer questionable. */
#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */
#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */
-#define KAD_JOIN_RETR 5 /* Number of retries sending a join. */
+#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */
#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */
#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */
@@ -101,7 +101,6 @@ enum lookup_state {
LU_PENDING,
LU_UPDATE,
LU_COMPLETE,
- LU_DONE,
LU_DESTROY
};
@@ -120,16 +119,22 @@ struct kad_req {
time_t t_exp;
};
+struct cookie_el {
+ struct list_head next;
+
+ uint32_t cookie;
+};
+
struct lookup {
struct list_head next;
+ struct list_head cookies;
+
uint8_t * key;
struct list_head contacts;
size_t n_contacts;
- size_t out;
-
uint64_t * addrs;
size_t n_addrs;
@@ -645,11 +650,11 @@ static struct lookup * lookup_create(struct dht * dht,
goto fail_malloc;
list_head_init(&lu->contacts);
+ list_head_init(&lu->cookies);
lu->state = LU_INIT;
lu->addrs = NULL;
lu->n_addrs = 0;
- lu->out = 0;
lu->key = dht_dup_key(id, dht->b);
if (lu->key == NULL)
goto fail_id;
@@ -688,16 +693,6 @@ static struct lookup * lookup_create(struct dht * dht,
return NULL;
}
-static void lookup_add_out(struct lookup * lu,
- size_t n)
-{
- pthread_mutex_lock(&lu->lock);
-
- lu->out += n;
-
- pthread_mutex_unlock(&lu->lock);
-}
-
static void cancel_lookup_destroy(void * o)
{
struct lookup * lu;
@@ -717,6 +712,12 @@ static void cancel_lookup_destroy(void * o)
contact_destroy(c);
}
+ list_for_each_safe(p, h, &lu->cookies) {
+ struct cookie_el * c = list_entry(p, struct cookie_el, next);
+ list_del(&c->next);
+ free(c);
+ }
+
pthread_mutex_unlock(&lu->lock);
pthread_mutex_destroy(&lu->lock);
@@ -739,7 +740,6 @@ static void lookup_destroy(struct lookup * lu)
pthread_cond_broadcast(&lu->cond);
break;
case LU_INIT:
- case LU_DONE:
case LU_UPDATE:
case LU_COMPLETE:
lu->state = LU_NULL;
@@ -762,6 +762,7 @@ static void lookup_update(struct dht * dht,
kad_msg_t * msg)
{
struct list_head * p = NULL;
+ struct list_head * h;
struct contact * c = NULL;
size_t n;
size_t pos = 0;
@@ -775,7 +776,19 @@ static void lookup_update(struct dht * dht,
pthread_mutex_lock(&lu->lock);
- --lu->out;
+ list_for_each_safe(p, h, &lu->cookies) {
+ struct cookie_el * e = list_entry(p, struct cookie_el, next);
+ if (e->cookie == msg->cookie) {
+ list_del(&e->next);
+ free(e);
+ break;
+ }
+ }
+
+ if (lu->state == LU_COMPLETE) {
+ pthread_mutex_unlock(&lu->lock);
+ return;
+ }
if (msg->n_addrs > 0) {
if (lu->addrs == NULL) {
@@ -784,6 +797,7 @@ static void lookup_update(struct dht * dht,
lu->addrs[n] = msg->addrs[n];
lu->n_addrs = msg->n_addrs;
}
+
lu->state = LU_COMPLETE;
pthread_cond_broadcast(&lu->cond);
pthread_mutex_unlock(&lu->lock);
@@ -850,7 +864,7 @@ static void lookup_update(struct dht * dht,
}
}
- if (lu->out == 0 && !mod)
+ if (list_is_empty(&lu->cookies) && !mod)
lu->state = LU_COMPLETE;
else
lu->state = LU_UPDATE;
@@ -929,11 +943,6 @@ static void lookup_new_addrs(struct lookup * lu,
addrs[n] = 0;
- if (n == 0) {
- lu->state = LU_DONE;
- pthread_cond_signal(&lu->cond);
- }
-
pthread_mutex_unlock(&lu->lock);
}
@@ -969,10 +978,8 @@ static enum lookup_state lookup_wait(struct lookup * lu)
pthread_mutex_lock(&lu->lock);
- if (lu->state == LU_INIT) {
+ if (lu->state == LU_INIT || lu->state == LU_UPDATE)
lu->state = LU_PENDING;
- pthread_cond_signal(&lu->cond);
- }
pthread_cleanup_push(cleanup_wait, lu);
@@ -1009,17 +1016,26 @@ static struct kad_req * dht_find_request(struct dht * dht,
}
static struct lookup * dht_find_lookup(struct dht * dht,
- const uint8_t * key)
+ uint32_t cookie)
{
struct list_head * p;
+ struct list_head * p2;
assert(dht);
- assert(key);
+ assert(cookie > 0);
list_for_each(p, &dht->lookups) {
struct lookup * l = list_entry(p, struct lookup, next);
- if (!memcmp(l->key, key, dht->b))
- return l;
+ pthread_mutex_lock(&l->lock);
+ list_for_each(p2, &l->cookies) {
+ struct cookie_el * e;
+ e = list_entry(p2, struct cookie_el, next);
+ if (e->cookie == cookie) {
+ pthread_mutex_unlock(&l->lock);
+ return l;
+ }
+ }
+ pthread_mutex_unlock(&l->lock);
}
return NULL;
@@ -1479,7 +1495,7 @@ static int send_msg(struct dht * dht,
if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN)
kad_req_create(dht, msg, addr);
- return 0;
+ return msg->cookie;
#ifndef __DHT_TEST__
fail_msg:
pthread_rwlock_wrlock(&dht->lock);
@@ -1592,14 +1608,14 @@ static int kad_store(struct dht * dht,
msg.n_contacts = 1;
msg.contacts = cmsgp;
- if (send_msg(dht, &msg, r_addr))
+ if (send_msg(dht, &msg, r_addr) < 0)
return -1;
return 0;
}
static ssize_t kad_find(struct dht * dht,
- const uint8_t * key,
+ struct lookup * lu,
const uint64_t * addrs,
enum kad_code code)
{
@@ -1607,19 +1623,40 @@ static ssize_t kad_find(struct dht * dht,
ssize_t sent = 0;
assert(dht);
- assert(key);
+ assert(lu->key);
msg.code = code;
msg.has_key = true;
- msg.key.data = (uint8_t *) key;
+ msg.key.data = (uint8_t *) lu->key;
msg.key.len = dht->b;
while (*addrs != 0) {
- if (*addrs != dht->addr) {
- send_msg(dht, &msg, *addrs);
- sent++;
+ struct cookie_el * c;
+ int ret;
+
+ if (*addrs == dht->addr) {
+ ++addrs;
+ continue;
}
+
+ ret = send_msg(dht, &msg, *addrs);
+ if (ret < 0)
+ break;
+
+ c = malloc(sizeof(*c));
+ if (c == NULL)
+ break;
+
+ c->cookie = (uint32_t) ret;
+
+ pthread_mutex_lock(&lu->lock);
+
+ list_add_tail(&c->next, &lu->cookies);
+
+ pthread_mutex_unlock(&lu->lock);
+
+ ++sent;
++addrs;
}
@@ -1643,7 +1680,6 @@ static struct lookup * kad_lookup(struct dht * dht,
uint64_t addrs[KAD_ALPHA + 1];
enum lookup_state state;
struct lookup * lu;
- size_t out;
lu = lookup_create(dht, id);
if (lu == NULL)
@@ -1657,14 +1693,11 @@ static struct lookup * kad_lookup(struct dht * dht,
return NULL;
}
- out = kad_find(dht, id, addrs, code);
- if (out == 0) {
+ if (kad_find(dht, lu, addrs, code) == 0) {
lookup_detach(dht, lu);
return lu;
}
- lookup_add_out(lu, out);
-
while ((state = lookup_wait(lu)) != LU_COMPLETE) {
switch (state) {
case LU_UPDATE:
@@ -1672,10 +1705,8 @@ static struct lookup * kad_lookup(struct dht * dht,
if (addrs[0] == 0)
break;
- out = kad_find(dht, id, addrs, code);
- lookup_add_out(lu, out);
+ kad_find(dht, lu, addrs, code);
break;
- case LU_DONE:
case LU_DESTROY:
lookup_detach(dht, lu);
lookup_set_state(lu, LU_NULL);
@@ -1714,7 +1745,6 @@ static void kad_publish(struct dht * dht,
pthread_rwlock_unlock(&dht->lock);
-
addrs = malloc(k * sizeof(*addrs));
if (addrs == NULL)
return;
@@ -1768,7 +1798,7 @@ static int kad_join(struct dht * dht,
pthread_rwlock_unlock(&dht->lock);
- if (send_msg(dht, &msg, addr))
+ if (send_msg(dht, &msg, addr) < 0)
return -1;
if (wait_resp(dht, &msg, KAD_T_JOIN) < 0)
@@ -2104,6 +2134,8 @@ static int kad_handle_join_resp(struct dht * dht,
kad_req_respond(req);
+ dht_update_bucket(dht, msg->s_id.data, msg->s_addr);
+
pthread_rwlock_unlock(&dht->lock);
log_dbg("Enrollment of DHT completed.");
@@ -2123,7 +2155,7 @@ static int kad_handle_find_resp(struct dht * dht,
pthread_rwlock_rdlock(&dht->lock);
- lu = dht_find_lookup(dht, req->key);
+ lu = dht_find_lookup(dht, req->cookie);
if (lu == NULL) {
pthread_rwlock_unlock(&dht->lock);
return -1;
@@ -2515,7 +2547,7 @@ static void * dht_handle_sdu(void * o)
pthread_rwlock_unlock(&dht->lock);
}
- if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr))
+ if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0)
log_warn("Failed to send response.");
kad_msg__free_unpacked(msg, NULL);
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
index 8d8b51ee..2b3f5c2a 100644
--- a/src/ipcpd/normal/fa.c
+++ b/src/ipcpd/normal/fa.c
@@ -229,10 +229,12 @@ int fa_alloc(int fd,
uint64_t addr;
struct shm_du_buff * sdb;
- if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len()))
+ addr = dir_query(dst);
+ if (addr == 0)
return -1;
- addr = dir_query(dst);
+ if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len()))
+ return -1;
msg = (struct fa_msg *) shm_du_buff_head(sdb);
msg->code = FLOW_REQ;