diff options
| author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-09-21 08:46:55 +0000 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-09-21 08:46:55 +0000 | 
| commit | b66cf853efee91fcc43e2bbfcb1e39071b775d49 (patch) | |
| tree | 98b9791cb6db7b930fff21cc4e38c8ea0450e137 /src/ipcpd | |
| parent | 0934aa0242f0d61f2b8f7311402cf009b88f1ca6 (diff) | |
| parent | 1e1de177e7da8075b016b703168f3473f58e0568 (diff) | |
| download | ouroboros-b66cf853efee91fcc43e2bbfcb1e39071b775d49.tar.gz ouroboros-b66cf853efee91fcc43e2bbfcb1e39071b775d49.zip | |
Merged in dstaesse/ouroboros/be-dht-revision (pull request #605)
ipcpd: Handle DHT SDUs in different thread
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/normal/dht.c | 73 | 
1 files changed, 58 insertions, 15 deletions
| diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index e7ba4bf3..996b122a 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -227,6 +227,11 @@ struct join_info {          uint64_t     addr;  }; +struct sdu_info { +        struct dht *         dht; +        struct shm_du_buff * sdb; +}; +  static uint8_t * dht_dup_key(const uint8_t * key,                               size_t          len)  { @@ -1665,11 +1670,16 @@ static int kad_join(struct dht * dht,          msg.has_t_refresh   = true;          msg.has_t_replicate = true;          msg.alpha           = KAD_ALPHA; -        msg.b               = dht->b;          msg.k               = KAD_K;          msg.t_refresh       = KAD_T_REFR;          msg.t_replicate     = KAD_T_REPL; +        pthread_rwlock_rdlock(&dht->lock); + +        msg.b               = dht->b; + +        pthread_rwlock_unlock(&dht->lock); +          if (send_msg(dht, &msg, addr))                  return -1; @@ -1798,12 +1808,16 @@ static ssize_t dht_get_contacts(struct dht *          dht,          pthread_rwlock_rdlock(&dht->lock);          len = dht_contact_list(dht, &l, key); -        if (len == 0) +        if (len == 0) { +                pthread_rwlock_unlock(&dht->lock);                  return 0; +        }          *msgs = malloc(len * sizeof(**msgs)); -        if (*msgs == NULL) +        if (*msgs == NULL) { +                pthread_rwlock_unlock(&dht->lock);                  return 0; +        }          list_for_each_safe(p, h, &l) {                  struct contact * c = list_entry(p, struct contact, next); @@ -1976,8 +1990,6 @@ static int kad_handle_join_resp(struct dht *     dht,                  return -1;          } -        dht->state = DHT_RUNNING; -          kad_req_respond(req);          dht_update_bucket(dht, msg->s_id.data, msg->s_addr); @@ -2201,10 +2213,10 @@ uint64_t dht_query(struct dht *    dht,          return 0;  } -void dht_post_sdu(void *               ae, -                  struct shm_du_buff * sdb) +static void * dht_handle_sdu(void * o)  {          struct dht *         dht; +        struct shm_du_buff * sdb;          kad_msg_t *          msg;          kad_contact_msg_t ** cmsgs;          kad_msg_t            resp_msg = KAD_MSG__INIT; @@ -2212,12 +2224,15 @@ void dht_post_sdu(void *               ae,          buffer_t             buf;          size_t               i; -        assert(ae); -        assert(sdb); +        assert(o);          memset(&buf, 0, sizeof(buf)); -        dht = (struct dht *) ae; +        dht = ((struct sdu_info *) o)->dht; +        sdb = ((struct sdu_info *) o)->sdb; + +        assert(dht); +        assert(sdb);          msg = kad_msg__unpack(NULL,                                shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), @@ -2225,27 +2240,29 @@ void dht_post_sdu(void *               ae,          ipcp_sdb_release(sdb); +        free((struct sdu_info *) o); +          if (msg == NULL) {                  log_err("Failed to unpack message."); -                return; +                return (void *) -1;          }          if (msg->has_key && msg->key.len != dht->b) {                  kad_msg__free_unpacked(msg, NULL);                  log_warn("Bad key in message."); -                return; +                return (void *) -1;          }          if (msg->has_s_id && !msg->has_b && msg->s_id.len != dht->b) {                  kad_msg__free_unpacked(msg, NULL);                  log_warn("Bad source ID in message of type %d.", msg->code); -                return; +                return (void *) -1;          }          if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {                  kad_msg__free_unpacked(msg, NULL);                  log_dbg("Got a request message when not running."); -                return; +                return (void *) -1;          }          addr = msg->s_addr; @@ -2341,11 +2358,34 @@ void dht_post_sdu(void *               ae,                  free(resp_msg.addrs);          if (resp_msg.n_contacts == 0) -                return; +                return (void *) -1;          for (i = 0; i < resp_msg.n_contacts; ++i)                  kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL);          free(resp_msg.contacts); + +        return (void *) 0; +} + +static void dht_post_sdu(void *               comp, +                         struct shm_du_buff * sdb) +{ +        pthread_t         thr; +        struct sdu_info * info; + +        info = malloc(sizeof(*info)); +        if (info == NULL) +                return; + +        info->dht = (struct dht *) comp; +        info->sdb = sdb; + +        if (pthread_create(&thr, NULL, dht_handle_sdu, info)) { +                free(info); +                return; +        } + +        pthread_detach(thr);  }  void dht_destroy(struct dht * dht) @@ -2421,6 +2461,8 @@ static void * join_thr(void * o)                  sleep(KAD_JOIN_INTV);          } +        dht_set_state(info->dht, DHT_RUNNING); +          lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE);          if (lu != NULL)                  lookup_destroy(lu); @@ -2509,6 +2551,7 @@ struct dht * dht_create(uint64_t addr)          notifier_reg(handle_event, dht);  #else          (void) handle_event; +        (void) dht_post_sdu;  #endif          dht->state = DHT_INIT; | 
