diff options
-rw-r--r-- | src/lib/dev.c | 91 |
1 files changed, 67 insertions, 24 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 1478d0bb..ab869509 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -140,10 +140,10 @@ struct { struct port * ports; struct list_head flow_list; - pthread_t mon; - pthread_t tx; + pthread_t rx; size_t n_frcti; + fset_t * frct_set; pthread_rwlock_t lock; } ai; @@ -252,7 +252,7 @@ static int proc_announce(char * prog) #include "crypt.c" #include "frct.c" -void * frct_tx(void * o) +void * flow_tx(void * o) { struct timespec tic = {0, TICTIME}; @@ -327,28 +327,52 @@ static void _flow_keepalive(struct flow * flow) } } -void * monitor(void * o) +static void handle_keepalives(void) +{ + struct list_head * p; + struct list_head * h; + + pthread_rwlock_rdlock(&ai.lock); + + list_for_each_safe(p, h, &ai.flow_list) { + struct flow * flow; + flow = list_entry(p, struct flow, next); + _flow_keepalive(flow); + } + + pthread_rwlock_unlock(&ai.lock); +} + +static void __cleanup_fqueue_destroy(void * fq) +{ + fqueue_destroy((fqueue_t *) fq); +} + +void * flow_rx(void * o) { struct timespec tic = {0, TICTIME}; + int ret; + struct fqueue * fq; (void) o; - while (true) { - struct list_head * p; - struct list_head * h; + fq = fqueue_create(); - pthread_rwlock_rdlock(&ai.lock); + pthread_cleanup_push(__cleanup_fqueue_destroy, fq); - list_for_each_safe(p, h, &ai.flow_list) { - struct flow * flow = list_entry(p, struct flow, next); - _flow_keepalive(flow); + /* fevent will filter all FRCT packets for us */ + while ((ret = fevent(ai.frct_set, fq, &tic)) != 0) { + if (ret == -ETIMEDOUT) { + handle_keepalives(); + continue; } - pthread_rwlock_unlock(&ai.lock); - - nanosleep(&tic, NULL); + while (fqueue_next(fq) >= 0) + ; /* no need to act */ } + pthread_cleanup_pop(true); + return (void *) 0; } @@ -371,6 +395,8 @@ static void flow_fini(int fd) pthread_join(ai.tx, NULL); } + shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id); + frcti_destroy(ai.flows[fd].frcti); } @@ -462,9 +488,12 @@ static int flow_init(int flow_id, if (flow->frcti == NULL) goto fail_frcti; + if (shm_flow_set_add(ai.fqset, 0, flow_id)) + goto fail_flow_set_add; + ++ai.n_frcti; if (ai.n_frcti == 1 && - pthread_create(&ai.tx, NULL, frct_tx, NULL) < 0) + pthread_create(&ai.tx, NULL, flow_tx, NULL) < 0) goto fail_tx_thread; } @@ -479,6 +508,8 @@ static int flow_init(int flow_id, return fd; fail_tx_thread: + shm_flow_set_del(ai.fqset, 0, flow_id); + fail_flow_set_add: frcti_destroy(flow->frcti); fail_frcti: crypt_fini(flow->ctx); @@ -582,6 +613,10 @@ static void init(int argc, if (ai.fqset == NULL) goto fail_fqset; + ai.frct_set = fset_create(); + if (ai.frct_set == NULL || ai.frct_set->idx != 0) + goto fail_frct_set; + if (timerwheel_init() < 0) goto fail_timerwheel; @@ -593,7 +628,7 @@ static void init(int argc, goto fail_rib_init; } #endif - if (pthread_create(&ai.mon, NULL, monitor, NULL) < 0) + if (pthread_create(&ai.rx, NULL, flow_rx, NULL) < 0) goto fail_monitor; return; @@ -605,6 +640,8 @@ static void init(int argc, #endif timerwheel_fini(); fail_timerwheel: + fset_destroy(ai.frct_set); + fail_frct_set: shm_flow_set_close(ai.fqset); fail_fqset: pthread_rwlock_destroy(&ai.lock); @@ -640,8 +677,10 @@ static void fini(void) if (ai.fds == NULL) return; - pthread_cancel(ai.mon); - pthread_join(ai.mon, NULL); + pthread_cancel(ai.rx); + pthread_join(ai.rx, NULL); + + fset_destroy(ai.frct_set); pthread_rwlock_wrlock(&ai.lock); @@ -1510,6 +1549,9 @@ int fset_add(struct flow_set * set, goto fail; } + if (flow->frcti != NULL) + shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id); + ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id); if (ret < 0) goto fail; @@ -1541,6 +1583,9 @@ void fset_del(struct flow_set * set, if (flow->flow_id >= 0) shm_flow_set_del(ai.fqset, set->idx, flow->flow_id); + if (flow->frcti != NULL) + shm_flow_set_add(ai.fqset, 0, ai.flows[fd].flow_id); + pthread_rwlock_unlock(&ai.lock); } @@ -1628,12 +1673,10 @@ int fqueue_next(struct fqueue * fq) if (fq->fqsize == 0 || fq->next == fq->fqsize) return -EPERM; - pthread_rwlock_rdlock(&ai.lock); - - if (fq->next != 0 && fqueue_filter(fq) == 0) { - pthread_rwlock_unlock(&ai.lock); + if (fq->next != 0 && fqueue_filter(fq) == 0) return -EPERM; - } + + pthread_rwlock_rdlock(&ai.lock); fd = ai.ports[fq->fqueue[fq->next]].fd; @@ -1687,7 +1730,7 @@ ssize_t fevent(struct flow_set * set, ret = fqueue_filter(fq); } - assert(ret); + assert(ret != 0); return 1; } |