From f5d642a06f9c1a58197313b32f6b213a152e446f Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Fri, 25 Feb 2022 17:34:29 +0100 Subject: lib: Make flow liveness timeout configurable The qosspec_t now has a timeout value that sets the timeout value of the flow. Flows with a peer that has timed out will now return -EFLOWPEER on flow_read() or flow_write(). Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/ipcpd/eth/eth.c | 3 +++ src/ipcpd/udp/main.c | 4 +++ src/ipcpd/unicast/fa.c | 7 ++++-- src/irmd/main.c | 2 +- src/lib/dev.c | 66 +++++++++++++++++++++++++++++++++++--------------- src/lib/qosspec.proto | 17 +++++++------ src/lib/sockets.c | 2 ++ 7 files changed, 70 insertions(+), 31 deletions(-) (limited to 'src') diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index e22dd7bc..8b34d303 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -164,6 +164,7 @@ struct mgmt_msg { uint32_t ber; uint32_t max_gap; uint32_t delay; + uint32_t timeout; uint16_t cypher_s; uint8_t in_order; #if defined (BUILD_ETH_DIX) @@ -492,6 +493,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, msg->in_order = qs.in_order; msg->max_gap = hton32(qs.max_gap); msg->cypher_s = hton16(qs.cypher_s); + msg->timeout = hton32(qs.timeout); memcpy(msg + 1, hash, ipcp_dir_hash_len()); memcpy(buf + len + ETH_HEADER_TOT_SIZE, data, dlen); @@ -753,6 +755,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, qs.in_order = msg->in_order; qs.max_gap = ntoh32(msg->max_gap); qs.cypher_s = ntoh16(msg->cypher_s); + qs.timeout = ntoh32(msg->timeout); if (shim_data_reg_has(eth_data.shim_data, buf + sizeof(*msg))) { diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index b9f97e74..5c57e6b8 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -98,7 +98,9 @@ struct mgmt_msg { uint32_t loss; uint32_t ber; uint32_t max_gap; + uint32_t timeout; uint16_t cypher_s; + } __attribute__((packed)); struct mgmt_frame { @@ -217,6 +219,7 @@ static int ipcp_udp_port_alloc(const struct sockaddr_in * r_saddr, msg->in_order = qs.in_order; msg->max_gap = hton32(qs.max_gap); msg->cypher_s = hton16(qs.cypher_s); + msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); memcpy(buf + len, data, dlen); @@ -375,6 +378,7 @@ static int ipcp_udp_mgmt_frame(const uint8_t * buf, qs.in_order = msg->in_order; qs.max_gap = ntoh32(msg->max_gap); qs.cypher_s = ntoh16(msg->cypher_s); + qs.timeout = ntoh32(msg->timeout); return ipcp_udp_port_req(&c_saddr, ntoh32(msg->s_eid), (uint8_t *) (msg + 1), qs, diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index dcc79031..d59b9760 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -72,14 +72,15 @@ struct fa_msg { int8_t response; uint16_t ece; /* QoS parameters from spec, aligned */ - uint8_t availability; - uint8_t in_order; uint32_t delay; uint64_t bandwidth; uint32_t loss; uint32_t ber; uint32_t max_gap; + uint32_t timeout; uint16_t cypher_s; + uint8_t availability; + uint8_t in_order; } __attribute__((packed)); struct cmd { @@ -569,6 +570,7 @@ static int fa_handle_flow_req(struct fa_msg * msg, qs.in_order = msg->in_order; qs.max_gap = ntoh32(msg->max_gap); qs.cypher_s = ntoh16(msg->cypher_s); + qs.timeout = ntoh32(msg->timeout); fd = fa_wait_irmd_alloc((uint8_t *) (msg + 1), qs, data, dlen); if (fd < 0) @@ -840,6 +842,7 @@ int fa_alloc(int fd, msg->in_order = qs.in_order; msg->max_gap = hton32(qs.max_gap); msg->cypher_s = hton16(qs.cypher_s); + msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); memcpy(shm_du_buff_head(sdb) + len, data, dlen); diff --git a/src/irmd/main.c b/src/irmd/main.c index f83e8e1e..a3acc78a 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -648,7 +648,7 @@ static int connect_ipcp(pid_t pid, log_dbg("Connecting %s to %s.", component, dst); if (ipcp_connect(pid, dst, component, qs)) { - log_err("Could not connect IPCP."); + log_err("Could not connect IPCP %d to %s.", pid, dst); return -EPERM; } diff --git a/src/lib/dev.c b/src/lib/dev.c index 4c21fcdf..5c57a538 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -68,7 +68,6 @@ #define SECMEMSZ 16384 #define SYMMKEYSZ 32 #define MSGBUFSZ 2048 -#define FLOWTIMEO 120 /* seconds */ enum port_state { PORT_NULL = 0, @@ -123,7 +122,8 @@ struct flow_set_entry { struct flow_set { size_t idx; - struct timespec chk; /* Last keepalive check */ + struct timespec chk; /* Last keepalive check. */ + uint32_t min; /* Minimum keepalive time in set. */ struct list_head flows; pthread_rwlock_t lock; @@ -1056,6 +1056,7 @@ static int flow_keepalive(int fd) struct timespec r_act; struct flow * flow; int flow_id; + uint32_t timeo; flow = &ai.flows[fd]; @@ -1067,15 +1068,19 @@ static int flow_keepalive(int fd) r_act = flow->rcv_act; flow_id = flow->flow_id; + timeo = flow->qs.timeout; pthread_rwlock_unlock(&ai.lock); - if (ts_diff_ns(&r_act, &now) > FLOWTIMEO * BILLION) { - shm_flow_set_notify(ai.fqset, flow_id, FLOW_PKT); - return -EFLOWDOWN; + if (timeo == 0) + return 0; + + if (ts_diff_ns(&r_act, &now) > timeo * MILLION) { + shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER); + return -EFLOWPEER; } - if (ts_diff_ns(&s_act, &now) > (FLOWTIMEO / 4) * BILLION) + if (ts_diff_ns(&s_act, &now) > (timeo >> 2) * MILLION) flow_send_keepalive(fd); return 0; @@ -1140,7 +1145,7 @@ ssize_t flow_write(int fd, return -ETIMEDOUT; if (flow_keepalive(fd)) - return -EFLOWDOWN; + return -EFLOWPEER; frcti_tick(flow->frcti); @@ -1165,14 +1170,16 @@ ssize_t flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - if (count != 0 && frcti_snd(flow->frcti, sdb) < 0) - goto enomem; + if (count > 0) { + if (frcti_snd(flow->frcti, sdb) < 0) + goto enomem; - if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0) - goto enomem; + if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0) + goto enomem; - if (flow->qs.ber == 0 && add_crc(sdb) != 0) - goto enomem; + if (flow->qs.ber == 0 && add_crc(sdb) != 0) + goto enomem; + } pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock); @@ -1263,7 +1270,7 @@ ssize_t flow_read(int fd, return -ETIMEDOUT; if (flow_keepalive(fd) < 0) - return -EFLOWDOWN; + return -EFLOWPEER; ts_add(&tictime, &tic, &tictime); @@ -1360,6 +1367,7 @@ struct flow_set * fset_create() goto fail_bmp_alloc; set->chk = now; + set->min = UINT32_MAX; pthread_rwlock_unlock(&ai.lock); @@ -1437,6 +1445,7 @@ void fset_zero(struct flow_set * set) int fset_add(struct flow_set * set, int fd) { + struct flow * flow; struct flow_set_entry * fse; int ret; size_t packets; @@ -1445,13 +1454,17 @@ int fset_add(struct flow_set * set, if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) return -EINVAL; + flow = &ai.flows[fd]; + fse = malloc(sizeof(*fse)); if (fse == NULL) return -ENOMEM; - pthread_rwlock_wrlock(&ai.lock); + fse->fd = fd; - if (ai.flows[fd].flow_id < 0) { + pthread_rwlock_rdlock(&ai.lock); + + if (flow->flow_id < 0) { ret = -EINVAL; goto fail; } @@ -1462,6 +1475,9 @@ int fset_add(struct flow_set * set, pthread_rwlock_wrlock(&set->lock); + if (flow->qs.timeout != 0 && flow->qs.timeout < set->min) + set->min = flow->qs.timeout; + list_add_tail(&fse->next, &set->flows); pthread_rwlock_unlock(&set->lock); @@ -1485,14 +1501,20 @@ void fset_del(struct flow_set * set, { struct list_head * p; struct list_head * h; + struct flow * flow; + uint32_t min; if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) return; + flow = &ai.flows[fd]; + + min = UINT32_MAX; + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].flow_id >= 0) - shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].flow_id); + if (flow->flow_id >= 0) + shm_flow_set_del(ai.fqset, set->idx, flow->flow_id); pthread_rwlock_wrlock(&set->lock); @@ -1502,10 +1524,14 @@ void fset_del(struct flow_set * set, if (e->fd == fd) { list_del(&e->next); free(e); - break; + } else { + if (flow->qs.timeout != 0 && flow->qs.timeout < min) + min = flow->qs.timeout; } } + set->min = min; + pthread_rwlock_unlock(&set->lock); pthread_rwlock_unlock(&ai.lock); @@ -1544,7 +1570,7 @@ static void fset_keepalive(struct flow_set * set) pthread_rwlock_wrlock(&set->lock); - if (ts_diff_ns(&now, &set->chk) < (FLOWTIMEO / 4) * BILLION) { + if (ts_diff_ns(&now, &set->chk) < set->min >> 2) { pthread_rwlock_unlock(&set->lock); return; } diff --git a/src/lib/qosspec.proto b/src/lib/qosspec.proto index 8a355363..3ceedd87 100644 --- a/src/lib/qosspec.proto +++ b/src/lib/qosspec.proto @@ -23,12 +23,13 @@ syntax = "proto2"; message qosspec_msg { - required uint32 delay = 1; /* In ms */ - required uint64 bandwidth = 2; /* In bits/s */ - required uint32 availability = 3; /* Class of 9s */ - required uint32 loss = 4; /* Packet loss */ - required uint32 ber = 5; /* Bit error rate, ppb */ - required uint32 in_order = 6; /* In-order delivery */ - required uint32 max_gap = 7; /* In ms */ - required uint32 cypher_s = 8; /* Crypto strength in bits */ + required uint32 delay = 1; /* In ms. */ + required uint64 bandwidth = 2; /* In bits/s. */ + required uint32 availability = 3; /* Class of 9s. */ + required uint32 loss = 4; /* Packet loss. */ + required uint32 ber = 5; /* Bit error rate, ppb. */ + required uint32 in_order = 6; /* In-order delivery. */ + required uint32 max_gap = 7; /* In ms. */ + required uint32 cypher_s = 8; /* Crypto strength in bits. */ + required uint32 timeout = 9; /* Timeout in ms. */ }; diff --git a/src/lib/sockets.c b/src/lib/sockets.c index 8179d2b3..48e95121 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -181,6 +181,7 @@ qosspec_msg_t spec_to_msg(const qosspec_t * qs) msg.in_order = spec.in_order; msg.max_gap = spec.max_gap; msg.cypher_s = spec.cypher_s; + msg.timeout = spec.timeout; return msg; } @@ -199,6 +200,7 @@ qosspec_t msg_to_spec(const qosspec_msg_t * msg) spec.in_order = msg->in_order; spec.max_gap = msg->max_gap; spec.cypher_s = msg->cypher_s; + spec.timeout = msg->timeout; return spec; } -- cgit v1.2.3