summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-08-13 07:54:31 +0000
committerSander Vrijders <sander.vrijders@ugent.be>2017-08-13 07:54:31 +0000
commit0f333a2515237a1f65785ff5e9f54efba7737362 (patch)
tree4b48e911bceaa705b5fb8642470eb1480a36b557 /src
parent79f38fef2d8f10e9a9d94b4e57283b52d6a4967d (diff)
parent18a8b448aaa46cd56e8e1c6ed4685bac116ee11d (diff)
downloadouroboros-0f333a2515237a1f65785ff5e9f54efba7737362.tar.gz
ouroboros-0f333a2515237a1f65785ff5e9f54efba7737362.zip
Merged in dstaesse/ouroboros/be-tpm (pull request #550)
lib: Fix instability in threadpool manager
Diffstat (limited to 'src')
-rw-r--r--src/ipcpd/normal/dht.c35
-rw-r--r--src/lib/tpm.c88
2 files changed, 85 insertions, 38 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index e1c34b6f..c15dacbc 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -119,6 +119,8 @@ struct lookup {
struct list_head contacts;
size_t n_contacts;
+ size_t out;
+
uint64_t * addrs;
size_t n_addrs;
@@ -575,6 +577,7 @@ static struct lookup * lookup_create(struct dht * dht,
lu->state = LU_INIT;
lu->addrs = NULL;
lu->n_addrs = 0;
+ lu->out = 0;
lu->key = dht_dup_key(id, dht->b);
if (lu->key == NULL)
goto fail_id;
@@ -613,6 +616,16 @@ static struct lookup * lookup_create(struct dht * dht,
return NULL;
}
+static void lookup_add_out(struct lookup * lu,
+ size_t n)
+{
+ pthread_mutex_lock(&lu->lock);
+
+ lu->out += n;
+
+ pthread_mutex_unlock(&lu->lock);
+}
+
static void lookup_destroy(struct lookup * lu)
{
struct list_head * p;
@@ -680,6 +693,8 @@ static void lookup_update(struct dht * dht,
pthread_mutex_lock(&lu->lock);
+ --lu->out;
+
if (msg->n_addrs > 0) {
if (lu->addrs == NULL) {
lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs);
@@ -734,7 +749,11 @@ static void lookup_update(struct dht * dht,
}
}
- lu->state = LU_UPDATE;
+ if (lu->out == 0)
+ lu->state = LU_COMPLETE;
+ else
+ lu->state = LU_UPDATE;
+
pthread_cond_signal(&lu->cond);
pthread_mutex_unlock(&lu->lock);
return;
@@ -833,7 +852,7 @@ static enum lookup_state lookup_wait(struct lookup * lu)
pthread_cleanup_push((void (*)(void *)) lookup_destroy, (void *) lu);
- while (lu->state == LU_PENDING)
+ while (lu->state == LU_PENDING && ret != -ETIMEDOUT)
ret = -pthread_cond_timedwait(&lu->cond, &lu->lock, &abs);
pthread_cleanup_pop(false);
@@ -1497,7 +1516,7 @@ static struct lookup * kad_lookup(struct dht * dht,
uint64_t addrs[KAD_ALPHA + 1];
enum lookup_state state;
struct lookup * lu;
- size_t out = 0;
+ size_t out;
lu = lookup_create(dht, id);
if (lu == NULL)
@@ -1513,7 +1532,7 @@ static struct lookup * kad_lookup(struct dht * dht,
return NULL;
}
- out += kad_find(dht, id, addrs, code);
+ out = kad_find(dht, id, addrs, code);
if (out == 0) {
pthread_rwlock_wrlock(&dht->lock);
list_del(&lu->next);
@@ -1522,19 +1541,21 @@ static struct lookup * kad_lookup(struct dht * dht,
return lu;
}
+ lookup_add_out(lu, out);
+
while ((state = lookup_wait(lu)) != LU_COMPLETE) {
- --out;
switch (state) {
case LU_UPDATE:
lookup_new_addrs(lu, addrs);
- if (addrs[0] == 0 && out == 0) {
+ if (addrs[0] == 0) {
pthread_rwlock_wrlock(&dht->lock);
list_del(&lu->next);
pthread_rwlock_unlock(&dht->lock);
return lu;
}
- out += kad_find(dht, id, addrs, code);
+ out = kad_find(dht, id, addrs, code);
+ lookup_add_out(lu, out);
break;
case LU_DESTROY:
pthread_rwlock_wrlock(&dht->lock);
diff --git a/src/lib/tpm.c b/src/lib/tpm.c
index 8298eeb5..f45744ee 100644
--- a/src/lib/tpm.c
+++ b/src/lib/tpm.c
@@ -29,6 +29,7 @@
#include <pthread.h>
#include <stdlib.h>
+#include <assert.h>
#define TPM_TIMEOUT 1000
@@ -36,6 +37,7 @@ struct pthr_el {
struct list_head next;
bool join;
+ bool kill;
pthread_t thr;
};
@@ -49,8 +51,8 @@ enum tpm_state {
struct {
size_t min;
size_t inc;
- size_t max;
size_t cur;
+ size_t wrk;
void * (* func)(void *);
@@ -71,9 +73,14 @@ static void tpm_join(void)
list_for_each_safe(p, h, &tpm.pool) {
struct pthr_el * e = list_entry(p, struct pthr_el, next);
- if (tpm.state != TPM_RUNNING)
+ if (tpm.state != TPM_RUNNING) {
+ if (!e->kill) {
+ e->kill = true;
+ --tpm.cur;
+ }
while (!e->join)
pthread_cond_wait(&tpm.cond, &tpm.lock);
+ }
if (e->join) {
pthread_join(e->thr, NULL);
@@ -83,6 +90,37 @@ static void tpm_join(void)
}
}
+static struct pthr_el * tpm_pthr_el(pthread_t thr)
+{
+ struct list_head * p;
+ struct pthr_el * e;
+
+ list_for_each(p, &tpm.pool) {
+ e = list_entry(p, struct pthr_el, next);
+ if (e->thr == thr)
+ return e;
+
+ }
+
+ assert(false);
+
+ return NULL;
+}
+
+static void tpm_kill(void)
+{
+ struct list_head * p;
+
+ list_for_each(p, &tpm.pool) {
+ struct pthr_el * e = list_entry(p, struct pthr_el, next);
+ if (!e->kill) {
+ e->kill = true;
+ --tpm.cur;
+ return;
+ }
+ }
+}
+
static void * tpmgr(void * o)
{
struct timespec dl;
@@ -96,39 +134,40 @@ static void * tpmgr(void * o)
pthread_mutex_lock(&tpm.lock);
- tpm_join();
-
if (tpm.state != TPM_RUNNING) {
- tpm.max = 0;
tpm_join();
pthread_mutex_unlock(&tpm.lock);
break;
}
- if (tpm.cur < tpm.min) {
- tpm.max = tpm.inc;
+ tpm_join();
- while (tpm.cur < tpm.max) {
+ if (tpm.cur - tpm.wrk < tpm.min) {
+ size_t i;
+ for (i = 0; i < tpm.inc; ++i) {
struct pthr_el * e = malloc(sizeof(*e));
if (e == NULL)
break;
e->join = false;
+ e->kill = false;
if (pthread_create(&e->thr, NULL,
tpm.func, NULL)) {
free(e);
- } else {
- list_add(&e->next, &tpm.pool);
- ++tpm.cur;
+ break;
}
+
+ list_add(&e->next, &tpm.pool);
}
+
+ tpm.cur += tpm.inc;
}
if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl)
== ETIMEDOUT)
- if (tpm.cur > tpm.min )
- --tpm.max;
+ if (tpm.cur > tpm.min)
+ tpm_kill();
pthread_mutex_unlock(&tpm.lock);
}
@@ -162,8 +201,8 @@ int tpm_init(size_t min,
tpm.func = func;
tpm.min = min;
tpm.inc = inc;
- tpm.max = 0;
tpm.cur = 0;
+ tpm.wrk = 0;
return 0;
@@ -214,7 +253,7 @@ bool tpm_check(void)
pthread_mutex_lock(&tpm.lock);
- ret = tpm.cur > tpm.max;
+ ret = tpm_pthr_el(pthread_self())->kill;
pthread_mutex_unlock(&tpm.lock);
@@ -225,7 +264,7 @@ void tpm_inc(void)
{
pthread_mutex_lock(&tpm.lock);
- ++tpm.cur;
+ --tpm.wrk;
pthread_mutex_unlock(&tpm.lock);
}
@@ -234,7 +273,7 @@ void tpm_dec(void)
{
pthread_mutex_lock(&tpm.lock);
- --tpm.cur;
+ ++tpm.wrk;
pthread_cond_signal(&tpm.cond);
@@ -243,22 +282,9 @@ void tpm_dec(void)
void tpm_exit(void)
{
- struct list_head * p;
- pthread_t id;
-
- id = pthread_self();
-
pthread_mutex_lock(&tpm.lock);
- --tpm.cur;
-
- list_for_each(p, &tpm.pool) {
- struct pthr_el * e = list_entry(p, struct pthr_el, next);
- if (e->thr == id) {
- e->join = true;
- break;
- }
- }
+ tpm_pthr_el(pthread_self())->join = true;
pthread_cond_signal(&tpm.cond);