diff options
-rw-r--r-- | src/ipcpd/normal/dht.c | 136 | ||||
-rw-r--r-- | src/ipcpd/normal/fa.c | 6 |
2 files changed, 88 insertions, 54 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index 9296130a..d2ea9985 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -60,13 +60,13 @@ typedef KadContactMsg kad_contact_msg_t; #define KAD_K 8 /* Replication factor, MDHT value. */ #define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ #define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ -#define KAD_T_JOIN 6 /* Response time to wait for a join. */ -#define KAD_T_RESP 2 /* Response time to wait for a response. */ +#define KAD_T_JOIN 8 /* Response time to wait for a join. */ +#define KAD_T_RESP 5 /* Response time to wait for a response. */ #define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ #define KAD_QUEER 15 /* Time to declare peer questionable. */ #define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ #define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ -#define KAD_JOIN_RETR 5 /* Number of retries sending a join. */ +#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ #define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ #define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */ @@ -101,7 +101,6 @@ enum lookup_state { LU_PENDING, LU_UPDATE, LU_COMPLETE, - LU_DONE, LU_DESTROY }; @@ -120,16 +119,22 @@ struct kad_req { time_t t_exp; }; +struct cookie_el { + struct list_head next; + + uint32_t cookie; +}; + struct lookup { struct list_head next; + struct list_head cookies; + uint8_t * key; struct list_head contacts; size_t n_contacts; - size_t out; - uint64_t * addrs; size_t n_addrs; @@ -645,11 +650,11 @@ static struct lookup * lookup_create(struct dht * dht, goto fail_malloc; list_head_init(&lu->contacts); + list_head_init(&lu->cookies); lu->state = LU_INIT; lu->addrs = NULL; lu->n_addrs = 0; - lu->out = 0; lu->key = dht_dup_key(id, dht->b); if (lu->key == NULL) goto fail_id; @@ -688,16 +693,6 @@ static struct lookup * lookup_create(struct dht * dht, return NULL; } -static void lookup_add_out(struct lookup * lu, - size_t n) -{ - pthread_mutex_lock(&lu->lock); - - lu->out += n; - - pthread_mutex_unlock(&lu->lock); -} - static void cancel_lookup_destroy(void * o) { struct lookup * lu; @@ -717,6 +712,12 @@ static void cancel_lookup_destroy(void * o) contact_destroy(c); } + list_for_each_safe(p, h, &lu->cookies) { + struct cookie_el * c = list_entry(p, struct cookie_el, next); + list_del(&c->next); + free(c); + } + pthread_mutex_unlock(&lu->lock); pthread_mutex_destroy(&lu->lock); @@ -739,7 +740,6 @@ static void lookup_destroy(struct lookup * lu) pthread_cond_broadcast(&lu->cond); break; case LU_INIT: - case LU_DONE: case LU_UPDATE: case LU_COMPLETE: lu->state = LU_NULL; @@ -762,6 +762,7 @@ static void lookup_update(struct dht * dht, kad_msg_t * msg) { struct list_head * p = NULL; + struct list_head * h; struct contact * c = NULL; size_t n; size_t pos = 0; @@ -775,7 +776,19 @@ static void lookup_update(struct dht * dht, pthread_mutex_lock(&lu->lock); - --lu->out; + list_for_each_safe(p, h, &lu->cookies) { + struct cookie_el * e = list_entry(p, struct cookie_el, next); + if (e->cookie == msg->cookie) { + list_del(&e->next); + free(e); + break; + } + } + + if (lu->state == LU_COMPLETE) { + pthread_mutex_unlock(&lu->lock); + return; + } if (msg->n_addrs > 0) { if (lu->addrs == NULL) { @@ -784,6 +797,7 @@ static void lookup_update(struct dht * dht, lu->addrs[n] = msg->addrs[n]; lu->n_addrs = msg->n_addrs; } + lu->state = LU_COMPLETE; pthread_cond_broadcast(&lu->cond); pthread_mutex_unlock(&lu->lock); @@ -850,7 +864,7 @@ static void lookup_update(struct dht * dht, } } - if (lu->out == 0 && !mod) + if (list_is_empty(&lu->cookies) && !mod) lu->state = LU_COMPLETE; else lu->state = LU_UPDATE; @@ -929,11 +943,6 @@ static void lookup_new_addrs(struct lookup * lu, addrs[n] = 0; - if (n == 0) { - lu->state = LU_DONE; - pthread_cond_signal(&lu->cond); - } - pthread_mutex_unlock(&lu->lock); } @@ -969,10 +978,8 @@ static enum lookup_state lookup_wait(struct lookup * lu) pthread_mutex_lock(&lu->lock); - if (lu->state == LU_INIT) { + if (lu->state == LU_INIT || lu->state == LU_UPDATE) lu->state = LU_PENDING; - pthread_cond_signal(&lu->cond); - } pthread_cleanup_push(cleanup_wait, lu); @@ -1009,17 +1016,26 @@ static struct kad_req * dht_find_request(struct dht * dht, } static struct lookup * dht_find_lookup(struct dht * dht, - const uint8_t * key) + uint32_t cookie) { struct list_head * p; + struct list_head * p2; assert(dht); - assert(key); + assert(cookie > 0); list_for_each(p, &dht->lookups) { struct lookup * l = list_entry(p, struct lookup, next); - if (!memcmp(l->key, key, dht->b)) - return l; + pthread_mutex_lock(&l->lock); + list_for_each(p2, &l->cookies) { + struct cookie_el * e; + e = list_entry(p2, struct cookie_el, next); + if (e->cookie == cookie) { + pthread_mutex_unlock(&l->lock); + return l; + } + } + pthread_mutex_unlock(&l->lock); } return NULL; @@ -1479,7 +1495,7 @@ static int send_msg(struct dht * dht, if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN) kad_req_create(dht, msg, addr); - return 0; + return msg->cookie; #ifndef __DHT_TEST__ fail_msg: pthread_rwlock_wrlock(&dht->lock); @@ -1592,14 +1608,14 @@ static int kad_store(struct dht * dht, msg.n_contacts = 1; msg.contacts = cmsgp; - if (send_msg(dht, &msg, r_addr)) + if (send_msg(dht, &msg, r_addr) < 0) return -1; return 0; } static ssize_t kad_find(struct dht * dht, - const uint8_t * key, + struct lookup * lu, const uint64_t * addrs, enum kad_code code) { @@ -1607,19 +1623,40 @@ static ssize_t kad_find(struct dht * dht, ssize_t sent = 0; assert(dht); - assert(key); + assert(lu->key); msg.code = code; msg.has_key = true; - msg.key.data = (uint8_t *) key; + msg.key.data = (uint8_t *) lu->key; msg.key.len = dht->b; while (*addrs != 0) { - if (*addrs != dht->addr) { - send_msg(dht, &msg, *addrs); - sent++; + struct cookie_el * c; + int ret; + + if (*addrs == dht->addr) { + ++addrs; + continue; } + + ret = send_msg(dht, &msg, *addrs); + if (ret < 0) + break; + + c = malloc(sizeof(*c)); + if (c == NULL) + break; + + c->cookie = (uint32_t) ret; + + pthread_mutex_lock(&lu->lock); + + list_add_tail(&c->next, &lu->cookies); + + pthread_mutex_unlock(&lu->lock); + + ++sent; ++addrs; } @@ -1643,7 +1680,6 @@ static struct lookup * kad_lookup(struct dht * dht, uint64_t addrs[KAD_ALPHA + 1]; enum lookup_state state; struct lookup * lu; - size_t out; lu = lookup_create(dht, id); if (lu == NULL) @@ -1657,14 +1693,11 @@ static struct lookup * kad_lookup(struct dht * dht, return NULL; } - out = kad_find(dht, id, addrs, code); - if (out == 0) { + if (kad_find(dht, lu, addrs, code) == 0) { lookup_detach(dht, lu); return lu; } - lookup_add_out(lu, out); - while ((state = lookup_wait(lu)) != LU_COMPLETE) { switch (state) { case LU_UPDATE: @@ -1672,10 +1705,8 @@ static struct lookup * kad_lookup(struct dht * dht, if (addrs[0] == 0) break; - out = kad_find(dht, id, addrs, code); - lookup_add_out(lu, out); + kad_find(dht, lu, addrs, code); break; - case LU_DONE: case LU_DESTROY: lookup_detach(dht, lu); lookup_set_state(lu, LU_NULL); @@ -1714,7 +1745,6 @@ static void kad_publish(struct dht * dht, pthread_rwlock_unlock(&dht->lock); - addrs = malloc(k * sizeof(*addrs)); if (addrs == NULL) return; @@ -1768,7 +1798,7 @@ static int kad_join(struct dht * dht, pthread_rwlock_unlock(&dht->lock); - if (send_msg(dht, &msg, addr)) + if (send_msg(dht, &msg, addr) < 0) return -1; if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) @@ -2104,6 +2134,8 @@ static int kad_handle_join_resp(struct dht * dht, kad_req_respond(req); + dht_update_bucket(dht, msg->s_id.data, msg->s_addr); + pthread_rwlock_unlock(&dht->lock); log_dbg("Enrollment of DHT completed."); @@ -2123,7 +2155,7 @@ static int kad_handle_find_resp(struct dht * dht, pthread_rwlock_rdlock(&dht->lock); - lu = dht_find_lookup(dht, req->key); + lu = dht_find_lookup(dht, req->cookie); if (lu == NULL) { pthread_rwlock_unlock(&dht->lock); return -1; @@ -2515,7 +2547,7 @@ static void * dht_handle_sdu(void * o) pthread_rwlock_unlock(&dht->lock); } - if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr)) + if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0) log_warn("Failed to send response."); kad_msg__free_unpacked(msg, NULL); diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 8d8b51ee..2b3f5c2a 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -229,10 +229,12 @@ int fa_alloc(int fd, uint64_t addr; struct shm_du_buff * sdb; - if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len())) + addr = dir_query(dst); + if (addr == 0) return -1; - addr = dir_query(dst); + if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len())) + return -1; msg = (struct fa_msg *) shm_du_buff_head(sdb); msg->code = FLOW_REQ; |