From e00181b492d573ecd0621f55d9ad24f134c09d4c Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Wed, 3 Oct 2018 01:13:43 +0200 Subject: ipcpd: Add multithreading to Ethernet IPCP This adds multiple reader and writer threads, configurabe via cmake with IPCP_ETH_RD_THR and IPCP_ETH_WR_THR. Improves ethernet IPCP throughput, which looks to be limited by the raw socket calls. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/ipcpd/config.h.in | 2 + src/ipcpd/eth/CMakeLists.txt | 5 +++ src/ipcpd/eth/eth.c | 87 ++++++++++++++++++++++++++++---------------- 3 files changed, 63 insertions(+), 31 deletions(-) diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index b9961b01..afce5e86 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -63,3 +63,5 @@ #cmakedefine HAVE_NETMAP #cmakedefine HAVE_BPF #cmakedefine HAVE_RAW_SOCKETS +#define IPCP_ETH_RD_THR @IPCP_ETH_RD_THR@ +#define IPCP_ETH_WR_THR @IPCP_ETH_WR_THR@ diff --git a/src/ipcpd/eth/CMakeLists.txt b/src/ipcpd/eth/CMakeLists.txt index 6b8d1a77..7bad6ac0 100644 --- a/src/ipcpd/eth/CMakeLists.txt +++ b/src/ipcpd/eth/CMakeLists.txt @@ -77,6 +77,11 @@ endif () if (HAVE_ETH) message(STATUS "Supported raw packet API found, building eth-llc and eth-dix") + set(IPCP_ETH_RD_THR 3 CACHE STRING + "Number of reader threads in Ethernet IPCP") + set(IPCP_ETH_WR_THR 3 CACHE STRING + "Number of writer threads in Ethernet IPCP") + set(ETH_LLC_SOURCES # Add source files here ${CMAKE_CURRENT_SOURCE_DIR}/llc.c diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 04debfd1..af34f68e 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -215,11 +215,10 @@ struct { #endif struct ef * fd_to_ef; fset_t * np1_flows; - fqueue_t * fq; pthread_rwlock_t flows_lock; - pthread_t sdu_writer; - pthread_t sdu_reader; + pthread_t sdu_writer[IPCP_ETH_WR_THR]; + pthread_t sdu_reader[IPCP_ETH_RD_THR]; #ifdef __linux__ pthread_t if_monitor; @@ -260,10 +259,6 @@ static int eth_data_init(void) if (eth_data.np1_flows == NULL) goto fail_np1_flows; - eth_data.fq = fqueue_create(); - if (eth_data.fq == NULL) - goto fail_fq; - for (i = 0; i < SYS_MAX_FLOWS; ++i) { #if defined(BUILD_ETH_DIX) eth_data.fd_to_ef[i].r_eid = -1; @@ -311,8 +306,6 @@ static int eth_data_init(void) fail_flows_lock: shim_data_destroy(eth_data.shim_data); fail_shim_data: - fqueue_destroy(eth_data.fq); - fail_fq: fset_destroy(eth_data.np1_flows); fail_np1_flows: #ifdef BUILD_ETH_LLC @@ -339,7 +332,6 @@ static void eth_data_fini(void) pthread_mutex_destroy(ð_data.mgmt_lock); pthread_rwlock_destroy(ð_data.flows_lock); shim_data_destroy(eth_data.shim_data); - fqueue_destroy(eth_data.fq); fset_destroy(eth_data.np1_flows); #ifdef BUILD_ETH_LLC bmp_destroy(eth_data.saps); @@ -964,6 +956,11 @@ static void * eth_ipcp_sdu_reader(void * o) return (void *) 0; } +static void cleanup_writer(void * o) +{ + fqueue_destroy((fqueue_t *) o); +} + static void * eth_ipcp_sdu_writer(void * o) { int fd; @@ -977,15 +974,22 @@ static void * eth_ipcp_sdu_writer(void * o) #endif uint8_t r_addr[MAC_SIZE]; + fqueue_t * fq; + + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; + (void) o; + pthread_cleanup_push(cleanup_writer, fq); + ipcp_lock_to_core(); while (true) { - fevent(eth_data.np1_flows, eth_data.fq, NULL); - + fevent(eth_data.np1_flows, fq, NULL); pthread_rwlock_rdlock(ð_data.flows_lock); - while ((fd = fqueue_next(eth_data.fq)) >= 0) { + while ((fd = fqueue_next(fq)) >= 0) { if (ipcp_flow_read(fd, &sdb)) { log_dbg("Bad read from fd %d.", fd); continue; @@ -1021,6 +1025,8 @@ static void * eth_ipcp_sdu_writer(void * o) pthread_rwlock_unlock(ð_data.flows_lock); } + pthread_cleanup_pop(true); + return (void *) 1; } @@ -1382,20 +1388,24 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) goto fail_mgmt_handler; } - if (pthread_create(ð_data.sdu_reader, - NULL, - eth_ipcp_sdu_reader, - NULL)) { - ipcp_set_state(IPCP_INIT); - goto fail_sdu_reader; + for (idx = 0; idx < IPCP_ETH_RD_THR; ++idx) { + if (pthread_create(ð_data.sdu_reader[idx], + NULL, + eth_ipcp_sdu_reader, + NULL)) { + ipcp_set_state(IPCP_INIT); + goto fail_sdu_reader; + } } - if (pthread_create(ð_data.sdu_writer, - NULL, - eth_ipcp_sdu_writer, - NULL)) { - ipcp_set_state(IPCP_INIT); - goto fail_sdu_writer; + for (idx = 0; idx < IPCP_ETH_WR_THR; ++idx) { + if (pthread_create(ð_data.sdu_writer[idx], + NULL, + eth_ipcp_sdu_writer, + NULL)) { + ipcp_set_state(IPCP_INIT); + goto fail_sdu_writer; + } } #if defined(BUILD_ETH_DIX) @@ -1409,9 +1419,16 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) return 0; fail_sdu_writer: - pthread_cancel(eth_data.sdu_reader); - pthread_join(eth_data.sdu_reader, NULL); + while (idx > 0) { + pthread_cancel(eth_data.sdu_writer[--idx]); + pthread_join(eth_data.sdu_writer[idx], NULL); + } + idx = IPCP_ETH_RD_THR; fail_sdu_reader: + while (idx > 0) { + pthread_cancel(eth_data.sdu_reader[--idx]); + pthread_join(eth_data.sdu_reader[idx], NULL); + } pthread_cancel(eth_data.mgmt_handler); pthread_join(eth_data.mgmt_handler, NULL); fail_mgmt_handler: @@ -1697,6 +1714,8 @@ static struct ipcp_ops eth_ops = { int main(int argc, char * argv[]) { + int i; + if (ipcp_init(argc, argv, ð_ops) < 0) goto fail_init; @@ -1723,14 +1742,20 @@ int main(int argc, ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(eth_data.sdu_writer); - pthread_cancel(eth_data.sdu_reader); + for (i = 0; i < IPCP_ETH_WR_THR; ++i) + pthread_cancel(eth_data.sdu_writer[i]); + for (i = 0; i < IPCP_ETH_RD_THR; ++i) + pthread_cancel(eth_data.sdu_reader[i]); + pthread_cancel(eth_data.mgmt_handler); #ifdef __linux__ pthread_cancel(eth_data.if_monitor); #endif - pthread_join(eth_data.sdu_writer, NULL); - pthread_join(eth_data.sdu_reader, NULL); + for (i = 0; i < IPCP_ETH_WR_THR; ++i) + pthread_join(eth_data.sdu_writer[i], NULL); + for (i = 0; i < IPCP_ETH_RD_THR; ++i) + pthread_join(eth_data.sdu_reader[i], NULL); + pthread_join(eth_data.mgmt_handler, NULL); #ifdef __linux__ pthread_join(eth_data.if_monitor, NULL); -- cgit v1.2.3