diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-08-13 07:54:31 +0000 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-08-13 07:54:31 +0000 |
commit | 0f333a2515237a1f65785ff5e9f54efba7737362 (patch) | |
tree | 4b48e911bceaa705b5fb8642470eb1480a36b557 /src | |
parent | 79f38fef2d8f10e9a9d94b4e57283b52d6a4967d (diff) | |
parent | 18a8b448aaa46cd56e8e1c6ed4685bac116ee11d (diff) | |
download | ouroboros-0f333a2515237a1f65785ff5e9f54efba7737362.tar.gz ouroboros-0f333a2515237a1f65785ff5e9f54efba7737362.zip |
Merged in dstaesse/ouroboros/be-tpm (pull request #550)
lib: Fix instability in threadpool manager
Diffstat (limited to 'src')
-rw-r--r-- | src/ipcpd/normal/dht.c | 35 | ||||
-rw-r--r-- | src/lib/tpm.c | 88 |
2 files changed, 85 insertions, 38 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index e1c34b6f..c15dacbc 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -119,6 +119,8 @@ struct lookup { struct list_head contacts; size_t n_contacts; + size_t out; + uint64_t * addrs; size_t n_addrs; @@ -575,6 +577,7 @@ static struct lookup * lookup_create(struct dht * dht, lu->state = LU_INIT; lu->addrs = NULL; lu->n_addrs = 0; + lu->out = 0; lu->key = dht_dup_key(id, dht->b); if (lu->key == NULL) goto fail_id; @@ -613,6 +616,16 @@ static struct lookup * lookup_create(struct dht * dht, return NULL; } +static void lookup_add_out(struct lookup * lu, + size_t n) +{ + pthread_mutex_lock(&lu->lock); + + lu->out += n; + + pthread_mutex_unlock(&lu->lock); +} + static void lookup_destroy(struct lookup * lu) { struct list_head * p; @@ -680,6 +693,8 @@ static void lookup_update(struct dht * dht, pthread_mutex_lock(&lu->lock); + --lu->out; + if (msg->n_addrs > 0) { if (lu->addrs == NULL) { lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs); @@ -734,7 +749,11 @@ static void lookup_update(struct dht * dht, } } - lu->state = LU_UPDATE; + if (lu->out == 0) + lu->state = LU_COMPLETE; + else + lu->state = LU_UPDATE; + pthread_cond_signal(&lu->cond); pthread_mutex_unlock(&lu->lock); return; @@ -833,7 +852,7 @@ static enum lookup_state lookup_wait(struct lookup * lu) pthread_cleanup_push((void (*)(void *)) lookup_destroy, (void *) lu); - while (lu->state == LU_PENDING) + while (lu->state == LU_PENDING && ret != -ETIMEDOUT) ret = -pthread_cond_timedwait(&lu->cond, &lu->lock, &abs); pthread_cleanup_pop(false); @@ -1497,7 +1516,7 @@ static struct lookup * kad_lookup(struct dht * dht, uint64_t addrs[KAD_ALPHA + 1]; enum lookup_state state; struct lookup * lu; - size_t out = 0; + size_t out; lu = lookup_create(dht, id); if (lu == NULL) @@ -1513,7 +1532,7 @@ static struct lookup * kad_lookup(struct dht * dht, return NULL; } - out += kad_find(dht, id, addrs, code); + out = kad_find(dht, id, addrs, code); if (out == 0) { pthread_rwlock_wrlock(&dht->lock); list_del(&lu->next); @@ -1522,19 +1541,21 @@ static struct lookup * kad_lookup(struct dht * dht, return lu; } + lookup_add_out(lu, out); + while ((state = lookup_wait(lu)) != LU_COMPLETE) { - --out; switch (state) { case LU_UPDATE: lookup_new_addrs(lu, addrs); - if (addrs[0] == 0 && out == 0) { + if (addrs[0] == 0) { pthread_rwlock_wrlock(&dht->lock); list_del(&lu->next); pthread_rwlock_unlock(&dht->lock); return lu; } - out += kad_find(dht, id, addrs, code); + out = kad_find(dht, id, addrs, code); + lookup_add_out(lu, out); break; case LU_DESTROY: pthread_rwlock_wrlock(&dht->lock); diff --git a/src/lib/tpm.c b/src/lib/tpm.c index 8298eeb5..f45744ee 100644 --- a/src/lib/tpm.c +++ b/src/lib/tpm.c @@ -29,6 +29,7 @@ #include <pthread.h> #include <stdlib.h> +#include <assert.h> #define TPM_TIMEOUT 1000 @@ -36,6 +37,7 @@ struct pthr_el { struct list_head next; bool join; + bool kill; pthread_t thr; }; @@ -49,8 +51,8 @@ enum tpm_state { struct { size_t min; size_t inc; - size_t max; size_t cur; + size_t wrk; void * (* func)(void *); @@ -71,9 +73,14 @@ static void tpm_join(void) list_for_each_safe(p, h, &tpm.pool) { struct pthr_el * e = list_entry(p, struct pthr_el, next); - if (tpm.state != TPM_RUNNING) + if (tpm.state != TPM_RUNNING) { + if (!e->kill) { + e->kill = true; + --tpm.cur; + } while (!e->join) pthread_cond_wait(&tpm.cond, &tpm.lock); + } if (e->join) { pthread_join(e->thr, NULL); @@ -83,6 +90,37 @@ static void tpm_join(void) } } +static struct pthr_el * tpm_pthr_el(pthread_t thr) +{ + struct list_head * p; + struct pthr_el * e; + + list_for_each(p, &tpm.pool) { + e = list_entry(p, struct pthr_el, next); + if (e->thr == thr) + return e; + + } + + assert(false); + + return NULL; +} + +static void tpm_kill(void) +{ + struct list_head * p; + + list_for_each(p, &tpm.pool) { + struct pthr_el * e = list_entry(p, struct pthr_el, next); + if (!e->kill) { + e->kill = true; + --tpm.cur; + return; + } + } +} + static void * tpmgr(void * o) { struct timespec dl; @@ -96,39 +134,40 @@ static void * tpmgr(void * o) pthread_mutex_lock(&tpm.lock); - tpm_join(); - if (tpm.state != TPM_RUNNING) { - tpm.max = 0; tpm_join(); pthread_mutex_unlock(&tpm.lock); break; } - if (tpm.cur < tpm.min) { - tpm.max = tpm.inc; + tpm_join(); - while (tpm.cur < tpm.max) { + if (tpm.cur - tpm.wrk < tpm.min) { + size_t i; + for (i = 0; i < tpm.inc; ++i) { struct pthr_el * e = malloc(sizeof(*e)); if (e == NULL) break; e->join = false; + e->kill = false; if (pthread_create(&e->thr, NULL, tpm.func, NULL)) { free(e); - } else { - list_add(&e->next, &tpm.pool); - ++tpm.cur; + break; } + + list_add(&e->next, &tpm.pool); } + + tpm.cur += tpm.inc; } if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl) == ETIMEDOUT) - if (tpm.cur > tpm.min ) - --tpm.max; + if (tpm.cur > tpm.min) + tpm_kill(); pthread_mutex_unlock(&tpm.lock); } @@ -162,8 +201,8 @@ int tpm_init(size_t min, tpm.func = func; tpm.min = min; tpm.inc = inc; - tpm.max = 0; tpm.cur = 0; + tpm.wrk = 0; return 0; @@ -214,7 +253,7 @@ bool tpm_check(void) pthread_mutex_lock(&tpm.lock); - ret = tpm.cur > tpm.max; + ret = tpm_pthr_el(pthread_self())->kill; pthread_mutex_unlock(&tpm.lock); @@ -225,7 +264,7 @@ void tpm_inc(void) { pthread_mutex_lock(&tpm.lock); - ++tpm.cur; + --tpm.wrk; pthread_mutex_unlock(&tpm.lock); } @@ -234,7 +273,7 @@ void tpm_dec(void) { pthread_mutex_lock(&tpm.lock); - --tpm.cur; + ++tpm.wrk; pthread_cond_signal(&tpm.cond); @@ -243,22 +282,9 @@ void tpm_dec(void) void tpm_exit(void) { - struct list_head * p; - pthread_t id; - - id = pthread_self(); - pthread_mutex_lock(&tpm.lock); - --tpm.cur; - - list_for_each(p, &tpm.pool) { - struct pthr_el * e = list_entry(p, struct pthr_el, next); - if (e->thr == id) { - e->join = true; - break; - } - } + tpm_pthr_el(pthread_self())->join = true; pthread_cond_signal(&tpm.cond); |