summaryrefslogtreecommitdiff
path: root/src/irmd/main.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-09-24 14:34:03 +0200
committerdimitri staessens <dimitri.staessens@ugent.be>2017-09-24 14:34:03 +0200
commitff5063ad0e7902ce59864a466bd9d8d606d788e4 (patch)
tree17f66b04659a06c018494eb732adb661111d63f2 /src/irmd/main.c
parent7cef269be64f64b920763c6f2455931422c8bfe9 (diff)
downloadouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.tar.gz
ouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.zip
ipcpd: Add threadpool manager to DHT
This adds a threadpool manager to the DHT. This was needed because the detached thread could cause a data race on shutdown. The threadpool manager is revised to allow multiple instances in a single program. The irmd and ipcp now store commands in a buffer (list) instead of a single buffer before passing it to handler threads.
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r--src/irmd/main.c104
1 files changed, 55 insertions, 49 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 27c771a6..3fceadb6 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -93,6 +93,14 @@ enum irm_state {
IRMD_RUNNING
};
+struct cmd {
+ struct list_head next;
+
+ uint8_t cbuf[IB_LEN];
+ size_t len;
+ int fd;
+};
+
struct {
struct list_head registry; /* registered names known */
@@ -112,16 +120,15 @@ struct {
int sockfd; /* UNIX socket */
- uint8_t cbuf[IB_LEN]; /* cmd message buffer */
- size_t cmd_len; /* length of cmd in cbuf */
- int csockfd; /* cmd UNIX socket */
- pthread_cond_t acc_cond; /* cmd accepted condvar */
+ struct list_head cmds; /* pending commands */
pthread_cond_t cmd_cond; /* cmd signal condvar */
pthread_mutex_t cmd_lock; /* cmd signal lock */
enum irm_state state; /* state of the irmd */
pthread_rwlock_t state_lock; /* lock for the entire irmd */
+ struct tpm * tpm; /* thread pool manager */
+
pthread_t irm_sanitize; /* clean up irmd resources */
pthread_t shm_sanitize; /* keep track of rdrbuff use */
pthread_t acceptor; /* accept new commands */
@@ -1631,7 +1638,6 @@ static void irm_fini(void)
lockfile_destroy(irmd.lf);
pthread_mutex_destroy(&irmd.cmd_lock);
- pthread_cond_destroy(&irmd.acc_cond);
pthread_cond_destroy(&irmd.cmd_cond);
pthread_rwlock_destroy(&irmd.reg_lock);
pthread_rwlock_destroy(&irmd.state_lock);
@@ -1864,7 +1870,8 @@ static void * acceptloop(void * o)
(void) o;
while (irmd_get_state() == IRMD_RUNNING) {
- ssize_t count;
+ struct cmd * cmd;
+
#if defined(__FreeBSD__) || defined(__APPLE__)
FD_ZERO(&fds);
FD_SET(irmd.sockfd, &fds);
@@ -1879,25 +1886,28 @@ static void * acceptloop(void * o)
(void *) &tv, sizeof(tv)))
log_warn("Failed to set timeout on socket.");
- pthread_mutex_lock(&irmd.cmd_lock);
+ cmd = malloc(sizeof(*cmd));
+ if (cmd == NULL) {
+ log_err("Out of memory.");
+ break;
+ }
- assert(irmd.csockfd == -1);
+ pthread_mutex_lock(&irmd.cmd_lock);
- count = read(csockfd, irmd.cbuf, IRM_MSG_BUF_SIZE);
- if (count <= 0) {
+ cmd->len = read(csockfd, cmd->cbuf, IRM_MSG_BUF_SIZE);
+ if (cmd->len <= 0) {
pthread_mutex_unlock(&irmd.cmd_lock);
log_err("Failed to read from socket.");
close(csockfd);
+ free(cmd);
continue;
}
- irmd.cmd_len = count;
- irmd.csockfd = csockfd;
+ cmd->fd = csockfd;
- pthread_cond_signal(&irmd.cmd_cond);
+ list_add(&cmd->next, &irmd.cmds);
- while(irmd.csockfd != -1)
- pthread_cond_wait(&irmd.acc_cond, &irmd.cmd_lock);
+ pthread_cond_signal(&irmd.cmd_cond);
pthread_mutex_unlock(&irmd.cmd_lock);
}
@@ -1923,6 +1933,7 @@ void * mainloop(void * o)
pid_t * apis = NULL;
struct timespec * timeo = NULL;
struct timespec ts = {0, 0};
+ struct cmd * cmd;
ret_msg.code = IRM_MSG_CODE__IRM_REPLY;
@@ -1931,33 +1942,34 @@ void * mainloop(void * o)
pthread_mutex_lock(&irmd.cmd_lock);
- while (irmd.csockfd == -1 && ret != -ETIMEDOUT)
+ while (list_is_empty(&irmd.cmds) && ret != -ETIMEDOUT)
ret = -pthread_cond_timedwait(&irmd.cmd_cond,
&irmd.cmd_lock,
&dl);
- sfd = irmd.csockfd;
- irmd.csockfd = -1;
-
- if (sfd == -1) {
+ if (ret == -ETIMEDOUT) {
pthread_mutex_unlock(&irmd.cmd_lock);
- if (tpm_check())
+ if (tpm_check(irmd.tpm))
break;
continue;
}
- pthread_cond_signal(&irmd.acc_cond);
+ cmd = list_last_entry(&irmd.cmds, struct cmd, next);
+ list_del(&cmd->next);
+
+ pthread_mutex_unlock(&irmd.cmd_lock);
+
+ msg = irm_msg__unpack(NULL, cmd->len, cmd->cbuf);
+ sfd = cmd->fd;
+
+ free(cmd);
- msg = irm_msg__unpack(NULL, irmd.cmd_len, irmd.cbuf);
if (msg == NULL) {
- pthread_mutex_unlock(&irmd.cmd_lock);
close(sfd);
continue;
}
- pthread_mutex_unlock(&irmd.cmd_lock);
-
- tpm_dec();
+ tpm_dec(irmd.tpm);
if (msg->has_timeo_sec) {
assert(msg->has_timeo_nsec);
@@ -2098,7 +2110,7 @@ void * mainloop(void * o)
if (ret_msg.result == -EPIPE || !ret_msg.has_result) {
close(sfd);
- tpm_inc();
+ tpm_inc(irmd.tpm);
continue;
}
@@ -2108,7 +2120,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(sfd);
- tpm_inc();
+ tpm_inc(irmd.tpm);
continue;
}
@@ -2117,7 +2129,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(sfd);
- tpm_inc();
+ tpm_inc(irmd.tpm);
continue;
}
@@ -2133,10 +2145,10 @@ void * mainloop(void * o)
free(buffer.data);
close(sfd);
- tpm_inc();
+ tpm_inc(irmd.tpm);
}
- tpm_exit();
+ tpm_exit(irmd.tpm);
return (void *) 0;
}
@@ -2184,12 +2196,6 @@ static int irm_init(void)
goto fail_cmd_cond;
}
- if (pthread_cond_init(&irmd.acc_cond, &cattr)) {
- log_err("Failed to initialize condvar.");
- pthread_condattr_destroy(&cattr);
- goto fail_acc_cond;
- }
-
pthread_condattr_destroy(&cattr);
list_head_init(&irmd.ipcps);
@@ -2198,6 +2204,7 @@ static int irm_init(void)
list_head_init(&irmd.spawned_apis);
list_head_init(&irmd.registry);
list_head_init(&irmd.irm_flows);
+ list_head_init(&irmd.cmds);
irmd.port_ids = bmp_create(SYS_MAX_FLOWS, 0);
if (irmd.port_ids == NULL) {
@@ -2272,7 +2279,6 @@ static int irm_init(void)
gcry_control(GCRYCTL_INITIALIZATION_FINISHED);
#endif
- irmd.csockfd = -1;
irmd.state = IRMD_RUNNING;
log_info("Ouroboros IPC Resource Manager daemon started...");
@@ -2294,8 +2300,6 @@ static int irm_init(void)
fail_lockfile:
bmp_destroy(irmd.port_ids);
fail_port_ids:
- pthread_cond_destroy(&irmd.acc_cond);
- fail_acc_cond:
pthread_cond_destroy(&irmd.cmd_cond);
fail_cmd_cond:
pthread_mutex_destroy(&irmd.cmd_lock);
@@ -2367,12 +2371,14 @@ int main(int argc,
if (irm_init() < 0)
goto fail_irm_init;
- if (tpm_init(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop)) {
+ irmd.tpm = tpm_create(IRMD_MIN_THREADS, IRMD_ADD_THREADS,
+ mainloop, NULL);
+ if (irmd.tpm == NULL) {
irmd_set_state(IRMD_NULL);
- goto fail_tpm_init;
+ goto fail_tpm_create;
}
- if (tpm_start()) {
+ if (tpm_start(irmd.tpm)) {
irmd_set_state(IRMD_NULL);
goto fail_tpm_start;
}
@@ -2396,9 +2402,9 @@ int main(int argc,
pthread_join(irmd.irm_sanitize, NULL);
pthread_join(irmd.shm_sanitize, NULL);
- tpm_stop();
+ tpm_stop(irmd.tpm);
- tpm_fini();
+ tpm_destroy(irmd.tpm);
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
@@ -2417,10 +2423,10 @@ int main(int argc,
fail_shm_sanitize:
pthread_join(irmd.irm_sanitize, NULL);
fail_irm_sanitize:
- tpm_stop();
+ tpm_stop(irmd.tpm);
fail_tpm_start:
- tpm_fini();
- fail_tpm_init:
+ tpm_destroy(irmd.tpm);
+ fail_tpm_create:
irm_fini();
fail_irm_init:
log_fini();