summaryrefslogtreecommitdiff
path: root/src/lib/shm_rdrbuff.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/shm_rdrbuff.c')
-rw-r--r--src/lib/shm_rdrbuff.c130
1 files changed, 40 insertions, 90 deletions
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index 1dfcb2ca..5ae2085d 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -46,29 +46,17 @@
+ sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \
+ sizeof(pid_t))
-#ifndef SHM_RDRB_MULTI_BLOCK
-#define WAIT_BLOCKS 1
-#else
-#define WAIT_BLOCKS ((SHM_BUFFER_SIZE) >> 4)
-#if WAIT_BLOCKS == 0
-#undef WAIT_BLOCKS
-#define WAIT_BLOCKS 1
-#endif
-#endif
-
#define get_head_ptr(rdrb) \
- ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->head \
- * SHM_RDRB_BLOCK_SIZE)))
+ idx_to_du_buff_ptr(rdrb, *rdrb->head)
#define get_tail_ptr(rdrb) \
- ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->tail \
- * SHM_RDRB_BLOCK_SIZE)))
+ idx_to_du_buff_ptr(rdrb, *rdrb->tail)
#define idx_to_du_buff_ptr(rdrb, idx) \
((struct shm_du_buff *) (rdrb->shm_base + idx * SHM_RDRB_BLOCK_SIZE))
#define shm_rdrb_used(rdrb) \
- ((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) \
+ (((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) + 1) \
& ((SHM_BUFFER_SIZE) - 1))
#define shm_rdrb_free(rdrb, i) \
@@ -118,6 +106,13 @@ static void garbage_collect(struct shm_rdrbuff * rdrb)
pthread_cond_broadcast(rdrb->healthy);
}
+static void sanitize(struct shm_rdrbuff * rdrb)
+{
+ get_head_ptr(rdrb)->flags = SDB_NULL;
+ garbage_collect(rdrb);
+ pthread_mutex_consistent(rdrb->lock);
+}
+
static char * rdrb_filename(void)
{
char * str;
@@ -282,51 +277,6 @@ struct shm_rdrbuff * shm_rdrbuff_open()
return rdrb_create(O_RDWR);
}
-int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb,
- struct timespec * timeo)
-{
- struct timespec abstime;
-
- if (timeo != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, timeo, &abstime);
- }
-
-#ifndef HAVE_ROBUST_MUTEX
- pthread_mutex_lock(rdrb->lock);
-#else
- if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
-#endif
-
- while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) {
-#ifndef HAVE_ROBUST_MUTEX
- if (pthread_cond_timedwait(rdrb->full,
- rdrb->lock,
- &abstime) == ETIMEDOUT) {
- pthread_mutex_unlock(rdrb->lock);
- return -ETIMEDOUT;
- }
-#else
- int ret = pthread_cond_timedwait(rdrb->full,
- rdrb->lock,
- &abstime);
- if (ret == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
- if (ret == ETIMEDOUT) {
- pthread_mutex_unlock(rdrb->lock);
- return -ETIMEDOUT;
- }
-#endif
- }
-
- garbage_collect(rdrb);
-
- pthread_mutex_unlock(rdrb->lock);
-
- return 0;
-}
-
void shm_rdrbuff_purge(void)
{
char * shm_rdrb_fn;
@@ -358,19 +308,19 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
#ifndef SHM_RDRB_MULTI_BLOCK
if (sz > SHM_RDRB_BLOCK_SIZE)
return -EMSGSIZE;
+#else
+ while (sz > 0) {
+ sz -= SHM_RDRB_BLOCK_SIZE;
+ ++blocks;
+ }
#endif
#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rdrb->lock);
#else
if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
+ sanitize(rdrb);
#endif
#ifdef SHM_RDRB_MULTI_BLOCK
- while (sz > 0) {
- sz -= SHM_RDRB_BLOCK_SIZE;
- ++blocks;
- }
-
if (blocks + *rdrb->head > (SHM_BUFFER_SIZE))
padblocks = (SHM_BUFFER_SIZE) - *rdrb->head;
@@ -426,39 +376,41 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
const struct timespec * abstime)
{
struct shm_du_buff * sdb;
- size_t size = headspace + len + tailspace;
+ size_t size = headspace + len + tailspace;
#ifdef SHM_RDRB_MULTI_BLOCK
- size_t blocks = 0;
+ size_t blocks = 0;
size_t padblocks = 0;
+ size_t rblocks;
#endif
- ssize_t sz = size + sizeof(*sdb);
- int ret = 0;
+ ssize_t sz = size + sizeof(*sdb);
+ int ret = 0;
assert(rdrb);
#ifndef SHM_RDRB_MULTI_BLOCK
if (sz > SHM_RDRB_BLOCK_SIZE)
return -EMSGSIZE;
+#else
+ while (sz > 0) {
+ sz -= SHM_RDRB_BLOCK_SIZE;
+ ++blocks;
+ }
+ rblocks = blocks;
#endif
#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rdrb->lock);
#else
if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
+ sanitize(rdrb);
#endif
pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock,
(void *) rdrb->lock);
#ifdef SHM_RDRB_MULTI_BLOCK
- while (sz > 0) {
- sz -= SHM_RDRB_BLOCK_SIZE;
- ++blocks;
- }
+ if (blocks > 1)
+ rblocks = blocks << 1;
- if (blocks + *rdrb->head > (SHM_BUFFER_SIZE))
- padblocks = (SHM_BUFFER_SIZE) - *rdrb->head;
-
- while (!shm_rdrb_free(rdrb, (blocks + padblocks)) && ret != ETIMEDOUT) {
+ while (!shm_rdrb_free(rdrb, rblocks) && ret != ETIMEDOUT) {
#else
while (!shm_rdrb_free(rdrb, 1) && ret != ETIMEDOUT) {
#endif
@@ -469,6 +421,11 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
abstime);
else
ret = pthread_cond_wait(rdrb->healthy, rdrb->lock);
+
+#ifdef SHM_RDRB_MULTI_BLOCK
+ if (blocks + *rdrb->head > (SHM_BUFFER_SIZE))
+ padblocks = (SHM_BUFFER_SIZE) - *rdrb->head;
+#endif
}
if (ret != ETIMEDOUT) {
@@ -547,21 +504,14 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,
pthread_mutex_lock(rdrb->lock);
#else
if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
+ sanitize(rdrb);
#endif
- if (shm_rdrb_empty(rdrb)) {
- pthread_mutex_unlock(rdrb->lock);
- return -1;
- }
+ assert(!shm_rdrb_empty(rdrb));
idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL;
- if (idx != *rdrb->tail) {
- pthread_mutex_unlock(rdrb->lock);
- return 0;
- }
-
- garbage_collect(rdrb);
+ if (idx == *rdrb->tail)
+ garbage_collect(rdrb);
pthread_mutex_unlock(rdrb->lock);