summaryrefslogtreecommitdiff
path: root/src/ipcpd
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2020-12-01 19:19:04 +0100
committerSander Vrijders <sander@ouroboros.rocks>2020-12-02 19:21:29 +0100
commit8e1c0e62feb4832dca2b53e51ab0e1cb8f48e5b1 (patch)
treece90e18277c38e7ee592d441ae4de197081c6476 /src/ipcpd
parentaef6bdb1eadf8779173145710306ea5b6d81b8ec (diff)
downloadouroboros-8e1c0e62feb4832dca2b53e51ab0e1cb8f48e5b1.tar.gz
ouroboros-8e1c0e62feb4832dca2b53e51ab0e1cb8f48e5b1.zip
ipcpd: Add congestion avoidance policies
This adds congestion avoidance policies to the unicast IPCP. The default policy is a multi-bit explicit congestion avoidance algorithm based on data-center TCP congestion avoidance (DCTCP) to relay information about the maximum queue depth that packets experienced to the receiver. There's also a "nop" policy to disable congestion avoidance for testing and benchmarking purposes. The (initial) API for congestion avoidance policies is: void * (* ctx_create)(void); void (* ctx_destroy)(void * ctx); These calls create / and or destroy a context for congestion control for a specific flow. Thread-safety of the context is the responsability of the flow allocator (operations on the ctx should be performed under a lock). ca_wnd_t (* ctx_update_snd)(void * ctx, size_t len); This is the sender call to update the context, and should be called for every packet that is sent on the flow. The len parameter in this API is the packet length, which allows calculating the bandwidth. It returns an opaque union type that is used for the call to check/wait if the congestion window is open or closed (and allowing to release locks before waiting). bool (* ctx_update_rcv)(void * ctx, size_t len, uint8_t ecn, uint16_t * ece); This is the call to update the flow congestion context on the receiver side. It should be called for every received packet. It gets the ecn value from the packet and its length, and returns the ECE (explicit congestion experienced) value to be sent to the sender in case of congestion. The boolean returned signals whether or not a congestion update needs to be sent. void (* ctx_update_ece)(void * ctx, uint16_t ece); This is the call for the sending side top update the context when it receives an ECE update from the receiver. void (* wnd_wait)(ca_wnd_t wnd); This is a (blocking) call that waits for the congestion window to clear. It should be stateless (to avoid waiting under locks). This may change later on if passing the context is needed for different algorithms. uint8_t (* calc_ecn)(int fd, size_t len); This is the call that intermediate IPCPs(routers) should use to update the ECN field on passing packets. The multi-bit ECN policy bases the value for the ECN field on the depth of the rbuff queue packets will be sent on. I created another call to grab the queue depth as fccntl is write-locking the application. We can further optimize this to avoid most locking on the rbuff. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/ipcpd')
-rw-r--r--src/ipcpd/ipcp.c4
-rw-r--r--src/ipcpd/unicast/CMakeLists.txt3
-rw-r--r--src/ipcpd/unicast/ca.c99
-rw-r--r--src/ipcpd/unicast/ca.h61
-rw-r--r--src/ipcpd/unicast/dt.c43
-rw-r--r--src/ipcpd/unicast/fa.c222
-rw-r--r--src/ipcpd/unicast/fa.h4
-rw-r--r--src/ipcpd/unicast/main.c10
-rw-r--r--src/ipcpd/unicast/pol-ca-ops.h50
-rw-r--r--src/ipcpd/unicast/pol/ca-mb-ecn.c216
-rw-r--r--src/ipcpd/unicast/pol/ca-mb-ecn.h50
-rw-r--r--src/ipcpd/unicast/pol/ca-nop.c93
-rw-r--r--src/ipcpd/unicast/pol/ca-nop.h50
-rw-r--r--src/ipcpd/unicast/pol/link_state.c6
14 files changed, 838 insertions, 73 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 95d2f783..c8b5f848 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -241,6 +241,7 @@ static void * mainloop(void * o)
conf.max_ttl = conf_msg->max_ttl;
conf.addr_auth_type = conf_msg->addr_auth_type;
conf.routing_type = conf_msg->routing_type;
+ conf.cong_avoid = conf_msg->cong_avoid;
break;
case IPCP_ETH_DIX:
conf.ethertype = conf_msg->ethertype;
@@ -261,7 +262,8 @@ static void * mainloop(void * o)
layer_info.dir_hash_algo = HASH_SHA3_256;
break;
default:
- log_err("Unknown IPCP type: %d.", conf_msg->ipcp_type);
+ log_err("Unknown IPCP type: %d.",
+ conf_msg->ipcp_type);
}
/* UDP and broadcast use fixed hash algorithm. */
diff --git a/src/ipcpd/unicast/CMakeLists.txt b/src/ipcpd/unicast/CMakeLists.txt
index c0c55519..035ee5f4 100644
--- a/src/ipcpd/unicast/CMakeLists.txt
+++ b/src/ipcpd/unicast/CMakeLists.txt
@@ -33,6 +33,7 @@ endif ()
set(SOURCE_FILES
# Add source files here
addr_auth.c
+ ca.c
connmgr.c
dht.c
dir.c
@@ -51,6 +52,8 @@ set(SOURCE_FILES
pol/simple_pff.c
pol/alternate_pff.c
pol/multipath_pff.c
+ pol/ca-mb-ecn.c
+ pol/ca-nop.c
)
add_executable(ipcpd-unicast ${SOURCE_FILES} ${IPCP_SOURCES}
diff --git a/src/ipcpd/unicast/ca.c b/src/ipcpd/unicast/ca.c
new file mode 100644
index 00000000..f93d0504
--- /dev/null
+++ b/src/ipcpd/unicast/ca.c
@@ -0,0 +1,99 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Congestion Avoidance
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define OUROBOROS_PREFIX "ca"
+
+#include <ouroboros/logs.h>
+
+#include "ca.h"
+#include "pol-ca-ops.h"
+#include "pol/ca-mb-ecn.h"
+#include "pol/ca-nop.h"
+
+struct {
+ struct pol_ca_ops * ops;
+} ca;
+
+int ca_init(enum pol_cong_avoid pol)
+{
+ switch(pol) {
+ case CA_NONE:
+ log_dbg("Disabling congestion control.");
+ ca.ops = &nop_ca_ops;
+ break;
+ case CA_MB_ECN:
+ log_dbg("Using multi-bit ECN.");
+ ca.ops = &mb_ecn_ca_ops;
+ break;
+ default:
+ return -1;
+ }
+
+ return 0;
+}
+
+
+void ca_fini(void)
+{
+ ca.ops = NULL;
+}
+
+void * ca_ctx_create(void)
+{
+ return ca.ops->ctx_create();
+}
+
+void ca_ctx_destroy(void * ctx)
+{
+ return ca.ops->ctx_destroy(ctx);
+}
+
+ca_wnd_t ca_ctx_update_snd(void * ctx,
+ size_t len)
+{
+ return ca.ops->ctx_update_snd(ctx, len);
+}
+
+bool ca_ctx_update_rcv(void * ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece)
+{
+ return ca.ops->ctx_update_rcv(ctx, len, ecn, ece);
+}
+
+void ca_ctx_update_ece(void * ctx,
+ uint16_t ece)
+{
+ return ca.ops->ctx_update_ece(ctx, ece);
+}
+
+void ca_wnd_wait(ca_wnd_t wnd)
+{
+ return ca.ops->wnd_wait(wnd);
+}
+
+uint8_t ca_calc_ecn(int fd,
+ size_t len)
+{
+ return ca.ops->calc_ecn(fd, len);
+}
diff --git a/src/ipcpd/unicast/ca.h b/src/ipcpd/unicast/ca.h
new file mode 100644
index 00000000..5cf6199c
--- /dev/null
+++ b/src/ipcpd/unicast/ca.h
@@ -0,0 +1,61 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Congestion avoidance
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_CA_H
+#define OUROBOROS_IPCPD_UNICAST_CA_H
+
+#include <ouroboros/ipcp.h>
+
+#include <stdbool.h>
+#include <sys/types.h>
+
+typedef union {
+ time_t wait;
+} ca_wnd_t;
+
+int ca_init(enum pol_cong_avoid ca);
+
+void ca_fini(void);
+
+
+/* OPS */
+void * ca_ctx_create(void);
+
+void ca_ctx_destroy(void * ctx);
+
+ca_wnd_t ca_ctx_update_snd(void * ctx,
+ size_t len);
+
+bool ca_ctx_update_rcv(void * ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece);
+
+void ca_ctx_update_ece(void * ctx,
+ uint16_t ece);
+
+void ca_wnd_wait(ca_wnd_t wnd);
+
+uint8_t ca_calc_ecn(int fd,
+ size_t len);
+
+#endif /* OUROBOROS_IPCPD_UNICAST_CA_H */
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index 7db766a5..53accba3 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -41,6 +41,7 @@
#include <ouroboros/fccntl.h>
#endif
+#include "ca.h"
#include "connmgr.h"
#include "ipcp.h"
#include "dt.h"
@@ -115,16 +116,12 @@ static void dt_pci_ser(uint8_t * head,
}
-static void dt_pci_des(struct shm_du_buff * sdb,
- struct dt_pci * dt_pci)
+static void dt_pci_des(uint8_t * head,
+ struct dt_pci * dt_pci)
{
- uint8_t * head;
-
- assert(sdb);
+ assert(head);
assert(dt_pci);
- head = shm_du_buff_head(sdb);
-
/* Decrease TTL */
--*(head + dt_pci_info.ttl_o);
@@ -226,7 +223,6 @@ static int dt_stat_read(const char * path,
"Queued packets (rx): %20zu\n"
"Queued packets (tx): %20zu\n\n",
tmstr, addrstr, rxqlen, txqlen);
-
for (i = 0; i < QOS_CUBE_MAX; ++i) {
sprintf(str,
"Qos cube %3d:\n"
@@ -434,13 +430,14 @@ static void packet_handler(int fd,
struct dt_pci dt_pci;
int ret;
int ofd;
-#ifndef IPCP_FLOW_STATS
- (void) fd;
-#else
+ uint8_t * head;
size_t len;
len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+#ifndef IPCP_FLOW_STATS
+ (void) fd;
+#else
pthread_mutex_lock(&dt.stat[fd].lock);
++dt.stat[fd].rcv_pkt[qc];
@@ -449,7 +446,10 @@ static void packet_handler(int fd,
pthread_mutex_unlock(&dt.stat[fd].lock);
#endif
memset(&dt_pci, 0, sizeof(dt_pci));
- dt_pci_des(sdb, &dt_pci);
+
+ head = shm_du_buff_head(sdb);
+
+ dt_pci_des(head, &dt_pci);
if (dt_pci.dst_addr != ipcpi.dt_addr) {
if (dt_pci.ttl == 0) {
log_dbg("TTL was zero.");
@@ -481,6 +481,8 @@ static void packet_handler(int fd,
return;
}
+ *(head + dt_pci_info.ecn_o) |= ca_calc_ecn(ofd, len);
+
ret = ipcp_flow_write(ofd, sdb);
if (ret < 0) {
log_dbg("Failed to write packet to fd %d.", ofd);
@@ -508,6 +510,9 @@ static void packet_handler(int fd,
} else {
dt_pci_shrink(sdb);
if (dt_pci.eid >= PROG_RES_FDS) {
+ uint8_t ecn = *(head + dt_pci_info.ecn_o);
+ fa_ecn_update(dt_pci.eid, ecn, len);
+
if (ipcp_flow_write(dt_pci.eid, sdb)) {
ipcp_sdb_release(sdb);
#ifdef IPCP_FLOW_STATS
@@ -792,15 +797,15 @@ int dt_write_packet(uint64_t dst_addr,
int fd;
int ret;
uint8_t * head;
-#ifdef IPCP_FLOW_STATS
size_t len;
-#endif
+
assert(sdb);
assert(dst_addr != ipcpi.dt_addr);
-#ifdef IPCP_FLOW_STATS
len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+#ifdef IPCP_FLOW_STATS
+
pthread_mutex_lock(&dt.stat[np1_fd].lock);
++dt.stat[np1_fd].lcl_r_pkt[qc];
@@ -829,15 +834,15 @@ int dt_write_packet(uint64_t dst_addr,
goto fail_write;
}
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+
dt_pci.dst_addr = dst_addr;
dt_pci.qc = qc;
dt_pci.eid = np1_fd;
- dt_pci.ecn = 0;
+ dt_pci.ecn = ca_calc_ecn(fd, len);
dt_pci_ser(head, &dt_pci);
-#ifdef IPCP_FLOW_STATS
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
-#endif
+
ret = ipcp_flow_write(fd, sdb);
if (ret < 0) {
log_dbg("Failed to write packet to fd %d.", fd);
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index e154d785..8f268a9d 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -42,6 +42,7 @@
#include "psched.h"
#include "ipcp.h"
#include "dt.h"
+#include "ca.h"
#include <pthread.h>
#include <stdlib.h>
@@ -49,9 +50,10 @@
#define TIMEOUT 10000 /* nanoseconds */
-#define FLOW_REQ 0
-#define FLOW_REPLY 1
-#define MSGBUFSZ 2048
+#define FLOW_REQ 0
+#define FLOW_REPLY 1
+#define FLOW_UPDATE 2
+#define MSGBUFSZ 2048
struct fa_msg {
uint64_t s_addr;
@@ -59,6 +61,7 @@ struct fa_msg {
uint32_t s_eid;
uint8_t code;
int8_t response;
+ uint16_t ece;
/* QoS parameters from spec, aligned */
uint8_t availability;
uint8_t in_order;
@@ -75,10 +78,16 @@ struct cmd {
struct shm_du_buff * sdb;
};
+struct fa_flow {
+ int r_eid; /* remote endpoint id */
+ uint64_t r_addr; /* remote address */
+ void * ctx; /* congestion avoidance context */
+};
+
struct {
pthread_rwlock_t flows_lock;
- int r_eid[PROG_MAX_FLOWS];
- uint64_t r_addr[PROG_MAX_FLOWS];
+ struct fa_flow flows[PROG_MAX_FLOWS];
+
int fd;
struct list_head cmds;
@@ -93,22 +102,56 @@ static void packet_handler(int fd,
qoscube_t qc,
struct shm_du_buff * sdb)
{
- pthread_rwlock_rdlock(&fa.flows_lock);
+ struct fa_flow * flow;
+ uint64_t r_addr;
+ uint32_t r_eid;
+ ca_wnd_t wnd;
+ size_t len;
- if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) {
- pthread_rwlock_unlock(&fa.flows_lock);
+ flow = &fa.flows[fd];
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+
+ wnd = ca_ctx_update_snd(flow->ctx, len);
+
+ r_addr = flow->r_addr;
+ r_eid = flow->r_eid;
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ ca_wnd_wait(wnd);
+
+ if (dt_write_packet(r_addr, qc, r_eid, sdb)) {
ipcp_sdb_release(sdb);
log_warn("Failed to forward packet.");
return;
}
+}
- pthread_rwlock_unlock(&fa.flows_lock);
+static int fa_flow_init(struct fa_flow * flow)
+{
+ memset(flow, 0, sizeof(*flow));
+
+ flow->r_eid = -1;
+ flow->r_addr = INVALID_ADDR;
+
+ flow->ctx = ca_ctx_create();
+ if (flow->ctx == NULL)
+ return -1;
+
+ return 0;
}
-static void destroy_conn(int fd)
+static void fa_flow_fini(struct fa_flow * flow)
{
- fa.r_eid[fd] = -1;
- fa.r_addr[fd] = INVALID_ADDR;
+ ca_ctx_destroy(flow->ctx);
+
+ memset(flow, 0, sizeof(*flow));
+
+ flow->r_eid = -1;
+ flow->r_addr = INVALID_ADDR;
}
static void fa_post_packet(void * comp,
@@ -145,14 +188,15 @@ static void * fa_handle_packet(void * o)
(void) o;
while (true) {
- struct timespec abstime;
- int fd;
- uint8_t buf[MSGBUFSZ];
- struct fa_msg * msg;
- qosspec_t qs;
- struct cmd * cmd;
- size_t len;
- size_t msg_len;
+ struct timespec abstime;
+ int fd;
+ uint8_t buf[MSGBUFSZ];
+ struct fa_msg * msg;
+ qosspec_t qs;
+ struct cmd * cmd;
+ size_t len;
+ size_t msg_len;
+ struct fa_flow * flow;
pthread_mutex_lock(&fa.mtx);
@@ -232,10 +276,14 @@ static void * fa_handle_packet(void * o)
continue;
}
+ flow = &fa.flows[fd];
+
pthread_rwlock_wrlock(&fa.flows_lock);
- fa.r_eid[fd] = ntoh32(msg->s_eid);
- fa.r_addr[fd] = ntoh64(msg->s_addr);
+ fa_flow_init(flow);
+
+ flow->r_eid = ntoh32(msg->s_eid);
+ flow->r_addr = ntoh64(msg->s_addr);
pthread_rwlock_unlock(&fa.flows_lock);
@@ -248,19 +296,32 @@ static void * fa_handle_packet(void * o)
case FLOW_REPLY:
assert(len >= sizeof(*msg));
+ flow = &fa.flows[ntoh32(msg->r_eid)];
+
pthread_rwlock_wrlock(&fa.flows_lock);
- fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid);
+ flow->r_eid = ntoh32(msg->s_eid);
+
+ if (msg->response < 0)
+ fa_flow_fini(flow);
+ else
+ psched_add(fa.psched, ntoh32(msg->r_eid));
+
+ pthread_rwlock_unlock(&fa.flows_lock);
ipcp_flow_alloc_reply(ntoh32(msg->r_eid),
msg->response,
buf + sizeof(*msg),
len - sizeof(*msg));
+ break;
+ case FLOW_UPDATE:
+ assert(len >= sizeof(*msg));
- if (msg->response < 0)
- destroy_conn(ntoh32(msg->r_eid));
- else
- psched_add(fa.psched, ntoh32(msg->r_eid));
+ flow = &fa.flows[ntoh32(msg->r_eid)];
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece));
pthread_rwlock_unlock(&fa.flows_lock);
@@ -275,10 +336,6 @@ static void * fa_handle_packet(void * o)
int fa_init(void)
{
pthread_condattr_t cattr;
- int i;
-
- for (i = 0; i < PROG_MAX_FLOWS; ++i)
- destroy_conn(i);
if (pthread_rwlock_init(&fa.flows_lock, NULL))
goto fail_rwlock;
@@ -383,9 +440,10 @@ int fa_alloc(int fd,
size_t dlen)
{
struct fa_msg * msg;
- uint64_t addr;
struct shm_du_buff * sdb;
- qoscube_t qc;
+ struct fa_flow * flow;
+ uint64_t addr;
+ qoscube_t qc = QOS_CUBE_BE;
size_t len;
addr = dir_query(dst);
@@ -397,7 +455,9 @@ int fa_alloc(int fd,
if (ipcp_sdb_reserve(&sdb, len + dlen))
return -1;
- msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ memset(msg, 0, sizeof(*msg));
+
msg->code = FLOW_REQ;
msg->s_eid = hton32(fd);
msg->s_addr = hton64(ipcpi.dt_addr);
@@ -413,17 +473,17 @@ int fa_alloc(int fd,
memcpy(msg + 1, dst, ipcp_dir_hash_len());
memcpy(shm_du_buff_head(sdb) + len, data, dlen);
- qc = qos_spec_to_cube(qs);
-
if (dt_write_packet(addr, qc, fa.fd, sdb)) {
ipcp_sdb_release(sdb);
return -1;
}
+ flow = &fa.flows[fd];
+
pthread_rwlock_wrlock(&fa.flows_lock);
- assert(fa.r_eid[fd] == -1);
- fa.r_addr[fd] = addr;
+ fa_flow_init(flow);
+ flow->r_addr = addr;
pthread_rwlock_unlock(&fa.flows_lock);
@@ -439,10 +499,13 @@ int fa_alloc_resp(int fd,
struct timespec abstime;
struct fa_msg * msg;
struct shm_du_buff * sdb;
- qoscube_t qc;
+ struct fa_flow * flow;
+ qoscube_t qc = QOS_CUBE_BE;
clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ flow = &fa.flows[fd];
+
pthread_mutex_lock(&ipcpi.alloc_lock);
while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) {
@@ -463,33 +526,31 @@ int fa_alloc_resp(int fd,
pthread_mutex_unlock(&ipcpi.alloc_lock);
if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) {
- destroy_conn(fd);
+ fa_flow_fini(flow);
return -1;
}
+ msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ memset(msg, 0, sizeof(*msg));
+
pthread_rwlock_wrlock(&fa.flows_lock);
- msg = (struct fa_msg *) shm_du_buff_head(sdb);
msg->code = FLOW_REPLY;
- msg->r_eid = hton32(fa.r_eid[fd]);
+ msg->r_eid = hton32(flow->r_eid);
msg->s_eid = hton32(fd);
msg->response = response;
memcpy(msg + 1, data, len);
if (response < 0) {
- destroy_conn(fd);
+ fa_flow_fini(flow);
ipcp_sdb_release(sdb);
} else {
psched_add(fa.psched, fd);
}
- ipcp_flow_get_qoscube(fd, &qc);
-
- assert(qc >= 0 && qc < QOS_CUBE_MAX);
-
- if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) {
- destroy_conn(fd);
+ if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) {
+ fa_flow_fini(flow);
pthread_rwlock_unlock(&fa.flows_lock);
ipcp_sdb_release(sdb);
return -1;
@@ -505,11 +566,11 @@ int fa_dealloc(int fd)
if (ipcp_flow_fini(fd) < 0)
return 0;
- pthread_rwlock_wrlock(&fa.flows_lock);
-
psched_del(fa.psched, fd);
- destroy_conn(fd);
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fa_flow_fini(&fa.flows[fd]);
pthread_rwlock_unlock(&fa.flows_lock);
@@ -517,3 +578,60 @@ int fa_dealloc(int fd)
return 0;
}
+
+static int fa_update_remote(int fd,
+ uint16_t ece)
+{
+ struct fa_msg * msg;
+ struct shm_du_buff * sdb;
+ qoscube_t qc = QOS_CUBE_BE;
+ struct fa_flow * flow;
+
+ if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) {
+ return -1;
+ }
+
+ msg = (struct fa_msg *) shm_du_buff_head(sdb);
+
+ memset(msg, 0, sizeof(*msg));
+
+ flow = &fa.flows[fd];
+
+ pthread_rwlock_rdlock(&fa.flows_lock);
+
+ msg->code = FLOW_UPDATE;
+ msg->r_eid = hton32(flow->r_eid);
+ msg->ece = hton16(ece);
+
+ if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ ipcp_sdb_release(sdb);
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+
+ return 0;
+}
+
+void fa_ecn_update(int eid,
+ uint8_t ecn,
+ size_t len)
+{
+ struct fa_flow * flow;
+ bool update;
+ uint16_t ece;
+
+ flow = &fa.flows[eid];
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ update = ca_ctx_update_rcv(flow->ctx, len, ecn, &ece);
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ if (update)
+ fa_update_remote(eid, ece);
+
+}
diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h
index 12a10a0c..daba2a51 100644
--- a/src/ipcpd/unicast/fa.h
+++ b/src/ipcpd/unicast/fa.h
@@ -47,4 +47,8 @@ int fa_alloc_resp(int fd,
int fa_dealloc(int fd);
+void fa_ecn_update(int eid,
+ uint8_t ecn,
+ size_t len);
+
#endif /* OUROBOROS_IPCPD_UNICAST_FA_H */
diff --git a/src/ipcpd/unicast/main.c b/src/ipcpd/unicast/main.c
index 0ab37d25..1b2cc14e 100644
--- a/src/ipcpd/unicast/main.c
+++ b/src/ipcpd/unicast/main.c
@@ -39,6 +39,7 @@
#include <ouroboros/time_utils.h>
#include "addr_auth.h"
+#include "ca.h"
#include "connmgr.h"
#include "dir.h"
#include "dt.h"
@@ -83,6 +84,11 @@ static int initialize_components(const struct ipcp_config * conf)
log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr);
+ if (ca_init(conf->cong_avoid)) {
+ log_err("Failed to initialize congestion avoidance.");
+ goto fail_ca;
+ }
+
if (dt_init(conf->routing_type,
conf->addr_size,
conf->eid_size,
@@ -110,6 +116,8 @@ static int initialize_components(const struct ipcp_config * conf)
fail_fa:
dt_fini();
fail_dt:
+ ca_fini();
+ fail_ca:
addr_auth_fini();
fail_addr_auth:
free(ipcpi.layer_name);
@@ -125,6 +133,8 @@ static void finalize_components(void)
dt_fini();
+ ca_fini();
+
addr_auth_fini();
free(ipcpi.layer_name);
diff --git a/src/ipcpd/unicast/pol-ca-ops.h b/src/ipcpd/unicast/pol-ca-ops.h
new file mode 100644
index 00000000..3cb8a9d2
--- /dev/null
+++ b/src/ipcpd/unicast/pol-ca-ops.h
@@ -0,0 +1,50 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Congestion avoidance policy ops
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_POL_CA_OPS_H
+#define OUROBOROS_IPCPD_UNICAST_POL_CA_OPS_H
+
+#include "ca.h"
+
+struct pol_ca_ops {
+ void * (* ctx_create)(void);
+
+ void (* ctx_destroy)(void * ctx);
+
+ ca_wnd_t (* ctx_update_snd)(void * ctx,
+ size_t len);
+
+ bool (* ctx_update_rcv)(void * ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece);
+
+ void (* ctx_update_ece)(void * ctx,
+ uint16_t ece);
+
+ void (* wnd_wait)(ca_wnd_t wnd);
+
+ uint8_t (* calc_ecn)(int fd,
+ size_t len);
+};
+
+#endif /* OUROBOROS_IPCPD_UNICAST_POL_CA_OPS_H */
diff --git a/src/ipcpd/unicast/pol/ca-mb-ecn.c b/src/ipcpd/unicast/pol/ca-mb-ecn.c
new file mode 100644
index 00000000..2de8f8e7
--- /dev/null
+++ b/src/ipcpd/unicast/pol/ca-mb-ecn.c
@@ -0,0 +1,216 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Multi-bit ECN Congestion Avoidance
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200809L
+#endif
+
+#include "config.h"
+
+#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/time_utils.h>
+
+#include "ca-mb-ecn.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+/* congestion avoidance constants */
+#define CA_SHFT 5
+#define CA_WND (1 << CA_SHFT)
+#define CA_UPD (1 << (CA_SHFT - 3))
+#define CA_SLOT 18
+#define CA_AI 20000
+#define ECN_Q_SHFT 5
+#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
+
+struct mb_ecn_ctx {
+ uint16_t rx_ece; /* level of congestion (upstream) */
+ size_t rx_ctr; /* receiver side packet counter */
+
+ uint16_t tx_ece; /* level of congestion (downstream) */
+ size_t tx_ctr; /* sender side packet counter */
+ size_t tx_aps; /* average packet size */
+ time_t tx_wnd; /* tgt time to send packets (ns) */
+ bool tx_cav; /* Congestion avoidance */
+ size_t tx_slot;
+
+ struct timespec t_sent; /* last sent packet */
+};
+
+struct pol_ca_ops mb_ecn_ca_ops = {
+ .ctx_create = mb_ecn_ctx_create,
+ .ctx_destroy = mb_ecn_ctx_destroy,
+ .ctx_update_snd = mb_ecn_ctx_update_snd,
+ .ctx_update_rcv = mb_ecn_ctx_update_rcv,
+ .ctx_update_ece = mb_ecn_ctx_update_ece,
+ .wnd_wait = mb_ecn_wnd_wait,
+ .calc_ecn = mb_ecn_calc_ecn
+};
+
+void * mb_ecn_ctx_create(void)
+{
+
+ struct mb_ecn_ctx * ctx;
+
+ ctx = malloc(sizeof(*ctx));
+ if (ctx == NULL)
+ return NULL;
+
+ memset(ctx, 0, sizeof(*ctx));
+
+ return (void *) ctx;
+}
+
+void mb_ecn_ctx_destroy(void * ctx)
+{
+ free(ctx);
+}
+
+ca_wnd_t mb_ecn_ctx_update_snd(void * _ctx,
+ size_t len)
+{
+ struct timespec now;
+ size_t slot;
+ time_t gap;
+ ca_wnd_t wnd;
+
+ struct mb_ecn_ctx * ctx = _ctx;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ if (ctx->tx_wnd == 0) { /* 10 ms initial window estimate */
+ ctx->tx_wnd = 10 * MILLION;
+ gap = ctx->tx_wnd >> CA_SHFT;
+ ctx->tx_aps = len >> CA_SHFT;
+ ctx->tx_slot = ts_to_ns(now) >> CA_SLOT;
+ } else {
+ gap = ts_diff_ns(&ctx->t_sent, &now);
+ ctx->tx_aps -= ctx->tx_aps >> CA_SHFT;
+ ctx->tx_aps += len;
+ }
+
+ ctx->t_sent = now;
+
+ slot = ts_to_ns(now) >> CA_SLOT;
+
+ ctx->tx_ctr++;
+
+ if (slot - ctx->tx_slot > 0) {
+ ctx->tx_slot = slot;
+
+ if (ctx->tx_ctr > CA_WND)
+ ctx->tx_ece = 0;
+
+ /* Slow start */
+ if (!ctx->tx_cav) {
+ ctx->tx_wnd >>= 1;
+ /* Multiplicative Decrease */
+ } else if (ctx->tx_ece) { /* MD */
+ ctx->tx_wnd += (ctx->tx_wnd * ctx->tx_ece)
+ >> (CA_SHFT + 8);
+ /* Additive Increase */
+ } else {
+ size_t bw = ctx->tx_aps * BILLION / ctx->tx_wnd;
+ bw += CA_AI;
+ ctx->tx_wnd = ctx->tx_aps * BILLION / bw;
+ }
+ }
+
+ wnd.wait = (ctx->tx_wnd >> CA_SHFT) - gap;
+
+ return wnd;
+}
+
+void mb_ecn_wnd_wait(ca_wnd_t wnd)
+{
+ if (wnd.wait > 0) {
+ struct timespec s = {0, 0};
+ if (wnd.wait > BILLION) /* Don't care throttling < 1pps */
+ s.tv_sec = 1;
+ else
+ s.tv_nsec = wnd.wait;
+
+ nanosleep(&s, NULL);
+ }
+}
+
+bool mb_ecn_ctx_update_rcv(void * _ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece)
+{
+ struct mb_ecn_ctx* ctx = _ctx;
+ bool update;
+
+ (void) len;
+
+ if ((ctx->rx_ece | ecn) == 0)
+ return false;
+
+ if (ecn == 0) {
+ /* end of congestion */
+ ctx->rx_ece >>= 2;
+ update = ctx->rx_ece == 0;
+ } else {
+ if (ctx->rx_ece == 0) {
+ /* start of congestion */
+ ctx->rx_ece = ecn;
+ ctx->rx_ctr = 0;
+ update = true;
+ } else {
+ /* congestion update */
+ ctx->rx_ece -= ctx->rx_ece >> CA_SHFT;
+ ctx->rx_ece += ecn;
+ update = (ctx->rx_ctr++ & (CA_UPD - 1)) == true;
+ }
+ }
+
+ *ece = ctx->rx_ece;
+
+ return update;
+}
+
+
+void mb_ecn_ctx_update_ece(void * _ctx,
+ uint16_t ece)
+{
+ struct mb_ecn_ctx* ctx = _ctx;
+
+ ctx->tx_ece = ece;
+ ctx->tx_ctr = 0;
+ ctx->tx_cav = true;
+}
+
+uint8_t mb_ecn_calc_ecn(int fd,
+ size_t len)
+{
+ size_t q;
+
+ (void) len;
+
+ q = ipcp_flow_queued(fd);
+
+ return (uint8_t) (q >> ECN_Q_SHFT);
+}
diff --git a/src/ipcpd/unicast/pol/ca-mb-ecn.h b/src/ipcpd/unicast/pol/ca-mb-ecn.h
new file mode 100644
index 00000000..456b9b13
--- /dev/null
+++ b/src/ipcpd/unicast/pol/ca-mb-ecn.h
@@ -0,0 +1,50 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Multi-bit ECN Congestion Avoidance
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H
+#define OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H
+
+#include "pol-ca-ops.h"
+
+void * mb_ecn_ctx_create(void);
+
+void mb_ecn_ctx_destroy(void * ctx);
+
+ca_wnd_t mb_ecn_ctx_update_snd(void * ctx,
+ size_t len);
+
+bool mb_ecn_ctx_update_rcv(void * ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece);
+
+void mb_ecn_ctx_update_ece(void * ctx,
+ uint16_t ece);
+
+void mb_ecn_wnd_wait(ca_wnd_t wnd);
+
+uint8_t mb_ecn_calc_ecn(int fd,
+ size_t len);
+
+extern struct pol_ca_ops mb_ecn_ca_ops;
+
+#endif /* OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H */
diff --git a/src/ipcpd/unicast/pol/ca-nop.c b/src/ipcpd/unicast/pol/ca-nop.c
new file mode 100644
index 00000000..d0d89a2e
--- /dev/null
+++ b/src/ipcpd/unicast/pol/ca-nop.c
@@ -0,0 +1,93 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Dummy Congestion Avoidance
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#include "ca-nop.h"
+
+#include <string.h>
+
+struct pol_ca_ops nop_ca_ops = {
+ .ctx_create = nop_ctx_create,
+ .ctx_destroy = nop_ctx_destroy,
+ .ctx_update_snd = nop_ctx_update_snd,
+ .ctx_update_rcv = nop_ctx_update_rcv,
+ .ctx_update_ece = nop_ctx_update_ece,
+ .wnd_wait = nop_wnd_wait,
+ .calc_ecn = nop_calc_ecn
+};
+
+void * nop_ctx_create(void)
+{
+ return (void *) 1;
+}
+
+void nop_ctx_destroy(void * ctx)
+{
+ (void) ctx;
+}
+
+ca_wnd_t nop_ctx_update_snd(void * ctx,
+ size_t len)
+{
+ ca_wnd_t wnd;
+
+ (void) ctx;
+ (void) len;
+
+ memset(&wnd, 0, sizeof(wnd));
+
+ return wnd;
+}
+
+void nop_wnd_wait(ca_wnd_t wnd)
+{
+ (void) wnd;
+}
+
+bool nop_ctx_update_rcv(void * ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece)
+{
+ (void) ctx;
+ (void) len;
+ (void) ecn;
+ (void) ece;
+
+ return false;
+}
+
+void nop_ctx_update_ece(void * ctx,
+ uint16_t ece)
+{
+ (void) ctx;
+ (void) ece;
+}
+
+
+uint8_t nop_calc_ecn(int fd,
+ size_t len)
+{
+ (void) fd;
+ (void) len;
+
+ return 0;
+}
diff --git a/src/ipcpd/unicast/pol/ca-nop.h b/src/ipcpd/unicast/pol/ca-nop.h
new file mode 100644
index 00000000..baf649d8
--- /dev/null
+++ b/src/ipcpd/unicast/pol/ca-nop.h
@@ -0,0 +1,50 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Dummy Congestion Avoidance
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_CA_NOP_H
+#define OUROBOROS_IPCPD_UNICAST_CA_NOP_H
+
+#include "pol-ca-ops.h"
+
+void * nop_ctx_create(void);
+
+void nop_ctx_destroy(void * ctx);
+
+ca_wnd_t nop_ctx_update_snd(void * ctx,
+ size_t len);
+
+bool nop_ctx_update_rcv(void * ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece);
+
+void nop_ctx_update_ece(void * ctx,
+ uint16_t ece);
+
+void nop_wnd_wait(ca_wnd_t wnd);
+
+uint8_t nop_calc_ecn(int fd,
+ size_t len);
+
+extern struct pol_ca_ops nop_ca_ops;
+
+#endif /* OUROBOROS_IPCPD_UNICAST_CA_NOP_H */
diff --git a/src/ipcpd/unicast/pol/link_state.c b/src/ipcpd/unicast/pol/link_state.c
index d9482876..ca8a7c50 100644
--- a/src/ipcpd/unicast/pol/link_state.c
+++ b/src/ipcpd/unicast/pol/link_state.c
@@ -812,8 +812,12 @@ static void handle_event(void * self,
switch (event) {
case NOTIFY_DT_CONN_ADD:
pthread_rwlock_rdlock(&ls.db_lock);
+
+ pthread_cleanup_push((void (*) (void *)) pthread_rwlock_unlock,
+ (void *) &ls.db_lock);
+
send_lsm(ipcpi.dt_addr, c->conn_info.addr, 0);
- pthread_rwlock_unlock(&ls.db_lock);
+ pthread_cleanup_pop(true);
if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_DT))
log_dbg("Failed to add neighbor to LSDB.");