summaryrefslogtreecommitdiff
path: root/src/ipcpd/udp/main.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/udp/main.c')
-rw-r--r--src/ipcpd/udp/main.c170
1 files changed, 106 insertions, 64 deletions
diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c
index 2e59e1a5..a1af1e85 100644
--- a/src/ipcpd/udp/main.c
+++ b/src/ipcpd/udp/main.c
@@ -20,7 +20,11 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
#define _POSIX_C_SOURCE 200112L
+#endif
#include "config.h"
@@ -50,27 +54,34 @@
#include <sys/wait.h>
#include <fcntl.h>
-#define FLOW_REQ 1
-#define FLOW_REPLY 2
+#define FLOW_REQ 1
+#define FLOW_REPLY 2
-#define THIS_TYPE IPCP_UDP
-#define LISTEN_PORT htons(0x0D1F)
-#define SHIM_UDP_BUF_SIZE 256
-#define SHIM_UDP_MSG_SIZE 256
-#define SHIM_UDP_MAX_SDU_SIZE 8980
-#define DNS_TTL 86400
-#define FD_UPDATE_TIMEOUT 100 /* microseconds */
+#define THIS_TYPE IPCP_UDP
+#define LISTEN_PORT htons(0x0D1F)
+#define SHIM_UDP_BUF_SIZE 256
+#define SHIM_UDP_MSG_SIZE 256
+#define SHIM_UDP_MAX_PACKET_SIZE 8980
+#define DNS_TTL 86400
+#define FD_UPDATE_TIMEOUT 100 /* microseconds */
-#define local_ip (udp_data.s_saddr.sin_addr.s_addr)
+#define local_ip (udp_data.s_saddr.sin_addr.s_addr)
-#define UDP_MAX_PORTS 0xFFFF
+#define UDP_MAX_PORTS 0xFFFF
struct mgmt_msg {
uint16_t src_udp_port;
uint16_t dst_udp_port;
uint8_t code;
- uint8_t qoscube;
uint8_t response;
+ /* QoS parameters from spec, aligned */
+ uint8_t availability;
+ uint8_t in_order;
+ uint32_t delay;
+ uint64_t bandwidth;
+ uint32_t loss;
+ uint32_t ber;
+ uint32_t max_gap;
} __attribute__((packed));
struct uf {
@@ -95,9 +106,9 @@ struct {
struct uf fd_to_uf[SYS_MAX_FLOWS];
pthread_rwlock_t flows_lock;
- pthread_t sduloop;
+ pthread_t packet_loop;
pthread_t handler;
- pthread_t sdu_reader;
+ pthread_t packet_reader;
bool fd_set_mod;
pthread_cond_t fd_set_cond;
@@ -108,6 +119,15 @@ static int udp_data_init(void)
{
int i;
+ if (pthread_rwlock_init(&udp_data.flows_lock, NULL))
+ return -1;
+
+ if (pthread_cond_init(&udp_data.fd_set_cond, NULL))
+ goto fail_set_cond;
+
+ if (pthread_mutex_init(&udp_data.fd_set_lock, NULL))
+ goto fail_set_lock;
+
for (i = 0; i < FD_SETSIZE; ++i)
udp_data.uf_to_fd[i] = -1;
@@ -120,26 +140,28 @@ static int udp_data_init(void)
udp_data.np1_flows = fset_create();
if (udp_data.np1_flows == NULL)
- return -ENOMEM;
+ goto fail_fset;
udp_data.fq = fqueue_create();
- if (udp_data.fq == NULL) {
- fset_destroy(udp_data.np1_flows);
- return -ENOMEM;
- }
+ if (udp_data.fq == NULL)
+ goto fail_fqueue;
udp_data.shim_data = shim_data_create();
- if (udp_data.shim_data == NULL) {
- fqueue_destroy(udp_data.fq);
- fset_destroy(udp_data.np1_flows);
- return -ENOMEM;
- }
-
- pthread_rwlock_init(&udp_data.flows_lock, NULL);
- pthread_cond_init(&udp_data.fd_set_cond, NULL);
- pthread_mutex_init(&udp_data.fd_set_lock, NULL);
+ if (udp_data.shim_data == NULL)
+ goto fail_data;
return 0;
+ fail_data:
+ fqueue_destroy(udp_data.fq);
+ fail_fqueue:
+ fset_destroy(udp_data.np1_flows);
+ fail_fset:
+ pthread_mutex_destroy(&udp_data.fd_set_lock);
+ fail_set_lock:
+ pthread_cond_destroy(&udp_data.fd_set_cond);
+ fail_set_cond:
+ pthread_rwlock_destroy(&udp_data.flows_lock);
+ return -1;
}
static void udp_data_fini(void)
@@ -204,7 +226,7 @@ static int send_shim_udp_msg(uint8_t * buf,
static int ipcp_udp_port_alloc(uint32_t dst_ip_addr,
uint16_t src_udp_port,
const uint8_t * dst,
- qoscube_t cube)
+ qosspec_t qs)
{
uint8_t * buf;
struct mgmt_msg * msg;
@@ -220,7 +242,13 @@ static int ipcp_udp_port_alloc(uint32_t dst_ip_addr,
msg = (struct mgmt_msg *) buf;
msg->code = FLOW_REQ;
msg->src_udp_port = src_udp_port;
- msg->qoscube = cube;
+ msg->delay = hton32(qs.delay);
+ msg->bandwidth = hton64(qs.bandwidth);
+ msg->availability = qs.availability;
+ msg->loss = hton32(qs.loss);
+ msg->ber = hton32(qs.ber);
+ msg->in_order = qs.in_order;
+ msg->max_gap = hton32(qs.max_gap);
memcpy(msg + 1, dst, ipcp_dir_hash_len());
@@ -257,7 +285,7 @@ static int ipcp_udp_port_alloc_resp(uint32_t dst_ip_addr,
static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
const uint8_t * dst,
- qoscube_t cube)
+ qosspec_t qs)
{
struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
struct timespec abstime;
@@ -311,11 +339,12 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
if (ipcp_get_state() != IPCP_OPERATIONAL) {
log_dbg("Won't allocate over non-operational IPCP.");
pthread_mutex_unlock(&ipcpi.alloc_lock);
+ close(skfd);
return -1;
}
/* reply to IRM */
- fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube);
+ fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs);
if (fd < 0) {
pthread_mutex_unlock(&ipcpi.alloc_lock);
log_err("Could not get new flow from IRMd.");
@@ -371,6 +400,11 @@ static int ipcp_udp_port_alloc_reply(uint16_t src_udp_port,
pthread_rwlock_rdlock(&udp_data.flows_lock);
fd = udp_port_to_fd(dst_udp_port);
+ if (fd < 0) {
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ return -1;
+ }
+
skfd = udp_data.fd_to_uf[fd].skfd;
pthread_rwlock_unlock(&udp_data.flows_lock);
@@ -415,11 +449,11 @@ static void * ipcp_udp_listener(void * o)
while (true) {
struct mgmt_msg * msg = NULL;
-
+ qosspec_t qs;
memset(&buf, 0, SHIM_UDP_MSG_SIZE);
- n = sizeof(c_saddr);
n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0,
- (struct sockaddr *) &c_saddr, (unsigned *) &n);
+ (struct sockaddr *) &c_saddr,
+ (socklen_t *) sizeof(c_saddr));
if (n < 0)
continue;
@@ -434,9 +468,16 @@ static void * ipcp_udp_listener(void * o)
switch (msg->code) {
case FLOW_REQ:
c_saddr.sin_port = msg->src_udp_port;
+ qs.delay = ntoh32(msg->delay);
+ qs.bandwidth = ntoh64(msg->bandwidth);
+ qs.availability = msg->availability;
+ qs.loss = ntoh32(msg->loss);
+ qs.ber = ntoh32(msg->ber);
+ qs.in_order = msg->in_order;
+ qs.max_gap = ntoh32(msg->max_gap);
ipcp_udp_port_req(&c_saddr,
(uint8_t *) (msg + 1),
- msg->qoscube);
+ qs);
break;
case FLOW_REPLY:
ipcp_udp_port_alloc_reply(msg->src_udp_port,
@@ -454,13 +495,13 @@ static void * ipcp_udp_listener(void * o)
return 0;
}
-static void * ipcp_udp_sdu_reader(void * o)
+static void * ipcp_udp_packet_reader(void * o)
{
ssize_t n;
int skfd;
int fd;
/* FIXME: avoid this copy */
- char buf[SHIM_UDP_MAX_SDU_SIZE];
+ char buf[SHIM_UDP_MAX_PACKET_SIZE];
struct sockaddr_in r_saddr;
struct timeval tv = {0, FD_UPDATE_TIMEOUT};
fd_set read_fds;
@@ -492,7 +533,7 @@ static void * ipcp_udp_sdu_reader(void * o)
n = sizeof(r_saddr);
if ((n = recvfrom(skfd,
&buf,
- SHIM_UDP_MAX_SDU_SIZE,
+ SHIM_UDP_MAX_PACKET_SIZE,
0,
(struct sockaddr *) &r_saddr,
(unsigned *) &n)) <= 0)
@@ -511,7 +552,7 @@ static void * ipcp_udp_sdu_reader(void * o)
return (void *) 0;
}
-static void * ipcp_udp_sdu_loop(void * o)
+static void * ipcp_udp_packet_loop(void * o)
{
int fd;
struct shm_du_buff * sdb;
@@ -534,13 +575,14 @@ static void * ipcp_udp_sdu_loop(void * o)
pthread_rwlock_unlock(&udp_data.flows_lock);
- pthread_cleanup_push((void (*)(void *)) ipcp_sdb_release,
+ pthread_cleanup_push((void (*)(void *))
+ ipcp_sdb_release,
(void *) sdb);
if (send(fd, shm_du_buff_head(sdb),
shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
0) < 0)
- log_err("Failed to send SDU.");
+ log_err("Failed to send PACKET.");
pthread_cleanup_pop(true);
}
@@ -624,20 +666,20 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf)
goto fail_bind;
}
- if (pthread_create(&udp_data.sdu_reader,
+ if (pthread_create(&udp_data.packet_reader,
NULL,
- ipcp_udp_sdu_reader,
+ ipcp_udp_packet_reader,
NULL)) {
ipcp_set_state(IPCP_INIT);
- goto fail_sdu_reader;
+ goto fail_packet_reader;
}
- if (pthread_create(&udp_data.sduloop,
+ if (pthread_create(&udp_data.packet_loop,
NULL,
- ipcp_udp_sdu_loop,
+ ipcp_udp_packet_loop,
NULL)) {
ipcp_set_state(IPCP_INIT);
- goto fail_sduloop;
+ goto fail_packet_loop;
}
log_dbg("Bootstrapped IPCP over UDP with pid %d.", getpid());
@@ -646,10 +688,10 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf)
return 0;
- fail_sduloop:
- pthread_cancel(udp_data.sdu_reader);
- pthread_join(udp_data.sdu_reader, NULL);
- fail_sdu_reader:
+ fail_packet_loop:
+ pthread_cancel(udp_data.packet_reader);
+ pthread_join(udp_data.packet_reader, NULL);
+ fail_packet_reader:
pthread_cancel(udp_data.handler);
pthread_join(udp_data.handler, NULL);
fail_bind:
@@ -753,7 +795,8 @@ static uint32_t ddns_resolve(char * name,
close(pipe_fd[0]);
waitpid(pid, &wstatus, 0);
- if (WIFEXITED(wstatus) && WEXITSTATUS(wstatus) == 0)
+ if (WIFEXITED(wstatus) && WEXITSTATUS(wstatus) == 0 &&
+ count != SHIM_UDP_BUF_SIZE)
log_dbg("Succesfully communicated with nslookup.");
else
log_err("Failed to resolve DNS address.");
@@ -946,7 +989,7 @@ static int ipcp_udp_query(const uint8_t * hash)
static int ipcp_udp_flow_alloc(int fd,
const uint8_t * dst,
- qoscube_t cube)
+ qosspec_t qs)
{
struct sockaddr_in r_saddr; /* server address */
struct sockaddr_in f_saddr; /* flow */
@@ -956,14 +999,13 @@ static int ipcp_udp_flow_alloc(int fd,
log_dbg("Allocating flow to " HASH_FMT ".", HASH_VAL(dst));
- assert(dst);
+ (void) qs;
- if (cube > QOS_CUBE_DATA) {
- log_dbg("Unsupported QoS requested.");
- return -1;
- }
+ assert(dst);
skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+ if (skfd < 0)
+ return -1;
/* this socket is for the flow */
memset((char *) &f_saddr, 0, sizeof(f_saddr));
@@ -1010,7 +1052,7 @@ static int ipcp_udp_flow_alloc(int fd,
pthread_rwlock_unlock(&udp_data.flows_lock);
- if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, cube) < 0) {
+ if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, qs) < 0) {
pthread_rwlock_wrlock(&udp_data.flows_lock);
udp_data.fd_to_uf[fd].udp = -1;
@@ -1180,13 +1222,13 @@ int main(int argc,
ipcp_shutdown();
if (ipcp_get_state() == IPCP_SHUTDOWN) {
- pthread_cancel(udp_data.sduloop);
+ pthread_cancel(udp_data.packet_loop);
pthread_cancel(udp_data.handler);
- pthread_cancel(udp_data.sdu_reader);
+ pthread_cancel(udp_data.packet_reader);
- pthread_join(udp_data.sduloop, NULL);
+ pthread_join(udp_data.packet_loop, NULL);
pthread_join(udp_data.handler, NULL);
- pthread_join(udp_data.sdu_reader, NULL);
+ pthread_join(udp_data.packet_reader, NULL);
}
udp_data_fini();