summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/psched.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/psched.c')
-rw-r--r--src/ipcpd/unicast/psched.c239
1 files changed, 239 insertions, 0 deletions
diff --git a/src/ipcpd/unicast/psched.c b/src/ipcpd/unicast/psched.c
new file mode 100644
index 00000000..6e8c4e0e
--- /dev/null
+++ b/src/ipcpd/unicast/psched.c
@@ -0,0 +1,239 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * Packet scheduler component
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include "config.h"
+
+#include <ouroboros/errno.h>
+#include <ouroboros/notifier.h>
+
+#include "ipcp.h"
+#include "psched.h"
+#include "connmgr.h"
+
+#include <assert.h>
+#include <sched.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+
+static int qos_prio [] = {
+ QOS_PRIO_BE,
+ QOS_PRIO_VIDEO,
+ QOS_PRIO_VOICE,
+};
+
+struct psched {
+ fset_t * set[QOS_CUBE_MAX];
+ next_packet_fn_t callback;
+ pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL];
+};
+
+struct sched_info {
+ struct psched * sch;
+ qoscube_t qc;
+};
+
+static void cleanup_reader(void * o)
+{
+ fqueue_destroy((fqueue_t *) o);
+}
+
+static void * packet_reader(void * o)
+{
+ struct psched * sched;
+ struct shm_du_buff * sdb;
+ int fd;
+ fqueue_t * fq;
+ qoscube_t qc;
+
+ sched = ((struct sched_info *) o)->sch;
+ qc = ((struct sched_info *) o)->qc;
+
+ ipcp_lock_to_core();
+
+ free(o);
+
+ fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) -1;
+
+ pthread_cleanup_push(cleanup_reader, fq);
+
+ while (true) {
+ int ret = fevent(sched->set[qc], fq, NULL);
+ if (ret < 0)
+ continue;
+
+ while ((fd = fqueue_next(fq)) >= 0) {
+ switch (fqueue_type(fq)) {
+ case FLOW_DEALLOC:
+ notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd);
+ break;
+ case FLOW_DOWN:
+ notifier_event(NOTIFY_DT_FLOW_DOWN, &fd);
+ break;
+ case FLOW_UP:
+ notifier_event(NOTIFY_DT_FLOW_UP, &fd);
+ break;
+ case FLOW_PKT:
+ if (ipcp_flow_read(fd, &sdb))
+ continue;
+
+ sched->callback(fd, qc, sdb);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ pthread_cleanup_pop(true);
+
+ return (void *) 0;
+}
+
+struct psched * psched_create(next_packet_fn_t callback)
+{
+ struct psched * psched;
+ struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL];
+ int i;
+ int j;
+
+ assert(callback);
+
+ psched = malloc(sizeof(*psched));
+ if (psched == NULL)
+ goto fail_malloc;
+
+ psched->callback = callback;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ psched->set[i] = fset_create();
+ if (psched->set[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ fset_destroy(psched->set[j]);
+ goto fail_flow_set;
+ }
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) {
+ infos[i] = malloc(sizeof(*infos[i]));
+ if (infos[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ free(infos[j]);
+ goto fail_infos;
+ }
+ infos[i]->sch = psched;
+ infos[i]->qc = i % QOS_CUBE_MAX;
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) {
+ if (pthread_create(&psched->readers[i], NULL,
+ packet_reader, infos[i])) {
+ for (j = 0; j < i; ++j)
+ pthread_cancel(psched->readers[j]);
+ for (j = 0; j < i; ++j)
+ pthread_join(psched->readers[j], NULL);
+ for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j)
+ free(infos[i]);
+ goto fail_infos;
+ }
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) {
+ struct sched_param par;
+ int pol = SCHED_RR;
+ int min;
+ int max;
+
+ min = sched_get_priority_min(pol);
+ max = sched_get_priority_max(pol);
+
+ min = (max - min) / 2;
+
+ par.sched_priority = min +
+ (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99);
+
+ if (pthread_setschedparam(psched->readers[i], pol, &par))
+ goto fail_sched;
+ }
+
+ return psched;
+
+ fail_sched:
+ for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j)
+ pthread_cancel(psched->readers[j]);
+ for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j)
+ pthread_join(psched->readers[j], NULL);
+ fail_infos:
+ for (j = 0; j < QOS_CUBE_MAX; ++j)
+ fset_destroy(psched->set[j]);
+ fail_flow_set:
+ free(psched);
+ fail_malloc:
+ return NULL;
+}
+
+void psched_destroy(struct psched * psched)
+{
+ int i;
+
+ assert(psched);
+
+ for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) {
+ pthread_cancel(psched->readers[i]);
+ pthread_join(psched->readers[i], NULL);
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ fset_destroy(psched->set[i]);
+
+ free(psched);
+}
+
+void psched_add(struct psched * psched,
+ int fd)
+{
+ qoscube_t qc;
+
+ assert(psched);
+
+ ipcp_flow_get_qoscube(fd, &qc);
+ fset_add(psched->set[qc], fd);
+}
+
+void psched_del(struct psched * psched,
+ int fd)
+{
+ qoscube_t qc;
+
+ assert(psched);
+
+ ipcp_flow_get_qoscube(fd, &qc);
+ fset_del(psched->set[qc], fd);
+}