diff options
| author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-08-16 07:01:49 +0000 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-08-16 07:01:49 +0000 | 
| commit | c8283ea410e7d640537303c6b644bbc3afb35cd5 (patch) | |
| tree | 57a4d386fcc20cd7bbef330a246357ed3cfdae35 /src/irmd | |
| parent | c3d9dbe4971549c8d2f8f821f06dcaa1dce90073 (diff) | |
| parent | a27bef54052b81406ba3142be3da4ab2a6330de6 (diff) | |
| download | ouroboros-c8283ea410e7d640537303c6b644bbc3afb35cd5.tar.gz ouroboros-c8283ea410e7d640537303c6b644bbc3afb35cd5.zip | |
Merged in dstaesse/ouroboros/be-single-accept (pull request #553)
Be single accept
Diffstat (limited to 'src/irmd')
| -rw-r--r-- | src/irmd/main.c | 247 | 
1 files changed, 177 insertions, 70 deletions
| diff --git a/src/irmd/main.c b/src/irmd/main.c index bf6daacc..a316b3cf 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -58,6 +58,7 @@  #define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */  #define SHM_SAN_HOLDOFF 1000 /* ms */  #define IPCP_HASH_LEN(e) hash_len(e->dir_hash_algo) +#define IB_LEN IRM_MSG_BUF_SIZE  struct ipcp_entry {          struct list_head next; @@ -78,7 +79,7 @@ enum irm_state {          IRMD_RUNNING  }; -struct irm { +struct {          struct list_head     registry;     /* registered names known     */          struct list_head     ipcps;        /* list of ipcps in system    */ @@ -94,13 +95,21 @@ struct irm {          struct lockfile *    lf;           /* single irmd per system     */          struct shm_rdrbuff * rdrb;         /* rdrbuff for SDUs           */ +          int                  sockfd;       /* UNIX socket                */ +        uint8_t              cbuf[IB_LEN]; /* cmd message buffer         */ +        size_t               cmd_len;      /* length of cmd in cbuf      */ +        int                  csockfd;      /* cmd UNIX socket            */ +        pthread_cond_t       cmd_cond;     /* cmd signal condvar         */ +        pthread_mutex_t      cmd_lock;     /* cmd signal lock            */ +          enum irm_state       state;        /* state of the irmd          */          pthread_rwlock_t     state_lock;   /* lock for the entire irmd   */          pthread_t            irm_sanitize; /* clean up irmd resources    */          pthread_t            shm_sanitize; /* keep track of rdrbuff use  */ +        pthread_t            acceptor;     /* accept new commands        */  } irmd;  static enum irm_state irmd_get_state(void) @@ -1466,6 +1475,8 @@ static void irm_fini(void)          if (irmd.lf != NULL)                  lockfile_destroy(irmd.lf); +        pthread_mutex_destroy(&irmd.cmd_lock); +        pthread_cond_destroy(&irmd.cmd_cond);          pthread_rwlock_destroy(&irmd.reg_lock);          pthread_rwlock_destroy(&irmd.state_lock);  } @@ -1680,69 +1691,114 @@ void * irm_sanitize(void * o)          }  } -void * mainloop(void * o) +static void * acceptloop(void * o)  { -        uint8_t buf[IRM_MSG_BUF_SIZE]; - -        (void) o; - -        while (true) { +        int            csockfd; +        struct timeval tv = {(SOCKET_TIMEOUT / 1000), +                             (SOCKET_TIMEOUT % 1000) * 1000};  #if defined(__FreeBSD__) || defined(__APPLE__) -                fd_set fds; -                struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), -                                          (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; +        fd_set         fds; +        struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), +                                  (IRMD_ACCEPT_TIMEOUT % 1000) * 1000};  #endif -                int               cli_sockfd; -                irm_msg_t *       msg; -                ssize_t           count; -                buffer_t          buffer; -                irm_msg_t         ret_msg = IRM_MSG__INIT; -                struct irm_flow * e       = NULL; -                pid_t *           apis    = NULL; -                struct timespec * timeo   = NULL; -                struct timespec   ts      = {0, 0}; -                struct timeval    tv      = {(SOCKET_TIMEOUT / 1000), -                                             (SOCKET_TIMEOUT % 1000) * 1000}; - -                if (irmd_get_state() != IRMD_RUNNING || tpm_check()) { -                        tpm_exit(); -                        break; -                } +        (void) o; -                ret_msg.code = IRM_MSG_CODE__IRM_REPLY; -#if defined(__FreeBSD__) || defined(__APPLE__)  +        while (irmd_get_state() == IRMD_RUNNING) { +                ssize_t count; +#if defined(__FreeBSD__) || defined(__APPLE__)                  FD_ZERO(&fds);                  FD_SET(irmd.sockfd, &fds);                  if (select(irmd.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0)                          continue;  #endif -                cli_sockfd = accept(irmd.sockfd, 0, 0); -                if (cli_sockfd < 0) +                csockfd = accept(irmd.sockfd, 0, 0); +                if (csockfd < 0)                          continue; -                if (setsockopt(cli_sockfd, SOL_SOCKET, SO_RCVTIMEO, +                if (setsockopt(csockfd, SOL_SOCKET, SO_RCVTIMEO,                                 (void *) &tv, sizeof(tv)))                          log_warn("Failed to set timeout on socket."); -                count = read(cli_sockfd, buf, IRM_MSG_BUF_SIZE); +                pthread_mutex_lock(&irmd.cmd_lock); + +                assert(irmd.csockfd == -1); + +                count = read(csockfd, irmd.cbuf, IRM_MSG_BUF_SIZE);                  if (count <= 0) { +                        pthread_mutex_unlock(&irmd.cmd_lock);                          log_err("Failed to read from socket."); -                        close(cli_sockfd); +                        close(csockfd);                          continue;                  } -                if (irmd_get_state() != IRMD_RUNNING) { -                        close(cli_sockfd); -                        tpm_exit(); -                        break; +                irmd.cmd_len = count; +                irmd.csockfd = csockfd; + +                pthread_cond_signal(&irmd.cmd_cond); + +                while(irmd.csockfd != -1) +                        pthread_cond_wait(&irmd.cmd_cond, &irmd.cmd_lock); + +                pthread_mutex_unlock(&irmd.cmd_lock); +        } + +        return (void *) 0; +} + +void * mainloop(void * o) +{ +        int             sfd; +        irm_msg_t *     msg; +        buffer_t        buffer; +        struct timespec dl; +        struct timespec to = {(IRMD_ACCEPT_TIMEOUT / 1000), +                              (IRMD_ACCEPT_TIMEOUT % 1000) * MILLION}; + +        (void) o; + +        while (true) { +                int               ret     = 0; +                irm_msg_t         ret_msg = IRM_MSG__INIT; +                struct irm_flow * e       = NULL; +                pid_t *           apis    = NULL; +                struct timespec * timeo   = NULL; +                struct timespec   ts      = {0,0}; + +                ret_msg.code = IRM_MSG_CODE__IRM_REPLY; + +                clock_gettime(PTHREAD_COND_CLOCK, &dl); +                ts_add(&dl, &to, &dl); + +                pthread_mutex_lock(&irmd.cmd_lock); + +                while (irmd.csockfd == -1 && ret != -ETIMEDOUT) +                        ret = -pthread_cond_timedwait(&irmd.cmd_cond, +                                                      &irmd.cmd_lock, +                                                      &dl); + +                sfd          = irmd.csockfd; +                irmd.csockfd = -1; + +                if (sfd == -1) { +                        pthread_mutex_unlock(&irmd.cmd_lock); +                        if (tpm_check()) { +                                close(sfd); +                                break; +                        } +                        continue;                  } -                msg = irm_msg__unpack(NULL, count, buf); +                pthread_cond_broadcast(&irmd.cmd_cond); + +                msg = irm_msg__unpack(NULL, irmd.cmd_len, irmd.cbuf);                  if (msg == NULL) { -                        close(cli_sockfd); +                        pthread_mutex_unlock(&irmd.cmd_lock); +                        close(sfd);                          continue;                  } +                pthread_mutex_unlock(&irmd.cmd_lock); +                  tpm_dec();                  if (msg->has_timeo_sec) { @@ -1871,7 +1927,7 @@ void * mainloop(void * o)                  irm_msg__free_unpacked(msg, NULL);                  if (ret_msg.result == -EPIPE || !ret_msg.has_result) { -                        close(cli_sockfd); +                        close(sfd);                          tpm_inc();                          continue;                  } @@ -1881,7 +1937,7 @@ void * mainloop(void * o)                          log_err("Failed to calculate length of reply message.");                          if (apis != NULL)                                  free(apis); -                        close(cli_sockfd); +                        close(sfd);                          tpm_inc();                          continue;                  } @@ -1890,7 +1946,7 @@ void * mainloop(void * o)                  if (buffer.data == NULL) {                          if (apis != NULL)                                  free(apis); -                        close(cli_sockfd); +                        close(sfd);                          tpm_inc();                          continue;                  } @@ -1900,28 +1956,29 @@ void * mainloop(void * o)                  if (apis != NULL)                          free(apis); -                if (write(cli_sockfd, buffer.data, buffer.len) == -1) +                if (write(sfd, buffer.data, buffer.len) == -1)                          log_warn("Failed to send reply message.");                  free(buffer.data); -                close(cli_sockfd); +                close(sfd);                  tpm_inc();          } +        tpm_exit(); +          return (void *) 0;  }  static int irm_init(void)  { -        struct stat st; -        struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), -                                  (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; +        struct stat        st; +        struct timeval     timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), +                                      (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; +        pthread_condattr_t cattr;          memset(&st, 0, sizeof(st)); -        irmd.state = IRMD_NULL; -          if (pthread_rwlock_init(&irmd.state_lock, NULL)) {                  log_err("Failed to initialize rwlock.");                  goto fail_state_lock; @@ -1937,6 +1994,27 @@ static int irm_init(void)                  goto fail_flows_lock;          } +        if (pthread_mutex_init(&irmd.cmd_lock, NULL)) { +                log_err("Failed to initialize mutex."); +                goto fail_cmd_lock; +        } + +        if (pthread_condattr_init(&cattr)) { +                log_err("Failed to initialize mutex."); +                goto fail_cmd_lock; +        } + +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        if (pthread_cond_init(&irmd.cmd_cond, &cattr)) { +                log_err("Failed to initialize condvar."); +                pthread_condattr_destroy(&cattr); +                goto fail_cmd_cond; +        } + +        pthread_condattr_destroy(&cattr); +          list_head_init(&irmd.ipcps);          list_head_init(&irmd.api_table);          list_head_init(&irmd.apn_table); @@ -2004,29 +2082,34 @@ static int irm_init(void)                  goto fail_rdrbuff;          } -        irmd.state       = IRMD_RUNNING; +        irmd.csockfd = -1; +        irmd.state   = IRMD_RUNNING;          log_info("Ouroboros IPC Resource Manager daemon started...");          return 0; -fail_rdrbuff: + fail_rdrbuff:          shm_rdrbuff_destroy(irmd.rdrb); -fail_sock_opt: + fail_sock_opt:          close(irmd.sockfd); -fail_sock_path: + fail_sock_path:          unlink(IRM_SOCK_PATH); -fail_stat: + fail_stat:          lockfile_destroy(irmd.lf); -fail_lockfile: + fail_lockfile:          bmp_destroy(irmd.port_ids); -fail_port_ids: + fail_port_ids: +        pthread_cond_destroy(&irmd.cmd_cond); + fail_cmd_cond: +        pthread_mutex_destroy(&irmd.cmd_lock); + fail_cmd_lock:          pthread_rwlock_destroy(&irmd.flows_lock); -fail_flows_lock: + fail_flows_lock:          pthread_rwlock_destroy(&irmd.reg_lock); -fail_reg_lock: + fail_reg_lock:          pthread_rwlock_destroy(&irmd.state_lock); -fail_state_lock: + fail_state_lock:          return -1;  } @@ -2085,29 +2168,39 @@ int main(int     argc,          log_init(!use_stdout); -        if (irm_init() < 0) { -                log_fini(); -                exit(EXIT_FAILURE); -        } +        if (irm_init() < 0) +                goto fail_irm_init;          if (tpm_init(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop)) { -                log_fini(); -                exit(EXIT_FAILURE); +                irmd_set_state(IRMD_NULL); +                goto fail_tpm_init;          }          if (tpm_start()) { -                tpm_fini(); -                log_fini(); -                exit(EXIT_FAILURE); +                irmd_set_state(IRMD_NULL); +                goto fail_tpm_start; +        } + +        if (pthread_create(&irmd.irm_sanitize, NULL, irm_sanitize, NULL)) { +                irmd_set_state(IRMD_NULL); +                goto fail_irm_sanitize; +        } + +        if (pthread_create(&irmd.shm_sanitize, NULL, shm_sanitize, irmd.rdrb)) { +                irmd_set_state(IRMD_NULL); +                goto fail_shm_sanitize;          } -        pthread_create(&irmd.irm_sanitize, NULL, irm_sanitize, NULL); -        pthread_create(&irmd.shm_sanitize, NULL, shm_sanitize, irmd.rdrb); +        if (pthread_create(&irmd.acceptor, NULL, acceptloop, NULL)) { +                irmd_set_state(IRMD_NULL); +                goto fail_acceptor; +        }          /* tpm_stop() called from sighandler */          tpm_fini(); +        pthread_join(irmd.acceptor, NULL);          pthread_join(irmd.irm_sanitize, NULL);          pthread_join(irmd.shm_sanitize, NULL); @@ -2122,4 +2215,18 @@ int main(int     argc,          log_info("Bye.");          exit(EXIT_SUCCESS); + + fail_acceptor: +        pthread_join(irmd.shm_sanitize, NULL); + fail_shm_sanitize: +        pthread_join(irmd.irm_sanitize, NULL); + fail_irm_sanitize: +        tpm_stop(); + fail_tpm_start: +        tpm_fini(); + fail_tpm_init: +        irm_fini(); + fail_irm_init: +        log_fini(); +        exit(EXIT_FAILURE);  } | 
