summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-04-01 13:45:51 +0200
committerdimitri staessens <dimitri.staessens@ugent.be>2017-04-01 14:28:59 +0200
commitc72634b5d921bc06d8e06afb2a60a05a1acb7ee2 (patch)
tree833a7f4133fb3f3f8d746343cff6fb9fc40f7829
parent47b6ff3333fb3fcc3f5f76459c356c79e4bb111c (diff)
downloadouroboros-c72634b5d921bc06d8e06afb2a60a05a1acb7ee2.tar.gz
ouroboros-c72634b5d921bc06d8e06afb2a60a05a1acb7ee2.zip
irmd: Add dynamic threadpool
This makes the IRMd add/remove worker threads dynamically. IRMD_TPM_TIMEOUT sets a timer in the threadpool manager for checking idle threads. Each time this timer expires, it will reduce the threadpool by one. IRMD_MIN_AV_THREADS is the minimum number of available worker threads. If the number of active threads goes under this threshold, the threadpool manager will create threads to get the number of threads to IRMD_MAX_AV_THREADS, unless IRMD_MAX_THREADS is reached.
-rw-r--r--include/ouroboros/config.h.in11
-rw-r--r--src/irmd/main.c227
2 files changed, 204 insertions, 34 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in
index 7616961c..067a2f85 100644
--- a/include/ouroboros/config.h.in
+++ b/include/ouroboros/config.h.in
@@ -35,7 +35,7 @@
#define IPCP_SHIM_ETH_LLC_EXEC "@IPCP_SHIM_ETH_LLC_TARGET@"
#define IPCP_NORMAL_EXEC "@IPCP_NORMAL_TARGET@"
#define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@"
-#define AP_MAX_FLOWS 256
+#define AP_MAX_FLOWS 2048
#define AP_MAX_FQUEUES 64
#define SHM_RDRB_BLOCK_SIZE sysconf(_SC_PAGESIZE)
#define SHM_RDRB_MULTI_BLOCK
@@ -47,14 +47,19 @@
#define SHM_RBUFF_PREFIX "/ouroboros.rbuff."
#define SHM_FLOW_SET_PREFIX "/ouroboros.sets."
#define IRMD_MAX_FLOWS 4096
-#define IRMD_THREADPOOL_SIZE 16
+/* IRMD dynamic threadpooling */
+#define IRMD_MIN_AV_THREADS 16
+#define IRMD_MAX_AV_THREADS 64
+#define IRMD_MAX_THREADS 256
+
#define IPCPD_THREADPOOL_SIZE 16
#define IPCPD_MAX_CONNS IRMD_MAX_FLOWS
#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC
#define PFT_SIZE 1 << 12
/* Timeout values */
+#define IRMD_TPM_TIMEOUT 1000
#define IRMD_ACCEPT_TIMEOUT 100
-#define IRMD_REQ_ARR_TIMEOUT 200
+#define IRMD_REQ_ARR_TIMEOUT 500
#define IRMD_FLOW_TIMEOUT 5000
#define IPCP_ACCEPT_TIMEOUT 100
#define SOCKET_TIMEOUT 4000
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 39f44c44..966be500 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -75,31 +75,37 @@ enum irm_state {
};
struct irm {
- struct list_head registry;
+ struct list_head registry; /* registered names known */
- struct list_head ipcps;
+ struct list_head ipcps; /* list of ipcps in system */
- struct list_head api_table;
- struct list_head apn_table;
- struct list_head spawned_apis;
- pthread_rwlock_t reg_lock;
+ struct list_head api_table; /* ap instances */
+ struct list_head apn_table; /* ap names known */
+ struct list_head spawned_apis; /* child ap instances */
+ pthread_rwlock_t reg_lock; /* lock for registration info */
- /* keep track of all flows in this processing system */
- struct bmp * port_ids;
- /* maps port_ids to api pair */
- struct list_head irm_flows;
- pthread_rwlock_t flows_lock;
+ struct bmp * port_ids; /* port_ids for flows */
+ struct list_head irm_flows; /* flow information */
+ pthread_rwlock_t flows_lock; /* lock for flows */
- struct lockfile * lf;
- struct shm_rdrbuff * rdrb;
- pthread_t * threadpool;
- int sockfd;
+ struct lockfile * lf; /* single irmd per system */
+ struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */
+ int sockfd; /* UNIX socket */
- enum irm_state state;
- pthread_rwlock_t state_lock;
+ pthread_t * threadpool; /* pool of mainloop threads */
- pthread_t irm_sanitize;
- pthread_t shm_sanitize;
+ struct bmp * thread_ids; /* ids for mainloop threads */
+ size_t max_threads; /* max threads set by tpm */
+ size_t threads; /* available mainloop threads */
+ pthread_cond_t threads_cond; /* signal thread entry/exit */
+ pthread_mutex_t threads_lock; /* mutex for threads/condvar */
+
+ enum irm_state state; /* state of the irmd */
+ pthread_rwlock_t state_lock; /* lock for the entire irmd */
+
+ pthread_t tpm; /* threadpool manager */
+ pthread_t irm_sanitize; /* clean up irmd resources */
+ pthread_t shm_sanitize; /* keep track of rdrbuff use */
} * irmd;
static void clear_irm_flow(struct irm_flow * f) {
@@ -1449,6 +1455,13 @@ static void irm_destroy(void)
if (irmd->state != IRMD_NULL)
log_warn("Unsafe destroy.");
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ if (irmd->thread_ids != NULL)
+ bmp_destroy(irmd->thread_ids);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+
if (irmd->threadpool != NULL)
free(irmd->threadpool);
@@ -1724,11 +1737,55 @@ void * irm_sanitize(void * o)
}
}
+static void thread_inc(void)
+{
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ ++irmd->threads;
+ pthread_cond_signal(&irmd->threads_cond);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+}
+
+static void thread_dec(void)
+{
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ --irmd->threads;
+ pthread_cond_signal(&irmd->threads_cond);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+}
+
+static bool thread_check(void)
+{
+ int ret;
+
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ ret = irmd->threads > irmd->max_threads;
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+
+ return ret;
+}
+
+static void thread_exit(ssize_t id)
+{
+ pthread_mutex_lock(&irmd->threads_lock);
+ bmp_release(irmd->thread_ids, id);
+
+ --irmd->threads;
+ pthread_cond_signal(&irmd->threads_cond);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+}
+
void * mainloop(void * o)
{
uint8_t buf[IRM_MSG_BUF_SIZE];
- (void) o;
+ ssize_t id = (ssize_t) o;
while (true) {
#ifdef __FreeBSD__
@@ -1747,10 +1804,13 @@ void * mainloop(void * o)
(SOCKET_TIMEOUT % 1000) * 1000};
pthread_rwlock_rdlock(&irmd->state_lock);
- if (irmd->state != IRMD_RUNNING) {
+
+ if (irmd->state != IRMD_RUNNING || thread_check()) {
+ thread_exit(id);
pthread_rwlock_unlock(&irmd->state_lock);
break;
}
+
pthread_rwlock_unlock(&irmd->state_lock);
ret_msg.code = IRM_MSG_CODE__IRM_REPLY;
@@ -1760,6 +1820,7 @@ void * mainloop(void * o)
if (select(irmd->sockfd, &fds, NULL, NULL, &timeout) <= 0)
continue;
#endif
+
cli_sockfd = accept(irmd->sockfd, 0, 0);
if (cli_sockfd < 0)
continue;
@@ -1781,6 +1842,8 @@ void * mainloop(void * o)
continue;
}
+ thread_dec();
+
switch (msg->code) {
case IRM_MSG_CODE__IRM_CREATE_IPCP:
ret_msg.has_result = true;
@@ -1909,6 +1972,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(cli_sockfd);
+ thread_inc();
continue;
}
@@ -1917,6 +1981,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(cli_sockfd);
+ thread_inc();
continue;
}
@@ -1930,6 +1995,82 @@ void * mainloop(void * o)
free(buffer.data);
close(cli_sockfd);
+
+ thread_inc();
+ }
+
+ return (void *) 0;
+}
+
+static bool is_thread_alive(ssize_t id)
+{
+ bool ret;
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ ret = bmp_is_id_used(irmd->thread_ids, id);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+
+ return ret;
+}
+
+void * threadpoolmgr(void * o)
+{
+ struct timespec to = {(IRMD_TPM_TIMEOUT / 1000),
+ (IRMD_TPM_TIMEOUT % 1000) * MILLION};
+ struct timespec dl;
+ size_t t;
+
+ (void) o;
+
+ while (true) {
+ clock_gettime(PTHREAD_COND_CLOCK, &dl);
+ ts_add(&dl, &to, &dl);
+
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
+ log_dbg("Threadpool manager exiting.");
+ for (t = 0; t < IRMD_MAX_THREADS; ++t)
+ if (is_thread_alive(t)) {
+ log_dbg("Waiting for thread %zd.", t);
+ pthread_join(irmd->threadpool[t], NULL);
+ }
+
+ log_dbg("Threadpool manager done.");
+ break;
+ }
+
+ pthread_rwlock_unlock(&irmd->state_lock);
+
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ if (irmd->threads < IRMD_MIN_AV_THREADS) {
+ log_dbg("Increasing threadpool.");
+ irmd->max_threads = IRMD_MAX_AV_THREADS;
+
+ while (irmd->threads < irmd->max_threads) {
+ ssize_t id = bmp_allocate(irmd->thread_ids);
+ if (!bmp_is_id_valid(irmd->thread_ids, id)) {
+ log_warn("IRMd threadpool exhausted.");
+ break;
+ }
+
+ if (pthread_create(&irmd->threadpool[id],
+ NULL, mainloop, (void *) id))
+ log_warn("Failed to start new thread.");
+ else
+ ++irmd->threads;
+ }
+ }
+
+ if (pthread_cond_timedwait(&irmd->threads_cond,
+ &irmd->threads_lock,
+ &dl) == ETIMEDOUT)
+ if (irmd->threads > IRMD_MIN_AV_THREADS)
+ --irmd->max_threads;
+
+ pthread_mutex_unlock(&irmd->threads_lock);
}
return (void *) 0;
@@ -1938,6 +2079,7 @@ void * mainloop(void * o)
static int irm_create(void)
{
struct stat st;
+ pthread_condattr_t cattr;
struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000),
(IRMD_ACCEPT_TIMEOUT % 1000) * 1000};
@@ -1967,6 +2109,27 @@ static int irm_create(void)
return -1;
}
+ if (pthread_mutex_init(&irmd->threads_lock, NULL)) {
+ log_err("Failed to initialize mutex.");
+ free(irmd);
+ return -1;
+ }
+
+ if (pthread_condattr_init(&cattr)) {
+ log_err("Failed to initialize condattr.");
+ free(irmd);
+ return -1;
+ }
+
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(&irmd->threads_cond, &cattr)) {
+ log_err("Failed to initialize cond.");
+ free(irmd);
+ return -1;
+ }
+
list_head_init(&irmd->ipcps);
list_head_init(&irmd->api_table);
list_head_init(&irmd->apn_table);
@@ -1980,7 +2143,13 @@ static int irm_create(void)
return -ENOMEM;
}
- irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE);
+ irmd->thread_ids = bmp_create(IRMD_MAX_THREADS, 0);
+ if (irmd->thread_ids == NULL) {
+ irm_destroy();
+ return -ENOMEM;
+ }
+
+ irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS);
if (irmd->threadpool == NULL) {
irm_destroy();
return -ENOMEM;
@@ -2045,7 +2214,9 @@ static int irm_create(void)
return -1;
}
- irmd->state = IRMD_RUNNING;
+ irmd->threads = 0;
+ irmd->max_threads = IRMD_MIN_AV_THREADS;
+ irmd->state = IRMD_RUNNING;
log_info("Ouroboros IPC Resource Manager daemon started...");
@@ -2063,8 +2234,6 @@ int main(int argc,
{
struct sigaction sig_act;
- int t = 0;
-
bool use_stdout = false;
if (geteuid() != 0) {
@@ -2108,16 +2277,12 @@ int main(int argc,
exit(EXIT_FAILURE);
}
- for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
- pthread_create(&irmd->threadpool[t], NULL, mainloop, NULL);
+ pthread_create(&irmd->tpm, NULL, threadpoolmgr, NULL);
+ pthread_join(irmd->tpm, NULL);
pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL);
pthread_create(&irmd->shm_sanitize, NULL, shm_sanitize, irmd->rdrb);
- /* Wait for (all of them) to return. */
- for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
- pthread_join(irmd->threadpool[t], NULL);
-
pthread_join(irmd->irm_sanitize, NULL);
pthread_cancel(irmd->shm_sanitize);