diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/cdap.c | 31 | 
1 files changed, 23 insertions, 8 deletions
| diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 0d1568b2..524983c2 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -55,6 +55,7 @@ struct fd_el {  struct cdap {          flow_set_t *     set; +        fqueue_t *       fq;          size_t           n_flows;          struct list_head flows; @@ -263,14 +264,9 @@ static void * sdu_reader(void * o)          uint8_t buf[BUF_SIZE];          ssize_t len;          buffer_t data; -        fqueue_t * fq; -        fq = fqueue_create(); -        if (fq == NULL) -                return (void *) -1; - -        while (flow_event_wait(instance->set, fq, NULL)) { -                int fd = fqueue_next(fq); +        while (flow_event_wait(instance->set, instance->fq, NULL)) { +                int fd = fqueue_next(instance->fq);                  len = flow_read(fd, buf, BUF_SIZE);                  if (len < 0)                          continue; @@ -321,8 +317,10 @@ static void * sdu_reader(void * o)                          pthread_mutex_unlock(&instance->rcvd_lock);                  } else  {                          req = cdap_sent_get_by_key(instance, msg->invoke_id); -                        if (req == NULL) +                        if (req == NULL) { +                                cdap__free_unpacked(msg, NULL);                                  continue; +                        }                          if (msg->has_value) {                                  data.len = msg->value.len; @@ -339,6 +337,9 @@ static void * sdu_reader(void * o)                          cdap_req_respond(req, msg->result, data);                  } + +                cdap__free_unpacked(msg, NULL); +          }          return (void *) 0;  } @@ -409,6 +410,18 @@ struct cdap * cdap_create()                  return NULL;          } +        instance->fq = fqueue_create(); +        if (instance->fq == NULL) { +                flow_set_destroy(instance->set); +                bmp_destroy(instance->ids); +                pthread_cond_destroy(&instance->rcvd_cond); +                pthread_rwlock_destroy(&instance->sent_lock); +                pthread_mutex_destroy(&instance->rcvd_lock); +                pthread_mutex_destroy(&instance->ids_lock); +                pthread_rwlock_destroy(&instance->flows_lock); +                free(instance); +        } +          instance->n_flows = 0;          list_head_init(&instance->flows); @@ -431,6 +444,8 @@ int cdap_destroy(struct cdap * instance)          pthread_cancel(instance->reader);          pthread_join(instance->reader, NULL); +        fqueue_destroy(instance->fq); +          flow_set_destroy(instance->set);          pthread_rwlock_wrlock(&instance->flows_lock); | 
