summaryrefslogtreecommitdiff
path: root/src/ipcpd/ipcp.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-08-15 19:30:03 +0200
committerdimitri staessens <dimitri.staessens@ugent.be>2017-08-16 08:18:03 +0200
commitc995538b1c6483996c979df62feee3d79acd0e45 (patch)
tree67fb33f13b02be7b9b65b47957ce5065ebb97411 /src/ipcpd/ipcp.c
parent095c2414425952836e97d88a6dde6f4415725c68 (diff)
downloadouroboros-c995538b1c6483996c979df62feee3d79acd0e45.tar.gz
ouroboros-c995538b1c6483996c979df62feee3d79acd0e45.zip
irmd, ipcpd: Listen on a dedicated accept() thread
The IRMd and IPCPs will now call accept on their command sockets from a single thread that will dispatch work to the other threads. This solves a problem on OS X and FreeBSD where accept() doesn't time out when setting SO_RCVTIMEO on the socket. Calling kqueue or select() on that socket to wait for events before calling accept() didn't solve it since select() or kqueue() might wake up multiple threads, with the non-working threads again blocked on the accept() on shutdown.
Diffstat (limited to 'src/ipcpd/ipcp.c')
-rw-r--r--src/ipcpd/ipcp.c199
1 files changed, 138 insertions, 61 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index a8ff4c94..c5769f9e 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -91,67 +91,115 @@ void ipcp_hash_str(char * buf,
buf[2 * i] = '\0';
}
+static void * acceptloop(void * o)
+{
+ int csockfd;
+ struct timeval tv = {(SOCKET_TIMEOUT / 1000),
+ (SOCKET_TIMEOUT % 1000) * 1000};
+#if defined(__FreeBSD__) || defined(__APPLE__)
+ fd_set fds;
+ struct timeval timeout = {(IPCP_ACCEPT_TIMEOUT / 1000),
+ (IPCP_ACCEPT_TIMEOUT % 1000) * 1000};
+#endif
+ (void) o;
+
+ while (ipcp_get_state() != IPCP_SHUTDOWN &&
+ ipcp_get_state() != IPCP_NULL) {
+ ssize_t count;
+#if defined(__FreeBSD__) || defined(__APPLE__)
+ FD_ZERO(&fds);
+ FD_SET(ipcpi.sockfd, &fds);
+ if (select(ipcpi.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0)
+ continue;
+#endif
+ csockfd = accept(ipcpi.sockfd, 0, 0);
+ if (csockfd < 0)
+ continue;
+
+ if (setsockopt(csockfd, SOL_SOCKET, SO_RCVTIMEO,
+ (void *) &tv, sizeof(tv)))
+ log_warn("Failed to set timeout on socket.");
+
+ pthread_mutex_lock(&ipcpi.cmd_lock);
+
+ assert(ipcpi.csockfd == -1);
+
+ count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE);
+ if (count <= 0) {
+ pthread_mutex_unlock(&ipcpi.cmd_lock);
+ log_err("Failed to read from socket.");
+ close(csockfd);
+ continue;
+ }
+
+ ipcpi.cmd_len = count;
+ ipcpi.csockfd = csockfd;
+
+ pthread_cond_signal(&ipcpi.cmd_cond);
+
+ while (ipcpi.csockfd != -1)
+ pthread_cond_wait(&ipcpi.cmd_cond, &ipcpi.cmd_lock);
+
+ pthread_mutex_unlock(&ipcpi.cmd_lock);
+ }
+
+ return (void *) 0;
+}
+
static void * mainloop(void * o)
{
- int lsockfd;
- uint8_t buf[IPCP_MSG_BUF_SIZE];
- ssize_t count;
+ int sfd;
buffer_t buffer;
struct ipcp_config conf;
struct dif_info info;
-
ipcp_config_msg_t * conf_msg;
ipcp_msg_t * msg;
- ipcp_msg_t ret_msg = IPCP_MSG__INIT;
- dif_info_msg_t dif_info = DIF_INFO_MSG__INIT;
- struct timeval ltv = {(SOCKET_TIMEOUT / 1000),
- (SOCKET_TIMEOUT % 1000) * 1000};
-
+ struct timespec dl;
+ struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000),
+ (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION};
(void) o;
while (true) {
-#ifdef __FreeBSD__
- fd_set fds;
- struct timeval timeout = {(IPCP_ACCEPT_TIMEOUT / 1000),
- (IPCP_ACCEPT_TIMEOUT % 1000) * 1000};
-#endif
- int fd = -1;
-
- if (ipcp_get_state() == IPCP_SHUTDOWN ||
- ipcp_get_state() == IPCP_NULL ||
- tpm_check()) {
- tpm_exit();
- break;
- }
+ int ret = 0;
+ ipcp_msg_t ret_msg = IPCP_MSG__INIT;
+ dif_info_msg_t dif_info = DIF_INFO_MSG__INIT;
+ int fd = -1;
ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY;
-#ifdef __FreeBSD__
- FD_ZERO(&fds);
- FD_SET(ipcpi.sockfd, &fds);
- if (select(ipcpi.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0)
- continue;
-#endif
- lsockfd = accept(ipcpi.sockfd, 0, 0);
- if (lsockfd < 0)
- continue;
- if (setsockopt(lsockfd, SOL_SOCKET, SO_RCVTIMEO,
- (void *) &ltv, sizeof(ltv)))
- log_warn("Failed to set timeout on socket.");
+ clock_gettime(PTHREAD_COND_CLOCK, &dl);
+ ts_add(&dl, &to, &dl);
- count = read(lsockfd, buf, IPCP_MSG_BUF_SIZE);
- if (count <= 0) {
- log_err("Failed to read from socket");
- close(lsockfd);
+ pthread_mutex_lock(&ipcpi.cmd_lock);
+
+ while (ipcpi.csockfd == -1 && ret != -ETIMEDOUT)
+ ret = -pthread_cond_timedwait(&ipcpi.cmd_cond,
+ &ipcpi.cmd_lock,
+ &dl);
+
+ sfd = ipcpi.csockfd;
+ ipcpi.csockfd = -1;
+
+ if (sfd == -1) {
+ pthread_mutex_unlock(&ipcpi.cmd_lock);
+ if (tpm_check()) {
+ close(sfd);
+ break;
+ }
continue;
}
- msg = ipcp_msg__unpack(NULL, count, buf);
+ pthread_cond_broadcast(&ipcpi.cmd_cond);
+
+ msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf);
if (msg == NULL) {
- close(lsockfd);
+ pthread_mutex_unlock(&ipcpi.cmd_lock);
+ close(sfd);
continue;
}
+ pthread_mutex_unlock(&ipcpi.cmd_lock);
+
tpm_dec();
switch (msg->code) {
@@ -398,7 +446,7 @@ static void * mainloop(void * o)
buffer.len = ipcp_msg__get_packed_size(&ret_msg);
if (buffer.len == 0) {
log_err("Failed to pack reply message");
- close(lsockfd);
+ close(sfd);
tpm_inc();
continue;
}
@@ -406,27 +454,29 @@ static void * mainloop(void * o)
buffer.data = malloc(buffer.len);
if (buffer.data == NULL) {
log_err("Failed to create reply buffer.");
- close(lsockfd);
+ close(sfd);
tpm_inc();
continue;
}
ipcp_msg__pack(&ret_msg, buffer.data);
- if (write(lsockfd, buffer.data, buffer.len) == -1) {
+ if (write(sfd, buffer.data, buffer.len) == -1) {
log_err("Failed to send reply message");
free(buffer.data);
- close(lsockfd);
+ close(sfd);
tpm_inc();
continue;
}
free(buffer.data);
- close(lsockfd);
+ close(sfd);
tpm_inc();
}
+ tpm_exit();
+
return (void *) 0;
}
@@ -526,22 +576,30 @@ int ipcp_init(int argc,
goto fail_alloc_lock;
}
- if (pthread_cond_init(&ipcpi.alloc_cond, NULL)) {
+ if (pthread_cond_init(&ipcpi.alloc_cond, &cattr)) {
log_err("Failed to init convar.");
goto fail_alloc_cond;
}
- ipcpi.alloc_id = -1;
+ if (pthread_mutex_init(&ipcpi.cmd_lock, NULL)) {
+ log_err("Failed to init mutex.");
+ goto fail_cmd_lock;
+ }
- if (type == IPCP_NORMAL) {
- pthread_condattr_destroy(&cattr);
- return 0;
+ if (pthread_cond_init(&ipcpi.cmd_cond, &cattr)) {
+ log_err("Failed to init convar.");
+ goto fail_cmd_cond;
}
- ipcpi.shim_data = shim_data_create();
- if (ipcpi.shim_data == NULL) {
- ret = -ENOMEM;
- goto fail_shim_data;
+ ipcpi.alloc_id = -1;
+ ipcpi.csockfd = -1;
+
+ if (type != IPCP_NORMAL) {
+ ipcpi.shim_data = shim_data_create();
+ if (ipcpi.shim_data == NULL) {
+ ret = -ENOMEM;
+ goto fail_shim_data;
+ }
}
pthread_condattr_destroy(&cattr);
@@ -549,6 +607,10 @@ int ipcp_init(int argc,
return 0;
fail_shim_data:
+ pthread_cond_destroy(&ipcpi.cmd_cond);
+ fail_cmd_cond:
+ pthread_mutex_destroy(&ipcpi.cmd_lock);
+ fail_cmd_lock:
pthread_cond_destroy(&ipcpi.alloc_cond);
fail_alloc_cond:
pthread_mutex_destroy(&ipcpi.alloc_lock);
@@ -590,26 +652,39 @@ int ipcp_boot()
sigaction(SIGHUP, &sig_act, NULL);
sigaction(SIGPIPE, &sig_act, NULL);
- pthread_sigmask(SIG_BLOCK, &sigset, NULL);
-
if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop))
- return -1;
+ goto fail_tpm_init;
- if (tpm_start()) {
- tpm_fini();
- return -1;
- }
+ pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+
+ if (tpm_start())
+ goto fail_tpm_start;
ipcp_set_state(IPCP_INIT);
+ if (pthread_create(&ipcpi.acceptor, NULL, acceptloop, NULL)) {
+ log_err("Failed to create acceptor thread.");
+ ipcp_set_state(IPCP_NULL);
+ goto fail_acceptor;
+ }
+
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
return 0;
+
+ fail_acceptor:
+ tpm_stop();
+ fail_tpm_start:
+ tpm_fini();
+ fail_tpm_init:
+ return -1;
}
void ipcp_shutdown()
{
tpm_fini();
+ pthread_join(ipcpi.acceptor, NULL);
+
log_info("IPCP %d shutting down.", getpid());
}
@@ -627,6 +702,8 @@ void ipcp_fini()
pthread_mutex_destroy(&ipcpi.state_mtx);
pthread_cond_destroy(&ipcpi.alloc_cond);
pthread_mutex_destroy(&ipcpi.alloc_lock);
+ pthread_cond_destroy(&ipcpi.cmd_cond);
+ pthread_mutex_destroy(&ipcpi.cmd_lock);
log_info("IPCP %d out.", getpid());