diff options
Diffstat (limited to 'src/lib/shm_ap_rbuff.c')
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 70 |
1 files changed, 63 insertions, 7 deletions
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 69e96c40..f54627b7 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -26,6 +26,7 @@ #include <ouroboros/logs.h> #include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/shm_du_map.h> #include <pthread.h> #include <sys/mman.h> @@ -34,7 +35,6 @@ #include <string.h> #include <stdint.h> #include <unistd.h> -#include <stdbool.h> #include <errno.h> #include <sys/stat.h> @@ -127,6 +127,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() rb->work = (pthread_cond_t *) (rb->shm_mutex + 1); pthread_mutexattr_init(&mattr); + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(rb->shm_mutex, &mattr); @@ -213,6 +214,7 @@ void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) { char fn[25]; + struct shm_du_map * dum = NULL; if (rb == NULL) { LOG_DBGF("Bogus input. Bugging out."); @@ -220,8 +222,17 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) } if (rb->api != getpid()) { - LOG_ERR("Tried to destroy other AP's rbuff."); - return; + dum = shm_du_map_open(); + if (shm_du_map_owner(dum) == getpid()) { + LOG_DBGF("Ringbuffer %d destroyed by IRMd %d.", + rb->api, getpid()); + shm_du_map_close(dum); + } else { + LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", + getpid(), rb->api); + shm_du_map_close(dum); + return; + } } if (close(rb->fd) < 0) @@ -243,12 +254,16 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) if (rb == NULL || e == NULL) return -1; - pthread_mutex_lock(rb->shm_mutex); + if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rb->shm_mutex); + } if (!shm_rbuff_free(rb)) { pthread_mutex_unlock(rb->shm_mutex); return -1; } + if (shm_rbuff_empty(rb)) pthread_cond_broadcast(rb->work); @@ -269,10 +284,21 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) rb->shm_mutex); - pthread_mutex_lock(rb->shm_mutex); + + if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rb->shm_mutex); + } + + while (tail_el_ptr->port_id < 0) + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); while(shm_rbuff_empty(rb)) - pthread_cond_wait(rb->work, rb->shm_mutex); + if (pthread_cond_wait(rb->work, rb->shm_mutex) + == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rb->shm_mutex); + } e = malloc(sizeof(*e)); if (e == NULL) { @@ -293,13 +319,19 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) { ssize_t idx = -1; - pthread_mutex_lock(rb->shm_mutex); + if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rb->shm_mutex); + } if (shm_rbuff_empty(rb)) { pthread_mutex_unlock(rb->shm_mutex); return -1; } + while (tail_el_ptr->port_id < 0) + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + if (tail_el_ptr->port_id != port_id) { pthread_mutex_unlock(rb->shm_mutex); return -1; @@ -313,3 +345,27 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) return idx; } + +pid_t shm_ap_rbuff_get_api(struct shm_ap_rbuff *rb) +{ + pid_t api = 0; + if (rb == NULL) + return 0; + + pthread_mutex_lock(rb->shm_mutex); + api = rb->api; + pthread_mutex_unlock(rb->shm_mutex); + + return api; +} + +void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb) +{ + if (rb == NULL) + return; + + pthread_mutex_lock(rb->shm_mutex); + *rb->ptr_tail = 0; + *rb->ptr_head = 0; + pthread_mutex_unlock(rb->shm_mutex); +} |