diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/dev.c | 91 | 
1 files changed, 67 insertions, 24 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 1478d0bb..ab869509 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -140,10 +140,10 @@ struct {          struct port *         ports;          struct list_head      flow_list; -        pthread_t             mon; -          pthread_t             tx; +        pthread_t             rx;          size_t                n_frcti; +        fset_t *              frct_set;          pthread_rwlock_t      lock;  } ai; @@ -252,7 +252,7 @@ static int proc_announce(char * prog)  #include "crypt.c"  #include "frct.c" -void * frct_tx(void * o) +void * flow_tx(void * o)  {          struct timespec tic = {0, TICTIME}; @@ -327,28 +327,52 @@ static void _flow_keepalive(struct flow * flow)          }  } -void * monitor(void * o) +static void handle_keepalives(void) +{ +        struct list_head * p; +        struct list_head * h; + +        pthread_rwlock_rdlock(&ai.lock); + +        list_for_each_safe(p, h, &ai.flow_list) { +                struct flow * flow; +                flow = list_entry(p, struct flow, next); +                _flow_keepalive(flow); +        } + +        pthread_rwlock_unlock(&ai.lock); +} + +static void __cleanup_fqueue_destroy(void * fq) +{ +        fqueue_destroy((fqueue_t *) fq); +} + +void * flow_rx(void * o)  {          struct timespec tic = {0, TICTIME}; +        int             ret; +        struct fqueue * fq;          (void) o; -        while (true) { -                struct list_head * p; -                struct list_head * h; +        fq = fqueue_create(); -                pthread_rwlock_rdlock(&ai.lock); +        pthread_cleanup_push(__cleanup_fqueue_destroy, fq); -                list_for_each_safe(p, h, &ai.flow_list) { -                        struct flow * flow  = list_entry(p, struct flow, next); -                        _flow_keepalive(flow); +        /* fevent will filter all FRCT packets for us */ +        while ((ret = fevent(ai.frct_set, fq, &tic)) != 0) { +                if (ret == -ETIMEDOUT) { +                        handle_keepalives(); +                        continue;                  } -                pthread_rwlock_unlock(&ai.lock); - -                nanosleep(&tic, NULL); +                while (fqueue_next(fq) >= 0) +                        ; /* no need to act */          } +        pthread_cleanup_pop(true); +          return (void *) 0;  } @@ -371,6 +395,8 @@ static void flow_fini(int fd)                          pthread_join(ai.tx, NULL);                  } +                shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id); +                  frcti_destroy(ai.flows[fd].frcti);          } @@ -462,9 +488,12 @@ static int flow_init(int       flow_id,                  if (flow->frcti == NULL)                          goto fail_frcti; +                if (shm_flow_set_add(ai.fqset, 0, flow_id)) +                        goto fail_flow_set_add; +                  ++ai.n_frcti;                  if (ai.n_frcti == 1 && -                    pthread_create(&ai.tx, NULL, frct_tx, NULL) < 0) +                    pthread_create(&ai.tx, NULL, flow_tx, NULL) < 0)                          goto fail_tx_thread;          } @@ -479,6 +508,8 @@ static int flow_init(int       flow_id,          return fd;   fail_tx_thread: +        shm_flow_set_del(ai.fqset, 0, flow_id); + fail_flow_set_add:          frcti_destroy(flow->frcti);   fail_frcti:          crypt_fini(flow->ctx); @@ -582,6 +613,10 @@ static void init(int     argc,          if (ai.fqset == NULL)                  goto fail_fqset; +        ai.frct_set = fset_create(); +        if (ai.frct_set == NULL || ai.frct_set->idx != 0) +                goto fail_frct_set; +          if (timerwheel_init() < 0)                  goto fail_timerwheel; @@ -593,7 +628,7 @@ static void init(int     argc,                          goto fail_rib_init;          }  #endif -        if (pthread_create(&ai.mon, NULL, monitor, NULL) < 0) +        if (pthread_create(&ai.rx, NULL, flow_rx, NULL) < 0)                  goto fail_monitor;          return; @@ -605,6 +640,8 @@ static void init(int     argc,  #endif          timerwheel_fini();   fail_timerwheel: +        fset_destroy(ai.frct_set); + fail_frct_set:          shm_flow_set_close(ai.fqset);   fail_fqset:          pthread_rwlock_destroy(&ai.lock); @@ -640,8 +677,10 @@ static void fini(void)          if (ai.fds == NULL)                  return; -        pthread_cancel(ai.mon); -        pthread_join(ai.mon, NULL); +        pthread_cancel(ai.rx); +        pthread_join(ai.rx, NULL); + +        fset_destroy(ai.frct_set);          pthread_rwlock_wrlock(&ai.lock); @@ -1510,6 +1549,9 @@ int fset_add(struct flow_set * set,                  goto fail;          } +        if (flow->frcti != NULL) +                shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id); +          ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id);          if (ret < 0)                  goto fail; @@ -1541,6 +1583,9 @@ void fset_del(struct flow_set * set,          if (flow->flow_id >= 0)                  shm_flow_set_del(ai.fqset, set->idx, flow->flow_id); +        if (flow->frcti != NULL) +                shm_flow_set_add(ai.fqset, 0, ai.flows[fd].flow_id); +          pthread_rwlock_unlock(&ai.lock);  } @@ -1628,12 +1673,10 @@ int fqueue_next(struct fqueue * fq)          if (fq->fqsize == 0 || fq->next == fq->fqsize)                  return -EPERM; -        pthread_rwlock_rdlock(&ai.lock); - -        if (fq->next != 0 && fqueue_filter(fq) == 0) { -                pthread_rwlock_unlock(&ai.lock); +        if (fq->next != 0 && fqueue_filter(fq) == 0)                  return -EPERM; -        } + +        pthread_rwlock_rdlock(&ai.lock);          fd = ai.ports[fq->fqueue[fq->next]].fd; @@ -1687,7 +1730,7 @@ ssize_t fevent(struct flow_set *       set,                  ret = fqueue_filter(fq);          } -        assert(ret); +        assert(ret != 0);          return 1;  }  | 
