diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/lib/config.h.in | 1 | ||||
| -rw-r--r-- | src/lib/dev.c | 15 | ||||
| -rw-r--r-- | src/lib/shm_rbuff.c | 10 | ||||
| -rw-r--r-- | src/lib/shm_rbuff_ll.c | 62 | ||||
| -rw-r--r-- | src/lib/shm_rbuff_pthr.c | 62 | 
6 files changed, 137 insertions, 15 deletions
| diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index d5e80fc9..1a3c6ba3 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -139,6 +139,8 @@ mark_as_advanced(LIBRT_LIBRARIES LIBPTHREAD_LIBRARIES  set(SHM_BUFFER_SIZE 4096 CACHE STRING      "Number of blocks in packet buffer, must be a power of 2") +set(SHM_RBUFF_SIZE 1024 CACHE STRING +    "Number of blocks in rbuff buffer, must be a power of 2")  set(SYS_MAX_FLOWS 10240 CACHE STRING    "Maximum number of total flows for this system")  set(PROG_MAX_FLOWS 4096 CACHE STRING diff --git a/src/lib/config.h.in b/src/lib/config.h.in index cc0845d3..3e5a7b1e 100644 --- a/src/lib/config.h.in +++ b/src/lib/config.h.in @@ -36,6 +36,7 @@  #define SHM_RDRB_NAME       "@SHM_RDRB_NAME@"  #define SHM_RDRB_BLOCK_SIZE @SHM_RDRB_BLOCK_SIZE@  #define SHM_BUFFER_SIZE     @SHM_BUFFER_SIZE@ +#define SHM_RBUFF_SIZE      @SHM_RBUFF_SIZE@  #if defined(__linux__) || (defined(__MACH__) && !defined(__APPLE__))  /* Avoid a bug in robust mutex implementation of glibc 2.25 */ diff --git a/src/lib/dev.c b/src/lib/dev.c index 6d5676af..ee7839c8 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -933,7 +933,11 @@ ssize_t flow_write(int          fd,          pthread_rwlock_rdlock(&ai.lock); -        ret = shm_rbuff_write(flow->tx_rb, idx); +        if (flags & FLOWFWNOBLOCK) +                ret = shm_rbuff_write(flow->tx_rb, idx); +        else +                ret = shm_rbuff_write_b(flow->tx_rb, idx, abstime); +          if (ret < 0)                  shm_rdrbuff_remove(ai.rdrb, idx);          else @@ -1444,9 +1448,11 @@ int ipcp_flow_write(int                  fd,                  return -ENOMEM;          } -        ret = shm_rbuff_write(flow->tx_rb, idx); +        ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL);          if (ret == 0)                  shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); +        else +                shm_rdrbuff_remove(ai.rdrb, idx);          pthread_rwlock_unlock(&ai.lock); @@ -1544,10 +1550,11 @@ int local_flow_write(int    fd,                  pthread_rwlock_unlock(&ai.lock);                  return -ENOTALLOC;          } - -        ret = shm_rbuff_write(flow->tx_rb, idx); +        ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL);          if (ret == 0)                  shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); +        else +                shm_rdrbuff_remove(ai.rdrb, idx);          pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index f7f383fc..ebae9702 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -45,14 +45,14 @@  #define FN_MAX_CHARS 255 -#define SHM_RB_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t)           \ +#define SHM_RB_FILE_SIZE ((SHM_RBUFF_SIZE) * sizeof(ssize_t)            \                            + 3 * sizeof(size_t)                          \                            + sizeof(pthread_mutex_t)                     \                            + 2 * sizeof (pthread_cond_t)) -#define shm_rbuff_used(rb) ((*rb->head + (SHM_BUFFER_SIZE) - *rb->tail)   \ -                            & ((SHM_BUFFER_SIZE) - 1)) -#define shm_rbuff_free(rb) (shm_rbuff_used(rb) + 1 < (SHM_BUFFER_SIZE)) +#define shm_rbuff_used(rb) ((*rb->head + (SHM_RBUFF_SIZE) - *rb->tail)   \ +                            & ((SHM_RBUFF_SIZE) - 1)) +#define shm_rbuff_free(rb) (shm_rbuff_used(rb) + 1 < (SHM_RBUFF_SIZE))  #define shm_rbuff_empty(rb) (*rb->head == *rb->tail)  #define head_el_ptr(rb) (rb->shm_base + *rb->head)  #define tail_el_ptr(rb) (rb->shm_base + *rb->tail) @@ -109,7 +109,7 @@ struct shm_rbuff * rbuff_create(pid_t pid,          close(fd);          rb->shm_base = shm_base; -        rb->head     = (size_t *) (rb->shm_base + (SHM_BUFFER_SIZE)); +        rb->head     = (size_t *) (rb->shm_base + (SHM_RBUFF_SIZE));          rb->tail     = rb->head + 1;          rb->acl      = rb->tail + 1;          rb->lock     = (pthread_mutex_t *) (rb->acl + 1); diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index 6146f178..263f4909 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -65,7 +65,7 @@ int shm_rbuff_write(struct shm_rbuff * rb,          do {                  ohead = nhead; -                nhead = (ohead + 1) & ((SHM_BUFFER_SIZE) - 1); +                nhead = (ohead + 1) & ((SHM_RBUFF_SIZE) - 1);                  nhead = __sync_val_compare_and_swap(rb->head, ohead, nhead);          } while (nhead != ohead); @@ -75,6 +75,63 @@ int shm_rbuff_write(struct shm_rbuff * rb,          return 0;  } +/* FIXME: this is a copy of the pthr implementation */ +int shm_rbuff_write_b(struct shm_rbuff *      rb, +                      size_t                  idx, +                      const struct timespec * abstime) +{ +        int ret = 0; + +        assert(rb); +        assert(idx < SHM_BUFFER_SIZE); + +#ifndef HAVE_ROBUST_MUTEX +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) +                pthread_mutex_consistent(rb->lock); +#endif + +        if (*rb->acl != ACL_RDWR) { +                if (*rb->acl & ACL_FLOWDOWN) +                        ret = -EFLOWDOWN; +                else if (*rb->acl & ACL_RDONLY) +                        ret = -ENOTALLOC; +                goto err; +        } + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) rb->lock); + +        while (!shm_rbuff_free(rb) && ret != -ETIMEDOUT) { +                if (abstime != NULL) +                        ret = -pthread_cond_timedwait(rb->add, +                                                      rb->lock, +                                                      abstime); +                else +                        ret = -pthread_cond_wait(rb->add, rb->lock); +#ifdef HAVE_ROBUST_MUTEX +                if (ret == -EOWNERDEAD) +                        pthread_mutex_consistent(rb->lock); +#endif +        } + +        if (shm_rbuff_empty(rb)) +                pthread_cond_broadcast(rb->add); + +        if (ret != -ETIMEDOUT) { +                *head_el_ptr(rb) = (ssize_t) idx; +                *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) -1); +        } + +        pthread_cleanup_pop(true); + +        return ret; + err: +        pthread_mutex_unlock(rb->lock); +        return ret; +} +  ssize_t shm_rbuff_read(struct shm_rbuff * rb)  {          size_t otail; @@ -90,7 +147,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)          do {                  otail = ntail; -                ntail = (otail + 1) & ((SHM_BUFFER_SIZE) - 1); +                ntail = (otail + 1) & ((SHM_RBUFF_SIZE) - 1);                  ntail = __sync_val_compare_and_swap(rb->tail, otail, ntail);          } while (ntail != otail); @@ -136,7 +193,6 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,          if (idx != -ETIMEDOUT) {                  /* do a nonblocking read */                  idx = shm_rbuff_read(rb); -                  assert(idx >= 0);          } diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index b4134bf6..5a58605b 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -72,7 +72,7 @@ int shm_rbuff_write(struct shm_rbuff * rb,                  pthread_cond_broadcast(rb->add);          *head_el_ptr(rb) = (ssize_t) idx; -        *rb->head = (*rb->head + 1) & ((SHM_BUFFER_SIZE) -1); +        *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) - 1);          pthread_mutex_unlock(rb->lock); @@ -82,6 +82,62 @@ int shm_rbuff_write(struct shm_rbuff * rb,          return ret;  } +int shm_rbuff_write_b(struct shm_rbuff *      rb, +                      size_t                  idx, +                      const struct timespec * abstime) +{ +        int ret = 0; + +        assert(rb); +        assert(idx < SHM_BUFFER_SIZE); + +#ifndef HAVE_ROBUST_MUTEX +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) +                pthread_mutex_consistent(rb->lock); +#endif + +        if (*rb->acl != ACL_RDWR) { +                if (*rb->acl & ACL_FLOWDOWN) +                        ret = -EFLOWDOWN; +                else if (*rb->acl & ACL_RDONLY) +                        ret = -ENOTALLOC; +                goto err; +        } + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) rb->lock); + +        while (!shm_rbuff_free(rb) && ret != -ETIMEDOUT) { +                if (abstime != NULL) +                        ret = -pthread_cond_timedwait(rb->add, +                                                      rb->lock, +                                                      abstime); +                else +                        ret = -pthread_cond_wait(rb->add, rb->lock); +#ifdef HAVE_ROBUST_MUTEX +                if (ret == -EOWNERDEAD) +                        pthread_mutex_consistent(rb->lock); +#endif +        } + +        if (shm_rbuff_empty(rb)) +                pthread_cond_broadcast(rb->add); + +        if (ret != -ETIMEDOUT) { +                *head_el_ptr(rb) = (ssize_t) idx; +                *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) - 1); +        } + +        pthread_cleanup_pop(true); + +        return ret; + err: +        pthread_mutex_unlock(rb->lock); +        return ret; +} +  ssize_t shm_rbuff_read(struct shm_rbuff * rb)  {          ssize_t ret = 0; @@ -102,7 +158,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)          }          ret = *tail_el_ptr(rb); -        *rb->tail = (*rb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); +        *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1);          pthread_cond_broadcast(rb->del);          pthread_mutex_unlock(rb->lock); @@ -147,7 +203,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,          if (idx != -ETIMEDOUT) {                  idx = *tail_el_ptr(rb); -                *rb->tail = (*rb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); +                *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1);                  pthread_cond_broadcast(rb->del);          } | 
