summaryrefslogtreecommitdiff
path: root/src/irmd/main.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-04-01 12:31:05 +0000
committerSander Vrijders <sander.vrijders@ugent.be>2017-04-01 12:31:05 +0000
commit2725780520d0e2c6a2c49ac8e2124b5088cbe1bb (patch)
tree833a7f4133fb3f3f8d746343cff6fb9fc40f7829 /src/irmd/main.c
parentd9f3619d791fef7d79127556304a4aa4f1cda50a (diff)
parentc72634b5d921bc06d8e06afb2a60a05a1acb7ee2 (diff)
downloadouroboros-2725780520d0e2c6a2c49ac8e2124b5088cbe1bb.tar.gz
ouroboros-2725780520d0e2c6a2c49ac8e2124b5088cbe1bb.zip
Merged in dstaesse/ouroboros/be-irmd-threadpool (pull request #448)
Be irmd threadpool
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r--src/irmd/main.c227
1 files changed, 196 insertions, 31 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 39f44c44..966be500 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -75,31 +75,37 @@ enum irm_state {
};
struct irm {
- struct list_head registry;
+ struct list_head registry; /* registered names known */
- struct list_head ipcps;
+ struct list_head ipcps; /* list of ipcps in system */
- struct list_head api_table;
- struct list_head apn_table;
- struct list_head spawned_apis;
- pthread_rwlock_t reg_lock;
+ struct list_head api_table; /* ap instances */
+ struct list_head apn_table; /* ap names known */
+ struct list_head spawned_apis; /* child ap instances */
+ pthread_rwlock_t reg_lock; /* lock for registration info */
- /* keep track of all flows in this processing system */
- struct bmp * port_ids;
- /* maps port_ids to api pair */
- struct list_head irm_flows;
- pthread_rwlock_t flows_lock;
+ struct bmp * port_ids; /* port_ids for flows */
+ struct list_head irm_flows; /* flow information */
+ pthread_rwlock_t flows_lock; /* lock for flows */
- struct lockfile * lf;
- struct shm_rdrbuff * rdrb;
- pthread_t * threadpool;
- int sockfd;
+ struct lockfile * lf; /* single irmd per system */
+ struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */
+ int sockfd; /* UNIX socket */
- enum irm_state state;
- pthread_rwlock_t state_lock;
+ pthread_t * threadpool; /* pool of mainloop threads */
- pthread_t irm_sanitize;
- pthread_t shm_sanitize;
+ struct bmp * thread_ids; /* ids for mainloop threads */
+ size_t max_threads; /* max threads set by tpm */
+ size_t threads; /* available mainloop threads */
+ pthread_cond_t threads_cond; /* signal thread entry/exit */
+ pthread_mutex_t threads_lock; /* mutex for threads/condvar */
+
+ enum irm_state state; /* state of the irmd */
+ pthread_rwlock_t state_lock; /* lock for the entire irmd */
+
+ pthread_t tpm; /* threadpool manager */
+ pthread_t irm_sanitize; /* clean up irmd resources */
+ pthread_t shm_sanitize; /* keep track of rdrbuff use */
} * irmd;
static void clear_irm_flow(struct irm_flow * f) {
@@ -1449,6 +1455,13 @@ static void irm_destroy(void)
if (irmd->state != IRMD_NULL)
log_warn("Unsafe destroy.");
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ if (irmd->thread_ids != NULL)
+ bmp_destroy(irmd->thread_ids);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+
if (irmd->threadpool != NULL)
free(irmd->threadpool);
@@ -1724,11 +1737,55 @@ void * irm_sanitize(void * o)
}
}
+static void thread_inc(void)
+{
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ ++irmd->threads;
+ pthread_cond_signal(&irmd->threads_cond);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+}
+
+static void thread_dec(void)
+{
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ --irmd->threads;
+ pthread_cond_signal(&irmd->threads_cond);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+}
+
+static bool thread_check(void)
+{
+ int ret;
+
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ ret = irmd->threads > irmd->max_threads;
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+
+ return ret;
+}
+
+static void thread_exit(ssize_t id)
+{
+ pthread_mutex_lock(&irmd->threads_lock);
+ bmp_release(irmd->thread_ids, id);
+
+ --irmd->threads;
+ pthread_cond_signal(&irmd->threads_cond);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+}
+
void * mainloop(void * o)
{
uint8_t buf[IRM_MSG_BUF_SIZE];
- (void) o;
+ ssize_t id = (ssize_t) o;
while (true) {
#ifdef __FreeBSD__
@@ -1747,10 +1804,13 @@ void * mainloop(void * o)
(SOCKET_TIMEOUT % 1000) * 1000};
pthread_rwlock_rdlock(&irmd->state_lock);
- if (irmd->state != IRMD_RUNNING) {
+
+ if (irmd->state != IRMD_RUNNING || thread_check()) {
+ thread_exit(id);
pthread_rwlock_unlock(&irmd->state_lock);
break;
}
+
pthread_rwlock_unlock(&irmd->state_lock);
ret_msg.code = IRM_MSG_CODE__IRM_REPLY;
@@ -1760,6 +1820,7 @@ void * mainloop(void * o)
if (select(irmd->sockfd, &fds, NULL, NULL, &timeout) <= 0)
continue;
#endif
+
cli_sockfd = accept(irmd->sockfd, 0, 0);
if (cli_sockfd < 0)
continue;
@@ -1781,6 +1842,8 @@ void * mainloop(void * o)
continue;
}
+ thread_dec();
+
switch (msg->code) {
case IRM_MSG_CODE__IRM_CREATE_IPCP:
ret_msg.has_result = true;
@@ -1909,6 +1972,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(cli_sockfd);
+ thread_inc();
continue;
}
@@ -1917,6 +1981,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(cli_sockfd);
+ thread_inc();
continue;
}
@@ -1930,6 +1995,82 @@ void * mainloop(void * o)
free(buffer.data);
close(cli_sockfd);
+
+ thread_inc();
+ }
+
+ return (void *) 0;
+}
+
+static bool is_thread_alive(ssize_t id)
+{
+ bool ret;
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ ret = bmp_is_id_used(irmd->thread_ids, id);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+
+ return ret;
+}
+
+void * threadpoolmgr(void * o)
+{
+ struct timespec to = {(IRMD_TPM_TIMEOUT / 1000),
+ (IRMD_TPM_TIMEOUT % 1000) * MILLION};
+ struct timespec dl;
+ size_t t;
+
+ (void) o;
+
+ while (true) {
+ clock_gettime(PTHREAD_COND_CLOCK, &dl);
+ ts_add(&dl, &to, &dl);
+
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
+ log_dbg("Threadpool manager exiting.");
+ for (t = 0; t < IRMD_MAX_THREADS; ++t)
+ if (is_thread_alive(t)) {
+ log_dbg("Waiting for thread %zd.", t);
+ pthread_join(irmd->threadpool[t], NULL);
+ }
+
+ log_dbg("Threadpool manager done.");
+ break;
+ }
+
+ pthread_rwlock_unlock(&irmd->state_lock);
+
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ if (irmd->threads < IRMD_MIN_AV_THREADS) {
+ log_dbg("Increasing threadpool.");
+ irmd->max_threads = IRMD_MAX_AV_THREADS;
+
+ while (irmd->threads < irmd->max_threads) {
+ ssize_t id = bmp_allocate(irmd->thread_ids);
+ if (!bmp_is_id_valid(irmd->thread_ids, id)) {
+ log_warn("IRMd threadpool exhausted.");
+ break;
+ }
+
+ if (pthread_create(&irmd->threadpool[id],
+ NULL, mainloop, (void *) id))
+ log_warn("Failed to start new thread.");
+ else
+ ++irmd->threads;
+ }
+ }
+
+ if (pthread_cond_timedwait(&irmd->threads_cond,
+ &irmd->threads_lock,
+ &dl) == ETIMEDOUT)
+ if (irmd->threads > IRMD_MIN_AV_THREADS)
+ --irmd->max_threads;
+
+ pthread_mutex_unlock(&irmd->threads_lock);
}
return (void *) 0;
@@ -1938,6 +2079,7 @@ void * mainloop(void * o)
static int irm_create(void)
{
struct stat st;
+ pthread_condattr_t cattr;
struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000),
(IRMD_ACCEPT_TIMEOUT % 1000) * 1000};
@@ -1967,6 +2109,27 @@ static int irm_create(void)
return -1;
}
+ if (pthread_mutex_init(&irmd->threads_lock, NULL)) {
+ log_err("Failed to initialize mutex.");
+ free(irmd);
+ return -1;
+ }
+
+ if (pthread_condattr_init(&cattr)) {
+ log_err("Failed to initialize condattr.");
+ free(irmd);
+ return -1;
+ }
+
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(&irmd->threads_cond, &cattr)) {
+ log_err("Failed to initialize cond.");
+ free(irmd);
+ return -1;
+ }
+
list_head_init(&irmd->ipcps);
list_head_init(&irmd->api_table);
list_head_init(&irmd->apn_table);
@@ -1980,7 +2143,13 @@ static int irm_create(void)
return -ENOMEM;
}
- irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE);
+ irmd->thread_ids = bmp_create(IRMD_MAX_THREADS, 0);
+ if (irmd->thread_ids == NULL) {
+ irm_destroy();
+ return -ENOMEM;
+ }
+
+ irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS);
if (irmd->threadpool == NULL) {
irm_destroy();
return -ENOMEM;
@@ -2045,7 +2214,9 @@ static int irm_create(void)
return -1;
}
- irmd->state = IRMD_RUNNING;
+ irmd->threads = 0;
+ irmd->max_threads = IRMD_MIN_AV_THREADS;
+ irmd->state = IRMD_RUNNING;
log_info("Ouroboros IPC Resource Manager daemon started...");
@@ -2063,8 +2234,6 @@ int main(int argc,
{
struct sigaction sig_act;
- int t = 0;
-
bool use_stdout = false;
if (geteuid() != 0) {
@@ -2108,16 +2277,12 @@ int main(int argc,
exit(EXIT_FAILURE);
}
- for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
- pthread_create(&irmd->threadpool[t], NULL, mainloop, NULL);
+ pthread_create(&irmd->tpm, NULL, threadpoolmgr, NULL);
+ pthread_join(irmd->tpm, NULL);
pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL);
pthread_create(&irmd->shm_sanitize, NULL, shm_sanitize, irmd->rdrb);
- /* Wait for (all of them) to return. */
- for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
- pthread_join(irmd->threadpool[t], NULL);
-
pthread_join(irmd->irm_sanitize, NULL);
pthread_cancel(irmd->shm_sanitize);