diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipcpd/normal/sdu_sched.c | 68 | 
1 files changed, 43 insertions, 25 deletions
| diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c index 63259430..a4b9e074 100644 --- a/src/ipcpd/normal/sdu_sched.c +++ b/src/ipcpd/normal/sdu_sched.c @@ -36,11 +36,19 @@  struct sdu_sched {          flow_set_t * set[QOS_CUBE_MAX]; -        fqueue_t *   fqs[QOS_CUBE_MAX];          next_sdu_t   callback; -        pthread_t    sdu_reader; +        pthread_t    sdu_readers[IPCP_SCHED_THREADS];  }; +static void cleanup_reader(void * o) +{ +        int         i; +        fqueue_t ** fqs = (fqueue_t **) o; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                fqueue_destroy(fqs[i]); +} +  static void * sdu_reader(void * o)  {          struct sdu_sched *   sched; @@ -49,14 +57,27 @@ static void * sdu_reader(void * o)          int                  fd;          int                  i = 0;          int                  ret; +        fqueue_t *           fqs[QOS_CUBE_MAX];          sched = (struct sdu_sched *) o; +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                fqs[i] = fqueue_create(); +                if (fqs[i] == NULL) { +                        int j; +                        for (j = 0; j < i; ++j) +                                fqueue_destroy(fqs[j]); +                        return (void *) -1; +                } +        } + +        pthread_cleanup_push(cleanup_reader, fqs); +          while (true) {                  /* FIXME: replace with scheduling policy call */                  i = (i + 1) % QOS_CUBE_MAX; -                ret = flow_event_wait(sched->set[i], sched->fqs[i], &timeout); +                ret = flow_event_wait(sched->set[i], fqs[i], &timeout);                  if (ret == -ETIMEDOUT)                          continue; @@ -65,7 +86,7 @@ static void * sdu_reader(void * o)                          continue;                  } -                while ((fd = fqueue_next(sched->fqs[i])) >= 0) { +                while ((fd = fqueue_next(fqs[i])) >= 0) {                          if (ipcp_flow_read(fd, &sdb)) {                                  log_warn("Failed to read SDU from fd %d.", fd);                                  continue; @@ -78,6 +99,8 @@ static void * sdu_reader(void * o)                  }          } +        pthread_cleanup_pop(true); +          return (void *) 0;  } @@ -89,7 +112,7 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)          sdu_sched = malloc(sizeof(*sdu_sched));          if (sdu_sched == NULL) -                return NULL; +                goto fail_malloc;          sdu_sched->callback = callback; @@ -98,31 +121,27 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback)                  if (sdu_sched->set[i] == NULL) {                          for (j = 0; j < i; ++j)                                  flow_set_destroy(sdu_sched->set[j]); -                        goto fail_sdu_sched; +                        goto fail_flow_set;                  }          } -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                sdu_sched->fqs[i] = fqueue_create(); -                if (sdu_sched->fqs[i] == NULL) { -                        for (j = 0; j < i; ++j) -                                fqueue_destroy(sdu_sched->fqs[j]); +        for (i = 0; i < IPCP_SCHED_THREADS; ++i) { +                if (pthread_create(&sdu_sched->sdu_readers[i], NULL, +                                   sdu_reader, sdu_sched)) { +                        int j; +                        for (j = 0; j < i; ++j) { +                                pthread_cancel(sdu_sched->sdu_readers[j]); +                                pthread_join(sdu_sched->sdu_readers[j], NULL); +                        }                          goto fail_flow_set;                  }          } -        pthread_create(&sdu_sched->sdu_reader, -                       NULL, -                       sdu_reader, -                       (void *) sdu_sched); -          return sdu_sched;   fail_flow_set: -        for (i = 0; i < QOS_CUBE_MAX; ++i) -                flow_set_destroy(sdu_sched->set[i]); - fail_sdu_sched:           free(sdu_sched); + fail_malloc:           return NULL;  } @@ -132,14 +151,13 @@ void sdu_sched_destroy(struct sdu_sched * sdu_sched)          assert(sdu_sched); -        pthread_cancel(sdu_sched->sdu_reader); - -        pthread_join(sdu_sched->sdu_reader, NULL); +        for (i = 0; i < IPCP_SCHED_THREADS; ++i) { +                pthread_cancel(sdu_sched->sdu_readers[i]); +                pthread_join(sdu_sched->sdu_readers[i], NULL); +        } -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                fqueue_destroy(sdu_sched->fqs[i]); +        for (i = 0; i < QOS_CUBE_MAX; ++i)                  flow_set_destroy(sdu_sched->set[i]); -        }          free(sdu_sched);  } | 
