diff options
| author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-07-25 19:54:39 +0200 | 
|---|---|---|
| committer | dimitri staessens <dimitri.staessens@ugent.be> | 2017-07-26 10:39:51 +0200 | 
| commit | 809abada865727ea986d69afcf2a9a3b00df560a (patch) | |
| tree | e6cdc6bfba21e87be04df6d6fa62490813d94ce3 /src/ipcpd | |
| parent | 36343a9c19fca9494881a9529b5fbbb0d51c1900 (diff) | |
| download | ouroboros-809abada865727ea986d69afcf2a9a3b00df560a.tar.gz ouroboros-809abada865727ea986d69afcf2a9a3b00df560a.zip | |
lib: Add threadpool manager
This adds a threadpool manager component in the library that is used
in the IRMd and IPCPs. The threadpool manager now doesn't detach
threads but does a join when they exit. This solves a data race in the
previous implementation where some threads were not completely finished
upon release of some resources.
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/ipcp.c | 181 | ||||
| -rw-r--r-- | src/ipcpd/ipcp.h | 9 | 
2 files changed, 21 insertions, 169 deletions
| diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 0d6d850f..a56e46f7 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -31,6 +31,7 @@  #include <ouroboros/dev.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/np1_flow.h> +#include <ouroboros/tpm.h>  #include "ipcp.h" @@ -56,6 +57,8 @@ void ipcp_sig_handler(int         sig,                          if (ipcp_get_state() == IPCP_OPERATIONAL)                                  ipcp_set_state(IPCP_SHUTDOWN);                  } + +                tpm_stop();          default:                  return;          } @@ -87,51 +90,7 @@ void ipcp_hash_str(char *          buf,          buf[2 * i] = '\0';  } -static void thread_inc(void) -{ -        pthread_mutex_lock(&ipcpi.threads_lock); - -        ++ipcpi.threads; -        pthread_cond_signal(&ipcpi.threads_cond); - -        pthread_mutex_unlock(&ipcpi.threads_lock); -} - -static void thread_dec(void) -{ -        pthread_mutex_lock(&ipcpi.threads_lock); - -        --ipcpi.threads; -        pthread_cond_signal(&ipcpi.threads_cond); - -        pthread_mutex_unlock(&ipcpi.threads_lock); -} - -static bool thread_check(void) -{ -        int ret; - -        pthread_mutex_lock(&ipcpi.threads_lock); - -        ret = ipcpi.threads > ipcpi.max_threads; - -        pthread_mutex_unlock(&ipcpi.threads_lock); - -        return ret; -} - -static void thread_exit(ssize_t id) -{ -        pthread_mutex_lock(&ipcpi.threads_lock); -        bmp_release(ipcpi.thread_ids, id); - -        --ipcpi.threads; -        pthread_cond_signal(&ipcpi.threads_cond); - -        pthread_mutex_unlock(&ipcpi.threads_lock); -} - -static void * ipcp_main_loop(void * o) +static void * mainloop(void * o)  {          int                 lsockfd;          uint8_t             buf[IPCP_MSG_BUF_SIZE]; @@ -147,7 +106,7 @@ static void * ipcp_main_loop(void * o)          struct timeval      ltv      = {(SOCKET_TIMEOUT / 1000),                                          (SOCKET_TIMEOUT % 1000) * 1000}; -        ssize_t             id       = (ssize_t)  o; +        (void)  o;          while (true) {  #ifdef __FreeBSD__ @@ -159,8 +118,8 @@ static void * ipcp_main_loop(void * o)                  if (ipcp_get_state() == IPCP_SHUTDOWN ||                      ipcp_get_state() == IPCP_NULL || -                    thread_check()) { -                        thread_exit(id); +                    tpm_check()) { +                        tpm_exit();                          break;                  } @@ -192,7 +151,7 @@ static void * ipcp_main_loop(void * o)                          continue;                  } -                thread_dec(); +                tpm_dec();                  switch (msg->code) {                  case IPCP_MSG_CODE__IPCP_BOOTSTRAP: @@ -408,7 +367,7 @@ static void * ipcp_main_loop(void * o)                  if (buffer.len == 0) {                          log_err("Failed to pack reply message");                          close(lsockfd); -                        thread_inc(); +                        tpm_inc();                          continue;                  } @@ -416,7 +375,7 @@ static void * ipcp_main_loop(void * o)                  if (buffer.data == NULL) {                          log_err("Failed to create reply buffer.");                          close(lsockfd); -                        thread_inc(); +                        tpm_inc();                          continue;                  } @@ -426,14 +385,14 @@ static void * ipcp_main_loop(void * o)                          log_err("Failed to send reply message");                          free(buffer.data);                          close(lsockfd); -                        thread_inc(); +                        tpm_inc();                          continue;                  }                  free(buffer.data);                  close(lsockfd); -                thread_inc(); +                tpm_inc();          }          return (void *) 0; @@ -496,15 +455,6 @@ int ipcp_init(int               argc,          ipcpi.state     = IPCP_NULL;          ipcpi.shim_data = NULL; -        ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCP_MAX_THREADS); -        if (ipcpi.threadpool == NULL) { -                ret = -ENOMEM; -                goto fail_thr; -        } - -        ipcpi.threads = 0; -        ipcpi.max_threads = IPCP_MIN_AV_THREADS; -          ipcpi.sock_path = ipcp_sock_path(getpid());          if (ipcpi.sock_path == NULL)                  goto fail_sock_path; @@ -526,11 +476,6 @@ int ipcp_init(int               argc,                  goto fail_state_mtx;          } -        if (pthread_mutex_init(&ipcpi.threads_lock, NULL)) { -                log_err("Could not create mutex."); -                goto fail_thread_lock; -        } -          if (pthread_condattr_init(&cattr)) {                  log_err("Could not create condattr.");                  goto fail_cond_attr; @@ -544,17 +489,6 @@ int ipcp_init(int               argc,                  goto fail_state_cond;          } -        if (pthread_cond_init(&ipcpi.threads_cond, &cattr)) { -                log_err("Could not init condvar."); -                goto fail_thread_cond; -        } - -        ipcpi.thread_ids = bmp_create(IPCP_MAX_THREADS, 0); -        if (ipcpi.thread_ids == NULL) { -                log_err("Could not init condvar."); -                goto fail_bmp; -        } -          if (pthread_mutex_init(&ipcpi.alloc_lock, NULL)) {                  log_err("Failed to init mutex.");                  goto fail_alloc_lock; @@ -587,94 +521,21 @@ int ipcp_init(int               argc,   fail_alloc_cond:          pthread_mutex_destroy(&ipcpi.alloc_lock);   fail_alloc_lock: -        bmp_destroy(ipcpi.thread_ids); - fail_bmp: -        pthread_cond_destroy(&ipcpi.threads_cond); - fail_thread_cond:          pthread_cond_destroy(&ipcpi.state_cond);   fail_state_cond:          pthread_condattr_destroy(&cattr);   fail_cond_attr: -        pthread_mutex_destroy(&ipcpi.threads_lock); - fail_thread_lock:          pthread_mutex_destroy(&ipcpi.state_mtx);   fail_state_mtx:          close(ipcpi.sockfd);   fail_serv_sock:          free(ipcpi.sock_path);   fail_sock_path: -        free(ipcpi.threadpool); - fail_thr:          ouroboros_fini();          return ret;  } -void * threadpoolmgr(void * o) -{ -        pthread_attr_t  pattr; -        struct timespec dl; -        struct timespec to = {(IRMD_TPM_TIMEOUT / 1000), -                              (IRMD_TPM_TIMEOUT % 1000) * MILLION}; -        (void) o; - -        if (pthread_attr_init(&pattr)) -                return (void *) -1; - -        pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED); - -        while (true) { -                clock_gettime(PTHREAD_COND_CLOCK, &dl); -                ts_add(&dl, &to, &dl); - -                if (ipcp_get_state() == IPCP_SHUTDOWN || -                    ipcp_get_state() == IPCP_NULL) { -                        pthread_attr_destroy(&pattr); -                        log_dbg("Waiting for threads to exit."); -                        pthread_mutex_lock(&ipcpi.threads_lock); -                        while (ipcpi.threads > 0) -                                pthread_cond_wait(&ipcpi.threads_cond, -                                                  &ipcpi.threads_lock); -                        pthread_mutex_unlock(&ipcpi.threads_lock); - -                        log_dbg("Threadpool manager done."); -                        break; -                } - -                pthread_mutex_lock(&ipcpi.threads_lock); - -                if (ipcpi.threads < IPCP_MIN_AV_THREADS) { -                        log_dbg("Increasing threadpool."); -                        ipcpi.max_threads = IPCP_MAX_AV_THREADS; - -                        while (ipcpi.threads < ipcpi.max_threads) { -                                ssize_t id = bmp_allocate(ipcpi.thread_ids); -                                if (!bmp_is_id_valid(ipcpi.thread_ids, id)) { -                                        log_warn("IPCP threadpool exhausted."); -                                        break; -                                } - -                                if (pthread_create(&ipcpi.threadpool[id], -                                                   &pattr, ipcp_main_loop, -                                                   (void *) id)) -                                        log_warn("Failed to start new thread."); -                                else -                                        ++ipcpi.threads; -                        } -                } - -                if (pthread_cond_timedwait(&ipcpi.threads_cond, -                                           &ipcpi.threads_lock, -                                           &dl) == ETIMEDOUT) -                        if (ipcpi.threads > IPCP_MIN_AV_THREADS) -                                --ipcpi.max_threads; - -                pthread_mutex_unlock(&ipcpi.threads_lock); -        } - -        return (void *) 0; -} -  int ipcp_boot()  {          struct sigaction sig_act; @@ -699,9 +560,15 @@ int ipcp_boot()          pthread_sigmask(SIG_BLOCK, &sigset, NULL); -        ipcp_set_state(IPCP_INIT); +        if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) +                return -1; + +        if (tpm_start()) { +                tpm_fini(); +                return -1; +        } -        pthread_create(&ipcpi.tpm, NULL, threadpoolmgr, NULL); +        ipcp_set_state(IPCP_INIT);          pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); @@ -710,8 +577,7 @@ int ipcp_boot()  void ipcp_shutdown()  { -        pthread_join(ipcpi.tpm, NULL); - +        tpm_fini();          log_info("IPCP %d shutting down.", getpid());  } @@ -721,16 +587,11 @@ void ipcp_fini()          if (unlink(ipcpi.sock_path))                  log_warn("Could not unlink %s.", ipcpi.sock_path); -        bmp_destroy(ipcpi.thread_ids); -          free(ipcpi.sock_path); -        free(ipcpi.threadpool);          shim_data_destroy(ipcpi.shim_data);          pthread_cond_destroy(&ipcpi.state_cond); -        pthread_cond_destroy(&ipcpi.threads_cond); -        pthread_mutex_destroy(&ipcpi.threads_lock);          pthread_mutex_destroy(&ipcpi.state_mtx);          pthread_cond_destroy(&ipcpi.alloc_cond);          pthread_mutex_destroy(&ipcpi.alloc_lock); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 3f5e1bd6..fb69df5c 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -93,15 +93,6 @@ struct ipcp {          pthread_cond_t     alloc_cond;          pthread_mutex_t    alloc_lock; -        pthread_t *        threadpool; - -        struct bmp *       thread_ids; -        size_t             max_threads; -        size_t             threads; -        pthread_cond_t     threads_cond; -        pthread_mutex_t    threads_lock; - -        pthread_t          tpm;  } ipcpi;  int             ipcp_init(int               argc, | 
