summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/dev.c100
1 files changed, 55 insertions, 45 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 306fd008..aa9d8bc5 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -63,6 +63,36 @@ struct port {
pthread_cond_t state_cond;
};
+struct flow {
+ struct shm_rbuff * rx_rb;
+ struct shm_rbuff * tx_rb;
+ struct shm_flow_set * set;
+ int port_id;
+ int oflags;
+ qoscube_t cube;
+
+ pid_t api;
+
+ bool timesout;
+ struct timespec rcv_timeo;
+};
+
+struct {
+ char * ap_name;
+ char * daf_name;
+ pid_t api;
+
+ struct shm_rdrbuff * rdrb;
+ struct shm_flow_set * fqset;
+
+ struct bmp * fds;
+ struct bmp * fqueues;
+ struct flow * flows;
+ struct port * ports;
+
+ pthread_rwlock_t flows_lock;
+} ai;
+
static void port_destroy(struct port * p)
{
pthread_mutex_lock(&p->state_lock);
@@ -104,9 +134,16 @@ static void port_set_state(struct port * p,
pthread_mutex_unlock(&p->state_lock);
}
-static enum port_state port_wait_assign(struct port * p)
+static enum port_state port_wait_assign(int port_id)
{
enum port_state state;
+ struct port * p;
+
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ p = &ai.ports[port_id];
+
+ pthread_rwlock_unlock(&ai.flows_lock);
pthread_mutex_lock(&p->state_lock);
@@ -115,7 +152,7 @@ static enum port_state port_wait_assign(struct port * p)
return PORT_ID_ASSIGNED;
}
- if(p->state == PORT_INIT)
+ if (p->state == PORT_INIT)
p->state = PORT_ID_PENDING;
while (p->state == PORT_ID_PENDING)
@@ -135,36 +172,6 @@ static enum port_state port_wait_assign(struct port * p)
return state;
}
-struct flow {
- struct shm_rbuff * rx_rb;
- struct shm_rbuff * tx_rb;
- struct shm_flow_set * set;
- int port_id;
- int oflags;
- qoscube_t cube;
-
- pid_t api;
-
- bool timesout;
- struct timespec rcv_timeo;
-};
-
-struct {
- char * ap_name;
- char * daf_name;
- pid_t api;
-
- struct shm_rdrbuff * rdrb;
- struct shm_flow_set * fqset;
-
- struct bmp * fds;
- struct bmp * fqueues;
- struct flow * flows;
- struct port * ports;
-
- pthread_rwlock_t flows_lock;
-} ai;
-
/* FIXME: translate real spec to cube */
static qoscube_t spec_to_cube(qosspec_t * qs)
{
@@ -249,7 +256,7 @@ int ouroboros_init(const char * ap_name)
ai.api = getpid();
ai.daf_name = NULL;
- ai.fds = bmp_create(AP_MAX_FLOWS, AP_RES_FDS + 1);
+ ai.fds = bmp_create(AP_MAX_FLOWS - AP_RES_FDS, AP_RES_FDS);
if (ai.fds == NULL)
return -ENOMEM;
@@ -351,7 +358,6 @@ void ouroboros_fini()
ssize_t idx;
while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
shm_rdrbuff_remove(ai.rdrb, idx);
- port_destroy(&ai.ports[ai.flows[i].port_id]);
reset_flow(i);
}
}
@@ -380,6 +386,7 @@ int flow_accept(qosspec_t * qs,
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_api = true;
+ msg.api = ai.api;
if (timeo != NULL) {
msg.has_timeo_sec = true;
@@ -388,8 +395,6 @@ int flow_accept(qosspec_t * qs,
msg.timeo_nsec = timeo->tv_nsec;
}
- msg.api = ai.api;
-
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -EIRMD;
@@ -400,7 +405,7 @@ int flow_accept(qosspec_t * qs,
}
if (recv_msg->result != 0) {
- int res = recv_msg->result;
+ int res = recv_msg->result;
irm_msg__free_unpacked(recv_msg, NULL);
return res;
}
@@ -488,7 +493,6 @@ int flow_alloc(const char * dst_name,
msg.timeo_nsec = timeo->tv_nsec;
}
-
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -EIRMD;
@@ -574,13 +578,9 @@ int flow_dealloc(int fd)
msg.has_api = true;
msg.api = ai.api;
- pthread_rwlock_wrlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai.flows[fd].port_id < 0) {
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
- return 0;
- }
+ assert (!(ai.flows[fd].port_id < 0));
msg.port_id = ai.flows[fd].port_id;
@@ -1032,6 +1032,7 @@ int np1_flow_alloc(pid_t n_api,
ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
if (ai.flows[fd].rx_rb == NULL) {
reset_flow(fd);
+ bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
return -1;
}
@@ -1039,6 +1040,7 @@ int np1_flow_alloc(pid_t n_api,
ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id);
if (ai.flows[fd].tx_rb == NULL) {
reset_flow(fd);
+ bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
return -1;
}
@@ -1046,6 +1048,7 @@ int np1_flow_alloc(pid_t n_api,
ai.flows[fd].set = shm_flow_set_open(n_api);
if (ai.flows[fd].set == NULL) {
reset_flow(fd);
+ bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
return -1;
}
@@ -1079,7 +1082,7 @@ int np1_flow_resp(int port_id)
{
int fd;
- if (port_wait_assign(&ai.ports[port_id]) != PORT_ID_ASSIGNED)
+ if (port_wait_assign(port_id) != PORT_ID_ASSIGNED)
return -1;
pthread_rwlock_rdlock(&ai.flows_lock);
@@ -1157,12 +1160,14 @@ int ipcp_flow_req_arr(pid_t api,
if (recv_msg == NULL) {
ai.ports[fd].state = PORT_INIT;
+ bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
return -EIRMD;
}
if (!recv_msg->has_port_id || !recv_msg->has_api) {
ai.ports[fd].state = PORT_INIT;
+ bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
@@ -1170,6 +1175,7 @@ int ipcp_flow_req_arr(pid_t api,
if (recv_msg->has_result && recv_msg->result) {
ai.ports[fd].state = PORT_INIT;
+ bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
@@ -1178,6 +1184,7 @@ int ipcp_flow_req_arr(pid_t api,
port_id = recv_msg->port_id;
if (port_id < 0) {
ai.ports[fd].state = PORT_INIT;
+ bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
@@ -1186,6 +1193,7 @@ int ipcp_flow_req_arr(pid_t api,
ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
if (ai.flows[fd].rx_rb == NULL) {
reset_flow(fd);
+ bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
@@ -1194,6 +1202,7 @@ int ipcp_flow_req_arr(pid_t api,
ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, port_id);
if (ai.flows[fd].tx_rb == NULL) {
reset_flow(fd);
+ bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
@@ -1202,6 +1211,7 @@ int ipcp_flow_req_arr(pid_t api,
ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
if (ai.flows[fd].set == NULL) {
reset_flow(fd);
+ bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;