From f5d642a06f9c1a58197313b32f6b213a152e446f Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Fri, 25 Feb 2022 17:34:29 +0100 Subject: lib: Make flow liveness timeout configurable The qosspec_t now has a timeout value that sets the timeout value of the flow. Flows with a peer that has timed out will now return -EFLOWPEER on flow_read() or flow_write(). Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- doc/man/flow_read.3 | 3 +++ include/ouroboros/errno.h | 5 ++-- include/ouroboros/fqueue.h | 3 ++- include/ouroboros/qos.h | 52 +++++++++++++++++++++++------------- src/ipcpd/eth/eth.c | 3 +++ src/ipcpd/udp/main.c | 4 +++ src/ipcpd/unicast/fa.c | 7 +++-- src/irmd/main.c | 2 +- src/lib/dev.c | 66 ++++++++++++++++++++++++++++++++-------------- src/lib/qosspec.proto | 17 ++++++------ src/lib/sockets.c | 2 ++ 11 files changed, 111 insertions(+), 53 deletions(-) diff --git a/doc/man/flow_read.3 b/doc/man/flow_read.3 index 99f96544..e41ee374 100644 --- a/doc/man/flow_read.3 +++ b/doc/man/flow_read.3 @@ -58,6 +58,9 @@ The flow was not allocated. .B -EFLOWDOWN The flow has been reported down. +.B -EFLOWPEER +The flow's peer is unresponsive (flow timed out). + .B -EMSGSIZE The buffer was too large to be written. diff --git a/include/ouroboros/errno.h b/include/ouroboros/errno.h index 06f33bef..b9f8dbc0 100644 --- a/include/ouroboros/errno.h +++ b/include/ouroboros/errno.h @@ -31,7 +31,8 @@ #define EIPCP 1003 /* Failed to communicate with IPCP */ #define EIPCPSTATE 1004 /* Target in wrong state */ #define EFLOWDOWN 1005 /* Flow is down */ -#define ECRYPT 1006 /* Encryption error */ -#define ENAME 1007 /* Naming error */ +#define EFLOWPEER 1006 /* Flow is down (peer timed out) */ +#define ECRYPT 1007 /* Encryption error */ +#define ENAME 1008 /* Naming error */ #endif /* OUROBOROS_ERRNO_H */ diff --git a/include/ouroboros/fqueue.h b/include/ouroboros/fqueue.h index f6828a4d..3c0ebf90 100644 --- a/include/ouroboros/fqueue.h +++ b/include/ouroboros/fqueue.h @@ -33,7 +33,8 @@ enum fqtype { FLOW_DOWN = (1 << 1), FLOW_UP = (1 << 2), FLOW_ALLOC = (1 << 3), - FLOW_DEALLOC = (1 << 4) + FLOW_DEALLOC = (1 << 4), + FLOW_PEER = (1 << 5) }; struct flow_set; diff --git a/include/ouroboros/qos.h b/include/ouroboros/qos.h index 6391347a..b6b945d9 100644 --- a/include/ouroboros/qos.h +++ b/include/ouroboros/qos.h @@ -26,15 +26,18 @@ #include #include +#define DEFAULT_PEER_TIMEOUT 120000 + typedef struct qos_spec { - uint32_t delay; /* In ms */ - uint64_t bandwidth; /* In bits/s */ - uint8_t availability; /* Class of 9s */ - uint32_t loss; /* Packet loss */ - uint32_t ber; /* Bit error rate, errors per billion bits */ - uint8_t in_order; /* In-order delivery, enables FRCT */ - uint32_t max_gap; /* In ms */ - uint16_t cypher_s; /* Cypher strength, 0 = no encryption */ + uint32_t delay; /* In ms. */ + uint64_t bandwidth; /* In bits/s. */ + uint8_t availability; /* Class of 9s. */ + uint32_t loss; /* Packet loss. */ + uint32_t ber; /* Bit error rate, errors per billion bits. */ + uint8_t in_order; /* In-order delivery, enables FRCT. */ + uint32_t max_gap; /* In ms. */ + uint16_t cypher_s; /* Cypher strength (bits), 0 = no encryption. */ + uint32_t timeout; /* Peer timeout time, in ms, 0 = no timeout. */ } qosspec_t; static const qosspec_t qos_raw = { @@ -45,7 +48,8 @@ static const qosspec_t qos_raw = { .ber = 1, .in_order = 0, .max_gap = UINT32_MAX, - .cypher_s = 0 + .cypher_s = 0, + .timeout = DEFAULT_PEER_TIMEOUT }; static const qosspec_t qos_raw_no_errors = { @@ -56,7 +60,8 @@ static const qosspec_t qos_raw_no_errors = { .ber = 0, .in_order = 0, .max_gap = UINT32_MAX, - .cypher_s = 0 + .cypher_s = 0, + .timeout = DEFAULT_PEER_TIMEOUT }; static const qosspec_t qos_raw_crypt = { @@ -67,7 +72,8 @@ static const qosspec_t qos_raw_crypt = { .ber = 0, .in_order = 0, .max_gap = UINT32_MAX, - .cypher_s = 256 + .cypher_s = 256, + .timeout = DEFAULT_PEER_TIMEOUT }; static const qosspec_t qos_best_effort = { @@ -78,7 +84,8 @@ static const qosspec_t qos_best_effort = { .ber = 0, .in_order = 1, .max_gap = UINT32_MAX, - .cypher_s = 0 + .cypher_s = 0, + .timeout = DEFAULT_PEER_TIMEOUT }; static const qosspec_t qos_best_effort_crypt = { @@ -89,7 +96,8 @@ static const qosspec_t qos_best_effort_crypt = { .ber = 0, .in_order = 1, .max_gap = UINT32_MAX, - .cypher_s = 256 + .cypher_s = 256, + .timeout = DEFAULT_PEER_TIMEOUT }; static const qosspec_t qos_video = { @@ -100,7 +108,8 @@ static const qosspec_t qos_video = { .ber = 0, .in_order = 1, .max_gap = 100, - .cypher_s = 0 + .cypher_s = 0, + .timeout = DEFAULT_PEER_TIMEOUT }; static const qosspec_t qos_video_crypt = { @@ -111,7 +120,8 @@ static const qosspec_t qos_video_crypt = { .ber = 0, .in_order = 1, .max_gap = 100, - .cypher_s = 256 + .cypher_s = 256, + .timeout = DEFAULT_PEER_TIMEOUT }; static const qosspec_t qos_voice = { @@ -122,7 +132,8 @@ static const qosspec_t qos_voice = { .ber = 0, .in_order = 1, .max_gap = 50, - .cypher_s = 0 + .cypher_s = 0, + .timeout = DEFAULT_PEER_TIMEOUT }; static const qosspec_t qos_voice_crypt = { @@ -133,7 +144,8 @@ static const qosspec_t qos_voice_crypt = { .ber = 0, .in_order = 1, .max_gap = 50, - .cypher_s = 256 + .cypher_s = 256, + .timeout = DEFAULT_PEER_TIMEOUT }; static const qosspec_t qos_data = { @@ -144,7 +156,8 @@ static const qosspec_t qos_data = { .ber = 0, .in_order = 1, .max_gap = 2000, - .cypher_s = 0 + .cypher_s = 0, + .timeout = DEFAULT_PEER_TIMEOUT }; static const qosspec_t qos_data_crypt = { @@ -155,7 +168,8 @@ static const qosspec_t qos_data_crypt = { .ber = 0, .in_order = 1, .max_gap = 2000, - .cypher_s = 256 + .cypher_s = 256, + .timeout = DEFAULT_PEER_TIMEOUT }; #endif /* OUROBOROS_QOS_H */ diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index e22dd7bc..8b34d303 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -164,6 +164,7 @@ struct mgmt_msg { uint32_t ber; uint32_t max_gap; uint32_t delay; + uint32_t timeout; uint16_t cypher_s; uint8_t in_order; #if defined (BUILD_ETH_DIX) @@ -492,6 +493,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, msg->in_order = qs.in_order; msg->max_gap = hton32(qs.max_gap); msg->cypher_s = hton16(qs.cypher_s); + msg->timeout = hton32(qs.timeout); memcpy(msg + 1, hash, ipcp_dir_hash_len()); memcpy(buf + len + ETH_HEADER_TOT_SIZE, data, dlen); @@ -753,6 +755,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, qs.in_order = msg->in_order; qs.max_gap = ntoh32(msg->max_gap); qs.cypher_s = ntoh16(msg->cypher_s); + qs.timeout = ntoh32(msg->timeout); if (shim_data_reg_has(eth_data.shim_data, buf + sizeof(*msg))) { diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index b9f97e74..5c57e6b8 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -98,7 +98,9 @@ struct mgmt_msg { uint32_t loss; uint32_t ber; uint32_t max_gap; + uint32_t timeout; uint16_t cypher_s; + } __attribute__((packed)); struct mgmt_frame { @@ -217,6 +219,7 @@ static int ipcp_udp_port_alloc(const struct sockaddr_in * r_saddr, msg->in_order = qs.in_order; msg->max_gap = hton32(qs.max_gap); msg->cypher_s = hton16(qs.cypher_s); + msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); memcpy(buf + len, data, dlen); @@ -375,6 +378,7 @@ static int ipcp_udp_mgmt_frame(const uint8_t * buf, qs.in_order = msg->in_order; qs.max_gap = ntoh32(msg->max_gap); qs.cypher_s = ntoh16(msg->cypher_s); + qs.timeout = ntoh32(msg->timeout); return ipcp_udp_port_req(&c_saddr, ntoh32(msg->s_eid), (uint8_t *) (msg + 1), qs, diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index dcc79031..d59b9760 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -72,14 +72,15 @@ struct fa_msg { int8_t response; uint16_t ece; /* QoS parameters from spec, aligned */ - uint8_t availability; - uint8_t in_order; uint32_t delay; uint64_t bandwidth; uint32_t loss; uint32_t ber; uint32_t max_gap; + uint32_t timeout; uint16_t cypher_s; + uint8_t availability; + uint8_t in_order; } __attribute__((packed)); struct cmd { @@ -569,6 +570,7 @@ static int fa_handle_flow_req(struct fa_msg * msg, qs.in_order = msg->in_order; qs.max_gap = ntoh32(msg->max_gap); qs.cypher_s = ntoh16(msg->cypher_s); + qs.timeout = ntoh32(msg->timeout); fd = fa_wait_irmd_alloc((uint8_t *) (msg + 1), qs, data, dlen); if (fd < 0) @@ -840,6 +842,7 @@ int fa_alloc(int fd, msg->in_order = qs.in_order; msg->max_gap = hton32(qs.max_gap); msg->cypher_s = hton16(qs.cypher_s); + msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); memcpy(shm_du_buff_head(sdb) + len, data, dlen); diff --git a/src/irmd/main.c b/src/irmd/main.c index f83e8e1e..a3acc78a 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -648,7 +648,7 @@ static int connect_ipcp(pid_t pid, log_dbg("Connecting %s to %s.", component, dst); if (ipcp_connect(pid, dst, component, qs)) { - log_err("Could not connect IPCP."); + log_err("Could not connect IPCP %d to %s.", pid, dst); return -EPERM; } diff --git a/src/lib/dev.c b/src/lib/dev.c index 4c21fcdf..5c57a538 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -68,7 +68,6 @@ #define SECMEMSZ 16384 #define SYMMKEYSZ 32 #define MSGBUFSZ 2048 -#define FLOWTIMEO 120 /* seconds */ enum port_state { PORT_NULL = 0, @@ -123,7 +122,8 @@ struct flow_set_entry { struct flow_set { size_t idx; - struct timespec chk; /* Last keepalive check */ + struct timespec chk; /* Last keepalive check. */ + uint32_t min; /* Minimum keepalive time in set. */ struct list_head flows; pthread_rwlock_t lock; @@ -1056,6 +1056,7 @@ static int flow_keepalive(int fd) struct timespec r_act; struct flow * flow; int flow_id; + uint32_t timeo; flow = &ai.flows[fd]; @@ -1067,15 +1068,19 @@ static int flow_keepalive(int fd) r_act = flow->rcv_act; flow_id = flow->flow_id; + timeo = flow->qs.timeout; pthread_rwlock_unlock(&ai.lock); - if (ts_diff_ns(&r_act, &now) > FLOWTIMEO * BILLION) { - shm_flow_set_notify(ai.fqset, flow_id, FLOW_PKT); - return -EFLOWDOWN; + if (timeo == 0) + return 0; + + if (ts_diff_ns(&r_act, &now) > timeo * MILLION) { + shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER); + return -EFLOWPEER; } - if (ts_diff_ns(&s_act, &now) > (FLOWTIMEO / 4) * BILLION) + if (ts_diff_ns(&s_act, &now) > (timeo >> 2) * MILLION) flow_send_keepalive(fd); return 0; @@ -1140,7 +1145,7 @@ ssize_t flow_write(int fd, return -ETIMEDOUT; if (flow_keepalive(fd)) - return -EFLOWDOWN; + return -EFLOWPEER; frcti_tick(flow->frcti); @@ -1165,14 +1170,16 @@ ssize_t flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - if (count != 0 && frcti_snd(flow->frcti, sdb) < 0) - goto enomem; + if (count > 0) { + if (frcti_snd(flow->frcti, sdb) < 0) + goto enomem; - if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0) - goto enomem; + if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0) + goto enomem; - if (flow->qs.ber == 0 && add_crc(sdb) != 0) - goto enomem; + if (flow->qs.ber == 0 && add_crc(sdb) != 0) + goto enomem; + } pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock); @@ -1263,7 +1270,7 @@ ssize_t flow_read(int fd, return -ETIMEDOUT; if (flow_keepalive(fd) < 0) - return -EFLOWDOWN; + return -EFLOWPEER; ts_add(&tictime, &tic, &tictime); @@ -1360,6 +1367,7 @@ struct flow_set * fset_create() goto fail_bmp_alloc; set->chk = now; + set->min = UINT32_MAX; pthread_rwlock_unlock(&ai.lock); @@ -1437,6 +1445,7 @@ void fset_zero(struct flow_set * set) int fset_add(struct flow_set * set, int fd) { + struct flow * flow; struct flow_set_entry * fse; int ret; size_t packets; @@ -1445,13 +1454,17 @@ int fset_add(struct flow_set * set, if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) return -EINVAL; + flow = &ai.flows[fd]; + fse = malloc(sizeof(*fse)); if (fse == NULL) return -ENOMEM; - pthread_rwlock_wrlock(&ai.lock); + fse->fd = fd; - if (ai.flows[fd].flow_id < 0) { + pthread_rwlock_rdlock(&ai.lock); + + if (flow->flow_id < 0) { ret = -EINVAL; goto fail; } @@ -1462,6 +1475,9 @@ int fset_add(struct flow_set * set, pthread_rwlock_wrlock(&set->lock); + if (flow->qs.timeout != 0 && flow->qs.timeout < set->min) + set->min = flow->qs.timeout; + list_add_tail(&fse->next, &set->flows); pthread_rwlock_unlock(&set->lock); @@ -1485,14 +1501,20 @@ void fset_del(struct flow_set * set, { struct list_head * p; struct list_head * h; + struct flow * flow; + uint32_t min; if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) return; + flow = &ai.flows[fd]; + + min = UINT32_MAX; + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].flow_id >= 0) - shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].flow_id); + if (flow->flow_id >= 0) + shm_flow_set_del(ai.fqset, set->idx, flow->flow_id); pthread_rwlock_wrlock(&set->lock); @@ -1502,10 +1524,14 @@ void fset_del(struct flow_set * set, if (e->fd == fd) { list_del(&e->next); free(e); - break; + } else { + if (flow->qs.timeout != 0 && flow->qs.timeout < min) + min = flow->qs.timeout; } } + set->min = min; + pthread_rwlock_unlock(&set->lock); pthread_rwlock_unlock(&ai.lock); @@ -1544,7 +1570,7 @@ static void fset_keepalive(struct flow_set * set) pthread_rwlock_wrlock(&set->lock); - if (ts_diff_ns(&now, &set->chk) < (FLOWTIMEO / 4) * BILLION) { + if (ts_diff_ns(&now, &set->chk) < set->min >> 2) { pthread_rwlock_unlock(&set->lock); return; } diff --git a/src/lib/qosspec.proto b/src/lib/qosspec.proto index 8a355363..3ceedd87 100644 --- a/src/lib/qosspec.proto +++ b/src/lib/qosspec.proto @@ -23,12 +23,13 @@ syntax = "proto2"; message qosspec_msg { - required uint32 delay = 1; /* In ms */ - required uint64 bandwidth = 2; /* In bits/s */ - required uint32 availability = 3; /* Class of 9s */ - required uint32 loss = 4; /* Packet loss */ - required uint32 ber = 5; /* Bit error rate, ppb */ - required uint32 in_order = 6; /* In-order delivery */ - required uint32 max_gap = 7; /* In ms */ - required uint32 cypher_s = 8; /* Crypto strength in bits */ + required uint32 delay = 1; /* In ms. */ + required uint64 bandwidth = 2; /* In bits/s. */ + required uint32 availability = 3; /* Class of 9s. */ + required uint32 loss = 4; /* Packet loss. */ + required uint32 ber = 5; /* Bit error rate, ppb. */ + required uint32 in_order = 6; /* In-order delivery. */ + required uint32 max_gap = 7; /* In ms. */ + required uint32 cypher_s = 8; /* Crypto strength in bits. */ + required uint32 timeout = 9; /* Timeout in ms. */ }; diff --git a/src/lib/sockets.c b/src/lib/sockets.c index 8179d2b3..48e95121 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -181,6 +181,7 @@ qosspec_msg_t spec_to_msg(const qosspec_t * qs) msg.in_order = spec.in_order; msg.max_gap = spec.max_gap; msg.cypher_s = spec.cypher_s; + msg.timeout = spec.timeout; return msg; } @@ -199,6 +200,7 @@ qosspec_t msg_to_spec(const qosspec_msg_t * msg) spec.in_order = msg->in_order; spec.max_gap = msg->max_gap; spec.cypher_s = msg->cypher_s; + spec.timeout = msg->timeout; return spec; } -- cgit v1.2.3