diff options
Diffstat (limited to 'src/ipcpd/unicast')
56 files changed, 11310 insertions, 0 deletions
diff --git a/src/ipcpd/unicast/CMakeLists.txt b/src/ipcpd/unicast/CMakeLists.txt new file mode 100644 index 00000000..ca742871 --- /dev/null +++ b/src/ipcpd/unicast/CMakeLists.txt @@ -0,0 +1,75 @@ +get_filename_component(CURRENT_SOURCE_PARENT_DIR + ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(CURRENT_BINARY_PARENT_DIR + ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +include_directories(${CURRENT_SOURCE_PARENT_DIR}) +include_directories(${CURRENT_BINARY_PARENT_DIR}) + +include_directories(${CMAKE_SOURCE_DIR}/include) +include_directories(${CMAKE_BINARY_DIR}/include) + +set(IPCP_UNICAST_TARGET ipcpd-unicast CACHE INTERNAL "") +set(IPCP_UNICAST_MPL 60 CACHE STRING + "Default maximum packet lifetime for the unicast IPCP, in seconds") + +protobuf_generate_c(DHT_PROTO_SRCS DHT_PROTO_HDRS dir/dht.proto) + +math(EXPR PFT_EXPR "1 << 12") +set(PFT_SIZE ${PFT_EXPR} CACHE STRING + "Size of the PDU forwarding table") +if (HAVE_FUSE) + set(IPCP_FLOW_STATS TRUE CACHE BOOL + "Enable flow statistics tracking in IPCP") + if (IPCP_FLOW_STATS) + message(STATUS "IPCP flow statistics enabled") + else () + message(STATUS "IPCP flow statistics disabled") + endif () +endif () + +set(SOURCE_FILES + # Add source files here + addr-auth.c + ca.c + connmgr.c + dir.c + dt.c + fa.c + main.c + pff.c + routing.c + psched.c + # Add policies last + addr-auth/flat.c + ca/mb-ecn.c + ca/nop.c + dir/dht.c + pff/simple.c + pff/alternate.c + pff/multipath.c + pff/pft.c + routing/link-state.c + routing/graph.c + ) + +add_executable(ipcpd-unicast ${SOURCE_FILES} ${IPCP_SOURCES} ${COMMON_SOURCES} + ${DHT_PROTO_SRCS} ${LAYER_CONFIG_PROTO_SRCS}) +target_link_libraries(ipcpd-unicast LINK_PUBLIC ouroboros-dev) + +include(AddCompileFlags) +if (CMAKE_BUILD_TYPE MATCHES "Debug*") + add_compile_flags(ipcpd-unicast -DCONFIG_OUROBOROS_DEBUG) +endif () + +install(TARGETS ipcpd-unicast RUNTIME DESTINATION ${CMAKE_INSTALL_SBINDIR}) + +add_subdirectory(pff/tests) +add_subdirectory(routing/tests) + +if (NOT GNU) + add_subdirectory(dir/tests) +endif () diff --git a/src/ipcpd/unicast/addr-auth.c b/src/ipcpd/unicast/addr-auth.c new file mode 100644 index 00000000..908a4aa1 --- /dev/null +++ b/src/ipcpd/unicast/addr-auth.c @@ -0,0 +1,57 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Address authority + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define OUROBOROS_PREFIX "addr_auth" + +#include <ouroboros/logs.h> + +#include "addr-auth.h" +#include "addr-auth/pol.h" + +#include <stdlib.h> + +struct addr_auth_ops * ops; + +int addr_auth_init(enum pol_addr_auth type, + const void * info) +{ + switch (type) { + case ADDR_AUTH_FLAT_RANDOM: + ops = &flat_ops; + break; + default: + log_err("Unknown address authority type."); + return -1; + } + + return ops->init(info); +} + +uint64_t addr_auth_address(void) +{ + return ops->address(); +} + +int addr_auth_fini(void) +{ + return ops->fini(); +} diff --git a/src/ipcpd/unicast/addr-auth.h b/src/ipcpd/unicast/addr-auth.h new file mode 100644 index 00000000..e119dff3 --- /dev/null +++ b/src/ipcpd/unicast/addr-auth.h @@ -0,0 +1,37 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Address authority + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_ADDR_AUTH_H +#define OUROBOROS_IPCPD_UNICAST_ADDR_AUTH_H + +#include <ouroboros/ipcp.h> + +#include <stdint.h> + +int addr_auth_init(enum pol_addr_auth type, + const void * info); + +int addr_auth_fini(void); + +uint64_t addr_auth_address(void); + +#endif /* OUROBOROS_IPCPD_UNICAST_ADDR_AUTH_H */ diff --git a/src/ipcpd/unicast/addr-auth/flat.c b/src/ipcpd/unicast/addr-auth/flat.c new file mode 100644 index 00000000..c4562935 --- /dev/null +++ b/src/ipcpd/unicast/addr-auth/flat.c @@ -0,0 +1,79 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Policy for flat addresses in a distributed way + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#define OUROBOROS_PREFIX "flat-addr-auth" + +#include <ouroboros/logs.h> +#include <ouroboros/random.h> + +#include "ipcp.h" +#include "flat.h" + +#define NAME_LEN 8 + +struct { + uint8_t addr_size; +} flat; + +#define INVALID_ADDRESS 0 + +struct addr_auth_ops flat_ops = { + .init = flat_init, + .fini = flat_fini, + .address = flat_address +}; + +int flat_init(const void * info) +{ + flat.addr_size = *((uint8_t *) info); + + if (flat.addr_size != 4) { + log_err("Flat address policy mandates 4 byte addresses."); + return -1; + } + + return 0; +} + +int flat_fini(void) +{ + return 0; +} + +uint64_t flat_address(void) +{ + uint32_t addr = INVALID_ADDRESS; + +#if defined (CONFIG_OUROBOROS_DEBUG) && defined (IPCP_DEBUG_LOCAL) + addr = getpid(); +#else + while (addr == INVALID_ADDRESS) + random_buffer(&addr,sizeof(addr)); +#endif + return addr; +} diff --git a/src/ipcpd/unicast/addr-auth/flat.h b/src/ipcpd/unicast/addr-auth/flat.h new file mode 100644 index 00000000..d4b672c7 --- /dev/null +++ b/src/ipcpd/unicast/addr-auth/flat.h @@ -0,0 +1,36 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Policy for flat addresses in a distributed way + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_FLAT_H +#define OUROBOROS_IPCPD_UNICAST_FLAT_H + +#include "ops.h" + +int flat_init(const void * info); + +int flat_fini(void); + +uint64_t flat_address(void); + +extern struct addr_auth_ops flat_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_FLAT_H */ diff --git a/src/ipcpd/unicast/addr-auth/ops.h b/src/ipcpd/unicast/addr-auth/ops.h new file mode 100644 index 00000000..06b24cec --- /dev/null +++ b/src/ipcpd/unicast/addr-auth/ops.h @@ -0,0 +1,34 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Address authority policy ops + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_ADDR_AUTH_OPS_H +#define OUROBOROS_IPCPD_UNICAST_ADDR_AUTH_OPS_H + +struct addr_auth_ops { + int (* init)(const void * info); + + int (* fini)(void); + + uint64_t (* address)(void); +}; + +#endif /* OUROBOROS_IPCPD_UNICAST_ADDR_AUTH_OPS_H */ diff --git a/src/ipcpd/unicast/addr-auth/pol.h b/src/ipcpd/unicast/addr-auth/pol.h new file mode 100644 index 00000000..844308c6 --- /dev/null +++ b/src/ipcpd/unicast/addr-auth/pol.h @@ -0,0 +1,23 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Address Authority policies + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include "flat.h" diff --git a/src/ipcpd/unicast/ca.c b/src/ipcpd/unicast/ca.c new file mode 100644 index 00000000..287eaf41 --- /dev/null +++ b/src/ipcpd/unicast/ca.c @@ -0,0 +1,109 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Congestion Avoidance + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define OUROBOROS_PREFIX "ca" + +#include <ouroboros/logs.h> + +#include "ca.h" +#include "ca/pol.h" + +struct { + struct ca_ops * ops; +} ca; + +int ca_init(enum pol_cong_avoid pol) +{ + switch(pol) { + case CA_NONE: + log_dbg("Disabling congestion control."); + ca.ops = &nop_ca_ops; + break; + case CA_MB_ECN: + log_dbg("Using multi-bit ECN."); + ca.ops = &mb_ecn_ca_ops; + break; + default: + return -1; + } + + return 0; +} + + +void ca_fini(void) +{ + ca.ops = NULL; +} + +void * ca_ctx_create(void) +{ + return ca.ops->ctx_create(); +} + +void ca_ctx_destroy(void * ctx) +{ + return ca.ops->ctx_destroy(ctx); +} + +ca_wnd_t ca_ctx_update_snd(void * ctx, + size_t len) +{ + return ca.ops->ctx_update_snd(ctx, len); +} + +bool ca_ctx_update_rcv(void * ctx, + size_t len, + uint8_t ecn, + uint16_t * ece) +{ + return ca.ops->ctx_update_rcv(ctx, len, ecn, ece); +} + +void ca_ctx_update_ece(void * ctx, + uint16_t ece) +{ + return ca.ops->ctx_update_ece(ctx, ece); +} + +void ca_wnd_wait(ca_wnd_t wnd) +{ + return ca.ops->wnd_wait(wnd); +} + +int ca_calc_ecn(int fd, + uint8_t * ecn, + qoscube_t qc, + size_t len) +{ + return ca.ops->calc_ecn(fd, ecn, qc, len); +} + +ssize_t ca_print_stats(void * ctx, + char * buf, + size_t len) +{ + if (ca.ops->print_stats == NULL) + return 0; + + return ca.ops->print_stats(ctx, buf, len); +} diff --git a/src/ipcpd/unicast/ca.h b/src/ipcpd/unicast/ca.h new file mode 100644 index 00000000..ea803e17 --- /dev/null +++ b/src/ipcpd/unicast/ca.h @@ -0,0 +1,68 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Congestion avoidance + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_CA_H +#define OUROBOROS_IPCPD_UNICAST_CA_H + +#include <ouroboros/ipcp.h> +#include <ouroboros/qoscube.h> + +#include <stdbool.h> +#include <sys/types.h> + +typedef union { + time_t wait; +} ca_wnd_t; + +int ca_init(enum pol_cong_avoid ca); + +void ca_fini(void); + + +/* OPS */ +void * ca_ctx_create(void); + +void ca_ctx_destroy(void * ctx); + +ca_wnd_t ca_ctx_update_snd(void * ctx, + size_t len); + +bool ca_ctx_update_rcv(void * ctx, + size_t len, + uint8_t ecn, + uint16_t * ece); + +void ca_ctx_update_ece(void * ctx, + uint16_t ece); + +void ca_wnd_wait(ca_wnd_t wnd); + +int ca_calc_ecn(int fd, + uint8_t * ecn, + qoscube_t qc, + size_t len); + +ssize_t ca_print_stats(void * ctx, + char * buf, + size_t len); + +#endif /* OUROBOROS_IPCPD_UNICAST_CA_H */ diff --git a/src/ipcpd/unicast/ca/mb-ecn.c b/src/ipcpd/unicast/ca/mb-ecn.c new file mode 100644 index 00000000..d9a204b0 --- /dev/null +++ b/src/ipcpd/unicast/ca/mb-ecn.c @@ -0,0 +1,296 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Multi-bit ECN Congestion Avoidance + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include "config.h" + +#include <ouroboros/ipcp-dev.h> +#include <ouroboros/time.h> + +#include "mb-ecn.h" + +#include <inttypes.h> +#include <stdlib.h> +#include <string.h> +#include <stdio.h> + +/* congestion avoidance constants */ +#define CA_SHFT 5 /* Average over 32 pkts */ +#define CA_WND (1 << CA_SHFT) /* 32 pkts receiver wnd */ +#define CA_UPD (1 << (CA_SHFT - 2)) /* Update snd every 8 pkt */ +#define CA_SLOT 24 /* Initial slot = 16 ms */ +#define CA_INC 1UL << 16 /* ~4MiB/s^2 additive inc */ +#define CA_IWL 1UL << 16 /* Initial limit ~4MiB/s */ +#define CA_MINPS 8 /* Mimimum pkts / slot */ +#define CA_MAXPS 64 /* Maximum pkts / slot */ +#define ECN_Q_SHFT 4 +#define ts_to_ns(ts) ((size_t) ts.tv_sec * BILLION + ts.tv_nsec) + +struct mb_ecn_ctx { + uint16_t rx_ece; /* Level of congestion (upstream) */ + size_t rx_ctr; /* Receiver side packet counter */ + + uint16_t tx_ece; /* Level of congestion (downstream) */ + size_t tx_ctr; /* Sender side packet counter */ + size_t tx_wbc; /* Window byte count */ + size_t tx_wpc; /* Window packet count */ + size_t tx_wbl; /* Window byte limit */ + bool tx_cav; /* Congestion avoidance */ + size_t tx_mul; /* Slot size multiplier */ + size_t tx_inc; /* Additive increase */ + size_t tx_slot; +}; + +struct ca_ops mb_ecn_ca_ops = { + .ctx_create = mb_ecn_ctx_create, + .ctx_destroy = mb_ecn_ctx_destroy, + .ctx_update_snd = mb_ecn_ctx_update_snd, + .ctx_update_rcv = mb_ecn_ctx_update_rcv, + .ctx_update_ece = mb_ecn_ctx_update_ece, + .wnd_wait = mb_ecn_wnd_wait, + .calc_ecn = mb_ecn_calc_ecn, + .print_stats = mb_ecn_print_stats +}; + +void * mb_ecn_ctx_create(void) +{ + struct timespec now; + struct mb_ecn_ctx * ctx; + + ctx = malloc(sizeof(*ctx)); + if (ctx == NULL) + return NULL; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + memset(ctx, 0, sizeof(*ctx)); + + ctx->tx_mul = CA_SLOT; + ctx->tx_wbl = CA_IWL; + ctx->tx_inc = CA_INC; + ctx->tx_slot = ts_to_ns(now) >> ctx->tx_mul; + + return (void *) ctx; +} + +void mb_ecn_ctx_destroy(void * ctx) +{ + free(ctx); +} + +#define _slot_after(new, old) ((int64_t) (old - new) < 0) + +ca_wnd_t mb_ecn_ctx_update_snd(void * _ctx, + size_t len) +{ + struct timespec now; + size_t slot; + ca_wnd_t wnd; + struct mb_ecn_ctx * ctx = _ctx; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + slot = ts_to_ns(now) >> ctx->tx_mul; + + ctx->tx_ctr++; + ctx->tx_wpc++; + ctx->tx_wbc += len; + + if (ctx->tx_ctr > CA_WND) + ctx->tx_ece = 0; + + if (_slot_after(slot, ctx->tx_slot)) { + bool carry = false; /* may carry over if window increases */ + + ctx->tx_slot = slot; + + if (!ctx->tx_cav) { /* Slow start */ + if (ctx->tx_wbc > ctx->tx_wbl) + ctx->tx_wbl <<= 1; + } else { + if (ctx->tx_ece) /* Mult. Decrease */ + ctx->tx_wbl -= (ctx->tx_wbl * ctx->tx_ece) + >> (CA_SHFT + 8); + else /* Add. Increase */ + ctx->tx_wbl = ctx->tx_wbc + ctx->tx_inc; + } + + /* Window scaling */ + if (ctx->tx_wpc < CA_MINPS) { + size_t fact = 0; /* factor to scale the window up */ + size_t pkts = ctx->tx_wpc; + while (pkts < CA_MINPS) { + pkts <<= 1; + fact++; + } + ctx->tx_mul += fact; + ctx->tx_slot >>= fact; + if ((ctx->tx_slot & ((1 << fact) - 1)) == 0) { + carry = true; + ctx->tx_slot += 1; + } + ctx->tx_wbl <<= fact; + ctx->tx_inc <<= fact; + } else if (ctx->tx_wpc > CA_MAXPS) { + size_t fact = 0; /* factor to scale the window down */ + size_t pkts = ctx->tx_wpc; + while (pkts > CA_MAXPS) { + pkts >>= 1; + fact++; + } + ctx->tx_mul -= fact; + ctx->tx_slot <<= fact; + ctx->tx_wbl >>= fact; + ctx->tx_inc >>= fact; + } else { + ctx->tx_slot = slot; + } + + if (!carry) { + ctx->tx_wbc = 0; + ctx->tx_wpc = 0; + } + } + + if (ctx->tx_wbc > ctx->tx_wbl) + wnd.wait = ((ctx->tx_slot + 1) << ctx->tx_mul) - ts_to_ns(now); + else + wnd.wait = 0; + + return wnd; +} + +void mb_ecn_wnd_wait(ca_wnd_t wnd) +{ + if (wnd.wait > 0) { + struct timespec s = TIMESPEC_INIT_S(0); + if (wnd.wait > BILLION) /* Don't care throttling < 1s */ + s.tv_sec = 1; + else + s.tv_nsec = wnd.wait; + + nanosleep(&s, NULL); + } +} + +bool mb_ecn_ctx_update_rcv(void * _ctx, + size_t len, + uint8_t ecn, + uint16_t * ece) +{ + struct mb_ecn_ctx* ctx = _ctx; + bool update; + + (void) len; + + if ((ctx->rx_ece | ecn) == 0) + return false; + + if (ecn == 0) { /* End of congestion */ + ctx->rx_ece >>= 2; + update = ctx->rx_ece == 0; + } else { + if (ctx->rx_ece == 0) { /* Start of congestion */ + ctx->rx_ece = ecn; + ctx->rx_ctr = 0; + update = true; + } else { /* Congestion update */ + ctx->rx_ece -= ctx->rx_ece >> CA_SHFT; + ctx->rx_ece += ecn; + update = (ctx->rx_ctr++ & (CA_UPD - 1)) == true; + } + } + + *ece = ctx->rx_ece; + + return update; +} + + +void mb_ecn_ctx_update_ece(void * _ctx, + uint16_t ece) +{ + struct mb_ecn_ctx* ctx = _ctx; + + ctx->tx_ece = ece; + ctx->tx_ctr = 0; + ctx->tx_cav = true; +} + +int mb_ecn_calc_ecn(int fd, + uint8_t * ecn, + qoscube_t qc, + size_t len) +{ + size_t q; + + (void) len; + (void) qc; + + q = ipcp_flow_queued(fd); + + *ecn |= (uint8_t) (q >> ECN_Q_SHFT); + + return 0; +} + +ssize_t mb_ecn_print_stats(void * _ctx, + char * buf, + size_t len) +{ + struct mb_ecn_ctx* ctx = _ctx; + char * regime; + + if (len < 1024) + return 0; + + if (!ctx->tx_cav) + regime = "Slow start"; + else if (ctx->tx_ece) + regime = "Multiplicative dec"; + else + regime = "Additive inc"; + + sprintf(buf, + "Congestion avoidance algorithm: %20s\n" + "Upstream congestion level: %20u\n" + "Upstream packet counter: %20zu\n" + "Downstream congestion level: %20u\n" + "Downstream packet counter: %20zu\n" + "Congestion window size (ns): %20" PRIu64 "\n" + "Packets in this window: %20zu\n" + "Bytes in this window: %20zu\n" + "Max bytes in this window: %20zu\n" + "Current congestion regime: %20s\n", + "Multi-bit ECN", + ctx->tx_ece, ctx->tx_ctr, + ctx->rx_ece, ctx->rx_ctr, (uint64_t) (1ULL << ctx->tx_mul), + ctx->tx_wpc, ctx->tx_wbc, ctx->tx_wbl, + regime); + + return strlen(buf); +} diff --git a/src/ipcpd/unicast/ca/mb-ecn.h b/src/ipcpd/unicast/ca/mb-ecn.h new file mode 100644 index 00000000..9a2c8b49 --- /dev/null +++ b/src/ipcpd/unicast/ca/mb-ecn.h @@ -0,0 +1,56 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Multi-bit ECN Congestion Avoidance + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H +#define OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H + +#include "ops.h" + +void * mb_ecn_ctx_create(void); + +void mb_ecn_ctx_destroy(void * ctx); + +ca_wnd_t mb_ecn_ctx_update_snd(void * ctx, + size_t len); + +bool mb_ecn_ctx_update_rcv(void * ctx, + size_t len, + uint8_t ecn, + uint16_t * ece); + +void mb_ecn_ctx_update_ece(void * ctx, + uint16_t ece); + +void mb_ecn_wnd_wait(ca_wnd_t wnd); + +int mb_ecn_calc_ecn(int fd, + uint8_t * ecn, + qoscube_t qc, + size_t len); + +ssize_t mb_ecn_print_stats(void * ctx, + char * buf, + size_t len); + +extern struct ca_ops mb_ecn_ca_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H */ diff --git a/src/ipcpd/unicast/ca/nop.c b/src/ipcpd/unicast/ca/nop.c new file mode 100644 index 00000000..617fc15b --- /dev/null +++ b/src/ipcpd/unicast/ca/nop.c @@ -0,0 +1,98 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Dummy Congestion Avoidance + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include "nop.h" + +#include <string.h> + +struct ca_ops nop_ca_ops = { + .ctx_create = nop_ctx_create, + .ctx_destroy = nop_ctx_destroy, + .ctx_update_snd = nop_ctx_update_snd, + .ctx_update_rcv = nop_ctx_update_rcv, + .ctx_update_ece = nop_ctx_update_ece, + .wnd_wait = nop_wnd_wait, + .calc_ecn = nop_calc_ecn, + .print_stats = NULL +}; + +void * nop_ctx_create(void) +{ + return (void *) 1; +} + +void nop_ctx_destroy(void * ctx) +{ + (void) ctx; +} + +ca_wnd_t nop_ctx_update_snd(void * ctx, + size_t len) +{ + ca_wnd_t wnd; + + (void) ctx; + (void) len; + + memset(&wnd, 0, sizeof(wnd)); + + return wnd; +} + +void nop_wnd_wait(ca_wnd_t wnd) +{ + (void) wnd; +} + +bool nop_ctx_update_rcv(void * ctx, + size_t len, + uint8_t ecn, + uint16_t * ece) +{ + (void) ctx; + (void) len; + (void) ecn; + (void) ece; + + return false; +} + +void nop_ctx_update_ece(void * ctx, + uint16_t ece) +{ + (void) ctx; + (void) ece; +} + + +int nop_calc_ecn(int fd, + uint8_t * ecn, + qoscube_t qc, + size_t len) +{ + (void) fd; + (void) len; + (void) ecn; + (void) qc; + + return 0; +} diff --git a/src/ipcpd/unicast/ca/nop.h b/src/ipcpd/unicast/ca/nop.h new file mode 100644 index 00000000..248b198d --- /dev/null +++ b/src/ipcpd/unicast/ca/nop.h @@ -0,0 +1,52 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Dummy Congestion Avoidance + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_CA_NOP_H +#define OUROBOROS_IPCPD_UNICAST_CA_NOP_H + +#include "ops.h" + +void * nop_ctx_create(void); + +void nop_ctx_destroy(void * ctx); + +ca_wnd_t nop_ctx_update_snd(void * ctx, + size_t len); + +bool nop_ctx_update_rcv(void * ctx, + size_t len, + uint8_t ecn, + uint16_t * ece); + +void nop_ctx_update_ece(void * ctx, + uint16_t ece); + +void nop_wnd_wait(ca_wnd_t wnd); + +int nop_calc_ecn(int fd, + uint8_t * ecn, + qoscube_t qc, + size_t len); + +extern struct ca_ops nop_ca_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_CA_NOP_H */ diff --git a/src/ipcpd/unicast/ca/ops.h b/src/ipcpd/unicast/ca/ops.h new file mode 100644 index 00000000..3a7b7248 --- /dev/null +++ b/src/ipcpd/unicast/ca/ops.h @@ -0,0 +1,58 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Congestion avoidance policy ops + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_CA_OPS_H +#define OUROBOROS_IPCPD_UNICAST_CA_OPS_H + +#include "ca.h" + +struct ca_ops { + void * (* ctx_create)(void); + + void (* ctx_destroy)(void * ctx); + + ca_wnd_t (* ctx_update_snd)(void * ctx, + size_t len); + + bool (* ctx_update_rcv)(void * ctx, + size_t len, + uint8_t ecn, + uint16_t * ece); + + void (* ctx_update_ece)(void * ctx, + uint16_t ece); + + void (* wnd_wait)(ca_wnd_t wnd); + + int (* calc_ecn)(int fd, + uint8_t * ecn, + qoscube_t qc, + size_t len); + + /* Optional, can be NULL */ + ssize_t (* print_stats)(void * ctx, + char * buf, + size_t len); + +}; + +#endif /* OUROBOROS_IPCPD_UNICAST_CA_OPS_H */ diff --git a/src/ipcpd/unicast/ca/pol.h b/src/ipcpd/unicast/ca/pol.h new file mode 100644 index 00000000..db0a1a11 --- /dev/null +++ b/src/ipcpd/unicast/ca/pol.h @@ -0,0 +1,24 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Congestion avoidance policies + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include "mb-ecn.h" +#include "nop.h" diff --git a/src/ipcpd/unicast/connmgr.c b/src/ipcpd/unicast/connmgr.c new file mode 100644 index 00000000..11c5d5b6 --- /dev/null +++ b/src/ipcpd/unicast/connmgr.c @@ -0,0 +1,39 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Handles connections between components + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include "config.h" + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include <ouroboros/ipcp.h> + +#define BUILD_IPCP_UNICAST + +#ifdef IPCP_CONN_WAIT_DIR + #include "dir.h" +#endif + +#include "common/connmgr.c" diff --git a/src/ipcpd/unicast/dir.c b/src/ipcpd/unicast/dir.c new file mode 100644 index 00000000..e0cb09fc --- /dev/null +++ b/src/ipcpd/unicast/dir.c @@ -0,0 +1,93 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Directory Management + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#define OUROBOROS_PREFIX "directory" + +#include <ouroboros/endian.h> +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/utils.h> + +#include "dir.h" +#include "dir/pol.h" + +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <inttypes.h> +#include <limits.h> + +struct { + struct dir_ops * ops; + void * dir; +} dirmgr; + +int dir_init(void) +{ + dirmgr.ops = &dht_dir_ops; + + dirmgr.dir = dirmgr.ops->create(); + if (dirmgr.dir == NULL) { + dirmgr.ops = NULL; + return -ENOMEM; + } + + return 0; +} + +void dir_fini(void) +{ + dirmgr.ops->destroy(dirmgr.dir); + dirmgr.ops = NULL; + dirmgr.dir = NULL; +} + +int dir_bootstrap(void) +{ + return dirmgr.ops->bootstrap(dirmgr.dir); +} + +int dir_reg(const uint8_t * hash) +{ + return dirmgr.ops->reg(dirmgr.dir, hash); +} + +int dir_unreg(const uint8_t * hash) +{ + return dirmgr.ops->unreg(dirmgr.dir, hash); +} + +uint64_t dir_query(const uint8_t * hash) +{ + return dirmgr.ops->query(dirmgr.dir, hash); +} + +int dir_wait_running(void) +{ + return dirmgr.ops->wait_running(dirmgr.dir); +} diff --git a/src/ipcpd/unicast/dir.h b/src/ipcpd/unicast/dir.h new file mode 100644 index 00000000..b261ea2c --- /dev/null +++ b/src/ipcpd/unicast/dir.h @@ -0,0 +1,42 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Directory + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_DIR_H +#define OUROBOROS_IPCPD_UNICAST_DIR_H + +#include <inttypes.h> + +int dir_init(void); + +void dir_fini(void); + +int dir_bootstrap(void); + +int dir_reg(const uint8_t * hash); + +int dir_unreg(const uint8_t * hash); + +uint64_t dir_query(const uint8_t * hash); + +int dir_wait_running(void); + +#endif /* OUROBOROS_IPCPD_UNICAST_DIR_H */ diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c new file mode 100644 index 00000000..08a5a5a9 --- /dev/null +++ b/src/ipcpd/unicast/dir/dht.c @@ -0,0 +1,2883 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Distributed Hash Table based on Kademlia + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#define DHT "dht" +#define OUROBOROS_PREFIX DHT + +#include <ouroboros/endian.h> +#include <ouroboros/hash.h> +#include <ouroboros/ipcp-dev.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/list.h> +#include <ouroboros/notifier.h> +#include <ouroboros/random.h> +#include <ouroboros/time.h> +#include <ouroboros/tpm.h> +#include <ouroboros/utils.h> +#include <ouroboros/pthread.h> + +#include "common/connmgr.h" +#include "dht.h" +#include "dt.h" +#include "ipcp.h" +#include "ops.h" + +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <inttypes.h> +#include <limits.h> + +#include "dht.pb-c.h" +typedef DhtMsg dht_msg_t; +typedef DhtContactMsg dht_contact_msg_t; + +#ifndef CLOCK_REALTIME_COARSE +#define CLOCK_REALTIME_COARSE CLOCK_REALTIME +#endif + +#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ +#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ +#define KAD_K 8 /* Replication factor, MDHT value. */ +#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ +#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ +#define KAD_T_JOIN 8 /* Response time to wait for a join. */ +#define KAD_T_RESP 5 /* Response time to wait for a response. */ +#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ +#define KAD_QUEER 15 /* Time to declare peer questionable. */ +#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ +#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ +#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ +#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ +#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_packet tpm check (ms) */ +#define DHT_RETR_ADDR 1 /* Number of addresses to return on retrieve */ + +enum dht_state { + DHT_INIT = 0, + DHT_SHUTDOWN, + DHT_JOINING, + DHT_RUNNING, +}; + +enum kad_code { + KAD_JOIN = 0, + KAD_FIND_NODE, + KAD_FIND_VALUE, + /* Messages without a response below. */ + KAD_STORE, + KAD_RESPONSE +}; + +enum kad_req_state { + REQ_NULL = 0, + REQ_INIT, + REQ_PENDING, + REQ_RESPONSE, + REQ_DONE, + REQ_DESTROY +}; + +enum lookup_state { + LU_NULL = 0, + LU_INIT, + LU_PENDING, + LU_UPDATE, + LU_COMPLETE, + LU_DESTROY +}; + +struct kad_req { + struct list_head next; + + uint32_t cookie; + enum kad_code code; + uint8_t * key; + uint64_t addr; + + enum kad_req_state state; + pthread_cond_t cond; + pthread_mutex_t lock; + + time_t t_exp; +}; + +struct cookie_el { + struct list_head next; + + uint32_t cookie; +}; + +struct lookup { + struct list_head next; + + struct list_head cookies; + + uint8_t * key; + + struct list_head contacts; + size_t n_contacts; + + uint64_t * addrs; + size_t n_addrs; + + enum lookup_state state; + pthread_cond_t cond; + pthread_mutex_t lock; +}; + +struct val { + struct list_head next; + + uint64_t addr; + + time_t t_exp; + time_t t_rep; +}; + +struct ref_entry { + struct list_head next; + + uint8_t * key; + + time_t t_rep; +}; + +struct dht_entry { + struct list_head next; + + uint8_t * key; + size_t n_vals; + struct list_head vals; +}; + +struct contact { + struct list_head next; + + uint8_t * id; + uint64_t addr; + + size_t fails; + time_t t_seen; +}; + +struct bucket { + struct list_head contacts; + size_t n_contacts; + + struct list_head alts; + size_t n_alts; + + time_t t_refr; + + size_t depth; + uint8_t mask; + + struct bucket * parent; + struct bucket * children[1L << KAD_BETA]; +}; + +struct cmd { + struct list_head next; + + struct shm_du_buff * sdb; +}; + +struct dir_ops dht_dir_ops = { + .create = dht_create, + .destroy = dht_destroy, + .bootstrap = dht_bootstrap, + .reg = dht_reg, + .unreg = dht_unreg, + .query = dht_query, + .wait_running = dht_wait_running +}; + +struct dht { + size_t alpha; + size_t b; + size_t k; + + time_t t_expire; + time_t t_refresh; + time_t t_replic; + time_t t_repub; + + uint8_t * id; + uint64_t addr; + + struct bucket * buckets; + + struct list_head entries; + + struct list_head refs; + + struct list_head lookups; + + struct list_head requests; + struct bmp * cookies; + + enum dht_state state; + struct list_head cmds; + pthread_cond_t cond; + pthread_mutex_t mtx; + + pthread_rwlock_t lock; + + uint64_t eid; + + struct tpm * tpm; + + pthread_t worker; +}; + +struct join_info { + struct dht * dht; + uint64_t addr; +}; + +struct packet_info { + struct dht * dht; + struct shm_du_buff * sdb; +}; + +static uint8_t * dht_dup_key(const uint8_t * key, + size_t len) +{ + uint8_t * dup; + + dup = malloc(sizeof(*dup) * len); + if (dup == NULL) + return NULL; + + memcpy(dup, key, len); + + return dup; +} + +static enum dht_state dht_get_state(struct dht * dht) +{ + enum dht_state state; + + pthread_mutex_lock(&dht->mtx); + + state = dht->state; + + pthread_mutex_unlock(&dht->mtx); + + return state; +} + +static int dht_set_state(struct dht * dht, + enum dht_state state) +{ + pthread_mutex_lock(&dht->mtx); + + if (state == DHT_JOINING && dht->state != DHT_INIT) { + pthread_mutex_unlock(&dht->mtx); + return -1; + } + + dht->state = state; + + pthread_cond_broadcast(&dht->cond); + + pthread_mutex_unlock(&dht->mtx); + + return 0; +} + +int dht_wait_running(void * dir) +{ + struct dht * dht; + int ret = 0; + + dht = (struct dht *) dir; + + pthread_mutex_lock(&dht->mtx); + + pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); + + while (dht->state == DHT_JOINING) + pthread_cond_wait(&dht->cond, &dht->mtx); + + if (dht->state != DHT_RUNNING) + ret = -1; + + pthread_cleanup_pop(true); + + return ret; +} + +static uint8_t * create_id(size_t len) +{ + uint8_t * id; + + id = malloc(len); + if (id == NULL) + return NULL; + + if (random_buffer(id, len) < 0) { + free(id); + return NULL; + } + + return id; +} + +static void kad_req_create(struct dht * dht, + dht_msg_t * msg, + uint64_t addr) +{ + struct kad_req * req; + pthread_condattr_t cattr; + struct timespec t; + size_t b; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + req = malloc(sizeof(*req)); + if (req == NULL) + goto fail_malloc; + + list_head_init(&req->next); + + req->t_exp = t.tv_sec + KAD_T_RESP; + req->addr = addr; + req->state = REQ_INIT; + req->cookie = msg->cookie; + req->code = msg->code; + req->key = NULL; + + pthread_rwlock_rdlock(&dht->lock); + b = dht->b; + pthread_rwlock_unlock(&dht->lock); + + if (msg->has_key) { + req->key = dht_dup_key(msg->key.data, b); + if (req->key == NULL) + goto fail_dup_key; + } + + if (pthread_mutex_init(&req->lock, NULL)) + goto fail_mutex; + + + if (pthread_condattr_init(&cattr)) + goto fail_condattr; +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + + if (pthread_cond_init(&req->cond, &cattr)) + goto fail_cond_init; + + pthread_condattr_destroy(&cattr); + + pthread_rwlock_wrlock(&dht->lock); + + list_add(&req->next, &dht->requests); + + pthread_rwlock_unlock(&dht->lock); + + return; + + fail_cond_init: + pthread_condattr_destroy(&cattr); + fail_condattr: + pthread_mutex_destroy(&req->lock); + fail_mutex: + free(req->key); + fail_dup_key: + free(req); + fail_malloc: + return; +} + +static void cancel_req_destroy(void * o) +{ + struct kad_req * req = (struct kad_req *) o; + + pthread_mutex_unlock(&req->lock); + + pthread_cond_destroy(&req->cond); + pthread_mutex_destroy(&req->lock); + + if (req->key != NULL) + free(req->key); + + free(req); +} + +static void kad_req_destroy(struct kad_req * req) +{ + assert(req); + + pthread_mutex_lock(&req->lock); + + switch (req->state) { + case REQ_DESTROY: + pthread_mutex_unlock(&req->lock); + return; + case REQ_PENDING: + req->state = REQ_DESTROY; + pthread_cond_broadcast(&req->cond); + break; + case REQ_INIT: + case REQ_DONE: + req->state = REQ_NULL; + break; + case REQ_RESPONSE: + case REQ_NULL: + default: + break; + } + + pthread_cleanup_push(cancel_req_destroy, req); + + while (req->state != REQ_NULL && req->state != REQ_DONE) + pthread_cond_wait(&req->cond, &req->lock); + + pthread_cleanup_pop(true); +} + +static int kad_req_wait(struct kad_req * req, + time_t t) +{ + struct timespec timeo = TIMESPEC_INIT_S(0); + struct timespec abs; + int ret = 0; + + assert(req); + + timeo.tv_sec = t; + + clock_gettime(PTHREAD_COND_CLOCK, &abs); + + ts_add(&abs, &timeo, &abs); + + pthread_mutex_lock(&req->lock); + + req->state = REQ_PENDING; + + pthread_cleanup_push(__cleanup_mutex_unlock, &req->lock); + + while (req->state == REQ_PENDING && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs); + + switch(req->state) { + case REQ_DESTROY: + ret = -1; + req->state = REQ_NULL; + pthread_cond_signal(&req->cond); + break; + case REQ_PENDING: /* ETIMEDOUT */ + case REQ_RESPONSE: + req->state = REQ_DONE; + pthread_cond_broadcast(&req->cond); + break; + default: + break; + } + + pthread_cleanup_pop(true); + + return ret; +} + +static void kad_req_respond(struct kad_req * req) +{ + pthread_mutex_lock(&req->lock); + + req->state = REQ_RESPONSE; + pthread_cond_signal(&req->cond); + + pthread_mutex_unlock(&req->lock); +} + +static struct contact * contact_create(const uint8_t * id, + size_t len, + uint64_t addr) +{ + struct contact * c; + struct timespec t; + + c = malloc(sizeof(*c)); + if (c == NULL) + return NULL; + + list_head_init(&c->next); + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + c->addr = addr; + c->fails = 0; + c->t_seen = t.tv_sec; + c->id = dht_dup_key(id, len); + if (c->id == NULL) { + free(c); + return NULL; + } + + return c; +} + +static void contact_destroy(struct contact * c) +{ + if (c != NULL) + free(c->id); + + free(c); +} + +static struct bucket * iter_bucket(struct bucket * b, + const uint8_t * id) +{ + uint8_t byte; + uint8_t mask; + + assert(b); + + if (b->children[0] == NULL) + return b; + + byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + + mask = ((1L << KAD_BETA) - 1) & 0xFF; + + byte >>= (CHAR_BIT - KAD_BETA) - + (((b->depth) * KAD_BETA) & (CHAR_BIT - 1)); + + return iter_bucket(b->children[(byte & mask)], id); +} + +static struct bucket * dht_get_bucket(struct dht * dht, + const uint8_t * id) +{ + assert(dht->buckets); + + return iter_bucket(dht->buckets, id); +} + +/* + * If someone builds a network where the n (n > k) closest nodes all + * have IDs starting with the same 64 bits: by all means, change this. + */ +static uint64_t dist(const uint8_t * src, + const uint8_t * dst) +{ + return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst)); +} + +static size_t list_add_sorted(struct list_head * l, + struct contact * c, + const uint8_t * key) +{ + struct list_head * p; + + assert(l); + assert(c); + assert(key); + assert(c->id); + + list_for_each(p, l) { + struct contact * e = list_entry(p, struct contact, next); + if (dist(c->id, key) > dist(e->id, key)) + break; + } + + list_add_tail(&c->next, p); + + return 1; +} + +static size_t dht_contact_list(struct dht * dht, + struct list_head * l, + const uint8_t * key) +{ + struct list_head * p; + struct bucket * b; + size_t len = 0; + size_t i; + struct timespec t; + + assert(l); + assert(dht); + assert(key); + assert(list_is_empty(l)); + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + b = dht_get_bucket(dht, key); + if (b == NULL) + return 0; + + b->t_refr = t.tv_sec + KAD_T_REFR; + + if (b->n_contacts == dht->k || b->parent == NULL) { + list_for_each(p, &b->contacts) { + struct contact * c; + c = list_entry(p, struct contact, next); + c = contact_create(c->id, dht->b, c->addr); + if (list_add_sorted(l, c, key) == 1) + if (++len == dht->k) + break; + } + } else { + struct bucket * d = b->parent; + for (i = 0; i < (1L << KAD_BETA) && len < dht->k; ++i) { + list_for_each(p, &d->children[i]->contacts) { + struct contact * c; + c = list_entry(p, struct contact, next); + c = contact_create(c->id, dht->b, c->addr); + if (c == NULL) + continue; + if (list_add_sorted(l, c, key) == 1) + if (++len == dht->k) + break; + } + } + } + + assert(len == dht->k || b->parent == NULL); + + return len; +} + +static struct lookup * lookup_create(struct dht * dht, + const uint8_t * id) +{ + struct lookup * lu; + pthread_condattr_t cattr; + + assert(dht); + assert(id); + + lu = malloc(sizeof(*lu)); + if (lu == NULL) + goto fail_malloc; + + list_head_init(&lu->contacts); + list_head_init(&lu->cookies); + + lu->state = LU_INIT; + lu->addrs = NULL; + lu->n_addrs = 0; + lu->key = dht_dup_key(id, dht->b); + if (lu->key == NULL) + goto fail_id; + + if (pthread_mutex_init(&lu->lock, NULL)) + goto fail_mutex; + + pthread_condattr_init(&cattr); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + + if (pthread_cond_init(&lu->cond, &cattr)) + goto fail_cond; + + pthread_condattr_destroy(&cattr); + + pthread_rwlock_wrlock(&dht->lock); + + list_add(&lu->next, &dht->lookups); + + lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); + + pthread_rwlock_unlock(&dht->lock); + + return lu; + + fail_cond: + pthread_condattr_destroy(&cattr); + pthread_mutex_destroy(&lu->lock); + fail_mutex: + free(lu->key); + fail_id: + free(lu); + fail_malloc: + return NULL; +} + +static void cancel_lookup_destroy(void * o) +{ + struct lookup * lu; + struct list_head * p; + struct list_head * h; + + lu = (struct lookup *) o; + + if (lu->key != NULL) + free(lu->key); + if (lu->addrs != NULL) + free(lu->addrs); + + list_for_each_safe(p, h, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + } + + list_for_each_safe(p, h, &lu->cookies) { + struct cookie_el * c = list_entry(p, struct cookie_el, next); + list_del(&c->next); + free(c); + } + + pthread_mutex_unlock(&lu->lock); + + pthread_mutex_destroy(&lu->lock); + + free(lu); +} + +static void lookup_destroy(struct lookup * lu) +{ + assert(lu); + + pthread_mutex_lock(&lu->lock); + + switch (lu->state) { + case LU_DESTROY: + pthread_mutex_unlock(&lu->lock); + return; + case LU_PENDING: + lu->state = LU_DESTROY; + pthread_cond_broadcast(&lu->cond); + break; + case LU_INIT: + case LU_UPDATE: + case LU_COMPLETE: + lu->state = LU_NULL; + break; + case LU_NULL: + default: + break; + } + + pthread_cleanup_push(cancel_lookup_destroy, lu); + + while (lu->state != LU_NULL) + pthread_cond_wait(&lu->cond, &lu->lock); + + pthread_cleanup_pop(true); +} + +static void lookup_update(struct dht * dht, + struct lookup * lu, + dht_msg_t * msg) +{ + struct list_head * p = NULL; + struct list_head * h; + struct contact * c = NULL; + size_t n; + size_t pos = 0; + bool mod = false; + + assert(lu); + assert(msg); + + if (dht_get_state(dht) != DHT_RUNNING) + return; + + pthread_mutex_lock(&lu->lock); + + list_for_each_safe(p, h, &lu->cookies) { + struct cookie_el * e = list_entry(p, struct cookie_el, next); + if (e->cookie == msg->cookie) { + list_del(&e->next); + free(e); + break; + } + } + + if (lu->state == LU_COMPLETE) { + pthread_mutex_unlock(&lu->lock); + return; + } + + if (msg->n_addrs > 0) { + if (lu->addrs == NULL) { + lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs); + for (n = 0; n < msg->n_addrs; ++n) + lu->addrs[n] = msg->addrs[n]; + lu->n_addrs = msg->n_addrs; + } + + lu->state = LU_COMPLETE; + pthread_cond_broadcast(&lu->cond); + pthread_mutex_unlock(&lu->lock); + return; + } + + pthread_cleanup_push(__cleanup_mutex_unlock, &lu->lock); + + while (lu->state == LU_INIT) { + pthread_rwlock_unlock(&dht->lock); + pthread_cond_wait(&lu->cond, &lu->lock); + pthread_rwlock_rdlock(&dht->lock); + } + + pthread_cleanup_pop(false); + + for (n = 0; n < msg->n_contacts; ++n) { + c = contact_create(msg->contacts[n]->id.data, + dht->b, msg->contacts[n]->addr); + if (c == NULL) + continue; + + pos = 0; + + list_for_each(p, &lu->contacts) { + struct contact * e; + e = list_entry(p, struct contact, next); + if (!memcmp(e->id, c->id, dht->b)) { + contact_destroy(c); + c = NULL; + break; + } + + if (dist(c->id, lu->key) > dist(e->id, lu->key)) + break; + + pos++; + } + + if (c == NULL) + continue; + + if (lu->n_contacts < dht->k) { + list_add_tail(&c->next, p); + ++lu->n_contacts; + mod = true; + } else if (pos == dht->k) { + contact_destroy(c); + } else { + struct contact * d; + list_add_tail(&c->next, p); + d = list_last_entry(&lu->contacts, + struct contact, next); + list_del(&d->next); + assert(lu->contacts.prv != &d->next); + contact_destroy(d); + mod = true; + } + } + + if (list_is_empty(&lu->cookies) && !mod) + lu->state = LU_COMPLETE; + else + lu->state = LU_UPDATE; + + pthread_cond_broadcast(&lu->cond); + pthread_mutex_unlock(&lu->lock); + return; +} + +static ssize_t lookup_get_addrs(struct lookup * lu, + uint64_t * addrs) +{ + ssize_t n; + + assert(lu); + + pthread_mutex_lock(&lu->lock); + + for (n = 0; (size_t) n < lu->n_addrs; ++n) + addrs[n] = lu->addrs[n]; + + assert((size_t) n == lu->n_addrs); + + pthread_mutex_unlock(&lu->lock); + + return n; +} + +static ssize_t lookup_contact_addrs(struct lookup * lu, + uint64_t * addrs) +{ + struct list_head * p; + ssize_t n = 0; + + assert(lu); + assert(addrs); + + pthread_mutex_lock(&lu->lock); + + list_for_each(p, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + addrs[n] = c->addr; + n++; + } + + pthread_mutex_unlock(&lu->lock); + + return n; +} + +static void lookup_new_addrs(struct lookup * lu, + uint64_t * addrs) +{ + struct list_head * p; + size_t n = 0; + + assert(lu); + assert(addrs); + + pthread_mutex_lock(&lu->lock); + + /* Uses fails to check if the contact has been contacted. */ + list_for_each(p, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + if (c->fails == 0) { + c->fails = 1; + addrs[n] = c->addr; + n++; + } + + if (n == KAD_ALPHA) + break; + } + + assert(n <= KAD_ALPHA); + + addrs[n] = 0; + + pthread_mutex_unlock(&lu->lock); +} + +static void lookup_set_state(struct lookup * lu, + enum lookup_state state) +{ + pthread_mutex_lock(&lu->lock); + + lu->state = state; + pthread_cond_broadcast(&lu->cond); + + pthread_mutex_unlock(&lu->lock); +} + +static void cancel_lookup_wait(void * o) +{ + struct lookup * lu = (struct lookup *) o; + lu->state = LU_NULL; + pthread_mutex_unlock(&lu->lock); + lookup_destroy(lu); +} + +static enum lookup_state lookup_wait(struct lookup * lu) +{ + struct timespec timeo = TIMESPEC_INIT_S(KAD_T_RESP); + struct timespec abs; + enum lookup_state state; + int ret = 0; + + clock_gettime(PTHREAD_COND_CLOCK, &abs); + + ts_add(&abs, &timeo, &abs); + + pthread_mutex_lock(&lu->lock); + + if (lu->state == LU_INIT || lu->state == LU_UPDATE) + lu->state = LU_PENDING; + + pthread_cleanup_push(cancel_lookup_wait, lu); + + while (lu->state == LU_PENDING && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&lu->cond, &lu->lock, &abs); + + pthread_cleanup_pop(false); + + if (ret == -ETIMEDOUT) + lu->state = LU_COMPLETE; + + state = lu->state; + + pthread_mutex_unlock(&lu->lock); + + return state; +} + +static struct kad_req * dht_find_request(struct dht * dht, + dht_msg_t * msg) +{ + struct list_head * p; + + assert(dht); + assert(msg); + + list_for_each(p, &dht->requests) { + struct kad_req * r = list_entry(p, struct kad_req, next); + if (r->cookie == msg->cookie) + return r; + } + + return NULL; +} + +static struct lookup * dht_find_lookup(struct dht * dht, + uint32_t cookie) +{ + struct list_head * p; + struct list_head * p2; + struct list_head * h2; + + assert(dht); + assert(cookie > 0); + + list_for_each(p, &dht->lookups) { + struct lookup * l = list_entry(p, struct lookup, next); + pthread_mutex_lock(&l->lock); + list_for_each_safe(p2, h2, &l->cookies) { + struct cookie_el * e; + e = list_entry(p2, struct cookie_el, next); + if (e->cookie == cookie) { + list_del(&e->next); + free(e); + pthread_mutex_unlock(&l->lock); + return l; + } + } + pthread_mutex_unlock(&l->lock); + } + + return NULL; +} + +static struct val * val_create(uint64_t addr, + time_t exp) +{ + struct val * v; + struct timespec t; + + v = malloc(sizeof(*v)); + if (v == NULL) + return NULL; + + list_head_init(&v->next); + v->addr = addr; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + v->t_exp = t.tv_sec + exp; + v->t_rep = t.tv_sec + KAD_T_REPL; + + return v; +} + +static void val_destroy(struct val * v) +{ + assert(v); + + free(v); +} + +static struct ref_entry * ref_entry_create(struct dht * dht, + const uint8_t * key) +{ + struct ref_entry * e; + struct timespec t; + + assert(dht); + assert(key); + + e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + e->key = dht_dup_key(key, dht->b); + if (e->key == NULL) { + free(e); + return NULL; + } + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + e->t_rep = t.tv_sec + dht->t_repub; + + return e; +} + +static void ref_entry_destroy(struct ref_entry * e) +{ + free(e->key); + free(e); +} + +static struct dht_entry * dht_entry_create(struct dht * dht, + const uint8_t * key) +{ + struct dht_entry * e; + + assert(dht); + assert(key); + + e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + list_head_init(&e->next); + list_head_init(&e->vals); + + e->n_vals = 0; + + e->key = dht_dup_key(key, dht->b); + if (e->key == NULL) { + free(e); + return NULL; + } + + return e; +} + +static void dht_entry_destroy(struct dht_entry * e) +{ + struct list_head * p; + struct list_head * h; + + assert(e); + + list_for_each_safe(p, h, &e->vals) { + struct val * v = list_entry(p, struct val, next); + list_del(&v->next); + val_destroy(v); + } + + free(e->key); + + free(e); +} + +static int dht_entry_add_addr(struct dht_entry * e, + uint64_t addr, + time_t exp) +{ + struct list_head * p; + struct val * val; + struct timespec t; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + list_for_each(p, &e->vals) { + struct val * v = list_entry(p, struct val, next); + if (v->addr == addr) { + if (v->t_exp < t.tv_sec + exp) { + v->t_exp = t.tv_sec + exp; + v->t_rep = t.tv_sec + KAD_T_REPL; + } + + return 0; + } + } + + val = val_create(addr, exp); + if (val == NULL) + return -ENOMEM; + + list_add(&val->next, &e->vals); + ++e->n_vals; + + return 0; +} + + +static void dht_entry_del_addr(struct dht_entry * e, + uint64_t addr) +{ + struct list_head * p; + struct list_head * h; + + assert(e); + + list_for_each_safe(p, h, &e->vals) { + struct val * v = list_entry(p, struct val, next); + if (v->addr == addr) { + list_del(&v->next); + val_destroy(v); + --e->n_vals; + } + } + + if (e->n_vals == 0) { + list_del(&e->next); + dht_entry_destroy(e); + } +} + +static uint64_t dht_entry_get_addr(struct dht * dht, + struct dht_entry * e) +{ + struct list_head * p; + + assert(e); + assert(!list_is_empty(&e->vals)); + + list_for_each(p, &e->vals) { + struct val * v = list_entry(p, struct val, next); + if (v->addr != dht->addr) + return v->addr; + } + + return 0; +} + +/* Forward declaration. */ +static struct lookup * kad_lookup(struct dht * dht, + const uint8_t * key, + enum kad_code code); + + +/* Build a refresh list. */ +static void bucket_refresh(struct dht * dht, + struct bucket * b, + time_t t, + struct list_head * r) +{ + size_t i; + + if (*b->children != NULL) + for (i = 0; i < (1L << KAD_BETA); ++i) + bucket_refresh(dht, b->children[i], t, r); + + if (b->n_contacts == 0) + return; + + if (t > b->t_refr) { + struct contact * c; + struct contact * d; + c = list_first_entry(&b->contacts, struct contact, next); + d = contact_create(c->id, dht->b, c->addr); + if (d != NULL) + list_add(&d->next, r); + return; + } +} + + +static struct bucket * bucket_create(void) +{ + struct bucket * b; + struct timespec t; + size_t i; + + b = malloc(sizeof(*b)); + if (b == NULL) + return NULL; + + list_head_init(&b->contacts); + b->n_contacts = 0; + + list_head_init(&b->alts); + b->n_alts = 0; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + b->t_refr = t.tv_sec + KAD_T_REFR; + + for (i = 0; i < (1L << KAD_BETA); ++i) + b->children[i] = NULL; + + b->parent = NULL; + b->depth = 0; + + return b; +} + +static void bucket_destroy(struct bucket * b) +{ + struct list_head * p; + struct list_head * h; + size_t i; + + assert(b); + + for (i = 0; i < (1L << KAD_BETA); ++i) + if (b->children[i] != NULL) + bucket_destroy(b->children[i]); + + list_for_each_safe(p, h, &b->contacts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + } + + list_for_each_safe(p, h, &b->alts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + } + + free(b); +} + +static bool bucket_has_id(struct bucket * b, + const uint8_t * id) +{ + uint8_t mask; + uint8_t byte; + + if (b->depth == 0) + return true; + + byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + + mask = ((1L << KAD_BETA) - 1) & 0xFF; + + byte >>= (CHAR_BIT - KAD_BETA) - + (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1)); + + return ((byte & mask) == b->mask); +} + +static int split_bucket(struct bucket * b) +{ + struct list_head * p; + struct list_head * h; + uint8_t mask = 0; + size_t i; + size_t c; + + assert(b); + assert(b->n_alts == 0); + assert(b->n_contacts); + assert(b->children[0] == NULL); + + c = b->n_contacts; + + for (i = 0; i < (1L << KAD_BETA); ++i) { + b->children[i] = bucket_create(); + if (b->children[i] == NULL) { + size_t j; + for (j = 0; j < i; ++j) + bucket_destroy(b->children[j]); + return -1; + } + + b->children[i]->depth = b->depth + 1; + b->children[i]->mask = mask; + b->children[i]->parent = b; + + list_for_each_safe(p, h, &b->contacts) { + struct contact * c; + c = list_entry(p, struct contact, next); + if (bucket_has_id(b->children[i], c->id)) { + list_del(&c->next); + --b->n_contacts; + list_add(&c->next, &b->children[i]->contacts); + ++b->children[i]->n_contacts; + } + } + + mask++; + } + + for (i = 0; i < (1L << KAD_BETA); ++i) + if (b->children[i]->n_contacts == c) + split_bucket(b->children[i]); + + return 0; +} + +/* Locked externally to mandate update as (final) part of join transaction. */ +static int dht_update_bucket(struct dht * dht, + const uint8_t * id, + uint64_t addr) +{ + struct list_head * p; + struct list_head * h; + struct bucket * b; + struct contact * c; + + assert(dht); + + b = dht_get_bucket(dht, id); + if (b == NULL) + return -1; + + c = contact_create(id, dht->b, addr); + if (c == NULL) + return -1; + + list_for_each_safe(p, h, &b->contacts) { + struct contact * d = list_entry(p, struct contact, next); + if (d->addr == addr) { + list_del(&d->next); + contact_destroy(d); + --b->n_contacts; + } + } + + if (b->n_contacts == dht->k) { + if (bucket_has_id(b, dht->id)) { + list_add_tail(&c->next, &b->contacts); + ++b->n_contacts; + if (split_bucket(b)) { + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + } + } else if (b->n_alts == dht->k) { + struct contact * d; + d = list_first_entry(&b->alts, struct contact, next); + list_del(&d->next); + contact_destroy(d); + list_add_tail(&c->next, &b->alts); + } else { + list_add_tail(&c->next, &b->alts); + ++b->n_alts; + } + } else { + list_add_tail(&c->next, &b->contacts); + ++b->n_contacts; + } + + return 0; +} + +static int send_msg(struct dht * dht, + dht_msg_t * msg, + uint64_t addr) +{ +#ifndef __DHT_TEST__ + struct shm_du_buff * sdb; + size_t len; +#endif + int retr = 0; + + if (msg->code == KAD_RESPONSE) + retr = KAD_RESP_RETR; + + pthread_rwlock_wrlock(&dht->lock); + + if (dht->id != NULL) { + msg->has_s_id = true; + msg->s_id.data = dht->id; + msg->s_id.len = dht->b; + } + + msg->s_addr = dht->addr; + + if (msg->code < KAD_STORE) { + msg->cookie = bmp_allocate(dht->cookies); + if (!bmp_is_id_valid(dht->cookies, msg->cookie)) { + pthread_rwlock_unlock(&dht->lock); + goto fail_bmp_alloc; + } + } + + pthread_rwlock_unlock(&dht->lock); + +#ifndef __DHT_TEST__ + len = dht_msg__get_packed_size(msg); + if (len == 0) + goto fail_msg; + + while (true) { + if (ipcp_sdb_reserve(&sdb, len)) + goto fail_msg; + + dht_msg__pack(msg, shm_du_buff_head(sdb)); + + if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0) + break; + + ipcp_sdb_release(sdb); + + sleep(1); + + if (--retr < 0) + goto fail_msg; + } + +#else + (void) addr; + (void) retr; +#endif /* __DHT_TEST__ */ + + if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN) + kad_req_create(dht, msg, addr); + + return msg->cookie; +#ifndef __DHT_TEST__ + fail_msg: + pthread_rwlock_wrlock(&dht->lock); + bmp_release(dht->cookies, msg->cookie); + pthread_rwlock_unlock(&dht->lock); +#endif /* !__DHT_TEST__ */ + fail_bmp_alloc: + return -1; +} + +static struct dht_entry * dht_find_entry(struct dht * dht, + const uint8_t * key) +{ + struct list_head * p; + + list_for_each(p, &dht->entries) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + if (!memcmp(key, e->key, dht->b)) + return e; + } + + return NULL; +} + +static int kad_add(struct dht * dht, + const dht_contact_msg_t * contacts, + ssize_t n, + time_t exp) +{ + struct dht_entry * e; + + pthread_rwlock_wrlock(&dht->lock); + + while (n-- > 0) { + if (contacts[n].id.len != dht->b) + log_warn("Bad key length in contact data."); + + e = dht_find_entry(dht, contacts[n].id.data); + if (e != NULL) { + if (dht_entry_add_addr(e, contacts[n].addr, exp)) + goto fail; + } else { + e = dht_entry_create(dht, contacts[n].id.data); + if (e == NULL) + goto fail; + + if (dht_entry_add_addr(e, contacts[n].addr, exp)) { + dht_entry_destroy(e); + goto fail; + } + + list_add(&e->next, &dht->entries); + } + } + + pthread_rwlock_unlock(&dht->lock); + return 0; + + fail: + pthread_rwlock_unlock(&dht->lock); + return -ENOMEM; +} + +static int wait_resp(struct dht * dht, + dht_msg_t * msg, + time_t timeo) +{ + struct kad_req * req; + + assert(dht); + assert(msg); + + pthread_rwlock_rdlock(&dht->lock); + + req = dht_find_request(dht, msg); + if (req == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -EPERM; + } + + pthread_rwlock_unlock(&dht->lock); + + return kad_req_wait(req, timeo); +} + +static int kad_store(struct dht * dht, + const uint8_t * key, + uint64_t addr, + uint64_t r_addr, + time_t ttl) +{ + dht_msg_t msg = DHT_MSG__INIT; + dht_contact_msg_t cmsg = DHT_CONTACT_MSG__INIT; + dht_contact_msg_t * cmsgp[1]; + + cmsg.id.data = (uint8_t *) key; + cmsg.addr = addr; + + pthread_rwlock_rdlock(&dht->lock); + + cmsg.id.len = dht->b; + + pthread_rwlock_unlock(&dht->lock); + + cmsgp[0] = &cmsg; + + msg.code = KAD_STORE; + msg.has_t_expire = true; + msg.t_expire = ttl; + msg.n_contacts = 1; + msg.contacts = cmsgp; + + if (send_msg(dht, &msg, r_addr) < 0) + return -1; + + return 0; +} + +static ssize_t kad_find(struct dht * dht, + struct lookup * lu, + const uint64_t * addrs, + enum kad_code code) +{ + dht_msg_t msg = DHT_MSG__INIT; + ssize_t sent = 0; + + assert(dht); + assert(lu->key); + + msg.code = code; + + msg.has_key = true; + msg.key.data = (uint8_t *) lu->key; + msg.key.len = dht->b; + + while (*addrs != 0) { + struct cookie_el * c; + int ret; + + if (*addrs == dht->addr) { + ++addrs; + continue; + } + + ret = send_msg(dht, &msg, *addrs); + if (ret < 0) + break; + + c = malloc(sizeof(*c)); + if (c == NULL) + break; + + c->cookie = (uint32_t) ret; + + pthread_mutex_lock(&lu->lock); + + list_add_tail(&c->next, &lu->cookies); + + pthread_mutex_unlock(&lu->lock); + + ++sent; + ++addrs; + } + + return sent; +} + +static void lookup_detach(struct dht * dht, + struct lookup * lu) +{ + pthread_rwlock_wrlock(&dht->lock); + + list_del(&lu->next); + + pthread_rwlock_unlock(&dht->lock); +} + +static struct lookup * kad_lookup(struct dht * dht, + const uint8_t * id, + enum kad_code code) +{ + uint64_t addrs[KAD_ALPHA + 1]; + enum lookup_state state; + struct lookup * lu; + + lu = lookup_create(dht, id); + if (lu == NULL) + return NULL; + + lookup_new_addrs(lu, addrs); + + if (addrs[0] == 0) { + lookup_detach(dht, lu); + lookup_destroy(lu); + return NULL; + } + + if (kad_find(dht, lu, addrs, code) == 0) { + lookup_detach(dht, lu); + return lu; + } + + while ((state = lookup_wait(lu)) != LU_COMPLETE) { + switch (state) { + case LU_UPDATE: + lookup_new_addrs(lu, addrs); + if (addrs[0] == 0) + break; + + kad_find(dht, lu, addrs, code); + break; + case LU_DESTROY: + lookup_detach(dht, lu); + lookup_set_state(lu, LU_NULL); + return NULL; + default: + break; + } + } + + assert(state == LU_COMPLETE); + + lookup_detach(dht, lu); + + return lu; +} + +static void kad_publish(struct dht * dht, + const uint8_t * key, + uint64_t addr, + time_t exp) +{ + struct lookup * lu; + uint64_t * addrs; + ssize_t n; + size_t k; + time_t t_expire; + + + assert(dht); + assert(key); + + pthread_rwlock_rdlock(&dht->lock); + + k = dht->k; + t_expire = dht->t_expire; + + pthread_rwlock_unlock(&dht->lock); + + addrs = malloc(k * sizeof(*addrs)); + if (addrs == NULL) + return; + + lu = kad_lookup(dht, key, KAD_FIND_NODE); + if (lu == NULL) { + free(addrs); + return; + } + + n = lookup_contact_addrs(lu, addrs); + + while (n-- > 0) { + if (addrs[n] == dht->addr) { + dht_contact_msg_t msg = DHT_CONTACT_MSG__INIT; + msg.id.data = (uint8_t *) key; + msg.id.len = dht->b; + msg.addr = addr; + kad_add(dht, &msg, 1, exp); + } else { + if (kad_store(dht, key, addr, addrs[n], t_expire)) + log_warn("Failed to send store message."); + } + } + + lookup_destroy(lu); + + free(addrs); +} + +static int kad_join(struct dht * dht, + uint64_t addr) +{ + dht_msg_t msg = DHT_MSG__INIT; + + msg.code = KAD_JOIN; + + msg.has_alpha = true; + msg.has_b = true; + msg.has_k = true; + msg.has_t_refresh = true; + msg.has_t_replicate = true; + msg.alpha = KAD_ALPHA; + msg.k = KAD_K; + msg.t_refresh = KAD_T_REFR; + msg.t_replicate = KAD_T_REPL; + + pthread_rwlock_rdlock(&dht->lock); + + msg.b = dht->b; + + pthread_rwlock_unlock(&dht->lock); + + if (send_msg(dht, &msg, addr) < 0) + return -1; + + if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) + return -1; + + dht->id = create_id(dht->b); + if (dht->id == NULL) + return -1; + + pthread_rwlock_wrlock(&dht->lock); + + dht_update_bucket(dht, dht->id, dht->addr); + + pthread_rwlock_unlock(&dht->lock); + + return 0; +} + +static void dht_dead_peer(struct dht * dht, + uint8_t * key, + uint64_t addr) +{ + struct list_head * p; + struct list_head * h; + struct bucket * b; + + b = dht_get_bucket(dht, key); + + list_for_each_safe(p, h, &b->contacts) { + struct contact * c = list_entry(p, struct contact, next); + if (b->n_contacts + b->n_alts <= dht->k) { + ++c->fails; + return; + } + + if (c->addr == addr) { + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + break; + } + } + + while (b->n_contacts < dht->k && b->n_alts > 0) { + struct contact * c; + c = list_first_entry(&b->alts, struct contact, next); + list_del(&c->next); + --b->n_alts; + list_add(&c->next, &b->contacts); + ++b->n_contacts; + } +} + +static int dht_del(struct dht * dht, + const uint8_t * key, + uint64_t addr) +{ + struct dht_entry * e; + + e = dht_find_entry(dht, key); + if (e == NULL) { + return -EPERM; + } + + dht_entry_del_addr(e, addr); + + return 0; +} + +static buffer_t dht_retrieve(struct dht * dht, + const uint8_t * key) +{ + struct dht_entry * e; + struct list_head * p; + buffer_t buf; + uint64_t * pos; + size_t addrs = 0; + + pthread_rwlock_rdlock(&dht->lock); + + e = dht_find_entry(dht, key); + if (e == NULL) + goto fail; + + buf.len = MIN(DHT_RETR_ADDR, e->n_vals); + if (buf.len == 0) + goto fail; + + pos = malloc(sizeof(dht->addr) * buf.len); + if (pos == NULL) + goto fail; + + buf.data = (uint8_t *) pos; + + list_for_each(p, &e->vals) { + struct val * v = list_entry(p, struct val, next); + *pos++ = v->addr; + if (++addrs >= buf.len) + break; + } + + pthread_rwlock_unlock(&dht->lock); + + return buf; + + fail: + pthread_rwlock_unlock(&dht->lock); + buf.len = 0; + buf.data = NULL; + return buf; +} + +static ssize_t dht_get_contacts(struct dht * dht, + const uint8_t * key, + dht_contact_msg_t *** msgs) +{ + struct list_head l; + struct list_head * p; + struct list_head * h; + size_t len; + size_t i = 0; + + list_head_init(&l); + + pthread_rwlock_wrlock(&dht->lock); + + len = dht_contact_list(dht, &l, key); + if (len == 0) { + pthread_rwlock_unlock(&dht->lock); + *msgs = NULL; + return 0; + } + + *msgs = malloc(len * sizeof(**msgs)); + if (*msgs == NULL) { + pthread_rwlock_unlock(&dht->lock); + return 0; + } + + list_for_each_safe(p, h, &l) { + struct contact * c = list_entry(p, struct contact, next); + (*msgs)[i] = malloc(sizeof(***msgs)); + if ((*msgs)[i] == NULL) { + pthread_rwlock_unlock(&dht->lock); + while (i > 0) + free(*msgs[--i]); + free(*msgs); + *msgs = NULL; + return 0; + } + + dht_contact_msg__init((*msgs)[i]); + + (*msgs)[i]->id.data = c->id; + (*msgs)[i]->id.len = dht->b; + (*msgs)[i++]->addr = c->addr; + list_del(&c->next); + free(c); + } + + pthread_rwlock_unlock(&dht->lock); + + return i; +} + +static time_t gcd(time_t a, + time_t b) +{ + if (a == 0) + return b; + + return gcd(b % a, a); +} + +static void * work(void * o) +{ + struct dht * dht; + struct timespec now; + struct list_head * p; + struct list_head * h; + struct list_head reflist; + time_t intv; + struct lookup * lu; + + dht = (struct dht *) o; + + pthread_rwlock_rdlock(&dht->lock); + + intv = gcd(dht->t_expire, dht->t_repub); + intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; + + pthread_rwlock_unlock(&dht->lock); + + list_head_init(&reflist); + + while (true) { + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_wrlock(&dht->lock); + + /* Republish registered hashes. */ + list_for_each(p, &dht->refs) { + struct ref_entry * e; + uint8_t * key; + uint64_t addr; + time_t t_expire; + e = list_entry(p, struct ref_entry, next); + if (now.tv_sec > e->t_rep) { + key = dht_dup_key(e->key, dht->b); + if (key == NULL) + continue; + addr = dht->addr; + t_expire = dht->t_expire; + e->t_rep = now.tv_sec + dht->t_repub; + + pthread_rwlock_unlock(&dht->lock); + kad_publish(dht, key, addr, t_expire); + pthread_rwlock_wrlock(&dht->lock); + free(key); + } + } + + /* Remove stale entries and republish if necessary. */ + list_for_each_safe(p, h, &dht->entries) { + struct list_head * p1; + struct list_head * h1; + struct dht_entry * e; + uint8_t * key; + time_t t_expire; + e = list_entry (p, struct dht_entry, next); + list_for_each_safe(p1, h1, &e->vals) { + struct val * v; + uint64_t addr; + v = list_entry(p1, struct val, next); + if (now.tv_sec > v->t_exp) { + list_del(&v->next); + val_destroy(v); + continue; + } + + if (now.tv_sec > v->t_rep) { + key = dht_dup_key(e->key, dht->b); + addr = v->addr; + t_expire = dht->t_expire = now.tv_sec; + v->t_rep = now.tv_sec + dht->t_replic; + pthread_rwlock_unlock(&dht->lock); + kad_publish(dht, key, addr, t_expire); + pthread_rwlock_wrlock(&dht->lock); + free(key); + } + } + } + + /* Check the requests list for unresponsive nodes. */ + list_for_each_safe(p, h, &dht->requests) { + struct kad_req * r; + r = list_entry(p, struct kad_req, next); + if (now.tv_sec > r->t_exp) { + list_del(&r->next); + bmp_release(dht->cookies, r->cookie); + dht_dead_peer(dht, r->key, r->addr); + kad_req_destroy(r); + } + } + + /* Refresh unaccessed buckets. */ + bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); + + pthread_rwlock_unlock(&dht->lock); + + list_for_each_safe(p, h, &reflist) { + struct contact * c; + c = list_entry(p, struct contact, next); + lu = kad_lookup(dht, c->id, KAD_FIND_NODE); + if (lu != NULL) + lookup_destroy(lu); + list_del(&c->next); + contact_destroy(c); + } + + sleep(intv); + } + + return (void *) 0; +} + +static int kad_handle_join_resp(struct dht * dht, + struct kad_req * req, + dht_msg_t * msg) +{ + assert(dht); + assert(req); + assert(msg); + + /* We might send version numbers later to warn of updates if needed. */ + if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire && + msg->has_t_refresh && msg->has_t_replicate)) { + log_warn("Join refused by remote."); + return -1; + } + + if (msg->b < sizeof(uint64_t)) { + log_err("Hash sizes less than 8 bytes unsupported."); + return -1; + } + + pthread_rwlock_wrlock(&dht->lock); + + dht->buckets = bucket_create(); + if (dht->buckets == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -1; + } + + /* Likely corrupt packet. The member will refuse, we might here too. */ + if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) + log_warn("Different kademlia parameters detected."); + + if (msg->t_replicate != KAD_T_REPL) + log_warn("Different kademlia replication time detected."); + + if (msg->t_refresh != KAD_T_REFR) + log_warn("Different kademlia refresh time detected."); + + dht->k = msg->k; + dht->b = msg->b; + dht->t_expire = msg->t_expire; + dht->t_repub = MAX(1, dht->t_expire - 10); + + if (pthread_create(&dht->worker, NULL, work, dht)) { + bucket_destroy(dht->buckets); + pthread_rwlock_unlock(&dht->lock); + return -1; + } + + kad_req_respond(req); + + dht_update_bucket(dht, msg->s_id.data, msg->s_addr); + + pthread_rwlock_unlock(&dht->lock); + + log_dbg("Enrollment of DHT completed."); + + return 0; +} + +static int kad_handle_find_resp(struct dht * dht, + struct kad_req * req, + dht_msg_t * msg) +{ + struct lookup * lu; + + assert(dht); + assert(req); + assert(msg); + + pthread_rwlock_rdlock(&dht->lock); + + lu = dht_find_lookup(dht, req->cookie); + if (lu == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -1; + } + + lookup_update(dht, lu, msg); + + pthread_rwlock_unlock(&dht->lock); + + return 0; +} + +static void kad_handle_response(struct dht * dht, + dht_msg_t * msg) +{ + struct kad_req * req; + + assert(dht); + assert(msg); + + pthread_rwlock_wrlock(&dht->lock); + + req = dht_find_request(dht, msg); + if (req == NULL) { + pthread_rwlock_unlock(&dht->lock); + return; + } + + bmp_release(dht->cookies, req->cookie); + list_del(&req->next); + + pthread_rwlock_unlock(&dht->lock); + + switch(req->code) { + case KAD_JOIN: + if (kad_handle_join_resp(dht, req, msg)) + log_err("Enrollment of DHT failed."); + break; + case KAD_FIND_VALUE: + case KAD_FIND_NODE: + if (dht_get_state(dht) != DHT_RUNNING) + break; + kad_handle_find_resp(dht, req, msg); + break; + default: + break; + } + + kad_req_destroy(req); +} + +int dht_bootstrap(void * dir) +{ + struct dht * dht; + + dht = (struct dht *) dir; + + assert(dht); + + pthread_rwlock_wrlock(&dht->lock); + +#ifndef __DHT_TEST__ + dht->b = hash_len(ipcpi.dir_hash_algo); +#else + dht->b = DHT_TEST_KEY_LEN; +#endif + + dht->id = create_id(dht->b); + if (dht->id == NULL) + goto fail_id; + + dht->buckets = bucket_create(); + if (dht->buckets == NULL) + goto fail_buckets; + + dht->buckets->depth = 0; + dht->buckets->mask = 0; + + dht->t_expire = 86400; /* 1 day */ + dht->t_repub = dht->t_expire - 10; + dht->k = KAD_K; + + if (pthread_create(&dht->worker, NULL, work, dht)) + goto fail_pthread_create; + + dht->state = DHT_RUNNING; + + dht_update_bucket(dht, dht->id, dht->addr); + + pthread_rwlock_unlock(&dht->lock); + + return 0; + + fail_pthread_create: + bucket_destroy(dht->buckets); + dht->buckets = NULL; + fail_buckets: + free(dht->id); + dht->id = NULL; + fail_id: + pthread_rwlock_unlock(&dht->lock); + return -1; +} + +static struct ref_entry * ref_entry_get(struct dht * dht, + const uint8_t * key) +{ + struct list_head * p; + + list_for_each(p, &dht->refs) { + struct ref_entry * r = list_entry(p, struct ref_entry, next); + if (!memcmp(key, r->key, dht-> b) ) + return r; + } + + return NULL; +} + +int dht_reg(void * dir, + const uint8_t * key) +{ + struct dht * dht; + struct ref_entry * e; + uint64_t addr; + time_t t_expire; + + dht = (struct dht *) dir; + + assert(dht); + assert(key); + assert(dht->addr != 0); + + if (dht_wait_running(dht)) + return -1; + + pthread_rwlock_wrlock(&dht->lock); + + if (ref_entry_get(dht, key) != NULL) { + log_dbg("Name already registered."); + pthread_rwlock_unlock(&dht->lock); + return 0; + } + + e = ref_entry_create(dht, key); + if (e == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -ENOMEM; + } + + list_add(&e->next, &dht->refs); + + t_expire = dht->t_expire; + addr = dht->addr; + + pthread_rwlock_unlock(&dht->lock); + + kad_publish(dht, key, addr, t_expire); + + return 0; +} + +int dht_unreg(void * dir, + const uint8_t * key) +{ + struct dht * dht; + struct list_head * p; + struct list_head * h; + + dht = (struct dht *) dir; + + assert(dht); + assert(key); + + if (dht_get_state(dht) != DHT_RUNNING) + return -1; + + pthread_rwlock_wrlock(&dht->lock); + + list_for_each_safe(p, h, &dht->refs) { + struct ref_entry * r = list_entry(p, struct ref_entry, next); + if (!memcmp(key, r->key, dht-> b) ) { + list_del(&r->next); + ref_entry_destroy(r); + } + } + + dht_del(dht, key, dht->addr); + + pthread_rwlock_unlock(&dht->lock); + + return 0; +} + +uint64_t dht_query(void * dir, + const uint8_t * key) +{ + struct dht * dht; + struct dht_entry * e; + struct lookup * lu; + uint64_t addrs[KAD_K]; + size_t n; + + dht = (struct dht *) dir; + + assert(dht); + + addrs[0] = 0; + + if (dht_wait_running(dht)) + return 0; + + pthread_rwlock_rdlock(&dht->lock); + + e = dht_find_entry(dht, key); + if (e != NULL) + addrs[0] = dht_entry_get_addr(dht, e); + + pthread_rwlock_unlock(&dht->lock); + + if (addrs[0] != 0) + return addrs[0]; + + lu = kad_lookup(dht, key, KAD_FIND_VALUE); + if (lu == NULL) + return 0; + + n = lookup_get_addrs(lu, addrs); + if (n == 0) { + lookup_destroy(lu); + return 0; + } + + lookup_destroy(lu); + + /* Current behaviour is anycast and return the first peer address. */ + if (addrs[0] != dht->addr) + return addrs[0]; + + if (n > 1) + return addrs[1]; + + return 0; +} + +static void * dht_handle_packet(void * o) +{ + struct dht * dht = (struct dht *) o; + + assert(dht); + + while (true) { + dht_msg_t * msg; + dht_contact_msg_t ** cmsgs; + dht_msg_t resp_msg = DHT_MSG__INIT; + uint64_t addr; + buffer_t buf; + size_t i; + size_t b; + size_t t_expire; + struct cmd * cmd; + + pthread_mutex_lock(&dht->mtx); + + pthread_cleanup_push(__cleanup_mutex_unlock, &dht->mtx); + + while (list_is_empty(&dht->cmds)) + pthread_cond_wait(&dht->cond, &dht->mtx); + + cmd = list_last_entry(&dht->cmds, struct cmd, next); + list_del(&cmd->next); + + pthread_cleanup_pop(true); + + i = shm_du_buff_len(cmd->sdb); + + msg = dht_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); +#ifndef __DHT_TEST__ + ipcp_sdb_release(cmd->sdb); +#endif + free(cmd); + + if (msg == NULL) { + log_err("Failed to unpack message."); + continue; + } + + if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { + dht_msg__free_unpacked(msg, NULL); + log_dbg("Got a request message when not running."); + continue; + } + + pthread_rwlock_rdlock(&dht->lock); + + b = dht->b; + t_expire = dht->t_expire; + + pthread_rwlock_unlock(&dht->lock); + + if (msg->has_key && msg->key.len != b) { + dht_msg__free_unpacked(msg, NULL); + log_warn("Bad key in message."); + continue; + } + + if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { + dht_msg__free_unpacked(msg, NULL); + log_warn("Bad source ID in message of type %d.", + msg->code); + continue; + } + + tpm_dec(dht->tpm); + + addr = msg->s_addr; + + resp_msg.code = KAD_RESPONSE; + resp_msg.cookie = msg->cookie; + + switch(msg->code) { + case KAD_JOIN: + /* Refuse enrollee on check fails. */ + if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { + log_warn("Parameter mismatch. " + "DHT enrolment refused."); + break; + } + + if (msg->t_replicate != KAD_T_REPL) { + log_warn("Replication time mismatch. " + "DHT enrolment refused."); + + break; + } + + if (msg->t_refresh != KAD_T_REFR) { + log_warn("Refresh time mismatch. " + "DHT enrolment refused."); + break; + } + + resp_msg.has_alpha = true; + resp_msg.has_b = true; + resp_msg.has_k = true; + resp_msg.has_t_expire = true; + resp_msg.has_t_refresh = true; + resp_msg.has_t_replicate = true; + resp_msg.alpha = KAD_ALPHA; + resp_msg.b = b; + resp_msg.k = KAD_K; + resp_msg.t_expire = t_expire; + resp_msg.t_refresh = KAD_T_REFR; + resp_msg.t_replicate = KAD_T_REPL; + break; + case KAD_FIND_VALUE: + buf = dht_retrieve(dht, msg->key.data); + if (buf.len != 0) { + resp_msg.n_addrs = buf.len; + resp_msg.addrs = (uint64_t *) buf.data; + break; + } + /* FALLTHRU */ + case KAD_FIND_NODE: + /* Return k closest contacts. */ + resp_msg.n_contacts = + dht_get_contacts(dht, msg->key.data, &cmsgs); + resp_msg.contacts = cmsgs; + break; + case KAD_STORE: + if (msg->n_contacts < 1) { + log_warn("No contacts in store message."); + break; + } + + if (!msg->has_t_expire) { + log_warn("No expiry time in store message."); + break; + } + + kad_add(dht, *msg->contacts, msg->n_contacts, + msg->t_expire); + break; + case KAD_RESPONSE: + kad_handle_response(dht, msg); + break; + default: + assert(false); + break; + } + + if (msg->code != KAD_JOIN) { + pthread_rwlock_wrlock(&dht->lock); + if (dht_get_state(dht) == DHT_JOINING && + dht->buckets == NULL) { + pthread_rwlock_unlock(&dht->lock); + goto finish; + } + + if (dht_update_bucket(dht, msg->s_id.data, addr)) + log_warn("Failed to update bucket."); + pthread_rwlock_unlock(&dht->lock); + } + + if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0) + log_warn("Failed to send response."); + + finish: + dht_msg__free_unpacked(msg, NULL); + + if (resp_msg.n_addrs > 0) + free(resp_msg.addrs); + + if (resp_msg.n_contacts == 0) { + tpm_inc(dht->tpm); + continue; + } + + for (i = 0; i < resp_msg.n_contacts; ++i) + dht_contact_msg__free_unpacked(resp_msg.contacts[i], + NULL); + free(resp_msg.contacts); + + tpm_inc(dht->tpm); + } + + return (void *) 0; +} + +static void dht_post_packet(void * comp, + struct shm_du_buff * sdb) +{ + struct cmd * cmd; + struct dht * dht = (struct dht *) comp; + + if (dht_get_state(dht) == DHT_SHUTDOWN) { +#ifndef __DHT_TEST__ + ipcp_sdb_release(sdb); +#endif + return; + } + + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Command failed. Out of memory."); + return; + } + + cmd->sdb = sdb; + + pthread_mutex_lock(&dht->mtx); + + list_add(&cmd->next, &dht->cmds); + + pthread_cond_signal(&dht->cond); + + pthread_mutex_unlock(&dht->mtx); +} + +void dht_destroy(void * dir) +{ + struct dht * dht; + struct list_head * p; + struct list_head * h; + + dht = (struct dht *) dir; + if (dht == NULL) + return; + +#ifndef __DHT_TEST__ + tpm_stop(dht->tpm); + + tpm_destroy(dht->tpm); +#endif + if (dht_get_state(dht) == DHT_RUNNING) { + dht_set_state(dht, DHT_SHUTDOWN); + pthread_cancel(dht->worker); + pthread_join(dht->worker, NULL); + } + + pthread_rwlock_wrlock(&dht->lock); + + list_for_each_safe(p, h, &dht->cmds) { + struct cmd * c = list_entry(p, struct cmd, next); + list_del(&c->next); +#ifndef __DHT_TEST__ + ipcp_sdb_release(c->sdb); +#endif + free(c); + } + + list_for_each_safe(p, h, &dht->entries) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + list_del(&e->next); + dht_entry_destroy(e); + } + + list_for_each_safe(p, h, &dht->requests) { + struct kad_req * r = list_entry(p, struct kad_req, next); + list_del(&r->next); + kad_req_destroy(r); + } + + list_for_each_safe(p, h, &dht->refs) { + struct ref_entry * e = list_entry(p, struct ref_entry, next); + list_del(&e->next); + ref_entry_destroy(e); + } + + list_for_each_safe(p, h, &dht->lookups) { + struct lookup * l = list_entry(p, struct lookup, next); + list_del(&l->next); + lookup_destroy(l); + } + + pthread_rwlock_unlock(&dht->lock); + + if (dht->buckets != NULL) + bucket_destroy(dht->buckets); + + bmp_destroy(dht->cookies); + + pthread_mutex_destroy(&dht->mtx); + + pthread_rwlock_destroy(&dht->lock); + + free(dht->id); + + free(dht); +} + +static void * join_thr(void * o) +{ + struct join_info * info = (struct join_info *) o; + struct lookup * lu; + size_t retr = 0; + + assert(info); + + while (kad_join(info->dht, info->addr)) { + if (dht_get_state(info->dht) == DHT_SHUTDOWN) { + log_dbg("DHT enrollment aborted."); + goto finish; + } + + if (retr++ == KAD_JOIN_RETR) { + dht_set_state(info->dht, DHT_INIT); + log_warn("DHT enrollment attempt failed."); + goto finish; + } + + sleep(KAD_JOIN_INTV); + } + + dht_set_state(info->dht, DHT_RUNNING); + + lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE); + if (lu != NULL) + lookup_destroy(lu); + + finish: + free(info); + + return (void *) 0; +} + +static void handle_event(void * self, + int event, + const void * o) +{ + struct dht * dht = (struct dht *) self; + + if (event == NOTIFY_DT_CONN_ADD) { + pthread_t thr; + struct join_info * inf; + struct conn * c = (struct conn *) o; + struct timespec slack = TIMESPEC_INIT_MS(DHT_ENROLL_SLACK); + + /* Give the pff some time to update for the new link. */ + nanosleep(&slack, NULL); + + switch(dht_get_state(dht)) { + case DHT_INIT: + inf = malloc(sizeof(*inf)); + if (inf == NULL) + break; + + inf->dht = dht; + inf->addr = c->conn_info.addr; + + if (dht_set_state(dht, DHT_JOINING) == 0 || + dht_wait_running(dht)) { + if (pthread_create(&thr, NULL, join_thr, inf)) { + dht_set_state(dht, DHT_INIT); + free(inf); + return; + } + pthread_detach(thr); + } else { + free(inf); + } + break; + case DHT_RUNNING: + /* + * FIXME: this lookup for effiency reasons + * causes a SEGV when stressed with rapid + * enrollments. + * lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); + * if (lu != NULL) + * lookup_destroy(lu); + */ + break; + default: + break; + } + } +} + +void * dht_create(void) +{ + struct dht * dht; + + dht = malloc(sizeof(*dht)); + if (dht == NULL) + goto fail_malloc; + + dht->buckets = NULL; + + list_head_init(&dht->entries); + list_head_init(&dht->requests); + list_head_init(&dht->refs); + list_head_init(&dht->lookups); + list_head_init(&dht->cmds); + + if (pthread_rwlock_init(&dht->lock, NULL)) + goto fail_rwlock; + + if (pthread_mutex_init(&dht->mtx, NULL)) + goto fail_mutex; + + if (pthread_cond_init(&dht->cond, NULL)) + goto fail_cond; + + dht->cookies = bmp_create(DHT_MAX_REQS, 1); + if (dht->cookies == NULL) + goto fail_bmp; + + dht->b = 0; + dht->id = NULL; +#ifndef __DHT_TEST__ + dht->addr = ipcpi.dt_addr; + dht->tpm = tpm_create(2, 1, dht_handle_packet, dht); + if (dht->tpm == NULL) + goto fail_tpm_create; + + if (tpm_start(dht->tpm)) + goto fail_tpm_start; + + dht->eid = dt_reg_comp(dht, &dht_post_packet, DHT); + if ((int) dht->eid < 0) + goto fail_tpm_start; + + if (notifier_reg(handle_event, dht)) + goto fail_notifier_reg; +#else + (void) handle_event; + (void) dht_handle_packet; + (void) dht_post_packet; +#endif + dht->state = DHT_INIT; + + return (void *) dht; +#ifndef __DHT_TEST__ + fail_notifier_reg: + tpm_stop(dht->tpm); + fail_tpm_start: + tpm_destroy(dht->tpm); + fail_tpm_create: + bmp_destroy(dht->cookies); +#endif + fail_bmp: + pthread_cond_destroy(&dht->cond); + fail_cond: + pthread_mutex_destroy(&dht->mtx); + fail_mutex: + pthread_rwlock_destroy(&dht->lock); + fail_rwlock: + free(dht); + fail_malloc: + return NULL; +} diff --git a/src/ipcpd/unicast/dir/dht.h b/src/ipcpd/unicast/dir/dht.h new file mode 100644 index 00000000..311c6b23 --- /dev/null +++ b/src/ipcpd/unicast/dir/dht.h @@ -0,0 +1,52 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Distributed Hash Table based on Kademlia + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_DIR_DHT_H +#define OUROBOROS_IPCPD_UNICAST_DIR_DHT_H + +#include <ouroboros/ipcp-dev.h> + +#include "ops.h" + +#include <stdint.h> +#include <sys/types.h> + +void * dht_create(void); + +void dht_destroy(void * dir); + +int dht_bootstrap(void * dir); + +int dht_reg(void * dir, + const uint8_t * key); + +int dht_unreg(void * dir, + const uint8_t * key); + +uint64_t dht_query(void * dir, + const uint8_t * key); + +int dht_wait_running(void * dir); + +extern struct dir_ops dht_dir_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_DIR_DHT_H */ diff --git a/src/ipcpd/unicast/dir/dht.proto b/src/ipcpd/unicast/dir/dht.proto new file mode 100644 index 00000000..4c5b06db --- /dev/null +++ b/src/ipcpd/unicast/dir/dht.proto @@ -0,0 +1,45 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * DHT protocol, based on Kademlia + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +syntax = "proto2"; + +message dht_contact_msg { + required bytes id = 1; + required uint64 addr = 2; +} + +message dht_msg { + required uint32 code = 1; + required uint32 cookie = 2; + required uint64 s_addr = 3; + optional bytes s_id = 4; + optional bytes key = 5; + repeated uint64 addrs = 6; + repeated dht_contact_msg contacts = 7; + // enrolment parameters + optional uint32 alpha = 8; + optional uint32 b = 9; + optional uint32 k = 10; + optional uint32 t_expire = 11; + optional uint32 t_refresh = 12; + optional uint32 t_replicate = 13; +} diff --git a/src/ipcpd/unicast/dir/ops.h b/src/ipcpd/unicast/dir/ops.h new file mode 100644 index 00000000..6ff61ce6 --- /dev/null +++ b/src/ipcpd/unicast/dir/ops.h @@ -0,0 +1,46 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Directory policy ops + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_DIR_OPS_H +#define OUROBOROS_IPCPD_UNICAST_DIR_OPS_H + + +struct dir_ops { + void * (* create)(void); + + void (* destroy)(void * dir); + + int (* bootstrap)(void * dir); + + int (* reg)(void * dir, + const uint8_t * hash); + + int (* unreg)(void * dir, + const uint8_t * hash); + + uint64_t (* query)(void * dir, + const uint8_t * hash); + + int (* wait_running)(void * dir); +}; + +#endif /* OUROBOROS_IPCPD_UNICAST_DIR_OPS_H */ diff --git a/src/ipcpd/unicast/dir/pol.h b/src/ipcpd/unicast/dir/pol.h new file mode 100644 index 00000000..eae4b2e7 --- /dev/null +++ b/src/ipcpd/unicast/dir/pol.h @@ -0,0 +1,23 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Directory policies + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include "dht.h" diff --git a/src/ipcpd/unicast/dir/tests/CMakeLists.txt b/src/ipcpd/unicast/dir/tests/CMakeLists.txt new file mode 100644 index 00000000..f62ed993 --- /dev/null +++ b/src/ipcpd/unicast/dir/tests/CMakeLists.txt @@ -0,0 +1,40 @@ +get_filename_component(CURRENT_SOURCE_PARENT_DIR + ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(CURRENT_BINARY_PARENT_DIR + ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +include_directories(${CURRENT_SOURCE_PARENT_DIR}) +include_directories(${CURRENT_BINARY_PARENT_DIR}) + +include_directories(${CMAKE_SOURCE_DIR}/include) +include_directories(${CMAKE_BINARY_DIR}/include) + +get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) + +create_test_sourcelist(${PARENT_DIR}_tests test_suite.c + # Add new tests here + dht_test.c + ) + +protobuf_generate_c(DHT_PROTO_SRCS KAD_PROTO_HDRS ../dht.proto) +add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests} + ${DHT_PROTO_SRCS}) +target_link_libraries(${PARENT_DIR}_test ouroboros-common) + +add_dependencies(check ${PARENT_DIR}_test) + +set(tests_to_run ${${PARENT_DIR}_tests}) +if(CMAKE_VERSION VERSION_LESS "3.29.0") + remove(tests_to_run test_suite.c) +else () + list(POP_FRONT tests_to_run) +endif() + +foreach (test ${tests_to_run}) + get_filename_component(test_name ${test} NAME_WE) + add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name}) +endforeach (test) diff --git a/src/ipcpd/unicast/dir/tests/dht_test.c b/src/ipcpd/unicast/dir/tests/dht_test.c new file mode 100644 index 00000000..bea2c3e7 --- /dev/null +++ b/src/ipcpd/unicast/dir/tests/dht_test.c @@ -0,0 +1,96 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Unit tests of the DHT + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define __DHT_TEST__ +#define DHT_TEST_KEY_LEN 32 + +#include "dht.c" + +#include <pthread.h> +#include <time.h> +#include <stdlib.h> +#include <stdio.h> + +#define CONTACTS 1000 + +int dht_test(int argc, + char ** argv) +{ + struct dht * dht; + uint8_t key[DHT_TEST_KEY_LEN]; + size_t i; + + (void) argc; + (void) argv; + + dht = dht_create(); + if (dht == NULL) { + printf("Failed to create dht.\n"); + return -1; + } + + dht_destroy(dht); + + dht = dht_create(); + if (dht == NULL) { + printf("Failed to re-create dht.\n"); + return -1; + } + + if (dht_bootstrap(dht)) { + printf("Failed to bootstrap dht.\n"); + dht_destroy(dht); + return -1; + } + + dht_destroy(dht); + + dht = dht_create(); + if (dht == NULL) { + printf("Failed to re-create dht.\n"); + return -1; + } + + if (dht_bootstrap(dht)) { + printf("Failed to bootstrap dht.\n"); + dht_destroy(dht); + return -1; + } + + for (i = 0; i < CONTACTS; ++i) { + uint64_t addr; + random_buffer(&addr, sizeof(addr)); + random_buffer(key, DHT_TEST_KEY_LEN); + pthread_rwlock_wrlock(&dht->lock); + if (dht_update_bucket(dht, key, addr)) { + pthread_rwlock_unlock(&dht->lock); + printf("Failed to update bucket.\n"); + dht_destroy(dht); + return -1; + } + pthread_rwlock_unlock(&dht->lock); + } + + dht_destroy(dht); + + return 0; +} diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c new file mode 100644 index 00000000..2bb5ed2f --- /dev/null +++ b/src/ipcpd/unicast/dt.c @@ -0,0 +1,871 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Data Transfer Component + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#define DT "dt" +#define OUROBOROS_PREFIX DT + +#include <ouroboros/bitmap.h> +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/dev.h> +#include <ouroboros/notifier.h> +#include <ouroboros/rib.h> +#ifdef IPCP_FLOW_STATS +#include <ouroboros/fccntl.h> +#endif + +#include "common/comp.h" +#include "common/connmgr.h" +#include "ca.h" +#include "ipcp.h" +#include "dt.h" +#include "pff.h" +#include "routing.h" +#include "psched.h" +#include "fa.h" + +#include <stdlib.h> +#include <stdbool.h> +#include <pthread.h> +#include <string.h> +#include <inttypes.h> +#include <assert.h> + +#define QOS_BLOCK_LEN 672 +#define RIB_FILE_STRLEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX) +#define RIB_NAME_STRLEN 256 + +#ifndef CLOCK_REALTIME_COARSE +#define CLOCK_REALTIME_COARSE CLOCK_REALTIME +#endif + +struct comp_info { + void (* post_packet)(void * comp, struct shm_du_buff * sdb); + void * comp; + char * name; +}; + +/* Fixed field lengths */ +#define TTL_LEN 1 +#define QOS_LEN 1 +#define ECN_LEN 1 + +struct dt_pci { + uint64_t dst_addr; + qoscube_t qc; + uint8_t ttl; + uint8_t ecn; + uint64_t eid; +}; + +struct { + uint8_t addr_size; + uint8_t eid_size; + size_t head_size; + + /* Offsets */ + size_t qc_o; + size_t ttl_o; + size_t ecn_o; + size_t eid_o; + + /* Initial TTL value */ + uint8_t max_ttl; +} dt_pci_info; + +static void dt_pci_ser(uint8_t * head, + struct dt_pci * dt_pci) +{ + uint8_t ttl = dt_pci_info.max_ttl; + + assert(head); + assert(dt_pci); + + /* FIXME: Add check and operations for Big Endian machines. */ + memcpy(head, &dt_pci->dst_addr, dt_pci_info.addr_size); + memcpy(head + dt_pci_info.qc_o, &dt_pci->qc, QOS_LEN); + memcpy(head + dt_pci_info.ttl_o, &ttl, TTL_LEN); + memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN); + memcpy(head + dt_pci_info.eid_o, &dt_pci->eid, dt_pci_info.eid_size); + +} + +static void dt_pci_des(uint8_t * head, + struct dt_pci * dt_pci) +{ + assert(head); + assert(dt_pci); + + /* Decrease TTL */ + --*(head + dt_pci_info.ttl_o); + + /* FIXME: Add check and operations for Big Endian machines. */ + memcpy(&dt_pci->dst_addr, head, dt_pci_info.addr_size); + memcpy(&dt_pci->qc, head + dt_pci_info.qc_o, QOS_LEN); + memcpy(&dt_pci->ttl, head + dt_pci_info.ttl_o, TTL_LEN); + memcpy(&dt_pci->ecn, head + dt_pci_info.ecn_o, ECN_LEN); + memcpy(&dt_pci->eid, head + dt_pci_info.eid_o, dt_pci_info.eid_size); +} + +static void dt_pci_shrink(struct shm_du_buff * sdb) +{ + assert(sdb); + + shm_du_buff_head_release(sdb, dt_pci_info.head_size); +} + +struct { + struct psched * psched; + + struct pff * pff[QOS_CUBE_MAX]; + struct routing_i * routing[QOS_CUBE_MAX]; +#ifdef IPCP_FLOW_STATS + struct { + time_t stamp; + uint64_t addr; + size_t snd_pkt[QOS_CUBE_MAX]; + size_t rcv_pkt[QOS_CUBE_MAX]; + size_t snd_bytes[QOS_CUBE_MAX]; + size_t rcv_bytes[QOS_CUBE_MAX]; + size_t lcl_r_pkt[QOS_CUBE_MAX]; + size_t lcl_r_bytes[QOS_CUBE_MAX]; + size_t lcl_w_pkt[QOS_CUBE_MAX]; + size_t lcl_w_bytes[QOS_CUBE_MAX]; + size_t r_drp_pkt[QOS_CUBE_MAX]; + size_t r_drp_bytes[QOS_CUBE_MAX]; + size_t w_drp_pkt[QOS_CUBE_MAX]; + size_t w_drp_bytes[QOS_CUBE_MAX]; + size_t f_nhp_pkt[QOS_CUBE_MAX]; + size_t f_nhp_bytes[QOS_CUBE_MAX]; + pthread_mutex_t lock; + } stat[PROG_MAX_FLOWS]; + + size_t n_flows; +#endif + struct bmp * res_fds; + struct comp_info comps[PROG_RES_FDS]; + pthread_rwlock_t lock; + + pthread_t listener; +} dt; + +static int dt_rib_read(const char * path, + char * buf, + size_t len) +{ +#ifdef IPCP_FLOW_STATS + int fd; + int i; + char str[QOS_BLOCK_LEN + 1]; + char addrstr[20]; + char * entry; + char tmstr[20]; + size_t rxqlen = 0; + size_t txqlen = 0; + struct tm * tm; + + /* NOTE: we may need stronger checks. */ + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + fd = atoi(entry); + + if (len < RIB_FILE_STRLEN) + return 0; + + buf[0] = '\0'; + + pthread_mutex_lock(&dt.stat[fd].lock); + + if (dt.stat[fd].stamp == 0) { + pthread_mutex_unlock(&dt.stat[fd].lock); + return 0; + } + + if (dt.stat[fd].addr == ipcpi.dt_addr) + sprintf(addrstr, "%s", dt.comps[fd].name); + else + sprintf(addrstr, "%" PRIu64, dt.stat[fd].addr); + + tm = localtime(&dt.stat[fd].stamp); + strftime(tmstr, sizeof(tmstr), "%F %T", tm); + + if (fd >= PROG_RES_FDS) { + fccntl(fd, FLOWGRXQLEN, &rxqlen); + fccntl(fd, FLOWGTXQLEN, &txqlen); + } + + sprintf(buf, + "Flow established at: %20s\n" + "Endpoint address: %20s\n" + "Queued packets (rx): %20zu\n" + "Queued packets (tx): %20zu\n\n", + tmstr, addrstr, rxqlen, txqlen); + for (i = 0; i < QOS_CUBE_MAX; ++i) { + sprintf(str, + "Qos cube %3d:\n" + " sent (packets): %20zu\n" + " sent (bytes): %20zu\n" + " rcvd (packets): %20zu\n" + " rcvd (bytes): %20zu\n" + " local sent (packets): %20zu\n" + " local sent (bytes): %20zu\n" + " local rcvd (packets): %20zu\n" + " local rcvd (bytes): %20zu\n" + " dropped ttl (packets): %20zu\n" + " dropped ttl (bytes): %20zu\n" + " failed writes (packets): %20zu\n" + " failed writes (bytes): %20zu\n" + " failed nhop (packets): %20zu\n" + " failed nhop (bytes): %20zu\n", + i, + dt.stat[fd].snd_pkt[i], + dt.stat[fd].snd_bytes[i], + dt.stat[fd].rcv_pkt[i], + dt.stat[fd].rcv_bytes[i], + dt.stat[fd].lcl_w_pkt[i], + dt.stat[fd].lcl_w_bytes[i], + dt.stat[fd].lcl_r_pkt[i], + dt.stat[fd].lcl_r_bytes[i], + dt.stat[fd].r_drp_pkt[i], + dt.stat[fd].r_drp_bytes[i], + dt.stat[fd].w_drp_pkt[i], + dt.stat[fd].w_drp_bytes[i], + dt.stat[fd].f_nhp_pkt[i], + dt.stat[fd].f_nhp_bytes[i] + ); + strcat(buf, str); + } + + pthread_mutex_unlock(&dt.stat[fd].lock); + + return RIB_FILE_STRLEN; +#else + (void) path; + (void) buf; + (void) len; + return 0; +#endif +} + +static int dt_rib_readdir(char *** buf) +{ +#ifdef IPCP_FLOW_STATS + char entry[RIB_PATH_LEN + 1]; + size_t i; + int idx = 0; + + pthread_rwlock_rdlock(&dt.lock); + + if (dt.n_flows < 1) { + pthread_rwlock_unlock(&dt.lock); + return 0; + } + + *buf = malloc(sizeof(**buf) * dt.n_flows); + if (*buf == NULL) { + pthread_rwlock_unlock(&dt.lock); + return -ENOMEM; + } + + for (i = 0; i < PROG_MAX_FLOWS; ++i) { + pthread_mutex_lock(&dt.stat[i].lock); + + if (dt.stat[i].stamp == 0) { + pthread_mutex_unlock(&dt.stat[i].lock); + /* Optimization: skip unused res_fds. */ + if (i < PROG_RES_FDS) + i = PROG_RES_FDS; + continue; + } + + sprintf(entry, "%zu", i); + + (*buf)[idx] = malloc(strlen(entry) + 1); + if ((*buf)[idx] == NULL) { + while (idx-- > 0) + free((*buf)[idx]); + free(*buf); + pthread_mutex_unlock(&dt.stat[i].lock); + pthread_rwlock_unlock(&dt.lock); + return -ENOMEM; + } + + strcpy((*buf)[idx++], entry); + + pthread_mutex_unlock(&dt.stat[i].lock); + } + assert((size_t) idx == dt.n_flows); + + pthread_rwlock_unlock(&dt.lock); + + return idx; +#else + (void) buf; + return 0; +#endif +} + +static int dt_rib_getattr(const char * path, + struct rib_attr * attr) +{ +#ifdef IPCP_FLOW_STATS + int fd; + char * entry; + + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + fd = atoi(entry); + + pthread_mutex_lock(&dt.stat[fd].lock); + + if (dt.stat[fd].stamp != -1) { + attr->size = RIB_FILE_STRLEN; + attr->mtime = dt.stat[fd].stamp; + } else { + attr->size = 0; + attr->mtime = 0; + } + + pthread_mutex_unlock(&dt.stat[fd].lock); +#else + (void) path; + (void) attr; +#endif + return 0; +} + +static struct rib_ops r_ops = { + .read = dt_rib_read, + .readdir = dt_rib_readdir, + .getattr = dt_rib_getattr +}; + +#ifdef IPCP_FLOW_STATS +static void stat_used(int fd, + uint64_t addr) +{ + struct timespec now; + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_mutex_lock(&dt.stat[fd].lock); + + memset(&dt.stat[fd], 0, sizeof(dt.stat[fd])); + + dt.stat[fd].stamp = (addr != INVALID_ADDR) ? now.tv_sec : 0; + dt.stat[fd].addr = addr; + + pthread_mutex_unlock(&dt.stat[fd].lock); + + pthread_rwlock_wrlock(&dt.lock); + + (addr != INVALID_ADDR) ? ++dt.n_flows : --dt.n_flows; + + pthread_rwlock_unlock(&dt.lock); +} +#endif + +static void handle_event(void * self, + int event, + const void * o) +{ + struct conn * c; + int fd; + + (void) self; + + c = (struct conn *) o; + + switch (event) { + case NOTIFY_DT_CONN_ADD: + fd = c->flow_info.fd; +#ifdef IPCP_FLOW_STATS + stat_used(fd, c->conn_info.addr); +#endif + psched_add(dt.psched, fd); + log_dbg("Added fd %d to packet scheduler.", fd); + break; + case NOTIFY_DT_CONN_DEL: + fd = c->flow_info.fd; +#ifdef IPCP_FLOW_STATS + stat_used(fd, INVALID_ADDR); +#endif + psched_del(dt.psched, fd); + log_dbg("Removed fd %d from packet scheduler.", fd); + break; + default: + break; + } +} + +static void packet_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) +{ + struct dt_pci dt_pci; + int ret; + int ofd; + uint8_t * head; + size_t len; + + len = shm_du_buff_len(sdb); + +#ifndef IPCP_FLOW_STATS + (void) fd; +#else + pthread_mutex_lock(&dt.stat[fd].lock); + + ++dt.stat[fd].rcv_pkt[qc]; + dt.stat[fd].rcv_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[fd].lock); +#endif + memset(&dt_pci, 0, sizeof(dt_pci)); + + head = shm_du_buff_head(sdb); + + dt_pci_des(head, &dt_pci); + if (dt_pci.dst_addr != ipcpi.dt_addr) { + if (dt_pci.ttl == 0) { + log_dbg("TTL was zero."); + ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[fd].lock); + + ++dt.stat[fd].r_drp_pkt[qc]; + dt.stat[fd].r_drp_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[fd].lock); +#endif + return; + } + + /* FIXME: Use qoscube from PCI instead of incoming flow. */ + ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr); + if (ofd < 0) { + log_dbg("No next hop for %" PRIu64 ".", + dt_pci.dst_addr); + ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[fd].lock); + + ++dt.stat[fd].f_nhp_pkt[qc]; + dt.stat[fd].f_nhp_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[fd].lock); +#endif + return; + } + + (void) ca_calc_ecn(ofd, head + dt_pci_info.ecn_o, qc, len); + + ret = ipcp_flow_write(ofd, sdb); + if (ret < 0) { + log_dbg("Failed to write packet to fd %d.", ofd); + if (ret == -EFLOWDOWN) + notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd); + ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[ofd].lock); + + ++dt.stat[ofd].w_drp_pkt[qc]; + dt.stat[ofd].w_drp_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[ofd].lock); +#endif + return; + } +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[ofd].lock); + + ++dt.stat[ofd].snd_pkt[qc]; + dt.stat[ofd].snd_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[ofd].lock); +#endif + } else { + dt_pci_shrink(sdb); + if (dt_pci.eid >= PROG_RES_FDS) { + uint8_t ecn = *(head + dt_pci_info.ecn_o); + fa_np1_rcv(dt_pci.eid, ecn, sdb); + return; + } + + if (dt.comps[dt_pci.eid].post_packet == NULL) { + log_err("No registered component on eid %" PRIu64 ".", + dt_pci.eid); + ipcp_sdb_release(sdb); + return; + } +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[fd].lock); + + ++dt.stat[fd].lcl_r_pkt[qc]; + dt.stat[fd].lcl_r_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[fd].lock); + pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + + ++dt.stat[dt_pci.eid].snd_pkt[qc]; + dt.stat[dt_pci.eid].snd_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif + dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp, + sdb); + } +} + +static void * dt_conn_handle(void * o) +{ + struct conn conn; + + (void) o; + + while (true) { + if (connmgr_wait(COMPID_DT, &conn)) { + log_err("Failed to get next DT connection."); + continue; + } + + /* NOTE: connection acceptance policy could be here. */ + + notifier_event(NOTIFY_DT_CONN_ADD, &conn); + } + + return 0; +} + +int dt_init(struct dt_config cfg) +{ + int i; + int j; + char dtstr[RIB_NAME_STRLEN + 1]; + int pp; + struct conn_info info; + + memset(&info, 0, sizeof(info)); + + strcpy(info.comp_name, DT_COMP); + strcpy(info.protocol, DT_PROTO); + info.pref_version = 1; + info.pref_syntax = PROTO_FIXED; + info.addr = ipcpi.dt_addr; + + if (cfg.eid_size != 8) { /* only support 64 bits from now */ + log_warn("Invalid EID size. Only 64 bit is supported."); + cfg.eid_size = 8; + } + + dt_pci_info.addr_size = cfg.addr_size; + dt_pci_info.eid_size = cfg.eid_size; + dt_pci_info.max_ttl = cfg.max_ttl; + + dt_pci_info.qc_o = dt_pci_info.addr_size; + dt_pci_info.ttl_o = dt_pci_info.qc_o + QOS_LEN; + dt_pci_info.ecn_o = dt_pci_info.ttl_o + TTL_LEN; + dt_pci_info.eid_o = dt_pci_info.ecn_o + ECN_LEN; + dt_pci_info.head_size = dt_pci_info.eid_o + dt_pci_info.eid_size; + + if (connmgr_comp_init(COMPID_DT, &info)) { + log_err("Failed to register with connmgr."); + goto fail_connmgr_comp_init; + } + + pp = routing_init(cfg.routing_type); + if (pp < 0) { + log_err("Failed to init routing."); + goto fail_routing; + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + dt.pff[i] = pff_create(pp); + if (dt.pff[i] == NULL) { + log_err("Failed to create a PFF."); + for (j = 0; j < i; ++j) + pff_destroy(dt.pff[j]); + goto fail_pff; + } + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + dt.routing[i] = routing_i_create(dt.pff[i]); + if (dt.routing[i] == NULL) { + for (j = 0; j < i; ++j) + routing_i_destroy(dt.routing[j]); + goto fail_routing_i; + } + } + + if (pthread_rwlock_init(&dt.lock, NULL)) { + log_err("Failed to init rwlock."); + goto fail_rwlock_init; + } + + dt.res_fds = bmp_create(PROG_RES_FDS, 0); + if (dt.res_fds == NULL) + goto fail_res_fds; +#ifdef IPCP_FLOW_STATS + memset(dt.stat, 0, sizeof(dt.stat)); + + for (i = 0; i < PROG_MAX_FLOWS; ++i) + if (pthread_mutex_init(&dt.stat[i].lock, NULL)) { + log_err("Failed to init mutex for flow %d.", i); + for (j = 0; j < i; ++j) + pthread_mutex_destroy(&dt.stat[j].lock); + goto fail_stat_lock; + } + + dt.n_flows = 0; +#endif + sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr); + if (rib_reg(dtstr, &r_ops)) { + log_err("Failed to register RIB."); + goto fail_rib_reg; + } + + return 0; + + fail_rib_reg: +#ifdef IPCP_FLOW_STATS + for (i = 0; i < PROG_MAX_FLOWS; ++i) + pthread_mutex_destroy(&dt.stat[i].lock); + fail_stat_lock: +#endif + bmp_destroy(dt.res_fds); + fail_res_fds: + pthread_rwlock_destroy(&dt.lock); + fail_rwlock_init: + for (j = 0; j < QOS_CUBE_MAX; ++j) + routing_i_destroy(dt.routing[j]); + fail_routing_i: + for (i = 0; i < QOS_CUBE_MAX; ++i) + pff_destroy(dt.pff[i]); + fail_pff: + routing_fini(); + fail_routing: + connmgr_comp_fini(COMPID_DT); + fail_connmgr_comp_init: + return -1; +} + +void dt_fini(void) +{ + char dtstr[RIB_NAME_STRLEN + 1]; + int i; + + sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr); + rib_unreg(dtstr); +#ifdef IPCP_FLOW_STATS + for (i = 0; i < PROG_MAX_FLOWS; ++i) + pthread_mutex_destroy(&dt.stat[i].lock); +#endif + bmp_destroy(dt.res_fds); + + pthread_rwlock_destroy(&dt.lock); + + for (i = 0; i < QOS_CUBE_MAX; ++i) + routing_i_destroy(dt.routing[i]); + + for (i = 0; i < QOS_CUBE_MAX; ++i) + pff_destroy(dt.pff[i]); + + routing_fini(); + + connmgr_comp_fini(COMPID_DT); +} + +int dt_start(void) +{ + dt.psched = psched_create(packet_handler, ipcp_flow_read); + if (dt.psched == NULL) { + log_err("Failed to create N-1 packet scheduler."); + goto fail_psched; + } + + if (notifier_reg(handle_event, NULL)) { + log_err("Failed to register with notifier."); + goto fail_notifier_reg; + } + + if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) { + log_err("Failed to create listener thread."); + psched_destroy(dt.psched); + return -1; + } + + return 0; + + fail_notifier_reg: + psched_destroy(dt.psched); + fail_psched: + return -1; + +} + +void dt_stop(void) +{ + pthread_cancel(dt.listener); + pthread_join(dt.listener, NULL); + + notifier_unreg(&handle_event); + + psched_destroy(dt.psched); +} + +int dt_reg_comp(void * comp, + void (* func)(void * func, struct shm_du_buff *), + char * name) +{ + int eid; + + assert(func); + + pthread_rwlock_wrlock(&dt.lock); + + eid = bmp_allocate(dt.res_fds); + if (!bmp_is_id_valid(dt.res_fds, eid)) { + log_err("Cannot allocate EID."); + pthread_rwlock_unlock(&dt.lock); + return -EBADF; + } + + assert(dt.comps[eid].post_packet == NULL); + assert(dt.comps[eid].comp == NULL); + assert(dt.comps[eid].name == NULL); + + dt.comps[eid].post_packet = func; + dt.comps[eid].comp = comp; + dt.comps[eid].name = name; + + pthread_rwlock_unlock(&dt.lock); +#ifdef IPCP_FLOW_STATS + stat_used(eid, ipcpi.dt_addr); +#endif + return eid; +} + +int dt_write_packet(uint64_t dst_addr, + qoscube_t qc, + uint64_t eid, + struct shm_du_buff * sdb) +{ + struct dt_pci dt_pci; + int fd; + int ret; + uint8_t * head; + size_t len; + + assert(sdb); + assert(dst_addr != ipcpi.dt_addr); + + len = shm_du_buff_len(sdb); + +#ifdef IPCP_FLOW_STATS + if (eid < PROG_RES_FDS) { + pthread_mutex_lock(&dt.stat[eid].lock); + + ++dt.stat[eid].lcl_r_pkt[qc]; + dt.stat[eid].lcl_r_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[eid].lock); + } +#endif + fd = pff_nhop(dt.pff[qc], dst_addr); + if (fd < 0) { + log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr); +#ifdef IPCP_FLOW_STATS + if (eid < PROG_RES_FDS) { + pthread_mutex_lock(&dt.stat[eid].lock); + + ++dt.stat[eid].lcl_r_pkt[qc]; + dt.stat[eid].lcl_r_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[eid].lock); + } +#endif + return -EPERM; + } + + head = shm_du_buff_head_alloc(sdb, dt_pci_info.head_size); + if (head == NULL) { + log_dbg("Failed to allocate DT header."); + goto fail_write; + } + + len = shm_du_buff_len(sdb); + + dt_pci.dst_addr = dst_addr; + dt_pci.qc = qc; + dt_pci.eid = eid; + dt_pci.ecn = 0; + + (void) ca_calc_ecn(fd, &dt_pci.ecn, qc, len); + + dt_pci_ser(head, &dt_pci); + + ret = ipcp_flow_write(fd, sdb); + if (ret < 0) { + log_dbg("Failed to write packet to fd %d.", fd); + if (ret == -EFLOWDOWN) + notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); + goto fail_write; + } +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[fd].lock); + + if (dt_pci.eid < PROG_RES_FDS) { + ++dt.stat[fd].lcl_w_pkt[qc]; + dt.stat[fd].lcl_w_bytes[qc] += len; + } + ++dt.stat[fd].snd_pkt[qc]; + dt.stat[fd].snd_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[fd].lock); +#endif + return 0; + + fail_write: +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[fd].lock); + + if (eid < PROG_RES_FDS) { + ++dt.stat[fd].lcl_w_pkt[qc]; + dt.stat[fd].lcl_w_bytes[qc] += len; + } + ++dt.stat[fd].w_drp_pkt[qc]; + dt.stat[fd].w_drp_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[fd].lock); +#endif + return -1; +} diff --git a/src/ipcpd/unicast/dt.h b/src/ipcpd/unicast/dt.h new file mode 100644 index 00000000..7198a013 --- /dev/null +++ b/src/ipcpd/unicast/dt.h @@ -0,0 +1,51 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Data Transfer component + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_DT_H +#define OUROBOROS_IPCPD_UNICAST_DT_H + +#include <ouroboros/ipcp.h> +#include <ouroboros/qoscube.h> +#include <ouroboros/shm_rdrbuff.h> + +#define DT_COMP "Data Transfer" +#define DT_PROTO "dtp" +#define INVALID_ADDR 0 + +int dt_init(struct dt_config cfg); + +void dt_fini(void); + +int dt_start(void); + +void dt_stop(void); + +int dt_reg_comp(void * comp, + void (* func)(void * comp, struct shm_du_buff * sdb), + char * name); + +int dt_write_packet(uint64_t dst_addr, + qoscube_t qc, + uint64_t eid, + struct shm_du_buff * sdb); + +#endif /* OUROBOROS_IPCPD_UNICAST_DT_H */ diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c new file mode 100644 index 00000000..3631fd7b --- /dev/null +++ b/src/ipcpd/unicast/fa.c @@ -0,0 +1,969 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Flow allocator of the IPC Process + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#define FA "flow-allocator" +#define OUROBOROS_PREFIX FA + +#include <ouroboros/endian.h> +#include <ouroboros/logs.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/errno.h> +#include <ouroboros/dev.h> +#include <ouroboros/ipcp-dev.h> +#include <ouroboros/rib.h> +#include <ouroboros/random.h> +#include <ouroboros/pthread.h> + +#include "dir.h" +#include "fa.h" +#include "psched.h" +#include "ipcp.h" +#include "dt.h" +#include "ca.h" + +#include <inttypes.h> +#include <stdlib.h> +#include <string.h> + +#if defined (IPCP_FLOW_STATS) && !defined(CLOCK_REALTIME_COARSE) +#define CLOCK_REALTIME_COARSE CLOCK_REALTIME +#endif + +#define TIMEOUT 10 * MILLION /* nanoseconds */ + +#define FLOW_REQ 0 +#define FLOW_REPLY 1 +#define FLOW_UPDATE 2 +#define MSGBUFSZ 2048 + +#define STAT_FILE_LEN 0 + +struct fa_msg { + uint64_t s_addr; + uint64_t r_eid; + uint64_t s_eid; + uint8_t code; + int8_t response; + uint16_t ece; + /* QoS parameters from spec, aligned */ + uint32_t delay; + uint64_t bandwidth; + uint32_t loss; + uint32_t ber; + uint32_t max_gap; + uint32_t timeout; + uint16_t cypher_s; + uint8_t availability; + uint8_t in_order; +} __attribute__((packed)); + +struct cmd { + struct list_head next; + struct shm_du_buff * sdb; +}; + +struct fa_flow { +#ifdef IPCP_FLOW_STATS + time_t stamp; /* Flow creation */ + size_t p_snd; /* Packets sent */ + size_t p_snd_f; /* Packets sent fail */ + size_t b_snd; /* Bytes sent */ + size_t b_snd_f; /* Bytes sent fail */ + size_t p_rcv; /* Packets received */ + size_t p_rcv_f; /* Packets received fail */ + size_t b_rcv; /* Bytes received */ + size_t b_rcv_f; /* Bytes received fail */ + size_t u_snd; /* Flow updates sent */ + size_t u_rcv; /* Flow updates received */ +#endif + uint64_t s_eid; /* Local endpoint id */ + uint64_t r_eid; /* Remote endpoint id */ + uint64_t r_addr; /* Remote address */ + void * ctx; /* Congestion avoidance context */ +}; + +struct { + pthread_rwlock_t flows_lock; + struct fa_flow flows[PROG_MAX_FLOWS]; +#ifdef IPCP_FLOW_STATS + size_t n_flows; +#endif + uint32_t eid; + + struct list_head cmds; + pthread_cond_t cond; + pthread_mutex_t mtx; + pthread_t worker; + + struct psched * psched; +} fa; + +static int fa_rib_read(const char * path, + char * buf, + size_t len) +{ +#ifdef IPCP_FLOW_STATS + struct fa_flow * flow; + int fd; + char r_addrstr[21]; + char s_eidstr[21]; + char r_eidstr[21]; + char tmstr[20]; + char castr[1024]; + char * entry; + struct tm * tm; + + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + fd = atoi(entry); + + if (fd < 0 || fd >= PROG_MAX_FLOWS) + return -1; + + if (len < 1536) + return 0; + + flow = &fa.flows[fd]; + + buf[0] = '\0'; + + pthread_rwlock_rdlock(&fa.flows_lock); + + if (flow->stamp ==0) { + pthread_rwlock_unlock(&fa.flows_lock); + return 0; + } + + sprintf(r_addrstr, "%" PRIu64, flow->r_addr); + sprintf(s_eidstr, "%" PRIu64, flow->s_eid); + sprintf(r_eidstr, "%" PRIu64, flow->r_eid); + + tm = localtime(&flow->stamp); + strftime(tmstr, sizeof(tmstr), "%F %T", tm); + + ca_print_stats(flow->ctx, castr, 1024); + + sprintf(buf, + "Flow established at: %20s\n" + "Remote address: %20s\n" + "Local endpoint ID: %20s\n" + "Remote endpoint ID: %20s\n" + "Sent (packets): %20zu\n" + "Sent (bytes): %20zu\n" + "Send failed (packets): %20zu\n" + "Send failed (bytes): %20zu\n" + "Received (packets): %20zu\n" + "Received (bytes): %20zu\n" + "Receive failed (packets): %20zu\n" + "Receive failed (bytes): %20zu\n" + "Sent flow updates (packets): %20zu\n" + "Received flow updates (packets): %20zu\n" + "%s", + tmstr, r_addrstr, + s_eidstr, r_eidstr, + flow->p_snd, flow->b_snd, + flow->p_snd_f, flow->b_snd_f, + flow->p_rcv, flow->b_rcv, + flow->b_rcv_f, flow->b_rcv_f, + flow->u_snd, flow->u_rcv, + castr); + + pthread_rwlock_unlock(&fa.flows_lock); + + return strlen(buf); +#else + (void) path; + (void) buf; + (void) len; + return 0; +#endif +} + +static int fa_rib_readdir(char *** buf) +{ +#ifdef IPCP_FLOW_STATS + char entry[RIB_PATH_LEN + 1]; + size_t i; + int idx = 0; + + pthread_rwlock_rdlock(&fa.flows_lock); + + if (fa.n_flows < 1) { + pthread_rwlock_unlock(&fa.flows_lock); + return 0; + } + + *buf = malloc(sizeof(**buf) * fa.n_flows); + if (*buf == NULL) { + pthread_rwlock_unlock(&fa.flows_lock); + return -ENOMEM; + } + + for (i = 0; i < PROG_MAX_FLOWS; ++i) { + struct fa_flow * flow; + + flow = &fa.flows[i]; + if (flow->stamp == 0) + continue; + + sprintf(entry, "%zu", i); + + (*buf)[idx] = malloc(strlen(entry) + 1); + if ((*buf)[idx] == NULL) { + while (idx-- > 0) + free((*buf)[idx]); + free(*buf); + pthread_rwlock_unlock(&fa.flows_lock); + return -ENOMEM; + } + + strcpy((*buf)[idx++], entry); + } + + assert((size_t) idx == fa.n_flows); + + pthread_rwlock_unlock(&fa.flows_lock); + + return idx; +#else + (void) buf; + return 0; +#endif +} + +static int fa_rib_getattr(const char * path, + struct rib_attr * attr) +{ +#ifdef IPCP_FLOW_STATS + int fd; + char * entry; + struct fa_flow * flow; + + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + fd = atoi(entry); + + flow = &fa.flows[fd]; + + pthread_rwlock_rdlock(&fa.flows_lock); + + if (flow->stamp != 0) { + attr->size = 1536; + attr->mtime = flow->stamp; + } else { + attr->size = 0; + attr->mtime = 0; + } + + pthread_rwlock_unlock(&fa.flows_lock); +#else + (void) path; + (void) attr; +#endif + return 0; +} + +static struct rib_ops r_ops = { + .read = fa_rib_read, + .readdir = fa_rib_readdir, + .getattr = fa_rib_getattr +}; + +static int eid_to_fd(uint64_t eid) +{ + struct fa_flow * flow; + int fd; + + fd = eid & 0xFFFFFFFF; + + if (fd < 0 || fd >= PROG_MAX_FLOWS) + return -1; + + flow = &fa.flows[fd]; + + if (flow->s_eid == eid) + return fd; + + return -1; +} + +static uint64_t gen_eid(int fd) +{ + uint32_t rnd; + + if (random_buffer(&rnd, sizeof(rnd)) < 0) + return fa.eid; /* INVALID */ + + fd &= 0xFFFFFFFF; + + return ((uint64_t) rnd << 32) + fd; +} + +static void packet_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) +{ + struct fa_flow * flow; + uint64_t r_addr; + uint64_t r_eid; + ca_wnd_t wnd; + size_t len; + + flow = &fa.flows[fd]; + + pthread_rwlock_wrlock(&fa.flows_lock); + + len = shm_du_buff_len(sdb); + +#ifdef IPCP_FLOW_STATS + ++flow->p_snd; + flow->b_snd += len; +#endif + wnd = ca_ctx_update_snd(flow->ctx, len); + + r_addr = flow->r_addr; + r_eid = flow->r_eid; + + pthread_rwlock_unlock(&fa.flows_lock); + + ca_wnd_wait(wnd); + + if (dt_write_packet(r_addr, qc, r_eid, sdb)) { + ipcp_sdb_release(sdb); + log_dbg("Failed to forward packet."); +#ifdef IPCP_FLOW_STATS + pthread_rwlock_wrlock(&fa.flows_lock); + ++flow->p_snd_f; + flow->b_snd_f += len; + pthread_rwlock_unlock(&fa.flows_lock); +#endif + return; + } +} + +static int fa_flow_init(struct fa_flow * flow) +{ +#ifdef IPCP_FLOW_STATS + struct timespec now; +#endif + memset(flow, 0, sizeof(*flow)); + + flow->r_eid = -1; + flow->s_eid = -1; + flow->r_addr = INVALID_ADDR; + + flow->ctx = ca_ctx_create(); + if (flow->ctx == NULL) + return -1; + +#ifdef IPCP_FLOW_STATS + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + flow->stamp = now.tv_sec; + + ++fa.n_flows; +#endif + return 0; +} + +static void fa_flow_fini(struct fa_flow * flow) +{ + ca_ctx_destroy(flow->ctx); + + memset(flow, 0, sizeof(*flow)); + + flow->r_eid = -1; + flow->s_eid = -1; + flow->r_addr = INVALID_ADDR; + +#ifdef IPCP_FLOW_STATS + --fa.n_flows; +#endif +} + +static void fa_post_packet(void * comp, + struct shm_du_buff * sdb) +{ + struct cmd * cmd; + + assert(comp == &fa); + + (void) comp; + + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Command failed. Out of memory."); + ipcp_sdb_release(sdb); + return; + } + + cmd->sdb = sdb; + + pthread_mutex_lock(&fa.mtx); + + list_add(&cmd->next, &fa.cmds); + + pthread_cond_signal(&fa.cond); + + pthread_mutex_unlock(&fa.mtx); +} + +static size_t fa_wait_for_fa_msg(struct fa_msg * msg) +{ + struct cmd * cmd; + size_t len; + + pthread_mutex_lock(&fa.mtx); + + pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx); + + while (list_is_empty(&fa.cmds)) + pthread_cond_wait(&fa.cond, &fa.mtx); + + cmd = list_last_entry(&fa.cmds, struct cmd, next); + list_del(&cmd->next); + + pthread_cleanup_pop(true); + + len = shm_du_buff_len(cmd->sdb); + if (len > MSGBUFSZ || len < sizeof(*msg)) { + log_warn("Invalid flow allocation message (len: %zd).", len); + free(cmd); + return 0; /* No valid message */ + } + + memcpy(msg, shm_du_buff_head(cmd->sdb), len); + + ipcp_sdb_release(cmd->sdb); + + free(cmd); + + return len; +} + +static int fa_handle_flow_req(struct fa_msg * msg, + size_t len) +{ + size_t msg_len; + int fd; + qosspec_t qs; + struct fa_flow * flow; + uint8_t * dst; + buffer_t data; /* Piggbacked data on flow alloc request. */ + + msg_len = sizeof(*msg) + ipcp_dir_hash_len(); + if (len < msg_len) { + log_err("Invalid flow allocation request"); + return -EPERM; + } + + dst = (uint8_t *)(msg + 1); + data.data = (uint8_t *) msg + msg_len; + data.len = len - msg_len; + + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + qs.cypher_s = ntoh16(msg->cypher_s); + qs.timeout = ntoh32(msg->timeout); + + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, &data); + if (fd < 0) + return fd; + + flow = &fa.flows[fd]; + + pthread_rwlock_wrlock(&fa.flows_lock); + + fa_flow_init(flow); + + flow->s_eid = gen_eid(fd); + flow->r_eid = ntoh64(msg->s_eid); + flow->r_addr = ntoh64(msg->s_addr); + + pthread_rwlock_unlock(&fa.flows_lock); + + return fd; +} + +static int fa_handle_flow_reply(struct fa_msg * msg, + size_t len) +{ + int fd; + struct fa_flow * flow; + buffer_t data; /* Piggbacked data on flow alloc request. */ + time_t mpl = IPCP_UNICAST_MPL; + + assert(len >= sizeof(*msg)); + + data.data = (uint8_t *) msg + sizeof(*msg); + data.len = len - sizeof(*msg); + + pthread_rwlock_wrlock(&fa.flows_lock); + + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + log_err("Flow reply for unknown EID %" PRIu64 ".", + ntoh64(msg->r_eid)); + return -ENOTALLOC; + } + + flow = &fa.flows[fd]; + + flow->r_eid = ntoh64(msg->s_eid); + + if (msg->response < 0) + fa_flow_fini(flow); + else + psched_add(fa.psched, fd); + + pthread_rwlock_unlock(&fa.flows_lock); + + if (ipcp_flow_alloc_reply(fd, msg->response, mpl, &data) < 0) { + log_err("Failed to reply for flow allocation on fd %d.", fd); + return -EIRMD; + } + + return 0; +} + +static int fa_handle_flow_update(struct fa_msg * msg, + size_t len) +{ + struct fa_flow * flow; + int fd; + + (void) len; + assert(len >= sizeof(*msg)); + + pthread_rwlock_wrlock(&fa.flows_lock); + + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + log_err("Flow update for unknown EID %" PRIu64 ".", + ntoh64(msg->r_eid)); + return -EPERM; + } + + flow = &fa.flows[fd]; +#ifdef IPCP_FLOW_STATS + flow->u_rcv++; +#endif + ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); + + pthread_rwlock_unlock(&fa.flows_lock); + + return 0; +} + +static void * fa_handle_packet(void * o) +{ + (void) o; + + while (true) { + uint8_t buf[MSGBUFSZ]; + struct fa_msg * msg; + size_t len; + + msg = (struct fa_msg *) buf; + + len = fa_wait_for_fa_msg(msg); + if (len == 0) + continue; + + switch (msg->code) { + case FLOW_REQ: + if (fa_handle_flow_req(msg, len) < 0) + log_err("Error handling flow alloc request."); + break; + case FLOW_REPLY: + if (fa_handle_flow_reply(msg, len) < 0) + log_err("Error handling flow reply."); + break; + case FLOW_UPDATE: + if (fa_handle_flow_update(msg, len) < 0) + log_err("Error handling flow update."); + break; + default: + log_warn("Recieved unknown flow allocation message."); + break; + } + } + + return (void *) 0; +} + +int fa_init(void) +{ + pthread_condattr_t cattr; + + if (pthread_rwlock_init(&fa.flows_lock, NULL)) + goto fail_rwlock; + + if (pthread_mutex_init(&fa.mtx, NULL)) + goto fail_mtx; + + if (pthread_condattr_init(&cattr)) + goto fail_cattr; + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&fa.cond, &cattr)) + goto fail_cond; + + pthread_condattr_destroy(&cattr); + + list_head_init(&fa.cmds); + + if (rib_reg(FA, &r_ops)) + goto fail_rib_reg; + + fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA); + if ((int) fa.eid < 0) + goto fail_rib_reg; + + return 0; + + fail_rib_reg: + pthread_cond_destroy(&fa.cond); + fail_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + pthread_mutex_destroy(&fa.mtx); + fail_mtx: + pthread_rwlock_destroy(&fa.flows_lock); + fail_rwlock: + + return -1; +} + +void fa_fini(void) +{ + rib_unreg(FA); + + pthread_cond_destroy(&fa.cond);; + pthread_mutex_destroy(&fa.mtx); + pthread_rwlock_destroy(&fa.flows_lock); +} + +int fa_start(void) +{ + struct sched_param par; + int pol; + int max; + + fa.psched = psched_create(packet_handler, np1_flow_read); + if (fa.psched == NULL) { + log_err("Failed to start packet scheduler."); + goto fail_psched; + } + + if (pthread_create(&fa.worker, NULL, fa_handle_packet, NULL)) { + log_err("Failed to create worker thread."); + goto fail_thread; + } + + if (pthread_getschedparam(fa.worker, &pol, &par)) { + log_err("Failed to get worker thread scheduling parameters."); + goto fail_sched; + } + + max = sched_get_priority_max(pol); + if (max < 0) { + log_err("Failed to get max priority for scheduler."); + goto fail_sched; + } + + par.sched_priority = max; + + if (pthread_setschedparam(fa.worker, pol, &par)) { + log_err("Failed to set scheduler priority to maximum."); + goto fail_sched; + } + + return 0; + + fail_sched: + pthread_cancel(fa.worker); + pthread_join(fa.worker, NULL); + fail_thread: + psched_destroy(fa.psched); + fail_psched: + return -1; +} + +void fa_stop(void) +{ + pthread_cancel(fa.worker); + pthread_join(fa.worker, NULL); + + psched_destroy(fa.psched); +} + +int fa_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) +{ + struct fa_msg * msg; + struct shm_du_buff * sdb; + struct fa_flow * flow; + uint64_t addr; + qoscube_t qc = QOS_CUBE_BE; + size_t len; + uint64_t eid; + + addr = dir_query(dst); + if (addr == 0) + return -1; + + len = sizeof(*msg) + ipcp_dir_hash_len(); + + if (ipcp_sdb_reserve(&sdb, len + data->len)) + return -1; + + msg = (struct fa_msg *) shm_du_buff_head(sdb); + memset(msg, 0, sizeof(*msg)); + + eid = gen_eid(fd); + + msg->code = FLOW_REQ; + msg->s_eid = hton64(eid); + msg->s_addr = hton64(ipcpi.dt_addr); + msg->delay = hton32(qs.delay); + msg->bandwidth = hton64(qs.bandwidth); + msg->availability = qs.availability; + msg->loss = hton32(qs.loss); + msg->ber = hton32(qs.ber); + msg->in_order = qs.in_order; + msg->max_gap = hton32(qs.max_gap); + msg->cypher_s = hton16(qs.cypher_s); + msg->timeout = hton32(qs.timeout); + + memcpy(msg + 1, dst, ipcp_dir_hash_len()); + if (data->len > 0) + memcpy(shm_du_buff_head(sdb) + len, data->data, data->len); + + if (dt_write_packet(addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow allocation request packet."); + ipcp_sdb_release(sdb); + return -1; + } + + flow = &fa.flows[fd]; + + pthread_rwlock_wrlock(&fa.flows_lock); + + fa_flow_init(flow); + flow->r_addr = addr; + flow->s_eid = eid; + + pthread_rwlock_unlock(&fa.flows_lock); + + return 0; +} + +int fa_alloc_resp(int fd, + int response, + const buffer_t * data) +{ + struct fa_msg * msg; + struct shm_du_buff * sdb; + struct fa_flow * flow; + qoscube_t qc = QOS_CUBE_BE; + + flow = &fa.flows[fd]; + + if (ipcp_wait_flow_resp(fd) < 0) { + log_err("Failed to wait for flow response."); + goto fail_alloc_resp; + } + + if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + data->len)) { + log_err("Failed to reserve sdb (%zu bytes).", + sizeof(*msg) + data->len); + goto fail_reserve; + } + + msg = (struct fa_msg *) shm_du_buff_head(sdb); + memset(msg, 0, sizeof(*msg)); + + msg->code = FLOW_REPLY; + msg->response = response; + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); + + pthread_rwlock_rdlock(&fa.flows_lock); + + msg->r_eid = hton64(flow->r_eid); + msg->s_eid = hton64(flow->s_eid); + + pthread_rwlock_unlock(&fa.flows_lock); + + if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow allocation response packet."); + goto fail_packet; + } + + if (response < 0) { + pthread_rwlock_rdlock(&fa.flows_lock); + fa_flow_fini(flow); + pthread_rwlock_unlock(&fa.flows_lock); + } else { + psched_add(fa.psched, fd); + } + + return 0; + + fail_packet: + ipcp_sdb_release(sdb); + fail_reserve: + pthread_rwlock_wrlock(&fa.flows_lock); + fa_flow_fini(flow); + pthread_rwlock_unlock(&fa.flows_lock); + fail_alloc_resp: + return -1; +} + +int fa_dealloc(int fd) +{ + if (ipcp_flow_fini(fd) < 0) + return 0; + + psched_del(fa.psched, fd); + + pthread_rwlock_wrlock(&fa.flows_lock); + + fa_flow_fini(&fa.flows[fd]); + + pthread_rwlock_unlock(&fa.flows_lock); + + ipcp_flow_dealloc(fd); + + return 0; +} + +static int fa_update_remote(int fd, + uint16_t ece) +{ + struct fa_msg * msg; + struct shm_du_buff * sdb; + qoscube_t qc = QOS_CUBE_BE; + struct fa_flow * flow; + uint64_t r_addr; + + if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) { + log_err("Failed to reserve sdb (%zu bytes).", sizeof(*msg)); + return -1; + } + + msg = (struct fa_msg *) shm_du_buff_head(sdb); + + memset(msg, 0, sizeof(*msg)); + + flow = &fa.flows[fd]; + + pthread_rwlock_wrlock(&fa.flows_lock); + + msg->code = FLOW_UPDATE; + msg->r_eid = hton64(flow->r_eid); + msg->ece = hton16(ece); + + r_addr = flow->r_addr; +#ifdef IPCP_FLOW_STATS + flow->u_snd++; +#endif + pthread_rwlock_unlock(&fa.flows_lock); + + + if (dt_write_packet(r_addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow update packet."); + ipcp_sdb_release(sdb); + return -1; + } + + return 0; +} + +void fa_np1_rcv(uint64_t eid, + uint8_t ecn, + struct shm_du_buff * sdb) +{ + struct fa_flow * flow; + bool update; + uint16_t ece; + int fd; + size_t len; + + len = shm_du_buff_len(sdb); + + pthread_rwlock_wrlock(&fa.flows_lock); + + fd = eid_to_fd(eid); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + log_dbg("Received packet for unknown EID %" PRIu64 ".", eid); + ipcp_sdb_release(sdb); + return; + } + + flow = &fa.flows[fd]; + +#ifdef IPCP_FLOW_STATS + ++flow->p_rcv; + flow->b_rcv += len; +#endif + update = ca_ctx_update_rcv(flow->ctx, len, ecn, &ece); + + pthread_rwlock_unlock(&fa.flows_lock); + + if (ipcp_flow_write(fd, sdb) < 0) { + log_dbg("Failed to write to flow %d.", fd); + ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS + pthread_rwlock_wrlock(&fa.flows_lock); + ++flow->p_rcv_f; + flow->b_rcv_f += len; + pthread_rwlock_unlock(&fa.flows_lock); +#endif + } + + if (update) + fa_update_remote(eid, ece); +} diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h new file mode 100644 index 00000000..1e716966 --- /dev/null +++ b/src/ipcpd/unicast/fa.h @@ -0,0 +1,52 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Flow allocator of the IPC Process + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_FA_H +#define OUROBOROS_IPCPD_UNICAST_FA_H + +#include <ouroboros/qos.h> +#include <ouroboros/utils.h> + +int fa_init(void); + +void fa_fini(void); + +int fa_start(void); + +void fa_stop(void); + +int fa_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data); + +int fa_alloc_resp(int fd, + int response, + const buffer_t * data); + +int fa_dealloc(int fd); + +void fa_np1_rcv(uint64_t eid, + uint8_t ecn, + struct shm_du_buff * sdb); + +#endif /* OUROBOROS_IPCPD_UNICAST_FA_H */ diff --git a/src/ipcpd/unicast/main.c b/src/ipcpd/unicast/main.c new file mode 100644 index 00000000..e6cb2994 --- /dev/null +++ b/src/ipcpd/unicast/main.c @@ -0,0 +1,351 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Unicast IPC Process + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include "config.h" + +#define OUROBOROS_PREFIX "unicast-ipcp" +#define THIS_TYPE IPCP_UNICAST + +#include <ouroboros/errno.h> +#include <ouroboros/ipcp-dev.h> +#include <ouroboros/logs.h> +#include <ouroboros/notifier.h> +#include <ouroboros/random.h> +#include <ouroboros/rib.h> +#include <ouroboros/time.h> + +#include "common/connmgr.h" +#include "common/enroll.h" +#include "addr-auth.h" +#include "ca.h" +#include "dir.h" +#include "dt.h" +#include "fa.h" +#include "ipcp.h" + +#include <stdbool.h> +#include <signal.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <inttypes.h> + +struct ipcp ipcpi; + +static int initialize_components(const struct ipcp_config * conf) +{ + strcpy(ipcpi.layer_name, conf->layer_info.name); + ipcpi.dir_hash_algo = (enum hash_algo) conf->layer_info.dir_hash_algo; + + assert(ipcp_dir_hash_len() != 0); + + if (addr_auth_init(conf->unicast.addr_auth_type, + &conf->unicast.dt.addr_size)) { + log_err("Failed to init address authority."); + goto fail_addr_auth; + } + + ipcpi.dt_addr = addr_auth_address(); + if (ipcpi.dt_addr == 0) { + log_err("Failed to get a valid address."); + goto fail_addr_auth; + } + + log_info("IPCP got address %" PRIu64 ".", ipcpi.dt_addr); + + if (ca_init(conf->unicast.cong_avoid)) { + log_err("Failed to initialize congestion avoidance."); + goto fail_ca; + } + + if (dt_init(conf->unicast.dt)) { + log_err("Failed to initialize data transfer component."); + goto fail_dt; + } + + if (fa_init()) { + log_err("Failed to initialize flow allocator component."); + goto fail_fa; + } + + if (dir_init()) { + log_err("Failed to initialize directory."); + goto fail_dir; + } + + ipcp_set_state(IPCP_INIT); + + return 0; + + fail_dir: + fa_fini(); + fail_fa: + dt_fini(); + fail_dt: + ca_fini(); + fail_ca: + addr_auth_fini(); + fail_addr_auth: + return -1; +} + +static void finalize_components(void) +{ + dir_fini(); + + fa_fini(); + + dt_fini(); + + ca_fini(); + + addr_auth_fini(); +} + +static int start_components(void) +{ + if (dt_start() < 0) { + log_err("Failed to start data transfer."); + goto fail_dt_start; + } + + if (fa_start() < 0) { + log_err("Failed to start flow allocator."); + goto fail_fa_start; + } + + if (enroll_start() < 0) { + log_err("Failed to start enrollment."); + goto fail_enroll_start; + } + + if (connmgr_start() < 0) { + log_err("Failed to start AP connection manager."); + goto fail_connmgr_start; + } + + return 0; + + fail_connmgr_start: + enroll_stop(); + fail_enroll_start: + fa_stop(); + fail_fa_start: + dt_stop(); + fail_dt_start: + ipcp_set_state(IPCP_INIT); + return -1; +} + +static void stop_components(void) +{ + connmgr_stop(); + + enroll_stop(); + + fa_stop(); + + dt_stop(); + + ipcp_set_state(IPCP_INIT); +} + +static int bootstrap_components(void) +{ + if (dir_bootstrap()) { + log_err("Failed to bootstrap directory."); + return -1; + } + + return 0; +} + +static int unicast_ipcp_enroll(const char * dst, + struct layer_info * info) +{ + struct conn conn; + uint8_t id[ENROLL_ID_LEN]; + + if (random_buffer(id, ENROLL_ID_LEN) < 0) { + log_err("Failed to generate enrollment ID."); + goto fail_id; + } + + log_info_id(id, "Requesting enrollment."); + + if (connmgr_alloc(COMPID_ENROLL, dst, NULL, &conn) < 0) { + log_err_id(id, "Failed to get connection."); + goto fail_id; + } + + /* Get boot state from peer. */ + if (enroll_boot(&conn, id) < 0) { + log_err_id(id, "Failed to get boot information."); + goto fail_enroll_boot; + } + + if (initialize_components(enroll_get_conf()) < 0) { + log_err_id(id, "Failed to initialize components."); + goto fail_enroll_boot; + } + + if (start_components() < 0) { + log_err_id(id, "Failed to start components."); + goto fail_start_comp; + } + + if (enroll_ack(&conn, id, 0) < 0) + log_err_id(id, "Failed to confirm enrollment."); + + if (connmgr_dealloc(COMPID_ENROLL, &conn) < 0) + log_warn_id(id, "Failed to dealloc enrollment flow."); + + log_info_id(id, "Enrolled with %s.", dst); + + info->dir_hash_algo = (enum pol_dir_hash) ipcpi.dir_hash_algo; + strcpy(info->name, ipcpi.layer_name); + + return 0; + + fail_start_comp: + finalize_components(); + fail_enroll_boot: + connmgr_dealloc(COMPID_ENROLL, &conn); + fail_id: + return -1; +} + +static int unicast_ipcp_bootstrap(const struct ipcp_config * conf) +{ + assert(conf); + assert(conf->type == THIS_TYPE); + + enroll_bootstrap(conf); + + if (initialize_components(conf) < 0) { + log_err("Failed to init IPCP components."); + goto fail_init; + } + + if (start_components() < 0) { + log_err("Failed to init IPCP components."); + goto fail_start; + } + + if (bootstrap_components() < 0) { + log_err("Failed to bootstrap IPCP components."); + goto fail_bootstrap; + } + + return 0; + + fail_bootstrap: + stop_components(); + fail_start: + finalize_components(); + fail_init: + return -1; +} + +static int unicast_ipcp_query(const uint8_t * dst) +{ + return dir_query(dst) ? 0 : -1; +} + +static struct ipcp_ops unicast_ops = { + .ipcp_bootstrap = unicast_ipcp_bootstrap, + .ipcp_enroll = unicast_ipcp_enroll, + .ipcp_connect = connmgr_ipcp_connect, + .ipcp_disconnect = connmgr_ipcp_disconnect, + .ipcp_reg = dir_reg, + .ipcp_unreg = dir_unreg, + .ipcp_query = unicast_ipcp_query, + .ipcp_flow_alloc = fa_alloc, + .ipcp_flow_join = NULL, + .ipcp_flow_alloc_resp = fa_alloc_resp, + .ipcp_flow_dealloc = fa_dealloc +}; + +int main(int argc, + char * argv[]) +{ + if (ipcp_init(argc, argv, &unicast_ops, THIS_TYPE) < 0) { + log_err("Failed to init IPCP."); + goto fail_init; + } + + if (notifier_init() < 0) { + log_err("Failed to initialize notifier component."); + goto fail_notifier_init; + } + + if (connmgr_init() < 0) { + log_err("Failed to initialize connection manager."); + goto fail_connmgr_init; + } + + if (enroll_init() < 0) { + log_err("Failed to initialize enrollment component."); + goto fail_enroll_init; + } + + if (ipcp_start() < 0) { + log_err("Failed to start IPCP."); + goto fail_start; + } + + ipcp_sigwait(); + + if (ipcp_get_state() == IPCP_SHUTDOWN) { + stop_components(); + finalize_components(); + } + + ipcp_stop(); + + enroll_fini(); + + connmgr_fini(); + + notifier_fini(); + + ipcp_fini(); + + exit(EXIT_SUCCESS); + + fail_start: + enroll_fini(); + fail_enroll_init: + connmgr_fini(); + fail_connmgr_init: + notifier_fini(); + fail_notifier_init: + ipcp_fini(); + fail_init: + exit(EXIT_FAILURE); +} diff --git a/src/ipcpd/unicast/pff.c b/src/ipcpd/unicast/pff.c new file mode 100644 index 00000000..9b2aa2b4 --- /dev/null +++ b/src/ipcpd/unicast/pff.c @@ -0,0 +1,131 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * PDU Forwarding Function + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define OUROBOROS_PREFIX "pff" + +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> + +#include "pff.h" +#include "pff/pol.h" + +struct pff { + struct pff_ops * ops; + struct pff_i * pff_i; +}; + +struct pff * pff_create(enum pol_pff pol) +{ + struct pff * pff; + + pff = malloc(sizeof(*pff)); + if (pff == NULL) + return NULL; + + switch (pol) { + case PFF_ALTERNATE: + log_dbg("Using alternate PFF policy."); + pff->ops = &alternate_pff_ops; + break; + case PFF_SIMPLE: + log_dbg("Using simple PFF policy."); + pff->ops = &simple_pff_ops; + break; + case PFF_MULTIPATH: + log_dbg("Using multipath PFF policy."); + pff->ops = &multipath_pff_ops; + break; + default: + goto err; + } + + pff->pff_i = pff->ops->create(); + if (pff->pff_i == NULL) { + log_err("Failed to create PFF instance."); + goto err; + } + + return pff; + err: + free(pff); + return NULL; +} + +void pff_destroy(struct pff * pff) +{ + pff->ops->destroy(pff->pff_i); + + free(pff); +} + +void pff_lock(struct pff * pff) +{ + return pff->ops->lock(pff->pff_i); +} + +void pff_unlock(struct pff * pff) +{ + return pff->ops->unlock(pff->pff_i); +} + +int pff_add(struct pff * pff, + uint64_t addr, + int * fd, + size_t len) +{ + return pff->ops->add(pff->pff_i, addr, fd, len); +} + +int pff_update(struct pff * pff, + uint64_t addr, + int * fd, + size_t len) +{ + return pff->ops->update(pff->pff_i, addr, fd, len); +} + +int pff_del(struct pff * pff, + uint64_t addr) +{ + return pff->ops->del(pff->pff_i, addr); +} + +void pff_flush(struct pff * pff) +{ + return pff->ops->flush(pff->pff_i); +} + +int pff_nhop(struct pff * pff, + uint64_t addr) +{ + return pff->ops->nhop(pff->pff_i, addr); +} + +int pff_flow_state_change(struct pff * pff, + int fd, + bool up) +{ + if (pff->ops->flow_state_change != NULL) + return pff->ops->flow_state_change(pff->pff_i, fd, up); + + return 0; +} diff --git a/src/ipcpd/unicast/pff.h b/src/ipcpd/unicast/pff.h new file mode 100644 index 00000000..f44e5531 --- /dev/null +++ b/src/ipcpd/unicast/pff.h @@ -0,0 +1,69 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * PDU Forwarding Function + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_PFF_H +#define OUROBOROS_IPCPD_UNICAST_PFF_H + +#include <ouroboros/ipcp.h> + +#include <stdint.h> +#include <stdlib.h> +#include <stdbool.h> + +enum pol_pff { + PFF_SIMPLE = 0, + PFF_ALTERNATE, + PFF_MULTIPATH +}; + +struct pff * pff_create(enum pol_pff pol); + +void pff_destroy(struct pff * pff); + +void pff_lock(struct pff * pff); + +void pff_unlock(struct pff * pff); + +int pff_add(struct pff * pff, + uint64_t addr, + int * fd, + size_t len); + +int pff_update(struct pff * pff, + uint64_t addr, + int * fd, + size_t len); + +int pff_del(struct pff * pff, + uint64_t addr); + +void pff_flush(struct pff * pff); + +/* Returns fd towards next hop */ +int pff_nhop(struct pff * pff, + uint64_t addr); + +int pff_flow_state_change(struct pff * pff, + int fd, + bool up); + +#endif /* OUROBOROS_IPCPD_UNICAST_PFF_H */ diff --git a/src/ipcpd/unicast/pff/alternate.c b/src/ipcpd/unicast/pff/alternate.c new file mode 100644 index 00000000..85e85914 --- /dev/null +++ b/src/ipcpd/unicast/pff/alternate.c @@ -0,0 +1,400 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Policy for PFF with alternate next hops + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200112L + +#include "config.h" + +#include <ouroboros/errno.h> +#include <ouroboros/list.h> + +#include "pft.h" +#include "alternate.h" + +#include <string.h> +#include <assert.h> +#include <pthread.h> + +struct nhop { + struct list_head next; + int fd; +}; + +struct addr { + struct list_head next; + uint64_t addr; +}; + +struct pff_i { + struct pft * pft; + + struct list_head addrs; + + struct list_head nhops_down; + + pthread_rwlock_t lock; +}; + +struct pff_ops alternate_pff_ops = { + .create = alternate_pff_create, + .destroy = alternate_pff_destroy, + .lock = alternate_pff_lock, + .unlock = alternate_pff_unlock, + .add = alternate_pff_add, + .update = alternate_pff_update, + .del = alternate_pff_del, + .flush = alternate_pff_flush, + .nhop = alternate_pff_nhop, + .flow_state_change = alternate_flow_state_change +}; + +static int add_addr(struct pff_i * pff_i, + uint64_t addr) +{ + struct addr * a; + + a = malloc(sizeof(*a)); + if (a == NULL) + return -1; + + a->addr = addr; + + list_add(&a->next, &(pff_i->addrs)); + + return 0; +} + +static void del_addr(struct pff_i * pff_i, + uint64_t addr) +{ + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &(pff_i->addrs)) { + struct addr * e = list_entry(p, struct addr, next); + if (e->addr == addr) { + list_del(&e->next); + free(e); + return; + } + } +} + +static void del_addrs(struct pff_i * pff_i) +{ + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &(pff_i->addrs)) { + struct addr * e = list_entry(p, struct addr, next); + list_del(&e->next); + free(e); + } +} + +static void del_nhops_down(struct pff_i * pff_i) +{ + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &(pff_i->nhops_down)) { + struct nhop * e = list_entry(p, struct nhop, next); + list_del(&e->next); + free(e); + } +} + +static int del_nhop_down(struct pff_i * pff_i, + int fd) +{ + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &(pff_i->nhops_down)) { + struct nhop * e = list_entry(p, struct nhop, next); + if (e->fd == fd) { + list_del(&e->next); + free(e); + return 0; + } + } + + return -1; +} + +static int add_nhop_down(struct pff_i * pff_i, + int fd) +{ + struct nhop * nhop; + + nhop = malloc(sizeof(*nhop)); + if (nhop == NULL) + return -1; + + nhop->fd = fd; + + list_add(&nhop->next, &(pff_i->nhops_down)); + + return 0; +} + +static bool nhops_down_has(struct pff_i * pff_i, + int fd) +{ + struct list_head * pos = NULL; + + list_for_each(pos, &pff_i->nhops_down) { + struct nhop * e = list_entry(pos, struct nhop, next); + if (e->fd == fd) + return true; + } + + return false; +} + +static int add_to_pft(struct pff_i * pff_i, + uint64_t addr, + int * fd, + size_t len) +{ + int * fds; + + assert(pff_i); + assert(len > 0); + + fds = malloc(sizeof(*fds) * (len + 1)); + if (fds == NULL) + goto fail_malloc; + + memcpy(fds, fd, len * sizeof(*fds)); + /* Put primary hop again at the end */ + fds[len] = fds[0]; + + if (pft_insert(pff_i->pft, addr, fds, len)) + goto fail_insert; + + return 0; + + fail_insert: + free(fds); + fail_malloc: + return -1; +} + +struct pff_i * alternate_pff_create(void) +{ + struct pff_i * tmp; + + tmp = malloc(sizeof(*tmp)); + if (tmp == NULL) + goto fail_malloc; + + if (pthread_rwlock_init(&tmp->lock, NULL)) + goto fail_lock; + + tmp->pft = pft_create(PFT_SIZE, false); + if (tmp->pft == NULL) + goto fail_pft; + + list_head_init(&tmp->nhops_down); + list_head_init(&tmp->addrs); + + return tmp; + + fail_pft: + pthread_rwlock_destroy(&tmp->lock); + fail_lock: + free(tmp); + fail_malloc: + return NULL; +} + +void alternate_pff_destroy(struct pff_i * pff_i) +{ + assert(pff_i); + + pft_destroy(pff_i->pft); + del_nhops_down(pff_i); + del_addrs(pff_i); + pthread_rwlock_destroy(&pff_i->lock); + free(pff_i); +} + +void alternate_pff_lock(struct pff_i * pff_i) +{ + pthread_rwlock_wrlock(&pff_i->lock); +} + +void alternate_pff_unlock(struct pff_i * pff_i) +{ + pthread_rwlock_unlock(&pff_i->lock); +} + +int alternate_pff_add(struct pff_i * pff_i, + uint64_t addr, + int * fd, + size_t len) +{ + assert(pff_i); + assert(len > 0); + + if (add_to_pft(pff_i, addr, fd, len)) + return -1; + + if (add_addr(pff_i, addr)) { + pft_delete(pff_i->pft, addr); + return -1; + } + + return 0; +} + +int alternate_pff_update(struct pff_i * pff_i, + uint64_t addr, + int * fd, + size_t len) +{ + assert(pff_i); + assert(len > 0); + + if (pft_delete(pff_i->pft, addr)) + return -1; + + if (add_to_pft(pff_i, addr, fd, len)) + return -1; + + return 0; +} + +int alternate_pff_del(struct pff_i * pff_i, + uint64_t addr) +{ + assert(pff_i); + + del_addr(pff_i, addr); + + if (pft_delete(pff_i->pft, addr)) + return -1; + + return 0; +} + +void alternate_pff_flush(struct pff_i * pff_i) +{ + assert(pff_i); + + pft_flush(pff_i->pft); + + del_nhops_down(pff_i); + + del_addrs(pff_i); +} + +int alternate_pff_nhop(struct pff_i * pff_i, + uint64_t addr) +{ + int fd; + size_t len; + int * fds; + + assert(pff_i); + + pthread_rwlock_rdlock(&pff_i->lock); + + if (pft_lookup(pff_i->pft, addr, &fds, &len)) { + pthread_rwlock_unlock(&pff_i->lock); + return -1; + } + + fd = *fds; + + pthread_rwlock_unlock(&pff_i->lock); + + return fd; +} + +int alternate_flow_state_change(struct pff_i * pff_i, + int fd, + bool up) +{ + struct list_head * p; + size_t len; + int * fds; + size_t i; + int tmp; + + assert(pff_i); + + pthread_rwlock_wrlock(&pff_i->lock); + + if (up) { + if (del_nhop_down(pff_i, fd)) { + pthread_rwlock_unlock(&pff_i->lock); + return -1; + } + } else { + if (add_nhop_down(pff_i, fd)) { + pthread_rwlock_unlock(&pff_i->lock); + return -1; + } + } + + list_for_each(p, &pff_i->addrs) { + struct addr * e = list_entry(p, struct addr, next); + if (pft_lookup(pff_i->pft, e->addr, &fds, &len)) { + pthread_rwlock_unlock(&pff_i->lock); + return -1; + } + + if (up) { + /* It is using an alternate */ + if (fds[len] == fd && fds[0] != fd) { + for (i = 0 ; i < len; i++) { + /* Found the primary */ + if (fds[i] == fd) { + tmp = fds[0]; + fds[0] = fds[i]; + fds[i] = tmp; + break; + } + } + } + } else { + /* Need to switch to a (different) alternate */ + if (fds[0] == fd) { + for (i = 1; i < len; i++) { + /* Usable alternate */ + if (!nhops_down_has(pff_i, fds[i])) { + tmp = fds[0]; + fds[0] = fds[i]; + fds[i] = tmp; + break; + } + } + } + } + } + + pthread_rwlock_unlock(&pff_i->lock); + + return 0; +} diff --git a/src/ipcpd/unicast/pff/alternate.h b/src/ipcpd/unicast/pff/alternate.h new file mode 100644 index 00000000..96207e74 --- /dev/null +++ b/src/ipcpd/unicast/pff/alternate.h @@ -0,0 +1,61 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Policy for PFF with alternate next hops + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_ALTERNATE_PFF_H +#define OUROBOROS_IPCPD_UNICAST_ALTERNATE_PFF_H + +#include "ops.h" + +struct pff_i * alternate_pff_create(void); + +void alternate_pff_destroy(struct pff_i * pff_i); + +void alternate_pff_lock(struct pff_i * pff_i); + +void alternate_pff_unlock(struct pff_i * pff_i); + +int alternate_pff_add(struct pff_i * pff_i, + uint64_t addr, + int * fd, + size_t len); + +int alternate_pff_update(struct pff_i * pff_i, + uint64_t addr, + int * fd, + size_t len); + +int alternate_pff_del(struct pff_i * pff_i, + uint64_t addr); + +void alternate_pff_flush(struct pff_i * pff_i); + +/* Returns fd towards next hop */ +int alternate_pff_nhop(struct pff_i * pff_i, + uint64_t addr); + +int alternate_flow_state_change(struct pff_i * pff_i, + int fd, + bool up); + +extern struct pff_ops alternate_pff_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_ALTERNATE_PFF_H */ diff --git a/src/ipcpd/unicast/pff/multipath.c b/src/ipcpd/unicast/pff/multipath.c new file mode 100644 index 00000000..cbab0f5f --- /dev/null +++ b/src/ipcpd/unicast/pff/multipath.c @@ -0,0 +1,200 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Policy for PFF supporting multipath routing + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * Nick Aerts <nick.aerts@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200112L + +#include "config.h" + +#include <ouroboros/errno.h> + +#include "pft.h" +#include "multipath.h" + +#include <string.h> +#include <assert.h> +#include <pthread.h> + +struct pff_i { + struct pft * pft; + pthread_rwlock_t lock; +}; + +struct pff_ops multipath_pff_ops = { + .create = multipath_pff_create, + .destroy = multipath_pff_destroy, + .lock = multipath_pff_lock, + .unlock = multipath_pff_unlock, + .add = multipath_pff_add, + .update = multipath_pff_update, + .del = multipath_pff_del, + .flush = multipath_pff_flush, + .nhop = multipath_pff_nhop, + .flow_state_change = NULL +}; + +struct pff_i * multipath_pff_create(void) +{ + struct pff_i * tmp; + + tmp = malloc(sizeof(*tmp)); + if (tmp == NULL) + goto fail_malloc; + + if (pthread_rwlock_init(&tmp->lock, NULL)) + goto fail_rwlock; + + tmp->pft = pft_create(PFT_SIZE, false); + if (tmp->pft == NULL) + goto fail_pft; + + return tmp; + + fail_pft: + pthread_rwlock_destroy(&tmp->lock); + fail_rwlock: + free(tmp); + fail_malloc: + return NULL; +} + +void multipath_pff_destroy(struct pff_i * pff_i) +{ + assert(pff_i); + + pft_destroy(pff_i->pft); + pthread_rwlock_destroy(&pff_i->lock); + + free(pff_i); +} + +void multipath_pff_lock(struct pff_i * pff_i) +{ + pthread_rwlock_wrlock(&pff_i->lock); +} + +void multipath_pff_unlock(struct pff_i * pff_i) +{ + pthread_rwlock_unlock(&pff_i->lock); +} + +int multipath_pff_add(struct pff_i * pff_i, + uint64_t addr, + int * fds, + size_t len) +{ + int * tmp; + + assert(pff_i); + assert(fds); + assert(len > 0); + + tmp = malloc(len * sizeof(*tmp)); + if (tmp == NULL) + return -ENOMEM; + + memcpy(tmp,fds, len * sizeof(*tmp)); + + if (pft_insert(pff_i->pft, addr, tmp, len)) { + free(tmp); + return -1; + } + + return 0; +} + +int multipath_pff_update(struct pff_i * pff_i, + uint64_t addr, + int * fds, + size_t len) +{ + int * tmp; + + assert(pff_i); + assert(fds); + assert(len > 0); + + tmp = malloc(sizeof(*tmp)); + if (tmp == NULL) + return -ENOMEM; + + memcpy(tmp,fds, len * sizeof(*tmp)); + + if (pft_delete(pff_i->pft, addr)) { + free(tmp); + return -1; + } + + if (pft_insert(pff_i->pft, addr, tmp, 1)) { + free(tmp); + return -1; + } + + return 0; +} + +int multipath_pff_del(struct pff_i * pff_i, + uint64_t addr) +{ + assert(pff_i); + + if (pft_delete(pff_i->pft, addr)) + return -1; + + return 0; +} + +void multipath_pff_flush(struct pff_i * pff_i) +{ + assert(pff_i); + + pft_flush(pff_i->pft); +} + +int multipath_pff_nhop(struct pff_i * pff_i, + uint64_t addr) +{ + int fd; + int * fds; + size_t len; + + assert(pff_i); + + pthread_rwlock_wrlock(&pff_i->lock); + + if (pft_lookup(pff_i->pft, addr, &fds, &len)) { + pthread_rwlock_unlock(&pff_i->lock); + return -1; + } + + fd = *fds; + + assert(len > 0); + + /* Rotate fds left. */ + memmove(fds, fds + 1, (len - 1) * sizeof(*fds)); + fds[len - 1] = fd; + + pthread_rwlock_unlock(&pff_i->lock); + + return fd; +} diff --git a/src/ipcpd/unicast/pff/multipath.h b/src/ipcpd/unicast/pff/multipath.h new file mode 100644 index 00000000..0eb03476 --- /dev/null +++ b/src/ipcpd/unicast/pff/multipath.h @@ -0,0 +1,58 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Policy for PFF supporting multipath routing + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * Nick Aerts <nick.aerts@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_MULTIPATH_PFF_H +#define OUROBOROS_IPCPD_UNICAST_MULTIPATH_PFF_H + +#include "ops.h" + +struct pff_i * multipath_pff_create(void); + +void multipath_pff_destroy(struct pff_i * pff_i); + +void multipath_pff_lock(struct pff_i * pff_i); + +void multipath_pff_unlock(struct pff_i * pff_i); + +int multipath_pff_add(struct pff_i * pff_i, + uint64_t addr, + int * fds, + size_t len); + +int multipath_pff_update(struct pff_i * pff_i, + uint64_t addr, + int * fds, + size_t len); + +int multipath_pff_del(struct pff_i * pff_i, + uint64_t addr); + +void multipath_pff_flush(struct pff_i * pff_i); + +/* Returns fd towards next hop */ +int multipath_pff_nhop(struct pff_i * pff_i, + uint64_t addr); + +extern struct pff_ops multipath_pff_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_MULTIPATH_PFF_H */ diff --git a/src/ipcpd/unicast/pff/ops.h b/src/ipcpd/unicast/pff/ops.h new file mode 100644 index 00000000..16a31273 --- /dev/null +++ b/src/ipcpd/unicast/pff/ops.h @@ -0,0 +1,63 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Pff policy ops + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_PFF_OPS_H +#define OUROBOROS_IPCPD_UNICAST_PFF_OPS_H + +#include <stdbool.h> + +struct pff_i; + +struct pff_ops { + struct pff_i * (* create)(void); + + void (* destroy)(struct pff_i * pff_i); + + void (* lock)(struct pff_i * pff_i); + + void (* unlock)(struct pff_i * pff_i); + + int (* add)(struct pff_i * pff_i, + uint64_t addr, + int * fd, + size_t len); + + int (* update)(struct pff_i * pff_i, + uint64_t addr, + int * fd, + size_t len); + + int (* del)(struct pff_i * pff_i, + uint64_t addr); + + void (* flush)(struct pff_i * pff_i); + + int (* nhop)(struct pff_i * pff_i, + uint64_t addr); + + /* Optional operation. */ + int (* flow_state_change)(struct pff_i * pff_i, + int fd, + bool up); +}; + +#endif /* OUROBOROS_IPCPD_UNICAST_PFF_OPS_H */ diff --git a/src/ipcpd/unicast/pff/pft.c b/src/ipcpd/unicast/pff/pft.c new file mode 100644 index 00000000..8c436113 --- /dev/null +++ b/src/ipcpd/unicast/pff/pft.c @@ -0,0 +1,215 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Packet forwarding table (PFT) with chaining on collisions + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#endif + +#include <ouroboros/list.h> +#include <ouroboros/errno.h> +#include <ouroboros/hash.h> + +#include "pft.h" + +#include <assert.h> +#include <string.h> + +/* store <len> output fds for dst addr */ +struct pft_entry { + struct list_head next; + uint64_t dst; + int * fds; + size_t len; +}; + +struct pft { + struct list_head * buckets; + bool hash_key; + uint64_t buckets_size; +}; + +struct pft * pft_create(uint64_t buckets, + bool hash_key) +{ + struct pft * tmp; + unsigned int i; + + if (buckets == 0) + return NULL; + + buckets--; + buckets |= buckets >> 1; + buckets |= buckets >> 2; + buckets |= buckets >> 4; + buckets |= buckets >> 8; + buckets |= buckets >> 16; + buckets |= buckets >> 32; + buckets++; + + tmp = malloc(sizeof(*tmp)); + if (tmp == NULL) + return NULL; + + tmp->hash_key = hash_key; + tmp->buckets_size = buckets; + + tmp->buckets = malloc(buckets * sizeof(*tmp->buckets)); + if (tmp->buckets == NULL) { + free(tmp); + return NULL; + } + + for (i = 0; i < buckets; i++) + list_head_init(&(tmp->buckets[i])); + + return tmp; +} + +void pft_destroy(struct pft * pft) +{ + assert(pft); + assert(pft->buckets); + + pft_flush(pft); + free(pft->buckets); + free(pft); +} + +void pft_flush(struct pft * pft) +{ + unsigned int i; + struct list_head * p; + struct list_head * h; + struct pft_entry * entry; + + assert(pft); + + for (i = 0; i < pft->buckets_size; i++) { + list_for_each_safe(p, h, &(pft->buckets[i])) { + entry = list_entry(p, struct pft_entry, next); + list_del(&entry->next); + free(entry->fds); + free(entry); + } + } +} + +static uint64_t hash(uint64_t key) +{ + uint64_t res[2]; + + mem_hash(HASH_MD5, res, (uint8_t *) &key, sizeof(key)); + + return res[0]; +} + +static uint64_t calc_key(struct pft * pft, + uint64_t dst) +{ + if (pft->hash_key) + dst = hash(dst); + + return (dst & (pft->buckets_size - 1)); +} + +int pft_insert(struct pft * pft, + uint64_t dst, + int * fds, + size_t len) +{ + struct pft_entry * entry; + uint64_t lookup_key; + struct list_head * p; + + assert(pft); + assert(len > 0); + + lookup_key = calc_key(pft, dst); + + list_for_each(p, &(pft->buckets[lookup_key])) { + entry = list_entry(p, struct pft_entry, next); + if (entry->dst == dst) + return -EPERM; + } + + entry = malloc(sizeof(*entry)); + if (entry == NULL) + return -ENOMEM; + + entry->dst = dst; + entry->fds = fds; + entry->len = len; + + list_add(&entry->next, &(pft->buckets[lookup_key])); + + return 0; +} + +int pft_lookup(struct pft * pft, + uint64_t dst, + int ** fds, + size_t * len) +{ + struct pft_entry * entry; + struct list_head * p; + uint64_t lookup_key; + + assert(pft); + + lookup_key = calc_key(pft, dst); + + list_for_each(p, &(pft->buckets[lookup_key])) { + entry = list_entry(p, struct pft_entry, next); + if (entry->dst == dst) { + *fds = entry->fds; + *len = entry->len; + return 0; + } + } + + return -1; +} + +int pft_delete(struct pft * pft, + uint64_t dst) +{ + struct pft_entry * entry; + uint64_t lookup_key; + struct list_head * p; + struct list_head * h; + + assert(pft); + + lookup_key = calc_key(pft, dst); + + list_for_each_safe(p, h, &(pft->buckets[lookup_key])) { + entry = list_entry(p, struct pft_entry, next); + if (entry->dst == dst) { + list_del(&entry->next); + free(entry->fds); + free(entry); + return 0; + } + } + + return -1; +} diff --git a/src/ipcpd/unicast/pff/pft.h b/src/ipcpd/unicast/pff/pft.h new file mode 100644 index 00000000..711dabcb --- /dev/null +++ b/src/ipcpd/unicast/pff/pft.h @@ -0,0 +1,55 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Packet forwarding table (PFT) with chaining on collisions + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_PFT_H +#define OUROBOROS_PFT_H + +#include <stdint.h> +#include <stdbool.h> +#include <stdlib.h> + +struct pft; + +/* Buckets is rounded up to the nearest power of 2 */ +struct pft * pft_create(uint64_t buckets, + bool hash_key); + +void pft_destroy(struct pft * table); + +void pft_flush(struct pft * table); + +/* Passes ownership of the block of memory */ +int pft_insert(struct pft * pft, + uint64_t dst, + int * fds, + size_t len); + +/* The block of memory returned is no copy */ +int pft_lookup(struct pft * pft, + uint64_t dst, + int ** fds, + size_t * len); + +int pft_delete(struct pft * pft, + uint64_t dst); + +#endif /* OUROBOROS_PFT_H */ diff --git a/src/ipcpd/unicast/pff/pol.h b/src/ipcpd/unicast/pff/pol.h new file mode 100644 index 00000000..245b03c4 --- /dev/null +++ b/src/ipcpd/unicast/pff/pol.h @@ -0,0 +1,25 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * PDU Forwarding Function policies + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include "alternate.h" +#include "multipath.h" +#include "simple.h" diff --git a/src/ipcpd/unicast/pff/simple.c b/src/ipcpd/unicast/pff/simple.c new file mode 100644 index 00000000..5f95e3ce --- /dev/null +++ b/src/ipcpd/unicast/pff/simple.c @@ -0,0 +1,190 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Simple PDU Forwarding Function + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200112L + +#include "config.h" + +#include <ouroboros/errno.h> + +#include "pft.h" +#include "simple.h" + +#include <assert.h> +#include <pthread.h> + +struct pff_i { + struct pft * pft; + pthread_rwlock_t lock; +}; + +struct pff_ops simple_pff_ops = { + .create = simple_pff_create, + .destroy = simple_pff_destroy, + .lock = simple_pff_lock, + .unlock = simple_pff_unlock, + .add = simple_pff_add, + .update = simple_pff_update, + .del = simple_pff_del, + .flush = simple_pff_flush, + .nhop = simple_pff_nhop, + .flow_state_change = NULL +}; + +struct pff_i * simple_pff_create(void) +{ + struct pff_i * tmp; + + tmp = malloc(sizeof(*tmp)); + if (tmp == NULL) + return NULL; + + if (pthread_rwlock_init(&tmp->lock, NULL)) { + free(tmp); + return NULL; + } + + tmp->pft = pft_create(PFT_SIZE, false); + if (tmp->pft == NULL) { + pthread_rwlock_destroy(&tmp->lock); + free(tmp); + return NULL; + } + + return tmp; +} + +void simple_pff_destroy(struct pff_i * pff_i) +{ + assert(pff_i); + + pft_destroy(pff_i->pft); + + pthread_rwlock_destroy(&pff_i->lock); + free(pff_i); +} + +void simple_pff_lock(struct pff_i * pff_i) +{ + pthread_rwlock_wrlock(&pff_i->lock); +} + +void simple_pff_unlock(struct pff_i * pff_i) +{ + pthread_rwlock_unlock(&pff_i->lock); +} + +int simple_pff_add(struct pff_i * pff_i, + uint64_t addr, + int * fd, + size_t len) +{ + int * fds; + + assert(pff_i); + assert(fd); + assert(len > 0); + + (void) len; + + fds = malloc(sizeof(*fds)); + if (fds == NULL) + return -ENOMEM; + + *fds = *fd; + + if (pft_insert(pff_i->pft, addr, fds, 1)) { + free(fds); + return -1; + } + + return 0; +} + +int simple_pff_update(struct pff_i * pff_i, + uint64_t addr, + int * fd, + size_t len) +{ + int * fds; + + assert(pff_i); + assert(fd); + assert(len > 0); + + (void) len; + + fds = malloc(sizeof(*fds)); + if (fds == NULL) + return -ENOMEM; + + *fds = *fd; + + if (pft_delete(pff_i->pft, addr)) { + free(fds); + return -1; + } + + if (pft_insert(pff_i->pft, addr, fds, 1)) { + free(fds); + return -1; + } + + return 0; +} + +int simple_pff_del(struct pff_i * pff_i, + uint64_t addr) +{ + assert(pff_i); + + if (pft_delete(pff_i->pft, addr)) + return -1; + + return 0; +} + +void simple_pff_flush(struct pff_i * pff_i) +{ + assert(pff_i); + + pft_flush(pff_i->pft); +} + +int simple_pff_nhop(struct pff_i * pff_i, + uint64_t addr) +{ + int * fds; + size_t len; + int fd = -1; + + assert(pff_i); + + pthread_rwlock_rdlock(&pff_i->lock); + + if (pft_lookup(pff_i->pft, addr, &fds, &len) == 0) + fd = *fds; + + pthread_rwlock_unlock(&pff_i->lock); + + return fd; +} diff --git a/src/ipcpd/unicast/pff/simple.h b/src/ipcpd/unicast/pff/simple.h new file mode 100644 index 00000000..0966a186 --- /dev/null +++ b/src/ipcpd/unicast/pff/simple.h @@ -0,0 +1,57 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Simple policy for PFF + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_SIMPLE_PFF_H +#define OUROBOROS_IPCPD_UNICAST_SIMPLE_PFF_H + +#include "ops.h" + +struct pff_i * simple_pff_create(void); + +void simple_pff_destroy(struct pff_i * pff_i); + +void simple_pff_lock(struct pff_i * pff_i); + +void simple_pff_unlock(struct pff_i * pff_i); + +int simple_pff_add(struct pff_i * pff_i, + uint64_t addr, + int * fd, + size_t len); + +int simple_pff_update(struct pff_i * pff_i, + uint64_t addr, + int * fd, + size_t len); + +int simple_pff_del(struct pff_i * pff_i, + uint64_t addr); + +void simple_pff_flush(struct pff_i * pff_i); + +/* Returns fd towards next hop */ +int simple_pff_nhop(struct pff_i * pff_i, + uint64_t addr); + +extern struct pff_ops simple_pff_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_SIMPLE_PFF_H */ diff --git a/src/ipcpd/unicast/pff/tests/CMakeLists.txt b/src/ipcpd/unicast/pff/tests/CMakeLists.txt new file mode 100644 index 00000000..65705714 --- /dev/null +++ b/src/ipcpd/unicast/pff/tests/CMakeLists.txt @@ -0,0 +1,38 @@ +get_filename_component(CURRENT_SOURCE_PARENT_DIR + ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(CURRENT_BINARY_PARENT_DIR + ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +include_directories(${CURRENT_SOURCE_PARENT_DIR}) +include_directories(${CURRENT_BINARY_PARENT_DIR}) + +include_directories(${CMAKE_SOURCE_DIR}/include) +include_directories(${CMAKE_BINARY_DIR}/include) + +get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) + +create_test_sourcelist(${PARENT_DIR}_tests test_suite.c + # Add new tests here + pft_test.c + ) + +add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}) +target_link_libraries(${PARENT_DIR}_test ouroboros-common) + +add_dependencies(check ${PARENT_DIR}_test) + +set(tests_to_run ${${PARENT_DIR}_tests}) +if(CMAKE_VERSION VERSION_LESS "3.29.0") + remove(tests_to_run test_suite.c) +else () + list(POP_FRONT tests_to_run) +endif() + +foreach (test ${tests_to_run}) + get_filename_component(test_name ${test} NAME_WE) + add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name}) +endforeach (test) diff --git a/src/ipcpd/unicast/pff/tests/pft_test.c b/src/ipcpd/unicast/pff/tests/pft_test.c new file mode 100644 index 00000000..18287fb8 --- /dev/null +++ b/src/ipcpd/unicast/pff/tests/pft_test.c @@ -0,0 +1,126 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Test of the hash table + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include "pft.c" + +#include <stdio.h> + +#define TBL_SIZE 256 +#define INT_TEST 4 + +int pft_test(int argc, + char ** argv) +{ + struct pft * pft; + int i; + int * j; + size_t len; + + (void) argc; + (void) argv; + + pft = pft_create(TBL_SIZE, true); + if (pft == NULL) { + printf("Failed to create.\n"); + return -1; + } + + pft_destroy(pft); + + pft = pft_create(TBL_SIZE, false); + if (pft == NULL) { + printf("Failed to create.\n"); + return -1; + } + + for (i = 0; i < TBL_SIZE + INT_TEST + 2; i++) { + j = malloc(sizeof(*j)); + if (j == NULL) { + printf("Failed to malloc.\n"); + pft_destroy(pft); + return -1; + } + *j = i; + + if (pft_insert(pft, i, j, 1)) { + printf("Failed to insert.\n"); + pft_destroy(pft); + free(j); + return -1; + } + } + + if (pft_lookup(pft, INT_TEST, &j, &len)) { + printf("Failed to lookup.\n"); + pft_destroy(pft); + return -1; + } + + if (*j != INT_TEST) { + printf("Lookup returned wrong value (%d != %d).\n", + INT_TEST, *j); + pft_destroy(pft); + return -1; + } + + if (pft_lookup(pft, TBL_SIZE + INT_TEST, &j, &len)) { + printf("Failed to lookup.\n"); + pft_destroy(pft); + return -1; + } + + if (*j != TBL_SIZE + INT_TEST) { + printf("Lookup returned wrong value (%d != %d).\n", + INT_TEST, *j); + pft_destroy(pft); + return -1; + } + + if (pft_delete(pft, INT_TEST)) { + printf("Failed to delete.\n"); + pft_destroy(pft); + return -1; + } + + if (pft_lookup(pft, INT_TEST, &j, &len) == 0) { + printf("Failed to delete properly.\n"); + pft_destroy(pft); + return -1; + } + + if (pft_lookup(pft, TBL_SIZE + INT_TEST, &j, &len)) { + printf("Failed to lookup after deletion.\n"); + pft_destroy(pft); + return -1; + } + + if (*j != TBL_SIZE + INT_TEST) { + printf("Lookup returned wrong value (%d != %d).\n", + INT_TEST, *j); + pft_destroy(pft); + return -1; + } + + pft_destroy(pft); + + return 0; +} diff --git a/src/ipcpd/unicast/psched.c b/src/ipcpd/unicast/psched.c new file mode 100644 index 00000000..7e12148b --- /dev/null +++ b/src/ipcpd/unicast/psched.c @@ -0,0 +1,242 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Packet scheduler component + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#include <ouroboros/errno.h> +#include <ouroboros/notifier.h> + +#include "common/connmgr.h" +#include "ipcp.h" +#include "psched.h" + +#include <assert.h> +#include <sched.h> +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> + +static int qos_prio [] = { + QOS_PRIO_BE, + QOS_PRIO_VIDEO, + QOS_PRIO_VOICE, +}; + +struct psched { + fset_t * set[QOS_CUBE_MAX]; + next_packet_fn_t callback; + read_fn_t read; + pthread_t readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; +}; + +struct sched_info { + struct psched * sch; + qoscube_t qc; +}; + +static void cleanup_reader(void * o) +{ + fqueue_destroy((fqueue_t *) o); +} + +static void * packet_reader(void * o) +{ + struct psched * sched; + struct shm_du_buff * sdb; + int fd; + fqueue_t * fq; + qoscube_t qc; + + sched = ((struct sched_info *) o)->sch; + qc = ((struct sched_info *) o)->qc; + + ipcp_lock_to_core(); + + free(o); + + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; + + pthread_cleanup_push(cleanup_reader, fq); + + while (true) { + int ret = fevent(sched->set[qc], fq, NULL); + if (ret < 0) + continue; + + while ((fd = fqueue_next(fq)) >= 0) { + switch (fqueue_type(fq)) { + case FLOW_DEALLOC: + notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); + break; + case FLOW_DOWN: + notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); + break; + case FLOW_UP: + notifier_event(NOTIFY_DT_FLOW_UP, &fd); + break; + case FLOW_PKT: + if (sched->read(fd, &sdb) < 0) + continue; + + sched->callback(fd, qc, sdb); + break; + default: + break; + } + } + } + + pthread_cleanup_pop(true); + + return (void *) 0; +} + +struct psched * psched_create(next_packet_fn_t callback, + read_fn_t read) +{ + struct psched * psched; + struct sched_info * infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; + int i; + int j; + + assert(callback); + + psched = malloc(sizeof(*psched)); + if (psched == NULL) + goto fail_malloc; + + psched->callback = callback; + psched->read = read; + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + psched->set[i] = fset_create(); + if (psched->set[i] == NULL) { + for (j = 0; j < i; ++j) + fset_destroy(psched->set[j]); + goto fail_flow_set; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + infos[i] = malloc(sizeof(*infos[i])); + if (infos[i] == NULL) { + for (j = 0; j < i; ++j) + free(infos[j]); + goto fail_infos; + } + infos[i]->sch = psched; + infos[i]->qc = i % QOS_CUBE_MAX; + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + if (pthread_create(&psched->readers[i], NULL, + packet_reader, infos[i])) { + for (j = 0; j < i; ++j) + pthread_cancel(psched->readers[j]); + for (j = 0; j < i; ++j) + pthread_join(psched->readers[j], NULL); + for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + free(infos[j]); + goto fail_infos; + } + } + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + struct sched_param par; + int pol = SCHED_RR; + int min; + int max; + + min = sched_get_priority_min(pol); + max = sched_get_priority_max(pol); + + min = (max - min) / 2; + + par.sched_priority = min + + (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); + + if (pthread_setschedparam(psched->readers[i], pol, &par)) + goto fail_sched; + } + + return psched; + + fail_sched: + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_cancel(psched->readers[j]); + for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) + pthread_join(psched->readers[j], NULL); + fail_infos: + for (j = 0; j < QOS_CUBE_MAX; ++j) + fset_destroy(psched->set[j]); + fail_flow_set: + free(psched); + fail_malloc: + return NULL; +} + +void psched_destroy(struct psched * psched) +{ + int i; + + assert(psched); + + for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { + pthread_cancel(psched->readers[i]); + pthread_join(psched->readers[i], NULL); + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) + fset_destroy(psched->set[i]); + + free(psched); +} + +void psched_add(struct psched * psched, + int fd) +{ + qoscube_t qc; + + assert(psched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_add(psched->set[qc], fd); +} + +void psched_del(struct psched * psched, + int fd) +{ + qoscube_t qc; + + assert(psched); + + ipcp_flow_get_qoscube(fd, &qc); + fset_del(psched->set[qc], fd); +} diff --git a/src/ipcpd/unicast/psched.h b/src/ipcpd/unicast/psched.h new file mode 100644 index 00000000..831f8084 --- /dev/null +++ b/src/ipcpd/unicast/psched.h @@ -0,0 +1,47 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Packet scheduler component + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_PSCHED_H +#define OUROBOROS_IPCPD_UNICAST_PSCHED_H + +#include <ouroboros/ipcp-dev.h> +#include <ouroboros/fqueue.h> + +typedef void (* next_packet_fn_t)(int fd, + qoscube_t qc, + struct shm_du_buff * sdb); + +typedef int (* read_fn_t)(int fd, + struct shm_du_buff ** sdb); + +struct psched * psched_create(next_packet_fn_t callback, + read_fn_t read); + +void psched_destroy(struct psched * psched); + +void psched_add(struct psched * psched, + int fd); + +void psched_del(struct psched * psched, + int fd); + +#endif /* OUROBOROS_IPCPD_UNICAST_PSCHED_H */ diff --git a/src/ipcpd/unicast/routing.c b/src/ipcpd/unicast/routing.c new file mode 100644 index 00000000..f5417c24 --- /dev/null +++ b/src/ipcpd/unicast/routing.c @@ -0,0 +1,73 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Routing component of the IPCP + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200112L + +#include <ouroboros/errno.h> + +#include "pff.h" +#include "routing.h" +#include "routing/pol.h" + +struct routing_ops * r_ops; + +int routing_init(enum pol_routing pr) +{ + enum pol_pff pff_type; + + switch (pr) { + case ROUTING_LINK_STATE: + pff_type = PFF_SIMPLE; + r_ops = &link_state_ops; + break; + case ROUTING_LINK_STATE_LFA: + pff_type = PFF_ALTERNATE; + r_ops = &link_state_ops; + break; + case ROUTING_LINK_STATE_ECMP: + pff_type=PFF_MULTIPATH; + r_ops = &link_state_ops; + break; + default: + return -ENOTSUP; + } + + if (r_ops->init(pr)) + return -1; + + return pff_type; +} + +struct routing_i * routing_i_create(struct pff * pff) +{ + return r_ops->routing_i_create(pff); +} + +void routing_i_destroy(struct routing_i * instance) +{ + return r_ops->routing_i_destroy(instance); +} + +void routing_fini(void) +{ + r_ops->fini(); +} diff --git a/src/ipcpd/unicast/routing.h b/src/ipcpd/unicast/routing.h new file mode 100644 index 00000000..d5d833ae --- /dev/null +++ b/src/ipcpd/unicast/routing.h @@ -0,0 +1,41 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Routing component of the IPCP + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_ROUTING_H +#define OUROBOROS_IPCPD_UNICAST_ROUTING_H + +#include <ouroboros/ipcp.h> +#include <ouroboros/qos.h> + +#include "pff.h" + +#include <stdint.h> + +int routing_init(enum pol_routing pr); + +void routing_fini(void); + +struct routing_i * routing_i_create(struct pff * pff); + +void routing_i_destroy(struct routing_i * instance); + +#endif /* OUROBOROS_IPCPD_UNICAST_ROUTING_H */ diff --git a/src/ipcpd/unicast/routing/graph.c b/src/ipcpd/unicast/routing/graph.c new file mode 100644 index 00000000..32f3e6fb --- /dev/null +++ b/src/ipcpd/unicast/routing/graph.c @@ -0,0 +1,849 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Undirected graph structure + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * Nick Aerts <nick.aerts@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#define OUROBOROS_PREFIX "graph" + +#include <ouroboros/logs.h> +#include <ouroboros/errno.h> +#include <ouroboros/list.h> + +#include "graph.h" +#include "ipcp.h" + +#include <assert.h> +#include <pthread.h> +#include <stdlib.h> +#include <limits.h> +#include <string.h> + +struct vertex { + struct list_head next; + uint64_t addr; + struct list_head edges; + int index; +}; + +struct edge { + struct list_head next; + struct vertex * nb; + qosspec_t qs; + int announced; +}; + +struct graph { + size_t nr_vertices; + struct list_head vertices; + pthread_mutex_t lock; +}; + +static struct edge * find_edge_by_addr(struct vertex * vertex, + uint64_t dst_addr) +{ + struct list_head * p; + + assert(vertex); + + list_for_each(p, &vertex->edges) { + struct edge * e = list_entry(p, struct edge, next); + if (e->nb->addr == dst_addr) + return e; + } + + return NULL; +} + +static struct vertex * find_vertex_by_addr(struct graph * graph, + uint64_t addr) +{ + struct list_head * p; + + assert(graph); + + list_for_each(p, &graph->vertices) { + struct vertex * e = list_entry(p, struct vertex, next); + if (e->addr == addr) + return e; + } + + return NULL; +} + +static struct edge * add_edge(struct vertex * vertex, + struct vertex * nb) +{ + struct edge * edge; + + assert(vertex); + assert(nb); + + edge = malloc(sizeof(*edge)); + if (edge == NULL) + return NULL; + + edge->nb = nb; + edge->announced = 0; + + list_add(&edge->next, &vertex->edges); + + return edge; +} + +static void del_edge(struct edge * edge) +{ + assert(edge); + + list_del(&edge->next); + free(edge); +} + +static struct vertex * add_vertex(struct graph * graph, + uint64_t addr) +{ + struct vertex * vertex; + struct list_head * p; + int i = 0; + + assert(graph); + + vertex = malloc(sizeof(*vertex)); + if (vertex == NULL) + return NULL; + + list_head_init(&vertex->edges); + vertex->addr = addr; + + /* Keep them ordered on address. */ + list_for_each(p, &graph->vertices) { + struct vertex * v = list_entry(p, struct vertex, next); + if (v->addr > addr) + break; + i++; + } + + vertex->index = i; + + list_add_tail(&vertex->next, p); + + /* Increase the index of the vertices to the right. */ + list_for_each(p, &graph->vertices) { + struct vertex * v = list_entry(p, struct vertex, next); + if (v->addr > addr) + v->index++; + } + + graph->nr_vertices++; + + return vertex; +} + +static void del_vertex(struct graph * graph, + struct vertex * vertex) +{ + struct list_head * p; + struct list_head * h; + + assert(graph); + assert(vertex); + + list_del(&vertex->next); + + /* Decrease the index of the vertices to the right. */ + list_for_each(p, &graph->vertices) { + struct vertex * v = list_entry(p, struct vertex, next); + if (v->addr > vertex->addr) + v->index--; + } + + list_for_each_safe(p, h, &vertex->edges) { + struct edge * e = list_entry(p, struct edge, next); + del_edge(e); + } + + free(vertex); + + graph->nr_vertices--; +} + +struct graph * graph_create(void) +{ + struct graph * graph; + + graph = malloc(sizeof(*graph)); + if (graph == NULL) + return NULL; + + if (pthread_mutex_init(&graph->lock, NULL)) { + free(graph); + return NULL; + } + + graph->nr_vertices = 0; + list_head_init(&graph->vertices); + + return graph; +} + +void graph_destroy(struct graph * graph) +{ + struct list_head * p = NULL; + struct list_head * n = NULL; + + assert(graph); + + pthread_mutex_lock(&graph->lock); + + list_for_each_safe(p, n, &graph->vertices) { + struct vertex * e = list_entry(p, struct vertex, next); + del_vertex(graph, e); + } + + pthread_mutex_unlock(&graph->lock); + + pthread_mutex_destroy(&graph->lock); + + free(graph); +} + +int graph_update_edge(struct graph * graph, + uint64_t s_addr, + uint64_t d_addr, + qosspec_t qs) +{ + struct vertex * v; + struct edge * e; + struct vertex * nb; + struct edge * nb_e; + + assert(graph); + + pthread_mutex_lock(&graph->lock); + + v = find_vertex_by_addr(graph, s_addr); + if (v == NULL) { + v = add_vertex(graph, s_addr); + if (v == NULL) { + pthread_mutex_unlock(&graph->lock); + log_err("Failed to add vertex."); + return -ENOMEM; + } + } + + nb = find_vertex_by_addr(graph, d_addr); + if (nb == NULL) { + nb = add_vertex(graph, d_addr); + if (nb == NULL) { + if (list_is_empty(&v->edges)) + del_vertex(graph, v); + pthread_mutex_unlock(&graph->lock); + log_err("Failed to add vertex."); + return -ENOMEM; + } + } + + e = find_edge_by_addr(v, d_addr); + if (e == NULL) { + e = add_edge(v, nb); + if (e == NULL) { + if (list_is_empty(&v->edges)) + del_vertex(graph, v); + if (list_is_empty(&nb->edges)) + del_vertex(graph, nb); + pthread_mutex_unlock(&graph->lock); + log_err("Failed to add edge."); + return -ENOMEM; + } + } + + e->announced++; + e->qs = qs; + + nb_e = find_edge_by_addr(nb, s_addr); + if (nb_e == NULL) { + nb_e = add_edge(nb, v); + if (nb_e == NULL) { + if (--e->announced == 0) + del_edge(e); + if (list_is_empty(&v->edges)) + del_vertex(graph, v); + if (list_is_empty(&nb->edges)) + del_vertex(graph, nb); + pthread_mutex_unlock(&graph->lock); + log_err("Failed to add edge."); + return -ENOMEM; + } + } + + nb_e->announced++; + nb_e->qs = qs; + + pthread_mutex_unlock(&graph->lock); + + return 0; +} + +int graph_del_edge(struct graph * graph, + uint64_t s_addr, + uint64_t d_addr) +{ + struct vertex * v; + struct edge * e; + struct vertex * nb; + struct edge * nb_e; + + assert(graph); + + pthread_mutex_lock(&graph->lock); + + v = find_vertex_by_addr(graph, s_addr); + if (v == NULL) { + pthread_mutex_unlock(&graph->lock); + log_err("No such source vertex."); + return -1; + } + + nb = find_vertex_by_addr(graph, d_addr); + if (nb == NULL) { + pthread_mutex_unlock(&graph->lock); + log_err("No such destination vertex."); + return -1; + } + + e = find_edge_by_addr(v, d_addr); + if (e == NULL) { + pthread_mutex_unlock(&graph->lock); + log_err("No such source edge."); + return -1; + } + + nb_e = find_edge_by_addr(nb, s_addr); + if (nb_e == NULL) { + pthread_mutex_unlock(&graph->lock); + log_err("No such destination edge."); + return -1; + } + + if (--e->announced == 0) + del_edge(e); + if (--nb_e->announced == 0) + del_edge(nb_e); + + /* Removing vertex if it was the last edge */ + if (list_is_empty(&v->edges)) + del_vertex(graph, v); + if (list_is_empty(&nb->edges)) + del_vertex(graph, nb); + + pthread_mutex_unlock(&graph->lock); + + return 0; +} + +static int get_min_vertex(struct graph * graph, + int * dist, + bool * used, + struct vertex ** v) +{ + int min = INT_MAX; + int index = -1; + int i = 0; + struct list_head * p; + + assert(v); + assert(graph); + assert(dist); + assert(used); + + *v = NULL; + + list_for_each(p, &graph->vertices) { + if (!used[i] && dist[i] < min) { + min = dist[i]; + index = i; + *v = list_entry(p, struct vertex, next); + } + + i++; + } + + if (index != -1) + used[index] = true; + + return index; +} + +static int dijkstra(struct graph * graph, + uint64_t src, + struct vertex *** nhops, + int ** dist) +{ + bool * used; + struct list_head * p = NULL; + int i = 0; + struct vertex * v = NULL; + struct edge * e = NULL; + int alt; + + assert(graph); + assert(nhops); + assert(dist); + + *nhops = malloc(sizeof(**nhops) * graph->nr_vertices); + if (*nhops == NULL) + goto fail_pnhops; + + *dist = malloc(sizeof(**dist) * graph->nr_vertices); + if (*dist == NULL) + goto fail_pdist; + + used = malloc(sizeof(*used) * graph->nr_vertices); + if (used == NULL) + goto fail_used; + + /* Init the data structures */ + memset(used, 0, sizeof(*used) * graph->nr_vertices); + memset(*nhops, 0, sizeof(**nhops) * graph->nr_vertices); + memset(*dist, 0, sizeof(**dist) * graph->nr_vertices); + + list_for_each(p, &graph->vertices) { + v = list_entry(p, struct vertex, next); + (*dist)[i++] = (v->addr == src) ? 0 : INT_MAX; + } + + /* Perform actual Dijkstra */ + i = get_min_vertex(graph, *dist, used, &v); + while (v != NULL) { + list_for_each(p, &v->edges) { + e = list_entry(p, struct edge, next); + + /* Only include it if both sides announced it. */ + if (e->announced != 2) + continue; + + /* + * NOTE: Current weight is just hop count. + * Method could be extended to use a different + * weight for a different QoS cube. + */ + alt = (*dist)[i] + 1; + if (alt < (*dist)[e->nb->index]) { + (*dist)[e->nb->index] = alt; + if (v->addr == src) + (*nhops)[e->nb->index] = e->nb; + else + (*nhops)[e->nb->index] = (*nhops)[i]; + } + } + i = get_min_vertex(graph, *dist, used, &v); + } + + free(used); + + return 0; + + fail_used: + free(*dist); + fail_pdist: + free(*nhops); + fail_pnhops: + return -1; + +} + +static void free_routing_table(struct list_head * table) +{ + struct list_head * h; + struct list_head * p; + struct list_head * q; + struct list_head * i; + + assert(table); + + list_for_each_safe(p, h, table) { + struct routing_table * t = + list_entry(p, struct routing_table, next); + list_for_each_safe(q, i, &t->nhops) { + struct nhop * n = + list_entry(q, struct nhop, next); + list_del(&n->next); + free(n); + } + list_del(&t->next); + free(t); + } +} + +void graph_free_routing_table(struct graph * graph, + struct list_head * table) +{ + assert(table); + + pthread_mutex_lock(&graph->lock); + + free_routing_table(table); + + pthread_mutex_unlock(&graph->lock); +} + +static int graph_routing_table_simple(struct graph * graph, + uint64_t s_addr, + struct list_head * table, + int ** dist) +{ + struct vertex ** nhops; + struct list_head * p; + int i = 0; + struct vertex * v; + struct routing_table * t; + struct nhop * n; + + assert(graph); + assert(table); + assert(dist); + + /* We need at least 2 vertices for a table */ + if (graph->nr_vertices < 2) + goto fail_vertices; + + if (dijkstra(graph, s_addr, &nhops, dist)) + goto fail_vertices; + + list_head_init(table); + + /* Now construct the routing table from the nhops. */ + list_for_each(p, &graph->vertices) { + v = list_entry(p, struct vertex, next); + + /* This is the src */ + if (nhops[i] == NULL) { + i++; + continue; + } + + t = malloc(sizeof(*t)); + if (t == NULL) + goto fail_t; + + list_head_init(&t->nhops); + + n = malloc(sizeof(*n)); + if (n == NULL) + goto fail_n; + + t->dst = v->addr; + n->nhop = nhops[i]->addr; + + list_add(&n->next, &t->nhops); + list_add(&t->next, table); + + i++; + } + + free(nhops); + + return 0; + + fail_n: + free(t); + fail_t: + free_routing_table(table); + free(nhops); + free(*dist); + fail_vertices: + *dist = NULL; + return -1; +} + +static int add_lfa_to_table(struct list_head * table, + uint64_t addr, + uint64_t lfa) +{ + struct list_head * p; + struct nhop * n; + + assert(table); + + n = malloc(sizeof(*n)); + if (n == NULL) + return -1; + + n->nhop = lfa; + + list_for_each(p, table) { + struct routing_table * t = + list_entry(p, struct routing_table, next); + if (t->dst == addr) { + list_add_tail(&n->next, &t->nhops); + return 0; + } + } + + free(n); + + return -1; +} + +static int graph_routing_table_lfa(struct graph * graph, + uint64_t s_addr, + struct list_head * table, + int ** dist) +{ + int * n_dist[PROG_MAX_FLOWS]; + uint64_t addrs[PROG_MAX_FLOWS]; + int n_index[PROG_MAX_FLOWS]; + struct list_head * p; + struct list_head * q; + struct vertex * v; + struct edge * e; + struct vertex ** nhops; + int i = 0; + int j; + int k; + + if (graph_routing_table_simple(graph, s_addr, table, dist)) + goto fail_table; + + for (j = 0; j < PROG_MAX_FLOWS; j++) { + n_dist[j] = NULL; + n_index[j] = -1; + addrs[j] = -1; + } + + list_for_each(p, &graph->vertices) { + v = list_entry(p, struct vertex, next); + + if (v->addr != s_addr) + continue; + + /* + * Get the distances for every neighbor + * of the source. + */ + list_for_each(q, &v->edges) { + e = list_entry(q, struct edge, next); + + addrs[i] = e->nb->addr; + n_index[i] = e->nb->index; + if (dijkstra(graph, e->nb->addr, + &nhops, &(n_dist[i++]))) + goto fail_dijkstra; + + free(nhops); + } + + break; + } + + /* Loop though all nodes to see if we have a LFA for them. */ + list_for_each(p, &graph->vertices) { + v = list_entry(p, struct vertex, next); + + if (v->addr == s_addr) + continue; + + /* + * Check for every neighbor if + * dist(neighbor, destination) < + * dist(neighbor, source) + dist(source, destination). + */ + for (j = 0; j < i; j++) { + /* Exclude ourselves. */ + if (addrs[j] == v->addr) + continue; + + if (n_dist[j][v->index] < + (*dist)[n_index[j]] + (*dist)[v->index]) + if (add_lfa_to_table(table, v->addr, + addrs[j])) + goto fail_add_lfa; + } + } + + for (j = 0; j < i; j++) + free(n_dist[j]); + + return 0; + + fail_add_lfa: + for (k = j; k < i; k++) + free(n_dist[k]); + fail_dijkstra: + free_routing_table(table); + fail_table: + return -1; +} + +static int graph_routing_table_ecmp(struct graph * graph, + uint64_t s_addr, + struct list_head * table, + int ** dist) +{ + struct vertex ** nhops; + struct list_head * p; + struct list_head * h; + size_t i; + struct vertex * v; + struct vertex * src_v; + struct edge * e; + struct routing_table * t; + struct nhop * n; + struct list_head * forwarding; + + assert(graph); + assert(dist); + + if (graph-> nr_vertices < 2) + goto fail_vertices; + + forwarding = malloc(sizeof(*forwarding) * graph->nr_vertices); + if (forwarding == NULL) + goto fail_vertices; + + for (i = 0; i < graph->nr_vertices; ++i) + list_head_init(&forwarding[i]); + + if (dijkstra(graph, s_addr, &nhops, dist)) + goto fail_dijkstra; + + free(nhops); + + src_v = find_vertex_by_addr(graph, s_addr); + if (src_v == NULL) + goto fail_src_v; + + list_for_each(p, &src_v->edges) { + int * tmp_dist; + + e = list_entry(p, struct edge, next); + if (dijkstra(graph, e->nb->addr, &nhops, &tmp_dist)) + goto fail_src_v; + + free(nhops); + + list_for_each(h, &graph->vertices) { + v = list_entry(h, struct vertex, next); + if (tmp_dist[v->index] + 1 == (*dist)[v->index]) { + n = malloc(sizeof(*n)); + if (n == NULL) { + free(tmp_dist); + goto fail_src_v; + } + n->nhop = e->nb->addr; + list_add_tail(&n->next, &forwarding[v->index]); + } + } + + free(tmp_dist); + } + + list_head_init(table); + i = 0; + list_for_each(p, &graph->vertices) { + v = list_entry(p, struct vertex, next); + if (v->addr == s_addr) { + ++i; + continue; + } + + t = malloc(sizeof(*t)); + if (t == NULL) + goto fail_t; + + t->dst = v->addr; + + list_head_init(&t->nhops); + if (&forwarding[i] != forwarding[i].nxt) { + t->nhops.nxt = forwarding[i].nxt; + t->nhops.prv = forwarding[i].prv; + forwarding[i].prv->nxt = &t->nhops; + forwarding[i].nxt->prv = &t->nhops; + } + + list_add(&t->next, table); + ++i; + } + + free(*dist); + *dist = NULL; + free(forwarding); + + return 0; + + fail_t: + free_routing_table(table); + fail_src_v: + free(*dist); + fail_dijkstra: + free(forwarding); + fail_vertices: + *dist = NULL; + return -1; +} + +int graph_routing_table(struct graph * graph, + enum routing_algo algo, + uint64_t s_addr, + struct list_head * table) +{ + int * s_dist; + + assert(graph); + assert(table); + + pthread_mutex_lock(&graph->lock); + + switch (algo) { + case ROUTING_SIMPLE: + /* LFA uses the s_dist this returns. */ + if (graph_routing_table_simple(graph, s_addr, table, &s_dist)) + goto fail_table; + break; + case ROUTING_LFA: + if (graph_routing_table_lfa(graph, s_addr, table, &s_dist)) + goto fail_table; + break; + + case ROUTING_ECMP: + if (graph_routing_table_ecmp(graph, s_addr, table, &s_dist)) + goto fail_table; + break; + default: + log_err("Unsupported algorithm."); + goto fail_table; + } + + pthread_mutex_unlock(&graph->lock); + + free(s_dist); + + return 0; + + fail_table: + pthread_mutex_unlock(&graph->lock); + return -1; +} diff --git a/src/ipcpd/unicast/routing/graph.h b/src/ipcpd/unicast/routing/graph.h new file mode 100644 index 00000000..8190cc6c --- /dev/null +++ b/src/ipcpd/unicast/routing/graph.h @@ -0,0 +1,69 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Undirected graph structure + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_GRAPH_H +#define OUROBOROS_IPCPD_UNICAST_GRAPH_H + +#include <ouroboros/list.h> +#include <ouroboros/qos.h> + +#include <inttypes.h> + +enum routing_algo { + ROUTING_SIMPLE = 0, + ROUTING_LFA, + ROUTING_ECMP +}; + +struct nhop { + struct list_head next; + uint64_t nhop; +}; + +struct routing_table { + struct list_head next; + uint64_t dst; + struct list_head nhops; +}; + +struct graph * graph_create(void); + +void graph_destroy(struct graph * graph); + +int graph_update_edge(struct graph * graph, + uint64_t s_addr, + uint64_t d_addr, + qosspec_t qs); + +int graph_del_edge(struct graph * graph, + uint64_t s_addr, + uint64_t d_addr); + +int graph_routing_table(struct graph * graph, + enum routing_algo algo, + uint64_t s_addr, + struct list_head * table); + +void graph_free_routing_table(struct graph * graph, + struct list_head * table); + +#endif /* OUROBOROS_IPCPD_UNICAST_GRAPH_H */ diff --git a/src/ipcpd/unicast/routing/link-state.c b/src/ipcpd/unicast/routing/link-state.c new file mode 100644 index 00000000..57c0c7cb --- /dev/null +++ b/src/ipcpd/unicast/routing/link-state.c @@ -0,0 +1,1055 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Link state routing policy + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#define OUROBOROS_PREFIX "link-state-routing" + +#include <ouroboros/endian.h> +#include <ouroboros/dev.h> +#include <ouroboros/errno.h> +#include <ouroboros/fccntl.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/list.h> +#include <ouroboros/logs.h> +#include <ouroboros/notifier.h> +#include <ouroboros/pthread.h> +#include <ouroboros/rib.h> +#include <ouroboros/utils.h> + +#include "common/comp.h" +#include "common/connmgr.h" +#include "graph.h" +#include "ipcp.h" +#include "link-state.h" +#include "pff.h" + +#include <assert.h> +#include <stdlib.h> +#include <inttypes.h> +#include <string.h> + +#define RECALC_TIME 4 +#define LS_UPDATE_TIME 15 +#define LS_TIMEO 60 +#define LS_ENTRY_SIZE 104 +#define LSDB "lsdb" + +#ifndef CLOCK_REALTIME_COARSE +#define CLOCK_REALTIME_COARSE CLOCK_REALTIME +#endif + +struct lsa { + uint64_t d_addr; + uint64_t s_addr; + uint64_t seqno; +} __attribute__((packed)); + +struct routing_i { + struct list_head next; + + struct pff * pff; + pthread_t calculator; + + bool modified; + pthread_mutex_t lock; +}; + +/* TODO: link weight support. */ +struct adjacency { + struct list_head next; + + uint64_t dst; + uint64_t src; + + uint64_t seqno; + + time_t stamp; +}; + +enum nb_type { + NB_DT = 0, + NB_MGMT +}; + +struct nb { + struct list_head next; + + uint64_t addr; + int fd; + enum nb_type type; +}; + +struct { + struct list_head nbs; + size_t nbs_len; + fset_t * mgmt_set; + + struct list_head db; + size_t db_len; + + pthread_rwlock_t db_lock; + + struct graph * graph; + + pthread_t lsupdate; + pthread_t lsreader; + pthread_t listener; + + struct list_head routing_instances; + pthread_mutex_t routing_i_lock; + + enum routing_algo routing_algo; +} ls; + +struct routing_ops link_state_ops = { + .init = link_state_init, + .fini = link_state_fini, + .routing_i_create = link_state_routing_i_create, + .routing_i_destroy = link_state_routing_i_destroy +}; + +static int str_adj(struct adjacency * adj, + char * buf, + size_t len) +{ + char tmbuf[64]; + char srcbuf[64]; + char dstbuf[64]; + char seqnobuf[64]; + struct tm * tm; + + assert(adj); + + if (len < LS_ENTRY_SIZE) + return -1; + + tm = localtime(&adj->stamp); + strftime(tmbuf, sizeof(tmbuf), "%F %T", tm); /* 19 chars */ + + sprintf(srcbuf, "%" PRIu64, adj->src); + sprintf(dstbuf, "%" PRIu64, adj->dst); + sprintf(seqnobuf, "%" PRIu64, adj->seqno); + + sprintf(buf, "src: %20s\ndst: %20s\nseqno: %18s\nupd: %20s\n", + srcbuf, dstbuf, seqnobuf, tmbuf); + + return LS_ENTRY_SIZE; +} + +static struct adjacency * get_adj(const char * path) +{ + struct list_head * p; + char entry[RIB_PATH_LEN + 1]; + + assert(path); + + list_for_each(p, &ls.db) { + struct adjacency * a = list_entry(p, struct adjacency, next); + sprintf(entry, "%" PRIu64 ".%" PRIu64, a->src, a->dst); + if (strcmp(entry, path) == 0) + return a; + } + + return NULL; +} + +static int lsdb_rib_getattr(const char * path, + struct rib_attr * attr) +{ + struct adjacency * adj; + struct timespec now; + char * entry; + + assert(path); + assert(attr); + + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_rdlock(&ls.db_lock); + + adj = get_adj(entry); + if (adj != NULL) { + attr->mtime = adj->stamp; + attr->size = LS_ENTRY_SIZE; + } else { + attr->mtime = now.tv_sec; + attr->size = 0; + } + + pthread_rwlock_unlock(&ls.db_lock); + + return 0; +} + +static int lsdb_rib_read(const char * path, + char * buf, + size_t len) +{ + struct adjacency * a; + char * entry; + int size; + + assert(path); + + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + pthread_rwlock_rdlock(&ls.db_lock); + + if (ls.db_len + ls.nbs_len == 0) + goto fail; + + a = get_adj(entry); + if (a == NULL) + goto fail; + + size = str_adj(a, buf, len); + if (size < 0) + goto fail; + + pthread_rwlock_unlock(&ls.db_lock); + return size; + + fail: + pthread_rwlock_unlock(&ls.db_lock); + return -1; +} + +static int lsdb_rib_readdir(char *** buf) +{ + struct list_head * p; + char entry[RIB_PATH_LEN + 1]; + ssize_t idx = 0; + + assert(buf); + + pthread_rwlock_rdlock(&ls.db_lock); + + if (ls.db_len + ls.nbs_len == 0) { + pthread_rwlock_unlock(&ls.db_lock); + return 0; + } + + *buf = malloc(sizeof(**buf) * (ls.db_len + ls.nbs_len)); + if (*buf == NULL) { + pthread_rwlock_unlock(&ls.db_lock); + return -ENOMEM; + } + + list_for_each(p, &ls.nbs) { + struct nb * nb = list_entry(p, struct nb, next); + char * str = (nb->type == NB_DT ? "dt." : "mgmt."); + sprintf(entry, "%s%" PRIu64, str, nb->addr); + (*buf)[idx] = malloc(strlen(entry) + 1); + if ((*buf)[idx] == NULL) { + while (idx-- > 0) + free((*buf)[idx]); + free(*buf); + pthread_rwlock_unlock(&ls.db_lock); + return -ENOMEM; + } + + strcpy((*buf)[idx], entry); + + idx++; + } + + list_for_each(p, &ls.db) { + struct adjacency * a = list_entry(p, struct adjacency, next); + sprintf(entry, "%" PRIu64 ".%" PRIu64, a->src, a->dst); + (*buf)[idx] = malloc(strlen(entry) + 1); + if ((*buf)[idx] == NULL) { + ssize_t j; + for (j = 0; j < idx; ++j) + free(*buf[j]); + free(buf); + pthread_rwlock_unlock(&ls.db_lock); + return -ENOMEM; + } + + strcpy((*buf)[idx], entry); + + idx++; + } + + pthread_rwlock_unlock(&ls.db_lock); + + return idx; +} + +static struct rib_ops r_ops = { + .read = lsdb_rib_read, + .readdir = lsdb_rib_readdir, + .getattr = lsdb_rib_getattr +}; + +static int lsdb_add_nb(uint64_t addr, + int fd, + enum nb_type type) +{ + struct list_head * p; + struct nb * nb; + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each(p, &ls.nbs) { + struct nb * el = list_entry(p, struct nb, next); + if (el->addr == addr && el->type == type) { + log_dbg("Already know %s neighbor %" PRIu64 ".", + type == NB_DT ? "dt" : "mgmt", addr); + if (el->fd != fd) { + log_warn("Existing neighbor assigned new fd."); + el->fd = fd; + } + pthread_rwlock_unlock(&ls.db_lock); + return -EPERM; + } + + if (addr > el->addr) + break; + } + + nb = malloc(sizeof(*nb)); + if (nb == NULL) { + pthread_rwlock_unlock(&ls.db_lock); + return -ENOMEM; + } + + nb->addr = addr; + nb->fd = fd; + nb->type = type; + + list_add_tail(&nb->next, p); + + ++ls.nbs_len; + + log_dbg("Type %s neighbor %" PRIu64 " added.", + nb->type == NB_DT ? "dt" : "mgmt", addr); + + pthread_rwlock_unlock(&ls.db_lock); + + return 0; +} + +static int lsdb_del_nb(uint64_t addr, + int fd) +{ + struct list_head * p; + struct list_head * h; + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each_safe(p, h, &ls.nbs) { + struct nb * nb = list_entry(p, struct nb, next); + if (nb->addr == addr && nb->fd == fd) { + list_del(&nb->next); + --ls.nbs_len; + pthread_rwlock_unlock(&ls.db_lock); + log_dbg("Type %s neighbor %" PRIu64 " deleted.", + nb->type == NB_DT ? "dt" : "mgmt", addr); + free(nb); + return 0; + } + } + + pthread_rwlock_unlock(&ls.db_lock); + + return -EPERM; +} + +static int nbr_to_fd(uint64_t addr) +{ + struct list_head * p; + int fd; + + pthread_rwlock_rdlock(&ls.db_lock); + + list_for_each(p, &ls.nbs) { + struct nb * nb = list_entry(p, struct nb, next); + if (nb->addr == addr && nb->type == NB_DT) { + fd = nb->fd; + pthread_rwlock_unlock(&ls.db_lock); + return fd; + } + } + + pthread_rwlock_unlock(&ls.db_lock); + + return -1; +} + +static void calculate_pff(struct routing_i * instance) +{ + int fd; + struct list_head table; + struct list_head * p; + struct list_head * q; + int fds[PROG_MAX_FLOWS]; + + assert(instance); + + if (graph_routing_table(ls.graph, ls.routing_algo, + ipcpi.dt_addr, &table)) + return; + + pff_lock(instance->pff); + + pff_flush(instance->pff); + + /* Calculate forwarding table from routing table. */ + list_for_each(p, &table) { + int i = 0; + struct routing_table * t = + list_entry(p, struct routing_table, next); + + list_for_each(q, &t->nhops) { + struct nhop * n = list_entry(q, struct nhop, next); + + fd = nbr_to_fd(n->nhop); + if (fd == -1) + continue; + + fds[i++] = fd; + } + if (i > 0) + pff_add(instance->pff, t->dst, fds, i); + } + + pff_unlock(instance->pff); + + graph_free_routing_table(ls.graph, &table); +} + +static void set_pff_modified(bool calc) +{ + struct list_head * p; + + pthread_mutex_lock(&ls.routing_i_lock); + list_for_each(p, &ls.routing_instances) { + struct routing_i * inst = + list_entry(p, struct routing_i, next); + pthread_mutex_lock(&inst->lock); + inst->modified = true; + pthread_mutex_unlock(&inst->lock); + if (calc) + calculate_pff(inst); + } + pthread_mutex_unlock(&ls.routing_i_lock); +} + +static int lsdb_add_link(uint64_t src, + uint64_t dst, + uint64_t seqno, + qosspec_t * qs) +{ + struct list_head * p; + struct adjacency * adj; + struct timespec now; + int ret = -1; + + assert(qs); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each(p, &ls.db) { + struct adjacency * a = list_entry(p, struct adjacency, next); + if (a->dst == dst && a->src == src) { + if (a->seqno < seqno) { + a->stamp = now.tv_sec; + a->seqno = seqno; + ret = 0; + } + pthread_rwlock_unlock(&ls.db_lock); + return ret; + } + + if (a->dst > dst || (a->dst == dst && a->src > src)) + break; + } + + adj = malloc(sizeof(*adj)); + if (adj == NULL) { + pthread_rwlock_unlock(&ls.db_lock); + return -ENOMEM; + } + + adj->dst = dst; + adj->src = src; + adj->seqno = seqno; + adj->stamp = now.tv_sec; + + list_add_tail(&adj->next, p); + + ls.db_len++; + + if (graph_update_edge(ls.graph, src, dst, *qs)) + log_warn("Failed to add edge to graph."); + + pthread_rwlock_unlock(&ls.db_lock); + + set_pff_modified(true); + + return 0; +} + +static int lsdb_del_link(uint64_t src, + uint64_t dst) +{ + struct list_head * p; + struct list_head * h; + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each_safe(p, h, &ls.db) { + struct adjacency * a = list_entry(p, struct adjacency, next); + if (a->dst == dst && a->src == src) { + list_del(&a->next); + if (graph_del_edge(ls.graph, src, dst)) + log_warn("Failed to delete edge from graph."); + + ls.db_len--; + + pthread_rwlock_unlock(&ls.db_lock); + set_pff_modified(false); + free(a); + return 0; + } + } + + pthread_rwlock_unlock(&ls.db_lock); + + return -EPERM; +} + +static void * periodic_recalc_pff(void * o) +{ + bool modified; + struct routing_i * inst; + + assert(o); + + inst = (struct routing_i *) o; + + while (true) { + pthread_mutex_lock(&inst->lock); + modified = inst->modified; + inst->modified = false; + pthread_mutex_unlock(&inst->lock); + + if (modified) + calculate_pff(inst); + + sleep(RECALC_TIME); + } + + return (void *) 0; +} + +static void send_lsm(uint64_t src, + uint64_t dst, + uint64_t seqno) +{ + struct lsa lsm; + struct list_head * p; + + lsm.d_addr = hton64(dst); + lsm.s_addr = hton64(src); + lsm.seqno = hton64(seqno); + + list_for_each(p, &ls.nbs) { + struct nb * nb = list_entry(p, struct nb, next); + if (nb->type == NB_MGMT) + flow_write(nb->fd, &lsm, sizeof(lsm)); + } +} + +/* replicate the lsdb to a mgmt neighbor */ +static void lsdb_replicate(int fd) +{ + struct list_head * p; + struct list_head * h; + struct list_head copy; + + list_head_init(©); + + /* Lock the lsdb, copy the lsms and send outside of lock. */ + pthread_rwlock_rdlock(&ls.db_lock); + + list_for_each(p, &ls.db) { + struct adjacency * adj; + struct adjacency * cpy; + adj = list_entry(p, struct adjacency, next); + cpy = malloc(sizeof(*cpy)); + if (cpy == NULL) { + log_warn("Failed to replicate full lsdb."); + break; + } + + cpy->dst = adj->dst; + cpy->src = adj->src; + cpy->seqno = adj->seqno; + + list_add_tail(&cpy->next, ©); + } + + pthread_rwlock_unlock(&ls.db_lock); + + list_for_each_safe(p, h, ©) { + struct lsa lsm; + struct adjacency * adj; + adj = list_entry(p, struct adjacency, next); + lsm.d_addr = hton64(adj->dst); + lsm.s_addr = hton64(adj->src); + lsm.seqno = hton64(adj->seqno); + list_del(&adj->next); + free(adj); + flow_write(fd, &lsm, sizeof(lsm)); + } +} + +static void * lsupdate(void * o) +{ + struct list_head * p; + struct list_head * h; + struct timespec now; + + (void) o; + + while (true) { + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_wrlock(&ls.db_lock); + + pthread_cleanup_push(__cleanup_rwlock_unlock, &ls.db_lock); + + list_for_each_safe(p, h, &ls.db) { + struct adjacency * adj; + adj = list_entry(p, struct adjacency, next); + if (now.tv_sec - adj->stamp > LS_TIMEO) { + list_del(&adj->next); + log_dbg("%" PRIu64 " - %" PRIu64" timed out.", + adj->src, adj->dst); + if (graph_del_edge(ls.graph, adj->src, + adj->dst)) + log_err("Failed to del edge."); + free(adj); + continue; + } + + if (adj->src == ipcpi.dt_addr) { + adj->seqno++; + send_lsm(adj->src, adj->dst, adj->seqno); + adj->stamp = now.tv_sec; + } + } + + pthread_cleanup_pop(true); + + sleep(LS_UPDATE_TIME); + } + + return (void *) 0; +} + +static void * ls_conn_handle(void * o) +{ + struct conn conn; + + (void) o; + + while (true) { + if (connmgr_wait(COMPID_MGMT, &conn)) { + log_err("Failed to get next MGMT connection."); + continue; + } + + /* NOTE: connection acceptance policy could be here. */ + + notifier_event(NOTIFY_MGMT_CONN_ADD, &conn); + } + + return 0; +} + + +static void forward_lsm(uint8_t * buf, + size_t len, + int in_fd) +{ + struct list_head * p; + + pthread_rwlock_rdlock(&ls.db_lock); + + pthread_cleanup_push(__cleanup_rwlock_unlock, &ls.db_lock); + + list_for_each(p, &ls.nbs) { + struct nb * nb = list_entry(p, struct nb, next); + if (nb->type == NB_MGMT && nb->fd != in_fd) + flow_write(nb->fd, buf, len); + } + + pthread_cleanup_pop(true); +} + +static void cleanup_fqueue(void * fq) +{ + fqueue_destroy((fqueue_t *) fq); +} + +static void * lsreader(void * o) +{ + fqueue_t * fq; + int ret; + uint8_t buf[sizeof(struct lsa)]; + int fd; + qosspec_t qs; + struct lsa * msg; + size_t len; + + (void) o; + + memset(&qs, 0, sizeof(qs)); + + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; + + pthread_cleanup_push(cleanup_fqueue, fq); + + while (true) { + ret = fevent(ls.mgmt_set, fq, NULL); + if (ret < 0) { + log_warn("Event error: %d.", ret); + continue; + } + + while ((fd = fqueue_next(fq)) >= 0) { + if (fqueue_type(fq) != FLOW_PKT) + continue; + + len = flow_read(fd, buf, sizeof(*msg)); + if (len <= 0 || len != sizeof(*msg)) + continue; + + msg = (struct lsa *) buf; + + if (lsdb_add_link(ntoh64(msg->s_addr), + ntoh64(msg->d_addr), + ntoh64(msg->seqno), + &qs)) + continue; + + forward_lsm(buf, len, fd); + } + } + + pthread_cleanup_pop(true); + + return (void *) 0; +} + +static void flow_event(int fd, + bool up) +{ + + struct list_head * p; + + log_dbg("Notifying routing instances of flow event."); + + pthread_mutex_lock(&ls.routing_i_lock); + + list_for_each(p, &ls.routing_instances) { + struct routing_i * ri = list_entry(p, struct routing_i, next); + pff_flow_state_change(ri->pff, fd, up); + } + + pthread_mutex_unlock(&ls.routing_i_lock); +} + +static void handle_event(void * self, + int event, + const void * o) +{ + /* FIXME: Apply correct QoS on graph */ + struct conn * c; + qosspec_t qs; + int flags; + + (void) self; + + assert(o); + + c = (struct conn *) o; + + memset(&qs, 0, sizeof(qs)); + + switch (event) { + case NOTIFY_DT_CONN_ADD: + pthread_rwlock_rdlock(&ls.db_lock); + + pthread_cleanup_push(__cleanup_rwlock_unlock, &ls.db_lock); + + send_lsm(ipcpi.dt_addr, c->conn_info.addr, 0); + pthread_cleanup_pop(true); + + if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_DT)) + log_dbg("Failed to add neighbor to LSDB."); + + if (lsdb_add_link(ipcpi.dt_addr, c->conn_info.addr, 0, &qs)) + log_dbg("Failed to add new adjacency to LSDB."); + break; + case NOTIFY_DT_CONN_DEL: + flow_event(c->flow_info.fd, false); + + if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd)) + log_dbg("Failed to delete neighbor from LSDB."); + + if (lsdb_del_link(ipcpi.dt_addr, c->conn_info.addr)) + log_dbg("Local link was not in LSDB."); + break; + case NOTIFY_DT_CONN_QOS: + log_dbg("QoS changes currently unsupported."); + break; + case NOTIFY_DT_CONN_UP: + flow_event(c->flow_info.fd, true); + break; + case NOTIFY_DT_CONN_DOWN: + flow_event(c->flow_info.fd, false); + break; + case NOTIFY_MGMT_CONN_ADD: + fccntl(c->flow_info.fd, FLOWGFLAGS, &flags); + fccntl(c->flow_info.fd, FLOWSFLAGS, flags | FLOWFRNOPART); + fset_add(ls.mgmt_set, c->flow_info.fd); + if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_MGMT)) + log_warn("Failed to add mgmt neighbor to LSDB."); + /* replicate the entire lsdb */ + lsdb_replicate(c->flow_info.fd); + break; + case NOTIFY_MGMT_CONN_DEL: + fset_del(ls.mgmt_set, c->flow_info.fd); + if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd)) + log_warn("Failed to delete mgmt neighbor from LSDB."); + break; + default: + break; + } +} + +struct routing_i * link_state_routing_i_create(struct pff * pff) +{ + struct routing_i * tmp; + + assert(pff); + + tmp = malloc(sizeof(*tmp)); + if (tmp == NULL) + goto fail_tmp; + + tmp->pff = pff; + tmp->modified = false; + + if (pthread_mutex_init(&tmp->lock, NULL)) + goto fail_instance_lock_init; + + if (pthread_create(&tmp->calculator, NULL, + periodic_recalc_pff, tmp)) + goto fail_pthread_create_lsupdate; + + pthread_mutex_lock(&ls.routing_i_lock); + + list_add(&tmp->next, &ls.routing_instances); + + pthread_mutex_unlock(&ls.routing_i_lock); + + return tmp; + + fail_pthread_create_lsupdate: + pthread_mutex_destroy(&tmp->lock); + fail_instance_lock_init: + free(tmp); + fail_tmp: + return NULL; +} + +void link_state_routing_i_destroy(struct routing_i * instance) +{ + assert(instance); + + pthread_mutex_lock(&ls.routing_i_lock); + + list_del(&instance->next); + + pthread_mutex_unlock(&ls.routing_i_lock); + + pthread_cancel(instance->calculator); + + pthread_join(instance->calculator, NULL); + + pthread_mutex_destroy(&instance->lock); + + free(instance); +} + +int link_state_init(enum pol_routing pr) +{ + struct conn_info info; + + memset(&info, 0, sizeof(info)); + + strcpy(info.comp_name, LS_COMP); + strcpy(info.protocol, LS_PROTO); + info.pref_version = 1; + info.pref_syntax = PROTO_GPB; + info.addr = ipcpi.dt_addr; + + switch (pr) { + case ROUTING_LINK_STATE: + log_dbg("Using link state routing policy."); + ls.routing_algo = ROUTING_SIMPLE; + break; + case ROUTING_LINK_STATE_LFA: + log_dbg("Using Loop-Free Alternates policy."); + ls.routing_algo = ROUTING_LFA; + break; + case ROUTING_LINK_STATE_ECMP: + log_dbg("Using Equal-Cost Multipath policy."); + ls.routing_algo = ROUTING_ECMP; + break; + default: + goto fail_graph; + } + + ls.graph = graph_create(); + if (ls.graph == NULL) + goto fail_graph; + + if (notifier_reg(handle_event, NULL)) + goto fail_notifier_reg; + + if (pthread_rwlock_init(&ls.db_lock, NULL)) + goto fail_db_lock_init; + + if (pthread_mutex_init(&ls.routing_i_lock, NULL)) + goto fail_routing_i_lock_init; + + if (connmgr_comp_init(COMPID_MGMT, &info)) + goto fail_connmgr_comp_init; + + ls.mgmt_set = fset_create(); + if (ls.mgmt_set == NULL) + goto fail_fset_create; + + list_head_init(&ls.db); + list_head_init(&ls.nbs); + list_head_init(&ls.routing_instances); + + if (pthread_create(&ls.lsupdate, NULL, lsupdate, NULL)) + goto fail_pthread_create_lsupdate; + + if (pthread_create(&ls.lsreader, NULL, lsreader, NULL)) + goto fail_pthread_create_lsreader; + + if (pthread_create(&ls.listener, NULL, ls_conn_handle, NULL)) + goto fail_pthread_create_listener; + + if (rib_reg(LSDB, &r_ops)) + goto fail_rib_reg; + + ls.db_len = 0; + ls.nbs_len = 0; + + return 0; + + fail_rib_reg: + pthread_cancel(ls.listener); + pthread_join(ls.listener, NULL); + fail_pthread_create_listener: + pthread_cancel(ls.lsreader); + pthread_join(ls.lsreader, NULL); + fail_pthread_create_lsreader: + pthread_cancel(ls.lsupdate); + pthread_join(ls.lsupdate, NULL); + fail_pthread_create_lsupdate: + fset_destroy(ls.mgmt_set); + fail_fset_create: + connmgr_comp_fini(COMPID_MGMT); + fail_connmgr_comp_init: + pthread_mutex_destroy(&ls.routing_i_lock); + fail_routing_i_lock_init: + pthread_rwlock_destroy(&ls.db_lock); + fail_db_lock_init: + notifier_unreg(handle_event); + fail_notifier_reg: + graph_destroy(ls.graph); + fail_graph: + return -1; +} + +void link_state_fini(void) +{ + struct list_head * p; + struct list_head * h; + + rib_unreg(LSDB); + + notifier_unreg(handle_event); + + pthread_cancel(ls.listener); + pthread_cancel(ls.lsreader); + pthread_cancel(ls.lsupdate); + + pthread_join(ls.listener, NULL); + pthread_join(ls.lsreader, NULL); + pthread_join(ls.lsupdate, NULL); + + fset_destroy(ls.mgmt_set); + + connmgr_comp_fini(COMPID_MGMT); + + graph_destroy(ls.graph); + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each_safe(p, h, &ls.db) { + struct adjacency * a = list_entry(p, struct adjacency, next); + list_del(&a->next); + free(a); + } + + pthread_rwlock_unlock(&ls.db_lock); + + pthread_rwlock_destroy(&ls.db_lock); + + pthread_mutex_destroy(&ls.routing_i_lock); +} diff --git a/src/ipcpd/unicast/routing/link-state.h b/src/ipcpd/unicast/routing/link-state.h new file mode 100644 index 00000000..d77d72df --- /dev/null +++ b/src/ipcpd/unicast/routing/link-state.h @@ -0,0 +1,41 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Link state routing policy + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_POL_LINK_STATE_H +#define OUROBOROS_IPCPD_UNICAST_POL_LINK_STATE_H + +#define LS_COMP "Management" +#define LS_PROTO "LSP" + +#include "ops.h" + +int link_state_init(enum pol_routing pr); + +void link_state_fini(void); + +struct routing_i * link_state_routing_i_create(struct pff * pff); + +void link_state_routing_i_destroy(struct routing_i * instance); + +extern struct routing_ops link_state_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_POL_LINK_STATE_H */ diff --git a/src/ipcpd/unicast/routing/ops.h b/src/ipcpd/unicast/routing/ops.h new file mode 100644 index 00000000..8a79b7ec --- /dev/null +++ b/src/ipcpd/unicast/routing/ops.h @@ -0,0 +1,38 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Routing policy ops + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_ROUTING_OPS_H +#define OUROBOROS_IPCPD_UNICAST_ROUTING_OPS_H + +#include "pff.h" + +struct routing_ops { + int (* init)(enum pol_routing pr); + + void (* fini)(void); + + struct routing_i * (* routing_i_create)(struct pff * pff); + + void (* routing_i_destroy)(struct routing_i * instance); +}; + +#endif /* OUROBOROS_IPCPD_UNICAST_ROUTING_OPS_H */ diff --git a/src/ipcpd/unicast/routing/pol.h b/src/ipcpd/unicast/routing/pol.h new file mode 100644 index 00000000..b6a6f150 --- /dev/null +++ b/src/ipcpd/unicast/routing/pol.h @@ -0,0 +1,23 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Routing policies + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include "link-state.h" diff --git a/src/ipcpd/unicast/routing/tests/CMakeLists.txt b/src/ipcpd/unicast/routing/tests/CMakeLists.txt new file mode 100644 index 00000000..9d24bf03 --- /dev/null +++ b/src/ipcpd/unicast/routing/tests/CMakeLists.txt @@ -0,0 +1,38 @@ +get_filename_component(CURRENT_SOURCE_PARENT_DIR + ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(CURRENT_BINARY_PARENT_DIR + ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +include_directories(${CURRENT_SOURCE_PARENT_DIR}) +include_directories(${CURRENT_BINARY_PARENT_DIR}) + +include_directories(${CMAKE_SOURCE_DIR}/include) +include_directories(${CMAKE_BINARY_DIR}/include) + +get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) + +create_test_sourcelist(${PARENT_DIR}_tests test_suite.c + # Add new tests here + graph_test.c + ) + +add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}) +target_link_libraries(${PARENT_DIR}_test ouroboros-common) + +add_dependencies(check ${PARENT_DIR}_test) + +set(tests_to_run ${${PARENT_DIR}_tests}) +if(CMAKE_VERSION VERSION_LESS "3.29.0") + remove(tests_to_run test_suite.c) +else () + list(POP_FRONT tests_to_run) +endif() + +foreach (test ${tests_to_run}) + get_filename_component(test_name ${test} NAME_WE) + add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name}) +endforeach (test) diff --git a/src/ipcpd/unicast/routing/tests/graph_test.c b/src/ipcpd/unicast/routing/tests/graph_test.c new file mode 100644 index 00000000..d805640c --- /dev/null +++ b/src/ipcpd/unicast/routing/tests/graph_test.c @@ -0,0 +1,351 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Test of the graph structure + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200112L + +#include <ouroboros/utils.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "graph.c" + +struct graph * graph; +struct list_head table; +qosspec_t qs; + +int graph_test_entries(int entries) +{ + struct list_head * p; + int i = 0; + + if (graph_routing_table(graph, ROUTING_SIMPLE, 1, &table)) { + printf("Failed to get routing table.\n"); + return -1; + } + + list_for_each(p, &table) + i++; + + if (i != entries) { + printf("Wrong number of entries.\n"); + graph_free_routing_table(graph, &table); + return -1; + } + + graph_free_routing_table(graph, &table); + + return 0; +} + +int graph_test_double_link(void) +{ + struct list_head * p; + int i = 0; + + if (graph_routing_table(graph, ROUTING_SIMPLE, 1, &table)) { + printf("Failed to get routing table.\n"); + return -1; + } + + list_for_each(p, &table) + i++; + + if (i != 2) { + printf("Wrong number of entries.\n"); + graph_free_routing_table(graph, &table); + return -1; + } + + list_for_each(p, &table) { + struct routing_table * t = + list_entry(p, struct routing_table, next); + struct nhop * n = + list_first_entry(&t->nhops, struct nhop, next); + + if ((t->dst != 2 && n->nhop != 2) || + (t->dst != 3 && n->nhop != 2)) { + printf("Wrong routing entry.\n"); + graph_free_routing_table(graph, &table); + return -1; + } + } + + graph_free_routing_table(graph, &table); + + return 0; +} + +int graph_test_single_link(void) +{ + struct list_head * p; + int i = 0; + + if (graph_routing_table(graph, ROUTING_SIMPLE, 1, &table)) { + printf("Failed to get routing table.\n"); + return -1; + } + + list_for_each(p, &table) + i++; + + if (i != 1) { + printf("Wrong number of entries.\n"); + graph_free_routing_table(graph, &table); + return -1; + } + + list_for_each(p, &table) { + struct routing_table * t = + list_entry(p, struct routing_table, next); + struct nhop * n = + list_first_entry(&t->nhops, struct nhop, next); + + if (t->dst != 2 && n->nhop != 2) { + printf("Wrong routing entry.\n"); + graph_free_routing_table(graph, &table); + return -1; + } + } + + graph_free_routing_table(graph, &table); + + return 0; +} + +static int distance_check(int dst, + int nhop, + enum routing_algo algo) +{ + (void) algo; + + /* Same distances for all algorithms at the moment. */ + if (dst == 2 && nhop != 2) { + printf("Wrong entry: 2.\n"); + return -1; + } + + if (dst == 3 && nhop != 3) { + printf("Wrong entry: 3.\n"); + return -1; + } + + if (dst == 4 && nhop != 3) { + printf("Wrong entry: 4.\n"); + return -1; + } + + if (dst == 5 && nhop != 3) { + printf("Wrong entry: 5.\n"); + return -1; + } + + if (dst == 6 && nhop != 2) { + printf("Wrong entry: 6.\n"); + return -1; + } + + if (dst == 7 && nhop != 3) { + printf("Wrong entry: 7.\n"); + return -1; + } + + return 0; +} + +int graph_test(int argc, + char ** argv) +{ + int nhop; + int dst; + struct list_head * p; + + (void) argc; + (void) argv; + + memset(&qs, 0, sizeof(qs)); + + graph = graph_create(); + if (graph == NULL) { + printf("Failed to create graph.\n"); + return -1; + } + + graph_destroy(graph); + + graph = graph_create(); + if (graph == NULL) { + printf("Failed to create graph.\n"); + return -1; + } + + if (graph_update_edge(graph, 1, 2, qs)) { + printf("Failed to add edge.\n"); + graph_destroy(graph); + return -1; + } + + if (graph_update_edge(graph, 2, 1, qs)) { + printf("Failed to add edge.\n"); + graph_destroy(graph); + return -1; + } + + if (graph_test_single_link()) { + graph_destroy(graph); + return -1; + } + + if (graph_update_edge(graph, 2, 3, qs)) { + printf("Failed to add edge.\n"); + graph_destroy(graph); + return -1; + } + + if (graph_update_edge(graph, 3, 2, qs)) { + printf("Failed to add edge.\n"); + graph_destroy(graph); + return -1; + } + + if (graph_test_double_link()) { + graph_destroy(graph); + return -1; + } + + if (graph_del_edge(graph, 2, 3)) { + printf("Failed to delete edge.\n"); + goto fail_graph; + } + + if (graph_del_edge(graph, 3, 2)) { + printf("Failed to delete edge.\n"); + goto fail_graph; + } + + if (graph_test_single_link()) + goto fail_graph; + + graph_update_edge(graph, 2, 3, qs); + graph_update_edge(graph, 3, 2, qs); + graph_update_edge(graph, 1, 3, qs); + graph_update_edge(graph, 3, 1, qs); + + if (graph_test_entries(2)) + goto fail_graph; + + graph_update_edge(graph, 3, 4, qs); + graph_update_edge(graph, 4, 3, qs); + graph_update_edge(graph, 4, 5, qs); + graph_update_edge(graph, 5, 4, qs); + + if (graph_test_entries(4)) + goto fail_graph; + + graph_update_edge(graph, 2, 6, qs); + graph_update_edge(graph, 6, 2, qs); + graph_update_edge(graph, 6, 7, qs); + graph_update_edge(graph, 7, 6, qs); + graph_update_edge(graph, 3, 7, qs); + graph_update_edge(graph, 7, 3, qs); + + if (graph_test_entries(6)) + goto fail_graph; + + + if (graph_routing_table(graph, ROUTING_SIMPLE, 1, &table)) { + printf("Failed to get routing table.\n"); + goto fail_graph; + } + + list_for_each(p, &table) { + struct routing_table * t = + list_entry(p, struct routing_table, next); + struct nhop * n = + list_first_entry(&t->nhops, struct nhop, next); + + dst = t->dst; + nhop = n->nhop; + + if (distance_check(dst, nhop, ROUTING_SIMPLE)) { + printf("Simple distance check failed.\n"); + goto fail_routing; + } + } + + graph_free_routing_table(graph, &table); + + if (graph_routing_table(graph, ROUTING_LFA, 1, &table)) { + printf("Failed to get routing table for LFA.\n"); + goto fail_graph; + } + + list_for_each(p, &table) { + struct routing_table * t = + list_entry(p, struct routing_table, next); + struct nhop * n = + list_first_entry(&t->nhops, struct nhop, next); + + dst = t->dst; + nhop = n->nhop; + + if (distance_check(dst, nhop, ROUTING_LFA)) { + printf("LFA distance check failed.\n"); + goto fail_routing; + } + } + + graph_free_routing_table(graph, &table); + + if (graph_routing_table(graph, ROUTING_ECMP, 1, &table)) { + printf("Failed to get routing table for ECMP.\n"); + goto fail_graph; + } + + list_for_each(p, &table) { + struct routing_table * t = + list_entry(p, struct routing_table, next); + struct nhop * n = + list_first_entry(&t->nhops, struct nhop, next); + + dst = t->dst; + nhop = n->nhop; + + if (distance_check(dst, nhop, ROUTING_LFA)) { + printf("LFA distance check failed.\n"); + goto fail_routing; + } + } + + graph_free_routing_table(graph, &table); + + graph_destroy(graph); + + return 0; + + fail_routing: + graph_free_routing_table(graph, &table); + fail_graph: + graph_destroy(graph); + return -1; +} |
