summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-08-12 22:29:04 +0200
committerdimitri staessens <dimitri.staessens@ugent.be>2017-08-12 23:01:11 +0200
commit7729888c3fe454733759903a56c0e3e82ac3f31b (patch)
tree1fdde62b7a8b16aead8bcfa39d0edd2cc8bfd63a /src/lib
parent2b42b1e1121dfd715a78502a3652d326330b8160 (diff)
downloadouroboros-7729888c3fe454733759903a56c0e3e82ac3f31b.tar.gz
ouroboros-7729888c3fe454733759903a56c0e3e82ac3f31b.zip
lib: Fix instability in threadpool manager
The threadpool manager now tracks threads to prevent cyclic behaviour where too many threads shut down and the TPM responds with creating additional threads.
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/tpm.c88
1 files changed, 57 insertions, 31 deletions
diff --git a/src/lib/tpm.c b/src/lib/tpm.c
index 8298eeb5..f45744ee 100644
--- a/src/lib/tpm.c
+++ b/src/lib/tpm.c
@@ -29,6 +29,7 @@
#include <pthread.h>
#include <stdlib.h>
+#include <assert.h>
#define TPM_TIMEOUT 1000
@@ -36,6 +37,7 @@ struct pthr_el {
struct list_head next;
bool join;
+ bool kill;
pthread_t thr;
};
@@ -49,8 +51,8 @@ enum tpm_state {
struct {
size_t min;
size_t inc;
- size_t max;
size_t cur;
+ size_t wrk;
void * (* func)(void *);
@@ -71,9 +73,14 @@ static void tpm_join(void)
list_for_each_safe(p, h, &tpm.pool) {
struct pthr_el * e = list_entry(p, struct pthr_el, next);
- if (tpm.state != TPM_RUNNING)
+ if (tpm.state != TPM_RUNNING) {
+ if (!e->kill) {
+ e->kill = true;
+ --tpm.cur;
+ }
while (!e->join)
pthread_cond_wait(&tpm.cond, &tpm.lock);
+ }
if (e->join) {
pthread_join(e->thr, NULL);
@@ -83,6 +90,37 @@ static void tpm_join(void)
}
}
+static struct pthr_el * tpm_pthr_el(pthread_t thr)
+{
+ struct list_head * p;
+ struct pthr_el * e;
+
+ list_for_each(p, &tpm.pool) {
+ e = list_entry(p, struct pthr_el, next);
+ if (e->thr == thr)
+ return e;
+
+ }
+
+ assert(false);
+
+ return NULL;
+}
+
+static void tpm_kill(void)
+{
+ struct list_head * p;
+
+ list_for_each(p, &tpm.pool) {
+ struct pthr_el * e = list_entry(p, struct pthr_el, next);
+ if (!e->kill) {
+ e->kill = true;
+ --tpm.cur;
+ return;
+ }
+ }
+}
+
static void * tpmgr(void * o)
{
struct timespec dl;
@@ -96,39 +134,40 @@ static void * tpmgr(void * o)
pthread_mutex_lock(&tpm.lock);
- tpm_join();
-
if (tpm.state != TPM_RUNNING) {
- tpm.max = 0;
tpm_join();
pthread_mutex_unlock(&tpm.lock);
break;
}
- if (tpm.cur < tpm.min) {
- tpm.max = tpm.inc;
+ tpm_join();
- while (tpm.cur < tpm.max) {
+ if (tpm.cur - tpm.wrk < tpm.min) {
+ size_t i;
+ for (i = 0; i < tpm.inc; ++i) {
struct pthr_el * e = malloc(sizeof(*e));
if (e == NULL)
break;
e->join = false;
+ e->kill = false;
if (pthread_create(&e->thr, NULL,
tpm.func, NULL)) {
free(e);
- } else {
- list_add(&e->next, &tpm.pool);
- ++tpm.cur;
+ break;
}
+
+ list_add(&e->next, &tpm.pool);
}
+
+ tpm.cur += tpm.inc;
}
if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl)
== ETIMEDOUT)
- if (tpm.cur > tpm.min )
- --tpm.max;
+ if (tpm.cur > tpm.min)
+ tpm_kill();
pthread_mutex_unlock(&tpm.lock);
}
@@ -162,8 +201,8 @@ int tpm_init(size_t min,
tpm.func = func;
tpm.min = min;
tpm.inc = inc;
- tpm.max = 0;
tpm.cur = 0;
+ tpm.wrk = 0;
return 0;
@@ -214,7 +253,7 @@ bool tpm_check(void)
pthread_mutex_lock(&tpm.lock);
- ret = tpm.cur > tpm.max;
+ ret = tpm_pthr_el(pthread_self())->kill;
pthread_mutex_unlock(&tpm.lock);
@@ -225,7 +264,7 @@ void tpm_inc(void)
{
pthread_mutex_lock(&tpm.lock);
- ++tpm.cur;
+ --tpm.wrk;
pthread_mutex_unlock(&tpm.lock);
}
@@ -234,7 +273,7 @@ void tpm_dec(void)
{
pthread_mutex_lock(&tpm.lock);
- --tpm.cur;
+ ++tpm.wrk;
pthread_cond_signal(&tpm.cond);
@@ -243,22 +282,9 @@ void tpm_dec(void)
void tpm_exit(void)
{
- struct list_head * p;
- pthread_t id;
-
- id = pthread_self();
-
pthread_mutex_lock(&tpm.lock);
- --tpm.cur;
-
- list_for_each(p, &tpm.pool) {
- struct pthr_el * e = list_entry(p, struct pthr_el, next);
- if (e->thr == id) {
- e->join = true;
- break;
- }
- }
+ tpm_pthr_el(pthread_self())->join = true;
pthread_cond_signal(&tpm.cond);