summaryrefslogtreecommitdiff
path: root/src/ipcpd/ipcp.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-09-24 14:34:03 +0200
committerdimitri staessens <dimitri.staessens@ugent.be>2017-09-24 14:34:03 +0200
commitff5063ad0e7902ce59864a466bd9d8d606d788e4 (patch)
tree17f66b04659a06c018494eb732adb661111d63f2 /src/ipcpd/ipcp.c
parent7cef269be64f64b920763c6f2455931422c8bfe9 (diff)
downloadouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.tar.gz
ouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.zip
ipcpd: Add threadpool manager to DHT
This adds a threadpool manager to the DHT. This was needed because the detached thread could cause a data race on shutdown. The threadpool manager is revised to allow multiple instances in a single program. The irmd and ipcp now store commands in a buffer (list) instead of a single buffer before passing it to handler threads.
Diffstat (limited to 'src/ipcpd/ipcp.c')
-rw-r--r--src/ipcpd/ipcp.c99
1 files changed, 54 insertions, 45 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 85d543da..513c638a 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -36,7 +36,6 @@
#include <ouroboros/dev.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/np1_flow.h>
-#include <ouroboros/tpm.h>
#include "ipcp.h"
@@ -44,6 +43,14 @@
#include <sys/socket.h>
#include <stdlib.h>
+struct cmd {
+ struct list_head next;
+
+ uint8_t cbuf[IPCP_MSG_BUF_SIZE];
+ size_t len;
+ int fd;
+};
+
void ipcp_sig_handler(int sig,
siginfo_t * info,
void * c)
@@ -107,7 +114,8 @@ static void * acceptloop(void * o)
while (ipcp_get_state() != IPCP_SHUTDOWN &&
ipcp_get_state() != IPCP_NULL) {
- ssize_t count;
+ struct cmd * cmd;
+
#if defined(__FreeBSD__) || defined(__APPLE__)
FD_ZERO(&fds);
FD_SET(ipcpi.sockfd, &fds);
@@ -122,25 +130,28 @@ static void * acceptloop(void * o)
(void *) &tv, sizeof(tv)))
log_warn("Failed to set timeout on socket.");
- pthread_mutex_lock(&ipcpi.cmd_lock);
+ cmd = malloc(sizeof(*cmd));
+ if (cmd == NULL) {
+ log_err("Out of memory");
+ break;
+ }
- assert(ipcpi.csockfd == -1);
+ pthread_mutex_lock(&ipcpi.cmd_lock);
- count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE);
- if (count <= 0) {
+ cmd->len = read(csockfd, cmd->cbuf, IPCP_MSG_BUF_SIZE);
+ if (cmd->len <= 0) {
pthread_mutex_unlock(&ipcpi.cmd_lock);
log_err("Failed to read from socket.");
close(csockfd);
+ free(cmd);
continue;
}
- ipcpi.cmd_len = count;
- ipcpi.csockfd = csockfd;
+ cmd->fd = csockfd;
- pthread_cond_signal(&ipcpi.cmd_cond);
+ list_add(&cmd->next, &ipcpi.cmds);
- while (ipcpi.csockfd != -1)
- pthread_cond_wait(&ipcpi.acc_cond, &ipcpi.cmd_lock);
+ pthread_cond_signal(&ipcpi.cmd_cond);
pthread_mutex_unlock(&ipcpi.cmd_lock);
}
@@ -159,13 +170,15 @@ static void * mainloop(void * o)
struct timespec dl;
struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000),
(IPCP_ACCEPT_TIMEOUT % 1000) * MILLION};
- (void) o;
+
+ (void) o;
while (true) {
int ret = 0;
ipcp_msg_t ret_msg = IPCP_MSG__INIT;
dif_info_msg_t dif_info = DIF_INFO_MSG__INIT;
int fd = -1;
+ struct cmd * cmd;
ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY;
@@ -174,33 +187,34 @@ static void * mainloop(void * o)
pthread_mutex_lock(&ipcpi.cmd_lock);
- while (ipcpi.csockfd == -1 && ret != -ETIMEDOUT)
+ while (list_is_empty(&ipcpi.cmds) && ret != -ETIMEDOUT)
ret = -pthread_cond_timedwait(&ipcpi.cmd_cond,
&ipcpi.cmd_lock,
&dl);
- sfd = ipcpi.csockfd;
- ipcpi.csockfd = -1;
-
- if (sfd == -1) {
+ if (ret == -ETIMEDOUT) {
pthread_mutex_unlock(&ipcpi.cmd_lock);
- if (tpm_check())
+ if (tpm_check(ipcpi.tpm))
break;
continue;
}
- pthread_cond_signal(&ipcpi.acc_cond);
+ cmd = list_last_entry(&ipcpi.cmds, struct cmd, next);
+ list_del(&cmd->next);
+
+ pthread_mutex_unlock(&ipcpi.cmd_lock);
+
+ msg = ipcp_msg__unpack(NULL, cmd->len, cmd->cbuf);
+ sfd = cmd->fd;
+
+ free(cmd);
- msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf);
if (msg == NULL) {
- pthread_mutex_unlock(&ipcpi.cmd_lock);
close(sfd);
continue;
}
- pthread_mutex_unlock(&ipcpi.cmd_lock);
-
- tpm_dec();
+ tpm_dec(ipcpi.tpm);
switch (msg->code) {
case IPCP_MSG_CODE__IPCP_BOOTSTRAP:
@@ -474,7 +488,7 @@ static void * mainloop(void * o)
if (buffer.len == 0) {
log_err("Failed to pack reply message");
close(sfd);
- tpm_inc();
+ tpm_inc(ipcpi.tpm);
continue;
}
@@ -482,7 +496,7 @@ static void * mainloop(void * o)
if (buffer.data == NULL) {
log_err("Failed to create reply buffer.");
close(sfd);
- tpm_inc();
+ tpm_inc(ipcpi.tpm);
continue;
}
@@ -492,17 +506,17 @@ static void * mainloop(void * o)
log_err("Failed to send reply message");
free(buffer.data);
close(sfd);
- tpm_inc();
+ tpm_inc(ipcpi.tpm);
continue;
}
free(buffer.data);
close(sfd);
- tpm_inc();
+ tpm_inc(ipcpi.tpm);
}
- tpm_exit();
+ tpm_exit(ipcpi.tpm);
return (void *) 0;
}
@@ -617,20 +631,14 @@ int ipcp_init(int argc,
goto fail_cmd_cond;
}
- if (pthread_cond_init(&ipcpi.acc_cond, &cattr)) {
- log_err("Failed to init convar.");
- goto fail_acc_cond;
- }
+ list_head_init(&ipcpi.cmds);
ipcpi.alloc_id = -1;
- ipcpi.csockfd = -1;
pthread_condattr_destroy(&cattr);
return 0;
- fail_acc_cond:
- pthread_cond_destroy(&ipcpi.cmd_cond);
fail_cmd_cond:
pthread_mutex_destroy(&ipcpi.cmd_lock);
fail_cmd_lock:
@@ -675,12 +683,14 @@ int ipcp_boot()
sigaction(SIGHUP, &sig_act, NULL);
sigaction(SIGPIPE, &sig_act, NULL);
- if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop))
- goto fail_tpm_init;
+ ipcpi.tpm = tpm_create(IPCP_MIN_THREADS, IPCP_ADD_THREADS,
+ mainloop, NULL);
+ if (ipcpi.tpm == NULL)
+ goto fail_tpm_create;
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
- if (tpm_start())
+ if (tpm_start(ipcpi.tpm))
goto fail_tpm_start;
ipcp_set_state(IPCP_INIT);
@@ -696,18 +706,18 @@ int ipcp_boot()
return 0;
fail_acceptor:
- tpm_stop();
+ tpm_stop(ipcpi.tpm);
fail_tpm_start:
- tpm_fini();
- fail_tpm_init:
+ tpm_destroy(ipcpi.tpm);
+ fail_tpm_create:
return -1;
}
void ipcp_shutdown()
{
pthread_join(ipcpi.acceptor, NULL);
- tpm_stop();
- tpm_fini();
+ tpm_stop(ipcpi.tpm);
+ tpm_destroy(ipcpi.tpm);
log_info("IPCP %d shutting down.", getpid());
}
@@ -724,7 +734,6 @@ void ipcp_fini()
pthread_mutex_destroy(&ipcpi.state_mtx);
pthread_cond_destroy(&ipcpi.alloc_cond);
pthread_mutex_destroy(&ipcpi.alloc_lock);
- pthread_cond_destroy(&ipcpi.acc_cond);
pthread_cond_destroy(&ipcpi.cmd_cond);
pthread_mutex_destroy(&ipcpi.cmd_lock);