diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dev.c | 5 | ||||
| -rw-r--r-- | src/lib/shm_rbuff.c | 22 | ||||
| -rw-r--r-- | src/lib/shm_rdrbuff.c | 256 | 
3 files changed, 95 insertions, 188 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index fc8739a2..1c0d73a1 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -536,7 +536,7 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)                  return -1;          } -        ai.flows[fd].rx_rb   = shm_rbuff_open(ai.api, recv_msg->port_id); +        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id);          if (ai.flows[fd].rx_rb == NULL) {                  reset_flow(fd);                  bmp_release(ai.fds, fd); @@ -746,7 +746,6 @@ ssize_t flow_write(int fd, void * buf, size_t count)          if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {                  idx = shm_rdrbuff_write(ai.rdrb, -                                       ai.flows[fd].api,                                         DU_BUFF_HEADSPACE,                                         DU_BUFF_TAILSPACE,                                         buf, @@ -766,7 +765,6 @@ ssize_t flow_write(int fd, void * buf, size_t count)          } else { /* blocking */                  struct shm_rdrbuff * rdrb = ai.rdrb;                  struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb; -                pid_t api  = ai.flows[fd].api;                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -774,7 +772,6 @@ ssize_t flow_write(int fd, void * buf, size_t count)                  assert(tx_rb);                  idx = shm_rdrbuff_write_b(rdrb, -                                          api,                                            DU_BUFF_HEADSPACE,                                            DU_BUFF_TAILSPACE,                                            buf, diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index 301669e7..c0901ab1 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -43,12 +43,12 @@  #include <stdbool.h>  #define FN_MAX_CHARS 255 -#define RB_CLOSED -1  #define RB_OPEN 0 +#define RB_CLOSED 1  #define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t)          \ -                             + 2 * sizeof(size_t) + sizeof(int8_t)      \ -                             + sizeof(pthread_mutex_t)                  \ +                             + 3 * sizeof(size_t)                         \ +                             + sizeof(pthread_mutex_t)                    \                               + 2 * sizeof (pthread_cond_t))  #define shm_rbuff_used(rb) ((*rb->head + (SHM_BUFFER_SIZE) - *rb->tail)   \ @@ -62,7 +62,7 @@ struct shm_rbuff {          ssize_t *         shm_base; /* start of entry                */          size_t *          head;     /* start of ringbuffer head      */          size_t *          tail;     /* start of ringbuffer tail      */ -        int8_t *          acl;      /* access control                */ +        size_t *          acl;      /* access control                */          pthread_mutex_t * lock;     /* lock all free space in shm    */          pthread_cond_t *  add;      /* SDU arrived                   */          pthread_cond_t *  del;      /* SDU removed                   */ @@ -126,7 +126,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)          rb->shm_base = shm_base;          rb->head     = (size_t *) (rb->shm_base + (SHM_BUFFER_SIZE));          rb->tail     = rb->head + 1; -        rb->acl      = (int8_t *) (rb->tail + 1); +        rb->acl      = rb->tail + 1;          rb->lock     = (pthread_mutex_t *) (rb->acl + 1);          rb->add      = (pthread_cond_t *) (rb->lock + 1);          rb->del      = rb->add + 1; @@ -153,9 +153,6 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)          rb->api = api;          rb->port_id = port_id; -        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) -                LOG_DBG("Couldn't unmap shared memory."); -          return rb;  } @@ -202,7 +199,7 @@ struct shm_rbuff * shm_rbuff_open(pid_t api, int port_id)          rb->shm_base = shm_base;          rb->head     = (size_t *) (rb->shm_base + (SHM_BUFFER_SIZE));          rb->tail     = rb->head + 1; -        rb->acl      = (int8_t *) (rb->tail + 1); +        rb->acl      = rb->tail + 1;          rb->lock     = (pthread_mutex_t *) (rb->acl + 1);          rb->add      = (pthread_cond_t *) (rb->lock + 1);          rb->del      = rb->add + 1; @@ -225,13 +222,16 @@ void shm_rbuff_close(struct shm_rbuff * rb)  void shm_rbuff_destroy(struct shm_rbuff * rb)  { -        char fn[25]; +        char fn[FN_MAX_CHARS];          if (rb == NULL)                  return;          sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->api, rb->port_id); +        if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) +                LOG_DBG("Couldn't unmap shared memory."); +          if (shm_unlink(fn) == -1)                  LOG_DBG("Failed to unlink shm %s.", fn); @@ -251,7 +251,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)                  pthread_mutex_consistent(rb->lock);          }  #endif -        if (*rb->acl) { +        if (*rb->acl == RB_CLOSED) {                  pthread_mutex_unlock(rb->lock);                  return -ENOTALLOC;          } diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index dc1feb10..a8245447 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -42,16 +42,26 @@  #include <ouroboros/logs.h>  #define SHM_BLOCKS_SIZE ((SHM_BUFFER_SIZE) * SHM_RDRB_BLOCK_SIZE) -#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof(size_t)                    \ +#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof(size_t)                    \                         + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t)  \                         + sizeof(pid_t)) +#ifndef SHM_RDRB_MULTI_BLOCK +#define WAIT_BLOCKS 1 +#else +#define WAIT_BLOCKS ((SHM_BUFFER_SIZE) >> 4) +#if WAIT_BLOCKS == 0 +#undef WAIT_BLOCKS +#define WAIT_BLOCKS 1 +#endif +#endif +  #define get_head_ptr(rdrb)                                                     \ -        ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->ptr_head             \ +        ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->head                 \                                                     * SHM_RDRB_BLOCK_SIZE)))  #define get_tail_ptr(rdrb)                                                     \ -        ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->ptr_tail             \ +        ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->tail                 \                                                     * SHM_RDRB_BLOCK_SIZE)))  #define idx_to_du_buff_ptr(rdrb, idx)                                          \ @@ -61,13 +71,19 @@          (((uint8_t *)sdb - rdrb->shm_base) / SHM_RDRB_BLOCK_SIZE)  #define shm_rdrb_used(rdrb)                                                    \ -        ((*rdrb->ptr_head + (SHM_BUFFER_SIZE) - *rdrb->ptr_tail)               \ +        ((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail)                       \           & ((SHM_BUFFER_SIZE) - 1)) +  #define shm_rdrb_free(rdrb, i)                                                 \          (shm_rdrb_used(rdrb) + i < (SHM_BUFFER_SIZE))  #define shm_rdrb_empty(rdrb)                                                   \ -        (*rdrb->ptr_tail == *rdrb->ptr_head) +        (*rdrb->tail == *rdrb->head) + +enum shm_du_buff_flags { +        SDB_VALID = 0, +        SDB_NULL +};  struct shm_du_buff {          size_t size; @@ -76,20 +92,18 @@ struct shm_du_buff {  #endif          size_t du_head;          size_t du_tail; -        pid_t  dst_api; +        size_t flags;          size_t idx;  };  struct shm_rdrbuff { -        uint8_t *         shm_base;    /* start of blocks */ -        size_t *          ptr_head;    /* start of ringbuffer head */ -        size_t *          ptr_tail;    /* start of ringbuffer tail */ -        pthread_mutex_t * lock;        /* lock all free space in shm */ -        size_t *          choked;      /* stale sdu detection */ -        pthread_cond_t *  healthy;     /* du map is healthy */ -        pthread_cond_t *  full;        /* run sanitizer when buffer full */ -        pid_t *           api;         /* api of the irmd owner */ -        enum qos_cube     qos;         /* qos id which this buffer serves */ +        uint8_t *         shm_base; /* start of blocks */ +        size_t *          head;     /* start of ringbuffer head */ +        size_t *          tail;     /* start of ringbuffer tail */ +        pthread_mutex_t * lock;     /* lock all free space in shm */ +        pthread_cond_t *  full;     /* flag when full */ +        pthread_cond_t *  healthy;  /* flag when SDU is read */ +        pid_t *           api;      /* api of the irmd owner */  };  static void garbage_collect(struct shm_rdrbuff * rdrb) @@ -97,61 +111,31 @@ static void garbage_collect(struct shm_rdrbuff * rdrb)  #ifdef SHM_RDRB_MULTI_BLOCK          struct shm_du_buff * sdb;          while (!shm_rdrb_empty(rdrb) && -               (sdb = get_tail_ptr(rdrb))->dst_api == -1) -                *rdrb->ptr_tail = (*rdrb->ptr_tail + sdb->blocks) +               (sdb = get_tail_ptr(rdrb))->flags == SDB_NULL) +                *rdrb->tail = (*rdrb->tail + sdb->blocks)                          & ((SHM_BUFFER_SIZE) - 1);  #else -        while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->dst_api == -1) -                *rdrb->ptr_tail = -                        (*rdrb->ptr_tail + 1) & ((SHM_BUFFER_SIZE) - 1); - +        while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->flags == SDB_NULL) +                *rdrb->tail = (*rdrb->tail + 1) & ((SHM_BUFFER_SIZE) - 1);  #endif +        pthread_cond_broadcast(rdrb->healthy);  } -static void clean_sdus(struct shm_rdrbuff * rdrb, pid_t api) -{ -        size_t idx = *rdrb->ptr_tail; -        struct shm_du_buff * buf; - -        while (idx != *rdrb->ptr_head) { -                buf = idx_to_du_buff_ptr(rdrb, idx); -                if (buf->dst_api == api) -                        buf->dst_api = -1; -#ifdef SHM_RDRB_MULTI_BLOCK -                idx = (idx + buf->blocks) & ((SHM_BUFFER_SIZE) - 1); -#else -                idx = (idx + 1) & ((SHM_BUFFER_SIZE) - 1); -#endif -        } - -        garbage_collect(rdrb); - -        *rdrb->choked = 0; -} - -static char * rdrb_filename(enum qos_cube qos) +static char * rdrb_filename(void)  { -        size_t chars = 0;          char * str; -        int qm = QOS_MAX; -        do { -                qm /= 10; -                ++chars; -        } while (qm > 0); - -        str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 1); +        str = malloc(strlen(SHM_RDRB_PREFIX) + 1);          if (str == NULL) {                  LOG_ERR("Failed to create shm_rdrbuff: Out of Memory.");                  return NULL;          } -        sprintf(str, "%s%d", SHM_RDRB_PREFIX, (int) qos); +        sprintf(str, "%s", SHM_RDRB_PREFIX);          return str;  } -/* FIXME: create a ringbuffer for each qos cube in the system */  struct shm_rdrbuff * shm_rdrbuff_create()  {          struct shm_rdrbuff * rdrb; @@ -160,8 +144,7 @@ struct shm_rdrbuff * shm_rdrbuff_create()          uint8_t *            shm_base;          pthread_mutexattr_t  mattr;          pthread_condattr_t   cattr; -        enum qos_cube        qos = QOS_CUBE_BE; -        char *               shm_rdrb_fn = rdrb_filename(qos); +        char *               shm_rdrb_fn = rdrb_filename();          if (shm_rdrb_fn == NULL) {                  LOG_ERR("Could not create rdrbuff. Out of Memory");                  return NULL; @@ -212,14 +195,12 @@ struct shm_rdrbuff * shm_rdrbuff_create()          }          rdrb->shm_base = shm_base; -        rdrb->ptr_head = (size_t *) -                ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); -        rdrb->ptr_tail = rdrb->ptr_head + 1; -        rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1); -        rdrb->choked = (size_t *) (rdrb->lock + 1); -        rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1); -        rdrb->full = rdrb->healthy + 1; -        rdrb->api = (pid_t *) (rdrb->full + 1); +        rdrb->head = (size_t *) ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); +        rdrb->tail = rdrb->head + 1; +        rdrb->lock = (pthread_mutex_t *) (rdrb->tail + 1); +        rdrb->full = (pthread_cond_t *) (rdrb->lock + 1); +        rdrb->healthy = rdrb->full + 1; +        rdrb->api = (pid_t *) (rdrb->healthy + 1);          pthread_mutexattr_init(&mattr);          pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); @@ -236,29 +217,22 @@ struct shm_rdrbuff * shm_rdrbuff_create()          pthread_cond_init(rdrb->full, &cattr);          pthread_cond_init(rdrb->healthy, &cattr); -        *rdrb->ptr_head = 0; -        *rdrb->ptr_tail = 0; - -        *rdrb->choked = 0; +        *rdrb->head = 0; +        *rdrb->tail = 0;          *rdrb->api = getpid(); -        rdrb->qos = qos; -          free(shm_rdrb_fn);          return rdrb;  } -/* FIXME: open a ringbuffer for each qos cube in the system */  struct shm_rdrbuff * shm_rdrbuff_open()  {          struct shm_rdrbuff * rdrb;          int                  shm_fd;          uint8_t *            shm_base; - -        enum qos_cube        qos = QOS_CUBE_BE; -        char *               shm_rdrb_fn = rdrb_filename(qos); +        char *               shm_rdrb_fn = rdrb_filename();          if (shm_rdrb_fn == NULL) {                  LOG_ERR("Could not create rdrbuff. Out of Memory");                  return NULL; @@ -297,32 +271,20 @@ struct shm_rdrbuff * shm_rdrbuff_open()          }          rdrb->shm_base = shm_base; -        rdrb->ptr_head = (size_t *) -                ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); -        rdrb->ptr_tail = rdrb->ptr_head + 1; -        rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1); -        rdrb->choked = (size_t *) (rdrb->lock + 1); -        rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1); -        rdrb->full = rdrb->healthy + 1; -        rdrb->api = (pid_t *) (rdrb->full + 1); - -        rdrb->qos = qos; +        rdrb->head = (size_t *) ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); +        rdrb->tail = rdrb->head + 1; +        rdrb->lock = (pthread_mutex_t *) (rdrb->tail + 1); +        rdrb->full = (pthread_cond_t *) (rdrb->lock + 1); +        rdrb->healthy = rdrb->full + 1; +        rdrb->api = (pid_t *) (rdrb->healthy + 1);          free(shm_rdrb_fn);          return rdrb;  } -void * shm_rdrbuff_sanitize(void * o) +void shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb)  { -        struct shm_rdrbuff * rdrb = (struct shm_rdrbuff *) o; -        struct timespec intv -                = {SHM_DU_TIMEOUT_MICROS / MILLION, -                   (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000}; - -        pid_t   api; - -        assert(o);  #ifdef __APPLE__          pthread_mutex_lock(rdrb->lock); @@ -332,14 +294,10 @@ void * shm_rdrbuff_sanitize(void * o)                  pthread_mutex_consistent(rdrb->lock);          }  #endif -          pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,                               (void *) rdrb->lock); -        while (true) { -                int ret = 0; -                struct timespec now; -                struct timespec dl; +        while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) {  #ifdef __APPLE__                  pthread_cond_wait(rdrb->full, rdrb->lock);  #else @@ -348,49 +306,11 @@ void * shm_rdrbuff_sanitize(void * o)                          pthread_mutex_consistent(rdrb->lock);                  }  #endif -                *rdrb->choked = 1; - -                garbage_collect(rdrb); - -                if (shm_rdrb_empty(rdrb)) { -                        pthread_cond_broadcast(rdrb->healthy); -                        continue; -                } - -                api = get_tail_ptr(rdrb)->dst_api; - -                if (kill(api, 0)) { -                        LOG_DBGF("Dead process %d left stale sdu.", api); -                        clean_sdus(rdrb, api); -                        pthread_cond_broadcast(rdrb->healthy); -                        continue; -                } - -                clock_gettime(CLOCK_REALTIME, &now); -                ts_add(&now, &intv, &dl); -                while (*rdrb->choked) { -                        ret = pthread_cond_timedwait(rdrb->healthy, -                                                     rdrb->lock, -                                                     &dl); -                        if (!ret) -                                continue; -#ifndef __APPLE__ -                        if (ret == EOWNERDEAD) { -                                LOG_WARN("Recovering dead mutex."); -                                pthread_mutex_consistent(rdrb->lock); -                        } -#endif -                        if (ret == ETIMEDOUT) { -                                LOG_DBGF("SDU timed out (dst: %d).", api); -                                clean_sdus(rdrb, api); -                        } -                } -                pthread_cond_broadcast(rdrb->healthy);          } -        pthread_cleanup_pop(true); +        garbage_collect(rdrb); -        return (void *) 0; +        pthread_cleanup_pop(true);  }  void shm_rdrbuff_close(struct shm_rdrbuff * rdrb) @@ -417,7 +337,7 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb)          if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1)                  LOG_DBG("Couldn't unmap shared memory."); -        shm_rdrb_fn = rdrb_filename(rdrb->qos); +        shm_rdrb_fn = rdrb_filename();          if (shm_rdrb_fn == NULL) {                  LOG_ERR("Could not create rdrbuff. Out of Memory");                  return; @@ -431,7 +351,6 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb)  }  ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, -                          pid_t                dst_api,                            size_t               headspace,                            size_t               tailspace,                            uint8_t *            data, @@ -444,7 +363,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,          size_t               padblocks = 0;  #endif          ssize_t              sz = size + sizeof(*sdb); -        uint8_t *            write_pos;          assert(rdrb);          assert(data); @@ -469,14 +387,15 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,                  ++blocks;          } -        if (blocks + *rdrb->ptr_head > (SHM_BUFFER_SIZE)) -                padblocks = (SHM_BUFFER_SIZE) - *rdrb->ptr_head; +        if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) +                padblocks = (SHM_BUFFER_SIZE) - *rdrb->head;          if (!shm_rdrb_free(rdrb, blocks + padblocks)) {  #else          if (!shm_rdrb_free(rdrb, 1)) {  #endif -                pthread_cond_signal(rdrb->full); +                LOG_DBG("buffer full, idx = %ld.", *rdrb->tail); +                pthread_cond_broadcast(rdrb->full);                  pthread_mutex_unlock(rdrb->lock);                  return -1;          } @@ -486,31 +405,29 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,                  sdb = get_head_ptr(rdrb);                  sdb->size    = 0;                  sdb->blocks  = padblocks; -                sdb->dst_api = -1; +                sdb->flags   = SDB_NULL;                  sdb->du_head = 0;                  sdb->du_tail = 0; -                sdb->idx     = *rdrb->ptr_head; +                sdb->idx     = *rdrb->head; -                *rdrb->ptr_head = 0; +                *rdrb->head = 0;          }  #endif          sdb          = get_head_ptr(rdrb);          sdb->size    = size; -        sdb->dst_api = dst_api; +        sdb->flags   = SDB_VALID;          sdb->du_head = headspace;          sdb->du_tail = sdb->du_head + len;  #ifdef  SHM_RDRB_MULTI_BLOCK          sdb->blocks  = blocks;  #endif -        write_pos = ((uint8_t *) (sdb + 1)) + headspace; - -        memcpy(write_pos, data, len); +        memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len); -        sdb->idx = *rdrb->ptr_head; +        sdb->idx = *rdrb->head;  #ifdef SHM_RDRB_MULTI_BLOCK -        *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & ((SHM_BUFFER_SIZE) - 1); +        *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1);  #else -        *rdrb->ptr_head = (*rdrb->ptr_head + 1) & ((SHM_BUFFER_SIZE) - 1); +        *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1);  #endif          pthread_mutex_unlock(rdrb->lock); @@ -518,7 +435,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,  }  ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, -                           pid_t                 dst_api,                             size_t                headspace,                             size_t                tailspace,                             uint8_t *             data, @@ -531,7 +447,6 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,          size_t               padblocks = 0;  #endif          ssize_t              sz = size + sizeof(*sdb); -        uint8_t *            write_pos;          assert(rdrb);          assert(data); @@ -559,14 +474,14 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,                  ++blocks;          } -        if (blocks + *rdrb->ptr_head > (SHM_BUFFER_SIZE)) -                padblocks = (SHM_BUFFER_SIZE) - *rdrb->ptr_head; +        if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) +                padblocks = (SHM_BUFFER_SIZE) - *rdrb->head;          while (!shm_rdrb_free(rdrb, (blocks + padblocks))) {  #else          while (!shm_rdrb_free(rdrb, 1)) {  #endif -                pthread_cond_signal(rdrb->full); +                pthread_cond_broadcast(rdrb->full);                  pthread_cond_wait(rdrb->healthy, rdrb->lock);          } @@ -575,31 +490,29 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,                  sdb = get_head_ptr(rdrb);                  sdb->size    = 0;                  sdb->blocks  = padblocks; -                sdb->dst_api = -1; +                sdb->flags   = SDB_NULL;                  sdb->du_head = 0;                  sdb->du_tail = 0; -                sdb->idx     = *rdrb->ptr_head; +                sdb->idx     = *rdrb->head; -                *rdrb->ptr_head = 0; +                *rdrb->head = 0;          }  #endif          sdb          = get_head_ptr(rdrb);          sdb->size    = size; -        sdb->dst_api = dst_api; +        sdb->flags   = SDB_VALID;          sdb->du_head = headspace;          sdb->du_tail = sdb->du_head + len;  #ifdef  SHM_RDRB_MULTI_BLOCK          sdb->blocks  = blocks;  #endif -        write_pos = ((uint8_t *) (sdb + 1)) + headspace; - -        memcpy(write_pos, data, len); +        memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len); -        sdb->idx = *rdrb->ptr_head; +        sdb->idx = *rdrb->head;  #ifdef SHM_RDRB_MULTI_BLOCK -        *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & ((SHM_BUFFER_SIZE) - 1); +        *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1);  #else -        *rdrb->ptr_head = (*rdrb->ptr_head + 1) & ((SHM_BUFFER_SIZE) - 1); +        *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1);  #endif          pthread_cleanup_pop(true); @@ -684,18 +597,15 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, size_t idx)                  return -1;          } -        idx_to_du_buff_ptr(rdrb, idx)->dst_api = -1; +        idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL; -        if (idx != *rdrb->ptr_tail) { +        if (idx != *rdrb->tail) {                  pthread_mutex_unlock(rdrb->lock);                  return 0;          }          garbage_collect(rdrb); -        *rdrb->choked = 0; - -        pthread_cond_broadcast(rdrb->healthy);          pthread_mutex_unlock(rdrb->lock);          return 0; | 
