From 2c1d03ac383c1a4380aa540a29d95c1b788d2439 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Thu, 21 Sep 2017 14:36:12 +0200 Subject: ipcpd: Use the kernel scheduler for QoS This revises the sdu_scheduler of the normal IPCP to create a scheduler thread per QoS cube and let the kernel scheduler schedule them based on a priority. Priorities can be set at build time in a range of 0-100, which will be mapped onto a suitable priority for the kernel scheduler. The current scheduler policy is fixed to SCHED_RR since it is the most suitable scheduler defined by POSIX. --- src/ipcpd/CMakeLists.txt | 20 +++++- src/ipcpd/config.h.in | 4 +- src/ipcpd/normal/sdu_sched.c | 143 +++++++++++++++++++++++++------------------ 3 files changed, 106 insertions(+), 61 deletions(-) (limited to 'src') diff --git a/src/ipcpd/CMakeLists.txt b/src/ipcpd/CMakeLists.txt index 6356b1ba..a71c4e98 100644 --- a/src/ipcpd/CMakeLists.txt +++ b/src/ipcpd/CMakeLists.txt @@ -1,12 +1,28 @@ set(IPCP_ACCEPT_TIMEOUT 100 CACHE STRING "Timeout for accept in IPCP mainloop threads (ms)") -set(IPCP_SCHED_THREADS 2 CACHE STRING - "Number of scheduler threads in the normal IPCP") +set(IPCP_QOS_CUBE_BE_PRIO 0 CACHE STRING + "Priority for best effort QoS cube (0-99)") +set(IPCP_QOS_CUBE_VIDEO_PRIO 90 CACHE STRING + "Priority for video QoS cube (0-99)") +set(IPCP_QOS_CUBE_VOICE_PRIO 99 CACHE STRING + "Priority for voice QoS cube (0-99)") set(IPCP_MIN_THREADS 4 CACHE STRING "Minimum number of worker threads in the IPCP") set(IPCP_ADD_THREADS 4 CACHE STRING "Number of extra threads to start when an IPCP faces thread starvation") +if ((IPCP_QOS_CUBE_BE_PRIO LESS 0) OR (IPCP_QOS_CUBE_BE_PRIO GREATER 99)) + message(FATAL_ERROR "Invalid priority for best effort QoS cube") +endif () + +if ((IPCP_QOS_CUBE_VIDEO_PRIO LESS 0) OR (IPCP_QOS_CUBE_VIDEO_PRIO GREATER 99)) + message(FATAL_ERROR "Invalid priority for video QoS cube") +endif () + +if ((IPCP_QOS_CUBE_VOICE_PRIO LESS 0) OR (IPCP_QOS_CUBE_VOICE_PRIO GREATER 99)) + message(FATAL_ERROR "Invalid priority for voice QoS cube") +endif () + set(IPCP_SOURCES # Add source files here ${CMAKE_CURRENT_SOURCE_DIR}/ipcp.c diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index 6fb409b7..04be22ba 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -36,7 +36,9 @@ #define IPCP_ADD_THREADS @IPCP_ADD_THREADS@ /* normal IPCP */ -#define IPCP_SCHED_THREADS @IPCP_SCHED_THREADS@ +#define QOS_PRIO_BE @IPCP_QOS_CUBE_BE_PRIO@ +#define QOS_PRIO_VIDEO @IPCP_QOS_CUBE_VIDEO_PRIO@ +#define QOS_PRIO_VOICE @IPCP_QOS_CUBE_VOICE_PRIO@ #define PFT_SIZE @PFT_SIZE@ /* shim-udp */ diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c index 7a82a874..e5f2c701 100644 --- a/src/ipcpd/normal/sdu_sched.c +++ b/src/ipcpd/normal/sdu_sched.c @@ -24,78 +24,67 @@ #include "config.h" -#define OUROBOROS_PREFIX "sdu-scheduler" - -#include #include #include "sdu_sched.h" -#include #include +#include +#include #include +#include -#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */ +static int qos_prio [] = { + QOS_PRIO_BE, + QOS_PRIO_VIDEO, + QOS_PRIO_VOICE +}; struct sdu_sched { fset_t * set[QOS_CUBE_MAX]; next_sdu_fn_t callback; - pthread_t sdu_readers[IPCP_SCHED_THREADS]; + pthread_t readers[QOS_CUBE_MAX]; +}; + +struct sched_info { + struct sdu_sched * sch; + qoscube_t qc; }; static void cleanup_reader(void * o) { - int i; - fqueue_t ** fqs = (fqueue_t **) o; - - for (i = 0; i < QOS_CUBE_MAX; ++i) - fqueue_destroy(fqs[i]); + fqueue_destroy((fqueue_t *) o); } static void * sdu_reader(void * o) { struct sdu_sched * sched; struct shm_du_buff * sdb; - struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; int fd; - int i = 0; - int ret; - fqueue_t * fqs[QOS_CUBE_MAX]; + fqueue_t * fq; + qoscube_t qc; - sched = (struct sdu_sched *) o; + sched = ((struct sched_info *) o)->sch; + qc = ((struct sched_info *) o)->qc; - for (i = 0; i < QOS_CUBE_MAX; ++i) { - fqs[i] = fqueue_create(); - if (fqs[i] == NULL) { - int j; - for (j = 0; j < i; ++j) - fqueue_destroy(fqs[j]); - return (void *) -1; - } - } - - pthread_cleanup_push(cleanup_reader, fqs); + free(o); - while (true) { - /* FIXME: replace with scheduling policy call */ - i = (i + 1) % QOS_CUBE_MAX; + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; - ret = fevent(sched->set[i], fqs[i], &timeout); - if (ret == -ETIMEDOUT) - continue; + pthread_cleanup_push(cleanup_reader, fq); - if (ret < 0) { - log_warn("Event error: %d.", ret); + while (true) { + int ret = fevent(sched->set[qc], fq, NULL); + if (ret < 0) continue; - } - while ((fd = fqueue_next(fqs[i])) >= 0) { - if (ipcp_flow_read(fd, &sdb)) { - log_warn("Failed to read SDU from fd %d.", fd); + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) continue; - } - sched->callback(fd, i, sdb); + sched->callback(fd, qc, sdb); } } @@ -106,9 +95,10 @@ static void * sdu_reader(void * o) struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback) { - struct sdu_sched * sdu_sched; - int i; - int j; + struct sdu_sched * sdu_sched; + struct sched_info * infos[QOS_CUBE_MAX]; + int i; + int j; sdu_sched = malloc(sizeof(*sdu_sched)); if (sdu_sched == NULL) @@ -127,24 +117,61 @@ struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback) } } - for (i = 0; i < IPCP_SCHED_THREADS; ++i) { - if (pthread_create(&sdu_sched->sdu_readers[i], NULL, - sdu_reader, sdu_sched)) { - int j; - for (j = 0; j < i; ++j) { - pthread_cancel(sdu_sched->sdu_readers[j]); - pthread_join(sdu_sched->sdu_readers[j], NULL); - } - goto fail_flow_set; + for (i = 0; i < QOS_CUBE_MAX; ++i) { + infos[i] = malloc(sizeof(*infos[i])); + if (infos[i] == NULL) { + for (j = 0; j < i; ++j) + free(infos[j]); + goto fail_infos; + } + infos[i]->sch = sdu_sched; + infos[i]->qc = i; + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + if (pthread_create(&sdu_sched->readers[i], NULL, + sdu_reader, infos[i])) { + for (j = 0; j < i; ++j) + pthread_cancel(sdu_sched->readers[j]); + for (j = 0; j < i; ++j) + pthread_join(sdu_sched->readers[j], NULL); + goto fail_pthr; } } + for (i = 0; i < QOS_CUBE_MAX; ++i) { + struct sched_param par; + int pol = SCHED_RR; + int min; + int max; + + min = sched_get_priority_min(pol); + max = sched_get_priority_max(pol); + + min = (max - min) / 2; + + par.sched_priority = min + (qos_prio[i] * (max - min) / 99); + if (pthread_setschedparam(sdu_sched->readers[i], pol, &par)) + goto fail_sched; + } + return sdu_sched; + fail_sched: + for (j = 0; j < QOS_CUBE_MAX; ++j) + pthread_cancel(sdu_sched->readers[j]); + for (j = 0; j < QOS_CUBE_MAX; ++j) + pthread_join(sdu_sched->readers[j], NULL); + fail_pthr: + for (j = 0; j < QOS_CUBE_MAX; ++j) + free(infos[j]); + fail_infos: + for (j = 0; j < QOS_CUBE_MAX; ++j) + fset_destroy(sdu_sched->set[j]); fail_flow_set: - free(sdu_sched); + free(sdu_sched); fail_malloc: - return NULL; + return NULL; } void sdu_sched_destroy(struct sdu_sched * sdu_sched) @@ -153,9 +180,9 @@ void sdu_sched_destroy(struct sdu_sched * sdu_sched) assert(sdu_sched); - for (i = 0; i < IPCP_SCHED_THREADS; ++i) { - pthread_cancel(sdu_sched->sdu_readers[i]); - pthread_join(sdu_sched->sdu_readers[i], NULL); + for (i = 0; i < QOS_CUBE_MAX; ++i) { + pthread_cancel(sdu_sched->readers[i]); + pthread_join(sdu_sched->readers[i], NULL); } for (i = 0; i < QOS_CUBE_MAX; ++i) -- cgit v1.2.3