summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-08-29 19:49:39 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-08-29 20:32:54 +0200
commit2cc89f6da424ab503af563e0cc92dda43b8f8432 (patch)
tree303d3d61717d4d3018b8025a9825ff799da01c08 /src
parentcaeefb4d96331d24b38e845c99d0517913a71671 (diff)
downloadouroboros-2cc89f6da424ab503af563e0cc92dda43b8f8432.tar.gz
ouroboros-2cc89f6da424ab503af563e0cc92dda43b8f8432.zip
lib: Refactor shm_du_map to shm_rdrbuff
The shm_du_map is renamed to shm_rdrbuff to reflect the Random Deletion Ringbuffer used in the implementation. The close_on_exit call is removed and SDUs are cleaned up by the application in the ap_fini() call. This required a non-blocking peek() operation in the shm_ap_rbuff. Some initial implementation for future support of qos cubes has been added to the shm_rdrbuff.
Diffstat (limited to 'src')
-rw-r--r--src/ipcpd/ipcp-data.c1
-rw-r--r--src/ipcpd/local/main.c19
-rw-r--r--src/ipcpd/normal/main.c20
-rw-r--r--src/ipcpd/shim-eth-llc/main.c63
-rw-r--r--src/ipcpd/shim-udp/main.c29
-rw-r--r--src/irmd/main.c48
-rw-r--r--src/lib/CMakeLists.txt2
-rw-r--r--src/lib/dev.c34
-rw-r--r--src/lib/shm_ap_rbuff.c28
-rw-r--r--src/lib/shm_du_map.c767
-rw-r--r--src/lib/shm_rdrbuff.c804
11 files changed, 950 insertions, 865 deletions
diff --git a/src/ipcpd/ipcp-data.c b/src/ipcpd/ipcp-data.c
index 593baeba..c4838d3a 100644
--- a/src/ipcpd/ipcp-data.c
+++ b/src/ipcpd/ipcp-data.c
@@ -22,7 +22,6 @@
*/
#include <ouroboros/config.h>
-#include <ouroboros/shm_du_map.h>
#include <ouroboros/list.h>
#define OUROBOROS_PREFIX "ipcp-utils"
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 547e7e28..4fa7e33f 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -24,7 +24,7 @@
#include "ipcp.h"
#include "flow.h"
#include <ouroboros/errno.h>
-#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
@@ -67,7 +67,7 @@ struct ipcp * _ipcp;
/* the shim needs access to these internals */
struct shim_ap_data {
pid_t api;
- struct shm_du_map * dum;
+ struct shm_rdrbuff * rdrb;
struct bmp * fds;
struct shm_ap_rbuff * rb;
@@ -98,8 +98,8 @@ static int shim_ap_init()
return -1;
}
- _ap_instance->dum = shm_du_map_open();
- if (_ap_instance->dum == NULL) {
+ _ap_instance->rdrb = shm_rdrbuff_open();
+ if (_ap_instance->rdrb == NULL) {
bmp_destroy(_ap_instance->fds);
free(_ap_instance);
return -1;
@@ -107,7 +107,7 @@ static int shim_ap_init()
_ap_instance->rb = shm_ap_rbuff_create();
if (_ap_instance->rb == NULL) {
- shm_du_map_close(_ap_instance->dum);
+ shm_rdrbuff_close(_ap_instance->rdrb);
bmp_destroy(_ap_instance->fds);
free(_ap_instance);
return -1;
@@ -139,8 +139,13 @@ void shim_ap_fini()
if (_ap_instance->fds != NULL)
bmp_destroy(_ap_instance->fds);
- if (_ap_instance->dum != NULL)
- shm_du_map_close_on_exit(_ap_instance->dum);
+
+ /* remove all remaining sdus */
+ while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0)
+ shm_rdrbuff_remove(_ap_instance->rdrb, i);
+
+ if (_ap_instance->rdrb != NULL)
+ shm_rdrbuff_close(_ap_instance->rdrb);
if (_ap_instance->rb != NULL)
shm_ap_rbuff_destroy(_ap_instance->rb);
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index 335330ae..cf4ae3f1 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -24,7 +24,7 @@
#include <ouroboros/config.h>
#include <ouroboros/logs.h>
-#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/dev.h>
#include <ouroboros/ipcp.h>
@@ -55,7 +55,7 @@ struct normal_ipcp_data {
/* Keep ipcp_data first for polymorphism. */
struct ipcp_data ipcp_data;
- struct shm_du_map * dum;
+ struct shm_rdrbuff * rdrb;
struct shm_ap_rbuff * rb;
pthread_t mainloop;
@@ -206,15 +206,15 @@ struct normal_ipcp_data * normal_ipcp_data_create()
return NULL;
}
- normal_data->dum = shm_du_map_open();
- if (normal_data->dum == NULL) {
+ normal_data->rdrb = shm_rdrbuff_open();
+ if (normal_data->rdrb == NULL) {
free(normal_data);
return NULL;
}
normal_data->rb = shm_ap_rbuff_open(getpid());
if (normal_data->rb == NULL) {
- shm_du_map_close(normal_data->dum);
+ shm_rdrbuff_close(normal_data->rdrb);
free(normal_data);
return NULL;
}
@@ -225,6 +225,8 @@ struct normal_ipcp_data * normal_ipcp_data_create()
void normal_ipcp_data_destroy()
{
+ int idx = 0;
+
if (_ipcp == NULL)
return;
@@ -233,8 +235,12 @@ void normal_ipcp_data_destroy()
if (ipcp_get_state(_ipcp) != IPCP_SHUTDOWN)
LOG_WARN("Cleaning up while not in shutdown.");
- if (normal_data(_ipcp)->dum != NULL)
- shm_du_map_close_on_exit(normal_data(_ipcp)->dum);
+ /* remove all remaining sdus */
+ while ((idx = shm_ap_rbuff_peek_idx(normal_data(_ipcp)->rb)) >= 0)
+ shm_rdrbuff_remove(normal_data(_ipcp)->rdrb, idx);
+
+ if (normal_data(_ipcp)->rdrb != NULL)
+ shm_rdrbuff_close(normal_data(_ipcp)->rdrb);
if (normal_data(_ipcp)->rb != NULL)
shm_ap_rbuff_close(normal_data(_ipcp)->rb);
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index 608b0029..d1100001 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -27,7 +27,7 @@
#include "ipcp.h"
#include "flow.h"
#include <ouroboros/errno.h>
-#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
@@ -122,7 +122,7 @@ struct eth_llc_ipcp_data {
struct bmp * indices;
struct bmp * saps;
- struct shm_du_map * dum;
+ struct shm_rdrbuff * rdrb;
struct shm_ap_rbuff * rb;
uint8_t * rx_ring;
@@ -155,15 +155,15 @@ struct eth_llc_ipcp_data * eth_llc_ipcp_data_create()
return NULL;
}
- eth_llc_data->dum = shm_du_map_open();
- if (eth_llc_data->dum == NULL) {
+ eth_llc_data->rdrb = shm_rdrbuff_open();
+ if (eth_llc_data->rdrb == NULL) {
free(eth_llc_data);
return NULL;
}
eth_llc_data->rb = shm_ap_rbuff_create();
if (eth_llc_data->rb == NULL) {
- shm_du_map_close(eth_llc_data->dum);
+ shm_rdrbuff_close(eth_llc_data->rdrb);
free(eth_llc_data);
return NULL;
}
@@ -171,7 +171,7 @@ struct eth_llc_ipcp_data * eth_llc_ipcp_data_create()
eth_llc_data->indices = bmp_create(AP_MAX_FLOWS, 0);
if (eth_llc_data->indices == NULL) {
shm_ap_rbuff_destroy(eth_llc_data->rb);
- shm_du_map_close(eth_llc_data->dum);
+ shm_rdrbuff_close(eth_llc_data->rdrb);
free(eth_llc_data);
return NULL;
}
@@ -180,7 +180,7 @@ struct eth_llc_ipcp_data * eth_llc_ipcp_data_create()
if (eth_llc_data->indices == NULL) {
bmp_destroy(eth_llc_data->indices);
shm_ap_rbuff_destroy(eth_llc_data->rb);
- shm_du_map_close(eth_llc_data->dum);
+ shm_rdrbuff_close(eth_llc_data->rdrb);
free(eth_llc_data);
return NULL;
}
@@ -202,8 +202,12 @@ void eth_llc_ipcp_data_destroy()
if (ipcp_get_state(_ipcp) != IPCP_SHUTDOWN)
LOG_WARN("Cleaning up while not in shutdown.");
- if (shim_data(_ipcp)->dum != NULL)
- shm_du_map_close_on_exit(shim_data(_ipcp)->dum);
+ /* remove all remaining sdus */
+ while ((i = shm_ap_rbuff_peek_idx(shim_data(_ipcp)->rb)) >= 0)
+ shm_rdrbuff_remove(shim_data(_ipcp)->rdrb, i);
+
+ if (shim_data(_ipcp)->rdrb != NULL)
+ shm_rdrbuff_close(shim_data(_ipcp)->rdrb);
if (shim_data(_ipcp)->rb != NULL)
shm_ap_rbuff_destroy(shim_data(_ipcp)->rb);
if (shim_data(_ipcp)->indices != NULL)
@@ -332,7 +336,7 @@ static int eth_llc_ipcp_send_frame(uint8_t dst_addr[MAC_SIZE],
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
header = (void *) shim_data(_ipcp)->tx_ring +
- (shim_data(_ipcp)->tx_offset * SHM_DU_BUFF_BLOCK_SIZE);
+ (shim_data(_ipcp)->tx_offset * SHM_RDRB_BLOCK_SIZE);
while (header->tp_status != TP_STATUS_AVAILABLE) {
pfd.fd = fd;
@@ -345,7 +349,7 @@ static int eth_llc_ipcp_send_frame(uint8_t dst_addr[MAC_SIZE],
}
header = (void *) shim_data(_ipcp)->tx_ring +
- (shim_data(_ipcp)->tx_offset * SHM_DU_BUFF_BLOCK_SIZE);
+ (shim_data(_ipcp)->tx_offset * SHM_RDRB_BLOCK_SIZE);
}
frame = (void *) header + TPACKET_HDRLEN - sizeof(struct sockaddr_ll);
@@ -671,7 +675,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
while (true) {
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
header = (void *) shim_data(_ipcp)->rx_ring +
- (offset * SHM_DU_BUFF_BLOCK_SIZE);
+ (offset * SHM_RDRB_BLOCK_SIZE);
while (!(header->tp_status & TP_STATUS_USER)) {
pfd.fd = shim_data(_ipcp)->s_fd;
pfd.revents = 0;
@@ -683,7 +687,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
}
header = (void *) shim_data(_ipcp)->rx_ring +
- (offset * SHM_DU_BUFF_BLOCK_SIZE);
+ (offset * SHM_RDRB_BLOCK_SIZE);
}
buf = (void * ) header + header->tp_mac;
@@ -740,7 +744,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
}
while ((index =
- shm_du_map_write(shim_data(_ipcp)->dum,
+ shm_rdrbuff_write(shim_data(_ipcp)->rdrb,
ipcp_flow(i)->api,
0,
0,
@@ -782,8 +786,8 @@ static void * eth_llc_ipcp_sdu_writer(void * o)
pthread_rwlock_rdlock(&_ipcp->state_lock);
- len = shm_du_map_read((uint8_t **) &buf,
- shim_data(_ipcp)->dum,
+ len = shm_rdrbuff_read((uint8_t **) &buf,
+ shim_data(_ipcp)->rdrb,
e->index);
if (len <= 0) {
free(e);
@@ -808,8 +812,8 @@ static void * eth_llc_ipcp_sdu_writer(void * o)
pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- if (shim_data(_ipcp)->dum != NULL)
- shm_du_map_remove(shim_data(_ipcp)->dum, e->index);
+ if (shim_data(_ipcp)->rdrb != NULL)
+ shm_rdrbuff_remove(shim_data(_ipcp)->rdrb, e->index);
pthread_rwlock_unlock(&_ipcp->state_lock);
@@ -849,7 +853,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
{
int fd = -1;
struct ifreq ifr;
- int index;
+ int idx;
#ifdef __FreeBSD__
struct ifaddrs * ifaddr;
struct ifaddrs * ifa;
@@ -892,7 +896,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
return -1;
}
- for (ifa = ifaddr, index = 0; ifa != NULL; ifa = ifa->ifa_next, ++index) {
+ for (ifa = ifaddr, idx = 0; ifa != NULL; ifa = ifa->ifa_next, ++idx) {
if (strcmp(ifa->ifa_name, conf->if_name))
continue;
LOG_DBGF("Interface %s found.", conf->if_name);
@@ -916,8 +920,8 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
return -1;
}
- index = if_nametoindex(conf->if_name);
- if (index == 0) {
+ idx = if_nametoindex(conf->if_name);
+ if (idx == 0) {
LOG_ERR("Failed to retrieve interface index.");
return -1;
}
@@ -927,7 +931,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
memset(&(device), 0, sizeof(device));
#ifdef __FreeBSD__
- device.sdl_index = index;
+ device.sdl_index = idx;
device.sdl_family = AF_LINK;
memcpy(LLADDR(&device),
ifr.ifr_addr.sa_data,
@@ -937,7 +941,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
LOG_MISSING;
fd = socket(AF_LINK, SOCK_RAW, 0);
#else
- device.sll_ifindex = index;
+ device.sll_ifindex = idx;
device.sll_family = AF_PACKET;
memcpy(device.sll_addr,
ifr.ifr_hwaddr.sa_data,
@@ -953,14 +957,14 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
}
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
- if (SHIM_ETH_LLC_MAX_SDU_SIZE > SHM_DU_BUFF_BLOCK_SIZE) {
+ if (SHIM_ETH_LLC_MAX_SDU_SIZE > SHM_RDRB_BLOCK_SIZE) {
LOG_ERR("Max SDU size is bigger than DU map block size.");
close(fd);
return -1;
}
- req.tp_block_size = SHM_DU_BUFF_BLOCK_SIZE;
- req.tp_frame_size = SHM_DU_BUFF_BLOCK_SIZE;
+ req.tp_block_size = SHM_RDRB_BLOCK_SIZE;
+ req.tp_frame_size = SHM_RDRB_BLOCK_SIZE;
req.tp_block_nr = SHM_BUFFER_SIZE;
req.tp_frame_nr = SHM_BUFFER_SIZE;
@@ -987,7 +991,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
shim_data(_ipcp)->rx_ring = mmap(NULL,
- 2 * SHM_DU_BUFF_BLOCK_SIZE
+ 2 * SHM_RDRB_BLOCK_SIZE
* SHM_BUFFER_SIZE,
PROT_READ | PROT_WRITE, MAP_SHARED,
fd, 0);
@@ -997,10 +1001,9 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
return -1;
}
shim_data(_ipcp)->tx_ring = shim_data(_ipcp)->rx_ring
- + (SHM_DU_BUFF_BLOCK_SIZE * SHM_BUFFER_SIZE);
+ + (SHM_RDRB_BLOCK_SIZE * SHM_BUFFER_SIZE);
#endif
-
pthread_rwlock_wrlock(&_ipcp->state_lock);
if (ipcp_get_state(_ipcp) != IPCP_INIT) {
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 3f9b20f1..451a2a4c 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -24,7 +24,7 @@
#include "ipcp.h"
#include "flow.h"
#include "shim_udp_config.h"
-#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
@@ -87,7 +87,7 @@ struct ipcp * _ipcp;
/* the shim needs access to these internals */
struct shim_ap_data {
pid_t api;
- struct shm_du_map * dum;
+ struct shm_rdrbuff * rdrb;
struct bmp * fds;
struct shm_ap_rbuff * rb;
@@ -121,8 +121,8 @@ static int shim_ap_init()
return -1;
}
- _ap_instance->dum = shm_du_map_open();
- if (_ap_instance->dum == NULL) {
+ _ap_instance->rdrb = shm_rdrbuff_open();
+ if (_ap_instance->rdrb == NULL) {
bmp_destroy(_ap_instance->fds);
free(_ap_instance);
return -1;
@@ -130,7 +130,7 @@ static int shim_ap_init()
_ap_instance->rb = shm_ap_rbuff_create();
if (_ap_instance->rb == NULL) {
- shm_du_map_close(_ap_instance->dum);
+ shm_rdrbuff_close(_ap_instance->rdrb);
bmp_destroy(_ap_instance->fds);
free(_ap_instance);
return -1;
@@ -163,8 +163,13 @@ void shim_ap_fini()
if (_ap_instance->fds != NULL)
bmp_destroy(_ap_instance->fds);
- if (_ap_instance->dum != NULL)
- shm_du_map_close_on_exit(_ap_instance->dum);
+
+ /* remove all remaining sdus */
+ while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0)
+ shm_rdrbuff_remove(_ap_instance->rdrb, i);
+
+ if (_ap_instance->rdrb != NULL)
+ shm_rdrbuff_close(_ap_instance->rdrb);
if (_ap_instance->rb != NULL)
shm_ap_rbuff_destroy(_ap_instance->rb);
@@ -202,7 +207,7 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
pthread_rwlock_rdlock(&_ipcp->state_lock);
pthread_rwlock_rdlock(&_ap_instance->flows_lock);
- index = shm_du_map_write_b(_ap_instance->dum,
+ index = shm_rdrbuff_write_b(_ap_instance->rdrb,
_ap_instance->flows[fd].api,
0,
0,
@@ -745,8 +750,8 @@ static void * ipcp_udp_sdu_loop(void * o)
pthread_rwlock_rdlock(&_ipcp->state_lock);
- len = shm_du_map_read((uint8_t **) &buf,
- _ap_instance->dum,
+ len = shm_rdrbuff_read((uint8_t **) &buf,
+ _ap_instance->rdrb,
e->index);
if (len <= 0) {
pthread_rwlock_unlock(&_ipcp->state_lock);
@@ -771,8 +776,8 @@ static void * ipcp_udp_sdu_loop(void * o)
pthread_rwlock_rdlock(&_ipcp->state_lock);
- if (_ap_instance->dum != NULL)
- shm_du_map_remove(_ap_instance->dum, e->index);
+ if (_ap_instance->rdrb != NULL)
+ shm_rdrbuff_remove(_ap_instance->rdrb, e->index);
pthread_rwlock_unlock(&_ipcp->state_lock);
diff --git a/src/irmd/main.c b/src/irmd/main.c
index cd939360..29f6d9d0 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -34,7 +34,7 @@
#include <ouroboros/irm_config.h>
#include <ouroboros/lockfile.h>
#include <ouroboros/shm_ap_rbuff.h>
-#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/flow.h>
#include <ouroboros/qos.h>
@@ -75,31 +75,31 @@ enum irm_state {
};
struct irm {
- struct list_head registry;
+ struct list_head registry;
- struct list_head ipcps;
+ struct list_head ipcps;
- struct list_head api_table;
- struct list_head apn_table;
- struct list_head spawned_apis;
- pthread_rwlock_t reg_lock;
+ struct list_head api_table;
+ struct list_head apn_table;
+ struct list_head spawned_apis;
+ pthread_rwlock_t reg_lock;
/* keep track of all flows in this processing system */
- struct bmp * port_ids;
+ struct bmp * port_ids;
/* maps port_ids to api pair */
- struct list_head irm_flows;
- pthread_rwlock_t flows_lock;
+ struct list_head irm_flows;
+ pthread_rwlock_t flows_lock;
- struct lockfile * lf;
- struct shm_du_map * dum;
- pthread_t * threadpool;
- int sockfd;
+ struct lockfile * lf;
+ struct shm_rdrbuff * rdrb;
+ pthread_t * threadpool;
+ int sockfd;
- enum irm_state state;
- pthread_rwlock_t state_lock;
+ enum irm_state state;
+ pthread_rwlock_t state_lock;
- pthread_t irm_sanitize;
- pthread_t shm_sanitize;
+ pthread_t irm_sanitize;
+ pthread_t shm_sanitize;
} * irmd = NULL;
static struct irm_flow * get_irm_flow(int port_id)
@@ -1604,8 +1604,8 @@ static void irm_destroy()
pthread_rwlock_unlock(&irmd->flows_lock);
- if (irmd->dum != NULL)
- shm_du_map_destroy(irmd->dum);
+ if (irmd->rdrb != NULL)
+ shm_rdrbuff_destroy(irmd->rdrb);
if (irmd->lf != NULL)
lockfile_destroy(irmd->lf);
@@ -2072,7 +2072,8 @@ static int irm_create()
if (kill(lockfile_owner(irmd->lf), 0) < 0) {
LOG_INFO("IRMd didn't properly shut down last time.");
- shm_du_map_destroy(shm_du_map_open());
+ /* FIXME: do this for each QOS_CUBE in the system */
+ shm_rdrbuff_destroy(shm_rdrbuff_open(QOS_CUBE_BE));
LOG_INFO("Stale resources cleaned");
lockfile_destroy(irmd->lf);
irmd->lf = lockfile_create();
@@ -2090,7 +2091,8 @@ static int irm_create()
return -1;
}
- if ((irmd->dum = shm_du_map_create()) == NULL) {
+ /* FIXME: create an rdrb for each QOS_CUBE in the system */
+ if ((irmd->rdrb = shm_rdrbuff_create(QOS_CUBE_BE)) == NULL) {
irm_destroy();
return -1;
}
@@ -2201,7 +2203,7 @@ int main(int argc, char ** argv)
pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL);
pthread_create(&irmd->shm_sanitize, NULL,
- shm_du_map_sanitize, irmd->dum);
+ shm_rdrbuff_sanitize, irmd->rdrb);
/* wait for (all of them) to return */
for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 5e16c7e2..8c058dd8 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -33,7 +33,7 @@ set(SOURCE_FILES
logs.c
nsm.c
shm_ap_rbuff.c
- shm_du_map.c
+ shm_rdrbuff.c
sockets.c
time_utils.c
utils.c
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 3a5fc8e0..17c473ed 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -25,7 +25,7 @@
#include <ouroboros/dev.h>
#include <ouroboros/sockets.h>
#include <ouroboros/bitmap.h>
-#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/utils.h>
@@ -45,7 +45,7 @@ struct flow {
struct ap_data {
char * ap_name;
pid_t api;
- struct shm_du_map * dum;
+ struct shm_rdrbuff * rdrb;
struct bmp * fds;
struct shm_ap_rbuff * rb;
pthread_rwlock_t data_lock;
@@ -105,8 +105,8 @@ int ap_init(char * ap_name)
return -ENOMEM;
}
- _ap_instance->dum = shm_du_map_open();
- if (_ap_instance->dum == NULL) {
+ _ap_instance->rdrb = shm_rdrbuff_open();
+ if (_ap_instance->rdrb == NULL) {
bmp_destroy(_ap_instance->fds);
free(_ap_instance);
return -1;
@@ -114,7 +114,7 @@ int ap_init(char * ap_name)
_ap_instance->rb = shm_ap_rbuff_create();
if (_ap_instance->rb == NULL) {
- shm_du_map_close(_ap_instance->dum);
+ shm_rdrbuff_close(_ap_instance->rdrb);
bmp_destroy(_ap_instance->fds);
free(_ap_instance);
return -1;
@@ -146,12 +146,16 @@ void ap_fini(void)
pthread_rwlock_wrlock(&_ap_instance->data_lock);
+ /* remove all remaining sdus */
+ while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0)
+ shm_rdrbuff_remove(_ap_instance->rdrb, i);
+
if (_ap_instance->fds != NULL)
bmp_destroy(_ap_instance->fds);
if (_ap_instance->rb != NULL)
shm_ap_rbuff_destroy(_ap_instance->rb);
- if (_ap_instance->dum != NULL)
- shm_du_map_close_on_exit(_ap_instance->dum);
+ if (_ap_instance->rdrb != NULL)
+ shm_rdrbuff_close(_ap_instance->rdrb);
pthread_rwlock_rdlock(&_ap_instance->flows_lock);
@@ -515,7 +519,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)
}
if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
- idx = shm_du_map_write(_ap_instance->dum,
+ idx = shm_rdrbuff_write(_ap_instance->rdrb,
_ap_instance->flows[fd].api,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
@@ -531,18 +535,18 @@ ssize_t flow_write(int fd, void * buf, size_t count)
e.port_id = _ap_instance->flows[fd].port_id;
if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
- shm_du_map_remove(_ap_instance->dum, idx);
+ shm_rdrbuff_remove(_ap_instance->rdrb, idx);
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ap_instance->data_lock);
return -1;
}
} else { /* blocking */
- struct shm_du_map * dum = _ap_instance->dum;
- pid_t api = _ap_instance->flows[fd].api;
+ struct shm_rdrbuff * rdrb = _ap_instance->rdrb;
+ pid_t api = _ap_instance->flows[fd].api;
pthread_rwlock_unlock(&_ap_instance->flows_lock);
pthread_rwlock_unlock(&_ap_instance->data_lock);
- idx = shm_du_map_write_b(dum,
+ idx = shm_rdrbuff_write_b(rdrb,
api,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
@@ -567,7 +571,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)
int flow_select(const struct timespec * timeout)
{
- int port_id = shm_ap_rbuff_peek(_ap_instance->rb, timeout);
+ int port_id = shm_ap_rbuff_peek_b(_ap_instance->rb, timeout);
if (port_id < 0)
return port_id;
return port_id_to_fd(port_id);
@@ -612,7 +616,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)
return -EAGAIN;
}
- n = shm_du_map_read(&sdu, _ap_instance->dum, idx);
+ n = shm_rdrbuff_read(&sdu, _ap_instance->rdrb, idx);
if (n < 0) {
pthread_rwlock_unlock(&_ap_instance->data_lock);
return -1;
@@ -620,7 +624,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)
memcpy(buf, sdu, MIN(n, count));
- shm_du_map_remove(_ap_instance->dum, idx);
+ shm_rdrbuff_remove(_ap_instance->rdrb, idx);
pthread_rwlock_unlock(&_ap_instance->data_lock);
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 4ca29636..f21b1e86 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -285,8 +285,32 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
return 0;
}
-int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb,
- const struct timespec * timeout)
+int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb)
+{
+ int ret = 0;
+
+ if (rb == NULL)
+ return -EINVAL;
+
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+
+ if (shm_rbuff_empty(rb)) {
+ pthread_mutex_unlock(rb->lock);
+ return -1;
+ }
+
+ ret = (rb->shm_base + *rb->ptr_tail)->index;
+
+ pthread_mutex_unlock(rb->lock);
+
+ return ret;
+}
+
+int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
+ const struct timespec * timeout)
{
struct timespec abstime;
int ret = 0;
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c
deleted file mode 100644
index 9ca282b9..00000000
--- a/src/lib/shm_du_map.c
+++ /dev/null
@@ -1,767 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016
- *
- * Shared memory map for data units
- *
- * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
- * Sander Vrijders <sander.vrijders@intec.ugent.be>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#include <ouroboros/config.h>
-#include <ouroboros/errno.h>
-#include <ouroboros/shm_du_map.h>
-#include <ouroboros/shm_ap_rbuff.h>
-#include <ouroboros/time_utils.h>
-
-#include <pthread.h>
-#include <sys/mman.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <stdlib.h>
-#include <string.h>
-#include <signal.h>
-#include <sys/stat.h>
-
-#define OUROBOROS_PREFIX "shm_du_map"
-
-#include <ouroboros/logs.h>
-
-#define SHM_BLOCKS_SIZE (SHM_BUFFER_SIZE * SHM_DU_BUFF_BLOCK_SIZE)
-#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \
- + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \
- + sizeof(pid_t))
-
-#define get_head_ptr(dum) \
-((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_head * \
- SHM_DU_BUFF_BLOCK_SIZE)))
-
-#define get_tail_ptr(dum) \
-((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_tail * \
- SHM_DU_BUFF_BLOCK_SIZE)))
-
-#define idx_to_du_buff_ptr(dum, idx) \
- ((struct shm_du_buff *)(dum->shm_base + (idx * SHM_DU_BUFF_BLOCK_SIZE)))
-
-#define block_ptr_to_idx(dum, sdb) \
- (((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE)
-
-#define shm_map_used(dum)((*dum->ptr_head + SHM_BUFFER_SIZE - *dum->ptr_tail)\
- & (SHM_BUFFER_SIZE - 1))
-#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BUFFER_SIZE)
-
-#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head)
-
-struct shm_du_buff {
- size_t size;
-#ifdef SHM_DU_MAP_MULTI_BLOCK
- size_t blocks;
-#endif
- size_t du_head;
- size_t du_tail;
- pid_t dst_api;
-};
-
-struct shm_du_map {
- uint8_t * shm_base; /* start of blocks */
- size_t * ptr_head; /* start of ringbuffer head */
- size_t * ptr_tail; /* start of ringbuffer tail */
- pthread_mutex_t * lock; /* lock all free space in shm */
- size_t * choked; /* stale sdu detection */
- pthread_cond_t * healthy; /* du map is healthy */
- pthread_cond_t * full; /* run sanitizer when buffer full */
- pid_t * api; /* api of the irmd owner */
- int fd;
-};
-
-static void garbage_collect(struct shm_du_map * dum)
-{
-#ifdef SHM_DU_MAP_MULTI_BLOCK
- struct shm_du_buff * sdb;
- while (!shm_map_empty(dum) && (sdb = get_tail_ptr(dum))->dst_api == -1)
- *dum->ptr_tail = (*dum->ptr_tail + sdb->blocks)
- & (SHM_BUFFER_SIZE - 1);
-#else
- while (!shm_map_empty(dum) && get_tail_ptr(dum)->dst_api == -1)
- *dum->ptr_tail =
- (*dum->ptr_tail + 1) & (SHM_BUFFER_SIZE - 1);
-
-#endif
-}
-
-static void clean_sdus(struct shm_du_map * dum, pid_t api, bool exit)
-{
- size_t idx = *dum->ptr_tail;
- struct shm_du_buff * buf;
-
- while (idx != *dum->ptr_head) {
- buf = idx_to_du_buff_ptr(dum, idx);
- if (buf->dst_api == api)
- buf->dst_api = -1;
-#ifdef SHM_DU_MAP_MULTI_BLOCK
- idx = (idx + buf->blocks) & (SHM_BUFFER_SIZE - 1);
-#else
- idx = (idx + 1) & (SHM_BUFFER_SIZE - 1);
-#endif
- }
-
- garbage_collect(dum);
-
- if (!exit && kill(api, 0) == 0) {
- struct shm_ap_rbuff * rb;
- rb = shm_ap_rbuff_open(api);
- if (rb != NULL) {
- shm_ap_rbuff_reset(rb);
- shm_ap_rbuff_close(rb);
- }
- }
-
- *dum->choked = 0;
-}
-
-struct shm_du_map * shm_du_map_create()
-{
- struct shm_du_map * dum;
- int shm_fd;
- uint8_t * shm_base;
- pthread_mutexattr_t mattr;
- pthread_condattr_t cattr;
-
- dum = malloc(sizeof *dum);
- if (dum == NULL) {
- LOG_DBGF("Could not allocate struct.");
- return NULL;
- }
-
- shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_CREAT | O_EXCL | O_RDWR, 0666);
- if (shm_fd == -1) {
- LOG_DBGF("Failed creating shared memory map.");
- free(dum);
- return NULL;
- }
-
- if (fchmod(shm_fd, 0666)) {
- LOG_DBGF("Failed to chmod shared memory map.");
- free(dum);
- return NULL;
- }
-
- if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) {
- LOG_DBGF("Failed to extend shared memory map.");
- free(dum);
- return NULL;
- }
-
- if (write(shm_fd, "", 1) != 1) {
- LOG_DBGF("Failed to finalise extension of shared memory map.");
- free(dum);
- return NULL;
- }
-
- shm_base = mmap(NULL,
- SHM_FILE_SIZE,
- PROT_READ | PROT_WRITE,
- MAP_SHARED,
- shm_fd,
- 0);
-
- if (shm_base == MAP_FAILED) {
- LOG_DBGF("Failed to map shared memory.");
- if (shm_unlink(SHM_DU_MAP_FILENAME) == -1)
- LOG_DBGF("Failed to remove invalid shm.");
-
- free(dum);
- return NULL;
- }
-
- dum->shm_base = shm_base;
- dum->ptr_head = (size_t *)
- ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
- dum->ptr_tail = dum->ptr_head + 1;
- dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1);
- dum->choked = (size_t *) (dum->lock + 1);
- dum->healthy = (pthread_cond_t *) (dum->choked + 1);
- dum->full = dum->healthy + 1;
- dum->api = (pid_t *) (dum->full + 1);
-
- pthread_mutexattr_init(&mattr);
- pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
- pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
- pthread_mutex_init(dum->lock, &mattr);
-
- pthread_condattr_init(&cattr);
- pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
- pthread_cond_init(dum->full, &cattr);
- pthread_cond_init(dum->healthy, &cattr);
-
- *dum->ptr_head = 0;
- *dum->ptr_tail = 0;
-
- *dum->choked = 0;
-
- *dum->api = getpid();
-
- dum->fd = shm_fd;
-
- return dum;
-}
-
-struct shm_du_map * shm_du_map_open()
-{
- struct shm_du_map * dum;
- int shm_fd;
- uint8_t * shm_base;
-
- dum = malloc(sizeof *dum);
- if (dum == NULL) {
- LOG_DBGF("Could not allocate struct.");
- return NULL;
- }
-
- shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_RDWR, 0666);
- if (shm_fd < 0) {
- LOG_DBGF("Failed opening shared memory.");
- free(dum);
- return NULL;
- }
-
- shm_base = mmap(NULL,
- SHM_FILE_SIZE,
- PROT_READ | PROT_WRITE,
- MAP_SHARED,
- shm_fd,
- 0);
- if (shm_base == MAP_FAILED) {
- LOG_DBGF("Failed to map shared memory.");
- if (close(shm_fd) == -1)
- LOG_DBGF("Failed to close invalid shm.");
- if (shm_unlink(SHM_DU_MAP_FILENAME) == -1)
- LOG_DBGF("Failed to unlink invalid shm.");
- free(dum);
- return NULL;
- }
-
- dum->shm_base = shm_base;
- dum->ptr_head = (size_t *)
- ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
- dum->ptr_tail = dum->ptr_head + 1;
- dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1);
- dum->choked = (size_t *) (dum->lock + 1);
- dum->healthy = (pthread_cond_t *) (dum->choked + 1);
- dum->full = dum->healthy + 1;
- dum->api = (pid_t *) (dum->full + 1);
-
- dum->fd = shm_fd;
-
- return dum;
-}
-
-void * shm_du_map_sanitize(void * o)
-{
- struct shm_du_map * dum = (struct shm_du_map *) o;
- struct timespec intv
- = {SHM_DU_TIMEOUT_MICROS / MILLION,
- (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000};
-
- pid_t api;
-
- if (dum == NULL)
- return (void *) -1;
- if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
- LOG_WARN("Recovering dead mutex.");
- pthread_mutex_consistent(dum->lock);
- }
-
- pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
- (void *) dum->lock);
-
- while (true) {
- int ret = 0;
- struct timespec now;
- struct timespec dl;
-
- if (pthread_cond_wait(dum->full, dum->lock) == EOWNERDEAD) {
- LOG_WARN("Recovering dead mutex.");
- pthread_mutex_consistent(dum->lock);
- }
-
- *dum->choked = 1;
-
- garbage_collect(dum);
-
- if (shm_map_empty(dum))
- continue;
-
- api = get_tail_ptr(dum)->dst_api;
-
- if (kill(api, 0)) {
- LOG_DBGF("Dead process %d left stale sdu.", api);
- clean_sdus(dum, api, false);
- continue;
- }
-
- clock_gettime(CLOCK_REALTIME, &now);
- ts_add(&now, &intv, &dl);
- while (*dum->choked) {
- ret = pthread_cond_timedwait(dum->healthy,
- dum->lock,
- &dl);
- if (!ret)
- continue;
-
- if (ret == EOWNERDEAD) {
- LOG_WARN("Recovering dead mutex.");
- pthread_mutex_consistent(dum->lock);
- }
-
- if (ret == ETIMEDOUT) {
- LOG_DBGF("SDU timed out (dst: %d).", api);
- clean_sdus(dum, api, false);
- }
- }
- }
-
- pthread_cleanup_pop(true);
-
- return (void *) 0;
-}
-
-void shm_du_map_close_on_exit(struct shm_du_map * dum)
-{
- if (dum == NULL) {
- LOG_DBGF("Bogus input. Bugging out.");
- return;
- }
-
- clean_sdus(dum, getpid(), true);
-
- if (close(dum->fd) < 0)
- LOG_DBGF("Couldn't close shared memory.");
-
- if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1)
- LOG_DBGF("Couldn't unmap shared memory.");
-
- free(dum);
-}
-
-void shm_du_map_close(struct shm_du_map * dum)
-{
- if (dum == NULL) {
- LOG_DBGF("Bogus input. Bugging out.");
- return;
- }
-
- if (close(dum->fd) < 0)
- LOG_DBGF("Couldn't close shared memory.");
-
- if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1)
- LOG_DBGF("Couldn't unmap shared memory.");
-
- free(dum);
-}
-
-void shm_du_map_destroy(struct shm_du_map * dum)
-{
- if (dum == NULL) {
- LOG_DBGF("Bogus input. Bugging out.");
- return;
- }
-
- if (getpid() != *dum->api && kill(*dum->api, 0) == 0) {
- LOG_DBGF("Only IRMd can destroy %s.", SHM_DU_MAP_FILENAME);
- return;
- }
-
- if (close(dum->fd) < 0)
- LOG_DBGF("Couldn't close shared memory.");
-
- if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1)
- LOG_DBGF("Couldn't unmap shared memory.");
-
- if (shm_unlink(SHM_DU_MAP_FILENAME) == -1)
- LOG_DBGF("Failed to unlink shm.");
-
- free(dum);
-}
-
-ssize_t shm_du_map_write(struct shm_du_map * dum,
- pid_t dst_api,
- size_t headspace,
- size_t tailspace,
- uint8_t * data,
- size_t len)
-{
- struct shm_du_buff * sdb;
- size_t size = headspace + len + tailspace;
-#ifdef SHM_DU_MAP_MULTI_BLOCK
- long blocks = 0;
- long padblocks = 0;
-#endif
- int sz = size + sizeof *sdb;
- uint8_t * write_pos;
- ssize_t idx = -1;
-
- if (dum == NULL || data == NULL) {
- LOG_DBGF("Bogus input, bugging out.");
- return -1;
- }
-
-#ifndef SHM_DU_MAP_MULTI_BLOCK
- if (sz > SHM_DU_BUFF_BLOCK_SIZE) {
- LOG_DBGF("Multi-block SDU's disabled. Dropping.");
- return -1;
- }
-#endif
- if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
- LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->lock);
- }
-#ifdef SHM_DU_MAP_MULTI_BLOCK
- while (sz > 0) {
- sz -= SHM_DU_BUFF_BLOCK_SIZE;
- ++blocks;
- }
-
- if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE)
- padblocks = SHM_BUFFER_SIZE - *dum->ptr_head;
-
- if (!shm_map_free(dum, (blocks + padblocks))) {
-#else
- if (!shm_map_free(dum, 1)) {
-#endif
- pthread_cond_signal(dum->full);
- pthread_mutex_unlock(dum->lock);
- return -1;
- }
-
-#ifdef SHM_DU_MAP_MULTI_BLOCK
- if (padblocks) {
- sdb = get_head_ptr(dum);
- sdb->size = 0;
- sdb->blocks = padblocks;
- sdb->dst_api = -1;
- sdb->du_head = 0;
- sdb->du_tail = 0;
-
- *dum->ptr_head = 0;
- }
-#endif
- sdb = get_head_ptr(dum);
- sdb->size = size;
- sdb->dst_api = dst_api;
- sdb->du_head = headspace;
- sdb->du_tail = sdb->du_head + len;
-#ifdef SHM_DU_MAP_MULTI_BLOCK
- sdb->blocks = blocks;
-#endif
- write_pos = ((uint8_t *) (sdb + 1)) + headspace;
-
- memcpy(write_pos, data, len);
-
- idx = *dum->ptr_head;
-#ifdef SHM_DU_MAP_MULTI_BLOCK
- *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);
-#else
- *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1);
-#endif
- pthread_mutex_unlock(dum->lock);
-
- return idx;
-}
-
-ssize_t shm_du_map_write_b(struct shm_du_map * dum,
- pid_t dst_api,
- size_t headspace,
- size_t tailspace,
- uint8_t * data,
- size_t len)
-{
- struct shm_du_buff * sdb;
- size_t size = headspace + len + tailspace;
-#ifdef SHM_DU_MAP_MULTI_BLOCK
- long blocks = 0;
- long padblocks = 0;
-#endif
- int sz = size + sizeof *sdb;
- uint8_t * write_pos;
- ssize_t idx = -1;
-
- if (dum == NULL || data == NULL) {
- LOG_DBGF("Bogus input, bugging out.");
- return -1;
- }
-
-#ifndef SHM_DU_MAP_MULTI_BLOCK
- if (sz > SHM_DU_BUFF_BLOCK_SIZE) {
- LOG_DBGF("Multi-block SDU's disabled. Dropping.");
- return -1;
- }
-#endif
- if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
- LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->lock);
- }
-
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) dum->lock);
-
-#ifdef SHM_DU_MAP_MULTI_BLOCK
- while (sz > 0) {
- sz -= SHM_DU_BUFF_BLOCK_SIZE;
- ++blocks;
- }
-
- if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE)
- padblocks = SHM_BUFFER_SIZE - *dum->ptr_head;
-
- while (!shm_map_free(dum, (blocks + padblocks))) {
-#else
- while (!shm_map_free(dum, 1)) {
-#endif
- pthread_cond_signal(dum->full);
- pthread_cond_wait(dum->healthy, dum->lock);
- }
-
-#ifdef SHM_DU_MAP_MULTI_BLOCK
- if (padblocks) {
- sdb = get_head_ptr(dum);
- sdb->size = 0;
- sdb->blocks = padblocks;
- sdb->dst_api = -1;
- sdb->du_head = 0;
- sdb->du_tail = 0;
-
- *dum->ptr_head = 0;
- }
-#endif
- sdb = get_head_ptr(dum);
- sdb->size = size;
- sdb->dst_api = dst_api;
- sdb->du_head = headspace;
- sdb->du_tail = sdb->du_head + len;
-#ifdef SHM_DU_MAP_MULTI_BLOCK
- sdb->blocks = blocks;
-#endif
- write_pos = ((uint8_t *) (sdb + 1)) + headspace;
-
- memcpy(write_pos, data, len);
-
- idx = *dum->ptr_head;
-#ifdef SHM_DU_MAP_MULTI_BLOCK
- *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);
-#else
- *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1);
-#endif
- pthread_cleanup_pop(true);
-
- return idx;
-}
-
-int shm_du_map_read(uint8_t ** dst,
- struct shm_du_map * dum,
- ssize_t idx)
-{
- size_t len = 0;
- struct shm_du_buff * sdb;
-
- if (idx > SHM_BUFFER_SIZE)
- return -1;
-
- if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
- LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->lock);
- }
-
- if (shm_map_empty(dum)) {
- pthread_mutex_unlock(dum->lock);
- return -1;
- }
-
- sdb = idx_to_du_buff_ptr(dum, idx);
- len = sdb->du_tail - sdb->du_head;
- *dst = ((uint8_t *) (sdb + 1)) + sdb->du_head;
-
- pthread_mutex_unlock(dum->lock);
-
- return len;
-}
-
-int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx)
-{
- if (idx > SHM_BUFFER_SIZE)
- return -1;
-
- if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
- LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->lock);
- }
-
- if (shm_map_empty(dum)) {
- pthread_mutex_unlock(dum->lock);
- return -1;
- }
-
- idx_to_du_buff_ptr(dum, idx)->dst_api = -1;
-
- if (idx != *dum->ptr_tail) {
- pthread_mutex_unlock(dum->lock);
- return 0;
- }
-
- garbage_collect(dum);
-
- *dum->choked = 0;
-
- pthread_cond_broadcast(dum->healthy);
-
- pthread_mutex_unlock(dum->lock);
-
- return 0;
-}
-
-uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum,
- int idx,
- ssize_t size)
-{
- struct shm_du_buff * sdb;
- uint8_t * buf;
-
- if (dum == NULL)
- return NULL;
-
- if (idx < 0 || idx > SHM_BUFFER_SIZE)
- return NULL;
-
- if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
- LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->lock);
- }
-
- sdb = idx_to_du_buff_ptr(dum, idx);
-
- if ((long) (sdb->du_head - size) < 0) {
- pthread_mutex_unlock(dum->lock);
- LOG_DBGF("Failed to allocate PCI headspace.");
- return NULL;
- }
-
- sdb->du_head -= size;
-
- buf = (uint8_t *) (sdb + 1) + sdb->du_head;
-
- pthread_mutex_unlock(dum->lock);
-
- return buf;
-}
-
-uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum,
- int idx,
- ssize_t size)
-{
- struct shm_du_buff * sdb;
- uint8_t * buf;
-
- if (dum == NULL)
- return NULL;
-
- if (idx < 0 || idx > SHM_BUFFER_SIZE)
- return NULL;
-
- if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
- LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->lock);
- }
-
- sdb = idx_to_du_buff_ptr(dum, idx);
-
- if (sdb->du_tail + size >= sdb->size) {
- pthread_mutex_unlock(dum->lock);
- LOG_DBGF("Failed to allocate PCI tailspace.");
- return NULL;
- }
-
- buf = (uint8_t *) (sdb + 1) + sdb->du_tail;
-
- sdb->du_tail += size;
-
- pthread_mutex_unlock(dum->lock);
-
- return buf;
-}
-
-int shm_du_buff_head_release(struct shm_du_map * dum,
- int idx,
- ssize_t size)
-{
- struct shm_du_buff * sdb;
-
- if (dum == NULL)
- return -1;
-
- if (idx < 0 || idx > SHM_BUFFER_SIZE)
- return -1;
-
- if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
- LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->lock);
- }
-
- sdb = idx_to_du_buff_ptr(dum, idx);
-
- if (size > sdb->du_tail - sdb->du_head) {
- pthread_mutex_unlock(dum->lock);
- LOG_DBGF("Tried to release beyond sdu boundary.");
- return -EOVERFLOW;
- }
-
- sdb->du_head += size;
-
- pthread_mutex_unlock(dum->lock);
-
- return 0;
-}
-
-int shm_du_buff_tail_release(struct shm_du_map * dum,
- int idx,
- ssize_t size)
-{
- struct shm_du_buff * sdb;
-
- if (dum == NULL)
- return -1;
-
- if (idx < 0 || idx > SHM_BUFFER_SIZE)
- return -1;
-
- if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
- LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->lock);
- }
-
- sdb = idx_to_du_buff_ptr(dum, idx);
-
- if (size > sdb->du_tail - sdb->du_head) {
- pthread_mutex_unlock(dum->lock);
- LOG_DBGF("Tried to release beyond sdu boundary.");
- return -EOVERFLOW;
- }
-
- sdb->du_tail -= size;
-
- pthread_mutex_unlock(dum->lock);
-
- return 0;
-}
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
new file mode 100644
index 00000000..d42dbea7
--- /dev/null
+++ b/src/lib/shm_rdrbuff.c
@@ -0,0 +1,804 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * Random Deletion Ring Buffer for Data Units
+ *
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <ouroboros/config.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/shm_rdrbuff.h>
+#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/time_utils.h>
+
+#include <pthread.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+#define OUROBOROS_PREFIX "shm_rdrbuff"
+
+#include <ouroboros/logs.h>
+
+#define SHM_BLOCKS_SIZE (SHM_BUFFER_SIZE * SHM_RDRB_BLOCK_SIZE)
+#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \
+ + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \
+ + sizeof(pid_t))
+
+#define get_head_ptr(rdrb) \
+ ((struct shm_du_buff *)(rdrb->shm_base + (*rdrb->ptr_head * \
+ SHM_RDRB_BLOCK_SIZE)))
+
+#define get_tail_ptr(rdrb) \
+ ((struct shm_du_buff *)(rdrb->shm_base + (*rdrb->ptr_tail * \
+ SHM_RDRB_BLOCK_SIZE)))
+
+#define idx_to_du_buff_ptr(rdrb, idx) \
+ ((struct shm_du_buff *)(rdrb->shm_base + (idx * SHM_RDRB_BLOCK_SIZE)))
+
+#define block_ptr_to_idx(rdrb, sdb) \
+ (((uint8_t *)sdb - rdrb->shm_base) / SHM_RDRB_BLOCK_SIZE)
+
+#define shm_rdrb_used(rdrb) \
+ ((*rdrb->ptr_head + SHM_BUFFER_SIZE - *rdrb->ptr_tail) \
+ & (SHM_BUFFER_SIZE - 1))
+#define shm_rdrb_free(rdrb, i) \
+ (shm_rdrb_used(rdrb) + i < SHM_BUFFER_SIZE)
+
+#define shm_rdrb_empty(rdrb) \
+ (*rdrb->ptr_tail == *rdrb->ptr_head)
+
+struct shm_du_buff {
+ size_t size;
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ size_t blocks;
+#endif
+ size_t du_head;
+ size_t du_tail;
+ pid_t dst_api;
+};
+
+struct shm_rdrbuff {
+ uint8_t * shm_base; /* start of blocks */
+ size_t * ptr_head; /* start of ringbuffer head */
+ size_t * ptr_tail; /* start of ringbuffer tail */
+ pthread_mutex_t * lock; /* lock all free space in shm */
+ size_t * choked; /* stale sdu detection */
+ pthread_cond_t * healthy; /* du map is healthy */
+ pthread_cond_t * full; /* run sanitizer when buffer full */
+ pid_t * api; /* api of the irmd owner */
+ enum qos_cube qos; /* qos id which this buffer serves */
+ int fd;
+};
+
+static void garbage_collect(struct shm_rdrbuff * rdrb)
+{
+#ifdef SHM_RDRBUFF_MULTI_BLOCK
+ struct shm_du_buff * sdb;
+ while (!shm_rdrb_empty(rdrb) &&
+ (sdb = get_tail_ptr(rdrb))->dst_api == -1)
+ *rdrb->ptr_tail = (*rdrb->ptr_tail + sdb->blocks)
+ & (SHM_BUFFER_SIZE - 1);
+#else
+ while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->dst_api == -1)
+ *rdrb->ptr_tail =
+ (*rdrb->ptr_tail + 1) & (SHM_BUFFER_SIZE - 1);
+
+#endif
+}
+
+static void clean_sdus(struct shm_rdrbuff * rdrb, pid_t api)
+{
+ size_t idx = *rdrb->ptr_tail;
+ struct shm_du_buff * buf;
+
+ while (idx != *rdrb->ptr_head) {
+ buf = idx_to_du_buff_ptr(rdrb, idx);
+ if (buf->dst_api == api)
+ buf->dst_api = -1;
+#ifdef SHM_RDRBUFF_MULTI_BLOCK
+ idx = (idx + buf->blocks) & (SHM_BUFFER_SIZE - 1);
+#else
+ idx = (idx + 1) & (SHM_BUFFER_SIZE - 1);
+#endif
+ }
+
+ garbage_collect(rdrb);
+
+ *rdrb->choked = 0;
+}
+
+static char * rdrb_filename(enum qos_cube qos)
+{
+ int chars = 0;
+ char * str;
+ int qm = QOS_MAX;
+
+ do {
+ qm /= 10;
+ ++chars;
+ } while (qm > 0);
+
+ str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 2);
+ if (str == NULL) {
+ LOG_ERR("Failed to create shm_rdrbuff: Out of Memory.");
+ return NULL;
+ }
+
+ sprintf(str, "%s.%d", SHM_RDRB_PREFIX, (int) qos);
+
+ return str;
+}
+
+/* FIXME: create a ringbuffer for each qos cube in the system */
+struct shm_rdrbuff * shm_rdrbuff_create()
+{
+ struct shm_rdrbuff * rdrb;
+ int shm_fd;
+ uint8_t * shm_base;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
+ enum qos_cube qos = QOS_CUBE_BE;
+ char * shm_rdrb_fn = rdrb_filename(qos);
+ if (shm_rdrb_fn == NULL) {
+ LOG_ERR("Could not create rdrbuff. Out of Memory");
+ return NULL;
+ }
+
+ rdrb = malloc(sizeof *rdrb);
+ if (rdrb == NULL) {
+ LOG_DBGF("Could not allocate struct.");
+ return NULL;
+ }
+
+ shm_fd = shm_open(shm_rdrb_fn, O_CREAT | O_EXCL | O_RDWR, 0666);
+ if (shm_fd == -1) {
+ LOG_DBGF("Failed creating shared memory map.");
+ free(shm_rdrb_fn);
+ free(rdrb);
+ return NULL;
+ }
+
+ if (fchmod(shm_fd, 0666)) {
+ LOG_DBGF("Failed to chmod shared memory map.");
+ free(shm_rdrb_fn);
+ free(rdrb);
+ return NULL;
+ }
+
+ if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) {
+ LOG_DBGF("Failed to extend shared memory map.");
+ free(shm_rdrb_fn);
+ free(rdrb);
+ return NULL;
+ }
+
+ if (write(shm_fd, "", 1) != 1) {
+ LOG_DBGF("Failed to finalise extension of shared memory map.");
+ free(shm_rdrb_fn);
+ free(rdrb);
+ return NULL;
+ }
+
+ shm_base = mmap(NULL,
+ SHM_FILE_SIZE,
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED,
+ shm_fd,
+ 0);
+
+ if (shm_base == MAP_FAILED) {
+ LOG_DBGF("Failed to map shared memory.");
+ if (shm_unlink(shm_rdrb_fn) == -1)
+ LOG_DBGF("Failed to remove invalid shm.");
+ free(shm_rdrb_fn);
+ free(rdrb);
+ return NULL;
+ }
+
+ rdrb->shm_base = shm_base;
+ rdrb->ptr_head = (size_t *)
+ ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE);
+ rdrb->ptr_tail = rdrb->ptr_head + 1;
+ rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1);
+ rdrb->choked = (size_t *) (rdrb->lock + 1);
+ rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1);
+ rdrb->full = rdrb->healthy + 1;
+ rdrb->api = (pid_t *) (rdrb->full + 1);
+
+ pthread_mutexattr_init(&mattr);
+ pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+ pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
+ pthread_mutex_init(rdrb->lock, &mattr);
+
+ pthread_condattr_init(&cattr);
+ pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
+ pthread_cond_init(rdrb->full, &cattr);
+ pthread_cond_init(rdrb->healthy, &cattr);
+
+ *rdrb->ptr_head = 0;
+ *rdrb->ptr_tail = 0;
+
+ *rdrb->choked = 0;
+
+ *rdrb->api = getpid();
+
+ rdrb->qos = qos;
+ rdrb->fd = shm_fd;
+
+ free(shm_rdrb_fn);
+
+ return rdrb;
+}
+
+/* FIXME: open a ringbuffer for each qos cube in the system */
+struct shm_rdrbuff * shm_rdrbuff_open()
+{
+ struct shm_rdrbuff * rdrb;
+ int shm_fd;
+ uint8_t * shm_base;
+
+ enum qos_cube qos = QOS_CUBE_BE;
+ char * shm_rdrb_fn = rdrb_filename(qos);
+ if (shm_rdrb_fn == NULL) {
+ LOG_ERR("Could not create rdrbuff. Out of Memory");
+ return NULL;
+ }
+
+ rdrb = malloc(sizeof *rdrb);
+ if (rdrb == NULL) {
+ LOG_DBGF("Could not allocate struct.");
+ return NULL;
+ }
+
+ shm_fd = shm_open(shm_rdrb_fn, O_RDWR, 0666);
+ if (shm_fd < 0) {
+ LOG_DBGF("Failed opening shared memory.");
+ free(shm_rdrb_fn);
+ free(rdrb);
+ return NULL;
+ }
+
+ shm_base = mmap(NULL,
+ SHM_FILE_SIZE,
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED,
+ shm_fd,
+ 0);
+ if (shm_base == MAP_FAILED) {
+ LOG_DBGF("Failed to map shared memory.");
+ if (close(shm_fd) == -1)
+ LOG_DBG("Failed to close invalid shm.");
+ if (shm_unlink(shm_rdrb_fn) == -1)
+ LOG_DBG("Failed to unlink invalid shm.");
+ free(shm_rdrb_fn);
+ free(rdrb);
+ return NULL;
+ }
+
+ rdrb->shm_base = shm_base;
+ rdrb->ptr_head = (size_t *)
+ ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE);
+ rdrb->ptr_tail = rdrb->ptr_head + 1;
+ rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1);
+ rdrb->choked = (size_t *) (rdrb->lock + 1);
+ rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1);
+ rdrb->full = rdrb->healthy + 1;
+ rdrb->api = (pid_t *) (rdrb->full + 1);
+
+ rdrb->qos = qos;
+ rdrb->fd = shm_fd;
+
+ free(shm_rdrb_fn);
+
+ return rdrb;
+}
+
+void * shm_rdrbuff_sanitize(void * o)
+{
+ struct shm_rdrbuff * rdrb = (struct shm_rdrbuff *) o;
+ struct timespec intv
+ = {SHM_DU_TIMEOUT_MICROS / MILLION,
+ (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000};
+
+ pid_t api;
+
+ if (rdrb == NULL)
+ return (void *) -1;
+
+ if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) {
+ LOG_WARN("Recovering dead mutex.");
+ pthread_mutex_consistent(rdrb->lock);
+ }
+
+ pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
+ (void *) rdrb->lock);
+
+ while (true) {
+ int ret = 0;
+ struct timespec now;
+ struct timespec dl;
+
+ if (pthread_cond_wait(rdrb->full, rdrb->lock) == EOWNERDEAD) {
+ LOG_WARN("Recovering dead mutex.");
+ pthread_mutex_consistent(rdrb->lock);
+ }
+
+ *rdrb->choked = 1;
+
+ garbage_collect(rdrb);
+
+ if (shm_rdrb_empty(rdrb))
+ continue;
+
+ api = get_tail_ptr(rdrb)->dst_api;
+
+ if (kill(api, 0)) {
+ LOG_DBGF("Dead process %d left stale sdu.", api);
+ clean_sdus(rdrb, api);
+ continue;
+ }
+
+ clock_gettime(CLOCK_REALTIME, &now);
+ ts_add(&now, &intv, &dl);
+ while (*rdrb->choked) {
+ ret = pthread_cond_timedwait(rdrb->healthy,
+ rdrb->lock,
+ &dl);
+ if (!ret)
+ continue;
+
+ if (ret == EOWNERDEAD) {
+ LOG_WARN("Recovering dead mutex.");
+ pthread_mutex_consistent(rdrb->lock);
+ }
+
+ if (ret == ETIMEDOUT) {
+ LOG_DBGF("SDU timed out (dst: %d).", api);
+ clean_sdus(rdrb, api);
+ }
+ }
+ }
+
+ pthread_cleanup_pop(true);
+
+ return (void *) 0;
+}
+
+void shm_rdrbuff_close(struct shm_rdrbuff * rdrb)
+{
+ if (rdrb == NULL) {
+ LOG_DBGF("Bogus input. Bugging out.");
+ return;
+ }
+
+ if (close(rdrb->fd) < 0)
+ LOG_DBGF("Couldn't close shared memory.");
+
+ if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1)
+ LOG_DBGF("Couldn't unmap shared memory.");
+
+ free(rdrb);
+}
+
+void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb)
+{
+ char * shm_rdrb_fn;
+
+ if (rdrb == NULL) {
+ LOG_DBGF("Bogus input. Bugging out.");
+ return;
+ }
+
+ if (getpid() != *rdrb->api && kill(*rdrb->api, 0) == 0) {
+ LOG_DBG("Process %d tried to destroy active rdrb.", getpid());
+ return;
+ }
+
+ if (close(rdrb->fd) < 0)
+ LOG_DBG("Couldn't close shared memory.");
+
+ if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1)
+ LOG_DBG("Couldn't unmap shared memory.");
+
+ shm_rdrb_fn = rdrb_filename(rdrb->qos);
+ if (shm_rdrb_fn == NULL) {
+ LOG_ERR("Could not create rdrbuff. Out of Memory");
+ return;
+ }
+
+ if (shm_unlink(shm_rdrb_fn) == -1)
+ LOG_DBG("Failed to unlink shm.");
+
+ free(rdrb);
+ free(shm_rdrb_fn);
+}
+
+ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
+ pid_t dst_api,
+ size_t headspace,
+ size_t tailspace,
+ uint8_t * data,
+ size_t len)
+{
+ struct shm_du_buff * sdb;
+ size_t size = headspace + len + tailspace;
+#ifdef SHM_RDRBUFF_MULTI_BLOCK
+ long blocks = 0;
+ long padblocks = 0;
+#endif
+ int sz = size + sizeof *sdb;
+ uint8_t * write_pos;
+ ssize_t idx = -1;
+
+ if (rdrb == NULL || data == NULL) {
+ LOG_DBGF("Bogus input, bugging out.");
+ return -1;
+ }
+
+#ifndef SHM_RDRBUFF_MULTI_BLOCK
+ if (sz > SHM_RDRB_BLOCK_SIZE) {
+ LOG_DBGF("Multi-block SDU's disabled. Dropping.");
+ return -1;
+ }
+#endif
+ if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rdrb->lock);
+ }
+#ifdef SHM_RDRBUFF_MULTI_BLOCK
+ while (sz > 0) {
+ sz -= SHM_RDRB_BLOCK_SIZE;
+ ++blocks;
+ }
+
+ if (blocks + *rdrb->ptr_head > SHM_BUFFER_SIZE)
+ padblocks = SHM_BUFFER_SIZE - *rdrb->ptr_head;
+
+ if (!shm_rdrb_free(rdrb, (blocks + padblocks))) {
+#else
+ if (!shm_rdrb_free(rdrb, 1)) {
+#endif
+ pthread_cond_signal(rdrb->full);
+ pthread_mutex_unlock(rdrb->lock);
+ return -1;
+ }
+
+#ifdef SHM_RDRBUFF_MULTI_BLOCK
+ if (padblocks) {
+ sdb = get_head_ptr(rdrb);
+ sdb->size = 0;
+ sdb->blocks = padblocks;
+ sdb->dst_api = -1;
+ sdb->du_head = 0;
+ sdb->du_tail = 0;
+
+ *rdrb->ptr_head = 0;
+ }
+#endif
+ sdb = get_head_ptr(rdrb);
+ sdb->size = size;
+ sdb->dst_api = dst_api;
+ sdb->du_head = headspace;
+ sdb->du_tail = sdb->du_head + len;
+#ifdef SHM_RDRBUFF_MULTI_BLOCK
+ sdb->blocks = blocks;
+#endif
+ write_pos = ((uint8_t *) (sdb + 1)) + headspace;
+
+ memcpy(write_pos, data, len);
+
+ idx = *rdrb->ptr_head;
+#ifdef SHM_RDRBUFF_MULTI_BLOCK
+ *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);
+#else
+ *rdrb->ptr_head = (*rdrb->ptr_head + 1) & (SHM_BUFFER_SIZE - 1);
+#endif
+ pthread_mutex_unlock(rdrb->lock);
+
+ return idx;
+}
+
+ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
+ pid_t dst_api,
+ size_t headspace,
+ size_t tailspace,
+ uint8_t * data,
+ size_t len)
+{
+ struct shm_du_buff * sdb;
+ size_t size = headspace + len + tailspace;
+#ifdef SHM_RDRBUFF_MULTI_BLOCK
+ long blocks = 0;
+ long padblocks = 0;
+#endif
+ int sz = size + sizeof *sdb;
+ uint8_t * write_pos;
+ ssize_t idx = -1;
+
+ if (rdrb == NULL || data == NULL) {
+ LOG_DBGF("Bogus input, bugging out.");
+ return -1;
+ }
+
+#ifndef SHM_RDRBUFF_MULTI_BLOCK
+ if (sz > SHM_RDRB_BLOCK_SIZE) {
+ LOG_DBGF("Multi-block SDU's disabled. Dropping.");
+ return -1;
+ }
+#endif
+ if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rdrb->lock);
+ }
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) rdrb->lock);
+
+#ifdef SHM_RDRBUFF_MULTI_BLOCK
+ while (sz > 0) {
+ sz -= SHM_RDRB_BLOCK_SIZE;
+ ++blocks;
+ }
+
+ if (blocks + *rdrb->ptr_head > SHM_BUFFER_SIZE)
+ padblocks = SHM_BUFFER_SIZE - *rdrb->ptr_head;
+
+ while (!shm_rdrb_free(rdrb, (blocks + padblocks))) {
+#else
+ while (!shm_rdrb_free(rdrb, 1)) {
+#endif
+ pthread_cond_signal(rdrb->full);
+ pthread_cond_wait(rdrb->healthy, rdrb->lock);
+ }
+
+#ifdef SHM_RDRBUFF_MULTI_BLOCK
+ if (padblocks) {
+ sdb = get_head_ptr(rdrb);
+ sdb->size = 0;
+ sdb->blocks = padblocks;
+ sdb->dst_api = -1;
+ sdb->du_head = 0;
+ sdb->du_tail = 0;
+
+ *rdrb->ptr_head = 0;
+ }
+#endif
+ sdb = get_head_ptr(rdrb);
+ sdb->size = size;
+ sdb->dst_api = dst_api;
+ sdb->du_head = headspace;
+ sdb->du_tail = sdb->du_head + len;
+#ifdef SHM_RDRBUFF_MULTI_BLOCK
+ sdb->blocks = blocks;
+#endif
+ write_pos = ((uint8_t *) (sdb + 1)) + headspace;
+
+ memcpy(write_pos, data, len);
+
+ idx = *rdrb->ptr_head;
+#ifdef SHM_RDRBUFF_MULTI_BLOCK
+ *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);
+#else
+ *rdrb->ptr_head = (*rdrb->ptr_head + 1) & (SHM_BUFFER_SIZE - 1);
+#endif
+ pthread_cleanup_pop(true);
+
+ return idx;
+}
+
+int shm_rdrbuff_read(uint8_t ** dst,
+ struct shm_rdrbuff * rdrb,
+ ssize_t idx)
+{
+ size_t len = 0;
+ struct shm_du_buff * sdb;
+
+ if (idx > SHM_BUFFER_SIZE)
+ return -1;
+
+ if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rdrb->lock);
+ }
+
+ if (shm_rdrb_empty(rdrb)) {
+ pthread_mutex_unlock(rdrb->lock);
+ return -1;
+ }
+
+ sdb = idx_to_du_buff_ptr(rdrb, idx);
+ len = sdb->du_tail - sdb->du_head;
+ *dst = ((uint8_t *) (sdb + 1)) + sdb->du_head;
+
+ pthread_mutex_unlock(rdrb->lock);
+
+ return len;
+}
+
+int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx)
+{
+ if (idx > SHM_BUFFER_SIZE)
+ return -1;
+
+ if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rdrb->lock);
+ }
+
+ if (shm_rdrb_empty(rdrb)) {
+ pthread_mutex_unlock(rdrb->lock);
+ return -1;
+ }
+
+ idx_to_du_buff_ptr(rdrb, idx)->dst_api = -1;
+
+ if (idx != *rdrb->ptr_tail) {
+ pthread_mutex_unlock(rdrb->lock);
+ return 0;
+ }
+
+ garbage_collect(rdrb);
+
+ *rdrb->choked = 0;
+
+ pthread_cond_broadcast(rdrb->healthy);
+
+ pthread_mutex_unlock(rdrb->lock);
+
+ return 0;
+}
+
+uint8_t * shm_du_buff_head_alloc(struct shm_rdrbuff * rdrb,
+ int idx,
+ ssize_t size)
+{
+ struct shm_du_buff * sdb;
+ uint8_t * buf;
+
+ if (rdrb == NULL)
+ return NULL;
+
+ if (idx < 0 || idx > SHM_BUFFER_SIZE)
+ return NULL;
+
+ if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rdrb->lock);
+ }
+
+ sdb = idx_to_du_buff_ptr(rdrb, idx);
+
+ if ((long) (sdb->du_head - size) < 0) {
+ pthread_mutex_unlock(rdrb->lock);
+ LOG_DBGF("Failed to allocate PCI headspace.");
+ return NULL;
+ }
+
+ sdb->du_head -= size;
+
+ buf = (uint8_t *) (sdb + 1) + sdb->du_head;
+
+ pthread_mutex_unlock(rdrb->lock);
+
+ return buf;
+}
+
+uint8_t * shm_du_buff_tail_alloc(struct shm_rdrbuff * rdrb,
+ int idx,
+ ssize_t size)
+{
+ struct shm_du_buff * sdb;
+ uint8_t * buf;
+
+ if (rdrb == NULL)
+ return NULL;
+
+ if (idx < 0 || idx > SHM_BUFFER_SIZE)
+ return NULL;
+
+ if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rdrb->lock);
+ }
+
+ sdb = idx_to_du_buff_ptr(rdrb, idx);
+
+ if (sdb->du_tail + size >= sdb->size) {
+ pthread_mutex_unlock(rdrb->lock);
+ LOG_DBGF("Failed to allocate PCI tailspace.");
+ return NULL;
+ }
+
+ buf = (uint8_t *) (sdb + 1) + sdb->du_tail;
+
+ sdb->du_tail += size;
+
+ pthread_mutex_unlock(rdrb->lock);
+
+ return buf;
+}
+
+int shm_du_buff_head_release(struct shm_rdrbuff * rdrb,
+ int idx,
+ ssize_t size)
+{
+ struct shm_du_buff * sdb;
+
+ if (rdrb == NULL)
+ return -1;
+
+ if (idx < 0 || idx > SHM_BUFFER_SIZE)
+ return -1;
+
+ if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rdrb->lock);
+ }
+
+ sdb = idx_to_du_buff_ptr(rdrb, idx);
+
+ if (size > sdb->du_tail - sdb->du_head) {
+ pthread_mutex_unlock(rdrb->lock);
+ LOG_DBGF("Tried to release beyond sdu boundary.");
+ return -EOVERFLOW;
+ }
+
+ sdb->du_head += size;
+
+ pthread_mutex_unlock(rdrb->lock);
+
+ return 0;
+}
+
+int shm_du_buff_tail_release(struct shm_rdrbuff * rdrb,
+ int idx,
+ ssize_t size)
+{
+ struct shm_du_buff * sdb;
+
+ if (rdrb == NULL)
+ return -1;
+
+ if (idx < 0 || idx > SHM_BUFFER_SIZE)
+ return -1;
+
+ if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rdrb->lock);
+ }
+
+ sdb = idx_to_du_buff_ptr(rdrb, idx);
+
+ if (size > sdb->du_tail - sdb->du_head) {
+ pthread_mutex_unlock(rdrb->lock);
+ LOG_DBGF("Tried to release beyond sdu boundary.");
+ return -EOVERFLOW;
+ }
+
+ sdb->du_tail -= size;
+
+ pthread_mutex_unlock(rdrb->lock);
+
+ return 0;
+}