From 188aba280f7c5b80b868cb1527fce9d45702a196 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 26 Oct 2016 14:59:43 +0200 Subject: ipcpd: Add threadpool for main loop This adds a threadpool for the main loop of the IPCPs. Before there was a single thread handling each request, which could result in starvation since performing name queries at the same time as enrolling a normal IPCP was impossible. --- include/ouroboros/config.h.in | 1 + src/ipcpd/ipcp.c | 76 ++++++++++++++++++++++++++----------------- src/ipcpd/ipcp.h | 4 ++- src/lib/dev.c | 3 +- 4 files changed, 51 insertions(+), 33 deletions(-) diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index 6ffcb97f..122899f3 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -48,6 +48,7 @@ #define SHM_FLOW_SET_PREFIX "/ouroboros.sets." #define IRMD_MAX_FLOWS 4096 #define IRMD_THREADPOOL_SIZE 5 +#define IPCPD_THREADPOOL_SIZE 3 #define LOG_DIR "/@LOG_DIR@/" #define PTHREAD_COND_CLOCK CLOCK_MONOTONIC /* Timeout values */ diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 90fb94ef..694db7cf 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -41,15 +41,45 @@ int ipcp_init(enum ipcp_type type, struct ipcp_ops * ops) { pthread_condattr_t cattr; + int t; + + struct timeval tv = {(IPCP_ACCEPT_TIMEOUT / 1000), + (IPCP_ACCEPT_TIMEOUT % 1000) * 1000}; ipcpi.irmd_fd = -1; ipcpi.state = IPCP_INIT; + ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCPD_THREADPOOL_SIZE); + if (ipcpi.threadpool == NULL) { + return -ENOMEM; + } + + ipcpi.sock_path = ipcp_sock_path(getpid()); + if (ipcpi.sock_path == NULL) { + free(ipcpi.threadpool); + return -1; + } + + ipcpi.sockfd = server_socket_open(ipcpi.sock_path); + if (ipcpi.sockfd < 0) { + LOG_ERR("Could not open server socket."); + free(ipcpi.threadpool); + free(ipcpi.sock_path); + return -1; + } + + if (setsockopt(ipcpi.sockfd, SOL_SOCKET, SO_RCVTIMEO, + (void *) &tv, sizeof(tv))) + LOG_WARN("Failed to set timeout on socket."); + ipcpi.ops = ops; ipcpi.data = ipcp_data_create(); - if (ipcpi.data == NULL) + if (ipcpi.data == NULL) { + free(ipcpi.threadpool); + free(ipcpi.sock_path); return -ENOMEM; + } ipcp_data_init(ipcpi.data, type); @@ -61,14 +91,26 @@ int ipcp_init(enum ipcp_type type, struct ipcp_ops * ops) #endif pthread_cond_init(&ipcpi.state_cond, &cattr); - pthread_create(&ipcpi.mainloop, NULL, ipcp_main_loop, NULL); + for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t) + pthread_create(&ipcpi.threadpool[t], NULL, + ipcp_main_loop, NULL); return 0; } void ipcp_fini() { - pthread_join(ipcpi.mainloop, NULL); + int t; + + for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t) + pthread_join(ipcpi.threadpool[t], NULL); + + close(ipcpi.sockfd); + if (unlink(ipcpi.sock_path)) + LOG_DBG("Could not unlink %s.", ipcpi.sock_path); + + free(ipcpi.sock_path); + free(ipcpi.threadpool); ipcp_data_destroy(ipcpi.data); pthread_cond_destroy(&ipcpi.state_cond); @@ -171,7 +213,6 @@ int ipcp_parse_arg(int argc, char * argv[]) void * ipcp_main_loop(void * o) { int lsockfd; - int sockfd; uint8_t buf[IPCP_MSG_BUF_SIZE]; ipcp_msg_t * msg; @@ -182,32 +223,13 @@ void * ipcp_main_loop(void * o) dif_config_msg_t * conf_msg; struct dif_config conf; - char * sock_path; char * msg_name_dup; - struct timeval tv = {(IPCP_ACCEPT_TIMEOUT / 1000), - (IPCP_ACCEPT_TIMEOUT % 1000) * 1000}; - struct timeval ltv = {(SOCKET_TIMEOUT / 1000), (SOCKET_TIMEOUT % 1000) * 1000}; (void) o; - sock_path = ipcp_sock_path(getpid()); - if (sock_path == NULL) - return (void *) 1; - - sockfd = server_socket_open(sock_path); - if (sockfd < 0) { - LOG_ERR("Could not open server socket."); - free(sock_path); - return (void *) 1; - } - - if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, - (void *) &tv, sizeof(tv))) - LOG_WARN("Failed to set timeout on socket."); - while (true) { int fd = -1; @@ -221,7 +243,7 @@ void * ipcp_main_loop(void * o) ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; - lsockfd = accept(sockfd, 0, 0); + lsockfd = accept(ipcpi.sockfd, 0, 0); if (lsockfd < 0) continue; @@ -416,11 +438,5 @@ void * ipcp_main_loop(void * o) close(lsockfd); } - close(sockfd); - if (unlink(sock_path)) - LOG_DBG("Could not unlink %s.", sock_path); - - free(sock_path); - return (void *) 0; } diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 18a5bdab..c89fe438 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -50,7 +50,9 @@ struct ipcp { pthread_mutex_t state_mtx; pthread_cond_t state_cond; - pthread_t mainloop; + int sockfd; + char * sock_path; + pthread_t * threadpool; } ipcpi; int ipcp_init(enum ipcp_type type, diff --git a/src/lib/dev.c b/src/lib/dev.c index 94fbd394..55ee7572 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -489,9 +489,8 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) pthread_rwlock_unlock(&ai.data_lock); recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) { + if (recv_msg == NULL) return -1; - } if (!recv_msg->has_api || !recv_msg->has_port_id) { irm_msg__free_unpacked(recv_msg, NULL); -- cgit v1.2.3