diff options
author | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-08-29 19:49:39 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-08-29 20:32:54 +0200 |
commit | 2cc89f6da424ab503af563e0cc92dda43b8f8432 (patch) | |
tree | 303d3d61717d4d3018b8025a9825ff799da01c08 /src | |
parent | caeefb4d96331d24b38e845c99d0517913a71671 (diff) | |
download | ouroboros-2cc89f6da424ab503af563e0cc92dda43b8f8432.tar.gz ouroboros-2cc89f6da424ab503af563e0cc92dda43b8f8432.zip |
lib: Refactor shm_du_map to shm_rdrbuff
The shm_du_map is renamed to shm_rdrbuff to reflect the Random
Deletion Ringbuffer used in the implementation. The close_on_exit call
is removed and SDUs are cleaned up by the application in the ap_fini()
call. This required a non-blocking peek() operation in the shm_ap_rbuff.
Some initial implementation for future support of qos cubes has been
added to the shm_rdrbuff.
Diffstat (limited to 'src')
-rw-r--r-- | src/ipcpd/ipcp-data.c | 1 | ||||
-rw-r--r-- | src/ipcpd/local/main.c | 19 | ||||
-rw-r--r-- | src/ipcpd/normal/main.c | 20 | ||||
-rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 63 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 29 | ||||
-rw-r--r-- | src/irmd/main.c | 48 | ||||
-rw-r--r-- | src/lib/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/lib/dev.c | 34 | ||||
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 28 | ||||
-rw-r--r-- | src/lib/shm_du_map.c | 767 | ||||
-rw-r--r-- | src/lib/shm_rdrbuff.c | 804 |
11 files changed, 950 insertions, 865 deletions
diff --git a/src/ipcpd/ipcp-data.c b/src/ipcpd/ipcp-data.c index 593baeba..c4838d3a 100644 --- a/src/ipcpd/ipcp-data.c +++ b/src/ipcpd/ipcp-data.c @@ -22,7 +22,6 @@ */ #include <ouroboros/config.h> -#include <ouroboros/shm_du_map.h> #include <ouroboros/list.h> #define OUROBOROS_PREFIX "ipcp-utils" diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 547e7e28..4fa7e33f 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -24,7 +24,7 @@ #include "ipcp.h" #include "flow.h" #include <ouroboros/errno.h> -#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_rdrbuff.h> #include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> @@ -67,7 +67,7 @@ struct ipcp * _ipcp; /* the shim needs access to these internals */ struct shim_ap_data { pid_t api; - struct shm_du_map * dum; + struct shm_rdrbuff * rdrb; struct bmp * fds; struct shm_ap_rbuff * rb; @@ -98,8 +98,8 @@ static int shim_ap_init() return -1; } - _ap_instance->dum = shm_du_map_open(); - if (_ap_instance->dum == NULL) { + _ap_instance->rdrb = shm_rdrbuff_open(); + if (_ap_instance->rdrb == NULL) { bmp_destroy(_ap_instance->fds); free(_ap_instance); return -1; @@ -107,7 +107,7 @@ static int shim_ap_init() _ap_instance->rb = shm_ap_rbuff_create(); if (_ap_instance->rb == NULL) { - shm_du_map_close(_ap_instance->dum); + shm_rdrbuff_close(_ap_instance->rdrb); bmp_destroy(_ap_instance->fds); free(_ap_instance); return -1; @@ -139,8 +139,13 @@ void shim_ap_fini() if (_ap_instance->fds != NULL) bmp_destroy(_ap_instance->fds); - if (_ap_instance->dum != NULL) - shm_du_map_close_on_exit(_ap_instance->dum); + + /* remove all remaining sdus */ + while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) + shm_rdrbuff_remove(_ap_instance->rdrb, i); + + if (_ap_instance->rdrb != NULL) + shm_rdrbuff_close(_ap_instance->rdrb); if (_ap_instance->rb != NULL) shm_ap_rbuff_destroy(_ap_instance->rb); diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 335330ae..cf4ae3f1 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -24,7 +24,7 @@ #include <ouroboros/config.h> #include <ouroboros/logs.h> -#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_rdrbuff.h> #include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/dev.h> #include <ouroboros/ipcp.h> @@ -55,7 +55,7 @@ struct normal_ipcp_data { /* Keep ipcp_data first for polymorphism. */ struct ipcp_data ipcp_data; - struct shm_du_map * dum; + struct shm_rdrbuff * rdrb; struct shm_ap_rbuff * rb; pthread_t mainloop; @@ -206,15 +206,15 @@ struct normal_ipcp_data * normal_ipcp_data_create() return NULL; } - normal_data->dum = shm_du_map_open(); - if (normal_data->dum == NULL) { + normal_data->rdrb = shm_rdrbuff_open(); + if (normal_data->rdrb == NULL) { free(normal_data); return NULL; } normal_data->rb = shm_ap_rbuff_open(getpid()); if (normal_data->rb == NULL) { - shm_du_map_close(normal_data->dum); + shm_rdrbuff_close(normal_data->rdrb); free(normal_data); return NULL; } @@ -225,6 +225,8 @@ struct normal_ipcp_data * normal_ipcp_data_create() void normal_ipcp_data_destroy() { + int idx = 0; + if (_ipcp == NULL) return; @@ -233,8 +235,12 @@ void normal_ipcp_data_destroy() if (ipcp_get_state(_ipcp) != IPCP_SHUTDOWN) LOG_WARN("Cleaning up while not in shutdown."); - if (normal_data(_ipcp)->dum != NULL) - shm_du_map_close_on_exit(normal_data(_ipcp)->dum); + /* remove all remaining sdus */ + while ((idx = shm_ap_rbuff_peek_idx(normal_data(_ipcp)->rb)) >= 0) + shm_rdrbuff_remove(normal_data(_ipcp)->rdrb, idx); + + if (normal_data(_ipcp)->rdrb != NULL) + shm_rdrbuff_close(normal_data(_ipcp)->rdrb); if (normal_data(_ipcp)->rb != NULL) shm_ap_rbuff_close(normal_data(_ipcp)->rb); diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 608b0029..d1100001 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -27,7 +27,7 @@ #include "ipcp.h" #include "flow.h" #include <ouroboros/errno.h> -#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_rdrbuff.h> #include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> @@ -122,7 +122,7 @@ struct eth_llc_ipcp_data { struct bmp * indices; struct bmp * saps; - struct shm_du_map * dum; + struct shm_rdrbuff * rdrb; struct shm_ap_rbuff * rb; uint8_t * rx_ring; @@ -155,15 +155,15 @@ struct eth_llc_ipcp_data * eth_llc_ipcp_data_create() return NULL; } - eth_llc_data->dum = shm_du_map_open(); - if (eth_llc_data->dum == NULL) { + eth_llc_data->rdrb = shm_rdrbuff_open(); + if (eth_llc_data->rdrb == NULL) { free(eth_llc_data); return NULL; } eth_llc_data->rb = shm_ap_rbuff_create(); if (eth_llc_data->rb == NULL) { - shm_du_map_close(eth_llc_data->dum); + shm_rdrbuff_close(eth_llc_data->rdrb); free(eth_llc_data); return NULL; } @@ -171,7 +171,7 @@ struct eth_llc_ipcp_data * eth_llc_ipcp_data_create() eth_llc_data->indices = bmp_create(AP_MAX_FLOWS, 0); if (eth_llc_data->indices == NULL) { shm_ap_rbuff_destroy(eth_llc_data->rb); - shm_du_map_close(eth_llc_data->dum); + shm_rdrbuff_close(eth_llc_data->rdrb); free(eth_llc_data); return NULL; } @@ -180,7 +180,7 @@ struct eth_llc_ipcp_data * eth_llc_ipcp_data_create() if (eth_llc_data->indices == NULL) { bmp_destroy(eth_llc_data->indices); shm_ap_rbuff_destroy(eth_llc_data->rb); - shm_du_map_close(eth_llc_data->dum); + shm_rdrbuff_close(eth_llc_data->rdrb); free(eth_llc_data); return NULL; } @@ -202,8 +202,12 @@ void eth_llc_ipcp_data_destroy() if (ipcp_get_state(_ipcp) != IPCP_SHUTDOWN) LOG_WARN("Cleaning up while not in shutdown."); - if (shim_data(_ipcp)->dum != NULL) - shm_du_map_close_on_exit(shim_data(_ipcp)->dum); + /* remove all remaining sdus */ + while ((i = shm_ap_rbuff_peek_idx(shim_data(_ipcp)->rb)) >= 0) + shm_rdrbuff_remove(shim_data(_ipcp)->rdrb, i); + + if (shim_data(_ipcp)->rdrb != NULL) + shm_rdrbuff_close(shim_data(_ipcp)->rdrb); if (shim_data(_ipcp)->rb != NULL) shm_ap_rbuff_destroy(shim_data(_ipcp)->rb); if (shim_data(_ipcp)->indices != NULL) @@ -332,7 +336,7 @@ static int eth_llc_ipcp_send_frame(uint8_t dst_addr[MAC_SIZE], #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) header = (void *) shim_data(_ipcp)->tx_ring + - (shim_data(_ipcp)->tx_offset * SHM_DU_BUFF_BLOCK_SIZE); + (shim_data(_ipcp)->tx_offset * SHM_RDRB_BLOCK_SIZE); while (header->tp_status != TP_STATUS_AVAILABLE) { pfd.fd = fd; @@ -345,7 +349,7 @@ static int eth_llc_ipcp_send_frame(uint8_t dst_addr[MAC_SIZE], } header = (void *) shim_data(_ipcp)->tx_ring + - (shim_data(_ipcp)->tx_offset * SHM_DU_BUFF_BLOCK_SIZE); + (shim_data(_ipcp)->tx_offset * SHM_RDRB_BLOCK_SIZE); } frame = (void *) header + TPACKET_HDRLEN - sizeof(struct sockaddr_ll); @@ -671,7 +675,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o) while (true) { #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) header = (void *) shim_data(_ipcp)->rx_ring + - (offset * SHM_DU_BUFF_BLOCK_SIZE); + (offset * SHM_RDRB_BLOCK_SIZE); while (!(header->tp_status & TP_STATUS_USER)) { pfd.fd = shim_data(_ipcp)->s_fd; pfd.revents = 0; @@ -683,7 +687,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o) } header = (void *) shim_data(_ipcp)->rx_ring + - (offset * SHM_DU_BUFF_BLOCK_SIZE); + (offset * SHM_RDRB_BLOCK_SIZE); } buf = (void * ) header + header->tp_mac; @@ -740,7 +744,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o) } while ((index = - shm_du_map_write(shim_data(_ipcp)->dum, + shm_rdrbuff_write(shim_data(_ipcp)->rdrb, ipcp_flow(i)->api, 0, 0, @@ -782,8 +786,8 @@ static void * eth_llc_ipcp_sdu_writer(void * o) pthread_rwlock_rdlock(&_ipcp->state_lock); - len = shm_du_map_read((uint8_t **) &buf, - shim_data(_ipcp)->dum, + len = shm_rdrbuff_read((uint8_t **) &buf, + shim_data(_ipcp)->rdrb, e->index); if (len <= 0) { free(e); @@ -808,8 +812,8 @@ static void * eth_llc_ipcp_sdu_writer(void * o) pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - if (shim_data(_ipcp)->dum != NULL) - shm_du_map_remove(shim_data(_ipcp)->dum, e->index); + if (shim_data(_ipcp)->rdrb != NULL) + shm_rdrbuff_remove(shim_data(_ipcp)->rdrb, e->index); pthread_rwlock_unlock(&_ipcp->state_lock); @@ -849,7 +853,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) { int fd = -1; struct ifreq ifr; - int index; + int idx; #ifdef __FreeBSD__ struct ifaddrs * ifaddr; struct ifaddrs * ifa; @@ -892,7 +896,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) return -1; } - for (ifa = ifaddr, index = 0; ifa != NULL; ifa = ifa->ifa_next, ++index) { + for (ifa = ifaddr, idx = 0; ifa != NULL; ifa = ifa->ifa_next, ++idx) { if (strcmp(ifa->ifa_name, conf->if_name)) continue; LOG_DBGF("Interface %s found.", conf->if_name); @@ -916,8 +920,8 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) return -1; } - index = if_nametoindex(conf->if_name); - if (index == 0) { + idx = if_nametoindex(conf->if_name); + if (idx == 0) { LOG_ERR("Failed to retrieve interface index."); return -1; } @@ -927,7 +931,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) memset(&(device), 0, sizeof(device)); #ifdef __FreeBSD__ - device.sdl_index = index; + device.sdl_index = idx; device.sdl_family = AF_LINK; memcpy(LLADDR(&device), ifr.ifr_addr.sa_data, @@ -937,7 +941,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) LOG_MISSING; fd = socket(AF_LINK, SOCK_RAW, 0); #else - device.sll_ifindex = index; + device.sll_ifindex = idx; device.sll_family = AF_PACKET; memcpy(device.sll_addr, ifr.ifr_hwaddr.sa_data, @@ -953,14 +957,14 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) } #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) - if (SHIM_ETH_LLC_MAX_SDU_SIZE > SHM_DU_BUFF_BLOCK_SIZE) { + if (SHIM_ETH_LLC_MAX_SDU_SIZE > SHM_RDRB_BLOCK_SIZE) { LOG_ERR("Max SDU size is bigger than DU map block size."); close(fd); return -1; } - req.tp_block_size = SHM_DU_BUFF_BLOCK_SIZE; - req.tp_frame_size = SHM_DU_BUFF_BLOCK_SIZE; + req.tp_block_size = SHM_RDRB_BLOCK_SIZE; + req.tp_frame_size = SHM_RDRB_BLOCK_SIZE; req.tp_block_nr = SHM_BUFFER_SIZE; req.tp_frame_nr = SHM_BUFFER_SIZE; @@ -987,7 +991,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) shim_data(_ipcp)->rx_ring = mmap(NULL, - 2 * SHM_DU_BUFF_BLOCK_SIZE + 2 * SHM_RDRB_BLOCK_SIZE * SHM_BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); @@ -997,10 +1001,9 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) return -1; } shim_data(_ipcp)->tx_ring = shim_data(_ipcp)->rx_ring - + (SHM_DU_BUFF_BLOCK_SIZE * SHM_BUFFER_SIZE); + + (SHM_RDRB_BLOCK_SIZE * SHM_BUFFER_SIZE); #endif - pthread_rwlock_wrlock(&_ipcp->state_lock); if (ipcp_get_state(_ipcp) != IPCP_INIT) { diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 3f9b20f1..451a2a4c 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -24,7 +24,7 @@ #include "ipcp.h" #include "flow.h" #include "shim_udp_config.h" -#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_rdrbuff.h> #include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> @@ -87,7 +87,7 @@ struct ipcp * _ipcp; /* the shim needs access to these internals */ struct shim_ap_data { pid_t api; - struct shm_du_map * dum; + struct shm_rdrbuff * rdrb; struct bmp * fds; struct shm_ap_rbuff * rb; @@ -121,8 +121,8 @@ static int shim_ap_init() return -1; } - _ap_instance->dum = shm_du_map_open(); - if (_ap_instance->dum == NULL) { + _ap_instance->rdrb = shm_rdrbuff_open(); + if (_ap_instance->rdrb == NULL) { bmp_destroy(_ap_instance->fds); free(_ap_instance); return -1; @@ -130,7 +130,7 @@ static int shim_ap_init() _ap_instance->rb = shm_ap_rbuff_create(); if (_ap_instance->rb == NULL) { - shm_du_map_close(_ap_instance->dum); + shm_rdrbuff_close(_ap_instance->rdrb); bmp_destroy(_ap_instance->fds); free(_ap_instance); return -1; @@ -163,8 +163,13 @@ void shim_ap_fini() if (_ap_instance->fds != NULL) bmp_destroy(_ap_instance->fds); - if (_ap_instance->dum != NULL) - shm_du_map_close_on_exit(_ap_instance->dum); + + /* remove all remaining sdus */ + while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) + shm_rdrbuff_remove(_ap_instance->rdrb, i); + + if (_ap_instance->rdrb != NULL) + shm_rdrbuff_close(_ap_instance->rdrb); if (_ap_instance->rb != NULL) shm_ap_rbuff_destroy(_ap_instance->rb); @@ -202,7 +207,7 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) pthread_rwlock_rdlock(&_ipcp->state_lock); pthread_rwlock_rdlock(&_ap_instance->flows_lock); - index = shm_du_map_write_b(_ap_instance->dum, + index = shm_rdrbuff_write_b(_ap_instance->rdrb, _ap_instance->flows[fd].api, 0, 0, @@ -745,8 +750,8 @@ static void * ipcp_udp_sdu_loop(void * o) pthread_rwlock_rdlock(&_ipcp->state_lock); - len = shm_du_map_read((uint8_t **) &buf, - _ap_instance->dum, + len = shm_rdrbuff_read((uint8_t **) &buf, + _ap_instance->rdrb, e->index); if (len <= 0) { pthread_rwlock_unlock(&_ipcp->state_lock); @@ -771,8 +776,8 @@ static void * ipcp_udp_sdu_loop(void * o) pthread_rwlock_rdlock(&_ipcp->state_lock); - if (_ap_instance->dum != NULL) - shm_du_map_remove(_ap_instance->dum, e->index); + if (_ap_instance->rdrb != NULL) + shm_rdrbuff_remove(_ap_instance->rdrb, e->index); pthread_rwlock_unlock(&_ipcp->state_lock); diff --git a/src/irmd/main.c b/src/irmd/main.c index cd939360..29f6d9d0 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -34,7 +34,7 @@ #include <ouroboros/irm_config.h> #include <ouroboros/lockfile.h> #include <ouroboros/shm_ap_rbuff.h> -#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_rdrbuff.h> #include <ouroboros/bitmap.h> #include <ouroboros/flow.h> #include <ouroboros/qos.h> @@ -75,31 +75,31 @@ enum irm_state { }; struct irm { - struct list_head registry; + struct list_head registry; - struct list_head ipcps; + struct list_head ipcps; - struct list_head api_table; - struct list_head apn_table; - struct list_head spawned_apis; - pthread_rwlock_t reg_lock; + struct list_head api_table; + struct list_head apn_table; + struct list_head spawned_apis; + pthread_rwlock_t reg_lock; /* keep track of all flows in this processing system */ - struct bmp * port_ids; + struct bmp * port_ids; /* maps port_ids to api pair */ - struct list_head irm_flows; - pthread_rwlock_t flows_lock; + struct list_head irm_flows; + pthread_rwlock_t flows_lock; - struct lockfile * lf; - struct shm_du_map * dum; - pthread_t * threadpool; - int sockfd; + struct lockfile * lf; + struct shm_rdrbuff * rdrb; + pthread_t * threadpool; + int sockfd; - enum irm_state state; - pthread_rwlock_t state_lock; + enum irm_state state; + pthread_rwlock_t state_lock; - pthread_t irm_sanitize; - pthread_t shm_sanitize; + pthread_t irm_sanitize; + pthread_t shm_sanitize; } * irmd = NULL; static struct irm_flow * get_irm_flow(int port_id) @@ -1604,8 +1604,8 @@ static void irm_destroy() pthread_rwlock_unlock(&irmd->flows_lock); - if (irmd->dum != NULL) - shm_du_map_destroy(irmd->dum); + if (irmd->rdrb != NULL) + shm_rdrbuff_destroy(irmd->rdrb); if (irmd->lf != NULL) lockfile_destroy(irmd->lf); @@ -2072,7 +2072,8 @@ static int irm_create() if (kill(lockfile_owner(irmd->lf), 0) < 0) { LOG_INFO("IRMd didn't properly shut down last time."); - shm_du_map_destroy(shm_du_map_open()); + /* FIXME: do this for each QOS_CUBE in the system */ + shm_rdrbuff_destroy(shm_rdrbuff_open(QOS_CUBE_BE)); LOG_INFO("Stale resources cleaned"); lockfile_destroy(irmd->lf); irmd->lf = lockfile_create(); @@ -2090,7 +2091,8 @@ static int irm_create() return -1; } - if ((irmd->dum = shm_du_map_create()) == NULL) { + /* FIXME: create an rdrb for each QOS_CUBE in the system */ + if ((irmd->rdrb = shm_rdrbuff_create(QOS_CUBE_BE)) == NULL) { irm_destroy(); return -1; } @@ -2201,7 +2203,7 @@ int main(int argc, char ** argv) pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL); pthread_create(&irmd->shm_sanitize, NULL, - shm_du_map_sanitize, irmd->dum); + shm_rdrbuff_sanitize, irmd->rdrb); /* wait for (all of them) to return */ for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 5e16c7e2..8c058dd8 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -33,7 +33,7 @@ set(SOURCE_FILES logs.c nsm.c shm_ap_rbuff.c - shm_du_map.c + shm_rdrbuff.c sockets.c time_utils.c utils.c diff --git a/src/lib/dev.c b/src/lib/dev.c index 3a5fc8e0..17c473ed 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -25,7 +25,7 @@ #include <ouroboros/dev.h> #include <ouroboros/sockets.h> #include <ouroboros/bitmap.h> -#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_rdrbuff.h> #include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/utils.h> @@ -45,7 +45,7 @@ struct flow { struct ap_data { char * ap_name; pid_t api; - struct shm_du_map * dum; + struct shm_rdrbuff * rdrb; struct bmp * fds; struct shm_ap_rbuff * rb; pthread_rwlock_t data_lock; @@ -105,8 +105,8 @@ int ap_init(char * ap_name) return -ENOMEM; } - _ap_instance->dum = shm_du_map_open(); - if (_ap_instance->dum == NULL) { + _ap_instance->rdrb = shm_rdrbuff_open(); + if (_ap_instance->rdrb == NULL) { bmp_destroy(_ap_instance->fds); free(_ap_instance); return -1; @@ -114,7 +114,7 @@ int ap_init(char * ap_name) _ap_instance->rb = shm_ap_rbuff_create(); if (_ap_instance->rb == NULL) { - shm_du_map_close(_ap_instance->dum); + shm_rdrbuff_close(_ap_instance->rdrb); bmp_destroy(_ap_instance->fds); free(_ap_instance); return -1; @@ -146,12 +146,16 @@ void ap_fini(void) pthread_rwlock_wrlock(&_ap_instance->data_lock); + /* remove all remaining sdus */ + while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) + shm_rdrbuff_remove(_ap_instance->rdrb, i); + if (_ap_instance->fds != NULL) bmp_destroy(_ap_instance->fds); if (_ap_instance->rb != NULL) shm_ap_rbuff_destroy(_ap_instance->rb); - if (_ap_instance->dum != NULL) - shm_du_map_close_on_exit(_ap_instance->dum); + if (_ap_instance->rdrb != NULL) + shm_rdrbuff_close(_ap_instance->rdrb); pthread_rwlock_rdlock(&_ap_instance->flows_lock); @@ -515,7 +519,7 @@ ssize_t flow_write(int fd, void * buf, size_t count) } if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_du_map_write(_ap_instance->dum, + idx = shm_rdrbuff_write(_ap_instance->rdrb, _ap_instance->flows[fd].api, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, @@ -531,18 +535,18 @@ ssize_t flow_write(int fd, void * buf, size_t count) e.port_id = _ap_instance->flows[fd].port_id; if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { - shm_du_map_remove(_ap_instance->dum, idx); + shm_rdrbuff_remove(_ap_instance->rdrb, idx); pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); return -1; } } else { /* blocking */ - struct shm_du_map * dum = _ap_instance->dum; - pid_t api = _ap_instance->flows[fd].api; + struct shm_rdrbuff * rdrb = _ap_instance->rdrb; + pid_t api = _ap_instance->flows[fd].api; pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); - idx = shm_du_map_write_b(dum, + idx = shm_rdrbuff_write_b(rdrb, api, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, @@ -567,7 +571,7 @@ ssize_t flow_write(int fd, void * buf, size_t count) int flow_select(const struct timespec * timeout) { - int port_id = shm_ap_rbuff_peek(_ap_instance->rb, timeout); + int port_id = shm_ap_rbuff_peek_b(_ap_instance->rb, timeout); if (port_id < 0) return port_id; return port_id_to_fd(port_id); @@ -612,7 +616,7 @@ ssize_t flow_read(int fd, void * buf, size_t count) return -EAGAIN; } - n = shm_du_map_read(&sdu, _ap_instance->dum, idx); + n = shm_rdrbuff_read(&sdu, _ap_instance->rdrb, idx); if (n < 0) { pthread_rwlock_unlock(&_ap_instance->data_lock); return -1; @@ -620,7 +624,7 @@ ssize_t flow_read(int fd, void * buf, size_t count) memcpy(buf, sdu, MIN(n, count)); - shm_du_map_remove(_ap_instance->dum, idx); + shm_rdrbuff_remove(_ap_instance->rdrb, idx); pthread_rwlock_unlock(&_ap_instance->data_lock); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 4ca29636..f21b1e86 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -285,8 +285,32 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) return 0; } -int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb, - const struct timespec * timeout) +int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb) +{ + int ret = 0; + + if (rb == NULL) + return -EINVAL; + + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } + + if (shm_rbuff_empty(rb)) { + pthread_mutex_unlock(rb->lock); + return -1; + } + + ret = (rb->shm_base + *rb->ptr_tail)->index; + + pthread_mutex_unlock(rb->lock); + + return ret; +} + +int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, + const struct timespec * timeout) { struct timespec abstime; int ret = 0; diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c deleted file mode 100644 index 9ca282b9..00000000 --- a/src/lib/shm_du_map.c +++ /dev/null @@ -1,767 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Shared memory map for data units - * - * Dimitri Staessens <dimitri.staessens@intec.ugent.be> - * Sander Vrijders <sander.vrijders@intec.ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include <ouroboros/config.h> -#include <ouroboros/errno.h> -#include <ouroboros/shm_du_map.h> -#include <ouroboros/shm_ap_rbuff.h> -#include <ouroboros/time_utils.h> - -#include <pthread.h> -#include <sys/mman.h> -#include <fcntl.h> -#include <unistd.h> -#include <stdlib.h> -#include <string.h> -#include <signal.h> -#include <sys/stat.h> - -#define OUROBOROS_PREFIX "shm_du_map" - -#include <ouroboros/logs.h> - -#define SHM_BLOCKS_SIZE (SHM_BUFFER_SIZE * SHM_DU_BUFF_BLOCK_SIZE) -#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \ - + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ - + sizeof(pid_t)) - -#define get_head_ptr(dum) \ -((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_head * \ - SHM_DU_BUFF_BLOCK_SIZE))) - -#define get_tail_ptr(dum) \ -((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_tail * \ - SHM_DU_BUFF_BLOCK_SIZE))) - -#define idx_to_du_buff_ptr(dum, idx) \ - ((struct shm_du_buff *)(dum->shm_base + (idx * SHM_DU_BUFF_BLOCK_SIZE))) - -#define block_ptr_to_idx(dum, sdb) \ - (((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE) - -#define shm_map_used(dum)((*dum->ptr_head + SHM_BUFFER_SIZE - *dum->ptr_tail)\ - & (SHM_BUFFER_SIZE - 1)) -#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BUFFER_SIZE) - -#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head) - -struct shm_du_buff { - size_t size; -#ifdef SHM_DU_MAP_MULTI_BLOCK - size_t blocks; -#endif - size_t du_head; - size_t du_tail; - pid_t dst_api; -}; - -struct shm_du_map { - uint8_t * shm_base; /* start of blocks */ - size_t * ptr_head; /* start of ringbuffer head */ - size_t * ptr_tail; /* start of ringbuffer tail */ - pthread_mutex_t * lock; /* lock all free space in shm */ - size_t * choked; /* stale sdu detection */ - pthread_cond_t * healthy; /* du map is healthy */ - pthread_cond_t * full; /* run sanitizer when buffer full */ - pid_t * api; /* api of the irmd owner */ - int fd; -}; - -static void garbage_collect(struct shm_du_map * dum) -{ -#ifdef SHM_DU_MAP_MULTI_BLOCK - struct shm_du_buff * sdb; - while (!shm_map_empty(dum) && (sdb = get_tail_ptr(dum))->dst_api == -1) - *dum->ptr_tail = (*dum->ptr_tail + sdb->blocks) - & (SHM_BUFFER_SIZE - 1); -#else - while (!shm_map_empty(dum) && get_tail_ptr(dum)->dst_api == -1) - *dum->ptr_tail = - (*dum->ptr_tail + 1) & (SHM_BUFFER_SIZE - 1); - -#endif -} - -static void clean_sdus(struct shm_du_map * dum, pid_t api, bool exit) -{ - size_t idx = *dum->ptr_tail; - struct shm_du_buff * buf; - - while (idx != *dum->ptr_head) { - buf = idx_to_du_buff_ptr(dum, idx); - if (buf->dst_api == api) - buf->dst_api = -1; -#ifdef SHM_DU_MAP_MULTI_BLOCK - idx = (idx + buf->blocks) & (SHM_BUFFER_SIZE - 1); -#else - idx = (idx + 1) & (SHM_BUFFER_SIZE - 1); -#endif - } - - garbage_collect(dum); - - if (!exit && kill(api, 0) == 0) { - struct shm_ap_rbuff * rb; - rb = shm_ap_rbuff_open(api); - if (rb != NULL) { - shm_ap_rbuff_reset(rb); - shm_ap_rbuff_close(rb); - } - } - - *dum->choked = 0; -} - -struct shm_du_map * shm_du_map_create() -{ - struct shm_du_map * dum; - int shm_fd; - uint8_t * shm_base; - pthread_mutexattr_t mattr; - pthread_condattr_t cattr; - - dum = malloc(sizeof *dum); - if (dum == NULL) { - LOG_DBGF("Could not allocate struct."); - return NULL; - } - - shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_CREAT | O_EXCL | O_RDWR, 0666); - if (shm_fd == -1) { - LOG_DBGF("Failed creating shared memory map."); - free(dum); - return NULL; - } - - if (fchmod(shm_fd, 0666)) { - LOG_DBGF("Failed to chmod shared memory map."); - free(dum); - return NULL; - } - - if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) { - LOG_DBGF("Failed to extend shared memory map."); - free(dum); - return NULL; - } - - if (write(shm_fd, "", 1) != 1) { - LOG_DBGF("Failed to finalise extension of shared memory map."); - free(dum); - return NULL; - } - - shm_base = mmap(NULL, - SHM_FILE_SIZE, - PROT_READ | PROT_WRITE, - MAP_SHARED, - shm_fd, - 0); - - if (shm_base == MAP_FAILED) { - LOG_DBGF("Failed to map shared memory."); - if (shm_unlink(SHM_DU_MAP_FILENAME) == -1) - LOG_DBGF("Failed to remove invalid shm."); - - free(dum); - return NULL; - } - - dum->shm_base = shm_base; - dum->ptr_head = (size_t *) - ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); - dum->ptr_tail = dum->ptr_head + 1; - dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->choked = (size_t *) (dum->lock + 1); - dum->healthy = (pthread_cond_t *) (dum->choked + 1); - dum->full = dum->healthy + 1; - dum->api = (pid_t *) (dum->full + 1); - - pthread_mutexattr_init(&mattr); - pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); - pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); - pthread_mutex_init(dum->lock, &mattr); - - pthread_condattr_init(&cattr); - pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); - pthread_cond_init(dum->full, &cattr); - pthread_cond_init(dum->healthy, &cattr); - - *dum->ptr_head = 0; - *dum->ptr_tail = 0; - - *dum->choked = 0; - - *dum->api = getpid(); - - dum->fd = shm_fd; - - return dum; -} - -struct shm_du_map * shm_du_map_open() -{ - struct shm_du_map * dum; - int shm_fd; - uint8_t * shm_base; - - dum = malloc(sizeof *dum); - if (dum == NULL) { - LOG_DBGF("Could not allocate struct."); - return NULL; - } - - shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_RDWR, 0666); - if (shm_fd < 0) { - LOG_DBGF("Failed opening shared memory."); - free(dum); - return NULL; - } - - shm_base = mmap(NULL, - SHM_FILE_SIZE, - PROT_READ | PROT_WRITE, - MAP_SHARED, - shm_fd, - 0); - if (shm_base == MAP_FAILED) { - LOG_DBGF("Failed to map shared memory."); - if (close(shm_fd) == -1) - LOG_DBGF("Failed to close invalid shm."); - if (shm_unlink(SHM_DU_MAP_FILENAME) == -1) - LOG_DBGF("Failed to unlink invalid shm."); - free(dum); - return NULL; - } - - dum->shm_base = shm_base; - dum->ptr_head = (size_t *) - ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); - dum->ptr_tail = dum->ptr_head + 1; - dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->choked = (size_t *) (dum->lock + 1); - dum->healthy = (pthread_cond_t *) (dum->choked + 1); - dum->full = dum->healthy + 1; - dum->api = (pid_t *) (dum->full + 1); - - dum->fd = shm_fd; - - return dum; -} - -void * shm_du_map_sanitize(void * o) -{ - struct shm_du_map * dum = (struct shm_du_map *) o; - struct timespec intv - = {SHM_DU_TIMEOUT_MICROS / MILLION, - (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000}; - - pid_t api; - - if (dum == NULL) - return (void *) -1; - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, - (void *) dum->lock); - - while (true) { - int ret = 0; - struct timespec now; - struct timespec dl; - - if (pthread_cond_wait(dum->full, dum->lock) == EOWNERDEAD) { - LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - *dum->choked = 1; - - garbage_collect(dum); - - if (shm_map_empty(dum)) - continue; - - api = get_tail_ptr(dum)->dst_api; - - if (kill(api, 0)) { - LOG_DBGF("Dead process %d left stale sdu.", api); - clean_sdus(dum, api, false); - continue; - } - - clock_gettime(CLOCK_REALTIME, &now); - ts_add(&now, &intv, &dl); - while (*dum->choked) { - ret = pthread_cond_timedwait(dum->healthy, - dum->lock, - &dl); - if (!ret) - continue; - - if (ret == EOWNERDEAD) { - LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - if (ret == ETIMEDOUT) { - LOG_DBGF("SDU timed out (dst: %d).", api); - clean_sdus(dum, api, false); - } - } - } - - pthread_cleanup_pop(true); - - return (void *) 0; -} - -void shm_du_map_close_on_exit(struct shm_du_map * dum) -{ - if (dum == NULL) { - LOG_DBGF("Bogus input. Bugging out."); - return; - } - - clean_sdus(dum, getpid(), true); - - if (close(dum->fd) < 0) - LOG_DBGF("Couldn't close shared memory."); - - if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) - LOG_DBGF("Couldn't unmap shared memory."); - - free(dum); -} - -void shm_du_map_close(struct shm_du_map * dum) -{ - if (dum == NULL) { - LOG_DBGF("Bogus input. Bugging out."); - return; - } - - if (close(dum->fd) < 0) - LOG_DBGF("Couldn't close shared memory."); - - if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) - LOG_DBGF("Couldn't unmap shared memory."); - - free(dum); -} - -void shm_du_map_destroy(struct shm_du_map * dum) -{ - if (dum == NULL) { - LOG_DBGF("Bogus input. Bugging out."); - return; - } - - if (getpid() != *dum->api && kill(*dum->api, 0) == 0) { - LOG_DBGF("Only IRMd can destroy %s.", SHM_DU_MAP_FILENAME); - return; - } - - if (close(dum->fd) < 0) - LOG_DBGF("Couldn't close shared memory."); - - if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) - LOG_DBGF("Couldn't unmap shared memory."); - - if (shm_unlink(SHM_DU_MAP_FILENAME) == -1) - LOG_DBGF("Failed to unlink shm."); - - free(dum); -} - -ssize_t shm_du_map_write(struct shm_du_map * dum, - pid_t dst_api, - size_t headspace, - size_t tailspace, - uint8_t * data, - size_t len) -{ - struct shm_du_buff * sdb; - size_t size = headspace + len + tailspace; -#ifdef SHM_DU_MAP_MULTI_BLOCK - long blocks = 0; - long padblocks = 0; -#endif - int sz = size + sizeof *sdb; - uint8_t * write_pos; - ssize_t idx = -1; - - if (dum == NULL || data == NULL) { - LOG_DBGF("Bogus input, bugging out."); - return -1; - } - -#ifndef SHM_DU_MAP_MULTI_BLOCK - if (sz > SHM_DU_BUFF_BLOCK_SIZE) { - LOG_DBGF("Multi-block SDU's disabled. Dropping."); - return -1; - } -#endif - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } -#ifdef SHM_DU_MAP_MULTI_BLOCK - while (sz > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; - ++blocks; - } - - if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE) - padblocks = SHM_BUFFER_SIZE - *dum->ptr_head; - - if (!shm_map_free(dum, (blocks + padblocks))) { -#else - if (!shm_map_free(dum, 1)) { -#endif - pthread_cond_signal(dum->full); - pthread_mutex_unlock(dum->lock); - return -1; - } - -#ifdef SHM_DU_MAP_MULTI_BLOCK - if (padblocks) { - sdb = get_head_ptr(dum); - sdb->size = 0; - sdb->blocks = padblocks; - sdb->dst_api = -1; - sdb->du_head = 0; - sdb->du_tail = 0; - - *dum->ptr_head = 0; - } -#endif - sdb = get_head_ptr(dum); - sdb->size = size; - sdb->dst_api = dst_api; - sdb->du_head = headspace; - sdb->du_tail = sdb->du_head + len; -#ifdef SHM_DU_MAP_MULTI_BLOCK - sdb->blocks = blocks; -#endif - write_pos = ((uint8_t *) (sdb + 1)) + headspace; - - memcpy(write_pos, data, len); - - idx = *dum->ptr_head; -#ifdef SHM_DU_MAP_MULTI_BLOCK - *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); -#else - *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); -#endif - pthread_mutex_unlock(dum->lock); - - return idx; -} - -ssize_t shm_du_map_write_b(struct shm_du_map * dum, - pid_t dst_api, - size_t headspace, - size_t tailspace, - uint8_t * data, - size_t len) -{ - struct shm_du_buff * sdb; - size_t size = headspace + len + tailspace; -#ifdef SHM_DU_MAP_MULTI_BLOCK - long blocks = 0; - long padblocks = 0; -#endif - int sz = size + sizeof *sdb; - uint8_t * write_pos; - ssize_t idx = -1; - - if (dum == NULL || data == NULL) { - LOG_DBGF("Bogus input, bugging out."); - return -1; - } - -#ifndef SHM_DU_MAP_MULTI_BLOCK - if (sz > SHM_DU_BUFF_BLOCK_SIZE) { - LOG_DBGF("Multi-block SDU's disabled. Dropping."); - return -1; - } -#endif - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) dum->lock); - -#ifdef SHM_DU_MAP_MULTI_BLOCK - while (sz > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; - ++blocks; - } - - if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE) - padblocks = SHM_BUFFER_SIZE - *dum->ptr_head; - - while (!shm_map_free(dum, (blocks + padblocks))) { -#else - while (!shm_map_free(dum, 1)) { -#endif - pthread_cond_signal(dum->full); - pthread_cond_wait(dum->healthy, dum->lock); - } - -#ifdef SHM_DU_MAP_MULTI_BLOCK - if (padblocks) { - sdb = get_head_ptr(dum); - sdb->size = 0; - sdb->blocks = padblocks; - sdb->dst_api = -1; - sdb->du_head = 0; - sdb->du_tail = 0; - - *dum->ptr_head = 0; - } -#endif - sdb = get_head_ptr(dum); - sdb->size = size; - sdb->dst_api = dst_api; - sdb->du_head = headspace; - sdb->du_tail = sdb->du_head + len; -#ifdef SHM_DU_MAP_MULTI_BLOCK - sdb->blocks = blocks; -#endif - write_pos = ((uint8_t *) (sdb + 1)) + headspace; - - memcpy(write_pos, data, len); - - idx = *dum->ptr_head; -#ifdef SHM_DU_MAP_MULTI_BLOCK - *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); -#else - *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); -#endif - pthread_cleanup_pop(true); - - return idx; -} - -int shm_du_map_read(uint8_t ** dst, - struct shm_du_map * dum, - ssize_t idx) -{ - size_t len = 0; - struct shm_du_buff * sdb; - - if (idx > SHM_BUFFER_SIZE) - return -1; - - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - if (shm_map_empty(dum)) { - pthread_mutex_unlock(dum->lock); - return -1; - } - - sdb = idx_to_du_buff_ptr(dum, idx); - len = sdb->du_tail - sdb->du_head; - *dst = ((uint8_t *) (sdb + 1)) + sdb->du_head; - - pthread_mutex_unlock(dum->lock); - - return len; -} - -int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx) -{ - if (idx > SHM_BUFFER_SIZE) - return -1; - - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - if (shm_map_empty(dum)) { - pthread_mutex_unlock(dum->lock); - return -1; - } - - idx_to_du_buff_ptr(dum, idx)->dst_api = -1; - - if (idx != *dum->ptr_tail) { - pthread_mutex_unlock(dum->lock); - return 0; - } - - garbage_collect(dum); - - *dum->choked = 0; - - pthread_cond_broadcast(dum->healthy); - - pthread_mutex_unlock(dum->lock); - - return 0; -} - -uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum, - int idx, - ssize_t size) -{ - struct shm_du_buff * sdb; - uint8_t * buf; - - if (dum == NULL) - return NULL; - - if (idx < 0 || idx > SHM_BUFFER_SIZE) - return NULL; - - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - sdb = idx_to_du_buff_ptr(dum, idx); - - if ((long) (sdb->du_head - size) < 0) { - pthread_mutex_unlock(dum->lock); - LOG_DBGF("Failed to allocate PCI headspace."); - return NULL; - } - - sdb->du_head -= size; - - buf = (uint8_t *) (sdb + 1) + sdb->du_head; - - pthread_mutex_unlock(dum->lock); - - return buf; -} - -uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum, - int idx, - ssize_t size) -{ - struct shm_du_buff * sdb; - uint8_t * buf; - - if (dum == NULL) - return NULL; - - if (idx < 0 || idx > SHM_BUFFER_SIZE) - return NULL; - - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - sdb = idx_to_du_buff_ptr(dum, idx); - - if (sdb->du_tail + size >= sdb->size) { - pthread_mutex_unlock(dum->lock); - LOG_DBGF("Failed to allocate PCI tailspace."); - return NULL; - } - - buf = (uint8_t *) (sdb + 1) + sdb->du_tail; - - sdb->du_tail += size; - - pthread_mutex_unlock(dum->lock); - - return buf; -} - -int shm_du_buff_head_release(struct shm_du_map * dum, - int idx, - ssize_t size) -{ - struct shm_du_buff * sdb; - - if (dum == NULL) - return -1; - - if (idx < 0 || idx > SHM_BUFFER_SIZE) - return -1; - - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - sdb = idx_to_du_buff_ptr(dum, idx); - - if (size > sdb->du_tail - sdb->du_head) { - pthread_mutex_unlock(dum->lock); - LOG_DBGF("Tried to release beyond sdu boundary."); - return -EOVERFLOW; - } - - sdb->du_head += size; - - pthread_mutex_unlock(dum->lock); - - return 0; -} - -int shm_du_buff_tail_release(struct shm_du_map * dum, - int idx, - ssize_t size) -{ - struct shm_du_buff * sdb; - - if (dum == NULL) - return -1; - - if (idx < 0 || idx > SHM_BUFFER_SIZE) - return -1; - - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - sdb = idx_to_du_buff_ptr(dum, idx); - - if (size > sdb->du_tail - sdb->du_head) { - pthread_mutex_unlock(dum->lock); - LOG_DBGF("Tried to release beyond sdu boundary."); - return -EOVERFLOW; - } - - sdb->du_tail -= size; - - pthread_mutex_unlock(dum->lock); - - return 0; -} diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c new file mode 100644 index 00000000..d42dbea7 --- /dev/null +++ b/src/lib/shm_rdrbuff.c @@ -0,0 +1,804 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Random Deletion Ring Buffer for Data Units + * + * Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * Sander Vrijders <sander.vrijders@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/config.h> +#include <ouroboros/errno.h> +#include <ouroboros/shm_rdrbuff.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/time_utils.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> +#include <signal.h> +#include <sys/stat.h> + +#define OUROBOROS_PREFIX "shm_rdrbuff" + +#include <ouroboros/logs.h> + +#define SHM_BLOCKS_SIZE (SHM_BUFFER_SIZE * SHM_RDRB_BLOCK_SIZE) +#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \ + + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ + + sizeof(pid_t)) + +#define get_head_ptr(rdrb) \ + ((struct shm_du_buff *)(rdrb->shm_base + (*rdrb->ptr_head * \ + SHM_RDRB_BLOCK_SIZE))) + +#define get_tail_ptr(rdrb) \ + ((struct shm_du_buff *)(rdrb->shm_base + (*rdrb->ptr_tail * \ + SHM_RDRB_BLOCK_SIZE))) + +#define idx_to_du_buff_ptr(rdrb, idx) \ + ((struct shm_du_buff *)(rdrb->shm_base + (idx * SHM_RDRB_BLOCK_SIZE))) + +#define block_ptr_to_idx(rdrb, sdb) \ + (((uint8_t *)sdb - rdrb->shm_base) / SHM_RDRB_BLOCK_SIZE) + +#define shm_rdrb_used(rdrb) \ + ((*rdrb->ptr_head + SHM_BUFFER_SIZE - *rdrb->ptr_tail) \ + & (SHM_BUFFER_SIZE - 1)) +#define shm_rdrb_free(rdrb, i) \ + (shm_rdrb_used(rdrb) + i < SHM_BUFFER_SIZE) + +#define shm_rdrb_empty(rdrb) \ + (*rdrb->ptr_tail == *rdrb->ptr_head) + +struct shm_du_buff { + size_t size; +#ifdef SHM_DU_MAP_MULTI_BLOCK + size_t blocks; +#endif + size_t du_head; + size_t du_tail; + pid_t dst_api; +}; + +struct shm_rdrbuff { + uint8_t * shm_base; /* start of blocks */ + size_t * ptr_head; /* start of ringbuffer head */ + size_t * ptr_tail; /* start of ringbuffer tail */ + pthread_mutex_t * lock; /* lock all free space in shm */ + size_t * choked; /* stale sdu detection */ + pthread_cond_t * healthy; /* du map is healthy */ + pthread_cond_t * full; /* run sanitizer when buffer full */ + pid_t * api; /* api of the irmd owner */ + enum qos_cube qos; /* qos id which this buffer serves */ + int fd; +}; + +static void garbage_collect(struct shm_rdrbuff * rdrb) +{ +#ifdef SHM_RDRBUFF_MULTI_BLOCK + struct shm_du_buff * sdb; + while (!shm_rdrb_empty(rdrb) && + (sdb = get_tail_ptr(rdrb))->dst_api == -1) + *rdrb->ptr_tail = (*rdrb->ptr_tail + sdb->blocks) + & (SHM_BUFFER_SIZE - 1); +#else + while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->dst_api == -1) + *rdrb->ptr_tail = + (*rdrb->ptr_tail + 1) & (SHM_BUFFER_SIZE - 1); + +#endif +} + +static void clean_sdus(struct shm_rdrbuff * rdrb, pid_t api) +{ + size_t idx = *rdrb->ptr_tail; + struct shm_du_buff * buf; + + while (idx != *rdrb->ptr_head) { + buf = idx_to_du_buff_ptr(rdrb, idx); + if (buf->dst_api == api) + buf->dst_api = -1; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + idx = (idx + buf->blocks) & (SHM_BUFFER_SIZE - 1); +#else + idx = (idx + 1) & (SHM_BUFFER_SIZE - 1); +#endif + } + + garbage_collect(rdrb); + + *rdrb->choked = 0; +} + +static char * rdrb_filename(enum qos_cube qos) +{ + int chars = 0; + char * str; + int qm = QOS_MAX; + + do { + qm /= 10; + ++chars; + } while (qm > 0); + + str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 2); + if (str == NULL) { + LOG_ERR("Failed to create shm_rdrbuff: Out of Memory."); + return NULL; + } + + sprintf(str, "%s.%d", SHM_RDRB_PREFIX, (int) qos); + + return str; +} + +/* FIXME: create a ringbuffer for each qos cube in the system */ +struct shm_rdrbuff * shm_rdrbuff_create() +{ + struct shm_rdrbuff * rdrb; + int shm_fd; + uint8_t * shm_base; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + enum qos_cube qos = QOS_CUBE_BE; + char * shm_rdrb_fn = rdrb_filename(qos); + if (shm_rdrb_fn == NULL) { + LOG_ERR("Could not create rdrbuff. Out of Memory"); + return NULL; + } + + rdrb = malloc(sizeof *rdrb); + if (rdrb == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(shm_rdrb_fn, O_CREAT | O_EXCL | O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBGF("Failed creating shared memory map."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + if (fchmod(shm_fd, 0666)) { + LOG_DBGF("Failed to chmod shared memory map."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) { + LOG_DBGF("Failed to extend shared memory map."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + if (write(shm_fd, "", 1) != 1) { + LOG_DBGF("Failed to finalise extension of shared memory map."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + shm_base = mmap(NULL, + SHM_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + if (shm_base == MAP_FAILED) { + LOG_DBGF("Failed to map shared memory."); + if (shm_unlink(shm_rdrb_fn) == -1) + LOG_DBGF("Failed to remove invalid shm."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + rdrb->shm_base = shm_base; + rdrb->ptr_head = (size_t *) + ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); + rdrb->ptr_tail = rdrb->ptr_head + 1; + rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1); + rdrb->choked = (size_t *) (rdrb->lock + 1); + rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1); + rdrb->full = rdrb->healthy + 1; + rdrb->api = (pid_t *) (rdrb->full + 1); + + pthread_mutexattr_init(&mattr); + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); + pthread_mutex_init(rdrb->lock, &mattr); + + pthread_condattr_init(&cattr); + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); + pthread_cond_init(rdrb->full, &cattr); + pthread_cond_init(rdrb->healthy, &cattr); + + *rdrb->ptr_head = 0; + *rdrb->ptr_tail = 0; + + *rdrb->choked = 0; + + *rdrb->api = getpid(); + + rdrb->qos = qos; + rdrb->fd = shm_fd; + + free(shm_rdrb_fn); + + return rdrb; +} + +/* FIXME: open a ringbuffer for each qos cube in the system */ +struct shm_rdrbuff * shm_rdrbuff_open() +{ + struct shm_rdrbuff * rdrb; + int shm_fd; + uint8_t * shm_base; + + enum qos_cube qos = QOS_CUBE_BE; + char * shm_rdrb_fn = rdrb_filename(qos); + if (shm_rdrb_fn == NULL) { + LOG_ERR("Could not create rdrbuff. Out of Memory"); + return NULL; + } + + rdrb = malloc(sizeof *rdrb); + if (rdrb == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(shm_rdrb_fn, O_RDWR, 0666); + if (shm_fd < 0) { + LOG_DBGF("Failed opening shared memory."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + shm_base = mmap(NULL, + SHM_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + if (shm_base == MAP_FAILED) { + LOG_DBGF("Failed to map shared memory."); + if (close(shm_fd) == -1) + LOG_DBG("Failed to close invalid shm."); + if (shm_unlink(shm_rdrb_fn) == -1) + LOG_DBG("Failed to unlink invalid shm."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + rdrb->shm_base = shm_base; + rdrb->ptr_head = (size_t *) + ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); + rdrb->ptr_tail = rdrb->ptr_head + 1; + rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1); + rdrb->choked = (size_t *) (rdrb->lock + 1); + rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1); + rdrb->full = rdrb->healthy + 1; + rdrb->api = (pid_t *) (rdrb->full + 1); + + rdrb->qos = qos; + rdrb->fd = shm_fd; + + free(shm_rdrb_fn); + + return rdrb; +} + +void * shm_rdrbuff_sanitize(void * o) +{ + struct shm_rdrbuff * rdrb = (struct shm_rdrbuff *) o; + struct timespec intv + = {SHM_DU_TIMEOUT_MICROS / MILLION, + (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000}; + + pid_t api; + + if (rdrb == NULL) + return (void *) -1; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, + (void *) rdrb->lock); + + while (true) { + int ret = 0; + struct timespec now; + struct timespec dl; + + if (pthread_cond_wait(rdrb->full, rdrb->lock) == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + *rdrb->choked = 1; + + garbage_collect(rdrb); + + if (shm_rdrb_empty(rdrb)) + continue; + + api = get_tail_ptr(rdrb)->dst_api; + + if (kill(api, 0)) { + LOG_DBGF("Dead process %d left stale sdu.", api); + clean_sdus(rdrb, api); + continue; + } + + clock_gettime(CLOCK_REALTIME, &now); + ts_add(&now, &intv, &dl); + while (*rdrb->choked) { + ret = pthread_cond_timedwait(rdrb->healthy, + rdrb->lock, + &dl); + if (!ret) + continue; + + if (ret == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + if (ret == ETIMEDOUT) { + LOG_DBGF("SDU timed out (dst: %d).", api); + clean_sdus(rdrb, api); + } + } + } + + pthread_cleanup_pop(true); + + return (void *) 0; +} + +void shm_rdrbuff_close(struct shm_rdrbuff * rdrb) +{ + if (rdrb == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + if (close(rdrb->fd) < 0) + LOG_DBGF("Couldn't close shared memory."); + + if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + + free(rdrb); +} + +void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb) +{ + char * shm_rdrb_fn; + + if (rdrb == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + if (getpid() != *rdrb->api && kill(*rdrb->api, 0) == 0) { + LOG_DBG("Process %d tried to destroy active rdrb.", getpid()); + return; + } + + if (close(rdrb->fd) < 0) + LOG_DBG("Couldn't close shared memory."); + + if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + shm_rdrb_fn = rdrb_filename(rdrb->qos); + if (shm_rdrb_fn == NULL) { + LOG_ERR("Could not create rdrbuff. Out of Memory"); + return; + } + + if (shm_unlink(shm_rdrb_fn) == -1) + LOG_DBG("Failed to unlink shm."); + + free(rdrb); + free(shm_rdrb_fn); +} + +ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, + pid_t dst_api, + size_t headspace, + size_t tailspace, + uint8_t * data, + size_t len) +{ + struct shm_du_buff * sdb; + size_t size = headspace + len + tailspace; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + long blocks = 0; + long padblocks = 0; +#endif + int sz = size + sizeof *sdb; + uint8_t * write_pos; + ssize_t idx = -1; + + if (rdrb == NULL || data == NULL) { + LOG_DBGF("Bogus input, bugging out."); + return -1; + } + +#ifndef SHM_RDRBUFF_MULTI_BLOCK + if (sz > SHM_RDRB_BLOCK_SIZE) { + LOG_DBGF("Multi-block SDU's disabled. Dropping."); + return -1; + } +#endif + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } +#ifdef SHM_RDRBUFF_MULTI_BLOCK + while (sz > 0) { + sz -= SHM_RDRB_BLOCK_SIZE; + ++blocks; + } + + if (blocks + *rdrb->ptr_head > SHM_BUFFER_SIZE) + padblocks = SHM_BUFFER_SIZE - *rdrb->ptr_head; + + if (!shm_rdrb_free(rdrb, (blocks + padblocks))) { +#else + if (!shm_rdrb_free(rdrb, 1)) { +#endif + pthread_cond_signal(rdrb->full); + pthread_mutex_unlock(rdrb->lock); + return -1; + } + +#ifdef SHM_RDRBUFF_MULTI_BLOCK + if (padblocks) { + sdb = get_head_ptr(rdrb); + sdb->size = 0; + sdb->blocks = padblocks; + sdb->dst_api = -1; + sdb->du_head = 0; + sdb->du_tail = 0; + + *rdrb->ptr_head = 0; + } +#endif + sdb = get_head_ptr(rdrb); + sdb->size = size; + sdb->dst_api = dst_api; + sdb->du_head = headspace; + sdb->du_tail = sdb->du_head + len; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + sdb->blocks = blocks; +#endif + write_pos = ((uint8_t *) (sdb + 1)) + headspace; + + memcpy(write_pos, data, len); + + idx = *rdrb->ptr_head; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); +#else + *rdrb->ptr_head = (*rdrb->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); +#endif + pthread_mutex_unlock(rdrb->lock); + + return idx; +} + +ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, + pid_t dst_api, + size_t headspace, + size_t tailspace, + uint8_t * data, + size_t len) +{ + struct shm_du_buff * sdb; + size_t size = headspace + len + tailspace; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + long blocks = 0; + long padblocks = 0; +#endif + int sz = size + sizeof *sdb; + uint8_t * write_pos; + ssize_t idx = -1; + + if (rdrb == NULL || data == NULL) { + LOG_DBGF("Bogus input, bugging out."); + return -1; + } + +#ifndef SHM_RDRBUFF_MULTI_BLOCK + if (sz > SHM_RDRB_BLOCK_SIZE) { + LOG_DBGF("Multi-block SDU's disabled. Dropping."); + return -1; + } +#endif + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rdrb->lock); + +#ifdef SHM_RDRBUFF_MULTI_BLOCK + while (sz > 0) { + sz -= SHM_RDRB_BLOCK_SIZE; + ++blocks; + } + + if (blocks + *rdrb->ptr_head > SHM_BUFFER_SIZE) + padblocks = SHM_BUFFER_SIZE - *rdrb->ptr_head; + + while (!shm_rdrb_free(rdrb, (blocks + padblocks))) { +#else + while (!shm_rdrb_free(rdrb, 1)) { +#endif + pthread_cond_signal(rdrb->full); + pthread_cond_wait(rdrb->healthy, rdrb->lock); + } + +#ifdef SHM_RDRBUFF_MULTI_BLOCK + if (padblocks) { + sdb = get_head_ptr(rdrb); + sdb->size = 0; + sdb->blocks = padblocks; + sdb->dst_api = -1; + sdb->du_head = 0; + sdb->du_tail = 0; + + *rdrb->ptr_head = 0; + } +#endif + sdb = get_head_ptr(rdrb); + sdb->size = size; + sdb->dst_api = dst_api; + sdb->du_head = headspace; + sdb->du_tail = sdb->du_head + len; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + sdb->blocks = blocks; +#endif + write_pos = ((uint8_t *) (sdb + 1)) + headspace; + + memcpy(write_pos, data, len); + + idx = *rdrb->ptr_head; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); +#else + *rdrb->ptr_head = (*rdrb->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); +#endif + pthread_cleanup_pop(true); + + return idx; +} + +int shm_rdrbuff_read(uint8_t ** dst, + struct shm_rdrbuff * rdrb, + ssize_t idx) +{ + size_t len = 0; + struct shm_du_buff * sdb; + + if (idx > SHM_BUFFER_SIZE) + return -1; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + if (shm_rdrb_empty(rdrb)) { + pthread_mutex_unlock(rdrb->lock); + return -1; + } + + sdb = idx_to_du_buff_ptr(rdrb, idx); + len = sdb->du_tail - sdb->du_head; + *dst = ((uint8_t *) (sdb + 1)) + sdb->du_head; + + pthread_mutex_unlock(rdrb->lock); + + return len; +} + +int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx) +{ + if (idx > SHM_BUFFER_SIZE) + return -1; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + if (shm_rdrb_empty(rdrb)) { + pthread_mutex_unlock(rdrb->lock); + return -1; + } + + idx_to_du_buff_ptr(rdrb, idx)->dst_api = -1; + + if (idx != *rdrb->ptr_tail) { + pthread_mutex_unlock(rdrb->lock); + return 0; + } + + garbage_collect(rdrb); + + *rdrb->choked = 0; + + pthread_cond_broadcast(rdrb->healthy); + + pthread_mutex_unlock(rdrb->lock); + + return 0; +} + +uint8_t * shm_du_buff_head_alloc(struct shm_rdrbuff * rdrb, + int idx, + ssize_t size) +{ + struct shm_du_buff * sdb; + uint8_t * buf; + + if (rdrb == NULL) + return NULL; + + if (idx < 0 || idx > SHM_BUFFER_SIZE) + return NULL; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + sdb = idx_to_du_buff_ptr(rdrb, idx); + + if ((long) (sdb->du_head - size) < 0) { + pthread_mutex_unlock(rdrb->lock); + LOG_DBGF("Failed to allocate PCI headspace."); + return NULL; + } + + sdb->du_head -= size; + + buf = (uint8_t *) (sdb + 1) + sdb->du_head; + + pthread_mutex_unlock(rdrb->lock); + + return buf; +} + +uint8_t * shm_du_buff_tail_alloc(struct shm_rdrbuff * rdrb, + int idx, + ssize_t size) +{ + struct shm_du_buff * sdb; + uint8_t * buf; + + if (rdrb == NULL) + return NULL; + + if (idx < 0 || idx > SHM_BUFFER_SIZE) + return NULL; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + sdb = idx_to_du_buff_ptr(rdrb, idx); + + if (sdb->du_tail + size >= sdb->size) { + pthread_mutex_unlock(rdrb->lock); + LOG_DBGF("Failed to allocate PCI tailspace."); + return NULL; + } + + buf = (uint8_t *) (sdb + 1) + sdb->du_tail; + + sdb->du_tail += size; + + pthread_mutex_unlock(rdrb->lock); + + return buf; +} + +int shm_du_buff_head_release(struct shm_rdrbuff * rdrb, + int idx, + ssize_t size) +{ + struct shm_du_buff * sdb; + + if (rdrb == NULL) + return -1; + + if (idx < 0 || idx > SHM_BUFFER_SIZE) + return -1; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + sdb = idx_to_du_buff_ptr(rdrb, idx); + + if (size > sdb->du_tail - sdb->du_head) { + pthread_mutex_unlock(rdrb->lock); + LOG_DBGF("Tried to release beyond sdu boundary."); + return -EOVERFLOW; + } + + sdb->du_head += size; + + pthread_mutex_unlock(rdrb->lock); + + return 0; +} + +int shm_du_buff_tail_release(struct shm_rdrbuff * rdrb, + int idx, + ssize_t size) +{ + struct shm_du_buff * sdb; + + if (rdrb == NULL) + return -1; + + if (idx < 0 || idx > SHM_BUFFER_SIZE) + return -1; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + sdb = idx_to_du_buff_ptr(rdrb, idx); + + if (size > sdb->du_tail - sdb->du_head) { + pthread_mutex_unlock(rdrb->lock); + LOG_DBGF("Tried to release beyond sdu boundary."); + return -EOVERFLOW; + } + + sdb->du_tail -= size; + + pthread_mutex_unlock(rdrb->lock); + + return 0; +} |