summaryrefslogtreecommitdiff
path: root/src/lib/cdap.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/cdap.c')
-rw-r--r--src/lib/cdap.c31
1 files changed, 23 insertions, 8 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index 0d1568b2..524983c2 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -55,6 +55,7 @@ struct fd_el {
struct cdap {
flow_set_t * set;
+ fqueue_t * fq;
size_t n_flows;
struct list_head flows;
@@ -263,14 +264,9 @@ static void * sdu_reader(void * o)
uint8_t buf[BUF_SIZE];
ssize_t len;
buffer_t data;
- fqueue_t * fq;
- fq = fqueue_create();
- if (fq == NULL)
- return (void *) -1;
-
- while (flow_event_wait(instance->set, fq, NULL)) {
- int fd = fqueue_next(fq);
+ while (flow_event_wait(instance->set, instance->fq, NULL)) {
+ int fd = fqueue_next(instance->fq);
len = flow_read(fd, buf, BUF_SIZE);
if (len < 0)
continue;
@@ -321,8 +317,10 @@ static void * sdu_reader(void * o)
pthread_mutex_unlock(&instance->rcvd_lock);
} else {
req = cdap_sent_get_by_key(instance, msg->invoke_id);
- if (req == NULL)
+ if (req == NULL) {
+ cdap__free_unpacked(msg, NULL);
continue;
+ }
if (msg->has_value) {
data.len = msg->value.len;
@@ -339,6 +337,9 @@ static void * sdu_reader(void * o)
cdap_req_respond(req, msg->result, data);
}
+
+ cdap__free_unpacked(msg, NULL);
+
}
return (void *) 0;
}
@@ -409,6 +410,18 @@ struct cdap * cdap_create()
return NULL;
}
+ instance->fq = fqueue_create();
+ if (instance->fq == NULL) {
+ flow_set_destroy(instance->set);
+ bmp_destroy(instance->ids);
+ pthread_cond_destroy(&instance->rcvd_cond);
+ pthread_rwlock_destroy(&instance->sent_lock);
+ pthread_mutex_destroy(&instance->rcvd_lock);
+ pthread_mutex_destroy(&instance->ids_lock);
+ pthread_rwlock_destroy(&instance->flows_lock);
+ free(instance);
+ }
+
instance->n_flows = 0;
list_head_init(&instance->flows);
@@ -431,6 +444,8 @@ int cdap_destroy(struct cdap * instance)
pthread_cancel(instance->reader);
pthread_join(instance->reader, NULL);
+ fqueue_destroy(instance->fq);
+
flow_set_destroy(instance->set);
pthread_rwlock_wrlock(&instance->flows_lock);