diff options
Diffstat (limited to 'src/lib/shm_rbuff.c')
| -rw-r--r-- | src/lib/shm_rbuff.c | 211 |
1 files changed, 90 insertions, 121 deletions
diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index bee52b4e..ec3bd152 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -45,30 +45,59 @@ #define FN_MAX_CHARS 255 -#define SHM_RB_FILE_SIZE ((SHM_RBUFF_SIZE) * sizeof(ssize_t) \ +#define SHM_RBUFF_FILESIZE ((SHM_RBUFF_SIZE) * sizeof(ssize_t) \ + 3 * sizeof(size_t) \ + sizeof(pthread_mutex_t) \ - + 2 * sizeof (pthread_cond_t)) - -#define shm_rbuff_used(rb) ((*rb->head + (SHM_RBUFF_SIZE) - *rb->tail) \ - & ((SHM_RBUFF_SIZE) - 1)) -#define shm_rbuff_free(rb) (shm_rbuff_used(rb) + 1 < (SHM_RBUFF_SIZE)) -#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) -#define head_el_ptr(rb) (rb->shm_base + *rb->head) -#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) + + 2 * sizeof(pthread_cond_t)) + +#define HEAD(rb) \ + *(rb->shm_base + *rb->head) +#define TAIL(rb) \ + *(rb->shm_base + *rb->tail) +#define ADVANCE(el) \ + (*(el) = (*(el) + 1) & ((SHM_RBUFF_SIZE) - 1)) +#define QUEUED(rb) \ + ((*rb->head - *rb->tail + (SHM_RBUFF_SIZE)) & (SHM_RBUFF_SIZE - 1)) +#define IS_FULL(rb) \ + (QUEUED(rb) == (SHM_RBUFF_SIZE) - 1) +#define IS_EMPTY(rb) \ + (*rb->head == *rb->tail) struct shm_rbuff { ssize_t * shm_base; /* start of entry */ size_t * head; /* start of ringbuffer head */ size_t * tail; /* start of ringbuffer tail */ size_t * acl; /* access control */ - pthread_mutex_t * lock; /* lock all free space in shm */ + pthread_mutex_t * mtx; /* lock all space in shm */ pthread_cond_t * add; /* packet arrived */ pthread_cond_t * del; /* packet removed */ pid_t pid; /* pid of the owner */ int flow_id; /* flow_id of the flow */ }; +static void robust_mutex_lock(pthread_mutex_t * mtx) +{ +#ifndef HAVE_ROBUST_MUTEX + pthread_mutex_lock(mtx); +#else + if (pthread_mutex_lock(mtx) == EOWNERDEAD) + pthread_mutex_consistent(mtx); +#endif +} + +static int robust_wait(pthread_cond_t * cond, + pthread_mutex_t * mtx, + const struct timespec * abstime) +{ + int ret = __timedwait(cond, mtx, abstime); +#ifdef HAVE_ROBUST_MUTEX + if (ret == EOWNERDEAD) + pthread_mutex_consistent(mtx); +#endif + return ret; +} + + #define MM_FLAGS (PROT_READ | PROT_WRITE) static struct shm_rbuff * rbuff_create(pid_t pid, @@ -90,10 +119,10 @@ static struct shm_rbuff * rbuff_create(pid_t pid, if (fd == -1) goto fail_open; - if ((flags & O_CREAT) && ftruncate(fd, SHM_RB_FILE_SIZE) < 0) + if ((flags & O_CREAT) && ftruncate(fd, SHM_RBUFF_FILESIZE) < 0) goto fail_truncate; - shm_base = mmap(NULL, SHM_RB_FILE_SIZE, MM_FLAGS, MAP_SHARED, fd, 0); + shm_base = mmap(NULL, SHM_RBUFF_FILESIZE, MM_FLAGS, MAP_SHARED, fd, 0); if (shm_base == MAP_FAILED) goto fail_truncate; @@ -103,8 +132,8 @@ static struct shm_rbuff * rbuff_create(pid_t pid, rb->head = (size_t *) (rb->shm_base + (SHM_RBUFF_SIZE)); rb->tail = rb->head + 1; rb->acl = rb->tail + 1; - rb->lock = (pthread_mutex_t *) (rb->acl + 1); - rb->add = (pthread_cond_t *) (rb->lock + 1); + rb->mtx = (pthread_mutex_t *) (rb->acl + 1); + rb->add = (pthread_cond_t *) (rb->mtx + 1); rb->del = rb->add + 1; rb->pid = pid; rb->flow_id = flow_id; @@ -123,7 +152,7 @@ static struct shm_rbuff * rbuff_create(pid_t pid, static void rbuff_destroy(struct shm_rbuff * rb) { - munmap(rb->shm_base, SHM_RB_FILE_SIZE); + munmap(rb->shm_base, SHM_RBUFF_FILESIZE); free(rb); } @@ -152,7 +181,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t pid, #ifdef HAVE_ROBUST_MUTEX pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); #endif - if (pthread_mutex_init(rb->lock, &mattr)) + if (pthread_mutex_init(rb->mtx, &mattr)) goto fail_mutex; if (pthread_condattr_init(&cattr)) @@ -185,7 +214,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t pid, fail_add: pthread_condattr_destroy(&cattr); fail_cattr: - pthread_mutex_destroy(rb->lock); + pthread_mutex_destroy(rb->mtx); fail_mutex: pthread_mutexattr_destroy(&mattr); fail_mattr: @@ -200,16 +229,6 @@ void shm_rbuff_destroy(struct shm_rbuff * rb) assert(rb != NULL); -#ifdef CONFIG_OUROBOROS_DEBUG - pthread_mutex_lock(rb->lock); - - *rb->acl = *rb->acl & ACL_FLOWDOWN; - - pthread_cond_broadcast(rb->add); - pthread_cond_broadcast(rb->del); - - pthread_mutex_unlock(rb->lock); -#endif sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id); shm_rbuff_close(rb); @@ -238,12 +257,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, assert(rb != NULL); assert(idx < SHM_BUFFER_SIZE); -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif + robust_mutex_lock(rb->mtx); if (*rb->acl != ACL_RDWR) { if (*rb->acl & ACL_FLOWDOWN) @@ -253,22 +267,22 @@ int shm_rbuff_write(struct shm_rbuff * rb, goto err; } - if (!shm_rbuff_free(rb)) { + if (IS_FULL(rb)) { ret = -EAGAIN; goto err; } - if (shm_rbuff_empty(rb)) + if (IS_EMPTY(rb)) pthread_cond_broadcast(rb->add); - *head_el_ptr(rb) = (ssize_t) idx; - *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) - 1); + HEAD(rb) = (ssize_t) idx; + ADVANCE(rb->head); - pthread_mutex_unlock(rb->lock); + pthread_mutex_unlock(rb->mtx); return 0; err: - pthread_mutex_unlock(rb->lock); + pthread_mutex_unlock(rb->mtx); return ret; } @@ -281,12 +295,7 @@ int shm_rbuff_write_b(struct shm_rbuff * rb, assert(rb != NULL); assert(idx < SHM_BUFFER_SIZE); -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif + robust_mutex_lock(rb->mtx); if (*rb->acl != ACL_RDWR) { if (*rb->acl & ACL_FLOWDOWN) @@ -296,30 +305,26 @@ int shm_rbuff_write_b(struct shm_rbuff * rb, goto err; } - pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); + pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); - while (!shm_rbuff_free(rb) + while (IS_FULL(rb) && ret != -ETIMEDOUT && !(*rb->acl & ACL_FLOWDOWN)) { - ret = -__timedwait(rb->del, rb->lock, abstime); -#ifdef HAVE_ROBUST_MUTEX - if (ret == -EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif + ret = -robust_wait(rb->del, rb->mtx, abstime); } if (ret != -ETIMEDOUT) { - if (shm_rbuff_empty(rb)) + if (IS_EMPTY(rb)) pthread_cond_broadcast(rb->add); - *head_el_ptr(rb) = (ssize_t) idx; - *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) - 1); + HEAD(rb) = (ssize_t) idx; + ADVANCE(rb->head); } pthread_cleanup_pop(true); return ret; err: - pthread_mutex_unlock(rb->lock); + pthread_mutex_unlock(rb->mtx); return ret; } @@ -342,24 +347,19 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) assert(rb != NULL); -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif + robust_mutex_lock(rb->mtx); - if (shm_rbuff_empty(rb)) { + if (IS_EMPTY(rb)) { ret = check_rb_acl(rb); - pthread_mutex_unlock(rb->lock); + pthread_mutex_unlock(rb->mtx); return ret; } - ret = *tail_el_ptr(rb); - *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1); + ret = TAIL(rb); + ADVANCE(rb->tail); pthread_cond_broadcast(rb->del); - pthread_mutex_unlock(rb->lock); + pthread_mutex_unlock(rb->mtx); return ret; } @@ -371,33 +371,24 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, assert(rb != NULL); -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif + robust_mutex_lock(rb->mtx); - if (shm_rbuff_empty(rb) && (*rb->acl & ACL_FLOWDOWN)) { - pthread_mutex_unlock(rb->lock); + if (IS_EMPTY(rb) && (*rb->acl & ACL_FLOWDOWN)) { + pthread_mutex_unlock(rb->mtx); return -EFLOWDOWN; } - pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); + pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); - while (shm_rbuff_empty(rb) && + while (IS_EMPTY(rb) && idx != -ETIMEDOUT && check_rb_acl(rb) == -EAGAIN) { - idx = -__timedwait(rb->add, rb->lock, abstime); -#ifdef HAVE_ROBUST_MUTEX - if (idx == -EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif + idx = -robust_wait(rb->add, rb->mtx, abstime); } - if (!shm_rbuff_empty(rb)) { - idx = *tail_el_ptr(rb); - *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1); + if (!IS_EMPTY(rb)) { + idx = TAIL(rb); + ADVANCE(rb->tail); pthread_cond_broadcast(rb->del); } else if (idx != -ETIMEDOUT) { idx = check_rb_acl(rb); @@ -415,15 +406,10 @@ void shm_rbuff_set_acl(struct shm_rbuff * rb, { assert(rb != NULL); -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif + robust_mutex_lock(rb->mtx); *rb->acl = (size_t) flags; - pthread_mutex_unlock(rb->lock); + pthread_mutex_unlock(rb->mtx); } uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb) @@ -432,15 +418,11 @@ uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb) assert(rb != NULL); -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif + robust_mutex_lock(rb->mtx); + flags = (uint32_t) *rb->acl; - pthread_mutex_unlock(rb->lock); + pthread_mutex_unlock(rb->mtx); return flags; } @@ -449,21 +431,13 @@ void shm_rbuff_fini(struct shm_rbuff * rb) { assert(rb != NULL); -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif - pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); + robust_mutex_lock(rb->mtx); + + pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); + + while (!IS_EMPTY(rb)) + robust_wait(rb->del, rb->mtx, NULL); - while (!shm_rbuff_empty(rb)) -#ifndef HAVE_ROBUST_MUTEX - pthread_cond_wait(rb->add, rb->lock); -#else - if (pthread_cond_wait(rb->add, rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif pthread_cleanup_pop(true); } @@ -473,16 +447,11 @@ size_t shm_rbuff_queued(struct shm_rbuff * rb) assert(rb != NULL); -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rb->lock); -#endif + robust_mutex_lock(rb->mtx); - ret = shm_rbuff_used(rb); + ret = QUEUED(rb); - pthread_mutex_unlock(rb->lock); + pthread_mutex_unlock(rb->mtx); return ret; } |
