summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2022-03-27 11:09:43 +0200
committerSander Vrijders <sander@ouroboros.rocks>2022-03-30 15:05:05 +0200
commit02b3893b1ec392f1b3ca030a03267c31eb1dc290 (patch)
treef7cebdb5ef2c4994bc1e675e838bc8922cbae950
parent56654f2cd1813d87d32695f126939bbfaad52385 (diff)
downloadouroboros-02b3893b1ec392f1b3ca030a03267c31eb1dc290.tar.gz
ouroboros-02b3893b1ec392f1b3ca030a03267c31eb1dc290.zip
lib: Add np1_flow_read and np1_flow_write calls
Reading/writing to (N + 1)-flows from the IPCP was using a raw QoS flow to bypass some functions in the ipcp_flow_read call. But this call was broken for keepalive packets. Fixing the ipcp_flow_read call for (N - 1) flows causes the IPCPs to drop 0-byte keepalive packets coming from (N + 1) client flows. >From now on, there is a dedicated call for (N + 1) reads/writes from the IPCPs that's more efficient and cleaner. The (N + 1) flow internal QoS is now also defaulted to a qos_np1 qosspec, instead of tampering with the qosspec requested by the (N + 1) client. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--include/ouroboros/ipcp-dev.h6
-rw-r--r--include/ouroboros/np1_flow.h19
-rw-r--r--src/ipcpd/eth/eth.c11
-rw-r--r--src/ipcpd/ipcp.c10
-rw-r--r--src/ipcpd/udp/main.c20
-rw-r--r--src/ipcpd/unicast/dt.c2
-rw-r--r--src/ipcpd/unicast/fa.c2
-rw-r--r--src/ipcpd/unicast/psched.c7
-rw-r--r--src/ipcpd/unicast/psched.h6
-rw-r--r--src/lib/dev.c91
10 files changed, 140 insertions, 34 deletions
diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h
index 6472c9fe..307cf3a2 100644
--- a/include/ouroboros/ipcp-dev.h
+++ b/include/ouroboros/ipcp-dev.h
@@ -47,6 +47,12 @@ int ipcp_flow_read(int fd,
int ipcp_flow_write(int fd,
struct shm_du_buff * sdb);
+int np1_flow_read(int fd,
+ struct shm_du_buff ** sdb);
+
+int np1_flow_write(int fd,
+ struct shm_du_buff * sdb);
+
int ipcp_flow_fini(int fd);
int ipcp_flow_get_qoscube(int fd,
diff --git a/include/ouroboros/np1_flow.h b/include/ouroboros/np1_flow.h
index b764de91..fdef443b 100644
--- a/include/ouroboros/np1_flow.h
+++ b/include/ouroboros/np1_flow.h
@@ -27,13 +27,24 @@
#include <unistd.h>
-int np1_flow_alloc(pid_t n_pid,
- int flow_id,
- qosspec_t qs);
+int np1_flow_alloc(pid_t n_pid,
+ int flow_id);
int np1_flow_resp(int flow_id);
-int np1_flow_dealloc(int flow_id,
+int np1_flow_dealloc(int flow_id,
time_t timeo);
+static const qosspec_t qos_np1 = {
+ .delay = UINT32_MAX,
+ .bandwidth = 0,
+ .availability = 0,
+ .loss = UINT32_MAX,
+ .ber = UINT32_MAX,
+ .in_order = 0,
+ .max_gap = UINT32_MAX,
+ .cypher_s = 0,
+ .timeout = 0
+};
+
#endif /* OUROBOROS_NP1_FLOW_H */
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c
index 53dc3b69..b7b3a41d 100644
--- a/src/ipcpd/eth/eth.c
+++ b/src/ipcpd/eth/eth.c
@@ -1016,10 +1016,15 @@ static void * eth_ipcp_packet_reader(void * o)
#ifndef HAVE_NETMAP
shm_du_buff_head_release(sdb, ETH_HEADER_TOT_SIZE);
shm_du_buff_truncate(sdb, length);
- ipcp_flow_write(fd, sdb);
#else
- flow_write(fd, &e_frame->payload, length);
+ if (ipcp_sdb_reserve(&sdb, length))
+ continue;
+
+ buf = shm_du_buff_head(sdb);
+ memcpy(buf, &e_frame->payload, length);
#endif
+ if (np1_flow_write(fd, sdb) < 0)
+ ipcp_sdb_release(sdb);
}
}
@@ -1062,7 +1067,7 @@ static void * eth_ipcp_packet_writer(void * o)
if (fqueue_type(fq) != FLOW_PKT)
continue;
- if (ipcp_flow_read(fd, &sdb)) {
+ if (np1_flow_read(fd, &sdb)) {
log_dbg("Bad read from fd %d.", fd);
continue;
}
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index d19d8e43..2426fbab 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -514,10 +514,8 @@ static void * mainloop(void * o)
break;
}
- qs = msg_to_spec(msg->qosspec);
fd = np1_flow_alloc(msg->pid,
- msg->flow_id,
- qs);
+ msg->flow_id);
if (fd < 0) {
log_err("Failed allocating fd on flow_id %d.",
msg->flow_id);
@@ -525,6 +523,7 @@ static void * mainloop(void * o)
break;
}
+ qs = msg_to_spec(msg->qosspec);
ret_msg.result =
ipcpi.ops->ipcp_flow_alloc(fd,
msg->hash.data,
@@ -549,10 +548,8 @@ static void * mainloop(void * o)
break;
}
- qs = msg_to_spec(msg->qosspec);
fd = np1_flow_alloc(msg->pid,
- msg->flow_id,
- qs);
+ msg->flow_id);
if (fd < 0) {
log_err("Failed allocating fd on flow_id %d.",
msg->flow_id);
@@ -560,6 +557,7 @@ static void * mainloop(void * o)
break;
}
+ qs = msg_to_spec(msg->qosspec);
ret_msg.result =
ipcpi.ops->ipcp_flow_join(fd,
msg->hash.data,
diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c
index d3104163..6e32638d 100644
--- a/src/ipcpd/udp/main.c
+++ b/src/ipcpd/udp/main.c
@@ -450,9 +450,11 @@ static void * ipcp_udp_packet_reader(void * o)
eid_p = (uint32_t *) buf;
while (true) {
- struct mgmt_frame * frame;
- struct sockaddr_in r_saddr;
- socklen_t len;
+ struct mgmt_frame * frame;
+ struct sockaddr_in r_saddr;
+ socklen_t len;
+ struct shm_du_buff * sdb;
+ uint8_t * head;
len = sizeof(r_saddr);
@@ -493,7 +495,15 @@ static void * ipcp_udp_packet_reader(void * o)
continue;
}
- flow_write(eid, data, n - sizeof(eid));
+ n-= sizeof(eid);
+
+ if (ipcp_sdb_reserve(&sdb, n))
+ continue;
+
+ head = shm_du_buff_head(sdb);
+ memcpy(head, data, n);
+ if (np1_flow_write(eid, sdb) < 0)
+ ipcp_sdb_release(sdb);
}
return 0;
@@ -536,7 +546,7 @@ static void * ipcp_udp_packet_writer(void * o)
if (fqueue_type(fq) != FLOW_PKT)
continue;
- if (ipcp_flow_read(fd, &sdb)) {
+ if (np1_flow_read(fd, &sdb)) {
log_dbg("Bad read from fd %d.", fd);
continue;
}
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index 9c16e5d0..9cc53edc 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -713,7 +713,7 @@ void dt_fini(void)
int dt_start(void)
{
- dt.psched = psched_create(packet_handler);
+ dt.psched = psched_create(packet_handler, ipcp_flow_read);
if (dt.psched == NULL) {
log_err("Failed to create N-1 packet scheduler.");
return -1;
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index 5f3dd1a7..345d4031 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -756,7 +756,7 @@ int fa_start(void)
int pol;
int max;
- fa.psched = psched_create(packet_handler);
+ fa.psched = psched_create(packet_handler, np1_flow_read);
if (fa.psched == NULL) {
log_err("Failed to start packet scheduler.");
goto fail_psched;
diff --git a/src/ipcpd/unicast/psched.c b/src/ipcpd/unicast/psched.c
index 33ac5afe..bb452726 100644
--- a/src/ipcpd/unicast/psched.c
+++ b/src/ipcpd/unicast/psched.c
@@ -50,6 +50,7 @@ static int qos_prio [] = {
struct psched {
fset_t * set[QOS_CUBE_MAX];
next_packet_fn_t callback;
+ read_fn_t read;
pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL];
};
@@ -101,7 +102,7 @@ static void * packet_reader(void * o)
notifier_event(NOTIFY_DT_FLOW_UP, &fd);
break;
case FLOW_PKT:
- if (ipcp_flow_read(fd, &sdb))
+ if (sched->read(fd, &sdb) < 0)
continue;
sched->callback(fd, qc, sdb);
@@ -117,7 +118,8 @@ static void * packet_reader(void * o)
return (void *) 0;
}
-struct psched * psched_create(next_packet_fn_t callback)
+struct psched * psched_create(next_packet_fn_t callback,
+ read_fn_t read)
{
struct psched * psched;
struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL];
@@ -131,6 +133,7 @@ struct psched * psched_create(next_packet_fn_t callback)
goto fail_malloc;
psched->callback = callback;
+ psched->read = read;
for (i = 0; i < QOS_CUBE_MAX; ++i) {
psched->set[i] = fset_create();
diff --git a/src/ipcpd/unicast/psched.h b/src/ipcpd/unicast/psched.h
index 1f22b34b..654d73d9 100644
--- a/src/ipcpd/unicast/psched.h
+++ b/src/ipcpd/unicast/psched.h
@@ -30,7 +30,11 @@ typedef void (* next_packet_fn_t)(int fd,
qoscube_t qc,
struct shm_du_buff * sdb);
-struct psched * psched_create(next_packet_fn_t callback);
+typedef int (* read_fn_t)(int fd,
+ struct shm_du_buff ** sdb);
+
+struct psched * psched_create(next_packet_fn_t callback,
+ read_fn_t read);
void psched_destroy(struct psched * psched);
diff --git a/src/lib/dev.c b/src/lib/dev.c
index ac885711..b3e9c69e 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -38,6 +38,7 @@
#include <ouroboros/sockets.h>
#include <ouroboros/fccntl.h>
#include <ouroboros/bitmap.h>
+#include <ouroboros/np1_flow.h>
#include <ouroboros/pthread.h>
#include <ouroboros/random.h>
#include <ouroboros/shm_flow_set.h>
@@ -1330,7 +1331,7 @@ ssize_t flow_read(int fd,
idx = flow_rx_sdb(flow, &sdb, block, &tictime);
if (idx < 0) {
- if (idx != -ETIMEDOUT)
+ if (idx != -ETIMEDOUT && idx != -EAGAIN)
return idx;
if (abstime != NULL
@@ -1740,12 +1741,9 @@ ssize_t fevent(struct flow_set * set,
/* ipcp-dev functions. */
int np1_flow_alloc(pid_t n_pid,
- int flow_id,
- qosspec_t qs)
+ int flow_id)
{
- qs.cypher_s = 0; /* No encryption ctx for np1 */
- qs.in_order = 0; /* No frct for np1 */
- return flow_init(flow_id, n_pid, qs, NULL, 0);
+ return flow_init(flow_id, n_pid, qos_np1, NULL, 0);
}
int np1_flow_dealloc(int flow_id,
@@ -1855,9 +1853,7 @@ int ipcp_flow_req_arr(const uint8_t * dst,
return -1;
}
- qs.cypher_s = 0; /* No encryption ctx for np1 */
- qs.in_order = 0; /* No frct for np1 */
- fd = flow_init(recv_msg->flow_id, recv_msg->pid, qs, NULL, 0);
+ fd = flow_init(recv_msg->flow_id, recv_msg->pid, qos_np1, NULL, 0);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -1928,8 +1924,14 @@ int ipcp_flow_read(int fd,
pthread_rwlock_unlock(&ai.lock);
idx = flow_rx_sdb(flow, sdb, false, NULL);
- if (idx < 0)
+ if (idx < 0) {
+ if (idx == -EAGAIN) {
+ pthread_rwlock_rdlock(&ai.lock);
+ continue;
+ }
+
return idx;
+ }
pthread_rwlock_rdlock(&ai.lock);
@@ -1964,7 +1966,74 @@ int ipcp_flow_write(int fd,
return -EPERM;
}
- ret = flow_tx_sdb(flow, sdb, false, NULL);
+ pthread_rwlock_unlock(&ai.lock);
+
+ ret = flow_tx_sdb(flow, sdb, true, NULL);
+
+ return ret;
+}
+
+int np1_flow_read(int fd,
+ struct shm_du_buff ** sdb)
+{
+ struct flow * flow;
+ ssize_t idx = -1;
+
+ assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ assert(sdb);
+
+ flow = &ai.flows[fd];
+
+ assert(flow->flow_id >= 0);
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ idx = shm_rbuff_read(flow->rx_rb);;
+ if (idx < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return idx;
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
+ return 0;
+}
+
+int np1_flow_write(int fd,
+ struct shm_du_buff * sdb)
+{
+ struct flow * flow;
+ int ret;
+ ssize_t idx;
+
+ assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ assert(sdb);
+
+ flow = &ai.flows[fd];
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ if (flow->flow_id < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -ENOTALLOC;
+ }
+
+ if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -EPERM;
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ idx = shm_du_buff_get_idx(sdb);
+
+ ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL);
+ if (ret < 0)
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ else
+ shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
return ret;
}