diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dev.c | 100 | 
1 files changed, 55 insertions, 45 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index 306fd008..aa9d8bc5 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -63,6 +63,36 @@ struct port {          pthread_cond_t  state_cond;  }; +struct flow { +        struct shm_rbuff *    rx_rb; +        struct shm_rbuff *    tx_rb; +        struct shm_flow_set * set; +        int                   port_id; +        int                   oflags; +        qoscube_t             cube; + +        pid_t                 api; + +        bool                  timesout; +        struct timespec       rcv_timeo; +}; + +struct { +        char *                ap_name; +        char *                daf_name; +        pid_t                 api; + +        struct shm_rdrbuff *  rdrb; +        struct shm_flow_set * fqset; + +        struct bmp *          fds; +        struct bmp *          fqueues; +        struct flow *         flows; +        struct port *         ports; + +        pthread_rwlock_t      flows_lock; +} ai; +  static void port_destroy(struct port * p)  {          pthread_mutex_lock(&p->state_lock); @@ -104,9 +134,16 @@ static void port_set_state(struct port *   p,          pthread_mutex_unlock(&p->state_lock);  } -static enum port_state port_wait_assign(struct port * p) +static enum port_state port_wait_assign(int port_id)  {          enum port_state state; +        struct port *   p; + +        pthread_rwlock_rdlock(&ai.flows_lock); + +        p = &ai.ports[port_id]; + +        pthread_rwlock_unlock(&ai.flows_lock);          pthread_mutex_lock(&p->state_lock); @@ -115,7 +152,7 @@ static enum port_state port_wait_assign(struct port * p)                  return PORT_ID_ASSIGNED;          } -        if(p->state == PORT_INIT) +        if (p->state == PORT_INIT)                  p->state = PORT_ID_PENDING;          while (p->state == PORT_ID_PENDING) @@ -135,36 +172,6 @@ static enum port_state port_wait_assign(struct port * p)          return state;  } -struct flow { -        struct shm_rbuff *    rx_rb; -        struct shm_rbuff *    tx_rb; -        struct shm_flow_set * set; -        int                   port_id; -        int                   oflags; -        qoscube_t             cube; - -        pid_t                 api; - -        bool                  timesout; -        struct timespec       rcv_timeo; -}; - -struct { -        char *                ap_name; -        char *                daf_name; -        pid_t                 api; - -        struct shm_rdrbuff *  rdrb; -        struct shm_flow_set * fqset; - -        struct bmp *          fds; -        struct bmp *          fqueues; -        struct flow *         flows; -        struct port *         ports; - -        pthread_rwlock_t      flows_lock; -} ai; -  /* FIXME: translate real spec to cube */  static qoscube_t spec_to_cube(qosspec_t * qs)  { @@ -249,7 +256,7 @@ int ouroboros_init(const char * ap_name)          ai.api = getpid();          ai.daf_name = NULL; -        ai.fds = bmp_create(AP_MAX_FLOWS, AP_RES_FDS + 1); +        ai.fds = bmp_create(AP_MAX_FLOWS - AP_RES_FDS, AP_RES_FDS);          if (ai.fds == NULL)                  return -ENOMEM; @@ -351,7 +358,6 @@ void ouroboros_fini()                          ssize_t idx;                          while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)                                  shm_rdrbuff_remove(ai.rdrb, idx); -                        port_destroy(&ai.ports[ai.flows[i].port_id]);                          reset_flow(i);                  }          } @@ -380,6 +386,7 @@ int flow_accept(qosspec_t *             qs,          msg.code    = IRM_MSG_CODE__IRM_FLOW_ACCEPT;          msg.has_api = true; +        msg.api     = ai.api;          if (timeo != NULL) {                  msg.has_timeo_sec = true; @@ -388,8 +395,6 @@ int flow_accept(qosspec_t *             qs,                  msg.timeo_nsec = timeo->tv_nsec;          } -        msg.api     = ai.api; -          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -EIRMD; @@ -400,7 +405,7 @@ int flow_accept(qosspec_t *             qs,          }          if (recv_msg->result !=  0) { -                int res =  recv_msg->result; +                int res = recv_msg->result;                  irm_msg__free_unpacked(recv_msg, NULL);                  return res;          } @@ -488,7 +493,6 @@ int flow_alloc(const char *            dst_name,                  msg.timeo_nsec = timeo->tv_nsec;          } -          recv_msg = send_recv_irm_msg(&msg);          if (recv_msg == NULL)                  return -EIRMD; @@ -574,13 +578,9 @@ int flow_dealloc(int fd)          msg.has_api      = true;          msg.api          = ai.api; -        pthread_rwlock_wrlock(&ai.flows_lock); +        pthread_rwlock_rdlock(&ai.flows_lock); -        if (ai.flows[fd].port_id < 0) { -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.flows_lock); -                return 0; -        } +        assert (!(ai.flows[fd].port_id < 0));          msg.port_id = ai.flows[fd].port_id; @@ -1032,6 +1032,7 @@ int np1_flow_alloc(pid_t n_api,          ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);          if (ai.flows[fd].rx_rb == NULL) {                  reset_flow(fd); +                bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  return -1;          } @@ -1039,6 +1040,7 @@ int np1_flow_alloc(pid_t n_api,          ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id);          if (ai.flows[fd].tx_rb == NULL) {                  reset_flow(fd); +                bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  return -1;          } @@ -1046,6 +1048,7 @@ int np1_flow_alloc(pid_t n_api,          ai.flows[fd].set = shm_flow_set_open(n_api);          if (ai.flows[fd].set == NULL) {                  reset_flow(fd); +                bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  return -1;          } @@ -1079,7 +1082,7 @@ int np1_flow_resp(int port_id)  {          int fd; -        if (port_wait_assign(&ai.ports[port_id]) != PORT_ID_ASSIGNED) +        if (port_wait_assign(port_id) != PORT_ID_ASSIGNED)                  return -1;          pthread_rwlock_rdlock(&ai.flows_lock); @@ -1157,12 +1160,14 @@ int ipcp_flow_req_arr(pid_t     api,          if (recv_msg == NULL) {                  ai.ports[fd].state = PORT_INIT; +                bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  return -EIRMD;          }          if (!recv_msg->has_port_id || !recv_msg->has_api) {                  ai.ports[fd].state = PORT_INIT; +                bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1; @@ -1170,6 +1175,7 @@ int ipcp_flow_req_arr(pid_t     api,          if (recv_msg->has_result && recv_msg->result) {                  ai.ports[fd].state = PORT_INIT; +                bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1; @@ -1178,6 +1184,7 @@ int ipcp_flow_req_arr(pid_t     api,          port_id = recv_msg->port_id;          if (port_id < 0) {                  ai.ports[fd].state = PORT_INIT; +                bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1; @@ -1186,6 +1193,7 @@ int ipcp_flow_req_arr(pid_t     api,          ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);          if (ai.flows[fd].rx_rb == NULL) {                  reset_flow(fd); +                bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1; @@ -1194,6 +1202,7 @@ int ipcp_flow_req_arr(pid_t     api,          ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, port_id);          if (ai.flows[fd].tx_rb == NULL) {                  reset_flow(fd); +                bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1; @@ -1202,6 +1211,7 @@ int ipcp_flow_req_arr(pid_t     api,          ai.flows[fd].set = shm_flow_set_open(recv_msg->api);          if (ai.flows[fd].set == NULL) {                  reset_flow(fd); +                bmp_release(ai.fds, fd);                  pthread_rwlock_unlock(&ai.flows_lock);                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1; | 
