From 8bd27921e6cd46cdcc191c9d98a7f93bb7fe5360 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Fri, 5 Oct 2018 10:24:02 +0200 Subject: ipcpd: Shorten packet_scheduler to psched This shortens packet_scheduler to psched, which results in more readable code. Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- src/ipcpd/normal/CMakeLists.txt | 2 +- src/ipcpd/normal/dt.c | 16 +-- src/ipcpd/normal/fa.c | 16 +-- src/ipcpd/normal/packet_sched.c | 241 ---------------------------------------- src/ipcpd/normal/packet_sched.h | 43 ------- src/ipcpd/normal/psched.c | 241 ++++++++++++++++++++++++++++++++++++++++ src/ipcpd/normal/psched.h | 43 +++++++ 7 files changed, 301 insertions(+), 301 deletions(-) delete mode 100644 src/ipcpd/normal/packet_sched.c delete mode 100644 src/ipcpd/normal/packet_sched.h create mode 100644 src/ipcpd/normal/psched.c create mode 100644 src/ipcpd/normal/psched.h (limited to 'src/ipcpd') diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 0cb7b770..d1585395 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -42,7 +42,7 @@ set(SOURCE_FILES main.c pff.c routing.c - packet_sched.c + psched.c # Add policies last pol/alternate_pff.c pol/flat.c diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index 08c937e7..dc7343f1 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -46,7 +46,7 @@ #include "dt.h" #include "pff.h" #include "routing.h" -#include "packet_sched.h" +#include "psched.h" #include "comp.h" #include "fa.h" @@ -154,7 +154,7 @@ static void dt_pci_shrink(struct shm_du_buff * sdb) } struct { - struct packet_sched * packet_sched; + struct psched * psched; struct pff * pff[QOS_CUBE_MAX]; struct routing_i * routing[QOS_CUBE_MAX]; @@ -421,14 +421,14 @@ static void handle_event(void * self, #ifdef IPCP_FLOW_STATS stat_used(c->flow_info.fd, c->conn_info.addr); #endif - packet_sched_add(dt.packet_sched, c->flow_info.fd); + psched_add(dt.psched, c->flow_info.fd); log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd); break; case NOTIFY_DT_CONN_DEL: #ifdef IPCP_FLOW_STATS stat_used(c->flow_info.fd, INVALID_ADDR); #endif - packet_sched_del(dt.packet_sched, c->flow_info.fd); + psched_del(dt.psched, c->flow_info.fd); log_dbg("Removed fd %d from " "packet scheduler.", c->flow_info.fd); break; @@ -763,15 +763,15 @@ void dt_fini(void) int dt_start(void) { - dt.packet_sched = packet_sched_create(packet_handler); - if (dt.packet_sched == NULL) { + dt.psched = psched_create(packet_handler); + if (dt.psched == NULL) { log_err("Failed to create N-1 packet scheduler."); return -1; } if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) { log_err("Failed to create listener thread."); - packet_sched_destroy(dt.packet_sched); + psched_destroy(dt.psched); return -1; } @@ -782,7 +782,7 @@ void dt_stop(void) { pthread_cancel(dt.listener); pthread_join(dt.listener, NULL); - packet_sched_destroy(dt.packet_sched); + psched_destroy(dt.psched); } int dt_reg_comp(void * comp, diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index d67ba61e..027223b7 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -39,7 +39,7 @@ #include "dir.h" #include "fa.h" -#include "packet_sched.h" +#include "psched.h" #include "ipcp.h" #include "dt.h" @@ -74,7 +74,7 @@ struct { uint64_t r_addr[PROG_MAX_FLOWS]; int fd; - struct packet_sched * packet_sched; + struct psched * psched; } fa; static void packet_handler(int fd, @@ -192,7 +192,7 @@ static void fa_post_packet(void * comp, if (msg->response < 0) destroy_conn(ntoh32(msg->r_eid)); else - packet_sched_add(fa.packet_sched, ntoh32(msg->r_eid)); + psched_add(fa.psched, ntoh32(msg->r_eid)); pthread_rwlock_unlock(&fa.flows_lock); @@ -227,8 +227,8 @@ void fa_fini(void) int fa_start(void) { - fa.packet_sched = packet_sched_create(packet_handler); - if (fa.packet_sched == NULL) { + fa.psched = psched_create(packet_handler); + if (fa.psched == NULL) { log_err("Failed to create packet scheduler."); return -1; } @@ -238,7 +238,7 @@ int fa_start(void) void fa_stop(void) { - packet_sched_destroy(fa.packet_sched); + psched_destroy(fa.psched); } int fa_alloc(int fd, @@ -335,7 +335,7 @@ int fa_alloc_resp(int fd, destroy_conn(fd); ipcp_sdb_release(sdb); } else { - packet_sched_add(fa.packet_sched, fd); + psched_add(fa.psched, fd); } ipcp_flow_get_qoscube(fd, &qc); @@ -360,7 +360,7 @@ int fa_dealloc(int fd) pthread_rwlock_wrlock(&fa.flows_lock); - packet_sched_del(fa.packet_sched, fd); + psched_del(fa.psched, fd); destroy_conn(fd); diff --git a/src/ipcpd/normal/packet_sched.c b/src/ipcpd/normal/packet_sched.c deleted file mode 100644 index fc01fb32..00000000 --- a/src/ipcpd/normal/packet_sched.c +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * Packet scheduler component - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#if defined(__linux__) || defined(__CYGWIN__) -#define _DEFAULT_SOURCE -#else -#define _POSIX_C_SOURCE 200112L -#endif - -#include "config.h" - -#include -#include - -#include "ipcp.h" -#include "packet_sched.h" -#include "connmgr.h" - -#include -#include -#include -#include -#include - -static int qos_prio [] = { - QOS_PRIO_RAW, - QOS_PRIO_BE, - QOS_PRIO_VIDEO, - QOS_PRIO_VOICE, - QOS_PRIO_DATA -}; - -struct packet_sched { - fset_t * set[QOS_CUBE_MAX]; - next_packet_fn_t callback; - pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; -}; - -struct sched_info { - struct packet_sched * sch; - qoscube_t qc; -}; - -static void cleanup_reader(void * o) -{ - fqueue_destroy((fqueue_t *) o); -} - -static void * packet_reader(void * o) -{ - struct packet_sched * sched; - struct shm_du_buff * sdb; - int fd; - fqueue_t * fq; - qoscube_t qc; - - sched = ((struct sched_info *) o)->sch; - qc = ((struct sched_info *) o)->qc; - - ipcp_lock_to_core(); - - free(o); - - fq = fqueue_create(); - if (fq == NULL) - return (void *) -1; - - pthread_cleanup_push(cleanup_reader, fq); - - while (true) { - int ret = fevent(sched->set[qc], fq, NULL); - if (ret < 0) - continue; - - while ((fd = fqueue_next(fq)) >= 0) { - switch (fqueue_type(fq)) { - case FLOW_DEALLOC: - notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); - break; - case FLOW_DOWN: - notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); - break; - case FLOW_UP: - notifier_event(NOTIFY_DT_FLOW_UP, &fd); - break; - case FLOW_PKT: - if (ipcp_flow_read(fd, &sdb)) - continue; - - sched->callback(fd, qc, sdb); - break; - default: - break; - } - } - } - - pthread_cleanup_pop(true); - - return (void *) 0; -} - -struct packet_sched * packet_sched_create(next_packet_fn_t callback) -{ - struct packet_sched * packet_sched; - struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; - int i; - int j; - - assert(callback); - - packet_sched = malloc(sizeof(*packet_sched)); - if (packet_sched == NULL) - goto fail_malloc; - - packet_sched->callback = callback; - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - packet_sched->set[i] = fset_create(); - if (packet_sched->set[i] == NULL) { - for (j = 0; j < i; ++j) - fset_destroy(packet_sched->set[j]); - goto fail_flow_set; - } - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - infos[i] = malloc(sizeof(*infos[i])); - if (infos[i] == NULL) { - for (j = 0; j < i; ++j) - free(infos[j]); - goto fail_infos; - } - infos[i]->sch = packet_sched; - infos[i]->qc = i % QOS_CUBE_MAX; - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - if (pthread_create(&packet_sched->readers[i], NULL, - packet_reader, infos[i])) { - for (j = 0; j < i; ++j) - pthread_cancel(packet_sched->readers[j]); - for (j = 0; j < i; ++j) - pthread_join(packet_sched->readers[j], NULL); - for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - free(infos[i]); - goto fail_infos; - } - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - struct sched_param par; - int pol = SCHED_RR; - int min; - int max; - - min = sched_get_priority_min(pol); - max = sched_get_priority_max(pol); - - min = (max - min) / 2; - - par.sched_priority = min + - (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); - - if (pthread_setschedparam(packet_sched->readers[i], pol, &par)) - goto fail_sched; - } - - return packet_sched; - - fail_sched: - for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - pthread_cancel(packet_sched->readers[j]); - for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - pthread_join(packet_sched->readers[j], NULL); - fail_infos: - for (j = 0; j < QOS_CUBE_MAX; ++j) - fset_destroy(packet_sched->set[j]); - fail_flow_set: - free(packet_sched); - fail_malloc: - return NULL; -} - -void packet_sched_destroy(struct packet_sched * packet_sched) -{ - int i; - - assert(packet_sched); - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - pthread_cancel(packet_sched->readers[i]); - pthread_join(packet_sched->readers[i], NULL); - } - - for (i = 0; i < QOS_CUBE_MAX; ++i) - fset_destroy(packet_sched->set[i]); - - free(packet_sched); -} - -void packet_sched_add(struct packet_sched * packet_sched, - int fd) -{ - qoscube_t qc; - - assert(packet_sched); - - ipcp_flow_get_qoscube(fd, &qc); - fset_add(packet_sched->set[qc], fd); -} - -void packet_sched_del(struct packet_sched * packet_sched, - int fd) -{ - qoscube_t qc; - - assert(packet_sched); - - ipcp_flow_get_qoscube(fd, &qc); - fset_del(packet_sched->set[qc], fd); -} diff --git a/src/ipcpd/normal/packet_sched.h b/src/ipcpd/normal/packet_sched.h deleted file mode 100644 index 13ff400d..00000000 --- a/src/ipcpd/normal/packet_sched.h +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * Packet scheduler component - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H -#define OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H - -#include -#include - -typedef void (* next_packet_fn_t)(int fd, - qoscube_t qc, - struct shm_du_buff * sdb); - -struct packet_sched * packet_sched_create(next_packet_fn_t callback); - -void packet_sched_destroy(struct packet_sched * packet_sched); - -void packet_sched_add(struct packet_sched * packet_sched, - int fd); - -void packet_sched_del(struct packet_sched * packet_sched, - int fd); - -#endif /* OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H */ diff --git a/src/ipcpd/normal/psched.c b/src/ipcpd/normal/psched.c new file mode 100644 index 00000000..27e5f1de --- /dev/null +++ b/src/ipcpd/normal/psched.c @@ -0,0 +1,241 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Packet scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#include +#include + +#include "ipcp.h" +#include "psched.h" +#include "connmgr.h" + +#include +#include +#include +#include +#include + +static int qos_prio [] = { + QOS_PRIO_RAW, + QOS_PRIO_BE, + QOS_PRIO_VIDEO, + QOS_PRIO_VOICE, + QOS_PRIO_DATA +}; + +struct psched { + fset_t * set[QOS_CUBE_MAX]; + next_packet_fn_t callback; + pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; +}; + +struct sched_info { + struct psched * sch; + qoscube_t qc; +}; + +static void cleanup_reader(void * o) +{ + fqueue_destroy((fqueue_t *) o); +} + +static void * packet_reader(void * o) +{ + struct psched * sched; + struct shm_du_buff * sdb; + int fd; + fqueue_t * fq; + qoscube_t qc; + + sched = ((struct sched_info *) o)->sch; + qc = ((struct sched_info *) o)->qc; + + ipcp_lock_to_core(); + + free(o); + + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; + + pthread_cleanup_push(cleanup_reader, fq); + + while (true) { + int ret = fevent(sched->set[qc], fq, NULL); + if (ret < 0) + continue; + + while ((fd = fqueue_next(fq)) >= 0) { + switch (fqueue_type(fq)) { + case FLOW_DEALLOC: + notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); + break; + case FLOW_DOWN: + notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); + break; + case FLOW_UP: + notifier_event(NOTIFY_DT_FLOW_UP, &fd); + break; + case FLOW_PKT: + if (ipcp_flow_read(fd, &sdb)) + continue; + + sched->callback(fd, qc, sdb); + break; + default: + break; + } + } + } + + pthread_cleanup_pop(true); + + return (void *) 0; +} + +struct psched * psched_create(next_packet_fn_t callback) +{ + struct psched * psched; + struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; + int i; + int j; + + assert(callback); + + psched = malloc(sizeof(*psched)); + if (psched == NULL) + goto fail_malloc; + + psched->callback = callback; + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + psched->set[i] = fset_create(); + if (psched->set[i] == NULL) { + for (j = 0; j < i; ++j) + fset_destroy(psched->set[j]); + goto fail_flow_set; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + infos[i] = malloc(sizeof(*infos[i])); + if (infos[i] == NULL) { + for (j = 0; j < i; ++j) + free(infos[j]); + goto fail_infos; + } + infos[i]->sch = psched; + infos[i]->qc = i % QOS_CUBE_MAX; + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + if (pthread_create(&psched->readers[i], NULL, + packet_reader, infos[i])) { + for (j = 0; j < i; ++j) + pthread_cancel(psched->readers[j]); + for (j = 0; j < i; ++j) + pthread_join(psched->readers[j], NULL); + for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + free(infos[i]); + goto fail_infos; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + struct sched_param par; + int pol = SCHED_RR; + int min; + int max; + + min = sched_get_priority_min(pol); + max = sched_get_priority_max(pol); + + min = (max - min) / 2; + + par.sched_priority = min + + (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); + + if (pthread_setschedparam(psched->readers[i], pol, &par)) + goto fail_sched; + } + + return psched; + + fail_sched: + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_cancel(psched->readers[j]); + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_join(psched->readers[j], NULL); + fail_infos: + for (j = 0; j < QOS_CUBE_MAX; ++j) + fset_destroy(psched->set[j]); + fail_flow_set: + free(psched); + fail_malloc: + return NULL; +} + +void psched_destroy(struct psched * psched) +{ + int i; + + assert(psched); + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + pthread_cancel(psched->readers[i]); + pthread_join(psched->readers[i], NULL); + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) + fset_destroy(psched->set[i]); + + free(psched); +} + +void psched_add(struct psched * psched, + int fd) +{ + qoscube_t qc; + + assert(psched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_add(psched->set[qc], fd); +} + +void psched_del(struct psched * psched, + int fd) +{ + qoscube_t qc; + + assert(psched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_del(psched->set[qc], fd); +} diff --git a/src/ipcpd/normal/psched.h b/src/ipcpd/normal/psched.h new file mode 100644 index 00000000..137c8fd1 --- /dev/null +++ b/src/ipcpd/normal/psched.h @@ -0,0 +1,43 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Packet scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_PSCHED_H +#define OUROBOROS_IPCPD_NORMAL_PSCHED_H + +#include +#include + +typedef void (* next_packet_fn_t)(int fd, + qoscube_t qc, + struct shm_du_buff * sdb); + +struct psched * psched_create(next_packet_fn_t callback); + +void psched_destroy(struct psched * psched); + +void psched_add(struct psched * psched, + int fd); + +void psched_del(struct psched * psched, + int fd); + +#endif /* OUROBOROS_IPCPD_NORMAL_PSCHED_H */ -- cgit v1.2.3