summaryrefslogtreecommitdiff
path: root/src/lib/shm_rdrbuff.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/shm_rdrbuff.c')
-rw-r--r--src/lib/shm_rdrbuff.c266
1 files changed, 110 insertions, 156 deletions
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index 12e29bef..7ad1bd2e 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -1,10 +1,10 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2018
+ * Ouroboros - Copyright (C) 2016 - 2024
*
* Random Deletion Ring Buffer for Data Units
*
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@@ -25,53 +25,37 @@
#include "config.h"
#include <ouroboros/errno.h>
+#include <ouroboros/pthread.h>
#include <ouroboros/shm_rdrbuff.h>
-#include <ouroboros/shm_du_buff.h>
-#include <ouroboros/time_utils.h>
-#include <pthread.h>
-#include <sys/mman.h>
+#include <assert.h>
#include <fcntl.h>
-#include <unistd.h>
+#include <signal.h>
+#include <stdbool.h>
+#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <stdio.h>
-#include <signal.h>
+#include <unistd.h>
+#include <sys/mman.h>
#include <sys/stat.h>
-#include <stdbool.h>
-#include <assert.h>
#define SHM_BLOCKS_SIZE ((SHM_BUFFER_SIZE) * SHM_RDRB_BLOCK_SIZE)
#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof(size_t) \
+ sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \
+ sizeof(pid_t))
-
-#ifndef SHM_RDRB_MULTI_BLOCK
-#define WAIT_BLOCKS 1
-#else
-#define WAIT_BLOCKS ((SHM_BUFFER_SIZE) >> 4)
-#if WAIT_BLOCKS == 0
-#undef WAIT_BLOCKS
-#define WAIT_BLOCKS 1
-#endif
-#endif
+#define DU_BUFF_OVERHEAD (DU_BUFF_HEADSPACE + DU_BUFF_TAILSPACE)
#define get_head_ptr(rdrb) \
- ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->head \
- * SHM_RDRB_BLOCK_SIZE)))
+ idx_to_du_buff_ptr(rdrb, *rdrb->head)
#define get_tail_ptr(rdrb) \
- ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->tail \
- * SHM_RDRB_BLOCK_SIZE)))
+ idx_to_du_buff_ptr(rdrb, *rdrb->tail)
#define idx_to_du_buff_ptr(rdrb, idx) \
((struct shm_du_buff *) (rdrb->shm_base + idx * SHM_RDRB_BLOCK_SIZE))
-#define block_ptr_to_idx(rdrb, sdb) \
- (((uint8_t *)sdb - rdrb->shm_base) / SHM_RDRB_BLOCK_SIZE)
-
#define shm_rdrb_used(rdrb) \
- ((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) \
+ (((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) + 1) \
& ((SHM_BUFFER_SIZE) - 1))
#define shm_rdrb_free(rdrb, i) \
@@ -80,11 +64,6 @@
#define shm_rdrb_empty(rdrb) \
(*rdrb->tail == *rdrb->head)
-enum shm_du_buff_flags {
- SDB_VALID = 0,
- SDB_NULL
-};
-
struct shm_du_buff {
size_t size;
#ifdef SHM_RDRB_MULTI_BLOCK
@@ -92,7 +71,7 @@ struct shm_du_buff {
#endif
size_t du_head;
size_t du_tail;
- size_t flags;
+ size_t refs;
size_t idx;
};
@@ -101,8 +80,7 @@ struct shm_rdrbuff {
size_t * head; /* start of ringbuffer head */
size_t * tail; /* start of ringbuffer tail */
pthread_mutex_t * lock; /* lock all free space in shm */
- pthread_cond_t * full; /* flag when full */
- pthread_cond_t * healthy; /* flag when SDU is read */
+ pthread_cond_t * healthy; /* flag when packet is read */
pid_t * pid; /* pid of the irmd owner */
};
@@ -111,16 +89,25 @@ static void garbage_collect(struct shm_rdrbuff * rdrb)
#ifdef SHM_RDRB_MULTI_BLOCK
struct shm_du_buff * sdb;
while (!shm_rdrb_empty(rdrb) &&
- (sdb = get_tail_ptr(rdrb))->flags == SDB_NULL)
+ (sdb = get_tail_ptr(rdrb))->refs == 0)
*rdrb->tail = (*rdrb->tail + sdb->blocks)
& ((SHM_BUFFER_SIZE) - 1);
#else
- while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->flags == SDB_NULL)
+ while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->refs == 0)
*rdrb->tail = (*rdrb->tail + 1) & ((SHM_BUFFER_SIZE) - 1);
#endif
pthread_cond_broadcast(rdrb->healthy);
}
+#ifdef HAVE_ROBUST_MUTEX
+static void sanitize(struct shm_rdrbuff * rdrb)
+{
+ --get_head_ptr(rdrb)->refs;
+ garbage_collect(rdrb);
+ pthread_mutex_consistent(rdrb->lock);
+}
+#endif
+
static char * rdrb_filename(void)
{
char * str;
@@ -148,8 +135,10 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb)
assert(rdrb);
- if (getpid() != *rdrb->pid && kill(*rdrb->pid, 0) == 0)
+ if (getpid() != *rdrb->pid && kill(*rdrb->pid, 0) == 0) {
+ free(rdrb);
return;
+ }
shm_rdrbuff_close(rdrb);
@@ -182,7 +171,7 @@ static struct shm_rdrbuff * rdrb_create(int flags)
if (fd == -1)
goto fail_open;
- if ((flags & O_CREAT) && ftruncate(fd, SHM_FILE_SIZE - 1) < 0)
+ if ((flags & O_CREAT) && ftruncate(fd, SHM_FILE_SIZE) < 0)
goto fail_truncate;
shm_base = mmap(NULL, SHM_FILE_SIZE, MM_FLAGS, MAP_SHARED, fd, 0);
@@ -195,8 +184,7 @@ static struct shm_rdrbuff * rdrb_create(int flags)
rdrb->head = (size_t *) ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE);
rdrb->tail = rdrb->head + 1;
rdrb->lock = (pthread_mutex_t *) (rdrb->tail + 1);
- rdrb->full = (pthread_cond_t *) (rdrb->lock + 1);
- rdrb->healthy = rdrb->full + 1;
+ rdrb->healthy = (pthread_cond_t *) (rdrb->lock + 1);
rdrb->pid = (pid_t *) (rdrb->healthy + 1);
free(shm_rdrb_fn);
@@ -215,7 +203,7 @@ static struct shm_rdrbuff * rdrb_create(int flags)
return NULL;
}
-struct shm_rdrbuff * shm_rdrbuff_create()
+struct shm_rdrbuff * shm_rdrbuff_create(void)
{
struct shm_rdrbuff * rdrb;
mode_t mask;
@@ -248,9 +236,6 @@ struct shm_rdrbuff * shm_rdrbuff_create()
#ifndef __APPLE__
pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
#endif
- if (pthread_cond_init(rdrb->full, &cattr))
- goto fail_full;
-
if (pthread_cond_init(rdrb->healthy, &cattr))
goto fail_healthy;
@@ -265,8 +250,6 @@ struct shm_rdrbuff * shm_rdrbuff_create()
return rdrb;
fail_healthy:
- pthread_cond_destroy(rdrb->full);
- fail_full:
pthread_condattr_destroy(&cattr);
fail_cattr:
pthread_mutex_destroy(rdrb->lock);
@@ -275,59 +258,14 @@ struct shm_rdrbuff * shm_rdrbuff_create()
fail_mattr:
shm_rdrbuff_destroy(rdrb);
fail_rdrb:
- return NULL;
+ return NULL;
}
-struct shm_rdrbuff * shm_rdrbuff_open()
+struct shm_rdrbuff * shm_rdrbuff_open(void)
{
return rdrb_create(O_RDWR);
}
-int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb,
- struct timespec * timeo)
-{
- struct timespec abstime;
-
- if (timeo != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, timeo, &abstime);
- }
-
-#ifndef HAVE_ROBUST_MUTEX
- pthread_mutex_lock(rdrb->lock);
-#else
- if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
-#endif
-
- while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) {
-#ifndef HAVE_ROBUST_MUTEX
- if (pthread_cond_timedwait(rdrb->full,
- rdrb->lock,
- &abstime) == ETIMEDOUT) {
- pthread_mutex_unlock(rdrb->lock);
- return -ETIMEDOUT;
- }
-#else
- int ret = pthread_cond_timedwait(rdrb->full,
- rdrb->lock,
- &abstime);
- if (ret == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
- if (ret == ETIMEDOUT) {
- pthread_mutex_unlock(rdrb->lock);
- return -ETIMEDOUT;
- }
-#endif
- }
-
- garbage_collect(rdrb);
-
- pthread_mutex_unlock(rdrb->lock);
-
- return 0;
-}
-
void shm_rdrbuff_purge(void)
{
char * shm_rdrb_fn;
@@ -340,14 +278,13 @@ void shm_rdrbuff_purge(void)
free(shm_rdrb_fn);
}
-ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
- size_t headspace,
- size_t tailspace,
- const uint8_t * data,
- size_t len)
+ssize_t shm_rdrbuff_alloc(struct shm_rdrbuff * rdrb,
+ size_t len,
+ uint8_t ** ptr,
+ struct shm_du_buff ** psdb)
{
struct shm_du_buff * sdb;
- size_t size = headspace + len + tailspace;
+ size_t size = DU_BUFF_OVERHEAD + len;
#ifdef SHM_RDRB_MULTI_BLOCK
size_t blocks = 0;
size_t padblocks = 0;
@@ -355,23 +292,24 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
ssize_t sz = size + sizeof(*sdb);
assert(rdrb);
+ assert(psdb);
#ifndef SHM_RDRB_MULTI_BLOCK
if (sz > SHM_RDRB_BLOCK_SIZE)
return -EMSGSIZE;
+#else
+ while (sz > 0) {
+ sz -= SHM_RDRB_BLOCK_SIZE;
+ ++blocks;
+ }
#endif
#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rdrb->lock);
#else
if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
+ sanitize(rdrb);
#endif
#ifdef SHM_RDRB_MULTI_BLOCK
- while (sz > 0) {
- sz -= SHM_RDRB_BLOCK_SIZE;
- ++blocks;
- }
-
if (blocks + *rdrb->head > (SHM_BUFFER_SIZE))
padblocks = (SHM_BUFFER_SIZE) - *rdrb->head;
@@ -379,7 +317,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
#else
if (!shm_rdrb_free(rdrb, 1)) {
#endif
- pthread_cond_broadcast(rdrb->full);
pthread_mutex_unlock(rdrb->lock);
return -EAGAIN;
}
@@ -389,7 +326,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
sdb = get_head_ptr(rdrb);
sdb->size = 0;
sdb->blocks = padblocks;
- sdb->flags = SDB_NULL;
+ sdb->refs = 0;
sdb->du_head = 0;
sdb->du_tail = 0;
sdb->idx = *rdrb->head;
@@ -398,7 +335,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
}
#endif
sdb = get_head_ptr(rdrb);
- sdb->flags = SDB_VALID;
+ sdb->refs = 1;
sdb->idx = *rdrb->head;
#ifdef SHM_RDRB_MULTI_BLOCK
sdb->blocks = blocks;
@@ -410,66 +347,64 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
pthread_mutex_unlock(rdrb->lock);
sdb->size = size;
- sdb->du_head = headspace;
+ sdb->du_head = DU_BUFF_HEADSPACE;
sdb->du_tail = sdb->du_head + len;
- if (data != NULL)
- memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len);
+ *psdb = sdb;
+ if (ptr != NULL)
+ *ptr = (uint8_t *) (sdb + 1) + sdb->du_head;
return sdb->idx;
}
-ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
- size_t headspace,
- size_t tailspace,
- const uint8_t * data,
+ssize_t shm_rdrbuff_alloc_b(struct shm_rdrbuff * rdrb,
size_t len,
+ uint8_t ** ptr,
+ struct shm_du_buff ** psdb,
const struct timespec * abstime)
{
struct shm_du_buff * sdb;
- size_t size = headspace + len + tailspace;
+ size_t size = DU_BUFF_OVERHEAD + len;
#ifdef SHM_RDRB_MULTI_BLOCK
- size_t blocks = 0;
+ size_t blocks = 0;
size_t padblocks = 0;
#endif
- ssize_t sz = size + sizeof(*sdb);
- int ret = 0;
+ ssize_t sz = size + sizeof(*sdb);
+ int ret = 0;
assert(rdrb);
+ assert(psdb);
#ifndef SHM_RDRB_MULTI_BLOCK
if (sz > SHM_RDRB_BLOCK_SIZE)
return -EMSGSIZE;
+#else
+ while (sz > 0) {
+ sz -= SHM_RDRB_BLOCK_SIZE;
+ ++blocks;
+ }
#endif
#ifndef HAVE_ROBUST_MUTEX
pthread_mutex_lock(rdrb->lock);
#else
if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
+ sanitize(rdrb);
#endif
- pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock,
- (void *) rdrb->lock);
+ pthread_cleanup_push(__cleanup_mutex_unlock, rdrb->lock);
#ifdef SHM_RDRB_MULTI_BLOCK
- while (sz > 0) {
- sz -= SHM_RDRB_BLOCK_SIZE;
- ++blocks;
- }
-
if (blocks + *rdrb->head > (SHM_BUFFER_SIZE))
padblocks = (SHM_BUFFER_SIZE) - *rdrb->head;
- while (!shm_rdrb_free(rdrb, (blocks + padblocks)) && ret != ETIMEDOUT) {
+ while (!shm_rdrb_free(rdrb, blocks + padblocks) && ret != ETIMEDOUT) {
#else
while (!shm_rdrb_free(rdrb, 1) && ret != ETIMEDOUT) {
#endif
- pthread_cond_broadcast(rdrb->full);
- if (abstime != NULL)
- ret = pthread_cond_timedwait(rdrb->healthy,
- rdrb->lock,
- abstime);
- else
- ret = pthread_cond_wait(rdrb->healthy, rdrb->lock);
+ ret = __timedwait(rdrb->healthy, rdrb->lock, abstime);
+#ifdef SHM_RDRB_MULTI_BLOCK
+ if (blocks + *rdrb->head > (SHM_BUFFER_SIZE))
+ padblocks = (SHM_BUFFER_SIZE) - *rdrb->head;
+#endif
}
if (ret != ETIMEDOUT) {
@@ -478,7 +413,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
sdb = get_head_ptr(rdrb);
sdb->size = 0;
sdb->blocks = padblocks;
- sdb->flags = SDB_NULL;
+ sdb->refs = 0;
sdb->du_head = 0;
sdb->du_tail = 0;
sdb->idx = *rdrb->head;
@@ -487,7 +422,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
}
#endif
sdb = get_head_ptr(rdrb);
- sdb->flags = SDB_VALID;
+ sdb->refs = 1;
sdb->idx = *rdrb->head;
#ifdef SHM_RDRB_MULTI_BLOCK
sdb->blocks = blocks;
@@ -504,11 +439,12 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
return -ETIMEDOUT;
sdb->size = size;
- sdb->du_head = headspace;
+ sdb->du_head = DU_BUFF_HEADSPACE;
sdb->du_tail = sdb->du_head + len;
- if (data != NULL)
- memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len);
+ *psdb = sdb;
+ if (ptr != NULL)
+ *ptr = (uint8_t *) (sdb + 1) + sdb->du_head;
return sdb->idx;
}
@@ -541,6 +477,8 @@ struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb,
int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,
size_t idx)
{
+ struct shm_du_buff * sdb;
+
assert(rdrb);
assert(idx < (SHM_BUFFER_SIZE));
@@ -548,22 +486,18 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,
pthread_mutex_lock(rdrb->lock);
#else
if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD)
- pthread_mutex_consistent(rdrb->lock);
+ sanitize(rdrb);
#endif
- if (shm_rdrb_empty(rdrb)) {
- pthread_mutex_unlock(rdrb->lock);
- return -1;
- }
+ /* assert(!shm_rdrb_empty(rdrb)); */
- idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL;
+ sdb = idx_to_du_buff_ptr(rdrb, idx);
- if (idx != *rdrb->tail) {
- pthread_mutex_unlock(rdrb->lock);
- return 0;
+ if (sdb->refs == 1) { /* only stack needs it, can be removed */
+ sdb->refs = 0;
+ if (idx == *rdrb->tail)
+ garbage_collect(rdrb);
}
- garbage_collect(rdrb);
-
pthread_mutex_unlock(rdrb->lock);
return 0;
@@ -590,6 +524,13 @@ uint8_t * shm_du_buff_tail(struct shm_du_buff * sdb)
return (uint8_t *) (sdb + 1) + sdb->du_tail;
}
+size_t shm_du_buff_len(struct shm_du_buff * sdb)
+{
+ assert(sdb);
+
+ return sdb->du_tail - sdb->du_head;
+}
+
uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb,
size_t size)
{
@@ -636,7 +577,7 @@ uint8_t * shm_du_buff_head_release(struct shm_du_buff * sdb,
}
uint8_t * shm_du_buff_tail_release(struct shm_du_buff * sdb,
- size_t size)
+ size_t size)
{
assert(sdb);
assert(!(size > sdb->du_tail - sdb->du_head));
@@ -654,3 +595,16 @@ void shm_du_buff_truncate(struct shm_du_buff * sdb,
sdb->du_tail = sdb->du_head + len;
}
+
+int shm_du_buff_wait_ack(struct shm_du_buff * sdb)
+{
+ __sync_add_and_fetch(&sdb->refs, 1);
+
+ return 0;
+}
+
+int shm_du_buff_ack(struct shm_du_buff * sdb)
+{
+ __sync_sub_and_fetch(&sdb->refs, 1);
+ return 0;
+}