summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-05-07 16:11:09 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-05-07 16:11:09 +0200
commiteb9f44379d5316e7f7e9311d7a66d2041eca743a (patch)
tree2489605a42bb2c9582c0c4e912c2de0c40512b2a /src/lib/dev.c
parentde8f2015cbd015b1cced366cb12c054be62c23b1 (diff)
downloadouroboros-eb9f44379d5316e7f7e9311d7a66d2041eca743a.tar.gz
ouroboros-eb9f44379d5316e7f7e9311d7a66d2041eca743a.zip
irmd: flow allocation and fast path
This commit has a first implementation of flow allocation (the "slow path") and read/write (the "fast path") for ouroboros. It provides basic but unstable communications over the shared memory. It required a lot of changes all over the stack, and fixes a number of previously undetected issues. This PR still need heavy revision regarding data model, locking and cleanup. lib/dev: modifications to the API. It now uses an ap_init() call to set the AP name and sets the Instance ID to the pid of the process. It also binds the AP to the shared memory and creates tables for mappings in the fast path. A call to ap_fini() releases the resources. lib/shm_ap_rbuff: added ring buffer for data exchange between processes in the fast path. It passes an index in the shm_du_map. lib/shm_du_map: rewrote API to work with calls from dev.c. Garbage collector added. Tests updated to new API. ipcpd/ipcp-data: removed everything related to flows, as these are universal for all ap's and kept in ap_data (dev.c), or similar structs for shim ipcps. shim-udp: added flow allocator and read/write functions and shm elements. irmd: revised data model and structures necessary for flow allocation. tools: echo updated to new dev.h API. messaging system was updated to comply with new flow allocation messages. All exchanges use pid and port_id to bootstrap the fast path.
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c330
1 files changed, 232 insertions, 98 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 6d8411c5..40bf2dc3 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -25,73 +25,190 @@
#include <ouroboros/logs.h>
#include <ouroboros/dev.h>
#include <ouroboros/sockets.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/instance_name.h>
+#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/utils.h>
#include <stdlib.h>
+#include <string.h>
-int ap_reg(char * ap_name,
- char ** difs,
- size_t difs_size)
+#define AP_MAX_FLOWS 256
+
+#ifndef DU_BUFF_HEADSPACE
+ #define DU_BUFF_HEADSPACE 128
+#endif
+
+#ifndef DU_BUFF_TAILSPACE
+ #define DU_BUFF_TAILSPACE 0
+#endif
+
+struct flow {
+ struct shm_ap_rbuff * rb;
+ uint32_t port_id;
+ uint32_t oflags;
+
+ /* don't think this needs locking */
+};
+
+struct ap_data {
+ instance_name_t * api;
+ struct shm_du_map * dum;
+ struct bmp * fds;
+
+ struct shm_ap_rbuff * rb;
+ struct flow flows[AP_MAX_FLOWS];
+} * _ap_instance;
+
+
+int ap_init(char * ap_name)
{
- irm_msg_t msg = IRM_MSG__INIT;
+ _ap_instance = malloc(sizeof(struct ap_data));
+ if (_ap_instance == NULL) {
+ return -1;
+ }
+
+ _ap_instance->api = instance_name_create();
+ if (_ap_instance->api == NULL) {
+ free(_ap_instance);
+ return -1;
+ }
+
+ if (instance_name_init_from(_ap_instance->api,
+ ap_name,
+ getpid()) == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0);
+ if (_ap_instance->fds == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->dum = shm_du_map_open();
+ if (_ap_instance->dum == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ bmp_destroy(_ap_instance->fds);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->rb = shm_ap_rbuff_create();
+ if (_ap_instance->rb == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ bmp_destroy(_ap_instance->fds);
+ free(_ap_instance);
+ return -1;
+ }
+
+ return 0;
+}
+
+void ap_fini()
+{
+ int i = 0;
+
+ if (_ap_instance == NULL)
+ return;
+ if (_ap_instance->api != NULL)
+ instance_name_destroy(_ap_instance->api);
+ if (_ap_instance->fds != NULL)
+ bmp_destroy(_ap_instance->fds);
+ if (_ap_instance->dum != NULL)
+ shm_du_map_close(_ap_instance->dum);
+ if (_ap_instance->rb != NULL)
+ shm_ap_rbuff_destroy(_ap_instance->rb);
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ if (_ap_instance->flows[i].rb != NULL)
+ shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+
+ free(_ap_instance);
+}
+
+#if 0
+static int port_id_to_fd(uint32_t port_id)
+{
+ int i;
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ if (_ap_instance->flows[i].port_id == port_id
+ && _ap_instance->flows[i].state != FLOW_NULL)
+ return i;
+ return -1;
+}
+#endif
+
+int ap_reg(char ** difs,
+ size_t len)
+{
+ irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int fd = 0;
+ int fd = bmp_allocate(_ap_instance->fds);
- if (ap_name == NULL ||
- difs == NULL ||
- difs_size == 0 ||
+ if (difs == NULL ||
+ len == 0 ||
difs[0] == NULL) {
return -EINVAL;
}
+ if (_ap_instance == NULL) {
+ LOG_DBG("ap_init was not called");
+ return -1;
+ }
+
msg.code = IRM_MSG_CODE__IRM_AP_REG;
msg.has_pid = true;
- msg.pid = getpid();
- msg.ap_name = ap_name;
+ msg.pid = _ap_instance->api->id;
+ msg.ap_name = _ap_instance->api->name;
msg.dif_name = difs;
- msg.n_dif_name = difs_size;
+ msg.n_dif_name = len;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- fd = recv_msg->fd;
+ if (recv_msg->result < 0)
+ fd = -1;
+
irm_msg__free_unpacked(recv_msg, NULL);
return fd;
}
-int ap_unreg(char * ap_name,
- char ** difs,
- size_t difs_size)
+int ap_unreg(char ** difs,
+ size_t len)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
int ret = -1;
- if (ap_name == NULL ||
- difs == NULL ||
- difs_size == 0 ||
+ if (difs == NULL ||
+ len == 0 ||
difs[0] == NULL) {
return -EINVAL;
}
msg.code = IRM_MSG_CODE__IRM_AP_UNREG;
msg.has_pid = true;
- msg.pid = getpid();
- msg.ap_name = ap_name;
+ msg.pid = _ap_instance->api->id;
+ msg.ap_name = _ap_instance->api->name;
msg.dif_name = difs;
- msg.n_dif_name = difs_size;
+ msg.n_dif_name = len;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -102,38 +219,48 @@ int ap_unreg(char * ap_name,
return ret;
}
-int flow_accept(int fd,
- char * ap_name,
- char * ae_name)
+int flow_accept(int fd,
+ char ** ap_name,
+ char ** ae_name)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int cli_fd = 0;
-
- if (ap_name == NULL) {
- return -EINVAL;
- }
+ int cfd = -1;
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.pid = _ap_instance->api->id;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_pid || !recv_msg->has_port_id) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- cli_fd = recv_msg->fd;
- ap_name = recv_msg->ap_name;
- ae_name = recv_msg->ae_name;
+
+ cfd = bmp_allocate(_ap_instance->fds);
+
+ _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid);
+ if (_ap_instance->flows[cfd].rb == NULL) {
+ bmp_release(_ap_instance->fds, cfd);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ _ap_instance->flows[cfd].port_id = recv_msg->port_id;
+ _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT;
+
+ *ap_name = strdup(recv_msg->ap_name);
+ if (ae_name != NULL)
+ *ae_name = strdup(recv_msg->ae_name);
irm_msg__free_unpacked(recv_msg, NULL);
- return cli_fd;
+
+ bmp_release(_ap_instance->fds, fd);
+
+ return cfd;
}
int flow_alloc_resp(int fd,
@@ -145,9 +272,9 @@ int flow_alloc_resp(int fd,
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;
msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.pid = _ap_instance->api->id;
+ msg.has_port_id = true;
+ msg.port_id = _ap_instance->flows[fd].port_id;
msg.has_response = true;
msg.response = response;
@@ -155,7 +282,7 @@ int flow_alloc_resp(int fd,
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -167,41 +294,49 @@ int flow_alloc_resp(int fd,
}
int flow_alloc(char * dst_name,
- char * src_ap_name,
char * src_ae_name,
- struct qos_spec * qos,
- int oflags)
+ struct qos_spec * qos)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int fd = 0;
+ int fd = -1;
- if (dst_name == NULL ||
- src_ap_name == NULL) {
+ if (dst_name == NULL)
return -EINVAL;
- }
if (src_ae_name == NULL)
src_ae_name = UNKNOWN_AE;
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
msg.dst_name = dst_name;
- msg.ap_name = src_ap_name;
+ msg.ap_name = _ap_instance->api->name;
+ msg.has_pid = true;
+ msg.pid = _ap_instance->api->id;
msg.ae_name = src_ae_name;
- msg.has_oflags = true;
- msg.oflags = oflags;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_pid || !recv_msg->has_port_id) {
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ fd = bmp_allocate(_ap_instance->fds);
+
+ _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid);
+ if (_ap_instance->flows[fd].rb == NULL) {
+ bmp_release(_ap_instance->fds, fd);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- fd = recv_msg->fd;
+ _ap_instance->flows[fd].port_id = recv_msg->port_id;
+ _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT;
+
irm_msg__free_unpacked(recv_msg, NULL);
+
return fd;
}
@@ -211,17 +346,15 @@ int flow_alloc_res(int fd)
irm_msg_t * recv_msg = NULL;
int result = 0;
- msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
+ msg.has_port_id = true;
+ msg.port_id = _ap_instance->flows[fd].port_id;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -238,17 +371,15 @@ int flow_dealloc(int fd)
irm_msg_t * recv_msg = NULL;
int ret = -1;
- msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
+ msg.has_port_id = true;
+ msg.port_id = _ap_instance->flows[fd].port_id;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -259,47 +390,50 @@ int flow_dealloc(int fd)
return ret;
}
-int flow_cntl(int fd, int oflags)
+int flow_cntl(int fd, int cmd, int oflags)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
- msg.oflags = oflags;
+ return -1;
+}
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
+ssize_t flow_write(int fd, void * buf, size_t count)
+{
+ /* the AP chooses the amount of headspace and tailspace */
+ size_t index = shm_create_du_buff(_ap_instance->dum,
+ count + DU_BUFF_HEADSPACE +
+ DU_BUFF_TAILSPACE,
+ DU_BUFF_HEADSPACE,
+ (uint8_t *) buf,
+ count);
+ struct rb_entry e = {index, _ap_instance->flows[fd].port_id};
+ if (index == -1)
return -1;
- if (recv_msg->has_result == false) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
+ if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
+ shm_release_du_buff(_ap_instance->dum, index);
+ return -EPIPE;
}
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
+ return 0;
}
-ssize_t flow_write(int fd,
- void * buf,
- size_t count)
+ssize_t flow_read(int fd, void * buf, size_t count)
{
- LOG_MISSING;
+ struct rb_entry * e = NULL;
+ int n;
+ uint8_t * sdu;
+ /* FIXME: move this to a thread */
+ while (e == NULL || e->port_id != _ap_instance->flows[fd].port_id)
+ e = shm_ap_rbuff_read(_ap_instance->rb);
+
+ n = shm_du_map_read_sdu(&sdu,
+ _ap_instance->dum,
+ e->index);
+ if (n < 0)
+ return -1;
- return -1;
-}
+ memcpy(buf, sdu, MIN(n, count));
-ssize_t flow_read(int fd,
- void * buf,
- size_t count)
-{
- LOG_MISSING;
+ shm_release_du_buff(_ap_instance->dum, e->index);
- return -1;
+ return n;
}