diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/lib/cdap.c | 31 |
1 files changed, 23 insertions, 8 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 0d1568b2..524983c2 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -55,6 +55,7 @@ struct fd_el { struct cdap { flow_set_t * set; + fqueue_t * fq; size_t n_flows; struct list_head flows; @@ -263,14 +264,9 @@ static void * sdu_reader(void * o) uint8_t buf[BUF_SIZE]; ssize_t len; buffer_t data; - fqueue_t * fq; - fq = fqueue_create(); - if (fq == NULL) - return (void *) -1; - - while (flow_event_wait(instance->set, fq, NULL)) { - int fd = fqueue_next(fq); + while (flow_event_wait(instance->set, instance->fq, NULL)) { + int fd = fqueue_next(instance->fq); len = flow_read(fd, buf, BUF_SIZE); if (len < 0) continue; @@ -321,8 +317,10 @@ static void * sdu_reader(void * o) pthread_mutex_unlock(&instance->rcvd_lock); } else { req = cdap_sent_get_by_key(instance, msg->invoke_id); - if (req == NULL) + if (req == NULL) { + cdap__free_unpacked(msg, NULL); continue; + } if (msg->has_value) { data.len = msg->value.len; @@ -339,6 +337,9 @@ static void * sdu_reader(void * o) cdap_req_respond(req, msg->result, data); } + + cdap__free_unpacked(msg, NULL); + } return (void *) 0; } @@ -409,6 +410,18 @@ struct cdap * cdap_create() return NULL; } + instance->fq = fqueue_create(); + if (instance->fq == NULL) { + flow_set_destroy(instance->set); + bmp_destroy(instance->ids); + pthread_cond_destroy(&instance->rcvd_cond); + pthread_rwlock_destroy(&instance->sent_lock); + pthread_mutex_destroy(&instance->rcvd_lock); + pthread_mutex_destroy(&instance->ids_lock); + pthread_rwlock_destroy(&instance->flows_lock); + free(instance); + } + instance->n_flows = 0; list_head_init(&instance->flows); @@ -431,6 +444,8 @@ int cdap_destroy(struct cdap * instance) pthread_cancel(instance->reader); pthread_join(instance->reader, NULL); + fqueue_destroy(instance->fq); + flow_set_destroy(instance->set); pthread_rwlock_wrlock(&instance->flows_lock); |