summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/normal')
-rw-r--r--src/ipcpd/normal/CMakeLists.txt1
-rw-r--r--src/ipcpd/normal/fmgr.c193
-rw-r--r--src/ipcpd/normal/sdu_sched.c131
-rw-r--r--src/ipcpd/normal/sdu_sched.h38
4 files changed, 236 insertions, 127 deletions
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 69615d0c..2045b8df 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -32,6 +32,7 @@ set(SOURCE_FILES
pff.c
ribmgr.c
routing.c
+ sdu_sched.c
shm_pci.c
# Add policies last
pol/complete.c
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index ba36812f..d055b311 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -42,6 +42,7 @@
#include "neighbors.h"
#include "gam.h"
#include "routing.h"
+#include "sdu_sched.h"
#include <stdlib.h>
#include <stdbool.h>
@@ -55,18 +56,15 @@ typedef FlowAllocMsg flow_alloc_msg_t;
#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */
struct {
- flow_set_t * np1_set[QOS_CUBE_MAX];
- fqueue_t * np1_fqs[QOS_CUBE_MAX];
pthread_rwlock_t np1_flows_lock;
-
cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS];
int np1_cep_id_to_fd[IPCPD_MAX_CONNS];
- pthread_t np1_sdu_reader;
+ flow_set_t * np1_set[QOS_CUBE_MAX];
+ struct sdu_sched * np1_sdu_sched;
flow_set_t * nm1_set[QOS_CUBE_MAX];
- fqueue_t * nm1_fqs[QOS_CUBE_MAX];
- pthread_t nm1_sdu_reader;
+ struct sdu_sched * nm1_sdu_sched;
struct pff * pff[QOS_CUBE_MAX];
struct routing_i * routing[QOS_CUBE_MAX];
@@ -102,125 +100,70 @@ static int fmgr_neighbor_event(enum nb_event event,
return 0;
}
-static void * fmgr_np1_sdu_reader(void * o)
+static int np1_sdu_handler(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb)
{
- struct shm_du_buff * sdb;
- struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
- int fd;
- int i = 0;
- int ret;
-
- (void) o;
-
- while (true) {
- /* FIXME: replace with scheduling policy call */
- i = (i + 1) % QOS_CUBE_MAX;
-
- ret = flow_event_wait(fmgr.np1_set[i],
- fmgr.np1_fqs[i],
- &timeout);
- if (ret == -ETIMEDOUT)
- continue;
-
- if (ret < 0) {
- log_warn("Event error: %d.", ret);
- continue;
- }
-
- while ((fd = fqueue_next(fmgr.np1_fqs[i])) >= 0) {
- if (ipcp_flow_read(fd, &sdb)) {
- log_warn("Failed to read SDU from fd %d.", fd);
- continue;
- }
+ (void) qc;
- pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
-
- if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- log_warn("Failed to hand SDU to FRCT.");
- continue;
- }
+ pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- }
+ if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) {
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ ipcp_flow_del(sdb);
+ log_warn("Failed to hand SDU to FRCT.");
+ return -1;
}
- return (void *) 0;
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+
+ return 0;
}
-void * fmgr_nm1_sdu_reader(void * o)
+static int nm1_sdu_handler(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb)
{
- struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
- struct shm_du_buff * sdb;
- struct pci pci;
- int fd;
- int i = 0;
- int ret;
-
- (void) o;
+ struct pci pci;
memset(&pci, 0, sizeof(pci));
- while (true) {
- /* FIXME: replace with scheduling policy call */
- i = (i + 1) % QOS_CUBE_MAX;
+ shm_pci_des(sdb, &pci);
+
+ if (pci.dst_addr != ipcpi.dt_addr) {
+ if (pci.ttl == 0) {
+ log_dbg("TTL was zero.");
+ ipcp_flow_del(sdb);
+ return 0;
+ }
+
+ pff_lock(fmgr.pff[qc]);
+
+ fd = pff_nhop(fmgr.pff[qc], pci.dst_addr);
+ if (fd < 0) {
+ pff_unlock(fmgr.pff[qc]);
+ log_err("No next hop for %" PRIu64, pci.dst_addr);
+ ipcp_flow_del(sdb);
+ return -1;
+ }
- ret = flow_event_wait(fmgr.nm1_set[i],
- fmgr.nm1_fqs[i],
- &timeout);
- if (ret == -ETIMEDOUT)
- continue;
+ pff_unlock(fmgr.pff[qc]);
- if (ret < 0) {
- log_err("Event error: %d.", ret);
- continue;
+ if (ipcp_flow_write(fd, sdb)) {
+ log_err("Failed to write SDU to fd %d.", fd);
+ ipcp_flow_del(sdb);
+ return -1;
}
+ } else {
+ shm_pci_shrink(sdb);
- while ((fd = fqueue_next(fmgr.nm1_fqs[i])) >= 0) {
- if (ipcp_flow_read(fd, &sdb)) {
- log_err("Failed to read SDU from fd %d.", fd);
- continue;
- }
-
- shm_pci_des(sdb, &pci);
-
- if (pci.dst_addr != ipcpi.dt_addr) {
- if (pci.ttl == 0) {
- log_dbg("TTL was zero.");
- ipcp_flow_del(sdb);
- continue;
- }
-
- pff_lock(fmgr.pff[i]);
- fd = pff_nhop(fmgr.pff[i], pci.dst_addr);
- if (fd < 0) {
- pff_unlock(fmgr.pff[i]);
- log_err("No next hop for %" PRIu64,
- pci.dst_addr);
- ipcp_flow_del(sdb);
- continue;
- }
- pff_unlock(fmgr.pff[i]);
-
- if (ipcp_flow_write(fd, sdb)) {
- log_err("Failed to write SDU to fd %d.",
- fd);
- ipcp_flow_del(sdb);
- continue;
- }
- } else {
- shm_pci_shrink(sdb);
-
- if (frct_nm1_post_sdu(&pci, sdb)) {
- log_err("Failed to hand PDU to FRCT.");
- continue;
- }
- }
+ if (frct_nm1_post_sdu(&pci, sdb)) {
+ log_err("Failed to hand PDU to FRCT.");
+ return -1;
}
}
- return (void *) 0;
+ return 0;
}
static void fmgr_destroy_flows(void)
@@ -230,8 +173,6 @@ static void fmgr_destroy_flows(void)
for (i = 0; i < QOS_CUBE_MAX; ++i) {
flow_set_destroy(fmgr.nm1_set[i]);
flow_set_destroy(fmgr.np1_set[i]);
- fqueue_destroy(fmgr.nm1_fqs[i]);
- fqueue_destroy(fmgr.np1_fqs[i]);
}
}
@@ -270,23 +211,11 @@ int fmgr_init(void)
return -1;
}
- fmgr.np1_fqs[i] = fqueue_create();
- if (fmgr.np1_fqs[i] == NULL) {
- fmgr_destroy_flows();
- return -1;
- }
-
fmgr.nm1_set[i] = flow_set_create();
if (fmgr.nm1_set[i] == NULL) {
fmgr_destroy_flows();
return -1;
}
-
- fmgr.nm1_fqs[i] = fqueue_create();
- if (fmgr.nm1_fqs[i] == NULL) {
- fmgr_destroy_flows();
- return -1;
- }
}
if (shm_pci_init()) {
@@ -410,19 +339,29 @@ int fmgr_start(void)
return -1;
}
- pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL);
- pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL);
+ fmgr.nm1_sdu_sched = sdu_sched_create(fmgr.nm1_set, nm1_sdu_handler);
+ if (fmgr.nm1_sdu_sched == NULL) {
+ log_err("Failed to create N-1 SDU scheduler.");
+ gam_destroy(fmgr.gam);
+ return -1;
+ }
+
+ fmgr.np1_sdu_sched = sdu_sched_create(fmgr.np1_set, np1_sdu_handler);
+ if (fmgr.np1_sdu_sched == NULL) {
+ log_err("Failed to create N+1 SDU scheduler.");
+ sdu_sched_destroy(fmgr.nm1_sdu_sched);
+ gam_destroy(fmgr.gam);
+ return -1;
+ }
return 0;
}
void fmgr_stop(void)
{
- pthread_cancel(fmgr.np1_sdu_reader);
- pthread_cancel(fmgr.nm1_sdu_reader);
+ sdu_sched_destroy(fmgr.np1_sdu_sched);
- pthread_join(fmgr.np1_sdu_reader, NULL);
- pthread_join(fmgr.nm1_sdu_reader, NULL);
+ sdu_sched_destroy(fmgr.nm1_sdu_sched);
gam_destroy(fmgr.gam);
}
diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c
new file mode 100644
index 00000000..14fc672f
--- /dev/null
+++ b/src/ipcpd/normal/sdu_sched.c
@@ -0,0 +1,131 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * SDU scheduler component
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "sdu-scheduler"
+
+#include <ouroboros/config.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/errno.h>
+
+#include "sdu_sched.h"
+
+#include <stdbool.h>
+#include <assert.h>
+#include <stdlib.h>
+
+#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */
+
+struct sdu_sched {
+ flow_set_t * set[QOS_CUBE_MAX];
+ fqueue_t * fqs[QOS_CUBE_MAX];
+ next_sdu_t callback;
+ pthread_t sdu_reader;
+};
+
+static void * sdu_reader(void * o)
+{
+ struct sdu_sched * sched;
+ struct shm_du_buff * sdb;
+ struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
+ int fd;
+ int i = 0;
+ int ret;
+
+ sched = (struct sdu_sched *) o;
+
+ while (true) {
+ /* FIXME: replace with scheduling policy call */
+ i = (i + 1) % QOS_CUBE_MAX;
+
+ ret = flow_event_wait(sched->set[i], sched->fqs[i], &timeout);
+ if (ret == -ETIMEDOUT)
+ continue;
+
+ if (ret < 0) {
+ log_warn("Event error: %d.", ret);
+ continue;
+ }
+
+ while ((fd = fqueue_next(sched->fqs[i])) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ log_warn("Failed to read SDU from fd %d.", fd);
+ continue;
+ }
+
+ if (sched->callback(fd, i, sdb)) {
+ log_warn("Callback reported an error.");
+ continue;
+ }
+ }
+ }
+
+ return (void *) 0;
+}
+
+struct sdu_sched * sdu_sched_create(flow_set_t * set[QOS_CUBE_MAX],
+ next_sdu_t callback)
+{
+ struct sdu_sched * sdu_sched;
+ int i;
+ int j;
+
+ sdu_sched = malloc(sizeof(*sdu_sched));
+ if (sdu_sched == NULL)
+ return NULL;
+
+ sdu_sched->callback = callback;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ sdu_sched->set[i] = set[i];
+
+ sdu_sched->fqs[i] = fqueue_create();
+ if (sdu_sched->fqs[i] == NULL) {
+ for (j = i; j >= 0; --j)
+ fqueue_destroy(sdu_sched->fqs[j]);
+ free(sdu_sched);
+ return NULL;
+ }
+ }
+
+ pthread_create(&sdu_sched->sdu_reader,
+ NULL,
+ sdu_reader,
+ (void *) sdu_sched);
+
+ return sdu_sched;
+}
+
+void sdu_sched_destroy(struct sdu_sched * sdu_sched)
+{
+ int i;
+
+ assert(sdu_sched);
+
+ pthread_cancel(sdu_sched->sdu_reader);
+
+ pthread_join(sdu_sched->sdu_reader, NULL);
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ fqueue_destroy(sdu_sched->fqs[i]);
+
+ free(sdu_sched);
+}
diff --git a/src/ipcpd/normal/sdu_sched.h b/src/ipcpd/normal/sdu_sched.h
new file mode 100644
index 00000000..1a22b041
--- /dev/null
+++ b/src/ipcpd/normal/sdu_sched.h
@@ -0,0 +1,38 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * SDU scheduler component
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H
+#define OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H
+
+#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/fqueue.h>
+
+typedef int (* next_sdu_t)(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb);
+
+struct sdu_sched * sdu_sched_create(flow_set_t * set[QOS_CUBE_MAX],
+ next_sdu_t callback);
+
+void sdu_sched_destroy(struct sdu_sched * sdu_sched);
+
+#endif /* OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H */