summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@ugent.be>2017-04-04 07:32:47 +0000
committerSander Vrijders <sander.vrijders@ugent.be>2017-04-04 07:32:47 +0000
commitca1d635458b5ef56e27d9265a2c89580d9d0f218 (patch)
treeb68ec1c7f6ed81169cfd42f4cac1af6fe0bb65c6 /src/lib/dev.c
parente38e34017b4a7667e11c08e9947e72dfc0b87474 (diff)
parentf48008cdd28bf31e6f0646b1bb3786f0dc0aede0 (diff)
downloadouroboros-ca1d635458b5ef56e27d9265a2c89580d9d0f218.tar.gz
ouroboros-ca1d635458b5ef56e27d9265a2c89580d9d0f218.zip
Merged in dstaesse/ouroboros/be-stable (pull request #457)
lib, irmd, ipcpd: Stabilize flow allocation
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c79
1 files changed, 51 insertions, 28 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 5acbada2..c063fd47 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -49,6 +49,7 @@ struct fqueue {
enum port_state {
PORT_NULL = 0,
+ PORT_INIT,
PORT_ID_PENDING,
PORT_ID_ASSIGNED,
PORT_DESTROY
@@ -82,7 +83,7 @@ static void port_destroy(struct port * p)
pthread_cond_wait(&p->state_cond, &p->state_lock);
p->fd = -1;
- p->state = PORT_ID_PENDING;
+ p->state = PORT_INIT;
pthread_mutex_unlock(&p->state_lock);
}
@@ -109,11 +110,14 @@ static enum port_state port_wait_assign(struct port * p)
pthread_mutex_lock(&p->state_lock);
- if (p->state != PORT_ID_PENDING) {
+ if (p->state == PORT_ID_ASSIGNED) {
pthread_mutex_unlock(&p->state_lock);
- return -1;
+ return PORT_ID_ASSIGNED;
}
+ if(p->state == PORT_INIT)
+ p->state = PORT_ID_PENDING;
+
while (p->state == PORT_ID_PENDING)
pthread_cond_wait(&p->state_cond, &p->state_lock);
@@ -124,6 +128,8 @@ static enum port_state port_wait_assign(struct port * p)
state = p->state;
+ assert(state != PORT_INIT);
+
pthread_mutex_unlock(&p->state_lock);
return state;
@@ -237,7 +243,6 @@ static void reset_flow(int fd)
shm_flow_set_close(ai.flows[fd].set);
init_flow(fd);
-
}
int ap_init(const char * ap_name)
@@ -319,7 +324,7 @@ int ap_init(const char * ap_name)
}
for (i = 0; i < IRMD_MAX_FLOWS; ++i) {
- ai.ports[i].state = PORT_ID_PENDING;
+ ai.ports[i].state = PORT_INIT;
pthread_mutex_init(&ai.ports[i].state_lock, NULL);
pthread_cond_init(&ai.ports[i].state_cond, NULL);
}
@@ -354,8 +359,7 @@ void ap_fini()
ssize_t idx;
while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
shm_rdrbuff_remove(ai.rdrb, idx);
- port_set_state(&ai.ports[ai.flows[i].port_id],
- PORT_NULL);
+ port_destroy(&ai.ports[ai.flows[i].port_id]);
reset_flow(i);
}
}
@@ -459,6 +463,8 @@ int flow_accept(qosspec_t * qs,
ai.flows[fd].api = recv_msg->api;
ai.flows[fd].cube = recv_msg->qoscube;
+ assert(ai.ports[ai.flows[fd].port_id].state == PORT_INIT);
+
if (qs != NULL)
fill_qosspec(qs, ai.flows[fd].cube);
@@ -555,6 +561,8 @@ int flow_alloc(const char * dst_name,
ai.flows[fd].api = recv_msg->api;
ai.flows[fd].cube = recv_msg->qoscube;
+ assert(ai.ports[recv_msg->port_id].state == PORT_INIT);
+
ai.ports[recv_msg->port_id].fd = fd;
ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
@@ -600,6 +608,7 @@ int flow_dealloc(int fd)
if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
+ assert(false);
return -1;
}
@@ -1113,8 +1122,8 @@ int np1_flow_alloc(pid_t n_api,
ai.flows[fd].oflags = FLOW_O_DEFAULT;
ai.flows[fd].api = n_api;
- ai.ports[port_id].fd = fd;
- port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
+ ai.ports[port_id].fd = fd;
+ ai.ports[port_id].state = PORT_ID_ASSIGNED;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -1127,7 +1136,7 @@ int np1_flow_dealloc(int port_id)
int fd;
pthread_rwlock_rdlock(&ai.data_lock);
- pthread_rwlock_wrlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
fd = ai.ports[port_id].fd;
@@ -1141,10 +1150,11 @@ int np1_flow_resp(int port_id)
{
int fd;
- port_wait_assign(&ai.ports[port_id]);
+ if (port_wait_assign(&ai.ports[port_id]) != PORT_ID_ASSIGNED)
+ return -1;
pthread_rwlock_rdlock(&ai.data_lock);
- pthread_rwlock_wrlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
fd = ai.ports[port_id].fd;
@@ -1211,66 +1221,78 @@ int ipcp_flow_req_arr(pid_t api,
return -1; /* -ENOMOREFDS */
}
- ai.flows[fd].tx_rb = NULL;
-
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ if (recv_msg == NULL) {
+ ai.ports[fd].state = PORT_INIT;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -EIRMD;
+ }
if (!recv_msg->has_port_id || !recv_msg->has_api) {
+ ai.ports[fd].state = PORT_INIT;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
if (recv_msg->has_result && recv_msg->result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
+ ai.ports[fd].state = PORT_INIT;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
}
port_id = recv_msg->port_id;
if (port_id < 0) {
+ ai.ports[fd].state = PORT_INIT;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- pthread_rwlock_rdlock(&ai.data_lock);
- pthread_rwlock_wrlock(&ai.flows_lock);
-
ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
if (ai.flows[fd].rx_rb == NULL) {
- irm_msg__free_unpacked(recv_msg, NULL);
reset_flow(fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, port_id);
if (ai.flows[fd].tx_rb == NULL) {
- irm_msg__free_unpacked(recv_msg, NULL);
reset_flow(fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
if (ai.flows[fd].set == NULL) {
- irm_msg__free_unpacked(recv_msg, NULL);
reset_flow(fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ai.flows[fd].port_id = port_id;
- ai.flows[fd].oflags = FLOW_O_DEFAULT;
+ ai.flows[fd].oflags = FLOW_O_DEFAULT;
ai.ports[port_id].fd = fd;
- port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED);
+ port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -1390,19 +1412,20 @@ int ipcp_flow_write(int fd,
int ipcp_flow_fini(int fd)
{
- struct shm_rbuff * rb;
+ struct shm_rbuff * rx_rb;
flow_set_flags(fd, FLOW_O_WRONLY);
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- rb = ai.flows[fd].rx_rb;
+ rx_rb = ai.flows[fd].rx_rb;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- shm_rbuff_fini(rb);
+ shm_rbuff_fini(rx_rb);
+
return 0;
}