summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-10-07 15:25:22 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-10-07 15:42:44 +0200
commit71f10f5efab37f3df3d909d324cff2e098d21c85 (patch)
tree90d6031870d02b557107b0bc2623a129c4b1d074
parentaa0eac4f93b80537d02123715842d594a8ff3aad (diff)
downloadouroboros-71f10f5efab37f3df3d909d324cff2e098d21c85.tar.gz
ouroboros-71f10f5efab37f3df3d909d324cff2e098d21c85.zip
lib, dev: Add asynchronous deallocation
Flow deallocation from the application will immediately return (void call). The IRMd will not send a reply message.
-rw-r--r--include/ouroboros/sockets.h2
-rw-r--r--src/ipcpd/ipcp.c11
-rw-r--r--src/irmd/main.c45
-rw-r--r--src/lib/dev.c38
-rw-r--r--src/lib/sockets.c33
5 files changed, 74 insertions, 55 deletions
diff --git a/include/ouroboros/sockets.h b/include/ouroboros/sockets.h
index aef4259e..3885ffb2 100644
--- a/include/ouroboros/sockets.h
+++ b/include/ouroboros/sockets.h
@@ -54,4 +54,6 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg);
irm_msg_t * send_recv_irm_msg_b(irm_msg_t * msg);
+void send_irm_msg(irm_msg_t * msg);
+
#endif
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index db72b88d..a9f80ee7 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -313,7 +313,8 @@ void * ipcp_main_loop(void * o)
}
fd = np1_flow_alloc(msg->api, msg->port_id);
if (fd < 0) {
- LOG_ERR("Could not get fd for flow.");
+ LOG_ERR("Could not get fd for port_id. %d",
+ msg->port_id);
ret_msg.has_result = true;
ret_msg.result = -1;
break;
@@ -326,7 +327,7 @@ void * ipcp_main_loop(void * o)
msg->src_ae_name,
msg->qos_cube);
if (ret_msg.result < 0) {
- LOG_DBG("Deallocating failed flow on port_id %d.",
+ LOG_DBG("Deallocate failed on port_id %d.",
msg->port_id);
flow_dealloc(fd);
}
@@ -340,7 +341,8 @@ void * ipcp_main_loop(void * o)
if (!msg->response) {
fd = np1_flow_resp(msg->api, msg->port_id);
if (fd < 0) {
- LOG_ERR("Could not get fd for flow.");
+ LOG_ERR("Could not get fd for port_id %d.",
+ msg->port_id);
ret_msg.has_result = true;
ret_msg.result = -1;
break;
@@ -359,7 +361,8 @@ void * ipcp_main_loop(void * o)
fd = np1_flow_dealloc(msg->port_id);
if (fd < 0) {
- LOG_ERR("Could not get fd for flow.");
+ LOG_ERR("Could not deallocate port_id %d.",
+ msg->port_id);
ret_msg.has_result = true;
ret_msg.result = -1;
break;
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 523741ef..24a49c49 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -587,8 +587,6 @@ static int bind_api(pid_t api,
if (name == NULL)
return -EINVAL;
- LOG_DBG("BIND_API called %d, %s", api, name);
-
pthread_rwlock_rdlock(&irmd->state_lock);
if (irmd->state != IRMD_RUNNING) {
@@ -1231,29 +1229,32 @@ static int flow_alloc_res(int port_id)
static int flow_dealloc(pid_t api, int port_id)
{
- pid_t n_1_api;
+ pid_t n_1_api = -1;
int ret = 0;
struct irm_flow * f = NULL;
pthread_rwlock_rdlock(&irmd->state_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
- bmp_release(irmd->port_ids, port_id);
f = get_irm_flow(port_id);
if (f == NULL) {
+ bmp_release(irmd->port_ids, port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return 0;
}
- n_1_api = f->n_1_api;
+ if (api == f->n_api) {
+ bmp_release(irmd->port_ids, port_id);
+ n_1_api = f->n_1_api;
+ }
list_del(&f->next);
pthread_rwlock_unlock(&irmd->flows_lock);
- if (api != n_1_api)
+ if (n_1_api != -1)
ret = ipcp_flow_dealloc(n_1_api, port_id);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1772,8 +1773,7 @@ void * mainloop()
break;
case IRM_MSG_CODE__IRM_BOOTSTRAP_IPCP:
ret_msg.has_result = true;
- ret_msg.result = bootstrap_ipcp(msg->api,
- msg->conf);
+ ret_msg.result = bootstrap_ipcp(msg->api, msg->conf);
break;
case IRM_MSG_CODE__IRM_ENROLL_IPCP:
ret_msg.has_result = true;
@@ -1790,27 +1790,22 @@ void * mainloop()
break;
case IRM_MSG_CODE__IRM_UNBIND_AP:
ret_msg.has_result = true;
- ret_msg.result = unbind_ap(msg->ap_name,
- msg->dst_name);
+ ret_msg.result = unbind_ap(msg->ap_name, msg->dst_name);
break;
case IRM_MSG_CODE__IRM_API_ANNOUNCE:
ret_msg.has_result = true;
- ret_msg.result = api_announce(msg->api,
- msg->ap_name);
+ ret_msg.result = api_announce(msg->api, msg->ap_name);
break;
case IRM_MSG_CODE__IRM_BIND_API:
ret_msg.has_result = true;
- ret_msg.result = bind_api(msg->api,
- msg->dst_name);
+ ret_msg.result = bind_api(msg->api, msg->dst_name);
break;
case IRM_MSG_CODE__IRM_UNBIND_API:
ret_msg.has_result = true;
- ret_msg.result = unbind_api(msg->api,
- msg->dst_name);
+ ret_msg.result = unbind_api(msg->api, msg->dst_name);
break;
case IRM_MSG_CODE__IRM_LIST_IPCPS:
- ret_msg.n_apis = list_ipcps(msg->dst_name,
- &apis);
+ ret_msg.n_apis = list_ipcps(msg->dst_name, &apis);
ret_msg.apis = apis;
ret_msg.has_result = true;
break;
@@ -1827,15 +1822,12 @@ void * mainloop()
msg->n_dif_name);
break;
case IRM_MSG_CODE__IRM_FLOW_ACCEPT:
- e = flow_accept(msg->api,
- &ret_msg.ae_name);
-
+ e = flow_accept(msg->api, &ret_msg.ae_name);
if (e == NULL) {
ret_msg.has_result = true;
ret_msg.result = -1;
break;
}
-
ret_msg.has_port_id = true;
ret_msg.port_id = e->port_id;
ret_msg.has_api = true;
@@ -1857,8 +1849,6 @@ void * mainloop()
ret_msg.result = -1;
break;
}
-
- /* FIXME: badly timed dealloc may give SEGV */
ret_msg.has_port_id = true;
ret_msg.port_id = e->port_id;
ret_msg.has_api = true;
@@ -1869,9 +1859,10 @@ void * mainloop()
ret_msg.result = flow_alloc_res(msg->port_id);
break;
case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
- ret_msg.has_result = true;
- ret_msg.result = flow_dealloc(msg->api, msg->port_id);
- break;
+ flow_dealloc(msg->api, msg->port_id);
+ irm_msg__free_unpacked(msg, NULL);
+ close(cli_sockfd);
+ continue;
case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:
e = flow_req_arr(msg->api,
msg->dst_name,
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 8556d6e2..d36764ed 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -130,7 +130,7 @@ struct flow {
pid_t api;
- struct timespec timeout;
+ struct timespec * timeout;
};
struct {
@@ -220,8 +220,7 @@ int ap_init(char * ap_name)
ai.flows[i].port_id = -1;
ai.flows[i].oflags = 0;
ai.flows[i].api = -1;
- ai.flows[i].timeout.tv_sec = 0;
- ai.flows[i].timeout.tv_nsec = 0;
+ ai.flows[i].timeout = NULL;
}
ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
@@ -270,9 +269,12 @@ void ap_fini()
pthread_rwlock_rdlock(&ai.flows_lock);
- for (i = 0; i < AP_MAX_FLOWS; ++i)
+ for (i = 0; i < AP_MAX_FLOWS; ++i) {
if (ai.flows[i].rb != NULL)
shm_ap_rbuff_close(ai.flows[i].rb);
+ if (ai.flows[i].timeout != NULL)
+ free(ai.flows[i].timeout);
+ }
for (i = 0; i < IRMD_MAX_FLOWS; ++i) {
ai.ports[i].state = PORT_NULL;
@@ -527,8 +529,6 @@ int flow_alloc_res(int fd)
int flow_dealloc(int fd)
{
irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
msg.has_port_id = true;
@@ -552,30 +552,20 @@ int flow_dealloc(int fd)
shm_ap_rbuff_close(ai.flows[fd].rb);
ai.flows[fd].rb = NULL;
ai.flows[fd].api = -1;
+ if (ai.flows[fd].timeout != NULL) {
+ free(ai.flows[fd].timeout);
+ ai.flows[fd].timeout = NULL;
+ }
bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL) {
- pthread_rwlock_unlock(&ai.data_lock);
- return -1;
- }
-
- if (!recv_msg->has_result) {
- pthread_rwlock_unlock(&ai.data_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ret = recv_msg->result;
+ send_irm_msg(&msg);
pthread_rwlock_unlock(&ai.data_lock);
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
+ return 0;
}
int flow_cntl(int fd, int cmd, int oflags)
@@ -708,10 +698,10 @@ ssize_t flow_read(int fd, void * buf, size_t count)
} else {
struct shm_ap_rbuff * rb = ai.rb;
int port_id = ai.flows[fd].port_id;
- struct timespec timeout = ai.flows[fd].timeout;
+ struct timespec * timeout = ai.flows[fd].timeout;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- idx = shm_ap_rbuff_read_port_b(rb, port_id, &timeout);
+ idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout);
pthread_rwlock_rdlock(&ai.data_lock);
}
diff --git a/src/lib/sockets.c b/src/lib/sockets.c
index 408e79e7..c8375c22 100644
--- a/src/lib/sockets.c
+++ b/src/lib/sockets.c
@@ -154,6 +154,39 @@ static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, bool timed)
return recv_msg;
}
+void send_irm_msg(irm_msg_t * msg)
+{
+ int sockfd;
+ buffer_t buf;
+
+ sockfd = client_socket_open(IRM_SOCK_PATH);
+ if (sockfd < 0)
+ return;
+
+ buf.len = irm_msg__get_packed_size(msg);
+ if (buf.len == 0) {
+ close(sockfd);
+ return;
+ }
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL) {
+ close(sockfd);
+ return;
+ }
+
+ pthread_cleanup_push(close_ptr, &sockfd);
+ pthread_cleanup_push((void (*)(void *)) free, (void *) buf.data);
+
+ irm_msg__pack(msg, buf.data);
+
+ if (write(sockfd, buf.data, buf.len) < 0)
+ return;
+
+ pthread_cleanup_pop(true);
+ pthread_cleanup_pop(true);
+}
+
irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)
{ return send_recv_irm_msg_timed(msg, true); }