diff options
| -rw-r--r-- | include/ouroboros/local-dev.h | 7 | ||||
| -rw-r--r-- | include/ouroboros/shm_ap_rbuff.h | 2 | ||||
| -rw-r--r-- | src/ipcpd/local/main.c | 35 | ||||
| -rw-r--r-- | src/lib/dev.c | 32 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 31 | 
5 files changed, 53 insertions, 54 deletions
| diff --git a/include/ouroboros/local-dev.h b/include/ouroboros/local-dev.h index b4915672..77ff47e9 100644 --- a/include/ouroboros/local-dev.h +++ b/include/ouroboros/local-dev.h @@ -25,10 +25,9 @@  #ifndef OUROBOROS_LOCAL_DEV_H  #define OUROBOROS_LOCAL_DEV_H -/* returns flow descriptor and rb_entry, no access to du_buff */ -int     local_flow_read(struct rb_entry * e); +struct rb_entry * local_flow_read(int fd); -int     local_flow_write(int               fd, -                         struct rb_entry * e); +int               local_flow_write(int               fd, +                                   struct rb_entry * e);  #endif /* OUROBOROS_LOCAL_DEV_H */ diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h index 1e45ef7f..453e4bf8 100644 --- a/include/ouroboros/shm_ap_rbuff.h +++ b/include/ouroboros/shm_ap_rbuff.h @@ -47,7 +47,7 @@ void                  shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb);  void                  shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb,                                               int                   port_id); -void                  shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, +int                   shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb,                                                int                   port_id);  int                   shm_ap_rbuff_write(struct shm_ap_rbuff * rb, diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 7d23c08d..b8b3335c 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -24,6 +24,7 @@  #include "ipcp.h"  #include <ouroboros/errno.h>  #include <ouroboros/dev.h> +#include <ouroboros/select.h>  #include <ouroboros/ipcp-dev.h>  #include <ouroboros/local-dev.h>  #define OUROBOROS_PREFIX "ipcpd/local" @@ -66,8 +67,10 @@ void local_data_fini()  static void * ipcp_local_sdu_loop(void * o)  {          while (true) { -                struct rb_entry e; -                int fd = local_flow_read(&e); +                int fd; +                struct rb_entry * e; + +                fd = flow_select(NULL, NULL);                  pthread_rwlock_rdlock(&ipcpi.state_lock); @@ -77,13 +80,18 @@ static void * ipcp_local_sdu_loop(void * o)                  }                  pthread_rwlock_rdlock(&local_data.lock); + +                e = local_flow_read(fd); +                  fd = local_data.in_out[fd]; -                pthread_rwlock_unlock(&local_data.lock);                  if (fd != -1) -                        local_flow_write(fd, &e); +                        local_flow_write(fd, e); +                pthread_rwlock_unlock(&local_data.lock);                  pthread_rwlock_unlock(&ipcpi.state_lock); + +                free(e);          }          return (void *) 1; @@ -209,8 +217,6 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)          int out_fd = -1;          int ret = -1; -        LOG_DBG("Received response for fd %d: %d.", fd, response); -          if (response)                  return 0; @@ -235,25 +241,22 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)  static int ipcp_local_flow_dealloc(int fd)  { -        int out_fd = -1; +        struct timespec t = {0, 10000}; -        pthread_rwlock_rdlock(&ipcpi.state_lock); -        pthread_rwlock_wrlock(&local_data.lock); +        if (fd < 0) +                return -EINVAL; -        out_fd = local_data.in_out[fd]; +        while (flow_dealloc(fd) == -EBUSY) +                nanosleep(&t, NULL); -        if (out_fd != -1) { -                local_data.in_out[out_fd] = -1; -                flow_dealloc(out_fd); -        } +        pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_wrlock(&local_data.lock);          local_data.in_out[fd] = -1;          pthread_rwlock_unlock(&local_data.lock);          pthread_rwlock_unlock(&ipcpi.state_lock); -        flow_dealloc(fd); -          LOG_INFO("Flow with fd %d deallocated.", fd);          return 0; diff --git a/src/lib/dev.c b/src/lib/dev.c index 577fa7a7..a3082a7d 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -548,6 +548,12 @@ int flow_dealloc(int fd)                  return -ENOTALLOC;          } +        if (shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id) == -EBUSY) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -EBUSY; +        } +          msg.port_id = ai.flows[fd].port_id;          port_destroy(&ai.ports[msg.port_id]); @@ -563,8 +569,6 @@ int flow_dealloc(int fd)          bmp_release(ai.fds, fd); -        shm_ap_rbuff_close_port(ai.rb, msg.port_id); -          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); @@ -1091,26 +1095,32 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)          return 0;  } -int local_flow_read(struct rb_entry * e) +struct rb_entry * local_flow_read(int fd)  { -        int fd; - -        *e = *(shm_ap_rbuff_read(ai.rb)); +        int port_id; +        struct rb_entry * e = NULL;          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); -        fd = ai.ports[e->port_id].fd; +        port_id = ai.flows[fd].port_id;          pthread_rwlock_unlock(&ai.flows_lock);          pthread_rwlock_unlock(&ai.data_lock); -        return fd; +        if (port_id != -1) { +                e = malloc(sizeof(*e)); +                if (e == NULL) +                        return NULL; +                e->index = shm_ap_rbuff_read_port(ai.rb, port_id); +        } + +        return e;  }  int local_flow_write(int fd, struct rb_entry * e)  { -        if (e == NULL) +        if (e == NULL || fd < 0)                  return -EINVAL;          pthread_rwlock_rdlock(&ai.data_lock); @@ -1135,9 +1145,7 @@ int local_flow_write(int fd, struct rb_entry * e)  int ipcp_read_shim(struct shm_du_buff ** sdb)  {          int fd; -        struct rb_entry * e; - -        e = shm_ap_rbuff_read(ai.rb); +        struct rb_entry * e = shm_ap_rbuff_read(ai.rb);          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index ede0b7f7..5cbf5bd0 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -249,18 +249,14 @@ void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, int port_id)                  pthread_mutex_consistent(rb->lock);          }  #endif - -#ifdef OUROBOROS_CONFIG_DEBUG -        if (!rb->acl[port_id]) -                LOG_DBG("Trying to open open port."); -#endif          rb->acl[port_id] = 0; /* open */          pthread_mutex_unlock(rb->lock);  } -void shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id) +int shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id)  { +        int ret = 0;          assert(rb); @@ -272,13 +268,14 @@ void shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id)                  pthread_mutex_consistent(rb->lock);          }  #endif -#ifdef OUROBOROS_CONFIG_DEBUG -        if (rb->acl[port_id]) -                LOG_DBG("Trying to close closed port."); -#endif          rb->acl[port_id] = -1; +        if (rb->cntrs[port_id] > 0) +                ret = -EBUSY; +          pthread_mutex_unlock(rb->lock); + +        return ret;  }  void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) @@ -551,6 +548,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)  {          ssize_t idx = -1; +        assert(rb); +  #ifdef __APPLE__          pthread_mutex_lock(rb->lock);  #else @@ -559,11 +558,6 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)                  pthread_mutex_consistent(rb->lock);          }  #endif -        if (rb->acl[port_id]) { -                pthread_mutex_unlock(rb->lock); -                return -ENOTALLOC; -        } -          if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) {                  pthread_mutex_unlock(rb->lock);                  return -1; @@ -597,11 +591,6 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff *   rb,                  pthread_mutex_consistent(rb->lock);          }  #endif -        if (rb->acl[port_id]) { -                pthread_mutex_unlock(rb->lock); -                return -ENOTALLOC; -        } -          if (timeout != NULL) {                  idx = -ETIMEDOUT;                  clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -650,7 +639,7 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff *   rb,          if (ret != ETIMEDOUT) {                  idx = tail_el_ptr(rb)->index; -                  --rb->cntrs[port_id]; +                --rb->cntrs[port_id];                  *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1);                  pthread_cond_broadcast(rb->del); | 
