summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2022-03-26 19:02:17 +0100
committerSander Vrijders <sander@ouroboros.rocks>2022-03-30 15:05:05 +0200
commit643c285c20abab5dadaa5c1929d978b725911b5d (patch)
tree7472dcac30c7707b9fb118db447b0f2c604cac30 /src/lib/dev.c
parent8a066315ef2d6baeeece08b94f86ad54e1ec6ee2 (diff)
downloadouroboros-643c285c20abab5dadaa5c1929d978b725911b5d.tar.gz
ouroboros-643c285c20abab5dadaa5c1929d978b725911b5d.zip
lib: Move timerwheel processing to its own thread
This is the first step moving away from scheduling the FRCT and flow monitoring functions as part of the IPC calls (flow_read / flow_write / fevent) and towards the more scalable (and far less complicated) implementation to take care of these functions in separate threads. If a process creates the first flow that requires FRCT, it will spin up a thread to process events on the timerwheel (retransmissions and delayed ACKs). This single thread lives until the last flow with FRCT is deallocated. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c44
1 files changed, 34 insertions, 10 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index a0f7398b..ac885711 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -147,6 +147,9 @@ struct {
struct flow * flows;
struct port * ports;
+ pthread_t tx;
+ size_t n_frcti;
+
pthread_rwlock_t lock;
} ai;
@@ -262,17 +265,39 @@ static void flow_clear(int fd)
#include "crypt.c"
#include "frct.c"
+void * frct_tx(void * o)
+{
+ struct timespec tic = {0, TICTIME};
+
+ (void) o;
+
+ while (true) {
+ timerwheel_move();
+
+ nanosleep(&tic, NULL);
+ }
+
+ return (void *) 0;
+}
+
static void flow_fini(int fd)
{
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ if (ai.flows[fd].frcti != NULL) {
+ ai.n_frcti--;
+ if (ai.n_frcti == 0) {
+ pthread_cancel(ai.tx);
+ pthread_join(ai.tx, NULL);
+ }
+ frcti_destroy(ai.flows[fd].frcti);
+ }
+
if (ai.flows[fd].flow_id != -1) {
port_destroy(&ai.ports[ai.flows[fd].flow_id]);
bmp_release(ai.fds, fd);
}
- if (ai.flows[fd].frcti != NULL)
- frcti_destroy(ai.flows[fd].frcti);
if (ai.flows[fd].rx_rb != NULL) {
shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
@@ -354,6 +379,11 @@ static int flow_init(int flow_id,
flow->frcti = frcti_create(fd, DELT_A, DELT_R, mpl);
if (flow->frcti == NULL)
goto fail_frcti;
+
+ ++ai.n_frcti;
+ if (ai.n_frcti == 1 &&
+ pthread_create(&ai.tx, NULL, frct_tx, NULL) < 0)
+ goto fail_tx_thread;
}
ai.ports[flow_id].fd = fd;
@@ -364,6 +394,8 @@ static int flow_init(int flow_id,
return fd;
+ fail_tx_thread:
+ frcti_destroy(flow->frcti);
fail_frcti:
crypt_fini(flow->ctx);
fail_ctx:
@@ -1182,8 +1214,6 @@ ssize_t flow_write(int fd,
if (flow_keepalive(fd))
return -EFLOWPEER;
- frcti_tick(flow->frcti);
-
ts_add(&tictime, &tic, &tictime);
}
idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime);
@@ -1300,8 +1330,6 @@ ssize_t flow_read(int fd,
idx = flow_rx_sdb(flow, &sdb, block, &tictime);
if (idx < 0) {
- frcti_tick(flow->frcti);
-
if (idx != -ETIMEDOUT)
return idx;
@@ -1326,8 +1354,6 @@ ssize_t flow_read(int fd,
sdb = shm_rdrbuff_get(ai.rdrb, idx);
- frcti_tick(flow->frcti);
-
pthread_rwlock_unlock(&ai.lock);
packet = shm_du_buff_head(sdb);
@@ -1910,8 +1936,6 @@ int ipcp_flow_read(int fd,
frcti_rcv(flow->frcti, *sdb);
}
- frcti_tick(flow->frcti);
-
pthread_rwlock_unlock(&ai.lock);
return 0;