From 806629e64e8231d0c57a80d3b6584094cd6c89bd Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 21 Jun 2017 11:50:19 +0200 Subject: lib, ipcpd, irmd: Add full-fledged QoS This adds more Quality of Service support to Ouroboros. One part is the network specific characteristics such as bandwidth, delay, ... The other part is end-to-end QoS like reliability, window based flow control, ... --- src/lib/dev.c | 116 ++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 84 insertions(+), 32 deletions(-) (limited to 'src/lib/dev.c') diff --git a/src/lib/dev.c b/src/lib/dev.c index 3cdcc4d3..e4fd32cc 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -34,11 +34,14 @@ #include #include #include +#include #include #include #include +#define BUF_SIZE 1500 + struct flow_set { size_t idx; }; @@ -72,6 +75,7 @@ struct flow { int port_id; int oflags; qoscube_t cube; + qosspec_t spec; pid_t api; @@ -174,24 +178,6 @@ static enum port_state port_wait_assign(int port_id) return state; } -/* FIXME: translate real spec to cube */ -static qoscube_t spec_to_cube(qosspec_t * qs) -{ - if (qs == NULL) - return QOS_CUBE_BE; - - return qs->cube; -} - -/* FIXME: fill real spec */ -static void fill_qosspec(qosspec_t * qs, - qoscube_t cube) -{ - assert(qs); - - qs->cube = cube; -} - static int api_announce(char * ap_name) { irm_msg_t msg = IRM_MSG__INIT; @@ -382,9 +368,13 @@ void ouroboros_fini() int flow_accept(qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int fd = -1; + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg = NULL; + int fd = -1; + frct_enroll_msg_t * frct_enroll; + qosspec_t spec; + uint8_t data[BUF_SIZE]; + ssize_t n; msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_api = true; @@ -412,7 +402,8 @@ int flow_accept(qosspec_t * qs, return res; } - if (!recv_msg->has_api || !recv_msg->has_port_id) { + if (!recv_msg->has_api || !recv_msg->has_port_id || + !recv_msg->has_qoscube) { irm_msg__free_unpacked(recv_msg, NULL); return -EIRMD; } @@ -459,8 +450,7 @@ int flow_accept(qosspec_t * qs, assert(ai.ports[ai.flows[fd].port_id].state == PORT_INIT); - if (qs != NULL) - fill_qosspec(qs, ai.flows[fd].cube); + spec = qos_cube_to_spec(recv_msg->qoscube); ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -469,6 +459,33 @@ int flow_accept(qosspec_t * qs, irm_msg__free_unpacked(recv_msg, NULL); + n = flow_read(fd, data, BUF_SIZE); + if (n < 0) { + flow_dealloc(fd); + return n; + } + + frct_enroll = frct_enroll_msg__unpack(NULL, n, data); + if (frct_enroll == NULL) { + flow_dealloc(fd); + return -1; + } + + spec.resource_control = frct_enroll->resource_control; + spec.reliable = frct_enroll->reliable; + spec.error_check = frct_enroll->error_check; + spec.ordered = frct_enroll->ordered; + spec.partial = frct_enroll->partial; + + frct_enroll_msg__free_unpacked(frct_enroll, NULL); + + pthread_rwlock_wrlock(&ai.flows_lock); + ai.flows[fd].spec = spec; + pthread_rwlock_unlock(&ai.flows_lock); + + if (qs != NULL) + *qs = spec; + return fd; } @@ -476,18 +493,33 @@ int flow_alloc(const char * dst_name, qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - - int fd; + irm_msg_t msg = IRM_MSG__INIT; + frct_enroll_msg_t frct_enroll = FRCT_ENROLL_MSG__INIT; + irm_msg_t * recv_msg = NULL; + qoscube_t qc = QOS_CUBE_BE; + int fd; + ssize_t len; + uint8_t * data; + int ret; msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst_name = (char *) dst_name; msg.has_api = true; msg.has_qoscube = true; - msg.qoscube = spec_to_cube(qs); msg.api = ai.api; + if (qs != NULL) { + frct_enroll.resource_control = qs->resource_control; + frct_enroll.reliable = qs->reliable; + frct_enroll.error_check = qs->error_check; + frct_enroll.ordered = qs->ordered; + frct_enroll.partial = qs->partial; + + qc = qos_spec_to_cube(*qs); + } + + msg.qoscube = qc; + if (timeo != NULL) { msg.has_timeo_sec = true; msg.has_timeo_nsec = true; @@ -560,9 +592,29 @@ int flow_alloc(const char * dst_name, ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; + irm_msg__free_unpacked(recv_msg, NULL); + pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); + len = frct_enroll_msg__get_packed_size(&frct_enroll); + if (len < 0) { + flow_dealloc(fd); + return -1; + } + + data = malloc(len); + if (data == NULL) { + flow_dealloc(fd); + return -ENOMEM; + } + + frct_enroll_msg__pack(&frct_enroll, data); + + ret = flow_write(fd, data, len); + if (ret < 0) { + flow_dealloc(fd); + return ret; + } return fd; } @@ -582,7 +634,7 @@ int flow_dealloc(int fd) pthread_rwlock_rdlock(&ai.flows_lock); - assert (!(ai.flows[fd].port_id < 0)); + assert(!(ai.flows[fd].port_id < 0)); msg.port_id = ai.flows[fd].port_id; @@ -722,7 +774,7 @@ int flow_get_qosspec(int fd, return -ENOTALLOC; } - fill_qosspec(qs, ai.flows[fd].cube); + *qs = ai.flows[fd].spec; pthread_rwlock_unlock(&ai.flows_lock); -- cgit v1.2.3