diff options
-rw-r--r-- | include/ouroboros/shm_rbuff.h | 4 | ||||
-rw-r--r-- | src/lib/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/lib/config.h.in | 1 | ||||
-rw-r--r-- | src/lib/dev.c | 15 | ||||
-rw-r--r-- | src/lib/shm_rbuff.c | 10 | ||||
-rw-r--r-- | src/lib/shm_rbuff_ll.c | 62 | ||||
-rw-r--r-- | src/lib/shm_rbuff_pthr.c | 62 |
7 files changed, 141 insertions, 15 deletions
diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h index c47ae46e..d8c53ee0 100644 --- a/include/ouroboros/shm_rbuff.h +++ b/include/ouroboros/shm_rbuff.h @@ -54,6 +54,10 @@ void shm_rbuff_fini(struct shm_rbuff * rb); int shm_rbuff_write(struct shm_rbuff * rb, size_t idx); +int shm_rbuff_write_b(struct shm_rbuff * rb, + size_t idx, + const struct timespec * abstime); + ssize_t shm_rbuff_read(struct shm_rbuff * rb); ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index d5e80fc9..1a3c6ba3 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -139,6 +139,8 @@ mark_as_advanced(LIBRT_LIBRARIES LIBPTHREAD_LIBRARIES set(SHM_BUFFER_SIZE 4096 CACHE STRING "Number of blocks in packet buffer, must be a power of 2") +set(SHM_RBUFF_SIZE 1024 CACHE STRING + "Number of blocks in rbuff buffer, must be a power of 2") set(SYS_MAX_FLOWS 10240 CACHE STRING "Maximum number of total flows for this system") set(PROG_MAX_FLOWS 4096 CACHE STRING diff --git a/src/lib/config.h.in b/src/lib/config.h.in index cc0845d3..3e5a7b1e 100644 --- a/src/lib/config.h.in +++ b/src/lib/config.h.in @@ -36,6 +36,7 @@ #define SHM_RDRB_NAME "@SHM_RDRB_NAME@" #define SHM_RDRB_BLOCK_SIZE @SHM_RDRB_BLOCK_SIZE@ #define SHM_BUFFER_SIZE @SHM_BUFFER_SIZE@ +#define SHM_RBUFF_SIZE @SHM_RBUFF_SIZE@ #if defined(__linux__) || (defined(__MACH__) && !defined(__APPLE__)) /* Avoid a bug in robust mutex implementation of glibc 2.25 */ diff --git a/src/lib/dev.c b/src/lib/dev.c index 6d5676af..ee7839c8 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -933,7 +933,11 @@ ssize_t flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - ret = shm_rbuff_write(flow->tx_rb, idx); + if (flags & FLOWFWNOBLOCK) + ret = shm_rbuff_write(flow->tx_rb, idx); + else + ret = shm_rbuff_write_b(flow->tx_rb, idx, abstime); + if (ret < 0) shm_rdrbuff_remove(ai.rdrb, idx); else @@ -1444,9 +1448,11 @@ int ipcp_flow_write(int fd, return -ENOMEM; } - ret = shm_rbuff_write(flow->tx_rb, idx); + ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL); if (ret == 0) shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + else + shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&ai.lock); @@ -1544,10 +1550,11 @@ int local_flow_write(int fd, pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - - ret = shm_rbuff_write(flow->tx_rb, idx); + ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL); if (ret == 0) shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + else + shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index f7f383fc..ebae9702 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -45,14 +45,14 @@ #define FN_MAX_CHARS 255 -#define SHM_RB_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \ +#define SHM_RB_FILE_SIZE ((SHM_RBUFF_SIZE) * sizeof(ssize_t) \ + 3 * sizeof(size_t) \ + sizeof(pthread_mutex_t) \ + 2 * sizeof (pthread_cond_t)) -#define shm_rbuff_used(rb) ((*rb->head + (SHM_BUFFER_SIZE) - *rb->tail) \ - & ((SHM_BUFFER_SIZE) - 1)) -#define shm_rbuff_free(rb) (shm_rbuff_used(rb) + 1 < (SHM_BUFFER_SIZE)) +#define shm_rbuff_used(rb) ((*rb->head + (SHM_RBUFF_SIZE) - *rb->tail) \ + & ((SHM_RBUFF_SIZE) - 1)) +#define shm_rbuff_free(rb) (shm_rbuff_used(rb) + 1 < (SHM_RBUFF_SIZE)) #define shm_rbuff_empty(rb) (*rb->head == *rb->tail) #define head_el_ptr(rb) (rb->shm_base + *rb->head) #define tail_el_ptr(rb) (rb->shm_base + *rb->tail) @@ -109,7 +109,7 @@ struct shm_rbuff * rbuff_create(pid_t pid, close(fd); rb->shm_base = shm_base; - rb->head = (size_t *) (rb->shm_base + (SHM_BUFFER_SIZE)); + rb->head = (size_t *) (rb->shm_base + (SHM_RBUFF_SIZE)); rb->tail = rb->head + 1; rb->acl = rb->tail + 1; rb->lock = (pthread_mutex_t *) (rb->acl + 1); diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index 6146f178..263f4909 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -65,7 +65,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, do { ohead = nhead; - nhead = (ohead + 1) & ((SHM_BUFFER_SIZE) - 1); + nhead = (ohead + 1) & ((SHM_RBUFF_SIZE) - 1); nhead = __sync_val_compare_and_swap(rb->head, ohead, nhead); } while (nhead != ohead); @@ -75,6 +75,63 @@ int shm_rbuff_write(struct shm_rbuff * rb, return 0; } +/* FIXME: this is a copy of the pthr implementation */ +int shm_rbuff_write_b(struct shm_rbuff * rb, + size_t idx, + const struct timespec * abstime) +{ + int ret = 0; + + assert(rb); + assert(idx < SHM_BUFFER_SIZE); + +#ifndef HAVE_ROBUST_MUTEX + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) + pthread_mutex_consistent(rb->lock); +#endif + + if (*rb->acl != ACL_RDWR) { + if (*rb->acl & ACL_FLOWDOWN) + ret = -EFLOWDOWN; + else if (*rb->acl & ACL_RDONLY) + ret = -ENOTALLOC; + goto err; + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rb->lock); + + while (!shm_rbuff_free(rb) && ret != -ETIMEDOUT) { + if (abstime != NULL) + ret = -pthread_cond_timedwait(rb->add, + rb->lock, + abstime); + else + ret = -pthread_cond_wait(rb->add, rb->lock); +#ifdef HAVE_ROBUST_MUTEX + if (ret == -EOWNERDEAD) + pthread_mutex_consistent(rb->lock); +#endif + } + + if (shm_rbuff_empty(rb)) + pthread_cond_broadcast(rb->add); + + if (ret != -ETIMEDOUT) { + *head_el_ptr(rb) = (ssize_t) idx; + *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) -1); + } + + pthread_cleanup_pop(true); + + return ret; + err: + pthread_mutex_unlock(rb->lock); + return ret; +} + ssize_t shm_rbuff_read(struct shm_rbuff * rb) { size_t otail; @@ -90,7 +147,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) do { otail = ntail; - ntail = (otail + 1) & ((SHM_BUFFER_SIZE) - 1); + ntail = (otail + 1) & ((SHM_RBUFF_SIZE) - 1); ntail = __sync_val_compare_and_swap(rb->tail, otail, ntail); } while (ntail != otail); @@ -136,7 +193,6 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, if (idx != -ETIMEDOUT) { /* do a nonblocking read */ idx = shm_rbuff_read(rb); - assert(idx >= 0); } diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index b4134bf6..5a58605b 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -72,7 +72,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, pthread_cond_broadcast(rb->add); *head_el_ptr(rb) = (ssize_t) idx; - *rb->head = (*rb->head + 1) & ((SHM_BUFFER_SIZE) -1); + *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) - 1); pthread_mutex_unlock(rb->lock); @@ -82,6 +82,62 @@ int shm_rbuff_write(struct shm_rbuff * rb, return ret; } +int shm_rbuff_write_b(struct shm_rbuff * rb, + size_t idx, + const struct timespec * abstime) +{ + int ret = 0; + + assert(rb); + assert(idx < SHM_BUFFER_SIZE); + +#ifndef HAVE_ROBUST_MUTEX + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) + pthread_mutex_consistent(rb->lock); +#endif + + if (*rb->acl != ACL_RDWR) { + if (*rb->acl & ACL_FLOWDOWN) + ret = -EFLOWDOWN; + else if (*rb->acl & ACL_RDONLY) + ret = -ENOTALLOC; + goto err; + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rb->lock); + + while (!shm_rbuff_free(rb) && ret != -ETIMEDOUT) { + if (abstime != NULL) + ret = -pthread_cond_timedwait(rb->add, + rb->lock, + abstime); + else + ret = -pthread_cond_wait(rb->add, rb->lock); +#ifdef HAVE_ROBUST_MUTEX + if (ret == -EOWNERDEAD) + pthread_mutex_consistent(rb->lock); +#endif + } + + if (shm_rbuff_empty(rb)) + pthread_cond_broadcast(rb->add); + + if (ret != -ETIMEDOUT) { + *head_el_ptr(rb) = (ssize_t) idx; + *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) - 1); + } + + pthread_cleanup_pop(true); + + return ret; + err: + pthread_mutex_unlock(rb->lock); + return ret; +} + ssize_t shm_rbuff_read(struct shm_rbuff * rb) { ssize_t ret = 0; @@ -102,7 +158,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) } ret = *tail_el_ptr(rb); - *rb->tail = (*rb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); + *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1); pthread_cond_broadcast(rb->del); pthread_mutex_unlock(rb->lock); @@ -147,7 +203,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, if (idx != -ETIMEDOUT) { idx = *tail_el_ptr(rb); - *rb->tail = (*rb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); + *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1); pthread_cond_broadcast(rb->del); } |