From d32d7eae209f3ee09a690cc9adb6ea277e0d17aa Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 20 May 2016 17:22:04 +0200 Subject: lib: allow parallel connections dev.c: read now only reads an SDU if is is for the correct port_id shm_ap_rbuff: added a function peek() that returns the port_id of the tail. --- include/ouroboros/shm_ap_rbuff.h | 1 + src/lib/dev.c | 17 ++++++++++------- src/lib/shm_ap_rbuff.c | 24 ++++++++++++++++++++---- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h index 956a9540..0ececf88 100644 --- a/include/ouroboros/shm_ap_rbuff.h +++ b/include/ouroboros/shm_ap_rbuff.h @@ -47,6 +47,7 @@ void shm_ap_rbuff_close(struct shm_ap_rbuff * rb); void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb); int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e); +int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb); struct rb_entry * shm_ap_rbuff_read(); #endif /* OUROBOROS_SHM_AP_RBUFF_H */ diff --git a/src/lib/dev.c b/src/lib/dev.c index c365a17b..fab37bbf 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -588,18 +588,21 @@ ssize_t flow_read(int fd, void * buf, size_t count) rw_lock_rdlock(&_ap_instance->flows_lock); if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { - e = shm_ap_rbuff_read(_ap_instance->rb); - } else { - - /* FIXME: this will throw away packets for other fd's */ - while (e == NULL || - e->port_id != _ap_instance->flows[fd].port_id) { - e = shm_ap_rbuff_read(_ap_instance->rb); + if (shm_ap_rbuff_peek(_ap_instance->rb) + != _ap_instance->flows[fd].port_id) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ap_instance->data_lock); + return -1; } + } else { /* block */ + while (shm_ap_rbuff_peek(_ap_instance->rb) + != _ap_instance->flows[fd].port_id) + ; } rw_lock_unlock(&_ap_instance->flows_lock); + e = shm_ap_rbuff_read(_ap_instance->rb); if (e == NULL) { rw_lock_unlock(&_ap_instance->data_lock); return -1; diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 71a7e733..18fedc88 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -219,8 +219,6 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) { - struct rb_entry * pos; - if (rb == NULL || e == NULL) return -1; @@ -231,8 +229,7 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) return -1; } - pos = rb->shm_base + *rb->ptr_head; - *pos = *e; + *(rb->shm_base + *rb->ptr_head) = *e; *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1); pthread_mutex_unlock(rb->shm_mutex); @@ -240,6 +237,25 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) return 0; } + +int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb) +{ + int port_id = -1; + + pthread_mutex_lock(rb->shm_mutex); + + if (shm_rbuff_used(rb) == 0) { + pthread_mutex_unlock(rb->shm_mutex); + return -7; /* -EAGAIN */ + } + + port_id = (rb->shm_base + *rb->ptr_tail)->port_id; + + pthread_mutex_unlock(rb->shm_mutex); + + return port_id; +} + struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) { struct rb_entry * e = NULL; -- cgit v1.2.3