From fa70f932b4a6ad5849e1fd68906157619c09341f Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 19 Apr 2017 18:36:59 +0200 Subject: ipcpd: normal: Extract SDU scheduling component This extracts the SDU scheduling component out of the Flow Manager since the functionality was duplicated. For both the N-1 and N+1 flow sets an SDU scheduling component is now created. --- src/ipcpd/normal/CMakeLists.txt | 1 + src/ipcpd/normal/fmgr.c | 193 ++++++++++++++-------------------------- src/ipcpd/normal/sdu_sched.c | 131 +++++++++++++++++++++++++++ src/ipcpd/normal/sdu_sched.h | 38 ++++++++ 4 files changed, 236 insertions(+), 127 deletions(-) create mode 100644 src/ipcpd/normal/sdu_sched.c create mode 100644 src/ipcpd/normal/sdu_sched.h diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 69615d0c..2045b8df 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -32,6 +32,7 @@ set(SOURCE_FILES pff.c ribmgr.c routing.c + sdu_sched.c shm_pci.c # Add policies last pol/complete.c diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index ba36812f..d055b311 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -42,6 +42,7 @@ #include "neighbors.h" #include "gam.h" #include "routing.h" +#include "sdu_sched.h" #include #include @@ -55,18 +56,15 @@ typedef FlowAllocMsg flow_alloc_msg_t; #define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */ struct { - flow_set_t * np1_set[QOS_CUBE_MAX]; - fqueue_t * np1_fqs[QOS_CUBE_MAX]; pthread_rwlock_t np1_flows_lock; - cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS]; int np1_cep_id_to_fd[IPCPD_MAX_CONNS]; - pthread_t np1_sdu_reader; + flow_set_t * np1_set[QOS_CUBE_MAX]; + struct sdu_sched * np1_sdu_sched; flow_set_t * nm1_set[QOS_CUBE_MAX]; - fqueue_t * nm1_fqs[QOS_CUBE_MAX]; - pthread_t nm1_sdu_reader; + struct sdu_sched * nm1_sdu_sched; struct pff * pff[QOS_CUBE_MAX]; struct routing_i * routing[QOS_CUBE_MAX]; @@ -102,125 +100,70 @@ static int fmgr_neighbor_event(enum nb_event event, return 0; } -static void * fmgr_np1_sdu_reader(void * o) +static int np1_sdu_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { - struct shm_du_buff * sdb; - struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - int fd; - int i = 0; - int ret; - - (void) o; - - while (true) { - /* FIXME: replace with scheduling policy call */ - i = (i + 1) % QOS_CUBE_MAX; - - ret = flow_event_wait(fmgr.np1_set[i], - fmgr.np1_fqs[i], - &timeout); - if (ret == -ETIMEDOUT) - continue; - - if (ret < 0) { - log_warn("Event error: %d.", ret); - continue; - } - - while ((fd = fqueue_next(fmgr.np1_fqs[i])) >= 0) { - if (ipcp_flow_read(fd, &sdb)) { - log_warn("Failed to read SDU from fd %d.", fd); - continue; - } + (void) qc; - pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - - if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - log_warn("Failed to hand SDU to FRCT."); - continue; - } + pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - } + if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + ipcp_flow_del(sdb); + log_warn("Failed to hand SDU to FRCT."); + return -1; } - return (void *) 0; + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + + return 0; } -void * fmgr_nm1_sdu_reader(void * o) +static int nm1_sdu_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { - struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - struct shm_du_buff * sdb; - struct pci pci; - int fd; - int i = 0; - int ret; - - (void) o; + struct pci pci; memset(&pci, 0, sizeof(pci)); - while (true) { - /* FIXME: replace with scheduling policy call */ - i = (i + 1) % QOS_CUBE_MAX; + shm_pci_des(sdb, &pci); + + if (pci.dst_addr != ipcpi.dt_addr) { + if (pci.ttl == 0) { + log_dbg("TTL was zero."); + ipcp_flow_del(sdb); + return 0; + } + + pff_lock(fmgr.pff[qc]); + + fd = pff_nhop(fmgr.pff[qc], pci.dst_addr); + if (fd < 0) { + pff_unlock(fmgr.pff[qc]); + log_err("No next hop for %" PRIu64, pci.dst_addr); + ipcp_flow_del(sdb); + return -1; + } - ret = flow_event_wait(fmgr.nm1_set[i], - fmgr.nm1_fqs[i], - &timeout); - if (ret == -ETIMEDOUT) - continue; + pff_unlock(fmgr.pff[qc]); - if (ret < 0) { - log_err("Event error: %d.", ret); - continue; + if (ipcp_flow_write(fd, sdb)) { + log_err("Failed to write SDU to fd %d.", fd); + ipcp_flow_del(sdb); + return -1; } + } else { + shm_pci_shrink(sdb); - while ((fd = fqueue_next(fmgr.nm1_fqs[i])) >= 0) { - if (ipcp_flow_read(fd, &sdb)) { - log_err("Failed to read SDU from fd %d.", fd); - continue; - } - - shm_pci_des(sdb, &pci); - - if (pci.dst_addr != ipcpi.dt_addr) { - if (pci.ttl == 0) { - log_dbg("TTL was zero."); - ipcp_flow_del(sdb); - continue; - } - - pff_lock(fmgr.pff[i]); - fd = pff_nhop(fmgr.pff[i], pci.dst_addr); - if (fd < 0) { - pff_unlock(fmgr.pff[i]); - log_err("No next hop for %" PRIu64, - pci.dst_addr); - ipcp_flow_del(sdb); - continue; - } - pff_unlock(fmgr.pff[i]); - - if (ipcp_flow_write(fd, sdb)) { - log_err("Failed to write SDU to fd %d.", - fd); - ipcp_flow_del(sdb); - continue; - } - } else { - shm_pci_shrink(sdb); - - if (frct_nm1_post_sdu(&pci, sdb)) { - log_err("Failed to hand PDU to FRCT."); - continue; - } - } + if (frct_nm1_post_sdu(&pci, sdb)) { + log_err("Failed to hand PDU to FRCT."); + return -1; } } - return (void *) 0; + return 0; } static void fmgr_destroy_flows(void) @@ -230,8 +173,6 @@ static void fmgr_destroy_flows(void) for (i = 0; i < QOS_CUBE_MAX; ++i) { flow_set_destroy(fmgr.nm1_set[i]); flow_set_destroy(fmgr.np1_set[i]); - fqueue_destroy(fmgr.nm1_fqs[i]); - fqueue_destroy(fmgr.np1_fqs[i]); } } @@ -270,23 +211,11 @@ int fmgr_init(void) return -1; } - fmgr.np1_fqs[i] = fqueue_create(); - if (fmgr.np1_fqs[i] == NULL) { - fmgr_destroy_flows(); - return -1; - } - fmgr.nm1_set[i] = flow_set_create(); if (fmgr.nm1_set[i] == NULL) { fmgr_destroy_flows(); return -1; } - - fmgr.nm1_fqs[i] = fqueue_create(); - if (fmgr.nm1_fqs[i] == NULL) { - fmgr_destroy_flows(); - return -1; - } } if (shm_pci_init()) { @@ -410,19 +339,29 @@ int fmgr_start(void) return -1; } - pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL); - pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL); + fmgr.nm1_sdu_sched = sdu_sched_create(fmgr.nm1_set, nm1_sdu_handler); + if (fmgr.nm1_sdu_sched == NULL) { + log_err("Failed to create N-1 SDU scheduler."); + gam_destroy(fmgr.gam); + return -1; + } + + fmgr.np1_sdu_sched = sdu_sched_create(fmgr.np1_set, np1_sdu_handler); + if (fmgr.np1_sdu_sched == NULL) { + log_err("Failed to create N+1 SDU scheduler."); + sdu_sched_destroy(fmgr.nm1_sdu_sched); + gam_destroy(fmgr.gam); + return -1; + } return 0; } void fmgr_stop(void) { - pthread_cancel(fmgr.np1_sdu_reader); - pthread_cancel(fmgr.nm1_sdu_reader); + sdu_sched_destroy(fmgr.np1_sdu_sched); - pthread_join(fmgr.np1_sdu_reader, NULL); - pthread_join(fmgr.nm1_sdu_reader, NULL); + sdu_sched_destroy(fmgr.nm1_sdu_sched); gam_destroy(fmgr.gam); } diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c new file mode 100644 index 00000000..14fc672f --- /dev/null +++ b/src/ipcpd/normal/sdu_sched.c @@ -0,0 +1,131 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * SDU scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define OUROBOROS_PREFIX "sdu-scheduler" + +#include +#include +#include + +#include "sdu_sched.h" + +#include +#include +#include + +#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */ + +struct sdu_sched { + flow_set_t * set[QOS_CUBE_MAX]; + fqueue_t * fqs[QOS_CUBE_MAX]; + next_sdu_t callback; + pthread_t sdu_reader; +}; + +static void * sdu_reader(void * o) +{ + struct sdu_sched * sched; + struct shm_du_buff * sdb; + struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; + int fd; + int i = 0; + int ret; + + sched = (struct sdu_sched *) o; + + while (true) { + /* FIXME: replace with scheduling policy call */ + i = (i + 1) % QOS_CUBE_MAX; + + ret = flow_event_wait(sched->set[i], sched->fqs[i], &timeout); + if (ret == -ETIMEDOUT) + continue; + + if (ret < 0) { + log_warn("Event error: %d.", ret); + continue; + } + + while ((fd = fqueue_next(sched->fqs[i])) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + log_warn("Failed to read SDU from fd %d.", fd); + continue; + } + + if (sched->callback(fd, i, sdb)) { + log_warn("Callback reported an error."); + continue; + } + } + } + + return (void *) 0; +} + +struct sdu_sched * sdu_sched_create(flow_set_t * set[QOS_CUBE_MAX], + next_sdu_t callback) +{ + struct sdu_sched * sdu_sched; + int i; + int j; + + sdu_sched = malloc(sizeof(*sdu_sched)); + if (sdu_sched == NULL) + return NULL; + + sdu_sched->callback = callback; + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + sdu_sched->set[i] = set[i]; + + sdu_sched->fqs[i] = fqueue_create(); + if (sdu_sched->fqs[i] == NULL) { + for (j = i; j >= 0; --j) + fqueue_destroy(sdu_sched->fqs[j]); + free(sdu_sched); + return NULL; + } + } + + pthread_create(&sdu_sched->sdu_reader, + NULL, + sdu_reader, + (void *) sdu_sched); + + return sdu_sched; +} + +void sdu_sched_destroy(struct sdu_sched * sdu_sched) +{ + int i; + + assert(sdu_sched); + + pthread_cancel(sdu_sched->sdu_reader); + + pthread_join(sdu_sched->sdu_reader, NULL); + + for (i = 0; i < QOS_CUBE_MAX; ++i) + fqueue_destroy(sdu_sched->fqs[i]); + + free(sdu_sched); +} diff --git a/src/ipcpd/normal/sdu_sched.h b/src/ipcpd/normal/sdu_sched.h new file mode 100644 index 00000000..1a22b041 --- /dev/null +++ b/src/ipcpd/normal/sdu_sched.h @@ -0,0 +1,38 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * SDU scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H +#define OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H + +#include +#include + +typedef int (* next_sdu_t)(int fd, + qoscube_t qc, + struct shm_du_buff * sdb); + +struct sdu_sched * sdu_sched_create(flow_set_t * set[QOS_CUBE_MAX], + next_sdu_t callback); + +void sdu_sched_destroy(struct sdu_sched * sdu_sched); + +#endif /* OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H */ -- cgit v1.2.3