diff options
| author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-07-16 11:13:02 +0200 | 
|---|---|---|
| committer | dimitri staessens <dimitri.staessens@ugent.be> | 2017-07-16 11:28:26 +0200 | 
| commit | d1070fcdf36b32a7cdeefc0ca802a1a8973de827 (patch) | |
| tree | 00c3d70d7a483e32a9d14e6d8f31dbb132baf229 | |
| parent | f02ce7519ed794f22f701f3c1562dfb63bd49d72 (diff) | |
| download | ouroboros-d1070fcdf36b32a7cdeefc0ca802a1a8973de827.tar.gz ouroboros-d1070fcdf36b32a7cdeefc0ca802a1a8973de827.zip | |
lib: Wait for fqueue loop at cdap_del_flow
The enrollment calls dealloc immediately after cdap_del_flow(), but
the CDAP instance may still have that fd in its fqueue loop.
cdap_del_flow will now wait for an fqueue loop to end before
returning, to make sure the flow is not needed anymore.
| -rw-r--r-- | src/lib/cdap.c | 142 | 
1 files changed, 80 insertions, 62 deletions
| diff --git a/src/lib/cdap.c b/src/lib/cdap.c index f0db2419..8ebfbec1 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -55,6 +55,10 @@ struct cdap {          flow_set_t *     set;          fqueue_t *       fq; +        bool             proc; +        pthread_mutex_t  mtx; +        pthread_cond_t   cond; +          size_t           n_flows;          struct list_head flows;          pthread_rwlock_t flows_lock; @@ -279,6 +283,17 @@ static void cdap_rcvd_destroy(struct cdap * instance)          pthread_mutex_unlock(&instance->rcvd_lock);  } +static void set_proc(struct cdap * instance, +                     bool          status) +{ +        pthread_mutex_lock(&instance->mtx); + +        instance->proc = status; +        pthread_cond_signal(&instance->cond); + +        pthread_mutex_unlock(&instance->mtx); +} +  static void * sdu_reader(void * o)  {          struct cdap * instance = (struct cdap *) o; @@ -290,7 +305,9 @@ static void * sdu_reader(void * o)          buffer_t data;          while (flow_event_wait(instance->set, instance->fq, NULL)) { -                int fd = fqueue_next(instance->fq); +                int fd; +                set_proc(instance, true); +                fd = fqueue_next(instance->fq);                  len = flow_read(fd, buf, BUF_SIZE);                  if (len < 0)                          continue; @@ -374,6 +391,7 @@ static void * sdu_reader(void * o)                  }                  cdap__free_unpacked(msg, NULL); +                set_proc(instance, false);          }          return (void *) 0; @@ -385,87 +403,77 @@ struct cdap * cdap_create()          instance = malloc(sizeof(*instance));          if (instance == NULL) -                return NULL; +                goto fail_malloc; -        if (pthread_rwlock_init(&instance->flows_lock, NULL)) { -                free(instance); -                return NULL; -        } +        if (pthread_rwlock_init(&instance->flows_lock, NULL)) +                goto fail_flows_lock; -        if (pthread_mutex_init(&instance->ids_lock, NULL)) { -                pthread_rwlock_destroy(&instance->flows_lock); -                free(instance); -                return NULL; -        } +        if (pthread_mutex_init(&instance->ids_lock, NULL)) +                goto fail_ids_lock; -        if (pthread_mutex_init(&instance->rcvd_lock, NULL)) { -                pthread_mutex_destroy(&instance->ids_lock); -                pthread_rwlock_destroy(&instance->flows_lock); -                free(instance); -                return NULL; -        } +        if (pthread_mutex_init(&instance->rcvd_lock, NULL)) +                goto fail_rcvd_lock; -        if (pthread_rwlock_init(&instance->sent_lock, NULL)) { -                pthread_mutex_destroy(&instance->rcvd_lock); -                pthread_mutex_destroy(&instance->ids_lock); -                pthread_rwlock_destroy(&instance->flows_lock); -                free(instance); -                return NULL; -        } +        if (pthread_rwlock_init(&instance->sent_lock, NULL)) +                goto fail_sent_lock; -        if (pthread_cond_init(&instance->rcvd_cond, NULL)) { -                pthread_rwlock_destroy(&instance->sent_lock); -                pthread_mutex_destroy(&instance->rcvd_lock); -                pthread_mutex_destroy(&instance->ids_lock); -                pthread_rwlock_destroy(&instance->flows_lock); -                free(instance); -                return NULL; -        } +        if (pthread_cond_init(&instance->rcvd_cond, NULL)) +                goto fail_rcvd_cond; + +        if (pthread_mutex_init(&instance->mtx, NULL)) +                goto fail_mtx; + +        if (pthread_cond_init(&instance->cond, NULL)) +                goto fail_cond;          instance->ids = bmp_create(IDS_SIZE, 0); -        if (instance->ids == NULL) { -                pthread_cond_destroy(&instance->rcvd_cond); -                pthread_rwlock_destroy(&instance->sent_lock); -                pthread_mutex_destroy(&instance->rcvd_lock); -                pthread_mutex_destroy(&instance->ids_lock); -                pthread_rwlock_destroy(&instance->flows_lock); -                free(instance); -                return NULL; -        } +        if (instance->ids == NULL) +                goto fail_bmp_create;          instance->set = flow_set_create(); -        if (instance->set == NULL) { -                bmp_destroy(instance->ids); -                pthread_cond_destroy(&instance->rcvd_cond); -                pthread_rwlock_destroy(&instance->sent_lock); -                pthread_mutex_destroy(&instance->rcvd_lock); -                pthread_mutex_destroy(&instance->ids_lock); -                pthread_rwlock_destroy(&instance->flows_lock); -                free(instance); -                return NULL; -        } +        if (instance->set == NULL) +                goto fail_set_create;          instance->fq = fqueue_create(); -        if (instance->fq == NULL) { -                flow_set_destroy(instance->set); -                bmp_destroy(instance->ids); -                pthread_cond_destroy(&instance->rcvd_cond); -                pthread_rwlock_destroy(&instance->sent_lock); -                pthread_mutex_destroy(&instance->rcvd_lock); -                pthread_mutex_destroy(&instance->ids_lock); -                pthread_rwlock_destroy(&instance->flows_lock); -                free(instance); -        } +        if (instance->fq == NULL) +                goto fail_fqueue_create;          instance->n_flows = 0; +        instance->proc = false;          list_head_init(&instance->flows);          list_head_init(&instance->sent);          list_head_init(&instance->rcvd); -        pthread_create(&instance->reader, NULL, sdu_reader, instance); +        if (pthread_create(&instance->reader, NULL, sdu_reader, instance)) +                goto fail_pthread_create;          return instance; + + fail_pthread_create: +        fqueue_destroy(instance->fq); + fail_fqueue_create: +        flow_set_destroy(instance->set); + fail_set_create: +        bmp_destroy(instance->ids); + fail_bmp_create: +        pthread_cond_destroy(&instance->cond); + fail_cond: +        pthread_mutex_destroy(&instance->mtx); + fail_mtx: +        pthread_cond_destroy(&instance->rcvd_cond); + fail_rcvd_cond: +        pthread_rwlock_destroy(&instance->sent_lock); + fail_sent_lock: +        pthread_mutex_destroy(&instance->rcvd_lock); + fail_rcvd_lock: +        pthread_mutex_destroy(&instance->ids_lock); + fail_ids_lock: +        pthread_rwlock_destroy(&instance->flows_lock); + fail_flows_lock: +        free(instance); + fail_malloc: +        return NULL;  }  int cdap_destroy(struct cdap * instance) @@ -483,6 +491,9 @@ int cdap_destroy(struct cdap * instance)          flow_set_destroy(instance->set); +        pthread_cond_destroy(&instance->cond); +        pthread_mutex_destroy(&instance->mtx); +          pthread_rwlock_wrlock(&instance->flows_lock);          list_for_each_safe(p,h, &instance->flows) { @@ -557,8 +568,15 @@ int cdap_del_flow(struct cdap * instance,          pthread_rwlock_wrlock(&instance->flows_lock); +        pthread_mutex_lock(&instance->mtx); + +        while (instance->proc) +                pthread_cond_wait(&instance->cond, &instance->mtx); +          flow_set_del(instance->set, fd); +        pthread_mutex_unlock(&instance->mtx); +          list_for_each_safe(p, h, &instance->flows) {                  struct fd_el * e = list_entry(p, struct fd_el, next);                  if (e->fd == fd) { | 
