From 806629e64e8231d0c57a80d3b6584094cd6c89bd Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 21 Jun 2017 11:50:19 +0200 Subject: lib, ipcpd, irmd: Add full-fledged QoS This adds more Quality of Service support to Ouroboros. One part is the network specific characteristics such as bandwidth, delay, ... The other part is end-to-end QoS like reliability, window based flow control, ... --- src/ipcpd/normal/dt_pci.h | 2 +- src/ipcpd/normal/fa.h | 2 +- src/ipcpd/normal/neighbors.c | 2 +- src/ipcpd/normal/pol/complete.c | 8 +-- src/ipcpd/shim-data.h | 2 +- src/ipcpd/shim-eth-llc/main.c | 2 +- src/ipcpd/shim-udp/main.c | 14 ++--- src/irmd/ipcp.h | 2 +- src/irmd/irm_flow.c | 2 + src/irmd/irm_flow.h | 2 +- src/irmd/registry.h | 2 +- src/lib/CMakeLists.txt | 5 +- src/lib/dev.c | 116 +++++++++++++++++++++++++++++----------- src/lib/frct_enroll.proto | 32 +++++++++++ src/lib/qos.c | 57 ++++++++++++++++++++ src/lib/qoscube.c | 76 ++++++++++++++++++++++++++ 16 files changed, 270 insertions(+), 56 deletions(-) create mode 100644 src/lib/frct_enroll.proto create mode 100644 src/lib/qos.c create mode 100644 src/lib/qoscube.c (limited to 'src') diff --git a/src/ipcpd/normal/dt_pci.h b/src/ipcpd/normal/dt_pci.h index ddcf9066..2291329a 100644 --- a/src/ipcpd/normal/dt_pci.h +++ b/src/ipcpd/normal/dt_pci.h @@ -25,7 +25,7 @@ #include #include -#include +#include #include #include diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h index a77dc723..929c0623 100644 --- a/src/ipcpd/normal/fa.h +++ b/src/ipcpd/normal/fa.h @@ -23,7 +23,7 @@ #ifndef OUROBOROS_IPCPD_NORMAL_FA_H #define OUROBOROS_IPCPD_NORMAL_FA_H -#include +#include #include int fa_init(void); diff --git a/src/ipcpd/normal/neighbors.c b/src/ipcpd/normal/neighbors.c index 0dbc22f2..0fb721c2 100644 --- a/src/ipcpd/normal/neighbors.c +++ b/src/ipcpd/normal/neighbors.c @@ -23,7 +23,7 @@ #define OUROBOROS_PREFIX "neighbors" #include -#include +#include #include #include #include diff --git a/src/ipcpd/normal/pol/complete.c b/src/ipcpd/normal/pol/complete.c index 732556c6..74848a1f 100644 --- a/src/ipcpd/normal/pol/complete.c +++ b/src/ipcpd/normal/pol/complete.c @@ -23,7 +23,7 @@ #define OUROBOROS_PREFIX "complete" #include -#include +#include #include #include #include @@ -92,11 +92,7 @@ static void * allocator(void * o) complete = (struct complete *) o; - qs.delay = 0; - qs.jitter = 0; - - /* FIXME: implement QoS specs */ - qs.cube = QOS_CUBE_BE; + qosspec_init(&qs); while (true) { len = rib_children(MEMBERS_PATH, &children); diff --git a/src/ipcpd/shim-data.h b/src/ipcpd/shim-data.h index d53373df..1951fa9d 100644 --- a/src/ipcpd/shim-data.h +++ b/src/ipcpd/shim-data.h @@ -23,7 +23,7 @@ #ifndef IPCPD_IPCP_DATA_H #define IPCPD_IPCP_DATA_H -#include +#include #include #include diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 971eeb49..99d46a1d 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -979,7 +979,7 @@ static int eth_llc_ipcp_flow_alloc(int fd, assert(hash); - if (cube != QOS_CUBE_BE && cube != QOS_CUBE_FRC) { + if (cube != QOS_CUBE_BE) { log_dbg("Unsupported QoS requested."); return -1; } diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 892334b2..b793c414 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -196,10 +196,10 @@ static int send_shim_udp_msg(shim_udp_msg_t * msg, return 0; } -static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, - uint16_t src_udp_port, +static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, + uint16_t src_udp_port, const uint8_t * dst, - qoscube_t cube) + qoscube_t cube) { shim_udp_msg_t msg = SHIM_UDP_MSG__INIT; @@ -899,7 +899,7 @@ static int ipcp_udp_flow_alloc(int fd, assert(dst); - if (cube != QOS_CUBE_BE && cube != QOS_CUBE_FRC) { + if (cube != QOS_CUBE_BE) { log_dbg("Unsupported QoS requested."); return -1; } @@ -923,7 +923,6 @@ static int ipcp_udp_flow_alloc(int fd, return -1; } - if (!shim_data_dir_has(ipcpi.shim_data, dst)) { log_dbg("Could not resolve destination."); close(skfd); @@ -952,10 +951,7 @@ static int ipcp_udp_flow_alloc(int fd, pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_udp_port_alloc(ip_addr, - f_saddr.sin_port, - dst, - cube) < 0) { + if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, cube) < 0) { pthread_rwlock_wrlock(&udp_data.flows_lock); udp_data.fd_to_uf[fd].udp = -1; diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index 74175f97..fde0428c 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -22,7 +22,7 @@ #include #include -#include +#include #include diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index 8b85f36f..766c45ca 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -61,6 +61,8 @@ struct irm_flow * irm_flow_create(pid_t n_api, f->n_rb = shm_rbuff_create(n_api, port_id); if (f->n_rb == NULL) { log_err("Could not create ringbuffer for AP-I %d.", n_api); + pthread_mutex_destroy(&f->state_lock); + pthread_cond_destroy(&f->state_cond); free(f); return NULL; } diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index 8902a6ab..223740b7 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -25,7 +25,7 @@ #include #include -#include +#include #include #include diff --git a/src/irmd/registry.h b/src/irmd/registry.h index 29cefc02..2b887162 100644 --- a/src/irmd/registry.h +++ b/src/irmd/registry.h @@ -27,7 +27,7 @@ #include #include #include -#include +#include #include "api_table.h" #include "apn_table.h" diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 263065dd..99dd96fc 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -11,6 +11,7 @@ protobuf_generate_c(DIF_CONFIG_PROTO_SRCS DIF_CONFIG_PROTO_HDRS protobuf_generate_c(CDAP_PROTO_SRCS CDAP_PROTO_HDRS cdap.proto) protobuf_generate_c(RO_PROTO_SRCS RO_PROTO_HDRS ro.proto) protobuf_generate_c(CACEP_PROTO_SRCS CACEP_PROTO_HDRS cacep.proto) +protobuf_generate_c(FRCT_ENROLL_SRCS FRCT_ENROLL_HDRS frct_enroll.proto) if (NOT APPLE) find_library(LIBRT_LIBRARIES rt) @@ -43,6 +44,8 @@ set(SOURCE_FILES logs.c md5.c nsm.c + qos.c + qoscube.c rib.c sha3.c shm_flow_set.c @@ -55,7 +58,7 @@ set(SOURCE_FILES add_library(ouroboros SHARED ${SOURCE_FILES} ${IRM_PROTO_SRCS} ${IPCP_PROTO_SRCS} ${DIF_CONFIG_PROTO_SRCS} ${CDAP_PROTO_SRCS} - ${CACEP_PROTO_SRCS} ${RO_PROTO_SRCS}) + ${CACEP_PROTO_SRCS} ${RO_PROTO_SRCS} ${FRCT_ENROLL_SRCS}) target_link_libraries(ouroboros ${LIBRT_LIBRARIES} ${LIBPTHREAD_LIBRARIES} ${PROTOBUF_C_LIBRARY}) diff --git a/src/lib/dev.c b/src/lib/dev.c index 3cdcc4d3..e4fd32cc 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -34,11 +34,14 @@ #include #include #include +#include #include #include #include +#define BUF_SIZE 1500 + struct flow_set { size_t idx; }; @@ -72,6 +75,7 @@ struct flow { int port_id; int oflags; qoscube_t cube; + qosspec_t spec; pid_t api; @@ -174,24 +178,6 @@ static enum port_state port_wait_assign(int port_id) return state; } -/* FIXME: translate real spec to cube */ -static qoscube_t spec_to_cube(qosspec_t * qs) -{ - if (qs == NULL) - return QOS_CUBE_BE; - - return qs->cube; -} - -/* FIXME: fill real spec */ -static void fill_qosspec(qosspec_t * qs, - qoscube_t cube) -{ - assert(qs); - - qs->cube = cube; -} - static int api_announce(char * ap_name) { irm_msg_t msg = IRM_MSG__INIT; @@ -382,9 +368,13 @@ void ouroboros_fini() int flow_accept(qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int fd = -1; + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg = NULL; + int fd = -1; + frct_enroll_msg_t * frct_enroll; + qosspec_t spec; + uint8_t data[BUF_SIZE]; + ssize_t n; msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_api = true; @@ -412,7 +402,8 @@ int flow_accept(qosspec_t * qs, return res; } - if (!recv_msg->has_api || !recv_msg->has_port_id) { + if (!recv_msg->has_api || !recv_msg->has_port_id || + !recv_msg->has_qoscube) { irm_msg__free_unpacked(recv_msg, NULL); return -EIRMD; } @@ -459,8 +450,7 @@ int flow_accept(qosspec_t * qs, assert(ai.ports[ai.flows[fd].port_id].state == PORT_INIT); - if (qs != NULL) - fill_qosspec(qs, ai.flows[fd].cube); + spec = qos_cube_to_spec(recv_msg->qoscube); ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -469,6 +459,33 @@ int flow_accept(qosspec_t * qs, irm_msg__free_unpacked(recv_msg, NULL); + n = flow_read(fd, data, BUF_SIZE); + if (n < 0) { + flow_dealloc(fd); + return n; + } + + frct_enroll = frct_enroll_msg__unpack(NULL, n, data); + if (frct_enroll == NULL) { + flow_dealloc(fd); + return -1; + } + + spec.resource_control = frct_enroll->resource_control; + spec.reliable = frct_enroll->reliable; + spec.error_check = frct_enroll->error_check; + spec.ordered = frct_enroll->ordered; + spec.partial = frct_enroll->partial; + + frct_enroll_msg__free_unpacked(frct_enroll, NULL); + + pthread_rwlock_wrlock(&ai.flows_lock); + ai.flows[fd].spec = spec; + pthread_rwlock_unlock(&ai.flows_lock); + + if (qs != NULL) + *qs = spec; + return fd; } @@ -476,18 +493,33 @@ int flow_alloc(const char * dst_name, qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - - int fd; + irm_msg_t msg = IRM_MSG__INIT; + frct_enroll_msg_t frct_enroll = FRCT_ENROLL_MSG__INIT; + irm_msg_t * recv_msg = NULL; + qoscube_t qc = QOS_CUBE_BE; + int fd; + ssize_t len; + uint8_t * data; + int ret; msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst_name = (char *) dst_name; msg.has_api = true; msg.has_qoscube = true; - msg.qoscube = spec_to_cube(qs); msg.api = ai.api; + if (qs != NULL) { + frct_enroll.resource_control = qs->resource_control; + frct_enroll.reliable = qs->reliable; + frct_enroll.error_check = qs->error_check; + frct_enroll.ordered = qs->ordered; + frct_enroll.partial = qs->partial; + + qc = qos_spec_to_cube(*qs); + } + + msg.qoscube = qc; + if (timeo != NULL) { msg.has_timeo_sec = true; msg.has_timeo_nsec = true; @@ -560,9 +592,29 @@ int flow_alloc(const char * dst_name, ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; + irm_msg__free_unpacked(recv_msg, NULL); + pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); + len = frct_enroll_msg__get_packed_size(&frct_enroll); + if (len < 0) { + flow_dealloc(fd); + return -1; + } + + data = malloc(len); + if (data == NULL) { + flow_dealloc(fd); + return -ENOMEM; + } + + frct_enroll_msg__pack(&frct_enroll, data); + + ret = flow_write(fd, data, len); + if (ret < 0) { + flow_dealloc(fd); + return ret; + } return fd; } @@ -582,7 +634,7 @@ int flow_dealloc(int fd) pthread_rwlock_rdlock(&ai.flows_lock); - assert (!(ai.flows[fd].port_id < 0)); + assert(!(ai.flows[fd].port_id < 0)); msg.port_id = ai.flows[fd].port_id; @@ -722,7 +774,7 @@ int flow_get_qosspec(int fd, return -ENOTALLOC; } - fill_qosspec(qs, ai.flows[fd].cube); + *qs = ai.flows[fd].spec; pthread_rwlock_unlock(&ai.flows_lock); diff --git a/src/lib/frct_enroll.proto b/src/lib/frct_enroll.proto new file mode 100644 index 00000000..497d6acc --- /dev/null +++ b/src/lib/frct_enroll.proto @@ -0,0 +1,32 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * QoS messages + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +syntax = "proto2"; + +message frct_enroll_msg { + required bool resource_control = 1; + required bool reliable = 2; + required bool error_check = 3; + required bool ordered = 4; + required bool partial = 5; +}; diff --git a/src/lib/qos.c b/src/lib/qos.c new file mode 100644 index 00000000..035b9049 --- /dev/null +++ b/src/lib/qos.c @@ -0,0 +1,57 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Quality of Service cube specifications + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#include +#include + +#include +#include + +int qosspec_init(qosspec_t * qs) +{ + if (qs == NULL) + return -EINVAL; + + qs->delay = UINT32_MAX; + qs->bandwidth = UINT64_MAX; + qs->availability = 0; + qs->maximum_interruption = UINT32_MAX; + + qs->resource_control = true; + qs->reliable = false; + qs->error_check = true; + qs->ordered = true; + qs->partial = false; + + return 0; +} + +int qosspec_fini(qosspec_t * qs) +{ + if (qs == NULL) + return -EINVAL; + + qs = NULL; + + return 0; +} diff --git a/src/lib/qoscube.c b/src/lib/qoscube.c new file mode 100644 index 00000000..4ab827a6 --- /dev/null +++ b/src/lib/qoscube.c @@ -0,0 +1,76 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Quality of Service cube + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#include + +#include + +static struct qos_spec qos_best_effort = { + .delay = UINT32_MAX, + .bandwidth = UINT64_MAX, + .availability = 0, + .maximum_interruption = UINT32_MAX +}; + +static struct qos_spec qos_video = { + .delay = 100, + .bandwidth = UINT64_MAX, + .availability = 3, + .maximum_interruption = 100 +}; + +static struct qos_spec qos_voice = { + .delay = 10, + .bandwidth = 100000, + .availability = 5, + .maximum_interruption = 50 +}; + +qoscube_t qos_spec_to_cube(qosspec_t qs) +{ + if (qs.delay <= qos_voice.delay && + qs.bandwidth <= qos_voice.bandwidth && + qs.availability >= qos_voice.availability && + qs.maximum_interruption <= qos_voice.maximum_interruption) + return QOS_CUBE_VOICE; + else if (qs.delay <= qos_video.delay && + qs.bandwidth <= qos_video.bandwidth && + qs.availability >= qos_video.availability && + qs.maximum_interruption <= qos_video.maximum_interruption) + return QOS_CUBE_VIDEO; + else + return QOS_CUBE_BE; +} + +qosspec_t qos_cube_to_spec(qoscube_t qc) +{ + switch (qc) { + case QOS_CUBE_VOICE: + return qos_voice; + case QOS_CUBE_VIDEO: + return qos_video; + case QOS_CUBE_BE: + default: + return qos_best_effort; + } +} -- cgit v1.2.3