diff options
Diffstat (limited to 'src/ipcpd/udp/main.c')
-rw-r--r-- | src/ipcpd/udp/main.c | 170 |
1 files changed, 106 insertions, 64 deletions
diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 2e59e1a5..a1af1e85 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -20,7 +20,11 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else #define _POSIX_C_SOURCE 200112L +#endif #include "config.h" @@ -50,27 +54,34 @@ #include <sys/wait.h> #include <fcntl.h> -#define FLOW_REQ 1 -#define FLOW_REPLY 2 +#define FLOW_REQ 1 +#define FLOW_REPLY 2 -#define THIS_TYPE IPCP_UDP -#define LISTEN_PORT htons(0x0D1F) -#define SHIM_UDP_BUF_SIZE 256 -#define SHIM_UDP_MSG_SIZE 256 -#define SHIM_UDP_MAX_SDU_SIZE 8980 -#define DNS_TTL 86400 -#define FD_UPDATE_TIMEOUT 100 /* microseconds */ +#define THIS_TYPE IPCP_UDP +#define LISTEN_PORT htons(0x0D1F) +#define SHIM_UDP_BUF_SIZE 256 +#define SHIM_UDP_MSG_SIZE 256 +#define SHIM_UDP_MAX_PACKET_SIZE 8980 +#define DNS_TTL 86400 +#define FD_UPDATE_TIMEOUT 100 /* microseconds */ -#define local_ip (udp_data.s_saddr.sin_addr.s_addr) +#define local_ip (udp_data.s_saddr.sin_addr.s_addr) -#define UDP_MAX_PORTS 0xFFFF +#define UDP_MAX_PORTS 0xFFFF struct mgmt_msg { uint16_t src_udp_port; uint16_t dst_udp_port; uint8_t code; - uint8_t qoscube; uint8_t response; + /* QoS parameters from spec, aligned */ + uint8_t availability; + uint8_t in_order; + uint32_t delay; + uint64_t bandwidth; + uint32_t loss; + uint32_t ber; + uint32_t max_gap; } __attribute__((packed)); struct uf { @@ -95,9 +106,9 @@ struct { struct uf fd_to_uf[SYS_MAX_FLOWS]; pthread_rwlock_t flows_lock; - pthread_t sduloop; + pthread_t packet_loop; pthread_t handler; - pthread_t sdu_reader; + pthread_t packet_reader; bool fd_set_mod; pthread_cond_t fd_set_cond; @@ -108,6 +119,15 @@ static int udp_data_init(void) { int i; + if (pthread_rwlock_init(&udp_data.flows_lock, NULL)) + return -1; + + if (pthread_cond_init(&udp_data.fd_set_cond, NULL)) + goto fail_set_cond; + + if (pthread_mutex_init(&udp_data.fd_set_lock, NULL)) + goto fail_set_lock; + for (i = 0; i < FD_SETSIZE; ++i) udp_data.uf_to_fd[i] = -1; @@ -120,26 +140,28 @@ static int udp_data_init(void) udp_data.np1_flows = fset_create(); if (udp_data.np1_flows == NULL) - return -ENOMEM; + goto fail_fset; udp_data.fq = fqueue_create(); - if (udp_data.fq == NULL) { - fset_destroy(udp_data.np1_flows); - return -ENOMEM; - } + if (udp_data.fq == NULL) + goto fail_fqueue; udp_data.shim_data = shim_data_create(); - if (udp_data.shim_data == NULL) { - fqueue_destroy(udp_data.fq); - fset_destroy(udp_data.np1_flows); - return -ENOMEM; - } - - pthread_rwlock_init(&udp_data.flows_lock, NULL); - pthread_cond_init(&udp_data.fd_set_cond, NULL); - pthread_mutex_init(&udp_data.fd_set_lock, NULL); + if (udp_data.shim_data == NULL) + goto fail_data; return 0; + fail_data: + fqueue_destroy(udp_data.fq); + fail_fqueue: + fset_destroy(udp_data.np1_flows); + fail_fset: + pthread_mutex_destroy(&udp_data.fd_set_lock); + fail_set_lock: + pthread_cond_destroy(&udp_data.fd_set_cond); + fail_set_cond: + pthread_rwlock_destroy(&udp_data.flows_lock); + return -1; } static void udp_data_fini(void) @@ -204,7 +226,7 @@ static int send_shim_udp_msg(uint8_t * buf, static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, uint16_t src_udp_port, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { uint8_t * buf; struct mgmt_msg * msg; @@ -220,7 +242,13 @@ static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, msg = (struct mgmt_msg *) buf; msg->code = FLOW_REQ; msg->src_udp_port = src_udp_port; - msg->qoscube = cube; + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); memcpy(msg + 1, dst, ipcp_dir_hash_len()); @@ -257,7 +285,7 @@ static int ipcp_udp_port_alloc_resp(uint32_t dst_ip_addr, static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; struct timespec abstime; @@ -311,11 +339,12 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, if (ipcp_get_state() != IPCP_OPERATIONAL) { log_dbg("Won't allocate over non-operational IPCP."); pthread_mutex_unlock(&ipcpi.alloc_lock); + close(skfd); return -1; } /* reply to IRM */ - fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); @@ -371,6 +400,11 @@ static int ipcp_udp_port_alloc_reply(uint16_t src_udp_port, pthread_rwlock_rdlock(&udp_data.flows_lock); fd = udp_port_to_fd(dst_udp_port); + if (fd < 0) { + pthread_rwlock_unlock(&udp_data.flows_lock); + return -1; + } + skfd = udp_data.fd_to_uf[fd].skfd; pthread_rwlock_unlock(&udp_data.flows_lock); @@ -415,11 +449,11 @@ static void * ipcp_udp_listener(void * o) while (true) { struct mgmt_msg * msg = NULL; - + qosspec_t qs; memset(&buf, 0, SHIM_UDP_MSG_SIZE); - n = sizeof(c_saddr); n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0, - (struct sockaddr *) &c_saddr, (unsigned *) &n); + (struct sockaddr *) &c_saddr, + (socklen_t *) sizeof(c_saddr)); if (n < 0) continue; @@ -434,9 +468,16 @@ static void * ipcp_udp_listener(void * o) switch (msg->code) { case FLOW_REQ: c_saddr.sin_port = msg->src_udp_port; + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); ipcp_udp_port_req(&c_saddr, (uint8_t *) (msg + 1), - msg->qoscube); + qs); break; case FLOW_REPLY: ipcp_udp_port_alloc_reply(msg->src_udp_port, @@ -454,13 +495,13 @@ static void * ipcp_udp_listener(void * o) return 0; } -static void * ipcp_udp_sdu_reader(void * o) +static void * ipcp_udp_packet_reader(void * o) { ssize_t n; int skfd; int fd; /* FIXME: avoid this copy */ - char buf[SHIM_UDP_MAX_SDU_SIZE]; + char buf[SHIM_UDP_MAX_PACKET_SIZE]; struct sockaddr_in r_saddr; struct timeval tv = {0, FD_UPDATE_TIMEOUT}; fd_set read_fds; @@ -492,7 +533,7 @@ static void * ipcp_udp_sdu_reader(void * o) n = sizeof(r_saddr); if ((n = recvfrom(skfd, &buf, - SHIM_UDP_MAX_SDU_SIZE, + SHIM_UDP_MAX_PACKET_SIZE, 0, (struct sockaddr *) &r_saddr, (unsigned *) &n)) <= 0) @@ -511,7 +552,7 @@ static void * ipcp_udp_sdu_reader(void * o) return (void *) 0; } -static void * ipcp_udp_sdu_loop(void * o) +static void * ipcp_udp_packet_loop(void * o) { int fd; struct shm_du_buff * sdb; @@ -534,13 +575,14 @@ static void * ipcp_udp_sdu_loop(void * o) pthread_rwlock_unlock(&udp_data.flows_lock); - pthread_cleanup_push((void (*)(void *)) ipcp_sdb_release, + pthread_cleanup_push((void (*)(void *)) + ipcp_sdb_release, (void *) sdb); if (send(fd, shm_du_buff_head(sdb), shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), 0) < 0) - log_err("Failed to send SDU."); + log_err("Failed to send PACKET."); pthread_cleanup_pop(true); } @@ -624,20 +666,20 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) goto fail_bind; } - if (pthread_create(&udp_data.sdu_reader, + if (pthread_create(&udp_data.packet_reader, NULL, - ipcp_udp_sdu_reader, + ipcp_udp_packet_reader, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sdu_reader; + goto fail_packet_reader; } - if (pthread_create(&udp_data.sduloop, + if (pthread_create(&udp_data.packet_loop, NULL, - ipcp_udp_sdu_loop, + ipcp_udp_packet_loop, NULL)) { ipcp_set_state(IPCP_INIT); - goto fail_sduloop; + goto fail_packet_loop; } log_dbg("Bootstrapped IPCP over UDP with pid %d.", getpid()); @@ -646,10 +688,10 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) return 0; - fail_sduloop: - pthread_cancel(udp_data.sdu_reader); - pthread_join(udp_data.sdu_reader, NULL); - fail_sdu_reader: + fail_packet_loop: + pthread_cancel(udp_data.packet_reader); + pthread_join(udp_data.packet_reader, NULL); + fail_packet_reader: pthread_cancel(udp_data.handler); pthread_join(udp_data.handler, NULL); fail_bind: @@ -753,7 +795,8 @@ static uint32_t ddns_resolve(char * name, close(pipe_fd[0]); waitpid(pid, &wstatus, 0); - if (WIFEXITED(wstatus) && WEXITSTATUS(wstatus) == 0) + if (WIFEXITED(wstatus) && WEXITSTATUS(wstatus) == 0 && + count != SHIM_UDP_BUF_SIZE) log_dbg("Succesfully communicated with nslookup."); else log_err("Failed to resolve DNS address."); @@ -946,7 +989,7 @@ static int ipcp_udp_query(const uint8_t * hash) static int ipcp_udp_flow_alloc(int fd, const uint8_t * dst, - qoscube_t cube) + qosspec_t qs) { struct sockaddr_in r_saddr; /* server address */ struct sockaddr_in f_saddr; /* flow */ @@ -956,14 +999,13 @@ static int ipcp_udp_flow_alloc(int fd, log_dbg("Allocating flow to " HASH_FMT ".", HASH_VAL(dst)); - assert(dst); + (void) qs; - if (cube > QOS_CUBE_DATA) { - log_dbg("Unsupported QoS requested."); - return -1; - } + assert(dst); skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (skfd < 0) + return -1; /* this socket is for the flow */ memset((char *) &f_saddr, 0, sizeof(f_saddr)); @@ -1010,7 +1052,7 @@ static int ipcp_udp_flow_alloc(int fd, pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, cube) < 0) { + if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, qs) < 0) { pthread_rwlock_wrlock(&udp_data.flows_lock); udp_data.fd_to_uf[fd].udp = -1; @@ -1180,13 +1222,13 @@ int main(int argc, ipcp_shutdown(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(udp_data.sduloop); + pthread_cancel(udp_data.packet_loop); pthread_cancel(udp_data.handler); - pthread_cancel(udp_data.sdu_reader); + pthread_cancel(udp_data.packet_reader); - pthread_join(udp_data.sduloop, NULL); + pthread_join(udp_data.packet_loop, NULL); pthread_join(udp_data.handler, NULL); - pthread_join(udp_data.sdu_reader, NULL); + pthread_join(udp_data.packet_reader, NULL); } udp_data_fini(); |