summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2022-02-25 17:34:29 +0100
committerSander Vrijders <sander@ouroboros.rocks>2022-03-03 12:00:54 +0100
commitf5d642a06f9c1a58197313b32f6b213a152e446f (patch)
tree19ec9813f2d83fae986ff2bddbf5511c5b7662da
parentdb5e9bf4f884097ec919aa40b02d8eafab05cfa8 (diff)
downloadouroboros-f5d642a06f9c1a58197313b32f6b213a152e446f.tar.gz
ouroboros-f5d642a06f9c1a58197313b32f6b213a152e446f.zip
lib: Make flow liveness timeout configurable
The qosspec_t now has a timeout value that sets the timeout value of the flow. Flows with a peer that has timed out will now return -EFLOWPEER on flow_read() or flow_write(). Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--doc/man/flow_read.33
-rw-r--r--include/ouroboros/errno.h5
-rw-r--r--include/ouroboros/fqueue.h3
-rw-r--r--include/ouroboros/qos.h52
-rw-r--r--src/ipcpd/eth/eth.c3
-rw-r--r--src/ipcpd/udp/main.c4
-rw-r--r--src/ipcpd/unicast/fa.c7
-rw-r--r--src/irmd/main.c2
-rw-r--r--src/lib/dev.c66
-rw-r--r--src/lib/qosspec.proto17
-rw-r--r--src/lib/sockets.c2
11 files changed, 111 insertions, 53 deletions
diff --git a/doc/man/flow_read.3 b/doc/man/flow_read.3
index 99f96544..e41ee374 100644
--- a/doc/man/flow_read.3
+++ b/doc/man/flow_read.3
@@ -58,6 +58,9 @@ The flow was not allocated.
.B -EFLOWDOWN
The flow has been reported down.
+.B -EFLOWPEER
+The flow's peer is unresponsive (flow timed out).
+
.B -EMSGSIZE
The buffer was too large to be written.
diff --git a/include/ouroboros/errno.h b/include/ouroboros/errno.h
index 06f33bef..b9f8dbc0 100644
--- a/include/ouroboros/errno.h
+++ b/include/ouroboros/errno.h
@@ -31,7 +31,8 @@
#define EIPCP 1003 /* Failed to communicate with IPCP */
#define EIPCPSTATE 1004 /* Target in wrong state */
#define EFLOWDOWN 1005 /* Flow is down */
-#define ECRYPT 1006 /* Encryption error */
-#define ENAME 1007 /* Naming error */
+#define EFLOWPEER 1006 /* Flow is down (peer timed out) */
+#define ECRYPT 1007 /* Encryption error */
+#define ENAME 1008 /* Naming error */
#endif /* OUROBOROS_ERRNO_H */
diff --git a/include/ouroboros/fqueue.h b/include/ouroboros/fqueue.h
index f6828a4d..3c0ebf90 100644
--- a/include/ouroboros/fqueue.h
+++ b/include/ouroboros/fqueue.h
@@ -33,7 +33,8 @@ enum fqtype {
FLOW_DOWN = (1 << 1),
FLOW_UP = (1 << 2),
FLOW_ALLOC = (1 << 3),
- FLOW_DEALLOC = (1 << 4)
+ FLOW_DEALLOC = (1 << 4),
+ FLOW_PEER = (1 << 5)
};
struct flow_set;
diff --git a/include/ouroboros/qos.h b/include/ouroboros/qos.h
index 6391347a..b6b945d9 100644
--- a/include/ouroboros/qos.h
+++ b/include/ouroboros/qos.h
@@ -26,15 +26,18 @@
#include <stdint.h>
#include <stdbool.h>
+#define DEFAULT_PEER_TIMEOUT 120000
+
typedef struct qos_spec {
- uint32_t delay; /* In ms */
- uint64_t bandwidth; /* In bits/s */
- uint8_t availability; /* Class of 9s */
- uint32_t loss; /* Packet loss */
- uint32_t ber; /* Bit error rate, errors per billion bits */
- uint8_t in_order; /* In-order delivery, enables FRCT */
- uint32_t max_gap; /* In ms */
- uint16_t cypher_s; /* Cypher strength, 0 = no encryption */
+ uint32_t delay; /* In ms. */
+ uint64_t bandwidth; /* In bits/s. */
+ uint8_t availability; /* Class of 9s. */
+ uint32_t loss; /* Packet loss. */
+ uint32_t ber; /* Bit error rate, errors per billion bits. */
+ uint8_t in_order; /* In-order delivery, enables FRCT. */
+ uint32_t max_gap; /* In ms. */
+ uint16_t cypher_s; /* Cypher strength (bits), 0 = no encryption. */
+ uint32_t timeout; /* Peer timeout time, in ms, 0 = no timeout. */
} qosspec_t;
static const qosspec_t qos_raw = {
@@ -45,7 +48,8 @@ static const qosspec_t qos_raw = {
.ber = 1,
.in_order = 0,
.max_gap = UINT32_MAX,
- .cypher_s = 0
+ .cypher_s = 0,
+ .timeout = DEFAULT_PEER_TIMEOUT
};
static const qosspec_t qos_raw_no_errors = {
@@ -56,7 +60,8 @@ static const qosspec_t qos_raw_no_errors = {
.ber = 0,
.in_order = 0,
.max_gap = UINT32_MAX,
- .cypher_s = 0
+ .cypher_s = 0,
+ .timeout = DEFAULT_PEER_TIMEOUT
};
static const qosspec_t qos_raw_crypt = {
@@ -67,7 +72,8 @@ static const qosspec_t qos_raw_crypt = {
.ber = 0,
.in_order = 0,
.max_gap = UINT32_MAX,
- .cypher_s = 256
+ .cypher_s = 256,
+ .timeout = DEFAULT_PEER_TIMEOUT
};
static const qosspec_t qos_best_effort = {
@@ -78,7 +84,8 @@ static const qosspec_t qos_best_effort = {
.ber = 0,
.in_order = 1,
.max_gap = UINT32_MAX,
- .cypher_s = 0
+ .cypher_s = 0,
+ .timeout = DEFAULT_PEER_TIMEOUT
};
static const qosspec_t qos_best_effort_crypt = {
@@ -89,7 +96,8 @@ static const qosspec_t qos_best_effort_crypt = {
.ber = 0,
.in_order = 1,
.max_gap = UINT32_MAX,
- .cypher_s = 256
+ .cypher_s = 256,
+ .timeout = DEFAULT_PEER_TIMEOUT
};
static const qosspec_t qos_video = {
@@ -100,7 +108,8 @@ static const qosspec_t qos_video = {
.ber = 0,
.in_order = 1,
.max_gap = 100,
- .cypher_s = 0
+ .cypher_s = 0,
+ .timeout = DEFAULT_PEER_TIMEOUT
};
static const qosspec_t qos_video_crypt = {
@@ -111,7 +120,8 @@ static const qosspec_t qos_video_crypt = {
.ber = 0,
.in_order = 1,
.max_gap = 100,
- .cypher_s = 256
+ .cypher_s = 256,
+ .timeout = DEFAULT_PEER_TIMEOUT
};
static const qosspec_t qos_voice = {
@@ -122,7 +132,8 @@ static const qosspec_t qos_voice = {
.ber = 0,
.in_order = 1,
.max_gap = 50,
- .cypher_s = 0
+ .cypher_s = 0,
+ .timeout = DEFAULT_PEER_TIMEOUT
};
static const qosspec_t qos_voice_crypt = {
@@ -133,7 +144,8 @@ static const qosspec_t qos_voice_crypt = {
.ber = 0,
.in_order = 1,
.max_gap = 50,
- .cypher_s = 256
+ .cypher_s = 256,
+ .timeout = DEFAULT_PEER_TIMEOUT
};
static const qosspec_t qos_data = {
@@ -144,7 +156,8 @@ static const qosspec_t qos_data = {
.ber = 0,
.in_order = 1,
.max_gap = 2000,
- .cypher_s = 0
+ .cypher_s = 0,
+ .timeout = DEFAULT_PEER_TIMEOUT
};
static const qosspec_t qos_data_crypt = {
@@ -155,7 +168,8 @@ static const qosspec_t qos_data_crypt = {
.ber = 0,
.in_order = 1,
.max_gap = 2000,
- .cypher_s = 256
+ .cypher_s = 256,
+ .timeout = DEFAULT_PEER_TIMEOUT
};
#endif /* OUROBOROS_QOS_H */
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c
index e22dd7bc..8b34d303 100644
--- a/src/ipcpd/eth/eth.c
+++ b/src/ipcpd/eth/eth.c
@@ -164,6 +164,7 @@ struct mgmt_msg {
uint32_t ber;
uint32_t max_gap;
uint32_t delay;
+ uint32_t timeout;
uint16_t cypher_s;
uint8_t in_order;
#if defined (BUILD_ETH_DIX)
@@ -492,6 +493,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr,
msg->in_order = qs.in_order;
msg->max_gap = hton32(qs.max_gap);
msg->cypher_s = hton16(qs.cypher_s);
+ msg->timeout = hton32(qs.timeout);
memcpy(msg + 1, hash, ipcp_dir_hash_len());
memcpy(buf + len + ETH_HEADER_TOT_SIZE, data, dlen);
@@ -753,6 +755,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf,
qs.in_order = msg->in_order;
qs.max_gap = ntoh32(msg->max_gap);
qs.cypher_s = ntoh16(msg->cypher_s);
+ qs.timeout = ntoh32(msg->timeout);
if (shim_data_reg_has(eth_data.shim_data,
buf + sizeof(*msg))) {
diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c
index b9f97e74..5c57e6b8 100644
--- a/src/ipcpd/udp/main.c
+++ b/src/ipcpd/udp/main.c
@@ -98,7 +98,9 @@ struct mgmt_msg {
uint32_t loss;
uint32_t ber;
uint32_t max_gap;
+ uint32_t timeout;
uint16_t cypher_s;
+
} __attribute__((packed));
struct mgmt_frame {
@@ -217,6 +219,7 @@ static int ipcp_udp_port_alloc(const struct sockaddr_in * r_saddr,
msg->in_order = qs.in_order;
msg->max_gap = hton32(qs.max_gap);
msg->cypher_s = hton16(qs.cypher_s);
+ msg->timeout = hton32(qs.timeout);
memcpy(msg + 1, dst, ipcp_dir_hash_len());
memcpy(buf + len, data, dlen);
@@ -375,6 +378,7 @@ static int ipcp_udp_mgmt_frame(const uint8_t * buf,
qs.in_order = msg->in_order;
qs.max_gap = ntoh32(msg->max_gap);
qs.cypher_s = ntoh16(msg->cypher_s);
+ qs.timeout = ntoh32(msg->timeout);
return ipcp_udp_port_req(&c_saddr, ntoh32(msg->s_eid),
(uint8_t *) (msg + 1), qs,
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index dcc79031..d59b9760 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -72,14 +72,15 @@ struct fa_msg {
int8_t response;
uint16_t ece;
/* QoS parameters from spec, aligned */
- uint8_t availability;
- uint8_t in_order;
uint32_t delay;
uint64_t bandwidth;
uint32_t loss;
uint32_t ber;
uint32_t max_gap;
+ uint32_t timeout;
uint16_t cypher_s;
+ uint8_t availability;
+ uint8_t in_order;
} __attribute__((packed));
struct cmd {
@@ -569,6 +570,7 @@ static int fa_handle_flow_req(struct fa_msg * msg,
qs.in_order = msg->in_order;
qs.max_gap = ntoh32(msg->max_gap);
qs.cypher_s = ntoh16(msg->cypher_s);
+ qs.timeout = ntoh32(msg->timeout);
fd = fa_wait_irmd_alloc((uint8_t *) (msg + 1), qs, data, dlen);
if (fd < 0)
@@ -840,6 +842,7 @@ int fa_alloc(int fd,
msg->in_order = qs.in_order;
msg->max_gap = hton32(qs.max_gap);
msg->cypher_s = hton16(qs.cypher_s);
+ msg->timeout = hton32(qs.timeout);
memcpy(msg + 1, dst, ipcp_dir_hash_len());
memcpy(shm_du_buff_head(sdb) + len, data, dlen);
diff --git a/src/irmd/main.c b/src/irmd/main.c
index f83e8e1e..a3acc78a 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -648,7 +648,7 @@ static int connect_ipcp(pid_t pid,
log_dbg("Connecting %s to %s.", component, dst);
if (ipcp_connect(pid, dst, component, qs)) {
- log_err("Could not connect IPCP.");
+ log_err("Could not connect IPCP %d to %s.", pid, dst);
return -EPERM;
}
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 4c21fcdf..5c57a538 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -68,7 +68,6 @@
#define SECMEMSZ 16384
#define SYMMKEYSZ 32
#define MSGBUFSZ 2048
-#define FLOWTIMEO 120 /* seconds */
enum port_state {
PORT_NULL = 0,
@@ -123,7 +122,8 @@ struct flow_set_entry {
struct flow_set {
size_t idx;
- struct timespec chk; /* Last keepalive check */
+ struct timespec chk; /* Last keepalive check. */
+ uint32_t min; /* Minimum keepalive time in set. */
struct list_head flows;
pthread_rwlock_t lock;
@@ -1056,6 +1056,7 @@ static int flow_keepalive(int fd)
struct timespec r_act;
struct flow * flow;
int flow_id;
+ uint32_t timeo;
flow = &ai.flows[fd];
@@ -1067,15 +1068,19 @@ static int flow_keepalive(int fd)
r_act = flow->rcv_act;
flow_id = flow->flow_id;
+ timeo = flow->qs.timeout;
pthread_rwlock_unlock(&ai.lock);
- if (ts_diff_ns(&r_act, &now) > FLOWTIMEO * BILLION) {
- shm_flow_set_notify(ai.fqset, flow_id, FLOW_PKT);
- return -EFLOWDOWN;
+ if (timeo == 0)
+ return 0;
+
+ if (ts_diff_ns(&r_act, &now) > timeo * MILLION) {
+ shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);
+ return -EFLOWPEER;
}
- if (ts_diff_ns(&s_act, &now) > (FLOWTIMEO / 4) * BILLION)
+ if (ts_diff_ns(&s_act, &now) > (timeo >> 2) * MILLION)
flow_send_keepalive(fd);
return 0;
@@ -1140,7 +1145,7 @@ ssize_t flow_write(int fd,
return -ETIMEDOUT;
if (flow_keepalive(fd))
- return -EFLOWDOWN;
+ return -EFLOWPEER;
frcti_tick(flow->frcti);
@@ -1165,14 +1170,16 @@ ssize_t flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (count != 0 && frcti_snd(flow->frcti, sdb) < 0)
- goto enomem;
+ if (count > 0) {
+ if (frcti_snd(flow->frcti, sdb) < 0)
+ goto enomem;
- if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0)
- goto enomem;
+ if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0)
+ goto enomem;
- if (flow->qs.ber == 0 && add_crc(sdb) != 0)
- goto enomem;
+ if (flow->qs.ber == 0 && add_crc(sdb) != 0)
+ goto enomem;
+ }
pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
@@ -1263,7 +1270,7 @@ ssize_t flow_read(int fd,
return -ETIMEDOUT;
if (flow_keepalive(fd) < 0)
- return -EFLOWDOWN;
+ return -EFLOWPEER;
ts_add(&tictime, &tic, &tictime);
@@ -1360,6 +1367,7 @@ struct flow_set * fset_create()
goto fail_bmp_alloc;
set->chk = now;
+ set->min = UINT32_MAX;
pthread_rwlock_unlock(&ai.lock);
@@ -1437,6 +1445,7 @@ void fset_zero(struct flow_set * set)
int fset_add(struct flow_set * set,
int fd)
{
+ struct flow * flow;
struct flow_set_entry * fse;
int ret;
size_t packets;
@@ -1445,13 +1454,17 @@ int fset_add(struct flow_set * set,
if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
return -EINVAL;
+ flow = &ai.flows[fd];
+
fse = malloc(sizeof(*fse));
if (fse == NULL)
return -ENOMEM;
- pthread_rwlock_wrlock(&ai.lock);
+ fse->fd = fd;
- if (ai.flows[fd].flow_id < 0) {
+ pthread_rwlock_rdlock(&ai.lock);
+
+ if (flow->flow_id < 0) {
ret = -EINVAL;
goto fail;
}
@@ -1462,6 +1475,9 @@ int fset_add(struct flow_set * set,
pthread_rwlock_wrlock(&set->lock);
+ if (flow->qs.timeout != 0 && flow->qs.timeout < set->min)
+ set->min = flow->qs.timeout;
+
list_add_tail(&fse->next, &set->flows);
pthread_rwlock_unlock(&set->lock);
@@ -1485,14 +1501,20 @@ void fset_del(struct flow_set * set,
{
struct list_head * p;
struct list_head * h;
+ struct flow * flow;
+ uint32_t min;
if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
return;
+ flow = &ai.flows[fd];
+
+ min = UINT32_MAX;
+
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].flow_id >= 0)
- shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].flow_id);
+ if (flow->flow_id >= 0)
+ shm_flow_set_del(ai.fqset, set->idx, flow->flow_id);
pthread_rwlock_wrlock(&set->lock);
@@ -1502,10 +1524,14 @@ void fset_del(struct flow_set * set,
if (e->fd == fd) {
list_del(&e->next);
free(e);
- break;
+ } else {
+ if (flow->qs.timeout != 0 && flow->qs.timeout < min)
+ min = flow->qs.timeout;
}
}
+ set->min = min;
+
pthread_rwlock_unlock(&set->lock);
pthread_rwlock_unlock(&ai.lock);
@@ -1544,7 +1570,7 @@ static void fset_keepalive(struct flow_set * set)
pthread_rwlock_wrlock(&set->lock);
- if (ts_diff_ns(&now, &set->chk) < (FLOWTIMEO / 4) * BILLION) {
+ if (ts_diff_ns(&now, &set->chk) < set->min >> 2) {
pthread_rwlock_unlock(&set->lock);
return;
}
diff --git a/src/lib/qosspec.proto b/src/lib/qosspec.proto
index 8a355363..3ceedd87 100644
--- a/src/lib/qosspec.proto
+++ b/src/lib/qosspec.proto
@@ -23,12 +23,13 @@
syntax = "proto2";
message qosspec_msg {
- required uint32 delay = 1; /* In ms */
- required uint64 bandwidth = 2; /* In bits/s */
- required uint32 availability = 3; /* Class of 9s */
- required uint32 loss = 4; /* Packet loss */
- required uint32 ber = 5; /* Bit error rate, ppb */
- required uint32 in_order = 6; /* In-order delivery */
- required uint32 max_gap = 7; /* In ms */
- required uint32 cypher_s = 8; /* Crypto strength in bits */
+ required uint32 delay = 1; /* In ms. */
+ required uint64 bandwidth = 2; /* In bits/s. */
+ required uint32 availability = 3; /* Class of 9s. */
+ required uint32 loss = 4; /* Packet loss. */
+ required uint32 ber = 5; /* Bit error rate, ppb. */
+ required uint32 in_order = 6; /* In-order delivery. */
+ required uint32 max_gap = 7; /* In ms. */
+ required uint32 cypher_s = 8; /* Crypto strength in bits. */
+ required uint32 timeout = 9; /* Timeout in ms. */
};
diff --git a/src/lib/sockets.c b/src/lib/sockets.c
index 8179d2b3..48e95121 100644
--- a/src/lib/sockets.c
+++ b/src/lib/sockets.c
@@ -181,6 +181,7 @@ qosspec_msg_t spec_to_msg(const qosspec_t * qs)
msg.in_order = spec.in_order;
msg.max_gap = spec.max_gap;
msg.cypher_s = spec.cypher_s;
+ msg.timeout = spec.timeout;
return msg;
}
@@ -199,6 +200,7 @@ qosspec_t msg_to_spec(const qosspec_msg_t * msg)
spec.in_order = msg->in_order;
spec.max_gap = msg->max_gap;
spec.cypher_s = msg->cypher_s;
+ spec.timeout = msg->timeout;
return spec;
}