summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-09-30 17:58:18 +0200
committerdimitri staessens <dimitri.staessens@ugent.be>2017-09-30 17:58:18 +0200
commit9405ad97e20686f74c06bcbac9523a8b4f10272e (patch)
treea0489929634ee7588de3ad77a6a1166ce11508e2 /src
parent5e974395fadc5e1922f200855c14ca0538ba50dc (diff)
downloadouroboros-9405ad97e20686f74c06bcbac9523a8b4f10272e.tar.gz
ouroboros-9405ad97e20686f74c06bcbac9523a8b4f10272e.zip
lib: Cancel tpm threads instead of marking exit
This makes the threadpool use pthread_cancel instead of setting an exit flag that threadpool managed threads check periodically. This drastically reduces CPU consumption in the irmd when running a lot of applications. It requires cancellation handlers in the ipcp and irmd to be implemented to ensure safe cancellation during operation and shutdown.
Diffstat (limited to 'src')
-rw-r--r--src/ipcpd/ipcp.c59
-rw-r--r--src/ipcpd/normal/dht.c113
-rw-r--r--src/irmd/api_table.c91
-rw-r--r--src/irmd/api_table.h10
-rw-r--r--src/irmd/irm_flow.c32
-rw-r--r--src/irmd/main.c86
-rw-r--r--src/irmd/registry.c59
-rw-r--r--src/lib/tpm.c62
8 files changed, 261 insertions, 251 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 513c638a..9f4d9eea 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -159,6 +159,16 @@ static void * acceptloop(void * o)
return (void *) 0;
}
+static void close_ptr(void * o)
+{
+ close(*((int *) o));
+}
+
+static void free_msg(void * o)
+{
+ ipcp_msg__free_unpacked((ipcp_msg_t *) o, NULL);
+}
+
static void * mainloop(void * o)
{
int sfd;
@@ -167,14 +177,10 @@ static void * mainloop(void * o)
struct dif_info info;
ipcp_config_msg_t * conf_msg;
ipcp_msg_t * msg;
- struct timespec dl;
- struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000),
- (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION};
(void) o;
while (true) {
- int ret = 0;
ipcp_msg_t ret_msg = IPCP_MSG__INIT;
dif_info_msg_t dif_info = DIF_INFO_MSG__INIT;
int fd = -1;
@@ -182,27 +188,18 @@ static void * mainloop(void * o)
ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY;
- clock_gettime(PTHREAD_COND_CLOCK, &dl);
- ts_add(&dl, &to, &dl);
-
pthread_mutex_lock(&ipcpi.cmd_lock);
- while (list_is_empty(&ipcpi.cmds) && ret != -ETIMEDOUT)
- ret = -pthread_cond_timedwait(&ipcpi.cmd_cond,
- &ipcpi.cmd_lock,
- &dl);
+ pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+ &ipcpi.cmd_lock);
- if (ret == -ETIMEDOUT) {
- pthread_mutex_unlock(&ipcpi.cmd_lock);
- if (tpm_check(ipcpi.tpm))
- break;
- continue;
- }
+ while (list_is_empty(&ipcpi.cmds))
+ pthread_cond_wait(&ipcpi.cmd_cond, &ipcpi.cmd_lock);
cmd = list_last_entry(&ipcpi.cmds, struct cmd, next);
list_del(&cmd->next);
- pthread_mutex_unlock(&ipcpi.cmd_lock);
+ pthread_cleanup_pop(true);
msg = ipcp_msg__unpack(NULL, cmd->len, cmd->cbuf);
sfd = cmd->fd;
@@ -216,6 +213,9 @@ static void * mainloop(void * o)
tpm_dec(ipcpi.tpm);
+ pthread_cleanup_push(close_ptr, &sfd);
+ pthread_cleanup_push(free_msg, msg);
+
switch (msg->code) {
case IPCP_MSG_CODE__IPCP_BOOTSTRAP:
ret_msg.has_result = true;
@@ -482,7 +482,8 @@ static void * mainloop(void * o)
break;
}
- ipcp_msg__free_unpacked(msg, NULL);
+ pthread_cleanup_pop(true);
+ pthread_cleanup_pop(false);
buffer.len = ipcp_msg__get_packed_size(&ret_msg);
if (buffer.len == 0) {
@@ -502,22 +503,17 @@ static void * mainloop(void * o)
ipcp_msg__pack(&ret_msg, buffer.data);
- if (write(sfd, buffer.data, buffer.len) == -1) {
- log_err("Failed to send reply message");
- free(buffer.data);
- close(sfd);
- tpm_inc(ipcpi.tpm);
- continue;
- }
+ pthread_cleanup_push(close_ptr, &sfd);
+
+ if (write(sfd, buffer.data, buffer.len) == -1)
+ log_warn("Failed to send reply message");
free(buffer.data);
- close(sfd);
+ pthread_cleanup_pop(true);
tpm_inc(ipcpi.tpm);
}
- tpm_exit(ipcpi.tpm);
-
return (void *) 0;
}
@@ -778,6 +774,9 @@ int ipcp_wait_state(enum ipcp_state state,
pthread_mutex_lock(&ipcpi.state_mtx);
+ pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+ &ipcpi.state_mtx);
+
while (ipcpi.state != state
&& ipcpi.state != IPCP_SHUTDOWN
&& ipcpi.state != IPCP_NULL
@@ -791,7 +790,7 @@ int ipcp_wait_state(enum ipcp_state state,
&abstime);
}
- pthread_mutex_unlock(&ipcpi.state_mtx);
+ pthread_cleanup_pop(true);
return ret;
}
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index b06c4480..7ca555ab 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -288,13 +288,16 @@ static int dht_wait_running(struct dht * dht)
pthread_mutex_lock(&dht->mtx);
+ pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+ &dht->mtx);
+
while (dht->state == DHT_JOINING)
pthread_cond_wait(&dht->cond, &dht->mtx);
if (dht->state != DHT_RUNNING)
ret = -1;
- pthread_mutex_unlock(&dht->mtx);
+ pthread_cleanup_pop(true);
return ret;
}
@@ -379,6 +382,21 @@ static void kad_req_create(struct dht * dht,
pthread_rwlock_unlock(&dht->lock);
}
+static void cancel_req_destroy(void * o)
+{
+ struct kad_req * req = (struct kad_req *) o;
+
+ pthread_mutex_unlock(&req->lock);
+
+ pthread_cond_destroy(&req->cond);
+ pthread_mutex_destroy(&req->lock);
+
+ if (req->key != NULL)
+ free(req->key);
+
+ free(req);
+}
+
static void kad_req_destroy(struct kad_req * req)
{
assert(req);
@@ -403,18 +421,12 @@ static void kad_req_destroy(struct kad_req * req)
break;
}
+ pthread_cleanup_push(cancel_req_destroy, req);
+
while (req->state != REQ_NULL && req->state != REQ_DONE)
pthread_cond_wait(&req->cond, &req->lock);
- pthread_mutex_unlock(&req->lock);
-
- pthread_cond_destroy(&req->cond);
- pthread_mutex_destroy(&req->lock);
-
- if (req->key != NULL)
- free(req->key);
-
- free(req);
+ pthread_cleanup_pop(true);
}
static int kad_req_wait(struct kad_req * req,
@@ -434,6 +446,9 @@ static int kad_req_wait(struct kad_req * req,
req->state = REQ_PENDING;
+ pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+ &req->lock);
+
while (req->state == REQ_PENDING && ret != -ETIMEDOUT)
ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs);
@@ -452,7 +467,7 @@ static int kad_req_wait(struct kad_req * req,
break;
}
- pthread_mutex_unlock(&req->lock);
+ pthread_cleanup_pop(true);
return ret;
}
@@ -683,11 +698,34 @@ static void lookup_add_out(struct lookup * lu,
pthread_mutex_unlock(&lu->lock);
}
-static void lookup_destroy(struct lookup * lu)
+static void cancel_lookup_destroy(void * o)
{
+ struct lookup * lu;
struct list_head * p;
struct list_head * h;
+ lu = (struct lookup *) o;
+
+ if (lu->key != NULL)
+ free(lu->key);
+ if (lu->addrs != NULL)
+ free(lu->addrs);
+
+ list_for_each_safe(p, h, &lu->contacts) {
+ struct contact * c = list_entry(p, struct contact, next);
+ list_del(&c->next);
+ contact_destroy(c);
+ }
+
+ pthread_mutex_unlock(&lu->lock);
+
+ pthread_mutex_destroy(&lu->lock);
+
+ free(lu);
+}
+
+static void lookup_destroy(struct lookup * lu)
+{
assert(lu);
pthread_mutex_lock(&lu->lock);
@@ -711,25 +749,12 @@ static void lookup_destroy(struct lookup * lu)
break;
}
+ pthread_cleanup_push(cancel_lookup_destroy, lu);
+
while (lu->state != LU_NULL)
pthread_cond_wait(&lu->cond, &lu->lock);
- if (lu->key != NULL)
- free(lu->key);
- if (lu->addrs != NULL)
- free(lu->addrs);
-
- list_for_each_safe(p, h, &lu->contacts) {
- struct contact * c = list_entry(p, struct contact, next);
- list_del(&c->next);
- contact_destroy(c);
- }
-
- pthread_mutex_unlock(&lu->lock);
-
- pthread_mutex_destroy(&lu->lock);
-
- free(lu);
+ pthread_cleanup_pop(true);
}
static void lookup_update(struct dht * dht,
@@ -765,12 +790,17 @@ static void lookup_update(struct dht * dht,
return;
}
+ pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+ &lu->lock);
+
while (lu->state == LU_INIT) {
pthread_rwlock_unlock(&dht->lock);
pthread_cond_wait(&lu->cond, &lu->lock);
pthread_rwlock_rdlock(&dht->lock);
}
+ pthread_cleanup_pop(false);
+
/* BUG: this should not be allowed since it's use-after-free. */
if (lu->state == LU_DESTROY || lu->state == LU_NULL) {
log_warn("Use-after-free. Update aborted to avoid worse.");
@@ -2302,10 +2332,8 @@ uint64_t dht_query(struct dht * dht,
static void * dht_handle_sdu(void * o)
{
- struct dht * dht = (struct dht *) o;
- struct timespec dl;
- struct timespec to = {(HANDLE_TIMEO / 1000),
- (HANDLE_TIMEO % 1000) * MILLION};
+ struct dht * dht = (struct dht *) o;
+
assert(dht);
while (true) {
@@ -2318,28 +2346,19 @@ static void * dht_handle_sdu(void * o)
size_t b;
size_t t_expire;
struct cmd * cmd;
- int ret = 0;
-
- clock_gettime(CLOCK_REALTIME_COARSE, &dl);
- ts_add(&dl, &to, &dl);
pthread_mutex_lock(&dht->mtx);
- while (list_is_empty(&dht->cmds) && ret != -ETIMEDOUT)
- ret = -pthread_cond_timedwait(&dht->cond,
- &dht->mtx, &dl);
+ pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+ &dht->mtx);
- if (ret == -ETIMEDOUT) {
- pthread_mutex_unlock(&dht->mtx);
- if (tpm_check(dht->tpm))
- break;
- continue;
- }
+ while (list_is_empty(&dht->cmds))
+ pthread_cond_wait(&dht->cond, &dht->mtx);
cmd = list_last_entry(&dht->cmds, struct cmd, next);
list_del(&cmd->next);
- pthread_mutex_unlock(&dht->mtx);
+ pthread_cleanup_pop(true);
i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb);
@@ -2487,8 +2506,6 @@ static void * dht_handle_sdu(void * o)
tpm_inc(dht->tpm);
}
- tpm_exit(dht->tpm);
-
return (void *) 0;
}
diff --git a/src/irmd/api_table.c b/src/irmd/api_table.c
index df56dd02..a244f3a2 100644
--- a/src/irmd/api_table.c
+++ b/src/irmd/api_table.c
@@ -36,8 +36,6 @@
#include <limits.h>
#include <assert.h>
-#define ENTRY_SLEEP_TIMEOUT 10 /* ms */
-
struct api_entry * api_entry_create(pid_t api,
char * apn)
{
@@ -70,14 +68,14 @@ struct api_entry * api_entry_create(pid_t api,
pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
#endif
- if (pthread_mutex_init(&e->state_lock, NULL)) {
+ if (pthread_mutex_init(&e->lock, NULL)) {
free(e);
return NULL;
}
- if (pthread_cond_init(&e->state_cond, &cattr)) {
- pthread_mutex_destroy(&e->state_lock);
+ if (pthread_cond_init(&e->cond, &cattr)) {
+ pthread_mutex_destroy(&e->lock);
free(e);
return NULL;
}
@@ -85,6 +83,15 @@ struct api_entry * api_entry_create(pid_t api,
return e;
}
+void cancel_api_entry(void * o)
+{
+ struct api_entry * e = (struct api_entry *) o;
+
+ e->state = API_NULL;
+
+ pthread_mutex_unlock(&e->lock);
+}
+
void api_entry_destroy(struct api_entry * e)
{
struct list_head * p;
@@ -92,25 +99,29 @@ void api_entry_destroy(struct api_entry * e)
assert(e);
- pthread_mutex_lock(&e->state_lock);
+ pthread_mutex_lock(&e->lock);
if (e->state == API_DESTROY) {
- pthread_mutex_unlock(&e->state_lock);
+ pthread_mutex_unlock(&e->lock);
return;
}
if (e->state == API_SLEEP)
e->state = API_DESTROY;
- pthread_cond_signal(&e->state_cond);
+ pthread_cond_signal(&e->cond);
+
+ pthread_cleanup_push(cancel_api_entry, e);
while (e->state != API_INIT)
- pthread_cond_wait(&e->state_cond, &e->state_lock);
+ pthread_cond_wait(&e->cond, &e->lock);
+
+ pthread_cleanup_pop(false);
- pthread_mutex_unlock(&e->state_lock);
+ pthread_mutex_unlock(&e->lock);
- pthread_cond_destroy(&e->state_cond);
- pthread_mutex_destroy(&e->state_lock);
+ pthread_cond_destroy(&e->cond);
+ pthread_mutex_destroy(&e->lock);
if (e->apn != NULL)
free(e->apn);
@@ -164,39 +175,34 @@ void api_entry_del_name(struct api_entry * e,
}
}
-void api_entry_cancel(struct api_entry * e)
+int api_entry_sleep(struct api_entry * e,
+ struct timespec * timeo)
{
- pthread_mutex_lock(&e->state_lock);
-
- e->state = API_INIT;
- pthread_cond_broadcast(&e->state_cond);
-
- pthread_mutex_unlock(&e->state_lock);
-}
-
-int api_entry_sleep(struct api_entry * e)
-{
- struct timespec timeout = {(ENTRY_SLEEP_TIMEOUT / 1000),
- (ENTRY_SLEEP_TIMEOUT % 1000) * MILLION};
- struct timespec now;
struct timespec dl;
int ret = 0;
assert(e);
- clock_gettime(PTHREAD_COND_CLOCK, &now);
- ts_add(&now, &timeout, &dl);
+ if (timeo != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &dl);
+ ts_add(&dl, timeo, &dl);
+ }
- pthread_mutex_lock(&e->state_lock);
+ pthread_mutex_lock(&e->lock);
if (e->state != API_WAKE && e->state != API_DESTROY)
e->state = API_SLEEP;
+ pthread_cleanup_push(cancel_api_entry, e);
+
while (e->state == API_SLEEP && ret != -ETIMEDOUT)
- ret = -pthread_cond_timedwait(&e->state_cond,
- &e->state_lock,
- &dl);
+ if (timeo)
+ ret = -pthread_cond_timedwait(&e->cond, &e->lock, &dl);
+ else
+ ret = -pthread_cond_wait(&e->cond, &e->lock);
+
+ pthread_cleanup_pop(false);
if (e->state == API_DESTROY) {
if (e->re != NULL)
@@ -204,11 +210,10 @@ int api_entry_sleep(struct api_entry * e)
ret = -1;
}
- if (ret != -ETIMEDOUT)
- e->state = API_INIT;
+ e->state = API_INIT;
- pthread_cond_broadcast(&e->state_cond);
- pthread_mutex_unlock(&e->state_lock);
+ pthread_cond_broadcast(&e->cond);
+ pthread_mutex_unlock(&e->lock);
return ret;
}
@@ -219,25 +224,29 @@ void api_entry_wake(struct api_entry * e,
assert(e);
assert(re);
- pthread_mutex_lock(&e->state_lock);
+ pthread_mutex_lock(&e->lock);
if (e->state != API_SLEEP) {
- pthread_mutex_unlock(&e->state_lock);
+ pthread_mutex_unlock(&e->lock);
return;
}
e->state = API_WAKE;
e->re = re;
- pthread_cond_broadcast(&e->state_cond);
+ pthread_cond_broadcast(&e->cond);
+
+ pthread_cleanup_push(cancel_api_entry, e);
while (e->state == API_WAKE)
- pthread_cond_wait(&e->state_cond, &e->state_lock);
+ pthread_cond_wait(&e->cond, &e->lock);
+
+ pthread_cleanup_pop(false);
if (e->state == API_DESTROY)
e->state = API_INIT;
- pthread_mutex_unlock(&e->state_lock);
+ pthread_mutex_unlock(&e->lock);
}
int api_table_add(struct list_head * api_table,
diff --git a/src/irmd/api_table.h b/src/irmd/api_table.h
index d2ac7723..1fb2e285 100644
--- a/src/irmd/api_table.h
+++ b/src/irmd/api_table.h
@@ -23,6 +23,7 @@
#ifndef OUROBOROS_IRMD_API_TABLE_H
#define OUROBOROS_IRMD_API_TABLE_H
+#include "time.h"
#include "utils.h"
#include <unistd.h>
@@ -45,10 +46,10 @@ struct api_entry {
struct reg_entry * re; /* reg_entry for which a flow arrived */
- /* the api will block on this */
+ /* The process will block on this */
enum api_state state;
- pthread_cond_t state_cond;
- pthread_mutex_t state_lock;
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
};
struct api_entry * api_entry_create(pid_t api,
@@ -56,7 +57,8 @@ struct api_entry * api_entry_create(pid_t api,
void api_entry_destroy(struct api_entry * e);
-int api_entry_sleep(struct api_entry * e);
+int api_entry_sleep(struct api_entry * e,
+ struct timespec * timeo);
void api_entry_wake(struct api_entry * e,
struct reg_entry * re);
diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c
index e335ef48..991644c9 100644
--- a/src/irmd/irm_flow.c
+++ b/src/irmd/irm_flow.c
@@ -88,6 +88,21 @@ struct irm_flow * irm_flow_create(pid_t n_api,
return f;
}
+static void cancel_irm_destroy(void * o)
+{
+ struct irm_flow * f = (struct irm_flow *) o;
+
+ pthread_mutex_unlock(&f->state_lock);
+
+ pthread_cond_destroy(&f->state_cond);
+ pthread_mutex_destroy(&f->state_lock);
+
+ shm_rbuff_destroy(f->n_rb);
+ shm_rbuff_destroy(f->n_1_rb);
+
+ free(f);
+}
+
void irm_flow_destroy(struct irm_flow * f)
{
assert(f);
@@ -106,18 +121,12 @@ void irm_flow_destroy(struct irm_flow * f)
pthread_cond_signal(&f->state_cond);
+ pthread_cleanup_push(cancel_irm_destroy, f);
+
while (f->state != FLOW_NULL)
pthread_cond_wait(&f->state_cond, &f->state_lock);
- pthread_mutex_unlock(&f->state_lock);
-
- pthread_cond_destroy(&f->state_cond);
- pthread_mutex_destroy(&f->state_lock);
-
- shm_rbuff_destroy(f->n_rb);
- shm_rbuff_destroy(f->n_1_rb);
-
- free(f);
+ pthread_cleanup_pop(true);
}
enum flow_state irm_flow_get_state(struct irm_flow * f)
@@ -172,6 +181,9 @@ int irm_flow_wait_state(struct irm_flow * f,
assert(f->state != FLOW_NULL);
+ pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+ &f->state_lock);
+
while (!(f->state == state ||
f->state == FLOW_DESTROY ||
f->state == FLOW_DEALLOC_PENDING) &&
@@ -194,7 +206,7 @@ int irm_flow_wait_state(struct irm_flow * f,
s = f->state;
- pthread_mutex_unlock(&f->state_lock);
+ pthread_cleanup_pop(true);
return ret ? ret : s;
}
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 3fceadb6..64e4d459 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1111,19 +1111,11 @@ static int flow_accept(pid_t api,
struct reg_entry * re = NULL;
struct list_head * p = NULL;
- struct timespec dl;
- struct timespec now;
-
pid_t api_n1;
pid_t api_n;
int port_id;
int ret;
- if (timeo != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &now);
- ts_add(&now, timeo, &dl);
- }
-
pthread_rwlock_wrlock(&irmd.reg_lock);
e = api_table_get(&irmd.api_table, api);
@@ -1147,28 +1139,12 @@ static int flow_accept(pid_t api,
pthread_rwlock_unlock(&irmd.reg_lock);
- while (true) {
- if (timeo != NULL && ts_diff_ns(&now, &dl) < 0) {
- log_dbg("Accept timed out.");
- return -ETIMEDOUT;
- }
-
- if (irmd_get_state() != IRMD_RUNNING)
- return -EIRMD;
+ ret = api_entry_sleep(e, timeo);
+ if (ret == -ETIMEDOUT)
+ return -ETIMEDOUT;
- ret = api_entry_sleep(e);
- if (ret == -ETIMEDOUT) {
- clock_gettime(PTHREAD_COND_CLOCK, &now);
- api_entry_cancel(e);
- continue;
- }
-
- if (ret == -1)
- return -EPIPE;
-
- if (ret == 0)
- break;
- }
+ if (ret == -1)
+ return -EPIPE;
if (irmd_get_state() != IRMD_RUNNING) {
reg_entry_set_state(re, REG_NAME_NULL);
@@ -1206,11 +1182,11 @@ static int flow_accept(pid_t api,
return -EPERM;
}
- pthread_mutex_lock(&e->state_lock);
+ pthread_mutex_lock(&e->lock);
re = e->re;
- pthread_mutex_unlock(&e->state_lock);
+ pthread_mutex_unlock(&e->lock);
if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) {
pthread_rwlock_unlock(&irmd.reg_lock);
@@ -1915,19 +1891,25 @@ static void * acceptloop(void * o)
return (void *) 0;
}
-void * mainloop(void * o)
+static void close_ptr(void * o)
+{
+ close(*((int *) o));
+}
+
+static void free_msg(void * o)
+{
+ irm_msg__free_unpacked((irm_msg_t *) o, NULL);
+}
+
+static void * mainloop(void * o)
{
int sfd;
irm_msg_t * msg;
buffer_t buffer;
- struct timespec dl;
- struct timespec to = {(IRMD_ACCEPT_TIMEOUT / 1000),
- (IRMD_ACCEPT_TIMEOUT % 1000) * MILLION};
(void) o;
while (true) {
- int ret = 0;
irm_msg_t ret_msg = IRM_MSG__INIT;
struct irm_flow * e = NULL;
pid_t * apis = NULL;
@@ -1937,27 +1919,18 @@ void * mainloop(void * o)
ret_msg.code = IRM_MSG_CODE__IRM_REPLY;
- clock_gettime(PTHREAD_COND_CLOCK, &dl);
- ts_add(&dl, &to, &dl);
-
pthread_mutex_lock(&irmd.cmd_lock);
- while (list_is_empty(&irmd.cmds) && ret != -ETIMEDOUT)
- ret = -pthread_cond_timedwait(&irmd.cmd_cond,
- &irmd.cmd_lock,
- &dl);
+ pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+ &irmd.cmd_lock);
- if (ret == -ETIMEDOUT) {
- pthread_mutex_unlock(&irmd.cmd_lock);
- if (tpm_check(irmd.tpm))
- break;
- continue;
- }
+ while (list_is_empty(&irmd.cmds))
+ pthread_cond_wait(&irmd.cmd_cond, &irmd.cmd_lock);
cmd = list_last_entry(&irmd.cmds, struct cmd, next);
list_del(&cmd->next);
- pthread_mutex_unlock(&irmd.cmd_lock);
+ pthread_cleanup_pop(true);
msg = irm_msg__unpack(NULL, cmd->len, cmd->cbuf);
sfd = cmd->fd;
@@ -1979,6 +1952,9 @@ void * mainloop(void * o)
timeo = &ts;
}
+ pthread_cleanup_push(close_ptr, &sfd);
+ pthread_cleanup_push(free_msg, msg);
+
switch (msg->code) {
case IRM_MSG_CODE__IRM_CREATE_IPCP:
ret_msg.has_result = true;
@@ -2106,7 +2082,8 @@ void * mainloop(void * o)
break;
}
- irm_msg__free_unpacked(msg, NULL);
+ pthread_cleanup_pop(true);
+ pthread_cleanup_pop(false);
if (ret_msg.result == -EPIPE || !ret_msg.has_result) {
close(sfd);
@@ -2138,18 +2115,19 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
+ pthread_cleanup_push(close_ptr, &sfd);
+
if (write(sfd, buffer.data, buffer.len) == -1)
if (ret_msg.result != -EIRMD)
log_warn("Failed to send reply message.");
free(buffer.data);
- close(sfd);
+
+ pthread_cleanup_pop(true);
tpm_inc(irmd.tpm);
}
- tpm_exit(irmd.tpm);
-
return (void *) 0;
}
diff --git a/src/irmd/registry.c b/src/irmd/registry.c
index c7e7b52d..f863af6a 100644
--- a/src/irmd/registry.c
+++ b/src/irmd/registry.c
@@ -92,30 +92,13 @@ static int reg_entry_init(struct reg_entry * e,
return 0;
}
-static void reg_entry_destroy(struct reg_entry * e)
+static void cancel_reg_entry_destroy(void * o)
{
- struct list_head * p = NULL;
- struct list_head * h = NULL;
-
- if (e == NULL)
- return;
-
- pthread_mutex_lock(&e->state_lock);
-
- if (e->state == REG_NAME_DESTROY) {
- pthread_mutex_unlock(&e->state_lock);
- return;
- }
-
- if (e->state != REG_NAME_FLOW_ACCEPT)
- e->state = REG_NAME_NULL;
- else
- e->state = REG_NAME_DESTROY;
-
- pthread_cond_broadcast(&e->state_cond);
+ struct reg_entry * e;
+ struct list_head * p;
+ struct list_head * h;
- while (e->state != REG_NAME_NULL)
- pthread_cond_wait(&e->state_cond, &e->state_lock);
+ e = (struct reg_entry *) o;
pthread_mutex_unlock(&e->state_lock);
@@ -148,6 +131,33 @@ static void reg_entry_destroy(struct reg_entry * e)
free(e);
}
+static void reg_entry_destroy(struct reg_entry * e)
+{
+ if (e == NULL)
+ return;
+
+ pthread_mutex_lock(&e->state_lock);
+
+ if (e->state == REG_NAME_DESTROY) {
+ pthread_mutex_unlock(&e->state_lock);
+ return;
+ }
+
+ if (e->state != REG_NAME_FLOW_ACCEPT)
+ e->state = REG_NAME_NULL;
+ else
+ e->state = REG_NAME_DESTROY;
+
+ pthread_cond_broadcast(&e->state_cond);
+
+ pthread_cleanup_push(cancel_reg_entry_destroy, e);
+
+ while (e->state != REG_NAME_NULL)
+ pthread_cond_wait(&e->state_cond, &e->state_lock);
+
+ pthread_cleanup_pop(true);
+}
+
static bool reg_entry_is_local_in_dif(struct reg_entry * e,
const char * dif_name)
{
@@ -459,6 +469,9 @@ int reg_entry_leave_state(struct reg_entry * e,
pthread_mutex_lock(&e->state_lock);
+ pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+ &e->state_lock);
+
while (e->state == state && ret != -ETIMEDOUT)
if (timeout)
ret = -pthread_cond_timedwait(&e->state_cond,
@@ -474,7 +487,7 @@ int reg_entry_leave_state(struct reg_entry * e,
pthread_cond_broadcast(&e->state_cond);
}
- pthread_mutex_unlock(&e->state_lock);
+ pthread_cleanup_pop(true);
return ret;
}
diff --git a/src/lib/tpm.c b/src/lib/tpm.c
index c883e0a8..1317758b 100644
--- a/src/lib/tpm.c
+++ b/src/lib/tpm.c
@@ -38,8 +38,8 @@
struct pthr_el {
struct list_head next;
- bool join;
bool kill;
+ bool busy;
pthread_t thr;
};
@@ -78,13 +78,12 @@ static void tpm_join(struct tpm * tpm)
if (tpm->state != TPM_RUNNING) {
if (!e->kill) {
e->kill = true;
+ pthread_cancel(e->thr);
--tpm->cur;
}
- while (!e->join)
- pthread_cond_wait(&tpm->cond, &tpm->lock);
}
- if (e->join) {
+ if (e->kill) {
pthread_join(e->thr, NULL);
list_del(&e->next);
free(e);
@@ -92,32 +91,15 @@ static void tpm_join(struct tpm * tpm)
}
}
-static struct pthr_el * tpm_pthr_el(struct tpm * tpm,
- pthread_t thr)
-{
- struct list_head * p;
- struct pthr_el * e;
-
- list_for_each(p, &tpm->pool) {
- e = list_entry(p, struct pthr_el, next);
- if (e->thr == thr)
- return e;
-
- }
-
- assert(false);
-
- return NULL;
-}
-
static void tpm_kill(struct tpm * tpm)
{
struct list_head * p;
list_for_each(p, &tpm->pool) {
struct pthr_el * e = list_entry(p, struct pthr_el, next);
- if (!e->kill) {
+ if (!e->busy && !e->kill) {
e->kill = true;
+ pthread_cancel(e->thr);
--tpm->cur;
return;
}
@@ -152,8 +134,8 @@ static void * tpmgr(void * o)
if (e == NULL)
break;
- e->join = false;
e->kill = false;
+ e->busy = false;
if (pthread_create(&e->thr, NULL,
tpm->func, tpm->o)) {
@@ -261,23 +243,30 @@ void tpm_destroy(struct tpm * tpm)
free(tpm);
}
-bool tpm_check(struct tpm * tpm)
+static struct pthr_el * tpm_pthr_el(struct tpm * tpm,
+ pthread_t thr)
{
- bool ret;
+ struct list_head * p;
+ struct pthr_el * e;
- pthread_mutex_lock(&tpm->lock);
+ list_for_each(p, &tpm->pool) {
+ e = list_entry(p, struct pthr_el, next);
+ if (e->thr == thr)
+ return e;
- ret = tpm_pthr_el(tpm, pthread_self())->kill;
+ }
- pthread_mutex_unlock(&tpm->lock);
+ assert(false);
- return ret;
+ return NULL;
}
void tpm_inc(struct tpm * tpm)
{
pthread_mutex_lock(&tpm->lock);
+ tpm_pthr_el(tpm, pthread_self())->busy = false;
+
--tpm->wrk;
pthread_mutex_unlock(&tpm->lock);
@@ -287,18 +276,9 @@ void tpm_dec(struct tpm * tpm)
{
pthread_mutex_lock(&tpm->lock);
- ++tpm->wrk;
-
- pthread_cond_signal(&tpm->cond);
-
- pthread_mutex_unlock(&tpm->lock);
-}
+ tpm_pthr_el(tpm, pthread_self())->busy = true;
-void tpm_exit(struct tpm * tpm)
-{
- pthread_mutex_lock(&tpm->lock);
-
- tpm_pthr_el(tpm, pthread_self())->join = true;
+ ++tpm->wrk;
pthread_cond_signal(&tpm->cond);