summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-04-02 11:45:52 +0200
committerdimitri staessens <dimitri.staessens@ugent.be>2017-04-02 11:45:52 +0200
commitb707d032cecb0cd97f548b755e4ec2bda190e83c (patch)
tree0adff5f0a4016e2c0384b5cd06e3783a4eb9c995
parent1e62996112a2a43bd0f572676bc8d87761ad6386 (diff)
downloadouroboros-b707d032cecb0cd97f548b755e4ec2bda190e83c.tar.gz
ouroboros-b707d032cecb0cd97f548b755e4ec2bda190e83c.zip
ipcpd: Add dynamic threadpooling for IPCPs
-rw-r--r--include/ouroboros/config.h.in6
-rw-r--r--src/ipcpd/ipcp.c270
-rw-r--r--src/ipcpd/ipcp.h9
3 files changed, 258 insertions, 27 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in
index 30bab68f..b19bb17f 100644
--- a/include/ouroboros/config.h.in
+++ b/include/ouroboros/config.h.in
@@ -52,13 +52,17 @@
#define IRMD_MIN_AV_THREADS 16
#define IRMD_MAX_AV_THREADS 64
#define IRMD_MAX_THREADS 256
+/* IPCP dynamic threadpooling */
+#define IPCP_MIN_AV_THREADS 4
+#define IPCP_MAX_AV_THREADS 32
+#define IPCP_MAX_THREADS 64
-#define IPCPD_THREADPOOL_SIZE 16
#define IPCPD_MAX_CONNS IRMD_MAX_FLOWS
#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC
#define PFT_SIZE 1 << 12
/* Timeout values */
#define IRMD_TPM_TIMEOUT 1000
+#define IPCP_TPM_TIMEOUT 1000
#define IRMD_ACCEPT_TIMEOUT 100
#define IRMD_REQ_ARR_TIMEOUT 500
#define IRMD_FLOW_TIMEOUT 5000
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 8646121a..ee5bd76e 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -29,6 +29,7 @@
#include <ouroboros/sockets.h>
#include <ouroboros/errno.h>
#include <ouroboros/dev.h>
+#include <ouroboros/bitmap.h>
#include <ouroboros/np1_flow.h>
#include "ipcp.h"
@@ -37,6 +38,51 @@
#include <sys/socket.h>
#include <stdlib.h>
+
+static void thread_inc(void)
+{
+ pthread_mutex_lock(&ipcpi.threads_lock);
+
+ ++ipcpi.threads;
+ pthread_cond_signal(&ipcpi.threads_cond);
+
+ pthread_mutex_unlock(&ipcpi.threads_lock);
+}
+
+static void thread_dec(void)
+{
+ pthread_mutex_lock(&ipcpi.threads_lock);
+
+ --ipcpi.threads;
+ pthread_cond_signal(&ipcpi.threads_cond);
+
+ pthread_mutex_unlock(&ipcpi.threads_lock);
+}
+
+static bool thread_check(void)
+{
+ int ret;
+
+ pthread_mutex_lock(&ipcpi.threads_lock);
+
+ ret = ipcpi.threads > ipcpi.max_threads;
+
+ pthread_mutex_unlock(&ipcpi.threads_lock);
+
+ return ret;
+}
+
+static void thread_exit(ssize_t id)
+{
+ pthread_mutex_lock(&ipcpi.threads_lock);
+ bmp_release(ipcpi.thread_ids, id);
+
+ --ipcpi.threads;
+ pthread_cond_signal(&ipcpi.threads_cond);
+
+ pthread_mutex_unlock(&ipcpi.threads_lock);
+}
+
static void * ipcp_main_loop(void * o)
{
int lsockfd;
@@ -53,7 +99,7 @@ static void * ipcp_main_loop(void * o)
struct timeval ltv = {(SOCKET_TIMEOUT / 1000),
(SOCKET_TIMEOUT % 1000) * 1000};
- (void) o;
+ ssize_t id = (ssize_t) o;
while (true) {
#ifdef __FreeBSD__
@@ -65,8 +111,10 @@ static void * ipcp_main_loop(void * o)
pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_get_state() == IPCP_SHUTDOWN
- || ipcp_get_state() == IPCP_NULL) {
+ if (ipcp_get_state() == IPCP_SHUTDOWN ||
+ ipcp_get_state() == IPCP_NULL ||
+ thread_check()) {
+ thread_exit(id);
pthread_rwlock_unlock(&ipcpi.state_lock);
break;
}
@@ -101,6 +149,8 @@ static void * ipcp_main_loop(void * o)
continue;
}
+ thread_dec();
+
switch (msg->code) {
case IPCP_MSG_CODE__IPCP_BOOTSTRAP:
if (ipcpi.ops->ipcp_bootstrap == NULL) {
@@ -245,12 +295,14 @@ static void * ipcp_main_loop(void * o)
if (buffer.len == 0) {
log_err("Failed to send reply message");
close(lsockfd);
+ thread_inc();
continue;
}
buffer.data = malloc(buffer.len);
if (buffer.data == NULL) {
close(lsockfd);
+ thread_inc();
continue;
}
@@ -259,11 +311,14 @@ static void * ipcp_main_loop(void * o)
if (write(lsockfd, buffer.data, buffer.len) == -1) {
free(buffer.data);
close(lsockfd);
+ thread_inc();
continue;
}
free(buffer.data);
close(lsockfd);
+
+ thread_inc();
}
return (void *) 0;
@@ -312,12 +367,12 @@ int ipcp_init(int argc,
if (type == IPCP_NORMAL) {
if (ap_init(argv[0])) {
- log_err("Failed to init normal IPCP.");
+ log_err("Failed to init normal IPCPI.");
return -1;
}
} else {
if (ap_init(NULL)) {
- log_err("Failed to init shim IPCP.");
+ log_err("Failed to init shim IPCPI.");
return -1;
}
}
@@ -326,14 +381,19 @@ int ipcp_init(int argc,
ipcpi.state = IPCP_NULL;
ipcpi.shim_data = NULL;
- ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCPD_THREADPOOL_SIZE);
+ ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCP_MAX_THREADS);
if (ipcpi.threadpool == NULL) {
+ ap_fini();
return -ENOMEM;
}
+ ipcpi.threads = 0;
+ ipcpi.max_threads = IPCP_MIN_AV_THREADS;
+
ipcpi.sock_path = ipcp_sock_path(getpid());
if (ipcpi.sock_path == NULL) {
free(ipcpi.threadpool);
+ ap_fini();
return -1;
}
@@ -342,6 +402,7 @@ int ipcp_init(int argc,
log_err("Could not open server socket.");
free(ipcpi.threadpool);
free(ipcpi.sock_path);
+ ap_fini();
return -1;
}
@@ -351,55 +412,206 @@ int ipcp_init(int argc,
ipcpi.ops = ops;
- pthread_rwlock_init(&ipcpi.state_lock, NULL);
- pthread_mutex_init(&ipcpi.state_mtx, NULL);
- pthread_condattr_init(&cattr);
+ if (pthread_rwlock_init(&ipcpi.state_lock, NULL)) {
+ log_err("Could not create rwlock.");
+ close(ipcpi.sockfd);
+ free(ipcpi.threadpool);
+ free(ipcpi.sock_path);
+ ap_fini();
+ return -1;
+ }
+
+ if (pthread_mutex_init(&ipcpi.state_mtx, NULL)) {
+ log_err("Could not create mutex.");
+ pthread_rwlock_destroy(&ipcpi.state_lock);
+ close(ipcpi.sockfd);
+ free(ipcpi.threadpool);
+ free(ipcpi.sock_path);
+ ap_fini();
+ return -1;
+ }
+
+ if (pthread_mutex_init(&ipcpi.threads_lock, NULL)) {
+ log_err("Could not create mutex.");
+ pthread_mutex_destroy(&ipcpi.state_mtx);
+ pthread_rwlock_destroy(&ipcpi.state_lock);
+ close(ipcpi.sockfd);
+ free(ipcpi.threadpool);
+ free(ipcpi.sock_path);
+ ap_fini();
+ return -1;
+ }
+
+ if (pthread_condattr_init(&cattr)) {
+ log_err("Could not create condattr.");
+ pthread_mutex_destroy(&ipcpi.threads_lock);
+ pthread_mutex_destroy(&ipcpi.state_mtx);
+ pthread_rwlock_destroy(&ipcpi.state_lock);
+ close(ipcpi.sockfd);
+ free(ipcpi.threadpool);
+ free(ipcpi.sock_path);
+ ap_fini();
+ return -1;
+ }
+ ;
#ifndef __APPLE__
pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
#endif
- pthread_cond_init(&ipcpi.state_cond, &cattr);
+ if (pthread_cond_init(&ipcpi.state_cond, &cattr)) {
+ log_err("Could not init condvar.");
+ pthread_condattr_destroy(&cattr);
+ pthread_mutex_destroy(&ipcpi.threads_lock);
+ pthread_mutex_destroy(&ipcpi.state_mtx);
+ pthread_rwlock_destroy(&ipcpi.state_lock);
+ close(ipcpi.sockfd);
+ free(ipcpi.threadpool);
+ free(ipcpi.sock_path);
+ ap_fini();
+ return -1;
+ }
+
+ if (pthread_cond_init(&ipcpi.threads_cond, &cattr)) {
+ log_err("Could not init condvar.");
+ pthread_cond_destroy(&ipcpi.state_cond);
+ pthread_condattr_destroy(&cattr);
+ pthread_mutex_destroy(&ipcpi.threads_lock);
+ pthread_mutex_destroy(&ipcpi.state_mtx);
+ pthread_rwlock_destroy(&ipcpi.state_lock);
+ close(ipcpi.sockfd);
+ free(ipcpi.threadpool);
+ free(ipcpi.sock_path);
+ ap_fini();
+ return -1;
+ };
+
+
+ pthread_condattr_destroy(&cattr);
+
+ ipcpi.thread_ids = bmp_create(IPCP_MAX_THREADS, 0);
+ if (ipcpi.thread_ids == NULL) {
+ log_err("Could not init condvar.");
+ pthread_cond_destroy(&ipcpi.threads_cond);
+ pthread_cond_destroy(&ipcpi.state_cond);
+ pthread_mutex_destroy(&ipcpi.threads_lock);
+ pthread_mutex_destroy(&ipcpi.state_mtx);
+ pthread_rwlock_destroy(&ipcpi.state_lock);
+ close(ipcpi.sockfd);
+ free(ipcpi.threadpool);
+ free(ipcpi.sock_path);
+ ap_fini();
+ return -1;
+ };
if (type == IPCP_NORMAL)
return 0;
ipcpi.shim_data = shim_data_create();
if (ipcpi.shim_data == NULL) {
+ bmp_destroy(ipcpi.thread_ids);
+ pthread_cond_destroy(&ipcpi.threads_cond);
+ pthread_cond_destroy(&ipcpi.state_cond);
+ pthread_mutex_destroy(&ipcpi.state_mtx);
+ pthread_rwlock_destroy(&ipcpi.state_lock);
+ close(ipcpi.sockfd);
free(ipcpi.threadpool);
free(ipcpi.sock_path);
+ ap_fini();
return -ENOMEM;
}
return 0;
}
-int ipcp_boot()
+static bool is_thread_alive(ssize_t id)
{
- int t;
+ bool ret;
+ pthread_mutex_lock(&ipcpi.threads_lock);
- ipcp_set_state(IPCP_INIT);
+ ret = bmp_is_id_used(ipcpi.thread_ids, id);
- for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t) {
- if (pthread_create(&ipcpi.threadpool[t], NULL,
- ipcp_main_loop, NULL)) {
- int i;
- log_err("Failed to create main thread.");
- ipcp_set_state(IPCP_NULL);
- for (i = 0; i < t; ++i)
- pthread_join(ipcpi.threadpool[i], NULL);
- return -1;
+ pthread_mutex_unlock(&ipcpi.threads_lock);
+
+ return ret;
+}
+
+void * threadpoolmgr(void * o)
+{
+ struct timespec to = {(IPCP_TPM_TIMEOUT / 1000),
+ (IPCP_TPM_TIMEOUT % 1000) * MILLION};
+ struct timespec dl;
+ size_t t;
+
+ (void) o;
+
+ while (true) {
+ clock_gettime(PTHREAD_COND_CLOCK, &dl);
+ ts_add(&dl, &to, &dl);
+
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ if (ipcp_get_state() == IPCP_SHUTDOWN ||
+ ipcp_get_state() == IPCP_NULL) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ log_dbg("Threadpool manager exiting.");
+ for (t = 0; t < IPCP_MAX_THREADS; ++t)
+ if (is_thread_alive(t)) {
+ log_dbg("Waiting for thread %zd.", t);
+ pthread_join(ipcpi.threadpool[t], NULL);
+ }
+
+ log_dbg("Threadpool manager done.");
+ break;
+ }
+
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+
+ pthread_mutex_lock(&ipcpi.threads_lock);
+
+ if (ipcpi.threads < IPCP_MIN_AV_THREADS) {
+ log_dbg("Increasing threadpool.");
+ ipcpi.max_threads = IPCP_MAX_AV_THREADS;
+
+ while (ipcpi.threads < ipcpi.max_threads) {
+ ssize_t id = bmp_allocate(ipcpi.thread_ids);
+ if (!bmp_is_id_valid(ipcpi.thread_ids, id)) {
+ log_warn("IPCP threadpool exhausted.");
+ break;
+ }
+
+ if (pthread_create(&ipcpi.threadpool[id],
+ NULL, ipcp_main_loop,
+ (void *) id))
+ log_warn("Failed to start new thread.");
+ else
+ ++ipcpi.threads;
+ }
}
+
+ if (pthread_cond_timedwait(&ipcpi.threads_cond,
+ &ipcpi.threads_lock,
+ &dl) == ETIMEDOUT)
+ if (ipcpi.threads > IPCP_MIN_AV_THREADS)
+ --ipcpi.max_threads;
+
+ pthread_mutex_unlock(&ipcpi.threads_lock);
}
+ return (void *) 0;
+}
+
+int ipcp_boot()
+{
+ ipcp_set_state(IPCP_INIT);
+
+ pthread_create(&ipcpi.tpm, NULL, threadpoolmgr, NULL);
+
return 0;
}
void ipcp_shutdown()
{
- int t;
- for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t)
- pthread_join(ipcpi.threadpool[t], NULL);
+ pthread_join(ipcpi.tpm, NULL);
- log_info("IPCP %d shutting down. Bye.", getpid());
+ log_info("IPCP %d shutting down.", getpid());
}
void ipcp_fini()
@@ -408,18 +620,24 @@ void ipcp_fini()
if (unlink(ipcpi.sock_path))
log_warn("Could not unlink %s.", ipcpi.sock_path);
+ bmp_destroy(ipcpi.thread_ids);
+
free(ipcpi.sock_path);
free(ipcpi.threadpool);
shim_data_destroy(ipcpi.shim_data);
pthread_cond_destroy(&ipcpi.state_cond);
+ pthread_cond_destroy(&ipcpi.threads_cond);
+ pthread_mutex_destroy(&ipcpi.threads_lock);
pthread_mutex_destroy(&ipcpi.state_mtx);
pthread_rwlock_destroy(&ipcpi.state_lock);
log_fini();
ap_fini();
+
+ log_info("IPCP %d out.", getpid());
}
void ipcp_set_state(enum ipcp_state state)
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index a64ab65c..581ca5e3 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -80,7 +80,16 @@ struct ipcp {
int sockfd;
char * sock_path;
+
pthread_t * threadpool;
+
+ struct bmp * thread_ids;
+ size_t max_threads;
+ size_t threads;
+ pthread_cond_t threads_cond;
+ pthread_mutex_t threads_lock;
+
+ pthread_t tpm;
} ipcpi;
int ipcp_init(int argc,