diff options
| author | Dimitri Staessens <dimitri.staessens@ugent.be> | 2018-10-03 01:13:43 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2018-10-03 09:14:57 +0200 | 
| commit | e00181b492d573ecd0621f55d9ad24f134c09d4c (patch) | |
| tree | f7c1b1f1df892bd3766060419d34cd1363a6eedd /src/ipcpd | |
| parent | ee73b781c1e13daee67e149f1828d7166e5ea627 (diff) | |
| download | ouroboros-e00181b492d573ecd0621f55d9ad24f134c09d4c.tar.gz ouroboros-e00181b492d573ecd0621f55d9ad24f134c09d4c.zip | |
ipcpd: Add multithreading to Ethernet IPCP
This adds multiple reader and writer threads, configurabe via cmake
with IPCP_ETH_RD_THR and IPCP_ETH_WR_THR. Improves ethernet IPCP
throughput, which looks to be limited by the raw socket calls.
Signed-off-by: Dimitri Staessens <dimitri.staessens@ugent.be>
Signed-off-by: Sander Vrijders <sander.vrijders@ugent.be>
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/config.h.in | 2 | ||||
| -rw-r--r-- | src/ipcpd/eth/CMakeLists.txt | 5 | ||||
| -rw-r--r-- | src/ipcpd/eth/eth.c | 87 | 
3 files changed, 63 insertions, 31 deletions
| diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index b9961b01..afce5e86 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -63,3 +63,5 @@  #cmakedefine HAVE_NETMAP  #cmakedefine HAVE_BPF  #cmakedefine HAVE_RAW_SOCKETS +#define IPCP_ETH_RD_THR  @IPCP_ETH_RD_THR@ +#define IPCP_ETH_WR_THR  @IPCP_ETH_WR_THR@ diff --git a/src/ipcpd/eth/CMakeLists.txt b/src/ipcpd/eth/CMakeLists.txt index 6b8d1a77..7bad6ac0 100644 --- a/src/ipcpd/eth/CMakeLists.txt +++ b/src/ipcpd/eth/CMakeLists.txt @@ -77,6 +77,11 @@ endif ()  if (HAVE_ETH)    message(STATUS "Supported raw packet API found, building eth-llc and eth-dix") +  set(IPCP_ETH_RD_THR 3 CACHE STRING +    "Number of reader threads in Ethernet IPCP") +  set(IPCP_ETH_WR_THR 3 CACHE STRING +    "Number of writer threads in Ethernet IPCP") +    set(ETH_LLC_SOURCES      # Add source files here      ${CMAKE_CURRENT_SOURCE_DIR}/llc.c diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 04debfd1..af34f68e 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -215,11 +215,10 @@ struct {  #endif          struct ef *        fd_to_ef;          fset_t *           np1_flows; -        fqueue_t *         fq;          pthread_rwlock_t   flows_lock; -        pthread_t          sdu_writer; -        pthread_t          sdu_reader; +        pthread_t          sdu_writer[IPCP_ETH_WR_THR]; +        pthread_t          sdu_reader[IPCP_ETH_RD_THR];  #ifdef __linux__          pthread_t          if_monitor; @@ -260,10 +259,6 @@ static int eth_data_init(void)          if (eth_data.np1_flows == NULL)                  goto fail_np1_flows; -        eth_data.fq = fqueue_create(); -        if (eth_data.fq == NULL) -                goto fail_fq; -          for (i = 0; i < SYS_MAX_FLOWS; ++i) {  #if defined(BUILD_ETH_DIX)                  eth_data.fd_to_ef[i].r_eid = -1; @@ -311,8 +306,6 @@ static int eth_data_init(void)   fail_flows_lock:          shim_data_destroy(eth_data.shim_data);   fail_shim_data: -        fqueue_destroy(eth_data.fq); - fail_fq:          fset_destroy(eth_data.np1_flows);   fail_np1_flows:  #ifdef BUILD_ETH_LLC @@ -339,7 +332,6 @@ static void eth_data_fini(void)          pthread_mutex_destroy(ð_data.mgmt_lock);          pthread_rwlock_destroy(ð_data.flows_lock);          shim_data_destroy(eth_data.shim_data); -        fqueue_destroy(eth_data.fq);          fset_destroy(eth_data.np1_flows);  #ifdef BUILD_ETH_LLC          bmp_destroy(eth_data.saps); @@ -964,6 +956,11 @@ static void * eth_ipcp_sdu_reader(void * o)          return (void *) 0;  } +static void cleanup_writer(void * o) +{ +        fqueue_destroy((fqueue_t *) o); +} +  static void * eth_ipcp_sdu_writer(void * o)  {          int                  fd; @@ -977,15 +974,22 @@ static void * eth_ipcp_sdu_writer(void * o)  #endif          uint8_t              r_addr[MAC_SIZE]; +        fqueue_t *           fq; + +        fq = fqueue_create(); +        if (fq == NULL) +                return (void *) -1; +          (void) o; +        pthread_cleanup_push(cleanup_writer, fq); +          ipcp_lock_to_core();          while (true) { -                fevent(eth_data.np1_flows, eth_data.fq, NULL); - +                fevent(eth_data.np1_flows, fq, NULL);                  pthread_rwlock_rdlock(ð_data.flows_lock); -                while ((fd = fqueue_next(eth_data.fq)) >= 0) { +                while ((fd = fqueue_next(fq)) >= 0) {                          if (ipcp_flow_read(fd, &sdb)) {                                  log_dbg("Bad read from fd %d.", fd);                                  continue; @@ -1021,6 +1025,8 @@ static void * eth_ipcp_sdu_writer(void * o)                  pthread_rwlock_unlock(ð_data.flows_lock);          } +        pthread_cleanup_pop(true); +          return (void *) 1;  } @@ -1382,20 +1388,24 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf)                  goto fail_mgmt_handler;          } -        if (pthread_create(ð_data.sdu_reader, -                           NULL, -                           eth_ipcp_sdu_reader, -                           NULL)) { -                ipcp_set_state(IPCP_INIT); -                goto fail_sdu_reader; +        for (idx = 0; idx < IPCP_ETH_RD_THR; ++idx) { +                if (pthread_create(ð_data.sdu_reader[idx], +                                   NULL, +                                   eth_ipcp_sdu_reader, +                                   NULL)) { +                        ipcp_set_state(IPCP_INIT); +                        goto fail_sdu_reader; +                }          } -        if (pthread_create(ð_data.sdu_writer, -                           NULL, -                           eth_ipcp_sdu_writer, -                           NULL)) { -                ipcp_set_state(IPCP_INIT); -                goto fail_sdu_writer; +        for (idx = 0; idx < IPCP_ETH_WR_THR; ++idx) { +                if (pthread_create(ð_data.sdu_writer[idx], +                                   NULL, +                                   eth_ipcp_sdu_writer, +                                   NULL)) { +                        ipcp_set_state(IPCP_INIT); +                        goto fail_sdu_writer; +                }          }  #if defined(BUILD_ETH_DIX) @@ -1409,9 +1419,16 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf)          return 0;   fail_sdu_writer: -        pthread_cancel(eth_data.sdu_reader); -        pthread_join(eth_data.sdu_reader, NULL); +        while (idx > 0) { +                pthread_cancel(eth_data.sdu_writer[--idx]); +                pthread_join(eth_data.sdu_writer[idx], NULL); +        } +        idx = IPCP_ETH_RD_THR;   fail_sdu_reader: +        while (idx > 0) { +                pthread_cancel(eth_data.sdu_reader[--idx]); +                pthread_join(eth_data.sdu_reader[idx], NULL); +        }          pthread_cancel(eth_data.mgmt_handler);          pthread_join(eth_data.mgmt_handler, NULL);   fail_mgmt_handler: @@ -1697,6 +1714,8 @@ static struct ipcp_ops eth_ops = {  int main(int    argc,           char * argv[])  { +        int i; +          if (ipcp_init(argc, argv, ð_ops) < 0)                  goto fail_init; @@ -1723,14 +1742,20 @@ int main(int    argc,          ipcp_shutdown();          if (ipcp_get_state() == IPCP_SHUTDOWN) { -                pthread_cancel(eth_data.sdu_writer); -                pthread_cancel(eth_data.sdu_reader); +                for (i = 0; i < IPCP_ETH_WR_THR; ++i) +                        pthread_cancel(eth_data.sdu_writer[i]); +                for (i = 0; i < IPCP_ETH_RD_THR; ++i) +                        pthread_cancel(eth_data.sdu_reader[i]); +                  pthread_cancel(eth_data.mgmt_handler);  #ifdef __linux__                  pthread_cancel(eth_data.if_monitor);  #endif -                pthread_join(eth_data.sdu_writer, NULL); -                pthread_join(eth_data.sdu_reader, NULL); +                for (i = 0; i < IPCP_ETH_WR_THR; ++i) +                        pthread_join(eth_data.sdu_writer[i], NULL); +                for (i = 0; i < IPCP_ETH_RD_THR; ++i) +                        pthread_join(eth_data.sdu_reader[i], NULL); +                  pthread_join(eth_data.mgmt_handler, NULL);  #ifdef __linux__                  pthread_join(eth_data.if_monitor, NULL); | 
