diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2022-03-28 20:08:12 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2022-03-30 15:12:25 +0200 | 
| commit | efe850f4f90967649cdb27cfa29ca0a17127f932 (patch) | |
| tree | 38306f31b9c0c950788ea23e0639365c10067e5f /src/lib/dev.c | |
| parent | 1330cf5d2491897bbdfafe09f743599fe4ea97ea (diff) | |
| download | ouroboros-efe850f4f90967649cdb27cfa29ca0a17127f932.tar.gz ouroboros-efe850f4f90967649cdb27cfa29ca0a17127f932.zip | |
lib: Move incoming FRCT handling to own thread
The application will now handle incoming FRCT packets even if the
application never reads data from the flow (for instance servers).  To
do this, it reserves an fset_t (id 0). When an FRCT-enabled flow is
created, it is automatically added to this fset. An rx thread will
listen for incoming events and perform necessary actions on the flow
if needed. If the FRCT flow is added to another user fset, it will be
handled by that user fset (and if the flow is removed from a user
fset, it will be re-added to the set with id 0 to be handled by the
rx_flow thread. The flow monitoring is handled by the same thread,
replacing the previous monitoring thread.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib/dev.c')
| -rw-r--r-- | src/lib/dev.c | 91 | 
1 files changed, 67 insertions, 24 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index 1478d0bb..ab869509 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -140,10 +140,10 @@ struct {          struct port *         ports;          struct list_head      flow_list; -        pthread_t             mon; -          pthread_t             tx; +        pthread_t             rx;          size_t                n_frcti; +        fset_t *              frct_set;          pthread_rwlock_t      lock;  } ai; @@ -252,7 +252,7 @@ static int proc_announce(char * prog)  #include "crypt.c"  #include "frct.c" -void * frct_tx(void * o) +void * flow_tx(void * o)  {          struct timespec tic = {0, TICTIME}; @@ -327,28 +327,52 @@ static void _flow_keepalive(struct flow * flow)          }  } -void * monitor(void * o) +static void handle_keepalives(void) +{ +        struct list_head * p; +        struct list_head * h; + +        pthread_rwlock_rdlock(&ai.lock); + +        list_for_each_safe(p, h, &ai.flow_list) { +                struct flow * flow; +                flow = list_entry(p, struct flow, next); +                _flow_keepalive(flow); +        } + +        pthread_rwlock_unlock(&ai.lock); +} + +static void __cleanup_fqueue_destroy(void * fq) +{ +        fqueue_destroy((fqueue_t *) fq); +} + +void * flow_rx(void * o)  {          struct timespec tic = {0, TICTIME}; +        int             ret; +        struct fqueue * fq;          (void) o; -        while (true) { -                struct list_head * p; -                struct list_head * h; +        fq = fqueue_create(); -                pthread_rwlock_rdlock(&ai.lock); +        pthread_cleanup_push(__cleanup_fqueue_destroy, fq); -                list_for_each_safe(p, h, &ai.flow_list) { -                        struct flow * flow  = list_entry(p, struct flow, next); -                        _flow_keepalive(flow); +        /* fevent will filter all FRCT packets for us */ +        while ((ret = fevent(ai.frct_set, fq, &tic)) != 0) { +                if (ret == -ETIMEDOUT) { +                        handle_keepalives(); +                        continue;                  } -                pthread_rwlock_unlock(&ai.lock); - -                nanosleep(&tic, NULL); +                while (fqueue_next(fq) >= 0) +                        ; /* no need to act */          } +        pthread_cleanup_pop(true); +          return (void *) 0;  } @@ -371,6 +395,8 @@ static void flow_fini(int fd)                          pthread_join(ai.tx, NULL);                  } +                shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id); +                  frcti_destroy(ai.flows[fd].frcti);          } @@ -462,9 +488,12 @@ static int flow_init(int       flow_id,                  if (flow->frcti == NULL)                          goto fail_frcti; +                if (shm_flow_set_add(ai.fqset, 0, flow_id)) +                        goto fail_flow_set_add; +                  ++ai.n_frcti;                  if (ai.n_frcti == 1 && -                    pthread_create(&ai.tx, NULL, frct_tx, NULL) < 0) +                    pthread_create(&ai.tx, NULL, flow_tx, NULL) < 0)                          goto fail_tx_thread;          } @@ -479,6 +508,8 @@ static int flow_init(int       flow_id,          return fd;   fail_tx_thread: +        shm_flow_set_del(ai.fqset, 0, flow_id); + fail_flow_set_add:          frcti_destroy(flow->frcti);   fail_frcti:          crypt_fini(flow->ctx); @@ -582,6 +613,10 @@ static void init(int     argc,          if (ai.fqset == NULL)                  goto fail_fqset; +        ai.frct_set = fset_create(); +        if (ai.frct_set == NULL || ai.frct_set->idx != 0) +                goto fail_frct_set; +          if (timerwheel_init() < 0)                  goto fail_timerwheel; @@ -593,7 +628,7 @@ static void init(int     argc,                          goto fail_rib_init;          }  #endif -        if (pthread_create(&ai.mon, NULL, monitor, NULL) < 0) +        if (pthread_create(&ai.rx, NULL, flow_rx, NULL) < 0)                  goto fail_monitor;          return; @@ -605,6 +640,8 @@ static void init(int     argc,  #endif          timerwheel_fini();   fail_timerwheel: +        fset_destroy(ai.frct_set); + fail_frct_set:          shm_flow_set_close(ai.fqset);   fail_fqset:          pthread_rwlock_destroy(&ai.lock); @@ -640,8 +677,10 @@ static void fini(void)          if (ai.fds == NULL)                  return; -        pthread_cancel(ai.mon); -        pthread_join(ai.mon, NULL); +        pthread_cancel(ai.rx); +        pthread_join(ai.rx, NULL); + +        fset_destroy(ai.frct_set);          pthread_rwlock_wrlock(&ai.lock); @@ -1510,6 +1549,9 @@ int fset_add(struct flow_set * set,                  goto fail;          } +        if (flow->frcti != NULL) +                shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id); +          ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id);          if (ret < 0)                  goto fail; @@ -1541,6 +1583,9 @@ void fset_del(struct flow_set * set,          if (flow->flow_id >= 0)                  shm_flow_set_del(ai.fqset, set->idx, flow->flow_id); +        if (flow->frcti != NULL) +                shm_flow_set_add(ai.fqset, 0, ai.flows[fd].flow_id); +          pthread_rwlock_unlock(&ai.lock);  } @@ -1628,12 +1673,10 @@ int fqueue_next(struct fqueue * fq)          if (fq->fqsize == 0 || fq->next == fq->fqsize)                  return -EPERM; -        pthread_rwlock_rdlock(&ai.lock); - -        if (fq->next != 0 && fqueue_filter(fq) == 0) { -                pthread_rwlock_unlock(&ai.lock); +        if (fq->next != 0 && fqueue_filter(fq) == 0)                  return -EPERM; -        } + +        pthread_rwlock_rdlock(&ai.lock);          fd = ai.ports[fq->fqueue[fq->next]].fd; @@ -1687,7 +1730,7 @@ ssize_t fevent(struct flow_set *       set,                  ret = fqueue_filter(fq);          } -        assert(ret); +        assert(ret != 0);          return 1;  } | 
