From e7aa1ba135d358a0c03c9bcb7157c86a6d9e95c5 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sat, 28 May 2016 22:13:24 +0200 Subject: lib: shm_ap_rbuff: sleep when read on empty rbuff When the ring buffer is empty, a read call will sleep. A write call on an empty ring buffer will wake up sleeping readers. --- src/ipcpd/shim-udp/main.c | 1 - src/lib/shm_ap_rbuff.c | 32 +++++++++++++++++++++++--------- 2 files changed, 23 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index c6c16ebf..33b4be2f 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -783,7 +783,6 @@ static void * ipcp_udp_sdu_loop(void * o) } e = shm_ap_rbuff_read(_ap_instance->rb); - if (e == NULL) { rw_lock_unlock(&_ipcp->state_lock); continue; diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 1cfafeda..63aa8813 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -36,7 +36,8 @@ #include #define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry) \ - + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)) + + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ + + sizeof (pthread_cond_t)) #define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail) \ & (SHM_RBUFF_SIZE - 1)) @@ -49,6 +50,7 @@ struct shm_ap_rbuff { size_t * ptr_head; /* start of ringbuffer head */ size_t * ptr_tail; /* start of ringbuffer tail */ pthread_mutex_t * shm_mutex; /* lock all free space in shm */ + pthread_cond_t * work; /* threads will wait for a signal */ pid_t pid; /* pid to which this rb belongs */ int fd; }; @@ -58,7 +60,8 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() struct shm_ap_rbuff * rb; int shm_fd; struct rb_entry * shm_base; - pthread_mutexattr_t attr; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; char fn[25]; sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); @@ -111,10 +114,15 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); rb->ptr_tail = rb->ptr_head + 1; rb->shm_mutex = (pthread_mutex_t *) (rb->ptr_tail + 1); + rb->work = (pthread_cond_t *) (rb->shm_mutex + 1); - pthread_mutexattr_init(&attr); - pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(rb->shm_mutex, &attr); + pthread_mutexattr_init(&mattr); + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(rb->shm_mutex, &mattr); + + pthread_condattr_init(&cattr); + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); + pthread_cond_init(rb->work, &cattr); *rb->ptr_head = 0; *rb->ptr_tail = 0; @@ -169,6 +177,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t pid) rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); rb->ptr_tail = rb->ptr_head + 1; rb->shm_mutex = (pthread_mutex_t *) (rb->ptr_tail + 1); + rb->work = (pthread_cond_t *) (rb->shm_mutex + 1); rb->fd = shm_fd; rb->pid = pid; @@ -231,6 +240,9 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) return -1; } + if (shm_rbuff_used(rb) == 0) + pthread_cond_broadcast(rb->work); + *head_el_ptr = *e; *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1); @@ -246,12 +258,14 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) if (rb == NULL) return NULL; + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void*) rb->shm_mutex); + pthread_mutex_lock(rb->shm_mutex); + while(shm_rbuff_used(rb) == 0) + pthread_cond_wait(rb->work, rb->shm_mutex); - if (shm_rbuff_used(rb) == 0) { - pthread_mutex_unlock(rb->shm_mutex); - return NULL; - } + pthread_cleanup_pop(0); e = malloc(sizeof(*e)); if (e == NULL) { -- cgit v1.2.3 From ee1974debcfba8d8b10ea9a60437d04502b965dc Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sun, 29 May 2016 09:39:47 +0200 Subject: lib: small tweaks for shm_ap_rbuff --- src/lib/shm_ap_rbuff.c | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 63aa8813..666e053f 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -42,6 +42,7 @@ #define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail) \ & (SHM_RBUFF_SIZE - 1)) #define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE) +#define shm_rbuff_empty(rb) (*rb->ptr_head == *rb->ptr_tail) #define head_el_ptr (rb->shm_base + *rb->ptr_head) #define tail_el_ptr (rb->shm_base + *rb->ptr_tail) @@ -239,8 +240,7 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) pthread_mutex_unlock(rb->shm_mutex); return -1; } - - if (shm_rbuff_used(rb) == 0) + if (shm_rbuff_empty(rb)) pthread_cond_broadcast(rb->work); *head_el_ptr = *e; @@ -262,11 +262,9 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) (void*) rb->shm_mutex); pthread_mutex_lock(rb->shm_mutex); - while(shm_rbuff_used(rb) == 0) + while(shm_rbuff_empty(rb)) pthread_cond_wait(rb->work, rb->shm_mutex); - pthread_cleanup_pop(0); - e = malloc(sizeof(*e)); if (e == NULL) { pthread_mutex_unlock(rb->shm_mutex); @@ -277,7 +275,7 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); - pthread_mutex_unlock(rb->shm_mutex); + pthread_cleanup_pop(1); return e; } @@ -288,7 +286,7 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) pthread_mutex_lock(rb->shm_mutex); - if (shm_rbuff_used(rb) == 0) { + if (shm_rbuff_empty(rb)) { pthread_mutex_unlock(rb->shm_mutex); return -1; } -- cgit v1.2.3 From 462a3fb9b915dedf8b061b68a4db93f6e22ab29d Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sun, 29 May 2016 10:15:22 +0200 Subject: ipcpd: fixes shutdown lockup in local and shim-udp --- src/ipcpd/local/main.c | 11 +++++------ src/ipcpd/shim-udp/main.c | 11 +++++------ src/lib/shm_ap_rbuff.c | 4 ++-- 3 files changed, 12 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index b07b0a52..4a0ad683 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -201,6 +201,11 @@ static void * ipcp_local_sdu_loop(void * o) struct rb_entry * e; int fd; + e = shm_ap_rbuff_read(_ap_instance->rb); + if (e == NULL) { + continue; + } + rw_lock_rdlock(&_ipcp->state_lock); if (_ipcp->state != IPCP_ENROLLED) { @@ -208,12 +213,6 @@ static void * ipcp_local_sdu_loop(void * o) return (void *) 1; /* -ENOTENROLLED */ } - e = shm_ap_rbuff_read(_ap_instance->rb); - if (e == NULL) { - rw_lock_unlock(&_ipcp->state_lock); - continue; - } - rw_lock_rdlock(&_ap_instance->flows_lock); fd = _ap_instance->in_out[port_id_to_fd(e->port_id)]; if (fd == -1) { diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 33b4be2f..48fa141e 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -775,6 +775,11 @@ static void * ipcp_udp_sdu_loop(void * o) int len = 0; char * buf; + e = shm_ap_rbuff_read(_ap_instance->rb); + if (e == NULL) { + continue; + } + rw_lock_rdlock(&_ipcp->state_lock); if (_ipcp->state != IPCP_ENROLLED) { @@ -782,12 +787,6 @@ static void * ipcp_udp_sdu_loop(void * o) return (void *) 1; /* -ENOTENROLLED */ } - e = shm_ap_rbuff_read(_ap_instance->rb); - if (e == NULL) { - rw_lock_unlock(&_ipcp->state_lock); - continue; - } - len = shm_du_map_read_sdu((uint8_t **) &buf, _ap_instance->dum, e->index); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 666e053f..a855ed8f 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -259,9 +259,9 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) return NULL; pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void*) rb->shm_mutex); - + (void *) rb->shm_mutex); pthread_mutex_lock(rb->shm_mutex); + while(shm_rbuff_empty(rb)) pthread_cond_wait(rb->work, rb->shm_mutex); -- cgit v1.2.3