From 5d11a6ad590133c92925c6162eb47b4401f16bef Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Fri, 5 Oct 2018 10:24:01 +0200 Subject: ipcpd, lib, irmd, tools: Change SDU to packet This will change SDU (Service Data Unit) to packet everywhere. SDU is OSI terminology, whereas packet is Ouroboros terminology. Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- src/ipcpd/normal/CMakeLists.txt | 2 +- src/ipcpd/normal/dht.c | 48 ++++---- src/ipcpd/normal/dt.c | 52 ++++----- src/ipcpd/normal/dt.h | 8 +- src/ipcpd/normal/fa.c | 36 +++--- src/ipcpd/normal/packet_sched.c | 241 ++++++++++++++++++++++++++++++++++++++++ src/ipcpd/normal/packet_sched.h | 43 +++++++ src/ipcpd/normal/sdu_sched.c | 241 ---------------------------------------- src/ipcpd/normal/sdu_sched.h | 43 ------- 9 files changed, 358 insertions(+), 356 deletions(-) create mode 100644 src/ipcpd/normal/packet_sched.c create mode 100644 src/ipcpd/normal/packet_sched.h delete mode 100644 src/ipcpd/normal/sdu_sched.c delete mode 100644 src/ipcpd/normal/sdu_sched.h (limited to 'src/ipcpd/normal') diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 1cba7630..0cb7b770 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -42,7 +42,7 @@ set(SOURCE_FILES main.c pff.c routing.c - sdu_sched.c + packet_sched.c # Add policies last pol/alternate_pff.c pol/flat.c diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index a2fa4863..4064bf5c 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -62,21 +62,21 @@ typedef KadContactMsg kad_contact_msg_t; #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ -#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ -#define KAD_K 8 /* Replication factor, MDHT value. */ -#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ -#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ -#define KAD_T_JOIN 8 /* Response time to wait for a join. */ -#define KAD_T_RESP 5 /* Response time to wait for a response. */ -#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ -#define KAD_QUEER 15 /* Time to declare peer questionable. */ -#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ -#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ -#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ -#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ -#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */ -#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ +#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ +#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ +#define KAD_K 8 /* Replication factor, MDHT value. */ +#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ +#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ +#define KAD_T_JOIN 8 /* Response time to wait for a join. */ +#define KAD_T_RESP 5 /* Response time to wait for a response. */ +#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ +#define KAD_QUEER 15 /* Time to declare peer questionable. */ +#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ +#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ +#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ +#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ +#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_packet tpm check (ms) */ +#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ enum dht_state { DHT_INIT = 0, @@ -251,7 +251,7 @@ struct join_info { uint64_t addr; }; -struct sdu_info { +struct packet_info { struct dht * dht; struct shm_du_buff * sdb; }; @@ -1489,7 +1489,7 @@ static int send_msg(struct dht * dht, kad_msg__pack(msg, shm_du_buff_head(sdb)); - if (dt_write_sdu(addr, QOS_CUBE_BE, dht->fd, sdb) == 0) + if (dt_write_packet(addr, QOS_CUBE_BE, dht->fd, sdb) == 0) break; ipcp_sdb_release(sdb); @@ -2400,7 +2400,7 @@ uint64_t dht_query(struct dht * dht, return 0; } -static void * dht_handle_sdu(void * o) +static void * dht_handle_packet(void * o) { struct dht * dht = (struct dht *) o; @@ -2584,8 +2584,8 @@ static void * dht_handle_sdu(void * o) return (void *) 0; } -static void dht_post_sdu(void * comp, - struct shm_du_buff * sdb) +static void dht_post_packet(void * comp, + struct shm_du_buff * sdb) { struct cmd * cmd; struct dht * dht = (struct dht *) comp; @@ -2800,19 +2800,19 @@ struct dht * dht_create(uint64_t addr) dht->addr = addr; dht->id = NULL; #ifndef __DHT_TEST__ - dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht); + dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); if (dht->tpm == NULL) goto fail_tpm_create; if (tpm_start(dht->tpm)) goto fail_tpm_start; - dht->fd = dt_reg_comp(dht, &dht_post_sdu, DHT); + dht->fd = dt_reg_comp(dht, &dht_post_packet, DHT); notifier_reg(handle_event, dht); #else (void) handle_event; - (void) dht_handle_sdu; - (void) dht_post_sdu; + (void) dht_handle_packet; + (void) dht_post_packet; #endif dht->state = DHT_INIT; diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index a350e4be..08c937e7 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -46,7 +46,7 @@ #include "dt.h" #include "pff.h" #include "routing.h" -#include "sdu_sched.h" +#include "packet_sched.h" #include "comp.h" #include "fa.h" @@ -65,7 +65,7 @@ #endif struct comp_info { - void (* post_sdu)(void * comp, struct shm_du_buff * sdb); + void (* post_packet)(void * comp, struct shm_du_buff * sdb); void * comp; char * name; }; @@ -154,7 +154,7 @@ static void dt_pci_shrink(struct shm_du_buff * sdb) } struct { - struct sdu_sched * sdu_sched; + struct packet_sched * packet_sched; struct pff * pff[QOS_CUBE_MAX]; struct routing_i * routing[QOS_CUBE_MAX]; @@ -421,24 +421,25 @@ static void handle_event(void * self, #ifdef IPCP_FLOW_STATS stat_used(c->flow_info.fd, c->conn_info.addr); #endif - sdu_sched_add(dt.sdu_sched, c->flow_info.fd); - log_dbg("Added fd %d to SDU scheduler.", c->flow_info.fd); + packet_sched_add(dt.packet_sched, c->flow_info.fd); + log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd); break; case NOTIFY_DT_CONN_DEL: #ifdef IPCP_FLOW_STATS stat_used(c->flow_info.fd, INVALID_ADDR); #endif - sdu_sched_del(dt.sdu_sched, c->flow_info.fd); - log_dbg("Removed fd %d from SDU scheduler.", c->flow_info.fd); + packet_sched_del(dt.packet_sched, c->flow_info.fd); + log_dbg("Removed fd %d from " + "packet scheduler.", c->flow_info.fd); break; default: break; } } -static void sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) +static void packet_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { struct dt_pci dt_pci; int ret; @@ -491,7 +492,7 @@ static void sdu_handler(int fd, ret = ipcp_flow_write(ofd, sdb); if (ret < 0) { - log_dbg("Failed to write SDU to fd %d.", ofd); + log_dbg("Failed to write packet to fd %d.", ofd); if (ret == -EFLOWDOWN) notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd); ipcp_sdb_release(sdb); @@ -560,7 +561,7 @@ static void sdu_handler(int fd, return; } - if (dt.comps[dt_pci.eid].post_sdu == NULL) { + if (dt.comps[dt_pci.eid].post_packet == NULL) { log_err("No registered component on eid %d.", dt_pci.eid); ipcp_sdb_release(sdb); @@ -596,7 +597,8 @@ static void sdu_handler(int fd, pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); #endif - dt.comps[dt_pci.eid].post_sdu(dt.comps[dt_pci.eid].comp, sdb); + dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp, + sdb); } } @@ -761,15 +763,15 @@ void dt_fini(void) int dt_start(void) { - dt.sdu_sched = sdu_sched_create(sdu_handler); - if (dt.sdu_sched == NULL) { - log_err("Failed to create N-1 SDU scheduler."); + dt.packet_sched = packet_sched_create(packet_handler); + if (dt.packet_sched == NULL) { + log_err("Failed to create N-1 packet scheduler."); return -1; } if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) { log_err("Failed to create listener thread."); - sdu_sched_destroy(dt.sdu_sched); + packet_sched_destroy(dt.packet_sched); return -1; } @@ -780,7 +782,7 @@ void dt_stop(void) { pthread_cancel(dt.listener); pthread_join(dt.listener, NULL); - sdu_sched_destroy(dt.sdu_sched); + packet_sched_destroy(dt.packet_sched); } int dt_reg_comp(void * comp, @@ -800,11 +802,11 @@ int dt_reg_comp(void * comp, return -EBADF; } - assert(dt.comps[res_fd].post_sdu == NULL); + assert(dt.comps[res_fd].post_packet == NULL); assert(dt.comps[res_fd].comp == NULL); assert(dt.comps[res_fd].name == NULL); - dt.comps[res_fd].post_sdu = func; + dt.comps[res_fd].post_packet = func; dt.comps[res_fd].comp = comp; dt.comps[res_fd].name = name; @@ -815,10 +817,10 @@ int dt_reg_comp(void * comp, return res_fd; } -int dt_write_sdu(uint64_t dst_addr, - qoscube_t qc, - int np1_fd, - struct shm_du_buff * sdb) +int dt_write_packet(uint64_t dst_addr, + qoscube_t qc, + int np1_fd, + struct shm_du_buff * sdb) { int fd; struct dt_pci dt_pci; @@ -863,7 +865,7 @@ int dt_write_sdu(uint64_t dst_addr, #endif ret = ipcp_flow_write(fd, sdb); if (ret < 0) { - log_dbg("Failed to write SDU to fd %d.", fd); + log_dbg("Failed to write packet to fd %d.", fd); if (ret == -EFLOWDOWN) notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); goto fail_write; diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h index a17098b7..05b8220c 100644 --- a/src/ipcpd/normal/dt.h +++ b/src/ipcpd/normal/dt.h @@ -47,9 +47,9 @@ int dt_reg_comp(void * comp, void (* func)(void * comp, struct shm_du_buff * sdb), char * name); -int dt_write_sdu(uint64_t dst_addr, - qoscube_t qc, - int res_fd, - struct shm_du_buff * sdb); +int dt_write_packet(uint64_t dst_addr, + qoscube_t qc, + int res_fd, + struct shm_du_buff * sdb); #endif /* OUROBOROS_IPCPD_NORMAL_DT_H */ diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 4c82e0e0..d67ba61e 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -39,7 +39,7 @@ #include "dir.h" #include "fa.h" -#include "sdu_sched.h" +#include "packet_sched.h" #include "ipcp.h" #include "dt.h" @@ -74,19 +74,19 @@ struct { uint64_t r_addr[PROG_MAX_FLOWS]; int fd; - struct sdu_sched * sdu_sched; + struct packet_sched * packet_sched; } fa; -static void sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) +static void packet_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { pthread_rwlock_rdlock(&fa.flows_lock); - if (dt_write_sdu(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) { + if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) { pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); - log_warn("Failed to forward SDU."); + log_warn("Failed to forward packet."); return; } @@ -99,7 +99,7 @@ static void destroy_conn(int fd) fa.r_addr[fd] = INVALID_ADDR; } -static void fa_post_sdu(void * comp, +static void fa_post_packet(void * comp, struct shm_du_buff * sdb) { struct timespec ts = {0, TIMEOUT * 1000}; @@ -192,7 +192,7 @@ static void fa_post_sdu(void * comp, if (msg->response < 0) destroy_conn(ntoh32(msg->r_eid)); else - sdu_sched_add(fa.sdu_sched, ntoh32(msg->r_eid)); + packet_sched_add(fa.packet_sched, ntoh32(msg->r_eid)); pthread_rwlock_unlock(&fa.flows_lock); @@ -215,7 +215,7 @@ int fa_init(void) if (pthread_rwlock_init(&fa.flows_lock, NULL)) return -1; - fa.fd = dt_reg_comp(&fa, &fa_post_sdu, FA); + fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA); return 0; } @@ -227,9 +227,9 @@ void fa_fini(void) int fa_start(void) { - fa.sdu_sched = sdu_sched_create(sdu_handler); - if (fa.sdu_sched == NULL) { - log_err("Failed to create SDU scheduler."); + fa.packet_sched = packet_sched_create(packet_handler); + if (fa.packet_sched == NULL) { + log_err("Failed to create packet scheduler."); return -1; } @@ -238,7 +238,7 @@ int fa_start(void) void fa_stop(void) { - sdu_sched_destroy(fa.sdu_sched); + packet_sched_destroy(fa.packet_sched); } int fa_alloc(int fd, @@ -273,7 +273,7 @@ int fa_alloc(int fd, qc = qos_spec_to_cube(qs); - if (dt_write_sdu(addr, qc, fa.fd, sdb)) { + if (dt_write_packet(addr, qc, fa.fd, sdb)) { ipcp_sdb_release(sdb); return -1; } @@ -335,14 +335,14 @@ int fa_alloc_resp(int fd, destroy_conn(fd); ipcp_sdb_release(sdb); } else { - sdu_sched_add(fa.sdu_sched, fd); + packet_sched_add(fa.packet_sched, fd); } ipcp_flow_get_qoscube(fd, &qc); assert(qc >= 0 && qc < QOS_CUBE_MAX); - if (dt_write_sdu(fa.r_addr[fd], qc, fa.fd, sdb)) { + if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) { destroy_conn(fd); pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); @@ -360,7 +360,7 @@ int fa_dealloc(int fd) pthread_rwlock_wrlock(&fa.flows_lock); - sdu_sched_del(fa.sdu_sched, fd); + packet_sched_del(fa.packet_sched, fd); destroy_conn(fd); diff --git a/src/ipcpd/normal/packet_sched.c b/src/ipcpd/normal/packet_sched.c new file mode 100644 index 00000000..fc01fb32 --- /dev/null +++ b/src/ipcpd/normal/packet_sched.c @@ -0,0 +1,241 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Packet scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#include +#include + +#include "ipcp.h" +#include "packet_sched.h" +#include "connmgr.h" + +#include +#include +#include +#include +#include + +static int qos_prio [] = { + QOS_PRIO_RAW, + QOS_PRIO_BE, + QOS_PRIO_VIDEO, + QOS_PRIO_VOICE, + QOS_PRIO_DATA +}; + +struct packet_sched { + fset_t * set[QOS_CUBE_MAX]; + next_packet_fn_t callback; + pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; +}; + +struct sched_info { + struct packet_sched * sch; + qoscube_t qc; +}; + +static void cleanup_reader(void * o) +{ + fqueue_destroy((fqueue_t *) o); +} + +static void * packet_reader(void * o) +{ + struct packet_sched * sched; + struct shm_du_buff * sdb; + int fd; + fqueue_t * fq; + qoscube_t qc; + + sched = ((struct sched_info *) o)->sch; + qc = ((struct sched_info *) o)->qc; + + ipcp_lock_to_core(); + + free(o); + + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; + + pthread_cleanup_push(cleanup_reader, fq); + + while (true) { + int ret = fevent(sched->set[qc], fq, NULL); + if (ret < 0) + continue; + + while ((fd = fqueue_next(fq)) >= 0) { + switch (fqueue_type(fq)) { + case FLOW_DEALLOC: + notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); + break; + case FLOW_DOWN: + notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); + break; + case FLOW_UP: + notifier_event(NOTIFY_DT_FLOW_UP, &fd); + break; + case FLOW_PKT: + if (ipcp_flow_read(fd, &sdb)) + continue; + + sched->callback(fd, qc, sdb); + break; + default: + break; + } + } + } + + pthread_cleanup_pop(true); + + return (void *) 0; +} + +struct packet_sched * packet_sched_create(next_packet_fn_t callback) +{ + struct packet_sched * packet_sched; + struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; + int i; + int j; + + assert(callback); + + packet_sched = malloc(sizeof(*packet_sched)); + if (packet_sched == NULL) + goto fail_malloc; + + packet_sched->callback = callback; + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + packet_sched->set[i] = fset_create(); + if (packet_sched->set[i] == NULL) { + for (j = 0; j < i; ++j) + fset_destroy(packet_sched->set[j]); + goto fail_flow_set; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + infos[i] = malloc(sizeof(*infos[i])); + if (infos[i] == NULL) { + for (j = 0; j < i; ++j) + free(infos[j]); + goto fail_infos; + } + infos[i]->sch = packet_sched; + infos[i]->qc = i % QOS_CUBE_MAX; + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + if (pthread_create(&packet_sched->readers[i], NULL, + packet_reader, infos[i])) { + for (j = 0; j < i; ++j) + pthread_cancel(packet_sched->readers[j]); + for (j = 0; j < i; ++j) + pthread_join(packet_sched->readers[j], NULL); + for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + free(infos[i]); + goto fail_infos; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + struct sched_param par; + int pol = SCHED_RR; + int min; + int max; + + min = sched_get_priority_min(pol); + max = sched_get_priority_max(pol); + + min = (max - min) / 2; + + par.sched_priority = min + + (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); + + if (pthread_setschedparam(packet_sched->readers[i], pol, &par)) + goto fail_sched; + } + + return packet_sched; + + fail_sched: + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_cancel(packet_sched->readers[j]); + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_join(packet_sched->readers[j], NULL); + fail_infos: + for (j = 0; j < QOS_CUBE_MAX; ++j) + fset_destroy(packet_sched->set[j]); + fail_flow_set: + free(packet_sched); + fail_malloc: + return NULL; +} + +void packet_sched_destroy(struct packet_sched * packet_sched) +{ + int i; + + assert(packet_sched); + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + pthread_cancel(packet_sched->readers[i]); + pthread_join(packet_sched->readers[i], NULL); + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) + fset_destroy(packet_sched->set[i]); + + free(packet_sched); +} + +void packet_sched_add(struct packet_sched * packet_sched, + int fd) +{ + qoscube_t qc; + + assert(packet_sched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_add(packet_sched->set[qc], fd); +} + +void packet_sched_del(struct packet_sched * packet_sched, + int fd) +{ + qoscube_t qc; + + assert(packet_sched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_del(packet_sched->set[qc], fd); +} diff --git a/src/ipcpd/normal/packet_sched.h b/src/ipcpd/normal/packet_sched.h new file mode 100644 index 00000000..13ff400d --- /dev/null +++ b/src/ipcpd/normal/packet_sched.h @@ -0,0 +1,43 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Packet scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H +#define OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H + +#include +#include + +typedef void (* next_packet_fn_t)(int fd, + qoscube_t qc, + struct shm_du_buff * sdb); + +struct packet_sched * packet_sched_create(next_packet_fn_t callback); + +void packet_sched_destroy(struct packet_sched * packet_sched); + +void packet_sched_add(struct packet_sched * packet_sched, + int fd); + +void packet_sched_del(struct packet_sched * packet_sched, + int fd); + +#endif /* OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H */ diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c deleted file mode 100644 index e6d705fb..00000000 --- a/src/ipcpd/normal/sdu_sched.c +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * SDU scheduler component - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#if defined(__linux__) || defined(__CYGWIN__) -#define _DEFAULT_SOURCE -#else -#define _POSIX_C_SOURCE 200112L -#endif - -#include "config.h" - -#include -#include - -#include "ipcp.h" -#include "sdu_sched.h" -#include "connmgr.h" - -#include -#include -#include -#include -#include - -static int qos_prio [] = { - QOS_PRIO_RAW, - QOS_PRIO_BE, - QOS_PRIO_VIDEO, - QOS_PRIO_VOICE, - QOS_PRIO_DATA -}; - -struct sdu_sched { - fset_t * set[QOS_CUBE_MAX]; - next_sdu_fn_t callback; - pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; -}; - -struct sched_info { - struct sdu_sched * sch; - qoscube_t qc; -}; - -static void cleanup_reader(void * o) -{ - fqueue_destroy((fqueue_t *) o); -} - -static void * sdu_reader(void * o) -{ - struct sdu_sched * sched; - struct shm_du_buff * sdb; - int fd; - fqueue_t * fq; - qoscube_t qc; - - sched = ((struct sched_info *) o)->sch; - qc = ((struct sched_info *) o)->qc; - - ipcp_lock_to_core(); - - free(o); - - fq = fqueue_create(); - if (fq == NULL) - return (void *) -1; - - pthread_cleanup_push(cleanup_reader, fq); - - while (true) { - int ret = fevent(sched->set[qc], fq, NULL); - if (ret < 0) - continue; - - while ((fd = fqueue_next(fq)) >= 0) { - switch (fqueue_type(fq)) { - case FLOW_DEALLOC: - notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); - break; - case FLOW_DOWN: - notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); - break; - case FLOW_UP: - notifier_event(NOTIFY_DT_FLOW_UP, &fd); - break; - case FLOW_PKT: - if (ipcp_flow_read(fd, &sdb)) - continue; - - sched->callback(fd, qc, sdb); - break; - default: - break; - } - } - } - - pthread_cleanup_pop(true); - - return (void *) 0; -} - -struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback) -{ - struct sdu_sched * sdu_sched; - struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; - int i; - int j; - - assert(callback); - - sdu_sched = malloc(sizeof(*sdu_sched)); - if (sdu_sched == NULL) - goto fail_malloc; - - sdu_sched->callback = callback; - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - sdu_sched->set[i] = fset_create(); - if (sdu_sched->set[i] == NULL) { - for (j = 0; j < i; ++j) - fset_destroy(sdu_sched->set[j]); - goto fail_flow_set; - } - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - infos[i] = malloc(sizeof(*infos[i])); - if (infos[i] == NULL) { - for (j = 0; j < i; ++j) - free(infos[j]); - goto fail_infos; - } - infos[i]->sch = sdu_sched; - infos[i]->qc = i % QOS_CUBE_MAX; - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - if (pthread_create(&sdu_sched->readers[i], NULL, - sdu_reader, infos[i])) { - for (j = 0; j < i; ++j) - pthread_cancel(sdu_sched->readers[j]); - for (j = 0; j < i; ++j) - pthread_join(sdu_sched->readers[j], NULL); - for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - free(infos[i]); - goto fail_infos; - } - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - struct sched_param par; - int pol = SCHED_RR; - int min; - int max; - - min = sched_get_priority_min(pol); - max = sched_get_priority_max(pol); - - min = (max - min) / 2; - - par.sched_priority = min + - (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); - - if (pthread_setschedparam(sdu_sched->readers[i], pol, &par)) - goto fail_sched; - } - - return sdu_sched; - - fail_sched: - for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - pthread_cancel(sdu_sched->readers[j]); - for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - pthread_join(sdu_sched->readers[j], NULL); - fail_infos: - for (j = 0; j < QOS_CUBE_MAX; ++j) - fset_destroy(sdu_sched->set[j]); - fail_flow_set: - free(sdu_sched); - fail_malloc: - return NULL; -} - -void sdu_sched_destroy(struct sdu_sched * sdu_sched) -{ - int i; - - assert(sdu_sched); - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - pthread_cancel(sdu_sched->readers[i]); - pthread_join(sdu_sched->readers[i], NULL); - } - - for (i = 0; i < QOS_CUBE_MAX; ++i) - fset_destroy(sdu_sched->set[i]); - - free(sdu_sched); -} - -void sdu_sched_add(struct sdu_sched * sdu_sched, - int fd) -{ - qoscube_t qc; - - assert(sdu_sched); - - ipcp_flow_get_qoscube(fd, &qc); - fset_add(sdu_sched->set[qc], fd); -} - -void sdu_sched_del(struct sdu_sched * sdu_sched, - int fd) -{ - qoscube_t qc; - - assert(sdu_sched); - - ipcp_flow_get_qoscube(fd, &qc); - fset_del(sdu_sched->set[qc], fd); -} diff --git a/src/ipcpd/normal/sdu_sched.h b/src/ipcpd/normal/sdu_sched.h deleted file mode 100644 index cdbda272..00000000 --- a/src/ipcpd/normal/sdu_sched.h +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * SDU scheduler component - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H -#define OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H - -#include -#include - -typedef void (* next_sdu_fn_t)(int fd, - qoscube_t qc, - struct shm_du_buff * sdb); - -struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback); - -void sdu_sched_destroy(struct sdu_sched * sdu_sched); - -void sdu_sched_add(struct sdu_sched * sdu_sched, - int fd); - -void sdu_sched_del(struct sdu_sched * sdu_sched, - int fd); - -#endif /* OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H */ -- cgit v1.2.3