diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dev.c | 29 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 44 | 
2 files changed, 42 insertions, 31 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index d332a27f..1332b014 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -300,6 +300,7 @@ int flow_accept(int     fd,          rw_lock_rdlock(&_ap_instance->data_lock);          rw_lock_wrlock(&_ap_instance->flows_lock); +          cfd = bmp_allocate(_ap_instance->fds);          if (!bmp_is_id_valid(_ap_instance->fds, cfd)) {                  rw_lock_unlock(&_ap_instance->flows_lock); @@ -588,37 +589,39 @@ ssize_t flow_write(int fd, void * buf, size_t count)  ssize_t flow_read(int fd, void * buf, size_t count)  { -        struct rb_entry * e = NULL; +        int idx = -1;          int n;          uint8_t * sdu;          rw_lock_rdlock(&_ap_instance->data_lock);          rw_lock_rdlock(&_ap_instance->flows_lock); +        if (_ap_instance->flows[fd].port_id < 0) { +                rw_lock_unlock(&_ap_instance->flows_lock); +                rw_lock_unlock(&_ap_instance->data_lock); +                return -1; +        } +          if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { -                if (shm_ap_rbuff_peek(_ap_instance->rb) -                    != _ap_instance->flows[fd].port_id) { -                        rw_lock_unlock(&_ap_instance->flows_lock); -                        rw_lock_unlock(&_ap_instance->data_lock); -                        return -1; -                } +                idx = shm_ap_rbuff_read_port(_ap_instance->rb, +                                           _ap_instance->flows[fd].port_id);          } else { /* block */ -                while (shm_ap_rbuff_peek(_ap_instance->rb) -                       != _ap_instance->flows[fd].port_id) +                while ((idx = shm_ap_rbuff_read_port( +                                _ap_instance->rb, +                                _ap_instance->flows[fd].port_id)) < 0)                          ;          }          rw_lock_unlock(&_ap_instance->flows_lock); -        e = shm_ap_rbuff_read(_ap_instance->rb); -        if (e == NULL) { +        if (idx < 0) {                  rw_lock_unlock(&_ap_instance->data_lock);                  return -1;          }          n = shm_du_map_read_sdu(&sdu,                                  _ap_instance->dum, -                                e->index); +                                idx);          if (n < 0) {                  rw_lock_unlock(&_ap_instance->data_lock);                  return -1; @@ -626,7 +629,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)          memcpy(buf, sdu, MIN(n, count)); -        shm_release_du_buff(_ap_instance->dum, e->index); +        shm_release_du_buff(_ap_instance->dum, idx);          rw_lock_unlock(&_ap_instance->data_lock); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 18fedc88..1cfafeda 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -41,6 +41,8 @@  #define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail)    \                            & (SHM_RBUFF_SIZE - 1))  #define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE) +#define head_el_ptr (rb->shm_base + *rb->ptr_head) +#define tail_el_ptr (rb->shm_base + *rb->ptr_tail)  struct shm_ap_rbuff {          struct rb_entry * shm_base;    /* start of entry */ @@ -229,7 +231,7 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)                  return -1;          } -        *(rb->shm_base + *rb->ptr_head) = *e; +        *head_el_ptr = *e;          *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1);          pthread_mutex_unlock(rb->shm_mutex); @@ -237,50 +239,56 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)          return 0;  } - -int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb) +struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)  { -        int port_id = -1; +        struct rb_entry * e = NULL; + +        if (rb == NULL) +                return NULL;          pthread_mutex_lock(rb->shm_mutex);          if (shm_rbuff_used(rb) == 0) {                  pthread_mutex_unlock(rb->shm_mutex); -                return -7; /* -EAGAIN */ +                return NULL;          } -        port_id = (rb->shm_base + *rb->ptr_tail)->port_id; +        e = malloc(sizeof(*e)); +        if (e == NULL) { +                pthread_mutex_unlock(rb->shm_mutex); +                return NULL; +        } + +        *e = *(rb->shm_base + *rb->ptr_tail); + +        *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);          pthread_mutex_unlock(rb->shm_mutex); -        return port_id; +        return e;  } -struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) +ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)  { -        struct rb_entry * e = NULL; - -        if (rb == NULL) -                return NULL; +        ssize_t idx = -1;          pthread_mutex_lock(rb->shm_mutex);          if (shm_rbuff_used(rb) == 0) {                  pthread_mutex_unlock(rb->shm_mutex); -                return NULL; +                return -1;          } -        e = malloc(sizeof(*e)); -        if (e == NULL) { +        if (tail_el_ptr->port_id != port_id) {                  pthread_mutex_unlock(rb->shm_mutex); -                return NULL; +                return -1;          } -        *e = *(rb->shm_base + *rb->ptr_tail); +        idx = tail_el_ptr->index;          *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);          pthread_mutex_unlock(rb->shm_mutex); -        return e; +        return idx;  } | 
