diff options
| -rw-r--r-- | include/ouroboros/config.h.in | 6 | ||||
| -rw-r--r-- | include/ouroboros/shared.h | 3 | ||||
| -rw-r--r-- | include/ouroboros/shm_ap_rbuff.h | 5 | ||||
| -rw-r--r-- | include/ouroboros/shm_du_map.h | 72 | ||||
| -rw-r--r-- | include/ouroboros/shm_rdrbuff.h | 84 | ||||
| -rw-r--r-- | src/ipcpd/ipcp-data.c | 1 | ||||
| -rw-r--r-- | src/ipcpd/local/main.c | 19 | ||||
| -rw-r--r-- | src/ipcpd/normal/main.c | 20 | ||||
| -rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 63 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/main.c | 29 | ||||
| -rw-r--r-- | src/irmd/main.c | 48 | ||||
| -rw-r--r-- | src/lib/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/lib/dev.c | 34 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 28 | ||||
| -rw-r--r-- | src/lib/shm_du_map.c | 767 | ||||
| -rw-r--r-- | src/lib/shm_rdrbuff.c | 804 | 
16 files changed, 1042 insertions, 943 deletions
| diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index 3a246235..143ae7c8 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -36,9 +36,9 @@  #define IPCP_NORMAL_EXEC       "@IPCP_NORMAL_TARGET@"  #define IPCP_LOCAL_EXEC        "@IPCP_LOCAL_TARGET@"  #define AP_MAX_FLOWS           256 -#define SHM_DU_BUFF_BLOCK_SIZE sysconf(_SC_PAGESIZE) -#define SHM_DU_MAP_MULTI_BLOCK -#define SHM_DU_MAP_FILENAME    "/ouroboros.shm" +#define SHM_RDRB_BLOCK_SIZE    sysconf(_SC_PAGESIZE) +#define SHM_RDRB_MULTI_BLOCK +#define SHM_RDRB_PREFIX        "/ouroboros.rdrb."  #define LOCKFILE_NAME          "/ouroboros.lockfile"  #define SHM_BUFFER_SIZE        (1 << 14)  #define DU_BUFF_HEADSPACE      128 diff --git a/include/ouroboros/shared.h b/include/ouroboros/shared.h index 9ee9df21..bfd99eb0 100644 --- a/include/ouroboros/shared.h +++ b/include/ouroboros/shared.h @@ -26,7 +26,8 @@  /* FIXME: To be decided which QoS cubes we support */  enum qos_cube {          QOS_CUBE_BE = 0, -        QOS_CUBE_VIDEO +        QOS_CUBE_VIDEO, +        QOS_MAX  };  enum flow_state { diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h index a1949122..9dad0863 100644 --- a/include/ouroboros/shm_ap_rbuff.h +++ b/include/ouroboros/shm_ap_rbuff.h @@ -42,8 +42,9 @@ void                  shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb);  int                   shm_ap_rbuff_write(struct shm_ap_rbuff * rb,                                           struct rb_entry * e);  struct rb_entry *     shm_ap_rbuff_read(struct shm_ap_rbuff * rb); -int                   shm_ap_rbuff_peek(struct shm_ap_rbuff * rb, -                                        const struct timespec * timeout); +int                   shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb); +int                   shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, +                                          const struct timespec * timeout);  ssize_t               shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb,                                               int port_id);  ssize_t               shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, diff --git a/include/ouroboros/shm_du_map.h b/include/ouroboros/shm_du_map.h deleted file mode 100644 index 98013fc9..00000000 --- a/include/ouroboros/shm_du_map.h +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Shared memory map for data units - * - *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> - *    Sander Vrijders   <sander.vrijders@intec.ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_SHM_DU_MAP_H -#define OUROBOROS_SHM_DU_MAP_H - -#include <stdint.h> -#include <pthread.h> -#include <sys/types.h> - -struct shm_du_buff; -struct shm_du_map; - -struct shm_du_map * shm_du_map_create(); -struct shm_du_map * shm_du_map_open(); -void                shm_du_map_close(struct shm_du_map * dum); -void                shm_du_map_close_on_exit(struct shm_du_map * dum); -void                shm_du_map_destroy(struct shm_du_map * dum); -void *              shm_du_map_sanitize(void * o); - -/* returns the index of the buffer in the DU map */ -ssize_t   shm_du_map_write(struct shm_du_map * dum, -                           pid_t               dst_api, -                           size_t              headspace, -                           size_t              tailspace, -                           uint8_t *           data, -                           size_t              data_len); -ssize_t   shm_du_map_write_b(struct shm_du_map * dum, -                             pid_t               dst_api, -                             size_t              headspace, -                             size_t              tailspace, -                             uint8_t *           data, -                             size_t              data_len); -int       shm_du_map_read(uint8_t **          dst, -                          struct shm_du_map * dum, -                          ssize_t             idx); -int       shm_du_map_remove(struct shm_du_map  * dum, -                            ssize_t              idx); - -uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum, -                                 int                 idx, -                                 ssize_t             size); -uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum, -                                 int                 idx, -                                 ssize_t             size); -int       shm_du_buff_head_release(struct shm_du_map * dum, -                                   int                 idx, -                                   ssize_t             size); -int       shm_du_buff_tail_release(struct shm_du_map * dum, -                                   int                 idx, -                                   ssize_t             size); -#endif /* OUROBOROS_SHM_DU_MAP_H */ diff --git a/include/ouroboros/shm_rdrbuff.h b/include/ouroboros/shm_rdrbuff.h new file mode 100644 index 00000000..09256c56 --- /dev/null +++ b/include/ouroboros/shm_rdrbuff.h @@ -0,0 +1,84 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Random Deletion Ring Buffer for Data Units + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + *    Sander Vrijders   <sander.vrijders@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_SHM_RDRBUFF_H +#define OUROBOROS_SHM_RDRBUFF_H + +#include <ouroboros/shared.h> + +#include <stdint.h> +#include <pthread.h> +#include <sys/types.h> + +struct shm_du_buff; +struct shm_rdrbuff; + +struct shm_rdrbuff * shm_rdrbuff_create(); + +struct shm_rdrbuff * shm_rdrbuff_open(); + +void                 shm_rdrbuff_close(struct shm_rdrbuff * rdrb); + +void                 shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb); + +void *               shm_rdrbuff_sanitize(void * o); + +/* returns the index of the buffer in the DU map */ +ssize_t   shm_rdrbuff_write(struct shm_rdrbuff * rdrb, +                            pid_t                dst_api, +                            size_t               headspace, +                            size_t               tailspace, +                            uint8_t *            data, +                            size_t               data_len); + +ssize_t   shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, +                              pid_t                dst_api, +                              size_t               headspace, +                              size_t               tailspace, +                              uint8_t *            data, +                              size_t               data_len); + +int       shm_rdrbuff_read(uint8_t **           dst, +                           struct shm_rdrbuff * rdrb, +                           ssize_t              idx); + +int       shm_rdrbuff_remove(struct shm_rdrbuff  * rdrb, +                             ssize_t               idx); + + +uint8_t * shm_du_buff_head_alloc(struct shm_rdrbuff * rdrb, +                                 int                  idx, +                                 ssize_t              size); + +uint8_t * shm_du_buff_tail_alloc(struct shm_rdrbuff * rdrb, +                                 int                  idx, +                                 ssize_t              size); + +int       shm_du_buff_head_release(struct shm_rdrbuff * rdrb, +                                   int                  idx, +                                   ssize_t              size); + +int       shm_du_buff_tail_release(struct shm_rdrbuff * rdrb, +                                   int                  idx, +                                   ssize_t              size); +#endif /* OUROBOROS_SHM_RDRBUFF_H */ diff --git a/src/ipcpd/ipcp-data.c b/src/ipcpd/ipcp-data.c index 593baeba..c4838d3a 100644 --- a/src/ipcpd/ipcp-data.c +++ b/src/ipcpd/ipcp-data.c @@ -22,7 +22,6 @@   */  #include <ouroboros/config.h> -#include <ouroboros/shm_du_map.h>  #include <ouroboros/list.h>  #define OUROBOROS_PREFIX "ipcp-utils" diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 547e7e28..4fa7e33f 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -24,7 +24,7 @@  #include "ipcp.h"  #include "flow.h"  #include <ouroboros/errno.h> -#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_rdrbuff.h>  #include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/list.h>  #include <ouroboros/utils.h> @@ -67,7 +67,7 @@ struct ipcp * _ipcp;  /* the shim needs access to these internals */  struct shim_ap_data {          pid_t                 api; -        struct shm_du_map *   dum; +        struct shm_rdrbuff *  rdrb;          struct bmp *          fds;          struct shm_ap_rbuff * rb; @@ -98,8 +98,8 @@ static int shim_ap_init()                  return -1;          } -        _ap_instance->dum = shm_du_map_open(); -        if (_ap_instance->dum == NULL) { +        _ap_instance->rdrb = shm_rdrbuff_open(); +        if (_ap_instance->rdrb == NULL) {                  bmp_destroy(_ap_instance->fds);                  free(_ap_instance);                  return -1; @@ -107,7 +107,7 @@ static int shim_ap_init()          _ap_instance->rb = shm_ap_rbuff_create();          if (_ap_instance->rb == NULL) { -                shm_du_map_close(_ap_instance->dum); +                shm_rdrbuff_close(_ap_instance->rdrb);                  bmp_destroy(_ap_instance->fds);                  free(_ap_instance);                  return -1; @@ -139,8 +139,13 @@ void shim_ap_fini()          if (_ap_instance->fds != NULL)                  bmp_destroy(_ap_instance->fds); -        if (_ap_instance->dum != NULL) -                shm_du_map_close_on_exit(_ap_instance->dum); + +        /* remove all remaining sdus */ +        while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) +                shm_rdrbuff_remove(_ap_instance->rdrb, i); + +        if (_ap_instance->rdrb != NULL) +                shm_rdrbuff_close(_ap_instance->rdrb);          if (_ap_instance->rb != NULL)                  shm_ap_rbuff_destroy(_ap_instance->rb); diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 335330ae..cf4ae3f1 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -24,7 +24,7 @@  #include <ouroboros/config.h>  #include <ouroboros/logs.h> -#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_rdrbuff.h>  #include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/dev.h>  #include <ouroboros/ipcp.h> @@ -55,7 +55,7 @@ struct normal_ipcp_data {          /* Keep ipcp_data first for polymorphism. */          struct ipcp_data      ipcp_data; -        struct shm_du_map *   dum; +        struct shm_rdrbuff *  rdrb;          struct shm_ap_rbuff * rb;          pthread_t             mainloop; @@ -206,15 +206,15 @@ struct normal_ipcp_data * normal_ipcp_data_create()                  return NULL;          } -        normal_data->dum = shm_du_map_open(); -        if (normal_data->dum == NULL) { +        normal_data->rdrb = shm_rdrbuff_open(); +        if (normal_data->rdrb == NULL) {                  free(normal_data);                  return NULL;          }          normal_data->rb = shm_ap_rbuff_open(getpid());          if (normal_data->rb == NULL) { -                shm_du_map_close(normal_data->dum); +                shm_rdrbuff_close(normal_data->rdrb);                  free(normal_data);                  return NULL;          } @@ -225,6 +225,8 @@ struct normal_ipcp_data * normal_ipcp_data_create()  void normal_ipcp_data_destroy()  { +        int idx = 0; +          if (_ipcp == NULL)                  return; @@ -233,8 +235,12 @@ void normal_ipcp_data_destroy()          if (ipcp_get_state(_ipcp) != IPCP_SHUTDOWN)                  LOG_WARN("Cleaning up while not in shutdown."); -        if (normal_data(_ipcp)->dum != NULL) -                shm_du_map_close_on_exit(normal_data(_ipcp)->dum); +        /* remove all remaining sdus */ +        while ((idx = shm_ap_rbuff_peek_idx(normal_data(_ipcp)->rb)) >= 0) +                shm_rdrbuff_remove(normal_data(_ipcp)->rdrb, idx); + +        if (normal_data(_ipcp)->rdrb != NULL) +                shm_rdrbuff_close(normal_data(_ipcp)->rdrb);          if (normal_data(_ipcp)->rb != NULL)                  shm_ap_rbuff_close(normal_data(_ipcp)->rb); diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 608b0029..d1100001 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -27,7 +27,7 @@  #include "ipcp.h"  #include "flow.h"  #include <ouroboros/errno.h> -#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_rdrbuff.h>  #include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/list.h>  #include <ouroboros/utils.h> @@ -122,7 +122,7 @@ struct eth_llc_ipcp_data {          struct bmp *          indices;          struct bmp *          saps; -        struct shm_du_map *   dum; +        struct shm_rdrbuff *  rdrb;          struct shm_ap_rbuff * rb;          uint8_t *             rx_ring; @@ -155,15 +155,15 @@ struct eth_llc_ipcp_data * eth_llc_ipcp_data_create()                  return NULL;          } -        eth_llc_data->dum = shm_du_map_open(); -        if (eth_llc_data->dum == NULL) { +        eth_llc_data->rdrb = shm_rdrbuff_open(); +        if (eth_llc_data->rdrb == NULL) {                  free(eth_llc_data);                  return NULL;          }          eth_llc_data->rb = shm_ap_rbuff_create();          if (eth_llc_data->rb == NULL) { -                shm_du_map_close(eth_llc_data->dum); +                shm_rdrbuff_close(eth_llc_data->rdrb);                  free(eth_llc_data);                  return NULL;          } @@ -171,7 +171,7 @@ struct eth_llc_ipcp_data * eth_llc_ipcp_data_create()          eth_llc_data->indices = bmp_create(AP_MAX_FLOWS, 0);          if (eth_llc_data->indices == NULL) {                  shm_ap_rbuff_destroy(eth_llc_data->rb); -                shm_du_map_close(eth_llc_data->dum); +                shm_rdrbuff_close(eth_llc_data->rdrb);                  free(eth_llc_data);                  return NULL;          } @@ -180,7 +180,7 @@ struct eth_llc_ipcp_data * eth_llc_ipcp_data_create()          if (eth_llc_data->indices == NULL) {                  bmp_destroy(eth_llc_data->indices);                  shm_ap_rbuff_destroy(eth_llc_data->rb); -                shm_du_map_close(eth_llc_data->dum); +                shm_rdrbuff_close(eth_llc_data->rdrb);                  free(eth_llc_data);                  return NULL;          } @@ -202,8 +202,12 @@ void eth_llc_ipcp_data_destroy()          if (ipcp_get_state(_ipcp) != IPCP_SHUTDOWN)                  LOG_WARN("Cleaning up while not in shutdown."); -        if (shim_data(_ipcp)->dum != NULL) -                shm_du_map_close_on_exit(shim_data(_ipcp)->dum); +        /* remove all remaining sdus */ +        while ((i = shm_ap_rbuff_peek_idx(shim_data(_ipcp)->rb)) >= 0) +                shm_rdrbuff_remove(shim_data(_ipcp)->rdrb, i); + +        if (shim_data(_ipcp)->rdrb != NULL) +                shm_rdrbuff_close(shim_data(_ipcp)->rdrb);          if (shim_data(_ipcp)->rb != NULL)                  shm_ap_rbuff_destroy(shim_data(_ipcp)->rb);          if (shim_data(_ipcp)->indices != NULL) @@ -332,7 +336,7 @@ static int eth_llc_ipcp_send_frame(uint8_t   dst_addr[MAC_SIZE],  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)          header = (void *) shim_data(_ipcp)->tx_ring + -                (shim_data(_ipcp)->tx_offset * SHM_DU_BUFF_BLOCK_SIZE); +                (shim_data(_ipcp)->tx_offset * SHM_RDRB_BLOCK_SIZE);          while (header->tp_status != TP_STATUS_AVAILABLE) {                  pfd.fd = fd; @@ -345,7 +349,7 @@ static int eth_llc_ipcp_send_frame(uint8_t   dst_addr[MAC_SIZE],                  }                  header = (void *) shim_data(_ipcp)->tx_ring + -                        (shim_data(_ipcp)->tx_offset * SHM_DU_BUFF_BLOCK_SIZE); +                        (shim_data(_ipcp)->tx_offset * SHM_RDRB_BLOCK_SIZE);          }          frame = (void *) header + TPACKET_HDRLEN - sizeof(struct sockaddr_ll); @@ -671,7 +675,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)          while (true) {  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)                  header = (void *) shim_data(_ipcp)->rx_ring + -                        (offset * SHM_DU_BUFF_BLOCK_SIZE); +                        (offset * SHM_RDRB_BLOCK_SIZE);                  while (!(header->tp_status & TP_STATUS_USER)) {                          pfd.fd = shim_data(_ipcp)->s_fd;                          pfd.revents = 0; @@ -683,7 +687,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)                          }                          header = (void *) shim_data(_ipcp)->rx_ring + -                                (offset * SHM_DU_BUFF_BLOCK_SIZE); +                                (offset * SHM_RDRB_BLOCK_SIZE);                  }                  buf = (void * ) header + header->tp_mac; @@ -740,7 +744,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)                          }                          while ((index = -                                shm_du_map_write(shim_data(_ipcp)->dum, +                                shm_rdrbuff_write(shim_data(_ipcp)->rdrb,                                                   ipcp_flow(i)->api,                                                   0,                                                   0, @@ -782,8 +786,8 @@ static void * eth_llc_ipcp_sdu_writer(void * o)                  pthread_rwlock_rdlock(&_ipcp->state_lock); -                len = shm_du_map_read((uint8_t **) &buf, -                                      shim_data(_ipcp)->dum, +                len = shm_rdrbuff_read((uint8_t **) &buf, +                                      shim_data(_ipcp)->rdrb,                                        e->index);                  if (len <= 0) {                          free(e); @@ -808,8 +812,8 @@ static void * eth_llc_ipcp_sdu_writer(void * o)                  pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); -                if (shim_data(_ipcp)->dum != NULL) -                        shm_du_map_remove(shim_data(_ipcp)->dum, e->index); +                if (shim_data(_ipcp)->rdrb != NULL) +                        shm_rdrbuff_remove(shim_data(_ipcp)->rdrb, e->index);                  pthread_rwlock_unlock(&_ipcp->state_lock); @@ -849,7 +853,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)  {          int fd = -1;          struct ifreq ifr; -        int index; +        int idx;  #ifdef __FreeBSD__          struct ifaddrs * ifaddr;          struct ifaddrs * ifa; @@ -892,7 +896,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)                  return -1;          } -        for (ifa = ifaddr, index = 0; ifa != NULL; ifa = ifa->ifa_next, ++index) { +        for (ifa = ifaddr, idx = 0; ifa != NULL; ifa = ifa->ifa_next, ++idx) {                  if (strcmp(ifa->ifa_name, conf->if_name))                          continue;                  LOG_DBGF("Interface %s found.", conf->if_name); @@ -916,8 +920,8 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)                  return -1;          } -        index = if_nametoindex(conf->if_name); -        if (index == 0) { +        idx = if_nametoindex(conf->if_name); +        if (idx == 0) {                  LOG_ERR("Failed to retrieve interface index.");                  return -1;          } @@ -927,7 +931,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)          memset(&(device), 0, sizeof(device));  #ifdef __FreeBSD__ -        device.sdl_index = index; +        device.sdl_index = idx;          device.sdl_family = AF_LINK;          memcpy(LLADDR(&device),                 ifr.ifr_addr.sa_data, @@ -937,7 +941,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)          LOG_MISSING;          fd = socket(AF_LINK, SOCK_RAW, 0);  #else -        device.sll_ifindex = index; +        device.sll_ifindex = idx;          device.sll_family = AF_PACKET;          memcpy(device.sll_addr,                 ifr.ifr_hwaddr.sa_data, @@ -953,14 +957,14 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)          }  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) -        if (SHIM_ETH_LLC_MAX_SDU_SIZE > SHM_DU_BUFF_BLOCK_SIZE) { +        if (SHIM_ETH_LLC_MAX_SDU_SIZE > SHM_RDRB_BLOCK_SIZE) {                  LOG_ERR("Max SDU size is bigger than DU map block size.");                  close(fd);                  return -1;          } -        req.tp_block_size = SHM_DU_BUFF_BLOCK_SIZE; -        req.tp_frame_size = SHM_DU_BUFF_BLOCK_SIZE; +        req.tp_block_size = SHM_RDRB_BLOCK_SIZE; +        req.tp_frame_size = SHM_RDRB_BLOCK_SIZE;          req.tp_block_nr = SHM_BUFFER_SIZE;          req.tp_frame_nr = SHM_BUFFER_SIZE; @@ -987,7 +991,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)          shim_data(_ipcp)->rx_ring = mmap(NULL, -                                         2 * SHM_DU_BUFF_BLOCK_SIZE +                                         2 * SHM_RDRB_BLOCK_SIZE                                           * SHM_BUFFER_SIZE,                                           PROT_READ | PROT_WRITE, MAP_SHARED,                                           fd, 0); @@ -997,10 +1001,9 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)                  return -1;          }          shim_data(_ipcp)->tx_ring = shim_data(_ipcp)->rx_ring -                + (SHM_DU_BUFF_BLOCK_SIZE * SHM_BUFFER_SIZE); +                + (SHM_RDRB_BLOCK_SIZE * SHM_BUFFER_SIZE);  #endif -          pthread_rwlock_wrlock(&_ipcp->state_lock);          if (ipcp_get_state(_ipcp) != IPCP_INIT) { diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 3f9b20f1..451a2a4c 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -24,7 +24,7 @@  #include "ipcp.h"  #include "flow.h"  #include "shim_udp_config.h" -#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_rdrbuff.h>  #include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/list.h>  #include <ouroboros/utils.h> @@ -87,7 +87,7 @@ struct ipcp * _ipcp;  /* the shim needs access to these internals */  struct shim_ap_data {          pid_t                 api; -        struct shm_du_map *   dum; +        struct shm_rdrbuff *  rdrb;          struct bmp *          fds;          struct shm_ap_rbuff * rb; @@ -121,8 +121,8 @@ static int shim_ap_init()                  return -1;          } -        _ap_instance->dum = shm_du_map_open(); -        if (_ap_instance->dum == NULL) { +        _ap_instance->rdrb = shm_rdrbuff_open(); +        if (_ap_instance->rdrb == NULL) {                  bmp_destroy(_ap_instance->fds);                  free(_ap_instance);                  return -1; @@ -130,7 +130,7 @@ static int shim_ap_init()          _ap_instance->rb = shm_ap_rbuff_create();          if (_ap_instance->rb == NULL) { -                shm_du_map_close(_ap_instance->dum); +                shm_rdrbuff_close(_ap_instance->rdrb);                  bmp_destroy(_ap_instance->fds);                  free(_ap_instance);                  return -1; @@ -163,8 +163,13 @@ void shim_ap_fini()          if (_ap_instance->fds != NULL)                  bmp_destroy(_ap_instance->fds); -        if (_ap_instance->dum != NULL) -                shm_du_map_close_on_exit(_ap_instance->dum); + +        /* remove all remaining sdus */ +        while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) +                shm_rdrbuff_remove(_ap_instance->rdrb, i); + +        if (_ap_instance->rdrb != NULL) +                shm_rdrbuff_close(_ap_instance->rdrb);          if (_ap_instance->rb != NULL)                  shm_ap_rbuff_destroy(_ap_instance->rb); @@ -202,7 +207,7 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)          pthread_rwlock_rdlock(&_ipcp->state_lock);          pthread_rwlock_rdlock(&_ap_instance->flows_lock); -        index = shm_du_map_write_b(_ap_instance->dum, +        index = shm_rdrbuff_write_b(_ap_instance->rdrb,                                     _ap_instance->flows[fd].api,                                     0,                                     0, @@ -745,8 +750,8 @@ static void * ipcp_udp_sdu_loop(void * o)                  pthread_rwlock_rdlock(&_ipcp->state_lock); -                len = shm_du_map_read((uint8_t **) &buf, -                                      _ap_instance->dum, +                len = shm_rdrbuff_read((uint8_t **) &buf, +                                      _ap_instance->rdrb,                                        e->index);                  if (len <= 0) {                          pthread_rwlock_unlock(&_ipcp->state_lock); @@ -771,8 +776,8 @@ static void * ipcp_udp_sdu_loop(void * o)                  pthread_rwlock_rdlock(&_ipcp->state_lock); -                if (_ap_instance->dum != NULL) -                        shm_du_map_remove(_ap_instance->dum, e->index); +                if (_ap_instance->rdrb != NULL) +                        shm_rdrbuff_remove(_ap_instance->rdrb, e->index);                  pthread_rwlock_unlock(&_ipcp->state_lock); diff --git a/src/irmd/main.c b/src/irmd/main.c index cd939360..29f6d9d0 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -34,7 +34,7 @@  #include <ouroboros/irm_config.h>  #include <ouroboros/lockfile.h>  #include <ouroboros/shm_ap_rbuff.h> -#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_rdrbuff.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/flow.h>  #include <ouroboros/qos.h> @@ -75,31 +75,31 @@ enum irm_state {  };  struct irm { -        struct list_head    registry; +        struct list_head     registry; -        struct list_head    ipcps; +        struct list_head     ipcps; -        struct list_head    api_table; -        struct list_head    apn_table; -        struct list_head    spawned_apis; -        pthread_rwlock_t    reg_lock; +        struct list_head     api_table; +        struct list_head     apn_table; +        struct list_head     spawned_apis; +        pthread_rwlock_t     reg_lock;          /* keep track of all flows in this processing system */ -        struct bmp *        port_ids; +        struct bmp *         port_ids;          /* maps port_ids to api pair */ -        struct list_head    irm_flows; -        pthread_rwlock_t    flows_lock; +        struct list_head     irm_flows; +        pthread_rwlock_t     flows_lock; -        struct lockfile *   lf; -        struct shm_du_map * dum; -        pthread_t *         threadpool; -        int                 sockfd; +        struct lockfile *    lf; +        struct shm_rdrbuff * rdrb; +        pthread_t *          threadpool; +        int                  sockfd; -        enum irm_state      state; -        pthread_rwlock_t    state_lock; +        enum irm_state       state; +        pthread_rwlock_t     state_lock; -        pthread_t           irm_sanitize; -        pthread_t           shm_sanitize; +        pthread_t            irm_sanitize; +        pthread_t            shm_sanitize;  } * irmd = NULL;  static struct irm_flow * get_irm_flow(int port_id) @@ -1604,8 +1604,8 @@ static void irm_destroy()          pthread_rwlock_unlock(&irmd->flows_lock); -        if (irmd->dum != NULL) -                shm_du_map_destroy(irmd->dum); +        if (irmd->rdrb != NULL) +                shm_rdrbuff_destroy(irmd->rdrb);          if (irmd->lf != NULL)                  lockfile_destroy(irmd->lf); @@ -2072,7 +2072,8 @@ static int irm_create()                  if (kill(lockfile_owner(irmd->lf), 0) < 0) {                          LOG_INFO("IRMd didn't properly shut down last time."); -                        shm_du_map_destroy(shm_du_map_open()); +                        /* FIXME: do this for each QOS_CUBE in the system */ +                        shm_rdrbuff_destroy(shm_rdrbuff_open(QOS_CUBE_BE));                          LOG_INFO("Stale resources cleaned");                          lockfile_destroy(irmd->lf);                          irmd->lf = lockfile_create(); @@ -2090,7 +2091,8 @@ static int irm_create()                  return -1;          } -        if ((irmd->dum = shm_du_map_create()) == NULL) { +        /* FIXME: create an rdrb for each QOS_CUBE in the system */ +        if ((irmd->rdrb = shm_rdrbuff_create(QOS_CUBE_BE)) == NULL) {                  irm_destroy();                  return -1;          } @@ -2201,7 +2203,7 @@ int main(int argc, char ** argv)          pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL);          pthread_create(&irmd->shm_sanitize, NULL, -                       shm_du_map_sanitize, irmd->dum); +                       shm_rdrbuff_sanitize, irmd->rdrb);          /* wait for (all of them) to return */          for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 5e16c7e2..8c058dd8 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -33,7 +33,7 @@ set(SOURCE_FILES    logs.c    nsm.c    shm_ap_rbuff.c -  shm_du_map.c +  shm_rdrbuff.c    sockets.c    time_utils.c    utils.c diff --git a/src/lib/dev.c b/src/lib/dev.c index 3a5fc8e0..17c473ed 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -25,7 +25,7 @@  #include <ouroboros/dev.h>  #include <ouroboros/sockets.h>  #include <ouroboros/bitmap.h> -#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_rdrbuff.h>  #include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/utils.h> @@ -45,7 +45,7 @@ struct flow {  struct ap_data {          char *                ap_name;          pid_t                 api; -        struct shm_du_map *   dum; +        struct shm_rdrbuff *  rdrb;          struct bmp *          fds;          struct shm_ap_rbuff * rb;          pthread_rwlock_t      data_lock; @@ -105,8 +105,8 @@ int ap_init(char * ap_name)                  return -ENOMEM;          } -        _ap_instance->dum = shm_du_map_open(); -        if (_ap_instance->dum == NULL) { +        _ap_instance->rdrb = shm_rdrbuff_open(); +        if (_ap_instance->rdrb == NULL) {                  bmp_destroy(_ap_instance->fds);                  free(_ap_instance);                  return -1; @@ -114,7 +114,7 @@ int ap_init(char * ap_name)          _ap_instance->rb = shm_ap_rbuff_create();          if (_ap_instance->rb == NULL) { -                shm_du_map_close(_ap_instance->dum); +                shm_rdrbuff_close(_ap_instance->rdrb);                  bmp_destroy(_ap_instance->fds);                  free(_ap_instance);                  return -1; @@ -146,12 +146,16 @@ void ap_fini(void)          pthread_rwlock_wrlock(&_ap_instance->data_lock); +        /* remove all remaining sdus */ +        while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) +                shm_rdrbuff_remove(_ap_instance->rdrb, i); +          if (_ap_instance->fds != NULL)                  bmp_destroy(_ap_instance->fds);          if (_ap_instance->rb != NULL)                  shm_ap_rbuff_destroy(_ap_instance->rb); -        if (_ap_instance->dum != NULL) -                shm_du_map_close_on_exit(_ap_instance->dum); +        if (_ap_instance->rdrb != NULL) +                shm_rdrbuff_close(_ap_instance->rdrb);          pthread_rwlock_rdlock(&_ap_instance->flows_lock); @@ -515,7 +519,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)          }          if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { -                idx = shm_du_map_write(_ap_instance->dum, +                idx = shm_rdrbuff_write(_ap_instance->rdrb,                                         _ap_instance->flows[fd].api,                                         DU_BUFF_HEADSPACE,                                         DU_BUFF_TAILSPACE, @@ -531,18 +535,18 @@ ssize_t flow_write(int fd, void * buf, size_t count)                  e.port_id = _ap_instance->flows[fd].port_id;                  if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { -                        shm_du_map_remove(_ap_instance->dum, idx); +                        shm_rdrbuff_remove(_ap_instance->rdrb, idx);                          pthread_rwlock_unlock(&_ap_instance->flows_lock);                          pthread_rwlock_unlock(&_ap_instance->data_lock);                          return -1;                  }          } else { /* blocking */ -                struct shm_du_map * dum = _ap_instance->dum; -                pid_t               api = _ap_instance->flows[fd].api; +                struct shm_rdrbuff * rdrb = _ap_instance->rdrb; +                pid_t                api = _ap_instance->flows[fd].api;                  pthread_rwlock_unlock(&_ap_instance->flows_lock);                  pthread_rwlock_unlock(&_ap_instance->data_lock); -                idx = shm_du_map_write_b(dum, +                idx = shm_rdrbuff_write_b(rdrb,                                           api,                                           DU_BUFF_HEADSPACE,                                           DU_BUFF_TAILSPACE, @@ -567,7 +571,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)  int flow_select(const struct timespec * timeout)  { -        int port_id = shm_ap_rbuff_peek(_ap_instance->rb, timeout); +        int port_id = shm_ap_rbuff_peek_b(_ap_instance->rb, timeout);          if (port_id < 0)                  return port_id;          return port_id_to_fd(port_id); @@ -612,7 +616,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)                  return -EAGAIN;          } -        n = shm_du_map_read(&sdu, _ap_instance->dum, idx); +        n = shm_rdrbuff_read(&sdu, _ap_instance->rdrb, idx);          if (n < 0) {                  pthread_rwlock_unlock(&_ap_instance->data_lock);                  return -1; @@ -620,7 +624,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)          memcpy(buf, sdu, MIN(n, count)); -        shm_du_map_remove(_ap_instance->dum, idx); +        shm_rdrbuff_remove(_ap_instance->rdrb, idx);          pthread_rwlock_unlock(&_ap_instance->data_lock); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 4ca29636..f21b1e86 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -285,8 +285,32 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)          return 0;  } -int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb, -                      const struct timespec * timeout) +int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb) +{ +        int ret = 0; + +        if (rb == NULL) +                return -EINVAL; + +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } + +        if (shm_rbuff_empty(rb)) { +                pthread_mutex_unlock(rb->lock); +                return -1; +        } + +        ret = (rb->shm_base + *rb->ptr_tail)->index; + +        pthread_mutex_unlock(rb->lock); + +        return ret; +} + +int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, +                        const struct timespec * timeout)  {          struct timespec abstime;          int ret = 0; diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c deleted file mode 100644 index 9ca282b9..00000000 --- a/src/lib/shm_du_map.c +++ /dev/null @@ -1,767 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Shared memory map for data units - * - *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> - *    Sander Vrijders   <sander.vrijders@intec.ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include <ouroboros/config.h> -#include <ouroboros/errno.h> -#include <ouroboros/shm_du_map.h> -#include <ouroboros/shm_ap_rbuff.h> -#include <ouroboros/time_utils.h> - -#include <pthread.h> -#include <sys/mman.h> -#include <fcntl.h> -#include <unistd.h> -#include <stdlib.h> -#include <string.h> -#include <signal.h> -#include <sys/stat.h> - -#define OUROBOROS_PREFIX "shm_du_map" - -#include <ouroboros/logs.h> - -#define SHM_BLOCKS_SIZE (SHM_BUFFER_SIZE * SHM_DU_BUFF_BLOCK_SIZE) -#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t)                   \ -                       + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t)  \ -                       + sizeof(pid_t)) - -#define get_head_ptr(dum)                                                      \ -((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_head *                      \ -                                         SHM_DU_BUFF_BLOCK_SIZE))) - -#define get_tail_ptr(dum)                                                      \ -((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_tail *                      \ -                                         SHM_DU_BUFF_BLOCK_SIZE))) - -#define idx_to_du_buff_ptr(dum, idx)                                           \ -        ((struct shm_du_buff *)(dum->shm_base + (idx * SHM_DU_BUFF_BLOCK_SIZE))) - -#define block_ptr_to_idx(dum, sdb)                                             \ -        (((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE) - -#define shm_map_used(dum)((*dum->ptr_head + SHM_BUFFER_SIZE - *dum->ptr_tail)\ -                          & (SHM_BUFFER_SIZE - 1)) -#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BUFFER_SIZE) - -#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head) - -struct shm_du_buff { -        size_t size; -#ifdef SHM_DU_MAP_MULTI_BLOCK -        size_t blocks; -#endif -        size_t du_head; -        size_t du_tail; -        pid_t  dst_api; -}; - -struct shm_du_map { -        uint8_t *         shm_base;    /* start of blocks */ -        size_t *          ptr_head;    /* start of ringbuffer head */ -        size_t *          ptr_tail;    /* start of ringbuffer tail */ -        pthread_mutex_t * lock;   /* lock all free space in shm */ -        size_t *          choked;      /* stale sdu detection */ -        pthread_cond_t *  healthy;     /* du map is healthy */ -        pthread_cond_t *  full;        /* run sanitizer when buffer full */ -        pid_t *           api;         /* api of the irmd owner */ -        int               fd; -}; - -static void garbage_collect(struct shm_du_map * dum) -{ -#ifdef SHM_DU_MAP_MULTI_BLOCK -        struct shm_du_buff * sdb; -        while (!shm_map_empty(dum) && (sdb = get_tail_ptr(dum))->dst_api == -1) -                *dum->ptr_tail = (*dum->ptr_tail + sdb->blocks) -                        & (SHM_BUFFER_SIZE - 1); -#else -        while (!shm_map_empty(dum) && get_tail_ptr(dum)->dst_api == -1) -                *dum->ptr_tail = -                        (*dum->ptr_tail + 1) & (SHM_BUFFER_SIZE - 1); - -#endif -} - -static void clean_sdus(struct shm_du_map * dum, pid_t api, bool exit) -{ -        size_t idx = *dum->ptr_tail; -        struct shm_du_buff * buf; - -        while (idx != *dum->ptr_head) { -                buf = idx_to_du_buff_ptr(dum, idx); -                if (buf->dst_api == api) -                        buf->dst_api = -1; -#ifdef SHM_DU_MAP_MULTI_BLOCK -                idx = (idx + buf->blocks) & (SHM_BUFFER_SIZE - 1); -#else -                idx = (idx + 1) & (SHM_BUFFER_SIZE - 1); -#endif -        } - -        garbage_collect(dum); - -        if (!exit && kill(api, 0) == 0) { -                struct shm_ap_rbuff * rb; -                rb = shm_ap_rbuff_open(api); -                if (rb != NULL) { -                        shm_ap_rbuff_reset(rb); -                        shm_ap_rbuff_close(rb); -                } -        } - -        *dum->choked = 0; -} - -struct shm_du_map * shm_du_map_create() -{ -        struct shm_du_map * dum; -        int                 shm_fd; -        uint8_t *           shm_base; -        pthread_mutexattr_t mattr; -        pthread_condattr_t  cattr; - -        dum = malloc(sizeof *dum); -        if (dum == NULL) { -                LOG_DBGF("Could not allocate struct."); -                return NULL; -        } - -        shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_CREAT | O_EXCL | O_RDWR, 0666); -        if (shm_fd == -1) { -                LOG_DBGF("Failed creating shared memory map."); -                free(dum); -                return NULL; -        } - -        if (fchmod(shm_fd, 0666)) { -                LOG_DBGF("Failed to chmod shared memory map."); -                free(dum); -                return NULL; -        } - -        if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) { -                LOG_DBGF("Failed to extend shared memory map."); -                free(dum); -                return NULL; -        } - -        if (write(shm_fd, "", 1) != 1) { -                LOG_DBGF("Failed to finalise extension of shared memory map."); -                free(dum); -                return NULL; -        } - -        shm_base = mmap(NULL, -                        SHM_FILE_SIZE, -                        PROT_READ | PROT_WRITE, -                        MAP_SHARED, -                        shm_fd, -                        0); - -        if (shm_base == MAP_FAILED) { -                LOG_DBGF("Failed to map shared memory."); -                if (shm_unlink(SHM_DU_MAP_FILENAME) == -1) -                        LOG_DBGF("Failed to remove invalid shm."); - -                free(dum); -                return NULL; -        } - -        dum->shm_base = shm_base; -        dum->ptr_head = (size_t *) -                ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); -        dum->ptr_tail = dum->ptr_head + 1; -        dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1); -        dum->choked = (size_t *) (dum->lock + 1); -        dum->healthy = (pthread_cond_t *) (dum->choked + 1); -        dum->full = dum->healthy + 1; -        dum->api = (pid_t *) (dum->full + 1); - -        pthread_mutexattr_init(&mattr); -        pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); -        pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); -        pthread_mutex_init(dum->lock, &mattr); - -        pthread_condattr_init(&cattr); -        pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); -        pthread_cond_init(dum->full, &cattr); -        pthread_cond_init(dum->healthy, &cattr); - -        *dum->ptr_head = 0; -        *dum->ptr_tail = 0; - -        *dum->choked = 0; - -        *dum->api = getpid(); - -        dum->fd = shm_fd; - -        return dum; -} - -struct shm_du_map * shm_du_map_open() -{ -        struct shm_du_map * dum; -        int                 shm_fd; -        uint8_t *           shm_base; - -        dum = malloc(sizeof *dum); -        if (dum == NULL) { -                LOG_DBGF("Could not allocate struct."); -                return NULL; -        } - -        shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_RDWR, 0666); -        if (shm_fd < 0) { -                LOG_DBGF("Failed opening shared memory."); -                free(dum); -                return NULL; -        } - -        shm_base = mmap(NULL, -                        SHM_FILE_SIZE, -                        PROT_READ | PROT_WRITE, -                        MAP_SHARED, -                        shm_fd, -                        0); -        if (shm_base == MAP_FAILED) { -                LOG_DBGF("Failed to map shared memory."); -                if (close(shm_fd) == -1) -                        LOG_DBGF("Failed to close invalid shm."); -                if (shm_unlink(SHM_DU_MAP_FILENAME) == -1) -                        LOG_DBGF("Failed to unlink invalid shm."); -                free(dum); -                return NULL; -        } - -        dum->shm_base = shm_base; -        dum->ptr_head = (size_t *) -                ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); -        dum->ptr_tail = dum->ptr_head + 1; -        dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1); -        dum->choked = (size_t *) (dum->lock + 1); -        dum->healthy = (pthread_cond_t *) (dum->choked + 1); -        dum->full = dum->healthy + 1; -        dum->api = (pid_t *) (dum->full + 1); - -        dum->fd = shm_fd; - -        return dum; -} - -void * shm_du_map_sanitize(void * o) -{ -        struct shm_du_map * dum = (struct shm_du_map *) o; -        struct timespec intv -                = {SHM_DU_TIMEOUT_MICROS / MILLION, -                   (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000}; - -        pid_t   api; - -        if (dum == NULL) -                return (void *) -1; -        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { -                LOG_WARN("Recovering dead mutex."); -                pthread_mutex_consistent(dum->lock); -        } - -        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, -                             (void *) dum->lock); - -        while (true) { -                int ret = 0; -                struct timespec now; -                struct timespec dl; - -                if (pthread_cond_wait(dum->full, dum->lock) == EOWNERDEAD) { -                        LOG_WARN("Recovering dead mutex."); -                        pthread_mutex_consistent(dum->lock); -                } - -                *dum->choked = 1; - -                garbage_collect(dum); - -                if (shm_map_empty(dum)) -                        continue; - -                api = get_tail_ptr(dum)->dst_api; - -                if (kill(api, 0)) { -                        LOG_DBGF("Dead process %d left stale sdu.", api); -                        clean_sdus(dum, api, false); -                        continue; -                } - -                clock_gettime(CLOCK_REALTIME, &now); -                ts_add(&now, &intv, &dl); -                while (*dum->choked) { -                        ret = pthread_cond_timedwait(dum->healthy, -                                                     dum->lock, -                                                     &dl); -                        if (!ret) -                                continue; - -                        if (ret == EOWNERDEAD) { -                                LOG_WARN("Recovering dead mutex."); -                                pthread_mutex_consistent(dum->lock); -                        } - -                        if (ret == ETIMEDOUT) { -                                LOG_DBGF("SDU timed out (dst: %d).", api); -                                clean_sdus(dum, api, false); -                        } -                } -        } - -        pthread_cleanup_pop(true); - -        return (void *) 0; -} - -void shm_du_map_close_on_exit(struct shm_du_map * dum) -{ -        if (dum == NULL) { -                LOG_DBGF("Bogus input. Bugging out."); -                return; -        } - -        clean_sdus(dum, getpid(), true); - -        if (close(dum->fd) < 0) -                LOG_DBGF("Couldn't close shared memory."); - -        if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) -                LOG_DBGF("Couldn't unmap shared memory."); - -        free(dum); -} - -void shm_du_map_close(struct shm_du_map * dum) -{ -        if (dum == NULL) { -                LOG_DBGF("Bogus input. Bugging out."); -                return; -        } - -        if (close(dum->fd) < 0) -                LOG_DBGF("Couldn't close shared memory."); - -        if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) -                LOG_DBGF("Couldn't unmap shared memory."); - -        free(dum); -} - -void shm_du_map_destroy(struct shm_du_map * dum) -{ -        if (dum == NULL) { -                LOG_DBGF("Bogus input. Bugging out."); -                return; -        } - -        if (getpid() != *dum->api && kill(*dum->api, 0) == 0) { -                LOG_DBGF("Only IRMd can destroy %s.", SHM_DU_MAP_FILENAME); -                return; -        } - -        if (close(dum->fd) < 0) -                LOG_DBGF("Couldn't close shared memory."); - -        if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) -                LOG_DBGF("Couldn't unmap shared memory."); - -        if (shm_unlink(SHM_DU_MAP_FILENAME) == -1) -                LOG_DBGF("Failed to unlink shm."); - -        free(dum); -} - -ssize_t shm_du_map_write(struct shm_du_map * dum, -                         pid_t               dst_api, -                         size_t              headspace, -                         size_t              tailspace, -                         uint8_t *           data, -                         size_t              len) -{ -        struct shm_du_buff * sdb; -        size_t               size = headspace + len + tailspace; -#ifdef SHM_DU_MAP_MULTI_BLOCK -        long                 blocks = 0; -        long                 padblocks = 0; -#endif -        int                  sz = size + sizeof *sdb; -        uint8_t *            write_pos; -        ssize_t              idx = -1; - -        if (dum == NULL || data == NULL) { -                LOG_DBGF("Bogus input, bugging out."); -                return -1; -        } - -#ifndef SHM_DU_MAP_MULTI_BLOCK -        if (sz > SHM_DU_BUFF_BLOCK_SIZE) { -                LOG_DBGF("Multi-block SDU's disabled. Dropping."); -                return -1; -        } -#endif -        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { -                LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->lock); -        } -#ifdef SHM_DU_MAP_MULTI_BLOCK -        while (sz > 0) { -                sz -= SHM_DU_BUFF_BLOCK_SIZE; -                ++blocks; -        } - -        if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE) -                padblocks = SHM_BUFFER_SIZE - *dum->ptr_head; - -        if (!shm_map_free(dum, (blocks + padblocks))) { -#else -        if (!shm_map_free(dum, 1)) { -#endif -                pthread_cond_signal(dum->full); -                pthread_mutex_unlock(dum->lock); -                return -1; -        } - -#ifdef SHM_DU_MAP_MULTI_BLOCK -        if (padblocks) { -                sdb = get_head_ptr(dum); -                sdb->size    = 0; -                sdb->blocks  = padblocks; -                sdb->dst_api = -1; -                sdb->du_head = 0; -                sdb->du_tail = 0; - -                *dum->ptr_head = 0; -        } -#endif -        sdb          = get_head_ptr(dum); -        sdb->size    = size; -        sdb->dst_api = dst_api; -        sdb->du_head = headspace; -        sdb->du_tail = sdb->du_head + len; -#ifdef  SHM_DU_MAP_MULTI_BLOCK -        sdb->blocks  = blocks; -#endif -        write_pos = ((uint8_t *) (sdb + 1)) + headspace; - -        memcpy(write_pos, data, len); - -        idx = *dum->ptr_head; -#ifdef SHM_DU_MAP_MULTI_BLOCK -        *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); -#else -        *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); -#endif -        pthread_mutex_unlock(dum->lock); - -        return idx; -} - -ssize_t shm_du_map_write_b(struct shm_du_map * dum, -                           pid_t               dst_api, -                           size_t              headspace, -                           size_t              tailspace, -                           uint8_t *           data, -                           size_t              len) -{ -        struct shm_du_buff * sdb; -        size_t               size = headspace + len + tailspace; -#ifdef SHM_DU_MAP_MULTI_BLOCK -        long                 blocks = 0; -        long                 padblocks = 0; -#endif -        int                  sz = size + sizeof *sdb; -        uint8_t *            write_pos; -        ssize_t              idx = -1; - -        if (dum == NULL || data == NULL) { -                LOG_DBGF("Bogus input, bugging out."); -                return -1; -        } - -#ifndef SHM_DU_MAP_MULTI_BLOCK -        if (sz > SHM_DU_BUFF_BLOCK_SIZE) { -                LOG_DBGF("Multi-block SDU's disabled. Dropping."); -                return -1; -        } -#endif -        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { -                LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->lock); -        } - -        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, -                             (void *) dum->lock); - -#ifdef SHM_DU_MAP_MULTI_BLOCK -        while (sz > 0) { -                sz -= SHM_DU_BUFF_BLOCK_SIZE; -                ++blocks; -        } - -        if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE) -                padblocks = SHM_BUFFER_SIZE - *dum->ptr_head; - -        while (!shm_map_free(dum, (blocks + padblocks))) { -#else -        while (!shm_map_free(dum, 1)) { -#endif -                pthread_cond_signal(dum->full); -                pthread_cond_wait(dum->healthy, dum->lock); -        } - -#ifdef SHM_DU_MAP_MULTI_BLOCK -        if (padblocks) { -                sdb = get_head_ptr(dum); -                sdb->size    = 0; -                sdb->blocks  = padblocks; -                sdb->dst_api = -1; -                sdb->du_head = 0; -                sdb->du_tail = 0; - -                *dum->ptr_head = 0; -        } -#endif -        sdb          = get_head_ptr(dum); -        sdb->size    = size; -        sdb->dst_api = dst_api; -        sdb->du_head = headspace; -        sdb->du_tail = sdb->du_head + len; -#ifdef  SHM_DU_MAP_MULTI_BLOCK -        sdb->blocks  = blocks; -#endif -        write_pos = ((uint8_t *) (sdb + 1)) + headspace; - -        memcpy(write_pos, data, len); - -        idx = *dum->ptr_head; -#ifdef SHM_DU_MAP_MULTI_BLOCK -        *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); -#else -        *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); -#endif -        pthread_cleanup_pop(true); - -        return idx; -} - -int shm_du_map_read(uint8_t **          dst, -                    struct shm_du_map * dum, -                    ssize_t             idx) -{ -        size_t len = 0; -        struct shm_du_buff * sdb; - -        if (idx > SHM_BUFFER_SIZE) -                return -1; - -        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { -                LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->lock); -        } - -        if (shm_map_empty(dum)) { -                pthread_mutex_unlock(dum->lock); -                return -1; -        } - -        sdb = idx_to_du_buff_ptr(dum, idx); -        len = sdb->du_tail - sdb->du_head; -        *dst = ((uint8_t *) (sdb + 1)) + sdb->du_head; - -        pthread_mutex_unlock(dum->lock); - -        return len; -} - -int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx) -{ -        if (idx > SHM_BUFFER_SIZE) -                return -1; - -        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { -                LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->lock); -        } - -        if (shm_map_empty(dum)) { -                pthread_mutex_unlock(dum->lock); -                return -1; -        } - -        idx_to_du_buff_ptr(dum, idx)->dst_api = -1; - -        if (idx != *dum->ptr_tail) { -                pthread_mutex_unlock(dum->lock); -                return 0; -        } - -        garbage_collect(dum); - -        *dum->choked = 0; - -        pthread_cond_broadcast(dum->healthy); - -        pthread_mutex_unlock(dum->lock); - -        return 0; -} - -uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum, -                                 int                 idx, -                                 ssize_t             size) -{ -        struct shm_du_buff * sdb; -        uint8_t * buf; - -        if (dum  == NULL) -                return NULL; - -        if (idx < 0 || idx > SHM_BUFFER_SIZE) -                return NULL; - -        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { -                LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->lock); -        } - -        sdb = idx_to_du_buff_ptr(dum, idx); - -        if ((long) (sdb->du_head - size) < 0) { -                pthread_mutex_unlock(dum->lock); -                LOG_DBGF("Failed to allocate PCI headspace."); -                return NULL; -        } - -        sdb->du_head -= size; - -        buf = (uint8_t *) (sdb + 1) + sdb->du_head; - -        pthread_mutex_unlock(dum->lock); - -        return buf; -} - -uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum, -                                 int                 idx, -                                 ssize_t             size) -{ -        struct shm_du_buff * sdb; -        uint8_t * buf; - -        if (dum  == NULL) -                return NULL; - -        if (idx < 0 || idx > SHM_BUFFER_SIZE) -                return NULL; - -        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { -                LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->lock); -        } - -        sdb = idx_to_du_buff_ptr(dum, idx); - -        if (sdb->du_tail + size >= sdb->size) { -                pthread_mutex_unlock(dum->lock); -                LOG_DBGF("Failed to allocate PCI tailspace."); -                return NULL; -        } - -        buf = (uint8_t *) (sdb + 1) + sdb->du_tail; - -        sdb->du_tail += size; - -        pthread_mutex_unlock(dum->lock); - -        return buf; -} - -int shm_du_buff_head_release(struct shm_du_map * dum, -                             int                 idx, -                             ssize_t             size) -{ -        struct shm_du_buff * sdb; - -        if (dum  == NULL) -                return -1; - -        if (idx < 0 || idx > SHM_BUFFER_SIZE) -                return -1; - -        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { -                LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->lock); -        } - -        sdb = idx_to_du_buff_ptr(dum, idx); - -        if (size > sdb->du_tail - sdb->du_head) { -                pthread_mutex_unlock(dum->lock); -                LOG_DBGF("Tried to release beyond sdu boundary."); -                return -EOVERFLOW; -        } - -        sdb->du_head += size; - -        pthread_mutex_unlock(dum->lock); - -        return 0; -} - -int shm_du_buff_tail_release(struct shm_du_map * dum, -                             int                 idx, -                             ssize_t             size) -{ -        struct shm_du_buff * sdb; - -        if (dum  == NULL) -                return -1; - -        if (idx < 0 || idx > SHM_BUFFER_SIZE) -                return -1; - -        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { -                LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->lock); -        } - -        sdb = idx_to_du_buff_ptr(dum, idx); - -        if (size > sdb->du_tail - sdb->du_head) { -                pthread_mutex_unlock(dum->lock); -                LOG_DBGF("Tried to release beyond sdu boundary."); -                return -EOVERFLOW; -        } - -        sdb->du_tail -= size; - -        pthread_mutex_unlock(dum->lock); - -        return 0; -} diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c new file mode 100644 index 00000000..d42dbea7 --- /dev/null +++ b/src/lib/shm_rdrbuff.c @@ -0,0 +1,804 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Random Deletion Ring Buffer for Data Units + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + *    Sander Vrijders   <sander.vrijders@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/config.h> +#include <ouroboros/errno.h> +#include <ouroboros/shm_rdrbuff.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/time_utils.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> +#include <signal.h> +#include <sys/stat.h> + +#define OUROBOROS_PREFIX "shm_rdrbuff" + +#include <ouroboros/logs.h> + +#define SHM_BLOCKS_SIZE (SHM_BUFFER_SIZE * SHM_RDRB_BLOCK_SIZE) +#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t)                   \ +                       + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t)  \ +                       + sizeof(pid_t)) + +#define get_head_ptr(rdrb)                                                     \ +        ((struct shm_du_buff *)(rdrb->shm_base + (*rdrb->ptr_head *            \ +                                                  SHM_RDRB_BLOCK_SIZE))) + +#define get_tail_ptr(rdrb)                                                     \ +        ((struct shm_du_buff *)(rdrb->shm_base + (*rdrb->ptr_tail *            \ +                                                  SHM_RDRB_BLOCK_SIZE))) + +#define idx_to_du_buff_ptr(rdrb, idx)                                          \ +        ((struct shm_du_buff *)(rdrb->shm_base + (idx * SHM_RDRB_BLOCK_SIZE))) + +#define block_ptr_to_idx(rdrb, sdb)                                            \ +        (((uint8_t *)sdb - rdrb->shm_base) / SHM_RDRB_BLOCK_SIZE) + +#define shm_rdrb_used(rdrb)                                                    \ +        ((*rdrb->ptr_head + SHM_BUFFER_SIZE - *rdrb->ptr_tail)                 \ +         & (SHM_BUFFER_SIZE - 1)) +#define shm_rdrb_free(rdrb, i)                                                 \ +        (shm_rdrb_used(rdrb) + i < SHM_BUFFER_SIZE) + +#define shm_rdrb_empty(rdrb)                                                   \ +        (*rdrb->ptr_tail == *rdrb->ptr_head) + +struct shm_du_buff { +        size_t size; +#ifdef SHM_DU_MAP_MULTI_BLOCK +        size_t blocks; +#endif +        size_t du_head; +        size_t du_tail; +        pid_t  dst_api; +}; + +struct shm_rdrbuff { +        uint8_t *         shm_base;    /* start of blocks */ +        size_t *          ptr_head;    /* start of ringbuffer head */ +        size_t *          ptr_tail;    /* start of ringbuffer tail */ +        pthread_mutex_t * lock;        /* lock all free space in shm */ +        size_t *          choked;      /* stale sdu detection */ +        pthread_cond_t *  healthy;     /* du map is healthy */ +        pthread_cond_t *  full;        /* run sanitizer when buffer full */ +        pid_t *           api;         /* api of the irmd owner */ +        enum qos_cube     qos;         /* qos id which this buffer serves */ +        int               fd; +}; + +static void garbage_collect(struct shm_rdrbuff * rdrb) +{ +#ifdef SHM_RDRBUFF_MULTI_BLOCK +        struct shm_du_buff * sdb; +        while (!shm_rdrb_empty(rdrb) && +               (sdb = get_tail_ptr(rdrb))->dst_api == -1) +                *rdrb->ptr_tail = (*rdrb->ptr_tail + sdb->blocks) +                        & (SHM_BUFFER_SIZE - 1); +#else +        while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->dst_api == -1) +                *rdrb->ptr_tail = +                        (*rdrb->ptr_tail + 1) & (SHM_BUFFER_SIZE - 1); + +#endif +} + +static void clean_sdus(struct shm_rdrbuff * rdrb, pid_t api) +{ +        size_t idx = *rdrb->ptr_tail; +        struct shm_du_buff * buf; + +        while (idx != *rdrb->ptr_head) { +                buf = idx_to_du_buff_ptr(rdrb, idx); +                if (buf->dst_api == api) +                        buf->dst_api = -1; +#ifdef SHM_RDRBUFF_MULTI_BLOCK +                idx = (idx + buf->blocks) & (SHM_BUFFER_SIZE - 1); +#else +                idx = (idx + 1) & (SHM_BUFFER_SIZE - 1); +#endif +        } + +        garbage_collect(rdrb); + +        *rdrb->choked = 0; +} + +static char * rdrb_filename(enum qos_cube qos) +{ +        int chars = 0; +        char * str; +        int qm = QOS_MAX; + +        do { +                qm /= 10; +                ++chars; +        } while (qm > 0); + +        str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 2); +        if (str == NULL) { +                LOG_ERR("Failed to create shm_rdrbuff: Out of Memory."); +                return NULL; +        } + +        sprintf(str, "%s.%d", SHM_RDRB_PREFIX, (int) qos); + +        return str; +} + +/* FIXME: create a ringbuffer for each qos cube in the system */ +struct shm_rdrbuff * shm_rdrbuff_create() +{ +        struct shm_rdrbuff * rdrb; +        int                  shm_fd; +        uint8_t *            shm_base; +        pthread_mutexattr_t  mattr; +        pthread_condattr_t   cattr; +        enum qos_cube        qos = QOS_CUBE_BE; +        char *               shm_rdrb_fn = rdrb_filename(qos); +        if (shm_rdrb_fn == NULL) { +                LOG_ERR("Could not create rdrbuff. Out of Memory"); +                return NULL; +        } + +        rdrb = malloc(sizeof *rdrb); +        if (rdrb == NULL) { +                LOG_DBGF("Could not allocate struct."); +                return NULL; +        } + +        shm_fd = shm_open(shm_rdrb_fn, O_CREAT | O_EXCL | O_RDWR, 0666); +        if (shm_fd == -1) { +                LOG_DBGF("Failed creating shared memory map."); +                free(shm_rdrb_fn); +                free(rdrb); +                return NULL; +        } + +        if (fchmod(shm_fd, 0666)) { +                LOG_DBGF("Failed to chmod shared memory map."); +                free(shm_rdrb_fn); +                free(rdrb); +                return NULL; +        } + +        if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) { +                LOG_DBGF("Failed to extend shared memory map."); +                free(shm_rdrb_fn); +                free(rdrb); +                return NULL; +        } + +        if (write(shm_fd, "", 1) != 1) { +                LOG_DBGF("Failed to finalise extension of shared memory map."); +                free(shm_rdrb_fn); +                free(rdrb); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); + +        if (shm_base == MAP_FAILED) { +                LOG_DBGF("Failed to map shared memory."); +                if (shm_unlink(shm_rdrb_fn) == -1) +                        LOG_DBGF("Failed to remove invalid shm."); +                free(shm_rdrb_fn); +                free(rdrb); +                return NULL; +        } + +        rdrb->shm_base = shm_base; +        rdrb->ptr_head = (size_t *) +                ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); +        rdrb->ptr_tail = rdrb->ptr_head + 1; +        rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1); +        rdrb->choked = (size_t *) (rdrb->lock + 1); +        rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1); +        rdrb->full = rdrb->healthy + 1; +        rdrb->api = (pid_t *) (rdrb->full + 1); + +        pthread_mutexattr_init(&mattr); +        pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); +        pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +        pthread_mutex_init(rdrb->lock, &mattr); + +        pthread_condattr_init(&cattr); +        pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +        pthread_cond_init(rdrb->full, &cattr); +        pthread_cond_init(rdrb->healthy, &cattr); + +        *rdrb->ptr_head = 0; +        *rdrb->ptr_tail = 0; + +        *rdrb->choked = 0; + +        *rdrb->api = getpid(); + +        rdrb->qos = qos; +        rdrb->fd  = shm_fd; + +        free(shm_rdrb_fn); + +        return rdrb; +} + +/* FIXME: open a ringbuffer for each qos cube in the system */ +struct shm_rdrbuff * shm_rdrbuff_open() +{ +        struct shm_rdrbuff * rdrb; +        int                  shm_fd; +        uint8_t *            shm_base; + +        enum qos_cube        qos = QOS_CUBE_BE; +        char *               shm_rdrb_fn = rdrb_filename(qos); +        if (shm_rdrb_fn == NULL) { +                LOG_ERR("Could not create rdrbuff. Out of Memory"); +                return NULL; +        } + +        rdrb = malloc(sizeof *rdrb); +        if (rdrb == NULL) { +                LOG_DBGF("Could not allocate struct."); +                return NULL; +        } + +        shm_fd = shm_open(shm_rdrb_fn, O_RDWR, 0666); +        if (shm_fd < 0) { +                LOG_DBGF("Failed opening shared memory."); +                free(shm_rdrb_fn); +                free(rdrb); +                return NULL; +        } + +        shm_base = mmap(NULL, +                        SHM_FILE_SIZE, +                        PROT_READ | PROT_WRITE, +                        MAP_SHARED, +                        shm_fd, +                        0); +        if (shm_base == MAP_FAILED) { +                LOG_DBGF("Failed to map shared memory."); +                if (close(shm_fd) == -1) +                        LOG_DBG("Failed to close invalid shm."); +                if (shm_unlink(shm_rdrb_fn) == -1) +                        LOG_DBG("Failed to unlink invalid shm."); +                free(shm_rdrb_fn); +                free(rdrb); +                return NULL; +        } + +        rdrb->shm_base = shm_base; +        rdrb->ptr_head = (size_t *) +                ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); +        rdrb->ptr_tail = rdrb->ptr_head + 1; +        rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1); +        rdrb->choked = (size_t *) (rdrb->lock + 1); +        rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1); +        rdrb->full = rdrb->healthy + 1; +        rdrb->api = (pid_t *) (rdrb->full + 1); + +        rdrb->qos = qos; +        rdrb->fd = shm_fd; + +        free(shm_rdrb_fn); + +        return rdrb; +} + +void * shm_rdrbuff_sanitize(void * o) +{ +        struct shm_rdrbuff * rdrb = (struct shm_rdrbuff *) o; +        struct timespec intv +                = {SHM_DU_TIMEOUT_MICROS / MILLION, +                   (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000}; + +        pid_t   api; + +        if (rdrb == NULL) +                return (void *) -1; + +        if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { +                LOG_WARN("Recovering dead mutex."); +                pthread_mutex_consistent(rdrb->lock); +        } + +        pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, +                             (void *) rdrb->lock); + +        while (true) { +                int ret = 0; +                struct timespec now; +                struct timespec dl; + +                if (pthread_cond_wait(rdrb->full, rdrb->lock) == EOWNERDEAD) { +                        LOG_WARN("Recovering dead mutex."); +                        pthread_mutex_consistent(rdrb->lock); +                } + +                *rdrb->choked = 1; + +                garbage_collect(rdrb); + +                if (shm_rdrb_empty(rdrb)) +                        continue; + +                api = get_tail_ptr(rdrb)->dst_api; + +                if (kill(api, 0)) { +                        LOG_DBGF("Dead process %d left stale sdu.", api); +                        clean_sdus(rdrb, api); +                        continue; +                } + +                clock_gettime(CLOCK_REALTIME, &now); +                ts_add(&now, &intv, &dl); +                while (*rdrb->choked) { +                        ret = pthread_cond_timedwait(rdrb->healthy, +                                                     rdrb->lock, +                                                     &dl); +                        if (!ret) +                                continue; + +                        if (ret == EOWNERDEAD) { +                                LOG_WARN("Recovering dead mutex."); +                                pthread_mutex_consistent(rdrb->lock); +                        } + +                        if (ret == ETIMEDOUT) { +                                LOG_DBGF("SDU timed out (dst: %d).", api); +                                clean_sdus(rdrb, api); +                        } +                } +        } + +        pthread_cleanup_pop(true); + +        return (void *) 0; +} + +void shm_rdrbuff_close(struct shm_rdrbuff * rdrb) +{ +        if (rdrb == NULL) { +                LOG_DBGF("Bogus input. Bugging out."); +                return; +        } + +        if (close(rdrb->fd) < 0) +                LOG_DBGF("Couldn't close shared memory."); + +        if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) +                LOG_DBGF("Couldn't unmap shared memory."); + +        free(rdrb); +} + +void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb) +{ +        char * shm_rdrb_fn; + +        if (rdrb == NULL) { +                LOG_DBGF("Bogus input. Bugging out."); +                return; +        } + +        if (getpid() != *rdrb->api && kill(*rdrb->api, 0) == 0) { +                LOG_DBG("Process %d tried to destroy active rdrb.", getpid()); +                return; +        } + +        if (close(rdrb->fd) < 0) +                LOG_DBG("Couldn't close shared memory."); + +        if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) +                LOG_DBG("Couldn't unmap shared memory."); + +        shm_rdrb_fn = rdrb_filename(rdrb->qos); +        if (shm_rdrb_fn == NULL) { +                LOG_ERR("Could not create rdrbuff. Out of Memory"); +                return; +        } + +        if (shm_unlink(shm_rdrb_fn) == -1) +                LOG_DBG("Failed to unlink shm."); + +        free(rdrb); +        free(shm_rdrb_fn); +} + +ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, +                          pid_t                dst_api, +                          size_t               headspace, +                          size_t               tailspace, +                          uint8_t *            data, +                          size_t               len) +{ +        struct shm_du_buff * sdb; +        size_t               size = headspace + len + tailspace; +#ifdef SHM_RDRBUFF_MULTI_BLOCK +        long                 blocks = 0; +        long                 padblocks = 0; +#endif +        int                  sz = size + sizeof *sdb; +        uint8_t *            write_pos; +        ssize_t              idx = -1; + +        if (rdrb == NULL || data == NULL) { +                LOG_DBGF("Bogus input, bugging out."); +                return -1; +        } + +#ifndef SHM_RDRBUFF_MULTI_BLOCK +        if (sz > SHM_RDRB_BLOCK_SIZE) { +                LOG_DBGF("Multi-block SDU's disabled. Dropping."); +                return -1; +        } +#endif +        if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(rdrb->lock); +        } +#ifdef SHM_RDRBUFF_MULTI_BLOCK +        while (sz > 0) { +                sz -= SHM_RDRB_BLOCK_SIZE; +                ++blocks; +        } + +        if (blocks + *rdrb->ptr_head > SHM_BUFFER_SIZE) +                padblocks = SHM_BUFFER_SIZE - *rdrb->ptr_head; + +        if (!shm_rdrb_free(rdrb, (blocks + padblocks))) { +#else +        if (!shm_rdrb_free(rdrb, 1)) { +#endif +                pthread_cond_signal(rdrb->full); +                pthread_mutex_unlock(rdrb->lock); +                return -1; +        } + +#ifdef SHM_RDRBUFF_MULTI_BLOCK +        if (padblocks) { +                sdb = get_head_ptr(rdrb); +                sdb->size    = 0; +                sdb->blocks  = padblocks; +                sdb->dst_api = -1; +                sdb->du_head = 0; +                sdb->du_tail = 0; + +                *rdrb->ptr_head = 0; +        } +#endif +        sdb          = get_head_ptr(rdrb); +        sdb->size    = size; +        sdb->dst_api = dst_api; +        sdb->du_head = headspace; +        sdb->du_tail = sdb->du_head + len; +#ifdef  SHM_RDRBUFF_MULTI_BLOCK +        sdb->blocks  = blocks; +#endif +        write_pos = ((uint8_t *) (sdb + 1)) + headspace; + +        memcpy(write_pos, data, len); + +        idx = *rdrb->ptr_head; +#ifdef SHM_RDRBUFF_MULTI_BLOCK +        *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); +#else +        *rdrb->ptr_head = (*rdrb->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); +#endif +        pthread_mutex_unlock(rdrb->lock); + +        return idx; +} + +ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, +                           pid_t               dst_api, +                           size_t              headspace, +                           size_t              tailspace, +                           uint8_t *           data, +                           size_t              len) +{ +        struct shm_du_buff * sdb; +        size_t               size = headspace + len + tailspace; +#ifdef SHM_RDRBUFF_MULTI_BLOCK +        long                 blocks = 0; +        long                 padblocks = 0; +#endif +        int                  sz = size + sizeof *sdb; +        uint8_t *            write_pos; +        ssize_t              idx = -1; + +        if (rdrb == NULL || data == NULL) { +                LOG_DBGF("Bogus input, bugging out."); +                return -1; +        } + +#ifndef SHM_RDRBUFF_MULTI_BLOCK +        if (sz > SHM_RDRB_BLOCK_SIZE) { +                LOG_DBGF("Multi-block SDU's disabled. Dropping."); +                return -1; +        } +#endif +        if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(rdrb->lock); +        } + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) rdrb->lock); + +#ifdef SHM_RDRBUFF_MULTI_BLOCK +        while (sz > 0) { +                sz -= SHM_RDRB_BLOCK_SIZE; +                ++blocks; +        } + +        if (blocks + *rdrb->ptr_head > SHM_BUFFER_SIZE) +                padblocks = SHM_BUFFER_SIZE - *rdrb->ptr_head; + +        while (!shm_rdrb_free(rdrb, (blocks + padblocks))) { +#else +        while (!shm_rdrb_free(rdrb, 1)) { +#endif +                pthread_cond_signal(rdrb->full); +                pthread_cond_wait(rdrb->healthy, rdrb->lock); +        } + +#ifdef SHM_RDRBUFF_MULTI_BLOCK +        if (padblocks) { +                sdb = get_head_ptr(rdrb); +                sdb->size    = 0; +                sdb->blocks  = padblocks; +                sdb->dst_api = -1; +                sdb->du_head = 0; +                sdb->du_tail = 0; + +                *rdrb->ptr_head = 0; +        } +#endif +        sdb          = get_head_ptr(rdrb); +        sdb->size    = size; +        sdb->dst_api = dst_api; +        sdb->du_head = headspace; +        sdb->du_tail = sdb->du_head + len; +#ifdef  SHM_RDRBUFF_MULTI_BLOCK +        sdb->blocks  = blocks; +#endif +        write_pos = ((uint8_t *) (sdb + 1)) + headspace; + +        memcpy(write_pos, data, len); + +        idx = *rdrb->ptr_head; +#ifdef SHM_RDRBUFF_MULTI_BLOCK +        *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); +#else +        *rdrb->ptr_head = (*rdrb->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); +#endif +        pthread_cleanup_pop(true); + +        return idx; +} + +int shm_rdrbuff_read(uint8_t **           dst, +                     struct shm_rdrbuff * rdrb, +                     ssize_t              idx) +{ +        size_t len = 0; +        struct shm_du_buff * sdb; + +        if (idx > SHM_BUFFER_SIZE) +                return -1; + +        if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(rdrb->lock); +        } + +        if (shm_rdrb_empty(rdrb)) { +                pthread_mutex_unlock(rdrb->lock); +                return -1; +        } + +        sdb = idx_to_du_buff_ptr(rdrb, idx); +        len = sdb->du_tail - sdb->du_head; +        *dst = ((uint8_t *) (sdb + 1)) + sdb->du_head; + +        pthread_mutex_unlock(rdrb->lock); + +        return len; +} + +int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx) +{ +        if (idx > SHM_BUFFER_SIZE) +                return -1; + +        if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(rdrb->lock); +        } + +        if (shm_rdrb_empty(rdrb)) { +                pthread_mutex_unlock(rdrb->lock); +                return -1; +        } + +        idx_to_du_buff_ptr(rdrb, idx)->dst_api = -1; + +        if (idx != *rdrb->ptr_tail) { +                pthread_mutex_unlock(rdrb->lock); +                return 0; +        } + +        garbage_collect(rdrb); + +        *rdrb->choked = 0; + +        pthread_cond_broadcast(rdrb->healthy); + +        pthread_mutex_unlock(rdrb->lock); + +        return 0; +} + +uint8_t * shm_du_buff_head_alloc(struct shm_rdrbuff * rdrb, +                                 int                  idx, +                                 ssize_t              size) +{ +        struct shm_du_buff * sdb; +        uint8_t * buf; + +        if (rdrb  == NULL) +                return NULL; + +        if (idx < 0 || idx > SHM_BUFFER_SIZE) +                return NULL; + +        if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(rdrb->lock); +        } + +        sdb = idx_to_du_buff_ptr(rdrb, idx); + +        if ((long) (sdb->du_head - size) < 0) { +                pthread_mutex_unlock(rdrb->lock); +                LOG_DBGF("Failed to allocate PCI headspace."); +                return NULL; +        } + +        sdb->du_head -= size; + +        buf = (uint8_t *) (sdb + 1) + sdb->du_head; + +        pthread_mutex_unlock(rdrb->lock); + +        return buf; +} + +uint8_t * shm_du_buff_tail_alloc(struct shm_rdrbuff * rdrb, +                                 int                  idx, +                                 ssize_t              size) +{ +        struct shm_du_buff * sdb; +        uint8_t * buf; + +        if (rdrb  == NULL) +                return NULL; + +        if (idx < 0 || idx > SHM_BUFFER_SIZE) +                return NULL; + +        if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(rdrb->lock); +        } + +        sdb = idx_to_du_buff_ptr(rdrb, idx); + +        if (sdb->du_tail + size >= sdb->size) { +                pthread_mutex_unlock(rdrb->lock); +                LOG_DBGF("Failed to allocate PCI tailspace."); +                return NULL; +        } + +        buf = (uint8_t *) (sdb + 1) + sdb->du_tail; + +        sdb->du_tail += size; + +        pthread_mutex_unlock(rdrb->lock); + +        return buf; +} + +int shm_du_buff_head_release(struct shm_rdrbuff * rdrb, +                             int                  idx, +                             ssize_t              size) +{ +        struct shm_du_buff * sdb; + +        if (rdrb  == NULL) +                return -1; + +        if (idx < 0 || idx > SHM_BUFFER_SIZE) +                return -1; + +        if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(rdrb->lock); +        } + +        sdb = idx_to_du_buff_ptr(rdrb, idx); + +        if (size > sdb->du_tail - sdb->du_head) { +                pthread_mutex_unlock(rdrb->lock); +                LOG_DBGF("Tried to release beyond sdu boundary."); +                return -EOVERFLOW; +        } + +        sdb->du_head += size; + +        pthread_mutex_unlock(rdrb->lock); + +        return 0; +} + +int shm_du_buff_tail_release(struct shm_rdrbuff * rdrb, +                             int                  idx, +                             ssize_t              size) +{ +        struct shm_du_buff * sdb; + +        if (rdrb  == NULL) +                return -1; + +        if (idx < 0 || idx > SHM_BUFFER_SIZE) +                return -1; + +        if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(rdrb->lock); +        } + +        sdb = idx_to_du_buff_ptr(rdrb, idx); + +        if (size > sdb->du_tail - sdb->du_head) { +                pthread_mutex_unlock(rdrb->lock); +                LOG_DBGF("Tried to release beyond sdu boundary."); +                return -EOVERFLOW; +        } + +        sdb->du_tail -= size; + +        pthread_mutex_unlock(rdrb->lock); + +        return 0; +} | 
