diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-04-01 12:31:05 +0000 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-04-01 12:31:05 +0000 |
commit | 2725780520d0e2c6a2c49ac8e2124b5088cbe1bb (patch) | |
tree | 833a7f4133fb3f3f8d746343cff6fb9fc40f7829 /src/irmd/main.c | |
parent | d9f3619d791fef7d79127556304a4aa4f1cda50a (diff) | |
parent | c72634b5d921bc06d8e06afb2a60a05a1acb7ee2 (diff) | |
download | ouroboros-2725780520d0e2c6a2c49ac8e2124b5088cbe1bb.tar.gz ouroboros-2725780520d0e2c6a2c49ac8e2124b5088cbe1bb.zip |
Merged in dstaesse/ouroboros/be-irmd-threadpool (pull request #448)
Be irmd threadpool
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r-- | src/irmd/main.c | 227 |
1 files changed, 196 insertions, 31 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index 39f44c44..966be500 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -75,31 +75,37 @@ enum irm_state { }; struct irm { - struct list_head registry; + struct list_head registry; /* registered names known */ - struct list_head ipcps; + struct list_head ipcps; /* list of ipcps in system */ - struct list_head api_table; - struct list_head apn_table; - struct list_head spawned_apis; - pthread_rwlock_t reg_lock; + struct list_head api_table; /* ap instances */ + struct list_head apn_table; /* ap names known */ + struct list_head spawned_apis; /* child ap instances */ + pthread_rwlock_t reg_lock; /* lock for registration info */ - /* keep track of all flows in this processing system */ - struct bmp * port_ids; - /* maps port_ids to api pair */ - struct list_head irm_flows; - pthread_rwlock_t flows_lock; + struct bmp * port_ids; /* port_ids for flows */ + struct list_head irm_flows; /* flow information */ + pthread_rwlock_t flows_lock; /* lock for flows */ - struct lockfile * lf; - struct shm_rdrbuff * rdrb; - pthread_t * threadpool; - int sockfd; + struct lockfile * lf; /* single irmd per system */ + struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */ + int sockfd; /* UNIX socket */ - enum irm_state state; - pthread_rwlock_t state_lock; + pthread_t * threadpool; /* pool of mainloop threads */ - pthread_t irm_sanitize; - pthread_t shm_sanitize; + struct bmp * thread_ids; /* ids for mainloop threads */ + size_t max_threads; /* max threads set by tpm */ + size_t threads; /* available mainloop threads */ + pthread_cond_t threads_cond; /* signal thread entry/exit */ + pthread_mutex_t threads_lock; /* mutex for threads/condvar */ + + enum irm_state state; /* state of the irmd */ + pthread_rwlock_t state_lock; /* lock for the entire irmd */ + + pthread_t tpm; /* threadpool manager */ + pthread_t irm_sanitize; /* clean up irmd resources */ + pthread_t shm_sanitize; /* keep track of rdrbuff use */ } * irmd; static void clear_irm_flow(struct irm_flow * f) { @@ -1449,6 +1455,13 @@ static void irm_destroy(void) if (irmd->state != IRMD_NULL) log_warn("Unsafe destroy."); + pthread_mutex_lock(&irmd->threads_lock); + + if (irmd->thread_ids != NULL) + bmp_destroy(irmd->thread_ids); + + pthread_mutex_unlock(&irmd->threads_lock); + if (irmd->threadpool != NULL) free(irmd->threadpool); @@ -1724,11 +1737,55 @@ void * irm_sanitize(void * o) } } +static void thread_inc(void) +{ + pthread_mutex_lock(&irmd->threads_lock); + + ++irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + +static void thread_dec(void) +{ + pthread_mutex_lock(&irmd->threads_lock); + + --irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + +static bool thread_check(void) +{ + int ret; + + pthread_mutex_lock(&irmd->threads_lock); + + ret = irmd->threads > irmd->max_threads; + + pthread_mutex_unlock(&irmd->threads_lock); + + return ret; +} + +static void thread_exit(ssize_t id) +{ + pthread_mutex_lock(&irmd->threads_lock); + bmp_release(irmd->thread_ids, id); + + --irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + void * mainloop(void * o) { uint8_t buf[IRM_MSG_BUF_SIZE]; - (void) o; + ssize_t id = (ssize_t) o; while (true) { #ifdef __FreeBSD__ @@ -1747,10 +1804,13 @@ void * mainloop(void * o) (SOCKET_TIMEOUT % 1000) * 1000}; pthread_rwlock_rdlock(&irmd->state_lock); - if (irmd->state != IRMD_RUNNING) { + + if (irmd->state != IRMD_RUNNING || thread_check()) { + thread_exit(id); pthread_rwlock_unlock(&irmd->state_lock); break; } + pthread_rwlock_unlock(&irmd->state_lock); ret_msg.code = IRM_MSG_CODE__IRM_REPLY; @@ -1760,6 +1820,7 @@ void * mainloop(void * o) if (select(irmd->sockfd, &fds, NULL, NULL, &timeout) <= 0) continue; #endif + cli_sockfd = accept(irmd->sockfd, 0, 0); if (cli_sockfd < 0) continue; @@ -1781,6 +1842,8 @@ void * mainloop(void * o) continue; } + thread_dec(); + switch (msg->code) { case IRM_MSG_CODE__IRM_CREATE_IPCP: ret_msg.has_result = true; @@ -1909,6 +1972,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(cli_sockfd); + thread_inc(); continue; } @@ -1917,6 +1981,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(cli_sockfd); + thread_inc(); continue; } @@ -1930,6 +1995,82 @@ void * mainloop(void * o) free(buffer.data); close(cli_sockfd); + + thread_inc(); + } + + return (void *) 0; +} + +static bool is_thread_alive(ssize_t id) +{ + bool ret; + pthread_mutex_lock(&irmd->threads_lock); + + ret = bmp_is_id_used(irmd->thread_ids, id); + + pthread_mutex_unlock(&irmd->threads_lock); + + return ret; +} + +void * threadpoolmgr(void * o) +{ + struct timespec to = {(IRMD_TPM_TIMEOUT / 1000), + (IRMD_TPM_TIMEOUT % 1000) * MILLION}; + struct timespec dl; + size_t t; + + (void) o; + + while (true) { + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_rwlock_rdlock(&irmd->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); + log_dbg("Threadpool manager exiting."); + for (t = 0; t < IRMD_MAX_THREADS; ++t) + if (is_thread_alive(t)) { + log_dbg("Waiting for thread %zd.", t); + pthread_join(irmd->threadpool[t], NULL); + } + + log_dbg("Threadpool manager done."); + break; + } + + pthread_rwlock_unlock(&irmd->state_lock); + + pthread_mutex_lock(&irmd->threads_lock); + + if (irmd->threads < IRMD_MIN_AV_THREADS) { + log_dbg("Increasing threadpool."); + irmd->max_threads = IRMD_MAX_AV_THREADS; + + while (irmd->threads < irmd->max_threads) { + ssize_t id = bmp_allocate(irmd->thread_ids); + if (!bmp_is_id_valid(irmd->thread_ids, id)) { + log_warn("IRMd threadpool exhausted."); + break; + } + + if (pthread_create(&irmd->threadpool[id], + NULL, mainloop, (void *) id)) + log_warn("Failed to start new thread."); + else + ++irmd->threads; + } + } + + if (pthread_cond_timedwait(&irmd->threads_cond, + &irmd->threads_lock, + &dl) == ETIMEDOUT) + if (irmd->threads > IRMD_MIN_AV_THREADS) + --irmd->max_threads; + + pthread_mutex_unlock(&irmd->threads_lock); } return (void *) 0; @@ -1938,6 +2079,7 @@ void * mainloop(void * o) static int irm_create(void) { struct stat st; + pthread_condattr_t cattr; struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; @@ -1967,6 +2109,27 @@ static int irm_create(void) return -1; } + if (pthread_mutex_init(&irmd->threads_lock, NULL)) { + log_err("Failed to initialize mutex."); + free(irmd); + return -1; + } + + if (pthread_condattr_init(&cattr)) { + log_err("Failed to initialize condattr."); + free(irmd); + return -1; + } + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&irmd->threads_cond, &cattr)) { + log_err("Failed to initialize cond."); + free(irmd); + return -1; + } + list_head_init(&irmd->ipcps); list_head_init(&irmd->api_table); list_head_init(&irmd->apn_table); @@ -1980,7 +2143,13 @@ static int irm_create(void) return -ENOMEM; } - irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); + irmd->thread_ids = bmp_create(IRMD_MAX_THREADS, 0); + if (irmd->thread_ids == NULL) { + irm_destroy(); + return -ENOMEM; + } + + irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS); if (irmd->threadpool == NULL) { irm_destroy(); return -ENOMEM; @@ -2045,7 +2214,9 @@ static int irm_create(void) return -1; } - irmd->state = IRMD_RUNNING; + irmd->threads = 0; + irmd->max_threads = IRMD_MIN_AV_THREADS; + irmd->state = IRMD_RUNNING; log_info("Ouroboros IPC Resource Manager daemon started..."); @@ -2063,8 +2234,6 @@ int main(int argc, { struct sigaction sig_act; - int t = 0; - bool use_stdout = false; if (geteuid() != 0) { @@ -2108,16 +2277,12 @@ int main(int argc, exit(EXIT_FAILURE); } - for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) - pthread_create(&irmd->threadpool[t], NULL, mainloop, NULL); + pthread_create(&irmd->tpm, NULL, threadpoolmgr, NULL); + pthread_join(irmd->tpm, NULL); pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL); pthread_create(&irmd->shm_sanitize, NULL, shm_sanitize, irmd->rdrb); - /* Wait for (all of them) to return. */ - for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) - pthread_join(irmd->threadpool[t], NULL); - pthread_join(irmd->irm_sanitize, NULL); pthread_cancel(irmd->shm_sanitize); |