diff options
| author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-09-24 14:45:18 +0000 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-09-24 14:45:18 +0000 | 
| commit | 22662b66f5802cd85a973e083c039500ccd2dd5e (patch) | |
| tree | 17f66b04659a06c018494eb732adb661111d63f2 /src/ipcpd/normal | |
| parent | 7cef269be64f64b920763c6f2455931422c8bfe9 (diff) | |
| parent | ff5063ad0e7902ce59864a466bd9d8d606d788e4 (diff) | |
| download | ouroboros-22662b66f5802cd85a973e083c039500ccd2dd5e.tar.gz ouroboros-22662b66f5802cd85a973e083c039500ccd2dd5e.zip | |
Merged in dstaesse/ouroboros/be-dht-debugging (pull request #615)
ipcpd: Fix compilation of DHT
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/dht.c | 339 | 
1 files changed, 203 insertions, 136 deletions
| diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index 548ae03a..93fd4e4e 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -34,6 +34,7 @@  #include <ouroboros/notifier.h>  #include <ouroboros/random.h>  #include <ouroboros/time_utils.h> +#include <ouroboros/tpm.h>  #include <ouroboros/utils.h>  #include "connmgr.h" @@ -67,6 +68,7 @@ typedef KadContactMsg kad_contact_msg_t;  #define KAD_RESP_RETR 6    /* Number of retries on sending a response.   */  #define KAD_JOIN_RETR 5    /* Number of retries sending a join.          */  #define KAD_JOIN_INTV 1    /* Time (seconds) between join retries.       */ +#define HANDLE_TIMEO  1000 /* Timeout for dht_handle_sdu tpm check (ms)  */  enum dht_state {          DHT_INIT = 0, @@ -187,6 +189,12 @@ struct bucket {          struct bucket *  children[1L << KAD_BETA];  }; +struct cmd { +        struct list_head     next; + +        struct shm_du_buff * sdb; +}; +  struct dht {          size_t           alpha;          size_t           b; @@ -212,6 +220,7 @@ struct dht {          struct bmp *     cookies;          enum dht_state   state; +        struct list_head cmds;          pthread_cond_t   cond;          pthread_mutex_t  mtx; @@ -219,6 +228,8 @@ struct dht {          int              fd; +        struct tpm *     tpm; +          pthread_t        worker;  }; @@ -1428,7 +1439,9 @@ static int send_msg(struct dht * dht,          ipcp_sdb_release(sdb);  #endif   fail_msg: +        pthread_rwlock_wrlock(&dht->lock);          bmp_release(dht->cookies, msg->cookie); +        pthread_rwlock_unlock(&dht->lock);   fail_bmp_alloc:          return -1;  } @@ -1838,7 +1851,7 @@ static ssize_t dht_get_contacts(struct dht *          dht,          list_head_init(&l); -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_wrlock(&dht->lock);          len = dht_contact_list(dht, &l, key);          if (len == 0) { @@ -1898,9 +1911,13 @@ static void * work(void * o)          dht = (struct dht *) o; +        pthread_rwlock_rdlock(&dht->lock); +          intv = gcd(dht->t_expire, dht->t_repub);          intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; +        pthread_rwlock_unlock(&dht->lock); +          list_head_init(&reflist);          while (true) { @@ -2189,7 +2206,7 @@ int dht_reg(struct dht *    dht,          pthread_rwlock_unlock(&dht->lock); -        kad_publish(dht, key, >addr, t_expire); +        kad_publish(dht, key, addr, t_expire);          return 0;  } @@ -2271,163 +2288,192 @@ uint64_t dht_query(struct dht *    dht,  static void * dht_handle_sdu(void * o)  { -        struct dht *         dht; -        struct shm_du_buff * sdb; -        kad_msg_t *          msg; -        kad_contact_msg_t ** cmsgs; -        kad_msg_t            resp_msg = KAD_MSG__INIT; -        uint64_t             addr; -        buffer_t             buf; -        size_t               i; -        size_t               b; -        size_t               t_expire; +        struct dht *    dht = (struct dht *) o; +        struct timespec dl; +        struct timespec to = {(HANDLE_TIMEO / 1000), +                              (HANDLE_TIMEO % 1000) * MILLION}; +        assert(dht); -        assert(o); +        while (true) { +                kad_msg_t *          msg; +                kad_contact_msg_t ** cmsgs; +                kad_msg_t            resp_msg = KAD_MSG__INIT; +                uint64_t             addr; +                buffer_t             buf; +                size_t               i; +                size_t               b; +                size_t               t_expire; +                struct cmd *         cmd; +                int                  ret = 0; + +                clock_gettime(PTHREAD_COND_CLOCK, &dl); +                ts_add(&dl, &to, &dl); + +                pthread_mutex_lock(&dht->mtx); + +                while(list_is_empty(&dht->cmds) && ret != -ETIMEDOUT) +                        ret = -pthread_cond_timedwait(&dht->cond, +                                                      &dht->mtx, &dl); + +                if (ret == -ETIMEDOUT) { +                        pthread_mutex_unlock(&dht->mtx); +                        if (tpm_check(dht->tpm)) +                                break; +                        continue; +                } -        memset(&buf, 0, sizeof(buf)); +                cmd = list_last_entry(&dht->cmds, struct cmd, next); +                list_del(&cmd->next); -        dht = ((struct sdu_info *) o)->dht; -        sdb = ((struct sdu_info *) o)->sdb; +                pthread_mutex_unlock(&dht->mtx); -        assert(dht); -        assert(sdb); +                i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); -        msg = kad_msg__unpack(NULL, -                              shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), -                              shm_du_buff_head(sdb)); +                msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); -        ipcp_sdb_release(sdb); +                ipcp_sdb_release(cmd->sdb); +                free(cmd); -        free(o); +                if (msg == NULL) { +                        log_err("Failed to unpack message."); +                        continue; +                } -        if (msg == NULL) { -                log_err("Failed to unpack message."); -                return (void *) -1; -        } +                pthread_rwlock_rdlock(&dht->lock); -        pthread_rwlock_rdlock(&dht->lock); +                b        = dht->b; +                t_expire = dht->t_expire; -        b        = dht->b; -        t_expire = dht->t_expire; +                pthread_rwlock_unlock(&dht->lock); -        pthread_rwlock_unlock(&dht->lock); +                if (msg->has_key && msg->key.len != b) { +                        kad_msg__free_unpacked(msg, NULL); +                        log_warn("Bad key in message."); +                        continue; +                } -        if (msg->has_key && msg->key.len != b) { -                kad_msg__free_unpacked(msg, NULL); -                log_warn("Bad key in message."); -                return (void *) -1; -        } +                if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { +                        kad_msg__free_unpacked(msg, NULL); +                        log_warn("Bad source ID in message of type %d.", +                                 msg->code); +                        continue; +                } -        if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { -                kad_msg__free_unpacked(msg, NULL); -                log_warn("Bad source ID in message of type %d.", msg->code); -                return (void *) -1; -        } +                if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { +                        kad_msg__free_unpacked(msg, NULL); +                        log_dbg("Got a request message when not running."); +                        continue; +                } -        if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { -                kad_msg__free_unpacked(msg, NULL); -                log_dbg("Got a request message when not running."); -                return (void *) -1; -        } +                tpm_dec(dht->tpm); -        addr = msg->s_addr; +                addr = msg->s_addr; -        resp_msg.code   = KAD_RESPONSE; -        resp_msg.cookie = msg->cookie; +                resp_msg.code   = KAD_RESPONSE; +                resp_msg.cookie = msg->cookie; -        switch(msg->code) { -        case KAD_JOIN: -                /* Refuse enrollee on check fails. */ -                if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { -                        log_warn("Parameter mismatch. " -                                 "DHT enrolment refused."); -                        break; -                } +                switch(msg->code) { +                case KAD_JOIN: +                        /* Refuse enrollee on check fails. */ +                        if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { +                                log_warn("Parameter mismatch. " +                                         "DHT enrolment refused."); +                                break; +                        } -                if (msg->t_replicate != KAD_T_REPL) { -                        log_warn("Replication time mismatch. " -                                 "DHT enrolment refused."); +                        if (msg->t_replicate != KAD_T_REPL) { +                                log_warn("Replication time mismatch. " +                                         "DHT enrolment refused."); -                        break; +                                break;                  } -                if (msg->t_refresh != KAD_T_REFR) { -                        log_warn("Refresh time mismatch. " -                                 "DHT enrolment refused."); +                        if (msg->t_refresh != KAD_T_REFR) { +                                log_warn("Refresh time mismatch. " +                                         "DHT enrolment refused."); +                                break; +                        } + +                        resp_msg.has_alpha       = true; +                        resp_msg.has_b           = true; +                        resp_msg.has_k           = true; +                        resp_msg.has_t_expire    = true; +                        resp_msg.has_t_refresh   = true; +                        resp_msg.has_t_replicate = true; +                        resp_msg.alpha           = KAD_ALPHA; +                        resp_msg.b               = b; +                        resp_msg.k               = KAD_K; +                        resp_msg.t_expire        = t_expire; +                        resp_msg.t_refresh       = KAD_T_REFR; +                        resp_msg.t_replicate     = KAD_T_REPL;                          break; -                } +                case KAD_FIND_VALUE: +                        buf = dht_retrieve(dht, msg->key.data); +                        if (buf.len != 0) { +                                resp_msg.n_addrs = buf.len; +                                resp_msg.addrs   = (uint64_t *) buf.data; +                                break; +                        } +                        /* FALLTHRU */ +                case KAD_FIND_NODE: +                        /* Return k closest contacts. */ +                        resp_msg.n_contacts = +                                dht_get_contacts(dht, msg->key.data, &cmsgs); +                        resp_msg.contacts = cmsgs; +                        break; +                case KAD_STORE: +                        if (msg->n_contacts < 1) { +                                log_warn("No contacts in store message."); +                                break; +                        } -                resp_msg.has_alpha       = true; -                resp_msg.has_b           = true; -                resp_msg.has_k           = true; -                resp_msg.has_t_expire    = true; -                resp_msg.has_t_refresh   = true; -                resp_msg.has_t_replicate = true; -                resp_msg.alpha           = KAD_ALPHA; -                resp_msg.b               = b; -                resp_msg.k               = KAD_K; -                resp_msg.t_expire        = t_expire; -                resp_msg.t_refresh       = KAD_T_REFR; -                resp_msg.t_replicate     = KAD_T_REPL; -                break; -        case KAD_FIND_VALUE: -                buf = dht_retrieve(dht, msg->key.data); -                if (buf.len != 0) { -                        resp_msg.n_addrs = buf.len; -                        resp_msg.addrs   = (uint64_t *) buf.data; +                        if (!msg->has_t_expire) { +                                log_warn("No expiry time in store message."); +                                break; +                        } + +                        kad_add(dht, *msg->contacts, msg->n_contacts, +                                msg->t_expire);                          break; -                } -                /* FALLTHRU */ -        case KAD_FIND_NODE: -                /* Return k closest contacts. */ -                resp_msg.n_contacts = -                        dht_get_contacts(dht, msg->key.data, &cmsgs); -                resp_msg.contacts = cmsgs; -                break; -        case KAD_STORE: -                if (msg->n_contacts < 1) { -                        log_warn("No contacts in store message."); +                case KAD_RESPONSE: +                        kad_handle_response(dht, msg); +                        break; +                default: +                        assert(false);                          break;                  } -                if (!msg->has_t_expire) { -                        log_warn("No expiry time in store message."); -                        break; +                if (msg->code != KAD_JOIN) { +                        pthread_rwlock_wrlock(&dht->lock); +                        if (dht_update_bucket(dht, msg->s_id.data, addr)) +                                log_warn("Failed to update bucket."); +                        pthread_rwlock_unlock(&dht->lock);                  } -                kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire); -                break; -        case KAD_RESPONSE: -                kad_handle_response(dht, msg); -                break; -        default: -                assert(false); -                break; -        } +                if (msg->code < KAD_STORE) { +                        if (send_msg(dht, &resp_msg, addr)) +                                log_warn("Failed to send response."); +                } -        if (msg->code != KAD_JOIN) { -                pthread_rwlock_wrlock(&dht->lock); -                if (dht_update_bucket(dht, msg->s_id.data, addr)) -                        log_warn("Failed to update bucket."); -                pthread_rwlock_unlock(&dht->lock); -        } +                kad_msg__free_unpacked(msg, NULL); -        if (msg->code < KAD_STORE) { -                if (send_msg(dht, &resp_msg, addr)) -                        log_warn("Failed to send response."); -        } +                if (resp_msg.n_addrs > 0) +                        free(resp_msg.addrs); -        kad_msg__free_unpacked(msg, NULL); +                if (resp_msg.n_contacts == 0) { +                        tpm_inc(dht->tpm); +                        continue; +                } -        if (resp_msg.n_addrs > 0) -                free(resp_msg.addrs); +                for (i = 0; i < resp_msg.n_contacts; ++i) +                        kad_contact_msg__free_unpacked(resp_msg.contacts[i], +                                                       NULL); +                free(resp_msg.contacts); -        if (resp_msg.n_contacts == 0) -                return (void *) -1; +                tpm_inc(dht->tpm); +        } -        for (i = 0; i < resp_msg.n_contacts; ++i) -                kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); -        free(resp_msg.contacts); +        tpm_exit(dht->tpm);          return (void *) 0;  } @@ -2435,22 +2481,24 @@ static void * dht_handle_sdu(void * o)  static void dht_post_sdu(void *               comp,                           struct shm_du_buff * sdb)  { -        pthread_t         thr; -        struct sdu_info * info; +        struct cmd * cmd; +        struct dht * dht = (struct dht *) comp; -        info = malloc(sizeof(*info)); -        if (info == NULL) +        cmd = malloc(sizeof(*cmd)); +        if (cmd == NULL) { +                log_err("Command failed. Out of memory.");                  return; +        } -        info->dht = (struct dht *) comp; -        info->sdb = sdb; +        cmd->sdb = sdb; -        if (pthread_create(&thr, NULL, dht_handle_sdu, info)) { -                free(info); -                return; -        } +        pthread_mutex_lock(&dht->mtx); + +        list_add(&cmd->next, &dht->cmds); -        pthread_detach(thr); +        pthread_cond_signal(&dht->cond); + +        pthread_mutex_unlock(&dht->mtx);  }  void dht_destroy(struct dht * dht) @@ -2461,6 +2509,11 @@ void dht_destroy(struct dht * dht)          if (dht == NULL)                  return; +#ifndef __DHT_TEST__ +        tpm_stop(dht->tpm); + +        tpm_destroy(dht->tpm); +#endif          if (dht_get_state(dht) == DHT_RUNNING) {                  dht_set_state(dht, DHT_SHUTDOWN);                  pthread_cancel(dht->worker); @@ -2594,6 +2647,7 @@ struct dht * dht_create(uint64_t addr)          list_head_init(&dht->requests);          list_head_init(&dht->refs);          list_head_init(&dht->lookups); +        list_head_init(&dht->cmds);          if (pthread_rwlock_init(&dht->lock, NULL))                  goto fail_rwlock; @@ -2612,16 +2666,29 @@ struct dht * dht_create(uint64_t addr)          dht->addr = addr;          dht->id   = NULL;  #ifndef __DHT_TEST__ +        dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht); +        if (dht->tpm == NULL) +                goto fail_tpm_create; + +        if (tpm_start(dht->tpm)) +                goto fail_tpm_start; +          dht->fd   = dt_reg_ae(dht, &dht_post_sdu);          notifier_reg(handle_event, dht);  #else          (void) handle_event; +        (void) dht_handle_sdu;          (void) dht_post_sdu;  #endif          dht->state = DHT_INIT;          return dht; - +#ifndef __DHT_TEST__ + fail_tpm_start: +        tpm_destroy(dht->tpm); + fail_tpm_create: +        bmp_destroy(dht->cookies); +#endif   fail_bmp:          pthread_cond_destroy(&dht->cond);   fail_cond: | 
