From 5d11a6ad590133c92925c6162eb47b4401f16bef Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Fri, 5 Oct 2018 10:24:01 +0200 Subject: ipcpd, lib, irmd, tools: Change SDU to packet This will change SDU (Service Data Unit) to packet everywhere. SDU is OSI terminology, whereas packet is Ouroboros terminology. Signed-off-by: Sander Vrijders Signed-off-by: Dimitri Staessens --- src/ipcpd/eth/eth.c | 46 ++++---- src/ipcpd/local/main.c | 12 +- src/ipcpd/normal/CMakeLists.txt | 2 +- src/ipcpd/normal/dht.c | 48 ++++---- src/ipcpd/normal/dt.c | 52 ++++----- src/ipcpd/normal/dt.h | 8 +- src/ipcpd/normal/fa.c | 36 +++--- src/ipcpd/normal/packet_sched.c | 241 ++++++++++++++++++++++++++++++++++++++++ src/ipcpd/normal/packet_sched.h | 43 +++++++ src/ipcpd/normal/sdu_sched.c | 241 ---------------------------------------- src/ipcpd/normal/sdu_sched.h | 43 ------- src/ipcpd/udp/main.c | 64 +++++------ 12 files changed, 419 insertions(+), 417 deletions(-) create mode 100644 src/ipcpd/normal/packet_sched.c create mode 100644 src/ipcpd/normal/packet_sched.h delete mode 100644 src/ipcpd/normal/sdu_sched.c delete mode 100644 src/ipcpd/normal/sdu_sched.h (limited to 'src/ipcpd') diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 6fd7b805..1bbfac5b 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -123,7 +123,7 @@ #define DIX_HEADER_SIZE (DIX_EID_SIZE + DIX_LENGTH_SIZE) #define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + DIX_HEADER_SIZE) #define MAX_EIDS (1 << (8 * DIX_EID_SIZE)) -#define ETH_MAX_SDU_SIZE (ETH_MTU - DIX_HEADER_SIZE) +#define ETH_MAX_PACKET_SIZE (ETH_MTU - DIX_HEADER_SIZE) #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX) #elif defined(BUILD_ETH_LLC) #define THIS_TYPE IPCP_ETH_LLC @@ -131,7 +131,7 @@ #define LLC_HEADER_SIZE 3 #define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE) #define MAX_SAPS 64 -#define ETH_MAX_SDU_SIZE (ETH_MTU - LLC_HEADER_SIZE) +#define ETH_MAX_PACKET_SIZE (ETH_MTU - LLC_HEADER_SIZE) #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX) #endif @@ -230,8 +230,8 @@ struct { fset_t * np1_flows; pthread_rwlock_t flows_lock; - pthread_t sdu_writer[IPCP_ETH_WR_THR]; - pthread_t sdu_reader[IPCP_ETH_RD_THR]; + pthread_t packet_writer[IPCP_ETH_WR_THR]; + pthread_t packet_reader[IPCP_ETH_RD_THR]; #ifdef __linux__ pthread_t if_monitor; @@ -383,7 +383,7 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, assert(frame); - if (len > (size_t) ETH_MAX_SDU_SIZE) + if (len > (size_t) ETH_MAX_PACKET_SIZE) return -1; e_frame = (struct eth_frame *) frame; @@ -808,7 +808,7 @@ static void * eth_ipcp_mgmt_handler(void * o) return (void *) 0; } -static void * eth_ipcp_sdu_reader(void * o) +static void * eth_ipcp_packet_reader(void * o) { uint8_t br_addr[MAC_SIZE]; #if defined(BUILD_ETH_DIX) @@ -992,7 +992,7 @@ static void cleanup_writer(void * o) fqueue_destroy((fqueue_t *) o); } -static void * eth_ipcp_sdu_writer(void * o) +static void * eth_ipcp_packet_writer(void * o) { int fd; struct shm_du_buff * sdb; @@ -1443,22 +1443,22 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) } for (idx = 0; idx < IPCP_ETH_RD_THR; ++idx) { - if (pthread_create(ð_data.sdu_reader[idx], + if (pthread_create(ð_data.packet_reader[idx], NULL, - eth_ipcp_sdu_reader, + eth_ipcp_packet_reader, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sdu_reader; + goto fail_packet_reader; } } for (idx = 0; idx < IPCP_ETH_WR_THR; ++idx) { - if (pthread_create(ð_data.sdu_writer[idx], + if (pthread_create(ð_data.packet_writer[idx], NULL, - eth_ipcp_sdu_writer, + eth_ipcp_packet_writer, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sdu_writer; + goto fail_packet_writer; } } @@ -1472,16 +1472,16 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) return 0; - fail_sdu_writer: + fail_packet_writer: while (idx > 0) { - pthread_cancel(eth_data.sdu_writer[--idx]); - pthread_join(eth_data.sdu_writer[idx], NULL); + pthread_cancel(eth_data.packet_writer[--idx]); + pthread_join(eth_data.packet_writer[idx], NULL); } idx = IPCP_ETH_RD_THR; - fail_sdu_reader: + fail_packet_reader: while (idx > 0) { - pthread_cancel(eth_data.sdu_reader[--idx]); - pthread_join(eth_data.sdu_reader[idx], NULL); + pthread_cancel(eth_data.packet_reader[--idx]); + pthread_join(eth_data.packet_reader[idx], NULL); } pthread_cancel(eth_data.mgmt_handler); pthread_join(eth_data.mgmt_handler, NULL); @@ -1792,18 +1792,18 @@ int main(int argc, if (ipcp_get_state() == IPCP_SHUTDOWN) { for (i = 0; i < IPCP_ETH_WR_THR; ++i) - pthread_cancel(eth_data.sdu_writer[i]); + pthread_cancel(eth_data.packet_writer[i]); for (i = 0; i < IPCP_ETH_RD_THR; ++i) - pthread_cancel(eth_data.sdu_reader[i]); + pthread_cancel(eth_data.packet_reader[i]); pthread_cancel(eth_data.mgmt_handler); #ifdef __linux__ pthread_cancel(eth_data.if_monitor); #endif for (i = 0; i < IPCP_ETH_WR_THR; ++i) - pthread_join(eth_data.sdu_writer[i], NULL); + pthread_join(eth_data.packet_writer[i], NULL); for (i = 0; i < IPCP_ETH_RD_THR; ++i) - pthread_join(eth_data.sdu_reader[i], NULL); + pthread_join(eth_data.packet_reader[i], NULL); pthread_join(eth_data.mgmt_handler, NULL); #ifdef __linux__ diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 8eae7503..ab43f1f8 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -59,7 +59,7 @@ struct { fqueue_t * fq; pthread_rwlock_t lock; - pthread_t sduloop; + pthread_t packet_loop; } local_data; static int local_data_init(void) @@ -97,7 +97,7 @@ static void local_data_fini(void){ pthread_rwlock_destroy(&local_data.lock); } -static void * ipcp_local_sdu_loop(void * o) +static void * ipcp_local_packet_loop(void * o) { (void) o; @@ -139,8 +139,8 @@ static int ipcp_local_bootstrap(const struct ipcp_config * conf) ipcp_set_state(IPCP_OPERATIONAL); - if (pthread_create(&local_data.sduloop, NULL, - ipcp_local_sdu_loop, NULL)) { + if (pthread_create(&local_data.packet_loop, NULL, + ipcp_local_packet_loop, NULL)) { ipcp_set_state(IPCP_INIT); return -1; } @@ -364,8 +364,8 @@ int main(int argc, ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(local_data.sduloop); - pthread_join(local_data.sduloop, NULL); + pthread_cancel(local_data.packet_loop); + pthread_join(local_data.packet_loop, NULL); } local_data_fini(); diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 1cba7630..0cb7b770 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -42,7 +42,7 @@ set(SOURCE_FILES main.c pff.c routing.c - sdu_sched.c + packet_sched.c # Add policies last pol/alternate_pff.c pol/flat.c diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index a2fa4863..4064bf5c 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -62,21 +62,21 @@ typedef KadContactMsg kad_contact_msg_t; #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ -#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ -#define KAD_K 8 /* Replication factor, MDHT value. */ -#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ -#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ -#define KAD_T_JOIN 8 /* Response time to wait for a join. */ -#define KAD_T_RESP 5 /* Response time to wait for a response. */ -#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ -#define KAD_QUEER 15 /* Time to declare peer questionable. */ -#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ -#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ -#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ -#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ -#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */ -#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ +#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ +#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ +#define KAD_K 8 /* Replication factor, MDHT value. */ +#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ +#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ +#define KAD_T_JOIN 8 /* Response time to wait for a join. */ +#define KAD_T_RESP 5 /* Response time to wait for a response. */ +#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ +#define KAD_QUEER 15 /* Time to declare peer questionable. */ +#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ +#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ +#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ +#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ +#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_packet tpm check (ms) */ +#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ enum dht_state { DHT_INIT = 0, @@ -251,7 +251,7 @@ struct join_info { uint64_t addr; }; -struct sdu_info { +struct packet_info { struct dht * dht; struct shm_du_buff * sdb; }; @@ -1489,7 +1489,7 @@ static int send_msg(struct dht * dht, kad_msg__pack(msg, shm_du_buff_head(sdb)); - if (dt_write_sdu(addr, QOS_CUBE_BE, dht->fd, sdb) == 0) + if (dt_write_packet(addr, QOS_CUBE_BE, dht->fd, sdb) == 0) break; ipcp_sdb_release(sdb); @@ -2400,7 +2400,7 @@ uint64_t dht_query(struct dht * dht, return 0; } -static void * dht_handle_sdu(void * o) +static void * dht_handle_packet(void * o) { struct dht * dht = (struct dht *) o; @@ -2584,8 +2584,8 @@ static void * dht_handle_sdu(void * o) return (void *) 0; } -static void dht_post_sdu(void * comp, - struct shm_du_buff * sdb) +static void dht_post_packet(void * comp, + struct shm_du_buff * sdb) { struct cmd * cmd; struct dht * dht = (struct dht *) comp; @@ -2800,19 +2800,19 @@ struct dht * dht_create(uint64_t addr) dht->addr = addr; dht->id = NULL; #ifndef __DHT_TEST__ - dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht); + dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); if (dht->tpm == NULL) goto fail_tpm_create; if (tpm_start(dht->tpm)) goto fail_tpm_start; - dht->fd = dt_reg_comp(dht, &dht_post_sdu, DHT); + dht->fd = dt_reg_comp(dht, &dht_post_packet, DHT); notifier_reg(handle_event, dht); #else (void) handle_event; - (void) dht_handle_sdu; - (void) dht_post_sdu; + (void) dht_handle_packet; + (void) dht_post_packet; #endif dht->state = DHT_INIT; diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index a350e4be..08c937e7 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -46,7 +46,7 @@ #include "dt.h" #include "pff.h" #include "routing.h" -#include "sdu_sched.h" +#include "packet_sched.h" #include "comp.h" #include "fa.h" @@ -65,7 +65,7 @@ #endif struct comp_info { - void (* post_sdu)(void * comp, struct shm_du_buff * sdb); + void (* post_packet)(void * comp, struct shm_du_buff * sdb); void * comp; char * name; }; @@ -154,7 +154,7 @@ static void dt_pci_shrink(struct shm_du_buff * sdb) } struct { - struct sdu_sched * sdu_sched; + struct packet_sched * packet_sched; struct pff * pff[QOS_CUBE_MAX]; struct routing_i * routing[QOS_CUBE_MAX]; @@ -421,24 +421,25 @@ static void handle_event(void * self, #ifdef IPCP_FLOW_STATS stat_used(c->flow_info.fd, c->conn_info.addr); #endif - sdu_sched_add(dt.sdu_sched, c->flow_info.fd); - log_dbg("Added fd %d to SDU scheduler.", c->flow_info.fd); + packet_sched_add(dt.packet_sched, c->flow_info.fd); + log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd); break; case NOTIFY_DT_CONN_DEL: #ifdef IPCP_FLOW_STATS stat_used(c->flow_info.fd, INVALID_ADDR); #endif - sdu_sched_del(dt.sdu_sched, c->flow_info.fd); - log_dbg("Removed fd %d from SDU scheduler.", c->flow_info.fd); + packet_sched_del(dt.packet_sched, c->flow_info.fd); + log_dbg("Removed fd %d from " + "packet scheduler.", c->flow_info.fd); break; default: break; } } -static void sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) +static void packet_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { struct dt_pci dt_pci; int ret; @@ -491,7 +492,7 @@ static void sdu_handler(int fd, ret = ipcp_flow_write(ofd, sdb); if (ret < 0) { - log_dbg("Failed to write SDU to fd %d.", ofd); + log_dbg("Failed to write packet to fd %d.", ofd); if (ret == -EFLOWDOWN) notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd); ipcp_sdb_release(sdb); @@ -560,7 +561,7 @@ static void sdu_handler(int fd, return; } - if (dt.comps[dt_pci.eid].post_sdu == NULL) { + if (dt.comps[dt_pci.eid].post_packet == NULL) { log_err("No registered component on eid %d.", dt_pci.eid); ipcp_sdb_release(sdb); @@ -596,7 +597,8 @@ static void sdu_handler(int fd, pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); #endif - dt.comps[dt_pci.eid].post_sdu(dt.comps[dt_pci.eid].comp, sdb); + dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp, + sdb); } } @@ -761,15 +763,15 @@ void dt_fini(void) int dt_start(void) { - dt.sdu_sched = sdu_sched_create(sdu_handler); - if (dt.sdu_sched == NULL) { - log_err("Failed to create N-1 SDU scheduler."); + dt.packet_sched = packet_sched_create(packet_handler); + if (dt.packet_sched == NULL) { + log_err("Failed to create N-1 packet scheduler."); return -1; } if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) { log_err("Failed to create listener thread."); - sdu_sched_destroy(dt.sdu_sched); + packet_sched_destroy(dt.packet_sched); return -1; } @@ -780,7 +782,7 @@ void dt_stop(void) { pthread_cancel(dt.listener); pthread_join(dt.listener, NULL); - sdu_sched_destroy(dt.sdu_sched); + packet_sched_destroy(dt.packet_sched); } int dt_reg_comp(void * comp, @@ -800,11 +802,11 @@ int dt_reg_comp(void * comp, return -EBADF; } - assert(dt.comps[res_fd].post_sdu == NULL); + assert(dt.comps[res_fd].post_packet == NULL); assert(dt.comps[res_fd].comp == NULL); assert(dt.comps[res_fd].name == NULL); - dt.comps[res_fd].post_sdu = func; + dt.comps[res_fd].post_packet = func; dt.comps[res_fd].comp = comp; dt.comps[res_fd].name = name; @@ -815,10 +817,10 @@ int dt_reg_comp(void * comp, return res_fd; } -int dt_write_sdu(uint64_t dst_addr, - qoscube_t qc, - int np1_fd, - struct shm_du_buff * sdb) +int dt_write_packet(uint64_t dst_addr, + qoscube_t qc, + int np1_fd, + struct shm_du_buff * sdb) { int fd; struct dt_pci dt_pci; @@ -863,7 +865,7 @@ int dt_write_sdu(uint64_t dst_addr, #endif ret = ipcp_flow_write(fd, sdb); if (ret < 0) { - log_dbg("Failed to write SDU to fd %d.", fd); + log_dbg("Failed to write packet to fd %d.", fd); if (ret == -EFLOWDOWN) notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); goto fail_write; diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h index a17098b7..05b8220c 100644 --- a/src/ipcpd/normal/dt.h +++ b/src/ipcpd/normal/dt.h @@ -47,9 +47,9 @@ int dt_reg_comp(void * comp, void (* func)(void * comp, struct shm_du_buff * sdb), char * name); -int dt_write_sdu(uint64_t dst_addr, - qoscube_t qc, - int res_fd, - struct shm_du_buff * sdb); +int dt_write_packet(uint64_t dst_addr, + qoscube_t qc, + int res_fd, + struct shm_du_buff * sdb); #endif /* OUROBOROS_IPCPD_NORMAL_DT_H */ diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 4c82e0e0..d67ba61e 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -39,7 +39,7 @@ #include "dir.h" #include "fa.h" -#include "sdu_sched.h" +#include "packet_sched.h" #include "ipcp.h" #include "dt.h" @@ -74,19 +74,19 @@ struct { uint64_t r_addr[PROG_MAX_FLOWS]; int fd; - struct sdu_sched * sdu_sched; + struct packet_sched * packet_sched; } fa; -static void sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) +static void packet_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { pthread_rwlock_rdlock(&fa.flows_lock); - if (dt_write_sdu(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) { + if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) { pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); - log_warn("Failed to forward SDU."); + log_warn("Failed to forward packet."); return; } @@ -99,7 +99,7 @@ static void destroy_conn(int fd) fa.r_addr[fd] = INVALID_ADDR; } -static void fa_post_sdu(void * comp, +static void fa_post_packet(void * comp, struct shm_du_buff * sdb) { struct timespec ts = {0, TIMEOUT * 1000}; @@ -192,7 +192,7 @@ static void fa_post_sdu(void * comp, if (msg->response < 0) destroy_conn(ntoh32(msg->r_eid)); else - sdu_sched_add(fa.sdu_sched, ntoh32(msg->r_eid)); + packet_sched_add(fa.packet_sched, ntoh32(msg->r_eid)); pthread_rwlock_unlock(&fa.flows_lock); @@ -215,7 +215,7 @@ int fa_init(void) if (pthread_rwlock_init(&fa.flows_lock, NULL)) return -1; - fa.fd = dt_reg_comp(&fa, &fa_post_sdu, FA); + fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA); return 0; } @@ -227,9 +227,9 @@ void fa_fini(void) int fa_start(void) { - fa.sdu_sched = sdu_sched_create(sdu_handler); - if (fa.sdu_sched == NULL) { - log_err("Failed to create SDU scheduler."); + fa.packet_sched = packet_sched_create(packet_handler); + if (fa.packet_sched == NULL) { + log_err("Failed to create packet scheduler."); return -1; } @@ -238,7 +238,7 @@ int fa_start(void) void fa_stop(void) { - sdu_sched_destroy(fa.sdu_sched); + packet_sched_destroy(fa.packet_sched); } int fa_alloc(int fd, @@ -273,7 +273,7 @@ int fa_alloc(int fd, qc = qos_spec_to_cube(qs); - if (dt_write_sdu(addr, qc, fa.fd, sdb)) { + if (dt_write_packet(addr, qc, fa.fd, sdb)) { ipcp_sdb_release(sdb); return -1; } @@ -335,14 +335,14 @@ int fa_alloc_resp(int fd, destroy_conn(fd); ipcp_sdb_release(sdb); } else { - sdu_sched_add(fa.sdu_sched, fd); + packet_sched_add(fa.packet_sched, fd); } ipcp_flow_get_qoscube(fd, &qc); assert(qc >= 0 && qc < QOS_CUBE_MAX); - if (dt_write_sdu(fa.r_addr[fd], qc, fa.fd, sdb)) { + if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) { destroy_conn(fd); pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); @@ -360,7 +360,7 @@ int fa_dealloc(int fd) pthread_rwlock_wrlock(&fa.flows_lock); - sdu_sched_del(fa.sdu_sched, fd); + packet_sched_del(fa.packet_sched, fd); destroy_conn(fd); diff --git a/src/ipcpd/normal/packet_sched.c b/src/ipcpd/normal/packet_sched.c new file mode 100644 index 00000000..fc01fb32 --- /dev/null +++ b/src/ipcpd/normal/packet_sched.c @@ -0,0 +1,241 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Packet scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#include +#include + +#include "ipcp.h" +#include "packet_sched.h" +#include "connmgr.h" + +#include +#include +#include +#include +#include + +static int qos_prio [] = { + QOS_PRIO_RAW, + QOS_PRIO_BE, + QOS_PRIO_VIDEO, + QOS_PRIO_VOICE, + QOS_PRIO_DATA +}; + +struct packet_sched { + fset_t * set[QOS_CUBE_MAX]; + next_packet_fn_t callback; + pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; +}; + +struct sched_info { + struct packet_sched * sch; + qoscube_t qc; +}; + +static void cleanup_reader(void * o) +{ + fqueue_destroy((fqueue_t *) o); +} + +static void * packet_reader(void * o) +{ + struct packet_sched * sched; + struct shm_du_buff * sdb; + int fd; + fqueue_t * fq; + qoscube_t qc; + + sched = ((struct sched_info *) o)->sch; + qc = ((struct sched_info *) o)->qc; + + ipcp_lock_to_core(); + + free(o); + + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; + + pthread_cleanup_push(cleanup_reader, fq); + + while (true) { + int ret = fevent(sched->set[qc], fq, NULL); + if (ret < 0) + continue; + + while ((fd = fqueue_next(fq)) >= 0) { + switch (fqueue_type(fq)) { + case FLOW_DEALLOC: + notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); + break; + case FLOW_DOWN: + notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); + break; + case FLOW_UP: + notifier_event(NOTIFY_DT_FLOW_UP, &fd); + break; + case FLOW_PKT: + if (ipcp_flow_read(fd, &sdb)) + continue; + + sched->callback(fd, qc, sdb); + break; + default: + break; + } + } + } + + pthread_cleanup_pop(true); + + return (void *) 0; +} + +struct packet_sched * packet_sched_create(next_packet_fn_t callback) +{ + struct packet_sched * packet_sched; + struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; + int i; + int j; + + assert(callback); + + packet_sched = malloc(sizeof(*packet_sched)); + if (packet_sched == NULL) + goto fail_malloc; + + packet_sched->callback = callback; + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + packet_sched->set[i] = fset_create(); + if (packet_sched->set[i] == NULL) { + for (j = 0; j < i; ++j) + fset_destroy(packet_sched->set[j]); + goto fail_flow_set; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + infos[i] = malloc(sizeof(*infos[i])); + if (infos[i] == NULL) { + for (j = 0; j < i; ++j) + free(infos[j]); + goto fail_infos; + } + infos[i]->sch = packet_sched; + infos[i]->qc = i % QOS_CUBE_MAX; + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + if (pthread_create(&packet_sched->readers[i], NULL, + packet_reader, infos[i])) { + for (j = 0; j < i; ++j) + pthread_cancel(packet_sched->readers[j]); + for (j = 0; j < i; ++j) + pthread_join(packet_sched->readers[j], NULL); + for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + free(infos[i]); + goto fail_infos; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + struct sched_param par; + int pol = SCHED_RR; + int min; + int max; + + min = sched_get_priority_min(pol); + max = sched_get_priority_max(pol); + + min = (max - min) / 2; + + par.sched_priority = min + + (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); + + if (pthread_setschedparam(packet_sched->readers[i], pol, &par)) + goto fail_sched; + } + + return packet_sched; + + fail_sched: + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_cancel(packet_sched->readers[j]); + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_join(packet_sched->readers[j], NULL); + fail_infos: + for (j = 0; j < QOS_CUBE_MAX; ++j) + fset_destroy(packet_sched->set[j]); + fail_flow_set: + free(packet_sched); + fail_malloc: + return NULL; +} + +void packet_sched_destroy(struct packet_sched * packet_sched) +{ + int i; + + assert(packet_sched); + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + pthread_cancel(packet_sched->readers[i]); + pthread_join(packet_sched->readers[i], NULL); + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) + fset_destroy(packet_sched->set[i]); + + free(packet_sched); +} + +void packet_sched_add(struct packet_sched * packet_sched, + int fd) +{ + qoscube_t qc; + + assert(packet_sched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_add(packet_sched->set[qc], fd); +} + +void packet_sched_del(struct packet_sched * packet_sched, + int fd) +{ + qoscube_t qc; + + assert(packet_sched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_del(packet_sched->set[qc], fd); +} diff --git a/src/ipcpd/normal/packet_sched.h b/src/ipcpd/normal/packet_sched.h new file mode 100644 index 00000000..13ff400d --- /dev/null +++ b/src/ipcpd/normal/packet_sched.h @@ -0,0 +1,43 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Packet scheduler component + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H +#define OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H + +#include +#include + +typedef void (* next_packet_fn_t)(int fd, + qoscube_t qc, + struct shm_du_buff * sdb); + +struct packet_sched * packet_sched_create(next_packet_fn_t callback); + +void packet_sched_destroy(struct packet_sched * packet_sched); + +void packet_sched_add(struct packet_sched * packet_sched, + int fd); + +void packet_sched_del(struct packet_sched * packet_sched, + int fd); + +#endif /* OUROBOROS_IPCPD_NORMAL_PACKET_SCHED_H */ diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c deleted file mode 100644 index e6d705fb..00000000 --- a/src/ipcpd/normal/sdu_sched.c +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * SDU scheduler component - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#if defined(__linux__) || defined(__CYGWIN__) -#define _DEFAULT_SOURCE -#else -#define _POSIX_C_SOURCE 200112L -#endif - -#include "config.h" - -#include -#include - -#include "ipcp.h" -#include "sdu_sched.h" -#include "connmgr.h" - -#include -#include -#include -#include -#include - -static int qos_prio [] = { - QOS_PRIO_RAW, - QOS_PRIO_BE, - QOS_PRIO_VIDEO, - QOS_PRIO_VOICE, - QOS_PRIO_DATA -}; - -struct sdu_sched { - fset_t * set[QOS_CUBE_MAX]; - next_sdu_fn_t callback; - pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; -}; - -struct sched_info { - struct sdu_sched * sch; - qoscube_t qc; -}; - -static void cleanup_reader(void * o) -{ - fqueue_destroy((fqueue_t *) o); -} - -static void * sdu_reader(void * o) -{ - struct sdu_sched * sched; - struct shm_du_buff * sdb; - int fd; - fqueue_t * fq; - qoscube_t qc; - - sched = ((struct sched_info *) o)->sch; - qc = ((struct sched_info *) o)->qc; - - ipcp_lock_to_core(); - - free(o); - - fq = fqueue_create(); - if (fq == NULL) - return (void *) -1; - - pthread_cleanup_push(cleanup_reader, fq); - - while (true) { - int ret = fevent(sched->set[qc], fq, NULL); - if (ret < 0) - continue; - - while ((fd = fqueue_next(fq)) >= 0) { - switch (fqueue_type(fq)) { - case FLOW_DEALLOC: - notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); - break; - case FLOW_DOWN: - notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); - break; - case FLOW_UP: - notifier_event(NOTIFY_DT_FLOW_UP, &fd); - break; - case FLOW_PKT: - if (ipcp_flow_read(fd, &sdb)) - continue; - - sched->callback(fd, qc, sdb); - break; - default: - break; - } - } - } - - pthread_cleanup_pop(true); - - return (void *) 0; -} - -struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback) -{ - struct sdu_sched * sdu_sched; - struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; - int i; - int j; - - assert(callback); - - sdu_sched = malloc(sizeof(*sdu_sched)); - if (sdu_sched == NULL) - goto fail_malloc; - - sdu_sched->callback = callback; - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - sdu_sched->set[i] = fset_create(); - if (sdu_sched->set[i] == NULL) { - for (j = 0; j < i; ++j) - fset_destroy(sdu_sched->set[j]); - goto fail_flow_set; - } - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - infos[i] = malloc(sizeof(*infos[i])); - if (infos[i] == NULL) { - for (j = 0; j < i; ++j) - free(infos[j]); - goto fail_infos; - } - infos[i]->sch = sdu_sched; - infos[i]->qc = i % QOS_CUBE_MAX; - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - if (pthread_create(&sdu_sched->readers[i], NULL, - sdu_reader, infos[i])) { - for (j = 0; j < i; ++j) - pthread_cancel(sdu_sched->readers[j]); - for (j = 0; j < i; ++j) - pthread_join(sdu_sched->readers[j], NULL); - for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - free(infos[i]); - goto fail_infos; - } - } - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - struct sched_param par; - int pol = SCHED_RR; - int min; - int max; - - min = sched_get_priority_min(pol); - max = sched_get_priority_max(pol); - - min = (max - min) / 2; - - par.sched_priority = min + - (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); - - if (pthread_setschedparam(sdu_sched->readers[i], pol, &par)) - goto fail_sched; - } - - return sdu_sched; - - fail_sched: - for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - pthread_cancel(sdu_sched->readers[j]); - for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) - pthread_join(sdu_sched->readers[j], NULL); - fail_infos: - for (j = 0; j < QOS_CUBE_MAX; ++j) - fset_destroy(sdu_sched->set[j]); - fail_flow_set: - free(sdu_sched); - fail_malloc: - return NULL; -} - -void sdu_sched_destroy(struct sdu_sched * sdu_sched) -{ - int i; - - assert(sdu_sched); - - for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { - pthread_cancel(sdu_sched->readers[i]); - pthread_join(sdu_sched->readers[i], NULL); - } - - for (i = 0; i < QOS_CUBE_MAX; ++i) - fset_destroy(sdu_sched->set[i]); - - free(sdu_sched); -} - -void sdu_sched_add(struct sdu_sched * sdu_sched, - int fd) -{ - qoscube_t qc; - - assert(sdu_sched); - - ipcp_flow_get_qoscube(fd, &qc); - fset_add(sdu_sched->set[qc], fd); -} - -void sdu_sched_del(struct sdu_sched * sdu_sched, - int fd) -{ - qoscube_t qc; - - assert(sdu_sched); - - ipcp_flow_get_qoscube(fd, &qc); - fset_del(sdu_sched->set[qc], fd); -} diff --git a/src/ipcpd/normal/sdu_sched.h b/src/ipcpd/normal/sdu_sched.h deleted file mode 100644 index cdbda272..00000000 --- a/src/ipcpd/normal/sdu_sched.h +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2018 - * - * SDU scheduler component - * - * Dimitri Staessens - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H -#define OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H - -#include -#include - -typedef void (* next_sdu_fn_t)(int fd, - qoscube_t qc, - struct shm_du_buff * sdb); - -struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback); - -void sdu_sched_destroy(struct sdu_sched * sdu_sched); - -void sdu_sched_add(struct sdu_sched * sdu_sched, - int fd); - -void sdu_sched_del(struct sdu_sched * sdu_sched, - int fd); - -#endif /* OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H */ diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 96820662..a1af1e85 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -54,20 +54,20 @@ #include #include -#define FLOW_REQ 1 -#define FLOW_REPLY 2 +#define FLOW_REQ 1 +#define FLOW_REPLY 2 -#define THIS_TYPE IPCP_UDP -#define LISTEN_PORT htons(0x0D1F) -#define SHIM_UDP_BUF_SIZE 256 -#define SHIM_UDP_MSG_SIZE 256 -#define SHIM_UDP_MAX_SDU_SIZE 8980 -#define DNS_TTL 86400 -#define FD_UPDATE_TIMEOUT 100 /* microseconds */ +#define THIS_TYPE IPCP_UDP +#define LISTEN_PORT htons(0x0D1F) +#define SHIM_UDP_BUF_SIZE 256 +#define SHIM_UDP_MSG_SIZE 256 +#define SHIM_UDP_MAX_PACKET_SIZE 8980 +#define DNS_TTL 86400 +#define FD_UPDATE_TIMEOUT 100 /* microseconds */ -#define local_ip (udp_data.s_saddr.sin_addr.s_addr) +#define local_ip (udp_data.s_saddr.sin_addr.s_addr) -#define UDP_MAX_PORTS 0xFFFF +#define UDP_MAX_PORTS 0xFFFF struct mgmt_msg { uint16_t src_udp_port; @@ -106,9 +106,9 @@ struct { struct uf fd_to_uf[SYS_MAX_FLOWS]; pthread_rwlock_t flows_lock; - pthread_t sduloop; + pthread_t packet_loop; pthread_t handler; - pthread_t sdu_reader; + pthread_t packet_reader; bool fd_set_mod; pthread_cond_t fd_set_cond; @@ -495,13 +495,13 @@ static void * ipcp_udp_listener(void * o) return 0; } -static void * ipcp_udp_sdu_reader(void * o) +static void * ipcp_udp_packet_reader(void * o) { ssize_t n; int skfd; int fd; /* FIXME: avoid this copy */ - char buf[SHIM_UDP_MAX_SDU_SIZE]; + char buf[SHIM_UDP_MAX_PACKET_SIZE]; struct sockaddr_in r_saddr; struct timeval tv = {0, FD_UPDATE_TIMEOUT}; fd_set read_fds; @@ -533,7 +533,7 @@ static void * ipcp_udp_sdu_reader(void * o) n = sizeof(r_saddr); if ((n = recvfrom(skfd, &buf, - SHIM_UDP_MAX_SDU_SIZE, + SHIM_UDP_MAX_PACKET_SIZE, 0, (struct sockaddr *) &r_saddr, (unsigned *) &n)) <= 0) @@ -552,7 +552,7 @@ static void * ipcp_udp_sdu_reader(void * o) return (void *) 0; } -static void * ipcp_udp_sdu_loop(void * o) +static void * ipcp_udp_packet_loop(void * o) { int fd; struct shm_du_buff * sdb; @@ -582,7 +582,7 @@ static void * ipcp_udp_sdu_loop(void * o) if (send(fd, shm_du_buff_head(sdb), shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), 0) < 0) - log_err("Failed to send SDU."); + log_err("Failed to send PACKET."); pthread_cleanup_pop(true); } @@ -666,20 +666,20 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) goto fail_bind; } - if (pthread_create(&udp_data.sdu_reader, + if (pthread_create(&udp_data.packet_reader, NULL, - ipcp_udp_sdu_reader, + ipcp_udp_packet_reader, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sdu_reader; + goto fail_packet_reader; } - if (pthread_create(&udp_data.sduloop, + if (pthread_create(&udp_data.packet_loop, NULL, - ipcp_udp_sdu_loop, + ipcp_udp_packet_loop, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sduloop; + goto fail_packet_loop; } log_dbg("Bootstrapped IPCP over UDP with pid %d.", getpid()); @@ -688,10 +688,10 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) return 0; - fail_sduloop: - pthread_cancel(udp_data.sdu_reader); - pthread_join(udp_data.sdu_reader, NULL); - fail_sdu_reader: + fail_packet_loop: + pthread_cancel(udp_data.packet_reader); + pthread_join(udp_data.packet_reader, NULL); + fail_packet_reader: pthread_cancel(udp_data.handler); pthread_join(udp_data.handler, NULL); fail_bind: @@ -1222,13 +1222,13 @@ int main(int argc, ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(udp_data.sduloop); + pthread_cancel(udp_data.packet_loop); pthread_cancel(udp_data.handler); - pthread_cancel(udp_data.sdu_reader); + pthread_cancel(udp_data.packet_reader); - pthread_join(udp_data.sduloop, NULL); + pthread_join(udp_data.packet_loop, NULL); pthread_join(udp_data.handler, NULL); - pthread_join(udp_data.sdu_reader, NULL); + pthread_join(udp_data.packet_reader, NULL); } udp_data_fini(); -- cgit v1.2.3