From ff5063ad0e7902ce59864a466bd9d8d606d788e4 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sun, 24 Sep 2017 14:34:03 +0200 Subject: ipcpd: Add threadpool manager to DHT This adds a threadpool manager to the DHT. This was needed because the detached thread could cause a data race on shutdown. The threadpool manager is revised to allow multiple instances in a single program. The irmd and ipcp now store commands in a buffer (list) instead of a single buffer before passing it to handler threads. --- src/lib/tpm.c | 162 +++++++++++++++++++++++++++++++--------------------------- 1 file changed, 87 insertions(+), 75 deletions(-) (limited to 'src/lib') diff --git a/src/lib/tpm.c b/src/lib/tpm.c index dd71d276..c883e0a8 100644 --- a/src/lib/tpm.c +++ b/src/lib/tpm.c @@ -50,38 +50,38 @@ enum tpm_state { TPM_RUNNING }; -struct { +struct tpm { size_t min; size_t inc; size_t cur; size_t wrk; void * (* func)(void *); + void * o; struct list_head pool; enum tpm_state state; - pthread_cond_t cond; pthread_mutex_t lock; pthread_t mgr; -} tpm; +}; -static void tpm_join(void) +static void tpm_join(struct tpm * tpm) { struct list_head * p; struct list_head * h; - list_for_each_safe(p, h, &tpm.pool) { + list_for_each_safe(p, h, &tpm->pool) { struct pthr_el * e = list_entry(p, struct pthr_el, next); - if (tpm.state != TPM_RUNNING) { + if (tpm->state != TPM_RUNNING) { if (!e->kill) { e->kill = true; - --tpm.cur; + --tpm->cur; } while (!e->join) - pthread_cond_wait(&tpm.cond, &tpm.lock); + pthread_cond_wait(&tpm->cond, &tpm->lock); } if (e->join) { @@ -92,12 +92,13 @@ static void tpm_join(void) } } -static struct pthr_el * tpm_pthr_el(pthread_t thr) +static struct pthr_el * tpm_pthr_el(struct tpm * tpm, + pthread_t thr) { struct list_head * p; struct pthr_el * e; - list_for_each(p, &tpm.pool) { + list_for_each(p, &tpm->pool) { e = list_entry(p, struct pthr_el, next); if (e->thr == thr) return e; @@ -109,15 +110,15 @@ static struct pthr_el * tpm_pthr_el(pthread_t thr) return NULL; } -static void tpm_kill(void) +static void tpm_kill(struct tpm * tpm) { struct list_head * p; - list_for_each(p, &tpm.pool) { + list_for_each(p, &tpm->pool) { struct pthr_el * e = list_entry(p, struct pthr_el, next); if (!e->kill) { e->kill = true; - --tpm.cur; + --tpm->cur; return; } } @@ -128,25 +129,25 @@ static void * tpmgr(void * o) struct timespec dl; struct timespec to = {(TPM_TIMEOUT / 1000), (TPM_TIMEOUT % 1000) * MILLION}; - (void) o; + struct tpm * tpm = (struct tpm *) o; while (true) { clock_gettime(PTHREAD_COND_CLOCK, &dl); ts_add(&dl, &to, &dl); - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - if (tpm.state != TPM_RUNNING) { - tpm_join(); - pthread_mutex_unlock(&tpm.lock); + if (tpm->state != TPM_RUNNING) { + tpm_join(tpm); + pthread_mutex_unlock(&tpm->lock); break; } - tpm_join(); + tpm_join(tpm); - if (tpm.cur - tpm.wrk < tpm.min) { + if (tpm->cur - tpm->wrk < tpm->min) { size_t i; - for (i = 0; i < tpm.inc; ++i) { + for (i = 0; i < tpm->inc; ++i) { struct pthr_el * e = malloc(sizeof(*e)); if (e == NULL) break; @@ -155,35 +156,41 @@ static void * tpmgr(void * o) e->kill = false; if (pthread_create(&e->thr, NULL, - tpm.func, NULL)) { + tpm->func, tpm->o)) { free(e); break; } - list_add(&e->next, &tpm.pool); + list_add(&e->next, &tpm->pool); } - tpm.cur += i; + tpm->cur += i; } - if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl) + if (pthread_cond_timedwait(&tpm->cond, &tpm->lock, &dl) == ETIMEDOUT) - if (tpm.cur > tpm.min) - tpm_kill(); + if (tpm->cur > tpm->min) + tpm_kill(tpm); - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); } return (void *) 0; } -int tpm_init(size_t min, - size_t inc, - void * (* func)(void *)) +struct tpm * tpm_create(size_t min, + size_t inc, + void * (* func)(void *), + void * o) { + struct tpm * tpm; pthread_condattr_t cattr; - if (pthread_mutex_init(&tpm.lock, NULL)) + tpm = malloc(sizeof(*tpm)); + if (tpm == NULL) + goto fail_malloc; + + if (pthread_mutex_init(&tpm->lock, NULL)) goto fail_lock; if (pthread_condattr_init(&cattr)) @@ -192,103 +199,108 @@ int tpm_init(size_t min, #ifndef __APPLE__ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); #endif - if (pthread_cond_init(&tpm.cond, &cattr)) + if (pthread_cond_init(&tpm->cond, &cattr)) goto fail_cond; - list_head_init(&tpm.pool); + list_head_init(&tpm->pool); pthread_condattr_destroy(&cattr); - tpm.state = TPM_INIT; - tpm.func = func; - tpm.min = min; - tpm.inc = inc; - tpm.cur = 0; - tpm.wrk = 0; + tpm->state = TPM_INIT; + tpm->func = func; + tpm->o = o; + tpm->min = min; + tpm->inc = inc; + tpm->cur = 0; + tpm->wrk = 0; - return 0; + return tpm; fail_cond: pthread_condattr_destroy(&cattr); fail_cattr: - pthread_mutex_destroy(&tpm.lock); + pthread_mutex_destroy(&tpm->lock); fail_lock: - return -1; + free(tpm); + fail_malloc: + return NULL; } -int tpm_start(void) +int tpm_start(struct tpm * tpm) { - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - if (pthread_create(&tpm.mgr, NULL, tpmgr, NULL)) { - pthread_mutex_unlock(&tpm.lock); + if (pthread_create(&tpm->mgr, NULL, tpmgr, tpm)) { + pthread_mutex_unlock(&tpm->lock); return -1; } - tpm.state = TPM_RUNNING; + tpm->state = TPM_RUNNING; - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); return 0; } -void tpm_stop(void) +void tpm_stop(struct tpm * tpm) { - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - tpm.state = TPM_NULL; + tpm->state = TPM_NULL; - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); } -void tpm_fini(void) +void tpm_destroy(struct tpm * tpm) { - pthread_join(tpm.mgr, NULL); + pthread_join(tpm->mgr, NULL); + + pthread_mutex_destroy(&tpm->lock); + pthread_cond_destroy(&tpm->cond); - pthread_mutex_destroy(&tpm.lock); - pthread_cond_destroy(&tpm.cond); + free(tpm); } -bool tpm_check(void) +bool tpm_check(struct tpm * tpm) { bool ret; - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - ret = tpm_pthr_el(pthread_self())->kill; + ret = tpm_pthr_el(tpm, pthread_self())->kill; - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); return ret; } -void tpm_inc(void) +void tpm_inc(struct tpm * tpm) { - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - --tpm.wrk; + --tpm->wrk; - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); } -void tpm_dec(void) +void tpm_dec(struct tpm * tpm) { - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - ++tpm.wrk; + ++tpm->wrk; - pthread_cond_signal(&tpm.cond); + pthread_cond_signal(&tpm->cond); - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); } -void tpm_exit(void) +void tpm_exit(struct tpm * tpm) { - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - tpm_pthr_el(pthread_self())->join = true; + tpm_pthr_el(tpm, pthread_self())->join = true; - pthread_cond_signal(&tpm.cond); + pthread_cond_signal(&tpm->cond); - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); } -- cgit v1.2.3