diff options
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/dht.c | 115 | 
1 files changed, 85 insertions, 30 deletions
| diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index be5411da..548ae03a 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -304,17 +304,18 @@ static uint8_t * create_id(size_t len)          return id;  } -static struct kad_req * kad_req_create(struct dht * dht, -                                       kad_msg_t *  msg, -                                       uint64_t     addr) +static void kad_req_create(struct dht * dht, +                           kad_msg_t *  msg, +                           uint64_t     addr)  {          struct kad_req *   req;          pthread_condattr_t cattr;          struct timespec    t; +        size_t             b;          req = malloc(sizeof(*req));          if (req == NULL) -                return NULL; +                return;          list_head_init(&req->next); @@ -327,18 +328,22 @@ static struct kad_req * kad_req_create(struct dht * dht,          req->code   = msg->code;          req->key    = NULL; +        pthread_rwlock_rdlock(&dht->lock); +        b = dht->b; +        pthread_rwlock_unlock(&dht->lock); +          if (msg->has_key) { -                req->key = dht_dup_key(msg->key.data, dht->b); +                req->key = dht_dup_key(msg->key.data, b);                  if (req->key == NULL) {                          free(req); -                        return NULL; +                        return;                  }          }          if (pthread_mutex_init(&req->lock, NULL)) {                  free(req->key);                  free(req); -                return NULL; +                return;          }          pthread_condattr_init(&cattr); @@ -351,12 +356,16 @@ static struct kad_req * kad_req_create(struct dht * dht,                  pthread_mutex_destroy(&req->lock);                  free(req->key);                  free(req); -                return NULL; +                return;          }          pthread_condattr_destroy(&cattr); -        return req; +        pthread_rwlock_wrlock(&dht->lock); + +        list_add(&req->next, &dht->requests); + +        pthread_rwlock_unlock(&dht->lock);  }  static void kad_req_destroy(struct kad_req * req) @@ -1357,7 +1366,6 @@ static int send_msg(struct dht * dht,                      uint64_t     addr)  {          struct shm_du_buff * sdb; -        struct kad_req *     req;          size_t               len;          int                  retr = 0; @@ -1376,10 +1384,14 @@ static int send_msg(struct dht * dht,          if (msg->code < KAD_STORE) {                  msg->cookie = bmp_allocate(dht->cookies); -                if (!bmp_is_id_valid(dht->cookies, msg->cookie)) +                if (!bmp_is_id_valid(dht->cookies, msg->cookie)) { +                        pthread_rwlock_unlock(&dht->lock);                          goto fail_bmp_alloc; +                }          } +        pthread_rwlock_unlock(&dht->lock); +          len = kad_msg__get_packed_size(msg);          if (len == 0)                  goto fail_msg; @@ -1406,13 +1418,8 @@ static int send_msg(struct dht * dht,          ipcp_sdb_release(sdb);  #endif /* __DHT_TEST__ */ -        if (msg->code < KAD_STORE && dht->state != DHT_SHUTDOWN) { -                req = kad_req_create(dht, msg, addr); -                if (req != NULL) -                        list_add(&req->next, &dht->requests); -        } - -        pthread_rwlock_unlock(&dht->lock); +        if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN) +                kad_req_create(dht, msg, addr);          return 0; @@ -1423,7 +1430,6 @@ static int send_msg(struct dht * dht,   fail_msg:          bmp_release(dht->cookies, msg->cookie);   fail_bmp_alloc: -        pthread_rwlock_unlock(&dht->lock);          return -1;  } @@ -1513,9 +1519,14 @@ static int kad_store(struct dht *    dht,          kad_contact_msg_t * cmsgp[1];          cmsg.id.data = (uint8_t *) key; -        cmsg.id.len  = dht->b;          cmsg.addr    = addr; +        pthread_rwlock_rdlock(&dht->lock); + +        cmsg.id.len  = dht->b; + +        pthread_rwlock_unlock(&dht->lock); +          cmsgp[0] = &cmsg;          msg.code         = KAD_STORE; @@ -1632,15 +1643,32 @@ static void kad_publish(struct dht *    dht,                          time_t          exp)  {          struct lookup * lu; -        uint64_t        addrs[KAD_K]; +        uint64_t      * addrs;          ssize_t         n; +        size_t          k; +        time_t          t_expire; +          assert(dht);          assert(key); +        pthread_rwlock_rdlock(&dht->lock); + +        k        = dht->k; +        t_expire = dht->t_expire; + +        pthread_rwlock_unlock(&dht->lock); + + +        addrs = malloc(k * sizeof(*addrs)); +        if (addrs == NULL) +                return; +          lu = kad_lookup(dht, key, KAD_FIND_NODE); -        if (lu == NULL) +        if (lu == NULL) { +                free(addrs);                  return; +        }          n = lookup_contact_addrs(lu, addrs); @@ -1652,12 +1680,14 @@ static void kad_publish(struct dht *    dht,                          msg.addr    = addr;                          kad_add(dht, &msg, 1, exp);                  } else { -                        if (kad_store(dht, key, addr, addrs[n], dht->t_expire)) +                        if (kad_store(dht, key, addr, addrs[n], t_expire))                                  log_warn("Failed to send store message.");                  }          }          lookup_destroy(lu); + +        free(addrs);  }  static int kad_join(struct dht * dht, @@ -1879,13 +1909,24 @@ static void * work(void * o)                  pthread_rwlock_wrlock(&dht->lock);                  /* Republish registered hashes. */ -                list_for_each_safe(p, h, &dht->refs) { +                list_for_each(p, &dht->refs) {                          struct ref_entry * e; +                        uint8_t *          key; +                        uint64_t           addr; +                        time_t             t_expire;                          e = list_entry(p, struct ref_entry, next);                          if (now.tv_sec > e->t_rep) { -                                kad_publish(dht, e->key, dht->addr, -                                            dht->t_expire); +                                key = dht_dup_key(e->key, dht->b); +                                if (key == NULL) +                                        continue; +                                addr = dht->addr; +                                t_expire = dht->t_expire;                                  e->t_rep = now.tv_sec + dht->t_repub; + +                                pthread_rwlock_unlock(&dht->lock); +                                kad_publish(dht, key, addr, t_expire); +                                pthread_rwlock_wrlock(&dht->lock); +                                free(key);                          }                  } @@ -1894,19 +1935,28 @@ static void * work(void * o)                          struct list_head * p1;                          struct list_head * h1;                          struct dht_entry * e; +                        uint8_t *          key; +                        time_t             t_expire;                          e = list_entry (p, struct dht_entry, next);                          list_for_each_safe(p1, h1, &e->vals) {                                  struct val * v; +                                uint64_t     addr;                                  v = list_entry(p1, struct val, next);                                  if (now.tv_sec > v->t_exp) {                                          list_del(&v->next);                                          val_destroy(v); +                                        continue;                                  }                                  if (now.tv_sec > v->t_rep) { -                                        kad_publish(dht, e->key, v->addr, -                                                    dht->t_expire - now.tv_sec); +                                        key  = dht_dup_key(e->key, dht->b); +                                        addr = v->addr; +                                        t_expire = dht->t_expire = now.tv_sec;                                          v->t_rep = now.tv_sec + dht->t_replic; +                                        pthread_rwlock_unlock(&dht->lock); +                                        kad_publish(dht, key, addr, t_expire); +                                        pthread_rwlock_wrlock(&dht->lock); +                                        free(key);                                  }                          }                  } @@ -1964,7 +2014,7 @@ static int kad_handle_join_resp(struct dht *     dht,                  return -1;          } -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_wrlock(&dht->lock);          dht->buckets = bucket_create();          if (dht->buckets == NULL) { @@ -2116,6 +2166,8 @@ int dht_reg(struct dht *    dht,              const uint8_t * key)  {          struct ref_entry * e; +        uint64_t           addr; +        time_t             t_expire;          assert(dht);          assert(key); @@ -2132,9 +2184,12 @@ int dht_reg(struct dht *    dht,          list_add(&e->next, &dht->refs); +        t_expire = dht->t_expire; +        addr = dht->addr; +          pthread_rwlock_unlock(&dht->lock); -        kad_publish(dht, key, dht->addr, dht->t_expire); +        kad_publish(dht, key, >addr, t_expire);          return 0;  } | 
