diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-04-02 11:45:52 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@ugent.be> | 2017-04-02 11:45:52 +0200 |
commit | b707d032cecb0cd97f548b755e4ec2bda190e83c (patch) | |
tree | 0adff5f0a4016e2c0384b5cd06e3783a4eb9c995 | |
parent | 1e62996112a2a43bd0f572676bc8d87761ad6386 (diff) | |
download | ouroboros-b707d032cecb0cd97f548b755e4ec2bda190e83c.tar.gz ouroboros-b707d032cecb0cd97f548b755e4ec2bda190e83c.zip |
ipcpd: Add dynamic threadpooling for IPCPs
-rw-r--r-- | include/ouroboros/config.h.in | 6 | ||||
-rw-r--r-- | src/ipcpd/ipcp.c | 270 | ||||
-rw-r--r-- | src/ipcpd/ipcp.h | 9 |
3 files changed, 258 insertions, 27 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index 30bab68f..b19bb17f 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -52,13 +52,17 @@ #define IRMD_MIN_AV_THREADS 16 #define IRMD_MAX_AV_THREADS 64 #define IRMD_MAX_THREADS 256 +/* IPCP dynamic threadpooling */ +#define IPCP_MIN_AV_THREADS 4 +#define IPCP_MAX_AV_THREADS 32 +#define IPCP_MAX_THREADS 64 -#define IPCPD_THREADPOOL_SIZE 16 #define IPCPD_MAX_CONNS IRMD_MAX_FLOWS #define PTHREAD_COND_CLOCK CLOCK_MONOTONIC #define PFT_SIZE 1 << 12 /* Timeout values */ #define IRMD_TPM_TIMEOUT 1000 +#define IPCP_TPM_TIMEOUT 1000 #define IRMD_ACCEPT_TIMEOUT 100 #define IRMD_REQ_ARR_TIMEOUT 500 #define IRMD_FLOW_TIMEOUT 5000 diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 8646121a..ee5bd76e 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -29,6 +29,7 @@ #include <ouroboros/sockets.h> #include <ouroboros/errno.h> #include <ouroboros/dev.h> +#include <ouroboros/bitmap.h> #include <ouroboros/np1_flow.h> #include "ipcp.h" @@ -37,6 +38,51 @@ #include <sys/socket.h> #include <stdlib.h> + +static void thread_inc(void) +{ + pthread_mutex_lock(&ipcpi.threads_lock); + + ++ipcpi.threads; + pthread_cond_signal(&ipcpi.threads_cond); + + pthread_mutex_unlock(&ipcpi.threads_lock); +} + +static void thread_dec(void) +{ + pthread_mutex_lock(&ipcpi.threads_lock); + + --ipcpi.threads; + pthread_cond_signal(&ipcpi.threads_cond); + + pthread_mutex_unlock(&ipcpi.threads_lock); +} + +static bool thread_check(void) +{ + int ret; + + pthread_mutex_lock(&ipcpi.threads_lock); + + ret = ipcpi.threads > ipcpi.max_threads; + + pthread_mutex_unlock(&ipcpi.threads_lock); + + return ret; +} + +static void thread_exit(ssize_t id) +{ + pthread_mutex_lock(&ipcpi.threads_lock); + bmp_release(ipcpi.thread_ids, id); + + --ipcpi.threads; + pthread_cond_signal(&ipcpi.threads_cond); + + pthread_mutex_unlock(&ipcpi.threads_lock); +} + static void * ipcp_main_loop(void * o) { int lsockfd; @@ -53,7 +99,7 @@ static void * ipcp_main_loop(void * o) struct timeval ltv = {(SOCKET_TIMEOUT / 1000), (SOCKET_TIMEOUT % 1000) * 1000}; - (void) o; + ssize_t id = (ssize_t) o; while (true) { #ifdef __FreeBSD__ @@ -65,8 +111,10 @@ static void * ipcp_main_loop(void * o) pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_get_state() == IPCP_SHUTDOWN - || ipcp_get_state() == IPCP_NULL) { + if (ipcp_get_state() == IPCP_SHUTDOWN || + ipcp_get_state() == IPCP_NULL || + thread_check()) { + thread_exit(id); pthread_rwlock_unlock(&ipcpi.state_lock); break; } @@ -101,6 +149,8 @@ static void * ipcp_main_loop(void * o) continue; } + thread_dec(); + switch (msg->code) { case IPCP_MSG_CODE__IPCP_BOOTSTRAP: if (ipcpi.ops->ipcp_bootstrap == NULL) { @@ -245,12 +295,14 @@ static void * ipcp_main_loop(void * o) if (buffer.len == 0) { log_err("Failed to send reply message"); close(lsockfd); + thread_inc(); continue; } buffer.data = malloc(buffer.len); if (buffer.data == NULL) { close(lsockfd); + thread_inc(); continue; } @@ -259,11 +311,14 @@ static void * ipcp_main_loop(void * o) if (write(lsockfd, buffer.data, buffer.len) == -1) { free(buffer.data); close(lsockfd); + thread_inc(); continue; } free(buffer.data); close(lsockfd); + + thread_inc(); } return (void *) 0; @@ -312,12 +367,12 @@ int ipcp_init(int argc, if (type == IPCP_NORMAL) { if (ap_init(argv[0])) { - log_err("Failed to init normal IPCP."); + log_err("Failed to init normal IPCPI."); return -1; } } else { if (ap_init(NULL)) { - log_err("Failed to init shim IPCP."); + log_err("Failed to init shim IPCPI."); return -1; } } @@ -326,14 +381,19 @@ int ipcp_init(int argc, ipcpi.state = IPCP_NULL; ipcpi.shim_data = NULL; - ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCPD_THREADPOOL_SIZE); + ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCP_MAX_THREADS); if (ipcpi.threadpool == NULL) { + ap_fini(); return -ENOMEM; } + ipcpi.threads = 0; + ipcpi.max_threads = IPCP_MIN_AV_THREADS; + ipcpi.sock_path = ipcp_sock_path(getpid()); if (ipcpi.sock_path == NULL) { free(ipcpi.threadpool); + ap_fini(); return -1; } @@ -342,6 +402,7 @@ int ipcp_init(int argc, log_err("Could not open server socket."); free(ipcpi.threadpool); free(ipcpi.sock_path); + ap_fini(); return -1; } @@ -351,55 +412,206 @@ int ipcp_init(int argc, ipcpi.ops = ops; - pthread_rwlock_init(&ipcpi.state_lock, NULL); - pthread_mutex_init(&ipcpi.state_mtx, NULL); - pthread_condattr_init(&cattr); + if (pthread_rwlock_init(&ipcpi.state_lock, NULL)) { + log_err("Could not create rwlock."); + close(ipcpi.sockfd); + free(ipcpi.threadpool); + free(ipcpi.sock_path); + ap_fini(); + return -1; + } + + if (pthread_mutex_init(&ipcpi.state_mtx, NULL)) { + log_err("Could not create mutex."); + pthread_rwlock_destroy(&ipcpi.state_lock); + close(ipcpi.sockfd); + free(ipcpi.threadpool); + free(ipcpi.sock_path); + ap_fini(); + return -1; + } + + if (pthread_mutex_init(&ipcpi.threads_lock, NULL)) { + log_err("Could not create mutex."); + pthread_mutex_destroy(&ipcpi.state_mtx); + pthread_rwlock_destroy(&ipcpi.state_lock); + close(ipcpi.sockfd); + free(ipcpi.threadpool); + free(ipcpi.sock_path); + ap_fini(); + return -1; + } + + if (pthread_condattr_init(&cattr)) { + log_err("Could not create condattr."); + pthread_mutex_destroy(&ipcpi.threads_lock); + pthread_mutex_destroy(&ipcpi.state_mtx); + pthread_rwlock_destroy(&ipcpi.state_lock); + close(ipcpi.sockfd); + free(ipcpi.threadpool); + free(ipcpi.sock_path); + ap_fini(); + return -1; + } + ; #ifndef __APPLE__ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); #endif - pthread_cond_init(&ipcpi.state_cond, &cattr); + if (pthread_cond_init(&ipcpi.state_cond, &cattr)) { + log_err("Could not init condvar."); + pthread_condattr_destroy(&cattr); + pthread_mutex_destroy(&ipcpi.threads_lock); + pthread_mutex_destroy(&ipcpi.state_mtx); + pthread_rwlock_destroy(&ipcpi.state_lock); + close(ipcpi.sockfd); + free(ipcpi.threadpool); + free(ipcpi.sock_path); + ap_fini(); + return -1; + } + + if (pthread_cond_init(&ipcpi.threads_cond, &cattr)) { + log_err("Could not init condvar."); + pthread_cond_destroy(&ipcpi.state_cond); + pthread_condattr_destroy(&cattr); + pthread_mutex_destroy(&ipcpi.threads_lock); + pthread_mutex_destroy(&ipcpi.state_mtx); + pthread_rwlock_destroy(&ipcpi.state_lock); + close(ipcpi.sockfd); + free(ipcpi.threadpool); + free(ipcpi.sock_path); + ap_fini(); + return -1; + }; + + + pthread_condattr_destroy(&cattr); + + ipcpi.thread_ids = bmp_create(IPCP_MAX_THREADS, 0); + if (ipcpi.thread_ids == NULL) { + log_err("Could not init condvar."); + pthread_cond_destroy(&ipcpi.threads_cond); + pthread_cond_destroy(&ipcpi.state_cond); + pthread_mutex_destroy(&ipcpi.threads_lock); + pthread_mutex_destroy(&ipcpi.state_mtx); + pthread_rwlock_destroy(&ipcpi.state_lock); + close(ipcpi.sockfd); + free(ipcpi.threadpool); + free(ipcpi.sock_path); + ap_fini(); + return -1; + }; if (type == IPCP_NORMAL) return 0; ipcpi.shim_data = shim_data_create(); if (ipcpi.shim_data == NULL) { + bmp_destroy(ipcpi.thread_ids); + pthread_cond_destroy(&ipcpi.threads_cond); + pthread_cond_destroy(&ipcpi.state_cond); + pthread_mutex_destroy(&ipcpi.state_mtx); + pthread_rwlock_destroy(&ipcpi.state_lock); + close(ipcpi.sockfd); free(ipcpi.threadpool); free(ipcpi.sock_path); + ap_fini(); return -ENOMEM; } return 0; } -int ipcp_boot() +static bool is_thread_alive(ssize_t id) { - int t; + bool ret; + pthread_mutex_lock(&ipcpi.threads_lock); - ipcp_set_state(IPCP_INIT); + ret = bmp_is_id_used(ipcpi.thread_ids, id); - for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t) { - if (pthread_create(&ipcpi.threadpool[t], NULL, - ipcp_main_loop, NULL)) { - int i; - log_err("Failed to create main thread."); - ipcp_set_state(IPCP_NULL); - for (i = 0; i < t; ++i) - pthread_join(ipcpi.threadpool[i], NULL); - return -1; + pthread_mutex_unlock(&ipcpi.threads_lock); + + return ret; +} + +void * threadpoolmgr(void * o) +{ + struct timespec to = {(IPCP_TPM_TIMEOUT / 1000), + (IPCP_TPM_TIMEOUT % 1000) * MILLION}; + struct timespec dl; + size_t t; + + (void) o; + + while (true) { + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_rwlock_rdlock(&ipcpi.state_lock); + if (ipcp_get_state() == IPCP_SHUTDOWN || + ipcp_get_state() == IPCP_NULL) { + pthread_rwlock_unlock(&ipcpi.state_lock); + log_dbg("Threadpool manager exiting."); + for (t = 0; t < IPCP_MAX_THREADS; ++t) + if (is_thread_alive(t)) { + log_dbg("Waiting for thread %zd.", t); + pthread_join(ipcpi.threadpool[t], NULL); + } + + log_dbg("Threadpool manager done."); + break; + } + + pthread_rwlock_unlock(&ipcpi.state_lock); + + pthread_mutex_lock(&ipcpi.threads_lock); + + if (ipcpi.threads < IPCP_MIN_AV_THREADS) { + log_dbg("Increasing threadpool."); + ipcpi.max_threads = IPCP_MAX_AV_THREADS; + + while (ipcpi.threads < ipcpi.max_threads) { + ssize_t id = bmp_allocate(ipcpi.thread_ids); + if (!bmp_is_id_valid(ipcpi.thread_ids, id)) { + log_warn("IPCP threadpool exhausted."); + break; + } + + if (pthread_create(&ipcpi.threadpool[id], + NULL, ipcp_main_loop, + (void *) id)) + log_warn("Failed to start new thread."); + else + ++ipcpi.threads; + } } + + if (pthread_cond_timedwait(&ipcpi.threads_cond, + &ipcpi.threads_lock, + &dl) == ETIMEDOUT) + if (ipcpi.threads > IPCP_MIN_AV_THREADS) + --ipcpi.max_threads; + + pthread_mutex_unlock(&ipcpi.threads_lock); } + return (void *) 0; +} + +int ipcp_boot() +{ + ipcp_set_state(IPCP_INIT); + + pthread_create(&ipcpi.tpm, NULL, threadpoolmgr, NULL); + return 0; } void ipcp_shutdown() { - int t; - for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t) - pthread_join(ipcpi.threadpool[t], NULL); + pthread_join(ipcpi.tpm, NULL); - log_info("IPCP %d shutting down. Bye.", getpid()); + log_info("IPCP %d shutting down.", getpid()); } void ipcp_fini() @@ -408,18 +620,24 @@ void ipcp_fini() if (unlink(ipcpi.sock_path)) log_warn("Could not unlink %s.", ipcpi.sock_path); + bmp_destroy(ipcpi.thread_ids); + free(ipcpi.sock_path); free(ipcpi.threadpool); shim_data_destroy(ipcpi.shim_data); pthread_cond_destroy(&ipcpi.state_cond); + pthread_cond_destroy(&ipcpi.threads_cond); + pthread_mutex_destroy(&ipcpi.threads_lock); pthread_mutex_destroy(&ipcpi.state_mtx); pthread_rwlock_destroy(&ipcpi.state_lock); log_fini(); ap_fini(); + + log_info("IPCP %d out.", getpid()); } void ipcp_set_state(enum ipcp_state state) diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index a64ab65c..581ca5e3 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -80,7 +80,16 @@ struct ipcp { int sockfd; char * sock_path; + pthread_t * threadpool; + + struct bmp * thread_ids; + size_t max_threads; + size_t threads; + pthread_cond_t threads_cond; + pthread_mutex_t threads_lock; + + pthread_t tpm; } ipcpi; int ipcp_init(int argc, |