summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@ugent.be>2017-06-21 11:50:19 +0200
committerSander Vrijders <sander.vrijders@ugent.be>2017-06-21 12:21:54 +0200
commit806629e64e8231d0c57a80d3b6584094cd6c89bd (patch)
treee0bbe49e68ba86be325ee23c5879a7611df87c9f /src/lib/dev.c
parent22020246ac2b6f03f42dffb48ced19e43b3e9b77 (diff)
downloadouroboros-806629e64e8231d0c57a80d3b6584094cd6c89bd.tar.gz
ouroboros-806629e64e8231d0c57a80d3b6584094cd6c89bd.zip
lib, ipcpd, irmd: Add full-fledged QoS
This adds more Quality of Service support to Ouroboros. One part is the network specific characteristics such as bandwidth, delay, ... The other part is end-to-end QoS like reliability, window based flow control, ...
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c116
1 files changed, 84 insertions, 32 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 3cdcc4d3..e4fd32cc 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -34,11 +34,14 @@
#include <ouroboros/shm_rbuff.h>
#include <ouroboros/utils.h>
#include <ouroboros/fqueue.h>
+#include <ouroboros/qoscube.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
+#define BUF_SIZE 1500
+
struct flow_set {
size_t idx;
};
@@ -72,6 +75,7 @@ struct flow {
int port_id;
int oflags;
qoscube_t cube;
+ qosspec_t spec;
pid_t api;
@@ -174,24 +178,6 @@ static enum port_state port_wait_assign(int port_id)
return state;
}
-/* FIXME: translate real spec to cube */
-static qoscube_t spec_to_cube(qosspec_t * qs)
-{
- if (qs == NULL)
- return QOS_CUBE_BE;
-
- return qs->cube;
-}
-
-/* FIXME: fill real spec */
-static void fill_qosspec(qosspec_t * qs,
- qoscube_t cube)
-{
- assert(qs);
-
- qs->cube = cube;
-}
-
static int api_announce(char * ap_name)
{
irm_msg_t msg = IRM_MSG__INIT;
@@ -382,9 +368,13 @@ void ouroboros_fini()
int flow_accept(qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int fd = -1;
+ irm_msg_t msg = IRM_MSG__INIT;
+ irm_msg_t * recv_msg = NULL;
+ int fd = -1;
+ frct_enroll_msg_t * frct_enroll;
+ qosspec_t spec;
+ uint8_t data[BUF_SIZE];
+ ssize_t n;
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_api = true;
@@ -412,7 +402,8 @@ int flow_accept(qosspec_t * qs,
return res;
}
- if (!recv_msg->has_api || !recv_msg->has_port_id) {
+ if (!recv_msg->has_api || !recv_msg->has_port_id ||
+ !recv_msg->has_qoscube) {
irm_msg__free_unpacked(recv_msg, NULL);
return -EIRMD;
}
@@ -459,8 +450,7 @@ int flow_accept(qosspec_t * qs,
assert(ai.ports[ai.flows[fd].port_id].state == PORT_INIT);
- if (qs != NULL)
- fill_qosspec(qs, ai.flows[fd].cube);
+ spec = qos_cube_to_spec(recv_msg->qoscube);
ai.ports[recv_msg->port_id].fd = fd;
ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
@@ -469,6 +459,33 @@ int flow_accept(qosspec_t * qs,
irm_msg__free_unpacked(recv_msg, NULL);
+ n = flow_read(fd, data, BUF_SIZE);
+ if (n < 0) {
+ flow_dealloc(fd);
+ return n;
+ }
+
+ frct_enroll = frct_enroll_msg__unpack(NULL, n, data);
+ if (frct_enroll == NULL) {
+ flow_dealloc(fd);
+ return -1;
+ }
+
+ spec.resource_control = frct_enroll->resource_control;
+ spec.reliable = frct_enroll->reliable;
+ spec.error_check = frct_enroll->error_check;
+ spec.ordered = frct_enroll->ordered;
+ spec.partial = frct_enroll->partial;
+
+ frct_enroll_msg__free_unpacked(frct_enroll, NULL);
+
+ pthread_rwlock_wrlock(&ai.flows_lock);
+ ai.flows[fd].spec = spec;
+ pthread_rwlock_unlock(&ai.flows_lock);
+
+ if (qs != NULL)
+ *qs = spec;
+
return fd;
}
@@ -476,18 +493,33 @@ int flow_alloc(const char * dst_name,
qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
-
- int fd;
+ irm_msg_t msg = IRM_MSG__INIT;
+ frct_enroll_msg_t frct_enroll = FRCT_ENROLL_MSG__INIT;
+ irm_msg_t * recv_msg = NULL;
+ qoscube_t qc = QOS_CUBE_BE;
+ int fd;
+ ssize_t len;
+ uint8_t * data;
+ int ret;
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
msg.dst_name = (char *) dst_name;
msg.has_api = true;
msg.has_qoscube = true;
- msg.qoscube = spec_to_cube(qs);
msg.api = ai.api;
+ if (qs != NULL) {
+ frct_enroll.resource_control = qs->resource_control;
+ frct_enroll.reliable = qs->reliable;
+ frct_enroll.error_check = qs->error_check;
+ frct_enroll.ordered = qs->ordered;
+ frct_enroll.partial = qs->partial;
+
+ qc = qos_spec_to_cube(*qs);
+ }
+
+ msg.qoscube = qc;
+
if (timeo != NULL) {
msg.has_timeo_sec = true;
msg.has_timeo_nsec = true;
@@ -560,9 +592,29 @@ int flow_alloc(const char * dst_name,
ai.ports[recv_msg->port_id].fd = fd;
ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
+ irm_msg__free_unpacked(recv_msg, NULL);
+
pthread_rwlock_unlock(&ai.flows_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
+ len = frct_enroll_msg__get_packed_size(&frct_enroll);
+ if (len < 0) {
+ flow_dealloc(fd);
+ return -1;
+ }
+
+ data = malloc(len);
+ if (data == NULL) {
+ flow_dealloc(fd);
+ return -ENOMEM;
+ }
+
+ frct_enroll_msg__pack(&frct_enroll, data);
+
+ ret = flow_write(fd, data, len);
+ if (ret < 0) {
+ flow_dealloc(fd);
+ return ret;
+ }
return fd;
}
@@ -582,7 +634,7 @@ int flow_dealloc(int fd)
pthread_rwlock_rdlock(&ai.flows_lock);
- assert (!(ai.flows[fd].port_id < 0));
+ assert(!(ai.flows[fd].port_id < 0));
msg.port_id = ai.flows[fd].port_id;
@@ -722,7 +774,7 @@ int flow_get_qosspec(int fd,
return -ENOTALLOC;
}
- fill_qosspec(qs, ai.flows[fd].cube);
+ *qs = ai.flows[fd].spec;
pthread_rwlock_unlock(&ai.flows_lock);