From b707d032cecb0cd97f548b755e4ec2bda190e83c Mon Sep 17 00:00:00 2001
From: dimitri staessens <dimitri.staessens@ugent.be>
Date: Sun, 2 Apr 2017 11:45:52 +0200
Subject: ipcpd: Add dynamic threadpooling for IPCPs

---
 src/ipcpd/ipcp.c | 270 +++++++++++++++++++++++++++++++++++++++++++++++++------
 src/ipcpd/ipcp.h |   9 ++
 2 files changed, 253 insertions(+), 26 deletions(-)

(limited to 'src')

diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 8646121a..ee5bd76e 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -29,6 +29,7 @@
 #include <ouroboros/sockets.h>
 #include <ouroboros/errno.h>
 #include <ouroboros/dev.h>
+#include <ouroboros/bitmap.h>
 #include <ouroboros/np1_flow.h>
 
 #include "ipcp.h"
@@ -37,6 +38,51 @@
 #include <sys/socket.h>
 #include <stdlib.h>
 
+
+static void thread_inc(void)
+{
+        pthread_mutex_lock(&ipcpi.threads_lock);
+
+        ++ipcpi.threads;
+        pthread_cond_signal(&ipcpi.threads_cond);
+
+        pthread_mutex_unlock(&ipcpi.threads_lock);
+}
+
+static void thread_dec(void)
+{
+        pthread_mutex_lock(&ipcpi.threads_lock);
+
+        --ipcpi.threads;
+        pthread_cond_signal(&ipcpi.threads_cond);
+
+        pthread_mutex_unlock(&ipcpi.threads_lock);
+}
+
+static bool thread_check(void)
+{
+        int ret;
+
+        pthread_mutex_lock(&ipcpi.threads_lock);
+
+        ret = ipcpi.threads > ipcpi.max_threads;
+
+        pthread_mutex_unlock(&ipcpi.threads_lock);
+
+        return ret;
+}
+
+static void thread_exit(ssize_t id)
+{
+        pthread_mutex_lock(&ipcpi.threads_lock);
+        bmp_release(ipcpi.thread_ids, id);
+
+        --ipcpi.threads;
+        pthread_cond_signal(&ipcpi.threads_cond);
+
+        pthread_mutex_unlock(&ipcpi.threads_lock);
+}
+
 static void * ipcp_main_loop(void * o)
 {
         int     lsockfd;
@@ -53,7 +99,7 @@ static void * ipcp_main_loop(void * o)
         struct timeval ltv = {(SOCKET_TIMEOUT / 1000),
                              (SOCKET_TIMEOUT % 1000) * 1000};
 
-        (void) o;
+        ssize_t id = (ssize_t)  o;
 
         while (true) {
 #ifdef __FreeBSD__
@@ -65,8 +111,10 @@ static void * ipcp_main_loop(void * o)
 
                 pthread_rwlock_rdlock(&ipcpi.state_lock);
 
-                if (ipcp_get_state() == IPCP_SHUTDOWN
-                    || ipcp_get_state() == IPCP_NULL) {
+                if (ipcp_get_state() == IPCP_SHUTDOWN ||
+                    ipcp_get_state() == IPCP_NULL ||
+                    thread_check()) {
+                        thread_exit(id);
                         pthread_rwlock_unlock(&ipcpi.state_lock);
                         break;
                 }
@@ -101,6 +149,8 @@ static void * ipcp_main_loop(void * o)
                         continue;
                 }
 
+                thread_dec();
+
                 switch (msg->code) {
                 case IPCP_MSG_CODE__IPCP_BOOTSTRAP:
                         if (ipcpi.ops->ipcp_bootstrap == NULL) {
@@ -245,12 +295,14 @@ static void * ipcp_main_loop(void * o)
                 if (buffer.len == 0) {
                         log_err("Failed to send reply message");
                         close(lsockfd);
+                        thread_inc();
                         continue;
                 }
 
                 buffer.data = malloc(buffer.len);
                 if (buffer.data == NULL) {
                         close(lsockfd);
+                        thread_inc();
                         continue;
                 }
 
@@ -259,11 +311,14 @@ static void * ipcp_main_loop(void * o)
                 if (write(lsockfd, buffer.data, buffer.len) == -1) {
                         free(buffer.data);
                         close(lsockfd);
+                        thread_inc();
                         continue;
                 }
 
                 free(buffer.data);
                 close(lsockfd);
+
+                thread_inc();
         }
 
         return (void *) 0;
@@ -312,12 +367,12 @@ int ipcp_init(int               argc,
 
         if (type == IPCP_NORMAL) {
                 if (ap_init(argv[0])) {
-                        log_err("Failed to init normal IPCP.");
+                        log_err("Failed to init normal IPCPI.");
                         return -1;
                 }
         } else {
                 if (ap_init(NULL)) {
-                        log_err("Failed to init shim IPCP.");
+                        log_err("Failed to init shim IPCPI.");
                         return -1;
                 }
         }
@@ -326,14 +381,19 @@ int ipcp_init(int               argc,
         ipcpi.state     = IPCP_NULL;
         ipcpi.shim_data = NULL;
 
-        ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCPD_THREADPOOL_SIZE);
+        ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCP_MAX_THREADS);
         if (ipcpi.threadpool == NULL) {
+                ap_fini();
                 return -ENOMEM;
         }
 
+        ipcpi.threads = 0;
+        ipcpi.max_threads = IPCP_MIN_AV_THREADS;
+
         ipcpi.sock_path = ipcp_sock_path(getpid());
         if (ipcpi.sock_path == NULL) {
                 free(ipcpi.threadpool);
+                ap_fini();
                 return -1;
         }
 
@@ -342,6 +402,7 @@ int ipcp_init(int               argc,
                 log_err("Could not open server socket.");
                 free(ipcpi.threadpool);
                 free(ipcpi.sock_path);
+                ap_fini();
                 return -1;
         }
 
@@ -351,55 +412,206 @@ int ipcp_init(int               argc,
 
         ipcpi.ops = ops;
 
-        pthread_rwlock_init(&ipcpi.state_lock, NULL);
-        pthread_mutex_init(&ipcpi.state_mtx, NULL);
-        pthread_condattr_init(&cattr);
+        if (pthread_rwlock_init(&ipcpi.state_lock, NULL)) {
+                log_err("Could not create rwlock.");
+                close(ipcpi.sockfd);
+                free(ipcpi.threadpool);
+                free(ipcpi.sock_path);
+                ap_fini();
+                return -1;
+        }
+
+        if (pthread_mutex_init(&ipcpi.state_mtx, NULL)) {
+                log_err("Could not create mutex.");
+                pthread_rwlock_destroy(&ipcpi.state_lock);
+                close(ipcpi.sockfd);
+                free(ipcpi.threadpool);
+                free(ipcpi.sock_path);
+                ap_fini();
+                return -1;
+        }
+
+        if (pthread_mutex_init(&ipcpi.threads_lock, NULL)) {
+                log_err("Could not create mutex.");
+                pthread_mutex_destroy(&ipcpi.state_mtx);
+                pthread_rwlock_destroy(&ipcpi.state_lock);
+                close(ipcpi.sockfd);
+                free(ipcpi.threadpool);
+                free(ipcpi.sock_path);
+                ap_fini();
+                return -1;
+        }
+
+        if (pthread_condattr_init(&cattr)) {
+                log_err("Could not create condattr.");
+                pthread_mutex_destroy(&ipcpi.threads_lock);
+                pthread_mutex_destroy(&ipcpi.state_mtx);
+                pthread_rwlock_destroy(&ipcpi.state_lock);
+                close(ipcpi.sockfd);
+                free(ipcpi.threadpool);
+                free(ipcpi.sock_path);
+                ap_fini();
+                return -1;
+        }
+                ;
 #ifndef __APPLE__
         pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
 #endif
-        pthread_cond_init(&ipcpi.state_cond, &cattr);
+        if (pthread_cond_init(&ipcpi.state_cond, &cattr)) {
+                log_err("Could not init condvar.");
+                pthread_condattr_destroy(&cattr);
+                pthread_mutex_destroy(&ipcpi.threads_lock);
+                pthread_mutex_destroy(&ipcpi.state_mtx);
+                pthread_rwlock_destroy(&ipcpi.state_lock);
+                close(ipcpi.sockfd);
+                free(ipcpi.threadpool);
+                free(ipcpi.sock_path);
+                ap_fini();
+                return -1;
+        }
+
+        if (pthread_cond_init(&ipcpi.threads_cond, &cattr)) {
+                log_err("Could not init condvar.");
+                pthread_cond_destroy(&ipcpi.state_cond);
+                pthread_condattr_destroy(&cattr);
+                pthread_mutex_destroy(&ipcpi.threads_lock);
+                pthread_mutex_destroy(&ipcpi.state_mtx);
+                pthread_rwlock_destroy(&ipcpi.state_lock);
+                close(ipcpi.sockfd);
+                free(ipcpi.threadpool);
+                free(ipcpi.sock_path);
+                ap_fini();
+                return -1;
+        };
+
+
+        pthread_condattr_destroy(&cattr);
+
+        ipcpi.thread_ids = bmp_create(IPCP_MAX_THREADS, 0);
+        if (ipcpi.thread_ids == NULL) {
+                log_err("Could not init condvar.");
+                pthread_cond_destroy(&ipcpi.threads_cond);
+                pthread_cond_destroy(&ipcpi.state_cond);
+                pthread_mutex_destroy(&ipcpi.threads_lock);
+                pthread_mutex_destroy(&ipcpi.state_mtx);
+                pthread_rwlock_destroy(&ipcpi.state_lock);
+                close(ipcpi.sockfd);
+                free(ipcpi.threadpool);
+                free(ipcpi.sock_path);
+                ap_fini();
+                return -1;
+        };
 
         if (type == IPCP_NORMAL)
                 return 0;
 
         ipcpi.shim_data = shim_data_create();
         if (ipcpi.shim_data == NULL) {
+                bmp_destroy(ipcpi.thread_ids);
+                pthread_cond_destroy(&ipcpi.threads_cond);
+                pthread_cond_destroy(&ipcpi.state_cond);
+                pthread_mutex_destroy(&ipcpi.state_mtx);
+                pthread_rwlock_destroy(&ipcpi.state_lock);
+                close(ipcpi.sockfd);
                 free(ipcpi.threadpool);
                 free(ipcpi.sock_path);
+                ap_fini();
                 return -ENOMEM;
         }
 
         return 0;
 }
 
-int ipcp_boot()
+static bool is_thread_alive(ssize_t id)
 {
-        int t;
+        bool ret;
+        pthread_mutex_lock(&ipcpi.threads_lock);
 
-        ipcp_set_state(IPCP_INIT);
+        ret = bmp_is_id_used(ipcpi.thread_ids, id);
 
-        for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t) {
-                if (pthread_create(&ipcpi.threadpool[t], NULL,
-                                   ipcp_main_loop, NULL)) {
-                        int i;
-                        log_err("Failed to create main thread.");
-                        ipcp_set_state(IPCP_NULL);
-                        for (i = 0; i < t; ++i)
-                                pthread_join(ipcpi.threadpool[i], NULL);
-                        return -1;
+        pthread_mutex_unlock(&ipcpi.threads_lock);
+
+        return ret;
+}
+
+void * threadpoolmgr(void * o)
+{
+        struct timespec to = {(IPCP_TPM_TIMEOUT / 1000),
+                              (IPCP_TPM_TIMEOUT % 1000) * MILLION};
+        struct timespec dl;
+        size_t t;
+
+        (void) o;
+
+        while (true) {
+                clock_gettime(PTHREAD_COND_CLOCK, &dl);
+                ts_add(&dl, &to, &dl);
+
+                pthread_rwlock_rdlock(&ipcpi.state_lock);
+                if (ipcp_get_state() == IPCP_SHUTDOWN ||
+                    ipcp_get_state() == IPCP_NULL) {
+                        pthread_rwlock_unlock(&ipcpi.state_lock);
+                        log_dbg("Threadpool manager exiting.");
+                        for (t = 0; t < IPCP_MAX_THREADS; ++t)
+                                if (is_thread_alive(t)) {
+                                        log_dbg("Waiting for thread %zd.", t);
+                                        pthread_join(ipcpi.threadpool[t], NULL);
+                                }
+
+                        log_dbg("Threadpool manager done.");
+                        break;
+                }
+
+                pthread_rwlock_unlock(&ipcpi.state_lock);
+
+                pthread_mutex_lock(&ipcpi.threads_lock);
+
+                if (ipcpi.threads < IPCP_MIN_AV_THREADS) {
+                        log_dbg("Increasing threadpool.");
+                        ipcpi.max_threads = IPCP_MAX_AV_THREADS;
+
+                        while (ipcpi.threads < ipcpi.max_threads) {
+                                ssize_t id = bmp_allocate(ipcpi.thread_ids);
+                                if (!bmp_is_id_valid(ipcpi.thread_ids, id)) {
+                                        log_warn("IPCP threadpool exhausted.");
+                                        break;
+                                }
+
+                                if (pthread_create(&ipcpi.threadpool[id],
+                                                   NULL, ipcp_main_loop,
+                                                   (void *) id))
+                                        log_warn("Failed to start new thread.");
+                                else
+                                        ++ipcpi.threads;
+                        }
                 }
+
+                if (pthread_cond_timedwait(&ipcpi.threads_cond,
+                                           &ipcpi.threads_lock,
+                                           &dl) == ETIMEDOUT)
+                        if (ipcpi.threads > IPCP_MIN_AV_THREADS)
+                                --ipcpi.max_threads;
+
+                pthread_mutex_unlock(&ipcpi.threads_lock);
         }
 
+        return (void *) 0;
+}
+
+int ipcp_boot()
+{
+        ipcp_set_state(IPCP_INIT);
+
+        pthread_create(&ipcpi.tpm, NULL, threadpoolmgr, NULL);
+
         return 0;
 }
 
 void ipcp_shutdown()
 {
-        int t;
-        for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t)
-                pthread_join(ipcpi.threadpool[t], NULL);
+        pthread_join(ipcpi.tpm, NULL);
 
-        log_info("IPCP %d shutting down. Bye.", getpid());
+        log_info("IPCP %d shutting down.", getpid());
 }
 
 void ipcp_fini()
@@ -408,18 +620,24 @@ void ipcp_fini()
         if (unlink(ipcpi.sock_path))
                 log_warn("Could not unlink %s.", ipcpi.sock_path);
 
+        bmp_destroy(ipcpi.thread_ids);
+
         free(ipcpi.sock_path);
         free(ipcpi.threadpool);
 
         shim_data_destroy(ipcpi.shim_data);
 
         pthread_cond_destroy(&ipcpi.state_cond);
+        pthread_cond_destroy(&ipcpi.threads_cond);
+        pthread_mutex_destroy(&ipcpi.threads_lock);
         pthread_mutex_destroy(&ipcpi.state_mtx);
         pthread_rwlock_destroy(&ipcpi.state_lock);
 
         log_fini();
 
         ap_fini();
+
+        log_info("IPCP %d out.", getpid());
 }
 
 void ipcp_set_state(enum ipcp_state state)
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index a64ab65c..581ca5e3 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -80,7 +80,16 @@ struct ipcp {
 
         int                sockfd;
         char *             sock_path;
+
         pthread_t *        threadpool;
+
+        struct bmp *       thread_ids;
+        size_t             max_threads;
+        size_t             threads;
+        pthread_cond_t     threads_cond;
+        pthread_mutex_t    threads_lock;
+
+        pthread_t          tpm;
 } ipcpi;
 
 int             ipcp_init(int               argc,
-- 
cgit v1.2.3