diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipcpd/ipcp.c | 270 | ||||
| -rw-r--r-- | src/ipcpd/ipcp.h | 9 | 
2 files changed, 253 insertions, 26 deletions
| diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 8646121a..ee5bd76e 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -29,6 +29,7 @@  #include <ouroboros/sockets.h>  #include <ouroboros/errno.h>  #include <ouroboros/dev.h> +#include <ouroboros/bitmap.h>  #include <ouroboros/np1_flow.h>  #include "ipcp.h" @@ -37,6 +38,51 @@  #include <sys/socket.h>  #include <stdlib.h> + +static void thread_inc(void) +{ +        pthread_mutex_lock(&ipcpi.threads_lock); + +        ++ipcpi.threads; +        pthread_cond_signal(&ipcpi.threads_cond); + +        pthread_mutex_unlock(&ipcpi.threads_lock); +} + +static void thread_dec(void) +{ +        pthread_mutex_lock(&ipcpi.threads_lock); + +        --ipcpi.threads; +        pthread_cond_signal(&ipcpi.threads_cond); + +        pthread_mutex_unlock(&ipcpi.threads_lock); +} + +static bool thread_check(void) +{ +        int ret; + +        pthread_mutex_lock(&ipcpi.threads_lock); + +        ret = ipcpi.threads > ipcpi.max_threads; + +        pthread_mutex_unlock(&ipcpi.threads_lock); + +        return ret; +} + +static void thread_exit(ssize_t id) +{ +        pthread_mutex_lock(&ipcpi.threads_lock); +        bmp_release(ipcpi.thread_ids, id); + +        --ipcpi.threads; +        pthread_cond_signal(&ipcpi.threads_cond); + +        pthread_mutex_unlock(&ipcpi.threads_lock); +} +  static void * ipcp_main_loop(void * o)  {          int     lsockfd; @@ -53,7 +99,7 @@ static void * ipcp_main_loop(void * o)          struct timeval ltv = {(SOCKET_TIMEOUT / 1000),                               (SOCKET_TIMEOUT % 1000) * 1000}; -        (void) o; +        ssize_t id = (ssize_t)  o;          while (true) {  #ifdef __FreeBSD__ @@ -65,8 +111,10 @@ static void * ipcp_main_loop(void * o)                  pthread_rwlock_rdlock(&ipcpi.state_lock); -                if (ipcp_get_state() == IPCP_SHUTDOWN -                    || ipcp_get_state() == IPCP_NULL) { +                if (ipcp_get_state() == IPCP_SHUTDOWN || +                    ipcp_get_state() == IPCP_NULL || +                    thread_check()) { +                        thread_exit(id);                          pthread_rwlock_unlock(&ipcpi.state_lock);                          break;                  } @@ -101,6 +149,8 @@ static void * ipcp_main_loop(void * o)                          continue;                  } +                thread_dec(); +                  switch (msg->code) {                  case IPCP_MSG_CODE__IPCP_BOOTSTRAP:                          if (ipcpi.ops->ipcp_bootstrap == NULL) { @@ -245,12 +295,14 @@ static void * ipcp_main_loop(void * o)                  if (buffer.len == 0) {                          log_err("Failed to send reply message");                          close(lsockfd); +                        thread_inc();                          continue;                  }                  buffer.data = malloc(buffer.len);                  if (buffer.data == NULL) {                          close(lsockfd); +                        thread_inc();                          continue;                  } @@ -259,11 +311,14 @@ static void * ipcp_main_loop(void * o)                  if (write(lsockfd, buffer.data, buffer.len) == -1) {                          free(buffer.data);                          close(lsockfd); +                        thread_inc();                          continue;                  }                  free(buffer.data);                  close(lsockfd); + +                thread_inc();          }          return (void *) 0; @@ -312,12 +367,12 @@ int ipcp_init(int               argc,          if (type == IPCP_NORMAL) {                  if (ap_init(argv[0])) { -                        log_err("Failed to init normal IPCP."); +                        log_err("Failed to init normal IPCPI.");                          return -1;                  }          } else {                  if (ap_init(NULL)) { -                        log_err("Failed to init shim IPCP."); +                        log_err("Failed to init shim IPCPI.");                          return -1;                  }          } @@ -326,14 +381,19 @@ int ipcp_init(int               argc,          ipcpi.state     = IPCP_NULL;          ipcpi.shim_data = NULL; -        ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCPD_THREADPOOL_SIZE); +        ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCP_MAX_THREADS);          if (ipcpi.threadpool == NULL) { +                ap_fini();                  return -ENOMEM;          } +        ipcpi.threads = 0; +        ipcpi.max_threads = IPCP_MIN_AV_THREADS; +          ipcpi.sock_path = ipcp_sock_path(getpid());          if (ipcpi.sock_path == NULL) {                  free(ipcpi.threadpool); +                ap_fini();                  return -1;          } @@ -342,6 +402,7 @@ int ipcp_init(int               argc,                  log_err("Could not open server socket.");                  free(ipcpi.threadpool);                  free(ipcpi.sock_path); +                ap_fini();                  return -1;          } @@ -351,55 +412,206 @@ int ipcp_init(int               argc,          ipcpi.ops = ops; -        pthread_rwlock_init(&ipcpi.state_lock, NULL); -        pthread_mutex_init(&ipcpi.state_mtx, NULL); -        pthread_condattr_init(&cattr); +        if (pthread_rwlock_init(&ipcpi.state_lock, NULL)) { +                log_err("Could not create rwlock."); +                close(ipcpi.sockfd); +                free(ipcpi.threadpool); +                free(ipcpi.sock_path); +                ap_fini(); +                return -1; +        } + +        if (pthread_mutex_init(&ipcpi.state_mtx, NULL)) { +                log_err("Could not create mutex."); +                pthread_rwlock_destroy(&ipcpi.state_lock); +                close(ipcpi.sockfd); +                free(ipcpi.threadpool); +                free(ipcpi.sock_path); +                ap_fini(); +                return -1; +        } + +        if (pthread_mutex_init(&ipcpi.threads_lock, NULL)) { +                log_err("Could not create mutex."); +                pthread_mutex_destroy(&ipcpi.state_mtx); +                pthread_rwlock_destroy(&ipcpi.state_lock); +                close(ipcpi.sockfd); +                free(ipcpi.threadpool); +                free(ipcpi.sock_path); +                ap_fini(); +                return -1; +        } + +        if (pthread_condattr_init(&cattr)) { +                log_err("Could not create condattr."); +                pthread_mutex_destroy(&ipcpi.threads_lock); +                pthread_mutex_destroy(&ipcpi.state_mtx); +                pthread_rwlock_destroy(&ipcpi.state_lock); +                close(ipcpi.sockfd); +                free(ipcpi.threadpool); +                free(ipcpi.sock_path); +                ap_fini(); +                return -1; +        } +                ;  #ifndef __APPLE__          pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);  #endif -        pthread_cond_init(&ipcpi.state_cond, &cattr); +        if (pthread_cond_init(&ipcpi.state_cond, &cattr)) { +                log_err("Could not init condvar."); +                pthread_condattr_destroy(&cattr); +                pthread_mutex_destroy(&ipcpi.threads_lock); +                pthread_mutex_destroy(&ipcpi.state_mtx); +                pthread_rwlock_destroy(&ipcpi.state_lock); +                close(ipcpi.sockfd); +                free(ipcpi.threadpool); +                free(ipcpi.sock_path); +                ap_fini(); +                return -1; +        } + +        if (pthread_cond_init(&ipcpi.threads_cond, &cattr)) { +                log_err("Could not init condvar."); +                pthread_cond_destroy(&ipcpi.state_cond); +                pthread_condattr_destroy(&cattr); +                pthread_mutex_destroy(&ipcpi.threads_lock); +                pthread_mutex_destroy(&ipcpi.state_mtx); +                pthread_rwlock_destroy(&ipcpi.state_lock); +                close(ipcpi.sockfd); +                free(ipcpi.threadpool); +                free(ipcpi.sock_path); +                ap_fini(); +                return -1; +        }; + + +        pthread_condattr_destroy(&cattr); + +        ipcpi.thread_ids = bmp_create(IPCP_MAX_THREADS, 0); +        if (ipcpi.thread_ids == NULL) { +                log_err("Could not init condvar."); +                pthread_cond_destroy(&ipcpi.threads_cond); +                pthread_cond_destroy(&ipcpi.state_cond); +                pthread_mutex_destroy(&ipcpi.threads_lock); +                pthread_mutex_destroy(&ipcpi.state_mtx); +                pthread_rwlock_destroy(&ipcpi.state_lock); +                close(ipcpi.sockfd); +                free(ipcpi.threadpool); +                free(ipcpi.sock_path); +                ap_fini(); +                return -1; +        };          if (type == IPCP_NORMAL)                  return 0;          ipcpi.shim_data = shim_data_create();          if (ipcpi.shim_data == NULL) { +                bmp_destroy(ipcpi.thread_ids); +                pthread_cond_destroy(&ipcpi.threads_cond); +                pthread_cond_destroy(&ipcpi.state_cond); +                pthread_mutex_destroy(&ipcpi.state_mtx); +                pthread_rwlock_destroy(&ipcpi.state_lock); +                close(ipcpi.sockfd);                  free(ipcpi.threadpool);                  free(ipcpi.sock_path); +                ap_fini();                  return -ENOMEM;          }          return 0;  } -int ipcp_boot() +static bool is_thread_alive(ssize_t id)  { -        int t; +        bool ret; +        pthread_mutex_lock(&ipcpi.threads_lock); -        ipcp_set_state(IPCP_INIT); +        ret = bmp_is_id_used(ipcpi.thread_ids, id); -        for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t) { -                if (pthread_create(&ipcpi.threadpool[t], NULL, -                                   ipcp_main_loop, NULL)) { -                        int i; -                        log_err("Failed to create main thread."); -                        ipcp_set_state(IPCP_NULL); -                        for (i = 0; i < t; ++i) -                                pthread_join(ipcpi.threadpool[i], NULL); -                        return -1; +        pthread_mutex_unlock(&ipcpi.threads_lock); + +        return ret; +} + +void * threadpoolmgr(void * o) +{ +        struct timespec to = {(IPCP_TPM_TIMEOUT / 1000), +                              (IPCP_TPM_TIMEOUT % 1000) * MILLION}; +        struct timespec dl; +        size_t t; + +        (void) o; + +        while (true) { +                clock_gettime(PTHREAD_COND_CLOCK, &dl); +                ts_add(&dl, &to, &dl); + +                pthread_rwlock_rdlock(&ipcpi.state_lock); +                if (ipcp_get_state() == IPCP_SHUTDOWN || +                    ipcp_get_state() == IPCP_NULL) { +                        pthread_rwlock_unlock(&ipcpi.state_lock); +                        log_dbg("Threadpool manager exiting."); +                        for (t = 0; t < IPCP_MAX_THREADS; ++t) +                                if (is_thread_alive(t)) { +                                        log_dbg("Waiting for thread %zd.", t); +                                        pthread_join(ipcpi.threadpool[t], NULL); +                                } + +                        log_dbg("Threadpool manager done."); +                        break; +                } + +                pthread_rwlock_unlock(&ipcpi.state_lock); + +                pthread_mutex_lock(&ipcpi.threads_lock); + +                if (ipcpi.threads < IPCP_MIN_AV_THREADS) { +                        log_dbg("Increasing threadpool."); +                        ipcpi.max_threads = IPCP_MAX_AV_THREADS; + +                        while (ipcpi.threads < ipcpi.max_threads) { +                                ssize_t id = bmp_allocate(ipcpi.thread_ids); +                                if (!bmp_is_id_valid(ipcpi.thread_ids, id)) { +                                        log_warn("IPCP threadpool exhausted."); +                                        break; +                                } + +                                if (pthread_create(&ipcpi.threadpool[id], +                                                   NULL, ipcp_main_loop, +                                                   (void *) id)) +                                        log_warn("Failed to start new thread."); +                                else +                                        ++ipcpi.threads; +                        }                  } + +                if (pthread_cond_timedwait(&ipcpi.threads_cond, +                                           &ipcpi.threads_lock, +                                           &dl) == ETIMEDOUT) +                        if (ipcpi.threads > IPCP_MIN_AV_THREADS) +                                --ipcpi.max_threads; + +                pthread_mutex_unlock(&ipcpi.threads_lock);          } +        return (void *) 0; +} + +int ipcp_boot() +{ +        ipcp_set_state(IPCP_INIT); + +        pthread_create(&ipcpi.tpm, NULL, threadpoolmgr, NULL); +          return 0;  }  void ipcp_shutdown()  { -        int t; -        for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t) -                pthread_join(ipcpi.threadpool[t], NULL); +        pthread_join(ipcpi.tpm, NULL); -        log_info("IPCP %d shutting down. Bye.", getpid()); +        log_info("IPCP %d shutting down.", getpid());  }  void ipcp_fini() @@ -408,18 +620,24 @@ void ipcp_fini()          if (unlink(ipcpi.sock_path))                  log_warn("Could not unlink %s.", ipcpi.sock_path); +        bmp_destroy(ipcpi.thread_ids); +          free(ipcpi.sock_path);          free(ipcpi.threadpool);          shim_data_destroy(ipcpi.shim_data);          pthread_cond_destroy(&ipcpi.state_cond); +        pthread_cond_destroy(&ipcpi.threads_cond); +        pthread_mutex_destroy(&ipcpi.threads_lock);          pthread_mutex_destroy(&ipcpi.state_mtx);          pthread_rwlock_destroy(&ipcpi.state_lock);          log_fini();          ap_fini(); + +        log_info("IPCP %d out.", getpid());  }  void ipcp_set_state(enum ipcp_state state) diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index a64ab65c..581ca5e3 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -80,7 +80,16 @@ struct ipcp {          int                sockfd;          char *             sock_path; +          pthread_t *        threadpool; + +        struct bmp *       thread_ids; +        size_t             max_threads; +        size_t             threads; +        pthread_cond_t     threads_cond; +        pthread_mutex_t    threads_lock; + +        pthread_t          tpm;  } ipcpi;  int             ipcp_init(int               argc, | 
