diff options
| author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-05-29 10:52:56 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-05-29 10:52:56 +0200 | 
| commit | 81cdb27742f9ae93169b0b03d1a45090c4354f89 (patch) | |
| tree | 27d596fc4403363cfda9af734fc54edfad5258bd /src/lib | |
| parent | 06549f99edb0e25575487c3f76a6c76efa6e4017 (diff) | |
| parent | 466fa24d57059aa87f432811eedae7b2a61e97d5 (diff) | |
| download | ouroboros-81cdb27742f9ae93169b0b03d1a45090c4354f89.tar.gz ouroboros-81cdb27742f9ae93169b0b03d1a45090c4354f89.zip | |
Merged in dstaesse/ouroboros/be-rbuff-sleep (pull request #120)
lib: shm_ap_rbuff: sleep when read on empty rbuff
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 34 | 
1 files changed, 23 insertions, 11 deletions
| diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 1cfafeda..a855ed8f 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -36,11 +36,13 @@  #include <errno.h>  #define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry)          \ -                             + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)) +                             + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)    \ +                             + sizeof (pthread_cond_t))  #define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail)    \                            & (SHM_RBUFF_SIZE - 1))  #define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE) +#define shm_rbuff_empty(rb) (*rb->ptr_head == *rb->ptr_tail)  #define head_el_ptr (rb->shm_base + *rb->ptr_head)  #define tail_el_ptr (rb->shm_base + *rb->ptr_tail) @@ -49,6 +51,7 @@ struct shm_ap_rbuff {          size_t *          ptr_head;    /* start of ringbuffer head */          size_t *          ptr_tail;    /* start of ringbuffer tail */          pthread_mutex_t * shm_mutex;   /* lock all free space in shm */ +        pthread_cond_t *  work;        /* threads will wait for a signal */          pid_t             pid;         /* pid to which this rb belongs */          int               fd;  }; @@ -58,7 +61,8 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()          struct shm_ap_rbuff * rb;          int                   shm_fd;          struct rb_entry *     shm_base; -        pthread_mutexattr_t   attr; +        pthread_mutexattr_t   mattr; +        pthread_condattr_t    cattr;          char                  fn[25];          sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); @@ -111,10 +115,15 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()          rb->ptr_head  = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);          rb->ptr_tail  = rb->ptr_head + 1;          rb->shm_mutex = (pthread_mutex_t *) (rb->ptr_tail + 1); +        rb->work      = (pthread_cond_t *) (rb->shm_mutex + 1); -        pthread_mutexattr_init(&attr); -        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); -        pthread_mutex_init(rb->shm_mutex, &attr); +        pthread_mutexattr_init(&mattr); +        pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); +        pthread_mutex_init(rb->shm_mutex, &mattr); + +        pthread_condattr_init(&cattr); +        pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +        pthread_cond_init(rb->work, &cattr);          *rb->ptr_head = 0;          *rb->ptr_tail = 0; @@ -169,6 +178,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t pid)          rb->ptr_head  = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);          rb->ptr_tail  = rb->ptr_head + 1;          rb->shm_mutex = (pthread_mutex_t *) (rb->ptr_tail + 1); +        rb->work      = (pthread_cond_t *) (rb->shm_mutex + 1);          rb->fd = shm_fd;          rb->pid = pid; @@ -230,6 +240,8 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)                  pthread_mutex_unlock(rb->shm_mutex);                  return -1;          } +        if (shm_rbuff_empty(rb)) +                pthread_cond_broadcast(rb->work);          *head_el_ptr = *e;          *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1); @@ -246,12 +258,12 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)          if (rb == NULL)                  return NULL; +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) rb->shm_mutex);          pthread_mutex_lock(rb->shm_mutex); -        if (shm_rbuff_used(rb) == 0) { -                pthread_mutex_unlock(rb->shm_mutex); -                return NULL; -        } +        while(shm_rbuff_empty(rb)) +                pthread_cond_wait(rb->work, rb->shm_mutex);          e = malloc(sizeof(*e));          if (e == NULL) { @@ -263,7 +275,7 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)          *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); -        pthread_mutex_unlock(rb->shm_mutex); +        pthread_cleanup_pop(1);          return e;  } @@ -274,7 +286,7 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)          pthread_mutex_lock(rb->shm_mutex); -        if (shm_rbuff_used(rb) == 0) { +        if (shm_rbuff_empty(rb)) {                  pthread_mutex_unlock(rb->shm_mutex);                  return -1;          } | 
