From 6a4109706b20266833619d26cd89c5f9447fdd91 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 18 May 2016 20:34:51 +0200 Subject: Implementation of a full flow allocator for the shim UDP. It uses UDP port 0x0D1F on all hosts to send and receive flow allocation messages. It supports communication between server and client AP over a single shim IPCP. Implementation of full flow deallocation is pending. Both the client and the server still have to call flow_dealloc(); --- src/ipcpd/ipcp-ops.h | 8 +- src/ipcpd/ipcp.c | 8 +- src/ipcpd/shim-udp/CMakeLists.txt | 14 +- src/ipcpd/shim-udp/main.c | 753 +++++++++++++++++++---------- src/ipcpd/shim-udp/shim_udp_messages.proto | 14 + src/irmd/main.c | 1 - 6 files changed, 540 insertions(+), 258 deletions(-) create mode 100644 src/ipcpd/shim-udp/shim_udp_messages.proto diff --git a/src/ipcpd/ipcp-ops.h b/src/ipcpd/ipcp-ops.h index a766c3ae..72c57595 100644 --- a/src/ipcpd/ipcp-ops.h +++ b/src/ipcpd/ipcp-ops.h @@ -38,14 +38,14 @@ struct ipcp_ops { size_t len); int (* ipcp_name_reg)(char * name); int (* ipcp_name_unreg)(char * name); - int (* ipcp_flow_alloc)(int port_id, - pid_t n_pid, + int (* ipcp_flow_alloc)(pid_t n_pid, + int port_id, char * dst_ap_name, char * src_ap_name, char * src_ae_name, enum qos_cube qos); - int (* ipcp_flow_alloc_resp)(int port_id, - pid_t n_pid, + int (* ipcp_flow_alloc_resp)(pid_t n_pid, + int port_id, int response); int (* ipcp_flow_dealloc)(int port_id); }; diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 76d3620b..dd370005 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -200,8 +200,8 @@ void * ipcp_main_loop(void * o) } ret_msg.has_result = true; ret_msg.result = - _ipcp->ops->ipcp_flow_alloc(msg->port_id, - msg->pid, + _ipcp->ops->ipcp_flow_alloc(msg->pid, + msg->port_id, msg->dst_name, msg->src_ap_name, msg->src_ae_name, @@ -214,8 +214,8 @@ void * ipcp_main_loop(void * o) } ret_msg.has_result = true; ret_msg.result = - _ipcp->ops->ipcp_flow_alloc_resp(msg->port_id, - msg->pid, + _ipcp->ops->ipcp_flow_alloc_resp(msg->pid, + msg->port_id, msg->result); break; case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: diff --git a/src/ipcpd/shim-udp/CMakeLists.txt b/src/ipcpd/shim-udp/CMakeLists.txt index e10a8ca6..b4d9ffbd 100644 --- a/src/ipcpd/shim-udp/CMakeLists.txt +++ b/src/ipcpd/shim-udp/CMakeLists.txt @@ -12,6 +12,11 @@ include_directories(${CURRENT_BINARY_PARENT_DIR}) include_directories(${CMAKE_SOURCE_DIR}/include) include_directories(${CMAKE_BINARY_DIR}/include) +find_package(ProtobufC REQUIRED) +include_directories(${PROTOBUF_INCLUDE_DIRS}) +protobuf_generate_c(SHIM_UDP_PROTO_SRCS SHIM_UDP_PROTO_HDRS + shim_udp_messages.proto) + # Find library needed for gethostbyname. 
include(CheckFunctionExists) CHECK_FUNCTION_EXISTS("gethostbyname" CMAKE_HAVE_GETHOSTBYNAME) @@ -64,9 +69,12 @@ set(SHIM_UDP_SOURCES # Add source files here ${CMAKE_CURRENT_SOURCE_DIR}/main.c) +install(FILES ${SHIM_UDP_PROTO_HDRS} DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}) + add_executable (ipcpd-shim-udp ${SHIM_UDP_SOURCES} ${IPCP_SOURCES} - "${CMAKE_CURRENT_BINARY_DIR}/shim_udp_config.h") -target_link_libraries (ipcpd-shim-udp LINK_PUBLIC ouroboros) + ${SHIM_UDP_PROTO_SRCS} "${CMAKE_CURRENT_BINARY_DIR}/shim_udp_config.h") +target_link_libraries (ipcpd-shim-udp LINK_PUBLIC ouroboros + ${PROTOBUF_C_LIBRARY}) # Enable DNS by default if (NOT DISABLE_DNS MATCHES True) @@ -81,4 +89,4 @@ endif (CMAKE_BUILD_TYPE MATCHES Debug) install(TARGETS ipcpd-shim-udp RUNTIME DESTINATION bin) # Enable once ipcp-shim-udp has tests -add_subdirectory(tests) +# add_subdirectory(tests) diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 0802583c..4f70d053 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -52,9 +52,14 @@ #include #include +#include "shim_udp_messages.pb-c.h" + +typedef ShimUdpMsg shim_udp_msg_t; + #define THIS_TYPE IPCP_SHIM_UDP #define LISTEN_PORT htons(0x0D1F) #define SHIM_UDP_BUF_SIZE 256 +#define SHIM_UDP_MSG_SIZE 256 #define SHIM_UDP_MAX_SDU_SIZE 8980 #define DNS_TTL 86400 @@ -150,7 +155,6 @@ static int shim_ap_init(char * ap_name) } rw_lock_init(&_ap_instance->flows_lock); - return 0; } @@ -209,10 +213,14 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) rw_lock_rdlock(&_ipcp->state_lock); + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + return -1; /* -ENOTENROLLED */ + } + index = shm_create_du_buff(_ap_instance->dum, count, 0, buf, count); if (index == -1) { - rw_lock_unlock(&_ipcp->state_lock); return -1; } @@ -225,11 +233,11 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { rw_lock_unlock(&_ap_instance->flows_lock); shm_release_du_buff(_ap_instance->dum, index); - rw_lock_unlock(&_ipcp->state_lock); return -EPIPE; } rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); return 0; @@ -239,13 +247,29 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) * end copy from dev.c */ +/* only call this under flows_lock */ +static int udp_port_to_fd(int udp_port) +{ + int i; + struct sockaddr_in f_saddr; + socklen_t len = sizeof(f_saddr); + + for (i = 0; i < AP_MAX_FLOWS; ++i) { + if (getsockname(i, (struct sockaddr *) &f_saddr, &len) < 0) + continue; + if (f_saddr.sin_port == udp_port) + return i; + } + + return -1; +} + struct ipcp_udp_data { /* keep ipcp_data first for polymorphism */ struct ipcp_data ipcp_data; uint32_t ip_addr; uint32_t dns_addr; - /* listen server */ struct sockaddr_in s_saddr; int s_fd; @@ -260,7 +284,7 @@ struct ipcp_udp_data * ipcp_udp_data_create() struct ipcp_data * data; enum ipcp_type ipcp_type; - udp_data = malloc(sizeof *udp_data); + udp_data = malloc(sizeof(*udp_data)); if (udp_data == NULL) { LOG_ERR("Failed to allocate."); return NULL; @@ -278,74 +302,277 @@ struct ipcp_udp_data * ipcp_udp_data_create() return udp_data; } -void ipcp_sig_handler(int sig, siginfo_t * info, void * c) +static int send_shim_udp_msg(shim_udp_msg_t * msg, + uint32_t dst_ip_addr) { - sigset_t sigset; - sigemptyset(&sigset); - sigaddset(&sigset, SIGINT); - bool clean_threads = false; + buffer_t buf; + struct sockaddr_in r_saddr; + + memset((char *)&r_saddr, 0, 
sizeof(r_saddr)); + r_saddr.sin_family = AF_INET; + r_saddr.sin_addr.s_addr = dst_ip_addr; + r_saddr.sin_port = LISTEN_PORT; + + buf.size = shim_udp_msg__get_packed_size(msg); + if (buf.size == 0) { + return -1; + } + + buf.data = malloc(SHIM_UDP_MSG_SIZE); + if (buf.data == NULL) { + return -1; + } + + shim_udp_msg__pack(msg, buf.data); + + if (sendto(shim_data(_ipcp)->s_fd, + buf.data, + buf.size, + 0, + (struct sockaddr *) &r_saddr, + sizeof(r_saddr)) == -1) { + LOG_ERR("Failed to send message."); + free(buf.data); + return -1; + } + + free(buf.data); + + return 0; +} - switch(sig) { - case SIGINT: - case SIGTERM: - case SIGHUP: - if (info->si_pid == irmd_pid || info->si_pid == 0) { - pthread_sigmask(SIG_BLOCK, &sigset, NULL); +static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, + uint32_t src_udp_port, + char * dst_name, + char * src_ap_name, + char * src_ae_name) +{ + shim_udp_msg_t msg = SHIM_UDP_MSG__INIT; - LOG_DBG("Terminating by order of %d. Bye.", - info->si_pid); + msg.code = SHIM_UDP_MSG_CODE__FLOW_REQ; + msg.src_udp_port = src_udp_port; + msg.dst_name = dst_name; + msg.src_ap_name = src_ap_name; + msg.src_ae_name = src_ae_name; - rw_lock_wrlock(&_ipcp->state_lock); + return send_shim_udp_msg(&msg, dst_ip_addr); +} - if (_ipcp->state == IPCP_ENROLLED) { - clean_threads = true; - } +static int ipcp_udp_port_alloc_resp(uint32_t ip_addr, + uint16_t src_udp_port, + uint16_t dst_udp_port, + int response) +{ + shim_udp_msg_t msg = SHIM_UDP_MSG__INIT; - if (clean_threads) { - pthread_cancel(_ap_instance->handler); - pthread_cancel(_ap_instance->sdu_reader); - pthread_cancel(_ap_instance->sduloop); + msg.code = SHIM_UDP_MSG_CODE__FLOW_REPLY; + msg.src_udp_port = src_udp_port; + msg.has_dst_udp_port = true; + msg.dst_udp_port = dst_udp_port; + msg.has_response = true; + msg.response = response; - pthread_join(_ap_instance->sduloop, NULL); - pthread_join(_ap_instance->handler, NULL); - pthread_join(_ap_instance->sdu_reader, NULL); - } + return send_shim_udp_msg(&msg, ip_addr); +} - pthread_cancel(_ap_instance->mainloop); +static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, + char * dst_name, + char * src_ap_name, + char * src_ae_name) +{ + int fd; + int port_id; - _ipcp->state = IPCP_SHUTDOWN; + struct sockaddr_in f_saddr; + socklen_t f_saddr_len = sizeof(f_saddr); - rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Port request arrived from UDP port %d", + ntohs(c_saddr->sin_port)); - pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); + if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + LOG_ERR("Could not create UDP socket."); + return -1; + } - } - default: - return; + memset((char *) &f_saddr, 0, sizeof(f_saddr)); + f_saddr.sin_family = AF_INET; + f_saddr.sin_addr.s_addr = local_ip; + + /* + * FIXME: we could have a port dedicated per registered AP + * Not that critical for UDP, but will be for LLC + */ + + f_saddr.sin_port = 0; + + if (bind(fd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { + LOG_ERR("Could not bind to socket."); + close(fd); + return -1; + } + + if (getsockname(fd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { + LOG_ERR("Could not get address from fd."); + return -1; + } + + /* + * store the remote address in the file descriptor + * this avoids having to store the sockaddr_in in + * the flow structure + */ + + if (connect(fd, (struct sockaddr *) c_saddr, sizeof(*c_saddr)) < 0) { + LOG_ERR("Could not connect to remote UDP client."); + close(fd); + return -1; + } + + + rw_lock_rdlock(&_ipcp->state_lock); + 
rw_lock_wrlock(&_ap_instance->flows_lock); + + /* reply to IRM */ + port_id = ipcp_flow_req_arr(getpid(), + dst_name, + src_ap_name, + src_ae_name); + + if (port_id < 0) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_ERR("Could not get port id from IRMd"); + close(fd); + return -1; + } + + _ap_instance->flows[fd].port_id = port_id; + _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].state = FLOW_PENDING; + + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + + LOG_DBGF("Pending allocation request, port_id %d, UDP port (%d, %d).", + port_id, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port)); + + return 0; +} + +static int ipcp_udp_port_alloc_reply(int src_udp_port, + int dst_udp_port, + int response) +{ + int fd = -1; + int ret = 0; + int port_id = -1; + + struct sockaddr_in t_saddr; + socklen_t t_saddr_len = sizeof(t_saddr); + + LOG_DBGF("Received reply for flow on udp port %d.", + ntohs(dst_udp_port)); + + rw_lock_rdlock(&_ipcp->state_lock); + rw_lock_rdlock(&_ap_instance->flows_lock); + + fd = udp_port_to_fd(dst_udp_port); + if (fd == -1) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Unknown flow on UDP port %d.", dst_udp_port); + return -1; /* -EUNKNOWNFLOW */ + } + + if (_ap_instance->flows[fd].state != FLOW_PENDING) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Flow on UDP port %d not pending.", dst_udp_port); + return -1; /* -EFLOWNOTPENDING */ + } + + port_id = _ap_instance->flows[fd].port_id; + + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + + if ((ret = ipcp_flow_alloc_reply(getpid(), + port_id, + response)) < 0) { + return -1; /* -EPIPE */ + } + + rw_lock_rdlock(&_ipcp->state_lock); + rw_lock_wrlock(&_ap_instance->flows_lock); + + if (response) { + _ap_instance->flows[fd].port_id = -1; + _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].state = FLOW_NULL; + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + close(fd); + return 0; + } + + /* get the original address with the LISTEN PORT */ + if (getpeername(fd, (struct sockaddr *) &t_saddr, &t_saddr_len) < 0) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Flow with port_id %d has no peer.", port_id); + return 0; + }; + + /* connect to the flow udp port */ + t_saddr.sin_port = src_udp_port; + + if (connect(fd, + (struct sockaddr *) &t_saddr, sizeof(t_saddr)) < 0) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + close(fd); + return -1; } + + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + + LOG_INFO("Flow allocation completed, UDP ports: (%d, %d).", + ntohs(src_udp_port), ntohs(dst_udp_port)); + + return ret; + } static void * ipcp_udp_listener() { - char buf[SHIM_UDP_BUF_SIZE]; + uint8_t buf[SHIM_UDP_MSG_SIZE]; int n = 0; - struct sockaddr_in f_saddr; struct sockaddr_in c_saddr; - int sfd = shim_data(_ipcp)->s_fd; while (true) { - int fd; - int port_id; + int sfd = 0; + shim_udp_msg_t * msg = NULL; rw_lock_rdlock(&_ipcp->state_lock); - memset(&buf, 0, SHIM_UDP_BUF_SIZE); - n = sizeof c_saddr; - n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0, + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + return (void *) 1; /* -ENOTENROLLED */ + } + + sfd = shim_data(_ipcp)->s_fd; + + rw_lock_unlock(&_ipcp->state_lock); + + 
memset(&buf, 0, SHIM_UDP_MSG_SIZE); + n = sizeof(c_saddr); + n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0, (struct sockaddr *) &c_saddr, (unsigned *) &n); + if (n < 0) { - rw_lock_unlock(&_ipcp->state_lock); continue; } @@ -353,69 +580,36 @@ static void * ipcp_udp_listener() if (gethostbyaddr((const char *) &c_saddr.sin_addr.s_addr, sizeof(c_saddr.sin_addr.s_addr), AF_INET) == NULL) { - rw_lock_unlock(&_ipcp->state_lock); - continue; - } - - fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - - memset((char *) &f_saddr, 0, sizeof f_saddr); - f_saddr.sin_family = AF_INET; - f_saddr.sin_addr.s_addr = local_ip; - - /* - * FIXME: we could have a port dedicated per registered AP - * Not that critical for UDP, but will be for LLC - */ - - f_saddr.sin_port = 0; - - /* - * store the remote address in the file descriptor - * this avoids having to store the sockaddr_in in - * the flow structure - */ - - if (connect(fd, - (struct sockaddr *) &c_saddr, sizeof c_saddr) < 0) { - rw_lock_unlock(&_ipcp->state_lock); - close(fd); continue; } - /* echo back the packet */ - if (send(fd, buf, strlen(buf), 0) < 0) { - rw_lock_unlock(&_ipcp->state_lock); - LOG_ERR("Failed to echo back the packet."); - close(fd); + msg = shim_udp_msg__unpack(NULL, n, buf); + if (msg == NULL) { continue; } - /* reply to IRM */ - rw_lock_wrlock(&_ap_instance->flows_lock); - - port_id = ipcp_flow_req_arr(getpid(), - buf, - UNKNOWN_AP, - UNKNOWN_AE); - - if (port_id < 0) { - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); - LOG_ERR("Could not get port id from IRMd"); - close(fd); + switch (msg->code) { + case SHIM_UDP_MSG_CODE__FLOW_REQ: + c_saddr.sin_port = msg->src_udp_port; + ipcp_udp_port_req(&c_saddr, + msg->dst_name, + msg->src_ap_name, + msg->src_ae_name); + break; + case SHIM_UDP_MSG_CODE__FLOW_REPLY: + ipcp_udp_port_alloc_reply(msg->src_udp_port, + msg->dst_udp_port, + msg->response); + break; + default: + LOG_ERR("Unknown message received %d.", msg->code); + shim_udp_msg__free_unpacked(msg, NULL); continue; } - _ap_instance->flows[fd].port_id = port_id; - _ap_instance->flows[fd].rb = NULL; - _ap_instance->flows[fd].state = FLOW_PENDING; - - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); + c_saddr.sin_port = LISTEN_PORT; - LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.", - port_id, fd); + shim_udp_msg__free_unpacked(msg, NULL); } return 0; @@ -436,16 +630,17 @@ static void * ipcp_udp_sdu_reader() if (_ipcp->state != IPCP_ENROLLED) { rw_lock_unlock(&_ipcp->state_lock); - return (void *) 0; + return (void *) 1; /* -ENOTENROLLED */ } rw_lock_rdlock(&_ap_instance->flows_lock); read_fds = shim_data(_ipcp)->flow_fd_s; + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) { - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); continue; } @@ -455,7 +650,7 @@ static void * ipcp_udp_sdu_reader() flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flags | O_NONBLOCK); - n = sizeof r_saddr; + n = sizeof(r_saddr); if ((n = recvfrom(fd, buf, SHIM_UDP_MAX_SDU_SIZE, @@ -468,9 +663,6 @@ static void * ipcp_udp_sdu_reader() if (ipcp_udp_flow_write(fd, buf, n) < 0) LOG_ERR("Failed to write SDU."); } - - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); } return (void *) 0; @@ -479,6 +671,7 @@ static void * ipcp_udp_sdu_reader() /* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ static void * 
ipcp_udp_sdu_loop(void * o) { + while (true) { struct rb_entry * e; int fd; @@ -489,7 +682,7 @@ static void * ipcp_udp_sdu_loop(void * o) if (_ipcp->state != IPCP_ENROLLED) { rw_lock_unlock(&_ipcp->state_lock); - return (void *) 0; + return (void *) 1; /* -ENOTENROLLED */ } e = shm_ap_rbuff_read(_ap_instance->rb); @@ -502,7 +695,7 @@ static void * ipcp_udp_sdu_loop(void * o) len = shm_du_map_read_sdu((uint8_t **) &buf, _ap_instance->dum, e->index); - if (len == -1) { + if (len <= 0) { rw_lock_unlock(&_ipcp->state_lock); free(e); continue; @@ -512,16 +705,10 @@ static void * ipcp_udp_sdu_loop(void * o) fd = port_id_to_fd(e->port_id); - if (fd == -1) { - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); - free(e); - continue; - } + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); - if (len == 0) { - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); + if (fd == -1) { free(e); continue; } @@ -529,19 +716,65 @@ static void * ipcp_udp_sdu_loop(void * o) if (send(fd, buf, len, 0) < 0) LOG_ERR("Failed to send SDU."); - shm_release_du_buff(_ap_instance->dum, e->index); + rw_lock_rdlock(&_ipcp->state_lock); - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); + if (_ap_instance->dum != NULL) + shm_release_du_buff(_ap_instance->dum, e->index); - free(e); + rw_lock_unlock(&_ipcp->state_lock); } return (void *) 1; } +void ipcp_sig_handler(int sig, siginfo_t * info, void * c) +{ + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, SIGINT); + + switch(sig) { + case SIGINT: + case SIGTERM: + case SIGHUP: + if (info->si_pid == irmd_pid || info->si_pid == 0) { + LOG_DBG("Terminating by order of %d. Bye.", + info->si_pid); + + rw_lock_wrlock(&_ipcp->state_lock); + + if (_ipcp->state == IPCP_ENROLLED) { + pthread_cancel(_ap_instance->handler); + pthread_cancel(_ap_instance->sdu_reader); + pthread_cancel(_ap_instance->sduloop); + + pthread_join(_ap_instance->sduloop, NULL); + pthread_join(_ap_instance->handler, NULL); + pthread_join(_ap_instance->sdu_reader, NULL); + } + + pthread_cancel(_ap_instance->mainloop); + + _ipcp->state = IPCP_SHUTDOWN; + + rw_lock_unlock(&_ipcp->state_lock); + } + default: + return; + } + + LOG_DBGF("Lock check."); + rw_lock_wrlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ap_instance->flows_lock); + LOG_DBGF("flows_lock passed."); + rw_lock_wrlock(&_ipcp->state_lock); + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("state_lock passed."); +} + static int ipcp_udp_bootstrap(struct dif_config * conf) { + struct sockaddr_in s_saddr; char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; int enable = 1; @@ -552,19 +785,10 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) return -1; } - rw_lock_wrlock(&_ipcp->state_lock); - - if (_ipcp->state != IPCP_INIT) { - rw_lock_unlock(&_ipcp->state_lock); - LOG_ERR("IPCP in wrong state."); - return -1; - } - if (inet_ntop(AF_INET, &conf->ip_addr, ipstr, INET_ADDRSTRLEN) == NULL) { - rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Failed to convert IP address"); return -1; } @@ -574,7 +798,6 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) &conf->dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL) { - rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Failed to convert DNS address"); return -1; } @@ -587,7 +810,6 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) /* UDP listen server */ if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { - rw_lock_unlock(&_ipcp->state_lock); 
LOG_ERR("Can't create socket."); return -1; } @@ -596,28 +818,39 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) SOL_SOCKET, SO_REUSEADDR, &enable, - sizeof(int)) < 0) { + sizeof(int)) < 0) LOG_WARN("Setsockopt(SO_REUSEADDR) failed."); - } - - shim_data(_ipcp)->s_fd = fd; - shim_data(_ipcp)->ip_addr = conf->ip_addr; - shim_data(_ipcp)->dns_addr = conf->dns_addr; + memset((char *) &s_saddr, 0, sizeof(s_saddr)); shim_data(_ipcp)->s_saddr.sin_family = AF_INET; shim_data(_ipcp)->s_saddr.sin_addr.s_addr = conf->ip_addr; shim_data(_ipcp)->s_saddr.sin_port = LISTEN_PORT; if (bind(fd, (struct sockaddr *) &shim_data(_ipcp)->s_saddr, - sizeof shim_data(_ipcp)->s_saddr ) < 0) { - rw_lock_unlock(&_ipcp->state_lock); + sizeof(shim_data(_ipcp)->s_saddr)) < 0) { LOG_ERR("Couldn't bind to %s.", ipstr); + close(fd); + return -1; + } + + rw_lock_wrlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_INIT) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_ERR("IPCP in wrong state."); + close(fd); return -1; } + shim_data(_ipcp)->s_fd = fd; + shim_data(_ipcp)->ip_addr = conf->ip_addr; + shim_data(_ipcp)->dns_addr = conf->dns_addr; + FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + _ipcp->state = IPCP_ENROLLED; + pthread_create(&_ap_instance->handler, NULL, ipcp_udp_listener, @@ -632,8 +865,6 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) ipcp_udp_sdu_loop, NULL); - _ipcp->state = IPCP_ENROLLED; - rw_lock_unlock(&_ipcp->state_lock); LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.", @@ -803,18 +1034,19 @@ static int ipcp_udp_name_reg(char * name) /* register application with DNS server */ dns_addr = shim_data(_ipcp)->dns_addr; + + rw_lock_unlock(&_ipcp->state_lock); + if (dns_addr != 0) { ip_addr = shim_data(_ipcp)->ip_addr; if (inet_ntop(AF_INET, &ip_addr, ipstr, INET_ADDRSTRLEN) == NULL) { - rw_lock_unlock(&_ipcp->state_lock); return -1; } if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL) { - rw_lock_unlock(&_ipcp->state_lock); return -1; } @@ -822,15 +1054,13 @@ static int ipcp_udp_name_reg(char * name) dnsstr, name, DNS_TTL, ipstr); if (ddns_send(cmd)) { + rw_lock_rdlock(&_ipcp->state_lock); ipcp_data_del_reg_entry(_ipcp->data, name); rw_lock_unlock(&_ipcp->state_lock); return -1; } } #endif - - rw_lock_unlock(&_ipcp->state_lock); - LOG_DBG("Registered %s.", name); return 0; @@ -862,10 +1092,12 @@ static int ipcp_udp_name_unreg(char * name) } dns_addr = shim_data(_ipcp)->dns_addr; + + rw_lock_unlock(&_ipcp->state_lock); + if (dns_addr != 0) { if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL) { - rw_lock_unlock(&_ipcp->state_lock); return -1; } sprintf(cmd, "server %s\nupdate delete %s A\nsend\nquit\n", @@ -875,28 +1107,26 @@ static int ipcp_udp_name_unreg(char * name) } #endif + rw_lock_rdlock(&_ipcp->state_lock); + ipcp_data_del_reg_entry(_ipcp->data, name); rw_lock_unlock(&_ipcp->state_lock); - LOG_DBG("Unregistered %s.", name); - return 0; } -static int ipcp_udp_flow_alloc(int port_id, - pid_t n_pid, +static int ipcp_udp_flow_alloc(pid_t n_pid, + int port_id, char * dst_name, char * src_ap_name, char * src_ae_name, enum qos_cube qos) { - struct sockaddr_in l_saddr; - struct sockaddr_in r_saddr; - struct sockaddr_in rf_saddr; + struct sockaddr_in r_saddr; /* server address */ + struct sockaddr_in f_saddr; /* flow */ + socklen_t f_saddr_len = sizeof(f_saddr); int fd; - int n; - char * recv_buf = NULL; struct hostent * h; uint32_t ip_addr = 0; #ifdef CONFIG_OUROBOROS_ENABLE_DNS @@ -904,21 +1134,13 @@ static int 
ipcp_udp_flow_alloc(int port_id, #endif struct shm_ap_rbuff * rb; + LOG_INFO("Allocating flow from %s to %s.", src_ap_name, dst_name); + if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL) return -1; - - rw_lock_rdlock(&_ipcp->state_lock); - - if (_ipcp->state != IPCP_ENROLLED) { - rw_lock_unlock(&_ipcp->state_lock); - LOG_DBGF("Won't allocate flow with non-enrolled IPCP."); - return -1; /* -ENOTENROLLED */ - } - if (strlen(dst_name) > 255 || strlen(src_ap_name) > 255 || strlen(src_ae_name) > 255) { - rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Name too long for this shim."); return -1; } @@ -926,27 +1148,46 @@ static int ipcp_udp_flow_alloc(int port_id, if (qos != QOS_CUBE_BE) LOG_DBGF("QoS requested. UDP/IP can't do that."); + rb = shm_ap_rbuff_open(n_pid); + if (rb == NULL) + return -1; /* -ENORBUFF */ + fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); /* this socket is for the flow */ - memset((char *) &l_saddr, 0, sizeof l_saddr); - l_saddr.sin_family = AF_INET; - l_saddr.sin_addr.s_addr = local_ip; - l_saddr.sin_port = 0; + memset((char *) &f_saddr, 0, sizeof(f_saddr)); + f_saddr.sin_family = AF_INET; + f_saddr.sin_addr.s_addr = local_ip; + f_saddr.sin_port = 0; - if (bind(fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { - rw_lock_unlock(&_ipcp->state_lock); + if (bind(fd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { close(fd); return -1; } + if (getsockname(fd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { + LOG_ERR("Could not get address from fd."); + close(fd); + return -1; + } + + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Won't allocate flow with non-enrolled IPCP."); + close(fd); + return -1; /* -ENOTENROLLED */ + } + #ifdef CONFIG_OUROBOROS_ENABLE_DNS dns_addr = shim_data(_ipcp)->dns_addr; + rw_lock_unlock(&_ipcp->state_lock); + if (dns_addr != 0) { ip_addr = ddns_resolve(dst_name, dns_addr); if (ip_addr == 0) { - rw_lock_unlock(&_ipcp->state_lock); LOG_DBGF("Could not resolve %s.", dst_name); close(fd); return -1; @@ -955,7 +1196,6 @@ static int ipcp_udp_flow_alloc(int port_id, #endif h = gethostbyname(dst_name); if (h == NULL) { - rw_lock_unlock(&_ipcp->state_lock); LOG_DBGF("Could not resolve %s.", dst_name); close(fd); return -1; @@ -966,97 +1206,73 @@ static int ipcp_udp_flow_alloc(int port_id, } #endif - memset((char *) &r_saddr, 0, sizeof r_saddr); + /* connect to server (store the remote IP address in the fd) */ + memset((char *) &r_saddr, 0, sizeof(r_saddr)); r_saddr.sin_family = AF_INET; r_saddr.sin_addr.s_addr = ip_addr; r_saddr.sin_port = LISTEN_PORT; - if (sendto(fd, dst_name, strlen(dst_name), 0, - (struct sockaddr *) &r_saddr, sizeof r_saddr) < 0) { - rw_lock_unlock(&_ipcp->state_lock); - LOG_ERR("Failed to send packet"); - close(fd); - return -1; - } - - /* wait for the other shim IPCP to respond */ - - recv_buf = malloc(strlen(dst_name) + 1); - if (recv_buf == NULL) { - rw_lock_unlock(&_ipcp->state_lock); - LOG_ERR("Failed to malloc recv_buff."); - close(fd); - return -1; - } - n = sizeof(rf_saddr); - n = recvfrom(fd, - recv_buf, - strlen(dst_name), - 0, - (struct sockaddr *) &rf_saddr, - (unsigned *) &n); - - if (connect(fd, - (struct sockaddr *) &rf_saddr, - sizeof rf_saddr) - < 0) { - rw_lock_unlock(&_ipcp->state_lock); + if (connect(fd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { close(fd); - free(recv_buf); return -1; } - if (memcmp(recv_buf, dst_name, strlen(dst_name))) - LOG_WARN("Incorrect echo from server"); + 
rw_lock_unlock(&_ipcp->state_lock); - free(recv_buf); + LOG_DBGF("Pending flow with port_id %d on UDP port %d.", + port_id, ntohs(f_saddr.sin_port)); - rb = shm_ap_rbuff_open(n_pid); - if (rb == NULL) { - rw_lock_unlock(&_ipcp->state_lock); - LOG_ERR("Could not open N + 1 ringbuffer."); - close(fd); - return -1; /* -ENORBUFF */ - } + rw_lock_rdlock(&_ipcp->state_lock); rw_lock_wrlock(&_ap_instance->flows_lock); + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + _ap_instance->flows[fd].port_id = port_id; - _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].state = FLOW_PENDING; _ap_instance->flows[fd].rb = rb; - FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); - rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); - /* tell IRMd that flow allocation "worked" */ + if (ipcp_udp_port_alloc(ip_addr, + f_saddr.sin_port, + dst_name, + src_ap_name, + src_ae_name) < 0) { + LOG_DBGF("Port alloc returned -1."); + rw_lock_rdlock(&_ipcp->state_lock); + rw_lock_wrlock(&_ap_instance->flows_lock); + + FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + + _ap_instance->flows[fd].port_id = -1; + _ap_instance->flows[fd].state = FLOW_NULL; + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + _ap_instance->flows[fd].rb = NULL; - if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) { + rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); - shm_ap_rbuff_close(rb); - LOG_ERR("Failed to notify IRMd about flow allocation reply"); close(fd); return -1; } - rw_lock_unlock(&_ipcp->state_lock); - - LOG_DBG("Allocated flow with port_id %d on UDP fd %d.", port_id, fd); - return fd; } -static int ipcp_udp_flow_alloc_resp(int port_id, - pid_t n_pid, +static int ipcp_udp_flow_alloc_resp(pid_t n_pid, + int port_id, int response) { struct shm_ap_rbuff * rb; - struct timespec wait = {0, 1000000}; int fd = -1; + struct sockaddr_in f_saddr; + struct sockaddr_in r_saddr; + socklen_t len = sizeof(r_saddr); if (response) return 0; - rw_lock_unlock(&_ipcp->state_lock); + rw_lock_rdlock(&_ipcp->state_lock); /* awaken pending flow */ @@ -1081,23 +1297,59 @@ static int ipcp_udp_flow_alloc_resp(int port_id, if (rb == NULL) { LOG_ERR("Could not open N + 1 ringbuffer."); _ap_instance->flows[fd].state = FLOW_NULL; - _ap_instance->flows[fd].port_id = 0; + _ap_instance->flows[fd].port_id = -1; rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); return 0; } + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + + if (getsockname(fd, (struct sockaddr *) &f_saddr, &len) < 0) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Flow with port_id %d has no peer.", port_id); + return 0; + }; + + if (getpeername(fd, (struct sockaddr *) &r_saddr, &len) < 0) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Flow with port_id %d has no peer.", port_id); + return 0; + }; + + rw_lock_rdlock(&_ipcp->state_lock); + rw_lock_wrlock(&_ap_instance->flows_lock); + _ap_instance->flows[fd].state = FLOW_ALLOCATED; _ap_instance->flows[fd].rb = rb; FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); - nanosleep(&wait, NULL); - rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); - LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd); + if (ipcp_udp_port_alloc_resp(r_saddr.sin_addr.s_addr, + f_saddr.sin_port, + r_saddr.sin_port, + response) < 0) { + rw_lock_rdlock(&_ipcp->state_lock); + rw_lock_wrlock(&_ap_instance->flows_lock); + + _ap_instance->flows[fd].state = FLOW_NULL; + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + 
_ap_instance->flows[fd].rb = NULL; + + FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + + LOG_DBGF("Could not send response."); + return -1; + } + + LOG_DBGF("Accepted flow, port_id %d on UDP fd %d.", port_id, fd); return 0; } @@ -1108,6 +1360,11 @@ static int ipcp_udp_flow_dealloc(int port_id) struct shm_ap_rbuff * rb; struct timespec wait = {0, 1000000}; + /* flow deallocation should wait for 2 MPL */ + nanosleep(&wait, NULL); + + LOG_DBGF("Deallocating flow with port_id %d.", port_id); + rw_lock_rdlock(&_ipcp->state_lock); rw_lock_wrlock(&_ap_instance->flows_lock); @@ -1129,25 +1386,22 @@ static int ipcp_udp_flow_dealloc(int port_id) FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); - nanosleep(&wait, NULL); + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); close(fd); - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Flow with port_id %d deallocated.", port_id); return 0; } -static struct ipcp * ipcp_udp_create(char * ap_name) +static struct ipcp * ipcp_udp_create() { struct ipcp * i; struct ipcp_udp_data * data; struct ipcp_ops * ops; - if (shim_ap_init(ap_name) < 0) - return NULL; - i = ipcp_instance_create(); if (i == NULL) return NULL; @@ -1158,7 +1412,7 @@ static struct ipcp * ipcp_udp_create(char * ap_name) return NULL; } - ops = malloc(sizeof *ops); + ops = malloc(sizeof(*ops)); if (ops == NULL) { free(data); free(i); @@ -1202,11 +1456,14 @@ int main (int argc, char * argv[]) exit(1); } + if (shim_ap_init(argv[2]) < 0) + exit(1); + /* store the process id of the irmd */ irmd_pid = atoi(argv[1]); /* init sig_act */ - memset(&sig_act, 0, sizeof sig_act); + memset(&sig_act, 0, sizeof(sig_act)); /* install signal traps */ sig_act.sa_sigaction = &ipcp_sig_handler; @@ -1217,18 +1474,22 @@ int main (int argc, char * argv[]) sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - _ipcp = ipcp_udp_create(argv[2]); + _ipcp = ipcp_udp_create(); if (_ipcp == NULL) { LOG_ERR("Won't."); exit(1); } + rw_lock_wrlock(&_ipcp->state_lock); + pthread_sigmask(SIG_BLOCK, &sigset, NULL); pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); + rw_lock_unlock(&_ipcp->state_lock); + pthread_join(_ap_instance->mainloop, NULL); shim_ap_fini(); diff --git a/src/ipcpd/shim-udp/shim_udp_messages.proto b/src/ipcpd/shim-udp/shim_udp_messages.proto new file mode 100644 index 00000000..1d054f1f --- /dev/null +++ b/src/ipcpd/shim-udp/shim_udp_messages.proto @@ -0,0 +1,14 @@ +enum shim_udp_msg_code { + FLOW_REQ = 1; + FLOW_REPLY = 2; +}; + +message shim_udp_msg { + required shim_udp_msg_code code = 1; + optional string dst_name = 2; + optional string src_ap_name = 3; + optional string src_ae_name = 4; + required sint32 src_udp_port = 5; + optional sint32 dst_udp_port = 6; + optional sint32 response = 7; +}; diff --git a/src/irmd/main.c b/src/irmd/main.c index 89263977..28f82751 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1047,7 +1047,6 @@ static int flow_alloc_res(int port_id) if (e->state == FLOW_ALLOCATED) { rw_lock_unlock(&instance->flows_lock); rw_lock_unlock(&instance->state_lock); - LOG_DBGF("Returning 0."); return 0; } if (e->state == FLOW_NULL) { -- cgit v1.2.3 From c3485a0b159469c0899c76de230f2d36d371909e Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Thu, 19 May 2016 20:33:26 +0200 Subject: ipcpd: shim-udp: removed debug code some debug code for locking and a 
sleep check are removed from the shim-udp. --- src/ipcpd/shim-udp/main.c | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 4f70d053..819c8149 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -762,14 +762,6 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) default: return; } - - LOG_DBGF("Lock check."); - rw_lock_wrlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ap_instance->flows_lock); - LOG_DBGF("flows_lock passed."); - rw_lock_wrlock(&_ipcp->state_lock); - rw_lock_unlock(&_ipcp->state_lock); - LOG_DBGF("state_lock passed."); } static int ipcp_udp_bootstrap(struct dif_config * conf) @@ -1358,10 +1350,6 @@ static int ipcp_udp_flow_dealloc(int port_id) { int fd = -1; struct shm_ap_rbuff * rb; - struct timespec wait = {0, 1000000}; - - /* flow deallocation should wait for 2 MPL */ - nanosleep(&wait, NULL); LOG_DBGF("Deallocating flow with port_id %d.", port_id); -- cgit v1.2.3 From e0f5fbea688a6e9b19c79d16b41076fc7c508df6 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 20 May 2016 00:34:36 +0200 Subject: ipcpd: shim-ipcp: fixed race conditions There were two race conditions. The first was where client received the response message before the client UDP port was connected to the server UDP port and the first message of the client was sent to the LISTEN port instead of the server application port. This was solved by reordering the operations so the response is sent after the port is correctly connected. The second race condition is that the FD is not added to the set in time at the server side before the first SDU arrives. This was solved by adding a variable proteceted by a mutex. --- src/ipcpd/shim-udp/main.c | 97 ++++++++++++++++++++++++++++------------------- 1 file changed, 57 insertions(+), 40 deletions(-) diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 819c8149..a75f3ce3 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -90,7 +90,6 @@ struct shim_ap_data { struct shm_du_map * dum; struct bmp * fds; struct shm_ap_rbuff * rb; - rw_lock_t data_lock; struct flow flows[AP_MAX_FLOWS]; rw_lock_t flows_lock; @@ -99,6 +98,9 @@ struct shim_ap_data { pthread_t sduloop; pthread_t handler; pthread_t sdu_reader; + + bool fd_set_sync; + pthread_mutex_t fd_set_lock; } * _ap_instance; static int shim_ap_init(char * ap_name) @@ -155,6 +157,7 @@ static int shim_ap_init(char * ap_name) } rw_lock_init(&_ap_instance->flows_lock); + pthread_mutex_init(&_ap_instance->fd_set_lock, NULL); return 0; } @@ -226,7 +229,7 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) e.index = index; - rw_lock_rdlock(&_ap_instance->flows_lock); + rw_lock_wrlock(&_ap_instance->flows_lock); e.port_id = _ap_instance->flows[fd].port_id; @@ -492,52 +495,47 @@ static int ipcp_udp_port_alloc_reply(int src_udp_port, port_id = _ap_instance->flows[fd].port_id; - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); - - if ((ret = ipcp_flow_alloc_reply(getpid(), - port_id, - response)) < 0) { - return -1; /* -EPIPE */ - } - - rw_lock_rdlock(&_ipcp->state_lock); - rw_lock_wrlock(&_ap_instance->flows_lock); - if (response) { _ap_instance->flows[fd].port_id = -1; _ap_instance->flows[fd].rb = NULL; + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); _ap_instance->flows[fd].state = FLOW_NULL; - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); - close(fd); - return 0; - } + } else { + /* get the 
original address with the LISTEN PORT */ + if (getpeername(fd, + (struct sockaddr *) &t_saddr, + &t_saddr_len) < 0) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Flow with port_id %d has no peer.", port_id); + return -1; + } - /* get the original address with the LISTEN PORT */ - if (getpeername(fd, (struct sockaddr *) &t_saddr, &t_saddr_len) < 0) { - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); - LOG_DBGF("Flow with port_id %d has no peer.", port_id); - return 0; - }; + /* connect to the flow udp port */ + t_saddr.sin_port = src_udp_port; - /* connect to the flow udp port */ - t_saddr.sin_port = src_udp_port; + if (connect(fd, + (struct sockaddr *) &t_saddr, + sizeof(t_saddr)) < 0) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + close(fd); + return -1; + } - if (connect(fd, - (struct sockaddr *) &t_saddr, sizeof(t_saddr)) < 0) { - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); - close(fd); - return -1; + _ap_instance->flows[fd].state = FLOW_ALLOCATED; } - _ap_instance->flows[fd].state = FLOW_ALLOCATED; - rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); + + if ((ret = ipcp_flow_alloc_reply(getpid(), + port_id, + response)) < 0) { + return -1; /* -EPIPE */ + } + LOG_INFO("Flow allocation completed, UDP ports: (%d, %d).", ntohs(src_udp_port), ntohs(dst_udp_port)); @@ -635,7 +633,12 @@ static void * ipcp_udp_sdu_reader() rw_lock_rdlock(&_ap_instance->flows_lock); + pthread_mutex_lock(&_ap_instance->fd_set_lock); + read_fds = shim_data(_ipcp)->flow_fd_s; + _ap_instance->fd_set_sync = false; + + pthread_mutex_unlock(&_ap_instance->fd_set_lock); rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); @@ -1240,7 +1243,7 @@ static int ipcp_udp_flow_alloc(pid_t n_pid, _ap_instance->flows[fd].port_id = -1; _ap_instance->flows[fd].state = FLOW_NULL; shm_ap_rbuff_close(_ap_instance->flows[fd].rb); - _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].rb = NULL; rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); @@ -1260,6 +1263,7 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_pid, struct sockaddr_in f_saddr; struct sockaddr_in r_saddr; socklen_t len = sizeof(r_saddr); + bool fd_wait = true; if (response) return 0; @@ -1316,8 +1320,20 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_pid, _ap_instance->flows[fd].state = FLOW_ALLOCATED; _ap_instance->flows[fd].rb = rb; + pthread_mutex_lock(&_ap_instance->fd_set_lock); + + _ap_instance->fd_set_sync = true; FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + pthread_mutex_unlock(&_ap_instance->fd_set_lock); + + while (fd_wait) { + sched_yield(); + pthread_mutex_lock(&_ap_instance->fd_set_lock); + fd_wait = _ap_instance->fd_set_sync; + pthread_mutex_unlock(&_ap_instance->fd_set_lock); + } + rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); @@ -1369,12 +1385,13 @@ static int ipcp_udp_flow_dealloc(int port_id) rb = _ap_instance->flows[fd].rb; _ap_instance->flows[fd].rb = NULL; - if (rb != NULL) - shm_ap_rbuff_close(rb); - FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); rw_lock_unlock(&_ap_instance->flows_lock); + + if (rb != NULL) + shm_ap_rbuff_close(rb); + rw_lock_unlock(&_ipcp->state_lock); close(fd); -- cgit v1.2.3 From 129b15eea7a790bff0a83d1668b8d666fe0e6f35 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 20 May 2016 07:34:41 +0200 Subject: ipcpd: shim-udp: read fd on client side 
There could be a theoretical race condition where the server sends a
message before the FD is set on the client side; this is now resolved.
---
 src/ipcpd/shim-udp/main.c | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index a75f3ce3..3ef783c4 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -1124,6 +1124,7 @@ static int ipcp_udp_flow_alloc(pid_t n_pid,
         int fd;
         struct hostent * h;
         uint32_t ip_addr = 0;
+        bool fd_wait = true;
 #ifdef CONFIG_OUROBOROS_ENABLE_DNS
         uint32_t dns_addr = 0;
 #endif
@@ -1220,8 +1221,20 @@ static int ipcp_udp_flow_alloc(pid_t n_pid,
         rw_lock_rdlock(&_ipcp->state_lock);
         rw_lock_wrlock(&_ap_instance->flows_lock);
 
+        pthread_mutex_lock(&_ap_instance->fd_set_lock);
+
+        _ap_instance->fd_set_sync = true;
         FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
 
+        pthread_mutex_unlock(&_ap_instance->fd_set_lock);
+
+        while (fd_wait) {
+                sched_yield();
+                pthread_mutex_lock(&_ap_instance->fd_set_lock);
+                fd_wait = _ap_instance->fd_set_sync;
+                pthread_mutex_unlock(&_ap_instance->fd_set_lock);
+        }
+
         _ap_instance->flows[fd].port_id = port_id;
         _ap_instance->flows[fd].state = FLOW_PENDING;
         _ap_instance->flows[fd].rb = rb;
-- cgit v1.2.3
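
A note for readers following the series without applying the patches: the first patch's flow allocator exchanges FLOW_REQ/FLOW_REPLY messages over the well-known UDP port 0x0D1F, each request advertising the sender's per-flow UDP port. The sketch below shows the general shape of sending such a request. It is only an illustration: the real shim packs a protobuf-c shim_udp_msg, whereas the fixed-layout struct flow_msg, the helper send_flow_req() and the example flow port 5000 are stand-ins invented to keep the example self-contained.

/*
 * Hypothetical sketch of a flow allocation request sent to the shim's
 * well-known management port 0x0D1F (not the Ouroboros code itself).
 * The real shim packs a protobuf-c shim_udp_msg; the fixed-layout
 * struct flow_msg below is a stand-in so the example is self-contained.
 */
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#define LISTEN_PORT htons(0x0D1F)      /* flow allocator port, as in the patch */

enum msg_code { FLOW_REQ = 1, FLOW_REPLY = 2 };

struct flow_msg {                      /* stand-in for shim_udp_msg */
        uint8_t  code;
        uint16_t src_udp_port;         /* sender's per-flow port, network order */
        char     dst_name[64];
        char     src_ap_name[64];
        char     src_ae_name[64];
};

/* send a FLOW_REQ for dst_name, advertising our per-flow UDP port */
static int send_flow_req(int fd, uint32_t dst_ip,
                         uint16_t flow_port, const char * dst_name)
{
        struct sockaddr_in r_saddr;
        struct flow_msg msg;

        memset(&msg, 0, sizeof(msg));
        msg.code         = FLOW_REQ;
        msg.src_udp_port = flow_port;
        strncpy(msg.dst_name, dst_name, sizeof(msg.dst_name) - 1);

        memset(&r_saddr, 0, sizeof(r_saddr));
        r_saddr.sin_family      = AF_INET;
        r_saddr.sin_addr.s_addr = dst_ip;
        r_saddr.sin_port        = LISTEN_PORT;

        if (sendto(fd, &msg, sizeof(msg), 0,
                   (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0)
                return -1;

        return 0;
}

int main(void)
{
        int fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
        if (fd < 0)
                return 1;

        /* advertise an arbitrary example flow port (5000) for a flow to "server" */
        if (send_flow_req(fd, inet_addr("127.0.0.1"), htons(5000), "server") < 0)
                perror("send_flow_req");

        close(fd);
        return 0;
}

On the receiving side the listener thread unpacks the message, creates a dedicated UDP socket for the flow, connects it to the advertised port and answers with a FLOW_REPLY carrying its own flow port, which is what ipcp_udp_port_req() and ipcp_udp_port_alloc_resp() do in the patch.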
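
The last two patches close the same race on both sides of a flow: a newly created flow socket has to be visible in the SDU reader's select() set before the flow is reported as allocated, otherwise the first SDU can arrive while the reader is still working from a stale copy of the set. The sketch below isolates that handoff: it reuses the patch's flow_fd_s and fd_set_sync names, while the add_flow_fd() helper, the simplified sdu_reader() and the use of stdin as a stand-in flow socket are invented for the example. The mutex-protected flag and the sched_yield() loop mirror what the patches add to ipcp_udp_flow_alloc() and ipcp_udp_flow_alloc_resp().

/*
 * Minimal, self-contained sketch of the fd_set handoff added in these
 * patches (not the Ouroboros code itself). The allocating path adds the
 * new fd to the select() set under a mutex, raises fd_set_sync, and
 * yields until the reader has taken a fresh snapshot of the set.
 * Build with: cc -pthread sketch.c
 */
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <sys/select.h>
#include <unistd.h>

static fd_set          flow_fd_s;            /* shared select() set     */
static bool            fd_set_sync = false;  /* "snapshot pending" flag */
static pthread_mutex_t fd_set_lock = PTHREAD_MUTEX_INITIALIZER;
static atomic_bool     running     = true;

static void * sdu_reader(void * o)
{
        (void) o;

        while (running) {
                fd_set read_fds;
                struct timeval tv = {0, 1000};

                /* snapshot the set and acknowledge any pending update */
                pthread_mutex_lock(&fd_set_lock);
                read_fds    = flow_fd_s;
                fd_set_sync = false;
                pthread_mutex_unlock(&fd_set_lock);

                if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0)
                        continue;

                /* ... recvfrom() on the ready flow fds would go here ... */
        }

        return NULL;
}

/* called from the flow allocation path once the flow's socket exists */
static void add_flow_fd(int fd)
{
        bool wait = true;

        pthread_mutex_lock(&fd_set_lock);
        FD_SET(fd, &flow_fd_s);
        fd_set_sync = true;
        pthread_mutex_unlock(&fd_set_lock);

        /* don't report the flow as allocated until the reader saw it */
        while (wait) {
                sched_yield();
                pthread_mutex_lock(&fd_set_lock);
                wait = fd_set_sync;
                pthread_mutex_unlock(&fd_set_lock);
        }
}

int main(void)
{
        pthread_t reader;

        FD_ZERO(&flow_fd_s);
        pthread_create(&reader, NULL, sdu_reader, NULL);

        add_flow_fd(STDIN_FILENO);   /* stand-in for the flow's UDP socket */
        printf("reader picked up the new fd\n");

        running = false;
        pthread_join(reader, NULL);

        return 0;
}

A condition variable would avoid the busy-wait, but clearing the flag as part of the snapshot keeps the reader down to one short critical section per select() round, which appears to be the trade-off the patches make.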