summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2022-03-28 12:55:31 +0200
committerSander Vrijders <sander@ouroboros.rocks>2022-03-30 15:12:25 +0200
commit1330cf5d2491897bbdfafe09f743599fe4ea97ea (patch)
tree54e772933e6709a773e7486b03312149bcc3678d
parentccc98e43268fbc0558290ab10c06511ddd4c0458 (diff)
downloadouroboros-1330cf5d2491897bbdfafe09f743599fe4ea97ea.tar.gz
ouroboros-1330cf5d2491897bbdfafe09f743599fe4ea97ea.zip
lib: Iterate over monitored flows
Now the instance keeps all flows for an application in a linked list to easily iterate over all allocated flows, which is needed by the keepalive monitoring. This is more efficient that tracking min and max fd. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--src/lib/dev.c98
1 files changed, 49 insertions, 49 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 135324ea..1478d0bb 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -90,6 +90,8 @@ struct port {
((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti)))
struct flow {
+ struct list_head next;
+
struct shm_rbuff * rx_rb;
struct shm_rbuff * tx_rb;
struct shm_flow_set * set;
@@ -136,11 +138,9 @@ struct {
struct flow * flows;
struct port * ports;
+ struct list_head flow_list;
pthread_t mon;
- int min_timeo;
- int min_fd;
- int max_fd;
pthread_t tx;
size_t n_frcti;
@@ -267,55 +267,64 @@ void * frct_tx(void * o)
return (void *) 0;
}
-static void flow_send_keepalive(int fd)
+static void flow_send_keepalive(struct flow * flow,
+ struct timespec now)
{
- flow_write(fd, NULL, 0);
+ struct shm_du_buff * sdb;
+ ssize_t idx;
+ uint8_t * ptr;
+
+ idx = shm_rdrbuff_alloc(ai.rdrb, 0, &ptr, &sdb);
+ if (idx < 0)
+ return;
+
+ pthread_rwlock_wrlock(&ai.lock);
+
+ flow->snd_act = now;
+
+ if (shm_rbuff_write(flow->tx_rb, idx))
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ else
+ shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+
+ pthread_rwlock_unlock(&ai.lock);
}
-static void flow_keepalive(int fd)
+/* Needs rdlock on ai. */
+static void _flow_keepalive(struct flow * flow)
{
struct timespec now;
struct timespec s_act;
struct timespec r_act;
- struct flow * flow;
int flow_id;
uint32_t timeo;
- struct shm_rbuff * rb;
uint32_t acl;
- flow = &ai.flows[fd];
-
- pthread_rwlock_rdlock(&ai.lock);
-
- if (flow->flow_id < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return;
- }
-
s_act = flow->snd_act;
r_act = flow->rcv_act;
flow_id = flow->flow_id;
timeo = flow->qs.timeout;
- rb = flow->rx_rb;
-
- pthread_rwlock_unlock(&ai.lock);
-
- acl = shm_rbuff_get_acl(rb);
+ acl = shm_rbuff_get_acl(flow->rx_rb);
if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN))
return;
clock_gettime(PTHREAD_COND_CLOCK, &now);
if (ts_diff_ns(&r_act, &now) > timeo * MILLION) {
- shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWPEER);
+ shm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER);
shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);
return;
}
- if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2)
- flow_send_keepalive(fd);
+ if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2) {
+ pthread_rwlock_unlock(&ai.lock);
+
+ flow_send_keepalive(flow, now);
+
+ pthread_rwlock_rdlock(&ai.lock);
+ }
}
void * monitor(void * o)
@@ -325,17 +334,17 @@ void * monitor(void * o)
(void) o;
while (true) {
- int i;
- int min;
- int max;
+ struct list_head * p;
+ struct list_head * h;
pthread_rwlock_rdlock(&ai.lock);
- min = ai.min_fd;
- max = ai.max_fd;
- pthread_rwlock_unlock(&ai.lock);
- for (i = min; i <= max; ++i)
- flow_keepalive(i);
+ list_for_each_safe(p, h, &ai.flow_list) {
+ struct flow * flow = list_entry(p, struct flow, next);
+ _flow_keepalive(flow);
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
nanosleep(&tic, NULL);
}
@@ -390,13 +399,9 @@ static void flow_fini(int fd)
if (ai.flows[fd].ctx != NULL)
crypt_fini(ai.flows[fd].ctx);
- flow_clear(fd);
-
- while (ai.flows[ai.max_fd].flow_id == -1 && ai.max_fd > ai.min_fd)
- --ai.max_fd;
+ list_del(&ai.flows[fd].next);
- while (ai.flows[ai.min_fd].flow_id == -1 && ai.min_fd < ai.max_fd)
- ++ai.min_fd;
+ flow_clear(fd);
}
static int flow_init(int flow_id,
@@ -420,12 +425,6 @@ static int flow_init(int flow_id,
goto fail_fds;
}
- if (fd > ai.max_fd)
- ai.max_fd = fd;
-
- if (fd < ai.min_fd)
- ai.min_fd = fd;
-
flow = &ai.flows[fd];
flow->rx_rb = shm_rbuff_open(getpid(), flow_id);
@@ -469,6 +468,8 @@ static int flow_init(int flow_id,
goto fail_tx_thread;
}
+ list_add_tail(&flow->next, &ai.flow_list);
+
ai.ports[flow_id].fd = fd;
port_set_state(&ai.ports[flow_id], PORT_ID_ASSIGNED);
@@ -527,13 +528,12 @@ static void init(int argc,
gcry_control(GCRYCTL_INITIALIZATION_FINISHED, 0);
}
#endif
+ list_head_init(&ai.flow_list);
+
ai.fds = bmp_create(PROG_MAX_FLOWS - PROG_RES_FDS, PROG_RES_FDS);
if (ai.fds == NULL)
goto fail_fds;
- ai.min_fd = PROG_RES_FDS;
- ai.max_fd = PROG_RES_FDS;
-
ai.fqueues = bmp_create(PROG_MAX_FQUEUES, 0);
if (ai.fqueues == NULL)
goto fail_fqueues;
@@ -1219,7 +1219,7 @@ ssize_t flow_write(int fd,
uint8_t * ptr;
if (buf == NULL && count != 0)
- return 0;
+ return -EINVAL;
if (fd < 0 || fd >= PROG_MAX_FLOWS)
return -EBADF;