diff options
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/sdu_sched.c | 143 | 
1 files changed, 85 insertions, 58 deletions
| diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c index 7a82a874..e5f2c701 100644 --- a/src/ipcpd/normal/sdu_sched.c +++ b/src/ipcpd/normal/sdu_sched.c @@ -24,78 +24,67 @@  #include "config.h" -#define OUROBOROS_PREFIX "sdu-scheduler" - -#include <ouroboros/logs.h>  #include <ouroboros/errno.h>  #include "sdu_sched.h" -#include <stdbool.h>  #include <assert.h> +#include <sched.h> +#include <stdbool.h>  #include <stdlib.h> +#include <string.h> -#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */ +static int qos_prio [] = { +        QOS_PRIO_BE, +        QOS_PRIO_VIDEO, +        QOS_PRIO_VOICE +};  struct sdu_sched {          fset_t *      set[QOS_CUBE_MAX];          next_sdu_fn_t callback; -        pthread_t     sdu_readers[IPCP_SCHED_THREADS]; +        pthread_t     readers[QOS_CUBE_MAX]; +}; + +struct sched_info { +        struct sdu_sched * sch; +        qoscube_t          qc;  };  static void cleanup_reader(void * o)  { -        int         i; -        fqueue_t ** fqs = (fqueue_t **) o; - -        for (i = 0; i < QOS_CUBE_MAX; ++i) -                fqueue_destroy(fqs[i]); +        fqueue_destroy((fqueue_t *) o);  }  static void * sdu_reader(void * o)  {          struct sdu_sched *   sched;          struct shm_du_buff * sdb; -        struct timespec      timeout = {0, FD_UPDATE_TIMEOUT};          int                  fd; -        int                  i = 0; -        int                  ret; -        fqueue_t *           fqs[QOS_CUBE_MAX]; +        fqueue_t *           fq; +        qoscube_t            qc; -        sched = (struct sdu_sched *) o; +        sched = ((struct sched_info *) o)->sch; +        qc    = ((struct sched_info *) o)->qc; -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                fqs[i] = fqueue_create(); -                if (fqs[i] == NULL) { -                        int j; -                        for (j = 0; j < i; ++j) -                                fqueue_destroy(fqs[j]); -                        return (void *) -1; -                } -        } - -        pthread_cleanup_push(cleanup_reader, fqs); +        free(o); -        while (true) { -                /* FIXME: replace with scheduling policy call */ -                i = (i + 1) % QOS_CUBE_MAX; +        fq = fqueue_create(); +        if (fq == NULL) +                return (void *) -1; -                ret = fevent(sched->set[i], fqs[i], &timeout); -                if (ret == -ETIMEDOUT) -                        continue; +        pthread_cleanup_push(cleanup_reader, fq); -                if (ret < 0) { -                        log_warn("Event error: %d.", ret); +        while (true) { +                int ret = fevent(sched->set[qc], fq, NULL); +                if (ret < 0)                          continue; -                } -                while ((fd = fqueue_next(fqs[i])) >= 0) { -                        if (ipcp_flow_read(fd, &sdb)) { -                                log_warn("Failed to read SDU from fd %d.", fd); +                while ((fd = fqueue_next(fq)) >= 0) { +                        if (ipcp_flow_read(fd, &sdb))                                  continue; -                        } -                        sched->callback(fd, i, sdb); +                        sched->callback(fd, qc, sdb);                  }          } @@ -106,9 +95,10 @@ static void * sdu_reader(void * o)  struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback)  { -        struct sdu_sched * sdu_sched; -        int                i; -        int                j; +        struct sdu_sched *  sdu_sched; +        struct sched_info * infos[QOS_CUBE_MAX]; +        int                 i; +        int                 j;          sdu_sched = malloc(sizeof(*sdu_sched));          if (sdu_sched == NULL) @@ -127,24 +117,61 @@ struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback)                  }          } -        for (i = 0; i < IPCP_SCHED_THREADS; ++i) { -                if (pthread_create(&sdu_sched->sdu_readers[i], NULL, -                                   sdu_reader, sdu_sched)) { -                        int j; -                        for (j = 0; j < i; ++j) { -                                pthread_cancel(sdu_sched->sdu_readers[j]); -                                pthread_join(sdu_sched->sdu_readers[j], NULL); -                        } -                        goto fail_flow_set; +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                infos[i] = malloc(sizeof(*infos[i])); +                if (infos[i] == NULL) { +                        for (j = 0; j < i; ++j) +                                free(infos[j]); +                        goto fail_infos; +                } +                infos[i]->sch = sdu_sched; +                infos[i]->qc  = i; +        } + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                if (pthread_create(&sdu_sched->readers[i], NULL, +                                   sdu_reader, infos[i])) { +                        for (j = 0; j < i; ++j) +                                pthread_cancel(sdu_sched->readers[j]); +                        for (j = 0; j < i; ++j) +                                pthread_join(sdu_sched->readers[j], NULL); +                        goto fail_pthr;                  }          } +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                struct sched_param  par; +                int                 pol = SCHED_RR; +                int                 min; +                int                 max; + +                min = sched_get_priority_min(pol); +                max = sched_get_priority_max(pol); + +                min = (max - min) / 2; + +                par.sched_priority = min + (qos_prio[i] * (max - min) / 99); +                if (pthread_setschedparam(sdu_sched->readers[i], pol, &par)) +                        goto fail_sched; +        } +          return sdu_sched; + fail_sched: +        for (j = 0; j < QOS_CUBE_MAX; ++j) +                pthread_cancel(sdu_sched->readers[j]); +        for (j = 0; j < QOS_CUBE_MAX; ++j) +                pthread_join(sdu_sched->readers[j], NULL); + fail_pthr: +        for (j = 0; j < QOS_CUBE_MAX; ++j) +                free(infos[j]); + fail_infos: +        for (j = 0; j < QOS_CUBE_MAX; ++j) +                fset_destroy(sdu_sched->set[j]);   fail_flow_set: -         free(sdu_sched); +        free(sdu_sched);   fail_malloc: -         return NULL; +        return NULL;  }  void sdu_sched_destroy(struct sdu_sched * sdu_sched) @@ -153,9 +180,9 @@ void sdu_sched_destroy(struct sdu_sched * sdu_sched)          assert(sdu_sched); -        for (i = 0; i < IPCP_SCHED_THREADS; ++i) { -                pthread_cancel(sdu_sched->sdu_readers[i]); -                pthread_join(sdu_sched->sdu_readers[i], NULL); +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                pthread_cancel(sdu_sched->readers[i]); +                pthread_join(sdu_sched->readers[i], NULL);          }          for (i = 0; i < QOS_CUBE_MAX; ++i) | 
