diff options
| author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-08-16 07:01:49 +0000 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-08-16 07:01:49 +0000 | 
| commit | c8283ea410e7d640537303c6b644bbc3afb35cd5 (patch) | |
| tree | 57a4d386fcc20cd7bbef330a246357ed3cfdae35 /src/ipcpd/ipcp.c | |
| parent | c3d9dbe4971549c8d2f8f821f06dcaa1dce90073 (diff) | |
| parent | a27bef54052b81406ba3142be3da4ab2a6330de6 (diff) | |
| download | ouroboros-c8283ea410e7d640537303c6b644bbc3afb35cd5.tar.gz ouroboros-c8283ea410e7d640537303c6b644bbc3afb35cd5.zip | |
Merged in dstaesse/ouroboros/be-single-accept (pull request #553)
Be single accept
Diffstat (limited to 'src/ipcpd/ipcp.c')
| -rw-r--r-- | src/ipcpd/ipcp.c | 199 | 
1 files changed, 138 insertions, 61 deletions
| diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index a8ff4c94..c5769f9e 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -91,67 +91,115 @@ void ipcp_hash_str(char *          buf,          buf[2 * i] = '\0';  } +static void * acceptloop(void * o) +{ +        int            csockfd; +        struct timeval tv = {(SOCKET_TIMEOUT / 1000), +                             (SOCKET_TIMEOUT % 1000) * 1000}; +#if defined(__FreeBSD__) || defined(__APPLE__) +        fd_set         fds; +        struct timeval timeout = {(IPCP_ACCEPT_TIMEOUT / 1000), +                                  (IPCP_ACCEPT_TIMEOUT % 1000) * 1000}; +#endif +        (void) o; + +        while (ipcp_get_state() != IPCP_SHUTDOWN && +               ipcp_get_state() != IPCP_NULL) { +                ssize_t count; +#if defined(__FreeBSD__) || defined(__APPLE__) +                FD_ZERO(&fds); +                FD_SET(ipcpi.sockfd, &fds); +                if (select(ipcpi.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0) +                        continue; +#endif +                csockfd = accept(ipcpi.sockfd, 0, 0); +                if (csockfd < 0) +                        continue; + +                if (setsockopt(csockfd, SOL_SOCKET, SO_RCVTIMEO, +                               (void *) &tv, sizeof(tv))) +                        log_warn("Failed to set timeout on socket."); + +                pthread_mutex_lock(&ipcpi.cmd_lock); + +                assert(ipcpi.csockfd == -1); + +                count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE); +                if (count <= 0) { +                        pthread_mutex_unlock(&ipcpi.cmd_lock); +                        log_err("Failed to read from socket."); +                        close(csockfd); +                        continue; +                } + +                ipcpi.cmd_len = count; +                ipcpi.csockfd = csockfd; + +                pthread_cond_signal(&ipcpi.cmd_cond); + +                while (ipcpi.csockfd != -1) +                        pthread_cond_wait(&ipcpi.cmd_cond, &ipcpi.cmd_lock); + +                pthread_mutex_unlock(&ipcpi.cmd_lock); +        } + +        return (void *) 0; +} +  static void * mainloop(void * o)  { -        int                 lsockfd; -        uint8_t             buf[IPCP_MSG_BUF_SIZE]; -        ssize_t             count; +        int                 sfd;          buffer_t            buffer;          struct ipcp_config  conf;          struct dif_info     info; -          ipcp_config_msg_t * conf_msg;          ipcp_msg_t *        msg; -        ipcp_msg_t          ret_msg  = IPCP_MSG__INIT; -        dif_info_msg_t      dif_info = DIF_INFO_MSG__INIT; -        struct timeval      ltv      = {(SOCKET_TIMEOUT / 1000), -                                        (SOCKET_TIMEOUT % 1000) * 1000}; - +        struct timespec     dl; +        struct timespec     to  = {(IPCP_ACCEPT_TIMEOUT / 1000), +                                   (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION};          (void)  o;          while (true) { -#ifdef __FreeBSD__ -                fd_set fds; -                struct timeval timeout = {(IPCP_ACCEPT_TIMEOUT / 1000), -                                          (IPCP_ACCEPT_TIMEOUT % 1000) * 1000}; -#endif -                int fd = -1; - -                if (ipcp_get_state() == IPCP_SHUTDOWN || -                    ipcp_get_state() == IPCP_NULL || -                    tpm_check()) { -                        tpm_exit(); -                        break; -                } +                int                 ret = 0; +                ipcp_msg_t          ret_msg  = IPCP_MSG__INIT; +                dif_info_msg_t      dif_info = DIF_INFO_MSG__INIT; +                int                 fd       = -1;                  ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; -#ifdef __FreeBSD__ -                FD_ZERO(&fds); -                FD_SET(ipcpi.sockfd, &fds); -                if (select(ipcpi.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0) -                        continue; -#endif -                lsockfd = accept(ipcpi.sockfd, 0, 0); -                if (lsockfd < 0) -                        continue; -                if (setsockopt(lsockfd, SOL_SOCKET, SO_RCVTIMEO, -                               (void *) <v, sizeof(ltv))) -                        log_warn("Failed to set timeout on socket."); +                clock_gettime(PTHREAD_COND_CLOCK, &dl); +                ts_add(&dl, &to, &dl); -                count = read(lsockfd, buf, IPCP_MSG_BUF_SIZE); -                if (count <= 0) { -                        log_err("Failed to read from socket"); -                        close(lsockfd); +                pthread_mutex_lock(&ipcpi.cmd_lock); + +                while (ipcpi.csockfd == -1 && ret != -ETIMEDOUT) +                        ret = -pthread_cond_timedwait(&ipcpi.cmd_cond, +                                                      &ipcpi.cmd_lock, +                                                      &dl); + +                sfd           = ipcpi.csockfd; +                ipcpi.csockfd = -1; + +                if (sfd == -1) { +                        pthread_mutex_unlock(&ipcpi.cmd_lock); +                        if (tpm_check()) { +                                close(sfd); +                                break; +                        }                          continue;                  } -                msg = ipcp_msg__unpack(NULL, count, buf); +                pthread_cond_broadcast(&ipcpi.cmd_cond); + +                msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf);                  if (msg == NULL) { -                        close(lsockfd); +                        pthread_mutex_unlock(&ipcpi.cmd_lock); +                        close(sfd);                          continue;                  } +                pthread_mutex_unlock(&ipcpi.cmd_lock); +                  tpm_dec();                  switch (msg->code) { @@ -398,7 +446,7 @@ static void * mainloop(void * o)                  buffer.len = ipcp_msg__get_packed_size(&ret_msg);                  if (buffer.len == 0) {                          log_err("Failed to pack reply message"); -                        close(lsockfd); +                        close(sfd);                          tpm_inc();                          continue;                  } @@ -406,27 +454,29 @@ static void * mainloop(void * o)                  buffer.data = malloc(buffer.len);                  if (buffer.data == NULL) {                          log_err("Failed to create reply buffer."); -                        close(lsockfd); +                        close(sfd);                          tpm_inc();                          continue;                  }                  ipcp_msg__pack(&ret_msg, buffer.data); -                if (write(lsockfd, buffer.data, buffer.len) == -1) { +                if (write(sfd, buffer.data, buffer.len) == -1) {                          log_err("Failed to send reply message");                          free(buffer.data); -                        close(lsockfd); +                        close(sfd);                          tpm_inc();                          continue;                  }                  free(buffer.data); -                close(lsockfd); +                close(sfd);                  tpm_inc();          } +        tpm_exit(); +          return (void *) 0;  } @@ -526,22 +576,30 @@ int ipcp_init(int               argc,                  goto fail_alloc_lock;          } -        if (pthread_cond_init(&ipcpi.alloc_cond, NULL)) { +        if (pthread_cond_init(&ipcpi.alloc_cond, &cattr)) {                  log_err("Failed to init convar.");                  goto fail_alloc_cond;          } -        ipcpi.alloc_id = -1; +        if (pthread_mutex_init(&ipcpi.cmd_lock, NULL)) { +                log_err("Failed to init mutex."); +                goto fail_cmd_lock; +        } -        if (type == IPCP_NORMAL) { -                pthread_condattr_destroy(&cattr); -                return 0; +        if (pthread_cond_init(&ipcpi.cmd_cond, &cattr)) { +                log_err("Failed to init convar."); +                goto fail_cmd_cond;          } -        ipcpi.shim_data = shim_data_create(); -        if (ipcpi.shim_data == NULL) { -                ret = -ENOMEM; -                goto fail_shim_data; +        ipcpi.alloc_id = -1; +        ipcpi.csockfd  = -1; + +        if (type != IPCP_NORMAL) { +                ipcpi.shim_data = shim_data_create(); +                if (ipcpi.shim_data == NULL) { +                        ret = -ENOMEM; +                        goto fail_shim_data; +                }          }          pthread_condattr_destroy(&cattr); @@ -549,6 +607,10 @@ int ipcp_init(int               argc,          return 0;   fail_shim_data: +        pthread_cond_destroy(&ipcpi.cmd_cond); + fail_cmd_cond: +        pthread_mutex_destroy(&ipcpi.cmd_lock); + fail_cmd_lock:          pthread_cond_destroy(&ipcpi.alloc_cond);   fail_alloc_cond:          pthread_mutex_destroy(&ipcpi.alloc_lock); @@ -590,26 +652,39 @@ int ipcp_boot()          sigaction(SIGHUP,  &sig_act, NULL);          sigaction(SIGPIPE, &sig_act, NULL); -        pthread_sigmask(SIG_BLOCK, &sigset, NULL); -          if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) -                return -1; +                goto fail_tpm_init; -        if (tpm_start()) { -                tpm_fini(); -                return -1; -        } +        pthread_sigmask(SIG_BLOCK, &sigset, NULL); + +        if (tpm_start()) +                goto fail_tpm_start;          ipcp_set_state(IPCP_INIT); +        if (pthread_create(&ipcpi.acceptor, NULL, acceptloop, NULL)) { +                log_err("Failed to create acceptor thread."); +                ipcp_set_state(IPCP_NULL); +                goto fail_acceptor; +        } +          pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);          return 0; + + fail_acceptor: +        tpm_stop(); + fail_tpm_start: +        tpm_fini(); + fail_tpm_init: +        return -1;  }  void ipcp_shutdown()  {          tpm_fini(); +        pthread_join(ipcpi.acceptor, NULL); +          log_info("IPCP %d shutting down.", getpid());  } @@ -627,6 +702,8 @@ void ipcp_fini()          pthread_mutex_destroy(&ipcpi.state_mtx);          pthread_cond_destroy(&ipcpi.alloc_cond);          pthread_mutex_destroy(&ipcpi.alloc_lock); +        pthread_cond_destroy(&ipcpi.cmd_cond); +        pthread_mutex_destroy(&ipcpi.cmd_lock);          log_info("IPCP %d out.", getpid()); | 
