diff options
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/CMakeLists.txt | 20 | ||||
| -rw-r--r-- | src/ipcpd/config.h.in | 4 | ||||
| -rw-r--r-- | src/ipcpd/normal/sdu_sched.c | 143 | 
3 files changed, 106 insertions, 61 deletions
| diff --git a/src/ipcpd/CMakeLists.txt b/src/ipcpd/CMakeLists.txt index 6356b1ba..a71c4e98 100644 --- a/src/ipcpd/CMakeLists.txt +++ b/src/ipcpd/CMakeLists.txt @@ -1,12 +1,28 @@  set(IPCP_ACCEPT_TIMEOUT 100 CACHE STRING    "Timeout for accept in IPCP mainloop threads (ms)") -set(IPCP_SCHED_THREADS 2 CACHE STRING -  "Number of scheduler threads in the normal IPCP") +set(IPCP_QOS_CUBE_BE_PRIO 0 CACHE STRING +  "Priority for best effort QoS cube (0-99)") +set(IPCP_QOS_CUBE_VIDEO_PRIO 90 CACHE STRING +  "Priority for video QoS cube (0-99)") +set(IPCP_QOS_CUBE_VOICE_PRIO 99 CACHE STRING +  "Priority for voice QoS cube (0-99)")  set(IPCP_MIN_THREADS 4 CACHE STRING    "Minimum number of worker threads in the IPCP")  set(IPCP_ADD_THREADS 4 CACHE STRING    "Number of extra threads to start when an IPCP faces thread starvation") +if ((IPCP_QOS_CUBE_BE_PRIO LESS 0) OR (IPCP_QOS_CUBE_BE_PRIO GREATER 99)) +  message(FATAL_ERROR "Invalid priority for best effort QoS cube") +endif () + +if ((IPCP_QOS_CUBE_VIDEO_PRIO LESS 0) OR (IPCP_QOS_CUBE_VIDEO_PRIO GREATER 99)) +  message(FATAL_ERROR "Invalid priority for video QoS cube") +endif () + +if ((IPCP_QOS_CUBE_VOICE_PRIO LESS 0) OR (IPCP_QOS_CUBE_VOICE_PRIO GREATER 99)) +  message(FATAL_ERROR "Invalid priority for voice QoS cube") +endif () +  set(IPCP_SOURCES    # Add source files here    ${CMAKE_CURRENT_SOURCE_DIR}/ipcp.c diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index 6fb409b7..04be22ba 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -36,7 +36,9 @@  #define IPCP_ADD_THREADS    @IPCP_ADD_THREADS@  /* normal IPCP */ -#define IPCP_SCHED_THREADS  @IPCP_SCHED_THREADS@ +#define QOS_PRIO_BE         @IPCP_QOS_CUBE_BE_PRIO@ +#define QOS_PRIO_VIDEO      @IPCP_QOS_CUBE_VIDEO_PRIO@ +#define QOS_PRIO_VOICE      @IPCP_QOS_CUBE_VOICE_PRIO@  #define PFT_SIZE            @PFT_SIZE@  /* shim-udp */ diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c index 7a82a874..e5f2c701 100644 --- a/src/ipcpd/normal/sdu_sched.c +++ b/src/ipcpd/normal/sdu_sched.c @@ -24,78 +24,67 @@  #include "config.h" -#define OUROBOROS_PREFIX "sdu-scheduler" - -#include <ouroboros/logs.h>  #include <ouroboros/errno.h>  #include "sdu_sched.h" -#include <stdbool.h>  #include <assert.h> +#include <sched.h> +#include <stdbool.h>  #include <stdlib.h> +#include <string.h> -#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */ +static int qos_prio [] = { +        QOS_PRIO_BE, +        QOS_PRIO_VIDEO, +        QOS_PRIO_VOICE +};  struct sdu_sched {          fset_t *      set[QOS_CUBE_MAX];          next_sdu_fn_t callback; -        pthread_t     sdu_readers[IPCP_SCHED_THREADS]; +        pthread_t     readers[QOS_CUBE_MAX]; +}; + +struct sched_info { +        struct sdu_sched * sch; +        qoscube_t          qc;  };  static void cleanup_reader(void * o)  { -        int         i; -        fqueue_t ** fqs = (fqueue_t **) o; - -        for (i = 0; i < QOS_CUBE_MAX; ++i) -                fqueue_destroy(fqs[i]); +        fqueue_destroy((fqueue_t *) o);  }  static void * sdu_reader(void * o)  {          struct sdu_sched *   sched;          struct shm_du_buff * sdb; -        struct timespec      timeout = {0, FD_UPDATE_TIMEOUT};          int                  fd; -        int                  i = 0; -        int                  ret; -        fqueue_t *           fqs[QOS_CUBE_MAX]; +        fqueue_t *           fq; +        qoscube_t            qc; -        sched = (struct sdu_sched *) o; +        sched = ((struct sched_info *) o)->sch; +        qc    = ((struct sched_info *) o)->qc; -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                fqs[i] = fqueue_create(); -                if (fqs[i] == NULL) { -                        int j; -                        for (j = 0; j < i; ++j) -                                fqueue_destroy(fqs[j]); -                        return (void *) -1; -                } -        } - -        pthread_cleanup_push(cleanup_reader, fqs); +        free(o); -        while (true) { -                /* FIXME: replace with scheduling policy call */ -                i = (i + 1) % QOS_CUBE_MAX; +        fq = fqueue_create(); +        if (fq == NULL) +                return (void *) -1; -                ret = fevent(sched->set[i], fqs[i], &timeout); -                if (ret == -ETIMEDOUT) -                        continue; +        pthread_cleanup_push(cleanup_reader, fq); -                if (ret < 0) { -                        log_warn("Event error: %d.", ret); +        while (true) { +                int ret = fevent(sched->set[qc], fq, NULL); +                if (ret < 0)                          continue; -                } -                while ((fd = fqueue_next(fqs[i])) >= 0) { -                        if (ipcp_flow_read(fd, &sdb)) { -                                log_warn("Failed to read SDU from fd %d.", fd); +                while ((fd = fqueue_next(fq)) >= 0) { +                        if (ipcp_flow_read(fd, &sdb))                                  continue; -                        } -                        sched->callback(fd, i, sdb); +                        sched->callback(fd, qc, sdb);                  }          } @@ -106,9 +95,10 @@ static void * sdu_reader(void * o)  struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback)  { -        struct sdu_sched * sdu_sched; -        int                i; -        int                j; +        struct sdu_sched *  sdu_sched; +        struct sched_info * infos[QOS_CUBE_MAX]; +        int                 i; +        int                 j;          sdu_sched = malloc(sizeof(*sdu_sched));          if (sdu_sched == NULL) @@ -127,24 +117,61 @@ struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback)                  }          } -        for (i = 0; i < IPCP_SCHED_THREADS; ++i) { -                if (pthread_create(&sdu_sched->sdu_readers[i], NULL, -                                   sdu_reader, sdu_sched)) { -                        int j; -                        for (j = 0; j < i; ++j) { -                                pthread_cancel(sdu_sched->sdu_readers[j]); -                                pthread_join(sdu_sched->sdu_readers[j], NULL); -                        } -                        goto fail_flow_set; +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                infos[i] = malloc(sizeof(*infos[i])); +                if (infos[i] == NULL) { +                        for (j = 0; j < i; ++j) +                                free(infos[j]); +                        goto fail_infos; +                } +                infos[i]->sch = sdu_sched; +                infos[i]->qc  = i; +        } + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                if (pthread_create(&sdu_sched->readers[i], NULL, +                                   sdu_reader, infos[i])) { +                        for (j = 0; j < i; ++j) +                                pthread_cancel(sdu_sched->readers[j]); +                        for (j = 0; j < i; ++j) +                                pthread_join(sdu_sched->readers[j], NULL); +                        goto fail_pthr;                  }          } +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                struct sched_param  par; +                int                 pol = SCHED_RR; +                int                 min; +                int                 max; + +                min = sched_get_priority_min(pol); +                max = sched_get_priority_max(pol); + +                min = (max - min) / 2; + +                par.sched_priority = min + (qos_prio[i] * (max - min) / 99); +                if (pthread_setschedparam(sdu_sched->readers[i], pol, &par)) +                        goto fail_sched; +        } +          return sdu_sched; + fail_sched: +        for (j = 0; j < QOS_CUBE_MAX; ++j) +                pthread_cancel(sdu_sched->readers[j]); +        for (j = 0; j < QOS_CUBE_MAX; ++j) +                pthread_join(sdu_sched->readers[j], NULL); + fail_pthr: +        for (j = 0; j < QOS_CUBE_MAX; ++j) +                free(infos[j]); + fail_infos: +        for (j = 0; j < QOS_CUBE_MAX; ++j) +                fset_destroy(sdu_sched->set[j]);   fail_flow_set: -         free(sdu_sched); +        free(sdu_sched);   fail_malloc: -         return NULL; +        return NULL;  }  void sdu_sched_destroy(struct sdu_sched * sdu_sched) @@ -153,9 +180,9 @@ void sdu_sched_destroy(struct sdu_sched * sdu_sched)          assert(sdu_sched); -        for (i = 0; i < IPCP_SCHED_THREADS; ++i) { -                pthread_cancel(sdu_sched->sdu_readers[i]); -                pthread_join(sdu_sched->sdu_readers[i], NULL); +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                pthread_cancel(sdu_sched->readers[i]); +                pthread_join(sdu_sched->readers[i], NULL);          }          for (i = 0; i < QOS_CUBE_MAX; ++i) | 
