diff options
author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2019-03-16 15:16:21 +0100 |
---|---|---|
committer | Sander Vrijders <sander@ouroboros.rocks> | 2019-03-18 11:09:31 +0100 |
commit | 8940fe2cc063d2de8393684ff48efec0e27edc8a (patch) | |
tree | 934574e25a84f9a486004e7fc30cc35430cc44bf /src/ipcpd | |
parent | 7702cb0f44f4cbb31436b2d2c621d4e5b4c0edec (diff) | |
download | ouroboros-8940fe2cc063d2de8393684ff48efec0e27edc8a.tar.gz ouroboros-8940fe2cc063d2de8393684ff48efec0e27edc8a.zip |
ipcpd: Revise UDP IPCP
The UDP IPCP now uses a fixed server UDP port (default 3435) for all
communications. This allows passing firewalls more easily since only a
single port needs to be opened. The client port can be fixed as well
if needed (default random). It uses an internal eid, so the MTU of the
UDP layer is reduced by 4 bytes, similar to the Ethernet IPCPs.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/ipcpd')
-rw-r--r-- | src/ipcpd/config.h.in | 8 | ||||
-rw-r--r-- | src/ipcpd/ipcp.c | 34 | ||||
-rw-r--r-- | src/ipcpd/udp/CMakeLists.txt | 9 | ||||
-rw-r--r-- | src/ipcpd/udp/main.c | 787 |
4 files changed, 397 insertions, 441 deletions
diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index ddd0d56e..8545021b 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -39,7 +39,7 @@ #define IPCP_ADD_THREADS @IPCP_ADD_THREADS@ #cmakedefine HAVE_LIBGCRYPT -/* normal IPCP */ +/* unicast IPCP */ #define QOS_PRIO_BE @IPCP_QOS_CUBE_BE_PRIO@ #define QOS_PRIO_VIDEO @IPCP_QOS_CUBE_VIDEO_PRIO@ #define QOS_PRIO_VOICE @IPCP_QOS_CUBE_VOICE_PRIO@ @@ -55,11 +55,13 @@ #cmakedefine HAVE_DDNS #define NSUPDATE_EXEC "@NSUPDATE_EXECUTABLE@" #define NSLOOKUP_EXEC "@NSLOOKUP_EXECUTABLE@" +#define IPCP_UDP_RD_THR @IPCP_UDP_RD_THR@ +#define IPCP_UDP_WR_THR @IPCP_UDP_WR_THR@ /* eth-llc */ #cmakedefine HAVE_NETMAP #cmakedefine HAVE_BPF #cmakedefine HAVE_RAW_SOCKETS -#define IPCP_ETH_RD_THR @IPCP_ETH_RD_THR@ -#define IPCP_ETH_WR_THR @IPCP_ETH_WR_THR@ #cmakedefine IPCP_ETH_QDISC_BYPASS +#define IPCP_ETH_RD_THR @IPCP_ETH_RD_THR@ +#define IPCP_ETH_WR_THR @IPCP_ETH_WR_THR@ diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 8e0cd189..bd0aeee5 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -231,37 +231,39 @@ static void * mainloop(void * o) conf.type = conf_msg->ipcp_type; strcpy(conf.layer_info.layer_name, conf_msg->layer_info->layer_name); - if (conf_msg->ipcp_type == IPCP_NORMAL) { + + switch(conf_msg->ipcp_type) { + case IPCP_NORMAL: conf.addr_size = conf_msg->addr_size; conf.eid_size = conf_msg->eid_size; conf.max_ttl = conf_msg->max_ttl; conf.addr_auth_type = conf_msg->addr_auth_type; conf.routing_type = conf_msg->routing_type; conf.pff_type = conf_msg->pff_type; - } - - if (conf_msg->ipcp_type == IPCP_ETH_LLC) - conf.dev = conf_msg->dev; - - if (conf_msg->ipcp_type == IPCP_ETH_DIX) { - conf.dev = conf_msg->dev; + break; + case IPCP_ETH_DIX: conf.ethertype = conf_msg->ethertype; - } - - if (conf_msg->ipcp_type == IPCP_UDP) { + /* FALLTHRU */ + case IPCP_ETH_LLC: + conf.dev = conf_msg->dev; + break; + case IPCP_UDP: conf.ip_addr = conf_msg->ip_addr; conf.dns_addr = conf_msg->dns_addr; - + conf.clt_port = conf_msg->clt_port; + conf.srv_port = conf_msg->srv_port; conf.layer_info.dir_hash_algo = HASH_MD5; layer_info.dir_hash_algo = HASH_MD5; - } - - if (conf_msg->ipcp_type == IPCP_BROADCAST) { + break; + case IPCP_BROADCAST: conf.layer_info.dir_hash_algo = HASH_SHA3_256; layer_info.dir_hash_algo = HASH_SHA3_256; + break; + default: + log_err("Unknown IPCP type."); } - /* UDP and broadcast have a fixed hash algorithm. */ + /* UDP and broadcast use fixed hash algorithm. */ if (conf_msg->ipcp_type != IPCP_UDP && conf_msg->ipcp_type != IPCP_BROADCAST) { switch(conf_msg->layer_info->dir_hash_algo) { diff --git a/src/ipcpd/udp/CMakeLists.txt b/src/ipcpd/udp/CMakeLists.txt index b21afe75..f1a29ef6 100644 --- a/src/ipcpd/udp/CMakeLists.txt +++ b/src/ipcpd/udp/CMakeLists.txt @@ -16,9 +16,11 @@ set(IPCP_UDP_TARGET ipcpd-udp CACHE INTERNAL "") set(UDP_SOURCES # Add source files here - ${CMAKE_CURRENT_SOURCE_DIR}/main.c) + ${CMAKE_CURRENT_SOURCE_DIR}/main.c + ) add_executable(ipcpd-udp ${UDP_SOURCES} ${IPCP_SOURCES}) + target_link_libraries(ipcpd-udp LINK_PUBLIC ouroboros-dev) # Find the nsupdate executable @@ -52,6 +54,11 @@ else () endif () endif () +set(IPCP_UDP_RD_THR 3 CACHE STRING + "Number of reader threads in UDP IPCP") +set(IPCP_UDP_WR_THR 3 CACHE STRING + "Number of writer threads in UDP IPCP") + include(AddCompileFlags) if (CMAKE_BUILD_TYPE MATCHES "Debug*") add_compile_flags(ipcpd-udp -DCONFIG_OUROBOROS_DEBUG) diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index f45a18cb..559be55a 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -30,6 +30,7 @@ #define OUROBOROS_PREFIX "ipcpd/udp" +#include <ouroboros/bitmap.h> #include <ouroboros/hash.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> @@ -58,34 +59,50 @@ #define FLOW_REPLY 2 #define THIS_TYPE IPCP_UDP -#define LISTEN_PORT htons(0x0D1F) -#define SHIM_UDP_BUF_SIZE 256 -#define SHIM_UDP_MSG_SIZE 256 -#define SHIM_UDP_MAX_PACKET_SIZE 8980 +#define IPCP_UDP_MAX_PACKET_SIZE 8980 +#define OUR_HEADER_LEN sizeof(uint32_t) /* adds eid */ + +#define IPCP_UDP_BUF_SIZE 256 +#define IPCP_UDP_MSG_SIZE 256 #define DNS_TTL 86400 #define FD_UPDATE_TIMEOUT 100 /* microseconds */ -#define local_ip (udp_data.s_saddr.sin_addr.s_addr) +#define SERV_PORT udp_data.s_saddr.sin_port; +#define SERV_SADDR ((struct sockaddr *) &udp_data.s_saddr) +#define CLNT_SADDR ((struct sockaddr *) &udp_data.c_saddr) +#define SERV_SADDR_SIZE (sizeof(udp_data.s_saddr)) +#define LOCAL_IP (udp_data.s_saddr.sin_addr.s_addr) -#define UDP_MAX_PORTS 0xFFFF +#define MGMT_EID 0 +#define MGMT_FRAME_SIZE 512 +/* Keep order for alignment. */ struct mgmt_msg { - uint16_t src_udp_port; - uint16_t dst_udp_port; + uint32_t eid; + uint32_t s_eid; + uint32_t d_eid; uint8_t code; - uint8_t response; - /* QoS parameters from spec, aligned */ + int8_t response; + /* QoS parameters from spec */ uint8_t availability; uint8_t in_order; - uint32_t delay; uint64_t bandwidth; + uint32_t delay; uint32_t loss; uint32_t ber; uint32_t max_gap; } __attribute__((packed)); +struct mgmt_frame { + struct list_head next; + struct sockaddr_in r_saddr; + uint8_t buf[MGMT_FRAME_SIZE]; +}; + +/* UDP flow */ struct uf { - int udp; + int d_eid; + /* IP details are stored through connect(). */ int skfd; }; @@ -94,25 +111,25 @@ struct { uint32_t ip_addr; uint32_t dns_addr; - /* listen server */ + /* server socket */ struct sockaddr_in s_saddr; int s_fd; + /* client port */ + int clt_port; fset_t * np1_flows; fqueue_t * fq; - fd_set flow_fd_s; - /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */ - int uf_to_fd[FD_SETSIZE]; struct uf fd_to_uf[SYS_MAX_FLOWS]; pthread_rwlock_t flows_lock; - pthread_t packet_loop; - pthread_t handler; - pthread_t packet_reader; + pthread_t packet_writer[IPCP_UDP_WR_THR]; + pthread_t packet_reader[IPCP_UDP_RD_THR]; - bool fd_set_mod; - pthread_cond_t fd_set_cond; - pthread_mutex_t fd_set_lock; + /* Handle mgmt frames in a different thread */ + pthread_t mgmt_handler; + pthread_mutex_t mgmt_lock; + pthread_cond_t mgmt_cond; + struct list_head mgmt_frames; } udp_data; static int udp_data_init(void) @@ -120,24 +137,19 @@ static int udp_data_init(void) int i; if (pthread_rwlock_init(&udp_data.flows_lock, NULL)) - return -1; - - if (pthread_cond_init(&udp_data.fd_set_cond, NULL)) - goto fail_set_cond; + goto fail_rwlock_init; - if (pthread_mutex_init(&udp_data.fd_set_lock, NULL)) - goto fail_set_lock; + if (pthread_cond_init(&udp_data.mgmt_cond, NULL)) + goto fail_mgmt_cond; - for (i = 0; i < FD_SETSIZE; ++i) - udp_data.uf_to_fd[i] = -1; + if (pthread_mutex_init(&udp_data.mgmt_lock, NULL)) + goto fail_mgmt_lock; for (i = 0; i < SYS_MAX_FLOWS; ++i) { - udp_data.fd_to_uf[i].skfd = -1; - udp_data.fd_to_uf[i].udp = -1; + udp_data.fd_to_uf[i].skfd = -1; + udp_data.fd_to_uf[i].d_eid = -1; } - FD_ZERO(&udp_data.flow_fd_s); - udp_data.np1_flows = fset_create(); if (udp_data.np1_flows == NULL) goto fail_fset; @@ -150,88 +162,43 @@ static int udp_data_init(void) if (udp_data.shim_data == NULL) goto fail_data; + list_head_init(&udp_data.mgmt_frames); + return 0; fail_data: fqueue_destroy(udp_data.fq); fail_fqueue: fset_destroy(udp_data.np1_flows); fail_fset: - pthread_mutex_destroy(&udp_data.fd_set_lock); - fail_set_lock: - pthread_cond_destroy(&udp_data.fd_set_cond); - fail_set_cond: + pthread_mutex_destroy(&udp_data.mgmt_lock); + fail_mgmt_lock: + pthread_cond_destroy(&udp_data.mgmt_cond); + fail_mgmt_cond: pthread_rwlock_destroy(&udp_data.flows_lock); + fail_rwlock_init: return -1; } static void udp_data_fini(void) { - fset_destroy(udp_data.np1_flows); - fqueue_destroy(udp_data.fq); - shim_data_destroy(udp_data.shim_data); - pthread_rwlock_destroy(&udp_data.flows_lock); - pthread_mutex_destroy(&udp_data.fd_set_lock); - pthread_cond_destroy(&udp_data.fd_set_cond); -} - -static void set_fd(int fd) -{ - pthread_mutex_lock(&udp_data.fd_set_lock); - - udp_data.fd_set_mod = true; - FD_SET(fd, &udp_data.flow_fd_s); - - while (udp_data.fd_set_mod) - pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock); - - pthread_mutex_unlock(&udp_data.fd_set_lock); -} - -static void clr_fd(int fd) -{ - pthread_mutex_lock(&udp_data.fd_set_lock); - - udp_data.fd_set_mod = true; - FD_CLR(fd, &udp_data.flow_fd_s); - - while (udp_data.fd_set_mod) - pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock); - - pthread_mutex_unlock(&udp_data.fd_set_lock); -} - -static int send_shim_udp_msg(uint8_t * buf, - size_t len, - uint32_t dst_ip_addr) -{ - struct sockaddr_in r_saddr; - - memset((char *)&r_saddr, 0, sizeof(r_saddr)); - r_saddr.sin_family = AF_INET; - r_saddr.sin_addr.s_addr = dst_ip_addr; - r_saddr.sin_port = LISTEN_PORT; - - if (sendto(udp_data.s_fd, buf, len, 0, - (struct sockaddr *) &r_saddr, - sizeof(r_saddr)) == -1) { - log_err("Failed to send message."); - return -1; - } + fqueue_destroy(udp_data.fq); + fset_destroy(udp_data.np1_flows); - return 0; + pthread_rwlock_destroy(&udp_data.flows_lock); + pthread_cond_destroy(&udp_data.mgmt_cond); + pthread_mutex_destroy(&udp_data.mgmt_lock); } -static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, - uint16_t src_udp_port, +static int ipcp_udp_port_alloc(int skfd, + uint32_t s_eid, const uint8_t * dst, qosspec_t qs) { uint8_t * buf; struct mgmt_msg * msg; size_t len; - int ret; len = sizeof(*msg) + ipcp_dir_hash_len(); @@ -240,8 +207,9 @@ static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, return -1; msg = (struct mgmt_msg *) buf; + msg->eid = hton32(MGMT_EID); msg->code = FLOW_REQ; - msg->src_udp_port = src_udp_port; + msg->s_eid = hton32(s_eid); msg->delay = hton32(qs.delay); msg->bandwidth = hton64(qs.bandwidth); msg->availability = qs.availability; @@ -252,73 +220,63 @@ static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, memcpy(msg + 1, dst, ipcp_dir_hash_len()); - ret = send_shim_udp_msg(buf, len, dst_ip_addr); + if (write(skfd, msg, len) < 0) { + free(buf); + return -1; + } free(buf); - return ret; + return 0; } -static int ipcp_udp_port_alloc_resp(uint32_t dst_ip_addr, - uint16_t src_udp_port, - uint16_t dst_udp_port, - int response) +static int ipcp_udp_port_alloc_resp(int skfd, + uint32_t s_eid, + uint32_t d_eid, + int8_t response) { - struct mgmt_msg * msg; - int ret; + struct mgmt_msg * msg; msg = malloc(sizeof(*msg)); if (msg == NULL) return -1; - msg->code = FLOW_REPLY; - msg->src_udp_port = src_udp_port; - msg->dst_udp_port = dst_udp_port; - msg->response = response; + msg->eid = hton32(MGMT_EID); + msg->code = FLOW_REPLY; + msg->s_eid = hton32(s_eid); + msg->d_eid = hton32(d_eid); + msg->response = response; - ret = send_shim_udp_msg((uint8_t *) msg, sizeof(*msg), dst_ip_addr); + if (write(skfd, msg, sizeof(*msg)) < 0) { + free(msg); + return -1; + } free(msg); - return ret; + return 0; } static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, + int d_eid, const uint8_t * dst, qosspec_t qs) { - struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; - struct timespec abstime; - struct sockaddr_in f_saddr; - socklen_t f_saddr_len = sizeof(f_saddr); - int skfd; - int fd; + struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; + struct timespec abstime; + int skfd; + int fd; - log_dbg("Port request arrived from UDP port %d", - ntohs(c_saddr->sin_port)); - - if ((skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (skfd < 0) { log_err("Could not create UDP socket."); return -1; } - memset((char *) &f_saddr, 0, sizeof(f_saddr)); - f_saddr.sin_family = AF_INET; - f_saddr.sin_addr.s_addr = local_ip; - f_saddr.sin_port = 0; - - if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { - log_err("Could not bind to socket."); - close(skfd); - return -1; - } + /* Remote listens on server port. Mod of c_saddr allowed. */ + c_saddr->sin_port = udp_data.s_saddr.sin_port; - if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { - log_err("Could not get address from fd."); - return -1; - } - - /* connect stores the remote address in the file descriptor */ + /* Connect stores the remote address in the file descriptor. */ if (connect(skfd, (struct sockaddr *) c_saddr, sizeof(*c_saddr)) < 0) { log_err("Could not connect to remote UDP client."); close(skfd); @@ -331,8 +289,7 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) { ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, + pthread_cond_timedwait(&ipcpi.alloc_cond, &ipcpi.alloc_lock, &abstime); } @@ -354,9 +311,8 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, pthread_rwlock_wrlock(&udp_data.flows_lock); - udp_data.uf_to_fd[skfd] = fd; - udp_data.fd_to_uf[fd].skfd = skfd; - udp_data.fd_to_uf[fd].udp = f_saddr.sin_port; + udp_data.fd_to_uf[fd].skfd = skfd; + udp_data.fd_to_uf[fd].d_eid = d_eid; pthread_rwlock_unlock(&udp_data.flows_lock); @@ -365,224 +321,228 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, pthread_mutex_unlock(&ipcpi.alloc_lock); - log_dbg("Pending allocation request, fd %d, UDP port (%d, %d).", - fd, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port)); + log_dbg("Pending allocation request, fd %d, remote eid %d.", + fd, d_eid); return 0; } -/* returns the n flow descriptor */ -static int udp_port_to_fd(int udp_port) +static int ipcp_udp_port_alloc_reply(uint32_t s_eid, + uint32_t d_eid, + int8_t response) { - int i; - - for (i = 0; i < SYS_MAX_FLOWS; ++i) - if (udp_data.fd_to_uf[i].udp == udp_port) - return i; - - return -1; -} - -static int ipcp_udp_port_alloc_reply(uint16_t src_udp_port, - uint16_t dst_udp_port, - int response) -{ - int fd = -1; - int ret = 0; - int skfd = -1; - struct sockaddr_in t_saddr; - socklen_t t_saddr_len = sizeof(t_saddr); + socklen_t t_saddr_len; + int ret = 0; + int skfd = -1; - log_dbg("Received reply for flow on udp port %d.", - ntohs(dst_udp_port)); + t_saddr_len = sizeof(t_saddr); - pthread_rwlock_rdlock(&udp_data.flows_lock); + pthread_rwlock_wrlock(&udp_data.flows_lock); - fd = udp_port_to_fd(dst_udp_port); - if (fd < 0) { + skfd = udp_data.fd_to_uf[s_eid].skfd; + if (skfd < 0) { pthread_rwlock_unlock(&udp_data.flows_lock); + log_err("Got reply for unknown UDP eid: %u.", s_eid); return -1; } - skfd = udp_data.fd_to_uf[fd].skfd; + udp_data.fd_to_uf[s_eid].d_eid = d_eid; pthread_rwlock_unlock(&udp_data.flows_lock); - /* get the original address with the LISTEN PORT */ if (getpeername(skfd, (struct sockaddr *) &t_saddr, &t_saddr_len) < 0) { - log_dbg("Flow with fd %d has no peer.", fd); + log_dbg("Flow with fd %d has no peer.", s_eid); + close(skfd); return -1; } - /* connect to the flow udp port */ - t_saddr.sin_port = src_udp_port; - if (connect(skfd, (struct sockaddr *) &t_saddr, sizeof(t_saddr)) < 0) { + log_dbg("Could not connect flow to remote."); close(skfd); return -1; } - pthread_rwlock_rdlock(&udp_data.flows_lock); - - set_fd(skfd); - - pthread_rwlock_unlock(&udp_data.flows_lock); - - if (ipcp_flow_alloc_reply(fd, response) < 0) + if (ipcp_flow_alloc_reply(s_eid, response) < 0) { + log_dbg("Failed to reply to flow allocation."); return -1; + } - log_dbg("Flow allocation completed, UDP ports: (%d, %d).", - ntohs(dst_udp_port), ntohs(src_udp_port)); + log_dbg("Flow allocation completed on eids (%d, %d).", + s_eid, d_eid); return ret; } -static void * ipcp_udp_listener(void * o) +static int ipcp_udp_mgmt_frame(const uint8_t * buf, + struct sockaddr_in c_saddr) { - uint8_t buf[SHIM_UDP_MSG_SIZE]; - ssize_t n = 0; - struct sockaddr_in c_saddr; - int sfd = udp_data.s_fd; + struct mgmt_msg * msg; + qosspec_t qs; + + msg = (struct mgmt_msg *) buf; + + switch (msg->code) { + case FLOW_REQ: + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + return ipcp_udp_port_req(&c_saddr, ntoh32(msg->s_eid), + (uint8_t *) (msg + 1), qs); + case FLOW_REPLY: + return ipcp_udp_port_alloc_reply(ntoh32(msg->s_eid), + ntoh32(msg->d_eid), + msg->response); + default: + log_err("Unknown message received %d.", msg->code); + return -1; + } +} +static void * ipcp_udp_mgmt_handler(void * o) +{ (void) o; + pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, + (void *) &udp_data.mgmt_lock); + while (true) { - struct mgmt_msg * msg = NULL; - qosspec_t qs; - memset(&buf, 0, SHIM_UDP_MSG_SIZE); - n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0, - (struct sockaddr *) &c_saddr, - (socklen_t *) sizeof(c_saddr)); - if (n < 0) - continue; + struct mgmt_frame * frame; - /* flow alloc request from other host */ - if (gethostbyaddr((const char *) &c_saddr.sin_addr.s_addr, - sizeof(c_saddr.sin_addr.s_addr), AF_INET) - == NULL) - continue; + pthread_mutex_lock(&udp_data.mgmt_lock); - msg = (struct mgmt_msg *) buf; - - switch (msg->code) { - case FLOW_REQ: - c_saddr.sin_port = msg->src_udp_port; - qs.delay = ntoh32(msg->delay); - qs.bandwidth = ntoh64(msg->bandwidth); - qs.availability = msg->availability; - qs.loss = ntoh32(msg->loss); - qs.ber = ntoh32(msg->ber); - qs.in_order = msg->in_order; - qs.max_gap = ntoh32(msg->max_gap); - ipcp_udp_port_req(&c_saddr, - (uint8_t *) (msg + 1), - qs); - break; - case FLOW_REPLY: - ipcp_udp_port_alloc_reply(msg->src_udp_port, - msg->dst_udp_port, - msg->response); - break; - default: - log_err("Unknown message received %d.", msg->code); - continue; - } + while (list_is_empty(&udp_data.mgmt_frames)) + pthread_cond_wait(&udp_data.mgmt_cond, + &udp_data.mgmt_lock); + + frame = list_first_entry((&udp_data.mgmt_frames), + struct mgmt_frame, next); + assert(frame != NULL); + list_del(&frame->next); + + pthread_mutex_unlock(&udp_data.mgmt_lock); - c_saddr.sin_port = LISTEN_PORT; + ipcp_udp_mgmt_frame(frame->buf, frame->r_saddr); + + free(frame); } - return 0; + pthread_cleanup_pop(false); + + return (void *) 0; } static void * ipcp_udp_packet_reader(void * o) { - ssize_t n; - int skfd; - int fd; - /* FIXME: avoid this copy */ - char buf[SHIM_UDP_MAX_PACKET_SIZE]; - struct sockaddr_in r_saddr; - struct timeval tv = {0, FD_UPDATE_TIMEOUT}; - fd_set read_fds; - int flags; + uint8_t buf[IPCP_UDP_MAX_PACKET_SIZE]; + uint8_t * data; + ssize_t n; + uint32_t eid; (void) o; - ipcp_lock_to_core(); + data = buf + sizeof(uint32_t); while (true) { - pthread_rwlock_rdlock(&udp_data.flows_lock); - pthread_mutex_lock(&udp_data.fd_set_lock); + struct mgmt_frame * frame; + struct sockaddr_in r_saddr; + socklen_t len; - read_fds = udp_data.flow_fd_s; - udp_data.fd_set_mod = false; - pthread_cond_broadcast(&udp_data.fd_set_cond); + len = sizeof(r_saddr); - pthread_mutex_unlock(&udp_data.fd_set_lock); - pthread_rwlock_unlock(&udp_data.flows_lock); + n = recvfrom(udp_data.s_fd, buf, IPCP_UDP_MAX_PACKET_SIZE, 0, + (struct sockaddr *) &r_saddr, &len); + if (n < 0) + continue; - if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) + if (n == 0) + log_dbg("Got a 0 frame."); + + if ((size_t) n < sizeof(eid)) { + log_dbg("Dropped bad frame."); continue; + } - for (skfd = 0; skfd < FD_SETSIZE; ++skfd) { - if (!FD_ISSET(skfd, &read_fds)) - continue; - flags = fcntl(skfd, F_GETFL, 0); - fcntl(skfd, F_SETFL, flags | O_NONBLOCK); - n = sizeof(r_saddr); - if ((n = recvfrom(skfd, - &buf, - SHIM_UDP_MAX_PACKET_SIZE, - 0, - (struct sockaddr *) &r_saddr, - (unsigned *) &n)) <= 0) - continue; + eid = ntoh32(*((uint32_t *) buf)); - pthread_rwlock_rdlock(&udp_data.flows_lock); + /* pass onto mgmt queue */ + if (eid == MGMT_EID) { + if (n > IPCP_UDP_MSG_SIZE) { + log_warn("Dropped oversize management frame."); + continue; + } - fd = udp_data.uf_to_fd[skfd]; + frame = malloc(sizeof(*frame)); + if (frame == NULL) + continue; - pthread_rwlock_unlock(&udp_data.flows_lock); + memcpy(frame->buf, buf, n); + memcpy(&frame->r_saddr, &r_saddr, sizeof(r_saddr)); - flow_write(fd, buf, n); + pthread_mutex_lock(&udp_data.mgmt_lock); + list_add(&frame->next, &udp_data.mgmt_frames); + pthread_cond_signal(&udp_data.mgmt_cond); + pthread_mutex_unlock(&udp_data.mgmt_lock); + continue; } + + flow_write(eid, data, n - sizeof(eid)); } - return (void *) 0; + return 0; } -static void * ipcp_udp_packet_loop(void * o) +static void * ipcp_udp_packet_writer(void * o) { - int fd; - struct shm_du_buff * sdb; - (void) o; ipcp_lock_to_core(); while (true) { + int fd; + int eid; fevent(udp_data.np1_flows, udp_data.fq, NULL); while ((fd = fqueue_next(udp_data.fq)) >= 0) { + struct shm_du_buff * sdb; + uint8_t * buf; + uint16_t len; if (ipcp_flow_read(fd, &sdb)) { - log_err("Bad read from fd %d.", fd); + log_dbg("Bad read from fd %d.", fd); + continue; + } + + len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + if (len > IPCP_UDP_MAX_PACKET_SIZE) { + log_dbg("Packet length exceeds MTU."); + ipcp_sdb_release(sdb); + continue; + } + + buf = shm_du_buff_head_alloc(sdb, OUR_HEADER_LEN); + if (buf == NULL) { + log_dbg("Failed to allocate header."); + ipcp_sdb_release(sdb); continue; } pthread_rwlock_rdlock(&udp_data.flows_lock); + eid = hton32(udp_data.fd_to_uf[fd].d_eid); fd = udp_data.fd_to_uf[fd].skfd; pthread_rwlock_unlock(&udp_data.flows_lock); + memcpy(buf, &eid, sizeof(eid)); + pthread_cleanup_push((void (*)(void *)) - ipcp_sdb_release, - (void *) sdb); + ipcp_sdb_release, (void *) sdb); - if (send(fd, shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), - 0) < 0) - log_err("Failed to send PACKET."); + if (write(fd, buf, len + OUR_HEADER_LEN) < 0) + log_err("Failed to send packet."); pthread_cleanup_pop(true); } @@ -593,28 +553,23 @@ static void * ipcp_udp_packet_loop(void * o) static int ipcp_udp_bootstrap(const struct ipcp_config * conf) { - struct sockaddr_in s_saddr; char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; - int enable = 1; - int fd = -1; + char portstr[128]; /* port is max 64535 = 5 chars */ + int i = 1; assert(conf); assert(conf->type == THIS_TYPE); - if (inet_ntop(AF_INET, - &conf->ip_addr, - ipstr, - INET_ADDRSTRLEN) == NULL) { + if (inet_ntop(AF_INET, &conf->ip_addr, ipstr, INET_ADDRSTRLEN) + == NULL) { log_err("Failed to convert IP address"); return -1; } if (conf->dns_addr != 0) { - if (inet_ntop(AF_INET, - &conf->dns_addr, - dnsstr, - INET_ADDRSTRLEN) == NULL) { + if (inet_ntop(AF_INET, &conf->dns_addr, dnsstr, INET_ADDRSTRLEN) + == NULL) { log_err("Failed to convert DNS address"); return -1; } @@ -626,76 +581,79 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) } /* UDP listen server */ - if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { - log_err("Can't create socket."); + udp_data.s_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (udp_data.s_fd < 0) { + log_err("Can't create socket: %s", strerror(errno)); goto fail_socket; } - if (setsockopt(fd, - SOL_SOCKET, - SO_REUSEADDR, - &enable, - sizeof(int)) < 0) + if (setsockopt(udp_data.s_fd, SOL_SOCKET, SO_REUSEADDR, + &i, sizeof(i)) < 0) log_warn("Failed to set SO_REUSEADDR."); - memset((char *) &s_saddr, 0, sizeof(s_saddr)); + memset((char *) &udp_data.s_saddr, 0, sizeof(udp_data.s_saddr)); udp_data.s_saddr.sin_family = AF_INET; udp_data.s_saddr.sin_addr.s_addr = conf->ip_addr; - udp_data.s_saddr.sin_port = LISTEN_PORT; + udp_data.s_saddr.sin_port = htons(conf->srv_port); - if (bind(fd, - (struct sockaddr *) &udp_data.s_saddr, - sizeof(udp_data.s_saddr)) < 0) { + if (bind(udp_data.s_fd, SERV_SADDR, SERV_SADDR_SIZE) < 0) { log_err("Couldn't bind to %s.", ipstr); goto fail_bind; } - udp_data.s_fd = fd; udp_data.ip_addr = conf->ip_addr; udp_data.dns_addr = conf->dns_addr; - - FD_CLR(udp_data.s_fd, &udp_data.flow_fd_s); + udp_data.clt_port = htons(conf->clt_port); ipcp_set_state(IPCP_OPERATIONAL); - if (pthread_create(&udp_data.handler, - NULL, - ipcp_udp_listener, - NULL)) { + if (pthread_create(&udp_data.mgmt_handler, NULL, + ipcp_udp_mgmt_handler, NULL)) { ipcp_set_state(IPCP_INIT); goto fail_bind; } - if (pthread_create(&udp_data.packet_reader, - NULL, - ipcp_udp_packet_reader, - NULL)) { - ipcp_set_state(IPCP_INIT); - goto fail_packet_reader; + for (i = 0; i < IPCP_UDP_RD_THR; ++i) { + if (pthread_create(&udp_data.packet_reader[i], NULL, + ipcp_udp_packet_reader, NULL)) { + ipcp_set_state(IPCP_INIT); + goto fail_packet_reader; + } } - if (pthread_create(&udp_data.packet_loop, - NULL, - ipcp_udp_packet_loop, - NULL)) { - ipcp_set_state(IPCP_INIT); - goto fail_packet_loop; + for (i = 0; i < IPCP_UDP_WR_THR; ++i) { + if (pthread_create(&udp_data.packet_writer[i], NULL, + ipcp_udp_packet_writer, NULL)) { + ipcp_set_state(IPCP_INIT); + goto fail_packet_writer; + } } + sprintf(portstr, "%d", conf->clt_port); + log_dbg("Bootstrapped IPCP over UDP with pid %d.", getpid()); log_dbg("Bound to IP address %s.", ipstr); + log_dbg("Client port is %s.", conf->clt_port == 0 ? "random" : portstr); + log_dbg("Server port is %u.", conf->srv_port); log_dbg("DNS server address is %s.", dnsstr); return 0; - fail_packet_loop: - pthread_cancel(udp_data.packet_reader); - pthread_join(udp_data.packet_reader, NULL); + fail_packet_writer: + while (i > 0) { + pthread_cancel(udp_data.packet_writer[--i]); + pthread_join(udp_data.packet_writer[i], NULL); + } + i = IPCP_UDP_RD_THR; fail_packet_reader: - pthread_cancel(udp_data.handler); - pthread_join(udp_data.handler, NULL); + while (i > 0) { + pthread_cancel(udp_data.packet_reader[--i]); + pthread_join(udp_data.packet_reader[i], NULL); + } + pthread_cancel(udp_data.mgmt_handler); + pthread_join(udp_data.mgmt_handler, NULL); fail_bind: - close(fd); + close(udp_data.s_fd); fail_socket: return -1; } @@ -705,9 +663,9 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) /* NOTE: Disgusted with this crap */ static int ddns_send(char * cmd) { - pid_t pid = -1; - int wstatus; - int pipe_fd[2]; + pid_t pid = -1; + int wstatus; + int pipe_fd[2]; char * argv[] = {NSUPDATE_EXEC, 0}; char * envp[] = {0}; @@ -743,22 +701,23 @@ static int ddns_send(char * cmd) log_err("Failed to register with DNS server."); close(pipe_fd[1]); + return 0; } static uint32_t ddns_resolve(char * name, uint32_t dns_addr) { - pid_t pid = -1; - int wstatus; - int pipe_fd[2]; - char dnsstr[INET_ADDRSTRLEN]; - char buf[SHIM_UDP_BUF_SIZE]; - ssize_t count = 0; - char * substr = NULL; - char * substr2 = NULL; - char * addr_str = "Address:"; - uint32_t ip_addr = 0; + pid_t pid = -1; + int wstatus; + int pipe_fd[2]; + char dnsstr[INET_ADDRSTRLEN]; + char buf[IPCP_UDP_BUF_SIZE]; + ssize_t count = 0; + char * substr = NULL; + char * substr2 = NULL; + char * addr_str = "Address:"; + uint32_t ip_addr = 0; if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL) return 0; @@ -785,7 +744,7 @@ static uint32_t ddns_resolve(char * name, close(pipe_fd[1]); - count = read(pipe_fd[0], buf, SHIM_UDP_BUF_SIZE); + count = read(pipe_fd[0], buf, IPCP_UDP_BUF_SIZE); if (count <= 0) { log_err("Failed to communicate with nslookup."); close(pipe_fd[0]); @@ -796,7 +755,7 @@ static uint32_t ddns_resolve(char * name, waitpid(pid, &wstatus, 0); if (WIFEXITED(wstatus) && WEXITSTATUS(wstatus) == 0 && - count != SHIM_UDP_BUF_SIZE) + count != IPCP_UDP_BUF_SIZE) log_dbg("Succesfully communicated with nslookup."); else log_err("Failed to resolve DNS address."); @@ -825,13 +784,13 @@ static uint32_t ddns_resolve(char * name, static int ipcp_udp_reg(const uint8_t * hash) { #ifdef HAVE_DDNS - char ipstr[INET_ADDRSTRLEN]; - char dnsstr[INET_ADDRSTRLEN]; - char cmd[1000]; + char ipstr[INET_ADDRSTRLEN]; + char dnsstr[INET_ADDRSTRLEN]; + char cmd[1000]; uint32_t dns_addr; uint32_t ip_addr; #endif - char * hashstr; + char * hashstr; hashstr = malloc(ipcp_dir_hash_strlen() + 1); if (hashstr == NULL) @@ -888,12 +847,12 @@ static int ipcp_udp_reg(const uint8_t * hash) static int ipcp_udp_unreg(const uint8_t * hash) { #ifdef HAVE_DDNS - char dnsstr[INET_ADDRSTRLEN]; + char dnsstr[INET_ADDRSTRLEN]; /* max DNS name length + max IP length + max command length */ - char cmd[100]; + char cmd[100]; uint32_t dns_addr; #endif - char * hashstr; + char * hashstr; assert(hash); @@ -932,13 +891,12 @@ static int ipcp_udp_unreg(const uint8_t * hash) static int ipcp_udp_query(const uint8_t * hash) { - uint32_t ip_addr = 0; - char * hashstr; - struct hostent * h; + uint32_t ip_addr = 0; + char * hashstr; + struct hostent * h; #ifdef HAVE_DDNS - uint32_t dns_addr = 0; + uint32_t dns_addr = 0; #endif - assert(hash); hashstr = malloc(ipcp_dir_hash_strlen() + 1); @@ -991,11 +949,14 @@ static int ipcp_udp_flow_alloc(int fd, const uint8_t * dst, qosspec_t qs) { - struct sockaddr_in r_saddr; /* server address */ - struct sockaddr_in f_saddr; /* flow */ - socklen_t f_saddr_len = sizeof(f_saddr); + struct sockaddr_in r_saddr; /* Server address */ + struct sockaddr_in c_saddr; /* Client address */ + socklen_t c_saddr_len; int skfd; uint32_t ip_addr = 0; + char ip_str[INET_ADDRSTRLEN]; + + c_saddr_len = sizeof(c_saddr); log_dbg("Allocating flow to " HASH_FMT ".", HASH_VAL(dst)); @@ -1004,21 +965,24 @@ static int ipcp_udp_flow_alloc(int fd, assert(dst); skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (skfd < 0) + if (skfd < 0) { + log_err("Could not create socket."); return -1; + } - /* this socket is for the flow */ - memset((char *) &f_saddr, 0, sizeof(f_saddr)); - f_saddr.sin_family = AF_INET; - f_saddr.sin_addr.s_addr = local_ip; - f_saddr.sin_port = 0; + /* This socket is for the flow. */ + memset((char *) &c_saddr, 0, sizeof(c_saddr)); + c_saddr.sin_family = AF_INET; + c_saddr.sin_addr.s_addr = LOCAL_IP; + c_saddr.sin_port = udp_data.clt_port; - if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { + if (bind(skfd, (struct sockaddr *) &c_saddr, sizeof(c_saddr)) < 0) { + log_dbg("Could not bind socket to client address."); close(skfd); return -1; } - if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { + if (getsockname(skfd, (struct sockaddr *) &c_saddr, &c_saddr_len) < 0) { log_err("Could not get address from fd."); close(skfd); return -1; @@ -1029,43 +993,41 @@ static int ipcp_udp_flow_alloc(int fd, close(skfd); return -1; } + ip_addr = (uint32_t) shim_data_dir_get_addr(udp_data.shim_data, dst); - /* connect to server (store the remote IP address in the fd) */ + inet_ntop(AF_INET, &ip_addr, ip_str, INET_ADDRSTRLEN); + log_dbg("Destination UDP ipcp resolved at %s.", ip_str); + + /* Connect to server and store the remote IP address in the skfd. */ memset((char *) &r_saddr, 0, sizeof(r_saddr)); r_saddr.sin_family = AF_INET; r_saddr.sin_addr.s_addr = ip_addr; - r_saddr.sin_port = LISTEN_PORT; + r_saddr.sin_port = udp_data.s_saddr.sin_port; if (connect(skfd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { + log_dbg("Could not connect socket to remote."); + close(skfd); + return -1; + } + + if (ipcp_udp_port_alloc(skfd, fd, dst, qs) < 0) { + log_err("Could not allocate port."); close(skfd); return -1; } pthread_rwlock_wrlock(&udp_data.flows_lock); - udp_data.fd_to_uf[fd].udp = f_saddr.sin_port; - udp_data.fd_to_uf[fd].skfd = skfd; - udp_data.uf_to_fd[skfd] = fd; + udp_data.fd_to_uf[fd].d_eid = -1; + udp_data.fd_to_uf[fd].skfd = skfd; fset_add(udp_data.np1_flows, fd); pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, qs) < 0) { - pthread_rwlock_wrlock(&udp_data.flows_lock); - - udp_data.fd_to_uf[fd].udp = -1; - udp_data.fd_to_uf[fd].skfd = -1; - udp_data.uf_to_fd[skfd] = -1; - - pthread_rwlock_unlock(&udp_data.flows_lock); - close(skfd); - return -1; - } - - log_dbg("Flow pending on fd %d, UDP port %d.", - fd, ntohs(f_saddr.sin_port)); + log_dbg("Flow pending on fd %d, UDP src port %d, dst port %d.", + fd, ntohs(c_saddr.sin_port), ntohs(r_saddr.sin_port)); return 0; } @@ -1073,12 +1035,10 @@ static int ipcp_udp_flow_alloc(int fd, static int ipcp_udp_flow_alloc_resp(int fd, int response) { - struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; - struct timespec abstime; - int skfd = -1; - struct sockaddr_in f_saddr; - struct sockaddr_in r_saddr; - socklen_t len = sizeof(r_saddr); + struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; + struct timespec abstime; + int skfd; + int d_eid; if (response) return 0; @@ -1106,38 +1066,23 @@ static int ipcp_udp_flow_alloc_resp(int fd, pthread_rwlock_rdlock(&udp_data.flows_lock); - skfd = udp_data.fd_to_uf[fd].skfd; - - pthread_rwlock_unlock(&udp_data.flows_lock); - - if (getsockname(skfd, (struct sockaddr *) &f_saddr, &len) < 0) { - log_dbg("Socket with fd %d has no address.", skfd); - return -1; - } - - if (getpeername(skfd, (struct sockaddr *) &r_saddr, &len) < 0) { - log_dbg("Socket with fd %d has no peer.", skfd); - return -1; - } - - pthread_rwlock_rdlock(&udp_data.flows_lock); - - set_fd(skfd); + skfd = udp_data.fd_to_uf[fd].skfd; + d_eid = udp_data.fd_to_uf[fd].d_eid; fset_add(udp_data.np1_flows, fd); pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_udp_port_alloc_resp(r_saddr.sin_addr.s_addr, f_saddr.sin_port, - r_saddr.sin_port, response) < 0) { + if (ipcp_udp_port_alloc_resp(skfd, d_eid, fd, response) < 0) { pthread_rwlock_rdlock(&udp_data.flows_lock); - clr_fd(skfd); + fset_del(udp_data.np1_flows, fd); pthread_rwlock_unlock(&udp_data.flows_lock); + log_err("Failed to respond to flow request."); return -1; } - log_dbg("Accepted flow, fd %d on UDP port %d.", - fd, ntohs(f_saddr.sin_port)); + log_dbg("Accepted flow, fd %d on eid %d.", + fd, d_eid); return 0; } @@ -1154,18 +1099,12 @@ static int ipcp_udp_flow_dealloc(int fd) skfd = udp_data.fd_to_uf[fd].skfd; - udp_data.uf_to_fd[skfd] = -1; - udp_data.fd_to_uf[fd].udp = -1; - udp_data.fd_to_uf[fd].skfd = -1; + udp_data.fd_to_uf[fd].d_eid = -1; + udp_data.fd_to_uf[fd].skfd = -1; close(skfd); pthread_rwlock_unlock(&udp_data.flows_lock); - pthread_rwlock_rdlock(&udp_data.flows_lock); - - clr_fd(skfd); - - pthread_rwlock_unlock(&udp_data.flows_lock); flow_dealloc(fd); @@ -1191,6 +1130,8 @@ static struct ipcp_ops udp_ops = { int main(int argc, char * argv[]) { + int i; + if (ipcp_init(argc, argv, &udp_ops) < 0) goto fail_init; @@ -1212,13 +1153,17 @@ int main(int argc, ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(udp_data.packet_loop); - pthread_cancel(udp_data.handler); - pthread_cancel(udp_data.packet_reader); - - pthread_join(udp_data.packet_loop, NULL); - pthread_join(udp_data.handler, NULL); - pthread_join(udp_data.packet_reader, NULL); + for (i = 0; i < IPCP_UDP_RD_THR; ++i) + pthread_cancel(udp_data.packet_reader[i]); + for (i = 0; i < IPCP_UDP_WR_THR; ++i) + pthread_cancel(udp_data.packet_writer[i]); + pthread_cancel(udp_data.mgmt_handler); + + for (i = 0; i < IPCP_UDP_RD_THR; ++i) + pthread_join(udp_data.packet_reader[i], NULL); + for (i = 0; i < IPCP_UDP_WR_THR; ++i) + pthread_join(udp_data.packet_writer[i], NULL); + pthread_join(udp_data.mgmt_handler, NULL); } udp_data_fini(); |