From f5d642a06f9c1a58197313b32f6b213a152e446f Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Fri, 25 Feb 2022 17:34:29 +0100 Subject: lib: Make flow liveness timeout configurable The qosspec_t now has a timeout value that sets the timeout value of the flow. Flows with a peer that has timed out will now return -EFLOWPEER on flow_read() or flow_write(). Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/lib/dev.c | 66 +++++++++++++++++++++++++++++++++++---------------- src/lib/qosspec.proto | 17 ++++++------- src/lib/sockets.c | 2 ++ 3 files changed, 57 insertions(+), 28 deletions(-) (limited to 'src/lib') diff --git a/src/lib/dev.c b/src/lib/dev.c index 4c21fcdf..5c57a538 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -68,7 +68,6 @@ #define SECMEMSZ 16384 #define SYMMKEYSZ 32 #define MSGBUFSZ 2048 -#define FLOWTIMEO 120 /* seconds */ enum port_state { PORT_NULL = 0, @@ -123,7 +122,8 @@ struct flow_set_entry { struct flow_set { size_t idx; - struct timespec chk; /* Last keepalive check */ + struct timespec chk; /* Last keepalive check. */ + uint32_t min; /* Minimum keepalive time in set. */ struct list_head flows; pthread_rwlock_t lock; @@ -1056,6 +1056,7 @@ static int flow_keepalive(int fd) struct timespec r_act; struct flow * flow; int flow_id; + uint32_t timeo; flow = &ai.flows[fd]; @@ -1067,15 +1068,19 @@ static int flow_keepalive(int fd) r_act = flow->rcv_act; flow_id = flow->flow_id; + timeo = flow->qs.timeout; pthread_rwlock_unlock(&ai.lock); - if (ts_diff_ns(&r_act, &now) > FLOWTIMEO * BILLION) { - shm_flow_set_notify(ai.fqset, flow_id, FLOW_PKT); - return -EFLOWDOWN; + if (timeo == 0) + return 0; + + if (ts_diff_ns(&r_act, &now) > timeo * MILLION) { + shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER); + return -EFLOWPEER; } - if (ts_diff_ns(&s_act, &now) > (FLOWTIMEO / 4) * BILLION) + if (ts_diff_ns(&s_act, &now) > (timeo >> 2) * MILLION) flow_send_keepalive(fd); return 0; @@ -1140,7 +1145,7 @@ ssize_t flow_write(int fd, return -ETIMEDOUT; if (flow_keepalive(fd)) - return -EFLOWDOWN; + return -EFLOWPEER; frcti_tick(flow->frcti); @@ -1165,14 +1170,16 @@ ssize_t flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - if (count != 0 && frcti_snd(flow->frcti, sdb) < 0) - goto enomem; + if (count > 0) { + if (frcti_snd(flow->frcti, sdb) < 0) + goto enomem; - if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0) - goto enomem; + if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0) + goto enomem; - if (flow->qs.ber == 0 && add_crc(sdb) != 0) - goto enomem; + if (flow->qs.ber == 0 && add_crc(sdb) != 0) + goto enomem; + } pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock); @@ -1263,7 +1270,7 @@ ssize_t flow_read(int fd, return -ETIMEDOUT; if (flow_keepalive(fd) < 0) - return -EFLOWDOWN; + return -EFLOWPEER; ts_add(&tictime, &tic, &tictime); @@ -1360,6 +1367,7 @@ struct flow_set * fset_create() goto fail_bmp_alloc; set->chk = now; + set->min = UINT32_MAX; pthread_rwlock_unlock(&ai.lock); @@ -1437,6 +1445,7 @@ void fset_zero(struct flow_set * set) int fset_add(struct flow_set * set, int fd) { + struct flow * flow; struct flow_set_entry * fse; int ret; size_t packets; @@ -1445,13 +1454,17 @@ int fset_add(struct flow_set * set, if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) return -EINVAL; + flow = &ai.flows[fd]; + fse = malloc(sizeof(*fse)); if (fse == NULL) return -ENOMEM; - pthread_rwlock_wrlock(&ai.lock); + fse->fd = fd; - if (ai.flows[fd].flow_id < 0) { + pthread_rwlock_rdlock(&ai.lock); + + if (flow->flow_id < 0) { ret = -EINVAL; goto fail; } @@ -1462,6 +1475,9 @@ int fset_add(struct flow_set * set, pthread_rwlock_wrlock(&set->lock); + if (flow->qs.timeout != 0 && flow->qs.timeout < set->min) + set->min = flow->qs.timeout; + list_add_tail(&fse->next, &set->flows); pthread_rwlock_unlock(&set->lock); @@ -1485,14 +1501,20 @@ void fset_del(struct flow_set * set, { struct list_head * p; struct list_head * h; + struct flow * flow; + uint32_t min; if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) return; + flow = &ai.flows[fd]; + + min = UINT32_MAX; + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].flow_id >= 0) - shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].flow_id); + if (flow->flow_id >= 0) + shm_flow_set_del(ai.fqset, set->idx, flow->flow_id); pthread_rwlock_wrlock(&set->lock); @@ -1502,10 +1524,14 @@ void fset_del(struct flow_set * set, if (e->fd == fd) { list_del(&e->next); free(e); - break; + } else { + if (flow->qs.timeout != 0 && flow->qs.timeout < min) + min = flow->qs.timeout; } } + set->min = min; + pthread_rwlock_unlock(&set->lock); pthread_rwlock_unlock(&ai.lock); @@ -1544,7 +1570,7 @@ static void fset_keepalive(struct flow_set * set) pthread_rwlock_wrlock(&set->lock); - if (ts_diff_ns(&now, &set->chk) < (FLOWTIMEO / 4) * BILLION) { + if (ts_diff_ns(&now, &set->chk) < set->min >> 2) { pthread_rwlock_unlock(&set->lock); return; } diff --git a/src/lib/qosspec.proto b/src/lib/qosspec.proto index 8a355363..3ceedd87 100644 --- a/src/lib/qosspec.proto +++ b/src/lib/qosspec.proto @@ -23,12 +23,13 @@ syntax = "proto2"; message qosspec_msg { - required uint32 delay = 1; /* In ms */ - required uint64 bandwidth = 2; /* In bits/s */ - required uint32 availability = 3; /* Class of 9s */ - required uint32 loss = 4; /* Packet loss */ - required uint32 ber = 5; /* Bit error rate, ppb */ - required uint32 in_order = 6; /* In-order delivery */ - required uint32 max_gap = 7; /* In ms */ - required uint32 cypher_s = 8; /* Crypto strength in bits */ + required uint32 delay = 1; /* In ms. */ + required uint64 bandwidth = 2; /* In bits/s. */ + required uint32 availability = 3; /* Class of 9s. */ + required uint32 loss = 4; /* Packet loss. */ + required uint32 ber = 5; /* Bit error rate, ppb. */ + required uint32 in_order = 6; /* In-order delivery. */ + required uint32 max_gap = 7; /* In ms. */ + required uint32 cypher_s = 8; /* Crypto strength in bits. */ + required uint32 timeout = 9; /* Timeout in ms. */ }; diff --git a/src/lib/sockets.c b/src/lib/sockets.c index 8179d2b3..48e95121 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -181,6 +181,7 @@ qosspec_msg_t spec_to_msg(const qosspec_t * qs) msg.in_order = spec.in_order; msg.max_gap = spec.max_gap; msg.cypher_s = spec.cypher_s; + msg.timeout = spec.timeout; return msg; } @@ -199,6 +200,7 @@ qosspec_t msg_to_spec(const qosspec_msg_t * msg) spec.in_order = msg->in_order; spec.max_gap = msg->max_gap; spec.cypher_s = msg->cypher_s; + spec.timeout = msg->timeout; return spec; } -- cgit v1.2.3