summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/CMakeLists.txt1
-rw-r--r--src/lib/bitmap.c14
-rw-r--r--src/lib/dev.c330
-rw-r--r--src/lib/ipcp.c65
-rw-r--r--src/lib/ipcpd_messages.proto6
-rw-r--r--src/lib/irmd_messages.proto26
-rw-r--r--src/lib/shm_ap_rbuff.c268
-rw-r--r--src/lib/shm_du_map.c143
-rw-r--r--src/lib/tests/shm_du_map_test.c53
9 files changed, 690 insertions, 216 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 4922e07c..53a7b354 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -32,6 +32,7 @@ set(SOURCE_FILES
ipcp.c
irm.c
list.c
+ shm_ap_rbuff.c
shm_du_map.c
sockets.c
utils.c
diff --git a/src/lib/bitmap.c b/src/lib/bitmap.c
index 8aabb4f4..0e3c968f 100644
--- a/src/lib/bitmap.c
+++ b/src/lib/bitmap.c
@@ -108,12 +108,14 @@ struct bmp * bmp_create(size_t bits, ssize_t offset)
return NULL;
tmp = malloc(sizeof(*tmp));
- if (!tmp)
+ if (tmp == NULL)
return NULL;
- tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(*(tmp->bitmap)));
- if (!tmp->bitmap)
+ tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(unsigned long));
+ if (!tmp->bitmap) {
+ free(tmp);
return NULL;
+ }
tmp->size = bits;
tmp->offset = offset;
@@ -140,8 +142,6 @@ int bmp_destroy(struct bmp * b)
static ssize_t bad_id(struct bmp * b)
{
- assert(b);
-
return b->offset - 1;
}
@@ -149,8 +149,8 @@ ssize_t bmp_allocate(struct bmp * b)
{
ssize_t id;
- if (!b)
- return bad_id(b);
+ if (b == NULL)
+ return -1;
id = (ssize_t) find_next_zero_bit(b->bitmap,
b->size);
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 6d8411c5..40bf2dc3 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -25,73 +25,190 @@
#include <ouroboros/logs.h>
#include <ouroboros/dev.h>
#include <ouroboros/sockets.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/instance_name.h>
+#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/utils.h>
#include <stdlib.h>
+#include <string.h>
-int ap_reg(char * ap_name,
- char ** difs,
- size_t difs_size)
+#define AP_MAX_FLOWS 256
+
+#ifndef DU_BUFF_HEADSPACE
+ #define DU_BUFF_HEADSPACE 128
+#endif
+
+#ifndef DU_BUFF_TAILSPACE
+ #define DU_BUFF_TAILSPACE 0
+#endif
+
+struct flow {
+ struct shm_ap_rbuff * rb;
+ uint32_t port_id;
+ uint32_t oflags;
+
+ /* don't think this needs locking */
+};
+
+struct ap_data {
+ instance_name_t * api;
+ struct shm_du_map * dum;
+ struct bmp * fds;
+
+ struct shm_ap_rbuff * rb;
+ struct flow flows[AP_MAX_FLOWS];
+} * _ap_instance;
+
+
+int ap_init(char * ap_name)
{
- irm_msg_t msg = IRM_MSG__INIT;
+ _ap_instance = malloc(sizeof(struct ap_data));
+ if (_ap_instance == NULL) {
+ return -1;
+ }
+
+ _ap_instance->api = instance_name_create();
+ if (_ap_instance->api == NULL) {
+ free(_ap_instance);
+ return -1;
+ }
+
+ if (instance_name_init_from(_ap_instance->api,
+ ap_name,
+ getpid()) == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0);
+ if (_ap_instance->fds == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->dum = shm_du_map_open();
+ if (_ap_instance->dum == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ bmp_destroy(_ap_instance->fds);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->rb = shm_ap_rbuff_create();
+ if (_ap_instance->rb == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ bmp_destroy(_ap_instance->fds);
+ free(_ap_instance);
+ return -1;
+ }
+
+ return 0;
+}
+
+void ap_fini()
+{
+ int i = 0;
+
+ if (_ap_instance == NULL)
+ return;
+ if (_ap_instance->api != NULL)
+ instance_name_destroy(_ap_instance->api);
+ if (_ap_instance->fds != NULL)
+ bmp_destroy(_ap_instance->fds);
+ if (_ap_instance->dum != NULL)
+ shm_du_map_close(_ap_instance->dum);
+ if (_ap_instance->rb != NULL)
+ shm_ap_rbuff_destroy(_ap_instance->rb);
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ if (_ap_instance->flows[i].rb != NULL)
+ shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+
+ free(_ap_instance);
+}
+
+#if 0
+static int port_id_to_fd(uint32_t port_id)
+{
+ int i;
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ if (_ap_instance->flows[i].port_id == port_id
+ && _ap_instance->flows[i].state != FLOW_NULL)
+ return i;
+ return -1;
+}
+#endif
+
+int ap_reg(char ** difs,
+ size_t len)
+{
+ irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int fd = 0;
+ int fd = bmp_allocate(_ap_instance->fds);
- if (ap_name == NULL ||
- difs == NULL ||
- difs_size == 0 ||
+ if (difs == NULL ||
+ len == 0 ||
difs[0] == NULL) {
return -EINVAL;
}
+ if (_ap_instance == NULL) {
+ LOG_DBG("ap_init was not called");
+ return -1;
+ }
+
msg.code = IRM_MSG_CODE__IRM_AP_REG;
msg.has_pid = true;
- msg.pid = getpid();
- msg.ap_name = ap_name;
+ msg.pid = _ap_instance->api->id;
+ msg.ap_name = _ap_instance->api->name;
msg.dif_name = difs;
- msg.n_dif_name = difs_size;
+ msg.n_dif_name = len;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- fd = recv_msg->fd;
+ if (recv_msg->result < 0)
+ fd = -1;
+
irm_msg__free_unpacked(recv_msg, NULL);
return fd;
}
-int ap_unreg(char * ap_name,
- char ** difs,
- size_t difs_size)
+int ap_unreg(char ** difs,
+ size_t len)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
int ret = -1;
- if (ap_name == NULL ||
- difs == NULL ||
- difs_size == 0 ||
+ if (difs == NULL ||
+ len == 0 ||
difs[0] == NULL) {
return -EINVAL;
}
msg.code = IRM_MSG_CODE__IRM_AP_UNREG;
msg.has_pid = true;
- msg.pid = getpid();
- msg.ap_name = ap_name;
+ msg.pid = _ap_instance->api->id;
+ msg.ap_name = _ap_instance->api->name;
msg.dif_name = difs;
- msg.n_dif_name = difs_size;
+ msg.n_dif_name = len;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -102,38 +219,48 @@ int ap_unreg(char * ap_name,
return ret;
}
-int flow_accept(int fd,
- char * ap_name,
- char * ae_name)
+int flow_accept(int fd,
+ char ** ap_name,
+ char ** ae_name)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int cli_fd = 0;
-
- if (ap_name == NULL) {
- return -EINVAL;
- }
+ int cfd = -1;
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.pid = _ap_instance->api->id;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_pid || !recv_msg->has_port_id) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- cli_fd = recv_msg->fd;
- ap_name = recv_msg->ap_name;
- ae_name = recv_msg->ae_name;
+
+ cfd = bmp_allocate(_ap_instance->fds);
+
+ _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid);
+ if (_ap_instance->flows[cfd].rb == NULL) {
+ bmp_release(_ap_instance->fds, cfd);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ _ap_instance->flows[cfd].port_id = recv_msg->port_id;
+ _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT;
+
+ *ap_name = strdup(recv_msg->ap_name);
+ if (ae_name != NULL)
+ *ae_name = strdup(recv_msg->ae_name);
irm_msg__free_unpacked(recv_msg, NULL);
- return cli_fd;
+
+ bmp_release(_ap_instance->fds, fd);
+
+ return cfd;
}
int flow_alloc_resp(int fd,
@@ -145,9 +272,9 @@ int flow_alloc_resp(int fd,
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;
msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.pid = _ap_instance->api->id;
+ msg.has_port_id = true;
+ msg.port_id = _ap_instance->flows[fd].port_id;
msg.has_response = true;
msg.response = response;
@@ -155,7 +282,7 @@ int flow_alloc_resp(int fd,
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -167,41 +294,49 @@ int flow_alloc_resp(int fd,
}
int flow_alloc(char * dst_name,
- char * src_ap_name,
char * src_ae_name,
- struct qos_spec * qos,
- int oflags)
+ struct qos_spec * qos)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int fd = 0;
+ int fd = -1;
- if (dst_name == NULL ||
- src_ap_name == NULL) {
+ if (dst_name == NULL)
return -EINVAL;
- }
if (src_ae_name == NULL)
src_ae_name = UNKNOWN_AE;
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
msg.dst_name = dst_name;
- msg.ap_name = src_ap_name;
+ msg.ap_name = _ap_instance->api->name;
+ msg.has_pid = true;
+ msg.pid = _ap_instance->api->id;
msg.ae_name = src_ae_name;
- msg.has_oflags = true;
- msg.oflags = oflags;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_pid || !recv_msg->has_port_id) {
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ fd = bmp_allocate(_ap_instance->fds);
+
+ _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid);
+ if (_ap_instance->flows[fd].rb == NULL) {
+ bmp_release(_ap_instance->fds, fd);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- fd = recv_msg->fd;
+ _ap_instance->flows[fd].port_id = recv_msg->port_id;
+ _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT;
+
irm_msg__free_unpacked(recv_msg, NULL);
+
return fd;
}
@@ -211,17 +346,15 @@ int flow_alloc_res(int fd)
irm_msg_t * recv_msg = NULL;
int result = 0;
- msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
+ msg.has_port_id = true;
+ msg.port_id = _ap_instance->flows[fd].port_id;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -238,17 +371,15 @@ int flow_dealloc(int fd)
irm_msg_t * recv_msg = NULL;
int ret = -1;
- msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
+ msg.has_port_id = true;
+ msg.port_id = _ap_instance->flows[fd].port_id;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -259,47 +390,50 @@ int flow_dealloc(int fd)
return ret;
}
-int flow_cntl(int fd, int oflags)
+int flow_cntl(int fd, int cmd, int oflags)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
- msg.oflags = oflags;
+ return -1;
+}
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
+ssize_t flow_write(int fd, void * buf, size_t count)
+{
+ /* the AP chooses the amount of headspace and tailspace */
+ size_t index = shm_create_du_buff(_ap_instance->dum,
+ count + DU_BUFF_HEADSPACE +
+ DU_BUFF_TAILSPACE,
+ DU_BUFF_HEADSPACE,
+ (uint8_t *) buf,
+ count);
+ struct rb_entry e = {index, _ap_instance->flows[fd].port_id};
+ if (index == -1)
return -1;
- if (recv_msg->has_result == false) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
+ if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
+ shm_release_du_buff(_ap_instance->dum, index);
+ return -EPIPE;
}
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
+ return 0;
}
-ssize_t flow_write(int fd,
- void * buf,
- size_t count)
+ssize_t flow_read(int fd, void * buf, size_t count)
{
- LOG_MISSING;
+ struct rb_entry * e = NULL;
+ int n;
+ uint8_t * sdu;
+ /* FIXME: move this to a thread */
+ while (e == NULL || e->port_id != _ap_instance->flows[fd].port_id)
+ e = shm_ap_rbuff_read(_ap_instance->rb);
+
+ n = shm_du_map_read_sdu(&sdu,
+ _ap_instance->dum,
+ e->index);
+ if (n < 0)
+ return -1;
- return -1;
-}
+ memcpy(buf, sdu, MIN(n, count));
-ssize_t flow_read(int fd,
- void * buf,
- size_t count)
-{
- LOG_MISSING;
+ shm_release_du_buff(_ap_instance->dum, e->index);
- return -1;
+ return n;
}
diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c
index 387572b3..75676915 100644
--- a/src/lib/ipcp.c
+++ b/src/lib/ipcp.c
@@ -121,6 +121,8 @@ pid_t ipcp_create(char * ipcp_name,
return pid;
}
+ /* clear fd table */
+
if (ipcp_type == IPCP_NORMAL)
exec_name = IPCP_NORMAL_EXEC;
else if (ipcp_type == IPCP_SHIM_UDP)
@@ -286,13 +288,8 @@ int ipcp_enroll(pid_t pid,
return -EINVAL;
msg.code = IPCP_MSG_CODE__IPCP_ENROLL;
- msg.member_name = malloc(sizeof(*(msg.member_name)));
- if (msg.member_name == NULL) {
- LOG_ERR("Failed to malloc.");
- return -1;
- }
- msg.n_1_dif = n_1_dif;
msg.member_name = member_name;
+ msg.n_1_dif = n_1_dif;
recv_msg = send_recv_ipcp_msg(pid, &msg);
if (recv_msg == NULL) {
@@ -323,8 +320,8 @@ int ipcp_name_reg(pid_t pid,
if (name == NULL)
return -1;
- msg.code = IPCP_MSG_CODE__IPCP_NAME_REG;
- msg.name = name;
+ msg.code = IPCP_MSG_CODE__IPCP_NAME_REG;
+ msg.name = name;
recv_msg = send_recv_ipcp_msg(pid, &msg);
if (recv_msg == NULL)
@@ -368,6 +365,7 @@ int ipcp_name_unreg(pid_t pid,
int ipcp_flow_alloc(pid_t pid,
uint32_t port_id,
+ pid_t n_pid,
char * dst_name,
char * src_ap_name,
char * src_ae_name,
@@ -381,17 +379,19 @@ int ipcp_flow_alloc(pid_t pid,
return -EINVAL;
msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC;
+ msg.has_port_id = true;
+ msg.port_id = port_id;
+ msg.has_pid = true;
+ msg.pid = n_pid;
msg.src_ap_name = src_ap_name;
msg.src_ae_name = src_ae_name;
msg.dst_name = dst_name;
- msg.port_id = port_id;
- msg.has_port_id = true;
recv_msg = send_recv_ipcp_msg(pid, &msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
ipcp_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -404,17 +404,20 @@ int ipcp_flow_alloc(pid_t pid,
int ipcp_flow_alloc_resp(pid_t pid,
uint32_t port_id,
- int result)
+ pid_t n_pid,
+ int response)
{
ipcp_msg_t msg = IPCP_MSG__INIT;
ipcp_msg_t * recv_msg = NULL;
int ret = -1;
- msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP;
- msg.has_port_id = true;
- msg.port_id = port_id;
- msg.has_result = true;
- msg.result = result;
+ msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP;
+ msg.has_port_id = true;
+ msg.port_id = port_id;
+ msg.has_pid = true;
+ msg.pid = n_pid;
+ msg.has_response = true;
+ msg.response = response;
recv_msg = send_recv_ipcp_msg(pid, &msg);
if (recv_msg == NULL)
@@ -431,38 +434,38 @@ int ipcp_flow_alloc_resp(pid_t pid,
return ret;
}
-int ipcp_flow_req_arr(pid_t pid,
- char * dst_name,
- char * src_ap_name,
- char * src_ae_name)
+int ipcp_flow_req_arr(pid_t pid,
+ char * dst_name,
+ char * src_ap_name,
+ char * src_ae_name)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int fd = -1;
+ int port_id = -1;
if (src_ap_name == NULL || src_ae_name == NULL)
return -EINVAL;
msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
+ msg.has_pid = true;
+ msg.pid = pid;
msg.dst_name = dst_name;
msg.ap_name = src_ap_name;
msg.ae_name = src_ae_name;
- msg.pid = pid;
- msg.has_pid = true;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_port_id) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- fd = recv_msg->fd;
+ port_id = recv_msg->port_id;
irm_msg__free_unpacked(recv_msg, NULL);
- return fd;
+ return port_id;
}
int ipcp_flow_alloc_reply(pid_t pid,
@@ -509,11 +512,11 @@ int ipcp_flow_dealloc(pid_t pid,
recv_msg = send_recv_ipcp_msg(pid, &msg);
if (recv_msg == NULL)
- return -1;
+ return 0;
if (recv_msg->has_result == false) {
ipcp_msg__free_unpacked(recv_msg, NULL);
- return -1;
+ return 0;
}
ret = recv_msg->result;
@@ -531,11 +534,11 @@ int ipcp_flow_dealloc(pid_t pid,
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
- return -1;
+ return 0;
if (recv_msg->has_result == false) {
irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
+ return 0;
}
ret = recv_msg->result;
diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto
index da4bb469..daca011d 100644
--- a/src/lib/ipcpd_messages.proto
+++ b/src/lib/ipcpd_messages.proto
@@ -25,6 +25,8 @@ message ipcp_msg {
optional string src_ap_name = 9;
optional string src_ae_name = 10;
optional dif_config_msg conf = 11;
- optional int32 result = 12;
- optional int32 fd = 13;
+ optional int32 fd = 12;
+ optional int32 pid = 13;
+ optional int32 response = 14;
+ optional int32 result = 15;
};
diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto
index 89e2c882..c336614e 100644
--- a/src/lib/irmd_messages.proto
+++ b/src/lib/irmd_messages.proto
@@ -36,13 +36,10 @@ enum irm_msg_code {
IRM_FLOW_ALLOC = 11;
IRM_FLOW_ALLOC_RES = 12;
IRM_FLOW_DEALLOC = 13;
- IRM_FLOW_CONTROL = 14;
- IRM_FLOW_WRITE = 15;
- IRM_FLOW_READ = 16;
- IPCP_FLOW_REQ_ARR = 17;
- IPCP_FLOW_ALLOC_REPLY = 18;
- IPCP_FLOW_DEALLOC = 19;
- IRM_REPLY = 20;
+ IPCP_FLOW_REQ_ARR = 14;
+ IPCP_FLOW_ALLOC_REPLY = 15;
+ IPCP_FLOW_DEALLOC = 16;
+ IRM_REPLY = 17;
};
message irm_msg {
@@ -52,12 +49,11 @@ message irm_msg {
optional uint32 api_id = 3;
optional uint32 ipcp_type = 5;
repeated string dif_name = 6;
- optional int32 fd = 7;
- optional int32 response = 8;
- optional int32 oflags = 9;
- optional string dst_name = 10;
- optional uint32 port_id = 11;
- optional int32 pid = 12;
- optional dif_config_msg conf = 13;
- optional int32 result = 14;
+ optional int32 response = 7;
+ optional string dst_name = 8;
+ optional uint32 port_id = 9;
+ optional int32 pid = 10;
+ optional dif_config_msg conf = 11;
+ optional int32 cfd = 12;
+ optional int32 result = 13;
};
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
new file mode 100644
index 00000000..4bd64775
--- /dev/null
+++ b/src/lib/shm_ap_rbuff.c
@@ -0,0 +1,268 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * Ring buffer for application processes
+ *
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <ouroboros/shm_ap_rbuff.h>
+#define OUROBOROS_PREFIX "shm_ap_rbuff"
+
+#include <ouroboros/logs.h>
+
+#include <pthread.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <errno.h>
+
+#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry) \
+ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t))
+
+#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail)\
+ & (SHM_RBUFF_SIZE - 1))
+#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE)
+
+struct shm_ap_rbuff {
+ struct rb_entry * shm_base; /* start of entry */
+ size_t * ptr_head; /* start of ringbuffer head */
+ size_t * ptr_tail; /* start of ringbuffer tail */
+ pthread_mutex_t * shm_mutex; /* lock all free space in shm */
+ pid_t pid; /* pid to which this rb belongs */
+ int fd;
+};
+
+struct shm_ap_rbuff * shm_ap_rbuff_create()
+{
+ struct shm_ap_rbuff * rb;
+ int shm_fd;
+ struct rb_entry * shm_base;
+ pthread_mutexattr_t attr;
+ char fn[25];
+
+ sprintf(fn, SHM_AP_RBUFF "%d", getpid());
+
+ rb = malloc(sizeof(*rb));
+ if (rb == NULL) {
+ LOG_DBGF("Could not allocate struct.");
+ return NULL;
+ }
+
+ shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666);
+ if (shm_fd == -1) {
+ LOG_DBGF("Failed creating ring buffer.");
+ free(rb);
+ return NULL;
+ }
+
+ if (lseek(shm_fd, SHM_RBUFF_FILE_SIZE - 1, SEEK_SET) < 0) {
+ LOG_DBGF("Failed to extend ringbuffer.");
+ free(rb);
+ return NULL;
+ }
+
+ if (write(shm_fd, "", 1) != 1) {
+ LOG_DBGF("Failed to finalise extension of ringbuffer.");
+ free(rb);
+ return NULL;
+ }
+
+ shm_base = mmap(NULL,
+ SHM_RBUFF_FILE_SIZE,
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED,
+ shm_fd,
+ 0);
+
+ if (shm_base == MAP_FAILED) {
+ LOG_DBGF("Failed to map shared memory.");
+ if (close(shm_fd) == -1)
+ LOG_DBGF("Failed to close invalid shm.");
+
+ if (shm_unlink(fn) == -1)
+ LOG_DBGF("Failed to remove invalid shm.");
+
+ free(rb);
+ return NULL;
+ }
+
+ rb->shm_base = shm_base;
+ rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
+ rb->ptr_tail = (size_t *)
+ ((uint8_t *) rb->ptr_head + sizeof(size_t));
+ rb->shm_mutex = (pthread_mutex_t *)
+ ((uint8_t *) rb->ptr_tail + sizeof(size_t));
+
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+ pthread_mutex_init(rb->shm_mutex, &attr);
+
+ *rb->ptr_head = 0;
+ *rb->ptr_tail = 0;
+
+ rb->fd = shm_fd;
+ rb->pid = getpid();
+
+ return rb;
+}
+
+struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t pid)
+{
+ struct shm_ap_rbuff * rb;
+ int shm_fd;
+ struct rb_entry * shm_base;
+ char fn[25];
+
+ sprintf(fn, SHM_AP_RBUFF "%d", pid);
+
+ rb = malloc(sizeof(*rb));
+ if (rb == NULL) {
+ LOG_DBGF("Could not allocate struct.");
+ return NULL;
+ }
+
+ shm_fd = shm_open(fn, O_RDWR, 0666);
+ if (shm_fd == -1) {
+ LOG_DBGF("Failed opening shared memory %s.", fn);
+ return NULL;
+ }
+
+ shm_base = mmap(NULL,
+ SHM_RBUFF_FILE_SIZE,
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED,
+ shm_fd,
+ 0);
+
+ if (shm_base == MAP_FAILED) {
+ LOG_DBGF("Failed to map shared memory.");
+ if (close(shm_fd) == -1)
+ LOG_DBGF("Failed to close invalid shm.");
+
+ if (shm_unlink(fn) == -1)
+ LOG_DBGF("Failed to remove invalid shm.");
+
+ free(rb);
+ return NULL;
+ }
+
+ rb->shm_base = shm_base;
+ rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
+ rb->ptr_tail = (size_t *)
+ ((uint8_t *) rb->ptr_head + sizeof(size_t));
+ rb->shm_mutex = (pthread_mutex_t *)
+ ((uint8_t *) rb->ptr_tail + sizeof(size_t));
+
+ rb->fd = shm_fd;
+ rb->pid = pid;
+
+ return rb;
+}
+void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
+{
+ char fn[25];
+
+ if (rb == NULL) {
+ LOG_DBGF("Bogus input. Bugging out.");
+ return;
+ }
+
+ sprintf(fn, SHM_AP_RBUFF "%d", rb->pid);
+
+ if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
+ LOG_DBGF("Couldn't unmap shared memory.");
+
+ free(rb);
+}
+
+void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
+{
+ char fn[25];
+
+
+ if (rb == NULL) {
+ LOG_DBGF("Bogus input. Bugging out.");
+ return;
+ }
+
+ if (rb->pid != getpid()) {
+ LOG_ERR("Tried to destroy other AP's rbuff.");
+ return;
+ }
+
+ sprintf(fn, SHM_AP_RBUFF "%d", rb->pid);
+
+ if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
+ LOG_DBGF("Couldn't unmap shared memory.");
+
+ if (shm_unlink(fn) == -1)
+ LOG_DBGF("Failed to unlink shm.");
+
+ free(rb);
+}
+
+int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
+{
+ struct rb_entry * pos;
+
+ if (rb == NULL || e == NULL)
+ return -1;
+
+ pthread_mutex_lock(rb->shm_mutex);
+
+ if (!shm_rbuff_free(rb)) {
+ pthread_mutex_unlock(rb->shm_mutex);
+ return -1;
+ }
+
+ pos = rb->shm_base + *rb->ptr_head;
+ *pos = *e;
+ *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1);
+
+ pthread_mutex_unlock(rb->shm_mutex);
+
+ return 0;
+}
+struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
+{
+ struct rb_entry * e = malloc(sizeof(*e));
+ if (e == NULL)
+ return NULL;
+
+ if (rb == NULL)
+ return NULL;
+
+ pthread_mutex_lock(rb->shm_mutex);
+
+ if (shm_rbuff_used(rb) == 0) {
+ pthread_mutex_unlock(rb->shm_mutex);
+ return NULL;
+ }
+
+ *e = *(rb->shm_base + *rb->ptr_tail);
+
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
+
+ pthread_mutex_unlock(rb->shm_mutex);
+
+ return e;
+}
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c
index dfccca6a..56062c9d 100644
--- a/src/lib/shm_du_map.c
+++ b/src/lib/shm_du_map.c
@@ -45,6 +45,9 @@
((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_tail * \
SHM_DU_BUFF_BLOCK_SIZE)))
+#define idx_to_du_buff_ptr(dum, idx) \
+ ((struct shm_du_buff *)(dum->shm_base + (idx * SHM_DU_BUFF_BLOCK_SIZE)))
+
#define block_ptr_to_idx(dum, sdb) \
(((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE)
@@ -52,27 +55,31 @@
& (SHM_BLOCKS_IN_MAP - 1))
#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP)
+#define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \
+ idx_to_du_buff_ptr(dum, idx)->du_head)
+
#define MIN(a,b)(a < b ? a : b)
struct shm_du_buff {
- size_t size;
- size_t du_head;
- size_t du_tail;
+ size_t size;
+ size_t du_head;
+ size_t du_tail;
+ size_t garbage;
};
struct shm_du_map {
- uint8_t * shm_base; /* start of blocks */
- size_t * ptr_head; /* start of ringbuffer head */
- size_t * ptr_tail; /* start of ringbuffer tail */
- pthread_mutex_t * shm_mutex; /* lock all free space in shm */
- int fd;
+ uint8_t * shm_base; /* start of blocks */
+ size_t * ptr_head; /* start of ringbuffer head */
+ size_t * ptr_tail; /* start of ringbuffer tail */
+ pthread_mutex_t * shm_mutex; /* lock all free space in shm */
+ int fd;
};
struct shm_du_map * shm_du_map_create()
{
struct shm_du_map * dum;
int shm_fd;
- uint8_t * shm_base;
+ uint8_t * shm_base;
pthread_mutexattr_t attr;
dum = malloc(sizeof *dum);
@@ -141,7 +148,13 @@ struct shm_du_map * shm_du_map_open()
{
struct shm_du_map * dum;
int shm_fd;
- uint8_t * shm_base;
+ uint8_t * shm_base;
+
+ dum = malloc(sizeof *dum);
+ if (dum == NULL) {
+ LOG_DBGF("Could not allocate struct.");
+ return NULL;
+ }
shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_RDWR, 0666);
if (shm_fd == -1) {
@@ -166,12 +179,6 @@ struct shm_du_map * shm_du_map_open()
return NULL;
}
- dum = malloc(sizeof *dum);
- if (dum == NULL) {
- LOG_DBGF("Could not allocate struct.");
- return NULL;
- }
-
dum->shm_base = shm_base;
dum->ptr_head = (size_t *)
((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
@@ -195,38 +202,52 @@ void shm_du_map_close(struct shm_du_map * dum)
if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1)
LOG_DBGF("Couldn't unmap shared memory.");
+ free(dum);
+}
+
+void shm_du_map_destroy(struct shm_du_map * dum)
+{
+ if (dum == NULL) {
+ LOG_DBGF("Bogus input. Bugging out.");
+ return;
+ }
+
+ if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1)
+ LOG_DBGF("Couldn't unmap shared memory.");
+
if (shm_unlink(SHM_DU_MAP_FILENAME) == -1)
LOG_DBGF("Failed to unlink shm.");
free(dum);
}
-struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum,
- size_t size,
- size_t headspace,
- uint8_t * data,
- size_t len)
+int shm_create_du_buff(struct shm_du_map * dum,
+ size_t size,
+ size_t headspace,
+ uint8_t * data,
+ size_t len)
{
struct shm_du_buff * sdb;
long blocks = 0;
int sz = size + sizeof *sdb;
int sz2 = headspace + len + sizeof *sdb;
- uint8_t * write_pos;
+ uint8_t * write_pos;
size_t copy_len;
+ size_t index;
if (dum == NULL || data == NULL) {
LOG_DBGF("Bogus input, bugging out.");
- return NULL;
+ return -1;
}
if (headspace >= size) {
LOG_DBGF("Index out of bounds.");
- return NULL;
+ return -1;
}
if (headspace + len > size) {
LOG_DBGF("Buffer too small for data.");
- return NULL;
+ return -1;
}
pthread_mutex_lock(dum->shm_mutex);
@@ -237,20 +258,20 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum,
if (sz2 < 0 && sz > 0) {
pthread_mutex_unlock(dum->shm_mutex);
LOG_DBG("Can't handle this packet now");
- return NULL;
+ return -1;
}
++blocks;
}
if (!shm_map_free(dum, blocks)) {
pthread_mutex_unlock(dum->shm_mutex);
- LOG_DBGF("Allocation failed, Out of Memory.");
- return NULL;
+ return -1;
}
sdb = get_head_ptr(dum);
sdb->size = size;
+ sdb->garbage = 0;
sdb->du_head = headspace;
sdb->du_tail = sdb->du_head + len;
@@ -267,32 +288,76 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum,
--blocks;
}
+ index = *dum->ptr_head - 1;
+
pthread_mutex_unlock(dum->shm_mutex);
- return sdb;
+ return index;
}
-int shm_release_du_buff(struct shm_du_map * dum)
+/* FIXME: this cannot handle packets stretching beyond the ringbuffer border */
+int shm_du_map_read_sdu(uint8_t ** dst,
+ struct shm_du_map * dum,
+ size_t idx)
+{
+ size_t len = 0;
+
+ if (idx > SHM_BLOCKS_IN_MAP)
+ return -1;
+
+ pthread_mutex_lock(dum->shm_mutex);
+
+ if (*dum->ptr_head == *dum->ptr_tail) {
+ pthread_mutex_unlock(dum->shm_mutex);
+ return -1;
+ }
+
+ *dst = ((uint8_t *) idx_to_du_buff_ptr(dum, idx)) +
+ sizeof(struct shm_du_buff) +
+ idx_to_du_buff_ptr(dum, idx)->du_head;
+ len = sdu_size(dum, idx);
+
+ pthread_mutex_unlock(dum->shm_mutex);
+
+ return len;
+}
+
+int shm_release_du_buff(struct shm_du_map * dum, size_t idx)
{
long sz;
long blocks = 0;
+
+ /* FIXME: this is crap for the test */
+ if (idx > SHM_BLOCKS_IN_MAP)
+ idx = *dum->ptr_tail;
+
pthread_mutex_lock(dum->shm_mutex);
if (*dum->ptr_head == *dum->ptr_tail) {
- LOG_DBGF("Attempt to free empty ringbuffer. Nothing to do.");
pthread_mutex_unlock(dum->shm_mutex);
return -1;
}
- sz = get_tail_ptr(dum)->size;
+ idx_to_du_buff_ptr(dum, idx)->garbage = 1;
- while (sz + (long) sizeof (struct shm_du_buff) > 0) {
- sz -= SHM_DU_BUFF_BLOCK_SIZE;
- ++blocks;
+ if (idx != *dum->ptr_tail) {
+ pthread_mutex_unlock(dum->shm_mutex);
+ return 0;
+ }
+
+ while (get_tail_ptr(dum)->garbage == 1) {
+ sz = get_tail_ptr(dum)->size;
+
+ while (sz + (long) sizeof (struct shm_du_buff) > 0) {
+ sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ ++blocks;
+ }
+
+ *(dum->ptr_tail) =
+ (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);
}
- *(dum->ptr_tail) = (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);
pthread_mutex_unlock(dum->shm_mutex);
return 0;
@@ -317,7 +382,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb,
}
uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb,
- size_t size)
+ size_t size)
{
if (sdb == NULL) {
LOG_DBGF("Bogus input, bugging out.");
@@ -335,7 +400,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb,
}
int shm_du_buff_head_release(struct shm_du_buff * sdb,
- size_t size)
+ size_t size)
{
if (sdb == NULL) {
LOG_DBGF("Bogus input, bugging out.");
@@ -353,7 +418,7 @@ int shm_du_buff_head_release(struct shm_du_buff * sdb,
}
int shm_du_buff_tail_release(struct shm_du_buff * sdb,
- size_t size)
+ size_t size)
{
if (sdb == NULL) {
LOG_DBGF("Bogus input, bugging out.");
diff --git a/src/lib/tests/shm_du_map_test.c b/src/lib/tests/shm_du_map_test.c
index 85a82e4d..55938a62 100644
--- a/src/lib/tests/shm_du_map_test.c
+++ b/src/lib/tests/shm_du_map_test.c
@@ -32,7 +32,7 @@
#include <ouroboros/logs.h>
-#define SIZE_OF_DU_BUFF 24
+#define SIZE_OF_DU_BUFF 32
#define TEST_BUFF_SIZE (SHM_DU_BUFF_BLOCK_SIZE - SIZE_OF_DU_BUFF)
#define MAX(a,b) (a > b ? a : b)
@@ -44,7 +44,7 @@ void * produce()
{
struct shm_du_map * dum;
long test_buf_size = 0;
- uint8_t * test_values;
+ uint8_t * test_values;
int headspace;
int tailspace;
long i;
@@ -66,9 +66,8 @@ void * produce()
test_values[i] = 170;
clock_gettime(CLOCK_MONOTONIC, &starttime);
- for (i = 1; i < SHM_BLOCKS_IN_MAP; i++) {
- struct shm_du_buff * sdb;
- size_t len;
+ for (i = 1; i < 16 * SHM_BLOCKS_IN_MAP; i++) {
+ size_t len;
test_buf_size = TEST_BUFF_SIZE;
@@ -77,21 +76,19 @@ void * produce()
len = test_buf_size - (headspace + tailspace);
- sdb = shm_create_du_buff(dum,
- test_buf_size,
- headspace,
- test_values,
- len);
-
- if (sdb != NULL) {
- bytes_written += len;
- }
- else {
- sync = -2;
- break;
+ if (shm_create_du_buff(dum,
+ test_buf_size,
+ headspace,
+ test_values,
+ len) < 0) {
+ continue;
}
+
+ bytes_written += len;
}
+ sync = -2;
+
clock_gettime(CLOCK_MONOTONIC, &stoptime);
elapsed =(stoptime.tv_sec + stoptime.tv_nsec / 1000000000.0) -
(starttime.tv_sec + starttime.tv_nsec / 1000000000.0);
@@ -104,13 +101,14 @@ void * produce()
sync = -1;
+ shm_du_map_close(dum);
+
return 0;
}
void * consume()
{
struct shm_du_map * dum;
-
struct timespec ts;
ts.tv_sec = 0;
@@ -123,10 +121,15 @@ void * consume()
return (void *)-1;
}
- while (!sync) {
- while (!shm_release_du_buff(dum));
- nanosleep(&ts, NULL);
+ while (true) {
+ shm_release_du_buff(dum, 1823429173941);
+ if (sync)
+ break;
}
+ nanosleep(&ts, NULL);
+
+
+ shm_du_map_close(dum);
return 0;
}
@@ -149,7 +152,7 @@ int shm_du_map_test(int argc, char ** argv)
return -1;
}
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
LOG_INFO("done.");
@@ -165,7 +168,7 @@ int shm_du_map_test(int argc, char ** argv)
pthread_create(&consumer, NULL, consume, NULL);
pthread_join(consumer, NULL);
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
LOG_INFO("done.");
@@ -173,6 +176,8 @@ int shm_du_map_test(int argc, char ** argv)
LOG_INFO("starting concurrency test.");
+ sync = 0;
+
dum = shm_du_map_create();
res1 = (int) pthread_create(&producer, NULL, produce, NULL);
@@ -181,7 +186,7 @@ int shm_du_map_test(int argc, char ** argv)
pthread_join(producer, NULL);
pthread_join(consumer, NULL);
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
LOG_INFO("done.");