summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib/cdap.c142
1 files changed, 80 insertions, 62 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index f0db2419..8ebfbec1 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -55,6 +55,10 @@ struct cdap {
flow_set_t * set;
fqueue_t * fq;
+ bool proc;
+ pthread_mutex_t mtx;
+ pthread_cond_t cond;
+
size_t n_flows;
struct list_head flows;
pthread_rwlock_t flows_lock;
@@ -279,6 +283,17 @@ static void cdap_rcvd_destroy(struct cdap * instance)
pthread_mutex_unlock(&instance->rcvd_lock);
}
+static void set_proc(struct cdap * instance,
+ bool status)
+{
+ pthread_mutex_lock(&instance->mtx);
+
+ instance->proc = status;
+ pthread_cond_signal(&instance->cond);
+
+ pthread_mutex_unlock(&instance->mtx);
+}
+
static void * sdu_reader(void * o)
{
struct cdap * instance = (struct cdap *) o;
@@ -290,7 +305,9 @@ static void * sdu_reader(void * o)
buffer_t data;
while (flow_event_wait(instance->set, instance->fq, NULL)) {
- int fd = fqueue_next(instance->fq);
+ int fd;
+ set_proc(instance, true);
+ fd = fqueue_next(instance->fq);
len = flow_read(fd, buf, BUF_SIZE);
if (len < 0)
continue;
@@ -374,6 +391,7 @@ static void * sdu_reader(void * o)
}
cdap__free_unpacked(msg, NULL);
+ set_proc(instance, false);
}
return (void *) 0;
@@ -385,87 +403,77 @@ struct cdap * cdap_create()
instance = malloc(sizeof(*instance));
if (instance == NULL)
- return NULL;
+ goto fail_malloc;
- if (pthread_rwlock_init(&instance->flows_lock, NULL)) {
- free(instance);
- return NULL;
- }
+ if (pthread_rwlock_init(&instance->flows_lock, NULL))
+ goto fail_flows_lock;
- if (pthread_mutex_init(&instance->ids_lock, NULL)) {
- pthread_rwlock_destroy(&instance->flows_lock);
- free(instance);
- return NULL;
- }
+ if (pthread_mutex_init(&instance->ids_lock, NULL))
+ goto fail_ids_lock;
- if (pthread_mutex_init(&instance->rcvd_lock, NULL)) {
- pthread_mutex_destroy(&instance->ids_lock);
- pthread_rwlock_destroy(&instance->flows_lock);
- free(instance);
- return NULL;
- }
+ if (pthread_mutex_init(&instance->rcvd_lock, NULL))
+ goto fail_rcvd_lock;
- if (pthread_rwlock_init(&instance->sent_lock, NULL)) {
- pthread_mutex_destroy(&instance->rcvd_lock);
- pthread_mutex_destroy(&instance->ids_lock);
- pthread_rwlock_destroy(&instance->flows_lock);
- free(instance);
- return NULL;
- }
+ if (pthread_rwlock_init(&instance->sent_lock, NULL))
+ goto fail_sent_lock;
- if (pthread_cond_init(&instance->rcvd_cond, NULL)) {
- pthread_rwlock_destroy(&instance->sent_lock);
- pthread_mutex_destroy(&instance->rcvd_lock);
- pthread_mutex_destroy(&instance->ids_lock);
- pthread_rwlock_destroy(&instance->flows_lock);
- free(instance);
- return NULL;
- }
+ if (pthread_cond_init(&instance->rcvd_cond, NULL))
+ goto fail_rcvd_cond;
+
+ if (pthread_mutex_init(&instance->mtx, NULL))
+ goto fail_mtx;
+
+ if (pthread_cond_init(&instance->cond, NULL))
+ goto fail_cond;
instance->ids = bmp_create(IDS_SIZE, 0);
- if (instance->ids == NULL) {
- pthread_cond_destroy(&instance->rcvd_cond);
- pthread_rwlock_destroy(&instance->sent_lock);
- pthread_mutex_destroy(&instance->rcvd_lock);
- pthread_mutex_destroy(&instance->ids_lock);
- pthread_rwlock_destroy(&instance->flows_lock);
- free(instance);
- return NULL;
- }
+ if (instance->ids == NULL)
+ goto fail_bmp_create;
instance->set = flow_set_create();
- if (instance->set == NULL) {
- bmp_destroy(instance->ids);
- pthread_cond_destroy(&instance->rcvd_cond);
- pthread_rwlock_destroy(&instance->sent_lock);
- pthread_mutex_destroy(&instance->rcvd_lock);
- pthread_mutex_destroy(&instance->ids_lock);
- pthread_rwlock_destroy(&instance->flows_lock);
- free(instance);
- return NULL;
- }
+ if (instance->set == NULL)
+ goto fail_set_create;
instance->fq = fqueue_create();
- if (instance->fq == NULL) {
- flow_set_destroy(instance->set);
- bmp_destroy(instance->ids);
- pthread_cond_destroy(&instance->rcvd_cond);
- pthread_rwlock_destroy(&instance->sent_lock);
- pthread_mutex_destroy(&instance->rcvd_lock);
- pthread_mutex_destroy(&instance->ids_lock);
- pthread_rwlock_destroy(&instance->flows_lock);
- free(instance);
- }
+ if (instance->fq == NULL)
+ goto fail_fqueue_create;
instance->n_flows = 0;
+ instance->proc = false;
list_head_init(&instance->flows);
list_head_init(&instance->sent);
list_head_init(&instance->rcvd);
- pthread_create(&instance->reader, NULL, sdu_reader, instance);
+ if (pthread_create(&instance->reader, NULL, sdu_reader, instance))
+ goto fail_pthread_create;
return instance;
+
+ fail_pthread_create:
+ fqueue_destroy(instance->fq);
+ fail_fqueue_create:
+ flow_set_destroy(instance->set);
+ fail_set_create:
+ bmp_destroy(instance->ids);
+ fail_bmp_create:
+ pthread_cond_destroy(&instance->cond);
+ fail_cond:
+ pthread_mutex_destroy(&instance->mtx);
+ fail_mtx:
+ pthread_cond_destroy(&instance->rcvd_cond);
+ fail_rcvd_cond:
+ pthread_rwlock_destroy(&instance->sent_lock);
+ fail_sent_lock:
+ pthread_mutex_destroy(&instance->rcvd_lock);
+ fail_rcvd_lock:
+ pthread_mutex_destroy(&instance->ids_lock);
+ fail_ids_lock:
+ pthread_rwlock_destroy(&instance->flows_lock);
+ fail_flows_lock:
+ free(instance);
+ fail_malloc:
+ return NULL;
}
int cdap_destroy(struct cdap * instance)
@@ -483,6 +491,9 @@ int cdap_destroy(struct cdap * instance)
flow_set_destroy(instance->set);
+ pthread_cond_destroy(&instance->cond);
+ pthread_mutex_destroy(&instance->mtx);
+
pthread_rwlock_wrlock(&instance->flows_lock);
list_for_each_safe(p,h, &instance->flows) {
@@ -557,8 +568,15 @@ int cdap_del_flow(struct cdap * instance,
pthread_rwlock_wrlock(&instance->flows_lock);
+ pthread_mutex_lock(&instance->mtx);
+
+ while (instance->proc)
+ pthread_cond_wait(&instance->cond, &instance->mtx);
+
flow_set_del(instance->set, fd);
+ pthread_mutex_unlock(&instance->mtx);
+
list_for_each_safe(p, h, &instance->flows) {
struct fd_el * e = list_entry(p, struct fd_el, next);
if (e->fd == fd) {