summaryrefslogtreecommitdiff
path: root/src/ipcpd/eth/eth.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/eth/eth.c')
-rw-r--r--src/ipcpd/eth/eth.c197
1 files changed, 136 insertions, 61 deletions
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c
index 443f3fdb..1bbfac5b 100644
--- a/src/ipcpd/eth/eth.c
+++ b/src/ipcpd/eth/eth.c
@@ -29,6 +29,8 @@
#define _DARWIN_C_SOURCE
#elif defined(__FreeBSD__)
#define __BSD_VISIBLE 1
+#elif defined (__linux__) || defined (__CYGWIN__)
+#define _DEFAULT_SOURCE
#else
#define _POSIX_C_SOURCE 200112L
#endif
@@ -121,7 +123,7 @@
#define DIX_HEADER_SIZE (DIX_EID_SIZE + DIX_LENGTH_SIZE)
#define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + DIX_HEADER_SIZE)
#define MAX_EIDS (1 << (8 * DIX_EID_SIZE))
-#define ETH_MAX_SDU_SIZE (ETH_MTU - DIX_HEADER_SIZE)
+#define ETH_MAX_PACKET_SIZE (ETH_MTU - DIX_HEADER_SIZE)
#define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX)
#elif defined(BUILD_ETH_LLC)
#define THIS_TYPE IPCP_ETH_LLC
@@ -129,7 +131,7 @@
#define LLC_HEADER_SIZE 3
#define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE)
#define MAX_SAPS 64
-#define ETH_MAX_SDU_SIZE (ETH_MTU - LLC_HEADER_SIZE)
+#define ETH_MAX_PACKET_SIZE (ETH_MTU - LLC_HEADER_SIZE)
#define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX)
#endif
@@ -144,15 +146,27 @@
#define NAME_QUERY_REPLY 3
struct mgmt_msg {
- uint8_t code;
#if defined(BUILD_ETH_DIX)
uint16_t seid;
uint16_t deid;
#elif defined(BUILD_ETH_LLC)
uint8_t ssap;
uint8_t dsap;
+ /* QoS here for alignment */
+ uint8_t code;
+ uint8_t availability;
+#endif
+ /* QoS parameters from spec, aligned */
+ uint32_t loss;
+ uint64_t bandwidth;
+ uint32_t ber;
+ uint32_t max_gap;
+ uint32_t delay;
+ uint8_t in_order;
+#if defined (BUILD_ETH_DIX)
+ uint8_t code;
+ uint8_t availability;
#endif
- uint8_t qoscube;
int8_t response;
} __attribute__((packed));
@@ -192,6 +206,7 @@ struct {
struct shim_data * shim_data;
#ifdef __linux__
int mtu;
+ int if_idx;
#endif
#if defined(HAVE_NETMAP)
struct nm_desc * nmd;
@@ -213,11 +228,10 @@ struct {
#endif
struct ef * fd_to_ef;
fset_t * np1_flows;
- fqueue_t * fq;
pthread_rwlock_t flows_lock;
- pthread_t sdu_writer;
- pthread_t sdu_reader;
+ pthread_t packet_writer[IPCP_ETH_WR_THR];
+ pthread_t packet_reader[IPCP_ETH_RD_THR];
#ifdef __linux__
pthread_t if_monitor;
@@ -258,10 +272,6 @@ static int eth_data_init(void)
if (eth_data.np1_flows == NULL)
goto fail_np1_flows;
- eth_data.fq = fqueue_create();
- if (eth_data.fq == NULL)
- goto fail_fq;
-
for (i = 0; i < SYS_MAX_FLOWS; ++i) {
#if defined(BUILD_ETH_DIX)
eth_data.fd_to_ef[i].r_eid = -1;
@@ -309,8 +319,6 @@ static int eth_data_init(void)
fail_flows_lock:
shim_data_destroy(eth_data.shim_data);
fail_shim_data:
- fqueue_destroy(eth_data.fq);
- fail_fq:
fset_destroy(eth_data.np1_flows);
fail_np1_flows:
#ifdef BUILD_ETH_LLC
@@ -337,7 +345,6 @@ static void eth_data_fini(void)
pthread_mutex_destroy(&eth_data.mgmt_lock);
pthread_rwlock_destroy(&eth_data.flows_lock);
shim_data_destroy(eth_data.shim_data);
- fqueue_destroy(eth_data.fq);
fset_destroy(eth_data.np1_flows);
#ifdef BUILD_ETH_LLC
bmp_destroy(eth_data.saps);
@@ -376,7 +383,7 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr,
assert(frame);
- if (len > (size_t) ETH_MAX_SDU_SIZE)
+ if (len > (size_t) ETH_MAX_PACKET_SIZE)
return -1;
e_frame = (struct eth_frame *) frame;
@@ -438,7 +445,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr,
uint8_t ssap,
#endif
const uint8_t * hash,
- qoscube_t cube)
+ qosspec_t qs)
{
uint8_t * buf;
struct mgmt_msg * msg;
@@ -458,7 +465,14 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr,
#elif defined(BUILD_ETH_LLC)
msg->ssap = ssap;
#endif
- msg->qoscube = cube;
+
+ msg->delay = hton32(qs.delay);
+ msg->bandwidth = hton64(qs.bandwidth);
+ msg->availability = qs.availability;
+ msg->loss = hton32(qs.loss);
+ msg->ber = hton32(qs.ber);
+ msg->in_order = qs.in_order;
+ msg->max_gap = hton32(qs.max_gap);
memcpy(msg + 1, hash, ipcp_dir_hash_len());
@@ -528,7 +542,7 @@ static int eth_ipcp_req(uint8_t * r_addr,
uint8_t r_sap,
#endif
const uint8_t * dst,
- qoscube_t cube)
+ qosspec_t qs)
{
struct timespec ts = {0, ALLOC_TIMEO * MILLION};
struct timespec abstime;
@@ -552,7 +566,7 @@ static int eth_ipcp_req(uint8_t * r_addr,
}
/* reply to IRM, called under lock to prevent race */
- fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube);
+ fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), qs);
if (fd < 0) {
pthread_mutex_unlock(&ipcpi.alloc_lock);
log_err("Could not get new flow from IRMd.");
@@ -692,11 +706,20 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf,
uint8_t * r_addr)
{
struct mgmt_msg * msg;
+ qosspec_t qs;
msg = (struct mgmt_msg *) buf;
switch (msg->code) {
case FLOW_REQ:
+ qs.delay = ntoh32(msg->delay);
+ qs.bandwidth = ntoh64(msg->bandwidth);
+ qs.availability = msg->availability;
+ qs.loss = ntoh32(msg->loss);
+ qs.ber = ntoh32(msg->ber);
+ qs.in_order = msg->in_order;
+ qs.max_gap = ntoh32(msg->max_gap);
+
if (shim_data_reg_has(eth_data.shim_data,
buf + sizeof(*msg))) {
eth_ipcp_req(r_addr,
@@ -706,7 +729,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf,
msg->ssap,
#endif
buf + sizeof(*msg),
- msg->qoscube);
+ qs);
}
break;
case FLOW_REPLY:
@@ -785,7 +808,7 @@ static void * eth_ipcp_mgmt_handler(void * o)
return (void *) 0;
}
-static void * eth_ipcp_sdu_reader(void * o)
+static void * eth_ipcp_packet_reader(void * o)
{
uint8_t br_addr[MAC_SIZE];
#if defined(BUILD_ETH_DIX)
@@ -890,7 +913,9 @@ static void * eth_ipcp_sdu_reader(void * o)
if (deid == MGMT_EID) {
#elif defined (BUILD_ETH_LLC)
if (length > 0x05FF) {/* DIX */
+#ifndef HAVE_NETMAP
ipcp_sdb_release(sdb);
+#endif
continue;
}
@@ -911,8 +936,8 @@ static void * eth_ipcp_sdu_reader(void * o)
memcpy(frame->buf, &e_frame->payload, length);
memcpy(frame->r_addr, e_frame->src_hwaddr, MAC_SIZE);
- pthread_mutex_unlock(&eth_data.mgmt_lock);
+ pthread_mutex_lock(&eth_data.mgmt_lock);
list_add(&frame->next, &eth_data.mgmt_frames);
pthread_cond_signal(&eth_data.mgmt_cond);
pthread_mutex_unlock(&eth_data.mgmt_lock);
@@ -962,7 +987,12 @@ static void * eth_ipcp_sdu_reader(void * o)
return (void *) 0;
}
-static void * eth_ipcp_sdu_writer(void * o)
+static void cleanup_writer(void * o)
+{
+ fqueue_destroy((fqueue_t *) o);
+}
+
+static void * eth_ipcp_packet_writer(void * o)
{
int fd;
struct shm_du_buff * sdb;
@@ -975,15 +1005,21 @@ static void * eth_ipcp_sdu_writer(void * o)
#endif
uint8_t r_addr[MAC_SIZE];
+ fqueue_t * fq;
+
+ fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) -1;
+
(void) o;
+ pthread_cleanup_push(cleanup_writer, fq);
+
ipcp_lock_to_core();
while (true) {
- fevent(eth_data.np1_flows, eth_data.fq, NULL);
-
- pthread_rwlock_rdlock(&eth_data.flows_lock);
- while ((fd = fqueue_next(eth_data.fq)) >= 0) {
+ fevent(eth_data.np1_flows, fq, NULL);
+ while ((fd = fqueue_next(fq)) >= 0) {
if (ipcp_flow_read(fd, &sdb)) {
log_dbg("Bad read from fd %d.", fd);
continue;
@@ -996,6 +1032,8 @@ static void * eth_ipcp_sdu_writer(void * o)
log_dbg("Failed to allocate header.");
ipcp_sdb_release(sdb);
}
+
+ pthread_rwlock_rdlock(&eth_data.flows_lock);
#if defined(BUILD_ETH_DIX)
deid = eth_data.fd_to_ef[fd].r_eid;
#elif defined(BUILD_ETH_LLC)
@@ -1006,6 +1044,8 @@ static void * eth_ipcp_sdu_writer(void * o)
eth_data.fd_to_ef[fd].r_addr,
MAC_SIZE);
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+
eth_ipcp_send_frame(r_addr,
#if defined(BUILD_ETH_DIX)
deid,
@@ -1016,9 +1056,10 @@ static void * eth_ipcp_sdu_writer(void * o)
len);
ipcp_sdb_release(sdb);
}
- pthread_rwlock_unlock(&eth_data.flows_lock);
}
+ pthread_cleanup_pop(true);
+
return (void *) 1;
}
@@ -1129,7 +1170,7 @@ static void * eth_ipcp_if_monitor(void * o)
ifi = NLMSG_DATA(h);
/* Not our interface */
- if (ifi->ifi_index != eth_data.device.sll_ifindex)
+ if (ifi->ifi_index != eth_data.if_idx)
continue;
if (ifi->ifi_flags & IFF_UP) {
@@ -1189,6 +1230,10 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf)
#ifndef SHM_RDRB_MULTI_BLOCK
size_t maxsz;
#endif
+#if defined(HAVE_RAW_SOCKETS)
+ int qdisc_bypass = 1;
+ int flags;
+#endif
assert(conf);
assert(conf->type == THIS_TYPE);
@@ -1198,7 +1243,7 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf)
}
memset(&ifr, 0, sizeof(ifr));
- memcpy(ifr.ifr_name, conf->dev, strlen(conf->dev));
+ memcpy(ifr.ifr_name, conf->dev, IFNAMSIZ);
#ifdef BUILD_ETH_DIX
if (conf->ethertype < 0x0600 || conf->ethertype == 0xFFFF) {
@@ -1274,9 +1319,9 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf)
idx = if_nametoindex(conf->dev);
if (idx == 0) {
log_err("Failed to retrieve interface index.");
- close(skfd);
return -1;
}
+ eth_data.if_idx = idx;
#endif /* __FreeBSD__ */
#if defined(HAVE_NETMAP)
@@ -1354,16 +1399,32 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf)
return -1;
}
+ flags = fcntl(eth_data.s_fd, F_GETFL, 0);
+ if (flags < 0) {
+ log_err("Failed to get flags.");
+ goto fail_device;
+ }
+
+ if (fcntl(eth_data.s_fd, F_SETFL, flags | O_NONBLOCK)) {
+ log_err("Failed to set socket non-blocking.");
+ goto fail_device;
+ }
+
+ if (setsockopt(eth_data.s_fd, SOL_PACKET, PACKET_QDISC_BYPASS,
+ &qdisc_bypass, sizeof(qdisc_bypass))) {
+ log_info("Qdisc bypass not supported.");
+ }
+
if (bind(eth_data.s_fd, (struct sockaddr *) &eth_data.device,
sizeof(eth_data.device))) {
- log_err("Failed to bind socket to interface");
+ log_err("Failed to bind socket to interface.");
goto fail_device;
}
#endif /* HAVE_NETMAP */
ipcp_set_state(IPCP_OPERATIONAL);
-#ifdef __linux__
+#if defined(__linux__)
if (pthread_create(&eth_data.if_monitor,
NULL,
eth_ipcp_if_monitor,
@@ -1381,20 +1442,24 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf)
goto fail_mgmt_handler;
}
- if (pthread_create(&eth_data.sdu_reader,
- NULL,
- eth_ipcp_sdu_reader,
- NULL)) {
- ipcp_set_state(IPCP_INIT);
- goto fail_sdu_reader;
+ for (idx = 0; idx < IPCP_ETH_RD_THR; ++idx) {
+ if (pthread_create(&eth_data.packet_reader[idx],
+ NULL,
+ eth_ipcp_packet_reader,
+ NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_packet_reader;
+ }
}
- if (pthread_create(&eth_data.sdu_writer,
- NULL,
- eth_ipcp_sdu_writer,
- NULL)) {
- ipcp_set_state(IPCP_INIT);
- goto fail_sdu_writer;
+ for (idx = 0; idx < IPCP_ETH_WR_THR; ++idx) {
+ if (pthread_create(&eth_data.packet_writer[idx],
+ NULL,
+ eth_ipcp_packet_writer,
+ NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_packet_writer;
+ }
}
#if defined(BUILD_ETH_DIX)
@@ -1407,10 +1472,17 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf)
return 0;
- fail_sdu_writer:
- pthread_cancel(eth_data.sdu_reader);
- pthread_join(eth_data.sdu_reader, NULL);
- fail_sdu_reader:
+ fail_packet_writer:
+ while (idx > 0) {
+ pthread_cancel(eth_data.packet_writer[--idx]);
+ pthread_join(eth_data.packet_writer[idx], NULL);
+ }
+ idx = IPCP_ETH_RD_THR;
+ fail_packet_reader:
+ while (idx > 0) {
+ pthread_cancel(eth_data.packet_reader[--idx]);
+ pthread_join(eth_data.packet_reader[idx], NULL);
+ }
pthread_cancel(eth_data.mgmt_handler);
pthread_join(eth_data.mgmt_handler, NULL);
fail_mgmt_handler:
@@ -1418,7 +1490,7 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf)
pthread_cancel(eth_data.if_monitor);
pthread_join(eth_data.if_monitor, NULL);
#endif
-#if !defined(HAVE_NETMAP)
+#if defined(__linux__) || !defined(HAVE_NETMAP)
fail_device:
#endif
#if defined(HAVE_NETMAP)
@@ -1509,7 +1581,7 @@ static int eth_ipcp_query(const uint8_t * hash)
static int eth_ipcp_flow_alloc(int fd,
const uint8_t * hash,
- qoscube_t cube)
+ qosspec_t qs)
{
#ifdef BUILD_ETH_LLC
uint8_t ssap = 0;
@@ -1521,11 +1593,6 @@ static int eth_ipcp_flow_alloc(int fd,
assert(hash);
- if (cube > QOS_CUBE_DATA) {
- log_dbg("Unsupported QoS requested.");
- return -1;
- }
-
if (!shim_data_dir_has(eth_data.shim_data, hash)) {
log_err("Destination unreachable.");
return -1;
@@ -1553,7 +1620,7 @@ static int eth_ipcp_flow_alloc(int fd,
#elif defined(BUILD_ETH_LLC)
ssap,
#endif
- hash, cube) < 0) {
+ hash, qs) < 0) {
#ifdef BUILD_ETH_LLC
pthread_rwlock_wrlock(&eth_data.flows_lock);
bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap);
@@ -1696,6 +1763,8 @@ static struct ipcp_ops eth_ops = {
int main(int argc,
char * argv[])
{
+ int i;
+
if (ipcp_init(argc, argv, &eth_ops) < 0)
goto fail_init;
@@ -1722,14 +1791,20 @@ int main(int argc,
ipcp_shutdown();
if (ipcp_get_state() == IPCP_SHUTDOWN) {
- pthread_cancel(eth_data.sdu_writer);
- pthread_cancel(eth_data.sdu_reader);
+ for (i = 0; i < IPCP_ETH_WR_THR; ++i)
+ pthread_cancel(eth_data.packet_writer[i]);
+ for (i = 0; i < IPCP_ETH_RD_THR; ++i)
+ pthread_cancel(eth_data.packet_reader[i]);
+
pthread_cancel(eth_data.mgmt_handler);
#ifdef __linux__
pthread_cancel(eth_data.if_monitor);
#endif
- pthread_join(eth_data.sdu_writer, NULL);
- pthread_join(eth_data.sdu_reader, NULL);
+ for (i = 0; i < IPCP_ETH_WR_THR; ++i)
+ pthread_join(eth_data.packet_writer[i], NULL);
+ for (i = 0; i < IPCP_ETH_RD_THR; ++i)
+ pthread_join(eth_data.packet_reader[i], NULL);
+
pthread_join(eth_data.mgmt_handler, NULL);
#ifdef __linux__
pthread_join(eth_data.if_monitor, NULL);