diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-04-01 13:45:51 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@ugent.be> | 2017-04-01 14:28:59 +0200 |
commit | c72634b5d921bc06d8e06afb2a60a05a1acb7ee2 (patch) | |
tree | 833a7f4133fb3f3f8d746343cff6fb9fc40f7829 /src/irmd/main.c | |
parent | 47b6ff3333fb3fcc3f5f76459c356c79e4bb111c (diff) | |
download | ouroboros-c72634b5d921bc06d8e06afb2a60a05a1acb7ee2.tar.gz ouroboros-c72634b5d921bc06d8e06afb2a60a05a1acb7ee2.zip |
irmd: Add dynamic threadpool
This makes the IRMd add/remove worker threads dynamically.
IRMD_TPM_TIMEOUT sets a timer in the threadpool manager for checking
idle threads. Each time this timer expires, it will reduce the
threadpool by one. IRMD_MIN_AV_THREADS is the minimum number of
available worker threads. If the number of active threads goes under
this threshold, the threadpool manager will create threads to get the
number of threads to IRMD_MAX_AV_THREADS, unless IRMD_MAX_THREADS is
reached.
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r-- | src/irmd/main.c | 227 |
1 files changed, 196 insertions, 31 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index 39f44c44..966be500 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -75,31 +75,37 @@ enum irm_state { }; struct irm { - struct list_head registry; + struct list_head registry; /* registered names known */ - struct list_head ipcps; + struct list_head ipcps; /* list of ipcps in system */ - struct list_head api_table; - struct list_head apn_table; - struct list_head spawned_apis; - pthread_rwlock_t reg_lock; + struct list_head api_table; /* ap instances */ + struct list_head apn_table; /* ap names known */ + struct list_head spawned_apis; /* child ap instances */ + pthread_rwlock_t reg_lock; /* lock for registration info */ - /* keep track of all flows in this processing system */ - struct bmp * port_ids; - /* maps port_ids to api pair */ - struct list_head irm_flows; - pthread_rwlock_t flows_lock; + struct bmp * port_ids; /* port_ids for flows */ + struct list_head irm_flows; /* flow information */ + pthread_rwlock_t flows_lock; /* lock for flows */ - struct lockfile * lf; - struct shm_rdrbuff * rdrb; - pthread_t * threadpool; - int sockfd; + struct lockfile * lf; /* single irmd per system */ + struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */ + int sockfd; /* UNIX socket */ - enum irm_state state; - pthread_rwlock_t state_lock; + pthread_t * threadpool; /* pool of mainloop threads */ - pthread_t irm_sanitize; - pthread_t shm_sanitize; + struct bmp * thread_ids; /* ids for mainloop threads */ + size_t max_threads; /* max threads set by tpm */ + size_t threads; /* available mainloop threads */ + pthread_cond_t threads_cond; /* signal thread entry/exit */ + pthread_mutex_t threads_lock; /* mutex for threads/condvar */ + + enum irm_state state; /* state of the irmd */ + pthread_rwlock_t state_lock; /* lock for the entire irmd */ + + pthread_t tpm; /* threadpool manager */ + pthread_t irm_sanitize; /* clean up irmd resources */ + pthread_t shm_sanitize; /* keep track of rdrbuff use */ } * irmd; static void clear_irm_flow(struct irm_flow * f) { @@ -1449,6 +1455,13 @@ static void irm_destroy(void) if (irmd->state != IRMD_NULL) log_warn("Unsafe destroy."); + pthread_mutex_lock(&irmd->threads_lock); + + if (irmd->thread_ids != NULL) + bmp_destroy(irmd->thread_ids); + + pthread_mutex_unlock(&irmd->threads_lock); + if (irmd->threadpool != NULL) free(irmd->threadpool); @@ -1724,11 +1737,55 @@ void * irm_sanitize(void * o) } } +static void thread_inc(void) +{ + pthread_mutex_lock(&irmd->threads_lock); + + ++irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + +static void thread_dec(void) +{ + pthread_mutex_lock(&irmd->threads_lock); + + --irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + +static bool thread_check(void) +{ + int ret; + + pthread_mutex_lock(&irmd->threads_lock); + + ret = irmd->threads > irmd->max_threads; + + pthread_mutex_unlock(&irmd->threads_lock); + + return ret; +} + +static void thread_exit(ssize_t id) +{ + pthread_mutex_lock(&irmd->threads_lock); + bmp_release(irmd->thread_ids, id); + + --irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + void * mainloop(void * o) { uint8_t buf[IRM_MSG_BUF_SIZE]; - (void) o; + ssize_t id = (ssize_t) o; while (true) { #ifdef __FreeBSD__ @@ -1747,10 +1804,13 @@ void * mainloop(void * o) (SOCKET_TIMEOUT % 1000) * 1000}; pthread_rwlock_rdlock(&irmd->state_lock); - if (irmd->state != IRMD_RUNNING) { + + if (irmd->state != IRMD_RUNNING || thread_check()) { + thread_exit(id); pthread_rwlock_unlock(&irmd->state_lock); break; } + pthread_rwlock_unlock(&irmd->state_lock); ret_msg.code = IRM_MSG_CODE__IRM_REPLY; @@ -1760,6 +1820,7 @@ void * mainloop(void * o) if (select(irmd->sockfd, &fds, NULL, NULL, &timeout) <= 0) continue; #endif + cli_sockfd = accept(irmd->sockfd, 0, 0); if (cli_sockfd < 0) continue; @@ -1781,6 +1842,8 @@ void * mainloop(void * o) continue; } + thread_dec(); + switch (msg->code) { case IRM_MSG_CODE__IRM_CREATE_IPCP: ret_msg.has_result = true; @@ -1909,6 +1972,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(cli_sockfd); + thread_inc(); continue; } @@ -1917,6 +1981,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(cli_sockfd); + thread_inc(); continue; } @@ -1930,6 +1995,82 @@ void * mainloop(void * o) free(buffer.data); close(cli_sockfd); + + thread_inc(); + } + + return (void *) 0; +} + +static bool is_thread_alive(ssize_t id) +{ + bool ret; + pthread_mutex_lock(&irmd->threads_lock); + + ret = bmp_is_id_used(irmd->thread_ids, id); + + pthread_mutex_unlock(&irmd->threads_lock); + + return ret; +} + +void * threadpoolmgr(void * o) +{ + struct timespec to = {(IRMD_TPM_TIMEOUT / 1000), + (IRMD_TPM_TIMEOUT % 1000) * MILLION}; + struct timespec dl; + size_t t; + + (void) o; + + while (true) { + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_rwlock_rdlock(&irmd->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); + log_dbg("Threadpool manager exiting."); + for (t = 0; t < IRMD_MAX_THREADS; ++t) + if (is_thread_alive(t)) { + log_dbg("Waiting for thread %zd.", t); + pthread_join(irmd->threadpool[t], NULL); + } + + log_dbg("Threadpool manager done."); + break; + } + + pthread_rwlock_unlock(&irmd->state_lock); + + pthread_mutex_lock(&irmd->threads_lock); + + if (irmd->threads < IRMD_MIN_AV_THREADS) { + log_dbg("Increasing threadpool."); + irmd->max_threads = IRMD_MAX_AV_THREADS; + + while (irmd->threads < irmd->max_threads) { + ssize_t id = bmp_allocate(irmd->thread_ids); + if (!bmp_is_id_valid(irmd->thread_ids, id)) { + log_warn("IRMd threadpool exhausted."); + break; + } + + if (pthread_create(&irmd->threadpool[id], + NULL, mainloop, (void *) id)) + log_warn("Failed to start new thread."); + else + ++irmd->threads; + } + } + + if (pthread_cond_timedwait(&irmd->threads_cond, + &irmd->threads_lock, + &dl) == ETIMEDOUT) + if (irmd->threads > IRMD_MIN_AV_THREADS) + --irmd->max_threads; + + pthread_mutex_unlock(&irmd->threads_lock); } return (void *) 0; @@ -1938,6 +2079,7 @@ void * mainloop(void * o) static int irm_create(void) { struct stat st; + pthread_condattr_t cattr; struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; @@ -1967,6 +2109,27 @@ static int irm_create(void) return -1; } + if (pthread_mutex_init(&irmd->threads_lock, NULL)) { + log_err("Failed to initialize mutex."); + free(irmd); + return -1; + } + + if (pthread_condattr_init(&cattr)) { + log_err("Failed to initialize condattr."); + free(irmd); + return -1; + } + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&irmd->threads_cond, &cattr)) { + log_err("Failed to initialize cond."); + free(irmd); + return -1; + } + list_head_init(&irmd->ipcps); list_head_init(&irmd->api_table); list_head_init(&irmd->apn_table); @@ -1980,7 +2143,13 @@ static int irm_create(void) return -ENOMEM; } - irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); + irmd->thread_ids = bmp_create(IRMD_MAX_THREADS, 0); + if (irmd->thread_ids == NULL) { + irm_destroy(); + return -ENOMEM; + } + + irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS); if (irmd->threadpool == NULL) { irm_destroy(); return -ENOMEM; @@ -2045,7 +2214,9 @@ static int irm_create(void) return -1; } - irmd->state = IRMD_RUNNING; + irmd->threads = 0; + irmd->max_threads = IRMD_MIN_AV_THREADS; + irmd->state = IRMD_RUNNING; log_info("Ouroboros IPC Resource Manager daemon started..."); @@ -2063,8 +2234,6 @@ int main(int argc, { struct sigaction sig_act; - int t = 0; - bool use_stdout = false; if (geteuid() != 0) { @@ -2108,16 +2277,12 @@ int main(int argc, exit(EXIT_FAILURE); } - for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) - pthread_create(&irmd->threadpool[t], NULL, mainloop, NULL); + pthread_create(&irmd->tpm, NULL, threadpoolmgr, NULL); + pthread_join(irmd->tpm, NULL); pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL); pthread_create(&irmd->shm_sanitize, NULL, shm_sanitize, irmd->rdrb); - /* Wait for (all of them) to return. */ - for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) - pthread_join(irmd->threadpool[t], NULL); - pthread_join(irmd->irm_sanitize, NULL); pthread_cancel(irmd->shm_sanitize); |