summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/psched.c
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@ugent.be>2018-10-05 10:24:02 +0200
committerDimitri Staessens <dimitri.staessens@ugent.be>2018-10-05 10:36:05 +0200
commit8bd27921e6cd46cdcc191c9d98a7f93bb7fe5360 (patch)
tree6c047b44608d31f54728f0e203ba2820c97f50b9 /src/ipcpd/normal/psched.c
parent5d11a6ad590133c92925c6162eb47b4401f16bef (diff)
downloadouroboros-8bd27921e6cd46cdcc191c9d98a7f93bb7fe5360.tar.gz
ouroboros-8bd27921e6cd46cdcc191c9d98a7f93bb7fe5360.zip
ipcpd: Shorten packet_scheduler to psched
This shortens packet_scheduler to psched, which results in more readable code. Signed-off-by: Sander Vrijders <sander.vrijders@ugent.be> Signed-off-by: Dimitri Staessens <dimitri.staessens@ugent.be>
Diffstat (limited to 'src/ipcpd/normal/psched.c')
-rw-r--r--src/ipcpd/normal/psched.c241
1 files changed, 241 insertions, 0 deletions
diff --git a/src/ipcpd/normal/psched.c b/src/ipcpd/normal/psched.c
new file mode 100644
index 00000000..27e5f1de
--- /dev/null
+++ b/src/ipcpd/normal/psched.c
@@ -0,0 +1,241 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2018
+ *
+ * Packet scheduler component
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include "config.h"
+
+#include <ouroboros/errno.h>
+#include <ouroboros/notifier.h>
+
+#include "ipcp.h"
+#include "psched.h"
+#include "connmgr.h"
+
+#include <assert.h>
+#include <sched.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+
+static int qos_prio [] = {
+ QOS_PRIO_RAW,
+ QOS_PRIO_BE,
+ QOS_PRIO_VIDEO,
+ QOS_PRIO_VOICE,
+ QOS_PRIO_DATA
+};
+
+struct psched {
+ fset_t * set[QOS_CUBE_MAX];
+ next_packet_fn_t callback;
+ pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL];
+};
+
+struct sched_info {
+ struct psched * sch;
+ qoscube_t qc;
+};
+
+static void cleanup_reader(void * o)
+{
+ fqueue_destroy((fqueue_t *) o);
+}
+
+static void * packet_reader(void * o)
+{
+ struct psched * sched;
+ struct shm_du_buff * sdb;
+ int fd;
+ fqueue_t * fq;
+ qoscube_t qc;
+
+ sched = ((struct sched_info *) o)->sch;
+ qc = ((struct sched_info *) o)->qc;
+
+ ipcp_lock_to_core();
+
+ free(o);
+
+ fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) -1;
+
+ pthread_cleanup_push(cleanup_reader, fq);
+
+ while (true) {
+ int ret = fevent(sched->set[qc], fq, NULL);
+ if (ret < 0)
+ continue;
+
+ while ((fd = fqueue_next(fq)) >= 0) {
+ switch (fqueue_type(fq)) {
+ case FLOW_DEALLOC:
+ notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd);
+ break;
+ case FLOW_DOWN:
+ notifier_event(NOTIFY_DT_FLOW_DOWN, &fd);
+ break;
+ case FLOW_UP:
+ notifier_event(NOTIFY_DT_FLOW_UP, &fd);
+ break;
+ case FLOW_PKT:
+ if (ipcp_flow_read(fd, &sdb))
+ continue;
+
+ sched->callback(fd, qc, sdb);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ pthread_cleanup_pop(true);
+
+ return (void *) 0;
+}
+
+struct psched * psched_create(next_packet_fn_t callback)
+{
+ struct psched * psched;
+ struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL];
+ int i;
+ int j;
+
+ assert(callback);
+
+ psched = malloc(sizeof(*psched));
+ if (psched == NULL)
+ goto fail_malloc;
+
+ psched->callback = callback;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ psched->set[i] = fset_create();
+ if (psched->set[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ fset_destroy(psched->set[j]);
+ goto fail_flow_set;
+ }
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) {
+ infos[i] = malloc(sizeof(*infos[i]));
+ if (infos[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ free(infos[j]);
+ goto fail_infos;
+ }
+ infos[i]->sch = psched;
+ infos[i]->qc = i % QOS_CUBE_MAX;
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) {
+ if (pthread_create(&psched->readers[i], NULL,
+ packet_reader, infos[i])) {
+ for (j = 0; j < i; ++j)
+ pthread_cancel(psched->readers[j]);
+ for (j = 0; j < i; ++j)
+ pthread_join(psched->readers[j], NULL);
+ for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j)
+ free(infos[i]);
+ goto fail_infos;
+ }
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) {
+ struct sched_param par;
+ int pol = SCHED_RR;
+ int min;
+ int max;
+
+ min = sched_get_priority_min(pol);
+ max = sched_get_priority_max(pol);
+
+ min = (max - min) / 2;
+
+ par.sched_priority = min +
+ (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99);
+
+ if (pthread_setschedparam(psched->readers[i], pol, &par))
+ goto fail_sched;
+ }
+
+ return psched;
+
+ fail_sched:
+ for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j)
+ pthread_cancel(psched->readers[j]);
+ for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j)
+ pthread_join(psched->readers[j], NULL);
+ fail_infos:
+ for (j = 0; j < QOS_CUBE_MAX; ++j)
+ fset_destroy(psched->set[j]);
+ fail_flow_set:
+ free(psched);
+ fail_malloc:
+ return NULL;
+}
+
+void psched_destroy(struct psched * psched)
+{
+ int i;
+
+ assert(psched);
+
+ for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) {
+ pthread_cancel(psched->readers[i]);
+ pthread_join(psched->readers[i], NULL);
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ fset_destroy(psched->set[i]);
+
+ free(psched);
+}
+
+void psched_add(struct psched * psched,
+ int fd)
+{
+ qoscube_t qc;
+
+ assert(psched);
+
+ ipcp_flow_get_qoscube(fd, &qc);
+ fset_add(psched->set[qc], fd);
+}
+
+void psched_del(struct psched * psched,
+ int fd)
+{
+ qoscube_t qc;
+
+ assert(psched);
+
+ ipcp_flow_get_qoscube(fd, &qc);
+ fset_del(psched->set[qc], fd);
+}