summaryrefslogtreecommitdiff
path: root/src/lib/shm_ap_rbuff.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/shm_ap_rbuff.c')
-rw-r--r--src/lib/shm_ap_rbuff.c70
1 files changed, 63 insertions, 7 deletions
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 69e96c40..f54627b7 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -26,6 +26,7 @@
#include <ouroboros/logs.h>
#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/shm_du_map.h>
#include <pthread.h>
#include <sys/mman.h>
@@ -34,7 +35,6 @@
#include <string.h>
#include <stdint.h>
#include <unistd.h>
-#include <stdbool.h>
#include <errno.h>
#include <sys/stat.h>
@@ -127,6 +127,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
rb->work = (pthread_cond_t *) (rb->shm_mutex + 1);
pthread_mutexattr_init(&mattr);
+ pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(rb->shm_mutex, &mattr);
@@ -213,6 +214,7 @@ void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
{
char fn[25];
+ struct shm_du_map * dum = NULL;
if (rb == NULL) {
LOG_DBGF("Bogus input. Bugging out.");
@@ -220,8 +222,17 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
}
if (rb->api != getpid()) {
- LOG_ERR("Tried to destroy other AP's rbuff.");
- return;
+ dum = shm_du_map_open();
+ if (shm_du_map_owner(dum) == getpid()) {
+ LOG_DBGF("Ringbuffer %d destroyed by IRMd %d.",
+ rb->api, getpid());
+ shm_du_map_close(dum);
+ } else {
+ LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.",
+ getpid(), rb->api);
+ shm_du_map_close(dum);
+ return;
+ }
}
if (close(rb->fd) < 0)
@@ -243,12 +254,16 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
if (rb == NULL || e == NULL)
return -1;
- pthread_mutex_lock(rb->shm_mutex);
+ if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->shm_mutex);
+ }
if (!shm_rbuff_free(rb)) {
pthread_mutex_unlock(rb->shm_mutex);
return -1;
}
+
if (shm_rbuff_empty(rb))
pthread_cond_broadcast(rb->work);
@@ -269,10 +284,21 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->shm_mutex);
- pthread_mutex_lock(rb->shm_mutex);
+
+ if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->shm_mutex);
+ }
+
+ while (tail_el_ptr->port_id < 0)
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
while(shm_rbuff_empty(rb))
- pthread_cond_wait(rb->work, rb->shm_mutex);
+ if (pthread_cond_wait(rb->work, rb->shm_mutex)
+ == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->shm_mutex);
+ }
e = malloc(sizeof(*e));
if (e == NULL) {
@@ -293,13 +319,19 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
{
ssize_t idx = -1;
- pthread_mutex_lock(rb->shm_mutex);
+ if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->shm_mutex);
+ }
if (shm_rbuff_empty(rb)) {
pthread_mutex_unlock(rb->shm_mutex);
return -1;
}
+ while (tail_el_ptr->port_id < 0)
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
+
if (tail_el_ptr->port_id != port_id) {
pthread_mutex_unlock(rb->shm_mutex);
return -1;
@@ -313,3 +345,27 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
return idx;
}
+
+pid_t shm_ap_rbuff_get_api(struct shm_ap_rbuff *rb)
+{
+ pid_t api = 0;
+ if (rb == NULL)
+ return 0;
+
+ pthread_mutex_lock(rb->shm_mutex);
+ api = rb->api;
+ pthread_mutex_unlock(rb->shm_mutex);
+
+ return api;
+}
+
+void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb)
+{
+ if (rb == NULL)
+ return;
+
+ pthread_mutex_lock(rb->shm_mutex);
+ *rb->ptr_tail = 0;
+ *rb->ptr_head = 0;
+ pthread_mutex_unlock(rb->shm_mutex);
+}