summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/lib/dev.c91
1 files changed, 67 insertions, 24 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 1478d0bb..ab869509 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -140,10 +140,10 @@ struct {
struct port * ports;
struct list_head flow_list;
- pthread_t mon;
-
pthread_t tx;
+ pthread_t rx;
size_t n_frcti;
+ fset_t * frct_set;
pthread_rwlock_t lock;
} ai;
@@ -252,7 +252,7 @@ static int proc_announce(char * prog)
#include "crypt.c"
#include "frct.c"
-void * frct_tx(void * o)
+void * flow_tx(void * o)
{
struct timespec tic = {0, TICTIME};
@@ -327,28 +327,52 @@ static void _flow_keepalive(struct flow * flow)
}
}
-void * monitor(void * o)
+static void handle_keepalives(void)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ list_for_each_safe(p, h, &ai.flow_list) {
+ struct flow * flow;
+ flow = list_entry(p, struct flow, next);
+ _flow_keepalive(flow);
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
+}
+
+static void __cleanup_fqueue_destroy(void * fq)
+{
+ fqueue_destroy((fqueue_t *) fq);
+}
+
+void * flow_rx(void * o)
{
struct timespec tic = {0, TICTIME};
+ int ret;
+ struct fqueue * fq;
(void) o;
- while (true) {
- struct list_head * p;
- struct list_head * h;
+ fq = fqueue_create();
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_cleanup_push(__cleanup_fqueue_destroy, fq);
- list_for_each_safe(p, h, &ai.flow_list) {
- struct flow * flow = list_entry(p, struct flow, next);
- _flow_keepalive(flow);
+ /* fevent will filter all FRCT packets for us */
+ while ((ret = fevent(ai.frct_set, fq, &tic)) != 0) {
+ if (ret == -ETIMEDOUT) {
+ handle_keepalives();
+ continue;
}
- pthread_rwlock_unlock(&ai.lock);
-
- nanosleep(&tic, NULL);
+ while (fqueue_next(fq) >= 0)
+ ; /* no need to act */
}
+ pthread_cleanup_pop(true);
+
return (void *) 0;
}
@@ -371,6 +395,8 @@ static void flow_fini(int fd)
pthread_join(ai.tx, NULL);
}
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id);
+
frcti_destroy(ai.flows[fd].frcti);
}
@@ -462,9 +488,12 @@ static int flow_init(int flow_id,
if (flow->frcti == NULL)
goto fail_frcti;
+ if (shm_flow_set_add(ai.fqset, 0, flow_id))
+ goto fail_flow_set_add;
+
++ai.n_frcti;
if (ai.n_frcti == 1 &&
- pthread_create(&ai.tx, NULL, frct_tx, NULL) < 0)
+ pthread_create(&ai.tx, NULL, flow_tx, NULL) < 0)
goto fail_tx_thread;
}
@@ -479,6 +508,8 @@ static int flow_init(int flow_id,
return fd;
fail_tx_thread:
+ shm_flow_set_del(ai.fqset, 0, flow_id);
+ fail_flow_set_add:
frcti_destroy(flow->frcti);
fail_frcti:
crypt_fini(flow->ctx);
@@ -582,6 +613,10 @@ static void init(int argc,
if (ai.fqset == NULL)
goto fail_fqset;
+ ai.frct_set = fset_create();
+ if (ai.frct_set == NULL || ai.frct_set->idx != 0)
+ goto fail_frct_set;
+
if (timerwheel_init() < 0)
goto fail_timerwheel;
@@ -593,7 +628,7 @@ static void init(int argc,
goto fail_rib_init;
}
#endif
- if (pthread_create(&ai.mon, NULL, monitor, NULL) < 0)
+ if (pthread_create(&ai.rx, NULL, flow_rx, NULL) < 0)
goto fail_monitor;
return;
@@ -605,6 +640,8 @@ static void init(int argc,
#endif
timerwheel_fini();
fail_timerwheel:
+ fset_destroy(ai.frct_set);
+ fail_frct_set:
shm_flow_set_close(ai.fqset);
fail_fqset:
pthread_rwlock_destroy(&ai.lock);
@@ -640,8 +677,10 @@ static void fini(void)
if (ai.fds == NULL)
return;
- pthread_cancel(ai.mon);
- pthread_join(ai.mon, NULL);
+ pthread_cancel(ai.rx);
+ pthread_join(ai.rx, NULL);
+
+ fset_destroy(ai.frct_set);
pthread_rwlock_wrlock(&ai.lock);
@@ -1510,6 +1549,9 @@ int fset_add(struct flow_set * set,
goto fail;
}
+ if (flow->frcti != NULL)
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id);
+
ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id);
if (ret < 0)
goto fail;
@@ -1541,6 +1583,9 @@ void fset_del(struct flow_set * set,
if (flow->flow_id >= 0)
shm_flow_set_del(ai.fqset, set->idx, flow->flow_id);
+ if (flow->frcti != NULL)
+ shm_flow_set_add(ai.fqset, 0, ai.flows[fd].flow_id);
+
pthread_rwlock_unlock(&ai.lock);
}
@@ -1628,12 +1673,10 @@ int fqueue_next(struct fqueue * fq)
if (fq->fqsize == 0 || fq->next == fq->fqsize)
return -EPERM;
- pthread_rwlock_rdlock(&ai.lock);
-
- if (fq->next != 0 && fqueue_filter(fq) == 0) {
- pthread_rwlock_unlock(&ai.lock);
+ if (fq->next != 0 && fqueue_filter(fq) == 0)
return -EPERM;
- }
+
+ pthread_rwlock_rdlock(&ai.lock);
fd = ai.ports[fq->fqueue[fq->next]].fd;
@@ -1687,7 +1730,7 @@ ssize_t fevent(struct flow_set * set,
ret = fqueue_filter(fq);
}
- assert(ret);
+ assert(ret != 0);
return 1;
}