diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/ssm/rbuff.c | 52 |
1 files changed, 48 insertions, 4 deletions
diff --git a/src/lib/ssm/rbuff.c b/src/lib/ssm/rbuff.c index 77e23010..c149c306 100644 --- a/src/lib/ssm/rbuff.c +++ b/src/lib/ssm/rbuff.c @@ -80,6 +80,7 @@ struct ssm_rbuff { pthread_cond_t * del; /* signal when data removed */ pid_t pid; /* pid of the owner */ int flow_id; /* flow_id of the flow */ + size_t n_users; /* in-flight users */ }; #define MM_FLAGS (PROT_READ | PROT_WRITE) @@ -119,6 +120,7 @@ static struct ssm_rbuff * rbuff_create(pid_t pid, rb->del = rb->add + 1; rb->pid = pid; rb->flow_id = flow_id; + rb->n_users = 0; return rb; @@ -228,6 +230,15 @@ void ssm_rbuff_close(struct ssm_rbuff * rb) { assert(rb); + /* + * Caller must set ACL_FLOWDOWN first; if a user becomes + * cancellable, push a cleanup that decrements n_users. + */ + while (__atomic_load_n(&rb->n_users, __ATOMIC_SEQ_CST) > 0) { + struct timespec tic = { 0, 100000 }; + nanosleep(&tic, NULL); + } + rbuff_destroy(rb); } @@ -240,6 +251,8 @@ int ssm_rbuff_write(struct ssm_rbuff * rb, assert(rb != NULL); + __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); if (acl != ACL_RDWR) { if (acl & ACL_FLOWDOWN) { @@ -269,11 +282,13 @@ int ssm_rbuff_write(struct ssm_rbuff * rb, pthread_mutex_unlock(rb->mtx); + __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return 0; fail_mutex: pthread_mutex_unlock(rb->mtx); fail_acl: + __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return ret; } @@ -287,6 +302,8 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb, assert(rb != NULL); + __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); if (acl != ACL_RDWR) { if (acl & ACL_FLOWDOWN) { @@ -325,6 +342,7 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb, pthread_mutex_unlock(rb->mtx); fail_acl: + __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return ret; } @@ -351,11 +369,21 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb) assert(rb != NULL); - if (IS_EMPTY(rb)) - return check_rb_acl(rb); + __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); + + if (IS_EMPTY(rb)) { + ret = check_rb_acl(rb); + goto out; + } robust_mutex_lock(rb->mtx); + if (IS_EMPTY(rb)) { + pthread_mutex_unlock(rb->mtx); + ret = check_rb_acl(rb); + goto out; + } + ret = TAIL(rb); ADVANCE_TAIL(rb); @@ -363,6 +391,8 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb) pthread_mutex_unlock(rb->mtx); + out: + __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return ret; } @@ -374,9 +404,13 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, assert(rb != NULL); + __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); - if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN)) - return -EFLOWDOWN; + if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN)) { + idx = -EFLOWDOWN; + goto out; + } robust_mutex_lock(rb->mtx); @@ -402,6 +436,8 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, assert(idx != -EAGAIN); + out: + __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return idx; } @@ -410,7 +446,11 @@ void ssm_rbuff_set_acl(struct ssm_rbuff * rb, { assert(rb != NULL); + robust_mutex_lock(rb->mtx); __atomic_store_n(rb->acl, (size_t) flags, __ATOMIC_SEQ_CST); + pthread_cond_broadcast(rb->add); + pthread_cond_broadcast(rb->del); + pthread_mutex_unlock(rb->mtx); } uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb) @@ -424,6 +464,8 @@ void ssm_rbuff_fini(struct ssm_rbuff * rb) { assert(rb != NULL); + __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); + robust_mutex_lock(rb->mtx); pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); @@ -432,6 +474,8 @@ void ssm_rbuff_fini(struct ssm_rbuff * rb) robust_wait(rb->del, rb->mtx, NULL); pthread_cleanup_pop(true); + + __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); } size_t ssm_rbuff_queued(struct ssm_rbuff * rb) |
