From 02b3893b1ec392f1b3ca030a03267c31eb1dc290 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Sun, 27 Mar 2022 11:09:43 +0200 Subject: lib: Add np1_flow_read and np1_flow_write calls Reading/writing to (N + 1)-flows from the IPCP was using a raw QoS flow to bypass some functions in the ipcp_flow_read call. But this call was broken for keepalive packets. Fixing the ipcp_flow_read call for (N - 1) flows causes the IPCPs to drop 0-byte keepalive packets coming from (N + 1) client flows. >From now on, there is a dedicated call for (N + 1) reads/writes from the IPCPs that's more efficient and cleaner. The (N + 1) flow internal QoS is now also defaulted to a qos_np1 qosspec, instead of tampering with the qosspec requested by the (N + 1) client. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/ipcpd/eth/eth.c | 11 ++++-- src/ipcpd/ipcp.c | 10 ++--- src/ipcpd/udp/main.c | 20 +++++++--- src/ipcpd/unicast/dt.c | 2 +- src/ipcpd/unicast/fa.c | 2 +- src/ipcpd/unicast/psched.c | 7 +++- src/ipcpd/unicast/psched.h | 6 ++- src/lib/dev.c | 91 ++++++++++++++++++++++++++++++++++++++++------ 8 files changed, 119 insertions(+), 30 deletions(-) (limited to 'src') diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 53dc3b69..b7b3a41d 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -1016,10 +1016,15 @@ static void * eth_ipcp_packet_reader(void * o) #ifndef HAVE_NETMAP shm_du_buff_head_release(sdb, ETH_HEADER_TOT_SIZE); shm_du_buff_truncate(sdb, length); - ipcp_flow_write(fd, sdb); #else - flow_write(fd, &e_frame->payload, length); + if (ipcp_sdb_reserve(&sdb, length)) + continue; + + buf = shm_du_buff_head(sdb); + memcpy(buf, &e_frame->payload, length); #endif + if (np1_flow_write(fd, sdb) < 0) + ipcp_sdb_release(sdb); } } @@ -1062,7 +1067,7 @@ static void * eth_ipcp_packet_writer(void * o) if (fqueue_type(fq) != FLOW_PKT) continue; - if (ipcp_flow_read(fd, &sdb)) { + if (np1_flow_read(fd, &sdb)) { log_dbg("Bad read from fd %d.", fd); continue; } diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index d19d8e43..2426fbab 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -514,10 +514,8 @@ static void * mainloop(void * o) break; } - qs = msg_to_spec(msg->qosspec); fd = np1_flow_alloc(msg->pid, - msg->flow_id, - qs); + msg->flow_id); if (fd < 0) { log_err("Failed allocating fd on flow_id %d.", msg->flow_id); @@ -525,6 +523,7 @@ static void * mainloop(void * o) break; } + qs = msg_to_spec(msg->qosspec); ret_msg.result = ipcpi.ops->ipcp_flow_alloc(fd, msg->hash.data, @@ -549,10 +548,8 @@ static void * mainloop(void * o) break; } - qs = msg_to_spec(msg->qosspec); fd = np1_flow_alloc(msg->pid, - msg->flow_id, - qs); + msg->flow_id); if (fd < 0) { log_err("Failed allocating fd on flow_id %d.", msg->flow_id); @@ -560,6 +557,7 @@ static void * mainloop(void * o) break; } + qs = msg_to_spec(msg->qosspec); ret_msg.result = ipcpi.ops->ipcp_flow_join(fd, msg->hash.data, diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index d3104163..6e32638d 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -450,9 +450,11 @@ static void * ipcp_udp_packet_reader(void * o) eid_p = (uint32_t *) buf; while (true) { - struct mgmt_frame * frame; - struct sockaddr_in r_saddr; - socklen_t len; + struct mgmt_frame * frame; + struct sockaddr_in r_saddr; + socklen_t len; + struct shm_du_buff * sdb; + uint8_t * head; len = sizeof(r_saddr); @@ -493,7 +495,15 @@ static void * ipcp_udp_packet_reader(void * o) continue; } - flow_write(eid, data, n - sizeof(eid)); + n-= sizeof(eid); + + if (ipcp_sdb_reserve(&sdb, n)) + continue; + + head = shm_du_buff_head(sdb); + memcpy(head, data, n); + if (np1_flow_write(eid, sdb) < 0) + ipcp_sdb_release(sdb); } return 0; @@ -536,7 +546,7 @@ static void * ipcp_udp_packet_writer(void * o) if (fqueue_type(fq) != FLOW_PKT) continue; - if (ipcp_flow_read(fd, &sdb)) { + if (np1_flow_read(fd, &sdb)) { log_dbg("Bad read from fd %d.", fd); continue; } diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c index 9c16e5d0..9cc53edc 100644 --- a/src/ipcpd/unicast/dt.c +++ b/src/ipcpd/unicast/dt.c @@ -713,7 +713,7 @@ void dt_fini(void) int dt_start(void) { - dt.psched = psched_create(packet_handler); + dt.psched = psched_create(packet_handler, ipcp_flow_read); if (dt.psched == NULL) { log_err("Failed to create N-1 packet scheduler."); return -1; diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index 5f3dd1a7..345d4031 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -756,7 +756,7 @@ int fa_start(void) int pol; int max; - fa.psched = psched_create(packet_handler); + fa.psched = psched_create(packet_handler, np1_flow_read); if (fa.psched == NULL) { log_err("Failed to start packet scheduler."); goto fail_psched; diff --git a/src/ipcpd/unicast/psched.c b/src/ipcpd/unicast/psched.c index 33ac5afe..bb452726 100644 --- a/src/ipcpd/unicast/psched.c +++ b/src/ipcpd/unicast/psched.c @@ -50,6 +50,7 @@ static int qos_prio [] = { struct psched { fset_t * set[QOS_CUBE_MAX]; next_packet_fn_t callback; + read_fn_t read; pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; }; @@ -101,7 +102,7 @@ static void * packet_reader(void * o) notifier_event(NOTIFY_DT_FLOW_UP, &fd); break; case FLOW_PKT: - if (ipcp_flow_read(fd, &sdb)) + if (sched->read(fd, &sdb) < 0) continue; sched->callback(fd, qc, sdb); @@ -117,7 +118,8 @@ static void * packet_reader(void * o) return (void *) 0; } -struct psched * psched_create(next_packet_fn_t callback) +struct psched * psched_create(next_packet_fn_t callback, + read_fn_t read) { struct psched * psched; struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; @@ -131,6 +133,7 @@ struct psched * psched_create(next_packet_fn_t callback) goto fail_malloc; psched->callback = callback; + psched->read = read; for (i = 0; i < QOS_CUBE_MAX; ++i) { psched->set[i] = fset_create(); diff --git a/src/ipcpd/unicast/psched.h b/src/ipcpd/unicast/psched.h index 1f22b34b..654d73d9 100644 --- a/src/ipcpd/unicast/psched.h +++ b/src/ipcpd/unicast/psched.h @@ -30,7 +30,11 @@ typedef void (* next_packet_fn_t)(int fd, qoscube_t qc, struct shm_du_buff * sdb); -struct psched * psched_create(next_packet_fn_t callback); +typedef int (* read_fn_t)(int fd, + struct shm_du_buff ** sdb); + +struct psched * psched_create(next_packet_fn_t callback, + read_fn_t read); void psched_destroy(struct psched * psched); diff --git a/src/lib/dev.c b/src/lib/dev.c index ac885711..b3e9c69e 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -1330,7 +1331,7 @@ ssize_t flow_read(int fd, idx = flow_rx_sdb(flow, &sdb, block, &tictime); if (idx < 0) { - if (idx != -ETIMEDOUT) + if (idx != -ETIMEDOUT && idx != -EAGAIN) return idx; if (abstime != NULL @@ -1740,12 +1741,9 @@ ssize_t fevent(struct flow_set * set, /* ipcp-dev functions. */ int np1_flow_alloc(pid_t n_pid, - int flow_id, - qosspec_t qs) + int flow_id) { - qs.cypher_s = 0; /* No encryption ctx for np1 */ - qs.in_order = 0; /* No frct for np1 */ - return flow_init(flow_id, n_pid, qs, NULL, 0); + return flow_init(flow_id, n_pid, qos_np1, NULL, 0); } int np1_flow_dealloc(int flow_id, @@ -1855,9 +1853,7 @@ int ipcp_flow_req_arr(const uint8_t * dst, return -1; } - qs.cypher_s = 0; /* No encryption ctx for np1 */ - qs.in_order = 0; /* No frct for np1 */ - fd = flow_init(recv_msg->flow_id, recv_msg->pid, qs, NULL, 0); + fd = flow_init(recv_msg->flow_id, recv_msg->pid, qos_np1, NULL, 0); irm_msg__free_unpacked(recv_msg, NULL); @@ -1928,8 +1924,14 @@ int ipcp_flow_read(int fd, pthread_rwlock_unlock(&ai.lock); idx = flow_rx_sdb(flow, sdb, false, NULL); - if (idx < 0) + if (idx < 0) { + if (idx == -EAGAIN) { + pthread_rwlock_rdlock(&ai.lock); + continue; + } + return idx; + } pthread_rwlock_rdlock(&ai.lock); @@ -1964,7 +1966,74 @@ int ipcp_flow_write(int fd, return -EPERM; } - ret = flow_tx_sdb(flow, sdb, false, NULL); + pthread_rwlock_unlock(&ai.lock); + + ret = flow_tx_sdb(flow, sdb, true, NULL); + + return ret; +} + +int np1_flow_read(int fd, + struct shm_du_buff ** sdb) +{ + struct flow * flow; + ssize_t idx = -1; + + assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(sdb); + + flow = &ai.flows[fd]; + + assert(flow->flow_id >= 0); + + pthread_rwlock_rdlock(&ai.lock); + + idx = shm_rbuff_read(flow->rx_rb);; + if (idx < 0) { + pthread_rwlock_unlock(&ai.lock); + return idx; + } + + pthread_rwlock_unlock(&ai.lock); + + *sdb = shm_rdrbuff_get(ai.rdrb, idx); + + return 0; +} + +int np1_flow_write(int fd, + struct shm_du_buff * sdb) +{ + struct flow * flow; + int ret; + ssize_t idx; + + assert(fd >= 0 && fd < SYS_MAX_FLOWS); + assert(sdb); + + flow = &ai.flows[fd]; + + pthread_rwlock_rdlock(&ai.lock); + + if (flow->flow_id < 0) { + pthread_rwlock_unlock(&ai.lock); + return -ENOTALLOC; + } + + if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) { + pthread_rwlock_unlock(&ai.lock); + return -EPERM; + } + + pthread_rwlock_unlock(&ai.lock); + + idx = shm_du_buff_get_idx(sdb); + + ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL); + if (ret < 0) + shm_rdrbuff_remove(ai.rdrb, idx); + else + shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); return ret; } -- cgit v1.2.3