diff options
Diffstat (limited to 'src/ipcpd/eth/eth.c')
| -rw-r--r-- | src/ipcpd/eth/eth.c | 686 |
1 files changed, 399 insertions, 287 deletions
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 25ecdffb..c0e6d0ce 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -1,10 +1,10 @@ /* - * Ouroboros - Copyright (C) 2016 - 2018 + * Ouroboros - Copyright (C) 2016 - 2024 * * IPC processes over Ethernet * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as @@ -29,12 +29,15 @@ #define _DARWIN_C_SOURCE #elif defined(__FreeBSD__) #define __BSD_VISIBLE 1 +#elif defined (__linux__) || defined (__CYGWIN__) +#define _DEFAULT_SOURCE #else #define _POSIX_C_SOURCE 200112L #endif #include "config.h" +#include <ouroboros/endian.h> #include <ouroboros/hash.h> #include <ouroboros/errno.h> #include <ouroboros/list.h> @@ -44,15 +47,15 @@ #include <ouroboros/ipcp-dev.h> #include <ouroboros/fqueue.h> #include <ouroboros/logs.h> -#include <ouroboros/time_utils.h> +#include <ouroboros/time.h> #include <ouroboros/fccntl.h> +#include <ouroboros/pthread.h> #include "ipcp.h" #include "shim-data.h" #include <signal.h> #include <stdlib.h> -#include <pthread.h> #include <fcntl.h> #include <unistd.h> #include <string.h> @@ -93,8 +96,23 @@ #include <net/bpf.h> #endif -#define MAC_SIZE 6 +#ifdef __linux__ +#ifndef ETH_MAX_MTU /* In if_ether.h as of Linux 4.10. 
*/ +#define ETH_MAX_MTU 0xFFFFU +#endif /* ETH_MAX_MTU */ +#ifdef BUILD_ETH_DIX +#define ETH_MTU eth_data.mtu +#define ETH_MTU_MAX ETH_MAX_MTU +#else +#define ETH_MTU eth_data.mtu +#define ETH_MTU_MAX 1500 +#endif /* BUILD_ETH_DIX */ +#else /* __linux__ */ #define ETH_MTU 1500 +#define ETH_MTU_MAX ETH_MTU +#endif /* __linux__ */ + +#define MAC_SIZE 6 #define ETH_TYPE_LENGTH_SIZE sizeof(uint16_t) #define ETH_HEADER_SIZE (2 * MAC_SIZE + ETH_TYPE_LENGTH_SIZE) @@ -106,37 +124,53 @@ #define DIX_HEADER_SIZE (DIX_EID_SIZE + DIX_LENGTH_SIZE) #define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + DIX_HEADER_SIZE) #define MAX_EIDS (1 << (8 * DIX_EID_SIZE)) -#define ETH_MAX_SDU_SIZE (ETH_MTU - DIX_HEADER_SIZE) -#define ETH_FRAME_SIZE (ETH_HEADER_TOT_SIZE + ETH_MAX_SDU_SIZE) +#define ETH_MAX_PACKET_SIZE (ETH_MTU - DIX_HEADER_SIZE) +#define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX) #elif defined(BUILD_ETH_LLC) #define THIS_TYPE IPCP_ETH_LLC #define MGMT_SAP 0x01 #define LLC_HEADER_SIZE 3 #define ETH_HEADER_TOT_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE) #define MAX_SAPS 64 -#define ETH_MAX_SDU_SIZE (ETH_MTU - LLC_HEADER_SIZE) -#define ETH_FRAME_SIZE (ETH_HEADER_TOT_SIZE + ETH_MAX_SDU_SIZE) +#define ETH_MAX_PACKET_SIZE (ETH_MTU - LLC_HEADER_SIZE) +#define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX) #endif -#define ALLOC_TIMEO 10 /* ms */ #define NAME_QUERY_TIMEO 2000 /* ms */ #define MGMT_TIMEO 100 /* ms */ +#define MGMT_FRAME_SIZE 2048 #define FLOW_REQ 0 #define FLOW_REPLY 1 #define NAME_QUERY_REQ 2 #define NAME_QUERY_REPLY 3 +struct ipcp ipcpi; + struct mgmt_msg { - uint8_t code; #if defined(BUILD_ETH_DIX) uint16_t seid; uint16_t deid; #elif defined(BUILD_ETH_LLC) uint8_t ssap; uint8_t dsap; + /* QoS here for alignment */ + uint8_t code; + uint8_t availability; +#endif + /* QoS parameters from spec, aligned */ + uint32_t loss; + uint64_t bandwidth; + uint32_t ber; + uint32_t max_gap; + uint32_t delay; + uint32_t timeout; + uint16_t cypher_s; + uint8_t in_order; +#if defined 
(BUILD_ETH_DIX) + uint8_t code; + uint8_t availability; #endif - uint8_t qoscube; int8_t response; } __attribute__((packed)); @@ -169,12 +203,16 @@ struct ef { struct mgmt_frame { struct list_head next; uint8_t r_addr[MAC_SIZE]; - uint8_t buf[ETH_FRAME_SIZE]; + uint8_t buf[MGMT_FRAME_SIZE]; + size_t len; }; struct { struct shim_data * shim_data; - +#ifdef __linux__ + int mtu; + int if_idx; +#endif #if defined(HAVE_NETMAP) struct nm_desc * nmd; uint8_t hw_addr[MAC_SIZE]; @@ -195,11 +233,10 @@ struct { #endif struct ef * fd_to_ef; fset_t * np1_flows; - fqueue_t * fq; pthread_rwlock_t flows_lock; - pthread_t sdu_writer; - pthread_t sdu_reader; + pthread_t packet_writer[IPCP_ETH_WR_THR]; + pthread_t packet_reader[IPCP_ETH_RD_THR]; #ifdef __linux__ pthread_t if_monitor; @@ -240,10 +277,6 @@ static int eth_data_init(void) if (eth_data.np1_flows == NULL) goto fail_np1_flows; - eth_data.fq = fqueue_create(); - if (eth_data.fq == NULL) - goto fail_fq; - for (i = 0; i < SYS_MAX_FLOWS; ++i) { #if defined(BUILD_ETH_DIX) eth_data.fd_to_ef[i].r_eid = -1; @@ -291,8 +324,6 @@ static int eth_data_init(void) fail_flows_lock: shim_data_destroy(eth_data.shim_data); fail_shim_data: - fqueue_destroy(eth_data.fq); - fail_fq: fset_destroy(eth_data.np1_flows); fail_np1_flows: #ifdef BUILD_ETH_LLC @@ -319,7 +350,6 @@ static void eth_data_fini(void) pthread_mutex_destroy(&eth_data.mgmt_lock); pthread_rwlock_destroy(&eth_data.flows_lock); shim_data_destroy(eth_data.shim_data); - fqueue_destroy(eth_data.fq); fset_destroy(eth_data.np1_flows); #ifdef BUILD_ETH_LLC bmp_destroy(eth_data.saps); @@ -355,10 +385,15 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, uint8_t cf = 0x03; #endif struct eth_frame * e_frame; +#ifdef HAVE_RAW_SOCKETS + fd_set fds; + + FD_ZERO(&fds); +#endif assert(frame); - if (len > ETH_MAX_SDU_SIZE) + if (len > (size_t) ETH_MAX_PACKET_SIZE) return -1; e_frame = (struct eth_frame *) frame; @@ -399,13 +434,20 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, }
#elif defined(HAVE_RAW_SOCKETS) + FD_SET(eth_data.s_fd, &fds); + if (select(eth_data.s_fd + 1, NULL, &fds, NULL, NULL) < 0) { + log_dbg("Select() failed: %s.", strerror(errno)); + return -1; + } + assert(FD_ISSET(eth_data.s_fd, &fds)); + if (sendto(eth_data.s_fd, frame, frame_len, 0, (struct sockaddr *) &eth_data.device, sizeof(eth_data.device)) <= 0) { - log_dbg("Failed to send message."); + log_dbg("Failed to send message: %s.", strerror(errno)); return -1; } #endif /* HAVE_NETMAP */ @@ -413,14 +455,15 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, return 0; } -static int eth_ipcp_alloc(const uint8_t * dst_addr, +static int eth_ipcp_alloc(const uint8_t * dst_addr, #if defined(BUILD_ETH_DIX) - uint16_t eid, + uint16_t eid, #elif defined(BUILD_ETH_LLC) - uint8_t ssap, + uint8_t ssap, #endif - const uint8_t * hash, - qoscube_t cube) + const uint8_t * hash, + qosspec_t qs, + const buffer_t * data) { uint8_t * buf; struct mgmt_msg * msg; @@ -429,7 +472,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, len = sizeof(*msg) + ipcp_dir_hash_len(); - buf = malloc(len + ETH_HEADER_TOT_SIZE); + buf = malloc(len + ETH_HEADER_TOT_SIZE + data->len); if (buf == NULL) return -1; @@ -440,9 +483,20 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, #elif defined(BUILD_ETH_LLC) msg->ssap = ssap; #endif - msg->qoscube = cube; + + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); + msg->cypher_s = hton16(qs.cypher_s); + msg->timeout = hton32(qs.timeout); memcpy(msg + 1, hash, ipcp_dir_hash_len()); + if (data->len > 0) + memcpy(buf + len + ETH_HEADER_TOT_SIZE, data->data, data->len); ret = eth_ipcp_send_frame(dst_addr, #if defined(BUILD_ETH_DIX) @@ -451,26 +505,27 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, reverse_bits(MGMT_SAP), reverse_bits(MGMT_SAP), #endif - buf,
len); + buf, len + data->len); free(buf); return ret; } -static int eth_ipcp_alloc_resp(uint8_t * dst_addr, +static int eth_ipcp_alloc_resp(uint8_t * dst_addr, #if defined(BUILD_ETH_DIX) - uint16_t seid, - uint16_t deid, + uint16_t seid, + uint16_t deid, #elif defined(BUILD_ETH_LLC) - uint8_t ssap, - uint8_t dsap, + uint8_t ssap, + uint8_t dsap, #endif - int response) + int response, + const buffer_t * data) { struct mgmt_msg * msg; uint8_t * buf; - buf = malloc(sizeof(*msg) + ETH_HEADER_TOT_SIZE); + buf = malloc(sizeof(*msg) + ETH_HEADER_TOT_SIZE + data->len); if (buf == NULL) return -1; @@ -486,6 +541,9 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, #endif msg->response = response; + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); + if (eth_ipcp_send_frame(dst_addr, #if defined(BUILD_ETH_DIX) MGMT_EID, @@ -493,7 +551,7 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, reverse_bits(MGMT_SAP), reverse_bits(MGMT_SAP), #endif - buf, sizeof(*msg))) { + buf, sizeof(*msg) + data->len)) { free(buf); return -1; } @@ -503,40 +561,20 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, return 0; } -static int eth_ipcp_req(uint8_t * r_addr, +static int eth_ipcp_req(uint8_t * r_addr, #if defined(BUILD_ETH_DIX) - uint16_t r_eid, + uint16_t r_eid, #elif defined(BUILD_ETH_LLC) - uint8_t r_sap, + uint8_t r_sap, #endif - const uint8_t * dst, - qoscube_t cube) + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { - struct timespec ts = {0, ALLOC_TIMEO * MILLION}; - struct timespec abstime; - int fd; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } + int fd; - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_dbg("Won't allocate over non-operational IPCP."); - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; - 
} - - /* reply to IRM, called under lock to prevent race */ - fd = ipcp_flow_req_arr(getpid(), dst, ipcp_dir_hash_len(), cube); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_ETH_MPL, data); if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); return -1; } @@ -551,32 +589,28 @@ static int eth_ipcp_req(uint8_t * r_addr, pthread_rwlock_unlock(&eth_data.flows_lock); - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - #if defined(BUILD_ETH_DIX) log_dbg("New flow request, fd %d, remote endpoint %d.", fd, r_eid); #elif defined(BUILD_ETH_LLC) log_dbg("New flow request, fd %d, remote SAP %d.", fd, r_sap); #endif - return 0; } -static int eth_ipcp_alloc_reply(uint8_t * r_addr, +static int eth_ipcp_alloc_reply(uint8_t * r_addr, #if defined(BUILD_ETH_DIX) - uint16_t seid, - uint16_t deid, + uint16_t seid, + uint16_t deid, #elif defined(BUILD_ETH_LLC) - uint8_t ssap, - int dsap, + uint8_t ssap, + int dsap, #endif - int response) + int response, + const buffer_t * data) { - int ret = 0; - int fd = -1; + int ret = 0; + int fd = -1; + time_t mpl = IPCP_ETH_MPL; pthread_rwlock_wrlock(&eth_data.flows_lock); @@ -611,11 +645,12 @@ static int eth_ipcp_alloc_reply(uint8_t * r_addr, #elif defined(BUILD_ETH_LLC) log_dbg("Flow reply, fd %d, SSAP %d, DSAP %d.", fd, ssap, dsap); #endif - if ((ret = ipcp_flow_alloc_reply(fd, response)) < 0) + if ((ret = ipcp_flow_alloc_reply(fd, response, mpl, data)) < 0) { + log_err("Failed to reply to flow allocation."); return -1; + } return ret; - } static int eth_ipcp_name_query_req(const uint8_t * hash, @@ -671,14 +706,35 @@ static int eth_ipcp_name_query_reply(const uint8_t * hash, } static int eth_ipcp_mgmt_frame(const uint8_t * buf, + size_t len, uint8_t * r_addr) { struct mgmt_msg * msg; + size_t msg_len; + qosspec_t qs; + buffer_t data; msg = (struct mgmt_msg *) buf; switch (msg->code) { case FLOW_REQ: + msg_len = sizeof(*msg) +
ipcp_dir_hash_len(); + + assert(len >= msg_len); + + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + qs.cypher_s = ntoh16(msg->cypher_s); + qs.timeout = ntoh32(msg->timeout); + + data.data = (uint8_t *) buf + msg_len; + data.len = len - msg_len; + if (shim_data_reg_has(eth_data.shim_data, buf + sizeof(*msg))) { eth_ipcp_req(r_addr, @@ -688,10 +744,16 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, msg->ssap, #endif buf + sizeof(*msg), - msg->qoscube); + qs, + &data); } break; case FLOW_REPLY: + assert(len >= sizeof(*msg)); + + data.data = (uint8_t *) buf + sizeof(*msg); + data.len = len - sizeof(*msg); + eth_ipcp_alloc_reply(r_addr, #if defined(BUILD_ETH_DIX) ntohs(msg->seid), @@ -700,7 +762,8 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, msg->ssap, msg->dsap, #endif - msg->response); + msg->response, + &data); break; case NAME_QUERY_REQ: eth_ipcp_name_query_req(buf + sizeof(*msg), r_addr); @@ -718,19 +781,15 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, static void * eth_ipcp_mgmt_handler(void * o) { - int ret; - struct timespec timeout = {(MGMT_TIMEO / 1000), - (MGMT_TIMEO % 1000) * MILLION}; - struct timespec abstime; - struct mgmt_frame * frame; - (void) o; - pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, - (void *) &eth_data.mgmt_lock); + pthread_cleanup_push(__cleanup_mutex_unlock, &eth_data.mgmt_lock); while (true) { - ret = 0; + int ret = 0; + struct timespec timeout = TIMESPEC_INIT_MS(MGMT_TIMEO); + struct timespec abstime; + struct mgmt_frame * frame = NULL; clock_gettime(PTHREAD_COND_CLOCK, &abstime); ts_add(&abstime, &timeout, &abstime); @@ -742,23 +801,19 @@ static void * eth_ipcp_mgmt_handler(void * o) ret = -pthread_cond_timedwait(&eth_data.mgmt_cond, &eth_data.mgmt_lock, &abstime); + if (ret != -ETIMEDOUT) + frame =
list_first_entry((&eth_data.mgmt_frames), + struct mgmt_frame, next); + if (frame != NULL) + list_del(&frame->next); - if (ret == -ETIMEDOUT) { - pthread_mutex_unlock(&eth_data.mgmt_lock); - continue; - } + pthread_mutex_unlock(&eth_data.mgmt_lock); - frame = list_first_entry((&eth_data.mgmt_frames), - struct mgmt_frame, next); - if (frame == NULL) { - pthread_mutex_unlock(&eth_data.mgmt_lock); + if (frame == NULL) continue; - } - list_del(&frame->next); - pthread_mutex_unlock(&eth_data.mgmt_lock); + eth_ipcp_mgmt_frame(frame->buf, frame->len, frame->r_addr); - eth_ipcp_mgmt_frame(frame->buf, frame->r_addr); free(frame); } @@ -767,7 +822,7 @@ static void * eth_ipcp_mgmt_handler(void * o) return (void *) 0; } -static void * eth_ipcp_sdu_reader(void * o) +static void * eth_ipcp_packet_reader(void * o) { uint8_t br_addr[MAC_SIZE]; #if defined(BUILD_ETH_DIX) @@ -804,7 +859,7 @@ static void * eth_ipcp_sdu_reader(void * o) buf = nm_nextpkt(eth_data.nmd, &hdr); if (buf == NULL) { - log_err("Bad read from netmap device."); + log_dbg("Bad read from netmap device."); continue; } #else @@ -823,12 +878,19 @@ static void * eth_ipcp_sdu_reader(void * o) if (select(eth_data.s_fd + 1, &fds, NULL, NULL, NULL) < 0) continue; assert(FD_ISSET(eth_data.s_fd, &fds)); - if (ipcp_sdb_reserve(&sdb, ETH_FRAME_SIZE)) + if (ipcp_sdb_reserve(&sdb, ETH_MTU)) continue; - buf = shm_du_buff_head(sdb); - frame_len = recv(eth_data.s_fd, buf, ETH_FRAME_SIZE, 0); + buf = shm_du_buff_head_alloc(sdb, ETH_HEADER_TOT_SIZE); + if (buf == NULL) { + log_dbg("Failed to allocate header."); + ipcp_sdb_release(sdb); + continue; + } + frame_len = recv(eth_data.s_fd, buf, + ETH_MTU + ETH_HEADER_TOT_SIZE, 0); #endif if (frame_len <= 0) { + log_dbg("Failed to receive frame."); ipcp_sdb_release(sdb); continue; } @@ -855,20 +917,14 @@ static void * eth_ipcp_sdu_reader(void * o) #endif length = ntohs(e_frame->length); #if defined(BUILD_ETH_DIX) - if (e_frame->ethertype != eth_data.ethertype) { -#ifndef HAVE_NETMAP -
ipcp_sdb_release(sdb); -#endif - continue; - } + if (e_frame->ethertype != eth_data.ethertype) + goto fail_frame; deid = ntohs(e_frame->eid); if (deid == MGMT_EID) { #elif defined (BUILD_ETH_LLC) - if (length > 0x05FF) {/* DIX */ - ipcp_sdb_release(sdb); - continue; - } + if (length > 0x05FF) /* DIX */ + goto fail_frame; length -= LLC_HEADER_SIZE; @@ -877,26 +933,22 @@ static void * eth_ipcp_sdu_reader(void * o) if (ssap == MGMT_SAP && dsap == MGMT_SAP) { #endif - pthread_mutex_lock(&eth_data.mgmt_lock); + ipcp_sdb_release(sdb); /* No need for the N+1 buffer. */ frame = malloc(sizeof(*frame)); if (frame == NULL) { - pthread_mutex_unlock(&eth_data.mgmt_lock); -#ifndef HAVE_NETMAP - ipcp_sdb_release(sdb); -#endif - continue; + log_err("Failed to allocate frame."); + goto fail_frame; } memcpy(frame->buf, &e_frame->payload, length); memcpy(frame->r_addr, e_frame->src_hwaddr, MAC_SIZE); + frame->len = length; + + pthread_mutex_lock(&eth_data.mgmt_lock); list_add(&frame->next, &eth_data.mgmt_frames); pthread_cond_signal(&eth_data.mgmt_cond); pthread_mutex_unlock(&eth_data.mgmt_lock); - -#ifndef HAVE_NETMAP - ipcp_sdb_release(sdb); -#endif } else { pthread_rwlock_rdlock(&eth_data.flows_lock); @@ -907,10 +959,7 @@ static void * eth_ipcp_sdu_reader(void * o) #endif if (fd < 0) { pthread_rwlock_unlock(&eth_data.flows_lock); -#ifndef HAVE_NETMAP - ipcp_sdb_release(sdb); -#endif - continue; + goto fail_frame; } #ifdef BUILD_ETH_LLC @@ -918,10 +967,7 @@ static void * eth_ipcp_sdu_reader(void * o) || memcmp(eth_data.fd_to_ef[fd].r_addr, e_frame->src_hwaddr, MAC_SIZE)) { pthread_rwlock_unlock(&eth_data.flows_lock); -#ifndef HAVE_NETMAP - ipcp_sdb_release(sdb); -#endif - continue; + goto fail_frame; } #endif pthread_rwlock_unlock(&eth_data.flows_lock); @@ -929,9 +975,20 @@ static void * eth_ipcp_sdu_reader(void * o) #ifndef HAVE_NETMAP shm_du_buff_head_release(sdb, ETH_HEADER_TOT_SIZE); shm_du_buff_truncate(sdb, length); - ipcp_flow_write(fd, sdb); #else - flow_write(fd, &e_frame->payload, length); + if
(ipcp_sdb_reserve(&sdb, length)) + continue; + + buf = shm_du_buff_head(sdb); + memcpy(buf, &e_frame->payload, length); +#endif + if (np1_flow_write(fd, sdb) < 0) + ipcp_sdb_release(sdb); + + continue; + fail_frame: +#ifndef HAVE_NETMAP + ipcp_sdb_release(sdb); #endif } } @@ -939,7 +996,12 @@ static void * eth_ipcp_sdu_reader(void * o) return (void *) 0; } -static void * eth_ipcp_sdu_writer(void * o) +static void cleanup_writer(void * o) +{ + fqueue_destroy((fqueue_t *) o); +} + +static void * eth_ipcp_packet_writer(void * o) { int fd; struct shm_du_buff * sdb; @@ -952,27 +1014,39 @@ static void * eth_ipcp_sdu_writer(void * o) #endif uint8_t r_addr[MAC_SIZE]; + fqueue_t * fq; + + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; + (void) o; ipcp_lock_to_core(); + pthread_cleanup_push(cleanup_writer, fq); + while (true) { - fevent(eth_data.np1_flows, eth_data.fq, NULL); + fevent(eth_data.np1_flows, fq, NULL); + while ((fd = fqueue_next(fq)) >= 0) { + if (fqueue_type(fq) != FLOW_PKT) + continue; - pthread_rwlock_rdlock(&eth_data.flows_lock); - while ((fd = fqueue_next(eth_data.fq)) >= 0) { - if (ipcp_flow_read(fd, &sdb)) { - log_err("Bad read from fd %d.", fd); + if (np1_flow_read(fd, &sdb)) { + log_dbg("Bad read from fd %d.", fd); continue; } - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + len = shm_du_buff_len(sdb); if (shm_du_buff_head_alloc(sdb, ETH_HEADER_TOT_SIZE) == NULL) { - log_err("Failed to allocate header."); + log_dbg("Failed to allocate header."); ipcp_sdb_release(sdb); + continue; } + + pthread_rwlock_rdlock(&eth_data.flows_lock); #if defined(BUILD_ETH_DIX) deid = eth_data.fd_to_ef[fd].r_eid; #elif defined(BUILD_ETH_LLC) @@ -983,19 +1057,23 @@ static void * eth_ipcp_sdu_writer(void * o) eth_data.fd_to_ef[fd].r_addr, MAC_SIZE); - eth_ipcp_send_frame(r_addr, + pthread_rwlock_unlock(&eth_data.flows_lock); + + if (eth_ipcp_send_frame(r_addr, #if defined(BUILD_ETH_DIX) deid, #elif defined(BUILD_ETH_LLC) dsap, ssap, #endif shm_du_buff_head(sdb),
- len); + len)) + log_dbg("Failed to send frame."); ipcp_sdb_release(sdb); } - pthread_rwlock_unlock(&eth_data.flows_lock); } + pthread_cleanup_pop(true); + return (void *) 1; } @@ -1054,12 +1132,6 @@ static void change_flows_state(bool up) pthread_rwlock_unlock(&eth_data.flows_lock); } -static void close_ptr(void * o) -{ - close(*((int *) o)); -} - - static void * eth_ipcp_if_monitor(void * o) { int fd; @@ -1080,7 +1152,7 @@ static void * eth_ipcp_if_monitor(void * o) return (void *) -1; } - pthread_cleanup_push(close_ptr, &fd); + pthread_cleanup_push(__cleanup_close_ptr, &fd); while (true) { status = recvmsg(fd, &msg, 0); @@ -1106,7 +1178,7 @@ static void * eth_ipcp_if_monitor(void * o) ifi = NLMSG_DATA(h); /* Not our interface */ - if (ifi->ifi_index != eth_data.device.sll_ifindex) + if (ifi->ifi_index != eth_data.if_idx) continue; if (ifi->ifi_flags & IFF_UP) { @@ -1163,23 +1235,35 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) #elif defined(__linux__) int skfd; #endif +#ifndef SHM_RDRB_MULTI_BLOCK + size_t maxsz; +#endif +#if defined(HAVE_RAW_SOCKETS) + #if defined(IPCP_ETH_QDISC_BYPASS) + int qdisc_bypass = 1; + #endif /* ENABLE_QDISC_BYPASS */ + int flags; +#endif assert(conf); assert(conf->type == THIS_TYPE); - if (conf->dev == NULL) { - log_err("Device name is NULL."); + ipcpi.dir_hash_algo = (enum hash_algo) conf->layer_info.dir_hash_algo; + strcpy(ipcpi.layer_name, conf->layer_info.name); + + if (strlen(conf->eth.dev) >= IFNAMSIZ) { + log_err("Invalid device name: %s.", conf->eth.dev); return -1; } memset(&ifr, 0, sizeof(ifr)); - memcpy(ifr.ifr_name, conf->dev, strlen(conf->dev)); + strcpy(ifr.ifr_name, conf->eth.dev); #ifdef BUILD_ETH_DIX - if (conf->ethertype < 0x0600 || conf->ethertype == 0xFFFF) { - log_err("Invalid Ethertype."); + if (conf->eth.ethertype < 0x0600 || conf->eth.ethertype == 0xFFFF) { + log_err("Invalid Ethertype: %d.", conf->eth.ethertype); return -1; } - eth_data.ethertype = htons(conf->ethertype); + eth_data.ethertype =
htons(conf->eth.ethertype); #endif #if defined(__FreeBSD__) || defined(__APPLE__) @@ -1189,9 +1273,9 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) } for (ifa = ifaddr, idx = 0; ifa != NULL; ifa = ifa->ifa_next, ++idx) { - if (strcmp(ifa->ifa_name, conf->dev)) + if (strcmp(ifa->ifa_name, conf->eth.dev)) continue; - log_dbg("Interface %s found.", conf->dev); + log_dbg("Interface %s found.", conf->eth.dev); #if defined(HAVE_NETMAP) || defined(HAVE_BPF) memcpy(eth_data.hw_addr, @@ -1217,25 +1301,51 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) return -1; } + if (ioctl(skfd, SIOCGIFMTU, &ifr)) { + log_err("Failed to get MTU."); + close(skfd); + return -1; + } + + log_dbg("Device MTU is %d.", ifr.ifr_mtu); + + eth_data.mtu = MIN((int) ETH_MTU_MAX, ifr.ifr_mtu); + if (memcmp(conf->eth.dev, "lo", 2) == 0 && + eth_data.mtu > IPCP_ETH_LO_MTU) { + log_dbg("Using loopback interface. MTU restricted to %d.", + IPCP_ETH_LO_MTU); + eth_data.mtu = IPCP_ETH_LO_MTU; + } + +#ifndef SHM_RDRB_MULTI_BLOCK + maxsz = SHM_RDRB_BLOCK_SIZE - 5 * sizeof(size_t) - + (DU_BUFF_HEADSPACE + DU_BUFF_TAILSPACE); + if ((size_t) eth_data.mtu > maxsz ) { + log_dbg("Layer MTU truncated to shm block size."); + eth_data.mtu = maxsz; + } +#endif + log_dbg("Layer MTU is %d.", eth_data.mtu); + if (ioctl(skfd, SIOCGIFHWADDR, &ifr)) { - log_err("Failed to ioctl."); + log_err("Failed to get hwaddr."); close(skfd); return -1; } close(skfd); - idx = if_nametoindex(conf->dev); + idx = if_nametoindex(conf->eth.dev); if (idx == 0) { log_err("Failed to retrieve interface index."); - close(skfd); return -1; } + eth_data.if_idx = idx; #endif /* __FreeBSD__ */ #if defined(HAVE_NETMAP) strcpy(ifn, "netmap:"); - strcat(ifn, conf->dev); + strcat(ifn, conf->eth.dev); eth_data.nmd = nm_open(ifn, NULL, 0, NULL); if (eth_data.nmd == NULL) { @@ -1305,66 +1415,87 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) if (eth_data.s_fd < 0) { log_err("Failed to create socket."); - 
return -1; + goto fail_socket; } - if (bind(eth_data.s_fd, (struct sockaddr *) &eth_data.device, - sizeof(eth_data.device))) { - log_err("Failed to bind socket to interface"); + flags = fcntl(eth_data.s_fd, F_GETFL, 0); + if (flags < 0) { + log_err("Failed to get flags."); goto fail_device; } -#endif /* HAVE_NETMAP */ - ipcp_set_state(IPCP_OPERATIONAL); + if (fcntl(eth_data.s_fd, F_SETFL, flags | O_NONBLOCK)) { + log_err("Failed to set socket non-blocking."); + goto fail_device; + } -#ifdef __linux__ - if (pthread_create(&eth_data.if_monitor, - NULL, - eth_ipcp_if_monitor, - NULL)) { - ipcp_set_state(IPCP_INIT); + #if defined(IPCP_ETH_QDISC_BYPASS) + if (setsockopt(eth_data.s_fd, SOL_PACKET, PACKET_QDISC_BYPASS, + &qdisc_bypass, sizeof(qdisc_bypass))) { + log_info("Qdisc bypass not supported."); + } + #endif + + if (bind(eth_data.s_fd, (struct sockaddr *) &eth_data.device, + sizeof(eth_data.device)) < 0) { + log_err("Failed to bind socket to interface."); + goto fail_device; + } +#endif /* HAVE_NETMAP */ +#if defined(__linux__) + if (pthread_create(&eth_data.if_monitor, NULL, + eth_ipcp_if_monitor, NULL)) { + log_err("Failed to create monitor thread: %s.", + strerror(errno)); goto fail_device; } #endif - if (pthread_create(&eth_data.mgmt_handler, - NULL, - eth_ipcp_mgmt_handler, - NULL)) { - ipcp_set_state(IPCP_INIT); + if (pthread_create(&eth_data.mgmt_handler, NULL, + eth_ipcp_mgmt_handler, NULL)) { + log_err("Failed to create mgmt handler thread: %s.", + strerror(errno)); goto fail_mgmt_handler; } - if (pthread_create(&eth_data.sdu_reader, - NULL, - eth_ipcp_sdu_reader, - NULL)) { - ipcp_set_state(IPCP_INIT); - goto fail_sdu_reader; + for (idx = 0; idx < IPCP_ETH_RD_THR; ++idx) { + if (pthread_create(&eth_data.packet_reader[idx], NULL, + eth_ipcp_packet_reader, NULL)) { + log_err("Failed to create packet reader thread: %s", + strerror(errno)); + goto fail_packet_reader; + } } - if (pthread_create(&eth_data.sdu_writer, - NULL, - eth_ipcp_sdu_writer, - NULL)) { - ipcp_set_state(IPCP_INIT);
- goto fail_sdu_writer; + for (idx = 0; idx < IPCP_ETH_WR_THR; ++idx) { + if (pthread_create(&eth_data.packet_writer[idx], NULL, + eth_ipcp_packet_writer, NULL)) { + log_err("Failed to create packet writer thread: %s", + strerror(errno)); + goto fail_packet_writer; + } } #if defined(BUILD_ETH_DIX) log_dbg("Bootstrapped IPCP over DIX Ethernet with pid %d " - "and Ethertype 0x%X.", getpid(), conf->ethertype); + "and Ethertype 0x%X.", getpid(), conf->eth.ethertype); #elif defined(BUILD_ETH_LLC) log_dbg("Bootstrapped IPCP over Ethernet with LLC with pid %d.", getpid()); #endif - return 0; - fail_sdu_writer: - pthread_cancel(eth_data.sdu_reader); - pthread_join(eth_data.sdu_reader, NULL); - fail_sdu_reader: + fail_packet_writer: + while (idx > 0) { + pthread_cancel(eth_data.packet_writer[--idx]); + pthread_join(eth_data.packet_writer[idx], NULL); + } + idx = IPCP_ETH_RD_THR; + fail_packet_reader: + while (idx > 0) { + pthread_cancel(eth_data.packet_reader[--idx]); + pthread_join(eth_data.packet_reader[idx], NULL); + } pthread_cancel(eth_data.mgmt_handler); pthread_join(eth_data.mgmt_handler, NULL); fail_mgmt_handler: @@ -1372,7 +1503,7 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) pthread_cancel(eth_data.if_monitor); pthread_join(eth_data.if_monitor, NULL); #endif -#if !defined(HAVE_NETMAP) +#if defined(__linux__) || !defined(HAVE_NETMAP) fail_device: #endif #if defined(HAVE_NETMAP) @@ -1381,6 +1512,7 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) close(eth_data.bpf); #elif defined(HAVE_RAW_SOCKETS) close(eth_data.s_fd); + fail_socket: #endif return -1; } @@ -1388,13 +1520,11 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) static int eth_ipcp_reg(const uint8_t * hash) { if (shim_data_reg_add_entry(eth_data.shim_data, hash)) { - log_err("Failed to add " HASH_FMT " to local registry.", - HASH_VAL(hash)); + log_err("Failed to add " HASH_FMT32 " to local registry.", + HASH_VAL32(hash)); return -1; } - log_dbg("Registered
" HASH_FMT ".", HASH_VAL(hash)); - return 0; } @@ -1408,8 +1538,7 @@ static int eth_ipcp_unreg(const uint8_t * hash) static int eth_ipcp_query(const uint8_t * hash) { uint8_t r_addr[MAC_SIZE]; - struct timespec timeout = {(NAME_QUERY_TIMEO / 1000), - (NAME_QUERY_TIMEO % 1000) * MILLION}; + struct timespec timeout = TIMESPEC_INIT_MS(NAME_QUERY_TIMEO); struct dir_query * query; int ret; uint8_t * buf; @@ -1461,9 +1590,10 @@ static int eth_ipcp_query(const uint8_t * hash) return ret; } -static int eth_ipcp_flow_alloc(int fd, - const uint8_t * hash, - qoscube_t cube) +static int eth_ipcp_flow_alloc(int fd, + const uint8_t * hash, + qosspec_t qs, + const buffer_t * data) { #ifdef BUILD_ETH_LLC uint8_t ssap = 0; @@ -1471,17 +1601,11 @@ static int eth_ipcp_flow_alloc(int fd, uint8_t r_addr[MAC_SIZE]; uint64_t addr = 0; - log_dbg("Allocating flow to " HASH_FMT ".", HASH_VAL(hash)); - assert(hash); - if (cube != QOS_CUBE_BE) { - log_dbg("Unsupported QoS requested."); - return -1; - } - if (!shim_data_dir_has(eth_data.shim_data, hash)) { - log_err("Destination unreachable."); + log_err("Destination "HASH_FMT32 "unreachable.", + HASH_VAL32(hash)); return -1; } addr = shim_data_dir_get_addr(eth_data.shim_data, hash); @@ -1491,6 +1615,7 @@ static int eth_ipcp_flow_alloc(int fd, ssap = bmp_allocate(eth_data.saps); if (!bmp_is_id_valid(eth_data.saps, ssap)) { pthread_rwlock_unlock(&eth_data.flows_lock); + log_err("Failed to allocate SSAP."); return -1; } @@ -1507,31 +1632,31 @@ static int eth_ipcp_flow_alloc(int fd, #elif defined(BUILD_ETH_LLC) ssap, #endif - hash, cube) < 0) { + hash, + qs, + data) < 0) { #ifdef BUILD_ETH_LLC pthread_rwlock_wrlock(&eth_data.flows_lock); bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap); eth_data.fd_to_ef[fd].sap = -1; eth_data.ef_to_fd[ssap] = -1; pthread_rwlock_unlock(&eth_data.flows_lock); + log_err("Failed to allocate with peer."); #endif return -1; } fset_add(eth_data.np1_flows, fd); -#if defined(BUILD_ETH_DIX) - log_dbg("Pending flow with fd
%d.", fd); -#elif defined(BUILD_ETH_LLC) - log_dbg("Pending flow with fd %d on SAP %d.", fd, ssap); +#if defined(BUILD_ETH_LLC) + log_dbg("Assigned SAP %d for fd %d.", ssap, fd); #endif return 0; } -static int eth_ipcp_flow_alloc_resp(int fd, - int response) +static int eth_ipcp_flow_alloc_resp(int fd, + int response, + const buffer_t * data) { - struct timespec ts = {0, ALLOC_TIMEO * MILLION}; - struct timespec abstime; #if defined(BUILD_ETH_DIX) uint16_t r_eid; #elif defined(BUILD_ETH_LLC) @@ -1540,27 +1665,11 @@ static int eth_ipcp_flow_alloc_resp(int fd, #endif uint8_t r_addr[MAC_SIZE]; - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); + if (ipcp_wait_flow_resp(fd) < 0) { + log_err("Failed to wait for flow response."); return -1; } - ipcpi.alloc_id = -1; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - pthread_rwlock_wrlock(&eth_data.flows_lock); #if defined(BUILD_ETH_DIX) r_eid = eth_data.fd_to_ef[fd].r_eid; @@ -1568,6 +1677,7 @@ static int eth_ipcp_flow_alloc_resp(int fd, ssap = bmp_allocate(eth_data.saps); if (!bmp_is_id_valid(eth_data.saps, ssap)) { pthread_rwlock_unlock(&eth_data.flows_lock); + log_err("Failed to allocate SSAP."); return -1; } @@ -1585,20 +1695,20 @@ static int eth_ipcp_flow_alloc_resp(int fd, #elif defined(BUILD_ETH_LLC) ssap, r_sap, #endif - response) < 0) { + response, + data) < 0) { #ifdef BUILD_ETH_LLC pthread_rwlock_wrlock(&eth_data.flows_lock); bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap); pthread_rwlock_unlock(&eth_data.flows_lock); #endif + log_err("Failed to respond to peer."); return -1; } fset_add(eth_data.np1_flows, fd); -#if defined(BUILD_ETH_DIX) - log_dbg("Accepted
flow, fd %d.", fd); -#elif defined(BUILD_ETH_LLC) - log_dbg("Accepted flow, fd %d, SAP %d.", fd, (uint8_t)ssap); +#if defined(BUILD_ETH_LLC) + log_dbg("Assigned SAP %d for fd %d.", ssap, fd); #endif return 0; } @@ -1610,10 +1720,10 @@ static int eth_ipcp_flow_dealloc(int fd) #endif ipcp_flow_fini(fd); - pthread_rwlock_wrlock(&eth_data.flows_lock); - fset_del(eth_data.np1_flows, fd); + pthread_rwlock_wrlock(&eth_data.flows_lock); + #if defined(BUILD_ETH_DIX) eth_data.fd_to_ef[fd].r_eid = -1; #elif defined BUILD_ETH_LLC @@ -1627,9 +1737,7 @@ static int eth_ipcp_flow_dealloc(int fd) pthread_rwlock_unlock(&eth_data.flows_lock); - flow_dealloc(fd); - - log_dbg("Flow with fd %d deallocated.", fd); + ipcp_flow_dealloc(fd); return 0; } @@ -1643,6 +1751,7 @@ static struct ipcp_ops eth_ops = { .ipcp_unreg = eth_ipcp_unreg, .ipcp_query = eth_ipcp_query, .ipcp_flow_alloc = eth_ipcp_flow_alloc, + .ipcp_flow_join = NULL, .ipcp_flow_alloc_resp = eth_ipcp_flow_alloc_resp, .ipcp_flow_dealloc = eth_ipcp_flow_dealloc }; @@ -1650,8 +1759,7 @@ static struct ipcp_ops eth_ops = { int main(int argc, char * argv[]) { - if (ipcp_init(argc, argv, &eth_ops) < 0) - goto fail_init; + int i; if (eth_data_init() < 0) { #if defined(BUILD_ETH_DIX) @@ -1662,47 +1770,51 @@ int main(int argc, goto fail_data_init; } - if (ipcp_boot() < 0) { - log_err("Failed to boot IPCP."); - goto fail_boot; + if (ipcp_init(argc, argv, &eth_ops, THIS_TYPE) < 0) { + log_err("Failed to initialize IPCP."); + goto fail_init; } - if (ipcp_create_r(getpid(), 0)) { - log_err("Failed to notify IRMd we are initialized."); - ipcp_set_state(IPCP_NULL); - goto fail_create_r; + if (ipcp_start() < 0) { + log_err("Failed to start IPCP."); + goto fail_start; } - ipcp_shutdown(); + ipcp_sigwait(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - pthread_cancel(eth_data.sdu_writer); - pthread_cancel(eth_data.sdu_reader); + for (i = 0; i < IPCP_ETH_WR_THR; ++i) + pthread_cancel(eth_data.packet_writer[i]); + for (i = 0; i < IPCP_ETH_RD_THR; ++i) +
pthread_cancel(eth_data.packet_reader[i]); + pthread_cancel(eth_data.mgmt_handler); #ifdef __linux__ pthread_cancel(eth_data.if_monitor); #endif - pthread_join(eth_data.sdu_writer, NULL); - pthread_join(eth_data.sdu_reader, NULL); + for (i = 0; i < IPCP_ETH_WR_THR; ++i) + pthread_join(eth_data.packet_writer[i], NULL); + for (i = 0; i < IPCP_ETH_RD_THR; ++i) + pthread_join(eth_data.packet_reader[i], NULL); + pthread_join(eth_data.mgmt_handler, NULL); #ifdef __linux__ pthread_join(eth_data.if_monitor, NULL); #endif } - eth_data_fini(); + ipcp_stop(); ipcp_fini(); + eth_data_fini(); + exit(EXIT_SUCCESS); - fail_create_r: - ipcp_shutdown(); - fail_boot: - eth_data_fini(); - fail_data_init: + fail_start: ipcp_fini(); fail_init: - ipcp_create_r(getpid(), -1); + eth_data_fini(); + fail_data_init: exit(EXIT_FAILURE); } |
