summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2016-05-29 10:52:56 +0200
committerSander Vrijders <sander.vrijders@intec.ugent.be>2016-05-29 10:52:56 +0200
commit81cdb27742f9ae93169b0b03d1a45090c4354f89 (patch)
tree27d596fc4403363cfda9af734fc54edfad5258bd
parent06549f99edb0e25575487c3f76a6c76efa6e4017 (diff)
parent466fa24d57059aa87f432811eedae7b2a61e97d5 (diff)
downloadouroboros-81cdb27742f9ae93169b0b03d1a45090c4354f89.tar.gz
ouroboros-81cdb27742f9ae93169b0b03d1a45090c4354f89.zip
Merged in dstaesse/ouroboros/be-rbuff-sleep (pull request #120)
lib: shm_ap_rbuff: sleep when read on empty rbuff
-rw-r--r--src/ipcpd/local/main.c11
-rw-r--r--src/ipcpd/shim-udp/main.c12
-rw-r--r--src/lib/shm_ap_rbuff.c34
3 files changed, 33 insertions, 24 deletions
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index b07b0a52..4a0ad683 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -201,6 +201,11 @@ static void * ipcp_local_sdu_loop(void * o)
struct rb_entry * e;
int fd;
+ e = shm_ap_rbuff_read(_ap_instance->rb);
+ if (e == NULL) {
+ continue;
+ }
+
rw_lock_rdlock(&_ipcp->state_lock);
if (_ipcp->state != IPCP_ENROLLED) {
@@ -208,12 +213,6 @@ static void * ipcp_local_sdu_loop(void * o)
return (void *) 1; /* -ENOTENROLLED */
}
- e = shm_ap_rbuff_read(_ap_instance->rb);
- if (e == NULL) {
- rw_lock_unlock(&_ipcp->state_lock);
- continue;
- }
-
rw_lock_rdlock(&_ap_instance->flows_lock);
fd = _ap_instance->in_out[port_id_to_fd(e->port_id)];
if (fd == -1) {
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index c6c16ebf..48fa141e 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -775,6 +775,11 @@ static void * ipcp_udp_sdu_loop(void * o)
int len = 0;
char * buf;
+ e = shm_ap_rbuff_read(_ap_instance->rb);
+ if (e == NULL) {
+ continue;
+ }
+
rw_lock_rdlock(&_ipcp->state_lock);
if (_ipcp->state != IPCP_ENROLLED) {
@@ -782,13 +787,6 @@ static void * ipcp_udp_sdu_loop(void * o)
return (void *) 1; /* -ENOTENROLLED */
}
- e = shm_ap_rbuff_read(_ap_instance->rb);
-
- if (e == NULL) {
- rw_lock_unlock(&_ipcp->state_lock);
- continue;
- }
-
len = shm_du_map_read_sdu((uint8_t **) &buf,
_ap_instance->dum,
e->index);
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 1cfafeda..a855ed8f 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -36,11 +36,13 @@
#include <errno.h>
#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry) \
- + 2 * sizeof(size_t) + sizeof(pthread_mutex_t))
+ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \
+ + sizeof (pthread_cond_t))
#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail) \
& (SHM_RBUFF_SIZE - 1))
#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE)
+#define shm_rbuff_empty(rb) (*rb->ptr_head == *rb->ptr_tail)
#define head_el_ptr (rb->shm_base + *rb->ptr_head)
#define tail_el_ptr (rb->shm_base + *rb->ptr_tail)
@@ -49,6 +51,7 @@ struct shm_ap_rbuff {
size_t * ptr_head; /* start of ringbuffer head */
size_t * ptr_tail; /* start of ringbuffer tail */
pthread_mutex_t * shm_mutex; /* lock all free space in shm */
+ pthread_cond_t * work; /* threads will wait for a signal */
pid_t pid; /* pid to which this rb belongs */
int fd;
};
@@ -58,7 +61,8 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
struct shm_ap_rbuff * rb;
int shm_fd;
struct rb_entry * shm_base;
- pthread_mutexattr_t attr;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
char fn[25];
sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid());
@@ -111,10 +115,15 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
rb->ptr_tail = rb->ptr_head + 1;
rb->shm_mutex = (pthread_mutex_t *) (rb->ptr_tail + 1);
+ rb->work = (pthread_cond_t *) (rb->shm_mutex + 1);
- pthread_mutexattr_init(&attr);
- pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
- pthread_mutex_init(rb->shm_mutex, &attr);
+ pthread_mutexattr_init(&mattr);
+ pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+ pthread_mutex_init(rb->shm_mutex, &mattr);
+
+ pthread_condattr_init(&cattr);
+ pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
+ pthread_cond_init(rb->work, &cattr);
*rb->ptr_head = 0;
*rb->ptr_tail = 0;
@@ -169,6 +178,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t pid)
rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
rb->ptr_tail = rb->ptr_head + 1;
rb->shm_mutex = (pthread_mutex_t *) (rb->ptr_tail + 1);
+ rb->work = (pthread_cond_t *) (rb->shm_mutex + 1);
rb->fd = shm_fd;
rb->pid = pid;
@@ -230,6 +240,8 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
pthread_mutex_unlock(rb->shm_mutex);
return -1;
}
+ if (shm_rbuff_empty(rb))
+ pthread_cond_broadcast(rb->work);
*head_el_ptr = *e;
*rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1);
@@ -246,12 +258,12 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
if (rb == NULL)
return NULL;
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) rb->shm_mutex);
pthread_mutex_lock(rb->shm_mutex);
- if (shm_rbuff_used(rb) == 0) {
- pthread_mutex_unlock(rb->shm_mutex);
- return NULL;
- }
+ while(shm_rbuff_empty(rb))
+ pthread_cond_wait(rb->work, rb->shm_mutex);
e = malloc(sizeof(*e));
if (e == NULL) {
@@ -263,7 +275,7 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
*rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
- pthread_mutex_unlock(rb->shm_mutex);
+ pthread_cleanup_pop(1);
return e;
}
@@ -274,7 +286,7 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
pthread_mutex_lock(rb->shm_mutex);
- if (shm_rbuff_used(rb) == 0) {
+ if (shm_rbuff_empty(rb)) {
pthread_mutex_unlock(rb->shm_mutex);
return -1;
}