From 1330cf5d2491897bbdfafe09f743599fe4ea97ea Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Mon, 28 Mar 2022 12:55:31 +0200 Subject: lib: Iterate over monitored flows Now the instance keeps all flows for an application in a linked list to easily iterate over all allocated flows, which is needed by the keepalive monitoring. This is more efficient that tracking min and max fd. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/lib/dev.c | 98 +++++++++++++++++++++++++++++------------------------------ 1 file changed, 49 insertions(+), 49 deletions(-) diff --git a/src/lib/dev.c b/src/lib/dev.c index 135324ea..1478d0bb 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -90,6 +90,8 @@ struct port { ((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti))) struct flow { + struct list_head next; + struct shm_rbuff * rx_rb; struct shm_rbuff * tx_rb; struct shm_flow_set * set; @@ -136,11 +138,9 @@ struct { struct flow * flows; struct port * ports; + struct list_head flow_list; pthread_t mon; - int min_timeo; - int min_fd; - int max_fd; pthread_t tx; size_t n_frcti; @@ -267,55 +267,64 @@ void * frct_tx(void * o) return (void *) 0; } -static void flow_send_keepalive(int fd) +static void flow_send_keepalive(struct flow * flow, + struct timespec now) { - flow_write(fd, NULL, 0); + struct shm_du_buff * sdb; + ssize_t idx; + uint8_t * ptr; + + idx = shm_rdrbuff_alloc(ai.rdrb, 0, &ptr, &sdb); + if (idx < 0) + return; + + pthread_rwlock_wrlock(&ai.lock); + + flow->snd_act = now; + + if (shm_rbuff_write(flow->tx_rb, idx)) + shm_rdrbuff_remove(ai.rdrb, idx); + else + shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + + pthread_rwlock_unlock(&ai.lock); } -static void flow_keepalive(int fd) +/* Needs rdlock on ai. */ +static void _flow_keepalive(struct flow * flow) { struct timespec now; struct timespec s_act; struct timespec r_act; - struct flow * flow; int flow_id; uint32_t timeo; - struct shm_rbuff * rb; uint32_t acl; - flow = &ai.flows[fd]; - - pthread_rwlock_rdlock(&ai.lock); - - if (flow->flow_id < 0) { - pthread_rwlock_unlock(&ai.lock); - return; - } - s_act = flow->snd_act; r_act = flow->rcv_act; flow_id = flow->flow_id; timeo = flow->qs.timeout; - rb = flow->rx_rb; - - pthread_rwlock_unlock(&ai.lock); - - acl = shm_rbuff_get_acl(rb); + acl = shm_rbuff_get_acl(flow->rx_rb); if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN)) return; clock_gettime(PTHREAD_COND_CLOCK, &now); if (ts_diff_ns(&r_act, &now) > timeo * MILLION) { - shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWPEER); + shm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER); shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER); return; } - if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2) - flow_send_keepalive(fd); + if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2) { + pthread_rwlock_unlock(&ai.lock); + + flow_send_keepalive(flow, now); + + pthread_rwlock_rdlock(&ai.lock); + } } void * monitor(void * o) @@ -325,17 +334,17 @@ void * monitor(void * o) (void) o; while (true) { - int i; - int min; - int max; + struct list_head * p; + struct list_head * h; pthread_rwlock_rdlock(&ai.lock); - min = ai.min_fd; - max = ai.max_fd; - pthread_rwlock_unlock(&ai.lock); - for (i = min; i <= max; ++i) - flow_keepalive(i); + list_for_each_safe(p, h, &ai.flow_list) { + struct flow * flow = list_entry(p, struct flow, next); + _flow_keepalive(flow); + } + + pthread_rwlock_unlock(&ai.lock); nanosleep(&tic, NULL); } @@ -390,13 +399,9 @@ static void flow_fini(int fd) if (ai.flows[fd].ctx != NULL) crypt_fini(ai.flows[fd].ctx); - flow_clear(fd); - - while (ai.flows[ai.max_fd].flow_id == -1 && ai.max_fd > ai.min_fd) - --ai.max_fd; + list_del(&ai.flows[fd].next); - while (ai.flows[ai.min_fd].flow_id == -1 && ai.min_fd < ai.max_fd) - ++ai.min_fd; + flow_clear(fd); } static int flow_init(int flow_id, @@ -420,12 +425,6 @@ static int flow_init(int flow_id, goto fail_fds; } - if (fd > ai.max_fd) - ai.max_fd = fd; - - if (fd < ai.min_fd) - ai.min_fd = fd; - flow = &ai.flows[fd]; flow->rx_rb = shm_rbuff_open(getpid(), flow_id); @@ -469,6 +468,8 @@ static int flow_init(int flow_id, goto fail_tx_thread; } + list_add_tail(&flow->next, &ai.flow_list); + ai.ports[flow_id].fd = fd; port_set_state(&ai.ports[flow_id], PORT_ID_ASSIGNED); @@ -527,13 +528,12 @@ static void init(int argc, gcry_control(GCRYCTL_INITIALIZATION_FINISHED, 0); } #endif + list_head_init(&ai.flow_list); + ai.fds = bmp_create(PROG_MAX_FLOWS - PROG_RES_FDS, PROG_RES_FDS); if (ai.fds == NULL) goto fail_fds; - ai.min_fd = PROG_RES_FDS; - ai.max_fd = PROG_RES_FDS; - ai.fqueues = bmp_create(PROG_MAX_FQUEUES, 0); if (ai.fqueues == NULL) goto fail_fqueues; @@ -1219,7 +1219,7 @@ ssize_t flow_write(int fd, uint8_t * ptr; if (buf == NULL && count != 0) - return 0; + return -EINVAL; if (fd < 0 || fd >= PROG_MAX_FLOWS) return -EBADF; -- cgit v1.2.3