From 51bb05f15b78fbfdf53417ec1d1c21e999c0e556 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Wed, 26 Sep 2018 14:49:39 +0200 Subject: include: Remove _DEFAULT_SOURCE in endian.h This removes the _DEFAULT_SOURCE definition in the endian header as it should not be there. This avoids double and conflicting definitions. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/ipcpd/eth/eth.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/ipcpd/eth') diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 443f3fdb..114cafce 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -29,6 +29,8 @@ #define _DARWIN_C_SOURCE #elif defined(__FreeBSD__) #define __BSD_VISIBLE 1 +#elif defined (__linux__) || defined (__CYGWIN__) +#define _DEFAULT_SOURCE #else #define _POSIX_C_SOURCE 200112L #endif -- cgit v1.2.3 From 3beb3927d0207db06e24228c314a2199b0be27d3 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Thu, 27 Sep 2018 11:47:14 +0200 Subject: ipcpd: Remove double close in Ethernet IPCP There was a double close of a fd in the Ethernet IPCP. It also passes the correct max length to a memcpy (a strlen of the source string was used previously). Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- src/ipcpd/eth/eth.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/ipcpd/eth') diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 114cafce..04debfd1 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -1200,7 +1200,7 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) } memset(&ifr, 0, sizeof(ifr)); - memcpy(ifr.ifr_name, conf->dev, strlen(conf->dev)); + memcpy(ifr.ifr_name, conf->dev, IFNAMSIZ); #ifdef BUILD_ETH_DIX if (conf->ethertype < 0x0600 || conf->ethertype == 0xFFFF) { @@ -1276,7 +1276,6 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) idx = if_nametoindex(conf->dev); if (idx == 0) { log_err("Failed to retrieve interface index."); - close(skfd); return -1; } #endif /* __FreeBSD__ */ -- cgit v1.2.3 From e00181b492d573ecd0621f55d9ad24f134c09d4c Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Wed, 3 Oct 2018 01:13:43 +0200 Subject: ipcpd: Add multithreading to Ethernet IPCP This adds multiple reader and writer threads, configurabe via cmake with IPCP_ETH_RD_THR and IPCP_ETH_WR_THR. Improves ethernet IPCP throughput, which looks to be limited by the raw socket calls. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/ipcpd/config.h.in | 2 + src/ipcpd/eth/CMakeLists.txt | 5 +++ src/ipcpd/eth/eth.c | 87 ++++++++++++++++++++++++++++---------------- 3 files changed, 63 insertions(+), 31 deletions(-) (limited to 'src/ipcpd/eth') diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index b9961b01..afce5e86 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -63,3 +63,5 @@ #cmakedefine HAVE_NETMAP #cmakedefine HAVE_BPF #cmakedefine HAVE_RAW_SOCKETS +#define IPCP_ETH_RD_THR @IPCP_ETH_RD_THR@ +#define IPCP_ETH_WR_THR @IPCP_ETH_WR_THR@ diff --git a/src/ipcpd/eth/CMakeLists.txt b/src/ipcpd/eth/CMakeLists.txt index 6b8d1a77..7bad6ac0 100644 --- a/src/ipcpd/eth/CMakeLists.txt +++ b/src/ipcpd/eth/CMakeLists.txt @@ -77,6 +77,11 @@ endif () if (HAVE_ETH) message(STATUS "Supported raw packet API found, building eth-llc and eth-dix") + set(IPCP_ETH_RD_THR 3 CACHE STRING + "Number of reader threads in Ethernet IPCP") + set(IPCP_ETH_WR_THR 3 CACHE STRING + "Number of writer threads in Ethernet IPCP") + set(ETH_LLC_SOURCES # Add source files here ${CMAKE_CURRENT_SOURCE_DIR}/llc.c diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 04debfd1..af34f68e 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -215,11 +215,10 @@ struct { #endif struct ef * fd_to_ef; fset_t * np1_flows; - fqueue_t * fq; pthread_rwlock_t flows_lock; - pthread_t sdu_writer; - pthread_t sdu_reader; + pthread_t sdu_writer[IPCP_ETH_WR_THR]; + pthread_t sdu_reader[IPCP_ETH_RD_THR]; #ifdef __linux__ pthread_t if_monitor; @@ -260,10 +259,6 @@ static int eth_data_init(void) if (eth_data.np1_flows == NULL) goto fail_np1_flows; - eth_data.fq = fqueue_create(); - if (eth_data.fq == NULL) - goto fail_fq; - for (i = 0; i < SYS_MAX_FLOWS; ++i) { #if defined(BUILD_ETH_DIX) eth_data.fd_to_ef[i].r_eid = -1; @@ -311,8 +306,6 @@ static int eth_data_init(void) fail_flows_lock: shim_data_destroy(eth_data.shim_data); fail_shim_data: - fqueue_destroy(eth_data.fq); - fail_fq: fset_destroy(eth_data.np1_flows); fail_np1_flows: #ifdef BUILD_ETH_LLC @@ -339,7 +332,6 @@ static void eth_data_fini(void) pthread_mutex_destroy(ð_data.mgmt_lock); pthread_rwlock_destroy(ð_data.flows_lock); shim_data_destroy(eth_data.shim_data); - fqueue_destroy(eth_data.fq); fset_destroy(eth_data.np1_flows); #ifdef BUILD_ETH_LLC bmp_destroy(eth_data.saps); @@ -964,6 +956,11 @@ static void * eth_ipcp_sdu_reader(void * o) return (void *) 0; } +static void cleanup_writer(void * o) +{ + fqueue_destroy((fqueue_t *) o); +} + static void * eth_ipcp_sdu_writer(void * o) { int fd; @@ -977,15 +974,22 @@ static void * eth_ipcp_sdu_writer(void * o) #endif uint8_t r_addr[MAC_SIZE]; + fqueue_t * fq; + + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; + (void) o; + pthread_cleanup_push(cleanup_writer, fq); + ipcp_lock_to_core(); while (true) { - fevent(eth_data.np1_flows, eth_data.fq, NULL); - + fevent(eth_data.np1_flows, fq, NULL); pthread_rwlock_rdlock(ð_data.flows_lock); - while ((fd = fqueue_next(eth_data.fq)) >= 0) { + while ((fd = fqueue_next(fq)) >= 0) { if (ipcp_flow_read(fd, &sdb)) { log_dbg("Bad read from fd %d.", fd); continue; @@ -1021,6 +1025,8 @@ static void * eth_ipcp_sdu_writer(void * o) pthread_rwlock_unlock(ð_data.flows_lock); } + pthread_cleanup_pop(true); + return (void *) 1; } @@ -1382,20 +1388,24 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) goto fail_mgmt_handler; } - if (pthread_create(ð_data.sdu_reader, - NULL, - eth_ipcp_sdu_reader, - NULL)) { - ipcp_set_state(IPCP_INIT); - goto fail_sdu_reader; + for (idx = 0; idx < IPCP_ETH_RD_THR; ++idx) { + if (pthread_create(ð_data.sdu_reader[idx], + NULL, + eth_ipcp_sdu_reader, + NULL)) { + ipcp_set_state(IPCP_INIT); + goto fail_sdu_reader; + } } - if (pthread_create(ð_data.sdu_writer, - NULL, - eth_ipcp_sdu_writer, - NULL)) { - ipcp_set_state(IPCP_INIT); - goto fail_sdu_writer; + for (idx = 0; idx < IPCP_ETH_WR_THR; ++idx) { + if (pthread_create(ð_data.sdu_writer[idx], + NULL, + eth_ipcp_sdu_writer, + NULL)) { + ipcp_set_state(IPCP_INIT); + goto fail_sdu_writer; + } } #if defined(BUILD_ETH_DIX) @@ -1409,9 +1419,16 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) return 0; fail_sdu_writer: - pthread_cancel(eth_data.sdu_reader); - pthread_join(eth_data.sdu_reader, NULL); + while (idx > 0) { + pthread_cancel(eth_data.sdu_writer[--idx]); + pthread_join(eth_data.sdu_writer[idx], NULL); + } + idx = IPCP_ETH_RD_THR; fail_sdu_reader: + while (idx > 0) { + pthread_cancel(eth_data.sdu_reader[--idx]); + pthread_join(eth_data.sdu_reader[idx], NULL); + } pthread_cancel(eth_data.mgmt_handler); pthread_join(eth_data.mgmt_handler, NULL); fail_mgmt_handler: @@ -1697,6 +1714,8 @@ static struct ipcp_ops eth_ops = { int main(int argc, char * argv[]) { + int i; + if (ipcp_init(argc, argv, ð_ops) < 0) goto fail_init; @@ -1723,14 +1742,20 @@ int main(int argc, ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(eth_data.sdu_writer); - pthread_cancel(eth_data.sdu_reader); + for (i = 0; i < IPCP_ETH_WR_THR; ++i) + pthread_cancel(eth_data.sdu_writer[i]); + for (i = 0; i < IPCP_ETH_RD_THR; ++i) + pthread_cancel(eth_data.sdu_reader[i]); + pthread_cancel(eth_data.mgmt_handler); #ifdef __linux__ pthread_cancel(eth_data.if_monitor); #endif - pthread_join(eth_data.sdu_writer, NULL); - pthread_join(eth_data.sdu_reader, NULL); + for (i = 0; i < IPCP_ETH_WR_THR; ++i) + pthread_join(eth_data.sdu_writer[i], NULL); + for (i = 0; i < IPCP_ETH_RD_THR; ++i) + pthread_join(eth_data.sdu_reader[i], NULL); + pthread_join(eth_data.mgmt_handler, NULL); #ifdef __linux__ pthread_join(eth_data.if_monitor, NULL); -- cgit v1.2.3 From 0698f576d98ed1c4131d419c40019090f3ae06d1 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 3 Oct 2018 09:51:52 +0200 Subject: ipcpd: Fix bad lock in Ethernet IPCP An unlock was called twice instead of a lock/unlock sequence, causing a data race. Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- src/ipcpd/eth/eth.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/ipcpd/eth') diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index af34f68e..9456f4cb 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -905,8 +905,8 @@ static void * eth_ipcp_sdu_reader(void * o) memcpy(frame->buf, &e_frame->payload, length); memcpy(frame->r_addr, e_frame->src_hwaddr, MAC_SIZE); - pthread_mutex_unlock(ð_data.mgmt_lock); + pthread_mutex_lock(ð_data.mgmt_lock); list_add(&frame->next, ð_data.mgmt_frames); pthread_cond_signal(ð_data.mgmt_cond); pthread_mutex_unlock(ð_data.mgmt_lock); -- cgit v1.2.3 From a789531e1dab39fcddc6d57af2312ead3b1fa4cc Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 3 Oct 2018 10:59:00 +0200 Subject: ipcpd: Fix build for netmap enabled Ethernet IPCP There were some compilation issues introduced by adding the interface monitor to the Ethernet IPCP. Furthermore it was not possible to select between raw sockets or netmap if both were available. Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- src/ipcpd/eth/CMakeLists.txt | 6 ++---- src/ipcpd/eth/eth.c | 10 +++++++--- 2 files changed, 9 insertions(+), 7 deletions(-) (limited to 'src/ipcpd/eth') diff --git a/src/ipcpd/eth/CMakeLists.txt b/src/ipcpd/eth/CMakeLists.txt index 7bad6ac0..624b8266 100644 --- a/src/ipcpd/eth/CMakeLists.txt +++ b/src/ipcpd/eth/CMakeLists.txt @@ -19,6 +19,8 @@ find_path(NETMAP_C_INCLUDE_DIR mark_as_advanced(NETMAP_C_INCLUDE_DIR) +unset(HAVE_ETH) + if (CMAKE_SYSTEM_NAME STREQUAL "Linux") set(DISABLE_RAW_SOCKETS FALSE CACHE BOOL "Disable raw socket support for Ethernet IPCPs") @@ -29,7 +31,6 @@ if (CMAKE_SYSTEM_NAME STREQUAL "Linux") else () message(STATUS "Raw socket support for Ethernet IPCPs disabled by user") unset(HAVE_RAW_SOCKETS) - unset(HAVE_ETH) endif () endif () @@ -53,7 +54,6 @@ if (NOT CMAKE_SYSTEM_NAME STREQUAL "Linux") message(STATUS "Berkeley Packet Filter support " "for Ethernet IPCPs disabled by user") unset(HAVE_BPF) - unset(HAVE_ETH) endif () endif () endif () @@ -69,8 +69,6 @@ if (NETMAP_C_INCLUDE_DIR) else () message(STATUS "Netmap support for Ethernet IPCPs disabled by user") unset(HAVE_NETMAP) - unset(HAVE_ETH) - unset(IPCP_ETH_TARGET CACHE) endif () endif () diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 9456f4cb..8175d803 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -194,6 +194,7 @@ struct { struct shim_data * shim_data; #ifdef __linux__ int mtu; + int if_idx; #endif #if defined(HAVE_NETMAP) struct nm_desc * nmd; @@ -884,7 +885,9 @@ static void * eth_ipcp_sdu_reader(void * o) if (deid == MGMT_EID) { #elif defined (BUILD_ETH_LLC) if (length > 0x05FF) {/* DIX */ +#ifndef HAVE_NETMAP ipcp_sdb_release(sdb); +#endif continue; } @@ -1137,7 +1140,7 @@ static void * eth_ipcp_if_monitor(void * o) ifi = NLMSG_DATA(h); /* Not our interface */ - if (ifi->ifi_index != eth_data.device.sll_ifindex) + if (ifi->ifi_index != eth_data.if_idx) continue; if (ifi->ifi_flags & IFF_UP) { @@ -1284,6 +1287,7 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) log_err("Failed to retrieve interface index."); return -1; } + eth_data.if_idx = idx; #endif /* __FreeBSD__ */ #if defined(HAVE_NETMAP) @@ -1370,7 +1374,7 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) #endif /* HAVE_NETMAP */ ipcp_set_state(IPCP_OPERATIONAL); -#ifdef __linux__ +#if defined(__linux__) if (pthread_create(ð_data.if_monitor, NULL, eth_ipcp_if_monitor, @@ -1436,7 +1440,7 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) pthread_cancel(eth_data.if_monitor); pthread_join(eth_data.if_monitor, NULL); #endif -#if !defined(HAVE_NETMAP) +#if defined(__linux__) || !defined(HAVE_NETMAP) fail_device: #endif #if defined(HAVE_NETMAP) -- cgit v1.2.3 From 4b88be0201a947491130f174552a86a50c233dc2 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 3 Oct 2018 12:16:44 +0200 Subject: ipcpd: Bypass Qdisc in Ethernet IPCP Since Linux kernel 3.14 there is the option to bypass the kernel Qdisc. This will speed up the Ethernet IPCP. Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- src/ipcpd/eth/eth.c | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'src/ipcpd/eth') diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 8175d803..05dd9ac7 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -1199,6 +1199,9 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) #endif #ifndef SHM_RDRB_MULTI_BLOCK size_t maxsz; +#endif +#if defined(HAVE_RAW_SOCKETS) + int qdisc_bypass = 1; #endif assert(conf); assert(conf->type == THIS_TYPE); @@ -1365,6 +1368,11 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) return -1; } + if (setsockopt(eth_data.s_fd, SOL_PACKET, PACKET_QDISC_BYPASS, + &qdisc_bypass, sizeof(qdisc_bypass))) { + log_info("Qdisc bypass not supported."); + } + if (bind(eth_data.s_fd, (struct sockaddr *) ð_data.device, sizeof(eth_data.device))) { log_err("Failed to bind socket to interface"); -- cgit v1.2.3 From c9a54ee6b45c8566cf2d24f9763907451b50f7c1 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 3 Oct 2018 14:37:15 +0200 Subject: build: Prioritize raw socket API in build This will change the build to exclusively select one raw socket API in case multiple are present in the sytem, which will simplify the code. Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- src/ipcpd/eth/CMakeLists.txt | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) (limited to 'src/ipcpd/eth') diff --git a/src/ipcpd/eth/CMakeLists.txt b/src/ipcpd/eth/CMakeLists.txt index 624b8266..6672f93c 100644 --- a/src/ipcpd/eth/CMakeLists.txt +++ b/src/ipcpd/eth/CMakeLists.txt @@ -14,31 +14,31 @@ include_directories(${CMAKE_BINARY_DIR}/include) find_path(NETMAP_C_INCLUDE_DIR net/netmap_user.h - HINTS /usr/include /usr/local/include -) + HINTS /usr/include /usr/local/include) mark_as_advanced(NETMAP_C_INCLUDE_DIR) -unset(HAVE_ETH) - +# Check for raw sockets if (CMAKE_SYSTEM_NAME STREQUAL "Linux") set(DISABLE_RAW_SOCKETS FALSE CACHE BOOL "Disable raw socket support for Ethernet IPCPs") if (NOT DISABLE_RAW_SOCKETS) message(STATUS "Raw socket support for Ethernet IPCPs enabled") set(HAVE_RAW_SOCKETS TRUE PARENT_SCOPE) + set(HAVE_RAW_SOCKETS TRUE) set(HAVE_ETH TRUE) else () message(STATUS "Raw socket support for Ethernet IPCPs disabled by user") + unset(HAVE_RAW_SOCKETS PARENT_SCOPE) unset(HAVE_RAW_SOCKETS) endif () endif () +# Check for BPF if (NOT CMAKE_SYSTEM_NAME STREQUAL "Linux") find_path(BPF_C_INCLUDE_DIR - net/bpf.h - HINTS /usr/include /usr/local/include - ) + net/bpf.h + HINTS /usr/include /usr/local/include) mark_as_advanced(BPF_C_INCLUDE_DIR) @@ -47,28 +47,31 @@ if (NOT CMAKE_SYSTEM_NAME STREQUAL "Linux") "Disable Berkeley Packet Filter support for Ethernet IPCPs") if (NOT DISABLE_BPF) message(STATUS "Berkeley Packet Filter support " - "for Ethernet IPCPs enabled") + "for Ethernet IPCPs enabled") set(HAVE_BPF TRUE PARENT_SCOPE) + set(HAVE_BPF TRUE) set(HAVE_ETH TRUE) else () message(STATUS "Berkeley Packet Filter support " - "for Ethernet IPCPs disabled by user") + "for Ethernet IPCPs disabled by user") + unset(HAVE_BPF PARENT_SCOPE) unset(HAVE_BPF) endif () endif () endif () -if (NETMAP_C_INCLUDE_DIR) +# Check for netmap exclusively +if (NOT HAVE_RAW_SOCKETS AND NOT HAVE_BPF AND NETMAP_C_INCLUDE_DIR) set(DISABLE_NETMAP FALSE CACHE BOOL - "Disable netmap support for ETH IPCPs") + "Disable netmap support for ETH IPCPs") if (NOT DISABLE_NETMAP) message(STATUS "Netmap support for Ethernet IPCPs enabled") - set(HAVE_NETMAP TRUE PARENT_SCOPE) test_and_set_c_compiler_flag_global(-std=c99) + set(HAVE_NETMAP TRUE PARENT_SCOPE) set(HAVE_ETH TRUE) else () message(STATUS "Netmap support for Ethernet IPCPs disabled by user") - unset(HAVE_NETMAP) + unset(HAVE_NETMAP PARENT_SCOPE) endif () endif () @@ -83,12 +86,12 @@ if (HAVE_ETH) set(ETH_LLC_SOURCES # Add source files here ${CMAKE_CURRENT_SOURCE_DIR}/llc.c - ) + ) set(ETH_DIX_SOURCES # Add source files here ${CMAKE_CURRENT_SOURCE_DIR}/dix.c - ) + ) set(IPCP_ETH_LLC_TARGET ipcpd-eth-llc CACHE INTERNAL "") set(IPCP_ETH_DIX_TARGET ipcpd-eth-dix CACHE INTERNAL "") -- cgit v1.2.3 From 937adca2a718b160b6d42bb8a3f28d96321fdb49 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 3 Oct 2018 15:32:22 +0200 Subject: ipcpd: Use non-blocking socket in Ethernet IPCP Since the Ethernet IPCP now has multiple reader threads it was possible that both exit the select call, which caused one of the two threads to block on the recv call. This makes the socket non-blocking so that the recv call simply fails. Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- src/ipcpd/eth/eth.c | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) (limited to 'src/ipcpd/eth') diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 05dd9ac7..44ef3756 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -991,7 +991,6 @@ static void * eth_ipcp_sdu_writer(void * o) while (true) { fevent(eth_data.np1_flows, fq, NULL); - pthread_rwlock_rdlock(ð_data.flows_lock); while ((fd = fqueue_next(fq)) >= 0) { if (ipcp_flow_read(fd, &sdb)) { log_dbg("Bad read from fd %d.", fd); @@ -1005,6 +1004,8 @@ static void * eth_ipcp_sdu_writer(void * o) log_dbg("Failed to allocate header."); ipcp_sdb_release(sdb); } + + pthread_rwlock_rdlock(ð_data.flows_lock); #if defined(BUILD_ETH_DIX) deid = eth_data.fd_to_ef[fd].r_eid; #elif defined(BUILD_ETH_LLC) @@ -1015,6 +1016,8 @@ static void * eth_ipcp_sdu_writer(void * o) eth_data.fd_to_ef[fd].r_addr, MAC_SIZE); + pthread_rwlock_unlock(ð_data.flows_lock); + eth_ipcp_send_frame(r_addr, #if defined(BUILD_ETH_DIX) deid, @@ -1025,7 +1028,6 @@ static void * eth_ipcp_sdu_writer(void * o) len); ipcp_sdb_release(sdb); } - pthread_rwlock_unlock(ð_data.flows_lock); } pthread_cleanup_pop(true); @@ -1202,6 +1204,7 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) #endif #if defined(HAVE_RAW_SOCKETS) int qdisc_bypass = 1; + int flags; #endif assert(conf); assert(conf->type == THIS_TYPE); @@ -1368,6 +1371,17 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) return -1; } + flags = fcntl(eth_data.s_fd, F_GETFL, 0); + if (flags < 0) { + log_err("Failed to get flags."); + goto fail_device; + } + + if (fcntl(eth_data.s_fd, F_SETFL, flags | O_NONBLOCK)) { + log_err("Failed to set socket non-blocking."); + goto fail_device; + } + if (setsockopt(eth_data.s_fd, SOL_PACKET, PACKET_QDISC_BYPASS, &qdisc_bypass, sizeof(qdisc_bypass))) { log_info("Qdisc bypass not supported."); @@ -1375,7 +1389,7 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) if (bind(eth_data.s_fd, (struct sockaddr *) ð_data.device, sizeof(eth_data.device))) { - log_err("Failed to bind socket to interface"); + log_err("Failed to bind socket to interface."); goto fail_device; } -- cgit v1.2.3 From b802b25ddfe6f1b6ecabe3ba70e3dac2e99e7a50 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Thu, 4 Oct 2018 18:06:32 +0200 Subject: lib: Pass qosspec at flow allocation The flow allocator now passes the full qos specification to the endpoint, instead of just a cube. This is a more flexible architecture, as it makes QoS cubes internal to the layers. Adds endianness transforms for the flow allocator protocol in the normal IPCP. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- include/ouroboros/ipcp-dev.h | 2 +- include/ouroboros/np1_flow.h | 2 +- include/ouroboros/qos.h | 22 ++++------ include/ouroboros/sockets.h.in | 11 +++++ src/ipcpd/eth/eth.c | 51 ++++++++++++++++------ src/ipcpd/ipcp.c | 13 +++++- src/ipcpd/ipcp.h | 2 +- src/ipcpd/local/main.c | 4 +- src/ipcpd/normal/dt.c | 2 - src/ipcpd/normal/fa.c | 57 +++++++++++++++++------- src/ipcpd/normal/fa.h | 2 +- src/ipcpd/raptor/CMakeLists.txt | 1 + src/ipcpd/raptor/main.c | 77 +++++++++++++++++++------------- src/ipcpd/udp/main.c | 48 +++++++++++++------- src/irmd/ipcp.c | 13 +++--- src/irmd/ipcp.h | 2 +- src/irmd/irm_flow.c | 4 +- src/irmd/irm_flow.h | 5 ++- src/irmd/main.c | 22 ++++++---- src/lib/CMakeLists.txt | 4 +- src/lib/dev.c | 87 ++++++++++++++++-------------------- src/lib/frct.c | 5 +-- src/lib/ipcpd_messages.proto | 3 +- src/lib/irmd_messages.proto | 3 +- src/lib/qos.c | 97 +++++++++++++++++++---------------------- src/lib/qoscube.c | 30 +++---------- src/lib/qosspec.proto | 33 ++++++++++++++ src/lib/sockets.c | 35 +++++++++++++++ 28 files changed, 388 insertions(+), 249 deletions(-) create mode 100644 src/lib/qosspec.proto (limited to 'src/ipcpd/eth') diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h index 9a33c25a..a0ed026c 100644 --- a/include/ouroboros/ipcp-dev.h +++ b/include/ouroboros/ipcp-dev.h @@ -31,7 +31,7 @@ int ipcp_create_r(pid_t pid, int ipcp_flow_req_arr(pid_t pid, const uint8_t * dst, size_t len, - qoscube_t cube); + qosspec_t qs); int ipcp_flow_alloc_reply(int fd, int response); diff --git a/include/ouroboros/np1_flow.h b/include/ouroboros/np1_flow.h index 789e82df..3435c24a 100644 --- a/include/ouroboros/np1_flow.h +++ b/include/ouroboros/np1_flow.h @@ -29,7 +29,7 @@ int np1_flow_alloc(pid_t n_pid, int port_id, - qoscube_t qc); + qosspec_t qs); int np1_flow_resp(int port_id); diff --git a/include/ouroboros/qos.h b/include/ouroboros/qos.h index 011828d7..2b93f1d0 100644 --- a/include/ouroboros/qos.h +++ b/include/ouroboros/qos.h @@ -27,26 +27,20 @@ #include typedef struct qos_spec { - uint32_t delay; /* In ms */ - uint64_t bandwidth; /* In bits/s */ - uint8_t availability; /* Class of 9s */ - uint32_t loss; /* Packet loss */ - uint8_t in_order; /* In-order delivery, enables FRCT */ - uint32_t maximum_interruption; /* In ms */ + uint32_t delay; /* In ms */ + uint64_t bandwidth; /* In bits/s */ + uint8_t availability; /* Class of 9s */ + uint32_t loss; /* Packet loss */ + uint32_t ber; /* Bit error rate, errors per billion bits */ + uint8_t in_order; /* In-order delivery, enables FRCT */ + uint32_t max_gap; /* In ms */ } qosspec_t; qosspec_t qos_raw; +qosspec_t qos_raw_no_errors; qosspec_t qos_best_effort; qosspec_t qos_video; qosspec_t qos_voice; qosspec_t qos_data; -__BEGIN_DECLS - -int qosspec_init(qosspec_t * qs); - -int qosspec_fini(qosspec_t * qs); - -__END_DECLS - #endif /* OUROBOROS_QOS_H */ diff --git a/include/ouroboros/sockets.h.in b/include/ouroboros/sockets.h.in index 4557a9ef..368923db 100644 --- a/include/ouroboros/sockets.h.in +++ b/include/ouroboros/sockets.h.in @@ -23,6 +23,8 @@ #ifndef OUROBOROS_SOCKETS_H #define OUROBOROS_SOCKETS_H +#include + #include #include "ipcp_config.pb-c.h" @@ -36,6 +38,9 @@ typedef IpcpInfoMsg ipcp_info_msg_t; #include "ipcpd_messages.pb-c.h" typedef IpcpMsg ipcp_msg_t; +#include "qosspec.pb-c.h" +typedef QosspecMsg qosspec_msg_t; + #define SOCK_PATH "/var/run/ouroboros/" #define SOCK_PATH_SUFFIX ".sock" @@ -53,4 +58,10 @@ int client_socket_open(char * file_name); irm_msg_t * send_recv_irm_msg(irm_msg_t * msg); + +/* qos message conversion needed in different components */ +qosspec_msg_t spec_to_msg(qosspec_t * qs); + +qosspec_t msg_to_spec(qosspec_msg_t * msg); + #endif diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 44ef3756..6fd7b805 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -146,15 +146,27 @@ #define NAME_QUERY_REPLY 3 struct mgmt_msg { - uint8_t code; #if defined(BUILD_ETH_DIX) uint16_t seid; uint16_t deid; #elif defined(BUILD_ETH_LLC) uint8_t ssap; uint8_t dsap; + /* QoS here for alignment */ + uint8_t code; + uint8_t availability; +#endif + /* QoS parameters from spec, aligned */ + uint32_t loss; + uint64_t bandwidth; + uint32_t ber; + uint32_t max_gap; + uint32_t delay; + uint8_t in_order; +#if defined (BUILD_ETH_DIX) + uint8_t code; + uint8_t availability; #endif - uint8_t qoscube; int8_t response; } __attribute__((packed)); @@ -433,7 +445,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, uint8_t ssap, #endif const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { uint8_t * buf; struct mgmt_msg * msg; @@ -453,7 +465,14 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, #elif defined(BUILD_ETH_LLC) msg->ssap = ssap; #endif - msg->qoscube = cube; + + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); memcpy(msg + 1, hash, ipcp_dir_hash_len()); @@ -523,7 +542,7 @@ static int eth_ipcp_req(uint8_t * r_addr, uint8_t r_sap, #endif const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct timespec ts = {0, ALLOC_TIMEO * MILLION}; struct timespec abstime; @@ -547,7 +566,7 @@ static int eth_ipcp_req(uint8_t * r_addr, } /* reply to IRM, called under lock to prevent race */ - fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); @@ -687,11 +706,20 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, uint8_t * r_addr) { struct mgmt_msg * msg; + qosspec_t qs; msg = (struct mgmt_msg *) buf; switch (msg->code) { case FLOW_REQ: + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + if (shim_data_reg_has(eth_data.shim_data, buf + sizeof(*msg))) { eth_ipcp_req(r_addr, @@ -701,7 +729,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, msg->ssap, #endif buf + sizeof(*msg), - msg->qoscube); + qs); } break; case FLOW_REPLY: @@ -1553,7 +1581,7 @@ static int eth_ipcp_query(const uint8_t * hash) static int eth_ipcp_flow_alloc(int fd, const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { #ifdef BUILD_ETH_LLC uint8_t ssap = 0; @@ -1565,11 +1593,6 @@ static int eth_ipcp_flow_alloc(int fd, assert(hash); - if (cube > QOS_CUBE_DATA) { - log_dbg("Unsupported QoS requested."); - return -1; - } - if (!shim_data_dir_has(eth_data.shim_data, hash)) { log_err("Destination unreachable."); return -1; @@ -1597,7 +1620,7 @@ static int eth_ipcp_flow_alloc(int fd, #elif defined(BUILD_ETH_LLC) ssap, #endif - hash, cube) < 0) { + hash, qs) < 0) { #ifdef BUILD_ETH_LLC pthread_rwlock_wrlock(ð_data.flows_lock); bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap); diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 5ea54533..e415bbd9 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -20,6 +20,13 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#define __XSI_VISIBLE 500 +#endif + #if defined(__linux__) && !defined(DISABLE_CORE_LOCK) #define _GNU_SOURCE #define NPROC (sysconf(_SC_NPROCESSORS_ONLN)) @@ -198,6 +205,7 @@ static void * mainloop(void * o) layer_info_msg_t layer_info = LAYER_INFO_MSG__INIT; int fd = -1; struct cmd * cmd; + qosspec_t qs; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -422,9 +430,10 @@ static void * mainloop(void * o) break; } + qs = msg_to_spec(msg->qosspec); fd = np1_flow_alloc(msg->pid, msg->port_id, - msg->qoscube); + qs); if (fd < 0) { log_err("Failed allocating fd on port_id %d.", msg->port_id); @@ -435,7 +444,7 @@ static void * mainloop(void * o) ret_msg.result = ipcpi.ops->ipcp_flow_alloc(fd, msg->hash.data, - msg->qoscube); + qs); break; case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP: ret_msg.has_result = true; diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 5417fc74..13751b6d 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -60,7 +60,7 @@ struct ipcp_ops { int (* ipcp_flow_alloc)(int fd, const uint8_t * dst, - qoscube_t qos); + qosspec_t qs); int (* ipcp_flow_alloc_resp)(int fd, int response); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index c83f85fe..8eae7503 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -183,7 +183,7 @@ static int ipcp_local_query(const uint8_t * hash) static int ipcp_local_flow_alloc(int fd, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct timespec ts = {0, ALLOC_TIMEOUT * MILLION}; struct timespec abstime; @@ -212,7 +212,7 @@ static int ipcp_local_flow_alloc(int fd, assert(ipcpi.alloc_id == -1); - out_fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + out_fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs); if (out_fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_dbg("Flow allocation failed: %d", out_fd); diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index c3f8f198..a350e4be 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -31,8 +31,6 @@ #define DT "dt" #define OUROBOROS_PREFIX DT -/* FIXME: fix #defines and remove endian.h include. */ -#include #include #include #include diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 10f0a863..4c82e0e0 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -57,8 +57,15 @@ struct fa_msg { uint32_t r_eid; uint32_t s_eid; uint8_t code; - uint8_t qc; int8_t response; + /* QoS parameters from spec, aligned */ + uint8_t availability; + uint8_t in_order; + uint32_t delay; + uint64_t bandwidth; + uint32_t loss; + uint32_t ber; + uint32_t max_gap; } __attribute__((packed)); struct { @@ -100,6 +107,7 @@ static void fa_post_sdu(void * comp, int fd; uint8_t * buf; struct fa_msg * msg; + qosspec_t qs; (void) comp; @@ -142,10 +150,18 @@ static void fa_post_sdu(void * comp, assert(ipcpi.alloc_id == -1); + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + fd = ipcp_flow_req_arr(getpid(), (uint8_t *) (msg + 1), ipcp_dir_hash_len(), - msg->qc); + qs); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Failed to get fd for flow."); @@ -155,8 +171,8 @@ static void fa_post_sdu(void * comp, pthread_rwlock_wrlock(&fa.flows_lock); - fa.r_eid[fd] = msg->s_eid; - fa.r_addr[fd] = msg->s_addr; + fa.r_eid[fd] = ntoh32(msg->s_eid); + fa.r_addr[fd] = ntoh64(msg->s_addr); pthread_rwlock_unlock(&fa.flows_lock); @@ -169,14 +185,14 @@ static void fa_post_sdu(void * comp, case FLOW_REPLY: pthread_rwlock_wrlock(&fa.flows_lock); - fa.r_eid[msg->r_eid] = msg->s_eid; + fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); - ipcp_flow_alloc_reply(msg->r_eid, msg->response); + ipcp_flow_alloc_reply(ntoh32(msg->r_eid), msg->response); if (msg->response < 0) - destroy_conn(msg->r_eid); + destroy_conn(ntoh32(msg->r_eid)); else - sdu_sched_add(fa.sdu_sched, msg->r_eid); + sdu_sched_add(fa.sdu_sched, ntoh32(msg->r_eid)); pthread_rwlock_unlock(&fa.flows_lock); @@ -227,11 +243,12 @@ void fa_stop(void) int fa_alloc(int fd, const uint8_t * dst, - qoscube_t qc) + qosspec_t qs) { struct fa_msg * msg; uint64_t addr; struct shm_du_buff * sdb; + qoscube_t qc; addr = dir_query(dst); if (addr == 0) @@ -240,14 +257,22 @@ int fa_alloc(int fd, if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len())) return -1; - msg = (struct fa_msg *) shm_du_buff_head(sdb); - msg->code = FLOW_REQ; - msg->qc = qc; - msg->s_eid = fd; - msg->s_addr = ipcpi.dt_addr; + msg = (struct fa_msg *) shm_du_buff_head(sdb); + msg->code = FLOW_REQ; + msg->s_eid = hton32(fd); + msg->s_addr = hton64(ipcpi.dt_addr); + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); memcpy(msg + 1, dst, ipcp_dir_hash_len()); + qc = qos_spec_to_cube(qs); + if (dt_write_sdu(addr, qc, fa.fd, sdb)) { ipcp_sdb_release(sdb); return -1; @@ -302,8 +327,8 @@ int fa_alloc_resp(int fd, msg = (struct fa_msg *) shm_du_buff_head(sdb); msg->code = FLOW_REPLY; - msg->r_eid = fa.r_eid[fd]; - msg->s_eid = fd; + msg->r_eid = hton32(fa.r_eid[fd]); + msg->s_eid = hton32(fd); msg->response = response; if (response < 0) { diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h index 87819d6f..a98d834a 100644 --- a/src/ipcpd/normal/fa.h +++ b/src/ipcpd/normal/fa.h @@ -36,7 +36,7 @@ void fa_stop(void); int fa_alloc(int fd, const uint8_t * dst, - qoscube_t qos); + qosspec_t qs); int fa_alloc_resp(int fd, int response); diff --git a/src/ipcpd/raptor/CMakeLists.txt b/src/ipcpd/raptor/CMakeLists.txt index 06e6ee29..1883d9bb 100644 --- a/src/ipcpd/raptor/CMakeLists.txt +++ b/src/ipcpd/raptor/CMakeLists.txt @@ -16,6 +16,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL "Linux") find_path(RAPTOR_KERNEL_MODULE NAMES raptor.ko.gz + raptor.ko.xz HINTS /lib/modules/${CMAKE_SYSTEM_VERSION}/extra ) diff --git a/src/ipcpd/raptor/main.c b/src/ipcpd/raptor/main.c index 4f0099b3..a01889ec 100644 --- a/src/ipcpd/raptor/main.c +++ b/src/ipcpd/raptor/main.c @@ -90,11 +90,18 @@ #define NAME_QUERY_REPLY 3 struct mgmt_msg { - uint8_t code; - uint8_t ssap; - uint8_t dsap; - uint8_t qoscube; - int8_t response; + uint8_t code; + uint8_t ssap; + uint8_t dsap; + int8_t response; + /* QoS parameters from spec, aligned */ + uint32_t loss; + uint64_t bandwidth; + uint32_t ber; + uint32_t max_gap; + uint32_t delay; + uint8_t in_order; + uint8_t availability; } __attribute__((packed)); struct ef { @@ -278,7 +285,7 @@ static int raptor_send_frame(struct shm_du_buff * sdb, static int raptor_sap_alloc(uint8_t ssap, const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { struct mgmt_msg * msg; struct shm_du_buff * sdb; @@ -288,10 +295,16 @@ static int raptor_sap_alloc(uint8_t ssap, return -1; } - msg = (struct mgmt_msg *) shm_du_buff_head(sdb); - msg->code = FLOW_REQ; - msg->ssap = ssap; - msg->qoscube = cube; + msg = (struct mgmt_msg *) shm_du_buff_head(sdb); + msg->code = FLOW_REQ; + msg->ssap = ssap; + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); memcpy(msg + 1, hash, ipcp_dir_hash_len()); @@ -306,15 +319,15 @@ static int raptor_sap_alloc(uint8_t ssap, return 0; } -static int raptor_sap_alloc_resp(uint8_t ssap, - uint8_t dsap, - int response) +static int raptor_sap_alloc_resp(uint8_t ssap, + uint8_t dsap, + int response) { - struct mgmt_msg * msg; + struct mgmt_msg * msg; struct shm_du_buff * sdb; if (ipcp_sdb_reserve(&sdb, sizeof(*msg)) < 0) { - log_err("failed to reserve sdb for management frame."); + log_err("Failed to reserve sdb for management frame."); return -1; } @@ -337,7 +350,7 @@ static int raptor_sap_alloc_resp(uint8_t ssap, static int raptor_sap_req(uint8_t r_sap, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct timespec ts = {0, EVENT_WAIT_TIMEOUT * 1000}; struct timespec abstime; @@ -361,7 +374,7 @@ static int raptor_sap_req(uint8_t r_sap, } /* reply to IRM, called under lock to prevent race */ - fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); @@ -424,12 +437,12 @@ static int raptor_name_query_req(const uint8_t * hash) return 0; if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len()) < 0) { - log_err("failed to reserve sdb for management frame."); + log_err("Failed to reserve sdb for management frame."); return -1; } - msg = (struct mgmt_msg *) shm_du_buff_head(sdb); - msg->code = NAME_QUERY_REPLY; + msg = (struct mgmt_msg *) shm_du_buff_head(sdb); + msg->code = NAME_QUERY_REPLY; memcpy(msg + 1, hash, ipcp_dir_hash_len()); @@ -456,8 +469,9 @@ static int raptor_name_query_reply(const uint8_t * hash) static int raptor_mgmt_frame(const uint8_t * buf, size_t len) { - struct mgmt_msg * msg = (struct mgmt_msg *) buf; - uint8_t * hash = (uint8_t *) (msg + 1); + struct mgmt_msg * msg = (struct mgmt_msg *) buf; + uint8_t * hash = (uint8_t *) (msg + 1); + qosspec_t qs; switch (msg->code) { case FLOW_REQ: @@ -466,8 +480,16 @@ static int raptor_mgmt_frame(const uint8_t * buf, return -1; } + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + if (shim_data_reg_has(raptor_data.shim_data, hash)) - raptor_sap_req(msg->ssap, hash, msg->qoscube); + raptor_sap_req(msg->ssap, hash, qs); break; case FLOW_REPLY: if (len != sizeof(*msg)) { @@ -901,7 +923,7 @@ static int raptor_query(const uint8_t * hash) static int raptor_flow_alloc(int fd, const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { uint8_t ssap = 0; @@ -909,11 +931,6 @@ static int raptor_flow_alloc(int fd, assert(hash); - if (cube != QOS_CUBE_BE) { - log_dbg("Unsupported QoS requested."); - return -1; - } - if (!shim_data_dir_has(raptor_data.shim_data, hash)) { log_err("Destination unreachable."); return -1; @@ -932,7 +949,7 @@ static int raptor_flow_alloc(int fd, pthread_rwlock_unlock(&raptor_data.flows_lock); - if (raptor_sap_alloc(ssap, hash, cube) < 0) { + if (raptor_sap_alloc(ssap, hash, qs) < 0) { pthread_rwlock_wrlock(&raptor_data.flows_lock); bmp_release(raptor_data.saps, raptor_data.fd_to_ef[fd].sap); raptor_data.fd_to_ef[fd].sap = -1; diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 6a350da0..96820662 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -73,8 +73,15 @@ struct mgmt_msg { uint16_t src_udp_port; uint16_t dst_udp_port; uint8_t code; - uint8_t qoscube; uint8_t response; + /* QoS parameters from spec, aligned */ + uint8_t availability; + uint8_t in_order; + uint32_t delay; + uint64_t bandwidth; + uint32_t loss; + uint32_t ber; + uint32_t max_gap; } __attribute__((packed)); struct uf { @@ -219,7 +226,7 @@ static int send_shim_udp_msg(uint8_t * buf, static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, uint16_t src_udp_port, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { uint8_t * buf; struct mgmt_msg * msg; @@ -235,7 +242,13 @@ static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, msg = (struct mgmt_msg *) buf; msg->code = FLOW_REQ; msg->src_udp_port = src_udp_port; - msg->qoscube = cube; + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); memcpy(msg + 1, dst, ipcp_dir_hash_len()); @@ -272,7 +285,7 @@ static int ipcp_udp_port_alloc_resp(uint32_t dst_ip_addr, static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; struct timespec abstime; @@ -331,7 +344,7 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, } /* reply to IRM */ - fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); @@ -436,7 +449,7 @@ static void * ipcp_udp_listener(void * o) while (true) { struct mgmt_msg * msg = NULL; - + qosspec_t qs; memset(&buf, 0, SHIM_UDP_MSG_SIZE); n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0, (struct sockaddr *) &c_saddr, @@ -455,9 +468,16 @@ static void * ipcp_udp_listener(void * o) switch (msg->code) { case FLOW_REQ: c_saddr.sin_port = msg->src_udp_port; + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); ipcp_udp_port_req(&c_saddr, (uint8_t *) (msg + 1), - msg->qoscube); + qs); break; case FLOW_REPLY: ipcp_udp_port_alloc_reply(msg->src_udp_port, @@ -555,7 +575,8 @@ static void * ipcp_udp_sdu_loop(void * o) pthread_rwlock_unlock(&udp_data.flows_lock); - pthread_cleanup_push((void (*)(void *)) ipcp_sdb_release, + pthread_cleanup_push((void (*)(void *)) + ipcp_sdb_release, (void *) sdb); if (send(fd, shm_du_buff_head(sdb), @@ -968,7 +989,7 @@ static int ipcp_udp_query(const uint8_t * hash) static int ipcp_udp_flow_alloc(int fd, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct sockaddr_in r_saddr; /* server address */ struct sockaddr_in f_saddr; /* flow */ @@ -978,12 +999,9 @@ static int ipcp_udp_flow_alloc(int fd, log_dbg("Allocating flow to " HASH_FMT ".", HASH_VAL(dst)); - assert(dst); + (void) qs; - if (cube > QOS_CUBE_DATA) { - log_dbg("Unsupported QoS requested."); - return -1; - } + assert(dst); skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (skfd < 0) @@ -1034,7 +1052,7 @@ static int ipcp_udp_flow_alloc(int fd, pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, cube) < 0) { + if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, qs) < 0) { pthread_rwlock_wrlock(&udp_data.flows_lock); udp_data.fd_to_uf[fd].udp = -1; diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index dc8f1c6e..0bdf674b 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -433,11 +433,12 @@ int ipcp_flow_alloc(pid_t pid, pid_t n_pid, const uint8_t * dst, size_t len, - qoscube_t cube) + qosspec_t qs) { - ipcp_msg_t msg = IPCP_MSG__INIT; - ipcp_msg_t * recv_msg = NULL; - int ret = -1; + ipcp_msg_t msg = IPCP_MSG__INIT; + qosspec_msg_t qs_msg; + ipcp_msg_t * recv_msg = NULL; + int ret = -1; assert(dst); @@ -449,8 +450,8 @@ int ipcp_flow_alloc(pid_t pid, msg.has_hash = true; msg.hash.len = len; msg.hash.data = (uint8_t *) dst; - msg.has_qoscube = true; - msg.qoscube = cube; + qs_msg = spec_to_msg(&qs); + msg.qosspec = &qs_msg; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index 8ff062b2..28396333 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -67,7 +67,7 @@ int ipcp_flow_alloc(pid_t pid, pid_t n_pid, const uint8_t * dst, size_t len, - qoscube_t qos); + qosspec_t qs); int ipcp_flow_alloc_resp(pid_t pid, int port_id, diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index dfbe5e95..a5a9f28c 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -39,7 +39,7 @@ struct irm_flow * irm_flow_create(pid_t n_pid, pid_t n_1_pid, int port_id, - qoscube_t qc) + qosspec_t qs) { pthread_condattr_t cattr; struct irm_flow * f = malloc(sizeof(*f)); @@ -61,7 +61,7 @@ struct irm_flow * irm_flow_create(pid_t n_pid, f->n_pid = n_pid; f->n_1_pid = n_1_pid; f->port_id = port_id; - f->qc = qc; + f->qs = qs; f->n_rb = shm_rbuff_create(n_pid, port_id); if (f->n_rb == NULL) { diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index d53984e8..f4de8187 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -43,11 +43,12 @@ struct irm_flow { struct list_head next; int port_id; - qoscube_t qc; pid_t n_pid; pid_t n_1_pid; + qosspec_t qs; + struct shm_rbuff * n_rb; struct shm_rbuff * n_1_rb; @@ -61,7 +62,7 @@ struct irm_flow { struct irm_flow * irm_flow_create(pid_t n_pid, pid_t n_1_pid, int port_id, - qoscube_t qc); + qosspec_t qs); void irm_flow_destroy(struct irm_flow * f); diff --git a/src/irmd/main.c b/src/irmd/main.c index 634bf4de..9504f3b5 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1264,7 +1264,7 @@ static int flow_accept(pid_t pid, static int flow_alloc(pid_t pid, const char * dst, - qoscube_t cube, + qosspec_t qs, struct timespec * timeo, struct irm_flow ** e) { @@ -1288,7 +1288,7 @@ static int flow_alloc(pid_t pid, return -EBADF; } - f = irm_flow_create(pid, ipcp->pid, port_id, cube); + f = irm_flow_create(pid, ipcp->pid, port_id, qs); if (f == NULL) { bmp_release(irmd.port_ids, port_id); pthread_rwlock_unlock(&irmd.flows_lock); @@ -1310,7 +1310,7 @@ static int flow_alloc(pid_t pid, str_hash(ipcp->dir_hash_algo, hash, dst); if (ipcp_flow_alloc(ipcp->pid, port_id, pid, hash, - IPCP_HASH_LEN(ipcp), cube)) { + IPCP_HASH_LEN(ipcp), qs)) { /* sanitizer cleans this */ log_info("Flow_allocation failed."); free(hash); @@ -1418,7 +1418,7 @@ static pid_t auto_execute(char ** argv) static struct irm_flow * flow_req_arr(pid_t pid, const uint8_t * hash, - qoscube_t cube) + qosspec_t qs) { struct reg_entry * re = NULL; struct prog_entry * a = NULL; @@ -1521,7 +1521,7 @@ static struct irm_flow * flow_req_arr(pid_t pid, return NULL; } - f = irm_flow_create(h_pid, pid, port_id, cube); + f = irm_flow_create(h_pid, pid, port_id, qs); if (f == NULL) { bmp_release(irmd.port_ids, port_id); pthread_rwlock_unlock(&irmd.flows_lock); @@ -1993,17 +1993,19 @@ static void * mainloop(void * o) case IRM_MSG_CODE__IRM_FLOW_ACCEPT: result = flow_accept(msg->pid, timeo, &e); if (result == 0) { + qosspec_msg_t qs_msg; ret_msg->has_port_id = true; ret_msg->port_id = e->port_id; ret_msg->has_pid = true; ret_msg->pid = e->n_1_pid; - ret_msg->has_qoscube = true; - ret_msg->qoscube = e->qc; + qs_msg = spec_to_msg(&e->qs); + ret_msg->qosspec = &qs_msg; } break; case IRM_MSG_CODE__IRM_FLOW_ALLOC: result = flow_alloc(msg->pid, msg->dst, - msg->qoscube, timeo, &e); + msg_to_spec(msg->qosspec), + timeo, &e); if (result == 0) { ret_msg->has_port_id = true; ret_msg->port_id = e->port_id; @@ -2017,7 +2019,7 @@ static void * mainloop(void * o) case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: e = flow_req_arr(msg->pid, msg->hash.data, - msg->qoscube); + msg_to_spec(msg->qosspec)); result = (e == NULL ? -1 : 0); if (result == 0) { ret_msg->has_port_id = true; @@ -2061,6 +2063,8 @@ static void * mainloop(void * o) irm_msg__pack(ret_msg, buffer.data); + /* Can't free the qosspec. */ + ret_msg->qosspec = NULL; irm_msg__free_unpacked(ret_msg, NULL); pthread_cleanup_push(close_ptr, &sfd); diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index e7e07802..aa4e5bf3 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -6,6 +6,8 @@ include_directories(${CMAKE_BINARY_DIR}/include) protobuf_generate_c(IRM_PROTO_SRCS IRM_PROTO_HDRS irmd_messages.proto) protobuf_generate_c(IPCP_PROTO_SRCS IPCP_PROTO_HDRS ipcpd_messages.proto) +protobuf_generate_c(QOSSPEC_PROTO_SRCS QOSSPEC_PROTO_HDRS + qosspec.proto) protobuf_generate_c(LAYER_CONFIG_PROTO_SRCS LAYER_CONFIG_PROTO_HDRS ipcp_config.proto) protobuf_generate_c(CACEP_PROTO_SRCS CACEP_PROTO_HDRS cacep.proto) @@ -214,7 +216,7 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/config.h.in" "${CMAKE_CURRENT_BINARY_DIR}/config.h" @ONLY) add_library(ouroboros-common SHARED ${SOURCE_FILES_COMMON} ${IRM_PROTO_SRCS} - ${IPCP_PROTO_SRCS} ${LAYER_CONFIG_PROTO_SRCS}) + ${IPCP_PROTO_SRCS} ${LAYER_CONFIG_PROTO_SRCS} ${QOSSPEC_PROTO_SRCS}) add_library(ouroboros-dev SHARED ${SOURCE_FILES_DEV} ${CACEP_PROTO_SRCS}) diff --git a/src/lib/dev.c b/src/lib/dev.c index 3d9e1d49..a92c1e42 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -44,7 +44,6 @@ #include #include #include -#include #include #include @@ -94,7 +93,6 @@ struct flow { struct shm_flow_set * set; int port_id; int oflags; - qoscube_t cube; qosspec_t spec; ssize_t part_idx; @@ -235,7 +233,6 @@ static void flow_clear(int fd) ai.flows[fd].port_id = -1; ai.flows[fd].pid = -1; - ai.flows[fd].cube = QOS_CUBE_BE; } static void flow_fini(int fd) @@ -272,7 +269,7 @@ static void flow_fini(int fd) static int flow_init(int port_id, pid_t pid, - qoscube_t qc) + qosspec_t qs) { int fd; int err = -ENOMEM; @@ -300,9 +297,8 @@ static int flow_init(int port_id, ai.flows[fd].port_id = port_id; ai.flows[fd].oflags = FLOWFDEFAULT; ai.flows[fd].pid = pid; - ai.flows[fd].cube = qc; - ai.flows[fd].spec = qos_cube_to_spec(qc); ai.flows[fd].part_idx = NO_PART; + ai.flows[fd].spec = qs; ai.ports[port_id].fd = fd; @@ -499,7 +495,6 @@ int flow_accept(qosspec_t * qs, irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg; int fd; - qoscube_t qc; msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_pid = true; @@ -528,14 +523,13 @@ int flow_accept(qosspec_t * qs, } if (!recv_msg->has_pid || !recv_msg->has_port_id || - !recv_msg->has_qoscube) { + recv_msg->qosspec == NULL) { irm_msg__free_unpacked(recv_msg, NULL); return -EIRMD; } - qc = recv_msg->qoscube; - - fd = flow_init(recv_msg->port_id, recv_msg->pid, recv_msg->qoscube); + fd = flow_init(recv_msg->port_id, recv_msg->pid, + msg_to_spec(recv_msg->qosspec)); irm_msg__free_unpacked(recv_msg, NULL); @@ -544,12 +538,10 @@ int flow_accept(qosspec_t * qs, pthread_rwlock_wrlock(&ai.lock); - /* FIXME: check if FRCT is needed based on qc? */ - assert(ai.flows[fd].frcti == NULL); - if (qc != QOS_CUBE_RAW) { - ai.flows[fd].frcti = frcti_create(fd, qc); + if (ai.flows[fd].spec.in_order != 0) { + ai.flows[fd].frcti = frcti_create(fd); if (ai.flows[fd].frcti == NULL) { flow_fini(fd); pthread_rwlock_unlock(&ai.lock); @@ -569,21 +561,17 @@ int flow_alloc(const char * dst, qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - qoscube_t qc = QOS_CUBE_RAW; - int fd; - - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; - msg.dst = (char *) dst; - msg.has_pid = true; - msg.has_qoscube = true; - msg.pid = ai.pid; - - if (qs != NULL) - qc = qos_spec_to_cube(*qs); + irm_msg_t msg = IRM_MSG__INIT; + qosspec_msg_t qs_msg = QOSSPEC_MSG__INIT; + irm_msg_t * recv_msg; + int fd; - msg.qoscube = qc; + msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; + msg.dst = (char *) dst; + msg.has_pid = true; + msg.pid = ai.pid; + qs_msg = spec_to_msg(qs); + msg.qosspec = &qs_msg; if (timeo != NULL) { msg.has_timeo_sec = true; @@ -612,7 +600,8 @@ int flow_alloc(const char * dst, return -EIRMD; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, qc); + fd = flow_init(recv_msg->port_id, recv_msg->pid, + qs == NULL ? qos_raw : *qs); irm_msg__free_unpacked(recv_msg, NULL); @@ -623,8 +612,8 @@ int flow_alloc(const char * dst, assert(ai.flows[fd].frcti == NULL); - if (qc != QOS_CUBE_RAW) { - ai.flows[fd].frcti = frcti_create(fd, qc); + if (ai.flows[fd].spec.in_order != 0) { + ai.flows[fd].frcti = frcti_create(fd); if (ai.flows[fd].frcti == NULL) { flow_fini(fd); pthread_rwlock_unlock(&ai.lock); @@ -1178,9 +1167,9 @@ int fevent(struct flow_set * set, int np1_flow_alloc(pid_t n_pid, int port_id, - qoscube_t qc) + qosspec_t qs) { - return flow_init(port_id, n_pid, qc); + return flow_init(port_id, n_pid, qs); } int np1_flow_dealloc(int port_id) @@ -1243,25 +1232,25 @@ int ipcp_create_r(pid_t pid, int ipcp_flow_req_arr(pid_t pid, const uint8_t * dst, size_t len, - qoscube_t qc) + qosspec_t qs) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int fd; + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg; + qosspec_msg_t qs_msg; + int fd; assert(dst != NULL); - msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; - msg.has_pid = true; - msg.pid = pid; - msg.has_hash = true; - msg.hash.len = len; - msg.hash.data = (uint8_t *) dst; - msg.has_qoscube = true; - msg.qoscube = qc; + msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg.has_pid = true; + msg.pid = pid; + msg.has_hash = true; + msg.hash.len = len; + msg.hash.data = (uint8_t *) dst; + qs_msg = spec_to_msg(&qs); + msg.qosspec = &qs_msg; recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) return -EIRMD; @@ -1275,7 +1264,7 @@ int ipcp_flow_req_arr(pid_t pid, return -1; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, qc); + fd = flow_init(recv_msg->port_id, recv_msg->pid, qs); irm_msg__free_unpacked(recv_msg, NULL); @@ -1457,7 +1446,7 @@ int ipcp_flow_get_qoscube(int fd, assert(ai.flows[fd].port_id >= 0); - *cube = ai.flows[fd].cube; + *cube = qos_spec_to_cube(ai.flows[fd].spec); pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/frct.c b/src/lib/frct.c index 516c958b..e9acb1dc 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -101,8 +101,7 @@ static void frct_fini(void) timerwheel_destroy(frct.tw); } -static struct frcti * frcti_create(int fd, - qoscube_t qc) +static struct frcti * frcti_create(int fd) { struct frcti * frcti; time_t delta_t; @@ -133,7 +132,7 @@ static struct frcti * frcti_create(int fd, frcti->snd_cr.inact = 3 * delta_t; frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); - if (qc == QOS_CUBE_DATA) + if (ai.flows[fd].spec.loss == 0) frcti->snd_cr.cflags |= FRCTFRTX; frcti->rcv_cr.inact = 2 * delta_t; diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index 454af0dc..48198a5b 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -23,6 +23,7 @@ syntax = "proto2"; import "ipcp_config.proto"; +import "qosspec.proto"; enum ipcp_msg_code { IPCP_BOOTSTRAP = 1; @@ -43,7 +44,7 @@ message ipcp_msg { optional bytes hash = 2; optional int32 port_id = 3; optional string dst = 4; - optional uint32 qoscube = 5; + optional qosspec_msg qosspec = 5; optional ipcp_config_msg conf = 6; optional int32 pid = 7; optional layer_info_msg layer_info = 8; diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 16dfe828..2ed2ec37 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -23,6 +23,7 @@ syntax = "proto2"; import "ipcp_config.proto"; +import "qosspec.proto"; enum irm_msg_code { IRM_CREATE_IPCP = 1; @@ -67,7 +68,7 @@ message irm_msg { optional string dst = 9; optional bytes hash = 10; optional sint32 port_id = 11; - optional sint32 qoscube = 12; + optional qosspec_msg qosspec = 12; optional ipcp_config_msg conf = 13; optional uint32 opts = 14; repeated ipcp_info_msg ipcps = 15; diff --git a/src/lib/qos.c b/src/lib/qos.c index bee6ed71..8607031e 100644 --- a/src/lib/qos.c +++ b/src/lib/qos.c @@ -28,66 +28,61 @@ #include qosspec_t qos_raw = { - .delay = UINT32_MAX, - .bandwidth = 0, - .availability = 0, - .loss = 1, - .in_order = 0, - .maximum_interruption = UINT32_MAX + .delay = UINT32_MAX, + .bandwidth = 0, + .availability = 0, + .loss = 1, + .ber = 1, + .in_order = 0, + .max_gap = UINT32_MAX +}; + +qosspec_t qos_raw_no_errors = { + .delay = UINT32_MAX, + .bandwidth = 0, + .availability = 0, + .loss = 1, + .ber = 0, + .in_order = 0, + .max_gap = UINT32_MAX }; qosspec_t qos_best_effort = { - .delay = UINT32_MAX, - .bandwidth = 0, - .availability = 0, - .loss = 1, - .in_order = 1, - .maximum_interruption = UINT32_MAX + .delay = UINT32_MAX, + .bandwidth = 0, + .availability = 0, + .loss = 1, + .ber = 0, + .in_order = 1, + .max_gap = UINT32_MAX }; -qosspec_t qos_video = { - .delay = 100, - .bandwidth = UINT64_MAX, - .availability = 3, - .loss = 1, - .in_order = 1, - .maximum_interruption = 100 +qosspec_t qos_video = { + .delay = 100, + .bandwidth = UINT64_MAX, + .availability = 3, + .loss = 1, + .ber = 0, + .in_order = 1, + .max_gap = 100 }; qosspec_t qos_voice = { - .delay = 50, - .bandwidth = 100000, - .availability = 5, - .loss = 1, - .in_order = 1, - .maximum_interruption = 50 + .delay = 50, + .bandwidth = 100000, + .availability = 5, + .loss = 1, + .ber = 0, + .in_order = 1, + .max_gap = 50 }; qosspec_t qos_data = { - .delay = 1000, - .bandwidth = 0, - .availability = 0, - .in_order = 1, - .loss = 0, - .maximum_interruption = 2000 + .delay = 1000, + .bandwidth = 0, + .availability = 0, + .loss = 0, + .ber = 0, + .in_order = 1, + .max_gap = 2000 }; - -int qosspec_init(qosspec_t * qs) -{ - if (qs == NULL) - return -EINVAL; - - *qs = qos_best_effort; - - return 0; -} - -int qosspec_fini(qosspec_t * qs) -{ - if (qs == NULL) - return -EINVAL; - - memset(qs, 0, sizeof(*qs)); - - return 0; -} diff --git a/src/lib/qoscube.c b/src/lib/qoscube.c index 5dfa35ad..efca0e42 100644 --- a/src/lib/qoscube.c +++ b/src/lib/qoscube.c @@ -25,38 +25,20 @@ #include + + qoscube_t qos_spec_to_cube(qosspec_t qs) { - if (qs.loss == 0) - return QOS_CUBE_DATA; - else if (qs.delay <= qos_voice.delay && + if (qs.delay <= qos_voice.delay && qs.bandwidth <= qos_voice.bandwidth && qs.availability >= qos_voice.availability && - qs.maximum_interruption <= qos_voice.maximum_interruption) + qs.max_gap <= qos_voice.max_gap) return QOS_CUBE_VOICE; else if (qs.delay <= qos_video.delay && qs.bandwidth <= qos_video.bandwidth && qs.availability >= qos_video.availability && - qs.maximum_interruption <= qos_video.maximum_interruption) + qs.max_gap <= qos_video.max_gap) return QOS_CUBE_VIDEO; - else if (qs.in_order == 1) - return QOS_CUBE_BE; else - return QOS_CUBE_RAW; -} - -qosspec_t qos_cube_to_spec(qoscube_t qc) -{ - switch (qc) { - case QOS_CUBE_VOICE: - return qos_voice; - case QOS_CUBE_VIDEO: - return qos_video; - case QOS_CUBE_BE: - return qos_best_effort; - case QOS_CUBE_DATA: - return qos_data; - default: - return qos_raw; - } + return QOS_CUBE_BE; } diff --git a/src/lib/qosspec.proto b/src/lib/qosspec.proto new file mode 100644 index 00000000..f355e345 --- /dev/null +++ b/src/lib/qosspec.proto @@ -0,0 +1,33 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * QoS specification message + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +syntax = "proto2"; + +message qosspec_msg { + required uint32 delay = 1; /* In ms */ + required uint64 bandwidth = 2; /* In bits/s */ + required uint32 availability = 3; /* Class of 9s */ + required uint32 loss = 4; /* Packet loss */ + required uint32 ber = 5; /* Bit error rate, ppb */ + required uint32 in_order = 6; /* In-order delivery */ + required uint32 max_gap = 7; /* In ms */ +}; diff --git a/src/lib/sockets.c b/src/lib/sockets.c index b148b7ca..85726783 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -165,3 +165,38 @@ char * ipcp_sock_path(pid_t pid) return full_name; } + +qosspec_msg_t spec_to_msg(qosspec_t * qs) +{ + qosspec_t spec; + qosspec_msg_t msg = QOSSPEC_MSG__INIT; + + spec = (qs == NULL ? qos_raw : *qs); + + msg.delay = spec.delay; + msg.bandwidth = spec.bandwidth; + msg.availability = spec.availability; + msg.loss = spec.loss; + msg.ber = spec.ber; + msg.in_order = spec.in_order; + msg.max_gap = spec.max_gap; + + return msg; +} + +qosspec_t msg_to_spec(qosspec_msg_t * msg) +{ + qosspec_t spec; + + assert(msg); + + spec.delay = msg->delay; + spec.bandwidth = msg->bandwidth; + spec.availability = msg->availability; + spec.loss = msg->loss; + spec.ber = msg->ber; + spec.in_order = msg->in_order; + spec.max_gap = msg->max_gap; + + return spec; +} -- cgit v1.2.3 From 5d11a6ad590133c92925c6162eb47b4401f16bef Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Fri, 5 Oct 2018 10:24:01 +0200 Subject: ipcpd, lib, irmd, tools: Change SDU to packet This will change SDU (Service Data Unit) to packet everywhere. SDU is OSI terminology, whereas packet is Ouroboros terminology. Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- doc/man/ouroboros-tutorial.7 | 2 +- include/ouroboros/shm_rbuff.h | 2 +- include/ouroboros/timerwheel.h | 2 +- src/ipcpd/eth/eth.c | 46 ++++---- src/ipcpd/local/main.c | 12 +- src/ipcpd/normal/CMakeLists.txt | 2 +- src/ipcpd/normal/dht.c | 48 ++++---- src/ipcpd/normal/dt.c | 52 ++++----- src/ipcpd/normal/dt.h | 8 +- src/ipcpd/normal/fa.c | 36 +++--- src/ipcpd/normal/packet_sched.c | 241 ++++++++++++++++++++++++++++++++++++++++ src/ipcpd/normal/packet_sched.h | 43 +++++++ src/ipcpd/normal/sdu_sched.c | 241 ---------------------------------------- src/ipcpd/normal/sdu_sched.h | 43 ------- src/ipcpd/udp/main.c | 64 +++++------ src/irmd/main.c | 2 +- src/lib/CMakeLists.txt | 6 +- src/lib/dev.c | 14 +-- src/lib/frct.c | 2 +- src/lib/shm_rbuff.c | 6 +- src/lib/shm_rbuff_ll.c | 2 +- src/lib/shm_rbuff_pthr.c | 2 +- src/lib/shm_rdrbuff.c | 2 +- src/tools/ocbr/ocbr.c | 10 +- src/tools/ocbr/ocbr_client.c | 2 +- src/tools/ocbr/ocbr_server.c | 17 +-- src/tools/oecho/oecho.c | 8 +- src/tools/operf/operf.c | 4 +- src/tools/operf/operf_client.c | 9 +- src/tools/oping/oping_client.c | 4 +- 30 files changed, 468 insertions(+), 464 deletions(-) create mode 100644 src/ipcpd/normal/packet_sched.c create mode 100644 src/ipcpd/normal/packet_sched.h delete mode 100644 src/ipcpd/normal/sdu_sched.c delete mode 100644 src/ipcpd/normal/sdu_sched.h (limited to 'src/ipcpd/eth') diff --git a/doc/man/ouroboros-tutorial.7 b/doc/man/ouroboros-tutorial.7 index 98e27254..76fa0068 100644 --- a/doc/man/ouroboros-tutorial.7 +++ b/doc/man/ouroboros-tutorial.7 @@ -108,7 +108,7 @@ Pinging my.oping.server with 64 bytes of data: --- my.oping.server ping statistics --- .br -3 SDUs transmitted, 3 received, 0% packet loss, time: 3001.011 ms +3 packets transmitted, 3 received, 0% packet loss, time: 3001.011 ms .br rtt min/avg/max/mdev = 0.304/0.392/0.475/0.086 ms .RE diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h index b2e27c7b..223f6bf4 100644 --- a/include/ouroboros/shm_rbuff.h +++ b/include/ouroboros/shm_rbuff.h @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Ring buffer for incoming SDUs + * Ring buffer for incoming packets * * Dimitri Staessens * Sander Vrijders diff --git a/include/ouroboros/timerwheel.h b/include/ouroboros/timerwheel.h index 231e8103..34994185 100644 --- a/include/ouroboros/timerwheel.h +++ b/include/ouroboros/timerwheel.h @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Ring buffer for incoming SDUs + * Timerwheel * * Dimitri Staessens * Sander Vrijders diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 6fd7b805..1bbfac5b 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -123,7 +123,7 @@ #define DIX_HEADER_SIZE (DIX_EID_SIZE + DIX_LENGTH_SIZE) #define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + DIX_HEADER_SIZE) #define MAX_EIDS (1 << (8 * DIX_EID_SIZE)) -#define ETH_MAX_SDU_SIZE (ETH_MTU - DIX_HEADER_SIZE) +#define ETH_MAX_PACKET_SIZE (ETH_MTU - DIX_HEADER_SIZE) #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX) #elif defined(BUILD_ETH_LLC) #define THIS_TYPE IPCP_ETH_LLC @@ -131,7 +131,7 @@ #define LLC_HEADER_SIZE 3 #define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE) #define MAX_SAPS 64 -#define ETH_MAX_SDU_SIZE (ETH_MTU - LLC_HEADER_SIZE) +#define ETH_MAX_PACKET_SIZE (ETH_MTU - LLC_HEADER_SIZE) #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX) #endif @@ -230,8 +230,8 @@ struct { fset_t * np1_flows; pthread_rwlock_t flows_lock; - pthread_t sdu_writer[IPCP_ETH_WR_THR]; - pthread_t sdu_reader[IPCP_ETH_RD_THR]; + pthread_t packet_writer[IPCP_ETH_WR_THR]; + pthread_t packet_reader[IPCP_ETH_RD_THR]; #ifdef __linux__ pthread_t if_monitor; @@ -383,7 +383,7 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, assert(frame); - if (len > (size_t) ETH_MAX_SDU_SIZE) + if (len > (size_t) ETH_MAX_PACKET_SIZE) return -1; e_frame = (struct eth_frame *) frame; @@ -808,7 +808,7 @@ static void * eth_ipcp_mgmt_handler(void * o) return (void *) 0; } -static void * eth_ipcp_sdu_reader(void * o) +static void * eth_ipcp_packet_reader(void * o) { uint8_t br_addr[MAC_SIZE]; #if defined(BUILD_ETH_DIX) @@ -992,7 +992,7 @@ static void cleanup_writer(void * o) fqueue_destroy((fqueue_t *) o); } -static void * eth_ipcp_sdu_writer(void * o) +static void * eth_ipcp_packet_writer(void * o) { int fd; struct shm_du_buff * sdb; @@ -1443,22 +1443,22 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) } for (idx = 0; idx < IPCP_ETH_RD_THR; ++idx) { - if (pthread_create(ð_data.sdu_reader[idx], + if (pthread_create(ð_data.packet_reader[idx], NULL, - eth_ipcp_sdu_reader, + eth_ipcp_packet_reader, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sdu_reader; + goto fail_packet_reader; } } for (idx = 0; idx < IPCP_ETH_WR_THR; ++idx) { - if (pthread_create(ð_data.sdu_writer[idx], + if (pthread_create(ð_data.packet_writer[idx], NULL, - eth_ipcp_sdu_writer, + eth_ipcp_packet_writer, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sdu_writer; + goto fail_packet_writer; } } @@ -1472,16 +1472,16 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) return 0; - fail_sdu_writer: + fail_packet_writer: while (idx > 0) { - pthread_cancel(eth_data.sdu_writer[--idx]); - pthread_join(eth_data.sdu_writer[idx], NULL); + pthread_cancel(eth_data.packet_writer[--idx]); + pthread_join(eth_data.packet_writer[idx], NULL); } idx = IPCP_ETH_RD_THR; - fail_sdu_reader: + fail_packet_reader: while (idx > 0) { - pthread_cancel(eth_data.sdu_reader[--idx]); - pthread_join(eth_data.sdu_reader[idx], NULL); + pthread_cancel(eth_data.packet_reader[--idx]); + pthread_join(eth_data.packet_reader[idx], NULL); } pthread_cancel(eth_data.mgmt_handler); pthread_join(eth_data.mgmt_handler, NULL); @@ -1792,18 +1792,18 @@ int main(int argc, if (ipcp_get_state() == IPCP_SHUTDOWN) { for (i = 0; i < IPCP_ETH_WR_THR; ++i) - pthread_cancel(eth_data.sdu_writer[i]); + pthread_cancel(eth_data.packet_writer[i]); for (i = 0; i < IPCP_ETH_RD_THR; ++i) - pthread_cancel(eth_data.sdu_reader[i]); + pthread_cancel(eth_data.packet_reader[i]); pthread_cancel(eth_data.mgmt_handler); #ifdef __linux__ pthread_cancel(eth_data.if_monitor); #endif for (i = 0; i < IPCP_ETH_WR_THR; ++i) - pthread_join(eth_data.sdu_writer[i], NULL); + pthread_join(eth_data.packet_writer[i], NULL); for (i = 0; i < IPCP_ETH_RD_THR; ++i) - pthread_join(eth_data.sdu_reader[i], NULL); + pthread_join(eth_data.packet_reader[i], NULL); pthread_join(eth_data.mgmt_handler, NULL); #ifdef __linux__ diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 8eae7503..ab43f1f8 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -59,7 +59,7 @@ struct { fqueue_t * fq; pthread_rwlock_t lock; - pthread_t sduloop; + pthread_t packet_loop; } local_data; static int local_data_init(void) @@ -97,7 +97,7 @@ static void local_data_fini(void){ pthread_rwlock_destroy(&local_data.lock); } -static void * ipcp_local_sdu_loop(void * o) +static void * ipcp_local_packet_loop(void * o) { (void) o; @@ -139,8 +139,8 @@ static int ipcp_local_bootstrap(const struct ipcp_config * conf) ipcp_set_state(IPCP_OPERATIONAL); - if (pthread_create(&local_data.sduloop, NULL, - ipcp_local_sdu_loop, NULL)) { + if (pthread_create(&local_data.packet_loop, NULL, + ipcp_local_packet_loop, NULL)) { ipcp_set_state(IPCP_INIT); return -1; } @@ -364,8 +364,8 @@ int main(int argc, ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(local_data.sduloop); - pthread_join(local_data.sduloop, NULL); + pthread_cancel(local_data.packet_loop); + pthread_join(local_data.packet_loop, NULL); } local_data_fini(); diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 1cba7630..0cb7b770 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -42,7 +42,7 @@ set(SOURCE_FILES main.c pff.c routing.c - sdu_sched.c + packet_sched.c # Add policies last pol/alternate_pff.c pol/flat.c diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index a2fa4863..4064bf5c 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -62,21 +62,21 @@ typedef KadContactMsg kad_contact_msg_t; #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ -#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ -#define KAD_K 8 /* Replication factor, MDHT value. */ -#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ -#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ -#define KAD_T_JOIN 8 /* Response time to wait for a join. */ -#define KAD_T_RESP 5 /* Response time to wait for a response. */ -#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ -#define KAD_QUEER 15 /* Time to declare peer questionable. */ -#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ -#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ -#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ -#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ -#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */ -#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ +#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ +#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ +#define KAD_K 8 /* Replication factor, MDHT value. */ +#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ +#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ +#define KAD_T_JOIN 8 /* Response time to wait for a join. */ +#define KAD_T_RESP 5 /* Response time to wait for a response. */ +#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ +#define KAD_QUEER 15 /* Time to declare peer questionable. */ +#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ +#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ +#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ +#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ +#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_packet tpm check (ms) */ +#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ enum dht_state { DHT_INIT = 0, @@ -251,7 +251,7 @@ struct join_info { uint64_t addr; }; -struct sdu_info { +struct packet_info { struct dht * dht; struct shm_du_buff * sdb; }; @@ -1489,7 +1489,7 @@ static int send_msg(struct dht * dht, kad_msg__pack(msg, shm_du_buff_head(sdb)); - if (dt_write_sdu(addr, QOS_CUBE_BE, dht->fd, sdb) == 0) + if (dt_write_packet(addr, QOS_CUBE_BE, dht->fd, sdb) == 0) break; ipcp_sdb_release(sdb); @@ -2400,7 +2400,7 @@ uint64_t dht_query(struct dht * dht, return 0; } -static void * dht_handle_sdu(void * o) +static void * dht_handle_packet(void * o) { struct dht * dht = (struct dht *) o; @@ -2584,8 +2584,8 @@ static void * dht_handle_sdu(void * o) return (void *) 0; } -static void dht_post_sdu(void * comp, - struct shm_du_buff * sdb) +static void dht_post_packet(void * comp, + struct shm_du_buff * sdb) { struct cmd * cmd; struct dht * dht = (struct dht *) comp; @@ -2800,19 +2800,19 @@ struct dht * dht_create(uint64_t addr) dht->addr = addr; dht->id = NULL; #ifndef __DHT_TEST__ - dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht); + dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); if (dht->tpm == NULL) goto fail_tpm_create; if (tpm_start(dht->tpm)) goto fail_tpm_start; - dht->fd = dt_reg_comp(dht, &dht_post_sdu, DHT); + dht->fd = dt_reg_comp(dht, &dht_post_packet, DHT); notifier_reg(handle_event, dht); #else (void) handle_event; - (void) dht_handle_sdu; - (void) dht_post_sdu; + (void) dht_handle_packet; + (void) dht_post_packet; #endif dht->state = DHT_INIT; diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index a350e4be..08c937e7 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -46,7 +46,7 @@ #include "dt.h" #include "pff.h" #include "routing.h" -#include "sdu_sched.h" +#include "packet_sched.h" #include "comp.h" #include "fa.h" @@ -65,7 +65,7 @@ #endif struct comp_info { - void (* post_sdu)(void * comp, struct shm_du_buff * sdb); + void (* post_packet)(void * comp, struct shm_du_buff * sdb); void * comp; char * name; }; @@ -154,7 +154,7 @@ static void dt_pci_shrink(struct shm_du_buff * sdb) } struct { - struct sdu_sched * sdu_sched; + struct packet_sched * packet_sched; struct pff * pff[QOS_CUBE_MAX]; struct routing_i * routing[QOS_CUBE_MAX]; @@ -421,24 +421,25 @@ static void handle_event(void * self, #ifdef IPCP_FLOW_STATS stat_used(c->flow_info.fd, c->conn_info.addr); #endif - sdu_sched_add(dt.sdu_sched, c->flow_info.fd); - log_dbg("Added fd %d to SDU scheduler.", c->flow_info.fd); + packet_sched_add(dt.packet_sched, c->flow_info.fd); + log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd); break; case NOTIFY_DT_CONN_DEL: #ifdef IPCP_FLOW_STATS stat_used(c->flow_info.fd, INVALID_ADDR); #endif - sdu_sched_del(dt.sdu_sched, c->flow_info.fd); - log_dbg("Removed fd %d from SDU scheduler.", c->flow_info.fd); + packet_sched_del(dt.packet_sched, c->flow_info.fd); + log_dbg("Removed fd %d from " + "packet scheduler.", c->flow_info.fd); break; default: break; } } -static void sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) +static void packet_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { struct dt_pci dt_pci; int ret; @@ -491,7 +492,7 @@ static void sdu_handler(int fd, ret = ipcp_flow_write(ofd, sdb); if (ret < 0) { - log_dbg("Failed to write SDU to fd %d.", ofd); + log_dbg("Failed to write packet to fd %d.", ofd); if (ret == -EFLOWDOWN) notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd); ipcp_sdb_release(sdb); @@ -560,7 +561,7 @@ static void sdu_handler(int fd, return; } - if (dt.comps[dt_pci.eid].post_sdu == NULL) { + if (dt.comps[dt_pci.eid].post_packet == NULL) { log_err("No registered component on eid %d.", dt_pci.eid); ipcp_sdb_release(sdb); @@ -596,7 +597,8 @@ static void sdu_handler(int fd, pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); #endif - dt.comps[dt_pci.eid].post_sdu(dt.comps[dt_pci.eid].comp, sdb); + dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp, + sdb); } } @@ -761,15 +763,15 @@ void dt_fini(void) int dt_start(void) { - dt.sdu_sched = sdu_sched_create(sdu_handler); - if (dt.sdu_sched == NULL) { - log_err("Failed to create N-1 SDU scheduler."); + dt.packet_sched = packet_sched_create(packet_handler); + if (dt.packet_sched == NULL) { + log_err("Failed to create N-1 packet scheduler."); return -1; } if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) { log_err("Failed to create listener thread."); - sdu_sched_destroy(dt.sdu_sched); + packet_sched_destroy(dt.packet_sched); return -1; } @@ -780,7 +782,7 @@ void dt_stop(void) { pthread_cancel(dt.listener); pthread_join(dt.listener, NULL); - sdu_sched_destroy(dt.sdu_sched); + packet_sched_destroy(dt.packet_sched); } int dt_reg_comp(void * comp, @@ -800,11 +802,11 @@ int dt_reg_comp(void * comp, return -EBADF; } - assert(dt.comps[res_fd].post_sdu == NULL); + assert(dt.comps[res_fd].post_packet == NULL); assert(dt.comps[res_fd].comp == NULL); assert(dt.comps[res_fd].name == NULL); - dt.comps[res_fd].post_sdu = func; + dt.comps[res_fd].post_packet = func; dt.comps[res_fd].comp = comp; dt.comps[res_fd].name = name; @@ -815,10 +817,10 @@ int dt_reg_comp(void * comp, return res_fd; } -int dt_write_sdu(uint64_t dst_addr, - qoscube_t qc, - int np1_fd, - struct shm_du_buff * sdb) +int dt_write_packet(uint64_t dst_addr, + qoscube_t qc, + int np1_fd, + struct shm_du_buff * sdb) { int fd; struct dt_pci dt_pci; @@ -863,7 +865,7 @@ int dt_write_sdu(uint64_t dst_addr, #endif ret = ipcp_flow_write(fd, sdb); if (ret < 0) { - log_dbg("Failed to write SDU to fd %d.", fd); + log_dbg("Failed to write packet to fd %d.", fd); if (ret == -EFLOWDOWN) notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); goto fail_write; diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h index a17098b7..05b8220c 100644 --- a/src/ipcpd/normal/dt.h +++ b/src/ipcpd/normal/dt.h @@ -47,9 +47,9 @@ int dt_reg_comp(void * comp, void (* func)(void * comp, struct shm_du_buff * sdb), char * name); -int dt_write_sdu(uint64_t dst_addr, - qoscube_t qc, - int res_fd, - struct shm_du_buff * sdb); +int dt_write_packet(uint64_t dst_addr, + qoscube_t qc, + int res_fd, + struct shm_du_buff * sdb); #endif /* OUROBOROS_IPCPD_NORMAL_DT_H */ diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 4c82e0e0..d67ba61e 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -39,7 +39,7 @@ #include "dir.h" #include "fa.h" -#include "sdu_sched.h" +#include "packet_sched.h" #include "ipcp.h" #include "dt.h" @@ -74,19 +74,19 @@ struct { uint64_t r_addr[PROG_MAX_FLOWS]; int fd; - struct sdu_sched * sdu_sched; + struct packet_sched * packet_sched; } fa; -static void sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) +static void packet_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { pthread_rwlock_rdlock(&fa.flows_lock); - if (dt_write_sdu(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) { + if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) { pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); - log_warn("Failed to forward SDU."); + log_warn("Failed to forward packet."); return; } @@ -99,7 +99,7 @@ static void destroy_conn(int fd) fa.r_addr[fd] = INVALID_ADDR; } -static void fa_post_sdu(void * comp, +static void fa_post_packet(void * comp, struct shm_du_buff * sdb) { struct timespec ts = {0, TIMEOUT * 1000}; @@ -192,7 +192,7 @@ static void fa_post_sdu(void * comp, if (msg->response < 0) destroy_conn(ntoh32(msg->r_eid)); else - sdu_sched_add(fa.sdu_sched, ntoh32(msg->r_eid)); + packet_sched_add(fa.packet_sched, ntoh32(msg->r_eid)); pthread_rwlock_unlock(&fa.flows_lock); @@ -215,7 +215,7 @@ int fa_init(void) if (pthread_rwlock_init(&fa.flows_lock, NULL)) return -1; - fa.fd = dt_reg_comp(&fa, &fa_post_sdu, FA); + fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA); return 0; } @@ -227,9 +227,9 @@ void fa_fini(void) int fa_start(void) { - fa.sdu_sched = sdu_sched_create(sdu_handler); - if (fa.sdu_sched == NULL) { - log_err("Failed to create SDU scheduler."); + fa.packet_sched = packet_sched_create(packet_handler); + if (fa.packet_sched == NULL) { + log_err("Failed to create packet scheduler."); return -1; } @@ -238,7 +238,7 @@ int fa_start(void) void fa_stop(void) { - sdu_sched_destroy(fa.sdu_sched); + packet_sched_destroy(fa.packet_sched); } int fa_alloc(int fd, @@ -273,7 +273,7 @@ int fa_alloc(int fd, qc = qos_spec_to_cube(qs); - if (dt_write_sdu(addr, qc, fa.fd, sdb)) { + if (dt_write_packet(addr, qc, fa.fd, sdb)) { ipcp_sdb_release(sdb); return -1; } @@ -335,14 +335,14 @@ int fa_alloc_resp(int fd, destroy_conn(fd); ipcp_sdb_release(sdb); } else { - sdu_sched_add(fa.sdu_sched, fd); + packet_sched_add(fa.packet_sched, fd); } ipcp_flow_get_qoscube(fd, &qc); assert(qc >= 0 && qc < QOS_CUBE_MAX); - if (dt_write_sdu(fa.r_addr[fd], qc, fa.fd, sdb)) { + if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) { destroy_conn(fd); pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); @@ -360,7 +360,7 @@ int fa_dealloc(int fd) pthread_rwlock_wrlock(&fa.flows_lock); - sdu_sched_del(fa.sdu_sched, fd); + packet_sched_del(fa.packet_sched, fd); destroy_conn(fd); diff --git a/src/ipcpd/normal/packet_sched.c b/src/ipcpd/normal/packet_sched.c new file mode 100644 index 00000000..fc01fb32 --- /dev/null +++ b/src/ipcpd/normal/packet_sched.c @@ -0,0 +1,241 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Packet scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#include +#include + +#include "ipcp.h" +#include "packet_sched.h" +#include "connmgr.h" + +#include +#include +#include +#include +#include + +static int qos_prio [] = { + QOS_PRIO_RAW, + QOS_PRIO_BE, + QOS_PRIO_VIDEO, + QOS_PRIO_VOICE, + QOS_PRIO_DATA +}; + +struct packet_sched { + fset_t * set[QOS_CUBE_MAX]; + next_packet_fn_t callback; + pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; +}; + +struct sched_info { + struct packet_sched * sch; + qoscube_t qc; +}; + +static void cleanup_reader(void * o) +{ + fqueue_destroy((fqueue_t *) o); +} + +static void * packet_reader(void * o) +{ + struct packet_sched * sched; + struct shm_du_buff * sdb; + int fd; + fqueue_t * fq; + qoscube_t qc; + + sched = ((struct sched_info *) o)->sch; + qc = ((struct sched_info *) o)->qc; + + ipcp_lock_to_core(); + + free(o); + + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; + + pthread_cleanup_push(cleanup_reader, fq); + + while (true) { + int ret = fevent(sched->set[qc], fq, NULL); + if (ret < 0) + continue; + + while ((fd = fqueue_next(fq)) >= 0) { + switch (fqueue_type(fq)) { + case FLOW_DEALLOC: + notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); + break; + case FLOW_DOWN: + notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); + break; + case FLOW_UP: + notifier_event(NOTIFY_DT_FLOW_UP, &fd); + break; + case FLOW_PKT: + if (ipcp_flow_read(fd, &sdb)) + continue; + + sched->callback(fd, qc, sdb); + break; + default: + break; + } + } + } + + pthread_cleanup_pop(true); + + return (void *) 0; +} + +struct packet_sched * packet_sched_create(next_packet_fn_t callback) +{ + struct packet_sched * packet_sched; + struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; + int i; + int j; + + assert(callback); + + packet_sched = malloc(sizeof(*packet_sched)); + if (packet_sched == NULL) + goto fail_malloc; + + packet_sched->callback = callback; + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + packet_sched->set[i] = fset_create(); + if (packet_sched->set[i] == NULL) { + for (j = 0; j < i; ++j) + fset_destroy(packet_sched->set[j]); + goto fail_flow_set; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + infos[i] = malloc(sizeof(*infos[i])); + if (infos[i] == NULL) { + for (j = 0; j < i; ++j) + free(infos[j]); + goto fail_infos; + } + infos[i]->sch = packet_sched; + infos[i]->qc = i % QOS_CUBE_MAX; + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + if (pthread_create(&packet_sched->readers[i], NULL, + packet_reader, infos[i])) { + for (j = 0; j < i; ++j) + pthread_cancel(packet_sched->readers[j]); + for (j = 0; j < i; ++j) + pthread_join(packet_sched->readers[j], NULL); + for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + free(infos[i]); + goto fail_infos; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + struct sched_param par; + int pol = SCHED_RR; + int min; + int max; + + min = sched_get_priority_min(pol); + max = sched_get_priority_max(pol); + + min = (max - min) / 2; + + par.sched_priority = min + + (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); + + if (pthread_setschedparam(packet_sched->readers[i], pol, &par)) + goto fail_sched; + } + + return packet_sched; + + fail_sched: + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_cancel(packet_sched->readers[j]); + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_join(packet_sched->readers[j], NULL); + fail_infos: + for (j = 0; j < QOS_CUBE_MAX; ++j) + fset_destroy(packet_sched->set[j]); + fail_flow_set: + free(packet_sched); + fail_malloc: + return NULL; +} + +void packet_sched_destroy(struct packet_sched * packet_sched) +{ + int i; + + assert(packet_sched); + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + pthread_cancel(packet_sched->readers[i]); + pthread_join(packet_sched->readers[i], NULL); + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) + fset_destroy(packet_sched->set[i]); + + free(packet_sched); +} + +void packet_sched_add(struct packet_sched * packet_sched, + int fd) +{ + qoscube_t qc; + + assert(packet_sched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_add(packet_sched->set[qc], fd); +} + +void packet_sched_del(struct packet_sched * packet_sched, + int fd) +{ + qoscube_t qc; + + assert(packet_sched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_del(packet_sched->set[qc], fd); +} diff --git a/src/ipcpd/normal/packet_sched.h b/src/ipcpd/normal/packet_sched.h new file mode 100644 index 00000000..13ff400d --- /dev/null +++ b/src/ipcpd/normal/packet_sched.h @@ -0,0 +1,43 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Packet scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H +#define OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H + +#include +#include + +typedef void (* next_packet_fn_t)(int fd, + qoscube_t qc, + struct shm_du_buff * sdb); + +struct packet_sched * packet_sched_create(next_packet_fn_t callback); + +void packet_sched_destroy(struct packet_sched * packet_sched); + +void packet_sched_add(struct packet_sched * packet_sched, + int fd); + +void packet_sched_del(struct packet_sched * packet_sched, + int fd); + +#endif /* OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H */ diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c deleted file mode 100644 index e6d705fb..00000000 --- a/src/ipcpd/normal/sdu_sched.c +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * SDU scheduler component - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#if defined(__linux__) || defined(__CYGWIN__) -#define _DEFAULT_SOURCE -#else -#define _POSIX_C_SOURCE 200112L -#endif - -#include "config.h" - -#include -#include - -#include "ipcp.h" -#include "sdu_sched.h" -#include "connmgr.h" - -#include -#include -#include -#include -#include - -static int qos_prio [] = { - QOS_PRIO_RAW, - QOS_PRIO_BE, - QOS_PRIO_VIDEO, - QOS_PRIO_VOICE, - QOS_PRIO_DATA -}; - -struct sdu_sched { - fset_t * set[QOS_CUBE_MAX]; - next_sdu_fn_t callback; - pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; -}; - -struct sched_info { - struct sdu_sched * sch; - qoscube_t qc; -}; - -static void cleanup_reader(void * o) -{ - fqueue_destroy((fqueue_t *) o); -} - -static void * sdu_reader(void * o) -{ - struct sdu_sched * sched; - struct shm_du_buff * sdb; - int fd; - fqueue_t * fq; - qoscube_t qc; - - sched = ((struct sched_info *) o)->sch; - qc = ((struct sched_info *) o)->qc; - - ipcp_lock_to_core(); - - free(o); - - fq = fqueue_create(); - if (fq == NULL) - return (void *) -1; - - pthread_cleanup_push(cleanup_reader, fq); - - while (true) { - int ret = fevent(sched->set[qc], fq, NULL); - if (ret < 0) - continue; - - while ((fd = fqueue_next(fq)) >= 0) { - switch (fqueue_type(fq)) { - case FLOW_DEALLOC: - notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); - break; - case FLOW_DOWN: - notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); - break; - case FLOW_UP: - notifier_event(NOTIFY_DT_FLOW_UP, &fd); - break; - case FLOW_PKT: - if (ipcp_flow_read(fd, &sdb)) - continue; - - sched->callback(fd, qc, sdb); - break; - default: - break; - } - } - } - - pthread_cleanup_pop(true); - - return (void *) 0; -} - -struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback) -{ - struct sdu_sched * sdu_sched; - struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; - int i; - int j; - - assert(callback); - - sdu_sched = malloc(sizeof(*sdu_sched)); - if (sdu_sched == NULL) - goto fail_malloc; - - sdu_sched->callback = callback; - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - sdu_sched->set[i] = fset_create(); - if (sdu_sched->set[i] == NULL) { - for (j = 0; j < i; ++j) - fset_destroy(sdu_sched->set[j]); - goto fail_flow_set; - } - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - infos[i] = malloc(sizeof(*infos[i])); - if (infos[i] == NULL) { - for (j = 0; j < i; ++j) - free(infos[j]); - goto fail_infos; - } - infos[i]->sch = sdu_sched; - infos[i]->qc = i % QOS_CUBE_MAX; - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - if (pthread_create(&sdu_sched->readers[i], NULL, - sdu_reader, infos[i])) { - for (j = 0; j < i; ++j) - pthread_cancel(sdu_sched->readers[j]); - for (j = 0; j < i; ++j) - pthread_join(sdu_sched->readers[j], NULL); - for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - free(infos[i]); - goto fail_infos; - } - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - struct sched_param par; - int pol = SCHED_RR; - int min; - int max; - - min = sched_get_priority_min(pol); - max = sched_get_priority_max(pol); - - min = (max - min) / 2; - - par.sched_priority = min + - (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); - - if (pthread_setschedparam(sdu_sched->readers[i], pol, &par)) - goto fail_sched; - } - - return sdu_sched; - - fail_sched: - for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - pthread_cancel(sdu_sched->readers[j]); - for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - pthread_join(sdu_sched->readers[j], NULL); - fail_infos: - for (j = 0; j < QOS_CUBE_MAX; ++j) - fset_destroy(sdu_sched->set[j]); - fail_flow_set: - free(sdu_sched); - fail_malloc: - return NULL; -} - -void sdu_sched_destroy(struct sdu_sched * sdu_sched) -{ - int i; - - assert(sdu_sched); - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - pthread_cancel(sdu_sched->readers[i]); - pthread_join(sdu_sched->readers[i], NULL); - } - - for (i = 0; i < QOS_CUBE_MAX; ++i) - fset_destroy(sdu_sched->set[i]); - - free(sdu_sched); -} - -void sdu_sched_add(struct sdu_sched * sdu_sched, - int fd) -{ - qoscube_t qc; - - assert(sdu_sched); - - ipcp_flow_get_qoscube(fd, &qc); - fset_add(sdu_sched->set[qc], fd); -} - -void sdu_sched_del(struct sdu_sched * sdu_sched, - int fd) -{ - qoscube_t qc; - - assert(sdu_sched); - - ipcp_flow_get_qoscube(fd, &qc); - fset_del(sdu_sched->set[qc], fd); -} diff --git a/src/ipcpd/normal/sdu_sched.h b/src/ipcpd/normal/sdu_sched.h deleted file mode 100644 index cdbda272..00000000 --- a/src/ipcpd/normal/sdu_sched.h +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * SDU scheduler component - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H -#define OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H - -#include -#include - -typedef void (* next_sdu_fn_t)(int fd, - qoscube_t qc, - struct shm_du_buff * sdb); - -struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback); - -void sdu_sched_destroy(struct sdu_sched * sdu_sched); - -void sdu_sched_add(struct sdu_sched * sdu_sched, - int fd); - -void sdu_sched_del(struct sdu_sched * sdu_sched, - int fd); - -#endif /* OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H */ diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 96820662..a1af1e85 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -54,20 +54,20 @@ #include #include -#define FLOW_REQ 1 -#define FLOW_REPLY 2 +#define FLOW_REQ 1 +#define FLOW_REPLY 2 -#define THIS_TYPE IPCP_UDP -#define LISTEN_PORT htons(0x0D1F) -#define SHIM_UDP_BUF_SIZE 256 -#define SHIM_UDP_MSG_SIZE 256 -#define SHIM_UDP_MAX_SDU_SIZE 8980 -#define DNS_TTL 86400 -#define FD_UPDATE_TIMEOUT 100 /* microseconds */ +#define THIS_TYPE IPCP_UDP +#define LISTEN_PORT htons(0x0D1F) +#define SHIM_UDP_BUF_SIZE 256 +#define SHIM_UDP_MSG_SIZE 256 +#define SHIM_UDP_MAX_PACKET_SIZE 8980 +#define DNS_TTL 86400 +#define FD_UPDATE_TIMEOUT 100 /* microseconds */ -#define local_ip (udp_data.s_saddr.sin_addr.s_addr) +#define local_ip (udp_data.s_saddr.sin_addr.s_addr) -#define UDP_MAX_PORTS 0xFFFF +#define UDP_MAX_PORTS 0xFFFF struct mgmt_msg { uint16_t src_udp_port; @@ -106,9 +106,9 @@ struct { struct uf fd_to_uf[SYS_MAX_FLOWS]; pthread_rwlock_t flows_lock; - pthread_t sduloop; + pthread_t packet_loop; pthread_t handler; - pthread_t sdu_reader; + pthread_t packet_reader; bool fd_set_mod; pthread_cond_t fd_set_cond; @@ -495,13 +495,13 @@ static void * ipcp_udp_listener(void * o) return 0; } -static void * ipcp_udp_sdu_reader(void * o) +static void * ipcp_udp_packet_reader(void * o) { ssize_t n; int skfd; int fd; /* FIXME: avoid this copy */ - char buf[SHIM_UDP_MAX_SDU_SIZE]; + char buf[SHIM_UDP_MAX_PACKET_SIZE]; struct sockaddr_in r_saddr; struct timeval tv = {0, FD_UPDATE_TIMEOUT}; fd_set read_fds; @@ -533,7 +533,7 @@ static void * ipcp_udp_sdu_reader(void * o) n = sizeof(r_saddr); if ((n = recvfrom(skfd, &buf, - SHIM_UDP_MAX_SDU_SIZE, + SHIM_UDP_MAX_PACKET_SIZE, 0, (struct sockaddr *) &r_saddr, (unsigned *) &n)) <= 0) @@ -552,7 +552,7 @@ static void * ipcp_udp_sdu_reader(void * o) return (void *) 0; } -static void * ipcp_udp_sdu_loop(void * o) +static void * ipcp_udp_packet_loop(void * o) { int fd; struct shm_du_buff * sdb; @@ -582,7 +582,7 @@ static void * ipcp_udp_sdu_loop(void * o) if (send(fd, shm_du_buff_head(sdb), shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), 0) < 0) - log_err("Failed to send SDU."); + log_err("Failed to send PACKET."); pthread_cleanup_pop(true); } @@ -666,20 +666,20 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) goto fail_bind; } - if (pthread_create(&udp_data.sdu_reader, + if (pthread_create(&udp_data.packet_reader, NULL, - ipcp_udp_sdu_reader, + ipcp_udp_packet_reader, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sdu_reader; + goto fail_packet_reader; } - if (pthread_create(&udp_data.sduloop, + if (pthread_create(&udp_data.packet_loop, NULL, - ipcp_udp_sdu_loop, + ipcp_udp_packet_loop, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sduloop; + goto fail_packet_loop; } log_dbg("Bootstrapped IPCP over UDP with pid %d.", getpid()); @@ -688,10 +688,10 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) return 0; - fail_sduloop: - pthread_cancel(udp_data.sdu_reader); - pthread_join(udp_data.sdu_reader, NULL); - fail_sdu_reader: + fail_packet_loop: + pthread_cancel(udp_data.packet_reader); + pthread_join(udp_data.packet_reader, NULL); + fail_packet_reader: pthread_cancel(udp_data.handler); pthread_join(udp_data.handler, NULL); fail_bind: @@ -1222,13 +1222,13 @@ int main(int argc, ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(udp_data.sduloop); + pthread_cancel(udp_data.packet_loop); pthread_cancel(udp_data.handler); - pthread_cancel(udp_data.sdu_reader); + pthread_cancel(udp_data.packet_reader); - pthread_join(udp_data.sduloop, NULL); + pthread_join(udp_data.packet_loop, NULL); pthread_join(udp_data.handler, NULL); - pthread_join(udp_data.sdu_reader, NULL); + pthread_join(udp_data.packet_reader, NULL); } udp_data_fini(); diff --git a/src/irmd/main.c b/src/irmd/main.c index 9504f3b5..427e09d1 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -123,7 +123,7 @@ struct { pthread_rwlock_t flows_lock; /* lock for flows */ struct lockfile * lf; /* single irmd per system */ - struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */ + struct shm_rdrbuff * rdrb; /* rdrbuff for packets */ int sockfd; /* UNIX socket */ diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index aa4e5bf3..84a9fd72 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -138,7 +138,7 @@ mark_as_advanced(LIBRT_LIBRARIES LIBPTHREAD_LIBRARIES LIBGCRYPT_INCLUDE_DIR SYS_RND_HDR) set(SHM_BUFFER_SIZE 4096 CACHE STRING - "Number of blocks in SDU buffer, must be a power of 2") + "Number of blocks in packet buffer, must be a power of 2") set(SYS_MAX_FLOWS 10240 CACHE STRING "Maximum number of total flows for this system") set(PROG_MAX_FLOWS 4096 CACHE STRING @@ -171,9 +171,9 @@ set(SHM_FLOW_SET_PREFIX "/${SHM_PREFIX}.set." CACHE INTERNAL set(SHM_RDRB_NAME "/${SHM_PREFIX}.rdrb" CACHE INTERNAL "Name for the main POSIX shared memory buffer") set(SHM_RDRB_BLOCK_SIZE "sysconf(_SC_PAGESIZE)" CACHE STRING - "SDU buffer block size, multiple of pagesize for performance") + "Packet buffer block size, multiple of pagesize for performance") set(SHM_RDRB_MULTI_BLOCK true CACHE BOOL - "SDU buffer multiblock SDU support") + "Packet buffer multiblock packet support") set(SHM_RBUFF_LOCKLESS 0 CACHE BOOL "Enable shared memory lockless rbuff support") diff --git a/src/lib/dev.c b/src/lib/dev.c index a92c1e42..00dcf991 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -895,7 +895,7 @@ ssize_t flow_read(int fd, { ssize_t idx; ssize_t n; - uint8_t * sdu; + uint8_t * packet; struct shm_rbuff * rb; struct shm_du_buff * sdb; struct timespec abs; @@ -948,19 +948,19 @@ ssize_t flow_read(int fd, } } - n = shm_rdrbuff_read(&sdu, ai.rdrb, idx); + n = shm_rdrbuff_read(&packet, ai.rdrb, idx); assert(n >= 0); if (n <= (ssize_t) count) { - memcpy(buf, sdu, n); + memcpy(buf, packet, n); shm_rdrbuff_remove(ai.rdrb, idx); flow->part_idx = (partrd && n == (ssize_t) count) ? DONE_PART : NO_PART; return n; } else { if (partrd) { - memcpy(buf, sdu, count); + memcpy(buf, packet, count); sdb = shm_rdrbuff_get(ai.rdrb, idx); shm_du_buff_head_release(sdb, n); flow->part_idx = idx; @@ -1042,7 +1042,7 @@ int fset_add(struct flow_set * set, int fd) { int ret; - size_t sdus; + size_t packets; size_t i; if (set == NULL || fd < 0 || fd > SYS_MAX_FLOWS) @@ -1052,8 +1052,8 @@ int fset_add(struct flow_set * set, ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); - sdus = shm_rbuff_queued(ai.flows[fd].rx_rb); - for (i = 0; i < sdus; i++) + packets = shm_rbuff_queued(ai.flows[fd].rx_rb); + for (i = 0; i < packets; i++) shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/frct.c b/src/lib/frct.c index e9acb1dc..2e5c385d 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -294,7 +294,7 @@ static int __frcti_snd(struct frcti * frcti, return 0; } -/* Returns 0 when idx contains an SDU for the application. */ +/* Returns 0 when idx contains a packet for the application. */ static int __frcti_rcv(struct frcti * frcti, struct shm_du_buff * sdb) { diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index 453f5183..5319e89c 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Ring buffer implementations for incoming SDUs + * Ring buffer implementations for incoming packets * * Dimitri Staessens * Sander Vrijders @@ -63,8 +63,8 @@ struct shm_rbuff { size_t * tail; /* start of ringbuffer tail */ size_t * acl; /* access control */ pthread_mutex_t * lock; /* lock all free space in shm */ - pthread_cond_t * add; /* SDU arrived */ - pthread_cond_t * del; /* SDU removed */ + pthread_cond_t * add; /* packet arrived */ + pthread_cond_t * del; /* packet removed */ pid_t pid; /* pid of the owner */ int port_id; /* port_id of the flow */ }; diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index c488f274..43eac7f6 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Lockless ring buffer for incoming SDUs + * Lockless ring buffer for incoming packets * * Dimitri Staessens * Sander Vrijders diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index 3b7ea2d4..c74503ff 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2018 * - * Ring buffer for incoming SDUs + * Ring buffer for incoming packets * * Dimitri Staessens * Sander Vrijders diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index 5ae2085d..8bbdda46 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -87,7 +87,7 @@ struct shm_rdrbuff { size_t * tail; /* start of ringbuffer tail */ pthread_mutex_t * lock; /* lock all free space in shm */ pthread_cond_t * full; /* flag when full */ - pthread_cond_t * healthy; /* flag when SDU is read */ + pthread_cond_t * healthy; /* flag when packet is read */ pid_t * pid; /* pid of the irmd owner */ }; diff --git a/src/tools/ocbr/ocbr.c b/src/tools/ocbr/ocbr.c index e2bd84af..12983da3 100644 --- a/src/tools/ocbr/ocbr.c +++ b/src/tools/ocbr/ocbr.c @@ -60,7 +60,7 @@ struct s { static void usage(void) { printf("Usage: cbr [OPTION]...\n" - "Sends SDUs from client to server at a constant bit rate.\n\n" + "Sends packets from client to server at a constant bit rate.\n\n" " -l, --listen Run in server mode\n" "\n" "Server options:\n" @@ -70,10 +70,10 @@ static void usage(void) "Client options:\n" " -n, --server_apn Specify the name of the server.\n" " -d, --duration Duration for sending (s)\n" - " -f, --flood Send SDUs as fast as possible\n" - " -s, --size SDU size (B, max %ld B)\n" + " -f, --flood Send packets as fast as possible\n" + " -s, --size packet size (B, max %ld B)\n" " -r, --rate Rate (b/s)\n" - " --sleep Sleep in between sending SDUs\n" + " --sleep Sleep in between sending packets\n" "\n\n" " --help Display this help text and exit\n", BUF_SIZE); @@ -82,7 +82,7 @@ static void usage(void) int main(int argc, char ** argv) { int duration = 60; /* One minute test */ - int size = 1000; /* 1000 byte SDUs */ + int size = 1000; /* 1000 byte packets */ long rate = 1000000; /* 1 Mb/s */ bool flood = false; bool sleep = false; diff --git a/src/tools/ocbr/ocbr_client.c b/src/tools/ocbr/ocbr_client.c index 026ab001..63b43721 100644 --- a/src/tools/ocbr/ocbr_client.c +++ b/src/tools/ocbr/ocbr_client.c @@ -155,7 +155,7 @@ int client_main(char * server, ms = ts_diff_ms(&start, &end); printf("sent statistics: " - "%9ld SDUs, %12ld bytes in %9d ms, %4.4f Mb/s\n", + "%9ld packets, %12ld bytes in %9d ms, %4.4f Mb/s\n", seqnr, seqnr * size, ms, (seqnr / (ms * 1000.0)) * size * 8.0); flow_dealloc(fd); diff --git a/src/tools/ocbr/ocbr_server.c b/src/tools/ocbr/ocbr_server.c index 4f080eff..75983201 100644 --- a/src/tools/ocbr/ocbr_server.c +++ b/src/tools/ocbr/ocbr_server.c @@ -90,8 +90,8 @@ static void handle_flow(int fd) bool stop = false; - long sdus = 0; - long sdus_intv = 0; + long packets = 0; + long packets_intv = 0; long bytes_read = 0; long bytes_read_intv = 0; @@ -109,7 +109,7 @@ static void handle_flow(int fd) if (count > 0) { clock_gettime(CLOCK_REALTIME, &alive); - sdus++; + packets++; bytes_read += count; } @@ -121,17 +121,18 @@ static void handle_flow(int fd) if (stop || ts_diff_ms(&now, &iv_end) < 0) { long us = ts_diff_us(&iv_start, &now); - printf("Flow %4d: %9ld SDUs (%12ld bytes) in %9ld ms" - " => %9.4f p/s, %9.4f Mb/s\n", + printf("Flow %4d: %9ld packets (%12ld bytes) in %9ld ms" + " => %9.4f pps, %9.4f Mbps\n", fd, - sdus - sdus_intv, + packets - packets_intv, bytes_read - bytes_read_intv, us / 1000, - ((sdus - sdus_intv) / (double) us) * MILLION, + ((packets - packets_intv) / (double) us) + * MILLION, 8 * ((bytes_read - bytes_read_intv) / (double)(us))); iv_start = iv_end; - sdus_intv = sdus; + packets_intv = packets; bytes_read_intv = bytes_read; ts_add(&iv_start, &intv, &iv_end); } diff --git a/src/tools/oecho/oecho.c b/src/tools/oecho/oecho.c index cc173988..b6a74aa5 100644 --- a/src/tools/oecho/oecho.c +++ b/src/tools/oecho/oecho.c @@ -72,7 +72,7 @@ static int server_main(void) count = flow_read(fd, &buf, BUF_SIZE); if (count < 0) { - printf("Failed to read SDU.\n"); + printf("Failed to read packet.\n"); flow_dealloc(fd); continue; } @@ -80,7 +80,7 @@ static int server_main(void) printf("Message from client is %.*s.\n", (int) count, buf); if (flow_write(fd, buf, count) == -1) { - printf("Failed to write SDU.\n"); + printf("Failed to write packet.\n"); flow_dealloc(fd); continue; } @@ -105,14 +105,14 @@ static int client_main(void) } if (flow_write(fd, message, strlen(message) + 1) < 0) { - printf("Failed to write SDU.\n"); + printf("Failed to write packet.\n"); flow_dealloc(fd); return -1; } count = flow_read(fd, buf, BUF_SIZE); if (count < 0) { - printf("Failed to read SDU.\n"); + printf("Failed to read packet.\n"); flow_dealloc(fd); return -1; } diff --git a/src/tools/operf/operf.c b/src/tools/operf/operf.c index 137e8647..92555c23 100644 --- a/src/tools/operf/operf.c +++ b/src/tools/operf/operf.c @@ -119,8 +119,8 @@ static void usage(void) " -d, --duration Test duration (default 60s)\n" " -r, --rate Rate (b/s)\n" " -s, --size Payload size (B, default 1500)\n" - " -f, --flood Send SDUs as fast as possible\n" - " --sleep Sleep in between sending SDUs\n" + " -f, --flood Send packets as fast as possible\n" + " --sleep Sleep in between sending packets\n" "\n" " --help Display this help text and exit\n"); } diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c index c8873c54..6862944e 100644 --- a/src/tools/operf/operf_client.c +++ b/src/tools/operf/operf_client.c @@ -120,11 +120,12 @@ void * writer(void * o) msg = (struct msg *) buf; if (client.flood) - printf("Flooding %s with %d byte SDUs for %d seconds.\n\n", + printf("Flooding %s with %d byte packets for %d seconds.\n\n", client.server_name, client.size, client.duration / 1000); else - printf("Sending %d byte SDUs for %d s to %s at %.3lf Mb/s.\n\n", + printf("Sending %d byte packets for %d s to %s " + "at %.3lf Mb/s.\n\n", client.size, client.duration / 1000, client.server_name, client.rate / (double) MILLION); @@ -141,7 +142,7 @@ void * writer(void * o) msg->id = client.sent; if (flow_write(*fdp, buf, client.size) == -1) { - printf("Failed to send SDU.\n"); + printf("Failed to send packet.\n"); flow_dealloc(*fdp); free(buf); return (void *) -1; @@ -225,7 +226,7 @@ int client_main(void) printf("\n"); printf("--- %s perf statistics ---\n", client.server_name); - printf("%ld SDUs transmitted, ", client.sent); + printf("%ld packets transmitted, ", client.sent); printf("%ld received, ", client.rcvd); printf("%ld%% packet loss, ", client.sent == 0 ? 0 : 100 - ((100 * client.rcvd) / client.sent)); diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 0f7695b5..a978e659 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -176,7 +176,7 @@ void * writer(void * o) msg->tv_nsec = now.tv_nsec; if (flow_write(*fdp, buf, client.size) == -1) { - printf("Failed to send SDU.\n"); + printf("Failed to send packet.\n"); flow_dealloc(*fdp); free(buf); return (void *) -1; @@ -253,7 +253,7 @@ static int client_main(void) printf("\n"); printf("--- %s ping statistics ---\n", client.s_apn); - printf("%d SDUs transmitted, ", client.sent); + printf("%d packets transmitted, ", client.sent); printf("%d received, ", client.rcvd); printf("%zd out-of-order, ", client.ooo); printf("%.0lf%% packet loss, ", client.sent == 0 ? 0 : -- cgit v1.2.3