From f48008cdd28bf31e6f0646b1bb3786f0dc0aede0 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Tue, 4 Apr 2017 02:43:10 +0200 Subject: lib, irmd, ipcpd: Stabilize flow allocation --- src/lib/dev.c | 79 ++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 51 insertions(+), 28 deletions(-) (limited to 'src/lib') diff --git a/src/lib/dev.c b/src/lib/dev.c index 5acbada2..c063fd47 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -49,6 +49,7 @@ struct fqueue { enum port_state { PORT_NULL = 0, + PORT_INIT, PORT_ID_PENDING, PORT_ID_ASSIGNED, PORT_DESTROY @@ -82,7 +83,7 @@ static void port_destroy(struct port * p) pthread_cond_wait(&p->state_cond, &p->state_lock); p->fd = -1; - p->state = PORT_ID_PENDING; + p->state = PORT_INIT; pthread_mutex_unlock(&p->state_lock); } @@ -109,11 +110,14 @@ static enum port_state port_wait_assign(struct port * p) pthread_mutex_lock(&p->state_lock); - if (p->state != PORT_ID_PENDING) { + if (p->state == PORT_ID_ASSIGNED) { pthread_mutex_unlock(&p->state_lock); - return -1; + return PORT_ID_ASSIGNED; } + if(p->state == PORT_INIT) + p->state = PORT_ID_PENDING; + while (p->state == PORT_ID_PENDING) pthread_cond_wait(&p->state_cond, &p->state_lock); @@ -124,6 +128,8 @@ static enum port_state port_wait_assign(struct port * p) state = p->state; + assert(state != PORT_INIT); + pthread_mutex_unlock(&p->state_lock); return state; @@ -237,7 +243,6 @@ static void reset_flow(int fd) shm_flow_set_close(ai.flows[fd].set); init_flow(fd); - } int ap_init(const char * ap_name) @@ -319,7 +324,7 @@ int ap_init(const char * ap_name) } for (i = 0; i < IRMD_MAX_FLOWS; ++i) { - ai.ports[i].state = PORT_ID_PENDING; + ai.ports[i].state = PORT_INIT; pthread_mutex_init(&ai.ports[i].state_lock, NULL); pthread_cond_init(&ai.ports[i].state_cond, NULL); } @@ -354,8 +359,7 @@ void ap_fini() ssize_t idx; while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) shm_rdrbuff_remove(ai.rdrb, idx); - port_set_state(&ai.ports[ai.flows[i].port_id], - PORT_NULL); + port_destroy(&ai.ports[ai.flows[i].port_id]); reset_flow(i); } } @@ -459,6 +463,8 @@ int flow_accept(qosspec_t * qs, ai.flows[fd].api = recv_msg->api; ai.flows[fd].cube = recv_msg->qoscube; + assert(ai.ports[ai.flows[fd].port_id].state == PORT_INIT); + if (qs != NULL) fill_qosspec(qs, ai.flows[fd].cube); @@ -555,6 +561,8 @@ int flow_alloc(const char * dst_name, ai.flows[fd].api = recv_msg->api; ai.flows[fd].cube = recv_msg->qoscube; + assert(ai.ports[recv_msg->port_id].state == PORT_INIT); + ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -600,6 +608,7 @@ int flow_dealloc(int fd) if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); + assert(false); return -1; } @@ -1113,8 +1122,8 @@ int np1_flow_alloc(pid_t n_api, ai.flows[fd].oflags = FLOW_O_DEFAULT; ai.flows[fd].api = n_api; - ai.ports[port_id].fd = fd; - port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); + ai.ports[port_id].fd = fd; + ai.ports[port_id].state = PORT_ID_ASSIGNED; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -1127,7 +1136,7 @@ int np1_flow_dealloc(int port_id) int fd; pthread_rwlock_rdlock(&ai.data_lock); - pthread_rwlock_wrlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.flows_lock); fd = ai.ports[port_id].fd; @@ -1141,10 +1150,11 @@ int np1_flow_resp(int port_id) { int fd; - port_wait_assign(&ai.ports[port_id]); + if (port_wait_assign(&ai.ports[port_id]) != PORT_ID_ASSIGNED) + return -1; pthread_rwlock_rdlock(&ai.data_lock); - pthread_rwlock_wrlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.flows_lock); fd = ai.ports[port_id].fd; @@ -1211,66 +1221,78 @@ int ipcp_flow_req_arr(pid_t api, return -1; /* -ENOMOREFDS */ } - ai.flows[fd].tx_rb = NULL; - pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + if (recv_msg == NULL) { + ai.ports[fd].state = PORT_INIT; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -EIRMD; + } if (!recv_msg->has_port_id || !recv_msg->has_api) { + ai.ports[fd].state = PORT_INIT; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } if (recv_msg->has_result && recv_msg->result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; + ai.ports[fd].state = PORT_INIT; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; } port_id = recv_msg->port_id; if (port_id < 0) { + ai.ports[fd].state = PORT_INIT; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - pthread_rwlock_rdlock(&ai.data_lock); - pthread_rwlock_wrlock(&ai.flows_lock); - ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); if (ai.flows[fd].rx_rb == NULL) { - irm_msg__free_unpacked(recv_msg, NULL); reset_flow(fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); return -1; } ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, port_id); if (ai.flows[fd].tx_rb == NULL) { - irm_msg__free_unpacked(recv_msg, NULL); reset_flow(fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); return -1; } ai.flows[fd].set = shm_flow_set_open(recv_msg->api); if (ai.flows[fd].set == NULL) { - irm_msg__free_unpacked(recv_msg, NULL); reset_flow(fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); return -1; } ai.flows[fd].port_id = port_id; - ai.flows[fd].oflags = FLOW_O_DEFAULT; + ai.flows[fd].oflags = FLOW_O_DEFAULT; ai.ports[port_id].fd = fd; - port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED); + port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -1390,19 +1412,20 @@ int ipcp_flow_write(int fd, int ipcp_flow_fini(int fd) { - struct shm_rbuff * rb; + struct shm_rbuff * rx_rb; flow_set_flags(fd, FLOW_O_WRONLY); pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - rb = ai.flows[fd].rx_rb; + rx_rb = ai.flows[fd].rx_rb; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - shm_rbuff_fini(rb); + shm_rbuff_fini(rx_rb); + return 0; } -- cgit v1.2.3