diff options
-rw-r--r-- | src/ipcpd/ipcp.c | 199 | ||||
-rw-r--r-- | src/ipcpd/ipcp.h | 8 | ||||
-rw-r--r-- | src/irmd/main.c | 247 |
3 files changed, 323 insertions, 131 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index a8ff4c94..c5769f9e 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -91,67 +91,115 @@ void ipcp_hash_str(char * buf, buf[2 * i] = '\0'; } +static void * acceptloop(void * o) +{ + int csockfd; + struct timeval tv = {(SOCKET_TIMEOUT / 1000), + (SOCKET_TIMEOUT % 1000) * 1000}; +#if defined(__FreeBSD__) || defined(__APPLE__) + fd_set fds; + struct timeval timeout = {(IPCP_ACCEPT_TIMEOUT / 1000), + (IPCP_ACCEPT_TIMEOUT % 1000) * 1000}; +#endif + (void) o; + + while (ipcp_get_state() != IPCP_SHUTDOWN && + ipcp_get_state() != IPCP_NULL) { + ssize_t count; +#if defined(__FreeBSD__) || defined(__APPLE__) + FD_ZERO(&fds); + FD_SET(ipcpi.sockfd, &fds); + if (select(ipcpi.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0) + continue; +#endif + csockfd = accept(ipcpi.sockfd, 0, 0); + if (csockfd < 0) + continue; + + if (setsockopt(csockfd, SOL_SOCKET, SO_RCVTIMEO, + (void *) &tv, sizeof(tv))) + log_warn("Failed to set timeout on socket."); + + pthread_mutex_lock(&ipcpi.cmd_lock); + + assert(ipcpi.csockfd == -1); + + count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE); + if (count <= 0) { + pthread_mutex_unlock(&ipcpi.cmd_lock); + log_err("Failed to read from socket."); + close(csockfd); + continue; + } + + ipcpi.cmd_len = count; + ipcpi.csockfd = csockfd; + + pthread_cond_signal(&ipcpi.cmd_cond); + + while (ipcpi.csockfd != -1) + pthread_cond_wait(&ipcpi.cmd_cond, &ipcpi.cmd_lock); + + pthread_mutex_unlock(&ipcpi.cmd_lock); + } + + return (void *) 0; +} + static void * mainloop(void * o) { - int lsockfd; - uint8_t buf[IPCP_MSG_BUF_SIZE]; - ssize_t count; + int sfd; buffer_t buffer; struct ipcp_config conf; struct dif_info info; - ipcp_config_msg_t * conf_msg; ipcp_msg_t * msg; - ipcp_msg_t ret_msg = IPCP_MSG__INIT; - dif_info_msg_t dif_info = DIF_INFO_MSG__INIT; - struct timeval ltv = {(SOCKET_TIMEOUT / 1000), - (SOCKET_TIMEOUT % 1000) * 1000}; - + struct timespec dl; + struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000), + (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION}; (void) o; while (true) { -#ifdef __FreeBSD__ - fd_set fds; - struct timeval timeout = {(IPCP_ACCEPT_TIMEOUT / 1000), - (IPCP_ACCEPT_TIMEOUT % 1000) * 1000}; -#endif - int fd = -1; - - if (ipcp_get_state() == IPCP_SHUTDOWN || - ipcp_get_state() == IPCP_NULL || - tpm_check()) { - tpm_exit(); - break; - } + int ret = 0; + ipcp_msg_t ret_msg = IPCP_MSG__INIT; + dif_info_msg_t dif_info = DIF_INFO_MSG__INIT; + int fd = -1; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; -#ifdef __FreeBSD__ - FD_ZERO(&fds); - FD_SET(ipcpi.sockfd, &fds); - if (select(ipcpi.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0) - continue; -#endif - lsockfd = accept(ipcpi.sockfd, 0, 0); - if (lsockfd < 0) - continue; - if (setsockopt(lsockfd, SOL_SOCKET, SO_RCVTIMEO, - (void *) <v, sizeof(ltv))) - log_warn("Failed to set timeout on socket."); + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); - count = read(lsockfd, buf, IPCP_MSG_BUF_SIZE); - if (count <= 0) { - log_err("Failed to read from socket"); - close(lsockfd); + pthread_mutex_lock(&ipcpi.cmd_lock); + + while (ipcpi.csockfd == -1 && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&ipcpi.cmd_cond, + &ipcpi.cmd_lock, + &dl); + + sfd = ipcpi.csockfd; + ipcpi.csockfd = -1; + + if (sfd == -1) { + pthread_mutex_unlock(&ipcpi.cmd_lock); + if (tpm_check()) { + close(sfd); + break; + } continue; } - msg = ipcp_msg__unpack(NULL, count, buf); + pthread_cond_broadcast(&ipcpi.cmd_cond); + + msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf); if (msg == NULL) { - close(lsockfd); + pthread_mutex_unlock(&ipcpi.cmd_lock); + close(sfd); continue; } + pthread_mutex_unlock(&ipcpi.cmd_lock); + tpm_dec(); switch (msg->code) { @@ -398,7 +446,7 @@ static void * mainloop(void * o) buffer.len = ipcp_msg__get_packed_size(&ret_msg); if (buffer.len == 0) { log_err("Failed to pack reply message"); - close(lsockfd); + close(sfd); tpm_inc(); continue; } @@ -406,27 +454,29 @@ static void * mainloop(void * o) buffer.data = malloc(buffer.len); if (buffer.data == NULL) { log_err("Failed to create reply buffer."); - close(lsockfd); + close(sfd); tpm_inc(); continue; } ipcp_msg__pack(&ret_msg, buffer.data); - if (write(lsockfd, buffer.data, buffer.len) == -1) { + if (write(sfd, buffer.data, buffer.len) == -1) { log_err("Failed to send reply message"); free(buffer.data); - close(lsockfd); + close(sfd); tpm_inc(); continue; } free(buffer.data); - close(lsockfd); + close(sfd); tpm_inc(); } + tpm_exit(); + return (void *) 0; } @@ -526,22 +576,30 @@ int ipcp_init(int argc, goto fail_alloc_lock; } - if (pthread_cond_init(&ipcpi.alloc_cond, NULL)) { + if (pthread_cond_init(&ipcpi.alloc_cond, &cattr)) { log_err("Failed to init convar."); goto fail_alloc_cond; } - ipcpi.alloc_id = -1; + if (pthread_mutex_init(&ipcpi.cmd_lock, NULL)) { + log_err("Failed to init mutex."); + goto fail_cmd_lock; + } - if (type == IPCP_NORMAL) { - pthread_condattr_destroy(&cattr); - return 0; + if (pthread_cond_init(&ipcpi.cmd_cond, &cattr)) { + log_err("Failed to init convar."); + goto fail_cmd_cond; } - ipcpi.shim_data = shim_data_create(); - if (ipcpi.shim_data == NULL) { - ret = -ENOMEM; - goto fail_shim_data; + ipcpi.alloc_id = -1; + ipcpi.csockfd = -1; + + if (type != IPCP_NORMAL) { + ipcpi.shim_data = shim_data_create(); + if (ipcpi.shim_data == NULL) { + ret = -ENOMEM; + goto fail_shim_data; + } } pthread_condattr_destroy(&cattr); @@ -549,6 +607,10 @@ int ipcp_init(int argc, return 0; fail_shim_data: + pthread_cond_destroy(&ipcpi.cmd_cond); + fail_cmd_cond: + pthread_mutex_destroy(&ipcpi.cmd_lock); + fail_cmd_lock: pthread_cond_destroy(&ipcpi.alloc_cond); fail_alloc_cond: pthread_mutex_destroy(&ipcpi.alloc_lock); @@ -590,26 +652,39 @@ int ipcp_boot() sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - pthread_sigmask(SIG_BLOCK, &sigset, NULL); - if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) - return -1; + goto fail_tpm_init; - if (tpm_start()) { - tpm_fini(); - return -1; - } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + + if (tpm_start()) + goto fail_tpm_start; ipcp_set_state(IPCP_INIT); + if (pthread_create(&ipcpi.acceptor, NULL, acceptloop, NULL)) { + log_err("Failed to create acceptor thread."); + ipcp_set_state(IPCP_NULL); + goto fail_acceptor; + } + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); return 0; + + fail_acceptor: + tpm_stop(); + fail_tpm_start: + tpm_fini(); + fail_tpm_init: + return -1; } void ipcp_shutdown() { tpm_fini(); + pthread_join(ipcpi.acceptor, NULL); + log_info("IPCP %d shutting down.", getpid()); } @@ -627,6 +702,8 @@ void ipcp_fini() pthread_mutex_destroy(&ipcpi.state_mtx); pthread_cond_destroy(&ipcpi.alloc_cond); pthread_mutex_destroy(&ipcpi.alloc_lock); + pthread_cond_destroy(&ipcpi.cmd_cond); + pthread_mutex_destroy(&ipcpi.cmd_lock); log_info("IPCP %d out.", getpid()); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 9ce3ed77..d2ad7cde 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -26,6 +26,7 @@ #include <ouroboros/config.h> #include <ouroboros/hash.h> #include <ouroboros/ipcp.h> +#include <ouroboros/sockets.h> #include "shim-data.h" @@ -89,10 +90,17 @@ struct ipcp { int sockfd; char * sock_path; + uint8_t cbuf[IPCP_MSG_BUF_SIZE]; + size_t cmd_len; + int csockfd; + pthread_cond_t cmd_cond; + pthread_mutex_t cmd_lock; + int alloc_id; pthread_cond_t alloc_cond; pthread_mutex_t alloc_lock; + pthread_t acceptor; } ipcpi; int ipcp_init(int argc, diff --git a/src/irmd/main.c b/src/irmd/main.c index bf6daacc..a316b3cf 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -58,6 +58,7 @@ #define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */ #define SHM_SAN_HOLDOFF 1000 /* ms */ #define IPCP_HASH_LEN(e) hash_len(e->dir_hash_algo) +#define IB_LEN IRM_MSG_BUF_SIZE struct ipcp_entry { struct list_head next; @@ -78,7 +79,7 @@ enum irm_state { IRMD_RUNNING }; -struct irm { +struct { struct list_head registry; /* registered names known */ struct list_head ipcps; /* list of ipcps in system */ @@ -94,13 +95,21 @@ struct irm { struct lockfile * lf; /* single irmd per system */ struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */ + int sockfd; /* UNIX socket */ + uint8_t cbuf[IB_LEN]; /* cmd message buffer */ + size_t cmd_len; /* length of cmd in cbuf */ + int csockfd; /* cmd UNIX socket */ + pthread_cond_t cmd_cond; /* cmd signal condvar */ + pthread_mutex_t cmd_lock; /* cmd signal lock */ + enum irm_state state; /* state of the irmd */ pthread_rwlock_t state_lock; /* lock for the entire irmd */ pthread_t irm_sanitize; /* clean up irmd resources */ pthread_t shm_sanitize; /* keep track of rdrbuff use */ + pthread_t acceptor; /* accept new commands */ } irmd; static enum irm_state irmd_get_state(void) @@ -1466,6 +1475,8 @@ static void irm_fini(void) if (irmd.lf != NULL) lockfile_destroy(irmd.lf); + pthread_mutex_destroy(&irmd.cmd_lock); + pthread_cond_destroy(&irmd.cmd_cond); pthread_rwlock_destroy(&irmd.reg_lock); pthread_rwlock_destroy(&irmd.state_lock); } @@ -1680,69 +1691,114 @@ void * irm_sanitize(void * o) } } -void * mainloop(void * o) +static void * acceptloop(void * o) { - uint8_t buf[IRM_MSG_BUF_SIZE]; - - (void) o; - - while (true) { + int csockfd; + struct timeval tv = {(SOCKET_TIMEOUT / 1000), + (SOCKET_TIMEOUT % 1000) * 1000}; #if defined(__FreeBSD__) || defined(__APPLE__) - fd_set fds; - struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), - (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; + fd_set fds; + struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), + (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; #endif - int cli_sockfd; - irm_msg_t * msg; - ssize_t count; - buffer_t buffer; - irm_msg_t ret_msg = IRM_MSG__INIT; - struct irm_flow * e = NULL; - pid_t * apis = NULL; - struct timespec * timeo = NULL; - struct timespec ts = {0, 0}; - struct timeval tv = {(SOCKET_TIMEOUT / 1000), - (SOCKET_TIMEOUT % 1000) * 1000}; - - if (irmd_get_state() != IRMD_RUNNING || tpm_check()) { - tpm_exit(); - break; - } + (void) o; - ret_msg.code = IRM_MSG_CODE__IRM_REPLY; -#if defined(__FreeBSD__) || defined(__APPLE__) + while (irmd_get_state() == IRMD_RUNNING) { + ssize_t count; +#if defined(__FreeBSD__) || defined(__APPLE__) FD_ZERO(&fds); FD_SET(irmd.sockfd, &fds); if (select(irmd.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0) continue; #endif - cli_sockfd = accept(irmd.sockfd, 0, 0); - if (cli_sockfd < 0) + csockfd = accept(irmd.sockfd, 0, 0); + if (csockfd < 0) continue; - if (setsockopt(cli_sockfd, SOL_SOCKET, SO_RCVTIMEO, + if (setsockopt(csockfd, SOL_SOCKET, SO_RCVTIMEO, (void *) &tv, sizeof(tv))) log_warn("Failed to set timeout on socket."); - count = read(cli_sockfd, buf, IRM_MSG_BUF_SIZE); + pthread_mutex_lock(&irmd.cmd_lock); + + assert(irmd.csockfd == -1); + + count = read(csockfd, irmd.cbuf, IRM_MSG_BUF_SIZE); if (count <= 0) { + pthread_mutex_unlock(&irmd.cmd_lock); log_err("Failed to read from socket."); - close(cli_sockfd); + close(csockfd); continue; } - if (irmd_get_state() != IRMD_RUNNING) { - close(cli_sockfd); - tpm_exit(); - break; + irmd.cmd_len = count; + irmd.csockfd = csockfd; + + pthread_cond_signal(&irmd.cmd_cond); + + while(irmd.csockfd != -1) + pthread_cond_wait(&irmd.cmd_cond, &irmd.cmd_lock); + + pthread_mutex_unlock(&irmd.cmd_lock); + } + + return (void *) 0; +} + +void * mainloop(void * o) +{ + int sfd; + irm_msg_t * msg; + buffer_t buffer; + struct timespec dl; + struct timespec to = {(IRMD_ACCEPT_TIMEOUT / 1000), + (IRMD_ACCEPT_TIMEOUT % 1000) * MILLION}; + + (void) o; + + while (true) { + int ret = 0; + irm_msg_t ret_msg = IRM_MSG__INIT; + struct irm_flow * e = NULL; + pid_t * apis = NULL; + struct timespec * timeo = NULL; + struct timespec ts = {0,0}; + + ret_msg.code = IRM_MSG_CODE__IRM_REPLY; + + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_mutex_lock(&irmd.cmd_lock); + + while (irmd.csockfd == -1 && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&irmd.cmd_cond, + &irmd.cmd_lock, + &dl); + + sfd = irmd.csockfd; + irmd.csockfd = -1; + + if (sfd == -1) { + pthread_mutex_unlock(&irmd.cmd_lock); + if (tpm_check()) { + close(sfd); + break; + } + continue; } - msg = irm_msg__unpack(NULL, count, buf); + pthread_cond_broadcast(&irmd.cmd_cond); + + msg = irm_msg__unpack(NULL, irmd.cmd_len, irmd.cbuf); if (msg == NULL) { - close(cli_sockfd); + pthread_mutex_unlock(&irmd.cmd_lock); + close(sfd); continue; } + pthread_mutex_unlock(&irmd.cmd_lock); + tpm_dec(); if (msg->has_timeo_sec) { @@ -1871,7 +1927,7 @@ void * mainloop(void * o) irm_msg__free_unpacked(msg, NULL); if (ret_msg.result == -EPIPE || !ret_msg.has_result) { - close(cli_sockfd); + close(sfd); tpm_inc(); continue; } @@ -1881,7 +1937,7 @@ void * mainloop(void * o) log_err("Failed to calculate length of reply message."); if (apis != NULL) free(apis); - close(cli_sockfd); + close(sfd); tpm_inc(); continue; } @@ -1890,7 +1946,7 @@ void * mainloop(void * o) if (buffer.data == NULL) { if (apis != NULL) free(apis); - close(cli_sockfd); + close(sfd); tpm_inc(); continue; } @@ -1900,28 +1956,29 @@ void * mainloop(void * o) if (apis != NULL) free(apis); - if (write(cli_sockfd, buffer.data, buffer.len) == -1) + if (write(sfd, buffer.data, buffer.len) == -1) log_warn("Failed to send reply message."); free(buffer.data); - close(cli_sockfd); + close(sfd); tpm_inc(); } + tpm_exit(); + return (void *) 0; } static int irm_init(void) { - struct stat st; - struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), - (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; + struct stat st; + struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), + (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; + pthread_condattr_t cattr; memset(&st, 0, sizeof(st)); - irmd.state = IRMD_NULL; - if (pthread_rwlock_init(&irmd.state_lock, NULL)) { log_err("Failed to initialize rwlock."); goto fail_state_lock; @@ -1937,6 +1994,27 @@ static int irm_init(void) goto fail_flows_lock; } + if (pthread_mutex_init(&irmd.cmd_lock, NULL)) { + log_err("Failed to initialize mutex."); + goto fail_cmd_lock; + } + + if (pthread_condattr_init(&cattr)) { + log_err("Failed to initialize mutex."); + goto fail_cmd_lock; + } + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&irmd.cmd_cond, &cattr)) { + log_err("Failed to initialize condvar."); + pthread_condattr_destroy(&cattr); + goto fail_cmd_cond; + } + + pthread_condattr_destroy(&cattr); + list_head_init(&irmd.ipcps); list_head_init(&irmd.api_table); list_head_init(&irmd.apn_table); @@ -2004,29 +2082,34 @@ static int irm_init(void) goto fail_rdrbuff; } - irmd.state = IRMD_RUNNING; + irmd.csockfd = -1; + irmd.state = IRMD_RUNNING; log_info("Ouroboros IPC Resource Manager daemon started..."); return 0; -fail_rdrbuff: + fail_rdrbuff: shm_rdrbuff_destroy(irmd.rdrb); -fail_sock_opt: + fail_sock_opt: close(irmd.sockfd); -fail_sock_path: + fail_sock_path: unlink(IRM_SOCK_PATH); -fail_stat: + fail_stat: lockfile_destroy(irmd.lf); -fail_lockfile: + fail_lockfile: bmp_destroy(irmd.port_ids); -fail_port_ids: + fail_port_ids: + pthread_cond_destroy(&irmd.cmd_cond); + fail_cmd_cond: + pthread_mutex_destroy(&irmd.cmd_lock); + fail_cmd_lock: pthread_rwlock_destroy(&irmd.flows_lock); -fail_flows_lock: + fail_flows_lock: pthread_rwlock_destroy(&irmd.reg_lock); -fail_reg_lock: + fail_reg_lock: pthread_rwlock_destroy(&irmd.state_lock); -fail_state_lock: + fail_state_lock: return -1; } @@ -2085,29 +2168,39 @@ int main(int argc, log_init(!use_stdout); - if (irm_init() < 0) { - log_fini(); - exit(EXIT_FAILURE); - } + if (irm_init() < 0) + goto fail_irm_init; if (tpm_init(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop)) { - log_fini(); - exit(EXIT_FAILURE); + irmd_set_state(IRMD_NULL); + goto fail_tpm_init; } if (tpm_start()) { - tpm_fini(); - log_fini(); - exit(EXIT_FAILURE); + irmd_set_state(IRMD_NULL); + goto fail_tpm_start; + } + + if (pthread_create(&irmd.irm_sanitize, NULL, irm_sanitize, NULL)) { + irmd_set_state(IRMD_NULL); + goto fail_irm_sanitize; + } + + if (pthread_create(&irmd.shm_sanitize, NULL, shm_sanitize, irmd.rdrb)) { + irmd_set_state(IRMD_NULL); + goto fail_shm_sanitize; } - pthread_create(&irmd.irm_sanitize, NULL, irm_sanitize, NULL); - pthread_create(&irmd.shm_sanitize, NULL, shm_sanitize, irmd.rdrb); + if (pthread_create(&irmd.acceptor, NULL, acceptloop, NULL)) { + irmd_set_state(IRMD_NULL); + goto fail_acceptor; + } /* tpm_stop() called from sighandler */ tpm_fini(); + pthread_join(irmd.acceptor, NULL); pthread_join(irmd.irm_sanitize, NULL); pthread_join(irmd.shm_sanitize, NULL); @@ -2122,4 +2215,18 @@ int main(int argc, log_info("Bye."); exit(EXIT_SUCCESS); + + fail_acceptor: + pthread_join(irmd.shm_sanitize, NULL); + fail_shm_sanitize: + pthread_join(irmd.irm_sanitize, NULL); + fail_irm_sanitize: + tpm_stop(); + fail_tpm_start: + tpm_fini(); + fail_tpm_init: + irm_fini(); + fail_irm_init: + log_fini(); + exit(EXIT_FAILURE); } |