summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-09-24 14:34:03 +0200
committerdimitri staessens <dimitri.staessens@ugent.be>2017-09-24 14:34:03 +0200
commitff5063ad0e7902ce59864a466bd9d8d606d788e4 (patch)
tree17f66b04659a06c018494eb732adb661111d63f2 /src/lib
parent7cef269be64f64b920763c6f2455931422c8bfe9 (diff)
downloadouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.tar.gz
ouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.zip
ipcpd: Add threadpool manager to DHT
This adds a threadpool manager to the DHT. This was needed because the detached thread could cause a data race on shutdown. The threadpool manager is revised to allow multiple instances in a single program. The irmd and ipcp now store commands in a buffer (list) instead of a single buffer before passing it to handler threads.
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/tpm.c162
1 files changed, 87 insertions, 75 deletions
diff --git a/src/lib/tpm.c b/src/lib/tpm.c
index dd71d276..c883e0a8 100644
--- a/src/lib/tpm.c
+++ b/src/lib/tpm.c
@@ -50,38 +50,38 @@ enum tpm_state {
TPM_RUNNING
};
-struct {
+struct tpm {
size_t min;
size_t inc;
size_t cur;
size_t wrk;
void * (* func)(void *);
+ void * o;
struct list_head pool;
enum tpm_state state;
-
pthread_cond_t cond;
pthread_mutex_t lock;
pthread_t mgr;
-} tpm;
+};
-static void tpm_join(void)
+static void tpm_join(struct tpm * tpm)
{
struct list_head * p;
struct list_head * h;
- list_for_each_safe(p, h, &tpm.pool) {
+ list_for_each_safe(p, h, &tpm->pool) {
struct pthr_el * e = list_entry(p, struct pthr_el, next);
- if (tpm.state != TPM_RUNNING) {
+ if (tpm->state != TPM_RUNNING) {
if (!e->kill) {
e->kill = true;
- --tpm.cur;
+ --tpm->cur;
}
while (!e->join)
- pthread_cond_wait(&tpm.cond, &tpm.lock);
+ pthread_cond_wait(&tpm->cond, &tpm->lock);
}
if (e->join) {
@@ -92,12 +92,13 @@ static void tpm_join(void)
}
}
-static struct pthr_el * tpm_pthr_el(pthread_t thr)
+static struct pthr_el * tpm_pthr_el(struct tpm * tpm,
+ pthread_t thr)
{
struct list_head * p;
struct pthr_el * e;
- list_for_each(p, &tpm.pool) {
+ list_for_each(p, &tpm->pool) {
e = list_entry(p, struct pthr_el, next);
if (e->thr == thr)
return e;
@@ -109,15 +110,15 @@ static struct pthr_el * tpm_pthr_el(pthread_t thr)
return NULL;
}
-static void tpm_kill(void)
+static void tpm_kill(struct tpm * tpm)
{
struct list_head * p;
- list_for_each(p, &tpm.pool) {
+ list_for_each(p, &tpm->pool) {
struct pthr_el * e = list_entry(p, struct pthr_el, next);
if (!e->kill) {
e->kill = true;
- --tpm.cur;
+ --tpm->cur;
return;
}
}
@@ -128,25 +129,25 @@ static void * tpmgr(void * o)
struct timespec dl;
struct timespec to = {(TPM_TIMEOUT / 1000),
(TPM_TIMEOUT % 1000) * MILLION};
- (void) o;
+ struct tpm * tpm = (struct tpm *) o;
while (true) {
clock_gettime(PTHREAD_COND_CLOCK, &dl);
ts_add(&dl, &to, &dl);
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- if (tpm.state != TPM_RUNNING) {
- tpm_join();
- pthread_mutex_unlock(&tpm.lock);
+ if (tpm->state != TPM_RUNNING) {
+ tpm_join(tpm);
+ pthread_mutex_unlock(&tpm->lock);
break;
}
- tpm_join();
+ tpm_join(tpm);
- if (tpm.cur - tpm.wrk < tpm.min) {
+ if (tpm->cur - tpm->wrk < tpm->min) {
size_t i;
- for (i = 0; i < tpm.inc; ++i) {
+ for (i = 0; i < tpm->inc; ++i) {
struct pthr_el * e = malloc(sizeof(*e));
if (e == NULL)
break;
@@ -155,35 +156,41 @@ static void * tpmgr(void * o)
e->kill = false;
if (pthread_create(&e->thr, NULL,
- tpm.func, NULL)) {
+ tpm->func, tpm->o)) {
free(e);
break;
}
- list_add(&e->next, &tpm.pool);
+ list_add(&e->next, &tpm->pool);
}
- tpm.cur += i;
+ tpm->cur += i;
}
- if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl)
+ if (pthread_cond_timedwait(&tpm->cond, &tpm->lock, &dl)
== ETIMEDOUT)
- if (tpm.cur > tpm.min)
- tpm_kill();
+ if (tpm->cur > tpm->min)
+ tpm_kill(tpm);
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
}
return (void *) 0;
}
-int tpm_init(size_t min,
- size_t inc,
- void * (* func)(void *))
+struct tpm * tpm_create(size_t min,
+ size_t inc,
+ void * (* func)(void *),
+ void * o)
{
+ struct tpm * tpm;
pthread_condattr_t cattr;
- if (pthread_mutex_init(&tpm.lock, NULL))
+ tpm = malloc(sizeof(*tpm));
+ if (tpm == NULL)
+ goto fail_malloc;
+
+ if (pthread_mutex_init(&tpm->lock, NULL))
goto fail_lock;
if (pthread_condattr_init(&cattr))
@@ -192,103 +199,108 @@ int tpm_init(size_t min,
#ifndef __APPLE__
pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
#endif
- if (pthread_cond_init(&tpm.cond, &cattr))
+ if (pthread_cond_init(&tpm->cond, &cattr))
goto fail_cond;
- list_head_init(&tpm.pool);
+ list_head_init(&tpm->pool);
pthread_condattr_destroy(&cattr);
- tpm.state = TPM_INIT;
- tpm.func = func;
- tpm.min = min;
- tpm.inc = inc;
- tpm.cur = 0;
- tpm.wrk = 0;
+ tpm->state = TPM_INIT;
+ tpm->func = func;
+ tpm->o = o;
+ tpm->min = min;
+ tpm->inc = inc;
+ tpm->cur = 0;
+ tpm->wrk = 0;
- return 0;
+ return tpm;
fail_cond:
pthread_condattr_destroy(&cattr);
fail_cattr:
- pthread_mutex_destroy(&tpm.lock);
+ pthread_mutex_destroy(&tpm->lock);
fail_lock:
- return -1;
+ free(tpm);
+ fail_malloc:
+ return NULL;
}
-int tpm_start(void)
+int tpm_start(struct tpm * tpm)
{
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- if (pthread_create(&tpm.mgr, NULL, tpmgr, NULL)) {
- pthread_mutex_unlock(&tpm.lock);
+ if (pthread_create(&tpm->mgr, NULL, tpmgr, tpm)) {
+ pthread_mutex_unlock(&tpm->lock);
return -1;
}
- tpm.state = TPM_RUNNING;
+ tpm->state = TPM_RUNNING;
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
return 0;
}
-void tpm_stop(void)
+void tpm_stop(struct tpm * tpm)
{
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- tpm.state = TPM_NULL;
+ tpm->state = TPM_NULL;
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
}
-void tpm_fini(void)
+void tpm_destroy(struct tpm * tpm)
{
- pthread_join(tpm.mgr, NULL);
+ pthread_join(tpm->mgr, NULL);
+
+ pthread_mutex_destroy(&tpm->lock);
+ pthread_cond_destroy(&tpm->cond);
- pthread_mutex_destroy(&tpm.lock);
- pthread_cond_destroy(&tpm.cond);
+ free(tpm);
}
-bool tpm_check(void)
+bool tpm_check(struct tpm * tpm)
{
bool ret;
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- ret = tpm_pthr_el(pthread_self())->kill;
+ ret = tpm_pthr_el(tpm, pthread_self())->kill;
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
return ret;
}
-void tpm_inc(void)
+void tpm_inc(struct tpm * tpm)
{
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- --tpm.wrk;
+ --tpm->wrk;
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
}
-void tpm_dec(void)
+void tpm_dec(struct tpm * tpm)
{
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- ++tpm.wrk;
+ ++tpm->wrk;
- pthread_cond_signal(&tpm.cond);
+ pthread_cond_signal(&tpm->cond);
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
}
-void tpm_exit(void)
+void tpm_exit(struct tpm * tpm)
{
- pthread_mutex_lock(&tpm.lock);
+ pthread_mutex_lock(&tpm->lock);
- tpm_pthr_el(pthread_self())->join = true;
+ tpm_pthr_el(tpm, pthread_self())->join = true;
- pthread_cond_signal(&tpm.cond);
+ pthread_cond_signal(&tpm->cond);
- pthread_mutex_unlock(&tpm.lock);
+ pthread_mutex_unlock(&tpm->lock);
}