diff options
Diffstat (limited to 'src/ipcpd/udp/main.c')
| -rw-r--r-- | src/ipcpd/udp/main.c | 1211 |
1 files changed, 596 insertions, 615 deletions
diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index be7491f4..2e8d84ce 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -1,10 +1,10 @@ /* - * Ouroboros - Copyright (C) 2016 - 2018 + * Ouroboros - Copyright (C) 2016 - 2024 * * IPC process over UDP * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as @@ -20,12 +20,18 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else #define _POSIX_C_SOURCE 200112L +#endif #include "config.h" #define OUROBOROS_PREFIX "ipcpd/udp" +#include <ouroboros/bitmap.h> +#include <ouroboros/endian.h> #include <ouroboros/hash.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> @@ -34,6 +40,7 @@ #include <ouroboros/fqueue.h> #include <ouroboros/errno.h> #include <ouroboros/logs.h> +#include <ouroboros/pthread.h> #include "ipcp.h" #include "shim-data.h" @@ -46,616 +53,635 @@ #include <netinet/in.h> #include <signal.h> #include <stdlib.h> -#include <pthread.h> #include <sys/wait.h> #include <fcntl.h> -#define FLOW_REQ 1 -#define FLOW_REPLY 2 +#define FLOW_REQ 1 +#define FLOW_REPLY 2 + +#define THIS_TYPE IPCP_UDP +#define IPCP_UDP_MAX_PACKET_SIZE 8980 +#define OUR_HEADER_LEN sizeof(uint32_t) /* adds eid */ + +#define IPCP_UDP_BUF_SIZE 8980 +#define IPCP_UDP_MSG_SIZE 8980 +#define DNS_TTL 86400 + +#define SADDR ((struct sockaddr *) &udp_data.s_saddr) +#define SADDR_SIZE (sizeof(udp_data.s_saddr)) +#define LOCAL_IP (udp_data.s_saddr.sin_addr.s_addr) -#define THIS_TYPE IPCP_UDP -#define LISTEN_PORT htons(0x0D1F) -#define SHIM_UDP_BUF_SIZE 256 -#define SHIM_UDP_MSG_SIZE 256 -#define SHIM_UDP_MAX_SDU_SIZE 8980 -#define DNS_TTL 86400 -#define FD_UPDATE_TIMEOUT 100 /* microseconds */ +#define MGMT_EID 0 +#define MGMT_FRAME_SIZE (sizeof(struct mgmt_msg)) +#define MGMT_FRAME_BUF_SIZE 2048 -#define local_ip (udp_data.s_saddr.sin_addr.s_addr) +#ifdef __linux__ +#define SENDTO_FLAGS MSG_CONFIRM +#else +#define SENDTO_FLAGS 0 +#endif -#define UDP_MAX_PORTS 0xFFFF +struct ipcp ipcpi; +/* Keep order for alignment. */ struct mgmt_msg { - uint16_t src_udp_port; - uint16_t dst_udp_port; + uint32_t eid; + uint32_t s_eid; + uint32_t d_eid; uint8_t code; - uint8_t qoscube; - uint8_t response; + int8_t response; + /* QoS parameters from spec */ + uint8_t availability; + uint8_t in_order; + uint64_t bandwidth; + uint32_t delay; + uint32_t loss; + uint32_t ber; + uint32_t max_gap; + uint32_t timeout; + uint16_t cypher_s; + } __attribute__((packed)); +struct mgmt_frame { + struct list_head next; + struct sockaddr_in r_saddr; + uint8_t buf[MGMT_FRAME_BUF_SIZE]; + size_t len; +}; + +/* UDP flow */ struct uf { - int udp; - int skfd; + int d_eid; + struct sockaddr_in r_saddr; }; struct { struct shim_data * shim_data; - uint32_t ip_addr; uint32_t dns_addr; - /* listen server */ + struct sockaddr_in s_saddr; int s_fd; fset_t * np1_flows; - fqueue_t * fq; - fd_set flow_fd_s; - /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */ - int uf_to_fd[FD_SETSIZE]; struct uf fd_to_uf[SYS_MAX_FLOWS]; pthread_rwlock_t flows_lock; - pthread_t sduloop; - pthread_t handler; - pthread_t sdu_reader; + pthread_t packet_writer[IPCP_UDP_WR_THR]; + pthread_t packet_reader[IPCP_UDP_RD_THR]; - bool fd_set_mod; - pthread_cond_t fd_set_cond; - pthread_mutex_t fd_set_lock; + /* Handle mgmt frames in a different thread */ + pthread_t mgmt_handler; + pthread_mutex_t mgmt_lock; + pthread_cond_t mgmt_cond; + struct list_head mgmt_frames; } udp_data; static int udp_data_init(void) { - int i; + int i; + pthread_condattr_t cattr; - for (i = 0; i < FD_SETSIZE; ++i) - udp_data.uf_to_fd[i] = -1; + if (pthread_rwlock_init(&udp_data.flows_lock, NULL)) + goto fail_rwlock_init; - for (i = 0; i < SYS_MAX_FLOWS; ++i) { - udp_data.fd_to_uf[i].skfd = -1; - udp_data.fd_to_uf[i].udp = -1; - } + if (pthread_condattr_init(&cattr)) + goto fail_condattr; +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&udp_data.mgmt_cond, &cattr)) + goto fail_mgmt_cond; - FD_ZERO(&udp_data.flow_fd_s); + if (pthread_mutex_init(&udp_data.mgmt_lock, NULL)) + goto fail_mgmt_lock; + + for (i = 0; i < SYS_MAX_FLOWS; ++i) + udp_data.fd_to_uf[i].d_eid = -1; udp_data.np1_flows = fset_create(); if (udp_data.np1_flows == NULL) - return -ENOMEM; - - udp_data.fq = fqueue_create(); - if (udp_data.fq == NULL) { - fset_destroy(udp_data.np1_flows); - return -ENOMEM; - } + goto fail_fset; udp_data.shim_data = shim_data_create(); - if (udp_data.shim_data == NULL) { - fqueue_destroy(udp_data.fq); - fset_destroy(udp_data.np1_flows); - return -ENOMEM; - } + if (udp_data.shim_data == NULL) + goto fail_data; - pthread_rwlock_init(&udp_data.flows_lock, NULL); - pthread_cond_init(&udp_data.fd_set_cond, NULL); - pthread_mutex_init(&udp_data.fd_set_lock, NULL); + pthread_condattr_destroy(&cattr); + + list_head_init(&udp_data.mgmt_frames); return 0; -} -static void udp_data_fini(void) -{ + fail_data: fset_destroy(udp_data.np1_flows); - fqueue_destroy(udp_data.fq); - - shim_data_destroy(udp_data.shim_data); - + fail_fset: + pthread_mutex_destroy(&udp_data.mgmt_lock); + fail_mgmt_lock: + pthread_cond_destroy(&udp_data.mgmt_cond); + fail_mgmt_cond: + pthread_condattr_destroy(&cattr); + fail_condattr: pthread_rwlock_destroy(&udp_data.flows_lock); - pthread_mutex_destroy(&udp_data.fd_set_lock); - pthread_cond_destroy(&udp_data.fd_set_cond); -} - -static void set_fd(int fd) -{ - pthread_mutex_lock(&udp_data.fd_set_lock); - - udp_data.fd_set_mod = true; - FD_SET(fd, &udp_data.flow_fd_s); - - while (udp_data.fd_set_mod) - pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock); - - pthread_mutex_unlock(&udp_data.fd_set_lock); -} - -static void clr_fd(int fd) -{ - pthread_mutex_lock(&udp_data.fd_set_lock); - - udp_data.fd_set_mod = true; - FD_CLR(fd, &udp_data.flow_fd_s); - - while (udp_data.fd_set_mod) - pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock); - - pthread_mutex_unlock(&udp_data.fd_set_lock); + fail_rwlock_init: + return -1; } -static int send_shim_udp_msg(uint8_t * buf, - size_t len, - uint32_t dst_ip_addr) +static void udp_data_fini(void) { - struct sockaddr_in r_saddr; - - memset((char *)&r_saddr, 0, sizeof(r_saddr)); - r_saddr.sin_family = AF_INET; - r_saddr.sin_addr.s_addr = dst_ip_addr; - r_saddr.sin_port = LISTEN_PORT; + shim_data_destroy(udp_data.shim_data); - if (sendto(udp_data.s_fd, buf, len, 0, - (struct sockaddr *) &r_saddr, - sizeof(r_saddr)) == -1) { - log_err("Failed to send message."); - return -1; - } + fset_destroy(udp_data.np1_flows); - return 0; + pthread_rwlock_destroy(&udp_data.flows_lock); + pthread_cond_destroy(&udp_data.mgmt_cond); + pthread_mutex_destroy(&udp_data.mgmt_lock); } -static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, - uint16_t src_udp_port, - const uint8_t * dst, - qoscube_t cube) +static int udp_ipcp_port_alloc(const struct sockaddr_in * r_saddr, + uint32_t s_eid, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { uint8_t * buf; struct mgmt_msg * msg; size_t len; - int ret; + + assert(data->len > 0 ? data->data != NULL : data->data == NULL); len = sizeof(*msg) + ipcp_dir_hash_len(); - buf = malloc(len); + buf = malloc(len + data->len); if (buf == NULL) return -1; msg = (struct mgmt_msg *) buf; + msg->eid = hton32(MGMT_EID); msg->code = FLOW_REQ; - msg->src_udp_port = src_udp_port; - msg->qoscube = cube; + msg->s_eid = hton32(s_eid); + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); + msg->cypher_s = hton16(qs.cypher_s); + msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); + if (data->len > 0) + memcpy(buf + len, data->data, data->len); - ret = send_shim_udp_msg(buf, len, dst_ip_addr); + if (sendto(udp_data.s_fd, msg, len + data->len, + SENDTO_FLAGS, + (const struct sockaddr *) r_saddr, sizeof(*r_saddr)) < 0) { + free(buf); + return -1; + } free(buf); - return ret; + return 0; } -static int ipcp_udp_port_alloc_resp(uint32_t dst_ip_addr, - uint16_t src_udp_port, - uint16_t dst_udp_port, - int response) +static int udp_ipcp_port_alloc_resp(const struct sockaddr_in * r_saddr, + uint32_t s_eid, + uint32_t d_eid, + int8_t response, + const buffer_t * data) { - uint8_t * buf; struct mgmt_msg * msg; - int ret; - - buf = malloc(sizeof(*msg)); - if (buf == NULL) - return -1; - - msg = (struct mgmt_msg *) buf; - msg->code = FLOW_REPLY; - msg->src_udp_port = src_udp_port; - msg->dst_udp_port = dst_udp_port; - msg->response = response; - - ret = send_shim_udp_msg(buf, sizeof(*msg), dst_ip_addr); - - free(buf); - return ret; -} - -static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, - const uint8_t * dst, - qoscube_t cube) -{ - struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; - struct timespec abstime; - struct sockaddr_in f_saddr; - socklen_t f_saddr_len = sizeof(f_saddr); - int skfd; - int fd; - - log_dbg("Port request arrived from UDP port %d", - ntohs(c_saddr->sin_port)); - - if ((skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { - log_err("Could not create UDP socket."); + msg = malloc(sizeof(*msg) + data->len); + if (msg == NULL) return -1; - } - - memset((char *) &f_saddr, 0, sizeof(f_saddr)); - f_saddr.sin_family = AF_INET; - f_saddr.sin_addr.s_addr = local_ip; - f_saddr.sin_port = 0; - if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { - log_err("Could not bind to socket."); - close(skfd); - return -1; - } + msg->eid = hton32(MGMT_EID); + msg->code = FLOW_REPLY; + msg->s_eid = hton32(s_eid); + msg->d_eid = hton32(d_eid); + msg->response = response; - if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { - log_err("Could not get address from fd."); - return -1; - } + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); - /* connect stores the remote address in the file descriptor */ - if (connect(skfd, (struct sockaddr *) c_saddr, sizeof(*c_saddr)) < 0) { - log_err("Could not connect to remote UDP client."); - close(skfd); + if (sendto(udp_data.s_fd, msg, sizeof(*msg) + data->len, + SENDTO_FLAGS, + (const struct sockaddr *) r_saddr, sizeof(*r_saddr)) < 0 ) { + free(msg); return -1; } - clock_gettime(PTHREAD_COND_CLOCK, &abstime); + free(msg); - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } + return 0; +} - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_dbg("Won't allocate over non-operational IPCP."); - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; - } +static int udp_ipcp_port_req(struct sockaddr_in * c_saddr, + int d_eid, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) +{ + int fd; - /* reply to IRM */ - fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UDP_MPL, data); if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); - close(skfd); return -1; } pthread_rwlock_wrlock(&udp_data.flows_lock); - udp_data.uf_to_fd[skfd] = fd; - udp_data.fd_to_uf[fd].skfd = skfd; - udp_data.fd_to_uf[fd].udp = f_saddr.sin_port; + udp_data.fd_to_uf[fd].r_saddr = *c_saddr; + udp_data.fd_to_uf[fd].d_eid = d_eid; pthread_rwlock_unlock(&udp_data.flows_lock); - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - - log_dbg("Pending allocation request, fd %d, UDP port (%d, %d).", - fd, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port)); + log_dbg("Pending allocation request, fd %d, remote eid %d.", + fd, d_eid); return 0; } -/* returns the n flow descriptor */ -static int udp_port_to_fd(int udp_port) -{ - int i; - - for (i = 0; i < SYS_MAX_FLOWS; ++i) - if (udp_data.fd_to_uf[i].udp == udp_port) - return i; - - return -1; -} - -static int ipcp_udp_port_alloc_reply(uint16_t src_udp_port, - uint16_t dst_udp_port, - int response) +static int udp_ipcp_port_alloc_reply(const struct sockaddr_in * saddr, + uint32_t s_eid, + uint32_t d_eid, + int8_t response, + const buffer_t * data) { - int fd = -1; - int ret = 0; - int skfd = -1; - - struct sockaddr_in t_saddr; - socklen_t t_saddr_len = sizeof(t_saddr); - - log_dbg("Received reply for flow on udp port %d.", - ntohs(dst_udp_port)); - - pthread_rwlock_rdlock(&udp_data.flows_lock); - - fd = udp_port_to_fd(dst_udp_port); - skfd = udp_data.fd_to_uf[fd].skfd; - - pthread_rwlock_unlock(&udp_data.flows_lock); + time_t mpl = IPCP_UDP_MPL; - /* get the original address with the LISTEN PORT */ - if (getpeername(skfd, (struct sockaddr *) &t_saddr, &t_saddr_len) < 0) { - log_dbg("Flow with fd %d has no peer.", fd); - return -1; - } - - /* connect to the flow udp port */ - t_saddr.sin_port = src_udp_port; + pthread_rwlock_wrlock(&udp_data.flows_lock); - if (connect(skfd, (struct sockaddr *) &t_saddr, sizeof(t_saddr)) < 0) { - close(skfd); + if (memcmp(&udp_data.fd_to_uf[s_eid].r_saddr, saddr, sizeof(*saddr))) { + pthread_rwlock_unlock(&udp_data.flows_lock); + log_err("Flow allocation reply for %u from wrong source.", + s_eid); return -1; } - pthread_rwlock_rdlock(&udp_data.flows_lock); - - set_fd(skfd); + if (response == 0) + udp_data.fd_to_uf[s_eid].d_eid = d_eid; pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_flow_alloc_reply(fd, response) < 0) + if (ipcp_flow_alloc_reply(s_eid, response, mpl, data) < 0) { + log_err("Failed to reply to flow allocation."); return -1; + } - log_dbg("Flow allocation completed, UDP ports: (%d, %d).", - ntohs(dst_udp_port), ntohs(src_udp_port)); + log_dbg("Flow allocation completed on eids (%d, %d).", + s_eid, d_eid); - return ret; + return 0; } -static void * ipcp_udp_listener(void * o) +static int udp_ipcp_mgmt_frame(const uint8_t * buf, + size_t len, + struct sockaddr_in c_saddr) { - uint8_t buf[SHIM_UDP_MSG_SIZE]; - ssize_t n = 0; - struct sockaddr_in c_saddr; - int sfd = udp_data.s_fd; + struct mgmt_msg * msg; + size_t msg_len; + qosspec_t qs; + buffer_t data; + + msg = (struct mgmt_msg *) buf; + + switch (msg->code) { + case FLOW_REQ: + msg_len = sizeof(*msg) + ipcp_dir_hash_len(); + + assert(len >= msg_len); + + data.len = len - msg_len; + data.data = (uint8_t *) buf + msg_len; + + + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + qs.cypher_s = ntoh16(msg->cypher_s); + qs.timeout = ntoh32(msg->timeout); + + return udp_ipcp_port_req(&c_saddr, ntoh32(msg->s_eid), + (uint8_t *) (msg + 1), qs, + &data); + case FLOW_REPLY: + assert(len >= sizeof(*msg)); + + data.len = len - sizeof(*msg); + data.data = (uint8_t *) buf + sizeof(*msg); + + return udp_ipcp_port_alloc_reply(&c_saddr, + ntoh32(msg->s_eid), + ntoh32(msg->d_eid), + msg->response, + &data); + default: + log_err("Unknown message received %d.", msg->code); + return -1; + } +} +static void * udp_ipcp_mgmt_handler(void * o) +{ (void) o; + pthread_cleanup_push(__cleanup_mutex_unlock, &udp_data.mgmt_lock); + while (true) { - struct mgmt_msg * msg = NULL; + struct mgmt_frame * frame; - memset(&buf, 0, SHIM_UDP_MSG_SIZE); - n = sizeof(c_saddr); - n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0, - (struct sockaddr *) &c_saddr, (unsigned *) &n); - if (n < 0) - continue; + pthread_mutex_lock(&udp_data.mgmt_lock); - /* flow alloc request from other host */ - if (gethostbyaddr((const char *) &c_saddr.sin_addr.s_addr, - sizeof(c_saddr.sin_addr.s_addr), AF_INET) - == NULL) - continue; + while (list_is_empty(&udp_data.mgmt_frames)) + pthread_cond_wait(&udp_data.mgmt_cond, + &udp_data.mgmt_lock); - msg = (struct mgmt_msg *) buf; - - switch (msg->code) { - case FLOW_REQ: - c_saddr.sin_port = msg->src_udp_port; - ipcp_udp_port_req(&c_saddr, - (uint8_t *) (msg + 1), - msg->qoscube); - break; - case FLOW_REPLY: - ipcp_udp_port_alloc_reply(msg->src_udp_port, - msg->dst_udp_port, - msg->response); - break; - default: - log_err("Unknown message received %d.", msg->code); - continue; - } + frame = list_first_entry((&udp_data.mgmt_frames), + struct mgmt_frame, next); + assert(frame != NULL); + list_del(&frame->next); - c_saddr.sin_port = LISTEN_PORT; + pthread_mutex_unlock(&udp_data.mgmt_lock); + + udp_ipcp_mgmt_frame(frame->buf, frame->len, frame->r_saddr); + + free(frame); } - return 0; + pthread_cleanup_pop(false); + + return (void *) 0; } -static void * ipcp_udp_sdu_reader(void * o) +static void * udp_ipcp_packet_reader(void * o) { - ssize_t n; - int skfd; - int fd; - /* FIXME: avoid this copy */ - char buf[SHIM_UDP_MAX_SDU_SIZE]; - struct sockaddr_in r_saddr; - struct timeval tv = {0, FD_UPDATE_TIMEOUT}; - fd_set read_fds; - int flags; + uint8_t buf[IPCP_UDP_MAX_PACKET_SIZE]; + uint8_t * data; + ssize_t n; + uint32_t eid; + uint32_t * eid_p; (void) o; ipcp_lock_to_core(); + data = buf + sizeof(uint32_t); + eid_p = (uint32_t *) buf; + while (true) { - pthread_rwlock_rdlock(&udp_data.flows_lock); - pthread_mutex_lock(&udp_data.fd_set_lock); + struct mgmt_frame * frame; + struct sockaddr_in r_saddr; + socklen_t len; + struct shm_du_buff * sdb; + uint8_t * head; - read_fds = udp_data.flow_fd_s; - udp_data.fd_set_mod = false; - pthread_cond_broadcast(&udp_data.fd_set_cond); + len = sizeof(r_saddr); - pthread_mutex_unlock(&udp_data.fd_set_lock); - pthread_rwlock_unlock(&udp_data.flows_lock); + n = recvfrom(udp_data.s_fd, buf, IPCP_UDP_MAX_PACKET_SIZE, 0, + (struct sockaddr *) &r_saddr, &len); + if (n < 0) + continue; - if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) + if (n == 0) + log_dbg("Got a 0 frame."); + + if ((size_t) n < sizeof(eid)) { + log_dbg("Dropped bad frame."); continue; + } - for (skfd = 0; skfd < FD_SETSIZE; ++skfd) { - if (!FD_ISSET(skfd, &read_fds)) + eid = ntoh32(*eid_p); + + /* pass onto mgmt queue */ + if (eid == MGMT_EID) { + if ((size_t) n < MGMT_FRAME_SIZE) { + log_warn("Dropped runt mgmt frame."); continue; - flags = fcntl(skfd, F_GETFL, 0); - fcntl(skfd, F_SETFL, flags | O_NONBLOCK); - n = sizeof(r_saddr); - if ((n = recvfrom(skfd, - &buf, - SHIM_UDP_MAX_SDU_SIZE, - 0, - (struct sockaddr *) &r_saddr, - (unsigned *) &n)) <= 0) + } + + frame = malloc(sizeof(*frame)); + if (frame == NULL) continue; - pthread_rwlock_rdlock(&udp_data.flows_lock); + memcpy(frame->buf, buf, n); + memcpy(&frame->r_saddr, &r_saddr, sizeof(r_saddr)); + frame->len = n; - fd = udp_data.uf_to_fd[skfd]; + pthread_mutex_lock(&udp_data.mgmt_lock); + list_add(&frame->next, &udp_data.mgmt_frames); + pthread_cond_signal(&udp_data.mgmt_cond); + pthread_mutex_unlock(&udp_data.mgmt_lock); + continue; + } - pthread_rwlock_unlock(&udp_data.flows_lock); + n-= sizeof(eid); - flow_write(fd, buf, n); - } + if (ipcp_sdb_reserve(&sdb, n)) + continue; + + head = shm_du_buff_head(sdb); + memcpy(head, data, n); + if (np1_flow_write(eid, sdb) < 0) + ipcp_sdb_release(sdb); } return (void *) 0; } -static void * ipcp_udp_sdu_loop(void * o) +static void cleanup_fqueue(void * fq) { - int fd; - struct shm_du_buff * sdb; + fqueue_destroy((fqueue_t *) fq); +} + +static void cleanup_sdb(void * sdb) +{ + ipcp_sdb_release((struct shm_du_buff *) sdb); +} + +static void * udp_ipcp_packet_writer(void * o) +{ + fqueue_t * fq; + + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; (void) o; ipcp_lock_to_core(); + pthread_cleanup_push(cleanup_fqueue, fq); + while (true) { - fevent(udp_data.np1_flows, udp_data.fq, NULL); - while ((fd = fqueue_next(udp_data.fq)) >= 0) { - if (ipcp_flow_read(fd, &sdb)) { - log_err("Bad read from fd %d.", fd); + struct sockaddr_in saddr; + int eid; + int fd; + fevent(udp_data.np1_flows, fq, NULL); + while ((fd = fqueue_next(fq)) >= 0) { + struct shm_du_buff * sdb; + uint8_t * buf; + uint16_t len; + + if (fqueue_type(fq) != FLOW_PKT) + continue; + + if (np1_flow_read(fd, &sdb)) { + log_dbg("Bad read from fd %d.", fd); + continue; + } + + len = shm_du_buff_len(sdb); + if (len > IPCP_UDP_MAX_PACKET_SIZE) { + log_dbg("Packet length exceeds MTU."); + ipcp_sdb_release(sdb); + continue; + } + + buf = shm_du_buff_head_alloc(sdb, OUR_HEADER_LEN); + if (buf == NULL) { + log_dbg("Failed to allocate header."); + ipcp_sdb_release(sdb); continue; } pthread_rwlock_rdlock(&udp_data.flows_lock); - fd = udp_data.fd_to_uf[fd].skfd; + eid = hton32(udp_data.fd_to_uf[fd].d_eid); + saddr = udp_data.fd_to_uf[fd].r_saddr; pthread_rwlock_unlock(&udp_data.flows_lock); - pthread_cleanup_push((void (*)(void *)) ipcp_sdb_release, - (void *) sdb); + memcpy(buf, &eid, sizeof(eid)); + + pthread_cleanup_push(cleanup_sdb, sdb); - if (send(fd, shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), - 0) < 0) - log_err("Failed to send SDU."); + if (sendto(udp_data.s_fd, buf, len + OUR_HEADER_LEN, + SENDTO_FLAGS, + (const struct sockaddr *) &saddr, + sizeof(saddr)) < 0) + log_err("Failed to send packet."); pthread_cleanup_pop(true); } } + pthread_cleanup_pop(true); + return (void *) 1; } -static int ipcp_udp_bootstrap(const struct ipcp_config * conf) +static const char * inet4_ntop(const void * addr, + char * buf) +{ + return inet_ntop(AF_INET, addr, buf, INET_ADDRSTRLEN); +} + +static int udp_ipcp_bootstrap(const struct ipcp_config * conf) { - struct sockaddr_in s_saddr; char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; - int enable = 1; - int fd = -1; + int i = 1; assert(conf); assert(conf->type == THIS_TYPE); - if (inet_ntop(AF_INET, - &conf->ip_addr, - ipstr, - INET_ADDRSTRLEN) == NULL) { - log_err("Failed to convert IP address"); + ipcpi.dir_hash_algo = HASH_MD5; + strcpy(ipcpi.layer_name, conf->layer_info.name); + + if (inet4_ntop(&conf->udp.ip_addr, ipstr) == NULL) { + log_err("Failed to convert IP address."); return -1; } - if (conf->dns_addr != 0) { - if (inet_ntop(AF_INET, - &conf->dns_addr, - dnsstr, - INET_ADDRSTRLEN) == NULL) { - log_err("Failed to convert DNS address"); + if (conf->udp.dns_addr != 0) { + if (inet4_ntop(&conf->udp.dns_addr, dnsstr) == NULL) { + log_err("Failed to convert DNS address."); return -1; } #ifndef HAVE_DDNS - log_warn("DNS disabled at compile time, address ignored"); + log_warn("DNS disabled at compile time, address ignored."); #endif } else { strcpy(dnsstr, "not set"); } /* UDP listen server */ - if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { - log_err("Can't create socket."); + udp_data.s_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (udp_data.s_fd < 0) { + log_err("Can't create socket: %s", strerror(errno)); goto fail_socket; } - if (setsockopt(fd, - SOL_SOCKET, - SO_REUSEADDR, - &enable, - sizeof(int)) < 0) - log_warn("Failed to set SO_REUSEADDR."); - - memset((char *) &s_saddr, 0, sizeof(s_saddr)); + memset((char *) &udp_data.s_saddr, 0, sizeof(udp_data.s_saddr)); udp_data.s_saddr.sin_family = AF_INET; - udp_data.s_saddr.sin_addr.s_addr = conf->ip_addr; - udp_data.s_saddr.sin_port = LISTEN_PORT; + udp_data.s_saddr.sin_addr.s_addr = conf->udp.ip_addr; + udp_data.s_saddr.sin_port = htons(conf->udp.port); - if (bind(fd, - (struct sockaddr *) &udp_data.s_saddr, - sizeof(udp_data.s_saddr)) < 0) { - log_err("Couldn't bind to %s.", ipstr); + if (bind(udp_data.s_fd, SADDR, SADDR_SIZE) < 0) { + log_err("Couldn't bind to %s:%d. %s.", + ipstr, conf->udp.port, strerror(errno)); goto fail_bind; } - udp_data.s_fd = fd; - udp_data.ip_addr = conf->ip_addr; - udp_data.dns_addr = conf->dns_addr; - - FD_CLR(udp_data.s_fd, &udp_data.flow_fd_s); + udp_data.dns_addr = conf->udp.dns_addr; - ipcp_set_state(IPCP_OPERATIONAL); - - if (pthread_create(&udp_data.handler, - NULL, - ipcp_udp_listener, - NULL)) { - ipcp_set_state(IPCP_INIT); + if (pthread_create(&udp_data.mgmt_handler, NULL, + udp_ipcp_mgmt_handler, NULL)) { + log_err("Failed to create management thread."); goto fail_bind; } - if (pthread_create(&udp_data.sdu_reader, - NULL, - ipcp_udp_sdu_reader, - NULL)) { - ipcp_set_state(IPCP_INIT); - goto fail_sdu_reader; + for (i = 0; i < IPCP_UDP_RD_THR; ++i) { + if (pthread_create(&udp_data.packet_reader[i], NULL, + udp_ipcp_packet_reader, NULL)) { + log_err("Failed to create reader thread."); + goto fail_packet_reader; + } } - if (pthread_create(&udp_data.sduloop, - NULL, - ipcp_udp_sdu_loop, - NULL)) { - ipcp_set_state(IPCP_INIT); - goto fail_sduloop; + for (i = 0; i < IPCP_UDP_WR_THR; ++i) { + if (pthread_create(&udp_data.packet_writer[i], NULL, + udp_ipcp_packet_writer, NULL)) { + log_err("Failed to create writer thread."); + goto fail_packet_writer; + } } log_dbg("Bootstrapped IPCP over UDP with pid %d.", getpid()); log_dbg("Bound to IP address %s.", ipstr); - log_dbg("DNS server address is %s.", dnsstr); + log_dbg("Using port %u.", conf->udp.port); + if (conf->udp.dns_addr != 0) + log_dbg("DNS server address is %s.", dnsstr); + else + log_dbg("DNS server not in use."); return 0; - fail_sduloop: - pthread_cancel(udp_data.sdu_reader); - pthread_join(udp_data.sdu_reader, NULL); - fail_sdu_reader: - pthread_cancel(udp_data.handler); - pthread_join(udp_data.handler, NULL); + fail_packet_writer: + while (i > 0) { + pthread_cancel(udp_data.packet_writer[--i]); + pthread_join(udp_data.packet_writer[i], NULL); + } + i = IPCP_UDP_RD_THR; + fail_packet_reader: + while (i > 0) { + pthread_cancel(udp_data.packet_reader[--i]); + pthread_join(udp_data.packet_reader[i], NULL); + } + pthread_cancel(udp_data.mgmt_handler); + pthread_join(udp_data.mgmt_handler, NULL); fail_bind: - close(fd); + close(udp_data.s_fd); fail_socket: return -1; } @@ -665,20 +691,22 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) /* NOTE: Disgusted with this crap */ static int ddns_send(char * cmd) { - pid_t pid = -1; - int wstatus; - int pipe_fd[2]; + pid_t pid; + int wstatus; + int pipe_fd[2]; char * argv[] = {NSUPDATE_EXEC, 0}; char * envp[] = {0}; if (pipe(pipe_fd)) { - log_err("Failed to create pipe."); + log_err("Failed to create pipe: %s.", strerror(errno)); return -1; } pid = fork(); if (pid == -1) { - log_err("Failed to fork."); + log_err("Failed to fork: %s.", strerror(errno)); + close(pipe_fd[0]); + close(pipe_fd[1]); return -1; } @@ -686,12 +714,15 @@ static int ddns_send(char * cmd) close(pipe_fd[1]); dup2(pipe_fd[0], 0); execve(argv[0], &argv[0], envp); + log_err("Failed to execute: %s", strerror(errno)); + exit(1); } close(pipe_fd[0]); if (write(pipe_fd[1], cmd, strlen(cmd)) == -1) { - log_err("Failed to communicate with nsupdate."); + log_err("Failed to communicate with nsupdate: %s.", + strerror(errno)); close(pipe_fd[1]); return -1; } @@ -703,35 +734,38 @@ static int ddns_send(char * cmd) log_err("Failed to register with DNS server."); close(pipe_fd[1]); + return 0; } static uint32_t ddns_resolve(char * name, uint32_t dns_addr) { - pid_t pid = -1; - int wstatus; - int pipe_fd[2]; - char dnsstr[INET_ADDRSTRLEN]; - char buf[SHIM_UDP_BUF_SIZE]; - ssize_t count = 0; - char * substr = NULL; - char * substr2 = NULL; - char * addr_str = "Address:"; - uint32_t ip_addr = 0; - - if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL) + pid_t pid = -1; + int wstatus; + int pipe_fd[2]; + char dnsstr[INET_ADDRSTRLEN]; + char buf[IPCP_UDP_BUF_SIZE]; + ssize_t count = 0; + char * substr = NULL; + char * substr2 = NULL; + char * addr_str = "Address:"; + uint32_t ip_addr = 0; + + if (inet4_ntop(&dns_addr, dnsstr) == NULL) return 0; if (pipe(pipe_fd)) { - log_err("Failed to create pipe."); + log_err("Failed to create pipe: %s.", strerror(errno)); return 0; } pid = fork(); if (pid == -1) { - log_err("Failed to fork."); - return 0; + log_err("Failed to fork: %s.", strerror(errno)); + close(pipe_fd[0]); + close(pipe_fd[1]); + return -1; } if (pid == 0) { @@ -741,11 +775,13 @@ static uint32_t ddns_resolve(char * name, close(pipe_fd[0]); dup2(pipe_fd[1], 1); execve(argv[0], &argv[0], envp); + log_err("Failed to execute: %s", strerror(errno)); + exit(1); } close(pipe_fd[1]); - count = read(pipe_fd[0], buf, SHIM_UDP_BUF_SIZE); + count = read(pipe_fd[0], buf, IPCP_UDP_BUF_SIZE - 1); if (count <= 0) { log_err("Failed to communicate with nslookup."); close(pipe_fd[0]); @@ -755,7 +791,8 @@ static uint32_t ddns_resolve(char * name, close(pipe_fd[0]); waitpid(pid, &wstatus, 0); - if (WIFEXITED(wstatus) && WEXITSTATUS(wstatus) == 0) + if (WIFEXITED(wstatus) && WEXITSTATUS(wstatus) == 0 && + count != IPCP_UDP_BUF_SIZE - 1) log_dbg("Succesfully communicated with nslookup."); else log_err("Failed to resolve DNS address."); @@ -781,24 +818,31 @@ static uint32_t ddns_resolve(char * name, } #endif -static int ipcp_udp_reg(const uint8_t * hash) +static int udp_ipcp_reg(const uint8_t * hash) { #ifdef HAVE_DDNS - char ipstr[INET_ADDRSTRLEN]; - char dnsstr[INET_ADDRSTRLEN]; - char cmd[1000]; + char ipstr[INET_ADDRSTRLEN]; + char dnsstr[INET_ADDRSTRLEN]; + char cmd[1000]; uint32_t dns_addr; uint32_t ip_addr; #endif - char hashstr[ipcp_dir_hash_strlen() + 1]; + char * hashstr; + + hashstr = malloc(ipcp_dir_hash_strlen() + 1); + if (hashstr == NULL) { + log_err("Failed to malloc hashstr."); + return -1; + } assert(hash); ipcp_hash_str(hashstr, hash); if (shim_data_reg_add_entry(udp_data.shim_data, hash)) { - log_err("Failed to add " HASH_FMT " to local registry.", - HASH_VAL(hash)); + log_err("Failed to add " HASH_FMT32 " to local registry.", + HASH_VAL32(hash)); + free(hashstr); return -1; } @@ -808,15 +852,17 @@ static int ipcp_udp_reg(const uint8_t * hash) dns_addr = udp_data.dns_addr; if (dns_addr != 0) { - ip_addr = udp_data.ip_addr; + ip_addr = udp_data.s_saddr.sin_addr.s_addr; - if (inet_ntop(AF_INET, &ip_addr, - ipstr, INET_ADDRSTRLEN) == NULL) { + if (inet4_ntop(&ip_addr, ipstr) == NULL) { + log_err("Failed to convert IP address to string."); + free(hashstr); return -1; } - if (inet_ntop(AF_INET, &dns_addr, - dnsstr, INET_ADDRSTRLEN) == NULL) { + if (inet4_ntop(&dns_addr, dnsstr) == NULL) { + log_err("Failed to convert DNS address to string."); + free(hashstr); return -1; } @@ -824,28 +870,36 @@ static int ipcp_udp_reg(const uint8_t * hash) dnsstr, hashstr, DNS_TTL, ipstr); if (ddns_send(cmd)) { + log_err("Failed to send DDNS message."); shim_data_reg_del_entry(udp_data.shim_data, hash); + free(hashstr); return -1; } } #endif - log_dbg("Registered " HASH_FMT ".", HASH_VAL(hash)); + free(hashstr); return 0; } -static int ipcp_udp_unreg(const uint8_t * hash) +static int udp_ipcp_unreg(const uint8_t * hash) { #ifdef HAVE_DDNS - char dnsstr[INET_ADDRSTRLEN]; + char dnsstr[INET_ADDRSTRLEN]; /* max DNS name length + max IP length + max command length */ - char cmd[100]; + char cmd[100]; uint32_t dns_addr; #endif - char hashstr[ipcp_dir_hash_strlen() + 1]; + char * hashstr; assert(hash); + hashstr = malloc(ipcp_dir_hash_strlen() + 1); + if (hashstr == NULL) { + log_err("Failed to malloc hashstr."); + return -1; + } + ipcp_hash_str(hashstr, hash); #ifdef HAVE_DDNS @@ -854,8 +908,9 @@ static int ipcp_udp_unreg(const uint8_t * hash) dns_addr = udp_data.dns_addr; if (dns_addr != 0) { - if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) - == NULL) { + if (inet4_ntop(&dns_addr, dnsstr) == NULL) { + log_err("Failed to convert DNS address to string."); + free(hashstr); return -1; } sprintf(cmd, "server %s\nupdate delete %s A\nsend\nquit\n", @@ -867,26 +922,33 @@ static int ipcp_udp_unreg(const uint8_t * hash) shim_data_reg_del_entry(udp_data.shim_data, hash); - log_dbg("Unregistered " HASH_FMT ".", HASH_VAL(hash)); + free(hashstr); return 0; } -static int ipcp_udp_query(const uint8_t * hash) +static int udp_ipcp_query(const uint8_t * hash) { - uint32_t ip_addr = 0; - struct hostent * h; + uint32_t ip_addr = 0; + char * hashstr; + struct hostent * h; #ifdef HAVE_DDNS - uint32_t dns_addr = 0; + uint32_t dns_addr = 0; #endif - char hashstr[ipcp_dir_hash_strlen() + 1]; - assert(hash); + hashstr = malloc(ipcp_dir_hash_strlen() + 1); + if (hashstr == NULL) { + log_err("Failed to malloc hashstr."); + return -ENOMEM; + } + ipcp_hash_str(hashstr, hash); - if (shim_data_dir_has(udp_data.shim_data, hash)) + if (shim_data_dir_has(udp_data.shim_data, hash)) { + free(hashstr); return 0; + } #ifdef HAVE_DDNS dns_addr = udp_data.dns_addr; @@ -894,14 +956,16 @@ static int ipcp_udp_query(const uint8_t * hash) if (dns_addr != 0) { ip_addr = ddns_resolve(hashstr, dns_addr); if (ip_addr == 0) { - log_dbg("Could not resolve %s.", hashstr); + log_err("Could not resolve %s.", hashstr); + free(hashstr); return -1; } } else { #endif h = gethostbyname(hashstr); if (h == NULL) { - log_dbg("Could not resolve %s.", hashstr); + log_err("Could not resolve %s.", hashstr); + free(hashstr); return -1; } @@ -912,260 +976,177 @@ static int ipcp_udp_query(const uint8_t * hash) if (shim_data_dir_add_entry(udp_data.shim_data, hash, ip_addr)) { log_err("Failed to add directory entry."); + free(hashstr); return -1; } + free(hashstr); + return 0; } -static int ipcp_udp_flow_alloc(int fd, - const uint8_t * dst, - qoscube_t cube) +static int udp_ipcp_flow_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { - struct sockaddr_in r_saddr; /* server address */ - struct sockaddr_in f_saddr; /* flow */ - socklen_t f_saddr_len = sizeof(f_saddr); - int skfd; + struct sockaddr_in r_saddr; /* Server address */ uint32_t ip_addr = 0; + char ipstr[INET_ADDRSTRLEN]; - log_dbg("Allocating flow to " HASH_FMT ".", HASH_VAL(dst)); + (void) qs; assert(dst); - if (cube != QOS_CUBE_BE) { - log_dbg("Unsupported QoS requested."); + if (!shim_data_dir_has(udp_data.shim_data, dst)) { + log_err("Could not resolve destination."); return -1; } - skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - - /* this socket is for the flow */ - memset((char *) &f_saddr, 0, sizeof(f_saddr)); - f_saddr.sin_family = AF_INET; - f_saddr.sin_addr.s_addr = local_ip; - f_saddr.sin_port = 0; - - if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { - close(skfd); - return -1; - } + ip_addr = (uint32_t) shim_data_dir_get_addr(udp_data.shim_data, dst); - if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { - log_err("Could not get address from fd."); - close(skfd); + if (inet4_ntop(&ip_addr, ipstr) == NULL) { + log_err("Could not convert IP address."); return -1; } - if (!shim_data_dir_has(udp_data.shim_data, dst)) { - log_dbg("Could not resolve destination."); - close(skfd); - return -1; - } - ip_addr = (uint32_t) shim_data_dir_get_addr(udp_data.shim_data, dst); + log_dbg("Destination " HASH_FMT32 " resolved at IP %s.", + HASH_VAL32(dst), ipstr); - /* connect to server (store the remote IP address in the fd) */ memset((char *) &r_saddr, 0, sizeof(r_saddr)); r_saddr.sin_family = AF_INET; r_saddr.sin_addr.s_addr = ip_addr; - r_saddr.sin_port = LISTEN_PORT; + r_saddr.sin_port = udp_data.s_saddr.sin_port; - if (connect(skfd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { - close(skfd); + if (udp_ipcp_port_alloc(&r_saddr, fd, dst, qs, data) < 0) { + log_err("Could not allocate port."); return -1; } pthread_rwlock_wrlock(&udp_data.flows_lock); - udp_data.fd_to_uf[fd].udp = f_saddr.sin_port; - udp_data.fd_to_uf[fd].skfd = skfd; - udp_data.uf_to_fd[skfd] = fd; - - fset_add(udp_data.np1_flows, fd); + udp_data.fd_to_uf[fd].d_eid = -1; + udp_data.fd_to_uf[fd].r_saddr = r_saddr; pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, cube) < 0) { - pthread_rwlock_wrlock(&udp_data.flows_lock); - - udp_data.fd_to_uf[fd].udp = -1; - udp_data.fd_to_uf[fd].skfd = -1; - udp_data.uf_to_fd[skfd] = -1; - - pthread_rwlock_unlock(&udp_data.flows_lock); - close(skfd); - return -1; - } - - log_dbg("Flow pending on fd %d, UDP port %d.", - fd, ntohs(f_saddr.sin_port)); + fset_add(udp_data.np1_flows, fd); return 0; } -static int ipcp_udp_flow_alloc_resp(int fd, - int response) +static int udp_ipcp_flow_alloc_resp(int fd, + int resp, + const buffer_t * data) { - struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; - struct timespec abstime; - int skfd = -1; - struct sockaddr_in f_saddr; - struct sockaddr_in r_saddr; - socklen_t len = sizeof(r_saddr); - - if (response) - return 0; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } + struct sockaddr_in saddr; + int d_eid; - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); + if (ipcp_wait_flow_resp(fd) < 0) { + log_err("Failed to wait for flow response."); return -1; } - ipcpi.alloc_id = -1; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - pthread_rwlock_rdlock(&udp_data.flows_lock); - skfd = udp_data.fd_to_uf[fd].skfd; + saddr = udp_data.fd_to_uf[fd].r_saddr; + d_eid = udp_data.fd_to_uf[fd].d_eid; pthread_rwlock_unlock(&udp_data.flows_lock); - if (getsockname(skfd, (struct sockaddr *) &f_saddr, &len) < 0) { - log_dbg("Socket with fd %d has no address.", skfd); - return -1; - } - - if (getpeername(skfd, (struct sockaddr *) &r_saddr, &len) < 0) { - log_dbg("Socket with fd %d has no peer.", skfd); + if (udp_ipcp_port_alloc_resp(&saddr, d_eid, fd, resp, data) < 0) { + fset_del(udp_data.np1_flows, fd); + log_err("Failed to respond to flow request."); return -1; } - pthread_rwlock_rdlock(&udp_data.flows_lock); - - set_fd(skfd); - fset_add(udp_data.np1_flows, fd); - pthread_rwlock_unlock(&udp_data.flows_lock); - - if (ipcp_udp_port_alloc_resp(r_saddr.sin_addr.s_addr, f_saddr.sin_port, - r_saddr.sin_port, response) < 0) { - pthread_rwlock_rdlock(&udp_data.flows_lock); - clr_fd(skfd); - pthread_rwlock_unlock(&udp_data.flows_lock); - return -1; - } - - log_dbg("Accepted flow, fd %d on UDP port %d.", - fd, ntohs(f_saddr.sin_port)); - return 0; } -static int ipcp_udp_flow_dealloc(int fd) +static int udp_ipcp_flow_dealloc(int fd) { - int skfd = -1; - ipcp_flow_fini(fd); - pthread_rwlock_wrlock(&udp_data.flows_lock); - fset_del(udp_data.np1_flows, fd); - skfd = udp_data.fd_to_uf[fd].skfd; - - udp_data.uf_to_fd[skfd] = -1; - udp_data.fd_to_uf[fd].udp = -1; - udp_data.fd_to_uf[fd].skfd = -1; - - close(skfd); - - pthread_rwlock_unlock(&udp_data.flows_lock); - pthread_rwlock_rdlock(&udp_data.flows_lock); + pthread_rwlock_wrlock(&udp_data.flows_lock); - clr_fd(skfd); + udp_data.fd_to_uf[fd].d_eid = -1; + memset(&udp_data.fd_to_uf[fd].r_saddr, 0, SADDR_SIZE); pthread_rwlock_unlock(&udp_data.flows_lock); - flow_dealloc(fd); - - log_dbg("Flow with fd %d deallocated.", fd); + ipcp_flow_dealloc(fd); return 0; } static struct ipcp_ops udp_ops = { - .ipcp_bootstrap = ipcp_udp_bootstrap, + .ipcp_bootstrap = udp_ipcp_bootstrap, .ipcp_enroll = NULL, .ipcp_connect = NULL, .ipcp_disconnect = NULL, - .ipcp_reg = ipcp_udp_reg, - .ipcp_unreg = ipcp_udp_unreg, - .ipcp_query = ipcp_udp_query, - .ipcp_flow_alloc = ipcp_udp_flow_alloc, - .ipcp_flow_alloc_resp = ipcp_udp_flow_alloc_resp, - .ipcp_flow_dealloc = ipcp_udp_flow_dealloc + .ipcp_reg = udp_ipcp_reg, + .ipcp_unreg = udp_ipcp_unreg, + .ipcp_query = udp_ipcp_query, + .ipcp_flow_alloc = udp_ipcp_flow_alloc, + .ipcp_flow_join = NULL, + .ipcp_flow_alloc_resp = udp_ipcp_flow_alloc_resp, + .ipcp_flow_dealloc = udp_ipcp_flow_dealloc }; int main(int argc, char * argv[]) { - if (ipcp_init(argc, argv, &udp_ops) < 0) { - ipcp_create_r(getpid(), -1); - exit(EXIT_FAILURE); - } + int i; + if (udp_data_init() < 0) { log_err("Failed to init udp data."); - ipcp_create_r(getpid(), -1); - ipcp_fini(); - exit(EXIT_FAILURE); + goto fail_data_init; } - if (ipcp_boot() < 0) { - log_err("Failed to boot IPCP."); - ipcp_create_r(getpid(), -1); - udp_data_fini(); - ipcp_fini(); - exit(EXIT_FAILURE); + if (ipcp_init(argc, argv, &udp_ops, THIS_TYPE) < 0) { + log_err("Failed to initialize IPCP."); + goto fail_init; } - if (ipcp_create_r(getpid(), 0)) { - log_err("Failed to notify IRMd we are initialized."); - ipcp_set_state(IPCP_NULL); - ipcp_shutdown(); - udp_data_fini(); - ipcp_fini(); - exit(EXIT_FAILURE); + if (ipcp_start() < 0) { + log_err("Failed to start IPCP."); + goto fail_start; } - ipcp_shutdown(); + ipcp_sigwait(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(udp_data.sduloop); - pthread_cancel(udp_data.handler); - pthread_cancel(udp_data.sdu_reader); - - pthread_join(udp_data.sduloop, NULL); - pthread_join(udp_data.handler, NULL); - pthread_join(udp_data.sdu_reader, NULL); + for (i = 0; i < IPCP_UDP_WR_THR; ++i) + pthread_cancel(udp_data.packet_writer[i]); + for (i = 0; i < IPCP_UDP_RD_THR; ++i) + pthread_cancel(udp_data.packet_reader[i]); + pthread_cancel(udp_data.mgmt_handler); + + for (i = 0; i < IPCP_UDP_WR_THR; ++i) + pthread_join(udp_data.packet_writer[i], NULL); + for (i = 0; i < IPCP_UDP_RD_THR; ++i) + pthread_join(udp_data.packet_reader[i], NULL); + pthread_join(udp_data.mgmt_handler, NULL); + close(udp_data.s_fd); } - udp_data_fini(); + ipcp_stop(); ipcp_fini(); + udp_data_fini(); + exit(EXIT_SUCCESS); + + fail_start: + ipcp_fini(); + fail_init: + udp_data_fini(); + fail_data_init: + exit(EXIT_FAILURE); } |
