diff options
| author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-09-24 14:45:18 +0000 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-09-24 14:45:18 +0000 | 
| commit | 22662b66f5802cd85a973e083c039500ccd2dd5e (patch) | |
| tree | 17f66b04659a06c018494eb732adb661111d63f2 /src/ipcpd | |
| parent | 7cef269be64f64b920763c6f2455931422c8bfe9 (diff) | |
| parent | ff5063ad0e7902ce59864a466bd9d8d606d788e4 (diff) | |
| download | ouroboros-22662b66f5802cd85a973e083c039500ccd2dd5e.tar.gz ouroboros-22662b66f5802cd85a973e083c039500ccd2dd5e.zip | |
Merged in dstaesse/ouroboros/be-dht-debugging (pull request #615)
ipcpd: Fix compilation of DHT
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/ipcp.c | 99 | ||||
| -rw-r--r-- | src/ipcpd/ipcp.h | 9 | ||||
| -rw-r--r-- | src/ipcpd/normal/dht.c | 339 | 
3 files changed, 262 insertions, 185 deletions
| diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 85d543da..513c638a 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -36,7 +36,6 @@  #include <ouroboros/dev.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/np1_flow.h> -#include <ouroboros/tpm.h>  #include "ipcp.h" @@ -44,6 +43,14 @@  #include <sys/socket.h>  #include <stdlib.h> +struct cmd { +        struct list_head next; + +        uint8_t          cbuf[IPCP_MSG_BUF_SIZE]; +        size_t           len; +        int              fd; +}; +  void ipcp_sig_handler(int         sig,                        siginfo_t * info,                        void *      c) @@ -107,7 +114,8 @@ static void * acceptloop(void * o)          while (ipcp_get_state() != IPCP_SHUTDOWN &&                 ipcp_get_state() != IPCP_NULL) { -                ssize_t count; +                struct cmd * cmd; +  #if defined(__FreeBSD__) || defined(__APPLE__)                  FD_ZERO(&fds);                  FD_SET(ipcpi.sockfd, &fds); @@ -122,25 +130,28 @@ static void * acceptloop(void * o)                                 (void *) &tv, sizeof(tv)))                          log_warn("Failed to set timeout on socket."); -                pthread_mutex_lock(&ipcpi.cmd_lock); +                cmd = malloc(sizeof(*cmd)); +                if (cmd == NULL) { +                        log_err("Out of memory"); +                        break; +                } -                assert(ipcpi.csockfd == -1); +                pthread_mutex_lock(&ipcpi.cmd_lock); -                count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE); -                if (count <= 0) { +                cmd->len = read(csockfd, cmd->cbuf, IPCP_MSG_BUF_SIZE); +                if (cmd->len <= 0) {                          pthread_mutex_unlock(&ipcpi.cmd_lock);                          log_err("Failed to read from socket.");                          close(csockfd); +                        free(cmd);                          continue;                  } -                ipcpi.cmd_len = count; -                ipcpi.csockfd = csockfd; +                cmd->fd = csockfd; -                pthread_cond_signal(&ipcpi.cmd_cond); +                list_add(&cmd->next, &ipcpi.cmds); -                while (ipcpi.csockfd != -1) -                        pthread_cond_wait(&ipcpi.acc_cond, &ipcpi.cmd_lock); +                pthread_cond_signal(&ipcpi.cmd_cond);                  pthread_mutex_unlock(&ipcpi.cmd_lock);          } @@ -159,13 +170,15 @@ static void * mainloop(void * o)          struct timespec     dl;          struct timespec     to  = {(IPCP_ACCEPT_TIMEOUT / 1000),                                     (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION}; -        (void)  o; + +        (void) o;          while (true) {                  int                 ret = 0;                  ipcp_msg_t          ret_msg  = IPCP_MSG__INIT;                  dif_info_msg_t      dif_info = DIF_INFO_MSG__INIT;                  int                 fd       = -1; +                struct cmd *        cmd;                  ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -174,33 +187,34 @@ static void * mainloop(void * o)                  pthread_mutex_lock(&ipcpi.cmd_lock); -                while (ipcpi.csockfd == -1 && ret != -ETIMEDOUT) +                while (list_is_empty(&ipcpi.cmds) && ret != -ETIMEDOUT)                          ret = -pthread_cond_timedwait(&ipcpi.cmd_cond,                                                        &ipcpi.cmd_lock,                                                        &dl); -                sfd           = ipcpi.csockfd; -                ipcpi.csockfd = -1; - -                if (sfd == -1) { +                if (ret == -ETIMEDOUT) {                          pthread_mutex_unlock(&ipcpi.cmd_lock); -                        if (tpm_check()) +                        if (tpm_check(ipcpi.tpm))                                  break;                          continue;                  } -                pthread_cond_signal(&ipcpi.acc_cond); +                cmd = list_last_entry(&ipcpi.cmds, struct cmd, next); +                list_del(&cmd->next); + +                pthread_mutex_unlock(&ipcpi.cmd_lock); + +                msg = ipcp_msg__unpack(NULL, cmd->len, cmd->cbuf); +                sfd = cmd->fd; + +                free(cmd); -                msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf);                  if (msg == NULL) { -                        pthread_mutex_unlock(&ipcpi.cmd_lock);                          close(sfd);                          continue;                  } -                pthread_mutex_unlock(&ipcpi.cmd_lock); - -                tpm_dec(); +                tpm_dec(ipcpi.tpm);                  switch (msg->code) {                  case IPCP_MSG_CODE__IPCP_BOOTSTRAP: @@ -474,7 +488,7 @@ static void * mainloop(void * o)                  if (buffer.len == 0) {                          log_err("Failed to pack reply message");                          close(sfd); -                        tpm_inc(); +                        tpm_inc(ipcpi.tpm);                          continue;                  } @@ -482,7 +496,7 @@ static void * mainloop(void * o)                  if (buffer.data == NULL) {                          log_err("Failed to create reply buffer.");                          close(sfd); -                        tpm_inc(); +                        tpm_inc(ipcpi.tpm);                          continue;                  } @@ -492,17 +506,17 @@ static void * mainloop(void * o)                          log_err("Failed to send reply message");                          free(buffer.data);                          close(sfd); -                        tpm_inc(); +                        tpm_inc(ipcpi.tpm);                          continue;                  }                  free(buffer.data);                  close(sfd); -                tpm_inc(); +                tpm_inc(ipcpi.tpm);          } -        tpm_exit(); +        tpm_exit(ipcpi.tpm);          return (void *) 0;  } @@ -617,20 +631,14 @@ int ipcp_init(int               argc,                  goto fail_cmd_cond;          } -        if (pthread_cond_init(&ipcpi.acc_cond, &cattr)) { -                log_err("Failed to init convar."); -                goto fail_acc_cond; -        } +        list_head_init(&ipcpi.cmds);          ipcpi.alloc_id = -1; -        ipcpi.csockfd  = -1;          pthread_condattr_destroy(&cattr);          return 0; - fail_acc_cond: -        pthread_cond_destroy(&ipcpi.cmd_cond);   fail_cmd_cond:          pthread_mutex_destroy(&ipcpi.cmd_lock);   fail_cmd_lock: @@ -675,12 +683,14 @@ int ipcp_boot()          sigaction(SIGHUP,  &sig_act, NULL);          sigaction(SIGPIPE, &sig_act, NULL); -        if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) -                goto fail_tpm_init; +        ipcpi.tpm = tpm_create(IPCP_MIN_THREADS, IPCP_ADD_THREADS, +                               mainloop, NULL); +        if (ipcpi.tpm == NULL) +                goto fail_tpm_create;          pthread_sigmask(SIG_BLOCK, &sigset, NULL); -        if (tpm_start()) +        if (tpm_start(ipcpi.tpm))                  goto fail_tpm_start;          ipcp_set_state(IPCP_INIT); @@ -696,18 +706,18 @@ int ipcp_boot()          return 0;   fail_acceptor: -        tpm_stop(); +        tpm_stop(ipcpi.tpm);   fail_tpm_start: -        tpm_fini(); - fail_tpm_init: +        tpm_destroy(ipcpi.tpm); + fail_tpm_create:          return -1;  }  void ipcp_shutdown()  {          pthread_join(ipcpi.acceptor, NULL); -        tpm_stop(); -        tpm_fini(); +        tpm_stop(ipcpi.tpm); +        tpm_destroy(ipcpi.tpm);          log_info("IPCP %d shutting down.", getpid());  } @@ -724,7 +734,6 @@ void ipcp_fini()          pthread_mutex_destroy(&ipcpi.state_mtx);          pthread_cond_destroy(&ipcpi.alloc_cond);          pthread_mutex_destroy(&ipcpi.alloc_lock); -        pthread_cond_destroy(&ipcpi.acc_cond);          pthread_cond_destroy(&ipcpi.cmd_cond);          pthread_mutex_destroy(&ipcpi.cmd_lock); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 1b2a0334..d47d224b 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -25,8 +25,10 @@  #include <ouroboros/hash.h>  #include <ouroboros/ipcp.h> +#include <ouroboros/list.h>  #include <ouroboros/qoscube.h>  #include <ouroboros/sockets.h> +#include <ouroboros/tpm.h>  #include <pthread.h>  #include <time.h> @@ -92,10 +94,7 @@ struct ipcp {          int                sockfd;          char *             sock_path; -        uint8_t            cbuf[IPCP_MSG_BUF_SIZE]; -        size_t             cmd_len; -        int                csockfd; -        pthread_cond_t     acc_cond; +        struct list_head   cmds;          pthread_cond_t     cmd_cond;          pthread_mutex_t    cmd_lock; @@ -103,6 +102,8 @@ struct ipcp {          pthread_cond_t     alloc_cond;          pthread_mutex_t    alloc_lock; +        struct tpm *       tpm; +          pthread_t          acceptor;  } ipcpi; diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index 548ae03a..93fd4e4e 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -34,6 +34,7 @@  #include <ouroboros/notifier.h>  #include <ouroboros/random.h>  #include <ouroboros/time_utils.h> +#include <ouroboros/tpm.h>  #include <ouroboros/utils.h>  #include "connmgr.h" @@ -67,6 +68,7 @@ typedef KadContactMsg kad_contact_msg_t;  #define KAD_RESP_RETR 6    /* Number of retries on sending a response.   */  #define KAD_JOIN_RETR 5    /* Number of retries sending a join.          */  #define KAD_JOIN_INTV 1    /* Time (seconds) between join retries.       */ +#define HANDLE_TIMEO  1000 /* Timeout for dht_handle_sdu tpm check (ms)  */  enum dht_state {          DHT_INIT = 0, @@ -187,6 +189,12 @@ struct bucket {          struct bucket *  children[1L << KAD_BETA];  }; +struct cmd { +        struct list_head     next; + +        struct shm_du_buff * sdb; +}; +  struct dht {          size_t           alpha;          size_t           b; @@ -212,6 +220,7 @@ struct dht {          struct bmp *     cookies;          enum dht_state   state; +        struct list_head cmds;          pthread_cond_t   cond;          pthread_mutex_t  mtx; @@ -219,6 +228,8 @@ struct dht {          int              fd; +        struct tpm *     tpm; +          pthread_t        worker;  }; @@ -1428,7 +1439,9 @@ static int send_msg(struct dht * dht,          ipcp_sdb_release(sdb);  #endif   fail_msg: +        pthread_rwlock_wrlock(&dht->lock);          bmp_release(dht->cookies, msg->cookie); +        pthread_rwlock_unlock(&dht->lock);   fail_bmp_alloc:          return -1;  } @@ -1838,7 +1851,7 @@ static ssize_t dht_get_contacts(struct dht *          dht,          list_head_init(&l); -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_wrlock(&dht->lock);          len = dht_contact_list(dht, &l, key);          if (len == 0) { @@ -1898,9 +1911,13 @@ static void * work(void * o)          dht = (struct dht *) o; +        pthread_rwlock_rdlock(&dht->lock); +          intv = gcd(dht->t_expire, dht->t_repub);          intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; +        pthread_rwlock_unlock(&dht->lock); +          list_head_init(&reflist);          while (true) { @@ -2189,7 +2206,7 @@ int dht_reg(struct dht *    dht,          pthread_rwlock_unlock(&dht->lock); -        kad_publish(dht, key, >addr, t_expire); +        kad_publish(dht, key, addr, t_expire);          return 0;  } @@ -2271,163 +2288,192 @@ uint64_t dht_query(struct dht *    dht,  static void * dht_handle_sdu(void * o)  { -        struct dht *         dht; -        struct shm_du_buff * sdb; -        kad_msg_t *          msg; -        kad_contact_msg_t ** cmsgs; -        kad_msg_t            resp_msg = KAD_MSG__INIT; -        uint64_t             addr; -        buffer_t             buf; -        size_t               i; -        size_t               b; -        size_t               t_expire; +        struct dht *    dht = (struct dht *) o; +        struct timespec dl; +        struct timespec to = {(HANDLE_TIMEO / 1000), +                              (HANDLE_TIMEO % 1000) * MILLION}; +        assert(dht); -        assert(o); +        while (true) { +                kad_msg_t *          msg; +                kad_contact_msg_t ** cmsgs; +                kad_msg_t            resp_msg = KAD_MSG__INIT; +                uint64_t             addr; +                buffer_t             buf; +                size_t               i; +                size_t               b; +                size_t               t_expire; +                struct cmd *         cmd; +                int                  ret = 0; + +                clock_gettime(PTHREAD_COND_CLOCK, &dl); +                ts_add(&dl, &to, &dl); + +                pthread_mutex_lock(&dht->mtx); + +                while(list_is_empty(&dht->cmds) && ret != -ETIMEDOUT) +                        ret = -pthread_cond_timedwait(&dht->cond, +                                                      &dht->mtx, &dl); + +                if (ret == -ETIMEDOUT) { +                        pthread_mutex_unlock(&dht->mtx); +                        if (tpm_check(dht->tpm)) +                                break; +                        continue; +                } -        memset(&buf, 0, sizeof(buf)); +                cmd = list_last_entry(&dht->cmds, struct cmd, next); +                list_del(&cmd->next); -        dht = ((struct sdu_info *) o)->dht; -        sdb = ((struct sdu_info *) o)->sdb; +                pthread_mutex_unlock(&dht->mtx); -        assert(dht); -        assert(sdb); +                i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); -        msg = kad_msg__unpack(NULL, -                              shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), -                              shm_du_buff_head(sdb)); +                msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); -        ipcp_sdb_release(sdb); +                ipcp_sdb_release(cmd->sdb); +                free(cmd); -        free(o); +                if (msg == NULL) { +                        log_err("Failed to unpack message."); +                        continue; +                } -        if (msg == NULL) { -                log_err("Failed to unpack message."); -                return (void *) -1; -        } +                pthread_rwlock_rdlock(&dht->lock); -        pthread_rwlock_rdlock(&dht->lock); +                b        = dht->b; +                t_expire = dht->t_expire; -        b        = dht->b; -        t_expire = dht->t_expire; +                pthread_rwlock_unlock(&dht->lock); -        pthread_rwlock_unlock(&dht->lock); +                if (msg->has_key && msg->key.len != b) { +                        kad_msg__free_unpacked(msg, NULL); +                        log_warn("Bad key in message."); +                        continue; +                } -        if (msg->has_key && msg->key.len != b) { -                kad_msg__free_unpacked(msg, NULL); -                log_warn("Bad key in message."); -                return (void *) -1; -        } +                if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { +                        kad_msg__free_unpacked(msg, NULL); +                        log_warn("Bad source ID in message of type %d.", +                                 msg->code); +                        continue; +                } -        if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { -                kad_msg__free_unpacked(msg, NULL); -                log_warn("Bad source ID in message of type %d.", msg->code); -                return (void *) -1; -        } +                if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { +                        kad_msg__free_unpacked(msg, NULL); +                        log_dbg("Got a request message when not running."); +                        continue; +                } -        if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { -                kad_msg__free_unpacked(msg, NULL); -                log_dbg("Got a request message when not running."); -                return (void *) -1; -        } +                tpm_dec(dht->tpm); -        addr = msg->s_addr; +                addr = msg->s_addr; -        resp_msg.code   = KAD_RESPONSE; -        resp_msg.cookie = msg->cookie; +                resp_msg.code   = KAD_RESPONSE; +                resp_msg.cookie = msg->cookie; -        switch(msg->code) { -        case KAD_JOIN: -                /* Refuse enrollee on check fails. */ -                if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { -                        log_warn("Parameter mismatch. " -                                 "DHT enrolment refused."); -                        break; -                } +                switch(msg->code) { +                case KAD_JOIN: +                        /* Refuse enrollee on check fails. */ +                        if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { +                                log_warn("Parameter mismatch. " +                                         "DHT enrolment refused."); +                                break; +                        } -                if (msg->t_replicate != KAD_T_REPL) { -                        log_warn("Replication time mismatch. " -                                 "DHT enrolment refused."); +                        if (msg->t_replicate != KAD_T_REPL) { +                                log_warn("Replication time mismatch. " +                                         "DHT enrolment refused."); -                        break; +                                break;                  } -                if (msg->t_refresh != KAD_T_REFR) { -                        log_warn("Refresh time mismatch. " -                                 "DHT enrolment refused."); +                        if (msg->t_refresh != KAD_T_REFR) { +                                log_warn("Refresh time mismatch. " +                                         "DHT enrolment refused."); +                                break; +                        } + +                        resp_msg.has_alpha       = true; +                        resp_msg.has_b           = true; +                        resp_msg.has_k           = true; +                        resp_msg.has_t_expire    = true; +                        resp_msg.has_t_refresh   = true; +                        resp_msg.has_t_replicate = true; +                        resp_msg.alpha           = KAD_ALPHA; +                        resp_msg.b               = b; +                        resp_msg.k               = KAD_K; +                        resp_msg.t_expire        = t_expire; +                        resp_msg.t_refresh       = KAD_T_REFR; +                        resp_msg.t_replicate     = KAD_T_REPL;                          break; -                } +                case KAD_FIND_VALUE: +                        buf = dht_retrieve(dht, msg->key.data); +                        if (buf.len != 0) { +                                resp_msg.n_addrs = buf.len; +                                resp_msg.addrs   = (uint64_t *) buf.data; +                                break; +                        } +                        /* FALLTHRU */ +                case KAD_FIND_NODE: +                        /* Return k closest contacts. */ +                        resp_msg.n_contacts = +                                dht_get_contacts(dht, msg->key.data, &cmsgs); +                        resp_msg.contacts = cmsgs; +                        break; +                case KAD_STORE: +                        if (msg->n_contacts < 1) { +                                log_warn("No contacts in store message."); +                                break; +                        } -                resp_msg.has_alpha       = true; -                resp_msg.has_b           = true; -                resp_msg.has_k           = true; -                resp_msg.has_t_expire    = true; -                resp_msg.has_t_refresh   = true; -                resp_msg.has_t_replicate = true; -                resp_msg.alpha           = KAD_ALPHA; -                resp_msg.b               = b; -                resp_msg.k               = KAD_K; -                resp_msg.t_expire        = t_expire; -                resp_msg.t_refresh       = KAD_T_REFR; -                resp_msg.t_replicate     = KAD_T_REPL; -                break; -        case KAD_FIND_VALUE: -                buf = dht_retrieve(dht, msg->key.data); -                if (buf.len != 0) { -                        resp_msg.n_addrs = buf.len; -                        resp_msg.addrs   = (uint64_t *) buf.data; +                        if (!msg->has_t_expire) { +                                log_warn("No expiry time in store message."); +                                break; +                        } + +                        kad_add(dht, *msg->contacts, msg->n_contacts, +                                msg->t_expire);                          break; -                } -                /* FALLTHRU */ -        case KAD_FIND_NODE: -                /* Return k closest contacts. */ -                resp_msg.n_contacts = -                        dht_get_contacts(dht, msg->key.data, &cmsgs); -                resp_msg.contacts = cmsgs; -                break; -        case KAD_STORE: -                if (msg->n_contacts < 1) { -                        log_warn("No contacts in store message."); +                case KAD_RESPONSE: +                        kad_handle_response(dht, msg); +                        break; +                default: +                        assert(false);                          break;                  } -                if (!msg->has_t_expire) { -                        log_warn("No expiry time in store message."); -                        break; +                if (msg->code != KAD_JOIN) { +                        pthread_rwlock_wrlock(&dht->lock); +                        if (dht_update_bucket(dht, msg->s_id.data, addr)) +                                log_warn("Failed to update bucket."); +                        pthread_rwlock_unlock(&dht->lock);                  } -                kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire); -                break; -        case KAD_RESPONSE: -                kad_handle_response(dht, msg); -                break; -        default: -                assert(false); -                break; -        } +                if (msg->code < KAD_STORE) { +                        if (send_msg(dht, &resp_msg, addr)) +                                log_warn("Failed to send response."); +                } -        if (msg->code != KAD_JOIN) { -                pthread_rwlock_wrlock(&dht->lock); -                if (dht_update_bucket(dht, msg->s_id.data, addr)) -                        log_warn("Failed to update bucket."); -                pthread_rwlock_unlock(&dht->lock); -        } +                kad_msg__free_unpacked(msg, NULL); -        if (msg->code < KAD_STORE) { -                if (send_msg(dht, &resp_msg, addr)) -                        log_warn("Failed to send response."); -        } +                if (resp_msg.n_addrs > 0) +                        free(resp_msg.addrs); -        kad_msg__free_unpacked(msg, NULL); +                if (resp_msg.n_contacts == 0) { +                        tpm_inc(dht->tpm); +                        continue; +                } -        if (resp_msg.n_addrs > 0) -                free(resp_msg.addrs); +                for (i = 0; i < resp_msg.n_contacts; ++i) +                        kad_contact_msg__free_unpacked(resp_msg.contacts[i], +                                                       NULL); +                free(resp_msg.contacts); -        if (resp_msg.n_contacts == 0) -                return (void *) -1; +                tpm_inc(dht->tpm); +        } -        for (i = 0; i < resp_msg.n_contacts; ++i) -                kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); -        free(resp_msg.contacts); +        tpm_exit(dht->tpm);          return (void *) 0;  } @@ -2435,22 +2481,24 @@ static void * dht_handle_sdu(void * o)  static void dht_post_sdu(void *               comp,                           struct shm_du_buff * sdb)  { -        pthread_t         thr; -        struct sdu_info * info; +        struct cmd * cmd; +        struct dht * dht = (struct dht *) comp; -        info = malloc(sizeof(*info)); -        if (info == NULL) +        cmd = malloc(sizeof(*cmd)); +        if (cmd == NULL) { +                log_err("Command failed. Out of memory.");                  return; +        } -        info->dht = (struct dht *) comp; -        info->sdb = sdb; +        cmd->sdb = sdb; -        if (pthread_create(&thr, NULL, dht_handle_sdu, info)) { -                free(info); -                return; -        } +        pthread_mutex_lock(&dht->mtx); + +        list_add(&cmd->next, &dht->cmds); -        pthread_detach(thr); +        pthread_cond_signal(&dht->cond); + +        pthread_mutex_unlock(&dht->mtx);  }  void dht_destroy(struct dht * dht) @@ -2461,6 +2509,11 @@ void dht_destroy(struct dht * dht)          if (dht == NULL)                  return; +#ifndef __DHT_TEST__ +        tpm_stop(dht->tpm); + +        tpm_destroy(dht->tpm); +#endif          if (dht_get_state(dht) == DHT_RUNNING) {                  dht_set_state(dht, DHT_SHUTDOWN);                  pthread_cancel(dht->worker); @@ -2594,6 +2647,7 @@ struct dht * dht_create(uint64_t addr)          list_head_init(&dht->requests);          list_head_init(&dht->refs);          list_head_init(&dht->lookups); +        list_head_init(&dht->cmds);          if (pthread_rwlock_init(&dht->lock, NULL))                  goto fail_rwlock; @@ -2612,16 +2666,29 @@ struct dht * dht_create(uint64_t addr)          dht->addr = addr;          dht->id   = NULL;  #ifndef __DHT_TEST__ +        dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht); +        if (dht->tpm == NULL) +                goto fail_tpm_create; + +        if (tpm_start(dht->tpm)) +                goto fail_tpm_start; +          dht->fd   = dt_reg_ae(dht, &dht_post_sdu);          notifier_reg(handle_event, dht);  #else          (void) handle_event; +        (void) dht_handle_sdu;          (void) dht_post_sdu;  #endif          dht->state = DHT_INIT;          return dht; - +#ifndef __DHT_TEST__ + fail_tpm_start: +        tpm_destroy(dht->tpm); + fail_tpm_create: +        bmp_destroy(dht->cookies); +#endif   fail_bmp:          pthread_cond_destroy(&dht->cond);   fail_cond: | 
