From c995538b1c6483996c979df62feee3d79acd0e45 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Tue, 15 Aug 2017 19:30:03 +0200 Subject: irmd, ipcpd: Listen on a dedicated accept() thread The IRMd and IPCPs will now call accept on their command sockets from a single thread that will dispatch work to the other threads. This solves a problem on OS X and FreeBSD where accept() doesn't time out when setting SO_RCVTIMEO on the socket. Calling kqueue or select() on that socket to wait for events before calling accept() didn't solve it since select() or kqueue() might wake up multiple threads, with the non-working threads again blocked on the accept() on shutdown. --- src/irmd/main.c | 247 ++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 177 insertions(+), 70 deletions(-) (limited to 'src/irmd') diff --git a/src/irmd/main.c b/src/irmd/main.c index bf6daacc..a316b3cf 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -58,6 +58,7 @@ #define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */ #define SHM_SAN_HOLDOFF 1000 /* ms */ #define IPCP_HASH_LEN(e) hash_len(e->dir_hash_algo) +#define IB_LEN IRM_MSG_BUF_SIZE struct ipcp_entry { struct list_head next; @@ -78,7 +79,7 @@ enum irm_state { IRMD_RUNNING }; -struct irm { +struct { struct list_head registry; /* registered names known */ struct list_head ipcps; /* list of ipcps in system */ @@ -94,13 +95,21 @@ struct irm { struct lockfile * lf; /* single irmd per system */ struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */ + int sockfd; /* UNIX socket */ + uint8_t cbuf[IB_LEN]; /* cmd message buffer */ + size_t cmd_len; /* length of cmd in cbuf */ + int csockfd; /* cmd UNIX socket */ + pthread_cond_t cmd_cond; /* cmd signal condvar */ + pthread_mutex_t cmd_lock; /* cmd signal lock */ + enum irm_state state; /* state of the irmd */ pthread_rwlock_t state_lock; /* lock for the entire irmd */ pthread_t irm_sanitize; /* clean up irmd resources */ pthread_t shm_sanitize; /* keep track of rdrbuff use */ + pthread_t acceptor; /* accept new commands */ } irmd; static enum irm_state irmd_get_state(void) @@ -1466,6 +1475,8 @@ static void irm_fini(void) if (irmd.lf != NULL) lockfile_destroy(irmd.lf); + pthread_mutex_destroy(&irmd.cmd_lock); + pthread_cond_destroy(&irmd.cmd_cond); pthread_rwlock_destroy(&irmd.reg_lock); pthread_rwlock_destroy(&irmd.state_lock); } @@ -1680,69 +1691,114 @@ void * irm_sanitize(void * o) } } -void * mainloop(void * o) +static void * acceptloop(void * o) { - uint8_t buf[IRM_MSG_BUF_SIZE]; - - (void) o; - - while (true) { + int csockfd; + struct timeval tv = {(SOCKET_TIMEOUT / 1000), + (SOCKET_TIMEOUT % 1000) * 1000}; #if defined(__FreeBSD__) || defined(__APPLE__) - fd_set fds; - struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), - (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; + fd_set fds; + struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), + (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; #endif - int cli_sockfd; - irm_msg_t * msg; - ssize_t count; - buffer_t buffer; - irm_msg_t ret_msg = IRM_MSG__INIT; - struct irm_flow * e = NULL; - pid_t * apis = NULL; - struct timespec * timeo = NULL; - struct timespec ts = {0, 0}; - struct timeval tv = {(SOCKET_TIMEOUT / 1000), - (SOCKET_TIMEOUT % 1000) * 1000}; - - if (irmd_get_state() != IRMD_RUNNING || tpm_check()) { - tpm_exit(); - break; - } + (void) o; - ret_msg.code = IRM_MSG_CODE__IRM_REPLY; -#if defined(__FreeBSD__) || defined(__APPLE__) + while (irmd_get_state() == IRMD_RUNNING) { + ssize_t count; +#if defined(__FreeBSD__) || defined(__APPLE__) FD_ZERO(&fds); FD_SET(irmd.sockfd, &fds); if (select(irmd.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0) continue; #endif - cli_sockfd = accept(irmd.sockfd, 0, 0); - if (cli_sockfd < 0) + csockfd = accept(irmd.sockfd, 0, 0); + if (csockfd < 0) continue; - if (setsockopt(cli_sockfd, SOL_SOCKET, SO_RCVTIMEO, + if (setsockopt(csockfd, SOL_SOCKET, SO_RCVTIMEO, (void *) &tv, sizeof(tv))) log_warn("Failed to set timeout on socket."); - count = read(cli_sockfd, buf, IRM_MSG_BUF_SIZE); + pthread_mutex_lock(&irmd.cmd_lock); + + assert(irmd.csockfd == -1); + + count = read(csockfd, irmd.cbuf, IRM_MSG_BUF_SIZE); if (count <= 0) { + pthread_mutex_unlock(&irmd.cmd_lock); log_err("Failed to read from socket."); - close(cli_sockfd); + close(csockfd); continue; } - if (irmd_get_state() != IRMD_RUNNING) { - close(cli_sockfd); - tpm_exit(); - break; + irmd.cmd_len = count; + irmd.csockfd = csockfd; + + pthread_cond_signal(&irmd.cmd_cond); + + while(irmd.csockfd != -1) + pthread_cond_wait(&irmd.cmd_cond, &irmd.cmd_lock); + + pthread_mutex_unlock(&irmd.cmd_lock); + } + + return (void *) 0; +} + +void * mainloop(void * o) +{ + int sfd; + irm_msg_t * msg; + buffer_t buffer; + struct timespec dl; + struct timespec to = {(IRMD_ACCEPT_TIMEOUT / 1000), + (IRMD_ACCEPT_TIMEOUT % 1000) * MILLION}; + + (void) o; + + while (true) { + int ret = 0; + irm_msg_t ret_msg = IRM_MSG__INIT; + struct irm_flow * e = NULL; + pid_t * apis = NULL; + struct timespec * timeo = NULL; + struct timespec ts = {0,0}; + + ret_msg.code = IRM_MSG_CODE__IRM_REPLY; + + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_mutex_lock(&irmd.cmd_lock); + + while (irmd.csockfd == -1 && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&irmd.cmd_cond, + &irmd.cmd_lock, + &dl); + + sfd = irmd.csockfd; + irmd.csockfd = -1; + + if (sfd == -1) { + pthread_mutex_unlock(&irmd.cmd_lock); + if (tpm_check()) { + close(sfd); + break; + } + continue; } - msg = irm_msg__unpack(NULL, count, buf); + pthread_cond_broadcast(&irmd.cmd_cond); + + msg = irm_msg__unpack(NULL, irmd.cmd_len, irmd.cbuf); if (msg == NULL) { - close(cli_sockfd); + pthread_mutex_unlock(&irmd.cmd_lock); + close(sfd); continue; } + pthread_mutex_unlock(&irmd.cmd_lock); + tpm_dec(); if (msg->has_timeo_sec) { @@ -1871,7 +1927,7 @@ void * mainloop(void * o) irm_msg__free_unpacked(msg, NULL); if (ret_msg.result == -EPIPE || !ret_msg.has_result) { - close(cli_sockfd); + close(sfd); tpm_inc(); continue; } @@ -1881,7 +1937,7 @@ void * mainloop(void * o) log_err("Failed to calculate length of reply message."); if (apis != NULL) free(apis); - close(cli_sockfd); + close(sfd); tpm_inc(); continue; } @@ -1890,7 +1946,7 @@ void * mainloop(void * o) if (buffer.data == NULL) { if (apis != NULL) free(apis); - close(cli_sockfd); + close(sfd); tpm_inc(); continue; } @@ -1900,28 +1956,29 @@ void * mainloop(void * o) if (apis != NULL) free(apis); - if (write(cli_sockfd, buffer.data, buffer.len) == -1) + if (write(sfd, buffer.data, buffer.len) == -1) log_warn("Failed to send reply message."); free(buffer.data); - close(cli_sockfd); + close(sfd); tpm_inc(); } + tpm_exit(); + return (void *) 0; } static int irm_init(void) { - struct stat st; - struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), - (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; + struct stat st; + struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), + (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; + pthread_condattr_t cattr; memset(&st, 0, sizeof(st)); - irmd.state = IRMD_NULL; - if (pthread_rwlock_init(&irmd.state_lock, NULL)) { log_err("Failed to initialize rwlock."); goto fail_state_lock; @@ -1937,6 +1994,27 @@ static int irm_init(void) goto fail_flows_lock; } + if (pthread_mutex_init(&irmd.cmd_lock, NULL)) { + log_err("Failed to initialize mutex."); + goto fail_cmd_lock; + } + + if (pthread_condattr_init(&cattr)) { + log_err("Failed to initialize mutex."); + goto fail_cmd_lock; + } + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&irmd.cmd_cond, &cattr)) { + log_err("Failed to initialize condvar."); + pthread_condattr_destroy(&cattr); + goto fail_cmd_cond; + } + + pthread_condattr_destroy(&cattr); + list_head_init(&irmd.ipcps); list_head_init(&irmd.api_table); list_head_init(&irmd.apn_table); @@ -2004,29 +2082,34 @@ static int irm_init(void) goto fail_rdrbuff; } - irmd.state = IRMD_RUNNING; + irmd.csockfd = -1; + irmd.state = IRMD_RUNNING; log_info("Ouroboros IPC Resource Manager daemon started..."); return 0; -fail_rdrbuff: + fail_rdrbuff: shm_rdrbuff_destroy(irmd.rdrb); -fail_sock_opt: + fail_sock_opt: close(irmd.sockfd); -fail_sock_path: + fail_sock_path: unlink(IRM_SOCK_PATH); -fail_stat: + fail_stat: lockfile_destroy(irmd.lf); -fail_lockfile: + fail_lockfile: bmp_destroy(irmd.port_ids); -fail_port_ids: + fail_port_ids: + pthread_cond_destroy(&irmd.cmd_cond); + fail_cmd_cond: + pthread_mutex_destroy(&irmd.cmd_lock); + fail_cmd_lock: pthread_rwlock_destroy(&irmd.flows_lock); -fail_flows_lock: + fail_flows_lock: pthread_rwlock_destroy(&irmd.reg_lock); -fail_reg_lock: + fail_reg_lock: pthread_rwlock_destroy(&irmd.state_lock); -fail_state_lock: + fail_state_lock: return -1; } @@ -2085,29 +2168,39 @@ int main(int argc, log_init(!use_stdout); - if (irm_init() < 0) { - log_fini(); - exit(EXIT_FAILURE); - } + if (irm_init() < 0) + goto fail_irm_init; if (tpm_init(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop)) { - log_fini(); - exit(EXIT_FAILURE); + irmd_set_state(IRMD_NULL); + goto fail_tpm_init; } if (tpm_start()) { - tpm_fini(); - log_fini(); - exit(EXIT_FAILURE); + irmd_set_state(IRMD_NULL); + goto fail_tpm_start; + } + + if (pthread_create(&irmd.irm_sanitize, NULL, irm_sanitize, NULL)) { + irmd_set_state(IRMD_NULL); + goto fail_irm_sanitize; + } + + if (pthread_create(&irmd.shm_sanitize, NULL, shm_sanitize, irmd.rdrb)) { + irmd_set_state(IRMD_NULL); + goto fail_shm_sanitize; } - pthread_create(&irmd.irm_sanitize, NULL, irm_sanitize, NULL); - pthread_create(&irmd.shm_sanitize, NULL, shm_sanitize, irmd.rdrb); + if (pthread_create(&irmd.acceptor, NULL, acceptloop, NULL)) { + irmd_set_state(IRMD_NULL); + goto fail_acceptor; + } /* tpm_stop() called from sighandler */ tpm_fini(); + pthread_join(irmd.acceptor, NULL); pthread_join(irmd.irm_sanitize, NULL); pthread_join(irmd.shm_sanitize, NULL); @@ -2122,4 +2215,18 @@ int main(int argc, log_info("Bye."); exit(EXIT_SUCCESS); + + fail_acceptor: + pthread_join(irmd.shm_sanitize, NULL); + fail_shm_sanitize: + pthread_join(irmd.irm_sanitize, NULL); + fail_irm_sanitize: + tpm_stop(); + fail_tpm_start: + tpm_fini(); + fail_tpm_init: + irm_fini(); + fail_irm_init: + log_fini(); + exit(EXIT_FAILURE); } -- cgit v1.2.3