summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-07-26 11:18:53 +0000
committerSander Vrijders <sander.vrijders@ugent.be>2017-07-26 11:18:53 +0000
commitbddac9e135e1a412d60de39cf17249507107499d (patch)
treee6cdc6bfba21e87be04df6d6fa62490813d94ce3
parent0a36839e75c933fbc260b430e159b525d2d7df19 (diff)
parent809abada865727ea986d69afcf2a9a3b00df560a (diff)
downloadouroboros-bddac9e135e1a412d60de39cf17249507107499d.tar.gz
ouroboros-bddac9e135e1a412d60de39cf17249507107499d.zip
Merged in dstaesse/ouroboros/be-tpm (pull request #536)
lib: Add threadpool manager
-rw-r--r--include/ouroboros/config.h.in12
-rw-r--r--include/ouroboros/tpm.h47
-rw-r--r--src/ipcpd/ipcp.c181
-rw-r--r--src/ipcpd/ipcp.h9
-rw-r--r--src/irmd/main.c209
-rw-r--r--src/lib/CMakeLists.txt1
-rw-r--r--src/lib/tpm.c266
7 files changed, 365 insertions, 360 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in
index 1b8e0db6..736ba5b3 100644
--- a/include/ouroboros/config.h.in
+++ b/include/ouroboros/config.h.in
@@ -51,20 +51,16 @@
#define SHM_FLOW_SET_PREFIX "/ouroboros.sets."
#define IRMD_MAX_FLOWS 4096
/* IRMD dynamic threadpooling */
-#define IRMD_MIN_AV_THREADS 16
-#define IRMD_MAX_AV_THREADS 64
-#define IRMD_MAX_THREADS 256
+#define IRMD_MIN_THREADS 16
+#define IRMD_ADD_THREADS 32
/* IPCP dynamic threadpooling */
-#define IPCP_MIN_AV_THREADS 4
-#define IPCP_MAX_AV_THREADS 32
-#define IPCP_MAX_THREADS 64
+#define IPCP_MIN_THREADS 4
+#define IPCP_ADD_THREADS 16
#define IPCPD_MAX_CONNS IRMD_MAX_FLOWS
#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC
#define PFT_SIZE 1 << 12
/* Timeout values */
-#define IRMD_TPM_TIMEOUT 1000
-#define IPCP_TPM_TIMEOUT 1000
#define IRMD_ACCEPT_TIMEOUT 100
#define IRMD_REQ_ARR_TIMEOUT 500
#define IRMD_FLOW_TIMEOUT 5000
diff --git a/include/ouroboros/tpm.h b/include/ouroboros/tpm.h
new file mode 100644
index 00000000..d34f06f3
--- /dev/null
+++ b/include/ouroboros/tpm.h
@@ -0,0 +1,47 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Threadpool management
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#ifndef OUROBOROS_LIB_TPM_H
+#define OUROBOROS_LIB_TPM_H
+
+#include <stdbool.h>
+
+int tpm_init(size_t min,
+ size_t inc,
+ void * (* func)(void *));
+
+int tpm_start(void);
+
+void tpm_stop(void);
+
+void tpm_fini(void);
+
+bool tpm_check(void);
+
+void tpm_exit(void);
+
+void tpm_dec(void);
+
+void tpm_inc(void);
+
+#endif /* OUROBOROS_LIB_TPM_H */
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 0d6d850f..a56e46f7 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -31,6 +31,7 @@
#include <ouroboros/dev.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/np1_flow.h>
+#include <ouroboros/tpm.h>
#include "ipcp.h"
@@ -56,6 +57,8 @@ void ipcp_sig_handler(int sig,
if (ipcp_get_state() == IPCP_OPERATIONAL)
ipcp_set_state(IPCP_SHUTDOWN);
}
+
+ tpm_stop();
default:
return;
}
@@ -87,51 +90,7 @@ void ipcp_hash_str(char * buf,
buf[2 * i] = '\0';
}
-static void thread_inc(void)
-{
- pthread_mutex_lock(&ipcpi.threads_lock);
-
- ++ipcpi.threads;
- pthread_cond_signal(&ipcpi.threads_cond);
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
-}
-
-static void thread_dec(void)
-{
- pthread_mutex_lock(&ipcpi.threads_lock);
-
- --ipcpi.threads;
- pthread_cond_signal(&ipcpi.threads_cond);
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
-}
-
-static bool thread_check(void)
-{
- int ret;
-
- pthread_mutex_lock(&ipcpi.threads_lock);
-
- ret = ipcpi.threads > ipcpi.max_threads;
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
-
- return ret;
-}
-
-static void thread_exit(ssize_t id)
-{
- pthread_mutex_lock(&ipcpi.threads_lock);
- bmp_release(ipcpi.thread_ids, id);
-
- --ipcpi.threads;
- pthread_cond_signal(&ipcpi.threads_cond);
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
-}
-
-static void * ipcp_main_loop(void * o)
+static void * mainloop(void * o)
{
int lsockfd;
uint8_t buf[IPCP_MSG_BUF_SIZE];
@@ -147,7 +106,7 @@ static void * ipcp_main_loop(void * o)
struct timeval ltv = {(SOCKET_TIMEOUT / 1000),
(SOCKET_TIMEOUT % 1000) * 1000};
- ssize_t id = (ssize_t) o;
+ (void) o;
while (true) {
#ifdef __FreeBSD__
@@ -159,8 +118,8 @@ static void * ipcp_main_loop(void * o)
if (ipcp_get_state() == IPCP_SHUTDOWN ||
ipcp_get_state() == IPCP_NULL ||
- thread_check()) {
- thread_exit(id);
+ tpm_check()) {
+ tpm_exit();
break;
}
@@ -192,7 +151,7 @@ static void * ipcp_main_loop(void * o)
continue;
}
- thread_dec();
+ tpm_dec();
switch (msg->code) {
case IPCP_MSG_CODE__IPCP_BOOTSTRAP:
@@ -408,7 +367,7 @@ static void * ipcp_main_loop(void * o)
if (buffer.len == 0) {
log_err("Failed to pack reply message");
close(lsockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -416,7 +375,7 @@ static void * ipcp_main_loop(void * o)
if (buffer.data == NULL) {
log_err("Failed to create reply buffer.");
close(lsockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -426,14 +385,14 @@ static void * ipcp_main_loop(void * o)
log_err("Failed to send reply message");
free(buffer.data);
close(lsockfd);
- thread_inc();
+ tpm_inc();
continue;
}
free(buffer.data);
close(lsockfd);
- thread_inc();
+ tpm_inc();
}
return (void *) 0;
@@ -496,15 +455,6 @@ int ipcp_init(int argc,
ipcpi.state = IPCP_NULL;
ipcpi.shim_data = NULL;
- ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCP_MAX_THREADS);
- if (ipcpi.threadpool == NULL) {
- ret = -ENOMEM;
- goto fail_thr;
- }
-
- ipcpi.threads = 0;
- ipcpi.max_threads = IPCP_MIN_AV_THREADS;
-
ipcpi.sock_path = ipcp_sock_path(getpid());
if (ipcpi.sock_path == NULL)
goto fail_sock_path;
@@ -526,11 +476,6 @@ int ipcp_init(int argc,
goto fail_state_mtx;
}
- if (pthread_mutex_init(&ipcpi.threads_lock, NULL)) {
- log_err("Could not create mutex.");
- goto fail_thread_lock;
- }
-
if (pthread_condattr_init(&cattr)) {
log_err("Could not create condattr.");
goto fail_cond_attr;
@@ -544,17 +489,6 @@ int ipcp_init(int argc,
goto fail_state_cond;
}
- if (pthread_cond_init(&ipcpi.threads_cond, &cattr)) {
- log_err("Could not init condvar.");
- goto fail_thread_cond;
- }
-
- ipcpi.thread_ids = bmp_create(IPCP_MAX_THREADS, 0);
- if (ipcpi.thread_ids == NULL) {
- log_err("Could not init condvar.");
- goto fail_bmp;
- }
-
if (pthread_mutex_init(&ipcpi.alloc_lock, NULL)) {
log_err("Failed to init mutex.");
goto fail_alloc_lock;
@@ -587,94 +521,21 @@ int ipcp_init(int argc,
fail_alloc_cond:
pthread_mutex_destroy(&ipcpi.alloc_lock);
fail_alloc_lock:
- bmp_destroy(ipcpi.thread_ids);
- fail_bmp:
- pthread_cond_destroy(&ipcpi.threads_cond);
- fail_thread_cond:
pthread_cond_destroy(&ipcpi.state_cond);
fail_state_cond:
pthread_condattr_destroy(&cattr);
fail_cond_attr:
- pthread_mutex_destroy(&ipcpi.threads_lock);
- fail_thread_lock:
pthread_mutex_destroy(&ipcpi.state_mtx);
fail_state_mtx:
close(ipcpi.sockfd);
fail_serv_sock:
free(ipcpi.sock_path);
fail_sock_path:
- free(ipcpi.threadpool);
- fail_thr:
ouroboros_fini();
return ret;
}
-void * threadpoolmgr(void * o)
-{
- pthread_attr_t pattr;
- struct timespec dl;
- struct timespec to = {(IRMD_TPM_TIMEOUT / 1000),
- (IRMD_TPM_TIMEOUT % 1000) * MILLION};
- (void) o;
-
- if (pthread_attr_init(&pattr))
- return (void *) -1;
-
- pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED);
-
- while (true) {
- clock_gettime(PTHREAD_COND_CLOCK, &dl);
- ts_add(&dl, &to, &dl);
-
- if (ipcp_get_state() == IPCP_SHUTDOWN ||
- ipcp_get_state() == IPCP_NULL) {
- pthread_attr_destroy(&pattr);
- log_dbg("Waiting for threads to exit.");
- pthread_mutex_lock(&ipcpi.threads_lock);
- while (ipcpi.threads > 0)
- pthread_cond_wait(&ipcpi.threads_cond,
- &ipcpi.threads_lock);
- pthread_mutex_unlock(&ipcpi.threads_lock);
-
- log_dbg("Threadpool manager done.");
- break;
- }
-
- pthread_mutex_lock(&ipcpi.threads_lock);
-
- if (ipcpi.threads < IPCP_MIN_AV_THREADS) {
- log_dbg("Increasing threadpool.");
- ipcpi.max_threads = IPCP_MAX_AV_THREADS;
-
- while (ipcpi.threads < ipcpi.max_threads) {
- ssize_t id = bmp_allocate(ipcpi.thread_ids);
- if (!bmp_is_id_valid(ipcpi.thread_ids, id)) {
- log_warn("IPCP threadpool exhausted.");
- break;
- }
-
- if (pthread_create(&ipcpi.threadpool[id],
- &pattr, ipcp_main_loop,
- (void *) id))
- log_warn("Failed to start new thread.");
- else
- ++ipcpi.threads;
- }
- }
-
- if (pthread_cond_timedwait(&ipcpi.threads_cond,
- &ipcpi.threads_lock,
- &dl) == ETIMEDOUT)
- if (ipcpi.threads > IPCP_MIN_AV_THREADS)
- --ipcpi.max_threads;
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
- }
-
- return (void *) 0;
-}
-
int ipcp_boot()
{
struct sigaction sig_act;
@@ -699,9 +560,15 @@ int ipcp_boot()
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
- ipcp_set_state(IPCP_INIT);
+ if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop))
+ return -1;
+
+ if (tpm_start()) {
+ tpm_fini();
+ return -1;
+ }
- pthread_create(&ipcpi.tpm, NULL, threadpoolmgr, NULL);
+ ipcp_set_state(IPCP_INIT);
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
@@ -710,8 +577,7 @@ int ipcp_boot()
void ipcp_shutdown()
{
- pthread_join(ipcpi.tpm, NULL);
-
+ tpm_fini();
log_info("IPCP %d shutting down.", getpid());
}
@@ -721,16 +587,11 @@ void ipcp_fini()
if (unlink(ipcpi.sock_path))
log_warn("Could not unlink %s.", ipcpi.sock_path);
- bmp_destroy(ipcpi.thread_ids);
-
free(ipcpi.sock_path);
- free(ipcpi.threadpool);
shim_data_destroy(ipcpi.shim_data);
pthread_cond_destroy(&ipcpi.state_cond);
- pthread_cond_destroy(&ipcpi.threads_cond);
- pthread_mutex_destroy(&ipcpi.threads_lock);
pthread_mutex_destroy(&ipcpi.state_mtx);
pthread_cond_destroy(&ipcpi.alloc_cond);
pthread_mutex_destroy(&ipcpi.alloc_lock);
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 3f5e1bd6..fb69df5c 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -93,15 +93,6 @@ struct ipcp {
pthread_cond_t alloc_cond;
pthread_mutex_t alloc_lock;
- pthread_t * threadpool;
-
- struct bmp * thread_ids;
- size_t max_threads;
- size_t threads;
- pthread_cond_t threads_cond;
- pthread_mutex_t threads_lock;
-
- pthread_t tpm;
} ipcpi;
int ipcp_init(int argc,
diff --git a/src/irmd/main.c b/src/irmd/main.c
index b72893ba..8b22bdef 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -36,6 +36,7 @@
#include <ouroboros/bitmap.h>
#include <ouroboros/qos.h>
#include <ouroboros/time_utils.h>
+#include <ouroboros/tpm.h>
#include <ouroboros/logs.h>
#include "utils.h"
@@ -99,18 +100,9 @@ struct irm {
struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */
int sockfd; /* UNIX socket */
- pthread_t * threadpool; /* pool of mainloop threads */
-
- struct bmp * thread_ids; /* ids for mainloop threads */
- size_t max_threads; /* max threads set by tpm */
- size_t threads; /* available mainloop threads */
- pthread_cond_t threads_cond; /* signal thread entry/exit */
- pthread_mutex_t threads_lock; /* mutex for threads/condvar */
-
enum irm_state state; /* state of the irmd */
pthread_rwlock_t state_lock; /* lock for the entire irmd */
- pthread_t tpm; /* threadpool manager */
pthread_t irm_sanitize; /* clean up irmd resources */
pthread_t shm_sanitize; /* keep track of rdrbuff use */
} irmd;
@@ -478,7 +470,7 @@ static int enroll_ipcp(pid_t api,
pthread_rwlock_unlock(&irmd.reg_lock);
if (ipcp_enroll(api, dst_name, &info) < 0) {
- log_err("Could not enroll IPCP.");
+ log_err("Could not enroll IPCP %d.", api);
return -1;
}
@@ -1426,16 +1418,6 @@ static void irm_fini(void)
if (irmd_get_state() != IRMD_NULL)
log_warn("Unsafe destroy.");
- pthread_mutex_lock(&irmd.threads_lock);
-
- if (irmd.thread_ids != NULL)
- bmp_destroy(irmd.thread_ids);
-
- pthread_mutex_unlock(&irmd.threads_lock);
-
- if (irmd.threadpool != NULL)
- free(irmd.threadpool);
-
pthread_rwlock_wrlock(&irmd.flows_lock);
if (irmd.port_ids != NULL)
@@ -1509,8 +1491,8 @@ void irmd_sig_handler(int sig,
}
log_info("IRMd shutting down...");
-
irmd_set_state(IRMD_NULL);
+ tpm_stop();
break;
case SIGPIPE:
log_dbg("Ignored SIGPIPE.");
@@ -1692,55 +1674,11 @@ void * irm_sanitize(void * o)
}
}
-static void thread_inc(void)
-{
- pthread_mutex_lock(&irmd.threads_lock);
-
- ++irmd.threads;
- pthread_cond_signal(&irmd.threads_cond);
-
- pthread_mutex_unlock(&irmd.threads_lock);
-}
-
-static void thread_dec(void)
-{
- pthread_mutex_lock(&irmd.threads_lock);
-
- --irmd.threads;
- pthread_cond_signal(&irmd.threads_cond);
-
- pthread_mutex_unlock(&irmd.threads_lock);
-}
-
-static bool thread_check(void)
-{
- int ret;
-
- pthread_mutex_lock(&irmd.threads_lock);
-
- ret = irmd.threads > irmd.max_threads;
-
- pthread_mutex_unlock(&irmd.threads_lock);
-
- return ret;
-}
-
-static void thread_exit(ssize_t id)
-{
- pthread_mutex_lock(&irmd.threads_lock);
- bmp_release(irmd.thread_ids, id);
-
- --irmd.threads;
- pthread_cond_signal(&irmd.threads_cond);
-
- pthread_mutex_unlock(&irmd.threads_lock);
-}
-
void * mainloop(void * o)
{
uint8_t buf[IRM_MSG_BUF_SIZE];
- ssize_t id = (ssize_t) o;
+ (void) o;
while (true) {
#ifdef __FreeBSD__
@@ -1760,8 +1698,8 @@ void * mainloop(void * o)
struct timeval tv = {(SOCKET_TIMEOUT / 1000),
(SOCKET_TIMEOUT % 1000) * 1000};
- if (irmd_get_state() != IRMD_RUNNING || thread_check()) {
- thread_exit(id);
+ if (irmd_get_state() != IRMD_RUNNING || tpm_check()) {
+ tpm_exit();
break;
}
@@ -1772,7 +1710,6 @@ void * mainloop(void * o)
if (select(irmd.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0)
continue;
#endif
-
cli_sockfd = accept(irmd.sockfd, 0, 0);
if (cli_sockfd < 0)
continue;
@@ -1790,7 +1727,7 @@ void * mainloop(void * o)
if (irmd_get_state() != IRMD_RUNNING) {
close(cli_sockfd);
- thread_exit(id);
+ tpm_exit();
break;
}
@@ -1800,7 +1737,7 @@ void * mainloop(void * o)
continue;
}
- thread_dec();
+ tpm_dec();
if (msg->has_timeo_sec) {
assert(msg->has_timeo_nsec);
@@ -1929,7 +1866,7 @@ void * mainloop(void * o)
if (ret_msg.result == -EPIPE || !ret_msg.has_result) {
close(cli_sockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -1939,7 +1876,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(cli_sockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -1948,7 +1885,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(cli_sockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -1963,70 +1900,7 @@ void * mainloop(void * o)
free(buffer.data);
close(cli_sockfd);
- thread_inc();
- }
-
- return (void *) 0;
-}
-
-void * threadpoolmgr(void * o)
-{
- pthread_attr_t pattr;
- struct timespec dl;
- struct timespec to = {(IRMD_TPM_TIMEOUT / 1000),
- (IRMD_TPM_TIMEOUT % 1000) * MILLION};
- (void) o;
-
- if (pthread_attr_init(&pattr))
- return (void *) -1;
-
- pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED);
-
- while (true) {
- clock_gettime(PTHREAD_COND_CLOCK, &dl);
- ts_add(&dl, &to, &dl);
-
- if (irmd_get_state() != IRMD_RUNNING) {
- pthread_attr_destroy(&pattr);
- log_dbg("Waiting for threads to exit.");
- pthread_mutex_lock(&irmd.threads_lock);
- while (irmd.threads > 0)
- pthread_cond_wait(&irmd.threads_cond,
- &irmd.threads_lock);
- pthread_mutex_unlock(&irmd.threads_lock);
- log_dbg("Threadpool manager done.");
- break;
- }
-
- pthread_mutex_lock(&irmd.threads_lock);
-
- if (irmd.threads < IRMD_MIN_AV_THREADS) {
- log_dbg("Increasing threadpool.");
- irmd.max_threads = IRMD_MAX_AV_THREADS;
-
- while (irmd.threads < irmd.max_threads) {
- ssize_t id = bmp_allocate(irmd.thread_ids);
- if (!bmp_is_id_valid(irmd.thread_ids, id)) {
- log_warn("IRMd threadpool exhausted.");
- break;
- }
-
- if (pthread_create(&irmd.threadpool[id],
- &pattr, mainloop,
- (void *) id))
- log_warn("Failed to start new thread.");
- else
- ++irmd.threads;
- }
- }
-
- if (pthread_cond_timedwait(&irmd.threads_cond,
- &irmd.threads_lock,
- &dl) == ETIMEDOUT)
- if (irmd.threads > IRMD_MIN_AV_THREADS )
- --irmd.max_threads;
-
- pthread_mutex_unlock(&irmd.threads_lock);
+ tpm_inc();
}
return (void *) 0;
@@ -2035,7 +1909,6 @@ void * threadpoolmgr(void * o)
static int irm_init(void)
{
struct stat st;
- pthread_condattr_t cattr;
struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000),
(IRMD_ACCEPT_TIMEOUT % 1000) * 1000};
@@ -2058,24 +1931,6 @@ static int irm_init(void)
goto fail_flows_lock;
}
- if (pthread_mutex_init(&irmd.threads_lock, NULL)) {
- log_err("Failed to initialize mutex.");
- goto fail_threads_lock;
- }
-
- if (pthread_condattr_init(&cattr)) {
- log_err("Failed to initialize condattr.");
- goto fail_cattr;
- }
-
-#ifndef __APPLE__
- pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
-#endif
- if (pthread_cond_init(&irmd.threads_cond, &cattr)) {
- log_err("Failed to initialize cond.");
- goto fail_threads_cond;
- }
-
list_head_init(&irmd.ipcps);
list_head_init(&irmd.api_table);
list_head_init(&irmd.apn_table);
@@ -2089,18 +1944,6 @@ static int irm_init(void)
goto fail_port_ids;
}
- irmd.thread_ids = bmp_create(IRMD_MAX_THREADS, 0);
- if (irmd.thread_ids == NULL) {
- log_err("Failed to thread thread_ids bitmap.");
- goto fail_thread_ids;
- }
-
- irmd.threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS);
- if (irmd.threadpool == NULL) {
- log_err("Failed to malloc threadpool");
- goto fail_thrpool;
- }
-
if ((irmd.lf = lockfile_create()) == NULL) {
if ((irmd.lf = lockfile_open()) == NULL) {
log_err("Lockfile error.");
@@ -2155,8 +1998,6 @@ static int irm_init(void)
goto fail_rdrbuff;
}
- irmd.threads = 0;
- irmd.max_threads = IRMD_MIN_AV_THREADS;
irmd.state = IRMD_RUNNING;
log_info("Ouroboros IPC Resource Manager daemon started...");
@@ -2172,18 +2013,8 @@ fail_sock_path:
fail_stat:
lockfile_destroy(irmd.lf);
fail_lockfile:
- free(irmd.threadpool);
-fail_thrpool:
- bmp_destroy(irmd.thread_ids);
-fail_thread_ids:
bmp_destroy(irmd.port_ids);
fail_port_ids:
- pthread_cond_destroy(&irmd.threads_cond);
-fail_threads_cond:
- pthread_condattr_destroy(&cattr);
-fail_cattr:
- pthread_mutex_destroy(&irmd.threads_lock);
-fail_threads_lock:
pthread_rwlock_destroy(&irmd.flows_lock);
fail_flows_lock:
pthread_rwlock_destroy(&irmd.reg_lock);
@@ -2253,12 +2084,24 @@ int main(int argc,
exit(EXIT_FAILURE);
}
- pthread_create(&irmd.tpm, NULL, threadpoolmgr, NULL);
+ if (tpm_init(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop)) {
+ log_fini();
+ exit(EXIT_FAILURE);
+ }
+
+ if (tpm_start()) {
+ tpm_fini();
+ log_fini();
+ exit(EXIT_FAILURE);
+ }
pthread_create(&irmd.irm_sanitize, NULL, irm_sanitize, NULL);
pthread_create(&irmd.shm_sanitize, NULL, shm_sanitize, irmd.rdrb);
- pthread_join(irmd.tpm, NULL);
+ /* tpm_stop() called from sighandler */
+
+ tpm_fini();
+
pthread_join(irmd.irm_sanitize, NULL);
pthread_join(irmd.shm_sanitize, NULL);
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index e08869b8..822c8c9b 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -59,6 +59,7 @@ set(SOURCE_FILES
shm_rdrbuff.c
sockets.c
time_utils.c
+ tpm.c
utils.c
)
diff --git a/src/lib/tpm.c b/src/lib/tpm.c
new file mode 100644
index 00000000..8298eeb5
--- /dev/null
+++ b/src/lib/tpm.c
@@ -0,0 +1,266 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Threadpool management
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+#include <ouroboros/config.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/list.h>
+#include <ouroboros/time_utils.h>
+#include <ouroboros/tpm.h>
+
+#include <pthread.h>
+#include <stdlib.h>
+
+#define TPM_TIMEOUT 1000
+
+struct pthr_el {
+ struct list_head next;
+
+ bool join;
+
+ pthread_t thr;
+};
+
+enum tpm_state {
+ TPM_NULL = 0,
+ TPM_INIT,
+ TPM_RUNNING
+};
+
+struct {
+ size_t min;
+ size_t inc;
+ size_t max;
+ size_t cur;
+
+ void * (* func)(void *);
+
+ struct list_head pool;
+
+ enum tpm_state state;
+
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
+
+ pthread_t mgr;
+} tpm;
+
+static void tpm_join(void)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ list_for_each_safe(p, h, &tpm.pool) {
+ struct pthr_el * e = list_entry(p, struct pthr_el, next);
+ if (tpm.state != TPM_RUNNING)
+ while (!e->join)
+ pthread_cond_wait(&tpm.cond, &tpm.lock);
+
+ if (e->join) {
+ pthread_join(e->thr, NULL);
+ list_del(&e->next);
+ free(e);
+ }
+ }
+}
+
+static void * tpmgr(void * o)
+{
+ struct timespec dl;
+ struct timespec to = {(TPM_TIMEOUT / 1000),
+ (TPM_TIMEOUT % 1000) * MILLION};
+ (void) o;
+
+ while (true) {
+ clock_gettime(PTHREAD_COND_CLOCK, &dl);
+ ts_add(&dl, &to, &dl);
+
+ pthread_mutex_lock(&tpm.lock);
+
+ tpm_join();
+
+ if (tpm.state != TPM_RUNNING) {
+ tpm.max = 0;
+ tpm_join();
+ pthread_mutex_unlock(&tpm.lock);
+ break;
+ }
+
+ if (tpm.cur < tpm.min) {
+ tpm.max = tpm.inc;
+
+ while (tpm.cur < tpm.max) {
+ struct pthr_el * e = malloc(sizeof(*e));
+ if (e == NULL)
+ break;
+
+ e->join = false;
+
+ if (pthread_create(&e->thr, NULL,
+ tpm.func, NULL)) {
+ free(e);
+ } else {
+ list_add(&e->next, &tpm.pool);
+ ++tpm.cur;
+ }
+ }
+ }
+
+ if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl)
+ == ETIMEDOUT)
+ if (tpm.cur > tpm.min )
+ --tpm.max;
+
+ pthread_mutex_unlock(&tpm.lock);
+ }
+
+ return (void *) 0;
+}
+
+int tpm_init(size_t min,
+ size_t inc,
+ void * (* func)(void *))
+{
+ pthread_condattr_t cattr;
+
+ if (pthread_mutex_init(&tpm.lock, NULL))
+ goto fail_lock;
+
+ if (pthread_condattr_init(&cattr))
+ goto fail_cattr;
+
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(&tpm.cond, &cattr))
+ goto fail_cond;
+
+ list_head_init(&tpm.pool);
+
+ pthread_condattr_destroy(&cattr);
+
+ tpm.state = TPM_INIT;
+ tpm.func = func;
+ tpm.min = min;
+ tpm.inc = inc;
+ tpm.max = 0;
+ tpm.cur = 0;
+
+ return 0;
+
+ fail_cond:
+ pthread_condattr_destroy(&cattr);
+ fail_cattr:
+ pthread_mutex_destroy(&tpm.lock);
+ fail_lock:
+ return -1;
+}
+
+int tpm_start(void)
+{
+ pthread_mutex_lock(&tpm.lock);
+
+ if (pthread_create(&tpm.mgr, NULL, tpmgr, NULL)) {
+ pthread_mutex_unlock(&tpm.lock);
+ return -1;
+ }
+
+ tpm.state = TPM_RUNNING;
+
+ pthread_mutex_unlock(&tpm.lock);
+
+ return 0;
+}
+
+void tpm_stop(void)
+{
+ pthread_mutex_lock(&tpm.lock);
+
+ tpm.state = TPM_NULL;
+
+ pthread_mutex_unlock(&tpm.lock);
+}
+
+void tpm_fini(void)
+{
+ pthread_join(tpm.mgr, NULL);
+
+ pthread_mutex_destroy(&tpm.lock);
+ pthread_cond_destroy(&tpm.cond);
+}
+
+bool tpm_check(void)
+{
+ bool ret;
+
+ pthread_mutex_lock(&tpm.lock);
+
+ ret = tpm.cur > tpm.max;
+
+ pthread_mutex_unlock(&tpm.lock);
+
+ return ret;
+}
+
+void tpm_inc(void)
+{
+ pthread_mutex_lock(&tpm.lock);
+
+ ++tpm.cur;
+
+ pthread_mutex_unlock(&tpm.lock);
+}
+
+void tpm_dec(void)
+{
+ pthread_mutex_lock(&tpm.lock);
+
+ --tpm.cur;
+
+ pthread_cond_signal(&tpm.cond);
+
+ pthread_mutex_unlock(&tpm.lock);
+}
+
+void tpm_exit(void)
+{
+ struct list_head * p;
+ pthread_t id;
+
+ id = pthread_self();
+
+ pthread_mutex_lock(&tpm.lock);
+
+ --tpm.cur;
+
+ list_for_each(p, &tpm.pool) {
+ struct pthr_el * e = list_entry(p, struct pthr_el, next);
+ if (e->thr == id) {
+ e->join = true;
+ break;
+ }
+ }
+
+ pthread_cond_signal(&tpm.cond);
+
+ pthread_mutex_unlock(&tpm.lock);
+}