diff options
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/dht.c | 136 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.c | 6 | 
2 files changed, 88 insertions, 54 deletions
| diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index 9296130a..d2ea9985 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -60,13 +60,13 @@ typedef KadContactMsg kad_contact_msg_t;  #define KAD_K         8    /* Replication factor, MDHT value.            */  #define KAD_T_REPL    900  /* Replication time, tied to k. MDHT value.   */  #define KAD_T_REFR    900  /* Refresh time stale bucket, MDHT value.     */ -#define KAD_T_JOIN    6    /* Response time to wait for a join.          */ -#define KAD_T_RESP    2    /* Response time to wait for a response.      */ +#define KAD_T_JOIN    8    /* Response time to wait for a join.          */ +#define KAD_T_RESP    5    /* Response time to wait for a response.      */  #define KAD_R_PING    2    /* Ping retries before declaring peer dead.   */  #define KAD_QUEER     15   /* Time to declare peer questionable.         */  #define KAD_BETA      8    /* Bucket split factor, must be 1, 2, 4 or 8. */  #define KAD_RESP_RETR 6    /* Number of retries on sending a response.   */ -#define KAD_JOIN_RETR 5    /* Number of retries sending a join.          */ +#define KAD_JOIN_RETR 8    /* Number of retries sending a join.          */  #define KAD_JOIN_INTV 1    /* Time (seconds) between join retries.       */  #define HANDLE_TIMEO  1000 /* Timeout for dht_handle_sdu tpm check (ms)  */ @@ -101,7 +101,6 @@ enum lookup_state {          LU_PENDING,          LU_UPDATE,          LU_COMPLETE, -        LU_DONE,          LU_DESTROY  }; @@ -120,16 +119,22 @@ struct kad_req {          time_t             t_exp;  }; +struct cookie_el { +        struct list_head next; + +        uint32_t         cookie; +}; +  struct lookup {          struct list_head  next; +        struct list_head  cookies; +          uint8_t *         key;          struct list_head  contacts;          size_t            n_contacts; -        size_t            out; -          uint64_t *        addrs;          size_t            n_addrs; @@ -645,11 +650,11 @@ static struct lookup * lookup_create(struct dht *    dht,                  goto fail_malloc;          list_head_init(&lu->contacts); +        list_head_init(&lu->cookies);          lu->state   = LU_INIT;          lu->addrs   = NULL;          lu->n_addrs = 0; -        lu->out     = 0;          lu->key     = dht_dup_key(id, dht->b);          if (lu->key == NULL)                  goto fail_id; @@ -688,16 +693,6 @@ static struct lookup * lookup_create(struct dht *    dht,          return NULL;  } -static void lookup_add_out(struct lookup * lu, -                           size_t          n) -{ -        pthread_mutex_lock(&lu->lock); - -        lu->out += n; - -        pthread_mutex_unlock(&lu->lock); -} -  static void cancel_lookup_destroy(void * o)  {          struct lookup *    lu; @@ -717,6 +712,12 @@ static void cancel_lookup_destroy(void * o)                  contact_destroy(c);          } +        list_for_each_safe(p, h, &lu->cookies) { +                struct cookie_el * c = list_entry(p, struct cookie_el, next); +                list_del(&c->next); +                free(c); +        } +          pthread_mutex_unlock(&lu->lock);          pthread_mutex_destroy(&lu->lock); @@ -739,7 +740,6 @@ static void lookup_destroy(struct lookup * lu)                  pthread_cond_broadcast(&lu->cond);                  break;          case LU_INIT: -        case LU_DONE:          case LU_UPDATE:          case LU_COMPLETE:                  lu->state = LU_NULL; @@ -762,6 +762,7 @@ static void lookup_update(struct dht *    dht,                            kad_msg_t *     msg)  {          struct list_head * p = NULL; +        struct list_head * h;          struct contact *   c = NULL;          size_t             n;          size_t             pos = 0; @@ -775,7 +776,19 @@ static void lookup_update(struct dht *    dht,          pthread_mutex_lock(&lu->lock); -        --lu->out; +        list_for_each_safe(p, h, &lu->cookies) { +                struct cookie_el * e = list_entry(p, struct cookie_el, next); +                if (e->cookie == msg->cookie) { +                        list_del(&e->next); +                        free(e); +                        break; +                } +        } + +        if (lu->state == LU_COMPLETE) { +                pthread_mutex_unlock(&lu->lock); +                return; +        }          if (msg->n_addrs > 0) {                  if (lu->addrs == NULL) { @@ -784,6 +797,7 @@ static void lookup_update(struct dht *    dht,                                  lu->addrs[n] = msg->addrs[n];                          lu->n_addrs = msg->n_addrs;                  } +                  lu->state = LU_COMPLETE;                  pthread_cond_broadcast(&lu->cond);                  pthread_mutex_unlock(&lu->lock); @@ -850,7 +864,7 @@ static void lookup_update(struct dht *    dht,                  }          } -        if (lu->out == 0 && !mod) +        if (list_is_empty(&lu->cookies) && !mod)                  lu->state = LU_COMPLETE;          else                  lu->state = LU_UPDATE; @@ -929,11 +943,6 @@ static void lookup_new_addrs(struct lookup * lu,          addrs[n] = 0; -        if (n == 0) { -                lu->state = LU_DONE; -                pthread_cond_signal(&lu->cond); -        } -          pthread_mutex_unlock(&lu->lock);  } @@ -969,10 +978,8 @@ static enum lookup_state lookup_wait(struct lookup * lu)          pthread_mutex_lock(&lu->lock); -        if (lu->state == LU_INIT) { +        if (lu->state == LU_INIT || lu->state == LU_UPDATE)                  lu->state = LU_PENDING; -                pthread_cond_signal(&lu->cond); -        }          pthread_cleanup_push(cleanup_wait, lu); @@ -1009,17 +1016,26 @@ static struct kad_req * dht_find_request(struct dht * dht,  }  static struct lookup * dht_find_lookup(struct dht *    dht, -                                       const uint8_t * key) +                                       uint32_t        cookie)  {          struct list_head * p; +        struct list_head * p2;          assert(dht); -        assert(key); +        assert(cookie > 0);          list_for_each(p, &dht->lookups) {                  struct lookup * l = list_entry(p, struct lookup, next); -                if (!memcmp(l->key, key, dht->b)) -                        return l; +                pthread_mutex_lock(&l->lock); +                list_for_each(p2, &l->cookies) { +                        struct cookie_el * e; +                        e = list_entry(p2, struct cookie_el, next); +                        if (e->cookie == cookie) { +                                pthread_mutex_unlock(&l->lock); +                                return l; +                        } +                } +                pthread_mutex_unlock(&l->lock);          }          return NULL; @@ -1479,7 +1495,7 @@ static int send_msg(struct dht * dht,          if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN)                  kad_req_create(dht, msg, addr); -        return 0; +        return msg->cookie;  #ifndef __DHT_TEST__   fail_msg:          pthread_rwlock_wrlock(&dht->lock); @@ -1592,14 +1608,14 @@ static int kad_store(struct dht *    dht,          msg.n_contacts   = 1;          msg.contacts     = cmsgp; -        if (send_msg(dht, &msg, r_addr)) +        if (send_msg(dht, &msg, r_addr) < 0)                  return -1;          return 0;  }  static ssize_t kad_find(struct dht *     dht, -                        const uint8_t *  key, +                        struct lookup *  lu,                          const uint64_t * addrs,                          enum kad_code    code)  { @@ -1607,19 +1623,40 @@ static ssize_t kad_find(struct dht *     dht,          ssize_t   sent = 0;          assert(dht); -        assert(key); +        assert(lu->key);          msg.code = code;          msg.has_key       = true; -        msg.key.data      = (uint8_t *) key; +        msg.key.data      = (uint8_t *) lu->key;          msg.key.len       = dht->b;          while (*addrs != 0) { -                if (*addrs != dht->addr) { -                        send_msg(dht, &msg, *addrs); -                        sent++; +                struct cookie_el * c; +                int ret; + +                if (*addrs == dht->addr) { +                        ++addrs; +                        continue;                  } + +                ret = send_msg(dht, &msg, *addrs); +                if (ret < 0) +                        break; + +                c = malloc(sizeof(*c)); +                if (c == NULL) +                        break; + +                c->cookie = (uint32_t) ret; + +                pthread_mutex_lock(&lu->lock); + +                list_add_tail(&c->next, &lu->cookies); + +                pthread_mutex_unlock(&lu->lock); + +                ++sent;                  ++addrs;          } @@ -1643,7 +1680,6 @@ static struct lookup * kad_lookup(struct dht *    dht,          uint64_t          addrs[KAD_ALPHA + 1];          enum lookup_state state;          struct lookup *   lu; -        size_t            out;          lu = lookup_create(dht, id);          if (lu == NULL) @@ -1657,14 +1693,11 @@ static struct lookup * kad_lookup(struct dht *    dht,                  return NULL;          } -        out = kad_find(dht, id, addrs, code); -        if (out == 0) { +        if (kad_find(dht, lu, addrs, code) == 0) {                  lookup_detach(dht, lu);                  return lu;          } -        lookup_add_out(lu, out); -          while ((state = lookup_wait(lu)) != LU_COMPLETE) {                  switch (state) {                  case LU_UPDATE: @@ -1672,10 +1705,8 @@ static struct lookup * kad_lookup(struct dht *    dht,                          if (addrs[0] == 0)                                  break; -                        out = kad_find(dht, id, addrs, code); -                        lookup_add_out(lu, out); +                        kad_find(dht, lu, addrs, code);                          break; -                case LU_DONE:                  case LU_DESTROY:                          lookup_detach(dht, lu);                          lookup_set_state(lu, LU_NULL); @@ -1714,7 +1745,6 @@ static void kad_publish(struct dht *    dht,          pthread_rwlock_unlock(&dht->lock); -          addrs = malloc(k * sizeof(*addrs));          if (addrs == NULL)                  return; @@ -1768,7 +1798,7 @@ static int kad_join(struct dht * dht,          pthread_rwlock_unlock(&dht->lock); -        if (send_msg(dht, &msg, addr)) +        if (send_msg(dht, &msg, addr) < 0)                  return -1;          if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) @@ -2104,6 +2134,8 @@ static int kad_handle_join_resp(struct dht *     dht,          kad_req_respond(req); +        dht_update_bucket(dht, msg->s_id.data, msg->s_addr); +          pthread_rwlock_unlock(&dht->lock);          log_dbg("Enrollment of DHT completed."); @@ -2123,7 +2155,7 @@ static int kad_handle_find_resp(struct dht *     dht,          pthread_rwlock_rdlock(&dht->lock); -        lu = dht_find_lookup(dht, req->key); +        lu = dht_find_lookup(dht, req->cookie);          if (lu == NULL) {                  pthread_rwlock_unlock(&dht->lock);                  return -1; @@ -2515,7 +2547,7 @@ static void * dht_handle_sdu(void * o)                          pthread_rwlock_unlock(&dht->lock);                  } -                if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr)) +                if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0)                                  log_warn("Failed to send response.");                  kad_msg__free_unpacked(msg, NULL); diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 8d8b51ee..2b3f5c2a 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -229,10 +229,12 @@ int fa_alloc(int             fd,          uint64_t             addr;          struct shm_du_buff * sdb; -        if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len())) +        addr = dir_query(dst); +        if (addr == 0)                  return -1; -        addr = dir_query(dst); +        if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len())) +                return -1;          msg         = (struct fa_msg *) shm_du_buff_head(sdb);          msg->code   = FLOW_REQ; | 
