summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2022-03-28 20:08:12 +0200
committerSander Vrijders <sander@ouroboros.rocks>2022-03-30 15:12:25 +0200
commitefe850f4f90967649cdb27cfa29ca0a17127f932 (patch)
tree38306f31b9c0c950788ea23e0639365c10067e5f
parent1330cf5d2491897bbdfafe09f743599fe4ea97ea (diff)
downloadouroboros-efe850f4f90967649cdb27cfa29ca0a17127f932.tar.gz
ouroboros-efe850f4f90967649cdb27cfa29ca0a17127f932.zip
lib: Move incoming FRCT handling to own thread
The application will now handle incoming FRCT packets even if the application never reads data from the flow (for instance servers). To do this, it reserves an fset_t (id 0). When an FRCT-enabled flow is created, it is automatically added to this fset. An rx thread will listen for incoming events and perform necessary actions on the flow if needed. If the FRCT flow is added to another user fset, it will be handled by that user fset (and if the flow is removed from a user fset, it will be re-added to the set with id 0 to be handled by the rx_flow thread. The flow monitoring is handled by the same thread, replacing the previous monitoring thread. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--src/lib/dev.c91
1 files changed, 67 insertions, 24 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 1478d0bb..ab869509 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -140,10 +140,10 @@ struct {
struct port * ports;
struct list_head flow_list;
- pthread_t mon;
-
pthread_t tx;
+ pthread_t rx;
size_t n_frcti;
+ fset_t * frct_set;
pthread_rwlock_t lock;
} ai;
@@ -252,7 +252,7 @@ static int proc_announce(char * prog)
#include "crypt.c"
#include "frct.c"
-void * frct_tx(void * o)
+void * flow_tx(void * o)
{
struct timespec tic = {0, TICTIME};
@@ -327,28 +327,52 @@ static void _flow_keepalive(struct flow * flow)
}
}
-void * monitor(void * o)
+static void handle_keepalives(void)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ list_for_each_safe(p, h, &ai.flow_list) {
+ struct flow * flow;
+ flow = list_entry(p, struct flow, next);
+ _flow_keepalive(flow);
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
+}
+
+static void __cleanup_fqueue_destroy(void * fq)
+{
+ fqueue_destroy((fqueue_t *) fq);
+}
+
+void * flow_rx(void * o)
{
struct timespec tic = {0, TICTIME};
+ int ret;
+ struct fqueue * fq;
(void) o;
- while (true) {
- struct list_head * p;
- struct list_head * h;
+ fq = fqueue_create();
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_cleanup_push(__cleanup_fqueue_destroy, fq);
- list_for_each_safe(p, h, &ai.flow_list) {
- struct flow * flow = list_entry(p, struct flow, next);
- _flow_keepalive(flow);
+ /* fevent will filter all FRCT packets for us */
+ while ((ret = fevent(ai.frct_set, fq, &tic)) != 0) {
+ if (ret == -ETIMEDOUT) {
+ handle_keepalives();
+ continue;
}
- pthread_rwlock_unlock(&ai.lock);
-
- nanosleep(&tic, NULL);
+ while (fqueue_next(fq) >= 0)
+ ; /* no need to act */
}
+ pthread_cleanup_pop(true);
+
return (void *) 0;
}
@@ -371,6 +395,8 @@ static void flow_fini(int fd)
pthread_join(ai.tx, NULL);
}
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id);
+
frcti_destroy(ai.flows[fd].frcti);
}
@@ -462,9 +488,12 @@ static int flow_init(int flow_id,
if (flow->frcti == NULL)
goto fail_frcti;
+ if (shm_flow_set_add(ai.fqset, 0, flow_id))
+ goto fail_flow_set_add;
+
++ai.n_frcti;
if (ai.n_frcti == 1 &&
- pthread_create(&ai.tx, NULL, frct_tx, NULL) < 0)
+ pthread_create(&ai.tx, NULL, flow_tx, NULL) < 0)
goto fail_tx_thread;
}
@@ -479,6 +508,8 @@ static int flow_init(int flow_id,
return fd;
fail_tx_thread:
+ shm_flow_set_del(ai.fqset, 0, flow_id);
+ fail_flow_set_add:
frcti_destroy(flow->frcti);
fail_frcti:
crypt_fini(flow->ctx);
@@ -582,6 +613,10 @@ static void init(int argc,
if (ai.fqset == NULL)
goto fail_fqset;
+ ai.frct_set = fset_create();
+ if (ai.frct_set == NULL || ai.frct_set->idx != 0)
+ goto fail_frct_set;
+
if (timerwheel_init() < 0)
goto fail_timerwheel;
@@ -593,7 +628,7 @@ static void init(int argc,
goto fail_rib_init;
}
#endif
- if (pthread_create(&ai.mon, NULL, monitor, NULL) < 0)
+ if (pthread_create(&ai.rx, NULL, flow_rx, NULL) < 0)
goto fail_monitor;
return;
@@ -605,6 +640,8 @@ static void init(int argc,
#endif
timerwheel_fini();
fail_timerwheel:
+ fset_destroy(ai.frct_set);
+ fail_frct_set:
shm_flow_set_close(ai.fqset);
fail_fqset:
pthread_rwlock_destroy(&ai.lock);
@@ -640,8 +677,10 @@ static void fini(void)
if (ai.fds == NULL)
return;
- pthread_cancel(ai.mon);
- pthread_join(ai.mon, NULL);
+ pthread_cancel(ai.rx);
+ pthread_join(ai.rx, NULL);
+
+ fset_destroy(ai.frct_set);
pthread_rwlock_wrlock(&ai.lock);
@@ -1510,6 +1549,9 @@ int fset_add(struct flow_set * set,
goto fail;
}
+ if (flow->frcti != NULL)
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id);
+
ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id);
if (ret < 0)
goto fail;
@@ -1541,6 +1583,9 @@ void fset_del(struct flow_set * set,
if (flow->flow_id >= 0)
shm_flow_set_del(ai.fqset, set->idx, flow->flow_id);
+ if (flow->frcti != NULL)
+ shm_flow_set_add(ai.fqset, 0, ai.flows[fd].flow_id);
+
pthread_rwlock_unlock(&ai.lock);
}
@@ -1628,12 +1673,10 @@ int fqueue_next(struct fqueue * fq)
if (fq->fqsize == 0 || fq->next == fq->fqsize)
return -EPERM;
- pthread_rwlock_rdlock(&ai.lock);
-
- if (fq->next != 0 && fqueue_filter(fq) == 0) {
- pthread_rwlock_unlock(&ai.lock);
+ if (fq->next != 0 && fqueue_filter(fq) == 0)
return -EPERM;
- }
+
+ pthread_rwlock_rdlock(&ai.lock);
fd = ai.ports[fq->fqueue[fq->next]].fd;
@@ -1687,7 +1730,7 @@ ssize_t fevent(struct flow_set * set,
ret = fqueue_filter(fq);
}
- assert(ret);
+ assert(ret != 0);
return 1;
}