diff options
author | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-05-07 16:11:09 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-05-07 16:11:09 +0200 |
commit | eb9f44379d5316e7f7e9311d7a66d2041eca743a (patch) | |
tree | 2489605a42bb2c9582c0c4e912c2de0c40512b2a /src/lib | |
parent | de8f2015cbd015b1cced366cb12c054be62c23b1 (diff) | |
download | ouroboros-eb9f44379d5316e7f7e9311d7a66d2041eca743a.tar.gz ouroboros-eb9f44379d5316e7f7e9311d7a66d2041eca743a.zip |
irmd: flow allocation and fast path
This commit has a first implementation of flow allocation (the "slow
path") and read/write (the "fast path") for ouroboros. It provides
basic but unstable communications over the shared memory.
It required a lot of changes all over the stack, and fixes a number of
previously undetected issues.
This PR still need heavy revision regarding data model, locking and
cleanup.
lib/dev: modifications to the API. It now uses an ap_init() call to
set the AP name and sets the Instance ID to the pid of the process. It
also binds the AP to the shared memory and creates tables for mappings
in the fast path. A call to ap_fini() releases the resources.
lib/shm_ap_rbuff: added ring buffer for data exchange between
processes in the fast path. It passes an index in the shm_du_map.
lib/shm_du_map: rewrote API to work with calls from dev.c. Garbage
collector added. Tests updated to new API.
ipcpd/ipcp-data: removed everything related to flows, as these are
universal for all ap's and kept in ap_data (dev.c), or similar structs
for shim ipcps.
shim-udp: added flow allocator and read/write functions and shm
elements.
irmd: revised data model and structures necessary for flow allocation.
tools: echo updated to new dev.h API.
messaging system was updated to comply with new flow allocation
messages. All exchanges use pid and port_id to bootstrap the fast
path.
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/lib/bitmap.c | 14 | ||||
-rw-r--r-- | src/lib/dev.c | 330 | ||||
-rw-r--r-- | src/lib/ipcp.c | 65 | ||||
-rw-r--r-- | src/lib/ipcpd_messages.proto | 6 | ||||
-rw-r--r-- | src/lib/irmd_messages.proto | 26 | ||||
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 268 | ||||
-rw-r--r-- | src/lib/shm_du_map.c | 143 | ||||
-rw-r--r-- | src/lib/tests/shm_du_map_test.c | 53 |
9 files changed, 690 insertions, 216 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 4922e07c..53a7b354 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -32,6 +32,7 @@ set(SOURCE_FILES ipcp.c irm.c list.c + shm_ap_rbuff.c shm_du_map.c sockets.c utils.c diff --git a/src/lib/bitmap.c b/src/lib/bitmap.c index 8aabb4f4..0e3c968f 100644 --- a/src/lib/bitmap.c +++ b/src/lib/bitmap.c @@ -108,12 +108,14 @@ struct bmp * bmp_create(size_t bits, ssize_t offset) return NULL; tmp = malloc(sizeof(*tmp)); - if (!tmp) + if (tmp == NULL) return NULL; - tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(*(tmp->bitmap))); - if (!tmp->bitmap) + tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(unsigned long)); + if (!tmp->bitmap) { + free(tmp); return NULL; + } tmp->size = bits; tmp->offset = offset; @@ -140,8 +142,6 @@ int bmp_destroy(struct bmp * b) static ssize_t bad_id(struct bmp * b) { - assert(b); - return b->offset - 1; } @@ -149,8 +149,8 @@ ssize_t bmp_allocate(struct bmp * b) { ssize_t id; - if (!b) - return bad_id(b); + if (b == NULL) + return -1; id = (ssize_t) find_next_zero_bit(b->bitmap, b->size); diff --git a/src/lib/dev.c b/src/lib/dev.c index 6d8411c5..40bf2dc3 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -25,73 +25,190 @@ #include <ouroboros/logs.h> #include <ouroboros/dev.h> #include <ouroboros/sockets.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/instance_name.h> +#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/utils.h> #include <stdlib.h> +#include <string.h> -int ap_reg(char * ap_name, - char ** difs, - size_t difs_size) +#define AP_MAX_FLOWS 256 + +#ifndef DU_BUFF_HEADSPACE + #define DU_BUFF_HEADSPACE 128 +#endif + +#ifndef DU_BUFF_TAILSPACE + #define DU_BUFF_TAILSPACE 0 +#endif + +struct flow { + struct shm_ap_rbuff * rb; + uint32_t port_id; + uint32_t oflags; + + /* don't think this needs locking */ +}; + +struct ap_data { + instance_name_t * api; + struct shm_du_map * dum; + struct bmp * fds; + + struct shm_ap_rbuff * rb; + struct flow flows[AP_MAX_FLOWS]; +} * _ap_instance; + + +int ap_init(char * ap_name) { - irm_msg_t msg = IRM_MSG__INIT; + _ap_instance = malloc(sizeof(struct ap_data)); + if (_ap_instance == NULL) { + return -1; + } + + _ap_instance->api = instance_name_create(); + if (_ap_instance->api == NULL) { + free(_ap_instance); + return -1; + } + + if (instance_name_init_from(_ap_instance->api, + ap_name, + getpid()) == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); + if (_ap_instance->fds == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->dum = shm_du_map_open(); + if (_ap_instance->dum == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + _ap_instance->rb = shm_ap_rbuff_create(); + if (_ap_instance->rb == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + return 0; +} + +void ap_fini() +{ + int i = 0; + + if (_ap_instance == NULL) + return; + if (_ap_instance->api != NULL) + instance_name_destroy(_ap_instance->api); + if (_ap_instance->fds != NULL) + bmp_destroy(_ap_instance->fds); + if (_ap_instance->dum != NULL) + shm_du_map_close(_ap_instance->dum); + if (_ap_instance->rb != NULL) + shm_ap_rbuff_destroy(_ap_instance->rb); + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[i].rb); + + free(_ap_instance); +} + +#if 0 +static int port_id_to_fd(uint32_t port_id) +{ + int i; + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].port_id == port_id + && _ap_instance->flows[i].state != FLOW_NULL) + return i; + return -1; +} +#endif + +int ap_reg(char ** difs, + size_t len) +{ + irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = 0; + int fd = bmp_allocate(_ap_instance->fds); - if (ap_name == NULL || - difs == NULL || - difs_size == 0 || + if (difs == NULL || + len == 0 || difs[0] == NULL) { return -EINVAL; } + if (_ap_instance == NULL) { + LOG_DBG("ap_init was not called"); + return -1; + } + msg.code = IRM_MSG_CODE__IRM_AP_REG; msg.has_pid = true; - msg.pid = getpid(); - msg.ap_name = ap_name; + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; - msg.n_dif_name = difs_size; + msg.n_dif_name = len; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + if (recv_msg->result < 0) + fd = -1; + irm_msg__free_unpacked(recv_msg, NULL); return fd; } -int ap_unreg(char * ap_name, - char ** difs, - size_t difs_size) +int ap_unreg(char ** difs, + size_t len) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; int ret = -1; - if (ap_name == NULL || - difs == NULL || - difs_size == 0 || + if (difs == NULL || + len == 0 || difs[0] == NULL) { return -EINVAL; } msg.code = IRM_MSG_CODE__IRM_AP_UNREG; msg.has_pid = true; - msg.pid = getpid(); - msg.ap_name = ap_name; + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; - msg.n_dif_name = difs_size; + msg.n_dif_name = len; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -102,38 +219,48 @@ int ap_unreg(char * ap_name, return ret; } -int flow_accept(int fd, - char * ap_name, - char * ae_name) +int flow_accept(int fd, + char ** ap_name, + char ** ae_name) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int cli_fd = 0; - - if (ap_name == NULL) { - return -EINVAL; - } + int cfd = -1; msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.pid = _ap_instance->api->id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_pid || !recv_msg->has_port_id) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - cli_fd = recv_msg->fd; - ap_name = recv_msg->ap_name; - ae_name = recv_msg->ae_name; + + cfd = bmp_allocate(_ap_instance->fds); + + _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid); + if (_ap_instance->flows[cfd].rb == NULL) { + bmp_release(_ap_instance->fds, cfd); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + _ap_instance->flows[cfd].port_id = recv_msg->port_id; + _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT; + + *ap_name = strdup(recv_msg->ap_name); + if (ae_name != NULL) + *ae_name = strdup(recv_msg->ae_name); irm_msg__free_unpacked(recv_msg, NULL); - return cli_fd; + + bmp_release(_ap_instance->fds, fd); + + return cfd; } int flow_alloc_resp(int fd, @@ -145,9 +272,9 @@ int flow_alloc_resp(int fd, msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP; msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.pid = _ap_instance->api->id; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; msg.has_response = true; msg.response = response; @@ -155,7 +282,7 @@ int flow_alloc_resp(int fd, if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -167,41 +294,49 @@ int flow_alloc_resp(int fd, } int flow_alloc(char * dst_name, - char * src_ap_name, char * src_ae_name, - struct qos_spec * qos, - int oflags) + struct qos_spec * qos) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = 0; + int fd = -1; - if (dst_name == NULL || - src_ap_name == NULL) { + if (dst_name == NULL) return -EINVAL; - } if (src_ae_name == NULL) src_ae_name = UNKNOWN_AE; msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst_name = dst_name; - msg.ap_name = src_ap_name; + msg.ap_name = _ap_instance->api->name; + msg.has_pid = true; + msg.pid = _ap_instance->api->id; msg.ae_name = src_ae_name; - msg.has_oflags = true; - msg.oflags = oflags; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_pid || !recv_msg->has_port_id) { + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + fd = bmp_allocate(_ap_instance->fds); + + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid); + if (_ap_instance->flows[fd].rb == NULL) { + bmp_release(_ap_instance->fds, fd); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + _ap_instance->flows[fd].port_id = recv_msg->port_id; + _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT; + irm_msg__free_unpacked(recv_msg, NULL); + return fd; } @@ -211,17 +346,15 @@ int flow_alloc_res(int fd) irm_msg_t * recv_msg = NULL; int result = 0; - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -238,17 +371,15 @@ int flow_dealloc(int fd) irm_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -259,47 +390,50 @@ int flow_dealloc(int fd) return ret; } -int flow_cntl(int fd, int oflags) +int flow_cntl(int fd, int cmd, int oflags) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int ret = -1; - - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; - msg.oflags = oflags; + return -1; +} - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) +ssize_t flow_write(int fd, void * buf, size_t count) +{ + /* the AP chooses the amount of headspace and tailspace */ + size_t index = shm_create_du_buff(_ap_instance->dum, + count + DU_BUFF_HEADSPACE + + DU_BUFF_TAILSPACE, + DU_BUFF_HEADSPACE, + (uint8_t *) buf, + count); + struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; + if (index == -1) return -1; - if (recv_msg->has_result == false) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; + if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { + shm_release_du_buff(_ap_instance->dum, index); + return -EPIPE; } - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; + return 0; } -ssize_t flow_write(int fd, - void * buf, - size_t count) +ssize_t flow_read(int fd, void * buf, size_t count) { - LOG_MISSING; + struct rb_entry * e = NULL; + int n; + uint8_t * sdu; + /* FIXME: move this to a thread */ + while (e == NULL || e->port_id != _ap_instance->flows[fd].port_id) + e = shm_ap_rbuff_read(_ap_instance->rb); + + n = shm_du_map_read_sdu(&sdu, + _ap_instance->dum, + e->index); + if (n < 0) + return -1; - return -1; -} + memcpy(buf, sdu, MIN(n, count)); -ssize_t flow_read(int fd, - void * buf, - size_t count) -{ - LOG_MISSING; + shm_release_du_buff(_ap_instance->dum, e->index); - return -1; + return n; } diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c index 387572b3..75676915 100644 --- a/src/lib/ipcp.c +++ b/src/lib/ipcp.c @@ -121,6 +121,8 @@ pid_t ipcp_create(char * ipcp_name, return pid; } + /* clear fd table */ + if (ipcp_type == IPCP_NORMAL) exec_name = IPCP_NORMAL_EXEC; else if (ipcp_type == IPCP_SHIM_UDP) @@ -286,13 +288,8 @@ int ipcp_enroll(pid_t pid, return -EINVAL; msg.code = IPCP_MSG_CODE__IPCP_ENROLL; - msg.member_name = malloc(sizeof(*(msg.member_name))); - if (msg.member_name == NULL) { - LOG_ERR("Failed to malloc."); - return -1; - } - msg.n_1_dif = n_1_dif; msg.member_name = member_name; + msg.n_1_dif = n_1_dif; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) { @@ -323,8 +320,8 @@ int ipcp_name_reg(pid_t pid, if (name == NULL) return -1; - msg.code = IPCP_MSG_CODE__IPCP_NAME_REG; - msg.name = name; + msg.code = IPCP_MSG_CODE__IPCP_NAME_REG; + msg.name = name; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) @@ -368,6 +365,7 @@ int ipcp_name_unreg(pid_t pid, int ipcp_flow_alloc(pid_t pid, uint32_t port_id, + pid_t n_pid, char * dst_name, char * src_ap_name, char * src_ae_name, @@ -381,17 +379,19 @@ int ipcp_flow_alloc(pid_t pid, return -EINVAL; msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; + msg.has_port_id = true; + msg.port_id = port_id; + msg.has_pid = true; + msg.pid = n_pid; msg.src_ap_name = src_ap_name; msg.src_ae_name = src_ae_name; msg.dst_name = dst_name; - msg.port_id = port_id; - msg.has_port_id = true; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { ipcp_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -404,17 +404,20 @@ int ipcp_flow_alloc(pid_t pid, int ipcp_flow_alloc_resp(pid_t pid, uint32_t port_id, - int result) + pid_t n_pid, + int response) { ipcp_msg_t msg = IPCP_MSG__INIT; ipcp_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; - msg.has_port_id = true; - msg.port_id = port_id; - msg.has_result = true; - msg.result = result; + msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; + msg.has_port_id = true; + msg.port_id = port_id; + msg.has_pid = true; + msg.pid = n_pid; + msg.has_response = true; + msg.response = response; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) @@ -431,38 +434,38 @@ int ipcp_flow_alloc_resp(pid_t pid, return ret; } -int ipcp_flow_req_arr(pid_t pid, - char * dst_name, - char * src_ap_name, - char * src_ae_name) +int ipcp_flow_req_arr(pid_t pid, + char * dst_name, + char * src_ap_name, + char * src_ae_name) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = -1; + int port_id = -1; if (src_ap_name == NULL || src_ae_name == NULL) return -EINVAL; msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg.has_pid = true; + msg.pid = pid; msg.dst_name = dst_name; msg.ap_name = src_ap_name; msg.ae_name = src_ae_name; - msg.pid = pid; - msg.has_pid = true; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_port_id) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + port_id = recv_msg->port_id; irm_msg__free_unpacked(recv_msg, NULL); - return fd; + return port_id; } int ipcp_flow_alloc_reply(pid_t pid, @@ -509,11 +512,11 @@ int ipcp_flow_dealloc(pid_t pid, recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) - return -1; + return 0; if (recv_msg->has_result == false) { ipcp_msg__free_unpacked(recv_msg, NULL); - return -1; + return 0; } ret = recv_msg->result; @@ -531,11 +534,11 @@ int ipcp_flow_dealloc(pid_t pid, recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) - return -1; + return 0; if (recv_msg->has_result == false) { irm_msg__free_unpacked(recv_msg, NULL); - return -1; + return 0; } ret = recv_msg->result; diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index da4bb469..daca011d 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -25,6 +25,8 @@ message ipcp_msg { optional string src_ap_name = 9; optional string src_ae_name = 10; optional dif_config_msg conf = 11; - optional int32 result = 12; - optional int32 fd = 13; + optional int32 fd = 12; + optional int32 pid = 13; + optional int32 response = 14; + optional int32 result = 15; }; diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 89e2c882..c336614e 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -36,13 +36,10 @@ enum irm_msg_code { IRM_FLOW_ALLOC = 11; IRM_FLOW_ALLOC_RES = 12; IRM_FLOW_DEALLOC = 13; - IRM_FLOW_CONTROL = 14; - IRM_FLOW_WRITE = 15; - IRM_FLOW_READ = 16; - IPCP_FLOW_REQ_ARR = 17; - IPCP_FLOW_ALLOC_REPLY = 18; - IPCP_FLOW_DEALLOC = 19; - IRM_REPLY = 20; + IPCP_FLOW_REQ_ARR = 14; + IPCP_FLOW_ALLOC_REPLY = 15; + IPCP_FLOW_DEALLOC = 16; + IRM_REPLY = 17; }; message irm_msg { @@ -52,12 +49,11 @@ message irm_msg { optional uint32 api_id = 3; optional uint32 ipcp_type = 5; repeated string dif_name = 6; - optional int32 fd = 7; - optional int32 response = 8; - optional int32 oflags = 9; - optional string dst_name = 10; - optional uint32 port_id = 11; - optional int32 pid = 12; - optional dif_config_msg conf = 13; - optional int32 result = 14; + optional int32 response = 7; + optional string dst_name = 8; + optional uint32 port_id = 9; + optional int32 pid = 10; + optional dif_config_msg conf = 11; + optional int32 cfd = 12; + optional int32 result = 13; }; diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c new file mode 100644 index 00000000..4bd64775 --- /dev/null +++ b/src/lib/shm_ap_rbuff.c @@ -0,0 +1,268 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for application processes + * + * Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/shm_ap_rbuff.h> +#define OUROBOROS_PREFIX "shm_ap_rbuff" + +#include <ouroboros/logs.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include <unistd.h> +#include <stdbool.h> +#include <errno.h> + +#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry) \ + + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)) + +#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail)\ + & (SHM_RBUFF_SIZE - 1)) +#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE) + +struct shm_ap_rbuff { + struct rb_entry * shm_base; /* start of entry */ + size_t * ptr_head; /* start of ringbuffer head */ + size_t * ptr_tail; /* start of ringbuffer tail */ + pthread_mutex_t * shm_mutex; /* lock all free space in shm */ + pid_t pid; /* pid to which this rb belongs */ + int fd; +}; + +struct shm_ap_rbuff * shm_ap_rbuff_create() +{ + struct shm_ap_rbuff * rb; + int shm_fd; + struct rb_entry * shm_base; + pthread_mutexattr_t attr; + char fn[25]; + + sprintf(fn, SHM_AP_RBUFF "%d", getpid()); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBGF("Failed creating ring buffer."); + free(rb); + return NULL; + } + + if (lseek(shm_fd, SHM_RBUFF_FILE_SIZE - 1, SEEK_SET) < 0) { + LOG_DBGF("Failed to extend ringbuffer."); + free(rb); + return NULL; + } + + if (write(shm_fd, "", 1) != 1) { + LOG_DBGF("Failed to finalise extension of ringbuffer."); + free(rb); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + if (shm_base == MAP_FAILED) { + LOG_DBGF("Failed to map shared memory."); + if (close(shm_fd) == -1) + LOG_DBGF("Failed to close invalid shm."); + + if (shm_unlink(fn) == -1) + LOG_DBGF("Failed to remove invalid shm."); + + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); + rb->ptr_tail = (size_t *) + ((uint8_t *) rb->ptr_head + sizeof(size_t)); + rb->shm_mutex = (pthread_mutex_t *) + ((uint8_t *) rb->ptr_tail + sizeof(size_t)); + + pthread_mutexattr_init(&attr); + pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(rb->shm_mutex, &attr); + + *rb->ptr_head = 0; + *rb->ptr_tail = 0; + + rb->fd = shm_fd; + rb->pid = getpid(); + + return rb; +} + +struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t pid) +{ + struct shm_ap_rbuff * rb; + int shm_fd; + struct rb_entry * shm_base; + char fn[25]; + + sprintf(fn, SHM_AP_RBUFF "%d", pid); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBGF("Failed opening shared memory %s.", fn); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + if (shm_base == MAP_FAILED) { + LOG_DBGF("Failed to map shared memory."); + if (close(shm_fd) == -1) + LOG_DBGF("Failed to close invalid shm."); + + if (shm_unlink(fn) == -1) + LOG_DBGF("Failed to remove invalid shm."); + + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); + rb->ptr_tail = (size_t *) + ((uint8_t *) rb->ptr_head + sizeof(size_t)); + rb->shm_mutex = (pthread_mutex_t *) + ((uint8_t *) rb->ptr_tail + sizeof(size_t)); + + rb->fd = shm_fd; + rb->pid = pid; + + return rb; +} +void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) +{ + char fn[25]; + + if (rb == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + sprintf(fn, SHM_AP_RBUFF "%d", rb->pid); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + + free(rb); +} + +void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) +{ + char fn[25]; + + + if (rb == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + if (rb->pid != getpid()) { + LOG_ERR("Tried to destroy other AP's rbuff."); + return; + } + + sprintf(fn, SHM_AP_RBUFF "%d", rb->pid); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + + if (shm_unlink(fn) == -1) + LOG_DBGF("Failed to unlink shm."); + + free(rb); +} + +int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) +{ + struct rb_entry * pos; + + if (rb == NULL || e == NULL) + return -1; + + pthread_mutex_lock(rb->shm_mutex); + + if (!shm_rbuff_free(rb)) { + pthread_mutex_unlock(rb->shm_mutex); + return -1; + } + + pos = rb->shm_base + *rb->ptr_head; + *pos = *e; + *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1); + + pthread_mutex_unlock(rb->shm_mutex); + + return 0; +} +struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) +{ + struct rb_entry * e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + if (rb == NULL) + return NULL; + + pthread_mutex_lock(rb->shm_mutex); + + if (shm_rbuff_used(rb) == 0) { + pthread_mutex_unlock(rb->shm_mutex); + return NULL; + } + + *e = *(rb->shm_base + *rb->ptr_tail); + + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + + pthread_mutex_unlock(rb->shm_mutex); + + return e; +} diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index dfccca6a..56062c9d 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -45,6 +45,9 @@ ((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_tail * \ SHM_DU_BUFF_BLOCK_SIZE))) +#define idx_to_du_buff_ptr(dum, idx) \ + ((struct shm_du_buff *)(dum->shm_base + (idx * SHM_DU_BUFF_BLOCK_SIZE))) + #define block_ptr_to_idx(dum, sdb) \ (((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE) @@ -52,27 +55,31 @@ & (SHM_BLOCKS_IN_MAP - 1)) #define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP) +#define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \ + idx_to_du_buff_ptr(dum, idx)->du_head) + #define MIN(a,b)(a < b ? a : b) struct shm_du_buff { - size_t size; - size_t du_head; - size_t du_tail; + size_t size; + size_t du_head; + size_t du_tail; + size_t garbage; }; struct shm_du_map { - uint8_t * shm_base; /* start of blocks */ - size_t * ptr_head; /* start of ringbuffer head */ - size_t * ptr_tail; /* start of ringbuffer tail */ - pthread_mutex_t * shm_mutex; /* lock all free space in shm */ - int fd; + uint8_t * shm_base; /* start of blocks */ + size_t * ptr_head; /* start of ringbuffer head */ + size_t * ptr_tail; /* start of ringbuffer tail */ + pthread_mutex_t * shm_mutex; /* lock all free space in shm */ + int fd; }; struct shm_du_map * shm_du_map_create() { struct shm_du_map * dum; int shm_fd; - uint8_t * shm_base; + uint8_t * shm_base; pthread_mutexattr_t attr; dum = malloc(sizeof *dum); @@ -141,7 +148,13 @@ struct shm_du_map * shm_du_map_open() { struct shm_du_map * dum; int shm_fd; - uint8_t * shm_base; + uint8_t * shm_base; + + dum = malloc(sizeof *dum); + if (dum == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_RDWR, 0666); if (shm_fd == -1) { @@ -166,12 +179,6 @@ struct shm_du_map * shm_du_map_open() return NULL; } - dum = malloc(sizeof *dum); - if (dum == NULL) { - LOG_DBGF("Could not allocate struct."); - return NULL; - } - dum->shm_base = shm_base; dum->ptr_head = (size_t *) ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); @@ -195,38 +202,52 @@ void shm_du_map_close(struct shm_du_map * dum) if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) LOG_DBGF("Couldn't unmap shared memory."); + free(dum); +} + +void shm_du_map_destroy(struct shm_du_map * dum) +{ + if (dum == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + if (shm_unlink(SHM_DU_MAP_FILENAME) == -1) LOG_DBGF("Failed to unlink shm."); free(dum); } -struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, - size_t size, - size_t headspace, - uint8_t * data, - size_t len) +int shm_create_du_buff(struct shm_du_map * dum, + size_t size, + size_t headspace, + uint8_t * data, + size_t len) { struct shm_du_buff * sdb; long blocks = 0; int sz = size + sizeof *sdb; int sz2 = headspace + len + sizeof *sdb; - uint8_t * write_pos; + uint8_t * write_pos; size_t copy_len; + size_t index; if (dum == NULL || data == NULL) { LOG_DBGF("Bogus input, bugging out."); - return NULL; + return -1; } if (headspace >= size) { LOG_DBGF("Index out of bounds."); - return NULL; + return -1; } if (headspace + len > size) { LOG_DBGF("Buffer too small for data."); - return NULL; + return -1; } pthread_mutex_lock(dum->shm_mutex); @@ -237,20 +258,20 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, if (sz2 < 0 && sz > 0) { pthread_mutex_unlock(dum->shm_mutex); LOG_DBG("Can't handle this packet now"); - return NULL; + return -1; } ++blocks; } if (!shm_map_free(dum, blocks)) { pthread_mutex_unlock(dum->shm_mutex); - LOG_DBGF("Allocation failed, Out of Memory."); - return NULL; + return -1; } sdb = get_head_ptr(dum); sdb->size = size; + sdb->garbage = 0; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; @@ -267,32 +288,76 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, --blocks; } + index = *dum->ptr_head - 1; + pthread_mutex_unlock(dum->shm_mutex); - return sdb; + return index; } -int shm_release_du_buff(struct shm_du_map * dum) +/* FIXME: this cannot handle packets stretching beyond the ringbuffer border */ +int shm_du_map_read_sdu(uint8_t ** dst, + struct shm_du_map * dum, + size_t idx) +{ + size_t len = 0; + + if (idx > SHM_BLOCKS_IN_MAP) + return -1; + + pthread_mutex_lock(dum->shm_mutex); + + if (*dum->ptr_head == *dum->ptr_tail) { + pthread_mutex_unlock(dum->shm_mutex); + return -1; + } + + *dst = ((uint8_t *) idx_to_du_buff_ptr(dum, idx)) + + sizeof(struct shm_du_buff) + + idx_to_du_buff_ptr(dum, idx)->du_head; + len = sdu_size(dum, idx); + + pthread_mutex_unlock(dum->shm_mutex); + + return len; +} + +int shm_release_du_buff(struct shm_du_map * dum, size_t idx) { long sz; long blocks = 0; + + /* FIXME: this is crap for the test */ + if (idx > SHM_BLOCKS_IN_MAP) + idx = *dum->ptr_tail; + pthread_mutex_lock(dum->shm_mutex); if (*dum->ptr_head == *dum->ptr_tail) { - LOG_DBGF("Attempt to free empty ringbuffer. Nothing to do."); pthread_mutex_unlock(dum->shm_mutex); return -1; } - sz = get_tail_ptr(dum)->size; + idx_to_du_buff_ptr(dum, idx)->garbage = 1; - while (sz + (long) sizeof (struct shm_du_buff) > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; - ++blocks; + if (idx != *dum->ptr_tail) { + pthread_mutex_unlock(dum->shm_mutex); + return 0; + } + + while (get_tail_ptr(dum)->garbage == 1) { + sz = get_tail_ptr(dum)->size; + + while (sz + (long) sizeof (struct shm_du_buff) > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + ++blocks; + } + + *(dum->ptr_tail) = + (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); } - *(dum->ptr_tail) = (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); pthread_mutex_unlock(dum->shm_mutex); return 0; @@ -317,7 +382,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb, } uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, - size_t size) + size_t size) { if (sdb == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -335,7 +400,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, } int shm_du_buff_head_release(struct shm_du_buff * sdb, - size_t size) + size_t size) { if (sdb == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -353,7 +418,7 @@ int shm_du_buff_head_release(struct shm_du_buff * sdb, } int shm_du_buff_tail_release(struct shm_du_buff * sdb, - size_t size) + size_t size) { if (sdb == NULL) { LOG_DBGF("Bogus input, bugging out."); diff --git a/src/lib/tests/shm_du_map_test.c b/src/lib/tests/shm_du_map_test.c index 85a82e4d..55938a62 100644 --- a/src/lib/tests/shm_du_map_test.c +++ b/src/lib/tests/shm_du_map_test.c @@ -32,7 +32,7 @@ #include <ouroboros/logs.h> -#define SIZE_OF_DU_BUFF 24 +#define SIZE_OF_DU_BUFF 32 #define TEST_BUFF_SIZE (SHM_DU_BUFF_BLOCK_SIZE - SIZE_OF_DU_BUFF) #define MAX(a,b) (a > b ? a : b) @@ -44,7 +44,7 @@ void * produce() { struct shm_du_map * dum; long test_buf_size = 0; - uint8_t * test_values; + uint8_t * test_values; int headspace; int tailspace; long i; @@ -66,9 +66,8 @@ void * produce() test_values[i] = 170; clock_gettime(CLOCK_MONOTONIC, &starttime); - for (i = 1; i < SHM_BLOCKS_IN_MAP; i++) { - struct shm_du_buff * sdb; - size_t len; + for (i = 1; i < 16 * SHM_BLOCKS_IN_MAP; i++) { + size_t len; test_buf_size = TEST_BUFF_SIZE; @@ -77,21 +76,19 @@ void * produce() len = test_buf_size - (headspace + tailspace); - sdb = shm_create_du_buff(dum, - test_buf_size, - headspace, - test_values, - len); - - if (sdb != NULL) { - bytes_written += len; - } - else { - sync = -2; - break; + if (shm_create_du_buff(dum, + test_buf_size, + headspace, + test_values, + len) < 0) { + continue; } + + bytes_written += len; } + sync = -2; + clock_gettime(CLOCK_MONOTONIC, &stoptime); elapsed =(stoptime.tv_sec + stoptime.tv_nsec / 1000000000.0) - (starttime.tv_sec + starttime.tv_nsec / 1000000000.0); @@ -104,13 +101,14 @@ void * produce() sync = -1; + shm_du_map_close(dum); + return 0; } void * consume() { struct shm_du_map * dum; - struct timespec ts; ts.tv_sec = 0; @@ -123,10 +121,15 @@ void * consume() return (void *)-1; } - while (!sync) { - while (!shm_release_du_buff(dum)); - nanosleep(&ts, NULL); + while (true) { + shm_release_du_buff(dum, 1823429173941); + if (sync) + break; } + nanosleep(&ts, NULL); + + + shm_du_map_close(dum); return 0; } @@ -149,7 +152,7 @@ int shm_du_map_test(int argc, char ** argv) return -1; } - shm_du_map_close(dum); + shm_du_map_destroy(dum); LOG_INFO("done."); @@ -165,7 +168,7 @@ int shm_du_map_test(int argc, char ** argv) pthread_create(&consumer, NULL, consume, NULL); pthread_join(consumer, NULL); - shm_du_map_close(dum); + shm_du_map_destroy(dum); LOG_INFO("done."); @@ -173,6 +176,8 @@ int shm_du_map_test(int argc, char ** argv) LOG_INFO("starting concurrency test."); + sync = 0; + dum = shm_du_map_create(); res1 = (int) pthread_create(&producer, NULL, produce, NULL); @@ -181,7 +186,7 @@ int shm_du_map_test(int argc, char ** argv) pthread_join(producer, NULL); pthread_join(consumer, NULL); - shm_du_map_close(dum); + shm_du_map_destroy(dum); LOG_INFO("done."); |