diff options
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/dev.c | 100 |
1 files changed, 55 insertions, 45 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 306fd008..aa9d8bc5 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -63,6 +63,36 @@ struct port { pthread_cond_t state_cond; }; +struct flow { + struct shm_rbuff * rx_rb; + struct shm_rbuff * tx_rb; + struct shm_flow_set * set; + int port_id; + int oflags; + qoscube_t cube; + + pid_t api; + + bool timesout; + struct timespec rcv_timeo; +}; + +struct { + char * ap_name; + char * daf_name; + pid_t api; + + struct shm_rdrbuff * rdrb; + struct shm_flow_set * fqset; + + struct bmp * fds; + struct bmp * fqueues; + struct flow * flows; + struct port * ports; + + pthread_rwlock_t flows_lock; +} ai; + static void port_destroy(struct port * p) { pthread_mutex_lock(&p->state_lock); @@ -104,9 +134,16 @@ static void port_set_state(struct port * p, pthread_mutex_unlock(&p->state_lock); } -static enum port_state port_wait_assign(struct port * p) +static enum port_state port_wait_assign(int port_id) { enum port_state state; + struct port * p; + + pthread_rwlock_rdlock(&ai.flows_lock); + + p = &ai.ports[port_id]; + + pthread_rwlock_unlock(&ai.flows_lock); pthread_mutex_lock(&p->state_lock); @@ -115,7 +152,7 @@ static enum port_state port_wait_assign(struct port * p) return PORT_ID_ASSIGNED; } - if(p->state == PORT_INIT) + if (p->state == PORT_INIT) p->state = PORT_ID_PENDING; while (p->state == PORT_ID_PENDING) @@ -135,36 +172,6 @@ static enum port_state port_wait_assign(struct port * p) return state; } -struct flow { - struct shm_rbuff * rx_rb; - struct shm_rbuff * tx_rb; - struct shm_flow_set * set; - int port_id; - int oflags; - qoscube_t cube; - - pid_t api; - - bool timesout; - struct timespec rcv_timeo; -}; - -struct { - char * ap_name; - char * daf_name; - pid_t api; - - struct shm_rdrbuff * rdrb; - struct shm_flow_set * fqset; - - struct bmp * fds; - struct bmp * fqueues; - struct flow * flows; - struct port * ports; - - pthread_rwlock_t flows_lock; -} ai; - /* FIXME: translate real spec to cube */ static qoscube_t spec_to_cube(qosspec_t * qs) { @@ -249,7 +256,7 @@ int ouroboros_init(const char * ap_name) ai.api = getpid(); ai.daf_name = NULL; - ai.fds = bmp_create(AP_MAX_FLOWS, AP_RES_FDS + 1); + ai.fds = bmp_create(AP_MAX_FLOWS - AP_RES_FDS, AP_RES_FDS); if (ai.fds == NULL) return -ENOMEM; @@ -351,7 +358,6 @@ void ouroboros_fini() ssize_t idx; while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) shm_rdrbuff_remove(ai.rdrb, idx); - port_destroy(&ai.ports[ai.flows[i].port_id]); reset_flow(i); } } @@ -380,6 +386,7 @@ int flow_accept(qosspec_t * qs, msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_api = true; + msg.api = ai.api; if (timeo != NULL) { msg.has_timeo_sec = true; @@ -388,8 +395,6 @@ int flow_accept(qosspec_t * qs, msg.timeo_nsec = timeo->tv_nsec; } - msg.api = ai.api; - recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -EIRMD; @@ -400,7 +405,7 @@ int flow_accept(qosspec_t * qs, } if (recv_msg->result != 0) { - int res = recv_msg->result; + int res = recv_msg->result; irm_msg__free_unpacked(recv_msg, NULL); return res; } @@ -488,7 +493,6 @@ int flow_alloc(const char * dst_name, msg.timeo_nsec = timeo->tv_nsec; } - recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -EIRMD; @@ -574,13 +578,9 @@ int flow_dealloc(int fd) msg.has_api = true; msg.api = ai.api; - pthread_rwlock_wrlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.flows_lock); - if (ai.flows[fd].port_id < 0) { - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - return 0; - } + assert (!(ai.flows[fd].port_id < 0)); msg.port_id = ai.flows[fd].port_id; @@ -1032,6 +1032,7 @@ int np1_flow_alloc(pid_t n_api, ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); if (ai.flows[fd].rx_rb == NULL) { reset_flow(fd); + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); return -1; } @@ -1039,6 +1040,7 @@ int np1_flow_alloc(pid_t n_api, ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id); if (ai.flows[fd].tx_rb == NULL) { reset_flow(fd); + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); return -1; } @@ -1046,6 +1048,7 @@ int np1_flow_alloc(pid_t n_api, ai.flows[fd].set = shm_flow_set_open(n_api); if (ai.flows[fd].set == NULL) { reset_flow(fd); + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); return -1; } @@ -1079,7 +1082,7 @@ int np1_flow_resp(int port_id) { int fd; - if (port_wait_assign(&ai.ports[port_id]) != PORT_ID_ASSIGNED) + if (port_wait_assign(port_id) != PORT_ID_ASSIGNED) return -1; pthread_rwlock_rdlock(&ai.flows_lock); @@ -1157,12 +1160,14 @@ int ipcp_flow_req_arr(pid_t api, if (recv_msg == NULL) { ai.ports[fd].state = PORT_INIT; + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); return -EIRMD; } if (!recv_msg->has_port_id || !recv_msg->has_api) { ai.ports[fd].state = PORT_INIT; + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; @@ -1170,6 +1175,7 @@ int ipcp_flow_req_arr(pid_t api, if (recv_msg->has_result && recv_msg->result) { ai.ports[fd].state = PORT_INIT; + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; @@ -1178,6 +1184,7 @@ int ipcp_flow_req_arr(pid_t api, port_id = recv_msg->port_id; if (port_id < 0) { ai.ports[fd].state = PORT_INIT; + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; @@ -1186,6 +1193,7 @@ int ipcp_flow_req_arr(pid_t api, ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); if (ai.flows[fd].rx_rb == NULL) { reset_flow(fd); + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; @@ -1194,6 +1202,7 @@ int ipcp_flow_req_arr(pid_t api, ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, port_id); if (ai.flows[fd].tx_rb == NULL) { reset_flow(fd); + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; @@ -1202,6 +1211,7 @@ int ipcp_flow_req_arr(pid_t api, ai.flows[fd].set = shm_flow_set_open(recv_msg->api); if (ai.flows[fd].set == NULL) { reset_flow(fd); + bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; |