From c995538b1c6483996c979df62feee3d79acd0e45 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Tue, 15 Aug 2017 19:30:03 +0200 Subject: irmd, ipcpd: Listen on a dedicated accept() thread The IRMd and IPCPs will now call accept on their command sockets from a single thread that will dispatch work to the other threads. This solves a problem on OS X and FreeBSD where accept() doesn't time out when setting SO_RCVTIMEO on the socket. Calling kqueue or select() on that socket to wait for events before calling accept() didn't solve it since select() or kqueue() might wake up multiple threads, with the non-working threads again blocked on the accept() on shutdown. --- src/ipcpd/ipcp.c | 199 ++++++++++++++++++++++++++++++++++++++----------------- src/ipcpd/ipcp.h | 8 +++ 2 files changed, 146 insertions(+), 61 deletions(-) (limited to 'src/ipcpd') diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index a8ff4c94..c5769f9e 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -91,67 +91,115 @@ void ipcp_hash_str(char * buf, buf[2 * i] = '\0'; } +static void * acceptloop(void * o) +{ + int csockfd; + struct timeval tv = {(SOCKET_TIMEOUT / 1000), + (SOCKET_TIMEOUT % 1000) * 1000}; +#if defined(__FreeBSD__) || defined(__APPLE__) + fd_set fds; + struct timeval timeout = {(IPCP_ACCEPT_TIMEOUT / 1000), + (IPCP_ACCEPT_TIMEOUT % 1000) * 1000}; +#endif + (void) o; + + while (ipcp_get_state() != IPCP_SHUTDOWN && + ipcp_get_state() != IPCP_NULL) { + ssize_t count; +#if defined(__FreeBSD__) || defined(__APPLE__) + FD_ZERO(&fds); + FD_SET(ipcpi.sockfd, &fds); + if (select(ipcpi.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0) + continue; +#endif + csockfd = accept(ipcpi.sockfd, 0, 0); + if (csockfd < 0) + continue; + + if (setsockopt(csockfd, SOL_SOCKET, SO_RCVTIMEO, + (void *) &tv, sizeof(tv))) + log_warn("Failed to set timeout on socket."); + + pthread_mutex_lock(&ipcpi.cmd_lock); + + assert(ipcpi.csockfd == -1); + + count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE); + if (count 
<= 0) { + pthread_mutex_unlock(&ipcpi.cmd_lock); + log_err("Failed to read from socket."); + close(csockfd); + continue; + } + + ipcpi.cmd_len = count; + ipcpi.csockfd = csockfd; + + pthread_cond_signal(&ipcpi.cmd_cond); + + while (ipcpi.csockfd != -1) + pthread_cond_wait(&ipcpi.cmd_cond, &ipcpi.cmd_lock); + + pthread_mutex_unlock(&ipcpi.cmd_lock); + } + + return (void *) 0; +} + static void * mainloop(void * o) { - int lsockfd; - uint8_t buf[IPCP_MSG_BUF_SIZE]; - ssize_t count; + int sfd; buffer_t buffer; struct ipcp_config conf; struct dif_info info; - ipcp_config_msg_t * conf_msg; ipcp_msg_t * msg; - ipcp_msg_t ret_msg = IPCP_MSG__INIT; - dif_info_msg_t dif_info = DIF_INFO_MSG__INIT; - struct timeval ltv = {(SOCKET_TIMEOUT / 1000), - (SOCKET_TIMEOUT % 1000) * 1000}; - + struct timespec dl; + struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000), + (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION}; (void) o; while (true) { -#ifdef __FreeBSD__ - fd_set fds; - struct timeval timeout = {(IPCP_ACCEPT_TIMEOUT / 1000), - (IPCP_ACCEPT_TIMEOUT % 1000) * 1000}; -#endif - int fd = -1; - - if (ipcp_get_state() == IPCP_SHUTDOWN || - ipcp_get_state() == IPCP_NULL || - tpm_check()) { - tpm_exit(); - break; - } + int ret = 0; + ipcp_msg_t ret_msg = IPCP_MSG__INIT; + dif_info_msg_t dif_info = DIF_INFO_MSG__INIT; + int fd = -1; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; -#ifdef __FreeBSD__ - FD_ZERO(&fds); - FD_SET(ipcpi.sockfd, &fds); - if (select(ipcpi.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0) - continue; -#endif - lsockfd = accept(ipcpi.sockfd, 0, 0); - if (lsockfd < 0) - continue; - if (setsockopt(lsockfd, SOL_SOCKET, SO_RCVTIMEO, - (void *) &ltv, sizeof(ltv))) - log_warn("Failed to set timeout on socket."); + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); - count = read(lsockfd, buf, IPCP_MSG_BUF_SIZE); - if (count <= 0) { - log_err("Failed to read from socket"); - close(lsockfd); + pthread_mutex_lock(&ipcpi.cmd_lock); + + while (ipcpi.csockfd == -1 && ret != 
-ETIMEDOUT) + ret = -pthread_cond_timedwait(&ipcpi.cmd_cond, + &ipcpi.cmd_lock, + &dl); + + sfd = ipcpi.csockfd; + ipcpi.csockfd = -1; + + if (sfd == -1) { + pthread_mutex_unlock(&ipcpi.cmd_lock); + if (tpm_check()) { + close(sfd); + break; + } continue; } - msg = ipcp_msg__unpack(NULL, count, buf); + pthread_cond_broadcast(&ipcpi.cmd_cond); + + msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf); if (msg == NULL) { - close(lsockfd); + pthread_mutex_unlock(&ipcpi.cmd_lock); + close(sfd); continue; } + pthread_mutex_unlock(&ipcpi.cmd_lock); + tpm_dec(); switch (msg->code) { @@ -398,7 +446,7 @@ static void * mainloop(void * o) buffer.len = ipcp_msg__get_packed_size(&ret_msg); if (buffer.len == 0) { log_err("Failed to pack reply message"); - close(lsockfd); + close(sfd); tpm_inc(); continue; } @@ -406,27 +454,29 @@ static void * mainloop(void * o) buffer.data = malloc(buffer.len); if (buffer.data == NULL) { log_err("Failed to create reply buffer."); - close(lsockfd); + close(sfd); tpm_inc(); continue; } ipcp_msg__pack(&ret_msg, buffer.data); - if (write(lsockfd, buffer.data, buffer.len) == -1) { + if (write(sfd, buffer.data, buffer.len) == -1) { log_err("Failed to send reply message"); free(buffer.data); - close(lsockfd); + close(sfd); tpm_inc(); continue; } free(buffer.data); - close(lsockfd); + close(sfd); tpm_inc(); } + tpm_exit(); + return (void *) 0; } @@ -526,22 +576,30 @@ int ipcp_init(int argc, goto fail_alloc_lock; } - if (pthread_cond_init(&ipcpi.alloc_cond, NULL)) { + if (pthread_cond_init(&ipcpi.alloc_cond, &cattr)) { log_err("Failed to init convar."); goto fail_alloc_cond; } - ipcpi.alloc_id = -1; + if (pthread_mutex_init(&ipcpi.cmd_lock, NULL)) { + log_err("Failed to init mutex."); + goto fail_cmd_lock; + } - if (type == IPCP_NORMAL) { - pthread_condattr_destroy(&cattr); - return 0; + if (pthread_cond_init(&ipcpi.cmd_cond, &cattr)) { + log_err("Failed to init convar."); + goto fail_cmd_cond; } - ipcpi.shim_data = shim_data_create(); - if 
(ipcpi.shim_data == NULL) { - ret = -ENOMEM; - goto fail_shim_data; + ipcpi.alloc_id = -1; + ipcpi.csockfd = -1; + + if (type != IPCP_NORMAL) { + ipcpi.shim_data = shim_data_create(); + if (ipcpi.shim_data == NULL) { + ret = -ENOMEM; + goto fail_shim_data; + } } pthread_condattr_destroy(&cattr); @@ -549,6 +607,10 @@ int ipcp_init(int argc, return 0; fail_shim_data: + pthread_cond_destroy(&ipcpi.cmd_cond); + fail_cmd_cond: + pthread_mutex_destroy(&ipcpi.cmd_lock); + fail_cmd_lock: pthread_cond_destroy(&ipcpi.alloc_cond); fail_alloc_cond: pthread_mutex_destroy(&ipcpi.alloc_lock); @@ -590,26 +652,39 @@ int ipcp_boot() sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - pthread_sigmask(SIG_BLOCK, &sigset, NULL); - if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) - return -1; + goto fail_tpm_init; - if (tpm_start()) { - tpm_fini(); - return -1; - } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + + if (tpm_start()) + goto fail_tpm_start; ipcp_set_state(IPCP_INIT); + if (pthread_create(&ipcpi.acceptor, NULL, acceptloop, NULL)) { + log_err("Failed to create acceptor thread."); + ipcp_set_state(IPCP_NULL); + goto fail_acceptor; + } + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); return 0; + + fail_acceptor: + tpm_stop(); + fail_tpm_start: + tpm_fini(); + fail_tpm_init: + return -1; } void ipcp_shutdown() { tpm_fini(); + pthread_join(ipcpi.acceptor, NULL); + log_info("IPCP %d shutting down.", getpid()); } @@ -627,6 +702,8 @@ void ipcp_fini() pthread_mutex_destroy(&ipcpi.state_mtx); pthread_cond_destroy(&ipcpi.alloc_cond); pthread_mutex_destroy(&ipcpi.alloc_lock); + pthread_cond_destroy(&ipcpi.cmd_cond); + pthread_mutex_destroy(&ipcpi.cmd_lock); log_info("IPCP %d out.", getpid()); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 9ce3ed77..d2ad7cde 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -26,6 +26,7 @@ #include #include #include +#include #include "shim-data.h" @@ -89,10 +90,17 @@ struct ipcp { int sockfd; char * sock_path; + 
uint8_t cbuf[IPCP_MSG_BUF_SIZE]; + size_t cmd_len; + int csockfd; + pthread_cond_t cmd_cond; + pthread_mutex_t cmd_lock; + int alloc_id; pthread_cond_t alloc_cond; pthread_mutex_t alloc_lock; + pthread_t acceptor; } ipcpi; int ipcp_init(int argc, -- cgit v1.2.3 From 8e4e526f811fb0e1d358f79707d488b619a60e47 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Tue, 15 Aug 2017 22:23:03 +0200 Subject: ipcpd: Refactor normal IPCP failure handling --- src/ipcpd/normal/main.c | 59 ++++++++++++++++++++----------------------------- 1 file changed, 24 insertions(+), 35 deletions(-) (limited to 'src/ipcpd') diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 325c1285..27fefdb6 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -366,66 +366,39 @@ int main(int argc, char * argv[]) { if (ipcp_init(argc, argv, THIS_TYPE, &normal_ops) < 0) { - ipcp_create_r(getpid(), -1); - exit(EXIT_FAILURE); + log_err("Failed to init IPCP."); + goto fail_init; } if (irm_bind_api(getpid(), ipcpi.name)) { log_err("Failed to bind AP name."); - ipcp_create_r(getpid(), -1); - ipcp_fini(); - exit(EXIT_FAILURE); + goto fail_bind_api; } if (rib_init()) { log_err("Failed to initialize RIB."); - ipcp_create_r(getpid(), -1); - irm_unbind_api(getpid(), ipcpi.name); - ipcp_fini(); - exit(EXIT_FAILURE); + goto fail_rib_init; } if (connmgr_init()) { log_err("Failed to initialize connection manager."); - ipcp_create_r(getpid(), -1); - rib_fini(); - irm_unbind_api(getpid(), ipcpi.name); - ipcp_fini(); - exit(EXIT_FAILURE); + goto fail_connmgr_init; } if (enroll_init()) { log_err("Failed to initialize enroll component."); - ipcp_create_r(getpid(), -1); - connmgr_fini(); - rib_fini(); - irm_unbind_api(getpid(), ipcpi.name); - ipcp_fini(); - exit(EXIT_FAILURE); + goto fail_enroll_init; } - if (ipcp_boot() < 0) { log_err("Failed to boot IPCP."); - ipcp_create_r(getpid(), -1); - enroll_fini(); - connmgr_fini(); - rib_fini(); - irm_unbind_api(getpid(), ipcpi.name); - ipcp_fini(); 
- exit(EXIT_FAILURE); + goto fail_boot; } if (ipcp_create_r(getpid(), 0)) { log_err("Failed to notify IRMd we are initialized."); ipcp_set_state(IPCP_NULL); - ipcp_shutdown(); - enroll_fini(); - connmgr_fini(); - rib_fini(); - irm_unbind_api(getpid(), ipcpi.name); - ipcp_fini(); - exit(EXIT_FAILURE); + goto fail_create_r; } ipcp_shutdown(); @@ -444,4 +417,20 @@ int main(int argc, ipcp_fini(); exit(EXIT_SUCCESS); + + fail_create_r: + ipcp_shutdown(); + fail_boot: + enroll_fini(); + fail_enroll_init: + connmgr_fini(); + fail_connmgr_init: + rib_fini(); + fail_rib_init: + irm_unbind_api(getpid(), ipcpi.name); + fail_bind_api: + ipcp_fini(); + fail_init: + ipcp_create_r(getpid(), -1); + exit(EXIT_FAILURE); } -- cgit v1.2.3 From a27bef54052b81406ba3142be3da4ab2a6330de6 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Tue, 15 Aug 2017 22:25:54 +0200 Subject: ipcpd: Fix missing clock_gettime in flow allocator --- src/ipcpd/normal/fa.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/ipcpd') diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 06f10b53..2488f017 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -287,6 +287,8 @@ int fa_alloc_resp(int fd, struct shm_du_buff * sdb; qoscube_t qc; + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + pthread_mutex_lock(&ipcpi.alloc_lock); while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { -- cgit v1.2.3