From 40ca5385e97f393d0c231446f117ad43465735a7 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Sun, 6 Mar 2022 15:15:51 +0100 Subject: ipdpd: Pass MPL to application at flow_allocation The maximum packet lifetime (MPL) is a property of the flow that needs to be passed to the reliable transmission protocol (FRCP) for its correct operation. Previously, the value of MPL was set fixed as one of the (fixed) Delta-t parameters. This patch makes the MPL a property of the layer, and it can now be set per layer-type at build time. This is a step towards a proper MPL estimator in the flow allocator. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/ipcpd/broadcast/CMakeLists.txt | 2 ++ src/ipcpd/broadcast/main.c | 3 ++- src/ipcpd/config.h.in | 12 +++++++++++- src/ipcpd/eth/CMakeLists.txt | 2 ++ src/ipcpd/eth/eth.c | 10 ++++++---- src/ipcpd/local/CMakeLists.txt | 2 ++ src/ipcpd/local/main.c | 7 +++++-- src/ipcpd/udp/CMakeLists.txt | 2 ++ src/ipcpd/udp/main.c | 7 +++++-- src/ipcpd/unicast/CMakeLists.txt | 2 ++ src/ipcpd/unicast/fa.c | 6 ++++-- src/irmd/irm_flow.c | 1 + src/irmd/irm_flow.h | 1 + src/irmd/main.c | 24 ++++++++++++++++++++---- src/lib/dev.c | 21 +++++++++++++++++---- src/lib/ipcpd_messages.proto | 3 ++- src/lib/irmd_messages.proto | 7 ++++--- 17 files changed, 88 insertions(+), 24 deletions(-) (limited to 'src') diff --git a/src/ipcpd/broadcast/CMakeLists.txt b/src/ipcpd/broadcast/CMakeLists.txt index afcc8696..af0d8fcf 100644 --- a/src/ipcpd/broadcast/CMakeLists.txt +++ b/src/ipcpd/broadcast/CMakeLists.txt @@ -13,6 +13,8 @@ include_directories(${CMAKE_SOURCE_DIR}/include) include_directories(${CMAKE_BINARY_DIR}/include) set(IPCP_BROADCAST_TARGET ipcpd-broadcast CACHE INTERNAL "") +set(IPCP_BROADCAST_MPL 60 CACHE STRING + "Default maximum packet lifetime for the broadcast IPCP, in seconds") set(SOURCE_FILES # Add source files here diff --git a/src/ipcpd/broadcast/main.c b/src/ipcpd/broadcast/main.c index 522d1391..03c1e746 100644 --- a/src/ipcpd/broadcast/main.c +++ b/src/ipcpd/broadcast/main.c @@ -224,6 +224,7 @@ static int broadcast_ipcp_join(int fd, qosspec_t qs) { struct conn conn; + time_t mpl = IPCP_BROADCAST_MPL; (void) qs; @@ -236,7 +237,7 @@ static int broadcast_ipcp_join(int fd, notifier_event(NOTIFY_DT_CONN_ADD, &conn); - ipcp_flow_alloc_reply(fd, 0, NULL, 0); + ipcp_flow_alloc_reply(fd, 0, mpl, NULL, 0); return 0; } diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index 0bf3ad69..7891ffa8 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -48,6 +48,7 @@ #define IPCP_SCHED_THR_MUL @IPCP_SCHED_THR_MUL@ #define PFT_SIZE @PFT_SIZE@ #define DHT_ENROLL_SLACK @DHT_ENROLL_SLACK@ +#define IPCP_UNICAST_MPL @IPCP_UNICAST_MPL@ #cmakedefine IPCP_CONN_WAIT_DIR #cmakedefine DISABLE_CORE_LOCK @@ -59,8 +60,9 @@ #define NSLOOKUP_EXEC "@NSLOOKUP_EXECUTABLE@" #define IPCP_UDP_RD_THR @IPCP_UDP_RD_THR@ #define IPCP_UDP_WR_THR @IPCP_UDP_WR_THR@ +#define IPCP_UDP_MPL @IPCP_UDP_MPL@ -/* eth-llc */ +/* eth */ #cmakedefine HAVE_NETMAP #cmakedefine HAVE_BPF #cmakedefine HAVE_RAW_SOCKETS @@ -68,3 +70,11 @@ #define IPCP_ETH_RD_THR @IPCP_ETH_RD_THR@ #define IPCP_ETH_WR_THR @IPCP_ETH_WR_THR@ #define IPCP_ETH_LO_MTU @IPCP_ETH_LO_MTU@ +#define IPCP_ETH_MPL @IPCP_ETH_MPL@ + +/* local */ +#define IPCP_LOCAL_MPL @IPCP_LOCAL_MPL@ + +/* broadcast */ +/* local */ +#define IPCP_BROADCAST_MPL @IPCP_BROADCAST_MPL@ diff --git a/src/ipcpd/eth/CMakeLists.txt b/src/ipcpd/eth/CMakeLists.txt index d7105b4f..d57e1848 100644 --- a/src/ipcpd/eth/CMakeLists.txt +++ b/src/ipcpd/eth/CMakeLists.txt @@ -85,6 +85,8 @@ if (HAVE_ETH) "Bypass the Qdisc in the kernel when using raw sockets") set(IPCP_ETH_LO_MTU 1500 CACHE STRING "Restrict Ethernet MTU over loopback interfaces") + set(IPCP_ETH_MPL 5 CACHE STRING + "Default maximum packet lifetime for the Ethernet IPCPs, in seconds") set(ETH_LLC_SOURCES # Add source files here diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 8b34d303..f62bd0a7 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -575,6 +575,7 @@ static int eth_ipcp_req(uint8_t * r_addr, struct timespec ts = {0, ALLOC_TIMEO * MILLION}; struct timespec abstime; int fd; + time_t mpl = IPCP_ETH_MPL; clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -594,7 +595,7 @@ static int eth_ipcp_req(uint8_t * r_addr, } /* reply to IRM, called under lock to prevent race */ - fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, data, len); + fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); @@ -636,8 +637,9 @@ static int eth_ipcp_alloc_reply(uint8_t * r_addr, const void * data, size_t len) { - int ret = 0; - int fd = -1; + int ret = 0; + int fd = -1; + time_t mpl = IPCP_ETH_MPL; pthread_rwlock_wrlock(ð_data.flows_lock); @@ -672,7 +674,7 @@ static int eth_ipcp_alloc_reply(uint8_t * r_addr, #elif defined(BUILD_ETH_LLC) log_dbg("Flow reply, fd %d, SSAP %d, DSAP %d.", fd, ssap, dsap); #endif - if ((ret = ipcp_flow_alloc_reply(fd, response, data, len)) < 0) + if ((ret = ipcp_flow_alloc_reply(fd, response, mpl, data, len)) < 0) return -1; return ret; diff --git a/src/ipcpd/local/CMakeLists.txt b/src/ipcpd/local/CMakeLists.txt index a84f4f1b..10fd0120 100644 --- a/src/ipcpd/local/CMakeLists.txt +++ b/src/ipcpd/local/CMakeLists.txt @@ -13,6 +13,8 @@ include_directories(${CMAKE_SOURCE_DIR}/include) include_directories(${CMAKE_BINARY_DIR}/include) set(IPCP_LOCAL_TARGET ipcpd-local CACHE INTERNAL "") +set(IPCP_LOCAL_MPL 2 CACHE STRING + "Default maximum packet lifetime for the Ethernet IPCPs, in seconds") set(LOCAL_SOURCES # Add source files here diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index c0e0b702..20f06ef8 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -204,6 +204,7 @@ static int ipcp_local_flow_alloc(int fd, struct timespec ts = {0, ALLOC_TIMEOUT * MILLION}; struct timespec abstime; int out_fd = -1; + time_t mpl = IPCP_LOCAL_MPL; log_dbg("Allocating flow to " HASH_FMT " on fd %d.", HASH_VAL(dst), fd); assert(dst); @@ -227,7 +228,8 @@ static int ipcp_local_flow_alloc(int fd, assert(ipcpi.alloc_id == -1); - out_fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, data, len); + out_fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, + data, len); if (out_fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_dbg("Flow allocation failed: %d", out_fd); @@ -261,6 +263,7 @@ static int ipcp_local_flow_alloc_resp(int fd, struct timespec ts = {0, ALLOC_TIMEOUT * MILLION}; struct timespec abstime; int out_fd = -1; + time_t mpl = IPCP_LOCAL_MPL; clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -303,7 +306,7 @@ static int ipcp_local_flow_alloc_resp(int fd, fset_add(local_data.flows, fd); - if (ipcp_flow_alloc_reply(out_fd, response, data, len) < 0) + if (ipcp_flow_alloc_reply(out_fd, response, mpl, data, len) < 0) return -1; log_info("Flow allocation completed, fds (%d, %d).", out_fd, fd); diff --git a/src/ipcpd/udp/CMakeLists.txt b/src/ipcpd/udp/CMakeLists.txt index f1a29ef6..8ae5518e 100644 --- a/src/ipcpd/udp/CMakeLists.txt +++ b/src/ipcpd/udp/CMakeLists.txt @@ -58,6 +58,8 @@ set(IPCP_UDP_RD_THR 3 CACHE STRING "Number of reader threads in UDP IPCP") set(IPCP_UDP_WR_THR 3 CACHE STRING "Number of writer threads in UDP IPCP") +set(IPCP_UDP_MPL 60 CACHE STRING + "Default maximum packet lifetime for the UDP IPCP, in seconds") include(AddCompileFlags) if (CMAKE_BUILD_TYPE MATCHES "Debug*") diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 5c57e6b8..3b354ceb 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -279,6 +279,7 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; struct timespec abstime; int fd; + time_t mpl = IPCP_UDP_MPL; clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -297,7 +298,7 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, } /* reply to IRM */ - fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, data, len); + fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); @@ -329,6 +330,8 @@ static int ipcp_udp_port_alloc_reply(const struct sockaddr_in * saddr, const void * data, size_t len) { + time_t mpl = IPCP_UDP_MPL; + pthread_rwlock_wrlock(&udp_data.flows_lock); if (memcmp(&udp_data.fd_to_uf[s_eid].r_saddr, saddr, sizeof(*saddr))) { @@ -343,7 +346,7 @@ static int ipcp_udp_port_alloc_reply(const struct sockaddr_in * saddr, pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_flow_alloc_reply(s_eid, response, data, len) < 0) { + if (ipcp_flow_alloc_reply(s_eid, response, mpl, data, len) < 0) { log_dbg("Failed to reply to flow allocation."); return -1; } diff --git a/src/ipcpd/unicast/CMakeLists.txt b/src/ipcpd/unicast/CMakeLists.txt index f4887160..e1fe1074 100644 --- a/src/ipcpd/unicast/CMakeLists.txt +++ b/src/ipcpd/unicast/CMakeLists.txt @@ -13,6 +13,8 @@ include_directories(${CMAKE_SOURCE_DIR}/include) include_directories(${CMAKE_BINARY_DIR}/include) set(IPCP_UNICAST_TARGET ipcpd-unicast CACHE INTERNAL "") +set(IPCP_UNICAST_MPL 60 CACHE STRING + "Default maximum packet lifetime for the unicast IPCP, in seconds") protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS dir/kademlia.proto) diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index d59b9760..eb467a90 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -477,6 +477,7 @@ static int fa_wait_irmd_alloc(uint8_t * dst, struct timespec ts = {0, TIMEOUT * 1000}; struct timespec abstime; int fd; + time_t mpl = IPCP_UNICAST_MPL; clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -497,7 +498,7 @@ static int fa_wait_irmd_alloc(uint8_t * dst, assert(ipcpi.alloc_id == -1); - fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, data, len); + fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_dbg("Failed to get fd for flow."); @@ -598,6 +599,7 @@ static int fa_handle_flow_reply(struct fa_msg * msg, struct fa_flow * flow; uint8_t * data; /* Piggbacked data on flow alloc request. */ size_t dlen; /* Length of piggybacked data. */ + time_t mpl = IPCP_UNICAST_MPL; assert(len >= sizeof(*msg)); @@ -623,7 +625,7 @@ static int fa_handle_flow_reply(struct fa_msg * msg, pthread_rwlock_unlock(&fa.flows_lock); - if (ipcp_flow_alloc_reply(fd, msg->response, data, dlen)) + if (ipcp_flow_alloc_reply(fd, msg->response, mpl, data, dlen)) return -EIRMD; return 0; diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index 75df7a80..1ffff53c 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -62,6 +62,7 @@ struct irm_flow * irm_flow_create(pid_t n_pid, f->n_pid = n_pid; f->n_1_pid = n_1_pid; f->flow_id = flow_id; + f->mpl = 0; f->qs = qs; f->data = NULL; f->len = 0; diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index 35e7dc2c..42b19d8d 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -48,6 +48,7 @@ struct irm_flow { pid_t n_1_pid; qosspec_t qs; + time_t mpl; void * data; size_t len; diff --git a/src/irmd/main.c b/src/irmd/main.c index 4b70c88b..fdbc25a7 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1353,11 +1353,13 @@ static int flow_accept(pid_t pid, f_out->flow_id = f->flow_id; f_out->n_pid = f->n_pid; f_out->n_1_pid = f->n_1_pid; + f_out->qs = f->qs; + f_out->mpl = f->mpl; f_out->data = f->data; /* pass owner */ f_out->len = f->len; - f_out->qs = f->qs; - f->data = NULL; - f->len = 0; + + f->data = NULL; + f->len = 0; pthread_rwlock_unlock(&irmd.flows_lock); @@ -1476,6 +1478,7 @@ static int flow_alloc(pid_t pid, f_out->n_1_pid = f->n_1_pid; f_out->data = f->data; /* pass owner */ f_out->len = f->len; + f_out->mpl = f->mpl; f->data = NULL; f->len = 0; @@ -1567,6 +1570,7 @@ static pid_t auto_execute(char ** argv) static int flow_req_arr(pid_t pid, struct irm_flow * f_out, const uint8_t * hash, + time_t mpl, qosspec_t qs, const void * data, size_t len) @@ -1681,6 +1685,8 @@ static int flow_req_arr(pid_t pid, return -1; } + f->mpl = mpl; + if (len != 0) { assert(data); f->data = malloc(len); @@ -1732,19 +1738,21 @@ static int flow_req_arr(pid_t pid, static int flow_alloc_reply(int flow_id, int response, + time_t mpl, const void * data, size_t len) { struct irm_flow * f; pthread_rwlock_wrlock(&irmd.flows_lock); - f = get_irm_flow(flow_id); if (f == NULL) { pthread_rwlock_unlock(&irmd.flows_lock); return -1; } + f->mpl = mpl; + if (!response) irm_flow_set_state(f, FLOW_ALLOCATED); else @@ -2168,6 +2176,8 @@ static void * mainloop(void * o) ret_msg->has_pk = true; ret_msg->pk.data = e.data; ret_msg->pk.len = e.len; + ret_msg->has_mpl = true; + ret_msg->mpl = e.mpl; } break; case IRM_MSG_CODE__IRM_FLOW_ALLOC: @@ -2185,6 +2195,8 @@ static void * mainloop(void * o) ret_msg->has_pk = true; ret_msg->pk.data = e.data; ret_msg->pk.len = e.len; + ret_msg->has_mpl = true; + ret_msg->mpl = e.mpl; } break; case IRM_MSG_CODE__IRM_FLOW_JOIN: @@ -2197,6 +2209,8 @@ static void * mainloop(void * o) ret_msg->flow_id = e.flow_id; ret_msg->has_pid = true; ret_msg->pid = e.n_1_pid; + ret_msg->has_mpl = true; + ret_msg->mpl = e.mpl; } break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: @@ -2210,6 +2224,7 @@ static void * mainloop(void * o) result = flow_req_arr(msg->pid, &e, msg->hash.data, + msg->mpl, msg_to_spec(msg->qosspec), msg->pk.data, msg->pk.len); @@ -2225,6 +2240,7 @@ static void * mainloop(void * o) : msg->pk.data == NULL); result = flow_alloc_reply(msg->flow_id, msg->response, + msg->mpl, msg->pk.data, msg->pk.len); break; diff --git a/src/lib/dev.c b/src/lib/dev.c index 2e61df52..5f6a3694 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -574,6 +574,7 @@ int flow_accept(qosspec_t * qs, uint8_t buf[MSGBUFSZ]; int err = -EIRMD; ssize_t key_len; + time_t mpl; memset(s, 0, SYMMKEYSZ); @@ -617,7 +618,7 @@ int flow_accept(qosspec_t * qs, } if (!recv_msg->has_pid || !recv_msg->has_flow_id || - recv_msg->qosspec == NULL) + !recv_msg->has_mpl || recv_msg->qosspec == NULL) goto fail_result; if (recv_msg->pk.len != 0 && @@ -632,6 +633,8 @@ int flow_accept(qosspec_t * qs, fd = flow_init(recv_msg->flow_id, recv_msg->pid, msg_to_spec(recv_msg->qosspec), s); + mpl = recv_msg->mpl; + irm_msg__free_unpacked(recv_msg, NULL); if (fd < 0) @@ -642,7 +645,7 @@ int flow_accept(qosspec_t * qs, assert(ai.flows[fd].frcti == NULL); if (ai.flows[fd].qs.in_order != 0) { - ai.flows[fd].frcti = frcti_create(fd, DELT_A, DELT_R, DELT_MPL); + ai.flows[fd].frcti = frcti_create(fd, DELT_A, DELT_R, mpl); if (ai.flows[fd].frcti == NULL) { pthread_rwlock_unlock(&ai.lock); flow_dealloc(fd); @@ -678,6 +681,7 @@ static int __flow_alloc(const char * dst, uint8_t s[SYMMKEYSZ]; /* secret key for flow */ uint8_t buf[MSGBUFSZ]; int err = -EIRMD; + time_t mpl; memset(s, 0, SYMMKEYSZ); @@ -726,7 +730,8 @@ static int __flow_alloc(const char * dst, goto fail_result; } - if (!recv_msg->has_pid || !recv_msg->has_flow_id) + if (!recv_msg->has_pid || !recv_msg->has_flow_id || + !recv_msg->has_mpl) goto fail_result; if (!join && qs != NULL && qs->cypher_s != 0) { @@ -747,6 +752,8 @@ static int __flow_alloc(const char * dst, fd = flow_init(recv_msg->flow_id, recv_msg->pid, qs == NULL ? qos_raw : *qs, s); + mpl = recv_msg->mpl; + irm_msg__free_unpacked(recv_msg, NULL); if (fd < 0) @@ -757,7 +764,7 @@ static int __flow_alloc(const char * dst, assert(ai.flows[fd].frcti == NULL); if (ai.flows[fd].qs.in_order != 0) { - ai.flows[fd].frcti = frcti_create(fd, DELT_A, DELT_R, DELT_MPL); + ai.flows[fd].frcti = frcti_create(fd, DELT_A, DELT_R, mpl); if (ai.flows[fd].frcti == NULL) { pthread_rwlock_unlock(&ai.lock); flow_dealloc(fd); @@ -1771,6 +1778,7 @@ int ipcp_create_r(int result) int ipcp_flow_req_arr(const uint8_t * dst, size_t len, qosspec_t qs, + time_t mpl, const void * data, size_t dlen) { @@ -1789,6 +1797,8 @@ int ipcp_flow_req_arr(const uint8_t * dst, msg.hash.data = (uint8_t *) dst; qs_msg = spec_to_msg(&qs); msg.qosspec = &qs_msg; + msg.has_mpl = true; + msg.mpl = mpl; msg.has_pk = true; msg.pk.data = (uint8_t *) data; msg.pk.len = dlen; @@ -1817,6 +1827,7 @@ int ipcp_flow_req_arr(const uint8_t * dst, int ipcp_flow_alloc_reply(int fd, int response, + time_t mpl, const void * data, size_t len) { @@ -1831,6 +1842,8 @@ int ipcp_flow_alloc_reply(int fd, msg.has_pk = true; msg.pk.data = (uint8_t *) data; msg.pk.len = (uint32_t) len; + msg.has_mpl = true; + msg.mpl = mpl; pthread_rwlock_rdlock(&ai.lock); diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index a8c3bfb4..4908a3a3 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -53,5 +53,6 @@ message ipcp_msg { optional int32 response = 10; optional string comp = 11; optional uint32 timeo_sec = 12; - optional int32 result = 13; + optional sint32 mpl = 13; + optional int32 result = 14; }; diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 515d486f..2173d514 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -84,7 +84,8 @@ message irm_msg { repeated name_info_msg names = 16; optional uint32 timeo_sec = 17; optional uint32 timeo_nsec = 18; - optional string comp = 19; - optional bytes pk = 20; /* piggyback */ - optional sint32 result = 21; + optional sint32 mpl = 19; + optional string comp = 20; + optional bytes pk = 21; /* piggyback */ + optional sint32 result = 22; }; -- cgit v1.2.3