diff options
Diffstat (limited to 'src/lib/shm_ap_rbuff.c')
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 148 |
1 files changed, 130 insertions, 18 deletions
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 77e288a8..473894d5 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -40,6 +40,10 @@ #include <signal.h> #include <sys/stat.h> +#define FN_MAX_CHARS 255 +#define NORTH false +#define SOUTH true + #define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ + 2 * sizeof (pthread_cond_t)) @@ -59,19 +63,24 @@ struct shm_ap_rbuff { pthread_cond_t * add; /* SDU arrived */ pthread_cond_t * del; /* SDU removed */ pid_t api; /* api to which this rb belongs */ + bool dir; /* direction, false = N */ int fd; }; -struct shm_ap_rbuff * shm_ap_rbuff_create() +static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir) { struct shm_ap_rbuff * rb; int shm_fd; struct rb_entry * shm_base; pthread_mutexattr_t mattr; pthread_condattr_t cattr; - char fn[25]; + char fn[FN_MAX_CHARS]; + mode_t mask; - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); + if (dir == SOUTH) + sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", getpid()); + else + sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", getpid()); rb = malloc(sizeof(*rb)); if (rb == NULL) { @@ -79,6 +88,8 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() return NULL; } + mask = umask(0); + shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); if (shm_fd == -1) { LOG_DBG("Failed creating ring buffer."); @@ -86,11 +97,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() return NULL; } - if (fchmod(shm_fd, 0666)) { - LOG_DBG("Failed to chmod shared memory."); - free(rb); - return NULL; - } + umask(mask); if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) { LOG_DBG("Failed to extend ringbuffer."); @@ -150,18 +157,22 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() rb->fd = shm_fd; rb->api = getpid(); + rb->dir = dir; return rb; } -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) +static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir) { struct shm_ap_rbuff * rb; int shm_fd; struct rb_entry * shm_base; - char fn[25]; + char fn[FN_MAX_CHARS]; - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api); + if (dir == SOUTH) + sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", api); + else + sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", api); rb = malloc(sizeof(*rb)); if (rb == NULL) { @@ -204,9 +215,31 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) rb->fd = shm_fd; rb->api = api; + rb->dir = dir; return rb; } + +struct shm_ap_rbuff * shm_ap_rbuff_create_n() +{ + return shm_ap_rbuff_create(NORTH); +} + +struct shm_ap_rbuff * shm_ap_rbuff_create_s() +{ + return shm_ap_rbuff_create(SOUTH); +} + +struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api) +{ + return shm_ap_rbuff_open(api, NORTH); +} + +struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api) +{ + return shm_ap_rbuff_open(api, SOUTH); +} + void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) { if (rb == NULL) { @@ -252,7 +285,10 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) if (close(rb->fd) < 0) LOG_DBG("Couldn't close shared memory."); - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api); + if (rb->dir == SOUTH) + sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", rb->api); + else + sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", rb->api); if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) LOG_DBG("Couldn't unmap shared memory."); @@ -311,15 +347,15 @@ int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb) return -1; } - ret = (rb->shm_base + *rb->ptr_tail)->index; + ret = tail_el_ptr(rb)->index; pthread_mutex_unlock(rb->lock); return ret; } -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, - const struct timespec * timeout) +static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, + const struct timespec * timeout) { struct timespec abstime; int ret = 0; @@ -360,7 +396,82 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, } if (ret != ETIMEDOUT) - ret = (rb->shm_base + *rb->ptr_tail)->port_id; + ret = tail_el_ptr(rb)->port_id; + else + ret = -ETIMEDOUT; + + pthread_cleanup_pop(true); + + return ret; +} + +int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, + bool * set, + const struct timespec * timeout) +{ + struct timespec abstime; + int ret; + + if (set == NULL) + return shm_ap_rbuff_peek_b_all(rb, timeout); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rb->lock); + + while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id]) + && (ret != ETIMEDOUT)) { + while (shm_rbuff_empty(rb)) { + if (timeout != NULL) + ret = pthread_cond_timedwait(rb->add, + rb->lock, + &abstime); + else + ret = pthread_cond_wait(rb->add, rb->lock); + +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (ret == ETIMEDOUT) + break; + } + + while (!set[tail_el_ptr(rb)->port_id]) { + if (timeout != NULL) + ret = pthread_cond_timedwait(rb->del, + rb->lock, + &abstime); + else + ret = pthread_cond_wait(rb->del, rb->lock); + +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (ret == ETIMEDOUT) + break; + } + } + + if (ret != ETIMEDOUT) + ret = tail_el_ptr(rb)->port_id; else ret = -ETIMEDOUT; @@ -369,6 +480,7 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, return ret; } + struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) { struct rb_entry * e = NULL; @@ -434,8 +546,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) return idx; } -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, - int port_id, +ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, + int port_id, const struct timespec * timeout) { struct timespec abstime; |