From b23e3024d12c28b01426cc37d5adf03f9c1bea88 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sun, 23 Oct 2016 23:00:15 +0200 Subject: lib: Stabilise flow allocation Deallocation was reverted to a synchronoous operation between the AP, IRMd and IPCP in order to avoid inconsistent states of the port_id. Fixes some memory leaks, particularly the shm_flow_set is now closed upon deallocation. --- include/ouroboros/sockets.h | 2 -- src/ipcpd/local/main.c | 1 + src/irmd/main.c | 7 +++---- src/lib/dev.c | 27 ++++++++++++++++++++++----- src/lib/sockets.c | 33 --------------------------------- 5 files changed, 26 insertions(+), 44 deletions(-) diff --git a/include/ouroboros/sockets.h b/include/ouroboros/sockets.h index 3885ffb2..aef4259e 100644 --- a/include/ouroboros/sockets.h +++ b/include/ouroboros/sockets.h @@ -54,6 +54,4 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg); irm_msg_t * send_recv_irm_msg_b(irm_msg_t * msg); -void send_irm_msg(irm_msg_t * msg); - #endif diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index f6fe4ec1..192607c1 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -98,6 +98,7 @@ static void * ipcp_local_sdu_loop(void * o) if (ipcp_get_state() != IPCP_ENROLLED) { pthread_rwlock_unlock(&ipcpi.state_lock); + fqueue_destroy(fq); return (void *) 1; /* -ENOTENROLLED */ } diff --git a/src/irmd/main.c b/src/irmd/main.c index 390681a8..3884a9a7 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1929,10 +1929,9 @@ void * mainloop(void * o) ret_msg.result = flow_alloc_res(msg->port_id); break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: - flow_dealloc(msg->api, msg->port_id); - irm_msg__free_unpacked(msg, NULL); - close(cli_sockfd); - continue; + ret_msg.has_result = true; + ret_msg.result = flow_dealloc(msg->api, msg->port_id); + break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: e = flow_req_arr(msg->api, msg->dst_name, diff --git a/src/lib/dev.c b/src/lib/dev.c index 0a22cb12..94fbd394 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -283,7 +283,7 @@ void ap_fini() pthread_rwlock_rdlock(&ai.flows_lock); for (i = 0; i < AP_MAX_FLOWS; ++i) { - if (ai.flows[i].tx_rb != NULL) { + if (ai.flows[i].rx_rb != NULL) { ssize_t idx; while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) shm_rdrbuff_remove(ai.rdrb, idx); @@ -560,15 +560,14 @@ int flow_alloc_res(int fd) if (ai.flows[fd].tx_rb == NULL) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - irm_msg__free_unpacked(recv_msg, NULL); return -1; } ai.flows[fd].set = shm_flow_set_open(ai.flows[fd].api); if (ai.flows[fd].set == NULL) { + shm_rbuff_close(ai.flows[fd].tx_rb); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -595,6 +594,7 @@ int flow_alloc_res(int fd) int flow_dealloc(int fd) { irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg = NULL; msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; msg.has_port_id = true; @@ -618,6 +618,24 @@ int flow_dealloc(int fd) msg.port_id = ai.flows[fd].port_id; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + recv_msg = send_recv_irm_msg_b(&msg); + if (recv_msg == NULL) { + return -1; + } + + if (!recv_msg->has_result) { + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + irm_msg__free_unpacked(recv_msg, NULL); + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + port_destroy(&ai.ports[msg.port_id]); ai.flows[fd].port_id = -1; @@ -627,6 +645,7 @@ int flow_dealloc(int fd) ai.flows[fd].tx_rb = NULL; ai.flows[fd].oflags = 0; ai.flows[fd].api = -1; + shm_flow_set_close(ai.flows[fd].set); if (ai.flows[fd].timeout != NULL) { free(ai.flows[fd].timeout); ai.flows[fd].timeout = NULL; @@ -637,8 +656,6 @@ int flow_dealloc(int fd) pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - send_irm_msg(&msg); - return 0; } diff --git a/src/lib/sockets.c b/src/lib/sockets.c index db1f3f6b..a1517b7b 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -153,39 +153,6 @@ static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, bool timed) return recv_msg; } -void send_irm_msg(irm_msg_t * msg) -{ - int sockfd; - buffer_t buf; - - sockfd = client_socket_open(IRM_SOCK_PATH); - if (sockfd < 0) - return; - - buf.len = irm_msg__get_packed_size(msg); - if (buf.len == 0) { - close(sockfd); - return; - } - - buf.data = malloc(buf.len); - if (buf.data == NULL) { - close(sockfd); - return; - } - - pthread_cleanup_push(close_ptr, &sockfd); - pthread_cleanup_push((void (*)(void *)) free, (void *) buf.data); - - irm_msg__pack(msg, buf.data); - - if (write(sockfd, buf.data, buf.len) < 0) - return; - - pthread_cleanup_pop(true); - pthread_cleanup_pop(true); -} - irm_msg_t * send_recv_irm_msg(irm_msg_t * msg) { return send_recv_irm_msg_timed(msg, true); } -- cgit v1.2.3