summaryrefslogtreecommitdiff
path: root/src/ipcpd/udp/main.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/udp/main.c')
-rw-r--r--src/ipcpd/udp/main.c787
1 files changed, 366 insertions, 421 deletions
diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c
index f45a18cb..559be55a 100644
--- a/src/ipcpd/udp/main.c
+++ b/src/ipcpd/udp/main.c
@@ -30,6 +30,7 @@
#define OUROBOROS_PREFIX "ipcpd/udp"
+#include <ouroboros/bitmap.h>
#include <ouroboros/hash.h>
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
@@ -58,34 +59,50 @@
#define FLOW_REPLY 2
#define THIS_TYPE IPCP_UDP
-#define LISTEN_PORT htons(0x0D1F)
-#define SHIM_UDP_BUF_SIZE 256
-#define SHIM_UDP_MSG_SIZE 256
-#define SHIM_UDP_MAX_PACKET_SIZE 8980
+#define IPCP_UDP_MAX_PACKET_SIZE 8980
+#define OUR_HEADER_LEN sizeof(uint32_t) /* adds eid */
+
+#define IPCP_UDP_BUF_SIZE 256
+#define IPCP_UDP_MSG_SIZE 256
#define DNS_TTL 86400
#define FD_UPDATE_TIMEOUT 100 /* microseconds */
-#define local_ip (udp_data.s_saddr.sin_addr.s_addr)
+#define SERV_PORT udp_data.s_saddr.sin_port;
+#define SERV_SADDR ((struct sockaddr *) &udp_data.s_saddr)
+#define CLNT_SADDR ((struct sockaddr *) &udp_data.c_saddr)
+#define SERV_SADDR_SIZE (sizeof(udp_data.s_saddr))
+#define LOCAL_IP (udp_data.s_saddr.sin_addr.s_addr)
-#define UDP_MAX_PORTS 0xFFFF
+#define MGMT_EID 0
+#define MGMT_FRAME_SIZE 512
+/* Keep order for alignment. */
struct mgmt_msg {
- uint16_t src_udp_port;
- uint16_t dst_udp_port;
+ uint32_t eid;
+ uint32_t s_eid;
+ uint32_t d_eid;
uint8_t code;
- uint8_t response;
- /* QoS parameters from spec, aligned */
+ int8_t response;
+ /* QoS parameters from spec */
uint8_t availability;
uint8_t in_order;
- uint32_t delay;
uint64_t bandwidth;
+ uint32_t delay;
uint32_t loss;
uint32_t ber;
uint32_t max_gap;
} __attribute__((packed));
+struct mgmt_frame {
+ struct list_head next;
+ struct sockaddr_in r_saddr;
+ uint8_t buf[MGMT_FRAME_SIZE];
+};
+
+/* UDP flow */
struct uf {
- int udp;
+ int d_eid;
+ /* IP details are stored through connect(). */
int skfd;
};
@@ -94,25 +111,25 @@ struct {
uint32_t ip_addr;
uint32_t dns_addr;
- /* listen server */
+ /* server socket */
struct sockaddr_in s_saddr;
int s_fd;
+ /* client port */
+ int clt_port;
fset_t * np1_flows;
fqueue_t * fq;
- fd_set flow_fd_s;
- /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */
- int uf_to_fd[FD_SETSIZE];
struct uf fd_to_uf[SYS_MAX_FLOWS];
pthread_rwlock_t flows_lock;
- pthread_t packet_loop;
- pthread_t handler;
- pthread_t packet_reader;
+ pthread_t packet_writer[IPCP_UDP_WR_THR];
+ pthread_t packet_reader[IPCP_UDP_RD_THR];
- bool fd_set_mod;
- pthread_cond_t fd_set_cond;
- pthread_mutex_t fd_set_lock;
+ /* Handle mgmt frames in a different thread */
+ pthread_t mgmt_handler;
+ pthread_mutex_t mgmt_lock;
+ pthread_cond_t mgmt_cond;
+ struct list_head mgmt_frames;
} udp_data;
static int udp_data_init(void)
@@ -120,24 +137,19 @@ static int udp_data_init(void)
int i;
if (pthread_rwlock_init(&udp_data.flows_lock, NULL))
- return -1;
-
- if (pthread_cond_init(&udp_data.fd_set_cond, NULL))
- goto fail_set_cond;
+ goto fail_rwlock_init;
- if (pthread_mutex_init(&udp_data.fd_set_lock, NULL))
- goto fail_set_lock;
+ if (pthread_cond_init(&udp_data.mgmt_cond, NULL))
+ goto fail_mgmt_cond;
- for (i = 0; i < FD_SETSIZE; ++i)
- udp_data.uf_to_fd[i] = -1;
+ if (pthread_mutex_init(&udp_data.mgmt_lock, NULL))
+ goto fail_mgmt_lock;
for (i = 0; i < SYS_MAX_FLOWS; ++i) {
- udp_data.fd_to_uf[i].skfd = -1;
- udp_data.fd_to_uf[i].udp = -1;
+ udp_data.fd_to_uf[i].skfd = -1;
+ udp_data.fd_to_uf[i].d_eid = -1;
}
- FD_ZERO(&udp_data.flow_fd_s);
-
udp_data.np1_flows = fset_create();
if (udp_data.np1_flows == NULL)
goto fail_fset;
@@ -150,88 +162,43 @@ static int udp_data_init(void)
if (udp_data.shim_data == NULL)
goto fail_data;
+ list_head_init(&udp_data.mgmt_frames);
+
return 0;
fail_data:
fqueue_destroy(udp_data.fq);
fail_fqueue:
fset_destroy(udp_data.np1_flows);
fail_fset:
- pthread_mutex_destroy(&udp_data.fd_set_lock);
- fail_set_lock:
- pthread_cond_destroy(&udp_data.fd_set_cond);
- fail_set_cond:
+ pthread_mutex_destroy(&udp_data.mgmt_lock);
+ fail_mgmt_lock:
+ pthread_cond_destroy(&udp_data.mgmt_cond);
+ fail_mgmt_cond:
pthread_rwlock_destroy(&udp_data.flows_lock);
+ fail_rwlock_init:
return -1;
}
static void udp_data_fini(void)
{
- fset_destroy(udp_data.np1_flows);
- fqueue_destroy(udp_data.fq);
-
shim_data_destroy(udp_data.shim_data);
- pthread_rwlock_destroy(&udp_data.flows_lock);
- pthread_mutex_destroy(&udp_data.fd_set_lock);
- pthread_cond_destroy(&udp_data.fd_set_cond);
-}
-
-static void set_fd(int fd)
-{
- pthread_mutex_lock(&udp_data.fd_set_lock);
-
- udp_data.fd_set_mod = true;
- FD_SET(fd, &udp_data.flow_fd_s);
-
- while (udp_data.fd_set_mod)
- pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock);
-
- pthread_mutex_unlock(&udp_data.fd_set_lock);
-}
-
-static void clr_fd(int fd)
-{
- pthread_mutex_lock(&udp_data.fd_set_lock);
-
- udp_data.fd_set_mod = true;
- FD_CLR(fd, &udp_data.flow_fd_s);
-
- while (udp_data.fd_set_mod)
- pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock);
-
- pthread_mutex_unlock(&udp_data.fd_set_lock);
-}
-
-static int send_shim_udp_msg(uint8_t * buf,
- size_t len,
- uint32_t dst_ip_addr)
-{
- struct sockaddr_in r_saddr;
-
- memset((char *)&r_saddr, 0, sizeof(r_saddr));
- r_saddr.sin_family = AF_INET;
- r_saddr.sin_addr.s_addr = dst_ip_addr;
- r_saddr.sin_port = LISTEN_PORT;
-
- if (sendto(udp_data.s_fd, buf, len, 0,
- (struct sockaddr *) &r_saddr,
- sizeof(r_saddr)) == -1) {
- log_err("Failed to send message.");
- return -1;
- }
+ fqueue_destroy(udp_data.fq);
+ fset_destroy(udp_data.np1_flows);
- return 0;
+ pthread_rwlock_destroy(&udp_data.flows_lock);
+ pthread_cond_destroy(&udp_data.mgmt_cond);
+ pthread_mutex_destroy(&udp_data.mgmt_lock);
}
-static int ipcp_udp_port_alloc(uint32_t dst_ip_addr,
- uint16_t src_udp_port,
+static int ipcp_udp_port_alloc(int skfd,
+ uint32_t s_eid,
const uint8_t * dst,
qosspec_t qs)
{
uint8_t * buf;
struct mgmt_msg * msg;
size_t len;
- int ret;
len = sizeof(*msg) + ipcp_dir_hash_len();
@@ -240,8 +207,9 @@ static int ipcp_udp_port_alloc(uint32_t dst_ip_addr,
return -1;
msg = (struct mgmt_msg *) buf;
+ msg->eid = hton32(MGMT_EID);
msg->code = FLOW_REQ;
- msg->src_udp_port = src_udp_port;
+ msg->s_eid = hton32(s_eid);
msg->delay = hton32(qs.delay);
msg->bandwidth = hton64(qs.bandwidth);
msg->availability = qs.availability;
@@ -252,73 +220,63 @@ static int ipcp_udp_port_alloc(uint32_t dst_ip_addr,
memcpy(msg + 1, dst, ipcp_dir_hash_len());
- ret = send_shim_udp_msg(buf, len, dst_ip_addr);
+ if (write(skfd, msg, len) < 0) {
+ free(buf);
+ return -1;
+ }
free(buf);
- return ret;
+ return 0;
}
-static int ipcp_udp_port_alloc_resp(uint32_t dst_ip_addr,
- uint16_t src_udp_port,
- uint16_t dst_udp_port,
- int response)
+static int ipcp_udp_port_alloc_resp(int skfd,
+ uint32_t s_eid,
+ uint32_t d_eid,
+ int8_t response)
{
- struct mgmt_msg * msg;
- int ret;
+ struct mgmt_msg * msg;
msg = malloc(sizeof(*msg));
if (msg == NULL)
return -1;
- msg->code = FLOW_REPLY;
- msg->src_udp_port = src_udp_port;
- msg->dst_udp_port = dst_udp_port;
- msg->response = response;
+ msg->eid = hton32(MGMT_EID);
+ msg->code = FLOW_REPLY;
+ msg->s_eid = hton32(s_eid);
+ msg->d_eid = hton32(d_eid);
+ msg->response = response;
- ret = send_shim_udp_msg((uint8_t *) msg, sizeof(*msg), dst_ip_addr);
+ if (write(skfd, msg, sizeof(*msg)) < 0) {
+ free(msg);
+ return -1;
+ }
free(msg);
- return ret;
+ return 0;
}
static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
+ int d_eid,
const uint8_t * dst,
qosspec_t qs)
{
- struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
- struct timespec abstime;
- struct sockaddr_in f_saddr;
- socklen_t f_saddr_len = sizeof(f_saddr);
- int skfd;
- int fd;
+ struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
+ struct timespec abstime;
+ int skfd;
+ int fd;
- log_dbg("Port request arrived from UDP port %d",
- ntohs(c_saddr->sin_port));
-
- if ((skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
+ skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+ if (skfd < 0) {
log_err("Could not create UDP socket.");
return -1;
}
- memset((char *) &f_saddr, 0, sizeof(f_saddr));
- f_saddr.sin_family = AF_INET;
- f_saddr.sin_addr.s_addr = local_ip;
- f_saddr.sin_port = 0;
-
- if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) {
- log_err("Could not bind to socket.");
- close(skfd);
- return -1;
- }
+ /* Remote listens on server port. Mod of c_saddr allowed. */
+ c_saddr->sin_port = udp_data.s_saddr.sin_port;
- if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) {
- log_err("Could not get address from fd.");
- return -1;
- }
-
- /* connect stores the remote address in the file descriptor */
+ /* Connect stores the remote address in the file descriptor. */
if (connect(skfd, (struct sockaddr *) c_saddr, sizeof(*c_saddr)) < 0) {
log_err("Could not connect to remote UDP client.");
close(skfd);
@@ -331,8 +289,7 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) {
ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
+ pthread_cond_timedwait(&ipcpi.alloc_cond, &ipcpi.alloc_lock,
&abstime);
}
@@ -354,9 +311,8 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
pthread_rwlock_wrlock(&udp_data.flows_lock);
- udp_data.uf_to_fd[skfd] = fd;
- udp_data.fd_to_uf[fd].skfd = skfd;
- udp_data.fd_to_uf[fd].udp = f_saddr.sin_port;
+ udp_data.fd_to_uf[fd].skfd = skfd;
+ udp_data.fd_to_uf[fd].d_eid = d_eid;
pthread_rwlock_unlock(&udp_data.flows_lock);
@@ -365,224 +321,228 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
pthread_mutex_unlock(&ipcpi.alloc_lock);
- log_dbg("Pending allocation request, fd %d, UDP port (%d, %d).",
- fd, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port));
+ log_dbg("Pending allocation request, fd %d, remote eid %d.",
+ fd, d_eid);
return 0;
}
-/* returns the n flow descriptor */
-static int udp_port_to_fd(int udp_port)
+static int ipcp_udp_port_alloc_reply(uint32_t s_eid,
+ uint32_t d_eid,
+ int8_t response)
{
- int i;
-
- for (i = 0; i < SYS_MAX_FLOWS; ++i)
- if (udp_data.fd_to_uf[i].udp == udp_port)
- return i;
-
- return -1;
-}
-
-static int ipcp_udp_port_alloc_reply(uint16_t src_udp_port,
- uint16_t dst_udp_port,
- int response)
-{
- int fd = -1;
- int ret = 0;
- int skfd = -1;
-
struct sockaddr_in t_saddr;
- socklen_t t_saddr_len = sizeof(t_saddr);
+ socklen_t t_saddr_len;
+ int ret = 0;
+ int skfd = -1;
- log_dbg("Received reply for flow on udp port %d.",
- ntohs(dst_udp_port));
+ t_saddr_len = sizeof(t_saddr);
- pthread_rwlock_rdlock(&udp_data.flows_lock);
+ pthread_rwlock_wrlock(&udp_data.flows_lock);
- fd = udp_port_to_fd(dst_udp_port);
- if (fd < 0) {
+ skfd = udp_data.fd_to_uf[s_eid].skfd;
+ if (skfd < 0) {
pthread_rwlock_unlock(&udp_data.flows_lock);
+ log_err("Got reply for unknown UDP eid: %u.", s_eid);
return -1;
}
- skfd = udp_data.fd_to_uf[fd].skfd;
+ udp_data.fd_to_uf[s_eid].d_eid = d_eid;
pthread_rwlock_unlock(&udp_data.flows_lock);
- /* get the original address with the LISTEN PORT */
if (getpeername(skfd, (struct sockaddr *) &t_saddr, &t_saddr_len) < 0) {
- log_dbg("Flow with fd %d has no peer.", fd);
+ log_dbg("Flow with fd %d has no peer.", s_eid);
+ close(skfd);
return -1;
}
- /* connect to the flow udp port */
- t_saddr.sin_port = src_udp_port;
-
if (connect(skfd, (struct sockaddr *) &t_saddr, sizeof(t_saddr)) < 0) {
+ log_dbg("Could not connect flow to remote.");
close(skfd);
return -1;
}
- pthread_rwlock_rdlock(&udp_data.flows_lock);
-
- set_fd(skfd);
-
- pthread_rwlock_unlock(&udp_data.flows_lock);
-
- if (ipcp_flow_alloc_reply(fd, response) < 0)
+ if (ipcp_flow_alloc_reply(s_eid, response) < 0) {
+ log_dbg("Failed to reply to flow allocation.");
return -1;
+ }
- log_dbg("Flow allocation completed, UDP ports: (%d, %d).",
- ntohs(dst_udp_port), ntohs(src_udp_port));
+ log_dbg("Flow allocation completed on eids (%d, %d).",
+ s_eid, d_eid);
return ret;
}
-static void * ipcp_udp_listener(void * o)
+static int ipcp_udp_mgmt_frame(const uint8_t * buf,
+ struct sockaddr_in c_saddr)
{
- uint8_t buf[SHIM_UDP_MSG_SIZE];
- ssize_t n = 0;
- struct sockaddr_in c_saddr;
- int sfd = udp_data.s_fd;
+ struct mgmt_msg * msg;
+ qosspec_t qs;
+
+ msg = (struct mgmt_msg *) buf;
+
+ switch (msg->code) {
+ case FLOW_REQ:
+ qs.delay = ntoh32(msg->delay);
+ qs.bandwidth = ntoh64(msg->bandwidth);
+ qs.availability = msg->availability;
+ qs.loss = ntoh32(msg->loss);
+ qs.ber = ntoh32(msg->ber);
+ qs.in_order = msg->in_order;
+ qs.max_gap = ntoh32(msg->max_gap);
+ return ipcp_udp_port_req(&c_saddr, ntoh32(msg->s_eid),
+ (uint8_t *) (msg + 1), qs);
+ case FLOW_REPLY:
+ return ipcp_udp_port_alloc_reply(ntoh32(msg->s_eid),
+ ntoh32(msg->d_eid),
+ msg->response);
+ default:
+ log_err("Unknown message received %d.", msg->code);
+ return -1;
+ }
+}
+static void * ipcp_udp_mgmt_handler(void * o)
+{
(void) o;
+ pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
+ (void *) &udp_data.mgmt_lock);
+
while (true) {
- struct mgmt_msg * msg = NULL;
- qosspec_t qs;
- memset(&buf, 0, SHIM_UDP_MSG_SIZE);
- n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0,
- (struct sockaddr *) &c_saddr,
- (socklen_t *) sizeof(c_saddr));
- if (n < 0)
- continue;
+ struct mgmt_frame * frame;
- /* flow alloc request from other host */
- if (gethostbyaddr((const char *) &c_saddr.sin_addr.s_addr,
- sizeof(c_saddr.sin_addr.s_addr), AF_INET)
- == NULL)
- continue;
+ pthread_mutex_lock(&udp_data.mgmt_lock);
- msg = (struct mgmt_msg *) buf;
-
- switch (msg->code) {
- case FLOW_REQ:
- c_saddr.sin_port = msg->src_udp_port;
- qs.delay = ntoh32(msg->delay);
- qs.bandwidth = ntoh64(msg->bandwidth);
- qs.availability = msg->availability;
- qs.loss = ntoh32(msg->loss);
- qs.ber = ntoh32(msg->ber);
- qs.in_order = msg->in_order;
- qs.max_gap = ntoh32(msg->max_gap);
- ipcp_udp_port_req(&c_saddr,
- (uint8_t *) (msg + 1),
- qs);
- break;
- case FLOW_REPLY:
- ipcp_udp_port_alloc_reply(msg->src_udp_port,
- msg->dst_udp_port,
- msg->response);
- break;
- default:
- log_err("Unknown message received %d.", msg->code);
- continue;
- }
+ while (list_is_empty(&udp_data.mgmt_frames))
+ pthread_cond_wait(&udp_data.mgmt_cond,
+ &udp_data.mgmt_lock);
+
+ frame = list_first_entry((&udp_data.mgmt_frames),
+ struct mgmt_frame, next);
+ assert(frame != NULL);
+ list_del(&frame->next);
+
+ pthread_mutex_unlock(&udp_data.mgmt_lock);
- c_saddr.sin_port = LISTEN_PORT;
+ ipcp_udp_mgmt_frame(frame->buf, frame->r_saddr);
+
+ free(frame);
}
- return 0;
+ pthread_cleanup_pop(false);
+
+ return (void *) 0;
}
static void * ipcp_udp_packet_reader(void * o)
{
- ssize_t n;
- int skfd;
- int fd;
- /* FIXME: avoid this copy */
- char buf[SHIM_UDP_MAX_PACKET_SIZE];
- struct sockaddr_in r_saddr;
- struct timeval tv = {0, FD_UPDATE_TIMEOUT};
- fd_set read_fds;
- int flags;
+ uint8_t buf[IPCP_UDP_MAX_PACKET_SIZE];
+ uint8_t * data;
+ ssize_t n;
+ uint32_t eid;
(void) o;
- ipcp_lock_to_core();
+ data = buf + sizeof(uint32_t);
while (true) {
- pthread_rwlock_rdlock(&udp_data.flows_lock);
- pthread_mutex_lock(&udp_data.fd_set_lock);
+ struct mgmt_frame * frame;
+ struct sockaddr_in r_saddr;
+ socklen_t len;
- read_fds = udp_data.flow_fd_s;
- udp_data.fd_set_mod = false;
- pthread_cond_broadcast(&udp_data.fd_set_cond);
+ len = sizeof(r_saddr);
- pthread_mutex_unlock(&udp_data.fd_set_lock);
- pthread_rwlock_unlock(&udp_data.flows_lock);
+ n = recvfrom(udp_data.s_fd, buf, IPCP_UDP_MAX_PACKET_SIZE, 0,
+ (struct sockaddr *) &r_saddr, &len);
+ if (n < 0)
+ continue;
- if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0)
+ if (n == 0)
+ log_dbg("Got a 0 frame.");
+
+ if ((size_t) n < sizeof(eid)) {
+ log_dbg("Dropped bad frame.");
continue;
+ }
- for (skfd = 0; skfd < FD_SETSIZE; ++skfd) {
- if (!FD_ISSET(skfd, &read_fds))
- continue;
- flags = fcntl(skfd, F_GETFL, 0);
- fcntl(skfd, F_SETFL, flags | O_NONBLOCK);
- n = sizeof(r_saddr);
- if ((n = recvfrom(skfd,
- &buf,
- SHIM_UDP_MAX_PACKET_SIZE,
- 0,
- (struct sockaddr *) &r_saddr,
- (unsigned *) &n)) <= 0)
- continue;
+ eid = ntoh32(*((uint32_t *) buf));
- pthread_rwlock_rdlock(&udp_data.flows_lock);
+ /* pass onto mgmt queue */
+ if (eid == MGMT_EID) {
+ if (n > IPCP_UDP_MSG_SIZE) {
+ log_warn("Dropped oversize management frame.");
+ continue;
+ }
- fd = udp_data.uf_to_fd[skfd];
+ frame = malloc(sizeof(*frame));
+ if (frame == NULL)
+ continue;
- pthread_rwlock_unlock(&udp_data.flows_lock);
+ memcpy(frame->buf, buf, n);
+ memcpy(&frame->r_saddr, &r_saddr, sizeof(r_saddr));
- flow_write(fd, buf, n);
+ pthread_mutex_lock(&udp_data.mgmt_lock);
+ list_add(&frame->next, &udp_data.mgmt_frames);
+ pthread_cond_signal(&udp_data.mgmt_cond);
+ pthread_mutex_unlock(&udp_data.mgmt_lock);
+ continue;
}
+
+ flow_write(eid, data, n - sizeof(eid));
}
- return (void *) 0;
+ return 0;
}
-static void * ipcp_udp_packet_loop(void * o)
+static void * ipcp_udp_packet_writer(void * o)
{
- int fd;
- struct shm_du_buff * sdb;
-
(void) o;
ipcp_lock_to_core();
while (true) {
+ int fd;
+ int eid;
fevent(udp_data.np1_flows, udp_data.fq, NULL);
while ((fd = fqueue_next(udp_data.fq)) >= 0) {
+ struct shm_du_buff * sdb;
+ uint8_t * buf;
+ uint16_t len;
if (ipcp_flow_read(fd, &sdb)) {
- log_err("Bad read from fd %d.", fd);
+ log_dbg("Bad read from fd %d.", fd);
+ continue;
+ }
+
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+ if (len > IPCP_UDP_MAX_PACKET_SIZE) {
+ log_dbg("Packet length exceeds MTU.");
+ ipcp_sdb_release(sdb);
+ continue;
+ }
+
+ buf = shm_du_buff_head_alloc(sdb, OUR_HEADER_LEN);
+ if (buf == NULL) {
+ log_dbg("Failed to allocate header.");
+ ipcp_sdb_release(sdb);
continue;
}
pthread_rwlock_rdlock(&udp_data.flows_lock);
+ eid = hton32(udp_data.fd_to_uf[fd].d_eid);
fd = udp_data.fd_to_uf[fd].skfd;
pthread_rwlock_unlock(&udp_data.flows_lock);
+ memcpy(buf, &eid, sizeof(eid));
+
pthread_cleanup_push((void (*)(void *))
- ipcp_sdb_release,
- (void *) sdb);
+ ipcp_sdb_release, (void *) sdb);
- if (send(fd, shm_du_buff_head(sdb),
- shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
- 0) < 0)
- log_err("Failed to send PACKET.");
+ if (write(fd, buf, len + OUR_HEADER_LEN) < 0)
+ log_err("Failed to send packet.");
pthread_cleanup_pop(true);
}
@@ -593,28 +553,23 @@ static void * ipcp_udp_packet_loop(void * o)
static int ipcp_udp_bootstrap(const struct ipcp_config * conf)
{
- struct sockaddr_in s_saddr;
char ipstr[INET_ADDRSTRLEN];
char dnsstr[INET_ADDRSTRLEN];
- int enable = 1;
- int fd = -1;
+ char portstr[128]; /* port is max 64535 = 5 chars */
+ int i = 1;
assert(conf);
assert(conf->type == THIS_TYPE);
- if (inet_ntop(AF_INET,
- &conf->ip_addr,
- ipstr,
- INET_ADDRSTRLEN) == NULL) {
+ if (inet_ntop(AF_INET, &conf->ip_addr, ipstr, INET_ADDRSTRLEN)
+ == NULL) {
log_err("Failed to convert IP address");
return -1;
}
if (conf->dns_addr != 0) {
- if (inet_ntop(AF_INET,
- &conf->dns_addr,
- dnsstr,
- INET_ADDRSTRLEN) == NULL) {
+ if (inet_ntop(AF_INET, &conf->dns_addr, dnsstr, INET_ADDRSTRLEN)
+ == NULL) {
log_err("Failed to convert DNS address");
return -1;
}
@@ -626,76 +581,79 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf)
}
/* UDP listen server */
- if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
- log_err("Can't create socket.");
+ udp_data.s_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+ if (udp_data.s_fd < 0) {
+ log_err("Can't create socket: %s", strerror(errno));
goto fail_socket;
}
- if (setsockopt(fd,
- SOL_SOCKET,
- SO_REUSEADDR,
- &enable,
- sizeof(int)) < 0)
+ if (setsockopt(udp_data.s_fd, SOL_SOCKET, SO_REUSEADDR,
+ &i, sizeof(i)) < 0)
log_warn("Failed to set SO_REUSEADDR.");
- memset((char *) &s_saddr, 0, sizeof(s_saddr));
+ memset((char *) &udp_data.s_saddr, 0, sizeof(udp_data.s_saddr));
udp_data.s_saddr.sin_family = AF_INET;
udp_data.s_saddr.sin_addr.s_addr = conf->ip_addr;
- udp_data.s_saddr.sin_port = LISTEN_PORT;
+ udp_data.s_saddr.sin_port = htons(conf->srv_port);
- if (bind(fd,
- (struct sockaddr *) &udp_data.s_saddr,
- sizeof(udp_data.s_saddr)) < 0) {
+ if (bind(udp_data.s_fd, SERV_SADDR, SERV_SADDR_SIZE) < 0) {
log_err("Couldn't bind to %s.", ipstr);
goto fail_bind;
}
- udp_data.s_fd = fd;
udp_data.ip_addr = conf->ip_addr;
udp_data.dns_addr = conf->dns_addr;
-
- FD_CLR(udp_data.s_fd, &udp_data.flow_fd_s);
+ udp_data.clt_port = htons(conf->clt_port);
ipcp_set_state(IPCP_OPERATIONAL);
- if (pthread_create(&udp_data.handler,
- NULL,
- ipcp_udp_listener,
- NULL)) {
+ if (pthread_create(&udp_data.mgmt_handler, NULL,
+ ipcp_udp_mgmt_handler, NULL)) {
ipcp_set_state(IPCP_INIT);
goto fail_bind;
}
- if (pthread_create(&udp_data.packet_reader,
- NULL,
- ipcp_udp_packet_reader,
- NULL)) {
- ipcp_set_state(IPCP_INIT);
- goto fail_packet_reader;
+ for (i = 0; i < IPCP_UDP_RD_THR; ++i) {
+ if (pthread_create(&udp_data.packet_reader[i], NULL,
+ ipcp_udp_packet_reader, NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_packet_reader;
+ }
}
- if (pthread_create(&udp_data.packet_loop,
- NULL,
- ipcp_udp_packet_loop,
- NULL)) {
- ipcp_set_state(IPCP_INIT);
- goto fail_packet_loop;
+ for (i = 0; i < IPCP_UDP_WR_THR; ++i) {
+ if (pthread_create(&udp_data.packet_writer[i], NULL,
+ ipcp_udp_packet_writer, NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_packet_writer;
+ }
}
+ sprintf(portstr, "%d", conf->clt_port);
+
log_dbg("Bootstrapped IPCP over UDP with pid %d.", getpid());
log_dbg("Bound to IP address %s.", ipstr);
+ log_dbg("Client port is %s.", conf->clt_port == 0 ? "random" : portstr);
+ log_dbg("Server port is %u.", conf->srv_port);
log_dbg("DNS server address is %s.", dnsstr);
return 0;
- fail_packet_loop:
- pthread_cancel(udp_data.packet_reader);
- pthread_join(udp_data.packet_reader, NULL);
+ fail_packet_writer:
+ while (i > 0) {
+ pthread_cancel(udp_data.packet_writer[--i]);
+ pthread_join(udp_data.packet_writer[i], NULL);
+ }
+ i = IPCP_UDP_RD_THR;
fail_packet_reader:
- pthread_cancel(udp_data.handler);
- pthread_join(udp_data.handler, NULL);
+ while (i > 0) {
+ pthread_cancel(udp_data.packet_reader[--i]);
+ pthread_join(udp_data.packet_reader[i], NULL);
+ }
+ pthread_cancel(udp_data.mgmt_handler);
+ pthread_join(udp_data.mgmt_handler, NULL);
fail_bind:
- close(fd);
+ close(udp_data.s_fd);
fail_socket:
return -1;
}
@@ -705,9 +663,9 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf)
/* NOTE: Disgusted with this crap */
static int ddns_send(char * cmd)
{
- pid_t pid = -1;
- int wstatus;
- int pipe_fd[2];
+ pid_t pid = -1;
+ int wstatus;
+ int pipe_fd[2];
char * argv[] = {NSUPDATE_EXEC, 0};
char * envp[] = {0};
@@ -743,22 +701,23 @@ static int ddns_send(char * cmd)
log_err("Failed to register with DNS server.");
close(pipe_fd[1]);
+
return 0;
}
static uint32_t ddns_resolve(char * name,
uint32_t dns_addr)
{
- pid_t pid = -1;
- int wstatus;
- int pipe_fd[2];
- char dnsstr[INET_ADDRSTRLEN];
- char buf[SHIM_UDP_BUF_SIZE];
- ssize_t count = 0;
- char * substr = NULL;
- char * substr2 = NULL;
- char * addr_str = "Address:";
- uint32_t ip_addr = 0;
+ pid_t pid = -1;
+ int wstatus;
+ int pipe_fd[2];
+ char dnsstr[INET_ADDRSTRLEN];
+ char buf[IPCP_UDP_BUF_SIZE];
+ ssize_t count = 0;
+ char * substr = NULL;
+ char * substr2 = NULL;
+ char * addr_str = "Address:";
+ uint32_t ip_addr = 0;
if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL)
return 0;
@@ -785,7 +744,7 @@ static uint32_t ddns_resolve(char * name,
close(pipe_fd[1]);
- count = read(pipe_fd[0], buf, SHIM_UDP_BUF_SIZE);
+ count = read(pipe_fd[0], buf, IPCP_UDP_BUF_SIZE);
if (count <= 0) {
log_err("Failed to communicate with nslookup.");
close(pipe_fd[0]);
@@ -796,7 +755,7 @@ static uint32_t ddns_resolve(char * name,
waitpid(pid, &wstatus, 0);
if (WIFEXITED(wstatus) && WEXITSTATUS(wstatus) == 0 &&
- count != SHIM_UDP_BUF_SIZE)
+ count != IPCP_UDP_BUF_SIZE)
log_dbg("Succesfully communicated with nslookup.");
else
log_err("Failed to resolve DNS address.");
@@ -825,13 +784,13 @@ static uint32_t ddns_resolve(char * name,
static int ipcp_udp_reg(const uint8_t * hash)
{
#ifdef HAVE_DDNS
- char ipstr[INET_ADDRSTRLEN];
- char dnsstr[INET_ADDRSTRLEN];
- char cmd[1000];
+ char ipstr[INET_ADDRSTRLEN];
+ char dnsstr[INET_ADDRSTRLEN];
+ char cmd[1000];
uint32_t dns_addr;
uint32_t ip_addr;
#endif
- char * hashstr;
+ char * hashstr;
hashstr = malloc(ipcp_dir_hash_strlen() + 1);
if (hashstr == NULL)
@@ -888,12 +847,12 @@ static int ipcp_udp_reg(const uint8_t * hash)
static int ipcp_udp_unreg(const uint8_t * hash)
{
#ifdef HAVE_DDNS
- char dnsstr[INET_ADDRSTRLEN];
+ char dnsstr[INET_ADDRSTRLEN];
/* max DNS name length + max IP length + max command length */
- char cmd[100];
+ char cmd[100];
uint32_t dns_addr;
#endif
- char * hashstr;
+ char * hashstr;
assert(hash);
@@ -932,13 +891,12 @@ static int ipcp_udp_unreg(const uint8_t * hash)
static int ipcp_udp_query(const uint8_t * hash)
{
- uint32_t ip_addr = 0;
- char * hashstr;
- struct hostent * h;
+ uint32_t ip_addr = 0;
+ char * hashstr;
+ struct hostent * h;
#ifdef HAVE_DDNS
- uint32_t dns_addr = 0;
+ uint32_t dns_addr = 0;
#endif
-
assert(hash);
hashstr = malloc(ipcp_dir_hash_strlen() + 1);
@@ -991,11 +949,14 @@ static int ipcp_udp_flow_alloc(int fd,
const uint8_t * dst,
qosspec_t qs)
{
- struct sockaddr_in r_saddr; /* server address */
- struct sockaddr_in f_saddr; /* flow */
- socklen_t f_saddr_len = sizeof(f_saddr);
+ struct sockaddr_in r_saddr; /* Server address */
+ struct sockaddr_in c_saddr; /* Client address */
+ socklen_t c_saddr_len;
int skfd;
uint32_t ip_addr = 0;
+ char ip_str[INET_ADDRSTRLEN];
+
+ c_saddr_len = sizeof(c_saddr);
log_dbg("Allocating flow to " HASH_FMT ".", HASH_VAL(dst));
@@ -1004,21 +965,24 @@ static int ipcp_udp_flow_alloc(int fd,
assert(dst);
skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
- if (skfd < 0)
+ if (skfd < 0) {
+ log_err("Could not create socket.");
return -1;
+ }
- /* this socket is for the flow */
- memset((char *) &f_saddr, 0, sizeof(f_saddr));
- f_saddr.sin_family = AF_INET;
- f_saddr.sin_addr.s_addr = local_ip;
- f_saddr.sin_port = 0;
+ /* This socket is for the flow. */
+ memset((char *) &c_saddr, 0, sizeof(c_saddr));
+ c_saddr.sin_family = AF_INET;
+ c_saddr.sin_addr.s_addr = LOCAL_IP;
+ c_saddr.sin_port = udp_data.clt_port;
- if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) {
+ if (bind(skfd, (struct sockaddr *) &c_saddr, sizeof(c_saddr)) < 0) {
+ log_dbg("Could not bind socket to client address.");
close(skfd);
return -1;
}
- if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) {
+ if (getsockname(skfd, (struct sockaddr *) &c_saddr, &c_saddr_len) < 0) {
log_err("Could not get address from fd.");
close(skfd);
return -1;
@@ -1029,43 +993,41 @@ static int ipcp_udp_flow_alloc(int fd,
close(skfd);
return -1;
}
+
ip_addr = (uint32_t) shim_data_dir_get_addr(udp_data.shim_data, dst);
- /* connect to server (store the remote IP address in the fd) */
+ inet_ntop(AF_INET, &ip_addr, ip_str, INET_ADDRSTRLEN);
+ log_dbg("Destination UDP ipcp resolved at %s.", ip_str);
+
+ /* Connect to server and store the remote IP address in the skfd. */
memset((char *) &r_saddr, 0, sizeof(r_saddr));
r_saddr.sin_family = AF_INET;
r_saddr.sin_addr.s_addr = ip_addr;
- r_saddr.sin_port = LISTEN_PORT;
+ r_saddr.sin_port = udp_data.s_saddr.sin_port;
if (connect(skfd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) {
+ log_dbg("Could not connect socket to remote.");
+ close(skfd);
+ return -1;
+ }
+
+ if (ipcp_udp_port_alloc(skfd, fd, dst, qs) < 0) {
+ log_err("Could not allocate port.");
close(skfd);
return -1;
}
pthread_rwlock_wrlock(&udp_data.flows_lock);
- udp_data.fd_to_uf[fd].udp = f_saddr.sin_port;
- udp_data.fd_to_uf[fd].skfd = skfd;
- udp_data.uf_to_fd[skfd] = fd;
+ udp_data.fd_to_uf[fd].d_eid = -1;
+ udp_data.fd_to_uf[fd].skfd = skfd;
fset_add(udp_data.np1_flows, fd);
pthread_rwlock_unlock(&udp_data.flows_lock);
- if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst, qs) < 0) {
- pthread_rwlock_wrlock(&udp_data.flows_lock);
-
- udp_data.fd_to_uf[fd].udp = -1;
- udp_data.fd_to_uf[fd].skfd = -1;
- udp_data.uf_to_fd[skfd] = -1;
-
- pthread_rwlock_unlock(&udp_data.flows_lock);
- close(skfd);
- return -1;
- }
-
- log_dbg("Flow pending on fd %d, UDP port %d.",
- fd, ntohs(f_saddr.sin_port));
+ log_dbg("Flow pending on fd %d, UDP src port %d, dst port %d.",
+ fd, ntohs(c_saddr.sin_port), ntohs(r_saddr.sin_port));
return 0;
}
@@ -1073,12 +1035,10 @@ static int ipcp_udp_flow_alloc(int fd,
static int ipcp_udp_flow_alloc_resp(int fd,
int response)
{
- struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
- struct timespec abstime;
- int skfd = -1;
- struct sockaddr_in f_saddr;
- struct sockaddr_in r_saddr;
- socklen_t len = sizeof(r_saddr);
+ struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
+ struct timespec abstime;
+ int skfd;
+ int d_eid;
if (response)
return 0;
@@ -1106,38 +1066,23 @@ static int ipcp_udp_flow_alloc_resp(int fd,
pthread_rwlock_rdlock(&udp_data.flows_lock);
- skfd = udp_data.fd_to_uf[fd].skfd;
-
- pthread_rwlock_unlock(&udp_data.flows_lock);
-
- if (getsockname(skfd, (struct sockaddr *) &f_saddr, &len) < 0) {
- log_dbg("Socket with fd %d has no address.", skfd);
- return -1;
- }
-
- if (getpeername(skfd, (struct sockaddr *) &r_saddr, &len) < 0) {
- log_dbg("Socket with fd %d has no peer.", skfd);
- return -1;
- }
-
- pthread_rwlock_rdlock(&udp_data.flows_lock);
-
- set_fd(skfd);
+ skfd = udp_data.fd_to_uf[fd].skfd;
+ d_eid = udp_data.fd_to_uf[fd].d_eid;
fset_add(udp_data.np1_flows, fd);
pthread_rwlock_unlock(&udp_data.flows_lock);
- if (ipcp_udp_port_alloc_resp(r_saddr.sin_addr.s_addr, f_saddr.sin_port,
- r_saddr.sin_port, response) < 0) {
+ if (ipcp_udp_port_alloc_resp(skfd, d_eid, fd, response) < 0) {
pthread_rwlock_rdlock(&udp_data.flows_lock);
- clr_fd(skfd);
+ fset_del(udp_data.np1_flows, fd);
pthread_rwlock_unlock(&udp_data.flows_lock);
+ log_err("Failed to respond to flow request.");
return -1;
}
- log_dbg("Accepted flow, fd %d on UDP port %d.",
- fd, ntohs(f_saddr.sin_port));
+ log_dbg("Accepted flow, fd %d on eid %d.",
+ fd, d_eid);
return 0;
}
@@ -1154,18 +1099,12 @@ static int ipcp_udp_flow_dealloc(int fd)
skfd = udp_data.fd_to_uf[fd].skfd;
- udp_data.uf_to_fd[skfd] = -1;
- udp_data.fd_to_uf[fd].udp = -1;
- udp_data.fd_to_uf[fd].skfd = -1;
+ udp_data.fd_to_uf[fd].d_eid = -1;
+ udp_data.fd_to_uf[fd].skfd = -1;
close(skfd);
pthread_rwlock_unlock(&udp_data.flows_lock);
- pthread_rwlock_rdlock(&udp_data.flows_lock);
-
- clr_fd(skfd);
-
- pthread_rwlock_unlock(&udp_data.flows_lock);
flow_dealloc(fd);
@@ -1191,6 +1130,8 @@ static struct ipcp_ops udp_ops = {
int main(int argc,
char * argv[])
{
+ int i;
+
if (ipcp_init(argc, argv, &udp_ops) < 0)
goto fail_init;
@@ -1212,13 +1153,17 @@ int main(int argc,
ipcp_shutdown();
if (ipcp_get_state() == IPCP_SHUTDOWN) {
- pthread_cancel(udp_data.packet_loop);
- pthread_cancel(udp_data.handler);
- pthread_cancel(udp_data.packet_reader);
-
- pthread_join(udp_data.packet_loop, NULL);
- pthread_join(udp_data.handler, NULL);
- pthread_join(udp_data.packet_reader, NULL);
+ for (i = 0; i < IPCP_UDP_RD_THR; ++i)
+ pthread_cancel(udp_data.packet_reader[i]);
+ for (i = 0; i < IPCP_UDP_WR_THR; ++i)
+ pthread_cancel(udp_data.packet_writer[i]);
+ pthread_cancel(udp_data.mgmt_handler);
+
+ for (i = 0; i < IPCP_UDP_RD_THR; ++i)
+ pthread_join(udp_data.packet_reader[i], NULL);
+ for (i = 0; i < IPCP_UDP_WR_THR; ++i)
+ pthread_join(udp_data.packet_writer[i], NULL);
+ pthread_join(udp_data.mgmt_handler, NULL);
}
udp_data_fini();