summaryrefslogtreecommitdiff
path: root/src/ipcpd
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd')
-rw-r--r--src/ipcpd/broadcast/dt.c2
-rw-r--r--src/ipcpd/broadcast/main.c2
-rw-r--r--src/ipcpd/config.h.in13
-rw-r--r--src/ipcpd/eth/eth.c508
-rw-r--r--src/ipcpd/ipcp.c3
-rw-r--r--src/ipcpd/ipcp.h1
-rw-r--r--src/ipcpd/local/main.c9
-rw-r--r--src/ipcpd/udp/udp.c74
-rw-r--r--src/ipcpd/unicast/dt.c32
-rw-r--r--src/ipcpd/unicast/fa.c27
-rw-r--r--src/ipcpd/unicast/routing/graph.c8
-rw-r--r--src/ipcpd/unicast/routing/link-state.c2
12 files changed, 570 insertions, 111 deletions
diff --git a/src/ipcpd/broadcast/dt.c b/src/ipcpd/broadcast/dt.c
index 30e89a4f..95483e33 100644
--- a/src/ipcpd/broadcast/dt.c
+++ b/src/ipcpd/broadcast/dt.c
@@ -28,7 +28,7 @@
#include "config.h"
-#define BROADCAST_MTU 1400 /* FIXME: avoid packet copy. */
+#define BROADCAST_MTU IPCP_BROADCAST_MTU /* FIXME: avoid packet copy. */
#define DT "dt"
#define OUROBOROS_PREFIX DT
diff --git a/src/ipcpd/broadcast/main.c b/src/ipcpd/broadcast/main.c
index b3cbdc79..77e22531 100644
--- a/src/ipcpd/broadcast/main.c
+++ b/src/ipcpd/broadcast/main.c
@@ -242,7 +242,7 @@ static int broadcast_ipcp_join(int fd,
notifier_event(NOTIFY_DT_CONN_ADD, &conn);
- ipcp_flow_alloc_reply(fd, 0, mpl, &data);
+ ipcp_flow_alloc_reply(fd, 0, mpl, IPCP_BROADCAST_MTU, &data);
return 0;
}
diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in
index 0b4252e5..7edec526 100644
--- a/src/ipcpd/config.h.in
+++ b/src/ipcpd/config.h.in
@@ -23,8 +23,8 @@
#define PTHREAD_COND_CLOCK @PTHREAD_COND_CLOCK@
#define SYS_MAX_FLOWS @SYS_MAX_FLOWS@
-#define PROG_RES_FDS @PROG_RES_FDS@
-#define PROG_MAX_FLOWS @PROG_MAX_FLOWS@
+#define PROC_RES_FDS @PROC_RES_FDS@
+#define PROC_MAX_FLOWS @PROC_MAX_FLOWS@
#define SOCKET_TIMEOUT @SOCKET_TIMEOUT@
#define CONNECT_TIMEOUT @CONNECT_TIMEOUT@
@@ -46,11 +46,13 @@
#define IPCP_SCHED_THR_MUL @IPCP_SCHED_THR_MUL@
#define PFT_SIZE @PFT_SIZE@
#define IPCP_UNICAST_MPL @IPCP_UNICAST_MPL@
+#define IPCP_UNICAST_MTU @IPCP_UNICAST_MTU@
#define CONNMGR_RCV_TIMEOUT @CONNMGR_RCV_TIMEOUT@
#cmakedefine DISABLE_CORE_LOCK
#cmakedefine BUILD_CONTAINER
#cmakedefine IPCP_FLOW_STATS
+#cmakedefine IPCP_ETH_FLOW_STATS
#cmakedefine IPCP_DEBUG_LOCAL
#ifdef CONFIG_OUROBOROS_DEBUG
#cmakedefine DEBUG_PROTO_DHT
@@ -65,6 +67,8 @@
#define IPCP_UDP_RD_THR @IPCP_UDP_RD_THR@
#define IPCP_UDP_WR_THR @IPCP_UDP_WR_THR@
#define IPCP_UDP_MPL @IPCP_UDP_MPL@
+#define IPCP_UDP4_MTU @IPCP_UDP4_MTU@
+#define IPCP_UDP6_MTU @IPCP_UDP6_MTU@
/* eth */
#cmakedefine HAVE_NETMAP
@@ -76,10 +80,13 @@
#define IPCP_ETH_LO_MTU @IPCP_ETH_LO_MTU@
#define IPCP_ETH_MGMT_FRAME_SIZE @IPCP_ETH_MGMT_FRAME_SIZE@
#define IPCP_ETH_MPL @IPCP_ETH_MPL@
+#define IPCP_ETH_SNDBUF @IPCP_ETH_SNDBUF@
+#define IPCP_ETH_RCVBUF @IPCP_ETH_RCVBUF@
/* local */
#define IPCP_LOCAL_MPL @IPCP_LOCAL_MPL@
+#define IPCP_LOCAL_MTU @IPCP_LOCAL_MTU@
/* broadcast */
-/* local */
#define IPCP_BROADCAST_MPL @IPCP_BROADCAST_MPL@
+#define IPCP_BROADCAST_MTU @IPCP_BROADCAST_MTU@
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c
index 8293ac15..103ba881 100644
--- a/src/ipcpd/eth/eth.c
+++ b/src/ipcpd/eth/eth.c
@@ -37,6 +37,7 @@
#include "config.h"
+#include <ouroboros/atomics.h>
#include <ouroboros/endian.h>
#include <ouroboros/hash.h>
#include <ouroboros/errno.h>
@@ -51,6 +52,14 @@
#include <ouroboros/time.h>
#include <ouroboros/fccntl.h>
#include <ouroboros/pthread.h>
+#include <ouroboros/rib.h>
+
+#ifndef IPCP_ETH_FLOW_STATS
+#undef FETCH_ADD_RELAXED
+#define FETCH_ADD_RELAXED(p, v) ((void) 0)
+#undef FETCH_SUB_RELAXED
+#define FETCH_SUB_RELAXED(p, v) ((void) 0)
+#endif
#include "ipcp.h"
#include "np1.h"
@@ -141,9 +150,11 @@
#define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX)
#endif
-#define NAME_QUERY_TIMEO 2000 /* ms */
-#define MGMT_TIMEO 100 /* ms */
+#define NAME_QUERY_TIMEO 1900 /* ms total budget */
+#define NAME_QUERY_RETRIES 3 /* retransmits, 4 attempts total */
+#define MGMT_TIMEO 100 /* ms */
#define MGMT_FRAME_SIZE IPCP_ETH_MGMT_FRAME_SIZE
+#define ETH_RIB_PATH "eth"
#define FLOW_REQ 0
#define FLOW_REPLY 1
@@ -169,7 +180,7 @@ struct mgmt_msg {
uint32_t delay;
uint32_t timeout;
int32_t response;
- uint8_t in_order;
+ uint8_t service;
#if defined (BUILD_ETH_DIX)
uint8_t code;
uint8_t availability;
@@ -201,6 +212,17 @@ struct ef {
int8_t r_sap;
#endif
uint8_t r_addr[MAC_SIZE];
+#ifdef IPCP_ETH_FLOW_STATS
+ struct {
+ time_t stamp;
+ size_t p_rcv;
+ size_t b_rcv;
+ size_t p_dlv_f;
+ size_t p_snd;
+ size_t b_snd;
+ size_t p_snd_f;
+ } stat;
+#endif
};
struct mgmt_frame {
@@ -238,6 +260,22 @@ struct {
struct ef * fd_to_ef;
fset_t * np1_flows;
pthread_rwlock_t flows_lock;
+#ifdef IPCP_ETH_FLOW_STATS
+ struct {
+ size_t n_flows;
+ size_t n_rcv;
+ size_t n_snd;
+ size_t n_mgmt_rcv;
+ size_t n_mgmt_snd;
+ size_t n_bad_id;
+ size_t n_dlv_f;
+ size_t n_buf_f;
+ size_t n_rcv_f;
+ size_t n_snd_f;
+ size_t kern_rcv;
+ size_t kern_drp;
+ } stat;
+#endif
pthread_t packet_writer[IPCP_ETH_WR_THR];
pthread_t packet_reader[IPCP_ETH_RD_THR];
@@ -289,7 +327,14 @@ static int eth_data_init(void)
eth_data.fd_to_ef[i].r_sap = -1;
#endif
memset(&eth_data.fd_to_ef[i].r_addr, 0, MAC_SIZE);
+#ifdef IPCP_ETH_FLOW_STATS
+ memset(&eth_data.fd_to_ef[i].stat, 0,
+ sizeof(eth_data.fd_to_ef[i].stat));
+#endif
}
+#ifdef IPCP_ETH_FLOW_STATS
+ memset(&eth_data.stat, 0, sizeof(eth_data.stat));
+#endif
eth_data.shim_data = shim_data_create();
if (eth_data.shim_data == NULL)
@@ -362,6 +407,227 @@ static void eth_data_fini(void)
free(eth_data.fd_to_ef);
}
+#ifdef IPCP_ETH_FLOW_STATS
+static int eth_rib_read(const char * path,
+ char * buf,
+ size_t len)
+{
+ struct ef * flow;
+ int fd;
+ char tmstr[RIB_TM_STRLEN];
+ struct tm * tm;
+ time_t stamp;
+ char * entry;
+
+ entry = strstr(path, RIB_SEPARATOR) + 1;
+ assert(entry);
+
+ if (len < 2048)
+ return 0;
+
+ buf[0] = '\0';
+
+ if (strcmp(entry, "summary") == 0) {
+ int n;
+#if defined(HAVE_RAW_SOCKETS)
+ int rcvbuf = 0;
+ int sndbuf = 0;
+ int queued = 0;
+ socklen_t optlen = sizeof(rcvbuf);
+# if defined(__linux__)
+ struct tpacket_stats tp_stats;
+ socklen_t tp_len = sizeof(tp_stats);
+# endif
+
+ getsockopt(eth_data.s_fd, SOL_SOCKET,
+ SO_RCVBUF, &rcvbuf, &optlen);
+ optlen = sizeof(sndbuf);
+ getsockopt(eth_data.s_fd, SOL_SOCKET,
+ SO_SNDBUF, &sndbuf, &optlen);
+ ioctl(eth_data.s_fd, FIONREAD, &queued);
+# if defined(__linux__)
+ if (getsockopt(eth_data.s_fd, SOL_PACKET,
+ PACKET_STATISTICS,
+ &tp_stats, &tp_len) == 0) {
+ FETCH_ADD_RELAXED(&eth_data.stat.kern_rcv,
+ tp_stats.tp_packets);
+ FETCH_ADD_RELAXED(&eth_data.stat.kern_drp,
+ tp_stats.tp_drops);
+ }
+# endif
+#endif
+ n = sprintf(buf,
+ "Active flows: %20zu\n"
+ "Total frames received: %20zu\n"
+ "Total frames sent: %20zu\n"
+ "Management frames received: %20zu\n"
+ "Management frames sent: %20zu\n"
+ "Bad EID/SAP frames: %20zu\n"
+ "Delivery (N+1) failures: %20zu\n"
+ "Buffer alloc failures: %20zu\n"
+ "Frame read failures: %20zu\n"
+ "Frame send failures: %20zu\n",
+ LOAD_RELAXED(&eth_data.stat.n_flows),
+ LOAD_RELAXED(&eth_data.stat.n_rcv),
+ LOAD_RELAXED(&eth_data.stat.n_snd),
+ LOAD_RELAXED(&eth_data.stat.n_mgmt_rcv),
+ LOAD_RELAXED(&eth_data.stat.n_mgmt_snd),
+ LOAD_RELAXED(&eth_data.stat.n_bad_id),
+ LOAD_RELAXED(&eth_data.stat.n_dlv_f),
+ LOAD_RELAXED(&eth_data.stat.n_buf_f),
+ LOAD_RELAXED(&eth_data.stat.n_rcv_f),
+ LOAD_RELAXED(&eth_data.stat.n_snd_f));
+#if defined(HAVE_RAW_SOCKETS)
+ n += sprintf(buf + n,
+ "Socket rcvbuf (bytes): %20d\n"
+ "Socket sndbuf (bytes): %20d\n"
+ "Socket queued (bytes): %20d\n",
+ rcvbuf, sndbuf, queued);
+# if defined(__linux__)
+ n += sprintf(buf + n,
+ "Kernel frames received: %20zu\n"
+ "Kernel frames dropped: %20zu\n",
+ LOAD_RELAXED(&eth_data.stat.kern_rcv),
+ LOAD_RELAXED(&eth_data.stat.kern_drp));
+# endif
+#endif
+ return n;
+ }
+
+ fd = atoi(entry);
+
+ if (fd < 0 || fd >= SYS_MAX_FLOWS)
+ return -1;
+
+ flow = &eth_data.fd_to_ef[fd];
+
+ pthread_rwlock_rdlock(&eth_data.flows_lock);
+
+ stamp = flow->stat.stamp;
+ if (stamp == 0) {
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+ return 0;
+ }
+
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+
+ tm = gmtime(&stamp);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
+
+ sprintf(buf,
+ "Flow established at: %20s\n"
+ "Sent (packets): %20zu\n"
+ "Sent (bytes): %20zu\n"
+ "Send failed (packets): %20zu\n"
+ "Received (packets): %20zu\n"
+ "Received (bytes): %20zu\n"
+ "Delivery (N+1) failures: %20zu\n",
+ tmstr,
+ LOAD_RELAXED(&flow->stat.p_snd),
+ LOAD_RELAXED(&flow->stat.b_snd),
+ LOAD_RELAXED(&flow->stat.p_snd_f),
+ LOAD_RELAXED(&flow->stat.p_rcv),
+ LOAD_RELAXED(&flow->stat.b_rcv),
+ LOAD_RELAXED(&flow->stat.p_dlv_f));
+
+ return strlen(buf);
+}
+
+static int eth_rib_readdir(char *** buf)
+{
+ char entry[RIB_PATH_LEN + 1];
+ size_t i;
+ int idx = 0;
+ int n_entries;
+
+ pthread_rwlock_rdlock(&eth_data.flows_lock);
+
+ n_entries = (int) LOAD_RELAXED(&eth_data.stat.n_flows) + 1;
+
+ *buf = malloc(sizeof(**buf) * n_entries);
+ if (*buf == NULL)
+ goto fail_entries;
+
+ (*buf)[idx] = malloc(strlen("summary") + 1);
+ if ((*buf)[idx] == NULL)
+ goto fail_entry;
+
+ strcpy((*buf)[idx++], "summary");
+
+ for (i = 0; i < SYS_MAX_FLOWS && idx < n_entries; ++i) {
+ if (eth_data.fd_to_ef[i].stat.stamp == 0)
+ continue;
+
+ sprintf(entry, "%zu", i);
+
+ (*buf)[idx] = malloc(strlen(entry) + 1);
+ if ((*buf)[idx] == NULL)
+ goto fail_entry;
+
+ strcpy((*buf)[idx++], entry);
+ }
+
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+
+ return idx;
+
+ fail_entry:
+ while (idx-- > 0)
+ free((*buf)[idx]);
+ free(*buf);
+ fail_entries:
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+ return -ENOMEM;
+}
+
+static int eth_rib_getattr(const char * path,
+ struct rib_attr * attr)
+{
+ int fd;
+ char * entry;
+ struct ef * flow;
+
+ entry = strstr(path, RIB_SEPARATOR) + 1;
+ assert(entry);
+
+ if (strcmp(entry, "summary") == 0) {
+ attr->size = 2048;
+ attr->mtime = 0;
+ return 0;
+ }
+
+ fd = atoi(entry);
+
+ if (fd < 0 || fd >= SYS_MAX_FLOWS) {
+ attr->size = 0;
+ attr->mtime = 0;
+ return 0;
+ }
+
+ flow = &eth_data.fd_to_ef[fd];
+
+ pthread_rwlock_rdlock(&eth_data.flows_lock);
+
+ if (flow->stat.stamp != 0) {
+ attr->size = 2048;
+ attr->mtime = flow->stat.stamp;
+ } else {
+ attr->size = 0;
+ attr->mtime = 0;
+ }
+
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+
+ return 0;
+}
+
+static struct rib_ops eth_r_ops = {
+ .read = eth_rib_read,
+ .readdir = eth_rib_readdir,
+ .getattr = eth_rib_getattr
+};
+#endif /* IPCP_ETH_FLOW_STATS */
+
#ifdef BUILD_ETH_LLC
static uint8_t reverse_bits(uint8_t b)
{
@@ -451,10 +717,7 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr,
}
assert(FD_ISSET(eth_data.s_fd, &fds));
- if (sendto(eth_data.s_fd,
- frame,
- frame_len,
- 0,
+ if (sendto(eth_data.s_fd, frame, frame_len, 0,
(struct sockaddr *) &eth_data.device,
sizeof(eth_data.device)) <= 0) {
log_dbg("Failed to send message: %s.", strerror(errno));
@@ -462,6 +725,8 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr,
}
#endif /* HAVE_NETMAP */
+ FETCH_ADD_RELAXED(&eth_data.stat.n_snd, 1);
+
return 0;
}
@@ -501,7 +766,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr,
msg->availability = qs.availability;
msg->loss = hton32(qs.loss);
msg->ber = hton32(qs.ber);
- msg->in_order = qs.in_order;
+ msg->service = qs.service;
msg->max_gap = hton32(qs.max_gap);
msg->timeout = hton32(qs.timeout);
@@ -519,6 +784,9 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr,
buf, len + data->len);
free(buf);
+ if (ret == 0)
+ FETCH_ADD_RELAXED(&eth_data.stat.n_mgmt_snd, 1);
+
return ret;
}
@@ -569,6 +837,8 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr,
return -1;
}
+ FETCH_ADD_RELAXED(&eth_data.stat.n_mgmt_snd, 1);
+
free(buf);
return 0;
@@ -586,7 +856,8 @@ static int eth_ipcp_req(uint8_t * r_addr,
{
int fd;
- fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_ETH_MPL, data);
+ fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_ETH_MPL,
+ ETH_MAX_PACKET_SIZE, data);
if (fd < 0) {
log_err("Could not get new flow from IRMd.");
return -1;
@@ -633,7 +904,7 @@ static int eth_ipcp_alloc_reply(uint8_t * r_addr,
fd = eth_data.ef_to_fd[dsap];
#endif
if (fd < 0) {
- pthread_rwlock_unlock(& eth_data.flows_lock);
+ pthread_rwlock_unlock(&eth_data.flows_lock);
log_err("No flow found with that SAP.");
return -1; /* -EFLOWNOTFOUND */
}
@@ -658,7 +929,8 @@ static int eth_ipcp_alloc_reply(uint8_t * r_addr,
#elif defined(BUILD_ETH_LLC)
log_dbg("Flow reply, fd %d, SSAP %d, DSAP %d.", fd, ssap, dsap);
#endif
- if ((ret = ipcp_flow_alloc_reply(fd, response, mpl, data)) < 0) {
+ if ((ret = ipcp_flow_alloc_reply(fd, response, mpl,
+ ETH_MAX_PACKET_SIZE, data)) < 0) {
log_err("Failed to reply to flow allocation.");
return -1;
}
@@ -700,6 +972,8 @@ static int eth_ipcp_name_query_req(const uint8_t * hash,
return -1;
}
+ FETCH_ADD_RELAXED(&eth_data.stat.n_mgmt_snd, 1);
+
free(buf);
}
@@ -746,7 +1020,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf,
qs.availability = msg->availability;
qs.loss = ntoh32(msg->loss);
qs.ber = ntoh32(msg->ber);
- qs.in_order = msg->in_order;
+ qs.service = msg->service;
qs.max_gap = ntoh32(msg->max_gap);
qs.timeout = ntoh32(msg->timeout);
@@ -861,6 +1135,10 @@ static void * eth_ipcp_packet_reader(void * o)
fd_set fds;
int frame_len;
#endif
+#if defined(HAVE_RAW_SOCKETS)
+ struct sockaddr_ll src;
+ socklen_t slen;
+#endif
size_t eth_len;
uint8_t hcs;
struct eth_frame * e_frame;
@@ -900,24 +1178,58 @@ static void * eth_ipcp_packet_reader(void * o)
if (select(eth_data.s_fd + 1, &fds, NULL, NULL, NULL) < 0)
continue;
assert(FD_ISSET(eth_data.s_fd, &fds));
- if (ipcp_spb_reserve(&spb, ETH_MTU))
+ if (ipcp_spb_reserve(&spb, ETH_MTU)) {
+ FETCH_ADD_RELAXED(&eth_data.stat.n_buf_f, 1);
continue;
- buf = ssm_pk_buff_head_alloc(spb, ETH_HEADER_TOT_SIZE);
+ }
+ buf = ssm_pk_buff_push(spb, ETH_HEADER_TOT_SIZE);
if (buf == NULL) {
log_dbg("Failed to allocate header.");
ipcp_spb_release(spb);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_buf_f, 1);
continue;
}
- frame_len = recv(eth_data.s_fd, buf,
- ETH_MTU + ETH_HEADER_TOT_SIZE, 0);
+ slen = sizeof(src);
+ /* MSG_DONTWAIT: RD_THR>1 race-loser bails with EAGAIN. */
+ frame_len = recvfrom(eth_data.s_fd, buf,
+ ETH_MTU + ETH_HEADER_TOT_SIZE,
+ MSG_DONTWAIT,
+ (struct sockaddr *) &src, &slen);
#endif
- if (frame_len <= 0) {
- log_dbg("Failed to receive frame.");
+ if (frame_len == 0) {
+ ipcp_spb_release(spb);
+ continue; /* Spurious */
+ }
+
+ if (frame_len < 0) {
ipcp_spb_release(spb);
+
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ continue;
+
+ log_dbg("Failed to rcv frame: %s.", strerror(errno));
+ FETCH_ADD_RELAXED(&eth_data.stat.n_rcv_f, 1);
continue;
}
#endif
+#if defined(HAVE_NETMAP)
+ eth_len = hdr.len;
+#elif defined(HAVE_BPF)
+ eth_len = ((struct bpf_hdr *) buf)->bh_caplen;
+#else
+ eth_len = (size_t) frame_len;
+#endif
+ /* Defense in depth: reject before parsing dereferences. */
+ if (eth_len < ETH_HEADER_TOT_SIZE)
+ goto fail_frame;
+
+#if defined(HAVE_RAW_SOCKETS)
+ /* Drop our own egress. */
+ if (src.sll_pkttype == PACKET_OUTGOING)
+ goto fail_frame;
+#endif
+
#if defined(HAVE_BPF) && !defined(HAVE_NETMAP)
e_frame = (struct eth_frame *)
(buf + ((struct bpf_hdr *) buf)->bh_hdrlen);
@@ -935,6 +1247,8 @@ static void * eth_ipcp_packet_reader(void * o)
e_frame->dst_hwaddr,
MAC_SIZE) &&
memcmp(br_addr, e_frame->dst_hwaddr, MAC_SIZE)) {
+ FETCH_ADD_RELAXED(&eth_data.stat.n_bad_id, 1);
+ goto fail_frame;
}
#endif
length = ntohs(e_frame->length);
@@ -959,13 +1273,6 @@ static void * eth_ipcp_packet_reader(void * o)
ssap = reverse_bits(e_frame->ssap);
#endif
-#if defined(HAVE_NETMAP)
- eth_len = hdr.len;
-#elif defined(HAVE_BPF)
- eth_len = ((struct bpf_hdr *) buf)->bh_caplen;
-#else
- eth_len = (size_t) frame_len;
-#endif
if (eth_len < ETH_HEADER_TOT_SIZE + (size_t) length)
goto fail_frame;
@@ -1008,6 +1315,8 @@ static void * eth_ipcp_packet_reader(void * o)
list_add(&frame->next, &eth_data.mgmt_frames);
pthread_cond_signal(&eth_data.mgmt_cond);
pthread_mutex_unlock(&eth_data.mgmt_lock);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_rcv, 1);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_mgmt_rcv, 1);
} else {
pthread_rwlock_rdlock(&eth_data.flows_lock);
@@ -1018,6 +1327,7 @@ static void * eth_ipcp_packet_reader(void * o)
#endif
if (fd < 0) {
pthread_rwlock_unlock(&eth_data.flows_lock);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_bad_id, 1);
goto fail_frame;
}
@@ -1026,13 +1336,18 @@ static void * eth_ipcp_packet_reader(void * o)
|| memcmp(eth_data.fd_to_ef[fd].r_addr,
e_frame->src_hwaddr, MAC_SIZE)) {
pthread_rwlock_unlock(&eth_data.flows_lock);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_bad_id, 1);
goto fail_frame;
}
#endif
+ FETCH_ADD_RELAXED(&eth_data.fd_to_ef[fd].stat.p_rcv, 1);
+ FETCH_ADD_RELAXED(&eth_data.fd_to_ef[fd].stat.b_rcv,
+ length);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_rcv, 1);
pthread_rwlock_unlock(&eth_data.flows_lock);
#ifndef HAVE_NETMAP
- ssm_pk_buff_head_release(spb, ETH_HEADER_TOT_SIZE);
+ ssm_pk_buff_pop(spb, ETH_HEADER_TOT_SIZE);
ssm_pk_buff_truncate(spb, length);
#else
if (ipcp_spb_reserve(&spb, length))
@@ -1041,8 +1356,13 @@ static void * eth_ipcp_packet_reader(void * o)
buf = ssm_pk_buff_head(spb);
memcpy(buf, &e_frame->payload, length);
#endif
- if (np1_flow_write(fd, spb, NP1_GET_POOL(fd)) < 0)
+ if (np1_flow_write(fd, spb, NP1_GET_POOL(fd)) < 0) {
ipcp_spb_release(spb);
+ FETCH_ADD_RELAXED(
+ &eth_data.fd_to_ef[fd].stat.p_dlv_f,
+ 1);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_dlv_f, 1);
+ }
continue;
fail_frame:
@@ -1098,10 +1418,11 @@ static void * eth_ipcp_packet_writer(void * o)
len = ssm_pk_buff_len(spb);
- if (ssm_pk_buff_head_alloc(spb, ETH_HEADER_TOT_SIZE)
+ if (ssm_pk_buff_push(spb, ETH_HEADER_TOT_SIZE)
== NULL) {
log_dbg("Failed to allocate header.");
ipcp_spb_release(spb);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_buf_f, 1);
continue;
}
@@ -1125,8 +1446,20 @@ static void * eth_ipcp_packet_writer(void * o)
dsap, ssap,
#endif
ssm_pk_buff_head(spb),
- len))
+ len)) {
log_dbg("Failed to send frame.");
+ FETCH_ADD_RELAXED(
+ &eth_data.fd_to_ef[fd].stat.p_snd_f,
+ 1);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_snd_f, 1);
+ } else {
+ FETCH_ADD_RELAXED(
+ &eth_data.fd_to_ef[fd].stat.p_snd,
+ 1);
+ FETCH_ADD_RELAXED(
+ &eth_data.fd_to_ef[fd].stat.b_snd,
+ len);
+ }
ipcp_spb_release(spb);
}
}
@@ -1474,12 +1807,14 @@ static int eth_init_bpf(struct ifreq * ifr)
return -1;
}
#elif defined(HAVE_RAW_SOCKETS)
+#define SOCKOPT()
static int eth_init_raw_socket(struct ifreq * ifr)
{
int idx;
- int flags;
+ int sndbuf;
+ int rcvbuf;
#if defined(IPCP_ETH_QDISC_BYPASS)
- int qdisc_bypass = 1;
+ int qdisc_bypass = 1;
#endif /* ENABLE_QDISC_BYPASS */
idx = if_nametoindex(ifr->ifr_name);
@@ -1487,6 +1822,7 @@ static int eth_init_raw_socket(struct ifreq * ifr)
log_err("Failed to retrieve interface index.");
return -1;
}
+
memset(&(eth_data.device), 0, sizeof(eth_data.device));
eth_data.device.sll_ifindex = idx;
eth_data.device.sll_family = AF_PACKET;
@@ -1503,17 +1839,6 @@ static int eth_init_raw_socket(struct ifreq * ifr)
goto fail_socket;
}
- flags = fcntl(eth_data.s_fd, F_GETFL, 0);
- if (flags < 0) {
- log_err("Failed to get flags.");
- goto fail_device;
- }
-
- if (fcntl(eth_data.s_fd, F_SETFL, flags | O_NONBLOCK)) {
- log_err("Failed to set socket non-blocking.");
- goto fail_device;
- }
-
#if defined(IPCP_ETH_QDISC_BYPASS)
if (setsockopt(eth_data.s_fd, SOL_PACKET, PACKET_QDISC_BYPASS,
&qdisc_bypass, sizeof(qdisc_bypass))) {
@@ -1521,6 +1846,18 @@ static int eth_init_raw_socket(struct ifreq * ifr)
}
#endif
+ sndbuf = IPCP_ETH_SNDBUF;
+ if (sndbuf > 0 && setsockopt(eth_data.s_fd, SOL_SOCKET, SO_SNDBUF,
+ &sndbuf, sizeof(sndbuf))) {
+ log_info("Failed to set SO_SNDBUF to %d.", sndbuf);
+ }
+
+ rcvbuf = IPCP_ETH_RCVBUF;
+ if (rcvbuf > 0 && setsockopt(eth_data.s_fd, SOL_SOCKET, SO_RCVBUF,
+ &rcvbuf, sizeof(rcvbuf))) {
+ log_info("Failed to set SO_RCVBUF to %d.", rcvbuf);
+ }
+
if (bind(eth_data.s_fd, (struct sockaddr *) &eth_data.device,
sizeof(eth_data.device)) < 0) {
log_err("Failed to bind socket to interface.");
@@ -1593,6 +1930,12 @@ static int eth_ipcp_bootstrap(struct ipcp_config * conf)
return -1;
}
#endif /* HAVE_NETMAP */
+#ifdef IPCP_ETH_FLOW_STATS
+ if (rib_reg(ETH_RIB_PATH, &eth_r_ops)) {
+ log_err("Failed to register RIB.");
+ goto fail_rib_reg;
+ }
+#endif
#if defined(__linux__)
if (pthread_create(&eth_data.if_monitor, NULL,
eth_ipcp_if_monitor, NULL)) {
@@ -1656,6 +1999,10 @@ static int eth_ipcp_bootstrap(struct ipcp_config * conf)
#if defined(__linux__)
fail_monitor:
#endif
+#ifdef IPCP_ETH_FLOW_STATS
+ rib_unreg(ETH_RIB_PATH);
+ fail_rib_reg:
+#endif
#if defined(HAVE_NETMAP)
nm_close(eth_data.nmd);
#elif defined(HAVE_BPF)
@@ -1687,12 +2034,14 @@ static int eth_ipcp_unreg(const uint8_t * hash)
static int eth_ipcp_query(const uint8_t * hash)
{
uint8_t r_addr[MAC_SIZE];
- struct timespec timeout = TIMESPEC_INIT_MS(NAME_QUERY_TIMEO);
+ struct timespec timeout;
struct dir_query * query;
int ret;
+ int attempt;
uint8_t * buf;
struct mgmt_msg * msg;
size_t len;
+ long per_ms;
if (shim_data_dir_has(eth_data.shim_data, hash))
return 0;
@@ -1712,32 +2061,46 @@ static int eth_ipcp_query(const uint8_t * hash)
memset(r_addr, 0xff, MAC_SIZE);
- query = shim_data_dir_query_create(eth_data.shim_data, hash);
- if (query == NULL) {
- free(buf);
- return -1;
- }
+ per_ms = NAME_QUERY_TIMEO / (NAME_QUERY_RETRIES + 1);
+
+ ret = -1;
+ for (attempt = 0; attempt <= NAME_QUERY_RETRIES; ++attempt) {
+ query = shim_data_dir_query_create(eth_data.shim_data, hash);
+ if (query == NULL) {
+ ret = -1;
+ break;
+ }
- if (eth_ipcp_send_frame(r_addr,
+ if (eth_ipcp_send_frame(r_addr,
#if defined(BUILD_ETH_DIX)
- MGMT_EID,
+ MGMT_EID,
#elif defined(BUILD_ETH_LLC)
- reverse_bits(MGMT_SAP),
- reverse_bits(MGMT_SAP),
+ reverse_bits(MGMT_SAP),
+ reverse_bits(MGMT_SAP),
#endif
- buf, len)) {
- log_err("Failed to send management frame.");
+ buf, len)) {
+ log_err("Failed to send management frame.");
+ shim_data_dir_query_destroy(eth_data.shim_data,
+ query);
+ ret = -1;
+ break;
+ }
+
+ FETCH_ADD_RELAXED(&eth_data.stat.n_mgmt_snd, 1);
+
+ timeout.tv_sec = per_ms / 1000;
+ timeout.tv_nsec = (per_ms % 1000) * 1000000L;
+
+ ret = shim_data_dir_query_wait(query, &timeout);
+
shim_data_dir_query_destroy(eth_data.shim_data, query);
- free(buf);
- return -1;
+
+ if (ret != -ETIMEDOUT)
+ break;
}
free(buf);
- ret = shim_data_dir_query_wait(query, &timeout);
-
- shim_data_dir_query_destroy(eth_data.shim_data, query);
-
return ret;
}
@@ -1798,6 +2161,14 @@ static int eth_ipcp_flow_alloc(int fd,
}
fset_add(eth_data.np1_flows, fd);
+#ifdef IPCP_ETH_FLOW_STATS
+ pthread_rwlock_wrlock(&eth_data.flows_lock);
+ memset(&eth_data.fd_to_ef[fd].stat, 0,
+ sizeof(eth_data.fd_to_ef[fd].stat));
+ eth_data.fd_to_ef[fd].stat.stamp = time(NULL);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_flows, 1);
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+#endif
#if defined(BUILD_ETH_LLC)
log_dbg("Assigned SAP %d for fd %d.", ssap, fd);
#endif
@@ -1858,6 +2229,14 @@ static int eth_ipcp_flow_alloc_resp(int fd,
}
fset_add(eth_data.np1_flows, fd);
+#ifdef IPCP_ETH_FLOW_STATS
+ pthread_rwlock_wrlock(&eth_data.flows_lock);
+ memset(&eth_data.fd_to_ef[fd].stat, 0,
+ sizeof(eth_data.fd_to_ef[fd].stat));
+ eth_data.fd_to_ef[fd].stat.stamp = time(NULL);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_flows, 1);
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+#endif
#if defined(BUILD_ETH_LLC)
log_dbg("Assigned SAP %d for fd %d.", ssap, fd);
#endif
@@ -1886,6 +2265,12 @@ static int eth_ipcp_flow_dealloc(int fd)
#endif
memset(&eth_data.fd_to_ef[fd].r_addr, 0, MAC_SIZE);
+#ifdef IPCP_ETH_FLOW_STATS
+ memset(&eth_data.fd_to_ef[fd].stat, 0,
+ sizeof(eth_data.fd_to_ef[fd].stat));
+ FETCH_SUB_RELAXED(&eth_data.stat.n_flows, 1);
+#endif
+
pthread_rwlock_unlock(&eth_data.flows_lock);
ipcp_flow_dealloc(fd);
@@ -1952,6 +2337,9 @@ int main(int argc,
#ifdef __linux__
pthread_join(eth_data.if_monitor, NULL);
#endif
+#ifdef IPCP_ETH_FLOW_STATS
+ rib_unreg(ETH_RIB_PATH);
+#endif
}
ipcp_stop();
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 5ad2401f..1052a686 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -363,6 +363,7 @@ static void * acceptloop(void * o)
int ipcp_wait_flow_req_arr(const uint8_t * dst,
qosspec_t qs,
time_t mpl,
+ uint32_t mtu,
const buffer_t * data)
{
struct timespec ts = TIMESPEC_INIT_MS(ALLOC_TIMEOUT);
@@ -392,7 +393,7 @@ int ipcp_wait_flow_req_arr(const uint8_t * dst,
assert(ipcpd.alloc_id == -1);
- fd = ipcp_flow_req_arr(&hash, qs, mpl, data);
+ fd = ipcp_flow_req_arr(&hash, qs, mpl, mtu, data);
if (fd < 0) {
pthread_mutex_unlock(&ipcpd.alloc_lock);
log_err("Failed to get fd for flow.");
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 26a780a3..0adcc694 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -98,6 +98,7 @@ enum ipcp_state ipcp_get_state(void);
int ipcp_wait_flow_req_arr(const uint8_t * dst,
qosspec_t qs,
time_t mpl,
+ uint32_t mtu,
const buffer_t * data);
int ipcp_wait_flow_resp(const int fd);
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 2c867317..eb9836f2 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -203,7 +203,8 @@ static int local_ipcp_flow_alloc(int fd,
HASH_VAL32(dst), fd);
assert(dst);
- out_fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_LOCAL_MPL, data);
+ out_fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_LOCAL_MPL,
+ IPCP_LOCAL_MTU, data);
if (out_fd < 0) {
log_dbg("Flow allocation failed: %d", out_fd);
return -1;
@@ -255,14 +256,16 @@ static int local_ipcp_flow_alloc_resp(int fd,
}
if (response < 0) {
- ipcp_flow_alloc_reply(out_fd, response, mpl, data);
+ ipcp_flow_alloc_reply(out_fd, response, mpl,
+ IPCP_LOCAL_MTU, data);
log_info("Flow allocation rejected, fds (%d, %d).", out_fd, fd);
return 0;
}
fset_add(local_data.flows, fd);
- if (ipcp_flow_alloc_reply(out_fd, response, mpl, data) < 0) {
+ if (ipcp_flow_alloc_reply(out_fd, response, mpl,
+ IPCP_LOCAL_MTU, data) < 0) {
log_err("Failed to reply to allocation");
fset_del(local_data.flows, fd);
return -1;
diff --git a/src/ipcpd/udp/udp.c b/src/ipcpd/udp/udp.c
index 452bbc1a..93e88b9b 100644
--- a/src/ipcpd/udp/udp.c
+++ b/src/ipcpd/udp/udp.c
@@ -47,6 +47,10 @@
#include <stdlib.h>
#include <sys/wait.h>
#include <fcntl.h>
+#include <unistd.h>
+#if defined(__linux__)
+#include <netinet/ip.h>
+#endif
#define FLOW_REQ 1
#define FLOW_REPLY 2
@@ -87,7 +91,7 @@ struct mgmt_msg {
uint8_t code;
/* QoS parameters from spec */
uint8_t availability;
- uint8_t in_order;
+ uint8_t service;
} __attribute__((packed));
struct mgmt_frame {
@@ -130,6 +134,53 @@ static const char * __inet_ntop(const struct __ADDR * addr,
return inet_ntop(__AF, addr, buf, __ADDRSTRLEN);
}
+#if defined(BUILD_IPCP_UDP4)
+#define UDP_MTU_FALLBACK IPCP_UDP4_MTU
+#define UDP_IP_OVERHEAD 28U /* IPv4 + UDP */
+#else
+#define UDP_MTU_FALLBACK IPCP_UDP6_MTU
+#define UDP_IP_OVERHEAD 48U /* IPv6 + UDP */
+#endif
+
+static uint32_t udp_query_mtu(const struct __SOCKADDR * saddr)
+{
+#if defined(__linux__) && (defined(IP_MTU) || defined(IPV6_MTU))
+ int sock;
+ int mtu = 0;
+ socklen_t len = sizeof(mtu);
+
+ sock = socket(__AF, SOCK_DGRAM, IPPROTO_UDP);
+ if (sock < 0)
+ return UDP_MTU_FALLBACK;
+
+ if (connect(sock, (const struct sockaddr *) saddr,
+ sizeof(*saddr)) < 0)
+ goto fallback;
+
+#if defined(BUILD_IPCP_UDP4) && defined(IP_MTU)
+ if (getsockopt(sock, IPPROTO_IP, IP_MTU, &mtu, &len) < 0)
+ goto fallback;
+#elif defined(BUILD_IPCP_UDP6) && defined(IPV6_MTU)
+ if (getsockopt(sock, IPPROTO_IPV6, IPV6_MTU, &mtu, &len) < 0)
+ goto fallback;
+#else
+ goto fallback;
+#endif
+ close(sock);
+
+ if (mtu <= (int) UDP_IP_OVERHEAD)
+ return UDP_MTU_FALLBACK;
+
+ return (uint32_t) mtu - UDP_IP_OVERHEAD;
+
+ fallback:
+ close(sock);
+#else
+ (void) saddr;
+#endif
+ return UDP_MTU_FALLBACK;
+}
+
static int udp_data_init(void)
{
int i;
@@ -220,7 +271,7 @@ static int udp_ipcp_port_alloc(const struct __SOCKADDR * r_saddr,
msg->availability = qs.availability;
msg->loss = hton32(qs.loss);
msg->ber = hton32(qs.ber);
- msg->in_order = qs.in_order;
+ msg->service = qs.service;
msg->max_gap = hton32(qs.max_gap);
msg->timeout = hton32(qs.timeout);
@@ -285,7 +336,8 @@ static int udp_ipcp_port_req(struct __SOCKADDR * c_saddr,
{
int fd;
- fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UDP_MPL, data);
+ fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UDP_MPL,
+ udp_query_mtu(c_saddr), data);
if (fd < 0) {
log_err("Could not get new flow from IRMd.");
return -1;
@@ -332,7 +384,8 @@ static int udp_ipcp_port_alloc_reply(const struct __SOCKADDR * saddr,
pthread_rwlock_unlock(&udp_data.flows_lock);
- if (ipcp_flow_alloc_reply(s_eid, response, mpl, data) < 0) {
+ if (ipcp_flow_alloc_reply(s_eid, response, mpl,
+ udp_query_mtu(saddr), data) < 0) {
log_err("Failed to reply to flow allocation.");
return -1;
}
@@ -352,13 +405,18 @@ static int udp_ipcp_mgmt_frame(struct __SOCKADDR c_saddr,
qosspec_t qs;
buffer_t data;
+ /* Defence against malformed/corrupted wire input. */
+ if (len < sizeof(*msg))
+ return -1;
+
msg = (struct mgmt_msg *) buf;
switch (msg->code) {
case FLOW_REQ:
msg_len = sizeof(*msg) + ipcp_dir_hash_len();
- assert(len >= msg_len);
+ if (len < msg_len)
+ return -1;
data.len = len - msg_len;
data.data = (uint8_t *) buf + msg_len;
@@ -369,7 +427,7 @@ static int udp_ipcp_mgmt_frame(struct __SOCKADDR c_saddr,
qs.availability = msg->availability;
qs.loss = ntoh32(msg->loss);
qs.ber = ntoh32(msg->ber);
- qs.in_order = msg->in_order;
+ qs.service = msg->service;
qs.max_gap = ntoh32(msg->max_gap);
qs.timeout = ntoh32(msg->timeout);
@@ -377,8 +435,6 @@ static int udp_ipcp_mgmt_frame(struct __SOCKADDR c_saddr,
(uint8_t *) (msg + 1), qs,
&data);
case FLOW_REPLY:
- assert(len >= sizeof(*msg));
-
data.len = len - sizeof(*msg);
data.data = (uint8_t *) buf + sizeof(*msg);
@@ -549,7 +605,7 @@ static void * udp_ipcp_packet_writer(void * o)
continue;
}
- buf = ssm_pk_buff_head_alloc(spb, OUR_HEADER_LEN);
+ buf = ssm_pk_buff_push(spb, OUR_HEADER_LEN);
if (buf == NULL) {
log_dbg("Failed to allocate header.");
ipcp_spb_release(spb);
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index 252477f4..cc54efa1 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -139,7 +139,7 @@ static void dt_pci_shrink(struct ssm_pk_buff * spb)
{
assert(spb);
- ssm_pk_buff_head_release(spb, dt_pci_info.head_size);
+ ssm_pk_buff_pop(spb, dt_pci_info.head_size);
}
struct {
@@ -168,12 +168,12 @@ struct {
size_t f_nhp_pkt[QOS_CUBE_MAX];
size_t f_nhp_bytes[QOS_CUBE_MAX];
pthread_mutex_t lock;
- } stat[PROG_MAX_FLOWS];
+ } stat[PROC_MAX_FLOWS];
size_t n_flows;
#endif
struct bmp * res_fds;
- struct comp_info comps[PROG_RES_FDS];
+ struct comp_info comps[PROC_RES_FDS];
pthread_rwlock_t lock;
pthread_t listener;
@@ -220,7 +220,7 @@ static int dt_rib_read(const char * path,
tm = gmtime(&dt.stat[fd].stamp);
strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
- if (fd >= PROG_RES_FDS) {
+ if (fd >= PROC_RES_FDS) {
fccntl(fd, FLOWGRXQLEN, &rxqlen);
fccntl(fd, FLOWGTXQLEN, &txqlen);
}
@@ -296,7 +296,7 @@ static int dt_rib_readdir(char *** buf)
if (*buf == NULL)
goto fail_entries;
- for (i = 0; i < PROG_MAX_FLOWS; ++i) {
+ for (i = 0; i < PROC_MAX_FLOWS; ++i) {
pthread_mutex_lock(&dt.stat[i].lock);
if (dt.stat[i].stamp == 0) {
@@ -514,7 +514,7 @@ static void packet_handler(int fd,
#endif
} else {
dt_pci_shrink(spb);
- if (dt_pci.eid >= PROG_RES_FDS) {
+ if (dt_pci.eid >= PROC_RES_FDS) {
uint8_t ecn = *(head + dt_pci_info.ecn_o);
fa_np1_rcv(dt_pci.eid, ecn, spb);
return;
@@ -636,13 +636,13 @@ int dt_init(struct dt_config cfg)
goto fail_rwlock_init;
}
- dt.res_fds = bmp_create(PROG_RES_FDS, 0);
+ dt.res_fds = bmp_create(PROC_RES_FDS, 0);
if (dt.res_fds == NULL)
goto fail_res_fds;
#ifdef IPCP_FLOW_STATS
memset(dt.stat, 0, sizeof(dt.stat));
- for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ for (i = 0; i < PROC_MAX_FLOWS; ++i)
if (pthread_mutex_init(&dt.stat[i].lock, NULL)) {
log_err("Failed to init mutex for flow %d.", i);
for (j = 0; j < i; ++j)
@@ -662,7 +662,7 @@ int dt_init(struct dt_config cfg)
fail_rib_reg:
#ifdef IPCP_FLOW_STATS
- for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ for (i = 0; i < PROC_MAX_FLOWS; ++i)
pthread_mutex_destroy(&dt.stat[i].lock);
fail_stat_lock:
#endif
@@ -691,7 +691,7 @@ void dt_fini(void)
sprintf(dtstr, "%s.%" PRIu64, DT, dt.addr);
rib_unreg(dtstr);
#ifdef IPCP_FLOW_STATS
- for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ for (i = 0; i < PROC_MAX_FLOWS; ++i)
pthread_mutex_destroy(&dt.stat[i].lock);
#endif
bmp_destroy(dt.res_fds);
@@ -791,7 +791,7 @@ int dt_reg_comp(void * comp,
void dt_unreg_comp(int eid)
{
- assert(eid >= 0 && eid < PROG_RES_FDS);
+ assert(eid >= 0 && eid < PROC_RES_FDS);
pthread_rwlock_wrlock(&dt.lock);
@@ -823,7 +823,7 @@ int dt_write_packet(uint64_t dst_addr,
#ifdef IPCP_FLOW_STATS
len = ssm_pk_buff_len(spb);
- if (eid < PROG_RES_FDS) {
+ if (eid < PROC_RES_FDS) {
pthread_mutex_lock(&dt.stat[eid].lock);
++dt.stat[eid].lcl_r_pkt[qc];
@@ -837,7 +837,7 @@ int dt_write_packet(uint64_t dst_addr,
log_dbg("Could not get nhop for " ADDR_FMT32 ".",
ADDR_VAL32(&dst_addr));
#ifdef IPCP_FLOW_STATS
- if (eid < PROG_RES_FDS) {
+ if (eid < PROC_RES_FDS) {
pthread_mutex_lock(&dt.stat[eid].lock);
++dt.stat[eid].lcl_r_pkt[qc];
@@ -849,7 +849,7 @@ int dt_write_packet(uint64_t dst_addr,
return -EPERM;
}
- head = ssm_pk_buff_head_alloc(spb, dt_pci_info.head_size);
+ head = ssm_pk_buff_push(spb, dt_pci_info.head_size);
if (head == NULL) {
log_dbg("Failed to allocate DT header.");
goto fail_write;
@@ -876,7 +876,7 @@ int dt_write_packet(uint64_t dst_addr,
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
- if (dt_pci.eid < PROG_RES_FDS) {
+ if (dt_pci.eid < PROC_RES_FDS) {
++dt.stat[fd].lcl_w_pkt[qc];
dt.stat[fd].lcl_w_bytes[qc] += len;
}
@@ -891,7 +891,7 @@ int dt_write_packet(uint64_t dst_addr,
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
- if (eid < PROG_RES_FDS) {
+ if (eid < PROC_RES_FDS) {
++dt.stat[fd].lcl_w_pkt[qc];
dt.stat[fd].lcl_w_bytes[qc] += len;
}
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index c157d71c..c0447885 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -81,7 +81,7 @@ struct fa_msg {
uint16_t ece;
uint8_t code;
uint8_t availability;
- uint8_t in_order;
+ uint8_t service;
} __attribute__((packed));
struct cmd {
@@ -111,7 +111,7 @@ struct fa_flow {
struct {
pthread_rwlock_t flows_lock;
- struct fa_flow flows[PROG_MAX_FLOWS];
+ struct fa_flow flows[PROC_MAX_FLOWS];
#ifdef IPCP_FLOW_STATS
size_t n_flows;
#endif
@@ -145,7 +145,7 @@ static int fa_rib_read(const char * path,
fd = atoi(entry);
- if (fd < 0 || fd >= PROG_MAX_FLOWS)
+ if (fd < 0 || fd >= PROC_MAX_FLOWS)
return -1;
if (len < 1536)
@@ -225,7 +225,7 @@ static int fa_rib_readdir(char *** buf)
if (*buf == NULL)
goto fail_entries;
- for (i = 0; i < PROG_MAX_FLOWS; ++i) {
+ for (i = 0; i < PROC_MAX_FLOWS; ++i) {
struct fa_flow * flow;
flow = &fa.flows[i];
@@ -306,7 +306,7 @@ static int eid_to_fd(uint64_t eid)
fd = eid & 0xFFFFFFFF;
- if (fd < 0 || fd >= PROG_MAX_FLOWS)
+ if (fd < 0 || fd >= PROC_MAX_FLOWS)
return -1;
flow = &fa.flows[fd];
@@ -496,11 +496,12 @@ static int fa_handle_flow_req(struct fa_msg * msg,
qs.availability = msg->availability;
qs.loss = ntoh32(msg->loss);
qs.ber = ntoh32(msg->ber);
- qs.in_order = msg->in_order;
+ qs.service = msg->service;
qs.max_gap = ntoh32(msg->max_gap);
qs.timeout = ntoh32(msg->timeout);
- fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, &data);
+ fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL,
+ IPCP_UNICAST_MTU, &data);
if (fd < 0)
return fd;
@@ -528,7 +529,8 @@ static int fa_handle_flow_reply(struct fa_msg * msg,
time_t mpl = IPCP_UNICAST_MPL;
int response;
- assert(len >= sizeof(*msg));
+ if (len < sizeof(*msg))
+ return -EINVAL;
data.data = (uint8_t *) msg + sizeof(*msg);
data.len = len - sizeof(*msg);
@@ -558,7 +560,8 @@ static int fa_handle_flow_reply(struct fa_msg * msg,
pthread_rwlock_unlock(&fa.flows_lock);
- if (ipcp_flow_alloc_reply(fd, response, mpl, &data) < 0) {
+ if (ipcp_flow_alloc_reply(fd, response, mpl,
+ IPCP_UNICAST_MTU, &data) < 0) {
log_err("Failed to reply for flow allocation on fd %d.", fd);
return -EIRMD;
}
@@ -572,8 +575,8 @@ static int fa_handle_flow_update(struct fa_msg * msg,
struct fa_flow * flow;
int fd;
- (void) len;
- assert(len >= sizeof(*msg));
+ if (len < sizeof(*msg))
+ return -EINVAL;
pthread_rwlock_wrlock(&fa.flows_lock);
@@ -789,7 +792,7 @@ int fa_alloc(int fd,
msg->availability = qs.availability;
msg->loss = hton32(qs.loss);
msg->ber = hton32(qs.ber);
- msg->in_order = qs.in_order;
+ msg->service = qs.service;
msg->max_gap = hton32(qs.max_gap);
msg->timeout = hton32(qs.timeout);
diff --git a/src/ipcpd/unicast/routing/graph.c b/src/ipcpd/unicast/routing/graph.c
index 0226c762..c168eb7d 100644
--- a/src/ipcpd/unicast/routing/graph.c
+++ b/src/ipcpd/unicast/routing/graph.c
@@ -603,9 +603,9 @@ static int graph_routing_table_lfa(struct graph * graph,
struct list_head * table,
int ** dist)
{
- int * n_dist[PROG_MAX_FLOWS];
- uint64_t addrs[PROG_MAX_FLOWS];
- int n_index[PROG_MAX_FLOWS];
+ int * n_dist[PROC_MAX_FLOWS];
+ uint64_t addrs[PROC_MAX_FLOWS];
+ int n_index[PROC_MAX_FLOWS];
struct list_head * p;
struct list_head * q;
struct vertex * v;
@@ -618,7 +618,7 @@ static int graph_routing_table_lfa(struct graph * graph,
if (graph_routing_table_simple(graph, s_addr, table, dist))
goto fail_table;
- for (j = 0; j < PROG_MAX_FLOWS; j++) {
+ for (j = 0; j < PROC_MAX_FLOWS; j++) {
n_dist[j] = NULL;
n_index[j] = -1;
addrs[j] = -1;
diff --git a/src/ipcpd/unicast/routing/link-state.c b/src/ipcpd/unicast/routing/link-state.c
index 051dd98d..c4ea9e1c 100644
--- a/src/ipcpd/unicast/routing/link-state.c
+++ b/src/ipcpd/unicast/routing/link-state.c
@@ -415,7 +415,7 @@ static void calculate_pff(struct routing_i * instance)
struct list_head table;
struct list_head * p;
struct list_head * q;
- int fds[PROG_MAX_FLOWS];
+ int fds[PROC_MAX_FLOWS];
assert(instance);