From 9405ad97e20686f74c06bcbac9523a8b4f10272e Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sat, 30 Sep 2017 17:58:18 +0200 Subject: lib: Cancel tpm threads instead of marking exit This makes the threadpool use pthread_cancel instead of setting an exit flag that threadpool managed threads check periodically. This drastically reduces CPU consumption in the irmd when running a lot of applications. It requires cancellation handlers in the ipcp and irmd to be implemented to ensure safe cancellation during operation and shutdown. --- src/ipcpd/ipcp.c | 59 +++++++++++++------------- src/ipcpd/normal/dht.c | 113 ++++++++++++++++++++++++++++--------------------- src/irmd/api_table.c | 91 +++++++++++++++++++++------------------ src/irmd/api_table.h | 10 +++-- src/irmd/irm_flow.c | 32 +++++++++----- src/irmd/main.c | 86 ++++++++++++++----------------------- src/irmd/registry.c | 59 ++++++++++++++++---------- src/lib/tpm.c | 62 +++++++++------------------ 8 files changed, 261 insertions(+), 251 deletions(-) (limited to 'src') diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 513c638a..9f4d9eea 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -159,6 +159,16 @@ static void * acceptloop(void * o) return (void *) 0; } +static void close_ptr(void * o) +{ + close(*((int *) o)); +} + +static void free_msg(void * o) +{ + ipcp_msg__free_unpacked((ipcp_msg_t *) o, NULL); +} + static void * mainloop(void * o) { int sfd; @@ -167,14 +177,10 @@ static void * mainloop(void * o) struct dif_info info; ipcp_config_msg_t * conf_msg; ipcp_msg_t * msg; - struct timespec dl; - struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000), - (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION}; (void) o; while (true) { - int ret = 0; ipcp_msg_t ret_msg = IPCP_MSG__INIT; dif_info_msg_t dif_info = DIF_INFO_MSG__INIT; int fd = -1; @@ -182,27 +188,18 @@ static void * mainloop(void * o) ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; - clock_gettime(PTHREAD_COND_CLOCK, &dl); - ts_add(&dl, &to, &dl); - pthread_mutex_lock(&ipcpi.cmd_lock); - while (list_is_empty(&ipcpi.cmds) && ret != -ETIMEDOUT) - ret = -pthread_cond_timedwait(&ipcpi.cmd_cond, - &ipcpi.cmd_lock, - &dl); + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &ipcpi.cmd_lock); - if (ret == -ETIMEDOUT) { - pthread_mutex_unlock(&ipcpi.cmd_lock); - if (tpm_check(ipcpi.tpm)) - break; - continue; - } + while (list_is_empty(&ipcpi.cmds)) + pthread_cond_wait(&ipcpi.cmd_cond, &ipcpi.cmd_lock); cmd = list_last_entry(&ipcpi.cmds, struct cmd, next); list_del(&cmd->next); - pthread_mutex_unlock(&ipcpi.cmd_lock); + pthread_cleanup_pop(true); msg = ipcp_msg__unpack(NULL, cmd->len, cmd->cbuf); sfd = cmd->fd; @@ -216,6 +213,9 @@ static void * mainloop(void * o) tpm_dec(ipcpi.tpm); + pthread_cleanup_push(close_ptr, &sfd); + pthread_cleanup_push(free_msg, msg); + switch (msg->code) { case IPCP_MSG_CODE__IPCP_BOOTSTRAP: ret_msg.has_result = true; @@ -482,7 +482,8 @@ static void * mainloop(void * o) break; } - ipcp_msg__free_unpacked(msg, NULL); + pthread_cleanup_pop(true); + pthread_cleanup_pop(false); buffer.len = ipcp_msg__get_packed_size(&ret_msg); if (buffer.len == 0) { @@ -502,22 +503,17 @@ static void * mainloop(void * o) ipcp_msg__pack(&ret_msg, buffer.data); - if (write(sfd, buffer.data, buffer.len) == -1) { - log_err("Failed to send reply message"); - free(buffer.data); - close(sfd); - tpm_inc(ipcpi.tpm); - continue; - } + pthread_cleanup_push(close_ptr, &sfd); + + if (write(sfd, buffer.data, buffer.len) == -1) + log_warn("Failed to send reply message"); free(buffer.data); - close(sfd); + pthread_cleanup_pop(true); tpm_inc(ipcpi.tpm); } - tpm_exit(ipcpi.tpm); - return (void *) 0; } @@ -778,6 +774,9 @@ int ipcp_wait_state(enum ipcp_state state, pthread_mutex_lock(&ipcpi.state_mtx); + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &ipcpi.state_mtx); + while (ipcpi.state != state && ipcpi.state != IPCP_SHUTDOWN && ipcpi.state != IPCP_NULL @@ -791,7 +790,7 @@ int ipcp_wait_state(enum ipcp_state state, &abstime); } - pthread_mutex_unlock(&ipcpi.state_mtx); + pthread_cleanup_pop(true); return ret; } diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index b06c4480..7ca555ab 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -288,13 +288,16 @@ static int dht_wait_running(struct dht * dht) pthread_mutex_lock(&dht->mtx); + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &dht->mtx); + while (dht->state == DHT_JOINING) pthread_cond_wait(&dht->cond, &dht->mtx); if (dht->state != DHT_RUNNING) ret = -1; - pthread_mutex_unlock(&dht->mtx); + pthread_cleanup_pop(true); return ret; } @@ -379,6 +382,21 @@ static void kad_req_create(struct dht * dht, pthread_rwlock_unlock(&dht->lock); } +static void cancel_req_destroy(void * o) +{ + struct kad_req * req = (struct kad_req *) o; + + pthread_mutex_unlock(&req->lock); + + pthread_cond_destroy(&req->cond); + pthread_mutex_destroy(&req->lock); + + if (req->key != NULL) + free(req->key); + + free(req); +} + static void kad_req_destroy(struct kad_req * req) { assert(req); @@ -403,18 +421,12 @@ static void kad_req_destroy(struct kad_req * req) break; } + pthread_cleanup_push(cancel_req_destroy, req); + while (req->state != REQ_NULL && req->state != REQ_DONE) pthread_cond_wait(&req->cond, &req->lock); - pthread_mutex_unlock(&req->lock); - - pthread_cond_destroy(&req->cond); - pthread_mutex_destroy(&req->lock); - - if (req->key != NULL) - free(req->key); - - free(req); + pthread_cleanup_pop(true); } static int kad_req_wait(struct kad_req * req, @@ -434,6 +446,9 @@ static int kad_req_wait(struct kad_req * req, req->state = REQ_PENDING; + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &req->lock); + while (req->state == REQ_PENDING && ret != -ETIMEDOUT) ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs); @@ -452,7 +467,7 @@ static int kad_req_wait(struct kad_req * req, break; } - pthread_mutex_unlock(&req->lock); + pthread_cleanup_pop(true); return ret; } @@ -683,11 +698,34 @@ static void lookup_add_out(struct lookup * lu, pthread_mutex_unlock(&lu->lock); } -static void lookup_destroy(struct lookup * lu) +static void cancel_lookup_destroy(void * o) { + struct lookup * lu; struct list_head * p; struct list_head * h; + lu = (struct lookup *) o; + + if (lu->key != NULL) + free(lu->key); + if (lu->addrs != NULL) + free(lu->addrs); + + list_for_each_safe(p, h, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + } + + pthread_mutex_unlock(&lu->lock); + + pthread_mutex_destroy(&lu->lock); + + free(lu); +} + +static void lookup_destroy(struct lookup * lu) +{ assert(lu); pthread_mutex_lock(&lu->lock); @@ -711,25 +749,12 @@ static void lookup_destroy(struct lookup * lu) break; } + pthread_cleanup_push(cancel_lookup_destroy, lu); + while (lu->state != LU_NULL) pthread_cond_wait(&lu->cond, &lu->lock); - if (lu->key != NULL) - free(lu->key); - if (lu->addrs != NULL) - free(lu->addrs); - - list_for_each_safe(p, h, &lu->contacts) { - struct contact * c = list_entry(p, struct contact, next); - list_del(&c->next); - contact_destroy(c); - } - - pthread_mutex_unlock(&lu->lock); - - pthread_mutex_destroy(&lu->lock); - - free(lu); + pthread_cleanup_pop(true); } static void lookup_update(struct dht * dht, @@ -765,12 +790,17 @@ static void lookup_update(struct dht * dht, return; } + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &lu->lock); + while (lu->state == LU_INIT) { pthread_rwlock_unlock(&dht->lock); pthread_cond_wait(&lu->cond, &lu->lock); pthread_rwlock_rdlock(&dht->lock); } + pthread_cleanup_pop(false); + /* BUG: this should not be allowed since it's use-after-free. */ if (lu->state == LU_DESTROY || lu->state == LU_NULL) { log_warn("Use-after-free. Update aborted to avoid worse."); @@ -2302,10 +2332,8 @@ uint64_t dht_query(struct dht * dht, static void * dht_handle_sdu(void * o) { - struct dht * dht = (struct dht *) o; - struct timespec dl; - struct timespec to = {(HANDLE_TIMEO / 1000), - (HANDLE_TIMEO % 1000) * MILLION}; + struct dht * dht = (struct dht *) o; + assert(dht); while (true) { @@ -2318,28 +2346,19 @@ static void * dht_handle_sdu(void * o) size_t b; size_t t_expire; struct cmd * cmd; - int ret = 0; - - clock_gettime(CLOCK_REALTIME_COARSE, &dl); - ts_add(&dl, &to, &dl); pthread_mutex_lock(&dht->mtx); - while (list_is_empty(&dht->cmds) && ret != -ETIMEDOUT) - ret = -pthread_cond_timedwait(&dht->cond, - &dht->mtx, &dl); + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &dht->mtx); - if (ret == -ETIMEDOUT) { - pthread_mutex_unlock(&dht->mtx); - if (tpm_check(dht->tpm)) - break; - continue; - } + while (list_is_empty(&dht->cmds)) + pthread_cond_wait(&dht->cond, &dht->mtx); cmd = list_last_entry(&dht->cmds, struct cmd, next); list_del(&cmd->next); - pthread_mutex_unlock(&dht->mtx); + pthread_cleanup_pop(true); i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); @@ -2487,8 +2506,6 @@ static void * dht_handle_sdu(void * o) tpm_inc(dht->tpm); } - tpm_exit(dht->tpm); - return (void *) 0; } diff --git a/src/irmd/api_table.c b/src/irmd/api_table.c index df56dd02..a244f3a2 100644 --- a/src/irmd/api_table.c +++ b/src/irmd/api_table.c @@ -36,8 +36,6 @@ #include #include -#define ENTRY_SLEEP_TIMEOUT 10 /* ms */ - struct api_entry * api_entry_create(pid_t api, char * apn) { @@ -70,14 +68,14 @@ struct api_entry * api_entry_create(pid_t api, pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); #endif - if (pthread_mutex_init(&e->state_lock, NULL)) { + if (pthread_mutex_init(&e->lock, NULL)) { free(e); return NULL; } - if (pthread_cond_init(&e->state_cond, &cattr)) { - pthread_mutex_destroy(&e->state_lock); + if (pthread_cond_init(&e->cond, &cattr)) { + pthread_mutex_destroy(&e->lock); free(e); return NULL; } @@ -85,6 +83,15 @@ struct api_entry * api_entry_create(pid_t api, return e; } +void cancel_api_entry(void * o) +{ + struct api_entry * e = (struct api_entry *) o; + + e->state = API_NULL; + + pthread_mutex_unlock(&e->lock); +} + void api_entry_destroy(struct api_entry * e) { struct list_head * p; @@ -92,25 +99,29 @@ void api_entry_destroy(struct api_entry * e) assert(e); - pthread_mutex_lock(&e->state_lock); + pthread_mutex_lock(&e->lock); if (e->state == API_DESTROY) { - pthread_mutex_unlock(&e->state_lock); + pthread_mutex_unlock(&e->lock); return; } if (e->state == API_SLEEP) e->state = API_DESTROY; - pthread_cond_signal(&e->state_cond); + pthread_cond_signal(&e->cond); + + pthread_cleanup_push(cancel_api_entry, e); while (e->state != API_INIT) - pthread_cond_wait(&e->state_cond, &e->state_lock); + pthread_cond_wait(&e->cond, &e->lock); + + pthread_cleanup_pop(false); - pthread_mutex_unlock(&e->state_lock); + pthread_mutex_unlock(&e->lock); - pthread_cond_destroy(&e->state_cond); - pthread_mutex_destroy(&e->state_lock); + pthread_cond_destroy(&e->cond); + pthread_mutex_destroy(&e->lock); if (e->apn != NULL) free(e->apn); @@ -164,39 +175,34 @@ void api_entry_del_name(struct api_entry * e, } } -void api_entry_cancel(struct api_entry * e) +int api_entry_sleep(struct api_entry * e, + struct timespec * timeo) { - pthread_mutex_lock(&e->state_lock); - - e->state = API_INIT; - pthread_cond_broadcast(&e->state_cond); - - pthread_mutex_unlock(&e->state_lock); -} - -int api_entry_sleep(struct api_entry * e) -{ - struct timespec timeout = {(ENTRY_SLEEP_TIMEOUT / 1000), - (ENTRY_SLEEP_TIMEOUT % 1000) * MILLION}; - struct timespec now; struct timespec dl; int ret = 0; assert(e); - clock_gettime(PTHREAD_COND_CLOCK, &now); - ts_add(&now, &timeout, &dl); + if (timeo != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, timeo, &dl); + } - pthread_mutex_lock(&e->state_lock); + pthread_mutex_lock(&e->lock); if (e->state != API_WAKE && e->state != API_DESTROY) e->state = API_SLEEP; + pthread_cleanup_push(cancel_api_entry, e); + while (e->state == API_SLEEP && ret != -ETIMEDOUT) - ret = -pthread_cond_timedwait(&e->state_cond, - &e->state_lock, - &dl); + if (timeo) + ret = -pthread_cond_timedwait(&e->cond, &e->lock, &dl); + else + ret = -pthread_cond_wait(&e->cond, &e->lock); + + pthread_cleanup_pop(false); if (e->state == API_DESTROY) { if (e->re != NULL) @@ -204,11 +210,10 @@ int api_entry_sleep(struct api_entry * e) ret = -1; } - if (ret != -ETIMEDOUT) - e->state = API_INIT; + e->state = API_INIT; - pthread_cond_broadcast(&e->state_cond); - pthread_mutex_unlock(&e->state_lock); + pthread_cond_broadcast(&e->cond); + pthread_mutex_unlock(&e->lock); return ret; } @@ -219,25 +224,29 @@ void api_entry_wake(struct api_entry * e, assert(e); assert(re); - pthread_mutex_lock(&e->state_lock); + pthread_mutex_lock(&e->lock); if (e->state != API_SLEEP) { - pthread_mutex_unlock(&e->state_lock); + pthread_mutex_unlock(&e->lock); return; } e->state = API_WAKE; e->re = re; - pthread_cond_broadcast(&e->state_cond); + pthread_cond_broadcast(&e->cond); + + pthread_cleanup_push(cancel_api_entry, e); while (e->state == API_WAKE) - pthread_cond_wait(&e->state_cond, &e->state_lock); + pthread_cond_wait(&e->cond, &e->lock); + + pthread_cleanup_pop(false); if (e->state == API_DESTROY) e->state = API_INIT; - pthread_mutex_unlock(&e->state_lock); + pthread_mutex_unlock(&e->lock); } int api_table_add(struct list_head * api_table, diff --git a/src/irmd/api_table.h b/src/irmd/api_table.h index d2ac7723..1fb2e285 100644 --- a/src/irmd/api_table.h +++ b/src/irmd/api_table.h @@ -23,6 +23,7 @@ #ifndef OUROBOROS_IRMD_API_TABLE_H #define OUROBOROS_IRMD_API_TABLE_H +#include "time.h" #include "utils.h" #include @@ -45,10 +46,10 @@ struct api_entry { struct reg_entry * re; /* reg_entry for which a flow arrived */ - /* the api will block on this */ + /* The process will block on this */ enum api_state state; - pthread_cond_t state_cond; - pthread_mutex_t state_lock; + pthread_cond_t cond; + pthread_mutex_t lock; }; struct api_entry * api_entry_create(pid_t api, @@ -56,7 +57,8 @@ struct api_entry * api_entry_create(pid_t api, void api_entry_destroy(struct api_entry * e); -int api_entry_sleep(struct api_entry * e); +int api_entry_sleep(struct api_entry * e, + struct timespec * timeo); void api_entry_wake(struct api_entry * e, struct reg_entry * re); diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index e335ef48..991644c9 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -88,6 +88,21 @@ struct irm_flow * irm_flow_create(pid_t n_api, return f; } +static void cancel_irm_destroy(void * o) +{ + struct irm_flow * f = (struct irm_flow *) o; + + pthread_mutex_unlock(&f->state_lock); + + pthread_cond_destroy(&f->state_cond); + pthread_mutex_destroy(&f->state_lock); + + shm_rbuff_destroy(f->n_rb); + shm_rbuff_destroy(f->n_1_rb); + + free(f); +} + void irm_flow_destroy(struct irm_flow * f) { assert(f); @@ -106,18 +121,12 @@ void irm_flow_destroy(struct irm_flow * f) pthread_cond_signal(&f->state_cond); + pthread_cleanup_push(cancel_irm_destroy, f); + while (f->state != FLOW_NULL) pthread_cond_wait(&f->state_cond, &f->state_lock); - pthread_mutex_unlock(&f->state_lock); - - pthread_cond_destroy(&f->state_cond); - pthread_mutex_destroy(&f->state_lock); - - shm_rbuff_destroy(f->n_rb); - shm_rbuff_destroy(f->n_1_rb); - - free(f); + pthread_cleanup_pop(true); } enum flow_state irm_flow_get_state(struct irm_flow * f) @@ -172,6 +181,9 @@ int irm_flow_wait_state(struct irm_flow * f, assert(f->state != FLOW_NULL); + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &f->state_lock); + while (!(f->state == state || f->state == FLOW_DESTROY || f->state == FLOW_DEALLOC_PENDING) && @@ -194,7 +206,7 @@ int irm_flow_wait_state(struct irm_flow * f, s = f->state; - pthread_mutex_unlock(&f->state_lock); + pthread_cleanup_pop(true); return ret ? ret : s; } diff --git a/src/irmd/main.c b/src/irmd/main.c index 3fceadb6..64e4d459 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1111,19 +1111,11 @@ static int flow_accept(pid_t api, struct reg_entry * re = NULL; struct list_head * p = NULL; - struct timespec dl; - struct timespec now; - pid_t api_n1; pid_t api_n; int port_id; int ret; - if (timeo != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &now); - ts_add(&now, timeo, &dl); - } - pthread_rwlock_wrlock(&irmd.reg_lock); e = api_table_get(&irmd.api_table, api); @@ -1147,28 +1139,12 @@ static int flow_accept(pid_t api, pthread_rwlock_unlock(&irmd.reg_lock); - while (true) { - if (timeo != NULL && ts_diff_ns(&now, &dl) < 0) { - log_dbg("Accept timed out."); - return -ETIMEDOUT; - } - - if (irmd_get_state() != IRMD_RUNNING) - return -EIRMD; + ret = api_entry_sleep(e, timeo); + if (ret == -ETIMEDOUT) + return -ETIMEDOUT; - ret = api_entry_sleep(e); - if (ret == -ETIMEDOUT) { - clock_gettime(PTHREAD_COND_CLOCK, &now); - api_entry_cancel(e); - continue; - } - - if (ret == -1) - return -EPIPE; - - if (ret == 0) - break; - } + if (ret == -1) + return -EPIPE; if (irmd_get_state() != IRMD_RUNNING) { reg_entry_set_state(re, REG_NAME_NULL); @@ -1206,11 +1182,11 @@ static int flow_accept(pid_t api, return -EPERM; } - pthread_mutex_lock(&e->state_lock); + pthread_mutex_lock(&e->lock); re = e->re; - pthread_mutex_unlock(&e->state_lock); + pthread_mutex_unlock(&e->lock); if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) { pthread_rwlock_unlock(&irmd.reg_lock); @@ -1915,19 +1891,25 @@ static void * acceptloop(void * o) return (void *) 0; } -void * mainloop(void * o) +static void close_ptr(void * o) +{ + close(*((int *) o)); +} + +static void free_msg(void * o) +{ + irm_msg__free_unpacked((irm_msg_t *) o, NULL); +} + +static void * mainloop(void * o) { int sfd; irm_msg_t * msg; buffer_t buffer; - struct timespec dl; - struct timespec to = {(IRMD_ACCEPT_TIMEOUT / 1000), - (IRMD_ACCEPT_TIMEOUT % 1000) * MILLION}; (void) o; while (true) { - int ret = 0; irm_msg_t ret_msg = IRM_MSG__INIT; struct irm_flow * e = NULL; pid_t * apis = NULL; @@ -1937,27 +1919,18 @@ void * mainloop(void * o) ret_msg.code = IRM_MSG_CODE__IRM_REPLY; - clock_gettime(PTHREAD_COND_CLOCK, &dl); - ts_add(&dl, &to, &dl); - pthread_mutex_lock(&irmd.cmd_lock); - while (list_is_empty(&irmd.cmds) && ret != -ETIMEDOUT) - ret = -pthread_cond_timedwait(&irmd.cmd_cond, - &irmd.cmd_lock, - &dl); + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &irmd.cmd_lock); - if (ret == -ETIMEDOUT) { - pthread_mutex_unlock(&irmd.cmd_lock); - if (tpm_check(irmd.tpm)) - break; - continue; - } + while (list_is_empty(&irmd.cmds)) + pthread_cond_wait(&irmd.cmd_cond, &irmd.cmd_lock); cmd = list_last_entry(&irmd.cmds, struct cmd, next); list_del(&cmd->next); - pthread_mutex_unlock(&irmd.cmd_lock); + pthread_cleanup_pop(true); msg = irm_msg__unpack(NULL, cmd->len, cmd->cbuf); sfd = cmd->fd; @@ -1979,6 +1952,9 @@ void * mainloop(void * o) timeo = &ts; } + pthread_cleanup_push(close_ptr, &sfd); + pthread_cleanup_push(free_msg, msg); + switch (msg->code) { case IRM_MSG_CODE__IRM_CREATE_IPCP: ret_msg.has_result = true; @@ -2106,7 +2082,8 @@ void * mainloop(void * o) break; } - irm_msg__free_unpacked(msg, NULL); + pthread_cleanup_pop(true); + pthread_cleanup_pop(false); if (ret_msg.result == -EPIPE || !ret_msg.has_result) { close(sfd); @@ -2138,18 +2115,19 @@ void * mainloop(void * o) if (apis != NULL) free(apis); + pthread_cleanup_push(close_ptr, &sfd); + if (write(sfd, buffer.data, buffer.len) == -1) if (ret_msg.result != -EIRMD) log_warn("Failed to send reply message."); free(buffer.data); - close(sfd); + + pthread_cleanup_pop(true); tpm_inc(irmd.tpm); } - tpm_exit(irmd.tpm); - return (void *) 0; } diff --git a/src/irmd/registry.c b/src/irmd/registry.c index c7e7b52d..f863af6a 100644 --- a/src/irmd/registry.c +++ b/src/irmd/registry.c @@ -92,30 +92,13 @@ static int reg_entry_init(struct reg_entry * e, return 0; } -static void reg_entry_destroy(struct reg_entry * e) +static void cancel_reg_entry_destroy(void * o) { - struct list_head * p = NULL; - struct list_head * h = NULL; - - if (e == NULL) - return; - - pthread_mutex_lock(&e->state_lock); - - if (e->state == REG_NAME_DESTROY) { - pthread_mutex_unlock(&e->state_lock); - return; - } - - if (e->state != REG_NAME_FLOW_ACCEPT) - e->state = REG_NAME_NULL; - else - e->state = REG_NAME_DESTROY; - - pthread_cond_broadcast(&e->state_cond); + struct reg_entry * e; + struct list_head * p; + struct list_head * h; - while (e->state != REG_NAME_NULL) - pthread_cond_wait(&e->state_cond, &e->state_lock); + e = (struct reg_entry *) o; pthread_mutex_unlock(&e->state_lock); @@ -148,6 +131,33 @@ static void reg_entry_destroy(struct reg_entry * e) free(e); } +static void reg_entry_destroy(struct reg_entry * e) +{ + if (e == NULL) + return; + + pthread_mutex_lock(&e->state_lock); + + if (e->state == REG_NAME_DESTROY) { + pthread_mutex_unlock(&e->state_lock); + return; + } + + if (e->state != REG_NAME_FLOW_ACCEPT) + e->state = REG_NAME_NULL; + else + e->state = REG_NAME_DESTROY; + + pthread_cond_broadcast(&e->state_cond); + + pthread_cleanup_push(cancel_reg_entry_destroy, e); + + while (e->state != REG_NAME_NULL) + pthread_cond_wait(&e->state_cond, &e->state_lock); + + pthread_cleanup_pop(true); +} + static bool reg_entry_is_local_in_dif(struct reg_entry * e, const char * dif_name) { @@ -459,6 +469,9 @@ int reg_entry_leave_state(struct reg_entry * e, pthread_mutex_lock(&e->state_lock); + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &e->state_lock); + while (e->state == state && ret != -ETIMEDOUT) if (timeout) ret = -pthread_cond_timedwait(&e->state_cond, @@ -474,7 +487,7 @@ int reg_entry_leave_state(struct reg_entry * e, pthread_cond_broadcast(&e->state_cond); } - pthread_mutex_unlock(&e->state_lock); + pthread_cleanup_pop(true); return ret; } diff --git a/src/lib/tpm.c b/src/lib/tpm.c index c883e0a8..1317758b 100644 --- a/src/lib/tpm.c +++ b/src/lib/tpm.c @@ -38,8 +38,8 @@ struct pthr_el { struct list_head next; - bool join; bool kill; + bool busy; pthread_t thr; }; @@ -78,13 +78,12 @@ static void tpm_join(struct tpm * tpm) if (tpm->state != TPM_RUNNING) { if (!e->kill) { e->kill = true; + pthread_cancel(e->thr); --tpm->cur; } - while (!e->join) - pthread_cond_wait(&tpm->cond, &tpm->lock); } - if (e->join) { + if (e->kill) { pthread_join(e->thr, NULL); list_del(&e->next); free(e); @@ -92,32 +91,15 @@ static void tpm_join(struct tpm * tpm) } } -static struct pthr_el * tpm_pthr_el(struct tpm * tpm, - pthread_t thr) -{ - struct list_head * p; - struct pthr_el * e; - - list_for_each(p, &tpm->pool) { - e = list_entry(p, struct pthr_el, next); - if (e->thr == thr) - return e; - - } - - assert(false); - - return NULL; -} - static void tpm_kill(struct tpm * tpm) { struct list_head * p; list_for_each(p, &tpm->pool) { struct pthr_el * e = list_entry(p, struct pthr_el, next); - if (!e->kill) { + if (!e->busy && !e->kill) { e->kill = true; + pthread_cancel(e->thr); --tpm->cur; return; } @@ -152,8 +134,8 @@ static void * tpmgr(void * o) if (e == NULL) break; - e->join = false; e->kill = false; + e->busy = false; if (pthread_create(&e->thr, NULL, tpm->func, tpm->o)) { @@ -261,23 +243,30 @@ void tpm_destroy(struct tpm * tpm) free(tpm); } -bool tpm_check(struct tpm * tpm) +static struct pthr_el * tpm_pthr_el(struct tpm * tpm, + pthread_t thr) { - bool ret; + struct list_head * p; + struct pthr_el * e; - pthread_mutex_lock(&tpm->lock); + list_for_each(p, &tpm->pool) { + e = list_entry(p, struct pthr_el, next); + if (e->thr == thr) + return e; - ret = tpm_pthr_el(tpm, pthread_self())->kill; + } - pthread_mutex_unlock(&tpm->lock); + assert(false); - return ret; + return NULL; } void tpm_inc(struct tpm * tpm) { pthread_mutex_lock(&tpm->lock); + tpm_pthr_el(tpm, pthread_self())->busy = false; + --tpm->wrk; pthread_mutex_unlock(&tpm->lock); @@ -287,18 +276,9 @@ void tpm_dec(struct tpm * tpm) { pthread_mutex_lock(&tpm->lock); - ++tpm->wrk; - - pthread_cond_signal(&tpm->cond); - - pthread_mutex_unlock(&tpm->lock); -} + tpm_pthr_el(tpm, pthread_self())->busy = true; -void tpm_exit(struct tpm * tpm) -{ - pthread_mutex_lock(&tpm->lock); - - tpm_pthr_el(tpm, pthread_self())->join = true; + ++tpm->wrk; pthread_cond_signal(&tpm->cond); -- cgit v1.2.3