diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/shm_rdrbuff.c | 130 | 
1 files changed, 40 insertions, 90 deletions
| diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index 1dfcb2ca..5ae2085d 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -46,29 +46,17 @@                         + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t)  \                         + sizeof(pid_t)) -#ifndef SHM_RDRB_MULTI_BLOCK -#define WAIT_BLOCKS 1 -#else -#define WAIT_BLOCKS ((SHM_BUFFER_SIZE) >> 4) -#if WAIT_BLOCKS == 0 -#undef WAIT_BLOCKS -#define WAIT_BLOCKS 1 -#endif -#endif -  #define get_head_ptr(rdrb)                                                     \ -        ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->head                 \ -                                                   * SHM_RDRB_BLOCK_SIZE))) +        idx_to_du_buff_ptr(rdrb, *rdrb->head)  #define get_tail_ptr(rdrb)                                                     \ -        ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->tail                 \ -                                                   * SHM_RDRB_BLOCK_SIZE))) +        idx_to_du_buff_ptr(rdrb, *rdrb->tail)  #define idx_to_du_buff_ptr(rdrb, idx)                                          \          ((struct shm_du_buff *) (rdrb->shm_base + idx * SHM_RDRB_BLOCK_SIZE))  #define shm_rdrb_used(rdrb)                                                    \ -        ((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail)                       \ +        (((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) + 1)                 \           & ((SHM_BUFFER_SIZE) - 1))  #define shm_rdrb_free(rdrb, i)                                                 \ @@ -118,6 +106,13 @@ static void garbage_collect(struct shm_rdrbuff * rdrb)          pthread_cond_broadcast(rdrb->healthy);  } +static void sanitize(struct shm_rdrbuff * rdrb) +{ +        get_head_ptr(rdrb)->flags = SDB_NULL; +        garbage_collect(rdrb); +        pthread_mutex_consistent(rdrb->lock); +} +  static char * rdrb_filename(void)  {          char * str; @@ -282,51 +277,6 @@ struct shm_rdrbuff * shm_rdrbuff_open()          return rdrb_create(O_RDWR);  } -int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb, -                          struct timespec *    timeo) -{ -        struct timespec abstime; - -        if (timeo != NULL) { -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ts_add(&abstime, timeo, &abstime); -        } - -#ifndef HAVE_ROBUST_MUTEX -        pthread_mutex_lock(rdrb->lock); -#else -        if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) -                pthread_mutex_consistent(rdrb->lock); -#endif - -        while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) { -#ifndef HAVE_ROBUST_MUTEX -                if (pthread_cond_timedwait(rdrb->full, -                                           rdrb->lock, -                                           &abstime) == ETIMEDOUT) { -                        pthread_mutex_unlock(rdrb->lock); -                        return -ETIMEDOUT; -                } -#else -                int ret = pthread_cond_timedwait(rdrb->full, -                                                 rdrb->lock, -                                                 &abstime); -                if (ret == EOWNERDEAD) -                        pthread_mutex_consistent(rdrb->lock); -                if (ret == ETIMEDOUT) { -                        pthread_mutex_unlock(rdrb->lock); -                        return -ETIMEDOUT; -                } -#endif -        } - -        garbage_collect(rdrb); - -        pthread_mutex_unlock(rdrb->lock); - -        return 0; -} -  void shm_rdrbuff_purge(void)  {          char * shm_rdrb_fn; @@ -358,19 +308,19 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,  #ifndef SHM_RDRB_MULTI_BLOCK          if (sz > SHM_RDRB_BLOCK_SIZE)                  return -EMSGSIZE; +#else +        while (sz > 0) { +                sz -= SHM_RDRB_BLOCK_SIZE; +                ++blocks; +        }  #endif  #ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rdrb->lock);  #else          if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) -                pthread_mutex_consistent(rdrb->lock); +                sanitize(rdrb);  #endif  #ifdef SHM_RDRB_MULTI_BLOCK -        while (sz > 0) { -                sz -= SHM_RDRB_BLOCK_SIZE; -                ++blocks; -        } -          if (blocks + *rdrb->head > (SHM_BUFFER_SIZE))                  padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; @@ -426,39 +376,41 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff *    rdrb,                              const struct timespec * abstime)  {          struct shm_du_buff * sdb; -        size_t               size = headspace + len + tailspace; +        size_t               size      = headspace + len + tailspace;  #ifdef SHM_RDRB_MULTI_BLOCK -        size_t               blocks = 0; +        size_t               blocks    = 0;          size_t               padblocks = 0; +        size_t               rblocks;  #endif -        ssize_t              sz = size + sizeof(*sdb); -        int                  ret = 0; +        ssize_t              sz        = size + sizeof(*sdb); +        int                  ret       = 0;          assert(rdrb);  #ifndef SHM_RDRB_MULTI_BLOCK          if (sz > SHM_RDRB_BLOCK_SIZE)                  return -EMSGSIZE; +#else +        while (sz > 0) { +                sz -= SHM_RDRB_BLOCK_SIZE; +                ++blocks; +        } +        rblocks = blocks;  #endif  #ifndef HAVE_ROBUST_MUTEX          pthread_mutex_lock(rdrb->lock);  #else          if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) -                pthread_mutex_consistent(rdrb->lock); +                sanitize(rdrb);  #endif          pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock,                               (void *) rdrb->lock);  #ifdef SHM_RDRB_MULTI_BLOCK -        while (sz > 0) { -                sz -= SHM_RDRB_BLOCK_SIZE; -                ++blocks; -        } +        if (blocks > 1) +                rblocks = blocks << 1; -        if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) -                padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; - -        while (!shm_rdrb_free(rdrb, (blocks + padblocks)) && ret != ETIMEDOUT) { +        while (!shm_rdrb_free(rdrb, rblocks) && ret != ETIMEDOUT) {  #else          while (!shm_rdrb_free(rdrb, 1) && ret != ETIMEDOUT) {  #endif @@ -469,6 +421,11 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff *    rdrb,                                                       abstime);                  else                          ret = pthread_cond_wait(rdrb->healthy, rdrb->lock); + +#ifdef SHM_RDRB_MULTI_BLOCK +                if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) +                        padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; +#endif          }          if (ret != ETIMEDOUT) { @@ -547,21 +504,14 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,          pthread_mutex_lock(rdrb->lock);  #else          if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) -                pthread_mutex_consistent(rdrb->lock); +                sanitize(rdrb);  #endif -        if (shm_rdrb_empty(rdrb)) { -                pthread_mutex_unlock(rdrb->lock); -                return -1; -        } +        assert(!shm_rdrb_empty(rdrb));          idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL; -        if (idx != *rdrb->tail) { -                pthread_mutex_unlock(rdrb->lock); -                return 0; -        } - -        garbage_collect(rdrb); +        if (idx == *rdrb->tail) +                garbage_collect(rdrb);          pthread_mutex_unlock(rdrb->lock); | 
