summaryrefslogtreecommitdiff
path: root/src/lib/ssm
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/ssm')
-rw-r--r--src/lib/ssm/rbuff.c95
-rw-r--r--src/lib/ssm/tests/rbuff_test.c30
2 files changed, 70 insertions, 55 deletions
diff --git a/src/lib/ssm/rbuff.c b/src/lib/ssm/rbuff.c
index c149c306..0121af89 100644
--- a/src/lib/ssm/rbuff.c
+++ b/src/lib/ssm/rbuff.c
@@ -74,7 +74,7 @@ struct ssm_rbuff {
ssize_t * shm_base; /* start of shared memory */
size_t * head; /* start of ringbuffer */
size_t * tail;
- size_t * acl; /* access control */
+ size_t * flags; /* out-of-band flags (RB_*) */
pthread_mutex_t * mtx; /* lock for cond vars only */
pthread_cond_t * add; /* signal when new data */
pthread_cond_t * del; /* signal when data removed */
@@ -114,8 +114,8 @@ static struct ssm_rbuff * rbuff_create(pid_t pid,
rb->shm_base = shm_base;
rb->head = (size_t *) (rb->shm_base + (SSM_RBUFF_SIZE));
rb->tail = (size_t *) (rb->head + 1);
- rb->acl = (size_t *) (rb->tail + 1);
- rb->mtx = (pthread_mutex_t *) (rb->acl + 1);
+ rb->flags = (size_t *) (rb->tail + 1);
+ rb->mtx = (pthread_mutex_t *) (rb->flags + 1);
rb->add = (pthread_cond_t *) (rb->mtx + 1);
rb->del = rb->add + 1;
rb->pid = pid;
@@ -181,7 +181,7 @@ struct ssm_rbuff * ssm_rbuff_create(pid_t pid,
if (pthread_cond_init(rb->del, &cattr))
goto fail_del;
- *rb->acl = ACL_RDWR;
+ *rb->flags = RB_RDWR;
*rb->head = 0;
*rb->tail = 0;
@@ -231,7 +231,7 @@ void ssm_rbuff_close(struct ssm_rbuff * rb)
assert(rb);
/*
- * Caller must set ACL_FLOWDOWN first; if a user becomes
+ * Caller must set RB_FLOWDOWN first; if a user becomes
* cancellable, push a cleanup that decrements n_users.
*/
while (__atomic_load_n(&rb->n_users, __ATOMIC_SEQ_CST) > 0) {
@@ -245,7 +245,7 @@ void ssm_rbuff_close(struct ssm_rbuff * rb)
int ssm_rbuff_write(struct ssm_rbuff * rb,
size_t off)
{
- size_t acl;
+ size_t flags;
bool was_empty;
int ret = 0;
@@ -253,15 +253,15 @@ int ssm_rbuff_write(struct ssm_rbuff * rb,
__atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
- if (acl != ACL_RDWR) {
- if (acl & ACL_FLOWDOWN) {
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
+ if (flags != RB_RDWR) {
+ if (flags & RB_FLOWDOWN) {
ret = -EFLOWDOWN;
- goto fail_acl;
+ goto fail_flags;
}
- if (acl & ACL_RDONLY) {
+ if (!(flags & RB_WR)) {
ret = -ENOTALLOC;
- goto fail_acl;
+ goto fail_flags;
}
}
@@ -287,7 +287,7 @@ int ssm_rbuff_write(struct ssm_rbuff * rb,
fail_mutex:
pthread_mutex_unlock(rb->mtx);
- fail_acl:
+ fail_flags:
__atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST);
return ret;
}
@@ -296,7 +296,7 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
size_t off,
const struct timespec * abstime)
{
- size_t acl;
+ size_t flags;
int ret = 0;
bool was_empty;
@@ -304,15 +304,15 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
__atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
- if (acl != ACL_RDWR) {
- if (acl & ACL_FLOWDOWN) {
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
+ if (flags != RB_RDWR) {
+ if (flags & RB_FLOWDOWN) {
ret = -EFLOWDOWN;
- goto fail_acl;
+ goto fail_flags;
}
- if (acl & ACL_RDONLY) {
+ if (!(flags & RB_WR)) {
ret = -ENOTALLOC;
- goto fail_acl;
+ goto fail_flags;
}
}
@@ -321,8 +321,8 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx);
while (IS_FULL(rb) && ret != -ETIMEDOUT) {
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
- if (acl & ACL_FLOWDOWN) {
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
+ if (flags & RB_FLOWDOWN) {
ret = -EFLOWDOWN;
break;
}
@@ -341,25 +341,28 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
pthread_mutex_unlock(rb->mtx);
- fail_acl:
+ fail_flags:
__atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST);
return ret;
}
-static int check_rb_acl(struct ssm_rbuff * rb)
+static int check_rb_flags(struct ssm_rbuff * rb)
{
- size_t acl;
+ size_t flags;
assert(rb != NULL);
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
- if (acl & ACL_FLOWDOWN)
+ if (flags & RB_FLOWDOWN)
return -EFLOWDOWN;
- if (acl & ACL_FLOWPEER)
+ if (flags & RB_FLOWPEER)
return -EFLOWPEER;
+ if (!(flags & RB_RD))
+ return -ENOTALLOC;
+
return -EAGAIN;
}
@@ -372,7 +375,7 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb)
__atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
if (IS_EMPTY(rb)) {
- ret = check_rb_acl(rb);
+ ret = check_rb_flags(rb);
goto out;
}
@@ -380,7 +383,7 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb)
if (IS_EMPTY(rb)) {
pthread_mutex_unlock(rb->mtx);
- ret = check_rb_acl(rb);
+ ret = check_rb_flags(rb);
goto out;
}
@@ -400,14 +403,14 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
const struct timespec * abstime)
{
ssize_t idx = -1;
- size_t acl;
+ size_t flags;
assert(rb != NULL);
__atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
- if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN)) {
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
+ if (IS_EMPTY(rb) && (flags & RB_FLOWDOWN)) {
idx = -EFLOWDOWN;
goto out;
}
@@ -418,7 +421,7 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
while (IS_EMPTY(rb) &&
idx != -ETIMEDOUT &&
- check_rb_acl(rb) == -EAGAIN) {
+ check_rb_flags(rb) == -EAGAIN) {
idx = -robust_wait(rb->add, rb->mtx, abstime);
}
@@ -429,7 +432,7 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
ADVANCE_TAIL(rb);
pthread_cond_broadcast(rb->del);
} else if (idx != -ETIMEDOUT) {
- idx = check_rb_acl(rb);
+ idx = check_rb_flags(rb);
}
pthread_mutex_unlock(rb->mtx);
@@ -441,23 +444,35 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
return idx;
}
-void ssm_rbuff_set_acl(struct ssm_rbuff * rb,
- uint32_t flags)
+void ssm_rbuff_set_bits(struct ssm_rbuff * rb,
+ uint32_t bits)
+{
+ assert(rb != NULL);
+
+ robust_mutex_lock(rb->mtx);
+ __atomic_fetch_or(rb->flags, (size_t) bits, __ATOMIC_SEQ_CST);
+ pthread_cond_broadcast(rb->add);
+ pthread_cond_broadcast(rb->del);
+ pthread_mutex_unlock(rb->mtx);
+}
+
+void ssm_rbuff_clr_bits(struct ssm_rbuff * rb,
+ uint32_t bits)
{
assert(rb != NULL);
robust_mutex_lock(rb->mtx);
- __atomic_store_n(rb->acl, (size_t) flags, __ATOMIC_SEQ_CST);
+ __atomic_fetch_and(rb->flags, ~(size_t) bits, __ATOMIC_SEQ_CST);
pthread_cond_broadcast(rb->add);
pthread_cond_broadcast(rb->del);
pthread_mutex_unlock(rb->mtx);
}
-uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb)
+uint32_t ssm_rbuff_get_flags(struct ssm_rbuff * rb)
{
assert(rb != NULL);
- return (uint32_t) __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ return (uint32_t) __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
}
void ssm_rbuff_fini(struct ssm_rbuff * rb)
diff --git a/src/lib/ssm/tests/rbuff_test.c b/src/lib/ssm/tests/rbuff_test.c
index 58cb39c3..48e5a714 100644
--- a/src/lib/ssm/tests/rbuff_test.c
+++ b/src/lib/ssm/tests/rbuff_test.c
@@ -206,10 +206,10 @@ static int test_ssm_rbuff_fill_drain(void)
return TEST_RC_FAIL;
}
-static int test_ssm_rbuff_acl(void)
+static int test_ssm_rbuff_flags(void)
{
struct ssm_rbuff * rb;
- uint32_t acl;
+ uint32_t flags;
TEST_START();
@@ -219,16 +219,16 @@ static int test_ssm_rbuff_acl(void)
goto fail;
}
- acl = ssm_rbuff_get_acl(rb);
- if (acl != ACL_RDWR) {
- printf("Expected ACL_RDWR, got %u.\n", acl);
+ flags = ssm_rbuff_get_flags(rb);
+ if (flags != RB_RDWR) {
+ printf("Expected RB_RDWR, got %u.\n", flags);
goto fail_rb;
}
- ssm_rbuff_set_acl(rb, ACL_RDONLY);
- acl = ssm_rbuff_get_acl(rb);
- if (acl != ACL_RDONLY) {
- printf("Expected ACL_RDONLY, got %u.\n", acl);
+ ssm_rbuff_clr_bits(rb, RB_WR);
+ flags = ssm_rbuff_get_flags(rb);
+ if (flags != RB_RD) {
+ printf("Expected RB_RD, got %u.\n", flags);
goto fail_rb;
}
@@ -237,7 +237,7 @@ static int test_ssm_rbuff_acl(void)
goto fail_rb;
}
- ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(rb, RB_FLOWDOWN);
if (ssm_rbuff_write(rb, 1) != -EFLOWDOWN) {
printf("Expected -EFLOWDOWN on FLOWDOWN.\n");
goto fail_rb;
@@ -553,7 +553,7 @@ static int test_ssm_rbuff_blocking_flowdown(void)
clock_gettime(PTHREAD_COND_CLOCK, &now);
ts_add(&now, &interval, &abs_timeout);
- ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(rb, RB_FLOWDOWN);
ret = ssm_rbuff_read_b(rb, &abs_timeout);
if (ret != -EFLOWDOWN) {
@@ -561,7 +561,7 @@ static int test_ssm_rbuff_blocking_flowdown(void)
goto fail_rb;
}
- ssm_rbuff_set_acl(rb, ACL_RDWR);
+ ssm_rbuff_clr_bits(rb, RB_FLOWDOWN);
for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) {
if (ssm_rbuff_write(rb, i) < 0) {
@@ -573,7 +573,7 @@ static int test_ssm_rbuff_blocking_flowdown(void)
clock_gettime(PTHREAD_COND_CLOCK, &now);
ts_add(&now, &interval, &abs_timeout);
- ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(rb, RB_FLOWDOWN);
ret = ssm_rbuff_write_b(rb, 999, &abs_timeout);
if (ret != -EFLOWDOWN) {
@@ -581,7 +581,7 @@ static int test_ssm_rbuff_blocking_flowdown(void)
goto fail_rb;
}
- ssm_rbuff_set_acl(rb, ACL_RDWR);
+ ssm_rbuff_clr_bits(rb, RB_FLOWDOWN);
while (ssm_rbuff_read(rb) >= 0)
;
@@ -664,7 +664,7 @@ int rbuff_test(int argc,
ret |= test_ssm_rbuff_write_read();
ret |= test_ssm_rbuff_read_empty();
ret |= test_ssm_rbuff_fill_drain();
- ret |= test_ssm_rbuff_acl();
+ ret |= test_ssm_rbuff_flags();
ret |= test_ssm_rbuff_open_close();
ret |= test_ssm_rbuff_threaded();
ret |= test_ssm_rbuff_blocking();