| field | value | date |
|---|---|---|
| author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-08-04 17:07:45 +0200 |
| committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-08-04 17:07:45 +0200 |
| commit | 51bb7c6f315dba4044eb2ece5c1312362674d7fb (patch) | |
| tree | fff3eeadb6eb04edee21340ecdcdfc13da3115b4 /src | |
| parent | 44b55f0b03ffc6aff4f1c290b5687d5ac95ddbf9 (diff) | |
| parent | 4931526cf9b5e40294e043deab856f25bf56c7cf (diff) | |
| download | ouroboros-51bb7c6f315dba4044eb2ece5c1312362674d7fb.tar.gz, ouroboros-51bb7c6f315dba4044eb2ece5c1312362674d7fb.zip | |
Merged in dstaesse/ouroboros/be-blocking (pull request #185)
lib: Revise blocking I/O
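
The core of the revision is that the blocking paths of flow_write() and flow_read() stop busy-waiting (`while (... < 0) ;`) and instead call new primitives, shm_du_map_write_b() and shm_ap_rbuff_read_port_b(), that sleep on process-shared condition variables until space or data is available. The sketch below is illustrative only: `struct ring`, ring_put_b() and ring_get_b() are not Ouroboros API, and it uses a single process with plain globals rather than shared memory, but it shows the same wait/broadcast structure the patch introduces.

```c
/* Minimal sketch (not the Ouroboros API) of condvar-based blocking I/O on a
 * bounded ring, replacing a busy-wait loop. Build with: cc -pthread demo.c */
#include <pthread.h>
#include <stddef.h>
#include <stdio.h>

#define RING_SIZE 8 /* must be a power of two, like SHM_BUFFER_SIZE */

struct ring {
        int             buf[RING_SIZE];
        size_t          head;
        size_t          tail;
        pthread_mutex_t lock;
        pthread_cond_t  not_full;  /* cf. the 'del' / 'healthy' condvars */
        pthread_cond_t  not_empty; /* cf. the 'add' condvar */
};

#define ring_used(r)  ((((r)->head + RING_SIZE) - (r)->tail) & (RING_SIZE - 1))
#define ring_full(r)  (ring_used(r) == RING_SIZE - 1)
#define ring_empty(r) ((r)->head == (r)->tail)

static struct ring r = {
        .lock      = PTHREAD_MUTEX_INITIALIZER,
        .not_full  = PTHREAD_COND_INITIALIZER,
        .not_empty = PTHREAD_COND_INITIALIZER
};

/* Blocking put: sleep while the ring is full (the old code spun instead),
 * then append and wake any readers. */
static void ring_put_b(int val)
{
        pthread_mutex_lock(&r.lock);
        while (ring_full(&r))
                pthread_cond_wait(&r.not_full, &r.lock);
        r.buf[r.head] = val;
        r.head = (r.head + 1) & (RING_SIZE - 1);
        pthread_cond_broadcast(&r.not_empty);
        pthread_mutex_unlock(&r.lock);
}

/* Blocking get: sleep while the ring is empty, then consume and wake writers. */
static int ring_get_b(void)
{
        int val;

        pthread_mutex_lock(&r.lock);
        while (ring_empty(&r))
                pthread_cond_wait(&r.not_empty, &r.lock);
        val = r.buf[r.tail];
        r.tail = (r.tail + 1) & (RING_SIZE - 1);
        pthread_cond_broadcast(&r.not_full);
        pthread_mutex_unlock(&r.lock);

        return val;
}

static void * producer(void * o)
{
        int i;

        (void) o;

        for (i = 0; i < 32; ++i)
                ring_put_b(i);

        return NULL;
}

int main(void)
{
        pthread_t t;
        int i;

        pthread_create(&t, NULL, producer, NULL);

        for (i = 0; i < 32; ++i)
                printf("%d\n", ring_get_b());

        pthread_join(t, NULL);

        return 0;
}
```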
Diffstat (limited to 'src')

| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 18 |
| -rw-r--r-- | src/lib/dev.c | 56 |
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 126 |
| -rw-r--r-- | src/lib/shm_du_map.c | 210 |

4 files changed, 281 insertions, 129 deletions
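
All four files also pick up the rename of SHM_BLOCKS_IN_MAP / SHM_RBUFF_SIZE to a single SHM_BUFFER_SIZE constant. The wrap-around arithmetic kept throughout, `(i + 1) & (SHM_BUFFER_SIZE - 1)`, is only equivalent to `(i + 1) % SHM_BUFFER_SIZE` when the size is a power of two. A hypothetical compile-time guard for that assumption (the constant's real definition lives in the project headers, and the value below is made up for illustration) could look like:

```c
/* Hypothetical guard, not part of the patch: the '& (SIZE - 1)' index masks
 * in shm_ap_rbuff.c and shm_du_map.c assume a power-of-two buffer size. */
#define SHM_BUFFER_SIZE 4096 /* assumed value for illustration only */

_Static_assert((SHM_BUFFER_SIZE & (SHM_BUFFER_SIZE - 1)) == 0,
               "SHM_BUFFER_SIZE must be a power of two");
```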
| diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 9e315335..f98799a5 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -386,7 +386,7 @@ static int eth_llc_ipcp_send_frame(uint8_t   dst_addr[MAC_SIZE],          shim_data(_ipcp)->tx_offset =                  (shim_data(_ipcp)->tx_offset + 1) -                & (SHM_BLOCKS_IN_MAP -1); +                & (SHM_BUFFER_SIZE -1);  #else          device = (shim_data(_ipcp))->device; @@ -719,7 +719,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)                             MAC_SIZE) &&                      memcmp(br_addr, &llc_frame->dst_hwaddr, MAC_SIZE)) {  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) -                        offset = (offset + 1) & (SHM_BLOCKS_IN_MAP - 1); +                        offset = (offset + 1) & (SHM_BUFFER_SIZE - 1);                          header->tp_status = TP_STATUS_KERNEL;  #endif                          continue; @@ -730,7 +730,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)                  if (ntohs(length) > SHIM_ETH_LLC_MAX_SDU_SIZE) {                          /* Not an LLC packet. */  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) -                        offset = (offset + 1) & (SHM_BLOCKS_IN_MAP - 1); +                        offset = (offset + 1) & (SHM_BUFFER_SIZE - 1);                          header->tp_status = TP_STATUS_KERNEL;  #endif                          continue; @@ -758,7 +758,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)                                  pthread_rwlock_unlock(&_ipcp->state_lock);  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)                                  offset = (offset + 1) -                                        & (SHM_BLOCKS_IN_MAP - 1); +                                        & (SHM_BUFFER_SIZE - 1);                                  header->tp_status = TP_STATUS_KERNEL;  #endif                                  continue; @@ -784,7 +784,7 @@ static void * eth_llc_ipcp_sdu_reader(void * o)                          pthread_rwlock_unlock(&_ipcp->state_lock);                  }  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) -                offset = (offset + 1) & (SHM_BLOCKS_IN_MAP -1); +                offset = (offset + 1) & (SHM_BUFFER_SIZE -1);                  header->tp_status = TP_STATUS_KERNEL;  #endif          } @@ -1009,8 +1009,8 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)          req.tp_block_size = SHM_DU_BUFF_BLOCK_SIZE;          req.tp_frame_size = SHM_DU_BUFF_BLOCK_SIZE; -        req.tp_block_nr = SHM_BLOCKS_IN_MAP; -        req.tp_frame_nr = SHM_BLOCKS_IN_MAP; +        req.tp_block_nr = SHM_BUFFER_SIZE; +        req.tp_frame_nr = SHM_BUFFER_SIZE;          if (setsockopt(fd, SOL_PACKET, PACKET_RX_RING,                         (void *) &req, sizeof(req))) { @@ -1036,7 +1036,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)  #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)          shim_data(_ipcp)->rx_ring = mmap(NULL,                                           2 * SHM_DU_BUFF_BLOCK_SIZE -                                         * SHM_BLOCKS_IN_MAP, +                                         * SHM_BUFFER_SIZE,                                           PROT_READ | PROT_WRITE, MAP_SHARED,                                           fd, 0);          if (shim_data(_ipcp)->rx_ring == NULL) { @@ -1045,7 +1045,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)                  return -1;          
}          shim_data(_ipcp)->tx_ring = shim_data(_ipcp)->rx_ring -                + (SHM_DU_BUFF_BLOCK_SIZE * SHM_BLOCKS_IN_MAP); +                + (SHM_DU_BUFF_BLOCK_SIZE * SHM_BUFFER_SIZE);  #endif diff --git a/src/lib/dev.c b/src/lib/dev.c index 22e77169..ce919263 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -41,6 +41,8 @@ struct flow {          int                   oflags;          pid_t                 api; + +        struct timespec *     timeout;  };  struct ap_data { @@ -93,7 +95,9 @@ int ap_init(char * ap_name)          for (i = 0; i < AP_MAX_FLOWS; ++i) {                  _ap_instance->flows[i].rb = NULL;                  _ap_instance->flows[i].port_id = -1; +                _ap_instance->flows[i].oflags = 0;                  _ap_instance->flows[i].api = -1; +                _ap_instance->flows[i].timeout = NULL;          }          pthread_rwlock_init(&_ap_instance->flows_lock, NULL); @@ -127,6 +131,9 @@ void ap_fini(void)          pthread_rwlock_unlock(&_ap_instance->flows_lock);          pthread_rwlock_unlock(&_ap_instance->data_lock); +        pthread_rwlock_destroy(&_ap_instance->flows_lock); +        pthread_rwlock_destroy(&_ap_instance->data_lock); +          free(_ap_instance);  } @@ -458,7 +465,7 @@ int flow_cntl(int fd, int cmd, int oflags)  ssize_t flow_write(int fd, void * buf, size_t count)  { -        ssize_t index; +        ssize_t idx;          struct rb_entry e;          if (buf == NULL) @@ -477,37 +484,35 @@ ssize_t flow_write(int fd, void * buf, size_t count)          }          if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { -                index = shm_du_map_write(_ap_instance->dum, -                                         _ap_instance->flows[fd].api, -                                         DU_BUFF_HEADSPACE, -                                         DU_BUFF_TAILSPACE, -                                         (uint8_t *) buf, -                                         count); -                if (index == -1) { +                idx = shm_du_map_write(_ap_instance->dum, +                                       _ap_instance->flows[fd].api, +                                       DU_BUFF_HEADSPACE, +                                       DU_BUFF_TAILSPACE, +                                       (uint8_t *) buf, +                                       count); +                if (idx == -1) {                          pthread_rwlock_unlock(&_ap_instance->flows_lock);                          pthread_rwlock_unlock(&_ap_instance->data_lock);                          return -EAGAIN;                  } -                e.index   = index; +                e.index   = idx;                  e.port_id = _ap_instance->flows[fd].port_id;                  if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { -                        shm_du_map_remove(_ap_instance->dum, index); +                        shm_du_map_remove(_ap_instance->dum, idx);                          pthread_rwlock_unlock(&_ap_instance->flows_lock);                          pthread_rwlock_unlock(&_ap_instance->data_lock);                          return -1;                  }          } else { /* blocking */ -                while ((index = shm_du_map_write(_ap_instance->dum, -                                                 _ap_instance->flows[fd].api, -                                                 DU_BUFF_HEADSPACE, -                                                 DU_BUFF_TAILSPACE, -                                                 (uint8_t *) buf, -                          
                       count)) < 0) -                        ; - -                e.index   = index; +                idx = shm_du_map_write_b(_ap_instance->dum, +                                         _ap_instance->flows[fd].api, +                                         DU_BUFF_HEADSPACE, +                                         DU_BUFF_TAILSPACE, +                                         (uint8_t *) buf, +                                         count); +                e.index   = idx;                  e.port_id = _ap_instance->flows[fd].port_id;                  while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) @@ -546,16 +551,13 @@ ssize_t flow_read(int fd, void * buf, size_t count)                  return -ENOTALLOC;          } -        if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { +        if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK)                  idx = shm_ap_rbuff_read_port(_ap_instance->rb,                                               _ap_instance->flows[fd].port_id); -        } else { /* block */ -                while ((idx = -                        shm_ap_rbuff_read_port(_ap_instance->rb, -                                               _ap_instance-> -                                               flows[fd].port_id)) < 0) -                        ; -        } +        else +                idx = shm_ap_rbuff_read_port_b(_ap_instance->rb, +                                               _ap_instance->flows[fd].port_id, +                                               _ap_instance->flows[fd].timeout);          pthread_rwlock_unlock(&_ap_instance->flows_lock); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index be4cd0c2..56555533 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -42,23 +42,27 @@  #define PTHREAD_COND_CLOCK CLOCK_MONOTONIC -#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry)          \ +#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry)          \                               + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)    \ -                             + sizeof (pthread_cond_t)) +                             + 2 * sizeof (pthread_cond_t)) -#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail)    \ -                          & (SHM_RBUFF_SIZE - 1)) -#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE) +#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_BUFFER_SIZE - *rb->ptr_tail)    \ +                          & (SHM_BUFFER_SIZE - 1)) +#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE)  #define shm_rbuff_empty(rb) (*rb->ptr_head == *rb->ptr_tail) -#define head_el_ptr (rb->shm_base + *rb->ptr_head) -#define tail_el_ptr (rb->shm_base + *rb->ptr_tail) +#define head_el_ptr(rb) (rb->shm_base + *rb->ptr_head) +#define tail_el_ptr(rb) (rb->shm_base + *rb->ptr_tail) +#define clean_sdus(rb)                                                         \ +        while (!shm_rbuff_empty(rb) && tail_el_ptr(rb)->port_id < 0)           \ +                *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1);     \  struct shm_ap_rbuff {          struct rb_entry * shm_base;    /* start of entry */          size_t *          ptr_head;    /* start of ringbuffer head */          size_t *          ptr_tail;    /* start of ringbuffer tail */          pthread_mutex_t * lock;        /* lock all free space in shm */ -        pthread_cond_t *  work;        /* threads will wait for a signal */ +        pthread_cond_t *  
add;         /* SDU arrived */ +        pthread_cond_t *  del;         /* SDU removed */          pid_t             api;         /* api to which this rb belongs */          int               fd;  }; @@ -125,10 +129,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()          }          rb->shm_base = shm_base; -        rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); +        rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);          rb->ptr_tail = rb->ptr_head + 1;          rb->lock     = (pthread_mutex_t *) (rb->ptr_tail + 1); -        rb->work     = (pthread_cond_t *) (rb->lock + 1); +        rb->add      = (pthread_cond_t *) (rb->lock + 1); +        rb->del      = rb->add + 1;          pthread_mutexattr_init(&mattr);          pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); @@ -138,7 +143,8 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()          pthread_condattr_init(&cattr);          pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);          pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -        pthread_cond_init(rb->work, &cattr); +        pthread_cond_init(rb->add, &cattr); +        pthread_cond_init(rb->del, &cattr);          *rb->ptr_head = 0;          *rb->ptr_tail = 0; @@ -190,10 +196,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)          }          rb->shm_base = shm_base; -        rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); +        rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);          rb->ptr_tail = rb->ptr_head + 1;          rb->lock     = (pthread_mutex_t *) (rb->ptr_tail + 1); -        rb->work     = (pthread_cond_t *) (rb->lock + 1); +        rb->add      = (pthread_cond_t *) (rb->lock + 1); +        rb->del      = rb->add + 1;          rb->fd = shm_fd;          rb->api = api; @@ -243,7 +250,8 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)          }          pthread_mutex_destroy(rb->lock); -        pthread_cond_destroy(rb->work); +        pthread_cond_destroy(rb->add); +        pthread_cond_destroy(rb->del);          if (close(rb->fd) < 0)                  LOG_DBG("Couldn't close shared memory."); @@ -275,10 +283,10 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)          }          if (shm_rbuff_empty(rb)) -                pthread_cond_broadcast(rb->work); +                pthread_cond_broadcast(rb->add); -        *head_el_ptr = *e; -        *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1); +        *head_el_ptr(rb) = *e; +        *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_BUFFER_SIZE -1);          pthread_mutex_unlock(rb->lock); @@ -307,13 +315,17 @@ int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb,                  pthread_mutex_consistent(rb->lock);          } +        clean_sdus(rb); +          while (shm_rbuff_empty(rb)) {                  if (timeout != NULL) -                        ret = pthread_cond_timedwait(rb->work, +                        ret = pthread_cond_timedwait(rb->add,                                                       rb->lock,                                                       &abstime);                  else -                        ret = pthread_cond_wait(rb->work, rb->lock); +                        ret = pthread_cond_wait(rb->add, rb->lock); + +                clean_sdus(rb);                  if (ret == EOWNERDEAD) {                          LOG_DBG("Recovering dead mutex."); @@ -348,11 +360,10 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)                  pthread_mutex_consistent(rb->lock); 
         } -        while (!shm_rbuff_empty(rb) && tail_el_ptr->port_id < 0) -                *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); +        clean_sdus(rb);          while (shm_rbuff_empty(rb)) -                if (pthread_cond_wait(rb->work, rb->lock) == EOWNERDEAD) { +                if (pthread_cond_wait(rb->add, rb->lock) == EOWNERDEAD) {                  LOG_DBG("Recovering dead mutex.");                  pthread_mutex_consistent(rb->lock);          } @@ -365,7 +376,7 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)          *e = *(rb->shm_base + *rb->ptr_tail); -        *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); +        *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1);          pthread_cleanup_pop(true); @@ -381,24 +392,75 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)                  pthread_mutex_consistent(rb->lock);          } -        if (shm_rbuff_empty(rb)) { +        clean_sdus(rb); + +        if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) {                  pthread_mutex_unlock(rb->lock);                  return -1;          } -        while (tail_el_ptr->port_id < 0) -                *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); +        idx = tail_el_ptr(rb)->index; -        if (tail_el_ptr->port_id != port_id) { -                pthread_mutex_unlock(rb->lock); -                return -1; +        *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); + +        pthread_cond_broadcast(rb->del); + +        pthread_mutex_unlock(rb->lock); + +        return idx; +} + +ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, +                                 int port_id, +                                 const struct timespec * timeout) +{ +        struct timespec abstime; +        int ret = 0; +        ssize_t idx = -1; + +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock);          } -        idx = tail_el_ptr->index; +        if (timeout != NULL) { +                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +                ts_add(&abstime, timeout, &abstime); +        } -        *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); +        clean_sdus(rb); -        pthread_mutex_unlock(rb->lock); +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) rb->lock); + +        while (tail_el_ptr(rb)->port_id != port_id) { +                if (timeout != NULL) +                        ret = pthread_cond_timedwait(rb->del, +                                                     rb->lock, +                                                     &abstime); +                else +                        ret = pthread_cond_wait(rb->del, rb->lock); + +                clean_sdus(rb); + +                if (ret == EOWNERDEAD) { +                        LOG_DBG("Recovering dead mutex."); +                        pthread_mutex_consistent(rb->lock); +                } + +                if (ret == ETIMEDOUT) { +                        pthread_mutex_unlock(rb->lock); +                        return -ret; +                } +        } + +        idx = tail_el_ptr(rb)->index; + +        *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); + +        pthread_cond_broadcast(rb->del); + +        pthread_cleanup_pop(true);          return idx;  } diff --git a/src/lib/shm_du_map.c 
b/src/lib/shm_du_map.c index b090bb74..a12ef223 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -40,7 +40,7 @@  #include <ouroboros/logs.h> -#define SHM_BLOCKS_SIZE (SHM_BLOCKS_IN_MAP * SHM_DU_BUFF_BLOCK_SIZE) +#define SHM_BLOCKS_SIZE (SHM_BUFFER_SIZE * SHM_DU_BUFF_BLOCK_SIZE)  #define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t)                   \                         + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t)  \                         + sizeof(pid_t)) @@ -59,9 +59,9 @@  #define block_ptr_to_idx(dum, sdb)                                             \          (((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE) -#define shm_map_used(dum)((*dum->ptr_head + SHM_BLOCKS_IN_MAP - *dum->ptr_tail)\ -                          & (SHM_BLOCKS_IN_MAP - 1)) -#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP) +#define shm_map_used(dum)((*dum->ptr_head + SHM_BUFFER_SIZE - *dum->ptr_tail)\ +                          & (SHM_BUFFER_SIZE - 1)) +#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BUFFER_SIZE)  #define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head) @@ -79,7 +79,7 @@ struct shm_du_map {          uint8_t *         shm_base;    /* start of blocks */          size_t *          ptr_head;    /* start of ringbuffer head */          size_t *          ptr_tail;    /* start of ringbuffer tail */ -        pthread_mutex_t * shm_mutex;   /* lock all free space in shm */ +        pthread_mutex_t * lock;   /* lock all free space in shm */          size_t *          choked;      /* stale sdu detection */          pthread_cond_t *  healthy;     /* du map is healthy */          pthread_cond_t *  full;        /* run sanitizer when buffer full */ @@ -94,12 +94,12 @@ static void garbage_collect(struct shm_du_map * dum)          while ((sdb = get_tail_ptr(dum))->dst_api == -1 &&                 !shm_map_empty(dum))                  *dum->ptr_tail = (*dum->ptr_tail + sdb->blocks) -                        & (SHM_BLOCKS_IN_MAP - 1); +                        & (SHM_BUFFER_SIZE - 1);  #else          while (get_tail_ptr(dum)->dst_api == -1 &&                 !shm_map_empty(dum))                  *dum->ptr_tail = -                        (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1); +                        (*dum->ptr_tail + 1) & (SHM_BUFFER_SIZE - 1);  #endif  } @@ -114,9 +114,9 @@ static void clean_sdus(struct shm_du_map * dum, pid_t api, bool exit)                  if (buf->dst_api == api)                          buf->dst_api = -1;  #ifdef SHM_DU_MAP_MULTI_BLOCK -                idx = (idx + buf->blocks) & (SHM_BLOCKS_IN_MAP - 1); +                idx = (idx + buf->blocks) & (SHM_BUFFER_SIZE - 1);  #else -                idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1); +                idx = (idx + 1) & (SHM_BUFFER_SIZE - 1);  #endif          } @@ -194,8 +194,8 @@ struct shm_du_map * shm_du_map_create()          dum->ptr_head = (size_t *)                  ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);          dum->ptr_tail = dum->ptr_head + 1; -        dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); -        dum->choked = (size_t *) (dum->shm_mutex + 1); +        dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1); +        dum->choked = (size_t *) (dum->lock + 1);          dum->healthy = (pthread_cond_t *) (dum->choked + 1);          dum->full = dum->healthy + 1;          dum->api = (pid_t *) (dum->full + 1); @@ -203,7 +203,7 @@ struct shm_du_map * shm_du_map_create()          pthread_mutexattr_init(&mattr);          
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);          pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); -        pthread_mutex_init(dum->shm_mutex, &mattr); +        pthread_mutex_init(dum->lock, &mattr);          pthread_condattr_init(&cattr);          pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); @@ -261,8 +261,8 @@ struct shm_du_map * shm_du_map_open()          dum->ptr_head = (size_t *)                  ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);          dum->ptr_tail = dum->ptr_head + 1; -        dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); -        dum->choked = (size_t *) (dum->shm_mutex + 1); +        dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1); +        dum->choked = (size_t *) (dum->lock + 1);          dum->healthy = (pthread_cond_t *) (dum->choked + 1);          dum->full = dum->healthy + 1;          dum->api = (pid_t *) (dum->full + 1); @@ -283,23 +283,23 @@ void * shm_du_map_sanitize(void * o)          if (dum == NULL)                  return (void *) -1; -        if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { +        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {                  LOG_WARN("Recovering dead mutex."); -                pthread_mutex_consistent(dum->shm_mutex); +                pthread_mutex_consistent(dum->lock);          }          pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, -                             (void *) dum->shm_mutex); +                             (void *) dum->lock);          while (true) {                  int ret = 0;                  struct timespec now;                  struct timespec dl; -                if (pthread_cond_wait(dum->full, dum->shm_mutex) +                if (pthread_cond_wait(dum->full, dum->lock)                          == EOWNERDEAD) {                          LOG_WARN("Recovering dead mutex."); -                        pthread_mutex_consistent(dum->shm_mutex); +                        pthread_mutex_consistent(dum->lock);                  }                  *dum->choked = 1; @@ -321,14 +321,14 @@ void * shm_du_map_sanitize(void * o)                  ts_add(&now, &intv, &dl);                  while (*dum->choked) {                          ret = pthread_cond_timedwait(dum->healthy, -                                                     dum->shm_mutex, +                                                     dum->lock,                                                       &dl);                          if (!ret)                                  continue;                          if (ret == EOWNERDEAD) {                                  LOG_WARN("Recovering dead mutex."); -                                pthread_mutex_consistent(dum->shm_mutex); +                                pthread_mutex_consistent(dum->lock);                          }                          if (ret == ETIMEDOUT) { @@ -429,9 +429,9 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,                  return -1;          }  #endif -        if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { +        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {                  LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->shm_mutex); +                pthread_mutex_consistent(dum->lock);          }  #ifdef SHM_DU_MAP_MULTI_BLOCK          while (sz > 0) { @@ -439,15 +439,15 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,                  ++blocks;          } -        if (blocks + *dum->ptr_head > SHM_BLOCKS_IN_MAP - 1) -            
    padblocks = SHM_BLOCKS_IN_MAP - *dum->ptr_head; +        if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE - 1) +                padblocks = SHM_BUFFER_SIZE - *dum->ptr_head;          if (!shm_map_free(dum, (blocks + padblocks))) {  #else          if (!shm_map_free(dum, 1)) {  #endif                  pthread_cond_signal(dum->full); -                pthread_mutex_unlock(dum->shm_mutex); +                pthread_mutex_unlock(dum->lock);                  return -1;          } @@ -477,11 +477,99 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,          idx = *dum->ptr_head;  #ifdef SHM_DU_MAP_MULTI_BLOCK -        *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BLOCKS_IN_MAP - 1); +        *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);  #else -        *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); +        *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1);  #endif -        pthread_mutex_unlock(dum->shm_mutex); +        pthread_mutex_unlock(dum->lock); + +        return idx; +} + +ssize_t shm_du_map_write_b(struct shm_du_map * dum, +                           pid_t               dst_api, +                           size_t              headspace, +                           size_t              tailspace, +                           uint8_t *           data, +                           size_t              len) +{ +        struct shm_du_buff * sdb; +        size_t               size = headspace + len + tailspace; +#ifdef SHM_DU_MAP_MULTI_BLOCK +        long                 blocks = 0; +        long                 padblocks = 0; +        int                  sz = size + sizeof *sdb; +#endif +        uint8_t *            write_pos; +        ssize_t              idx = -1; + +        if (dum == NULL || data == NULL) { +                LOG_DBGF("Bogus input, bugging out."); +                return -1; +        } + +#ifndef SHM_DU_MAP_MULTI_BLOCK +        if (sz > SHM_DU_BUFF_BLOCK_SIZE) { +                LOG_DBGF("Multi-block SDU's disabled. 
Dropping."); +                return -1; +        } +#endif +        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { +                LOG_DBGF("Recovering dead mutex."); +                pthread_mutex_consistent(dum->lock); +        } + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) dum->lock); + +#ifdef SHM_DU_MAP_MULTI_BLOCK +        while (sz > 0) { +                sz -= SHM_DU_BUFF_BLOCK_SIZE; +                ++blocks; +        } + +        if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE - 1) +                padblocks = SHM_BUFFER_SIZE - *dum->ptr_head; + +        while (!shm_map_free(dum, (blocks + padblocks))) { +#else +        while (!shm_map_free(dum, 1)) { +#endif +                pthread_cond_signal(dum->full); +                pthread_cond_wait(dum->healthy, dum->lock); +        } + +#ifdef SHM_DU_MAP_MULTI_BLOCK +        if (padblocks) { +                sdb = get_head_ptr(dum); +                sdb->size    = 0; +                sdb->blocks  = padblocks; +                sdb->dst_api = -1; +                sdb->du_head = 0; +                sdb->du_tail = 0; + +                *dum->ptr_head = 0; +        } +#endif +        sdb          = get_head_ptr(dum); +        sdb->size    = size; +        sdb->dst_api = dst_api; +        sdb->du_head = headspace; +        sdb->du_tail = sdb->du_head + len; +#ifdef  SHM_DU_MAP_MULTI_BLOCK +        sdb->blocks  = blocks; +#endif +        write_pos = ((uint8_t *) (sdb + 1)) + headspace; + +        memcpy(write_pos, data, len); + +        idx = *dum->ptr_head; +#ifdef SHM_DU_MAP_MULTI_BLOCK +        *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); +#else +        *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); +#endif +        pthread_cleanup_pop(true);          return idx;  } @@ -493,16 +581,16 @@ int shm_du_map_read(uint8_t **          dst,          size_t len = 0;          struct shm_du_buff * sdb; -        if (idx > SHM_BLOCKS_IN_MAP) +        if (idx > SHM_BUFFER_SIZE)                  return -1; -        if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { +        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {                  LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->shm_mutex); +                pthread_mutex_consistent(dum->lock);          }          if (shm_map_empty(dum)) { -                pthread_mutex_unlock(dum->shm_mutex); +                pthread_mutex_unlock(dum->lock);                  return -1;          } @@ -510,30 +598,30 @@ int shm_du_map_read(uint8_t **          dst,          len = sdb->du_tail - sdb->du_head;          *dst = ((uint8_t *) (sdb + 1)) + sdb->du_head; -        pthread_mutex_unlock(dum->shm_mutex); +        pthread_mutex_unlock(dum->lock);          return len;  }  int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx)  { -        if (idx > SHM_BLOCKS_IN_MAP) +        if (idx > SHM_BUFFER_SIZE)                  return -1; -        if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { +        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {                  LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->shm_mutex); +                pthread_mutex_consistent(dum->lock);          }          if (shm_map_empty(dum)) { -                pthread_mutex_unlock(dum->shm_mutex); +                pthread_mutex_unlock(dum->lock);                  return -1;          }          idx_to_du_buff_ptr(dum, idx)->dst_api = -1;          
if (idx != *dum->ptr_tail) { -                pthread_mutex_unlock(dum->shm_mutex); +                pthread_mutex_unlock(dum->lock);                  return 0;          } @@ -541,9 +629,9 @@ int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx)          *dum->choked = 0; -        pthread_cond_signal(dum->healthy); +        pthread_cond_broadcast(dum->healthy); -        pthread_mutex_unlock(dum->shm_mutex); +        pthread_mutex_unlock(dum->lock);          return 0;  } @@ -558,18 +646,18 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum,          if (dum  == NULL)                  return NULL; -        if (idx < 0 || idx > SHM_BLOCKS_IN_MAP) +        if (idx < 0 || idx > SHM_BUFFER_SIZE)                  return NULL; -        if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { +        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {                  LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->shm_mutex); +                pthread_mutex_consistent(dum->lock);          }          sdb = idx_to_du_buff_ptr(dum, idx);          if ((long) (sdb->du_head - size) < 0) { -                pthread_mutex_unlock(dum->shm_mutex); +                pthread_mutex_unlock(dum->lock);                  LOG_DBGF("Failed to allocate PCI headspace.");                  return NULL;          } @@ -578,7 +666,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum,          buf = (uint8_t *) (sdb + 1) + sdb->du_head; -        pthread_mutex_unlock(dum->shm_mutex); +        pthread_mutex_unlock(dum->lock);          return buf;  } @@ -593,18 +681,18 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum,          if (dum  == NULL)                  return NULL; -        if (idx < 0 || idx > SHM_BLOCKS_IN_MAP) +        if (idx < 0 || idx > SHM_BUFFER_SIZE)                  return NULL; -        if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { +        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {                  LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->shm_mutex); +                pthread_mutex_consistent(dum->lock);          }          sdb = idx_to_du_buff_ptr(dum, idx);          if (sdb->du_tail + size >= sdb->size) { -                pthread_mutex_unlock(dum->shm_mutex); +                pthread_mutex_unlock(dum->lock);                  LOG_DBGF("Failed to allocate PCI tailspace.");                  return NULL;          } @@ -613,7 +701,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum,          sdb->du_tail += size; -        pthread_mutex_unlock(dum->shm_mutex); +        pthread_mutex_unlock(dum->lock);          return buf;  } @@ -627,25 +715,25 @@ int shm_du_buff_head_release(struct shm_du_map * dum,          if (dum  == NULL)                  return -1; -        if (idx < 0 || idx > SHM_BLOCKS_IN_MAP) +        if (idx < 0 || idx > SHM_BUFFER_SIZE)                  return -1; -        if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { +        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {                  LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->shm_mutex); +                pthread_mutex_consistent(dum->lock);          }          sdb = idx_to_du_buff_ptr(dum, idx);          if (size > sdb->du_tail - sdb->du_head) { -                pthread_mutex_unlock(dum->shm_mutex); +                pthread_mutex_unlock(dum->lock);                  LOG_DBGF("Tried to release beyond sdu boundary.");                  return -EOVERFLOW;         
 }          sdb->du_head += size; -        pthread_mutex_unlock(dum->shm_mutex); +        pthread_mutex_unlock(dum->lock);          return 0;  } @@ -659,25 +747,25 @@ int shm_du_buff_tail_release(struct shm_du_map * dum,          if (dum  == NULL)                  return -1; -        if (idx < 0 || idx > SHM_BLOCKS_IN_MAP) +        if (idx < 0 || idx > SHM_BUFFER_SIZE)                  return -1; -        if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { +        if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {                  LOG_DBGF("Recovering dead mutex."); -                pthread_mutex_consistent(dum->shm_mutex); +                pthread_mutex_consistent(dum->lock);          }          sdb = idx_to_du_buff_ptr(dum, idx);          if (size > sdb->du_tail - sdb->du_head) { -                pthread_mutex_unlock(dum->shm_mutex); +                pthread_mutex_unlock(dum->lock);                  LOG_DBGF("Tried to release beyond sdu boundary.");                  return -EOVERFLOW;          }          sdb->du_tail -= size; -        pthread_mutex_unlock(dum->shm_mutex); +        pthread_mutex_unlock(dum->lock);          return 0;  } | 
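
The new shm_ap_rbuff_read_port_b() also threads an optional per-flow timeout through to pthread_cond_timedwait() and hands ETIMEDOUT back to the caller as a negative errno. Below is a compile-only sketch of that deadline handling under the same CLOCK_MONOTONIC condvar clock the library configures; `struct waitq`, waitq_init() and wait_for_data() are illustrative names, not Ouroboros API, and the inline normalisation stands in for ts_add().

```c
/* Illustrative sketch of a timed blocking wait: a relative timeout becomes an
 * absolute deadline on the condvar clock, and ETIMEDOUT is returned as a
 * negative errno, mirroring shm_ap_rbuff_read_port_b(). */
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

struct waitq {
        pthread_mutex_t lock;
        pthread_cond_t  cond;
        bool            ready;
};

static int waitq_init(struct waitq * w)
{
        pthread_condattr_t cattr;

        if (pthread_mutex_init(&w->lock, NULL))
                return -1;

        pthread_condattr_init(&cattr);
        /* the library pins its condvars to CLOCK_MONOTONIC; do the same */
        pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);
        if (pthread_cond_init(&w->cond, &cattr))
                return -1;

        w->ready = false;

        return 0;
}

/* Wait for 'ready'; a NULL timeout blocks indefinitely, as in flow_read(). */
static int wait_for_data(struct waitq * w, const struct timespec * timeout)
{
        struct timespec abstime;
        int ret = 0;

        if (timeout != NULL) {
                /* relative timeout -> absolute deadline, cf. ts_add() */
                clock_gettime(CLOCK_MONOTONIC, &abstime);
                abstime.tv_sec  += timeout->tv_sec;
                abstime.tv_nsec += timeout->tv_nsec;
                if (abstime.tv_nsec >= 1000000000L) {
                        ++abstime.tv_sec;
                        abstime.tv_nsec -= 1000000000L;
                }
        }

        pthread_mutex_lock(&w->lock);

        while (!w->ready) {
                if (timeout != NULL)
                        ret = pthread_cond_timedwait(&w->cond, &w->lock,
                                                     &abstime);
                else
                        ret = pthread_cond_wait(&w->cond, &w->lock);

                if (ret == ETIMEDOUT) {
                        pthread_mutex_unlock(&w->lock);
                        return -ETIMEDOUT;
                }
        }

        pthread_mutex_unlock(&w->lock);

        return 0;
}
```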
