diff options
author | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-08-03 13:40:16 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-08-03 13:40:16 +0200 |
commit | 4931526cf9b5e40294e043deab856f25bf56c7cf (patch) | |
tree | fff3eeadb6eb04edee21340ecdcdfc13da3115b4 /src/lib/shm_ap_rbuff.c | |
parent | ca494922f3815077efbcd28da3748df38c8a6961 (diff) | |
download | ouroboros-4931526cf9b5e40294e043deab856f25bf56c7cf.tar.gz ouroboros-4931526cf9b5e40294e043deab856f25bf56c7cf.zip |
lib: Revise blocking I/O
Blocking I/O now uses condition variables in the shared memory instead
of busy waiting. Timeouts can be specified. This requires the size of
the rbuffs and du_map to be the same, to guarantee that when the
shm_du_map is not full, the ap_rbuffs can't be full either.
Added the timeout option to the flow for future use.
Diffstat (limited to 'src/lib/shm_ap_rbuff.c')
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 126 |
1 files changed, 94 insertions, 32 deletions
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index be4cd0c2..56555533 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -42,23 +42,27 @@ #define PTHREAD_COND_CLOCK CLOCK_MONOTONIC -#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry) \ +#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ - + sizeof (pthread_cond_t)) + + 2 * sizeof (pthread_cond_t)) -#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail) \ - & (SHM_RBUFF_SIZE - 1)) -#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE) +#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_BUFFER_SIZE - *rb->ptr_tail) \ + & (SHM_BUFFER_SIZE - 1)) +#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) #define shm_rbuff_empty(rb) (*rb->ptr_head == *rb->ptr_tail) -#define head_el_ptr (rb->shm_base + *rb->ptr_head) -#define tail_el_ptr (rb->shm_base + *rb->ptr_tail) +#define head_el_ptr(rb) (rb->shm_base + *rb->ptr_head) +#define tail_el_ptr(rb) (rb->shm_base + *rb->ptr_tail) +#define clean_sdus(rb) \ + while (!shm_rbuff_empty(rb) && tail_el_ptr(rb)->port_id < 0) \ + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); \ struct shm_ap_rbuff { struct rb_entry * shm_base; /* start of entry */ size_t * ptr_head; /* start of ringbuffer head */ size_t * ptr_tail; /* start of ringbuffer tail */ pthread_mutex_t * lock; /* lock all free space in shm */ - pthread_cond_t * work; /* threads will wait for a signal */ + pthread_cond_t * add; /* SDU arrived */ + pthread_cond_t * del; /* SDU removed */ pid_t api; /* api to which this rb belongs */ int fd; }; @@ -125,10 +129,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() } rb->shm_base = shm_base; - rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); + rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); rb->ptr_tail = rb->ptr_head + 1; rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1); - rb->work = (pthread_cond_t *) (rb->lock + 1); + rb->add = (pthread_cond_t *) (rb->lock + 1); + rb->del = rb->add + 1; pthread_mutexattr_init(&mattr); pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); @@ -138,7 +143,8 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() pthread_condattr_init(&cattr); pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); - pthread_cond_init(rb->work, &cattr); + pthread_cond_init(rb->add, &cattr); + pthread_cond_init(rb->del, &cattr); *rb->ptr_head = 0; *rb->ptr_tail = 0; @@ -190,10 +196,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) } rb->shm_base = shm_base; - rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); + rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); rb->ptr_tail = rb->ptr_head + 1; rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1); - rb->work = (pthread_cond_t *) (rb->lock + 1); + rb->add = (pthread_cond_t *) (rb->lock + 1); + rb->del = rb->add + 1; rb->fd = shm_fd; rb->api = api; @@ -243,7 +250,8 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) } pthread_mutex_destroy(rb->lock); - pthread_cond_destroy(rb->work); + pthread_cond_destroy(rb->add); + pthread_cond_destroy(rb->del); if (close(rb->fd) < 0) LOG_DBG("Couldn't close shared memory."); @@ -275,10 +283,10 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) } if (shm_rbuff_empty(rb)) - pthread_cond_broadcast(rb->work); + pthread_cond_broadcast(rb->add); - *head_el_ptr = *e; - *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1); + *head_el_ptr(rb) = *e; + *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_BUFFER_SIZE -1); pthread_mutex_unlock(rb->lock); @@ -307,13 +315,17 @@ int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb, pthread_mutex_consistent(rb->lock); } + clean_sdus(rb); + while (shm_rbuff_empty(rb)) { if (timeout != NULL) - ret = pthread_cond_timedwait(rb->work, + ret = pthread_cond_timedwait(rb->add, rb->lock, &abstime); else - ret = pthread_cond_wait(rb->work, rb->lock); + ret = pthread_cond_wait(rb->add, rb->lock); + + clean_sdus(rb); if (ret == EOWNERDEAD) { LOG_DBG("Recovering dead mutex."); @@ -348,11 +360,10 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) pthread_mutex_consistent(rb->lock); } - while (!shm_rbuff_empty(rb) && tail_el_ptr->port_id < 0) - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + clean_sdus(rb); while (shm_rbuff_empty(rb)) - if (pthread_cond_wait(rb->work, rb->lock) == EOWNERDEAD) { + if (pthread_cond_wait(rb->add, rb->lock) == EOWNERDEAD) { LOG_DBG("Recovering dead mutex."); pthread_mutex_consistent(rb->lock); } @@ -365,7 +376,7 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) *e = *(rb->shm_base + *rb->ptr_tail); - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); pthread_cleanup_pop(true); @@ -381,24 +392,75 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) pthread_mutex_consistent(rb->lock); } - if (shm_rbuff_empty(rb)) { + clean_sdus(rb); + + if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) { pthread_mutex_unlock(rb->lock); return -1; } - while (tail_el_ptr->port_id < 0) - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + idx = tail_el_ptr(rb)->index; - if (tail_el_ptr->port_id != port_id) { - pthread_mutex_unlock(rb->lock); - return -1; + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); + + pthread_cond_broadcast(rb->del); + + pthread_mutex_unlock(rb->lock); + + return idx; +} + +ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, + int port_id, + const struct timespec * timeout) +{ + struct timespec abstime; + int ret = 0; + ssize_t idx = -1; + + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); } - idx = tail_el_ptr->index; + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } - *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + clean_sdus(rb); - pthread_mutex_unlock(rb->lock); + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rb->lock); + + while (tail_el_ptr(rb)->port_id != port_id) { + if (timeout != NULL) + ret = pthread_cond_timedwait(rb->del, + rb->lock, + &abstime); + else + ret = pthread_cond_wait(rb->del, rb->lock); + + clean_sdus(rb); + + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } + + if (ret == ETIMEDOUT) { + pthread_mutex_unlock(rb->lock); + return -ret; + } + } + + idx = tail_el_ptr(rb)->index; + + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1); + + pthread_cond_broadcast(rb->del); + + pthread_cleanup_pop(true); return idx; } |