diff options
| author | Dimitri Staessens <dimitri.staessens@ugent.be> | 2018-02-10 10:12:02 +0100 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2018-02-10 13:13:49 +0100 | 
| commit | 5d8519018bda6af03c116604a2922625b00f8e52 (patch) | |
| tree | 36834e9ef8ed4bb1e3cd027ecb50424f2fe44cb0 /src/ipcpd/normal | |
| parent | 544ad6f9759de6acc109307caee2100478cba8ed (diff) | |
| download | ouroboros-5d8519018bda6af03c116604a2922625b00f8e52.tar.gz ouroboros-5d8519018bda6af03c116604a2922625b00f8e52.zip | |
ipcpd: Revise lookup tracking in DHT
The lookups now track the responses by cookie instead of just counting
the remaining number of responses. This is needed because simultaneous
lookups for the same hash interfere with eachother and lead to missed
responses.
Signed-off-by: Dimitri Staessens <dimitri.staessens@ugent.be>
Signed-off-by: Sander Vrijders <sander.vrijders@ugent.be>
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/dht.c | 136 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.c | 6 | 
2 files changed, 88 insertions, 54 deletions
| diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index 9296130a..d2ea9985 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -60,13 +60,13 @@ typedef KadContactMsg kad_contact_msg_t;  #define KAD_K         8    /* Replication factor, MDHT value.            */  #define KAD_T_REPL    900  /* Replication time, tied to k. MDHT value.   */  #define KAD_T_REFR    900  /* Refresh time stale bucket, MDHT value.     */ -#define KAD_T_JOIN    6    /* Response time to wait for a join.          */ -#define KAD_T_RESP    2    /* Response time to wait for a response.      */ +#define KAD_T_JOIN    8    /* Response time to wait for a join.          */ +#define KAD_T_RESP    5    /* Response time to wait for a response.      */  #define KAD_R_PING    2    /* Ping retries before declaring peer dead.   */  #define KAD_QUEER     15   /* Time to declare peer questionable.         */  #define KAD_BETA      8    /* Bucket split factor, must be 1, 2, 4 or 8. */  #define KAD_RESP_RETR 6    /* Number of retries on sending a response.   */ -#define KAD_JOIN_RETR 5    /* Number of retries sending a join.          */ +#define KAD_JOIN_RETR 8    /* Number of retries sending a join.          */  #define KAD_JOIN_INTV 1    /* Time (seconds) between join retries.       */  #define HANDLE_TIMEO  1000 /* Timeout for dht_handle_sdu tpm check (ms)  */ @@ -101,7 +101,6 @@ enum lookup_state {          LU_PENDING,          LU_UPDATE,          LU_COMPLETE, -        LU_DONE,          LU_DESTROY  }; @@ -120,16 +119,22 @@ struct kad_req {          time_t             t_exp;  }; +struct cookie_el { +        struct list_head next; + +        uint32_t         cookie; +}; +  struct lookup {          struct list_head  next; +        struct list_head  cookies; +          uint8_t *         key;          struct list_head  contacts;          size_t            n_contacts; -        size_t            out; -          uint64_t *        addrs;          size_t            n_addrs; @@ -645,11 +650,11 @@ static struct lookup * lookup_create(struct dht *    dht,                  goto fail_malloc;          list_head_init(&lu->contacts); +        list_head_init(&lu->cookies);          lu->state   = LU_INIT;          lu->addrs   = NULL;          lu->n_addrs = 0; -        lu->out     = 0;          lu->key     = dht_dup_key(id, dht->b);          if (lu->key == NULL)                  goto fail_id; @@ -688,16 +693,6 @@ static struct lookup * lookup_create(struct dht *    dht,          return NULL;  } -static void lookup_add_out(struct lookup * lu, -                           size_t          n) -{ -        pthread_mutex_lock(&lu->lock); - -        lu->out += n; - -        pthread_mutex_unlock(&lu->lock); -} -  static void cancel_lookup_destroy(void * o)  {          struct lookup *    lu; @@ -717,6 +712,12 @@ static void cancel_lookup_destroy(void * o)                  contact_destroy(c);          } +        list_for_each_safe(p, h, &lu->cookies) { +                struct cookie_el * c = list_entry(p, struct cookie_el, next); +                list_del(&c->next); +                free(c); +        } +          pthread_mutex_unlock(&lu->lock);          pthread_mutex_destroy(&lu->lock); @@ -739,7 +740,6 @@ static void lookup_destroy(struct lookup * lu)                  pthread_cond_broadcast(&lu->cond);                  break;          case LU_INIT: -        case LU_DONE:          case LU_UPDATE:          case LU_COMPLETE:                  lu->state = LU_NULL; @@ -762,6 +762,7 @@ static void lookup_update(struct dht *    dht,                            kad_msg_t *     msg)  {          struct list_head * p = NULL; +        struct list_head * h;          struct contact *   c = NULL;          size_t             n;          size_t             pos = 0; @@ -775,7 +776,19 @@ static void lookup_update(struct dht *    dht,          pthread_mutex_lock(&lu->lock); -        --lu->out; +        list_for_each_safe(p, h, &lu->cookies) { +                struct cookie_el * e = list_entry(p, struct cookie_el, next); +                if (e->cookie == msg->cookie) { +                        list_del(&e->next); +                        free(e); +                        break; +                } +        } + +        if (lu->state == LU_COMPLETE) { +                pthread_mutex_unlock(&lu->lock); +                return; +        }          if (msg->n_addrs > 0) {                  if (lu->addrs == NULL) { @@ -784,6 +797,7 @@ static void lookup_update(struct dht *    dht,                                  lu->addrs[n] = msg->addrs[n];                          lu->n_addrs = msg->n_addrs;                  } +                  lu->state = LU_COMPLETE;                  pthread_cond_broadcast(&lu->cond);                  pthread_mutex_unlock(&lu->lock); @@ -850,7 +864,7 @@ static void lookup_update(struct dht *    dht,                  }          } -        if (lu->out == 0 && !mod) +        if (list_is_empty(&lu->cookies) && !mod)                  lu->state = LU_COMPLETE;          else                  lu->state = LU_UPDATE; @@ -929,11 +943,6 @@ static void lookup_new_addrs(struct lookup * lu,          addrs[n] = 0; -        if (n == 0) { -                lu->state = LU_DONE; -                pthread_cond_signal(&lu->cond); -        } -          pthread_mutex_unlock(&lu->lock);  } @@ -969,10 +978,8 @@ static enum lookup_state lookup_wait(struct lookup * lu)          pthread_mutex_lock(&lu->lock); -        if (lu->state == LU_INIT) { +        if (lu->state == LU_INIT || lu->state == LU_UPDATE)                  lu->state = LU_PENDING; -                pthread_cond_signal(&lu->cond); -        }          pthread_cleanup_push(cleanup_wait, lu); @@ -1009,17 +1016,26 @@ static struct kad_req * dht_find_request(struct dht * dht,  }  static struct lookup * dht_find_lookup(struct dht *    dht, -                                       const uint8_t * key) +                                       uint32_t        cookie)  {          struct list_head * p; +        struct list_head * p2;          assert(dht); -        assert(key); +        assert(cookie > 0);          list_for_each(p, &dht->lookups) {                  struct lookup * l = list_entry(p, struct lookup, next); -                if (!memcmp(l->key, key, dht->b)) -                        return l; +                pthread_mutex_lock(&l->lock); +                list_for_each(p2, &l->cookies) { +                        struct cookie_el * e; +                        e = list_entry(p2, struct cookie_el, next); +                        if (e->cookie == cookie) { +                                pthread_mutex_unlock(&l->lock); +                                return l; +                        } +                } +                pthread_mutex_unlock(&l->lock);          }          return NULL; @@ -1479,7 +1495,7 @@ static int send_msg(struct dht * dht,          if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN)                  kad_req_create(dht, msg, addr); -        return 0; +        return msg->cookie;  #ifndef __DHT_TEST__   fail_msg:          pthread_rwlock_wrlock(&dht->lock); @@ -1592,14 +1608,14 @@ static int kad_store(struct dht *    dht,          msg.n_contacts   = 1;          msg.contacts     = cmsgp; -        if (send_msg(dht, &msg, r_addr)) +        if (send_msg(dht, &msg, r_addr) < 0)                  return -1;          return 0;  }  static ssize_t kad_find(struct dht *     dht, -                        const uint8_t *  key, +                        struct lookup *  lu,                          const uint64_t * addrs,                          enum kad_code    code)  { @@ -1607,19 +1623,40 @@ static ssize_t kad_find(struct dht *     dht,          ssize_t   sent = 0;          assert(dht); -        assert(key); +        assert(lu->key);          msg.code = code;          msg.has_key       = true; -        msg.key.data      = (uint8_t *) key; +        msg.key.data      = (uint8_t *) lu->key;          msg.key.len       = dht->b;          while (*addrs != 0) { -                if (*addrs != dht->addr) { -                        send_msg(dht, &msg, *addrs); -                        sent++; +                struct cookie_el * c; +                int ret; + +                if (*addrs == dht->addr) { +                        ++addrs; +                        continue;                  } + +                ret = send_msg(dht, &msg, *addrs); +                if (ret < 0) +                        break; + +                c = malloc(sizeof(*c)); +                if (c == NULL) +                        break; + +                c->cookie = (uint32_t) ret; + +                pthread_mutex_lock(&lu->lock); + +                list_add_tail(&c->next, &lu->cookies); + +                pthread_mutex_unlock(&lu->lock); + +                ++sent;                  ++addrs;          } @@ -1643,7 +1680,6 @@ static struct lookup * kad_lookup(struct dht *    dht,          uint64_t          addrs[KAD_ALPHA + 1];          enum lookup_state state;          struct lookup *   lu; -        size_t            out;          lu = lookup_create(dht, id);          if (lu == NULL) @@ -1657,14 +1693,11 @@ static struct lookup * kad_lookup(struct dht *    dht,                  return NULL;          } -        out = kad_find(dht, id, addrs, code); -        if (out == 0) { +        if (kad_find(dht, lu, addrs, code) == 0) {                  lookup_detach(dht, lu);                  return lu;          } -        lookup_add_out(lu, out); -          while ((state = lookup_wait(lu)) != LU_COMPLETE) {                  switch (state) {                  case LU_UPDATE: @@ -1672,10 +1705,8 @@ static struct lookup * kad_lookup(struct dht *    dht,                          if (addrs[0] == 0)                                  break; -                        out = kad_find(dht, id, addrs, code); -                        lookup_add_out(lu, out); +                        kad_find(dht, lu, addrs, code);                          break; -                case LU_DONE:                  case LU_DESTROY:                          lookup_detach(dht, lu);                          lookup_set_state(lu, LU_NULL); @@ -1714,7 +1745,6 @@ static void kad_publish(struct dht *    dht,          pthread_rwlock_unlock(&dht->lock); -          addrs = malloc(k * sizeof(*addrs));          if (addrs == NULL)                  return; @@ -1768,7 +1798,7 @@ static int kad_join(struct dht * dht,          pthread_rwlock_unlock(&dht->lock); -        if (send_msg(dht, &msg, addr)) +        if (send_msg(dht, &msg, addr) < 0)                  return -1;          if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) @@ -2104,6 +2134,8 @@ static int kad_handle_join_resp(struct dht *     dht,          kad_req_respond(req); +        dht_update_bucket(dht, msg->s_id.data, msg->s_addr); +          pthread_rwlock_unlock(&dht->lock);          log_dbg("Enrollment of DHT completed."); @@ -2123,7 +2155,7 @@ static int kad_handle_find_resp(struct dht *     dht,          pthread_rwlock_rdlock(&dht->lock); -        lu = dht_find_lookup(dht, req->key); +        lu = dht_find_lookup(dht, req->cookie);          if (lu == NULL) {                  pthread_rwlock_unlock(&dht->lock);                  return -1; @@ -2515,7 +2547,7 @@ static void * dht_handle_sdu(void * o)                          pthread_rwlock_unlock(&dht->lock);                  } -                if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr)) +                if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0)                                  log_warn("Failed to send response.");                  kad_msg__free_unpacked(msg, NULL); diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 8d8b51ee..2b3f5c2a 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -229,10 +229,12 @@ int fa_alloc(int             fd,          uint64_t             addr;          struct shm_du_buff * sdb; -        if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len())) +        addr = dir_query(dst); +        if (addr == 0)                  return -1; -        addr = dir_query(dst); +        if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len())) +                return -1;          msg         = (struct fa_msg *) shm_du_buff_head(sdb);          msg->code   = FLOW_REQ; | 
