diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-09-24 14:34:03 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@ugent.be> | 2017-09-24 14:34:03 +0200 |
commit | ff5063ad0e7902ce59864a466bd9d8d606d788e4 (patch) | |
tree | 17f66b04659a06c018494eb732adb661111d63f2 /src/ipcpd/ipcp.c | |
parent | 7cef269be64f64b920763c6f2455931422c8bfe9 (diff) | |
download | ouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.tar.gz ouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.zip |
ipcpd: Add threadpool manager to DHT
This adds a threadpool manager to the DHT. This was needed because the
detached thread could cause a data race on shutdown.
The threadpool manager is revised to allow multiple instances in a
single program.
The irmd and ipcp now store commands in a buffer (list) instead of a
single buffer before passing it to handler threads.
Diffstat (limited to 'src/ipcpd/ipcp.c')
-rw-r--r-- | src/ipcpd/ipcp.c | 99 |
1 files changed, 54 insertions, 45 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 85d543da..513c638a 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -36,7 +36,6 @@ #include <ouroboros/dev.h> #include <ouroboros/bitmap.h> #include <ouroboros/np1_flow.h> -#include <ouroboros/tpm.h> #include "ipcp.h" @@ -44,6 +43,14 @@ #include <sys/socket.h> #include <stdlib.h> +struct cmd { + struct list_head next; + + uint8_t cbuf[IPCP_MSG_BUF_SIZE]; + size_t len; + int fd; +}; + void ipcp_sig_handler(int sig, siginfo_t * info, void * c) @@ -107,7 +114,8 @@ static void * acceptloop(void * o) while (ipcp_get_state() != IPCP_SHUTDOWN && ipcp_get_state() != IPCP_NULL) { - ssize_t count; + struct cmd * cmd; + #if defined(__FreeBSD__) || defined(__APPLE__) FD_ZERO(&fds); FD_SET(ipcpi.sockfd, &fds); @@ -122,25 +130,28 @@ static void * acceptloop(void * o) (void *) &tv, sizeof(tv))) log_warn("Failed to set timeout on socket."); - pthread_mutex_lock(&ipcpi.cmd_lock); + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Out of memory"); + break; + } - assert(ipcpi.csockfd == -1); + pthread_mutex_lock(&ipcpi.cmd_lock); - count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE); - if (count <= 0) { + cmd->len = read(csockfd, cmd->cbuf, IPCP_MSG_BUF_SIZE); + if (cmd->len <= 0) { pthread_mutex_unlock(&ipcpi.cmd_lock); log_err("Failed to read from socket."); close(csockfd); + free(cmd); continue; } - ipcpi.cmd_len = count; - ipcpi.csockfd = csockfd; + cmd->fd = csockfd; - pthread_cond_signal(&ipcpi.cmd_cond); + list_add(&cmd->next, &ipcpi.cmds); - while (ipcpi.csockfd != -1) - pthread_cond_wait(&ipcpi.acc_cond, &ipcpi.cmd_lock); + pthread_cond_signal(&ipcpi.cmd_cond); pthread_mutex_unlock(&ipcpi.cmd_lock); } @@ -159,13 +170,15 @@ static void * mainloop(void * o) struct timespec dl; struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000), (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION}; - (void) o; + + (void) o; while (true) { int ret = 0; ipcp_msg_t ret_msg = IPCP_MSG__INIT; dif_info_msg_t dif_info = DIF_INFO_MSG__INIT; int fd = -1; + struct cmd * cmd; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -174,33 +187,34 @@ static void * mainloop(void * o) pthread_mutex_lock(&ipcpi.cmd_lock); - while (ipcpi.csockfd == -1 && ret != -ETIMEDOUT) + while (list_is_empty(&ipcpi.cmds) && ret != -ETIMEDOUT) ret = -pthread_cond_timedwait(&ipcpi.cmd_cond, &ipcpi.cmd_lock, &dl); - sfd = ipcpi.csockfd; - ipcpi.csockfd = -1; - - if (sfd == -1) { + if (ret == -ETIMEDOUT) { pthread_mutex_unlock(&ipcpi.cmd_lock); - if (tpm_check()) + if (tpm_check(ipcpi.tpm)) break; continue; } - pthread_cond_signal(&ipcpi.acc_cond); + cmd = list_last_entry(&ipcpi.cmds, struct cmd, next); + list_del(&cmd->next); + + pthread_mutex_unlock(&ipcpi.cmd_lock); + + msg = ipcp_msg__unpack(NULL, cmd->len, cmd->cbuf); + sfd = cmd->fd; + + free(cmd); - msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf); if (msg == NULL) { - pthread_mutex_unlock(&ipcpi.cmd_lock); close(sfd); continue; } - pthread_mutex_unlock(&ipcpi.cmd_lock); - - tpm_dec(); + tpm_dec(ipcpi.tpm); switch (msg->code) { case IPCP_MSG_CODE__IPCP_BOOTSTRAP: @@ -474,7 +488,7 @@ static void * mainloop(void * o) if (buffer.len == 0) { log_err("Failed to pack reply message"); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); continue; } @@ -482,7 +496,7 @@ static void * mainloop(void * o) if (buffer.data == NULL) { log_err("Failed to create reply buffer."); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); continue; } @@ -492,17 +506,17 @@ static void * mainloop(void * o) log_err("Failed to send reply message"); free(buffer.data); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); continue; } free(buffer.data); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); } - tpm_exit(); + tpm_exit(ipcpi.tpm); return (void *) 0; } @@ -617,20 +631,14 @@ int ipcp_init(int argc, goto fail_cmd_cond; } - if (pthread_cond_init(&ipcpi.acc_cond, &cattr)) { - log_err("Failed to init convar."); - goto fail_acc_cond; - } + list_head_init(&ipcpi.cmds); ipcpi.alloc_id = -1; - ipcpi.csockfd = -1; pthread_condattr_destroy(&cattr); return 0; - fail_acc_cond: - pthread_cond_destroy(&ipcpi.cmd_cond); fail_cmd_cond: pthread_mutex_destroy(&ipcpi.cmd_lock); fail_cmd_lock: @@ -675,12 +683,14 @@ int ipcp_boot() sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) - goto fail_tpm_init; + ipcpi.tpm = tpm_create(IPCP_MIN_THREADS, IPCP_ADD_THREADS, + mainloop, NULL); + if (ipcpi.tpm == NULL) + goto fail_tpm_create; pthread_sigmask(SIG_BLOCK, &sigset, NULL); - if (tpm_start()) + if (tpm_start(ipcpi.tpm)) goto fail_tpm_start; ipcp_set_state(IPCP_INIT); @@ -696,18 +706,18 @@ int ipcp_boot() return 0; fail_acceptor: - tpm_stop(); + tpm_stop(ipcpi.tpm); fail_tpm_start: - tpm_fini(); - fail_tpm_init: + tpm_destroy(ipcpi.tpm); + fail_tpm_create: return -1; } void ipcp_shutdown() { pthread_join(ipcpi.acceptor, NULL); - tpm_stop(); - tpm_fini(); + tpm_stop(ipcpi.tpm); + tpm_destroy(ipcpi.tpm); log_info("IPCP %d shutting down.", getpid()); } @@ -724,7 +734,6 @@ void ipcp_fini() pthread_mutex_destroy(&ipcpi.state_mtx); pthread_cond_destroy(&ipcpi.alloc_cond); pthread_mutex_destroy(&ipcpi.alloc_lock); - pthread_cond_destroy(&ipcpi.acc_cond); pthread_cond_destroy(&ipcpi.cmd_cond); pthread_mutex_destroy(&ipcpi.cmd_lock); |