diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-09-24 14:34:03 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@ugent.be> | 2017-09-24 14:34:03 +0200 |
commit | ff5063ad0e7902ce59864a466bd9d8d606d788e4 (patch) | |
tree | 17f66b04659a06c018494eb732adb661111d63f2 /src/irmd/main.c | |
parent | 7cef269be64f64b920763c6f2455931422c8bfe9 (diff) | |
download | ouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.tar.gz ouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.zip |
ipcpd: Add threadpool manager to DHT
This adds a threadpool manager to the DHT. This was needed because the
detached thread could cause a data race on shutdown.
The threadpool manager is revised to allow multiple instances in a
single program.
The irmd and ipcp now store commands in a buffer (list) instead of a
single buffer before passing it to handler threads.
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r-- | src/irmd/main.c | 104 |
1 files changed, 55 insertions, 49 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index 27c771a6..3fceadb6 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -93,6 +93,14 @@ enum irm_state { IRMD_RUNNING }; +struct cmd { + struct list_head next; + + uint8_t cbuf[IB_LEN]; + size_t len; + int fd; +}; + struct { struct list_head registry; /* registered names known */ @@ -112,16 +120,15 @@ struct { int sockfd; /* UNIX socket */ - uint8_t cbuf[IB_LEN]; /* cmd message buffer */ - size_t cmd_len; /* length of cmd in cbuf */ - int csockfd; /* cmd UNIX socket */ - pthread_cond_t acc_cond; /* cmd accepted condvar */ + struct list_head cmds; /* pending commands */ pthread_cond_t cmd_cond; /* cmd signal condvar */ pthread_mutex_t cmd_lock; /* cmd signal lock */ enum irm_state state; /* state of the irmd */ pthread_rwlock_t state_lock; /* lock for the entire irmd */ + struct tpm * tpm; /* thread pool manager */ + pthread_t irm_sanitize; /* clean up irmd resources */ pthread_t shm_sanitize; /* keep track of rdrbuff use */ pthread_t acceptor; /* accept new commands */ @@ -1631,7 +1638,6 @@ static void irm_fini(void) lockfile_destroy(irmd.lf); pthread_mutex_destroy(&irmd.cmd_lock); - pthread_cond_destroy(&irmd.acc_cond); pthread_cond_destroy(&irmd.cmd_cond); pthread_rwlock_destroy(&irmd.reg_lock); pthread_rwlock_destroy(&irmd.state_lock); @@ -1864,7 +1870,8 @@ static void * acceptloop(void * o) (void) o; while (irmd_get_state() == IRMD_RUNNING) { - ssize_t count; + struct cmd * cmd; + #if defined(__FreeBSD__) || defined(__APPLE__) FD_ZERO(&fds); FD_SET(irmd.sockfd, &fds); @@ -1879,25 +1886,28 @@ static void * acceptloop(void * o) (void *) &tv, sizeof(tv))) log_warn("Failed to set timeout on socket."); - pthread_mutex_lock(&irmd.cmd_lock); + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Out of memory."); + break; + } - assert(irmd.csockfd == -1); + pthread_mutex_lock(&irmd.cmd_lock); - count = read(csockfd, irmd.cbuf, IRM_MSG_BUF_SIZE); - if (count <= 0) { + cmd->len = read(csockfd, cmd->cbuf, IRM_MSG_BUF_SIZE); + if (cmd->len <= 0) { pthread_mutex_unlock(&irmd.cmd_lock); log_err("Failed to read from socket."); close(csockfd); + free(cmd); continue; } - irmd.cmd_len = count; - irmd.csockfd = csockfd; + cmd->fd = csockfd; - pthread_cond_signal(&irmd.cmd_cond); + list_add(&cmd->next, &irmd.cmds); - while(irmd.csockfd != -1) - pthread_cond_wait(&irmd.acc_cond, &irmd.cmd_lock); + pthread_cond_signal(&irmd.cmd_cond); pthread_mutex_unlock(&irmd.cmd_lock); } @@ -1923,6 +1933,7 @@ void * mainloop(void * o) pid_t * apis = NULL; struct timespec * timeo = NULL; struct timespec ts = {0, 0}; + struct cmd * cmd; ret_msg.code = IRM_MSG_CODE__IRM_REPLY; @@ -1931,33 +1942,34 @@ void * mainloop(void * o) pthread_mutex_lock(&irmd.cmd_lock); - while (irmd.csockfd == -1 && ret != -ETIMEDOUT) + while (list_is_empty(&irmd.cmds) && ret != -ETIMEDOUT) ret = -pthread_cond_timedwait(&irmd.cmd_cond, &irmd.cmd_lock, &dl); - sfd = irmd.csockfd; - irmd.csockfd = -1; - - if (sfd == -1) { + if (ret == -ETIMEDOUT) { pthread_mutex_unlock(&irmd.cmd_lock); - if (tpm_check()) + if (tpm_check(irmd.tpm)) break; continue; } - pthread_cond_signal(&irmd.acc_cond); + cmd = list_last_entry(&irmd.cmds, struct cmd, next); + list_del(&cmd->next); + + pthread_mutex_unlock(&irmd.cmd_lock); + + msg = irm_msg__unpack(NULL, cmd->len, cmd->cbuf); + sfd = cmd->fd; + + free(cmd); - msg = irm_msg__unpack(NULL, irmd.cmd_len, irmd.cbuf); if (msg == NULL) { - pthread_mutex_unlock(&irmd.cmd_lock); close(sfd); continue; } - pthread_mutex_unlock(&irmd.cmd_lock); - - tpm_dec(); + tpm_dec(irmd.tpm); if (msg->has_timeo_sec) { assert(msg->has_timeo_nsec); @@ -2098,7 +2110,7 @@ void * mainloop(void * o) if (ret_msg.result == -EPIPE || !ret_msg.has_result) { close(sfd); - tpm_inc(); + tpm_inc(irmd.tpm); continue; } @@ -2108,7 +2120,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(sfd); - tpm_inc(); + tpm_inc(irmd.tpm); continue; } @@ -2117,7 +2129,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(sfd); - tpm_inc(); + tpm_inc(irmd.tpm); continue; } @@ -2133,10 +2145,10 @@ void * mainloop(void * o) free(buffer.data); close(sfd); - tpm_inc(); + tpm_inc(irmd.tpm); } - tpm_exit(); + tpm_exit(irmd.tpm); return (void *) 0; } @@ -2184,12 +2196,6 @@ static int irm_init(void) goto fail_cmd_cond; } - if (pthread_cond_init(&irmd.acc_cond, &cattr)) { - log_err("Failed to initialize condvar."); - pthread_condattr_destroy(&cattr); - goto fail_acc_cond; - } - pthread_condattr_destroy(&cattr); list_head_init(&irmd.ipcps); @@ -2198,6 +2204,7 @@ static int irm_init(void) list_head_init(&irmd.spawned_apis); list_head_init(&irmd.registry); list_head_init(&irmd.irm_flows); + list_head_init(&irmd.cmds); irmd.port_ids = bmp_create(SYS_MAX_FLOWS, 0); if (irmd.port_ids == NULL) { @@ -2272,7 +2279,6 @@ static int irm_init(void) gcry_control(GCRYCTL_INITIALIZATION_FINISHED); #endif - irmd.csockfd = -1; irmd.state = IRMD_RUNNING; log_info("Ouroboros IPC Resource Manager daemon started..."); @@ -2294,8 +2300,6 @@ static int irm_init(void) fail_lockfile: bmp_destroy(irmd.port_ids); fail_port_ids: - pthread_cond_destroy(&irmd.acc_cond); - fail_acc_cond: pthread_cond_destroy(&irmd.cmd_cond); fail_cmd_cond: pthread_mutex_destroy(&irmd.cmd_lock); @@ -2367,12 +2371,14 @@ int main(int argc, if (irm_init() < 0) goto fail_irm_init; - if (tpm_init(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop)) { + irmd.tpm = tpm_create(IRMD_MIN_THREADS, IRMD_ADD_THREADS, + mainloop, NULL); + if (irmd.tpm == NULL) { irmd_set_state(IRMD_NULL); - goto fail_tpm_init; + goto fail_tpm_create; } - if (tpm_start()) { + if (tpm_start(irmd.tpm)) { irmd_set_state(IRMD_NULL); goto fail_tpm_start; } @@ -2396,9 +2402,9 @@ int main(int argc, pthread_join(irmd.irm_sanitize, NULL); pthread_join(irmd.shm_sanitize, NULL); - tpm_stop(); + tpm_stop(irmd.tpm); - tpm_fini(); + tpm_destroy(irmd.tpm); pthread_sigmask(SIG_BLOCK, &sigset, NULL); @@ -2417,10 +2423,10 @@ int main(int argc, fail_shm_sanitize: pthread_join(irmd.irm_sanitize, NULL); fail_irm_sanitize: - tpm_stop(); + tpm_stop(irmd.tpm); fail_tpm_start: - tpm_fini(); - fail_tpm_init: + tpm_destroy(irmd.tpm); + fail_tpm_create: irm_fini(); fail_irm_init: log_fini(); |