diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipcpd/ipcp.c | 99 | ||||
| -rw-r--r-- | src/ipcpd/ipcp.h | 9 | ||||
| -rw-r--r-- | src/ipcpd/normal/dht.c | 339 | ||||
| -rw-r--r-- | src/irmd/main.c | 104 | ||||
| -rw-r--r-- | src/lib/tpm.c | 162 | 
5 files changed, 404 insertions, 309 deletions
| diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 85d543da..513c638a 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -36,7 +36,6 @@  #include <ouroboros/dev.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/np1_flow.h> -#include <ouroboros/tpm.h>  #include "ipcp.h" @@ -44,6 +43,14 @@  #include <sys/socket.h>  #include <stdlib.h> +struct cmd { +        struct list_head next; + +        uint8_t          cbuf[IPCP_MSG_BUF_SIZE]; +        size_t           len; +        int              fd; +}; +  void ipcp_sig_handler(int         sig,                        siginfo_t * info,                        void *      c) @@ -107,7 +114,8 @@ static void * acceptloop(void * o)          while (ipcp_get_state() != IPCP_SHUTDOWN &&                 ipcp_get_state() != IPCP_NULL) { -                ssize_t count; +                struct cmd * cmd; +  #if defined(__FreeBSD__) || defined(__APPLE__)                  FD_ZERO(&fds);                  FD_SET(ipcpi.sockfd, &fds); @@ -122,25 +130,28 @@ static void * acceptloop(void * o)                                 (void *) &tv, sizeof(tv)))                          log_warn("Failed to set timeout on socket."); -                pthread_mutex_lock(&ipcpi.cmd_lock); +                cmd = malloc(sizeof(*cmd)); +                if (cmd == NULL) { +                        log_err("Out of memory"); +                        break; +                } -                assert(ipcpi.csockfd == -1); +                pthread_mutex_lock(&ipcpi.cmd_lock); -                count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE); -                if (count <= 0) { +                cmd->len = read(csockfd, cmd->cbuf, IPCP_MSG_BUF_SIZE); +                if (cmd->len <= 0) {                          pthread_mutex_unlock(&ipcpi.cmd_lock);                          log_err("Failed to read from socket.");                          close(csockfd); +                        free(cmd);                          continue;                  } -                ipcpi.cmd_len = count; -                ipcpi.csockfd = csockfd; +                cmd->fd = csockfd; -                pthread_cond_signal(&ipcpi.cmd_cond); +                list_add(&cmd->next, &ipcpi.cmds); -                while (ipcpi.csockfd != -1) -                        pthread_cond_wait(&ipcpi.acc_cond, &ipcpi.cmd_lock); +                pthread_cond_signal(&ipcpi.cmd_cond);                  pthread_mutex_unlock(&ipcpi.cmd_lock);          } @@ -159,13 +170,15 @@ static void * mainloop(void * o)          struct timespec     dl;          struct timespec     to  = {(IPCP_ACCEPT_TIMEOUT / 1000),                                     (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION}; -        (void)  o; + +        (void) o;          while (true) {                  int                 ret = 0;                  ipcp_msg_t          ret_msg  = IPCP_MSG__INIT;                  dif_info_msg_t      dif_info = DIF_INFO_MSG__INIT;                  int                 fd       = -1; +                struct cmd *        cmd;                  ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -174,33 +187,34 @@ static void * mainloop(void * o)                  pthread_mutex_lock(&ipcpi.cmd_lock); -                while (ipcpi.csockfd == -1 && ret != -ETIMEDOUT) +                while (list_is_empty(&ipcpi.cmds) && ret != -ETIMEDOUT)                          ret = -pthread_cond_timedwait(&ipcpi.cmd_cond,                                                        &ipcpi.cmd_lock,                                                        &dl); -                sfd           = ipcpi.csockfd; -                ipcpi.csockfd = -1; - -                if (sfd == -1) { +                if (ret == -ETIMEDOUT) {                          pthread_mutex_unlock(&ipcpi.cmd_lock); -                        if (tpm_check()) +                        if (tpm_check(ipcpi.tpm))                                  break;                          continue;                  } -                pthread_cond_signal(&ipcpi.acc_cond); +                cmd = list_last_entry(&ipcpi.cmds, struct cmd, next); +                list_del(&cmd->next); + +                pthread_mutex_unlock(&ipcpi.cmd_lock); + +                msg = ipcp_msg__unpack(NULL, cmd->len, cmd->cbuf); +                sfd = cmd->fd; + +                free(cmd); -                msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf);                  if (msg == NULL) { -                        pthread_mutex_unlock(&ipcpi.cmd_lock);                          close(sfd);                          continue;                  } -                pthread_mutex_unlock(&ipcpi.cmd_lock); - -                tpm_dec(); +                tpm_dec(ipcpi.tpm);                  switch (msg->code) {                  case IPCP_MSG_CODE__IPCP_BOOTSTRAP: @@ -474,7 +488,7 @@ static void * mainloop(void * o)                  if (buffer.len == 0) {                          log_err("Failed to pack reply message");                          close(sfd); -                        tpm_inc(); +                        tpm_inc(ipcpi.tpm);                          continue;                  } @@ -482,7 +496,7 @@ static void * mainloop(void * o)                  if (buffer.data == NULL) {                          log_err("Failed to create reply buffer.");                          close(sfd); -                        tpm_inc(); +                        tpm_inc(ipcpi.tpm);                          continue;                  } @@ -492,17 +506,17 @@ static void * mainloop(void * o)                          log_err("Failed to send reply message");                          free(buffer.data);                          close(sfd); -                        tpm_inc(); +                        tpm_inc(ipcpi.tpm);                          continue;                  }                  free(buffer.data);                  close(sfd); -                tpm_inc(); +                tpm_inc(ipcpi.tpm);          } -        tpm_exit(); +        tpm_exit(ipcpi.tpm);          return (void *) 0;  } @@ -617,20 +631,14 @@ int ipcp_init(int               argc,                  goto fail_cmd_cond;          } -        if (pthread_cond_init(&ipcpi.acc_cond, &cattr)) { -                log_err("Failed to init convar."); -                goto fail_acc_cond; -        } +        list_head_init(&ipcpi.cmds);          ipcpi.alloc_id = -1; -        ipcpi.csockfd  = -1;          pthread_condattr_destroy(&cattr);          return 0; - fail_acc_cond: -        pthread_cond_destroy(&ipcpi.cmd_cond);   fail_cmd_cond:          pthread_mutex_destroy(&ipcpi.cmd_lock);   fail_cmd_lock: @@ -675,12 +683,14 @@ int ipcp_boot()          sigaction(SIGHUP,  &sig_act, NULL);          sigaction(SIGPIPE, &sig_act, NULL); -        if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) -                goto fail_tpm_init; +        ipcpi.tpm = tpm_create(IPCP_MIN_THREADS, IPCP_ADD_THREADS, +                               mainloop, NULL); +        if (ipcpi.tpm == NULL) +                goto fail_tpm_create;          pthread_sigmask(SIG_BLOCK, &sigset, NULL); -        if (tpm_start()) +        if (tpm_start(ipcpi.tpm))                  goto fail_tpm_start;          ipcp_set_state(IPCP_INIT); @@ -696,18 +706,18 @@ int ipcp_boot()          return 0;   fail_acceptor: -        tpm_stop(); +        tpm_stop(ipcpi.tpm);   fail_tpm_start: -        tpm_fini(); - fail_tpm_init: +        tpm_destroy(ipcpi.tpm); + fail_tpm_create:          return -1;  }  void ipcp_shutdown()  {          pthread_join(ipcpi.acceptor, NULL); -        tpm_stop(); -        tpm_fini(); +        tpm_stop(ipcpi.tpm); +        tpm_destroy(ipcpi.tpm);          log_info("IPCP %d shutting down.", getpid());  } @@ -724,7 +734,6 @@ void ipcp_fini()          pthread_mutex_destroy(&ipcpi.state_mtx);          pthread_cond_destroy(&ipcpi.alloc_cond);          pthread_mutex_destroy(&ipcpi.alloc_lock); -        pthread_cond_destroy(&ipcpi.acc_cond);          pthread_cond_destroy(&ipcpi.cmd_cond);          pthread_mutex_destroy(&ipcpi.cmd_lock); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 1b2a0334..d47d224b 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -25,8 +25,10 @@  #include <ouroboros/hash.h>  #include <ouroboros/ipcp.h> +#include <ouroboros/list.h>  #include <ouroboros/qoscube.h>  #include <ouroboros/sockets.h> +#include <ouroboros/tpm.h>  #include <pthread.h>  #include <time.h> @@ -92,10 +94,7 @@ struct ipcp {          int                sockfd;          char *             sock_path; -        uint8_t            cbuf[IPCP_MSG_BUF_SIZE]; -        size_t             cmd_len; -        int                csockfd; -        pthread_cond_t     acc_cond; +        struct list_head   cmds;          pthread_cond_t     cmd_cond;          pthread_mutex_t    cmd_lock; @@ -103,6 +102,8 @@ struct ipcp {          pthread_cond_t     alloc_cond;          pthread_mutex_t    alloc_lock; +        struct tpm *       tpm; +          pthread_t          acceptor;  } ipcpi; diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index 548ae03a..93fd4e4e 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -34,6 +34,7 @@  #include <ouroboros/notifier.h>  #include <ouroboros/random.h>  #include <ouroboros/time_utils.h> +#include <ouroboros/tpm.h>  #include <ouroboros/utils.h>  #include "connmgr.h" @@ -67,6 +68,7 @@ typedef KadContactMsg kad_contact_msg_t;  #define KAD_RESP_RETR 6    /* Number of retries on sending a response.   */  #define KAD_JOIN_RETR 5    /* Number of retries sending a join.          */  #define KAD_JOIN_INTV 1    /* Time (seconds) between join retries.       */ +#define HANDLE_TIMEO  1000 /* Timeout for dht_handle_sdu tpm check (ms)  */  enum dht_state {          DHT_INIT = 0, @@ -187,6 +189,12 @@ struct bucket {          struct bucket *  children[1L << KAD_BETA];  }; +struct cmd { +        struct list_head     next; + +        struct shm_du_buff * sdb; +}; +  struct dht {          size_t           alpha;          size_t           b; @@ -212,6 +220,7 @@ struct dht {          struct bmp *     cookies;          enum dht_state   state; +        struct list_head cmds;          pthread_cond_t   cond;          pthread_mutex_t  mtx; @@ -219,6 +228,8 @@ struct dht {          int              fd; +        struct tpm *     tpm; +          pthread_t        worker;  }; @@ -1428,7 +1439,9 @@ static int send_msg(struct dht * dht,          ipcp_sdb_release(sdb);  #endif   fail_msg: +        pthread_rwlock_wrlock(&dht->lock);          bmp_release(dht->cookies, msg->cookie); +        pthread_rwlock_unlock(&dht->lock);   fail_bmp_alloc:          return -1;  } @@ -1838,7 +1851,7 @@ static ssize_t dht_get_contacts(struct dht *          dht,          list_head_init(&l); -        pthread_rwlock_rdlock(&dht->lock); +        pthread_rwlock_wrlock(&dht->lock);          len = dht_contact_list(dht, &l, key);          if (len == 0) { @@ -1898,9 +1911,13 @@ static void * work(void * o)          dht = (struct dht *) o; +        pthread_rwlock_rdlock(&dht->lock); +          intv = gcd(dht->t_expire, dht->t_repub);          intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; +        pthread_rwlock_unlock(&dht->lock); +          list_head_init(&reflist);          while (true) { @@ -2189,7 +2206,7 @@ int dht_reg(struct dht *    dht,          pthread_rwlock_unlock(&dht->lock); -        kad_publish(dht, key, >addr, t_expire); +        kad_publish(dht, key, addr, t_expire);          return 0;  } @@ -2271,163 +2288,192 @@ uint64_t dht_query(struct dht *    dht,  static void * dht_handle_sdu(void * o)  { -        struct dht *         dht; -        struct shm_du_buff * sdb; -        kad_msg_t *          msg; -        kad_contact_msg_t ** cmsgs; -        kad_msg_t            resp_msg = KAD_MSG__INIT; -        uint64_t             addr; -        buffer_t             buf; -        size_t               i; -        size_t               b; -        size_t               t_expire; +        struct dht *    dht = (struct dht *) o; +        struct timespec dl; +        struct timespec to = {(HANDLE_TIMEO / 1000), +                              (HANDLE_TIMEO % 1000) * MILLION}; +        assert(dht); -        assert(o); +        while (true) { +                kad_msg_t *          msg; +                kad_contact_msg_t ** cmsgs; +                kad_msg_t            resp_msg = KAD_MSG__INIT; +                uint64_t             addr; +                buffer_t             buf; +                size_t               i; +                size_t               b; +                size_t               t_expire; +                struct cmd *         cmd; +                int                  ret = 0; + +                clock_gettime(PTHREAD_COND_CLOCK, &dl); +                ts_add(&dl, &to, &dl); + +                pthread_mutex_lock(&dht->mtx); + +                while(list_is_empty(&dht->cmds) && ret != -ETIMEDOUT) +                        ret = -pthread_cond_timedwait(&dht->cond, +                                                      &dht->mtx, &dl); + +                if (ret == -ETIMEDOUT) { +                        pthread_mutex_unlock(&dht->mtx); +                        if (tpm_check(dht->tpm)) +                                break; +                        continue; +                } -        memset(&buf, 0, sizeof(buf)); +                cmd = list_last_entry(&dht->cmds, struct cmd, next); +                list_del(&cmd->next); -        dht = ((struct sdu_info *) o)->dht; -        sdb = ((struct sdu_info *) o)->sdb; +                pthread_mutex_unlock(&dht->mtx); -        assert(dht); -        assert(sdb); +                i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); -        msg = kad_msg__unpack(NULL, -                              shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), -                              shm_du_buff_head(sdb)); +                msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); -        ipcp_sdb_release(sdb); +                ipcp_sdb_release(cmd->sdb); +                free(cmd); -        free(o); +                if (msg == NULL) { +                        log_err("Failed to unpack message."); +                        continue; +                } -        if (msg == NULL) { -                log_err("Failed to unpack message."); -                return (void *) -1; -        } +                pthread_rwlock_rdlock(&dht->lock); -        pthread_rwlock_rdlock(&dht->lock); +                b        = dht->b; +                t_expire = dht->t_expire; -        b        = dht->b; -        t_expire = dht->t_expire; +                pthread_rwlock_unlock(&dht->lock); -        pthread_rwlock_unlock(&dht->lock); +                if (msg->has_key && msg->key.len != b) { +                        kad_msg__free_unpacked(msg, NULL); +                        log_warn("Bad key in message."); +                        continue; +                } -        if (msg->has_key && msg->key.len != b) { -                kad_msg__free_unpacked(msg, NULL); -                log_warn("Bad key in message."); -                return (void *) -1; -        } +                if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { +                        kad_msg__free_unpacked(msg, NULL); +                        log_warn("Bad source ID in message of type %d.", +                                 msg->code); +                        continue; +                } -        if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { -                kad_msg__free_unpacked(msg, NULL); -                log_warn("Bad source ID in message of type %d.", msg->code); -                return (void *) -1; -        } +                if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { +                        kad_msg__free_unpacked(msg, NULL); +                        log_dbg("Got a request message when not running."); +                        continue; +                } -        if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { -                kad_msg__free_unpacked(msg, NULL); -                log_dbg("Got a request message when not running."); -                return (void *) -1; -        } +                tpm_dec(dht->tpm); -        addr = msg->s_addr; +                addr = msg->s_addr; -        resp_msg.code   = KAD_RESPONSE; -        resp_msg.cookie = msg->cookie; +                resp_msg.code   = KAD_RESPONSE; +                resp_msg.cookie = msg->cookie; -        switch(msg->code) { -        case KAD_JOIN: -                /* Refuse enrollee on check fails. */ -                if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { -                        log_warn("Parameter mismatch. " -                                 "DHT enrolment refused."); -                        break; -                } +                switch(msg->code) { +                case KAD_JOIN: +                        /* Refuse enrollee on check fails. */ +                        if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { +                                log_warn("Parameter mismatch. " +                                         "DHT enrolment refused."); +                                break; +                        } -                if (msg->t_replicate != KAD_T_REPL) { -                        log_warn("Replication time mismatch. " -                                 "DHT enrolment refused."); +                        if (msg->t_replicate != KAD_T_REPL) { +                                log_warn("Replication time mismatch. " +                                         "DHT enrolment refused."); -                        break; +                                break;                  } -                if (msg->t_refresh != KAD_T_REFR) { -                        log_warn("Refresh time mismatch. " -                                 "DHT enrolment refused."); +                        if (msg->t_refresh != KAD_T_REFR) { +                                log_warn("Refresh time mismatch. " +                                         "DHT enrolment refused."); +                                break; +                        } + +                        resp_msg.has_alpha       = true; +                        resp_msg.has_b           = true; +                        resp_msg.has_k           = true; +                        resp_msg.has_t_expire    = true; +                        resp_msg.has_t_refresh   = true; +                        resp_msg.has_t_replicate = true; +                        resp_msg.alpha           = KAD_ALPHA; +                        resp_msg.b               = b; +                        resp_msg.k               = KAD_K; +                        resp_msg.t_expire        = t_expire; +                        resp_msg.t_refresh       = KAD_T_REFR; +                        resp_msg.t_replicate     = KAD_T_REPL;                          break; -                } +                case KAD_FIND_VALUE: +                        buf = dht_retrieve(dht, msg->key.data); +                        if (buf.len != 0) { +                                resp_msg.n_addrs = buf.len; +                                resp_msg.addrs   = (uint64_t *) buf.data; +                                break; +                        } +                        /* FALLTHRU */ +                case KAD_FIND_NODE: +                        /* Return k closest contacts. */ +                        resp_msg.n_contacts = +                                dht_get_contacts(dht, msg->key.data, &cmsgs); +                        resp_msg.contacts = cmsgs; +                        break; +                case KAD_STORE: +                        if (msg->n_contacts < 1) { +                                log_warn("No contacts in store message."); +                                break; +                        } -                resp_msg.has_alpha       = true; -                resp_msg.has_b           = true; -                resp_msg.has_k           = true; -                resp_msg.has_t_expire    = true; -                resp_msg.has_t_refresh   = true; -                resp_msg.has_t_replicate = true; -                resp_msg.alpha           = KAD_ALPHA; -                resp_msg.b               = b; -                resp_msg.k               = KAD_K; -                resp_msg.t_expire        = t_expire; -                resp_msg.t_refresh       = KAD_T_REFR; -                resp_msg.t_replicate     = KAD_T_REPL; -                break; -        case KAD_FIND_VALUE: -                buf = dht_retrieve(dht, msg->key.data); -                if (buf.len != 0) { -                        resp_msg.n_addrs = buf.len; -                        resp_msg.addrs   = (uint64_t *) buf.data; +                        if (!msg->has_t_expire) { +                                log_warn("No expiry time in store message."); +                                break; +                        } + +                        kad_add(dht, *msg->contacts, msg->n_contacts, +                                msg->t_expire);                          break; -                } -                /* FALLTHRU */ -        case KAD_FIND_NODE: -                /* Return k closest contacts. */ -                resp_msg.n_contacts = -                        dht_get_contacts(dht, msg->key.data, &cmsgs); -                resp_msg.contacts = cmsgs; -                break; -        case KAD_STORE: -                if (msg->n_contacts < 1) { -                        log_warn("No contacts in store message."); +                case KAD_RESPONSE: +                        kad_handle_response(dht, msg); +                        break; +                default: +                        assert(false);                          break;                  } -                if (!msg->has_t_expire) { -                        log_warn("No expiry time in store message."); -                        break; +                if (msg->code != KAD_JOIN) { +                        pthread_rwlock_wrlock(&dht->lock); +                        if (dht_update_bucket(dht, msg->s_id.data, addr)) +                                log_warn("Failed to update bucket."); +                        pthread_rwlock_unlock(&dht->lock);                  } -                kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire); -                break; -        case KAD_RESPONSE: -                kad_handle_response(dht, msg); -                break; -        default: -                assert(false); -                break; -        } +                if (msg->code < KAD_STORE) { +                        if (send_msg(dht, &resp_msg, addr)) +                                log_warn("Failed to send response."); +                } -        if (msg->code != KAD_JOIN) { -                pthread_rwlock_wrlock(&dht->lock); -                if (dht_update_bucket(dht, msg->s_id.data, addr)) -                        log_warn("Failed to update bucket."); -                pthread_rwlock_unlock(&dht->lock); -        } +                kad_msg__free_unpacked(msg, NULL); -        if (msg->code < KAD_STORE) { -                if (send_msg(dht, &resp_msg, addr)) -                        log_warn("Failed to send response."); -        } +                if (resp_msg.n_addrs > 0) +                        free(resp_msg.addrs); -        kad_msg__free_unpacked(msg, NULL); +                if (resp_msg.n_contacts == 0) { +                        tpm_inc(dht->tpm); +                        continue; +                } -        if (resp_msg.n_addrs > 0) -                free(resp_msg.addrs); +                for (i = 0; i < resp_msg.n_contacts; ++i) +                        kad_contact_msg__free_unpacked(resp_msg.contacts[i], +                                                       NULL); +                free(resp_msg.contacts); -        if (resp_msg.n_contacts == 0) -                return (void *) -1; +                tpm_inc(dht->tpm); +        } -        for (i = 0; i < resp_msg.n_contacts; ++i) -                kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); -        free(resp_msg.contacts); +        tpm_exit(dht->tpm);          return (void *) 0;  } @@ -2435,22 +2481,24 @@ static void * dht_handle_sdu(void * o)  static void dht_post_sdu(void *               comp,                           struct shm_du_buff * sdb)  { -        pthread_t         thr; -        struct sdu_info * info; +        struct cmd * cmd; +        struct dht * dht = (struct dht *) comp; -        info = malloc(sizeof(*info)); -        if (info == NULL) +        cmd = malloc(sizeof(*cmd)); +        if (cmd == NULL) { +                log_err("Command failed. Out of memory.");                  return; +        } -        info->dht = (struct dht *) comp; -        info->sdb = sdb; +        cmd->sdb = sdb; -        if (pthread_create(&thr, NULL, dht_handle_sdu, info)) { -                free(info); -                return; -        } +        pthread_mutex_lock(&dht->mtx); + +        list_add(&cmd->next, &dht->cmds); -        pthread_detach(thr); +        pthread_cond_signal(&dht->cond); + +        pthread_mutex_unlock(&dht->mtx);  }  void dht_destroy(struct dht * dht) @@ -2461,6 +2509,11 @@ void dht_destroy(struct dht * dht)          if (dht == NULL)                  return; +#ifndef __DHT_TEST__ +        tpm_stop(dht->tpm); + +        tpm_destroy(dht->tpm); +#endif          if (dht_get_state(dht) == DHT_RUNNING) {                  dht_set_state(dht, DHT_SHUTDOWN);                  pthread_cancel(dht->worker); @@ -2594,6 +2647,7 @@ struct dht * dht_create(uint64_t addr)          list_head_init(&dht->requests);          list_head_init(&dht->refs);          list_head_init(&dht->lookups); +        list_head_init(&dht->cmds);          if (pthread_rwlock_init(&dht->lock, NULL))                  goto fail_rwlock; @@ -2612,16 +2666,29 @@ struct dht * dht_create(uint64_t addr)          dht->addr = addr;          dht->id   = NULL;  #ifndef __DHT_TEST__ +        dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht); +        if (dht->tpm == NULL) +                goto fail_tpm_create; + +        if (tpm_start(dht->tpm)) +                goto fail_tpm_start; +          dht->fd   = dt_reg_ae(dht, &dht_post_sdu);          notifier_reg(handle_event, dht);  #else          (void) handle_event; +        (void) dht_handle_sdu;          (void) dht_post_sdu;  #endif          dht->state = DHT_INIT;          return dht; - +#ifndef __DHT_TEST__ + fail_tpm_start: +        tpm_destroy(dht->tpm); + fail_tpm_create: +        bmp_destroy(dht->cookies); +#endif   fail_bmp:          pthread_cond_destroy(&dht->cond);   fail_cond: diff --git a/src/irmd/main.c b/src/irmd/main.c index 27c771a6..3fceadb6 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -93,6 +93,14 @@ enum irm_state {          IRMD_RUNNING  }; +struct cmd { +        struct list_head next; + +        uint8_t          cbuf[IB_LEN]; +        size_t           len; +        int              fd; +}; +  struct {          struct list_head     registry;     /* registered names known     */ @@ -112,16 +120,15 @@ struct {          int                  sockfd;       /* UNIX socket                */ -        uint8_t              cbuf[IB_LEN]; /* cmd message buffer         */ -        size_t               cmd_len;      /* length of cmd in cbuf      */ -        int                  csockfd;      /* cmd UNIX socket            */ -        pthread_cond_t       acc_cond;     /* cmd accepted condvar       */ +        struct list_head     cmds;         /* pending commands           */          pthread_cond_t       cmd_cond;     /* cmd signal condvar         */          pthread_mutex_t      cmd_lock;     /* cmd signal lock            */          enum irm_state       state;        /* state of the irmd          */          pthread_rwlock_t     state_lock;   /* lock for the entire irmd   */ +        struct tpm *         tpm;          /* thread pool manager        */ +          pthread_t            irm_sanitize; /* clean up irmd resources    */          pthread_t            shm_sanitize; /* keep track of rdrbuff use  */          pthread_t            acceptor;     /* accept new commands        */ @@ -1631,7 +1638,6 @@ static void irm_fini(void)                  lockfile_destroy(irmd.lf);          pthread_mutex_destroy(&irmd.cmd_lock); -        pthread_cond_destroy(&irmd.acc_cond);          pthread_cond_destroy(&irmd.cmd_cond);          pthread_rwlock_destroy(&irmd.reg_lock);          pthread_rwlock_destroy(&irmd.state_lock); @@ -1864,7 +1870,8 @@ static void * acceptloop(void * o)          (void) o;          while (irmd_get_state() == IRMD_RUNNING) { -                ssize_t count; +                struct cmd * cmd; +  #if defined(__FreeBSD__) || defined(__APPLE__)                  FD_ZERO(&fds);                  FD_SET(irmd.sockfd, &fds); @@ -1879,25 +1886,28 @@ static void * acceptloop(void * o)                                 (void *) &tv, sizeof(tv)))                          log_warn("Failed to set timeout on socket."); -                pthread_mutex_lock(&irmd.cmd_lock); +                cmd = malloc(sizeof(*cmd)); +                if (cmd == NULL) { +                        log_err("Out of memory."); +                        break; +                } -                assert(irmd.csockfd == -1); +                pthread_mutex_lock(&irmd.cmd_lock); -                count = read(csockfd, irmd.cbuf, IRM_MSG_BUF_SIZE); -                if (count <= 0) { +                cmd->len = read(csockfd, cmd->cbuf, IRM_MSG_BUF_SIZE); +                if (cmd->len <= 0) {                          pthread_mutex_unlock(&irmd.cmd_lock);                          log_err("Failed to read from socket.");                          close(csockfd); +                        free(cmd);                          continue;                  } -                irmd.cmd_len = count; -                irmd.csockfd = csockfd; +                cmd->fd  = csockfd; -                pthread_cond_signal(&irmd.cmd_cond); +                list_add(&cmd->next, &irmd.cmds); -                while(irmd.csockfd != -1) -                        pthread_cond_wait(&irmd.acc_cond, &irmd.cmd_lock); +                pthread_cond_signal(&irmd.cmd_cond);                  pthread_mutex_unlock(&irmd.cmd_lock);          } @@ -1923,6 +1933,7 @@ void * mainloop(void * o)                  pid_t *           apis    = NULL;                  struct timespec * timeo   = NULL;                  struct timespec   ts      = {0, 0}; +                struct cmd *      cmd;                  ret_msg.code = IRM_MSG_CODE__IRM_REPLY; @@ -1931,33 +1942,34 @@ void * mainloop(void * o)                  pthread_mutex_lock(&irmd.cmd_lock); -                while (irmd.csockfd == -1 && ret != -ETIMEDOUT) +                while (list_is_empty(&irmd.cmds) && ret != -ETIMEDOUT)                          ret = -pthread_cond_timedwait(&irmd.cmd_cond,                                                        &irmd.cmd_lock,                                                        &dl); -                sfd          = irmd.csockfd; -                irmd.csockfd = -1; - -                if (sfd == -1) { +                if (ret == -ETIMEDOUT) {                          pthread_mutex_unlock(&irmd.cmd_lock); -                        if (tpm_check()) +                        if (tpm_check(irmd.tpm))                                  break;                          continue;                  } -                pthread_cond_signal(&irmd.acc_cond); +                cmd = list_last_entry(&irmd.cmds, struct cmd, next); +                list_del(&cmd->next); + +                pthread_mutex_unlock(&irmd.cmd_lock); + +                msg = irm_msg__unpack(NULL, cmd->len, cmd->cbuf); +                sfd = cmd->fd; + +                free(cmd); -                msg = irm_msg__unpack(NULL, irmd.cmd_len, irmd.cbuf);                  if (msg == NULL) { -                        pthread_mutex_unlock(&irmd.cmd_lock);                          close(sfd);                          continue;                  } -                pthread_mutex_unlock(&irmd.cmd_lock); - -                tpm_dec(); +                tpm_dec(irmd.tpm);                  if (msg->has_timeo_sec) {                          assert(msg->has_timeo_nsec); @@ -2098,7 +2110,7 @@ void * mainloop(void * o)                  if (ret_msg.result == -EPIPE || !ret_msg.has_result) {                          close(sfd); -                        tpm_inc(); +                        tpm_inc(irmd.tpm);                          continue;                  } @@ -2108,7 +2120,7 @@ void * mainloop(void * o)                          if (apis != NULL)                                  free(apis);                          close(sfd); -                        tpm_inc(); +                        tpm_inc(irmd.tpm);                          continue;                  } @@ -2117,7 +2129,7 @@ void * mainloop(void * o)                          if (apis != NULL)                                  free(apis);                          close(sfd); -                        tpm_inc(); +                        tpm_inc(irmd.tpm);                          continue;                  } @@ -2133,10 +2145,10 @@ void * mainloop(void * o)                  free(buffer.data);                  close(sfd); -                tpm_inc(); +                tpm_inc(irmd.tpm);          } -        tpm_exit(); +        tpm_exit(irmd.tpm);          return (void *) 0;  } @@ -2184,12 +2196,6 @@ static int irm_init(void)                  goto fail_cmd_cond;          } -        if (pthread_cond_init(&irmd.acc_cond, &cattr)) { -                log_err("Failed to initialize condvar."); -                pthread_condattr_destroy(&cattr); -                goto fail_acc_cond; -        } -          pthread_condattr_destroy(&cattr);          list_head_init(&irmd.ipcps); @@ -2198,6 +2204,7 @@ static int irm_init(void)          list_head_init(&irmd.spawned_apis);          list_head_init(&irmd.registry);          list_head_init(&irmd.irm_flows); +        list_head_init(&irmd.cmds);          irmd.port_ids = bmp_create(SYS_MAX_FLOWS, 0);          if (irmd.port_ids == NULL) { @@ -2272,7 +2279,6 @@ static int irm_init(void)          gcry_control(GCRYCTL_INITIALIZATION_FINISHED);  #endif -        irmd.csockfd = -1;          irmd.state   = IRMD_RUNNING;          log_info("Ouroboros IPC Resource Manager daemon started..."); @@ -2294,8 +2300,6 @@ static int irm_init(void)   fail_lockfile:          bmp_destroy(irmd.port_ids);   fail_port_ids: -        pthread_cond_destroy(&irmd.acc_cond); - fail_acc_cond:          pthread_cond_destroy(&irmd.cmd_cond);   fail_cmd_cond:          pthread_mutex_destroy(&irmd.cmd_lock); @@ -2367,12 +2371,14 @@ int main(int     argc,          if (irm_init() < 0)                  goto fail_irm_init; -        if (tpm_init(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop)) { +        irmd.tpm = tpm_create(IRMD_MIN_THREADS, IRMD_ADD_THREADS, +                              mainloop, NULL); +        if (irmd.tpm == NULL) {                  irmd_set_state(IRMD_NULL); -                goto fail_tpm_init; +                goto fail_tpm_create;          } -        if (tpm_start()) { +        if (tpm_start(irmd.tpm)) {                  irmd_set_state(IRMD_NULL);                  goto fail_tpm_start;          } @@ -2396,9 +2402,9 @@ int main(int     argc,          pthread_join(irmd.irm_sanitize, NULL);          pthread_join(irmd.shm_sanitize, NULL); -        tpm_stop(); +        tpm_stop(irmd.tpm); -        tpm_fini(); +        tpm_destroy(irmd.tpm);          pthread_sigmask(SIG_BLOCK, &sigset, NULL); @@ -2417,10 +2423,10 @@ int main(int     argc,   fail_shm_sanitize:          pthread_join(irmd.irm_sanitize, NULL);   fail_irm_sanitize: -        tpm_stop(); +        tpm_stop(irmd.tpm);   fail_tpm_start: -        tpm_fini(); - fail_tpm_init: +        tpm_destroy(irmd.tpm); + fail_tpm_create:          irm_fini();   fail_irm_init:          log_fini(); diff --git a/src/lib/tpm.c b/src/lib/tpm.c index dd71d276..c883e0a8 100644 --- a/src/lib/tpm.c +++ b/src/lib/tpm.c @@ -50,38 +50,38 @@ enum tpm_state {          TPM_RUNNING  }; -struct { +struct tpm {          size_t           min;          size_t           inc;          size_t           cur;          size_t           wrk;          void * (* func)(void *); +        void *           o;          struct list_head pool;          enum tpm_state   state; -          pthread_cond_t   cond;          pthread_mutex_t  lock;          pthread_t        mgr; -} tpm; +}; -static void tpm_join(void) +static void tpm_join(struct tpm * tpm)  {          struct list_head * p;          struct list_head * h; -        list_for_each_safe(p, h, &tpm.pool) { +        list_for_each_safe(p, h, &tpm->pool) {                  struct pthr_el * e = list_entry(p, struct pthr_el, next); -                if (tpm.state != TPM_RUNNING) { +                if (tpm->state != TPM_RUNNING) {                          if (!e->kill) {                                  e->kill = true; -                                --tpm.cur; +                                --tpm->cur;                          }                          while (!e->join) -                                pthread_cond_wait(&tpm.cond, &tpm.lock); +                                pthread_cond_wait(&tpm->cond, &tpm->lock);                  }                  if (e->join) { @@ -92,12 +92,13 @@ static void tpm_join(void)          }  } -static struct pthr_el * tpm_pthr_el(pthread_t thr) +static struct pthr_el * tpm_pthr_el(struct tpm * tpm, +                                    pthread_t    thr)  {          struct list_head * p;          struct pthr_el *   e; -        list_for_each(p, &tpm.pool) { +        list_for_each(p, &tpm->pool) {                  e = list_entry(p, struct pthr_el, next);                  if (e->thr == thr)                          return e; @@ -109,15 +110,15 @@ static struct pthr_el * tpm_pthr_el(pthread_t thr)          return NULL;  } -static void tpm_kill(void) +static void tpm_kill(struct tpm * tpm)  {          struct list_head * p; -        list_for_each(p, &tpm.pool) { +        list_for_each(p, &tpm->pool) {                  struct pthr_el * e = list_entry(p, struct pthr_el, next);                  if (!e->kill) {                          e->kill = true; -                        --tpm.cur; +                        --tpm->cur;                          return;                  }          } @@ -128,25 +129,25 @@ static void * tpmgr(void * o)          struct timespec dl;          struct timespec to = {(TPM_TIMEOUT / 1000),                                (TPM_TIMEOUT % 1000) * MILLION}; -        (void) o; +        struct tpm * tpm = (struct tpm *) o;          while (true) {                  clock_gettime(PTHREAD_COND_CLOCK, &dl);                  ts_add(&dl, &to, &dl); -                pthread_mutex_lock(&tpm.lock); +                pthread_mutex_lock(&tpm->lock); -                if (tpm.state != TPM_RUNNING) { -                        tpm_join(); -                        pthread_mutex_unlock(&tpm.lock); +                if (tpm->state != TPM_RUNNING) { +                        tpm_join(tpm); +                        pthread_mutex_unlock(&tpm->lock);                          break;                  } -                tpm_join(); +                tpm_join(tpm); -                if (tpm.cur - tpm.wrk < tpm.min) { +                if (tpm->cur - tpm->wrk < tpm->min) {                          size_t i; -                        for (i = 0; i < tpm.inc; ++i) { +                        for (i = 0; i < tpm->inc; ++i) {                                  struct pthr_el * e = malloc(sizeof(*e));                                  if (e == NULL)                                          break; @@ -155,35 +156,41 @@ static void * tpmgr(void * o)                                  e->kill = false;                                  if (pthread_create(&e->thr, NULL, -                                                   tpm.func, NULL)) { +                                                   tpm->func, tpm->o)) {                                          free(e);                                          break;                                  } -                                list_add(&e->next, &tpm.pool); +                                list_add(&e->next, &tpm->pool);                          } -                        tpm.cur += i; +                        tpm->cur += i;                  } -                if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl) +                if (pthread_cond_timedwait(&tpm->cond, &tpm->lock, &dl)                      == ETIMEDOUT) -                        if (tpm.cur > tpm.min) -                                tpm_kill(); +                        if (tpm->cur > tpm->min) +                                tpm_kill(tpm); -                pthread_mutex_unlock(&tpm.lock); +                pthread_mutex_unlock(&tpm->lock);          }          return (void *) 0;  } -int tpm_init(size_t min, -             size_t inc, -             void * (* func)(void *)) +struct tpm * tpm_create(size_t min, +                        size_t inc, +                        void * (* func)(void *), +                        void * o)  { +        struct tpm *       tpm;          pthread_condattr_t cattr; -        if (pthread_mutex_init(&tpm.lock, NULL)) +        tpm = malloc(sizeof(*tpm)); +        if (tpm == NULL) +                goto fail_malloc; + +        if (pthread_mutex_init(&tpm->lock, NULL))                  goto fail_lock;          if (pthread_condattr_init(&cattr)) @@ -192,103 +199,108 @@ int tpm_init(size_t min,  #ifndef __APPLE__          pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);  #endif -        if (pthread_cond_init(&tpm.cond, &cattr)) +        if (pthread_cond_init(&tpm->cond, &cattr))                  goto fail_cond; -        list_head_init(&tpm.pool); +        list_head_init(&tpm->pool);          pthread_condattr_destroy(&cattr); -        tpm.state = TPM_INIT; -        tpm.func  = func; -        tpm.min   = min; -        tpm.inc   = inc; -        tpm.cur   = 0; -        tpm.wrk   = 0; +        tpm->state = TPM_INIT; +        tpm->func  = func; +        tpm->o     = o; +        tpm->min   = min; +        tpm->inc   = inc; +        tpm->cur   = 0; +        tpm->wrk   = 0; -        return 0; +        return tpm;   fail_cond:          pthread_condattr_destroy(&cattr);   fail_cattr: -        pthread_mutex_destroy(&tpm.lock); +        pthread_mutex_destroy(&tpm->lock);   fail_lock: -        return -1; +        free(tpm); + fail_malloc: +        return NULL;  } -int tpm_start(void) +int tpm_start(struct tpm * tpm)  { -        pthread_mutex_lock(&tpm.lock); +        pthread_mutex_lock(&tpm->lock); -        if (pthread_create(&tpm.mgr, NULL, tpmgr, NULL)) { -                pthread_mutex_unlock(&tpm.lock); +        if (pthread_create(&tpm->mgr, NULL, tpmgr, tpm)) { +                pthread_mutex_unlock(&tpm->lock);                  return -1;          } -        tpm.state = TPM_RUNNING; +        tpm->state = TPM_RUNNING; -        pthread_mutex_unlock(&tpm.lock); +        pthread_mutex_unlock(&tpm->lock);          return 0;  } -void tpm_stop(void) +void tpm_stop(struct tpm * tpm)  { -        pthread_mutex_lock(&tpm.lock); +        pthread_mutex_lock(&tpm->lock); -        tpm.state = TPM_NULL; +        tpm->state = TPM_NULL; -        pthread_mutex_unlock(&tpm.lock); +        pthread_mutex_unlock(&tpm->lock);  } -void tpm_fini(void) +void tpm_destroy(struct tpm * tpm)  { -        pthread_join(tpm.mgr, NULL); +        pthread_join(tpm->mgr, NULL); + +        pthread_mutex_destroy(&tpm->lock); +        pthread_cond_destroy(&tpm->cond); -        pthread_mutex_destroy(&tpm.lock); -        pthread_cond_destroy(&tpm.cond); +        free(tpm);  } -bool tpm_check(void) +bool tpm_check(struct tpm * tpm)  {          bool ret; -        pthread_mutex_lock(&tpm.lock); +        pthread_mutex_lock(&tpm->lock); -        ret = tpm_pthr_el(pthread_self())->kill; +        ret = tpm_pthr_el(tpm, pthread_self())->kill; -        pthread_mutex_unlock(&tpm.lock); +        pthread_mutex_unlock(&tpm->lock);          return ret;  } -void tpm_inc(void) +void tpm_inc(struct tpm * tpm)  { -        pthread_mutex_lock(&tpm.lock); +        pthread_mutex_lock(&tpm->lock); -        --tpm.wrk; +        --tpm->wrk; -        pthread_mutex_unlock(&tpm.lock); +        pthread_mutex_unlock(&tpm->lock);  } -void tpm_dec(void) +void tpm_dec(struct tpm * tpm)  { -        pthread_mutex_lock(&tpm.lock); +        pthread_mutex_lock(&tpm->lock); -        ++tpm.wrk; +        ++tpm->wrk; -        pthread_cond_signal(&tpm.cond); +        pthread_cond_signal(&tpm->cond); -        pthread_mutex_unlock(&tpm.lock); +        pthread_mutex_unlock(&tpm->lock);  } -void tpm_exit(void) +void tpm_exit(struct tpm * tpm)  { -        pthread_mutex_lock(&tpm.lock); +        pthread_mutex_lock(&tpm->lock); -        tpm_pthr_el(pthread_self())->join = true; +        tpm_pthr_el(tpm, pthread_self())->join = true; -        pthread_cond_signal(&tpm.cond); +        pthread_cond_signal(&tpm->cond); -        pthread_mutex_unlock(&tpm.lock); +        pthread_mutex_unlock(&tpm->lock);  } | 
