Diffstat (limited to 'src/ipcpd/unicast/psched.c')
-rw-r--r-- | src/ipcpd/unicast/psched.c | 239
1 file changed, 239 insertions, 0 deletions
diff --git a/src/ipcpd/unicast/psched.c b/src/ipcpd/unicast/psched.c
new file mode 100644
index 00000000..6e8c4e0e
--- /dev/null
+++ b/src/ipcpd/unicast/psched.c
@@ -0,0 +1,239 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * Packet scheduler component
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include "config.h"
+
+#include <ouroboros/errno.h>
+#include <ouroboros/notifier.h>
+
+#include "ipcp.h"
+#include "psched.h"
+#include "connmgr.h"
+
+#include <assert.h>
+#include <sched.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+
+static int qos_prio [] = {
+        QOS_PRIO_BE,
+        QOS_PRIO_VIDEO,
+        QOS_PRIO_VOICE,
+};
+
+struct psched {
+        fset_t *         set[QOS_CUBE_MAX];
+        next_packet_fn_t callback;
+        pthread_t        readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL];
+};
+
+struct sched_info {
+        struct psched * sch;
+        qoscube_t       qc;
+};
+
+static void cleanup_reader(void * o)
+{
+        fqueue_destroy((fqueue_t *) o);
+}
+
+static void * packet_reader(void * o)
+{
+        struct psched *      sched;
+        struct shm_du_buff * sdb;
+        int                  fd;
+        fqueue_t *           fq;
+        qoscube_t            qc;
+
+        sched = ((struct sched_info *) o)->sch;
+        qc    = ((struct sched_info *) o)->qc;
+
+        ipcp_lock_to_core();
+
+        free(o);
+
+        fq = fqueue_create();
+        if (fq == NULL)
+                return (void *) -1;
+
+        pthread_cleanup_push(cleanup_reader, fq);
+
+        while (true) {
+                int ret = fevent(sched->set[qc], fq, NULL);
+                if (ret < 0)
+                        continue;
+
+                while ((fd = fqueue_next(fq)) >= 0) {
+                        switch (fqueue_type(fq)) {
+                        case FLOW_DEALLOC:
+                                notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd);
+                                break;
+                        case FLOW_DOWN:
+                                notifier_event(NOTIFY_DT_FLOW_DOWN, &fd);
+                                break;
+                        case FLOW_UP:
+                                notifier_event(NOTIFY_DT_FLOW_UP, &fd);
+                                break;
+                        case FLOW_PKT:
+                                if (ipcp_flow_read(fd, &sdb))
+                                        continue;
+
+                                sched->callback(fd, qc, sdb);
+                                break;
+                        default:
+                                break;
+                        }
+                }
+        }
+
+        pthread_cleanup_pop(true);
+
+        return (void *) 0;
+}
+
+struct psched * psched_create(next_packet_fn_t callback)
+{
+        struct psched *     psched;
+        struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL];
+        int                 i;
+        int                 j;
+
+        assert(callback);
+
+        psched = malloc(sizeof(*psched));
+        if (psched == NULL)
+                goto fail_malloc;
+
+        psched->callback = callback;
+
+        for (i = 0; i < QOS_CUBE_MAX; ++i) {
+                psched->set[i] = fset_create();
+                if (psched->set[i] == NULL) {
+                        for (j = 0; j < i; ++j)
+                                fset_destroy(psched->set[j]);
+                        goto fail_flow_set;
+                }
+        }
+
+        for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) {
+                infos[i] = malloc(sizeof(*infos[i]));
+                if (infos[i] == NULL) {
+                        for (j = 0; j < i; ++j)
+                                free(infos[j]);
+                        goto fail_infos;
+                }
+                infos[i]->sch = psched;
+                infos[i]->qc  = i % QOS_CUBE_MAX;
+        }
+
+        for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) {
+                if (pthread_create(&psched->readers[i], NULL,
+                                   packet_reader, infos[i])) {
+                        for (j = 0; j < i; ++j)
+                                pthread_cancel(psched->readers[j]);
+                        for (j = 0; j < i; ++j)
+                                pthread_join(psched->readers[j], NULL);
+                        for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j)
+                                free(infos[j]);
+                        goto fail_infos;
+                }
+        }
+
+        for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) {
+                struct sched_param par;
+                int                pol = SCHED_RR;
+                int                min;
+                int                max;
+
+                min = sched_get_priority_min(pol);
+                max = sched_get_priority_max(pol);
+
+                min = (max - min) / 2;
+
+                par.sched_priority = min +
+                        (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99);
+
+                if (pthread_setschedparam(psched->readers[i], pol, &par))
+                        goto fail_sched;
+        }
+
+        return psched;
+
+ fail_sched:
+        for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j)
+                pthread_cancel(psched->readers[j]);
+        for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j)
+                pthread_join(psched->readers[j], NULL);
+ fail_infos:
+        for (j = 0; j < QOS_CUBE_MAX; ++j)
+                fset_destroy(psched->set[j]);
+ fail_flow_set:
+        free(psched);
+ fail_malloc:
+        return NULL;
+}
+
+void psched_destroy(struct psched * psched)
+{
+        int i;
+
+        assert(psched);
+
+        for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) {
+                pthread_cancel(psched->readers[i]);
+                pthread_join(psched->readers[i], NULL);
+        }
+
+        for (i = 0; i < QOS_CUBE_MAX; ++i)
+                fset_destroy(psched->set[i]);
+
+        free(psched);
+}
+
+void psched_add(struct psched * psched,
+                int             fd)
+{
+        qoscube_t qc;
+
+        assert(psched);
+
+        ipcp_flow_get_qoscube(fd, &qc);
+        fset_add(psched->set[qc], fd);
+}
+
+void psched_del(struct psched * psched,
+                int             fd)
+{
+        qoscube_t qc;
+
+        assert(psched);
+
+        ipcp_flow_get_qoscube(fd, &qc);
+        fset_del(psched->set[qc], fd);
+}
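
The hunk above contains the scheduler's whole public surface: psched_create() spawns the per-QoS reader threads, psched_add()/psched_del() register and remove flow descriptors, and psched_destroy() cancels and joins the readers. As a rough orientation, the sketch below shows how a component might drive this API. It is illustrative only: the exact next_packet_fn_t typedef lives in psched.h and is not part of this diff (its shape is inferred from the call sched->callback(fd, qc, sdb) in packet_reader()), and handle_packet(), example() and the fd parameter are hypothetical names invented here. psched.h is assumed to pull in qoscube_t, struct shm_du_buff and the psched prototypes.

#include "psched.h"   /* assumed to declare next_packet_fn_t, qoscube_t,
                       * struct shm_du_buff and the psched prototypes */
#include <stddef.h>

/* Hypothetical packet handler matching the inferred callback signature. */
static void handle_packet(int                  fd,
                          qoscube_t            qc,
                          struct shm_du_buff * sdb)
{
        /* Forward, process or release the packet buffer here. */
        (void) fd;
        (void) qc;
        (void) sdb;
}

static int example(int fd)  /* fd: a flow descriptor obtained elsewhere */
{
        struct psched * ps;

        /* Spawns QOS_CUBE_MAX * IPCP_SCHED_THR_MUL reader threads. */
        ps = psched_create(handle_packet);
        if (ps == NULL)
                return -1;

        /* Packets arriving on fd are now dispatched to handle_packet()
         * by the readers serving the flow's QoS cube. */
        psched_add(ps, fd);

        /* Stop scheduling the flow before it is deallocated. */
        psched_del(ps, fd);

        /* Cancels and joins the readers and destroys the flow sets. */
        psched_destroy(ps);

        return 0;
}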