summaryrefslogtreecommitdiff
path: root/src/lib/shm_ap_rbuff.c
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2016-08-04 17:07:45 +0200
committerSander Vrijders <sander.vrijders@intec.ugent.be>2016-08-04 17:07:45 +0200
commit51bb7c6f315dba4044eb2ece5c1312362674d7fb (patch)
treefff3eeadb6eb04edee21340ecdcdfc13da3115b4 /src/lib/shm_ap_rbuff.c
parent44b55f0b03ffc6aff4f1c290b5687d5ac95ddbf9 (diff)
parent4931526cf9b5e40294e043deab856f25bf56c7cf (diff)
downloadouroboros-51bb7c6f315dba4044eb2ece5c1312362674d7fb.tar.gz
ouroboros-51bb7c6f315dba4044eb2ece5c1312362674d7fb.zip
Merged in dstaesse/ouroboros/be-blocking (pull request #185)
lib: Revise blocking I/O
Diffstat (limited to 'src/lib/shm_ap_rbuff.c')
-rw-r--r--src/lib/shm_ap_rbuff.c126
1 files changed, 94 insertions, 32 deletions
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index be4cd0c2..56555533 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -42,23 +42,27 @@
#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC
-#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry) \
+#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \
+ 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \
- + sizeof (pthread_cond_t))
+ + 2 * sizeof (pthread_cond_t))
-#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail) \
- & (SHM_RBUFF_SIZE - 1))
-#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE)
+#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_BUFFER_SIZE - *rb->ptr_tail) \
+ & (SHM_BUFFER_SIZE - 1))
+#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE)
#define shm_rbuff_empty(rb) (*rb->ptr_head == *rb->ptr_tail)
-#define head_el_ptr (rb->shm_base + *rb->ptr_head)
-#define tail_el_ptr (rb->shm_base + *rb->ptr_tail)
+#define head_el_ptr(rb) (rb->shm_base + *rb->ptr_head)
+#define tail_el_ptr(rb) (rb->shm_base + *rb->ptr_tail)
+#define clean_sdus(rb) \
+ while (!shm_rbuff_empty(rb) && tail_el_ptr(rb)->port_id < 0) \
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); \
struct shm_ap_rbuff {
struct rb_entry * shm_base; /* start of entry */
size_t * ptr_head; /* start of ringbuffer head */
size_t * ptr_tail; /* start of ringbuffer tail */
pthread_mutex_t * lock; /* lock all free space in shm */
- pthread_cond_t * work; /* threads will wait for a signal */
+ pthread_cond_t * add; /* SDU arrived */
+ pthread_cond_t * del; /* SDU removed */
pid_t api; /* api to which this rb belongs */
int fd;
};
@@ -125,10 +129,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
}
rb->shm_base = shm_base;
- rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
+ rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);
rb->ptr_tail = rb->ptr_head + 1;
rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1);
- rb->work = (pthread_cond_t *) (rb->lock + 1);
+ rb->add = (pthread_cond_t *) (rb->lock + 1);
+ rb->del = rb->add + 1;
pthread_mutexattr_init(&mattr);
pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
@@ -138,7 +143,8 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
pthread_condattr_init(&cattr);
pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
- pthread_cond_init(rb->work, &cattr);
+ pthread_cond_init(rb->add, &cattr);
+ pthread_cond_init(rb->del, &cattr);
*rb->ptr_head = 0;
*rb->ptr_tail = 0;
@@ -190,10 +196,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
}
rb->shm_base = shm_base;
- rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
+ rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);
rb->ptr_tail = rb->ptr_head + 1;
rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1);
- rb->work = (pthread_cond_t *) (rb->lock + 1);
+ rb->add = (pthread_cond_t *) (rb->lock + 1);
+ rb->del = rb->add + 1;
rb->fd = shm_fd;
rb->api = api;
@@ -243,7 +250,8 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
}
pthread_mutex_destroy(rb->lock);
- pthread_cond_destroy(rb->work);
+ pthread_cond_destroy(rb->add);
+ pthread_cond_destroy(rb->del);
if (close(rb->fd) < 0)
LOG_DBG("Couldn't close shared memory.");
@@ -275,10 +283,10 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
}
if (shm_rbuff_empty(rb))
- pthread_cond_broadcast(rb->work);
+ pthread_cond_broadcast(rb->add);
- *head_el_ptr = *e;
- *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1);
+ *head_el_ptr(rb) = *e;
+ *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_BUFFER_SIZE -1);
pthread_mutex_unlock(rb->lock);
@@ -307,13 +315,17 @@ int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb,
pthread_mutex_consistent(rb->lock);
}
+ clean_sdus(rb);
+
while (shm_rbuff_empty(rb)) {
if (timeout != NULL)
- ret = pthread_cond_timedwait(rb->work,
+ ret = pthread_cond_timedwait(rb->add,
rb->lock,
&abstime);
else
- ret = pthread_cond_wait(rb->work, rb->lock);
+ ret = pthread_cond_wait(rb->add, rb->lock);
+
+ clean_sdus(rb);
if (ret == EOWNERDEAD) {
LOG_DBG("Recovering dead mutex.");
@@ -348,11 +360,10 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
pthread_mutex_consistent(rb->lock);
}
- while (!shm_rbuff_empty(rb) && tail_el_ptr->port_id < 0)
- *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
+ clean_sdus(rb);
while (shm_rbuff_empty(rb))
- if (pthread_cond_wait(rb->work, rb->lock) == EOWNERDEAD) {
+ if (pthread_cond_wait(rb->add, rb->lock) == EOWNERDEAD) {
LOG_DBG("Recovering dead mutex.");
pthread_mutex_consistent(rb->lock);
}
@@ -365,7 +376,7 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
*e = *(rb->shm_base + *rb->ptr_tail);
- *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1);
pthread_cleanup_pop(true);
@@ -381,24 +392,75 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
pthread_mutex_consistent(rb->lock);
}
- if (shm_rbuff_empty(rb)) {
+ clean_sdus(rb);
+
+ if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) {
pthread_mutex_unlock(rb->lock);
return -1;
}
- while (tail_el_ptr->port_id < 0)
- *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
+ idx = tail_el_ptr(rb)->index;
- if (tail_el_ptr->port_id != port_id) {
- pthread_mutex_unlock(rb->lock);
- return -1;
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1);
+
+ pthread_cond_broadcast(rb->del);
+
+ pthread_mutex_unlock(rb->lock);
+
+ return idx;
+}
+
+ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
+ int port_id,
+ const struct timespec * timeout)
+{
+ struct timespec abstime;
+ int ret = 0;
+ ssize_t idx = -1;
+
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
}
- idx = tail_el_ptr->index;
+ if (timeout != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, timeout, &abstime);
+ }
- *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
+ clean_sdus(rb);
- pthread_mutex_unlock(rb->lock);
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) rb->lock);
+
+ while (tail_el_ptr(rb)->port_id != port_id) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(rb->del,
+ rb->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(rb->del, rb->lock);
+
+ clean_sdus(rb);
+
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+
+ if (ret == ETIMEDOUT) {
+ pthread_mutex_unlock(rb->lock);
+ return -ret;
+ }
+ }
+
+ idx = tail_el_ptr(rb)->index;
+
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1);
+
+ pthread_cond_broadcast(rb->del);
+
+ pthread_cleanup_pop(true);
return idx;
}