summaryrefslogtreecommitdiff
path: root/src/lib/shm_ap_rbuff.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/shm_ap_rbuff.c')
-rw-r--r--src/lib/shm_ap_rbuff.c148
1 files changed, 130 insertions, 18 deletions
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 77e288a8..473894d5 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -40,6 +40,10 @@
#include <signal.h>
#include <sys/stat.h>
+#define FN_MAX_CHARS 255
+#define NORTH false
+#define SOUTH true
+
#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \
+ 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \
+ 2 * sizeof (pthread_cond_t))
@@ -59,19 +63,24 @@ struct shm_ap_rbuff {
pthread_cond_t * add; /* SDU arrived */
pthread_cond_t * del; /* SDU removed */
pid_t api; /* api to which this rb belongs */
+ bool dir; /* direction, false = N */
int fd;
};
-struct shm_ap_rbuff * shm_ap_rbuff_create()
+static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir)
{
struct shm_ap_rbuff * rb;
int shm_fd;
struct rb_entry * shm_base;
pthread_mutexattr_t mattr;
pthread_condattr_t cattr;
- char fn[25];
+ char fn[FN_MAX_CHARS];
+ mode_t mask;
- sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid());
+ if (dir == SOUTH)
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", getpid());
+ else
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", getpid());
rb = malloc(sizeof(*rb));
if (rb == NULL) {
@@ -79,6 +88,8 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
return NULL;
}
+ mask = umask(0);
+
shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666);
if (shm_fd == -1) {
LOG_DBG("Failed creating ring buffer.");
@@ -86,11 +97,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
return NULL;
}
- if (fchmod(shm_fd, 0666)) {
- LOG_DBG("Failed to chmod shared memory.");
- free(rb);
- return NULL;
- }
+ umask(mask);
if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) {
LOG_DBG("Failed to extend ringbuffer.");
@@ -150,18 +157,22 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
rb->fd = shm_fd;
rb->api = getpid();
+ rb->dir = dir;
return rb;
}
-struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
+static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir)
{
struct shm_ap_rbuff * rb;
int shm_fd;
struct rb_entry * shm_base;
- char fn[25];
+ char fn[FN_MAX_CHARS];
- sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api);
+ if (dir == SOUTH)
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", api);
+ else
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", api);
rb = malloc(sizeof(*rb));
if (rb == NULL) {
@@ -204,9 +215,31 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
rb->fd = shm_fd;
rb->api = api;
+ rb->dir = dir;
return rb;
}
+
+struct shm_ap_rbuff * shm_ap_rbuff_create_n()
+{
+ return shm_ap_rbuff_create(NORTH);
+}
+
+struct shm_ap_rbuff * shm_ap_rbuff_create_s()
+{
+ return shm_ap_rbuff_create(SOUTH);
+}
+
+struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api)
+{
+ return shm_ap_rbuff_open(api, NORTH);
+}
+
+struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api)
+{
+ return shm_ap_rbuff_open(api, SOUTH);
+}
+
void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
{
if (rb == NULL) {
@@ -252,7 +285,10 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
if (close(rb->fd) < 0)
LOG_DBG("Couldn't close shared memory.");
- sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api);
+ if (rb->dir == SOUTH)
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", rb->api);
+ else
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", rb->api);
if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
LOG_DBG("Couldn't unmap shared memory.");
@@ -311,15 +347,15 @@ int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb)
return -1;
}
- ret = (rb->shm_base + *rb->ptr_tail)->index;
+ ret = tail_el_ptr(rb)->index;
pthread_mutex_unlock(rb->lock);
return ret;
}
-int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
- const struct timespec * timeout)
+static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb,
+ const struct timespec * timeout)
{
struct timespec abstime;
int ret = 0;
@@ -360,7 +396,82 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
}
if (ret != ETIMEDOUT)
- ret = (rb->shm_base + *rb->ptr_tail)->port_id;
+ ret = tail_el_ptr(rb)->port_id;
+ else
+ ret = -ETIMEDOUT;
+
+ pthread_cleanup_pop(true);
+
+ return ret;
+}
+
+int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
+ bool * set,
+ const struct timespec * timeout)
+{
+ struct timespec abstime;
+ int ret;
+
+ if (set == NULL)
+ return shm_ap_rbuff_peek_b_all(rb, timeout);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (timeout != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, timeout, &abstime);
+ }
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) rb->lock);
+
+ while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id])
+ && (ret != ETIMEDOUT)) {
+ while (shm_rbuff_empty(rb)) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(rb->add,
+ rb->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(rb->add, rb->lock);
+
+#ifndef __APPLE__
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (ret == ETIMEDOUT)
+ break;
+ }
+
+ while (!set[tail_el_ptr(rb)->port_id]) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(rb->del,
+ rb->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(rb->del, rb->lock);
+
+#ifndef __APPLE__
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (ret == ETIMEDOUT)
+ break;
+ }
+ }
+
+ if (ret != ETIMEDOUT)
+ ret = tail_el_ptr(rb)->port_id;
else
ret = -ETIMEDOUT;
@@ -369,6 +480,7 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
return ret;
}
+
struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
{
struct rb_entry * e = NULL;
@@ -434,8 +546,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
return idx;
}
-ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
- int port_id,
+ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
+ int port_id,
const struct timespec * timeout)
{
struct timespec abstime;