diff options
Diffstat (limited to 'src/ipcpd')
77 files changed, 1555 insertions, 1666 deletions
diff --git a/src/ipcpd/CMakeLists.txt b/src/ipcpd/CMakeLists.txt index 1ce1bc0d..54294f11 100644 --- a/src/ipcpd/CMakeLists.txt +++ b/src/ipcpd/CMakeLists.txt @@ -1,3 +1,7 @@ +set(CONNMGR_RCV_TIMEOUT 1000 CACHE STRING + "Timeout for the connection manager to wait for OCEP info (ms).") +set(IPCP_DEBUG_LOCAL FALSE CACHE BOOL + "Use PID as address for local debugging") set(IPCP_QOS_CUBE_BE_PRIO 50 CACHE STRING "Priority for best effort QoS cube (0-99)") set(IPCP_QOS_CUBE_VIDEO_PRIO 90 CACHE STRING @@ -44,6 +48,10 @@ set(IPCP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/shim-data.c ) +set (COMMON_SOURCES + ${CMAKE_CURRENT_SOURCE_DIR}/common/enroll.c + ) + add_subdirectory(local) add_subdirectory(eth) add_subdirectory(udp) diff --git a/src/ipcpd/broadcast/CMakeLists.txt b/src/ipcpd/broadcast/CMakeLists.txt index af0d8fcf..d85f335e 100644 --- a/src/ipcpd/broadcast/CMakeLists.txt +++ b/src/ipcpd/broadcast/CMakeLists.txt @@ -20,11 +20,10 @@ set(SOURCE_FILES # Add source files here connmgr.c dt.c - enroll.c main.c ) -add_executable(ipcpd-broadcast ${SOURCE_FILES} ${IPCP_SOURCES} +add_executable(ipcpd-broadcast ${SOURCE_FILES} ${IPCP_SOURCES} ${COMMON_SOURCES} ${LAYER_CONFIG_PROTO_SRCS}) target_link_libraries(ipcpd-broadcast LINK_PUBLIC ouroboros-dev) diff --git a/src/ipcpd/broadcast/connmgr.c b/src/ipcpd/broadcast/connmgr.c index 393ee4ee..f297175d 100644 --- a/src/ipcpd/broadcast/connmgr.c +++ b/src/ipcpd/broadcast/connmgr.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Handles connections between components * diff --git a/src/ipcpd/broadcast/dt.c b/src/ipcpd/broadcast/dt.c index bb04c42e..938c9085 100644 --- a/src/ipcpd/broadcast/dt.c +++ b/src/ipcpd/broadcast/dt.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Forward loop for broadcast * @@ -78,15 +78,16 @@ static int dt_add_nb(int fd) list_for_each(p, &fwd.nbs) { struct nb * el = list_entry(p, struct nb, next); if (el->fd == fd) { - log_dbg("Already know neighbor."); pthread_rwlock_unlock(&fwd.nbs_lock); - return -EPERM; + log_warn("Already know neighbor on fd %d.", fd); + return 0; } } nb = malloc(sizeof(*nb)); if (nb == NULL) { pthread_rwlock_unlock(&fwd.nbs_lock); + log_err("Failed to malloc neighbor struct."); return -ENOMEM; } @@ -96,10 +97,10 @@ static int dt_add_nb(int fd) ++fwd.nbs_len; - log_dbg("Neighbor %d added.", fd); - pthread_rwlock_unlock(&fwd.nbs_lock); + log_dbg("Neighbor %d added.", fd); + return 0; } @@ -124,6 +125,8 @@ static int dt_del_nb(int fd) pthread_rwlock_unlock(&fwd.nbs_lock); + log_err("Neighbor not found on fd %d.", fd); + return -EPERM; } @@ -191,7 +194,7 @@ static void * dt_reader(void * o) while (true) { ret = fevent(fwd.set, fq, NULL); if (ret < 0) { - log_warn("Event error: %d.", ret); + log_warn("Event warning: %d.", ret); continue; } @@ -226,13 +229,13 @@ static void handle_event(void * self, switch (event) { case NOTIFY_DT_CONN_ADD: - if (dt_add_nb(c->flow_info.fd)) - log_dbg("Failed to add neighbor."); + if (dt_add_nb(c->flow_info.fd) < 0) + log_err("Failed to add neighbor."); fset_add(fwd.set, c->flow_info.fd); break; case NOTIFY_DT_CONN_DEL: - if (dt_del_nb(c->flow_info.fd)) - log_dbg("Failed to delete neighbor."); + if (dt_del_nb(c->flow_info.fd) < 0) + log_err("Failed to delete neighbor."); fset_del(fwd.set, c->flow_info.fd); break; default: diff --git a/src/ipcpd/broadcast/dt.h b/src/ipcpd/broadcast/dt.h index 1b3a2dd4..8d3b83f8 100644 --- a/src/ipcpd/broadcast/dt.h +++ b/src/ipcpd/broadcast/dt.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Forward loop for broadcast * diff --git a/src/ipcpd/broadcast/enroll.c b/src/ipcpd/broadcast/enroll.c deleted file mode 100644 index d71c97cb..00000000 --- a/src/ipcpd/broadcast/enroll.c +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2022 - * - * Enrollment Task - * - * Dimitri Staessens <dimitri@ouroboros.rocks> - * Sander Vrijders <sander@ouroboros.rocks> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ -#define BUILD_IPCP_BROADCAST - -#include "common/enroll.c" diff --git a/src/ipcpd/broadcast/main.c b/src/ipcpd/broadcast/main.c index fdd3ca99..f51fc629 100644 --- a/src/ipcpd/broadcast/main.c +++ b/src/ipcpd/broadcast/main.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Broadcast IPC Process * @@ -31,14 +31,14 @@ #define OUROBOROS_PREFIX "broadcast-ipcp" #define THIS_TYPE IPCP_BROADCAST -#include <ouroboros/errno.h> -#include <ouroboros/hash.h> #include <ouroboros/dev.h> +#include <ouroboros/errno.h> #include <ouroboros/ipcp-dev.h> #include <ouroboros/logs.h> #include <ouroboros/notifier.h> +#include <ouroboros/random.h> #include <ouroboros/rib.h> -#include <ouroboros/time_utils.h> +#include <ouroboros/time.h> #include "common/connmgr.h" #include "common/enroll.h" @@ -56,50 +56,34 @@ struct ipcp ipcpi; static int initialize_components(const struct ipcp_config * conf) { - ipcpi.layer_name = strdup(conf->layer_info.layer_name); - if (ipcpi.layer_name == NULL) { - log_err("Failed to set layer name."); - goto fail_layer_name; - } - - ipcpi.dir_hash_algo = conf->layer_info.dir_hash_algo; + strcpy(ipcpi.layer_name, conf->layer_info.name); + ipcpi.dir_hash_algo = (enum hash_algo) conf->layer_info.dir_hash_algo; assert(ipcp_dir_hash_len() != 0); - if (dt_init()) { + if (dt_init() < 0) { log_err("Failed to initialize forwarding component."); - goto fail_dt; + return -1; } ipcp_set_state(IPCP_INIT); return 0; - - fail_dt: - free(ipcpi.layer_name); - fail_layer_name: - return -1; } static void finalize_components(void) { dt_fini(); - - free(ipcpi.layer_name); } static int start_components(void) { - assert(ipcp_get_state() == IPCP_INIT); - - ipcp_set_state(IPCP_OPERATIONAL); - - if (enroll_start()) { + if (enroll_start() < 0) { log_err("Failed to start enrollment."); goto fail_enroll_start; } - if (connmgr_start()) { + if (connmgr_start() < 0) { log_err("Failed to start AP connection manager."); goto fail_connmgr_start; } @@ -115,52 +99,55 @@ static int start_components(void) static void stop_components(void) { - assert(ipcp_get_state() == IPCP_OPERATIONAL || - ipcp_get_state() == IPCP_SHUTDOWN); - connmgr_stop(); enroll_stop(); - - ipcp_set_state(IPCP_INIT); } static int broadcast_ipcp_enroll(const char * dst, struct layer_info * info) { struct conn conn; + uint8_t id[ENROLL_ID_LEN]; - if (connmgr_alloc(COMPID_ENROLL, dst, NULL, &conn)) { - log_err("Failed to get connection."); - goto fail_er_flow; + if (random_buffer(id, ENROLL_ID_LEN) < 0) { + log_err("Failed to generate enrollment ID."); + goto fail_id; + } + + log_info_id(id, "Requesting enrollment."); + + if (connmgr_alloc(COMPID_ENROLL, dst, NULL, &conn) < 0) { + log_err_id(id, "Failed to get connection."); + goto fail_id; } /* Get boot state from peer. */ - if (enroll_boot(&conn)) { - log_err("Failed to get boot information."); + if (enroll_boot(&conn, id) < 0) { + log_err_id(id, "Failed to get boot information."); goto fail_enroll_boot; } - if (initialize_components(enroll_get_conf())) { - log_err("Failed to initialize IPCP components."); + if (initialize_components(enroll_get_conf()) < 0) { + log_err_id(id, "Failed to initialize components."); goto fail_enroll_boot; } - if (start_components()) { - log_err("Failed to start components."); + if (start_components() < 0) { + log_err_id(id, "Failed to start components."); goto fail_start_comp; } - if (enroll_done(&conn, 0)) - log_warn("Failed to confirm enrollment with peer."); + if (enroll_ack(&conn, id, 0) < 0) + log_err_id(id, "Failed to confirm enrollment."); - if (connmgr_dealloc(COMPID_ENROLL, &conn)) - log_warn("Failed to deallocate enrollment flow."); + if (connmgr_dealloc(COMPID_ENROLL, &conn) < 0) + log_warn_id(id, "Failed to dealloc enrollment flow."); - log_info("Enrolled with %s.", dst); + log_info_id(id, "Enrolled with %s.", dst); - info->dir_hash_algo = ipcpi.dir_hash_algo; - strcpy(info->layer_name, ipcpi.layer_name); + info->dir_hash_algo = (enum pol_dir_hash) ipcpi.dir_hash_algo; + strcpy(info->name, ipcpi.layer_name); return 0; @@ -168,7 +155,7 @@ static int broadcast_ipcp_enroll(const char * dst, finalize_components(); fail_enroll_boot: connmgr_dealloc(COMPID_ENROLL, &conn); - fail_er_flow: + fail_id: return -1; } @@ -176,6 +163,8 @@ static int broadcast_ipcp_bootstrap(const struct ipcp_config * conf) { assert(conf); assert(conf->type == THIS_TYPE); + ((struct ipcp_config *) conf)->layer_info.dir_hash_algo = + DIR_HASH_SHA3_256; enroll_bootstrap(conf); @@ -189,8 +178,6 @@ static int broadcast_ipcp_bootstrap(const struct ipcp_config * conf) goto fail_start; } - log_dbg("Bootstrapped in layer %s.", conf->layer_info.layer_name); - return 0; fail_start: @@ -225,6 +212,7 @@ static int broadcast_ipcp_join(int fd, { struct conn conn; time_t mpl = IPCP_BROADCAST_MPL; + buffer_t data = {NULL, 0}; (void) qs; @@ -232,12 +220,14 @@ static int broadcast_ipcp_join(int fd, conn.flow_info.fd = fd; - if (name_check(dst) != 0) + if (name_check(dst) != 0) { + log_err("Failed to check name."); return -1; + } notifier_event(NOTIFY_DT_CONN_ADD, &conn); - ipcp_flow_alloc_reply(fd, 0, mpl, NULL, 0); + ipcp_flow_alloc_reply(fd, 0, mpl, &data); return 0; } @@ -252,12 +242,11 @@ int broadcast_ipcp_dealloc(int fd) notifier_event(NOTIFY_DT_CONN_DEL, &conn); - flow_dealloc(fd); + ipcp_flow_dealloc(fd); return 0; } - static struct ipcp_ops broadcast_ops = { .ipcp_bootstrap = broadcast_ipcp_bootstrap, .ipcp_enroll = broadcast_ipcp_enroll, @@ -276,7 +265,7 @@ int main(int argc, char * argv[]) { if (ipcp_init(argc, argv, &broadcast_ops, THIS_TYPE) < 0) { - log_err("Failed to init IPCP."); + log_err("Failed to initialize IPCP."); goto fail_init; } @@ -295,24 +284,20 @@ int main(int argc, goto fail_enroll_init; } - if (ipcp_boot() < 0) { + if (ipcp_start() < 0) { log_err("Failed to boot IPCP."); - goto fail_boot; - } - - if (ipcp_create_r(0)) { - log_err("Failed to notify IRMd we are initialized."); - ipcp_set_state(IPCP_NULL); - goto fail_create_r; + goto fail_start; } - ipcp_shutdown(); + ipcp_sigwait(); if (ipcp_get_state() == IPCP_SHUTDOWN) { stop_components(); finalize_components(); } + ipcp_stop(); + enroll_fini(); connmgr_fini(); @@ -323,9 +308,7 @@ int main(int argc, exit(EXIT_SUCCESS); - fail_create_r: - ipcp_shutdown(); - fail_boot: + fail_start: enroll_fini(); fail_enroll_init: connmgr_fini(); @@ -334,6 +317,5 @@ int main(int argc, fail_notifier_init: ipcp_fini(); fail_init: - ipcp_create_r(-1); exit(EXIT_FAILURE); } diff --git a/src/ipcpd/common/comp.h b/src/ipcpd/common/comp.h index 0a6d14ba..f3790d9c 100644 --- a/src/ipcpd/common/comp.h +++ b/src/ipcpd/common/comp.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Components for the unicast/broadcast IPC process * @@ -23,7 +23,7 @@ #ifndef OUROBOROS_IPCPD_COMMON_COMP_H #define OUROBOROS_IPCPD_COMMON_COMP_H -#include <ouroboros/cacep.h> +#include <ouroboros/cep.h> #define DST_MAX_STRLEN 64 diff --git a/src/ipcpd/common/connmgr.c b/src/ipcpd/common/connmgr.c index b582c0d2..4b5fd420 100644 --- a/src/ipcpd/common/connmgr.c +++ b/src/ipcpd/common/connmgr.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Handles connections between components * @@ -22,9 +22,10 @@ #define OUROBOROS_PREFIX "connection-manager" +#include <ouroboros/cep.h> #include <ouroboros/dev.h> -#include <ouroboros/cacep.h> #include <ouroboros/errno.h> +#include <ouroboros/fccntl.h> #include <ouroboros/list.h> #include <ouroboros/logs.h> #include <ouroboros/notifier.h> @@ -33,9 +34,9 @@ #include "connmgr.h" #include "ipcp.h" -#include <string.h> -#include <stdlib.h> #include <assert.h> +#include <stdlib.h> +#include <string.h> enum connmgr_state { CONNMGR_NULL = 0, @@ -128,10 +129,12 @@ static int add_comp_conn(enum comp_id id, static void * flow_acceptor(void * o) { - int fd; - qosspec_t qs; - struct conn_info rcv_info; - struct conn_info fail_info; + int fd; + qosspec_t qs; + struct conn_info rcv_info; + struct conn_info fail_info; + struct timespec timeo = TIMESPEC_INIT_MS(CONNMGR_RCV_TIMEOUT); + int err; (void) o; @@ -143,38 +146,48 @@ static void * flow_acceptor(void * o) fd = flow_accept(&qs, NULL); if (fd < 0) { if (fd != -EIRMD) - log_warn("Flow accept failed: %d", fd); + log_err("Flow accept failed: %d", fd); continue; } - if (cacep_rcv(fd, &rcv_info)) { - log_dbg("Error establishing application connection."); + log_info("Handling incoming flow %d.",fd); + + fccntl(fd, FLOWSRCVTIMEO, &timeo); + + err = cep_rcv(fd, &rcv_info); + if (err < 0) { + log_err("Error receiving OCEP info: %d.", err); flow_dealloc(fd); continue; } + log_info("Request to connect to %s.", rcv_info.comp_name); + id = get_id_by_name(rcv_info.comp_name); if (id < 0) { - log_dbg("Connection request for unknown component %s.", + log_err("Connection request for unknown component %s.", rcv_info.comp_name); - cacep_snd(fd, &fail_info); + cep_snd(fd, &fail_info); flow_dealloc(fd); continue; } - assert(id < COMPID_MAX); - - if (cacep_snd(fd, &connmgr.comps[id].info)) { - log_dbg("Failed to respond to request."); + err = cep_snd(fd, &connmgr.comps[id].info); + if (err < 0) { + log_err("Failed responding to OCEP request: %d.", err); flow_dealloc(fd); continue; } - if (add_comp_conn(id, fd, qs, &rcv_info)) { - log_dbg("Failed to add new connection."); + err = add_comp_conn(id, fd, qs, &rcv_info); + if (err < 0) { + log_err("Failed to add new connection: %d.", err); flow_dealloc(fd); continue; } + + log_info("Finished handling incoming flow %d for %s.", + fd, rcv_info.comp_name); } return (void *) 0; @@ -215,8 +228,10 @@ int connmgr_init(void) { connmgr.state = CONNMGR_INIT; - if (notifier_reg(handle_event, NULL)) + if (notifier_reg(handle_event, NULL)) { + log_err("Failed to register notifier."); return -1; + } return 0; } @@ -236,8 +251,10 @@ void connmgr_fini(void) int connmgr_start(void) { - if (pthread_create(&connmgr.acceptor, NULL, flow_acceptor, NULL)) + if (pthread_create(&connmgr.acceptor, NULL, flow_acceptor, NULL)) { + log_err("Failed to create pthread: %s.", strerror(errno)); return -1; + } connmgr.state = CONNMGR_RUNNING; @@ -259,12 +276,14 @@ int connmgr_comp_init(enum comp_id id, comp = connmgr.comps + id; - if (pthread_mutex_init(&comp->lock, NULL)) - return -1; + if (pthread_mutex_init(&comp->lock, NULL)) { + log_err("Failed to initialize mutex: %s.", strerror(errno)); + goto fail_mutex; + } if (pthread_cond_init(&comp->cond, NULL)) { - pthread_mutex_destroy(&comp->lock); - return -1; + log_err("Failed to initialize condvar: %s.", strerror(errno)); + goto fail_cond; } list_head_init(&comp->conns); @@ -273,6 +292,11 @@ int connmgr_comp_init(enum comp_id id, memcpy(&connmgr.comps[id].info, info, sizeof(connmgr.comps[id].info)); return 0; + + fail_cond: + pthread_mutex_destroy(&comp->lock); + fail_mutex: + return -1; } void connmgr_comp_fini(enum comp_id id) @@ -316,26 +340,32 @@ int connmgr_ipcp_connect(const char * dst, { struct conn_el * ce; int id; + int ret; assert(dst); assert(component); ce = malloc(sizeof(*ce)); if (ce == NULL) { - log_dbg("Out of memory."); - return -1; + log_err("Out of memory."); + goto fail_malloc; } id = get_id_by_name(component); if (id < 0) { - log_dbg("No such component: %s", component); - free(ce); - return -1; + log_err("No such component: %s", component); + goto fail_id; } - if (connmgr_alloc(id, dst, &qs, &ce->conn)) { - free(ce); - return -1; + pthread_cleanup_push(free, ce); + + ret = connmgr_alloc(id, dst, &qs, &ce->conn); + + pthread_cleanup_pop(false); + + if (ret < 0) { + log_err("Failed to allocate flow."); + goto fail_id; } if (strlen(dst) > DST_MAX_STRLEN) { @@ -353,6 +383,11 @@ int connmgr_ipcp_connect(const char * dst, pthread_mutex_unlock(&connmgr.comps[id].lock); return 0; + + fail_id: + free(ce); + fail_malloc: + return -1; } int connmgr_ipcp_disconnect(const char * dst, @@ -366,8 +401,10 @@ int connmgr_ipcp_disconnect(const char * dst, assert(component); id = get_id_by_name(component); - if (id < 0) + if (id < 0) { + log_err("No such component: %s.", component); return -1; + } pthread_mutex_lock(&connmgr.comps[id].lock); @@ -393,54 +430,58 @@ int connmgr_alloc(enum comp_id id, qosspec_t * qs, struct conn * conn) { + struct comp * comp; + int fd; + struct timespec timeo = TIMESPEC_INIT_MS(CONNMGR_RCV_TIMEOUT); + assert(id >= 0 && id < COMPID_MAX); assert(dst); - conn->flow_info.fd = flow_alloc(dst, qs, NULL); - if (conn->flow_info.fd < 0) { - log_dbg("Failed to allocate flow to %s.", dst); - return -1; + comp = connmgr.comps + id; + + fd = flow_alloc(dst, qs, NULL); + if (fd < 0) { + log_err("Failed to allocate flow to %s.", dst); + goto fail_alloc; } + conn->flow_info.fd = fd; + if (qs != NULL) conn->flow_info.qs = *qs; else memset(&conn->flow_info.qs, 0, sizeof(conn->flow_info.qs)); - log_dbg("Sending cacep info for protocol %s to fd %d.", - connmgr.comps[id].info.protocol, conn->flow_info.fd); + log_dbg("Sending OCEP info for protocol %s to fd %d.", + comp->info.protocol, conn->flow_info.fd); - if (cacep_snd(conn->flow_info.fd, &connmgr.comps[id].info)) { - log_dbg("Failed to create application connection."); - flow_dealloc(conn->flow_info.fd); - return -1; + fccntl(fd, FLOWSRCVTIMEO, &timeo); + + if (cep_snd(fd, &comp->info)) { + log_err("Failed to send OCEP info."); + goto fail_cep; } - if (cacep_rcv(conn->flow_info.fd, &conn->conn_info)) { - log_dbg("Failed to connect to application."); - flow_dealloc(conn->flow_info.fd); - return -1; + if (cep_rcv(fd, &conn->conn_info)) { + log_err("Failed to receive OCEP info."); + goto fail_cep; } - if (strcmp(connmgr.comps[id].info.protocol, conn->conn_info.protocol)) { - log_dbg("Unknown protocol (requested %s, got %s).", - connmgr.comps[id].info.protocol, - conn->conn_info.protocol); - flow_dealloc(conn->flow_info.fd); - return -1; + if (strcmp(comp->info.protocol, conn->conn_info.protocol)) { + log_err("Unknown protocol (requested %s, got %s).", + comp->info.protocol, conn->conn_info.protocol); + goto fail_cep; } - if (connmgr.comps[id].info.pref_version != - conn->conn_info.pref_version) { - log_dbg("Unknown protocol version."); - flow_dealloc(conn->flow_info.fd); - return -1; + if (comp->info.pref_version != conn->conn_info.pref_version) { + log_err("Unknown protocol version %d.", + conn->conn_info.pref_version); + goto fail_cep; } - if (connmgr.comps[id].info.pref_syntax != conn->conn_info.pref_syntax) { - log_dbg("Unknown protocol syntax."); - flow_dealloc(conn->flow_info.fd); - return -1; + if (comp->info.pref_syntax != conn->conn_info.pref_syntax) { + log_err("Unknown protocol syntax."); + goto fail_cep; } switch (id) { @@ -458,6 +499,11 @@ int connmgr_alloc(enum comp_id id, } return 0; + + fail_cep: + flow_dealloc(conn->flow_info.fd); + fail_alloc: + return -1; } int connmgr_dealloc(enum comp_id id, @@ -503,6 +549,7 @@ int connmgr_wait(enum comp_id id, el = list_first_entry((&comp->pending), struct conn_el, next); if (el == NULL) { pthread_mutex_unlock(&comp->lock); + log_err("Failed to get connection element."); return -1; } diff --git a/src/ipcpd/common/connmgr.h b/src/ipcpd/common/connmgr.h index 2e8c5745..0710dbbf 100644 --- a/src/ipcpd/common/connmgr.h +++ b/src/ipcpd/common/connmgr.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Handles the different AP connections * @@ -23,7 +23,7 @@ #ifndef OUROBOROS_IPCPD_COMMON_CONNMGR_H #define OUROBOROS_IPCPD_COMMON_CONNMGR_H -#include <ouroboros/cacep.h> +#include <ouroboros/cep.h> #include <ouroboros/qos.h> #include "comp.h" diff --git a/src/ipcpd/common/enroll.c b/src/ipcpd/common/enroll.c index 86f1c5a6..5e35ce37 100644 --- a/src/ipcpd/common/enroll.c +++ b/src/ipcpd/common/enroll.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Enrollment Task * @@ -28,13 +28,11 @@ #define OUROBOROS_PREFIX "enrollment" -#include <ouroboros/endian.h> -#include <ouroboros/errno.h> -#include <ouroboros/time_utils.h> #include <ouroboros/dev.h> -#include <ouroboros/logs.h> #include <ouroboros/errno.h> -#include <ouroboros/sockets.h> +#include <ouroboros/logs.h> +#include <ouroboros/serdes-oep.h> +#include <ouroboros/time.h> #include "common/connmgr.h" #include "common/enroll.h" @@ -45,9 +43,6 @@ #include <string.h> #include <pthread.h> -#include "ipcp_config.pb-c.h" -typedef EnrollMsg enroll_msg_t; - #define ENROLL_COMP "Enrollment" #define ENROLL_PROTO "OEP" /* Ouroboros enrollment protocol */ #define ENROLL_WARN_TIME_OFFSET 20 @@ -65,261 +60,218 @@ struct { pthread_t listener; } enroll; -static int send_rcv_enroll_msg(int fd) +static void * enroll_handle(void * o) { - enroll_msg_t req = ENROLL_MSG__INIT; - enroll_msg_t * reply; - uint8_t buf[ENROLL_BUF_LEN]; - ssize_t len; - ssize_t delta_t; - struct timespec t0; - struct timespec rtt; - - req.code = ENROLL_CODE__ENROLL_REQ; - - len = enroll_msg__get_packed_size(&req); - if (len < 0) { - log_dbg("Failed pack request message."); - return -1; - } - - enroll_msg__pack(&req, buf); + struct enroll_req req; + struct enroll_resp resp; + struct enroll_ack ack; + struct conn conn; + uint8_t __buf[ENROLL_BUF_LEN]; + buffer_t buf; + ssize_t len; - clock_gettime(CLOCK_REALTIME, &t0); - - if (flow_write(fd, buf, len) < 0) { - log_dbg("Failed to send request message."); - return -1; - } - - len = flow_read(fd, buf, ENROLL_BUF_LEN); - if (len < 0) { - log_dbg("No enrollment reply received."); - return -1; - } - - log_dbg("Received enrollment info (%zd bytes).", len); - - reply = enroll_msg__unpack(NULL, len, buf); - if (reply == NULL) { - log_dbg("No enrollment response."); - return -1; - } + (void) o; - if (reply->code != ENROLL_CODE__ENROLL_BOOT) { - log_dbg("Failed to unpack enrollment response."); - enroll_msg__free_unpacked(reply, NULL); - return -1; - } + buf.data = __buf; + buf.len = sizeof(__buf); - if (!(reply->has_t_sec && reply->has_t_nsec)) { - log_dbg("No time in response message."); - enroll_msg__free_unpacked(reply, NULL); - return -1; - } + resp.response = 0; + resp.conf = enroll.conf; - clock_gettime(CLOCK_REALTIME, &rtt); + while (true) { + buffer_t msg; + int fd; - delta_t = ts_diff_ms(&t0, &rtt); + if (connmgr_wait(COMPID_ENROLL, &conn)) { + log_err("Failed to get next connection."); + continue; + } - rtt.tv_sec = reply->t_sec; - rtt.tv_nsec = reply->t_nsec; + fd = conn.flow_info.fd; - if (labs(ts_diff_ms(&t0, &rtt)) - delta_t > ENROLL_WARN_TIME_OFFSET) - log_warn("Clock offset above threshold."); - - strcpy(enroll.conf.layer_info.layer_name, - reply->conf->layer_info->layer_name); - enroll.conf.type = reply->conf->ipcp_type; -#ifdef BUILD_IPCP_UNICAST - enroll.conf.addr_size = reply->conf->addr_size; - enroll.conf.eid_size = reply->conf->eid_size; - enroll.conf.max_ttl = reply->conf->max_ttl; - enroll.conf.addr_auth_type = reply->conf->addr_auth_type; - enroll.conf.routing_type = reply->conf->routing_type; - enroll.conf.cong_avoid = reply->conf->cong_avoid; -#endif - enroll.conf.layer_info.dir_hash_algo - = reply->conf->layer_info->dir_hash_algo; - enroll_msg__free_unpacked(reply, NULL); + log_info("Incoming enrollment connection on flow %d.", fd); - return 0; -} + len = flow_read(fd, buf.data, buf.len); + if (len < 0) { + log_err("Failed to read from flow %d.", fd); + goto finish_flow; + } -static ssize_t enroll_pack(uint8_t ** buf) -{ - enroll_msg_t msg = ENROLL_MSG__INIT; - ipcp_config_msg_t config = IPCP_CONFIG_MSG__INIT; - layer_info_msg_t layer_info = LAYER_INFO_MSG__INIT; - struct timespec now; - ssize_t len; + msg.data = buf.data; + msg.len = (size_t) len; - clock_gettime(CLOCK_REALTIME, &now); - - msg.code = ENROLL_CODE__ENROLL_BOOT; - msg.has_t_sec = true; - msg.t_sec = now.tv_sec; - msg.has_t_nsec = true; - msg.t_nsec = now.tv_nsec; - msg.conf = &config; - - config.ipcp_type = enroll.conf.type; -#ifdef BUILD_IPCP_UNICAST - config.has_addr_size = true; - config.addr_size = enroll.conf.addr_size; - config.has_eid_size = true; - config.eid_size = enroll.conf.eid_size; - config.has_max_ttl = true; - config.max_ttl = enroll.conf.max_ttl; - config.has_addr_auth_type = true; - config.addr_auth_type = enroll.conf.addr_auth_type; - config.has_routing_type = true; - config.routing_type = enroll.conf.routing_type; - config.has_cong_avoid = true; - config.cong_avoid = enroll.conf.cong_avoid; -#endif - config.layer_info = &layer_info; + if (enroll_req_des(&req, msg) < 0) { + log_err("Failed to unpack request message."); + goto finish_flow; + } - layer_info.layer_name = (char *) enroll.conf.layer_info.layer_name; - layer_info.dir_hash_algo = enroll.conf.layer_info.dir_hash_algo; + log_info_id(req.id, "Handling incoming enrollment."); - len = enroll_msg__get_packed_size(&msg); + /* TODO: authentication, timezone handling (UTC). */ - *buf = malloc(len); - if (*buf == NULL) - return -ENOMEM; + ack.result = -100; - enroll_msg__pack(&msg, *buf); + clock_gettime(CLOCK_REALTIME, &resp.t); - return len; -} + memcpy(resp.id, req.id, ENROLL_ID_LEN); -static void * enroll_handle(void * o) -{ - struct conn conn; - uint8_t buf[ENROLL_BUF_LEN]; - uint8_t * reply; - ssize_t len; - enroll_msg_t * msg; + len = enroll_resp_ser(&resp, buf); + if (len < 0) { + log_err_id(req.id, "Failed to pack reply."); + goto finish_enroll; + } - (void) o; + log_dbg_id(req.id, "Sending enrollment info (%zd bytes).", len); - while (true) { - if (connmgr_wait(COMPID_ENROLL, &conn)) { - log_err("Failed to get next connection."); - continue; + if (flow_write(conn.flow_info.fd, buf.data, len) < 0) { + log_err_id(req.id, "Failed te send response."); + goto finish_enroll; } - len = flow_read(conn.flow_info.fd, buf, ENROLL_BUF_LEN); + len = flow_read(conn.flow_info.fd, buf.data, buf.len); if (len < 0) { - log_err("Failed to read from flow."); - connmgr_dealloc(COMPID_ENROLL, &conn); - continue; + log_err_id(req.id, "Failed to read from flow."); + goto finish_enroll; } - msg = enroll_msg__unpack(NULL, len, buf); - if (msg == NULL) { - log_err("Failed to unpack message."); - connmgr_dealloc(COMPID_ENROLL, &conn); - continue; - } + msg.data = buf.data; + msg.len = (size_t) len; - if (msg->code != ENROLL_CODE__ENROLL_REQ) { - log_err("Wrong message type."); - connmgr_dealloc(COMPID_ENROLL, &conn); - enroll_msg__free_unpacked(msg, NULL); - continue; + if (enroll_ack_des(&ack, msg) < 0) { + log_err_id(req.id, "Failed to unpack ack."); + goto finish_enroll; } - log_dbg("Enrolling a new neighbor."); - - enroll_msg__free_unpacked(msg, NULL); - - len = enroll_pack(&reply); - if (len < 0) { - log_err("Failed to pack enrollment message."); - connmgr_dealloc(COMPID_ENROLL, &conn); - continue; + if (memcmp(req.id, ack.id, ENROLL_ID_LEN) != 0) + log_warn_id(req.id, "Enrollment ID mismatch."); + + finish_enroll: + switch(ack.result) { + case 0: + log_info_id(req.id, "Enrollment completed."); + break; + case -100: + log_warn_id(req.id, "Enrollment failed."); + break; + default: + log_warn_id(req.id, "Enrollment failed at remote."); } + finish_flow: + connmgr_dealloc(COMPID_ENROLL, &conn); - log_dbg("Sending enrollment info (%zd bytes).", len); + log_info("Enrollment flow %d closed.", fd); + } - if (flow_write(conn.flow_info.fd, reply, len) < 0) { - log_err("Failed respond to enrollment request."); - connmgr_dealloc(COMPID_ENROLL, &conn); - free(reply); - continue; - } + return 0; +} - free(reply); +int enroll_boot(struct conn * conn, + const uint8_t * id) +{ + uint8_t __buf[ENROLL_BUF_LEN]; + buffer_t buf; + buffer_t msg; + ssize_t len; + ssize_t delta_t; + struct timespec t0; + struct timespec rtt; + int fd; + int ret; + struct enroll_req req; + struct enroll_resp resp; + + fd = conn->flow_info.fd; + + buf.data = __buf; + buf.len = sizeof(__buf); + + memcpy(req.id, id, ENROLL_ID_LEN); + + len = enroll_req_ser(&req, buf); + if (len < 0) { + log_err_id(id, "Failed to pack request message."); + return -1; + } - len = flow_read(conn.flow_info.fd, buf, ENROLL_BUF_LEN); - if (len < 0) { - log_err("Failed to read from flow."); - connmgr_dealloc(COMPID_ENROLL, &conn); - continue; - } + clock_gettime(CLOCK_REALTIME, &t0); - msg = enroll_msg__unpack(NULL, len, buf); - if (msg == NULL) { - log_err("Failed to unpack message."); - connmgr_dealloc(COMPID_ENROLL, &conn); - continue; - } + if (flow_write(fd, buf.data, len) < 0) { + log_err_id(id, "Failed to send request message."); + return -1; + } - if (msg->code != ENROLL_CODE__ENROLL_DONE || !msg->has_result) { - log_err("Wrong message type."); - enroll_msg__free_unpacked(msg, NULL); - connmgr_dealloc(COMPID_ENROLL, &conn); - continue; - } + len = flow_read(fd, buf.data, buf.len); + if (len < 0) { + log_err_id(id, "No reply received."); + return -1; + } - if (msg->result == 0) - log_dbg("Neighbor enrollment successful."); - else - log_dbg("Neigbor reported failed enrollment."); + log_dbg_id(id, "Received configuration info (%zd bytes).", len); - enroll_msg__free_unpacked(msg, NULL); + msg.data = buf.data; + msg.len = len; - connmgr_dealloc(COMPID_ENROLL, &conn); + ret = enroll_resp_des(&resp, msg); + if (ret < 0) { + log_err_id(id, "Failed to unpack response message."); + return -1; } - return 0; -} + if (memcmp(resp.id, id, ENROLL_ID_LEN) != 0) { + log_err_id(id, "Enrollment ID mismatch."); + return -1; + } -int enroll_boot(struct conn * conn) -{ - log_dbg("Getting boot information."); + if (resp.response < 0) { + log_warn_id(id, "Remote denied request: %d.", resp.response); + return -1; + } - if (send_rcv_enroll_msg(conn->flow_info.fd)) { - log_err("Failed to enroll."); + if (resp.conf.type != ipcpi.type) { + log_err_id(id, "Wrong type in enrollment response %d (%d).", + resp.conf.type, ipcpi.type); return -1; } + clock_gettime(CLOCK_REALTIME, &rtt); + + delta_t = ts_diff_ms(&t0, &rtt); + + rtt.tv_sec = resp.t.tv_sec; + rtt.tv_nsec = resp.t.tv_nsec; + + if (labs(ts_diff_ms(&t0, &rtt)) - delta_t > ENROLL_WARN_TIME_OFFSET) + log_warn_id(id, "Clock offset above threshold."); + + enroll.conf = resp.conf; + return 0; } -int enroll_done(struct conn * conn, - int result) +int enroll_ack(struct conn * conn, + const uint8_t * id, + const int result) { - enroll_msg_t msg = ENROLL_MSG__INIT; - uint8_t buf[ENROLL_BUF_LEN]; - ssize_t len; + struct enroll_ack ack; + uint8_t __buf[ENROLL_BUF_LEN]; + buffer_t buf; + ssize_t len; - msg.code = ENROLL_CODE__ENROLL_DONE; - msg.has_result = true; - msg.result = result; + buf.data = __buf; + buf.len = sizeof(__buf); - len = enroll_msg__get_packed_size(&msg); + ack.result = result; + + memcpy(ack.id, id, ENROLL_ID_LEN); + + len = enroll_ack_ser(&ack, buf); if (len < 0) { - log_dbg("Failed pack request message."); + log_err_id(id, "Failed to pack acknowledgement."); return -1; } - enroll_msg__pack(&msg, buf); - - if (flow_write(conn->flow_info.fd, buf, len) < 0) { - log_dbg("Failed to send acknowledgment."); + if (flow_write(conn->flow_info.fd, buf.data, len) < 0) { + log_err_id(id, "Failed to send acknowledgment."); return -1; } diff --git a/src/ipcpd/common/enroll.h b/src/ipcpd/common/enroll.h index 1231456f..f26c31a3 100644 --- a/src/ipcpd/common/enroll.h +++ b/src/ipcpd/common/enroll.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Enrollment Task * @@ -37,10 +37,12 @@ void enroll_stop(void); void enroll_bootstrap(const struct ipcp_config * conf); -int enroll_boot(struct conn * conn); +int enroll_boot(struct conn * conn, + const uint8_t * id); -int enroll_done(struct conn * conn, - int result); +int enroll_ack(struct conn * conn, + const uint8_t * id, + const int result); struct ipcp_config * enroll_get_conf(void); diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index 60356e88..fe4f5fd2 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * IPC process configuration * @@ -41,6 +41,8 @@ #define IPCP_LINUX_SLACK_NS @IPCP_LINUX_TIMERSLACK_NS@ +#cmakedefine IPCP_DEBUG_LOCAL + /* unicast IPCP */ #define QOS_PRIO_BE @IPCP_QOS_CUBE_BE_PRIO@ #define QOS_PRIO_VIDEO @IPCP_QOS_CUBE_VIDEO_PRIO@ @@ -49,6 +51,7 @@ #define PFT_SIZE @PFT_SIZE@ #define DHT_ENROLL_SLACK @DHT_ENROLL_SLACK@ #define IPCP_UNICAST_MPL @IPCP_UNICAST_MPL@ +#define CONNMGR_RCV_TIMEOUT @CONNMGR_RCV_TIMEOUT@ #cmakedefine IPCP_CONN_WAIT_DIR #cmakedefine DISABLE_CORE_LOCK diff --git a/src/ipcpd/eth/dix.c b/src/ipcpd/eth/dix.c index e1e77caf..37b9896d 100644 --- a/src/ipcpd/eth/dix.c +++ b/src/ipcpd/eth/dix.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * IPC processes over Ethernet - DIX * diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index ade8485c..ea6e0f1c 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * IPC processes over Ethernet * @@ -37,6 +37,7 @@ #include "config.h" +#include <ouroboros/endian.h> #include <ouroboros/hash.h> #include <ouroboros/errno.h> #include <ouroboros/list.h> @@ -46,7 +47,7 @@ #include <ouroboros/ipcp-dev.h> #include <ouroboros/fqueue.h> #include <ouroboros/logs.h> -#include <ouroboros/time_utils.h> +#include <ouroboros/time.h> #include <ouroboros/fccntl.h> #include <ouroboros/pthread.h> @@ -135,7 +136,6 @@ #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX) #endif -#define ALLOC_TIMEO 10 /* ms */ #define NAME_QUERY_TIMEO 2000 /* ms */ #define MGMT_TIMEO 100 /* ms */ #define MGMT_FRAME_SIZE 2048 @@ -455,16 +455,15 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, return 0; } -static int eth_ipcp_alloc(const uint8_t * dst_addr, +static int eth_ipcp_alloc(const uint8_t * dst_addr, #if defined(BUILD_ETH_DIX) - uint16_t eid, + uint16_t eid, #elif defined(BUILD_ETH_LLC) - uint8_t ssap, + uint8_t ssap, #endif - const uint8_t * hash, - qosspec_t qs, - const void * data, - size_t dlen) + const uint8_t * hash, + qosspec_t qs, + const buffer_t * data) { uint8_t * buf; struct mgmt_msg * msg; @@ -473,7 +472,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, len = sizeof(*msg) + ipcp_dir_hash_len(); - buf = malloc(len + ETH_HEADER_TOT_SIZE + dlen); + buf = malloc(len + ETH_HEADER_TOT_SIZE + data->len); if (buf == NULL) return -1; @@ -496,8 +495,8 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, msg->timeout = hton32(qs.timeout); memcpy(msg + 1, hash, ipcp_dir_hash_len()); - if (dlen > 0) - memcpy(buf + len + ETH_HEADER_TOT_SIZE, data, dlen); + if (data->len > 0) + memcpy(buf + len + ETH_HEADER_TOT_SIZE, data->data, data->len); ret = eth_ipcp_send_frame(dst_addr, #if defined(BUILD_ETH_DIX) @@ -506,28 +505,27 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, reverse_bits(MGMT_SAP), reverse_bits(MGMT_SAP), #endif - buf, len + dlen); + buf, len + data->len); free(buf); return ret; } -static int eth_ipcp_alloc_resp(uint8_t * dst_addr, +static int eth_ipcp_alloc_resp(uint8_t * dst_addr, #if defined(BUILD_ETH_DIX) - uint16_t seid, - uint16_t deid, + uint16_t seid, + uint16_t deid, #elif defined(BUILD_ETH_LLC) - uint8_t ssap, - uint8_t dsap, + uint8_t ssap, + uint8_t dsap, #endif - int response, - const void * data, - size_t len) + int response, + const buffer_t * data) { struct mgmt_msg * msg; uint8_t * buf; - buf = malloc(sizeof(*msg) + ETH_HEADER_TOT_SIZE + len); + buf = malloc(sizeof(*msg) + ETH_HEADER_TOT_SIZE + data->len); if (buf == NULL) return -1; @@ -543,8 +541,8 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, #endif msg->response = response; - if (len > 0) - memcpy(msg + 1, data, len); + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); if (eth_ipcp_send_frame(dst_addr, #if defined(BUILD_ETH_DIX) @@ -553,7 +551,7 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, reverse_bits(MGMT_SAP), reverse_bits(MGMT_SAP), #endif - buf, sizeof(*msg) + len)) { + buf, sizeof(*msg) + data->len)) { free(buf); return -1; } @@ -563,43 +561,20 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, return 0; } -static int eth_ipcp_req(uint8_t * r_addr, +static int eth_ipcp_req(uint8_t * r_addr, #if defined(BUILD_ETH_DIX) - uint16_t r_eid, + uint16_t r_eid, #elif defined(BUILD_ETH_LLC) - uint8_t r_sap, + uint8_t r_sap, #endif - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len) + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { - struct timespec ts = {0, ALLOC_TIMEO * MILLION}; - struct timespec abstime; - int fd; - time_t mpl = IPCP_ETH_MPL; + int fd; - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_dbg("Won't allocate over non-operational IPCP."); - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; - } - - /* reply to IRM, called under lock to prevent race */ - fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_ETH_MPL, data); if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); return -1; } @@ -614,11 +589,6 @@ static int eth_ipcp_req(uint8_t * r_addr, pthread_rwlock_unlock(ð_data.flows_lock); - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - #if defined(BUILD_ETH_DIX) log_dbg("New flow request, fd %d, remote endpoint %d.", fd, r_eid); #elif defined(BUILD_ETH_LLC) @@ -627,17 +597,16 @@ static int eth_ipcp_req(uint8_t * r_addr, return 0; } -static int eth_ipcp_alloc_reply(uint8_t * r_addr, +static int eth_ipcp_alloc_reply(uint8_t * r_addr, #if defined(BUILD_ETH_DIX) - uint16_t seid, - uint16_t deid, + uint16_t seid, + uint16_t deid, #elif defined(BUILD_ETH_LLC) - uint8_t ssap, - int dsap, + uint8_t ssap, + int dsap, #endif - int response, - const void * data, - size_t len) + int response, + const buffer_t * data) { int ret = 0; int fd = -1; @@ -676,11 +645,12 @@ static int eth_ipcp_alloc_reply(uint8_t * r_addr, #elif defined(BUILD_ETH_LLC) log_dbg("Flow reply, fd %d, SSAP %d, DSAP %d.", fd, ssap, dsap); #endif - if ((ret = ipcp_flow_alloc_reply(fd, response, mpl, data, len)) < 0) + if ((ret = ipcp_flow_alloc_reply(fd, response, mpl, data)) < 0) { + log_err("Failed to reply to flow allocation."); return -1; + } return ret; - } static int eth_ipcp_name_query_req(const uint8_t * hash, @@ -742,6 +712,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, struct mgmt_msg * msg; size_t msg_len; qosspec_t qs; + buffer_t data; msg = (struct mgmt_msg *) buf; @@ -761,6 +732,9 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, qs.cypher_s = ntoh16(msg->cypher_s); qs.timeout = ntoh32(msg->timeout); + data.data = (uint8_t *) buf + msg_len; + data.len = len - msg_len; + if (shim_data_reg_has(eth_data.shim_data, buf + sizeof(*msg))) { eth_ipcp_req(r_addr, @@ -771,13 +745,15 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, #endif buf + sizeof(*msg), qs, - buf + msg_len, - len - msg_len); + &data); } break; case FLOW_REPLY: assert(len >= sizeof(*msg)); + data.data = (uint8_t *) buf + sizeof(*msg); + data.len = len - sizeof(*msg); + eth_ipcp_alloc_reply(r_addr, #if defined(BUILD_ETH_DIX) ntohs(msg->seid), @@ -787,8 +763,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, msg->dsap, #endif msg->response, - buf + sizeof(*msg), - len - sizeof(*msg)); + &data); break; case NAME_QUERY_REQ: eth_ipcp_name_query_req(buf + sizeof(*msg), r_addr); @@ -808,10 +783,11 @@ static void * eth_ipcp_mgmt_handler(void * o) { (void) o; + pthread_cleanup_push(__cleanup_mutex_unlock, ð_data.mgmt_lock); + while (true) { int ret = 0; - struct timespec timeout = {(MGMT_TIMEO / 1000), - (MGMT_TIMEO % 1000) * MILLION}; + struct timespec timeout = TIMESPEC_INIT_MS(MGMT_TIMEO); struct timespec abstime; struct mgmt_frame * frame = NULL; @@ -819,8 +795,6 @@ static void * eth_ipcp_mgmt_handler(void * o) ts_add(&abstime, &timeout, &abstime); pthread_mutex_lock(ð_data.mgmt_lock); - pthread_cleanup_push(__cleanup_mutex_unlock, - ð_data.mgmt_lock); while (list_is_empty(ð_data.mgmt_frames) && ret != -ETIMEDOUT) @@ -833,7 +807,7 @@ static void * eth_ipcp_mgmt_handler(void * o) if (frame != NULL) list_del(&frame->next); - pthread_cleanup_pop(true); + pthread_mutex_unlock(ð_data.mgmt_lock); if (frame == NULL) continue; @@ -843,6 +817,8 @@ static void * eth_ipcp_mgmt_handler(void * o) free(frame); } + pthread_cleanup_pop(false); + return (void *) 0; } @@ -883,7 +859,7 @@ static void * eth_ipcp_packet_reader(void * o) buf = nm_nextpkt(eth_data.nmd, &hdr); if (buf == NULL) { - log_err("Bad read from netmap device."); + log_dbg("Bad read from netmap device."); continue; } #else @@ -914,6 +890,7 @@ static void * eth_ipcp_packet_reader(void * o) ETH_MTU + ETH_HEADER_TOT_SIZE, 0); #endif if (frame_len <= 0) { + log_dbg("Failed to receive frame."); ipcp_sdb_release(sdb); continue; } @@ -940,22 +917,14 @@ static void * eth_ipcp_packet_reader(void * o) #endif length = ntohs(e_frame->length); #if defined(BUILD_ETH_DIX) - if (e_frame->ethertype != eth_data.ethertype) { -#ifndef HAVE_NETMAP - ipcp_sdb_release(sdb); -#endif - continue; - } + if (e_frame->ethertype != eth_data.ethertype) + goto fail_frame; deid = ntohs(e_frame->eid); if (deid == MGMT_EID) { #elif defined (BUILD_ETH_LLC) - if (length > 0x05FF) { /* DIX */ -#ifndef HAVE_NETMAP - ipcp_sdb_release(sdb); -#endif - continue; - } + if (length > 0x05FF) /* DIX */ + goto fail_frame; length -= LLC_HEADER_SIZE; @@ -964,12 +933,12 @@ static void * eth_ipcp_packet_reader(void * o) if (ssap == MGMT_SAP && dsap == MGMT_SAP) { #endif + ipcp_sdb_release(sdb); /* No need for the N+1 buffer. */ + frame = malloc(sizeof(*frame)); if (frame == NULL) { -#ifndef HAVE_NETMAP - ipcp_sdb_release(sdb); -#endif - continue; + log_err("Failed to allocate frame."); + goto fail_frame; } memcpy(frame->buf, &e_frame->payload, length); @@ -980,10 +949,6 @@ static void * eth_ipcp_packet_reader(void * o) list_add(&frame->next, ð_data.mgmt_frames); pthread_cond_signal(ð_data.mgmt_cond); pthread_mutex_unlock(ð_data.mgmt_lock); - -#ifndef HAVE_NETMAP - ipcp_sdb_release(sdb); -#endif } else { pthread_rwlock_rdlock(ð_data.flows_lock); @@ -994,10 +959,7 @@ static void * eth_ipcp_packet_reader(void * o) #endif if (fd < 0) { pthread_rwlock_unlock(ð_data.flows_lock); -#ifndef HAVE_NETMAP - ipcp_sdb_release(sdb); -#endif - continue; + goto fail_frame; } #ifdef BUILD_ETH_LLC @@ -1005,10 +967,7 @@ static void * eth_ipcp_packet_reader(void * o) || memcmp(eth_data.fd_to_ef[fd].r_addr, e_frame->src_hwaddr, MAC_SIZE)) { pthread_rwlock_unlock(ð_data.flows_lock); -#ifndef HAVE_NETMAP - ipcp_sdb_release(sdb); -#endif - continue; + goto fail_frame; } #endif pthread_rwlock_unlock(ð_data.flows_lock); @@ -1025,6 +984,12 @@ static void * eth_ipcp_packet_reader(void * o) #endif if (np1_flow_write(fd, sdb) < 0) ipcp_sdb_release(sdb); + + continue; + fail_frame: +#ifndef HAVE_NETMAP + ipcp_sdb_release(sdb); +#endif } } @@ -1057,10 +1022,10 @@ static void * eth_ipcp_packet_writer(void * o) (void) o; - pthread_cleanup_push(cleanup_writer, fq); - ipcp_lock_to_core(); + pthread_cleanup_push(cleanup_writer, fq); + while (true) { fevent(eth_data.np1_flows, fq, NULL); while ((fd = fqueue_next(fq)) >= 0) { @@ -1078,6 +1043,7 @@ static void * eth_ipcp_packet_writer(void * o) == NULL) { log_dbg("Failed to allocate header."); ipcp_sdb_release(sdb); + continue; } pthread_rwlock_rdlock(ð_data.flows_lock); @@ -1093,14 +1059,15 @@ static void * eth_ipcp_packet_writer(void * o) pthread_rwlock_unlock(ð_data.flows_lock); - eth_ipcp_send_frame(r_addr, + if (eth_ipcp_send_frame(r_addr, #if defined(BUILD_ETH_DIX) deid, #elif defined(BUILD_ETH_LLC) dsap, ssap, #endif shm_du_buff_head(sdb), - len); + len)) + log_dbg("Failed to send frame."); ipcp_sdb_release(sdb); } } @@ -1280,32 +1247,23 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) assert(conf); assert(conf->type == THIS_TYPE); - ipcpi.dir_hash_algo = conf->layer_info.dir_hash_algo; - ipcpi.layer_name = strdup(conf->layer_info.layer_name); - if (ipcpi.layer_name == NULL) { - log_err("Failed to set layer name"); - return -ENOMEM; - } + ipcpi.dir_hash_algo = (enum hash_algo) conf->layer_info.dir_hash_algo; + strcpy(ipcpi.layer_name, conf->layer_info.name); - if (conf->dev == NULL) { - log_err("Device name is NULL."); - return -1; - } - - if (strlen(conf->dev) >= IFNAMSIZ) { - log_err("Invalid device name: %s.", conf->dev); + if (strlen(conf->eth.dev) >= IFNAMSIZ) { + log_err("Invalid device name: %s.", conf->eth.dev); return -1; } memset(&ifr, 0, sizeof(ifr)); - strcpy(ifr.ifr_name, conf->dev); + strcpy(ifr.ifr_name, conf->eth.dev); #ifdef BUILD_ETH_DIX - if (conf->ethertype < 0x0600 || conf->ethertype == 0xFFFF) { - log_err("Invalid Ethertype."); + if (conf->eth.ethertype < 0x0600 || conf->eth.ethertype == 0xFFFF) { + log_err("Invalid Ethertype: %d.", conf->eth.ethertype); return -1; } - eth_data.ethertype = htons(conf->ethertype); + eth_data.ethertype = htons(conf->eth.ethertype); #endif #if defined(__FreeBSD__) || defined(__APPLE__) @@ -1315,9 +1273,9 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) } for (ifa = ifaddr, idx = 0; ifa != NULL; ifa = ifa->ifa_next, ++idx) { - if (strcmp(ifa->ifa_name, conf->dev)) + if (strcmp(ifa->ifa_name, conf->eth.dev)) continue; - log_dbg("Interface %s found.", conf->dev); + log_dbg("Interface %s found.", conf->eth.dev); #if defined(HAVE_NETMAP) || defined(HAVE_BPF) memcpy(eth_data.hw_addr, @@ -1352,7 +1310,8 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) log_dbg("Device MTU is %d.", ifr.ifr_mtu); eth_data.mtu = MIN((int) ETH_MTU_MAX, ifr.ifr_mtu); - if (memcmp(conf->dev, "lo", 2) == 0 && eth_data.mtu > IPCP_ETH_LO_MTU) { + if (memcmp(conf->eth.dev, "lo", 2) == 0 && + eth_data.mtu > IPCP_ETH_LO_MTU) { log_dbg("Using loopback interface. MTU restricted to %d.", IPCP_ETH_LO_MTU); eth_data.mtu = IPCP_ETH_LO_MTU; @@ -1376,7 +1335,7 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) close(skfd); - idx = if_nametoindex(conf->dev); + idx = if_nametoindex(conf->eth.dev); if (idx == 0) { log_err("Failed to retrieve interface index."); return -1; @@ -1386,7 +1345,7 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) #if defined(HAVE_NETMAP) strcpy(ifn, "netmap:"); - strcat(ifn, conf->dev); + strcat(ifn, conf->eth.dev); eth_data.nmd = nm_open(ifn, NULL, 0, NULL); if (eth_data.nmd == NULL) { @@ -1456,7 +1415,7 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) if (eth_data.s_fd < 0) { log_err("Failed to create socket."); - return -1; + goto fail_socket; } flags = fcntl(eth_data.s_fd, F_GETFL, 0); @@ -1478,60 +1437,52 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) #endif if (bind(eth_data.s_fd, (struct sockaddr *) ð_data.device, - sizeof(eth_data.device))) { + sizeof(eth_data.device)) < 0) { log_err("Failed to bind socket to interface."); goto fail_device; } - #endif /* HAVE_NETMAP */ - ipcp_set_state(IPCP_OPERATIONAL); - #if defined(__linux__) - if (pthread_create(ð_data.if_monitor, - NULL, - eth_ipcp_if_monitor, - NULL)) { - ipcp_set_state(IPCP_INIT); + if (pthread_create(ð_data.if_monitor, NULL, + eth_ipcp_if_monitor, NULL)) { + log_err("Failed to create monitor thread: %s.", + strerror(errno)); goto fail_device; } #endif - if (pthread_create(ð_data.mgmt_handler, - NULL, - eth_ipcp_mgmt_handler, - NULL)) { - ipcp_set_state(IPCP_INIT); + if (pthread_create(ð_data.mgmt_handler, NULL, + eth_ipcp_mgmt_handler, NULL)) { + log_err("Failed to create mgmt handler thread: %s.", + strerror(errno)); goto fail_mgmt_handler; } for (idx = 0; idx < IPCP_ETH_RD_THR; ++idx) { - if (pthread_create(ð_data.packet_reader[idx], - NULL, - eth_ipcp_packet_reader, - NULL)) { - ipcp_set_state(IPCP_INIT); + if (pthread_create(ð_data.packet_reader[idx], NULL, + eth_ipcp_packet_reader, NULL)) { + log_err("Failed to create packet reader thread: %s", + strerror(errno)); goto fail_packet_reader; } } for (idx = 0; idx < IPCP_ETH_WR_THR; ++idx) { - if (pthread_create(ð_data.packet_writer[idx], - NULL, - eth_ipcp_packet_writer, - NULL)) { - ipcp_set_state(IPCP_INIT); + if (pthread_create(ð_data.packet_writer[idx], NULL, + eth_ipcp_packet_writer, NULL)) { + log_err("Failed to create packet writer thread: %s", + strerror(errno)); goto fail_packet_writer; } } #if defined(BUILD_ETH_DIX) log_dbg("Bootstrapped IPCP over DIX Ethernet with pid %d " - "and Ethertype 0x%X.", getpid(), conf->ethertype); + "and Ethertype 0x%X.", getpid(), conf->eth.ethertype); #elif defined(BUILD_ETH_LLC) log_dbg("Bootstrapped IPCP over Ethernet with LLC with pid %d.", getpid()); #endif - return 0; fail_packet_writer: @@ -1562,19 +1513,18 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf) #elif defined(HAVE_RAW_SOCKETS) close(eth_data.s_fd); #endif + fail_socket: return -1; } static int eth_ipcp_reg(const uint8_t * hash) { if (shim_data_reg_add_entry(eth_data.shim_data, hash)) { - log_err("Failed to add " HASH_FMT " to local registry.", - HASH_VAL(hash)); + log_err("Failed to add " HASH_FMT32 " to local registry.", + HASH_VAL32(hash)); return -1; } - log_dbg("Registered " HASH_FMT ".", HASH_VAL(hash)); - return 0; } @@ -1588,8 +1538,7 @@ static int eth_ipcp_unreg(const uint8_t * hash) static int eth_ipcp_query(const uint8_t * hash) { uint8_t r_addr[MAC_SIZE]; - struct timespec timeout = {(NAME_QUERY_TIMEO / 1000), - (NAME_QUERY_TIMEO % 1000) * MILLION}; + struct timespec timeout = TIMESPEC_INIT_MS(NAME_QUERY_TIMEO); struct dir_query * query; int ret; uint8_t * buf; @@ -1641,11 +1590,10 @@ static int eth_ipcp_query(const uint8_t * hash) return ret; } -static int eth_ipcp_flow_alloc(int fd, - const uint8_t * hash, - qosspec_t qs, - const void * data, - size_t len) +static int eth_ipcp_flow_alloc(int fd, + const uint8_t * hash, + qosspec_t qs, + const buffer_t * data) { #ifdef BUILD_ETH_LLC uint8_t ssap = 0; @@ -1653,12 +1601,11 @@ static int eth_ipcp_flow_alloc(int fd, uint8_t r_addr[MAC_SIZE]; uint64_t addr = 0; - log_dbg("Allocating flow to " HASH_FMT ".", HASH_VAL(hash)); - assert(hash); if (!shim_data_dir_has(eth_data.shim_data, hash)) { - log_err("Destination unreachable."); + log_err("Destination "HASH_FMT32 "unreachable.", + HASH_VAL32(hash)); return -1; } addr = shim_data_dir_get_addr(eth_data.shim_data, hash); @@ -1668,6 +1615,7 @@ static int eth_ipcp_flow_alloc(int fd, ssap = bmp_allocate(eth_data.saps); if (!bmp_is_id_valid(eth_data.saps, ssap)) { pthread_rwlock_unlock(ð_data.flows_lock); + log_err("Failed to allocate SSAP."); return -1; } @@ -1686,34 +1634,29 @@ static int eth_ipcp_flow_alloc(int fd, #endif hash, qs, - data, - len) < 0) { + data) < 0) { #ifdef BUILD_ETH_LLC pthread_rwlock_wrlock(ð_data.flows_lock); bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap); eth_data.fd_to_ef[fd].sap = -1; eth_data.ef_to_fd[ssap] = -1; pthread_rwlock_unlock(ð_data.flows_lock); + log_err("Failed to allocate with peer."); #endif return -1; } fset_add(eth_data.np1_flows, fd); -#if defined(BUILD_ETH_DIX) - log_dbg("Pending flow with fd %d.", fd); -#elif defined(BUILD_ETH_LLC) - log_dbg("Pending flow with fd %d on SAP %d.", fd, ssap); +#if defined(BUILD_ETH_LLC) + log_dbg("Assigned SAP %d for fd %d.", ssap, fd); #endif return 0; } -static int eth_ipcp_flow_alloc_resp(int fd, - int response, - const void * data, - size_t len) +static int eth_ipcp_flow_alloc_resp(int fd, + int response, + const buffer_t * data) { - struct timespec ts = {0, ALLOC_TIMEO * MILLION}; - struct timespec abstime; #if defined(BUILD_ETH_DIX) uint16_t r_eid; #elif defined(BUILD_ETH_LLC) @@ -1722,27 +1665,11 @@ static int eth_ipcp_flow_alloc_resp(int fd, #endif uint8_t r_addr[MAC_SIZE]; - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); + if (ipcp_wait_flow_resp(fd) < 0) { + log_err("Failed to wait for flow response."); return -1; } - ipcpi.alloc_id = -1; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - pthread_rwlock_wrlock(ð_data.flows_lock); #if defined(BUILD_ETH_DIX) r_eid = eth_data.fd_to_ef[fd].r_eid; @@ -1750,6 +1677,7 @@ static int eth_ipcp_flow_alloc_resp(int fd, ssap = bmp_allocate(eth_data.saps); if (!bmp_is_id_valid(eth_data.saps, ssap)) { pthread_rwlock_unlock(ð_data.flows_lock); + log_err("Failed to allocate SSAP."); return -1; } @@ -1768,21 +1696,19 @@ static int eth_ipcp_flow_alloc_resp(int fd, ssap, r_sap, #endif response, - data, - len) < 0) { + data) < 0) { #ifdef BUILD_ETH_LLC pthread_rwlock_wrlock(ð_data.flows_lock); bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap); pthread_rwlock_unlock(ð_data.flows_lock); #endif + log_err("Failed to respond to peer."); return -1; } fset_add(eth_data.np1_flows, fd); -#if defined(BUILD_ETH_DIX) - log_dbg("Accepted flow, fd %d.", fd); -#elif defined(BUILD_ETH_LLC) - log_dbg("Accepted flow, fd %d, SAP %d.", fd, (uint8_t)ssap); +#if defined(BUILD_ETH_LLC) + log_dbg("Assigned SAP %d for fd %d.", ssap, fd); #endif return 0; } @@ -1811,9 +1737,7 @@ static int eth_ipcp_flow_dealloc(int fd) pthread_rwlock_unlock(ð_data.flows_lock); - flow_dealloc(fd); - - log_dbg("Flow with fd %d deallocated.", fd); + ipcp_flow_dealloc(fd); return 0; } @@ -1837,9 +1761,6 @@ int main(int argc, { int i; - if (ipcp_init(argc, argv, ð_ops, THIS_TYPE) < 0) - goto fail_init; - if (eth_data_init() < 0) { #if defined(BUILD_ETH_DIX) log_err("Failed to init eth-llc data."); @@ -1849,18 +1770,17 @@ int main(int argc, goto fail_data_init; } - if (ipcp_boot() < 0) { - log_err("Failed to boot IPCP."); - goto fail_boot; + if (ipcp_init(argc, argv, ð_ops, THIS_TYPE) < 0) { + log_err("Failed to initialize IPCP."); + goto fail_init; } - if (ipcp_create_r(0)) { - log_err("Failed to notify IRMd we are initialized."); - ipcp_set_state(IPCP_NULL); - goto fail_create_r; + if (ipcp_start() < 0) { + log_err("Failed to start IPCP."); + goto fail_start; } - ipcp_shutdown(); + ipcp_sigwait(); if (ipcp_get_state() == IPCP_SHUTDOWN) { for (i = 0; i < IPCP_ETH_WR_THR; ++i) @@ -1883,19 +1803,18 @@ int main(int argc, #endif } - eth_data_fini(); + ipcp_stop(); ipcp_fini(); + eth_data_fini(); + exit(EXIT_SUCCESS); - fail_create_r: - ipcp_shutdown(); - fail_boot: - eth_data_fini(); - fail_data_init: + fail_start: ipcp_fini(); fail_init: - ipcp_create_r(-1); + eth_data_fini(); + fail_data_init: exit(EXIT_FAILURE); } diff --git a/src/ipcpd/eth/llc.c b/src/ipcpd/eth/llc.c index 56482ce7..c900dcab 100644 --- a/src/ipcpd/eth/llc.c +++ b/src/ipcpd/eth/llc.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * IPC processes over Ethernet - LLC * diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index eefa72af..966c4920 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * IPC process main loop * @@ -35,18 +35,21 @@ #define OUROBOROS_PREFIX "ipcpd/ipcp" #define IPCP_INFO "info" +#define ALLOC_TIMEOUT 50 /* ms */ +#include <ouroboros/bitmap.h> +#include <ouroboros/dev.h> +#include <ouroboros/errno.h> #include <ouroboros/hash.h> +#include <ouroboros/ipcp-dev.h> #include <ouroboros/logs.h> -#include <ouroboros/time_utils.h> -#include <ouroboros/utils.h> -#include <ouroboros/sockets.h> -#include <ouroboros/errno.h> -#include <ouroboros/dev.h> -#include <ouroboros/bitmap.h> #include <ouroboros/np1_flow.h> -#include <ouroboros/rib.h> +#include <ouroboros/protobuf.h> #include <ouroboros/pthread.h> +#include <ouroboros/rib.h> +#include <ouroboros/sockets.h> +#include <ouroboros/time.h> +#include <ouroboros/utils.h> #include "ipcp.h" @@ -173,15 +176,15 @@ static int ipcp_rib_readdir(char *** buf) while (info[i] != NULL) { (*buf)[i] = strdup(info[i]); - if (*buf == NULL) + if ((*buf)[i] == NULL) goto fail_dup; i++; } return i; fail_dup: - while (--i > 0) - free((*buf)[i]); + while (i > 0) + free((*buf)[--i]); fail: free(*buf); @@ -191,9 +194,13 @@ static int ipcp_rib_readdir(char *** buf) static int ipcp_rib_getattr(const char * path, struct rib_attr * attr) { - (void) path; + char buf[LAYER_NAME_SIZE + 2]; + struct timespec now; + + clock_gettime(CLOCK_REALTIME_COARSE, &now); - attr->size = LAYER_NAME_SIZE; + attr->size = ipcp_rib_read(path, buf, LAYER_NAME_SIZE + 2); + attr->mtime = now.tv_sec; return 0; } @@ -204,10 +211,9 @@ static struct rib_ops r_ops = { .getattr = ipcp_rib_getattr }; -__attribute__((no_sanitize_address)) static void * acceptloop(void * o) { - int csockfd; + int csockfd; (void) o; @@ -255,28 +261,395 @@ static void * acceptloop(void * o) return (void *) 0; } +int ipcp_wait_flow_req_arr(const uint8_t * dst, + qosspec_t qs, + time_t mpl, + const buffer_t * data) +{ + struct timespec ts = TIMESPEC_INIT_MS(ALLOC_TIMEOUT); + struct timespec abstime; + int fd; + buffer_t hash; + + hash.data = (uint8_t *) dst; + hash.len = ipcp_dir_hash_len(); + + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + + pthread_mutex_lock(&ipcpi.alloc_lock); + + while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) { + ts_add(&abstime, &ts, &abstime); + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &abstime); + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + log_err("Won't allocate over non-operational IPCP."); + return -EIPCPSTATE; + } + + assert(ipcpi.alloc_id == -1); + + fd = ipcp_flow_req_arr(&hash, qs, mpl, data); + if (fd < 0) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + log_err("Failed to get fd for flow."); + return fd; + } + + ipcpi.alloc_id = fd; + pthread_cond_broadcast(&ipcpi.alloc_cond); + + pthread_mutex_unlock(&ipcpi.alloc_lock); + + return fd; + +} + +int ipcp_wait_flow_resp(const int fd) +{ + struct timespec ts = TIMESPEC_INIT_MS(ALLOC_TIMEOUT); + struct timespec abstime; + + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + + pthread_mutex_lock(&ipcpi.alloc_lock); + + while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { + ts_add(&abstime, &ts, &abstime); + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &abstime); + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + return -1; + } + + assert(ipcpi.alloc_id == fd); + + ipcpi.alloc_id = -1; + pthread_cond_broadcast(&ipcpi.alloc_cond); + + pthread_mutex_unlock(&ipcpi.alloc_lock); + + return 0; +} + static void free_msg(void * o) { ipcp_msg__free_unpacked((ipcp_msg_t *) o, NULL); } + +static void do_bootstrap(ipcp_config_msg_t * conf_msg, + ipcp_msg_t * ret_msg) +{ + struct ipcp_config conf; + + log_info("Bootstrapping..."); + + if (ipcpi.ops->ipcp_bootstrap == NULL) { + log_err("Bootstrap unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + if (ipcp_get_state() != IPCP_INIT) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + goto finish; + } + + conf = ipcp_config_msg_to_s(conf_msg); + ret_msg->result = ipcpi.ops->ipcp_bootstrap(&conf); + if (ret_msg->result == 0) { + ret_msg->layer_info = layer_info_s_to_msg(&conf.layer_info); + ipcp_set_state(IPCP_OPERATIONAL); + } + finish: + log_info("Finished bootstrapping: %d.", ret_msg->result); +} + +static void do_enroll(const char * dst, + ipcp_msg_t * ret_msg) +{ + struct layer_info info; + + log_info("Enrolling with %s...", dst); + + if (ipcpi.ops->ipcp_enroll == NULL) { + log_err("Enroll unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + if (ipcp_get_state() != IPCP_INIT) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_enroll(dst, &info); + if (ret_msg->result == 0) { + ret_msg->layer_info = layer_info_s_to_msg(&info); + ipcp_set_state(IPCP_OPERATIONAL); + } + finish: + log_info("Finished enrolling with %s: %d.", dst, ret_msg->result); +} + +static void do_connect(const char * dst, + const char * comp, + qosspec_t qs, + ipcp_msg_t * ret_msg) +{ + log_info("Connecting %s to %s...", comp, dst); + + if (ipcpi.ops->ipcp_connect == NULL) { + log_err("Connect unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_connect(dst, comp, qs); + finish: + log_info("Finished connecting: %d.", ret_msg->result); +} + +static void do_disconnect(const char * dst, + const char * comp, + ipcp_msg_t * ret_msg) +{ + log_info("Disconnecting %s from %s...", comp, dst); + + if (ipcpi.ops->ipcp_disconnect == NULL) { + log_err("Disconnect unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_disconnect(dst, comp); + + finish: + log_info("Finished disconnecting %s from %s: %d.", + comp, dst, ret_msg->result); +} + +static void do_reg(const uint8_t * hash, + ipcp_msg_t * ret_msg) +{ + + log_info("Registering " HASH_FMT32 "...", HASH_VAL32(hash)); + + if (ipcpi.ops->ipcp_reg == NULL) { + log_err("Registration unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_reg(hash); + finish: + log_info("Finished registering " HASH_FMT32 " : %d.", + HASH_VAL32(hash), ret_msg->result); +} + +static void do_unreg(const uint8_t * hash, + ipcp_msg_t * ret_msg) +{ + log_info("Unregistering " HASH_FMT32 "...", HASH_VAL32(hash)); + + if (ipcpi.ops->ipcp_unreg == NULL) { + log_err("Unregistration unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_unreg(hash); + finish: + log_info("Finished unregistering " HASH_FMT32 ": %d.", + HASH_VAL32(hash), ret_msg->result); +} + +static void do_query(const uint8_t * hash, + ipcp_msg_t * ret_msg) +{ + /* TODO: Log this operation when IRMd has internal caches. */ + + if (ipcpi.ops->ipcp_query == NULL) { + log_err("Directory query unsupported."); + ret_msg->result = -ENOTSUP; + return; + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + return; + } + + ret_msg->result = ipcpi.ops->ipcp_query(hash); +} + +static void do_flow_alloc(pid_t pid, + int flow_id, + uint8_t * dst, + qosspec_t qs, + const buffer_t * data, + ipcp_msg_t * ret_msg) +{ + int fd; + + log_info("Allocating flow %d for %d to " HASH_FMT32 ".", + flow_id, pid, HASH_VAL32(dst)); + + if (ipcpi.ops->ipcp_flow_alloc == NULL) { + log_err("Flow allocation unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + goto finish; + } + + fd = np1_flow_alloc(pid, flow_id); + if (fd < 0) { + log_err("Failed allocating n + 1 fd on flow_id %d: %d", + flow_id, fd); + ret_msg->result = -EFLOWDOWN; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_flow_alloc(fd, dst, qs, data); + finish: + log_info("Finished allocating flow %d to " HASH_FMT32 ": %d.", + flow_id, HASH_VAL32(dst), ret_msg->result); +} + + +static void do_flow_join(pid_t pid, + int flow_id, + const uint8_t * dst, + qosspec_t qs, + ipcp_msg_t * ret_msg) +{ + int fd; + + log_info("Joining layer " HASH_FMT32 ".", HASH_VAL32(dst)); + + if (ipcpi.ops->ipcp_flow_join == NULL) { + log_err("Broadcast unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + goto finish; + } + + fd = np1_flow_alloc(pid, flow_id); + if (fd < 0) { + log_err("Failed allocating n + 1 fd on flow_id %d.", flow_id); + ret_msg->result = -1; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_flow_join(fd, dst, qs); + finish: + log_info("Finished joining layer " HASH_FMT32 ".", HASH_VAL32(dst)); +} + +static void do_flow_alloc_resp(int resp, + int flow_id, + const buffer_t * data, + ipcp_msg_t * ret_msg) +{ + int fd = -1; + + log_info("Responding %d to alloc on flow_id %d.", resp, flow_id); + + if (ipcpi.ops->ipcp_flow_alloc_resp == NULL) { + log_err("Flow_alloc_resp unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + goto finish; + } + + if (resp == 0) { + fd = np1_flow_resp(flow_id); + if (fd < 0) { + log_warn("Flow_id %d is not known.", flow_id); + ret_msg->result = -1; + goto finish; + } + } + + ret_msg->result = ipcpi.ops->ipcp_flow_alloc_resp(fd, resp, data); + finish: + log_info("Finished responding to allocation request: %d", + ret_msg->result); +} + +static void do_flow_dealloc(int flow_id, + int timeo_sec, + ipcp_msg_t * ret_msg) +{ + int fd; + + log_info("Deallocating flow %d.", flow_id); + + if (ipcpi.ops->ipcp_flow_dealloc == NULL) { + log_err("Flow deallocation unsupported."); + ret_msg->result = -ENOTSUP; + goto finish; + } + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_err("IPCP in wrong state."); + ret_msg->result = -EIPCPSTATE; + goto finish; + } + + fd = np1_flow_dealloc(flow_id, timeo_sec); + if (fd < 0) { + log_warn("Could not deallocate flow_id %d.", flow_id); + ret_msg->result = -1; + goto finish; + } + + ret_msg->result = ipcpi.ops->ipcp_flow_dealloc(fd); + finish: + log_info("Finished deallocating flow %d: %d.", + flow_id, ret_msg->result); +} + static void * mainloop(void * o) { int sfd; buffer_t buffer; - struct ipcp_config conf; - struct layer_info info; - ipcp_config_msg_t * conf_msg; ipcp_msg_t * msg; (void) o; while (true) { - ipcp_msg_t ret_msg = IPCP_MSG__INIT; - layer_info_msg_t layer_info = LAYER_INFO_MSG__INIT; - int fd = -1; - struct cmd * cmd; - qosspec_t qs; + ipcp_msg_t ret_msg = IPCP_MSG__INIT; + qosspec_t qs; + struct cmd * cmd; + buffer_t data; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -307,327 +680,68 @@ static void * mainloop(void * o) pthread_cleanup_push(__cleanup_close_ptr, &sfd); pthread_cleanup_push(free_msg, msg); + ret_msg.has_result = true; + switch (msg->code) { case IPCP_MSG_CODE__IPCP_BOOTSTRAP: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_bootstrap == NULL) { - log_err("Bootstrap unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - - if (ipcp_get_state() != IPCP_INIT) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - conf_msg = msg->conf; - conf.type = conf_msg->ipcp_type; - strcpy(conf.layer_info.layer_name, - conf_msg->layer_info->layer_name); - - switch(conf_msg->ipcp_type) { - case IPCP_LOCAL: - break; - case IPCP_UNICAST: - conf.addr_size = conf_msg->addr_size; - conf.eid_size = conf_msg->eid_size; - conf.max_ttl = conf_msg->max_ttl; - conf.addr_auth_type = conf_msg->addr_auth_type; - conf.routing_type = conf_msg->routing_type; - conf.cong_avoid = conf_msg->cong_avoid; - break; - case IPCP_ETH_DIX: - conf.ethertype = conf_msg->ethertype; - /* FALLTHRU */ - case IPCP_ETH_LLC: - conf.dev = conf_msg->dev; - break; - case IPCP_UDP: - conf.ip_addr = conf_msg->ip_addr; - conf.dns_addr = conf_msg->dns_addr; - conf.port = conf_msg->port; - conf.layer_info.dir_hash_algo = HASH_MD5; - layer_info.dir_hash_algo = HASH_MD5; - break; - case IPCP_BROADCAST: - conf.layer_info.dir_hash_algo = HASH_SHA3_256; - layer_info.dir_hash_algo = HASH_SHA3_256; - break; - default: - log_err("Unknown IPCP type: %d.", - conf_msg->ipcp_type); - ret_msg.result = -EIPCP; - goto exit; /* break from outer switch/case */ - } - - /* UDP and broadcast use fixed hash algorithm. */ - if (conf_msg->ipcp_type != IPCP_UDP && - conf_msg->ipcp_type != IPCP_BROADCAST) { - switch(conf_msg->layer_info->dir_hash_algo) { - case DIR_HASH_SHA3_224: - conf.layer_info.dir_hash_algo = - HASH_SHA3_224; - break; - case DIR_HASH_SHA3_256: - conf.layer_info.dir_hash_algo = - HASH_SHA3_256; - break; - case DIR_HASH_SHA3_384: - conf.layer_info.dir_hash_algo = - HASH_SHA3_384; - break; - case DIR_HASH_SHA3_512: - conf.layer_info.dir_hash_algo = - HASH_SHA3_512; - break; - default: - assert(false); - } - - layer_info.dir_hash_algo = - conf.layer_info.dir_hash_algo; - } - - ret_msg.result = ipcpi.ops->ipcp_bootstrap(&conf); - if (ret_msg.result == 0) { - ret_msg.layer_info = &layer_info; - layer_info.layer_name = - conf.layer_info.layer_name; - } + do_bootstrap(msg->conf, &ret_msg); break; case IPCP_MSG_CODE__IPCP_ENROLL: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_enroll == NULL) { - log_err("Enroll unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - - if (ipcp_get_state() != IPCP_INIT) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - ret_msg.result = ipcpi.ops->ipcp_enroll(msg->dst, - &info); - if (ret_msg.result == 0) { - ret_msg.layer_info = &layer_info; - layer_info.dir_hash_algo = info.dir_hash_algo; - layer_info.layer_name = info.layer_name; - } + do_enroll(msg->dst, &ret_msg); break; case IPCP_MSG_CODE__IPCP_CONNECT: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_connect == NULL) { - log_err("Connect unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - - qs = msg_to_spec(msg->qosspec); - ret_msg.result = ipcpi.ops->ipcp_connect(msg->dst, - msg->comp, - qs); + qs = qos_spec_msg_to_s(msg->qosspec); + do_connect(msg->dst, msg->comp, qs, &ret_msg); break; case IPCP_MSG_CODE__IPCP_DISCONNECT: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_disconnect == NULL) { - log_err("Disconnect unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - - ret_msg.result = ipcpi.ops->ipcp_disconnect(msg->dst, - msg->comp); + do_disconnect(msg->dst, msg->comp, &ret_msg); break; case IPCP_MSG_CODE__IPCP_REG: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_reg == NULL) { - log_err("Registration unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - assert(msg->hash.len == ipcp_dir_hash_len()); - - ret_msg.result = - ipcpi.ops->ipcp_reg(msg->hash.data); + do_reg(msg->hash.data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_UNREG: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_unreg == NULL) { - log_err("Unregistration unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - assert(msg->hash.len == ipcp_dir_hash_len()); - - ret_msg.result = - ipcpi.ops->ipcp_unreg(msg->hash.data); + do_unreg(msg->hash.data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_QUERY: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_query == NULL) { - log_err("Directory query unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - assert(msg->hash.len == ipcp_dir_hash_len()); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - ret_msg.result = - ipcpi.ops->ipcp_query(msg->hash.data); + do_query(msg->hash.data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_FLOW_ALLOC: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_flow_alloc == NULL) { - log_err("Flow allocation unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - assert(msg->hash.len == ipcp_dir_hash_len()); assert(msg->pk.len > 0 ? msg->pk.data != NULL : msg->pk.data == NULL); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - fd = np1_flow_alloc(msg->pid, - msg->flow_id); - if (fd < 0) { - log_err("Failed allocating fd on flow_id %d.", - msg->flow_id); - ret_msg.result = -1; - break; - } - - qs = msg_to_spec(msg->qosspec); - ret_msg.result = - ipcpi.ops->ipcp_flow_alloc(fd, - msg->hash.data, - qs, - msg->pk.data, - msg->pk.len); + data.len = msg->pk.len; + data.data = msg->pk.data; + qs = qos_spec_msg_to_s(msg->qosspec); + do_flow_alloc(msg->pid, msg->flow_id, + msg->hash.data, qs, + &data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_FLOW_JOIN: - ret_msg.has_result = true; - - if (ipcpi.ops->ipcp_flow_join == NULL) { - log_err("Broadcast unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - assert(msg->hash.len == ipcp_dir_hash_len()); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - fd = np1_flow_alloc(msg->pid, - msg->flow_id); - if (fd < 0) { - log_err("Failed allocating fd on flow_id %d.", - msg->flow_id); - ret_msg.result = -1; - break; - } - - qs = msg_to_spec(msg->qosspec); - ret_msg.result = - ipcpi.ops->ipcp_flow_join(fd, - msg->hash.data, - qs); + qs = qos_spec_msg_to_s(msg->qosspec); + do_flow_join(msg->pid, msg->flow_id, + msg->hash.data, qs, &ret_msg); break; case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP: - ret_msg.has_result = true; - if (ipcpi.ops->ipcp_flow_alloc_resp == NULL) { - log_err("Flow_alloc_resp unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - if (!msg->response) { - fd = np1_flow_resp(msg->flow_id); - if (fd < 0) { - log_warn("Port_id %d is not known.", - msg->flow_id); - ret_msg.result = -1; - break; - } - } - assert(msg->pk.len > 0 ? msg->pk.data != NULL - : msg->pk.data == NULL); - - ret_msg.result = - ipcpi.ops->ipcp_flow_alloc_resp(fd, - msg->response, - msg->pk.data, - msg->pk.len); + : msg->pk.data == NULL); + data.len = msg->pk.len; + data.data = msg->pk.data; + do_flow_alloc_resp(msg->response, msg->flow_id, + &data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: - ret_msg.has_result = true; - if (ipcpi.ops->ipcp_flow_dealloc == NULL) { - log_err("Flow deallocation unsupported."); - ret_msg.result = -ENOTSUP; - break; - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_err("IPCP in wrong state."); - ret_msg.result = -EIPCPSTATE; - break; - } - - fd = np1_flow_dealloc(msg->flow_id, msg->timeo_sec); - if (fd < 0) { - log_warn("Could not deallocate flow_id %d.", - msg->flow_id); - ret_msg.result = -1; - break; - } - - ret_msg.result = - ipcpi.ops->ipcp_flow_dealloc(fd); + do_flow_dealloc(msg->flow_id, msg->timeo_sec, &ret_msg); break; default: - ret_msg.has_result = true; - ret_msg.result = -1; - log_err("Don't know that message code"); + ret_msg.result = -1; + log_err("Unknown message code: %d.", msg->code); break; } - exit: + pthread_cleanup_pop(true); pthread_cleanup_pop(false); @@ -649,12 +763,16 @@ static void * mainloop(void * o) ipcp_msg__pack(&ret_msg, buffer.data); + if (ret_msg.layer_info != NULL) + layer_info_msg__free_unpacked(ret_msg.layer_info, NULL); + pthread_cleanup_push(__cleanup_close_ptr, &sfd); + pthread_cleanup_push(free, buffer.data) if (write(sfd, buffer.data, buffer.len) == -1) log_warn("Failed to send reply message"); - free(buffer.data); + pthread_cleanup_pop(true); pthread_cleanup_pop(true); tpm_inc(ipcpi.tpm); @@ -764,14 +882,32 @@ int ipcp_init(int argc, goto fail_rib_init; } + if (rib_reg(IPCP_INFO, &r_ops)) { + log_err("Failed to register rib."); + goto fail_rib_reg; + } + + ipcpi.tpm = tpm_create(IPCP_MIN_THREADS, IPCP_ADD_THREADS, + mainloop, NULL); + if (ipcpi.tpm == NULL) { + log_err("Failed to create threadpool manager."); + goto fail_tpm_create; + } + list_head_init(&ipcpi.cmds); ipcpi.alloc_id = -1; pthread_condattr_destroy(&cattr); + ipcp_set_state(IPCP_INIT); + return 0; + fail_tpm_create: + rib_unreg(IPCP_INFO); + fail_rib_reg: + rib_fini(); fail_rib_init: pthread_cond_destroy(&ipcpi.cmd_cond); fail_cmd_cond: @@ -794,50 +930,55 @@ int ipcp_init(int argc, return ret; } -int ipcp_boot() +int ipcp_start(void) { - sigset_t sigset; + sigset_t sigset; + struct ipcp_info info; + sigemptyset(&sigset); sigaddset(&sigset, SIGINT); sigaddset(&sigset, SIGQUIT); sigaddset(&sigset, SIGHUP); sigaddset(&sigset, SIGPIPE); - ipcpi.tpm = tpm_create(IPCP_MIN_THREADS, IPCP_ADD_THREADS, - mainloop, NULL); - if (ipcpi.tpm == NULL) - goto fail_tpm_create; - pthread_sigmask(SIG_BLOCK, &sigset, NULL); + info.pid = getpid(); + info.type = ipcpi.type; + strcpy(info.name, ipcpi.name); + info.state = IPCP_OPERATIONAL; + if (tpm_start(ipcpi.tpm)) goto fail_tpm_start; - ipcp_set_state(IPCP_INIT); - - if (rib_reg(IPCP_INFO, &r_ops)) - goto fail_rib_reg; - if (pthread_create(&ipcpi.acceptor, NULL, acceptloop, NULL)) { log_err("Failed to create acceptor thread."); - ipcp_set_state(IPCP_NULL); goto fail_acceptor; } - return 0; + info.state = IPCP_OPERATIONAL; + + if (ipcp_create_r(&info)) { + log_err("Failed to notify IRMd we are initialized."); + goto fail_create_r; + } + return 0; + fail_create_r: + pthread_cancel(ipcpi.acceptor); + pthread_join(ipcpi.acceptor, NULL); fail_acceptor: - rib_unreg(IPCP_INFO); - fail_rib_reg: tpm_stop(ipcpi.tpm); fail_tpm_start: tpm_destroy(ipcpi.tpm); - fail_tpm_create: + ipcp_set_state(IPCP_NULL); + info.state = IPCP_NULL; + ipcp_create_r(&info); return -1; } -void ipcp_shutdown() +void ipcp_sigwait(void) { siginfo_t info; @@ -891,19 +1032,25 @@ void ipcp_shutdown() continue; } } +} - pthread_cancel(ipcpi.acceptor); +void ipcp_stop(void) +{ + log_info("IPCP %d shutting down.", getpid()); + pthread_cancel(ipcpi.acceptor); pthread_join(ipcpi.acceptor, NULL); - tpm_stop(ipcpi.tpm); - tpm_destroy(ipcpi.tpm); - log_info("IPCP %d shutting down.", getpid()); + tpm_stop(ipcpi.tpm); } -void ipcp_fini() +void ipcp_fini(void) { + tpm_destroy(ipcpi.tpm); + + rib_unreg(IPCP_INFO); + rib_fini(); close(ipcpi.sockfd); @@ -934,7 +1081,7 @@ void ipcp_set_state(enum ipcp_state state) pthread_mutex_unlock(&ipcpi.state_mtx); } -enum ipcp_state ipcp_get_state() +enum ipcp_state ipcp_get_state(void) { enum ipcp_state state; diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 79b7b7c0..aab490c7 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * IPC process structure * @@ -26,20 +26,14 @@ #include <ouroboros/hash.h> #include <ouroboros/ipcp.h> #include <ouroboros/list.h> +#include <ouroboros/protobuf.h> +#include <ouroboros/qos.h> #include <ouroboros/sockets.h> #include <ouroboros/tpm.h> #include <pthread.h> #include <time.h> -enum ipcp_state { - IPCP_NULL = 0, - IPCP_INIT, - /* Layer name must be set for states below. */ - IPCP_OPERATIONAL, - IPCP_SHUTDOWN -}; - struct ipcp_ops { int (* ipcp_bootstrap)(const struct ipcp_config * conf); @@ -59,20 +53,18 @@ struct ipcp_ops { int (* ipcp_query)(const uint8_t * hash); - int (* ipcp_flow_alloc)(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len); + int (* ipcp_flow_alloc)(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data); int (* ipcp_flow_join)(int fd, const uint8_t * dst, qosspec_t qs); - int (* ipcp_flow_alloc_resp)(int fd, - int response, - const void * data, - size_t len); + int (* ipcp_flow_alloc_resp)(int fd, + int response, + const buffer_t * data); int (* ipcp_flow_dealloc)(int fd); }; @@ -85,7 +77,7 @@ extern struct ipcp { char * name; enum ipcp_type type; - char * layer_name; + char layer_name[LAYER_NAME_SIZE + 1]; uint64_t dt_addr; @@ -95,9 +87,8 @@ extern struct ipcp { int irmd_fd; enum ipcp_state state; - pthread_rwlock_t state_lock; - pthread_mutex_t state_mtx; pthread_cond_t state_cond; + pthread_mutex_t state_mtx; int sockfd; char * sock_path; @@ -120,9 +111,11 @@ int ipcp_init(int argc, struct ipcp_ops * ops, enum ipcp_type type); -int ipcp_boot(void); +int ipcp_start(void); -void ipcp_shutdown(void); +void ipcp_sigwait(void); + +void ipcp_stop(void); void ipcp_fini(void); @@ -133,6 +126,14 @@ enum ipcp_state ipcp_get_state(void); int ipcp_parse_arg(int argc, char * argv[]); +/* Helper functions to handle races during flow allocation */ +int ipcp_wait_flow_req_arr(const uint8_t * dst, + qosspec_t qs, + time_t mpl, + const buffer_t * data); + +int ipcp_wait_flow_resp(const int fd); + /* Helper functions for directory entries, could be moved */ uint8_t * ipcp_hash_dup(const uint8_t * hash); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 2b20ae15..160e07e0 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Local IPC process * @@ -48,8 +48,7 @@ #include <sys/wait.h> #include <assert.h> -#define THIS_TYPE IPCP_LOCAL -#define ALLOC_TIMEOUT 10 /* ms */ +#define THIS_TYPE IPCP_LOCAL struct ipcp ipcpi; @@ -84,6 +83,7 @@ static int local_data_init(void) if (pthread_rwlock_init(&local_data.lock, NULL) < 0) goto fail_rwlock_init; + return 0; fail_rwlock_init: @@ -97,13 +97,13 @@ static int local_data_init(void) } static void local_data_fini(void){ + pthread_rwlock_destroy(&local_data.lock); shim_data_destroy(local_data.shim_data); - fset_destroy(local_data.flows); fqueue_destroy(local_data.fq); - pthread_rwlock_destroy(&local_data.lock); + fset_destroy(local_data.flows); } -static void * ipcp_local_packet_loop(void * o) +static void * local_ipcp_packet_loop(void * o) { (void) o; @@ -139,54 +139,45 @@ static void * ipcp_local_packet_loop(void * o) return (void *) 0; } -static int ipcp_local_bootstrap(const struct ipcp_config * conf) +static int local_ipcp_bootstrap(const struct ipcp_config * conf) { assert(conf); assert(conf->type == THIS_TYPE); - ipcpi.dir_hash_algo = conf->layer_info.dir_hash_algo; - ipcpi.layer_name = strdup(conf->layer_info.layer_name); - if (ipcpi.layer_name == NULL) { - log_err("Failed to set layer name"); - return -ENOMEM; - } - - ipcp_set_state(IPCP_OPERATIONAL); + ipcpi.dir_hash_algo = (enum hash_algo) conf->layer_info.dir_hash_algo; + strcpy(ipcpi.layer_name,conf->layer_info.name); if (pthread_create(&local_data.packet_loop, NULL, - ipcp_local_packet_loop, NULL)) { + local_ipcp_packet_loop, NULL)) { + log_err("Failed to create pthread: %s", strerror(errno)); ipcp_set_state(IPCP_INIT); return -1; } - log_info("Bootstrapped local IPCP with pid %d.", getpid()); - return 0; } -static int ipcp_local_reg(const uint8_t * hash) +static int local_ipcp_reg(const uint8_t * hash) { if (shim_data_reg_add_entry(local_data.shim_data, hash)) { - log_dbg("Failed to add " HASH_FMT " to local registry.", - HASH_VAL(hash)); + log_err("Failed to add " HASH_FMT32 " to local registry.", + HASH_VAL32(hash)); return -1; } - log_info("Registered " HASH_FMT ".", HASH_VAL(hash)); - return 0; } -static int ipcp_local_unreg(const uint8_t * hash) +static int local_ipcp_unreg(const uint8_t * hash) { shim_data_reg_del_entry(local_data.shim_data, hash); - log_info("Unregistered " HASH_FMT ".", HASH_VAL(hash)); + log_info("Unregistered " HASH_FMT32 ".", HASH_VAL32(hash)); return 0; } -static int ipcp_local_query(const uint8_t * hash) +static int local_ipcp_query(const uint8_t * hash) { int ret; @@ -195,43 +186,19 @@ static int ipcp_local_query(const uint8_t * hash) return ret; } -static int ipcp_local_flow_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len) +static int local_ipcp_flow_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { - struct timespec ts = {0, ALLOC_TIMEOUT * MILLION}; - struct timespec abstime; - int out_fd = -1; - time_t mpl = IPCP_LOCAL_MPL; + int out_fd = -1; - log_dbg("Allocating flow to " HASH_FMT " on fd %d.", HASH_VAL(dst), fd); + log_dbg("Allocating flow to " HASH_FMT32 " on fd %d.", + HASH_VAL32(dst), fd); assert(dst); - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_dbg("Won't allocate over non-operational IPCP."); - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; - } - - assert(ipcpi.alloc_id == -1); - - out_fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, - data, len); + out_fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_LOCAL_MPL, data); if (out_fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); log_dbg("Flow allocation failed: %d", out_fd); return -1; } @@ -243,11 +210,6 @@ static int ipcp_local_flow_alloc(int fd, pthread_rwlock_unlock(&local_data.lock); - ipcpi.alloc_id = out_fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - fset_add(local_data.flows, fd); log_info("Pending local allocation request on fd %d.", fd); @@ -255,40 +217,21 @@ static int ipcp_local_flow_alloc(int fd, return 0; } -static int ipcp_local_flow_alloc_resp(int fd, - int response, - const void * data, - size_t len) +static int local_ipcp_flow_alloc_resp(int fd, + int response, + const buffer_t * data) { - struct timespec ts = {0, ALLOC_TIMEOUT * MILLION}; - struct timespec abstime; - int out_fd = -1; - time_t mpl = IPCP_LOCAL_MPL; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); + struct timespec wait = TIMESPEC_INIT_MS(1); + time_t mpl = IPCP_LOCAL_MPL; + int out_fd; - while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); + if (ipcp_wait_flow_resp(fd) < 0) { + log_err("Failed waiting for IRMd response."); return -1; } - ipcpi.alloc_id = -1; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - - pthread_rwlock_wrlock(&local_data.lock); - - if (response) { + if (response < 0) { + pthread_rwlock_wrlock(&local_data.lock); if (local_data.in_out[fd] != -1) local_data.in_out[local_data.in_out[fd]] = fd; local_data.in_out[fd] = -1; @@ -296,25 +239,38 @@ static int ipcp_local_flow_alloc_resp(int fd, return 0; } + pthread_rwlock_rdlock(&local_data.lock); + out_fd = local_data.in_out[fd]; if (out_fd == -1) { pthread_rwlock_unlock(&local_data.lock); - return -1; + log_dbg("Potential race detected"); + nanosleep(&wait, NULL); + pthread_rwlock_rdlock(&local_data.lock); + out_fd = local_data.in_out[fd]; } pthread_rwlock_unlock(&local_data.lock); + if (out_fd == -1) { + log_err("Invalid out_fd."); + return -1; + } + fset_add(local_data.flows, fd); - if (ipcp_flow_alloc_reply(out_fd, response, mpl, data, len) < 0) + if (ipcp_flow_alloc_reply(out_fd, response, mpl, data) < 0) { + log_err("Failed to reply to allocation"); + fset_del(local_data.flows, fd); return -1; + } log_info("Flow allocation completed, fds (%d, %d).", out_fd, fd); return 0; } -static int ipcp_local_flow_dealloc(int fd) +static int local_ipcp_flow_dealloc(int fd) { assert(!(fd < 0)); @@ -328,7 +284,7 @@ static int ipcp_local_flow_dealloc(int fd) pthread_rwlock_unlock(&local_data.lock); - flow_dealloc(fd); + ipcp_flow_dealloc(fd); log_info("Flow with fd %d deallocated.", fd); @@ -336,60 +292,54 @@ static int ipcp_local_flow_dealloc(int fd) } static struct ipcp_ops local_ops = { - .ipcp_bootstrap = ipcp_local_bootstrap, + .ipcp_bootstrap = local_ipcp_bootstrap, .ipcp_enroll = NULL, .ipcp_connect = NULL, .ipcp_disconnect = NULL, - .ipcp_reg = ipcp_local_reg, - .ipcp_unreg = ipcp_local_unreg, - .ipcp_query = ipcp_local_query, - .ipcp_flow_alloc = ipcp_local_flow_alloc, + .ipcp_reg = local_ipcp_reg, + .ipcp_unreg = local_ipcp_unreg, + .ipcp_query = local_ipcp_query, + .ipcp_flow_alloc = local_ipcp_flow_alloc, .ipcp_flow_join = NULL, - .ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp, - .ipcp_flow_dealloc = ipcp_local_flow_dealloc + .ipcp_flow_alloc_resp = local_ipcp_flow_alloc_resp, + .ipcp_flow_dealloc = local_ipcp_flow_dealloc }; int main(int argc, char * argv[]) { - if (ipcp_init(argc, argv, &local_ops, THIS_TYPE) < 0) - goto fail_init; - if (local_data_init() < 0) { log_err("Failed to init local data."); goto fail_data_init; } - if (ipcp_boot() < 0) { - log_err("Failed to boot IPCP."); - goto fail_boot; - } + if (ipcp_init(argc, argv, &local_ops, THIS_TYPE) < 0) + goto fail_init; - if (ipcp_create_r(0)) { - log_err("Failed to notify IRMd we are initialized."); - goto fail_create_r; + if (ipcp_start() < 0) { + log_err("Failed to start IPCP."); + goto fail_start; } - ipcp_shutdown(); + ipcp_sigwait(); if (ipcp_get_state() == IPCP_SHUTDOWN) { pthread_cancel(local_data.packet_loop); pthread_join(local_data.packet_loop, NULL); } - local_data_fini(); + ipcp_stop(); ipcp_fini(); - exit(EXIT_SUCCESS); - fail_create_r: - ipcp_set_state(IPCP_NULL); - ipcp_shutdown(); - fail_boot: local_data_fini(); - fail_data_init: + + exit(EXIT_SUCCESS); + + fail_start: ipcp_fini(); fail_init: - ipcp_create_r(-1); + local_data_fini(); + fail_data_init: exit(EXIT_FAILURE); } diff --git a/src/ipcpd/shim-data.c b/src/ipcpd/shim-data.c index a499b2d4..1fac63ac 100644 --- a/src/ipcpd/shim-data.c +++ b/src/ipcpd/shim-data.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * IPC process utilities * @@ -30,18 +30,18 @@ #define OUROBOROS_PREFIX "shim-data" -#include <ouroboros/endian.h> -#include <ouroboros/logs.h> -#include <ouroboros/list.h> -#include <ouroboros/time_utils.h> #include <ouroboros/errno.h> +#include <ouroboros/hash.h> +#include <ouroboros/list.h> +#include <ouroboros/logs.h> +#include <ouroboros/time.h> #include "shim-data.h" #include "ipcp.h" -#include <string.h> -#include <stdlib.h> #include <assert.h> +#include <stdlib.h> +#include <string.h> struct reg_entry { struct list_head list; @@ -139,7 +139,7 @@ static void dir_entry_destroy(struct dir_entry * entry) free(entry); } -struct shim_data * shim_data_create() +struct shim_data * shim_data_create(void) { struct shim_data * sd; @@ -297,8 +297,8 @@ int shim_data_reg_add_entry(struct shim_data * data, if (find_reg_entry_by_hash(data, hash)) { pthread_rwlock_unlock(&data->reg_lock); - log_dbg(HASH_FMT " was already in the directory.", - HASH_VAL(hash)); + log_dbg(HASH_FMT32 " was already in the directory.", + HASH_VAL32(hash)); return 0; } @@ -446,9 +446,9 @@ uint64_t shim_data_dir_get_addr(struct shim_data * data, pthread_rwlock_rdlock(&data->dir_lock); entry = find_dir_entry_any(data, hash); - if (entry == NULL) { pthread_rwlock_unlock(&data->dir_lock); + log_warn("No address for " HASH_FMT32 ".", HASH_VAL32(hash)); return 0; /* undefined behaviour, 0 may be a valid address */ } diff --git a/src/ipcpd/shim-data.h b/src/ipcpd/shim-data.h index f9848c0c..372b4ea7 100644 --- a/src/ipcpd/shim-data.h +++ b/src/ipcpd/shim-data.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Utitilies for building IPC processes * diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 601efa5c..2e8d84ce 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * IPC process over UDP * @@ -31,6 +31,7 @@ #define OUROBOROS_PREFIX "ipcpd/udp" #include <ouroboros/bitmap.h> +#include <ouroboros/endian.h> #include <ouroboros/hash.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> @@ -65,7 +66,6 @@ #define IPCP_UDP_BUF_SIZE 8980 #define IPCP_UDP_MSG_SIZE 8980 #define DNS_TTL 86400 -#define FD_UPDATE_TIMEOUT 100 /* microseconds */ #define SADDR ((struct sockaddr *) &udp_data.s_saddr) #define SADDR_SIZE (sizeof(udp_data.s_saddr)) @@ -140,12 +140,18 @@ struct { static int udp_data_init(void) { - int i; + int i; + pthread_condattr_t cattr; if (pthread_rwlock_init(&udp_data.flows_lock, NULL)) goto fail_rwlock_init; - if (pthread_cond_init(&udp_data.mgmt_cond, NULL)) + if (pthread_condattr_init(&cattr)) + goto fail_condattr; +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&udp_data.mgmt_cond, &cattr)) goto fail_mgmt_cond; if (pthread_mutex_init(&udp_data.mgmt_lock, NULL)) @@ -162,9 +168,12 @@ static int udp_data_init(void) if (udp_data.shim_data == NULL) goto fail_data; + pthread_condattr_destroy(&cattr); + list_head_init(&udp_data.mgmt_frames); return 0; + fail_data: fset_destroy(udp_data.np1_flows); fail_fset: @@ -172,6 +181,8 @@ static int udp_data_init(void) fail_mgmt_lock: pthread_cond_destroy(&udp_data.mgmt_cond); fail_mgmt_cond: + pthread_condattr_destroy(&cattr); + fail_condattr: pthread_rwlock_destroy(&udp_data.flows_lock); fail_rwlock_init: return -1; @@ -188,22 +199,21 @@ static void udp_data_fini(void) pthread_mutex_destroy(&udp_data.mgmt_lock); } -static int ipcp_udp_port_alloc(const struct sockaddr_in * r_saddr, +static int udp_ipcp_port_alloc(const struct sockaddr_in * r_saddr, uint32_t s_eid, const uint8_t * dst, qosspec_t qs, - const void * data, - size_t dlen) + const buffer_t * data) { uint8_t * buf; struct mgmt_msg * msg; size_t len; - assert(dlen > 0 ? data != NULL : data == NULL); + assert(data->len > 0 ? data->data != NULL : data->data == NULL); len = sizeof(*msg) + ipcp_dir_hash_len(); - buf = malloc(len + dlen); + buf = malloc(len + data->len); if (buf == NULL) return -1; @@ -222,10 +232,10 @@ static int ipcp_udp_port_alloc(const struct sockaddr_in * r_saddr, msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); - if (dlen > 0) - memcpy(buf + len, data, dlen); + if (data->len > 0) + memcpy(buf + len, data->data, data->len); - if (sendto(udp_data.s_fd, msg, len + dlen, + if (sendto(udp_data.s_fd, msg, len + data->len, SENDTO_FLAGS, (const struct sockaddr *) r_saddr, sizeof(*r_saddr)) < 0) { free(buf); @@ -237,16 +247,15 @@ static int ipcp_udp_port_alloc(const struct sockaddr_in * r_saddr, return 0; } -static int ipcp_udp_port_alloc_resp(const struct sockaddr_in * r_saddr, +static int udp_ipcp_port_alloc_resp(const struct sockaddr_in * r_saddr, uint32_t s_eid, uint32_t d_eid, int8_t response, - const void * data, - size_t len) + const buffer_t * data) { struct mgmt_msg * msg; - msg = malloc(sizeof(*msg) + len); + msg = malloc(sizeof(*msg) + data->len); if (msg == NULL) return -1; @@ -256,10 +265,10 @@ static int ipcp_udp_port_alloc_resp(const struct sockaddr_in * r_saddr, msg->d_eid = hton32(d_eid); msg->response = response; - if (len > 0) - memcpy(msg + 1, data, len); + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); - if (sendto(udp_data.s_fd, msg, sizeof(*msg) + len, + if (sendto(udp_data.s_fd, msg, sizeof(*msg) + data->len, SENDTO_FLAGS, (const struct sockaddr *) r_saddr, sizeof(*r_saddr)) < 0 ) { free(msg); @@ -271,38 +280,16 @@ static int ipcp_udp_port_alloc_resp(const struct sockaddr_in * r_saddr, return 0; } -static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, +static int udp_ipcp_port_req(struct sockaddr_in * c_saddr, int d_eid, const uint8_t * dst, qosspec_t qs, - const void * data, - size_t len) + const buffer_t * data) { - struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; - struct timespec abstime; - int fd; - time_t mpl = IPCP_UDP_MPL; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); + int fd; - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_dbg("Won't allocate over non-operational IPCP."); - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; - } - - /* reply to IRM */ - fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UDP_MPL, data); if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Could not get new flow from IRMd."); return -1; } @@ -314,23 +301,17 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, pthread_rwlock_unlock(&udp_data.flows_lock); - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_dbg("Pending allocation request, fd %d, remote eid %d.", fd, d_eid); return 0; } -static int ipcp_udp_port_alloc_reply(const struct sockaddr_in * saddr, +static int udp_ipcp_port_alloc_reply(const struct sockaddr_in * saddr, uint32_t s_eid, uint32_t d_eid, int8_t response, - const void * data, - size_t len) + const buffer_t * data) { time_t mpl = IPCP_UDP_MPL; @@ -338,8 +319,8 @@ static int ipcp_udp_port_alloc_reply(const struct sockaddr_in * saddr, if (memcmp(&udp_data.fd_to_uf[s_eid].r_saddr, saddr, sizeof(*saddr))) { pthread_rwlock_unlock(&udp_data.flows_lock); - log_warn("Flow allocation reply for %u from wrong source.", - s_eid); + log_err("Flow allocation reply for %u from wrong source.", + s_eid); return -1; } @@ -348,8 +329,8 @@ static int ipcp_udp_port_alloc_reply(const struct sockaddr_in * saddr, pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_flow_alloc_reply(s_eid, response, mpl, data, len) < 0) { - log_dbg("Failed to reply to flow allocation."); + if (ipcp_flow_alloc_reply(s_eid, response, mpl, data) < 0) { + log_err("Failed to reply to flow allocation."); return -1; } @@ -359,13 +340,14 @@ static int ipcp_udp_port_alloc_reply(const struct sockaddr_in * saddr, return 0; } -static int ipcp_udp_mgmt_frame(const uint8_t * buf, +static int udp_ipcp_mgmt_frame(const uint8_t * buf, size_t len, struct sockaddr_in c_saddr) { struct mgmt_msg * msg; size_t msg_len; qosspec_t qs; + buffer_t data; msg = (struct mgmt_msg *) buf; @@ -375,6 +357,10 @@ static int ipcp_udp_mgmt_frame(const uint8_t * buf, assert(len >= msg_len); + data.len = len - msg_len; + data.data = (uint8_t *) buf + msg_len; + + qs.delay = ntoh32(msg->delay); qs.bandwidth = ntoh64(msg->bandwidth); qs.availability = msg->availability; @@ -385,26 +371,27 @@ static int ipcp_udp_mgmt_frame(const uint8_t * buf, qs.cypher_s = ntoh16(msg->cypher_s); qs.timeout = ntoh32(msg->timeout); - return ipcp_udp_port_req(&c_saddr, ntoh32(msg->s_eid), + return udp_ipcp_port_req(&c_saddr, ntoh32(msg->s_eid), (uint8_t *) (msg + 1), qs, - buf + msg_len, - len - msg_len); + &data); case FLOW_REPLY: assert(len >= sizeof(*msg)); - return ipcp_udp_port_alloc_reply(&c_saddr, + data.len = len - sizeof(*msg); + data.data = (uint8_t *) buf + sizeof(*msg); + + return udp_ipcp_port_alloc_reply(&c_saddr, ntoh32(msg->s_eid), ntoh32(msg->d_eid), msg->response, - buf + sizeof(*msg), - len - sizeof(*msg)); + &data); default: log_err("Unknown message received %d.", msg->code); return -1; } } -static void * ipcp_udp_mgmt_handler(void * o) +static void * udp_ipcp_mgmt_handler(void * o) { (void) o; @@ -426,7 +413,7 @@ static void * ipcp_udp_mgmt_handler(void * o) pthread_mutex_unlock(&udp_data.mgmt_lock); - ipcp_udp_mgmt_frame(frame->buf, frame->len, frame->r_saddr); + udp_ipcp_mgmt_frame(frame->buf, frame->len, frame->r_saddr); free(frame); } @@ -436,7 +423,7 @@ static void * ipcp_udp_mgmt_handler(void * o) return (void *) 0; } -static void * ipcp_udp_packet_reader(void * o) +static void * udp_ipcp_packet_reader(void * o) { uint8_t buf[IPCP_UDP_MAX_PACKET_SIZE]; uint8_t * data; @@ -446,6 +433,8 @@ static void * ipcp_udp_packet_reader(void * o) (void) o; + ipcp_lock_to_core(); + data = buf + sizeof(uint32_t); eid_p = (uint32_t *) buf; @@ -506,7 +495,7 @@ static void * ipcp_udp_packet_reader(void * o) ipcp_sdb_release(sdb); } - return 0; + return (void *) 0; } static void cleanup_fqueue(void * fq) @@ -519,7 +508,7 @@ static void cleanup_sdb(void * sdb) ipcp_sdb_release((struct shm_du_buff *) sdb); } -static void * ipcp_udp_packet_writer(void * o) +static void * udp_ipcp_packet_writer(void * o) { fqueue_t * fq; @@ -591,37 +580,36 @@ static void * ipcp_udp_packet_writer(void * o) return (void *) 1; } -static int ipcp_udp_bootstrap(const struct ipcp_config * conf) +static const char * inet4_ntop(const void * addr, + char * buf) +{ + return inet_ntop(AF_INET, addr, buf, INET_ADDRSTRLEN); +} + +static int udp_ipcp_bootstrap(const struct ipcp_config * conf) { char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; - char portstr[128]; /* port is max 64535 = 5 chars */ int i = 1; assert(conf); assert(conf->type == THIS_TYPE); - ipcpi.dir_hash_algo = conf->layer_info.dir_hash_algo; - ipcpi.layer_name = strdup(conf->layer_info.layer_name); - if (ipcpi.layer_name == NULL) { - log_err("Failed to set layer name"); - return -ENOMEM; - } + ipcpi.dir_hash_algo = HASH_MD5; + strcpy(ipcpi.layer_name, conf->layer_info.name); - if (inet_ntop(AF_INET, &conf->ip_addr, ipstr, INET_ADDRSTRLEN) - == NULL) { - log_err("Failed to convert IP address"); + if (inet4_ntop(&conf->udp.ip_addr, ipstr) == NULL) { + log_err("Failed to convert IP address."); return -1; } - if (conf->dns_addr != 0) { - if (inet_ntop(AF_INET, &conf->dns_addr, dnsstr, INET_ADDRSTRLEN) - == NULL) { - log_err("Failed to convert DNS address"); + if (conf->udp.dns_addr != 0) { + if (inet4_ntop(&conf->udp.dns_addr, dnsstr) == NULL) { + log_err("Failed to convert DNS address."); return -1; } #ifndef HAVE_DDNS - log_warn("DNS disabled at compile time, address ignored"); + log_warn("DNS disabled at compile time, address ignored."); #endif } else { strcpy(dnsstr, "not set"); @@ -636,46 +624,46 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) memset((char *) &udp_data.s_saddr, 0, sizeof(udp_data.s_saddr)); udp_data.s_saddr.sin_family = AF_INET; - udp_data.s_saddr.sin_addr.s_addr = conf->ip_addr; - udp_data.s_saddr.sin_port = htons(conf->port); + udp_data.s_saddr.sin_addr.s_addr = conf->udp.ip_addr; + udp_data.s_saddr.sin_port = htons(conf->udp.port); if (bind(udp_data.s_fd, SADDR, SADDR_SIZE) < 0) { - log_err("Couldn't bind to %s.", ipstr); + log_err("Couldn't bind to %s:%d. %s.", + ipstr, conf->udp.port, strerror(errno)); goto fail_bind; } - udp_data.dns_addr = conf->dns_addr; - - ipcp_set_state(IPCP_OPERATIONAL); + udp_data.dns_addr = conf->udp.dns_addr; if (pthread_create(&udp_data.mgmt_handler, NULL, - ipcp_udp_mgmt_handler, NULL)) { - ipcp_set_state(IPCP_INIT); + udp_ipcp_mgmt_handler, NULL)) { + log_err("Failed to create management thread."); goto fail_bind; } for (i = 0; i < IPCP_UDP_RD_THR; ++i) { if (pthread_create(&udp_data.packet_reader[i], NULL, - ipcp_udp_packet_reader, NULL)) { - ipcp_set_state(IPCP_INIT); + udp_ipcp_packet_reader, NULL)) { + log_err("Failed to create reader thread."); goto fail_packet_reader; } } for (i = 0; i < IPCP_UDP_WR_THR; ++i) { if (pthread_create(&udp_data.packet_writer[i], NULL, - ipcp_udp_packet_writer, NULL)) { - ipcp_set_state(IPCP_INIT); + udp_ipcp_packet_writer, NULL)) { + log_err("Failed to create writer thread."); goto fail_packet_writer; } } - sprintf(portstr, "%d", conf->port); - log_dbg("Bootstrapped IPCP over UDP with pid %d.", getpid()); log_dbg("Bound to IP address %s.", ipstr); - log_dbg("Using port %u.", conf->port); - log_dbg("DNS server address is %s.", dnsstr); + log_dbg("Using port %u.", conf->udp.port); + if (conf->udp.dns_addr != 0) + log_dbg("DNS server address is %s.", dnsstr); + else + log_dbg("DNS server not in use."); return 0; @@ -703,20 +691,22 @@ static int ipcp_udp_bootstrap(const struct ipcp_config * conf) /* NOTE: Disgusted with this crap */ static int ddns_send(char * cmd) { - pid_t pid = -1; + pid_t pid; int wstatus; int pipe_fd[2]; char * argv[] = {NSUPDATE_EXEC, 0}; char * envp[] = {0}; if (pipe(pipe_fd)) { - log_err("Failed to create pipe."); + log_err("Failed to create pipe: %s.", strerror(errno)); return -1; } pid = fork(); if (pid == -1) { - log_err("Failed to fork."); + log_err("Failed to fork: %s.", strerror(errno)); + close(pipe_fd[0]); + close(pipe_fd[1]); return -1; } @@ -724,12 +714,15 @@ static int ddns_send(char * cmd) close(pipe_fd[1]); dup2(pipe_fd[0], 0); execve(argv[0], &argv[0], envp); + log_err("Failed to execute: %s", strerror(errno)); + exit(1); } close(pipe_fd[0]); if (write(pipe_fd[1], cmd, strlen(cmd)) == -1) { - log_err("Failed to communicate with nsupdate."); + log_err("Failed to communicate with nsupdate: %s.", + strerror(errno)); close(pipe_fd[1]); return -1; } @@ -759,18 +752,20 @@ static uint32_t ddns_resolve(char * name, char * addr_str = "Address:"; uint32_t ip_addr = 0; - if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL) + if (inet4_ntop(&dns_addr, dnsstr) == NULL) return 0; if (pipe(pipe_fd)) { - log_err("Failed to create pipe."); + log_err("Failed to create pipe: %s.", strerror(errno)); return 0; } pid = fork(); if (pid == -1) { - log_err("Failed to fork."); - return 0; + log_err("Failed to fork: %s.", strerror(errno)); + close(pipe_fd[0]); + close(pipe_fd[1]); + return -1; } if (pid == 0) { @@ -780,6 +775,8 @@ static uint32_t ddns_resolve(char * name, close(pipe_fd[0]); dup2(pipe_fd[1], 1); execve(argv[0], &argv[0], envp); + log_err("Failed to execute: %s", strerror(errno)); + exit(1); } close(pipe_fd[1]); @@ -821,7 +818,7 @@ static uint32_t ddns_resolve(char * name, } #endif -static int ipcp_udp_reg(const uint8_t * hash) +static int udp_ipcp_reg(const uint8_t * hash) { #ifdef HAVE_DDNS char ipstr[INET_ADDRSTRLEN]; @@ -833,16 +830,18 @@ static int ipcp_udp_reg(const uint8_t * hash) char * hashstr; hashstr = malloc(ipcp_dir_hash_strlen() + 1); - if (hashstr == NULL) + if (hashstr == NULL) { + log_err("Failed to malloc hashstr."); return -1; + } assert(hash); ipcp_hash_str(hashstr, hash); if (shim_data_reg_add_entry(udp_data.shim_data, hash)) { - log_err("Failed to add " HASH_FMT " to local registry.", - HASH_VAL(hash)); + log_err("Failed to add " HASH_FMT32 " to local registry.", + HASH_VAL32(hash)); free(hashstr); return -1; } @@ -855,14 +854,14 @@ static int ipcp_udp_reg(const uint8_t * hash) if (dns_addr != 0) { ip_addr = udp_data.s_saddr.sin_addr.s_addr; - if (inet_ntop(AF_INET, &ip_addr, - ipstr, INET_ADDRSTRLEN) == NULL) { + if (inet4_ntop(&ip_addr, ipstr) == NULL) { + log_err("Failed to convert IP address to string."); free(hashstr); return -1; } - if (inet_ntop(AF_INET, &dns_addr, - dnsstr, INET_ADDRSTRLEN) == NULL) { + if (inet4_ntop(&dns_addr, dnsstr) == NULL) { + log_err("Failed to convert DNS address to string."); free(hashstr); return -1; } @@ -871,20 +870,19 @@ static int ipcp_udp_reg(const uint8_t * hash) dnsstr, hashstr, DNS_TTL, ipstr); if (ddns_send(cmd)) { + log_err("Failed to send DDNS message."); shim_data_reg_del_entry(udp_data.shim_data, hash); free(hashstr); return -1; } } #endif - log_dbg("Registered " HASH_FMT ".", HASH_VAL(hash)); - free(hashstr); return 0; } -static int ipcp_udp_unreg(const uint8_t * hash) +static int udp_ipcp_unreg(const uint8_t * hash) { #ifdef HAVE_DDNS char dnsstr[INET_ADDRSTRLEN]; @@ -897,8 +895,10 @@ static int ipcp_udp_unreg(const uint8_t * hash) assert(hash); hashstr = malloc(ipcp_dir_hash_strlen() + 1); - if (hashstr == NULL) + if (hashstr == NULL) { + log_err("Failed to malloc hashstr."); return -1; + } ipcp_hash_str(hashstr, hash); @@ -908,8 +908,8 @@ static int ipcp_udp_unreg(const uint8_t * hash) dns_addr = udp_data.dns_addr; if (dns_addr != 0) { - if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) - == NULL) { + if (inet4_ntop(&dns_addr, dnsstr) == NULL) { + log_err("Failed to convert DNS address to string."); free(hashstr); return -1; } @@ -922,14 +922,12 @@ static int ipcp_udp_unreg(const uint8_t * hash) shim_data_reg_del_entry(udp_data.shim_data, hash); - log_dbg("Unregistered " HASH_FMT ".", HASH_VAL(hash)); - free(hashstr); return 0; } -static int ipcp_udp_query(const uint8_t * hash) +static int udp_ipcp_query(const uint8_t * hash) { uint32_t ip_addr = 0; char * hashstr; @@ -940,8 +938,10 @@ static int ipcp_udp_query(const uint8_t * hash) assert(hash); hashstr = malloc(ipcp_dir_hash_strlen() + 1); - if (hashstr == NULL) + if (hashstr == NULL) { + log_err("Failed to malloc hashstr."); return -ENOMEM; + } ipcp_hash_str(hashstr, hash); @@ -956,7 +956,7 @@ static int ipcp_udp_query(const uint8_t * hash) if (dns_addr != 0) { ip_addr = ddns_resolve(hashstr, dns_addr); if (ip_addr == 0) { - log_dbg("Could not resolve %s.", hashstr); + log_err("Could not resolve %s.", hashstr); free(hashstr); return -1; } @@ -964,7 +964,7 @@ static int ipcp_udp_query(const uint8_t * hash) #endif h = gethostbyname(hashstr); if (h == NULL) { - log_dbg("Could not resolve %s.", hashstr); + log_err("Could not resolve %s.", hashstr); free(hashstr); return -1; } @@ -985,38 +985,40 @@ static int ipcp_udp_query(const uint8_t * hash) return 0; } -static int ipcp_udp_flow_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len) +static int udp_ipcp_flow_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { struct sockaddr_in r_saddr; /* Server address */ uint32_t ip_addr = 0; char ipstr[INET_ADDRSTRLEN]; - log_dbg("Allocating flow to " HASH_FMT ".", HASH_VAL(dst)); - (void) qs; assert(dst); if (!shim_data_dir_has(udp_data.shim_data, dst)) { - log_dbg("Could not resolve destination."); + log_err("Could not resolve destination."); return -1; } ip_addr = (uint32_t) shim_data_dir_get_addr(udp_data.shim_data, dst); - inet_ntop(AF_INET, &ip_addr, ipstr, INET_ADDRSTRLEN); - log_dbg("Destination UDP ipcp resolved at %s.", ipstr); + if (inet4_ntop(&ip_addr, ipstr) == NULL) { + log_err("Could not convert IP address."); + return -1; + } + + log_dbg("Destination " HASH_FMT32 " resolved at IP %s.", + HASH_VAL32(dst), ipstr); memset((char *) &r_saddr, 0, sizeof(r_saddr)); r_saddr.sin_family = AF_INET; r_saddr.sin_addr.s_addr = ip_addr; r_saddr.sin_port = udp_data.s_saddr.sin_port; - if (ipcp_udp_port_alloc(&r_saddr, fd, dst, qs, data, len) < 0) { + if (udp_ipcp_port_alloc(&r_saddr, fd, dst, qs, data) < 0) { log_err("Could not allocate port."); return -1; } @@ -1030,45 +1032,21 @@ static int ipcp_udp_flow_alloc(int fd, fset_add(udp_data.np1_flows, fd); - log_dbg("Flow to %s pending on fd %d.", ipstr, fd); - return 0; } -static int ipcp_udp_flow_alloc_resp(int fd, - int resp, - const void * data, - size_t len) +static int udp_ipcp_flow_alloc_resp(int fd, + int resp, + const buffer_t * data) { - struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; - struct timespec abstime; struct sockaddr_in saddr; int d_eid; - if (resp) - return 0; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); + if (ipcp_wait_flow_resp(fd) < 0) { + log_err("Failed to wait for flow response."); return -1; } - ipcpi.alloc_id = -1; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - pthread_rwlock_rdlock(&udp_data.flows_lock); saddr = udp_data.fd_to_uf[fd].r_saddr; @@ -1076,7 +1054,7 @@ static int ipcp_udp_flow_alloc_resp(int fd, pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_udp_port_alloc_resp(&saddr, d_eid, fd, resp, data, len) < 0) { + if (udp_ipcp_port_alloc_resp(&saddr, d_eid, fd, resp, data) < 0) { fset_del(udp_data.np1_flows, fd); log_err("Failed to respond to flow request."); return -1; @@ -1084,13 +1062,10 @@ static int ipcp_udp_flow_alloc_resp(int fd, fset_add(udp_data.np1_flows, fd); - log_dbg("Accepted flow, fd %d on eid %d.", - fd, d_eid); - return 0; } -static int ipcp_udp_flow_dealloc(int fd) +static int udp_ipcp_flow_dealloc(int fd) { ipcp_flow_fini(fd); @@ -1103,25 +1078,23 @@ static int ipcp_udp_flow_dealloc(int fd) pthread_rwlock_unlock(&udp_data.flows_lock); - flow_dealloc(fd); - - log_dbg("Flow with fd %d deallocated.", fd); + ipcp_flow_dealloc(fd); return 0; } static struct ipcp_ops udp_ops = { - .ipcp_bootstrap = ipcp_udp_bootstrap, + .ipcp_bootstrap = udp_ipcp_bootstrap, .ipcp_enroll = NULL, .ipcp_connect = NULL, .ipcp_disconnect = NULL, - .ipcp_reg = ipcp_udp_reg, - .ipcp_unreg = ipcp_udp_unreg, - .ipcp_query = ipcp_udp_query, - .ipcp_flow_alloc = ipcp_udp_flow_alloc, + .ipcp_reg = udp_ipcp_reg, + .ipcp_unreg = udp_ipcp_unreg, + .ipcp_query = udp_ipcp_query, + .ipcp_flow_alloc = udp_ipcp_flow_alloc, .ipcp_flow_join = NULL, - .ipcp_flow_alloc_resp = ipcp_udp_flow_alloc_resp, - .ipcp_flow_dealloc = ipcp_udp_flow_dealloc + .ipcp_flow_alloc_resp = udp_ipcp_flow_alloc_resp, + .ipcp_flow_dealloc = udp_ipcp_flow_dealloc }; int main(int argc, @@ -1129,53 +1102,51 @@ int main(int argc, { int i; - if (ipcp_init(argc, argv, &udp_ops, THIS_TYPE) < 0) - goto fail_init; if (udp_data_init() < 0) { log_err("Failed to init udp data."); goto fail_data_init; } - if (ipcp_boot() < 0) { - log_err("Failed to boot IPCP."); - goto fail_boot; + if (ipcp_init(argc, argv, &udp_ops, THIS_TYPE) < 0) { + log_err("Failed to initialize IPCP."); + goto fail_init; } - if (ipcp_create_r(0)) { - log_err("Failed to notify IRMd we are initialized."); - goto fail_create_r; + if (ipcp_start() < 0) { + log_err("Failed to start IPCP."); + goto fail_start; } - ipcp_shutdown(); + ipcp_sigwait(); if (ipcp_get_state() == IPCP_SHUTDOWN) { - for (i = 0; i < IPCP_UDP_RD_THR; ++i) - pthread_cancel(udp_data.packet_reader[i]); for (i = 0; i < IPCP_UDP_WR_THR; ++i) pthread_cancel(udp_data.packet_writer[i]); + for (i = 0; i < IPCP_UDP_RD_THR; ++i) + pthread_cancel(udp_data.packet_reader[i]); pthread_cancel(udp_data.mgmt_handler); - for (i = 0; i < IPCP_UDP_RD_THR; ++i) - pthread_join(udp_data.packet_reader[i], NULL); for (i = 0; i < IPCP_UDP_WR_THR; ++i) pthread_join(udp_data.packet_writer[i], NULL); + for (i = 0; i < IPCP_UDP_RD_THR; ++i) + pthread_join(udp_data.packet_reader[i], NULL); pthread_join(udp_data.mgmt_handler, NULL); + close(udp_data.s_fd); } - udp_data_fini(); + ipcp_stop(); ipcp_fini(); - exit(EXIT_SUCCESS); - fail_create_r: - ipcp_set_state(IPCP_NULL); - ipcp_shutdown(); - fail_boot: udp_data_fini(); - fail_data_init: + + exit(EXIT_SUCCESS); + + fail_start: ipcp_fini(); fail_init: - ipcp_create_r(-1); + udp_data_fini(); + fail_data_init: exit(EXIT_FAILURE); } diff --git a/src/ipcpd/unicast/CMakeLists.txt b/src/ipcpd/unicast/CMakeLists.txt index e1fe1074..ca742871 100644 --- a/src/ipcpd/unicast/CMakeLists.txt +++ b/src/ipcpd/unicast/CMakeLists.txt @@ -16,7 +16,7 @@ set(IPCP_UNICAST_TARGET ipcpd-unicast CACHE INTERNAL "") set(IPCP_UNICAST_MPL 60 CACHE STRING "Default maximum packet lifetime for the unicast IPCP, in seconds") -protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS dir/kademlia.proto) +protobuf_generate_c(DHT_PROTO_SRCS DHT_PROTO_HDRS dir/dht.proto) math(EXPR PFT_EXPR "1 << 12") set(PFT_SIZE ${PFT_EXPR} CACHE STRING @@ -38,7 +38,6 @@ set(SOURCE_FILES connmgr.c dir.c dt.c - enroll.c fa.c main.c pff.c @@ -57,8 +56,8 @@ set(SOURCE_FILES routing/graph.c ) -add_executable(ipcpd-unicast ${SOURCE_FILES} ${IPCP_SOURCES} - ${KAD_PROTO_SRCS} ${LAYER_CONFIG_PROTO_SRCS}) +add_executable(ipcpd-unicast ${SOURCE_FILES} ${IPCP_SOURCES} ${COMMON_SOURCES} + ${DHT_PROTO_SRCS} ${LAYER_CONFIG_PROTO_SRCS}) target_link_libraries(ipcpd-unicast LINK_PUBLIC ouroboros-dev) include(AddCompileFlags) diff --git a/src/ipcpd/unicast/addr-auth.c b/src/ipcpd/unicast/addr-auth.c index 89fef6c2..908a4aa1 100644 --- a/src/ipcpd/unicast/addr-auth.c +++ b/src/ipcpd/unicast/addr-auth.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Address authority * diff --git a/src/ipcpd/unicast/addr-auth.h b/src/ipcpd/unicast/addr-auth.h index e85973fb..e119dff3 100644 --- a/src/ipcpd/unicast/addr-auth.h +++ b/src/ipcpd/unicast/addr-auth.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Address authority * diff --git a/src/ipcpd/unicast/addr-auth/flat.c b/src/ipcpd/unicast/addr-auth/flat.c index b184e7fe..c4562935 100644 --- a/src/ipcpd/unicast/addr-auth/flat.c +++ b/src/ipcpd/unicast/addr-auth/flat.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Policy for flat addresses in a distributed way * @@ -69,8 +69,11 @@ uint64_t flat_address(void) { uint32_t addr = INVALID_ADDRESS; +#if defined (CONFIG_OUROBOROS_DEBUG) && defined (IPCP_DEBUG_LOCAL) + addr = getpid(); +#else while (addr == INVALID_ADDRESS) random_buffer(&addr,sizeof(addr)); - +#endif return addr; } diff --git a/src/ipcpd/unicast/addr-auth/flat.h b/src/ipcpd/unicast/addr-auth/flat.h index 13601677..d4b672c7 100644 --- a/src/ipcpd/unicast/addr-auth/flat.h +++ b/src/ipcpd/unicast/addr-auth/flat.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Policy for flat addresses in a distributed way * diff --git a/src/ipcpd/unicast/addr-auth/ops.h b/src/ipcpd/unicast/addr-auth/ops.h index 6b2e8020..06b24cec 100644 --- a/src/ipcpd/unicast/addr-auth/ops.h +++ b/src/ipcpd/unicast/addr-auth/ops.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Address authority policy ops * diff --git a/src/ipcpd/unicast/addr-auth/pol.h b/src/ipcpd/unicast/addr-auth/pol.h index c1cbf07c..844308c6 100644 --- a/src/ipcpd/unicast/addr-auth/pol.h +++ b/src/ipcpd/unicast/addr-auth/pol.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Address Authority policies * diff --git a/src/ipcpd/unicast/ca.c b/src/ipcpd/unicast/ca.c index 849878be..287eaf41 100644 --- a/src/ipcpd/unicast/ca.c +++ b/src/ipcpd/unicast/ca.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Congestion Avoidance * diff --git a/src/ipcpd/unicast/ca.h b/src/ipcpd/unicast/ca.h index 13c46336..ea803e17 100644 --- a/src/ipcpd/unicast/ca.h +++ b/src/ipcpd/unicast/ca.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Congestion avoidance * diff --git a/src/ipcpd/unicast/ca/mb-ecn.c b/src/ipcpd/unicast/ca/mb-ecn.c index c2d5acb4..d9a204b0 100644 --- a/src/ipcpd/unicast/ca/mb-ecn.c +++ b/src/ipcpd/unicast/ca/mb-ecn.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Multi-bit ECN Congestion Avoidance * @@ -29,7 +29,7 @@ #include "config.h" #include <ouroboros/ipcp-dev.h> -#include <ouroboros/time_utils.h> +#include <ouroboros/time.h> #include "mb-ecn.h" @@ -187,7 +187,7 @@ ca_wnd_t mb_ecn_ctx_update_snd(void * _ctx, void mb_ecn_wnd_wait(ca_wnd_t wnd) { if (wnd.wait > 0) { - struct timespec s = {0, 0}; + struct timespec s = TIMESPEC_INIT_S(0); if (wnd.wait > BILLION) /* Don't care throttling < 1s */ s.tv_sec = 1; else diff --git a/src/ipcpd/unicast/ca/mb-ecn.h b/src/ipcpd/unicast/ca/mb-ecn.h index bc1baf97..9a2c8b49 100644 --- a/src/ipcpd/unicast/ca/mb-ecn.h +++ b/src/ipcpd/unicast/ca/mb-ecn.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Multi-bit ECN Congestion Avoidance * diff --git a/src/ipcpd/unicast/ca/nop.c b/src/ipcpd/unicast/ca/nop.c index 3bb577e9..617fc15b 100644 --- a/src/ipcpd/unicast/ca/nop.c +++ b/src/ipcpd/unicast/ca/nop.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Dummy Congestion Avoidance * diff --git a/src/ipcpd/unicast/ca/nop.h b/src/ipcpd/unicast/ca/nop.h index 8e08c089..248b198d 100644 --- a/src/ipcpd/unicast/ca/nop.h +++ b/src/ipcpd/unicast/ca/nop.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Dummy Congestion Avoidance * diff --git a/src/ipcpd/unicast/ca/ops.h b/src/ipcpd/unicast/ca/ops.h index 628a5296..3a7b7248 100644 --- a/src/ipcpd/unicast/ca/ops.h +++ b/src/ipcpd/unicast/ca/ops.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Congestion avoidance policy ops * diff --git a/src/ipcpd/unicast/ca/pol.h b/src/ipcpd/unicast/ca/pol.h index 119c1a5c..db0a1a11 100644 --- a/src/ipcpd/unicast/ca/pol.h +++ b/src/ipcpd/unicast/ca/pol.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Congestion avoidance policies * diff --git a/src/ipcpd/unicast/connmgr.c b/src/ipcpd/unicast/connmgr.c index 33e76c01..11c5d5b6 100644 --- a/src/ipcpd/unicast/connmgr.c +++ b/src/ipcpd/unicast/connmgr.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Handles connections between components * diff --git a/src/ipcpd/unicast/dir.c b/src/ipcpd/unicast/dir.c index d998f26b..e0cb09fc 100644 --- a/src/ipcpd/unicast/dir.c +++ b/src/ipcpd/unicast/dir.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Directory Management * diff --git a/src/ipcpd/unicast/dir.h b/src/ipcpd/unicast/dir.h index 9d11b256..b261ea2c 100644 --- a/src/ipcpd/unicast/dir.h +++ b/src/ipcpd/unicast/dir.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Directory * diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c index a8c9ff94..08a5a5a9 100644 --- a/src/ipcpd/unicast/dir/dht.c +++ b/src/ipcpd/unicast/dir/dht.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Distributed Hash Table based on Kademlia * @@ -31,6 +31,7 @@ #define DHT "dht" #define OUROBOROS_PREFIX DHT +#include <ouroboros/endian.h> #include <ouroboros/hash.h> #include <ouroboros/ipcp-dev.h> #include <ouroboros/bitmap.h> @@ -39,7 +40,7 @@ #include <ouroboros/list.h> #include <ouroboros/notifier.h> #include <ouroboros/random.h> -#include <ouroboros/time_utils.h> +#include <ouroboros/time.h> #include <ouroboros/tpm.h> #include <ouroboros/utils.h> #include <ouroboros/pthread.h> @@ -56,9 +57,9 @@ #include <inttypes.h> #include <limits.h> -#include "kademlia.pb-c.h" -typedef KadMsg kad_msg_t; -typedef KadContactMsg kad_contact_msg_t; +#include "dht.pb-c.h" +typedef DhtMsg dht_msg_t; +typedef DhtContactMsg dht_contact_msg_t; #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME @@ -353,7 +354,7 @@ static uint8_t * create_id(size_t len) } static void kad_req_create(struct dht * dht, - kad_msg_t * msg, + dht_msg_t * msg, uint64_t addr) { struct kad_req * req; @@ -448,7 +449,7 @@ static void kad_req_destroy(struct kad_req * req) return; case REQ_PENDING: req->state = REQ_DESTROY; - pthread_cond_signal(&req->cond); + pthread_cond_broadcast(&req->cond); break; case REQ_INIT: case REQ_DONE: @@ -471,12 +472,14 @@ static void kad_req_destroy(struct kad_req * req) static int kad_req_wait(struct kad_req * req, time_t t) { - struct timespec timeo = {t, 0}; + struct timespec timeo = TIMESPEC_INIT_S(0); struct timespec abs; int ret = 0; assert(req); + timeo.tv_sec = t; + clock_gettime(PTHREAD_COND_CLOCK, &abs); ts_add(&abs, &timeo, &abs); @@ -792,7 +795,7 @@ static void lookup_destroy(struct lookup * lu) static void lookup_update(struct dht * dht, struct lookup * lu, - kad_msg_t * msg) + dht_msg_t * msg) { struct list_head * p = NULL; struct list_head * h; @@ -994,7 +997,7 @@ static void cancel_lookup_wait(void * o) static enum lookup_state lookup_wait(struct lookup * lu) { - struct timespec timeo = {KAD_T_RESP, 0}; + struct timespec timeo = TIMESPEC_INIT_S(KAD_T_RESP); struct timespec abs; enum lookup_state state; int ret = 0; @@ -1026,7 +1029,7 @@ static enum lookup_state lookup_wait(struct lookup * lu) } static struct kad_req * dht_find_request(struct dht * dht, - kad_msg_t * msg) + dht_msg_t * msg) { struct list_head * p; @@ -1463,7 +1466,7 @@ static int dht_update_bucket(struct dht * dht, } static int send_msg(struct dht * dht, - kad_msg_t * msg, + dht_msg_t * msg, uint64_t addr) { #ifndef __DHT_TEST__ @@ -1496,7 +1499,7 @@ static int send_msg(struct dht * dht, pthread_rwlock_unlock(&dht->lock); #ifndef __DHT_TEST__ - len = kad_msg__get_packed_size(msg); + len = dht_msg__get_packed_size(msg); if (len == 0) goto fail_msg; @@ -1504,7 +1507,7 @@ static int send_msg(struct dht * dht, if (ipcp_sdb_reserve(&sdb, len)) goto fail_msg; - kad_msg__pack(msg, shm_du_buff_head(sdb)); + dht_msg__pack(msg, shm_du_buff_head(sdb)); if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0) break; @@ -1551,7 +1554,7 @@ static struct dht_entry * dht_find_entry(struct dht * dht, } static int kad_add(struct dht * dht, - const kad_contact_msg_t * contacts, + const dht_contact_msg_t * contacts, ssize_t n, time_t exp) { @@ -1590,7 +1593,7 @@ static int kad_add(struct dht * dht, } static int wait_resp(struct dht * dht, - kad_msg_t * msg, + dht_msg_t * msg, time_t timeo) { struct kad_req * req; @@ -1617,9 +1620,9 @@ static int kad_store(struct dht * dht, uint64_t r_addr, time_t ttl) { - kad_msg_t msg = KAD_MSG__INIT; - kad_contact_msg_t cmsg = KAD_CONTACT_MSG__INIT; - kad_contact_msg_t * cmsgp[1]; + dht_msg_t msg = DHT_MSG__INIT; + dht_contact_msg_t cmsg = DHT_CONTACT_MSG__INIT; + dht_contact_msg_t * cmsgp[1]; cmsg.id.data = (uint8_t *) key; cmsg.addr = addr; @@ -1649,7 +1652,7 @@ static ssize_t kad_find(struct dht * dht, const uint64_t * addrs, enum kad_code code) { - kad_msg_t msg = KAD_MSG__INIT; + dht_msg_t msg = DHT_MSG__INIT; ssize_t sent = 0; assert(dht); @@ -1789,7 +1792,7 @@ static void kad_publish(struct dht * dht, while (n-- > 0) { if (addrs[n] == dht->addr) { - kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT; + dht_contact_msg_t msg = DHT_CONTACT_MSG__INIT; msg.id.data = (uint8_t *) key; msg.id.len = dht->b; msg.addr = addr; @@ -1808,7 +1811,7 @@ static void kad_publish(struct dht * dht, static int kad_join(struct dht * dht, uint64_t addr) { - kad_msg_t msg = KAD_MSG__INIT; + dht_msg_t msg = DHT_MSG__INIT; msg.code = KAD_JOIN; @@ -1943,7 +1946,7 @@ static buffer_t dht_retrieve(struct dht * dht, static ssize_t dht_get_contacts(struct dht * dht, const uint8_t * key, - kad_contact_msg_t *** msgs) + dht_contact_msg_t *** msgs) { struct list_head l; struct list_head * p; @@ -1980,7 +1983,7 @@ static ssize_t dht_get_contacts(struct dht * dht, return 0; } - kad_contact_msg__init((*msgs)[i]); + dht_contact_msg__init((*msgs)[i]); (*msgs)[i]->id.data = c->id; (*msgs)[i]->id.len = dht->b; @@ -2117,7 +2120,7 @@ static void * work(void * o) static int kad_handle_join_resp(struct dht * dht, struct kad_req * req, - kad_msg_t * msg) + dht_msg_t * msg) { assert(dht); assert(req); @@ -2177,7 +2180,7 @@ static int kad_handle_join_resp(struct dht * dht, static int kad_handle_find_resp(struct dht * dht, struct kad_req * req, - kad_msg_t * msg) + dht_msg_t * msg) { struct lookup * lu; @@ -2201,7 +2204,7 @@ static int kad_handle_find_resp(struct dht * dht, } static void kad_handle_response(struct dht * dht, - kad_msg_t * msg) + dht_msg_t * msg) { struct kad_req * req; @@ -2439,9 +2442,9 @@ static void * dht_handle_packet(void * o) assert(dht); while (true) { - kad_msg_t * msg; - kad_contact_msg_t ** cmsgs; - kad_msg_t resp_msg = KAD_MSG__INIT; + dht_msg_t * msg; + dht_contact_msg_t ** cmsgs; + dht_msg_t resp_msg = DHT_MSG__INIT; uint64_t addr; buffer_t buf; size_t i; @@ -2463,7 +2466,7 @@ static void * dht_handle_packet(void * o) i = shm_du_buff_len(cmd->sdb); - msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); + msg = dht_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); #ifndef __DHT_TEST__ ipcp_sdb_release(cmd->sdb); #endif @@ -2475,7 +2478,7 @@ static void * dht_handle_packet(void * o) } if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { - kad_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(msg, NULL); log_dbg("Got a request message when not running."); continue; } @@ -2488,13 +2491,13 @@ static void * dht_handle_packet(void * o) pthread_rwlock_unlock(&dht->lock); if (msg->has_key && msg->key.len != b) { - kad_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(msg, NULL); log_warn("Bad key in message."); continue; } if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { - kad_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(msg, NULL); log_warn("Bad source ID in message of type %d.", msg->code); continue; @@ -2595,7 +2598,7 @@ static void * dht_handle_packet(void * o) log_warn("Failed to send response."); finish: - kad_msg__free_unpacked(msg, NULL); + dht_msg__free_unpacked(msg, NULL); if (resp_msg.n_addrs > 0) free(resp_msg.addrs); @@ -2606,7 +2609,7 @@ static void * dht_handle_packet(void * o) } for (i = 0; i < resp_msg.n_contacts; ++i) - kad_contact_msg__free_unpacked(resp_msg.contacts[i], + dht_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); free(resp_msg.contacts); @@ -2763,7 +2766,7 @@ static void handle_event(void * self, pthread_t thr; struct join_info * inf; struct conn * c = (struct conn *) o; - struct timespec slack = {0, DHT_ENROLL_SLACK * MILLION}; + struct timespec slack = TIMESPEC_INIT_MS(DHT_ENROLL_SLACK); /* Give the pff some time to update for the new link. */ nanosleep(&slack, NULL); diff --git a/src/ipcpd/unicast/dir/dht.h b/src/ipcpd/unicast/dir/dht.h index 34c30595..311c6b23 100644 --- a/src/ipcpd/unicast/dir/dht.h +++ b/src/ipcpd/unicast/dir/dht.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Distributed Hash Table based on Kademlia * diff --git a/src/ipcpd/unicast/dir/kademlia.proto b/src/ipcpd/unicast/dir/dht.proto index 25b5c303..4c5b06db 100644 --- a/src/ipcpd/unicast/dir/kademlia.proto +++ b/src/ipcpd/unicast/dir/dht.proto @@ -1,7 +1,7 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * - * KAD protocol + * DHT protocol, based on Kademlia * * Dimitri Staessens <dimitri@ouroboros.rocks> * Sander Vrijders <sander@ouroboros.rocks> @@ -22,19 +22,19 @@ syntax = "proto2"; -message kad_contact_msg { +message dht_contact_msg { required bytes id = 1; required uint64 addr = 2; -}; +} -message kad_msg { +message dht_msg { required uint32 code = 1; required uint32 cookie = 2; required uint64 s_addr = 3; optional bytes s_id = 4; optional bytes key = 5; repeated uint64 addrs = 6; - repeated kad_contact_msg contacts = 7; + repeated dht_contact_msg contacts = 7; // enrolment parameters optional uint32 alpha = 8; optional uint32 b = 9; @@ -42,4 +42,4 @@ message kad_msg { optional uint32 t_expire = 11; optional uint32 t_refresh = 12; optional uint32 t_replicate = 13; -};
\ No newline at end of file +} diff --git a/src/ipcpd/unicast/dir/ops.h b/src/ipcpd/unicast/dir/ops.h index 4c9a6885..6ff61ce6 100644 --- a/src/ipcpd/unicast/dir/ops.h +++ b/src/ipcpd/unicast/dir/ops.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Directory policy ops * diff --git a/src/ipcpd/unicast/dir/pol.h b/src/ipcpd/unicast/dir/pol.h index d31dcbe8..eae4b2e7 100644 --- a/src/ipcpd/unicast/dir/pol.h +++ b/src/ipcpd/unicast/dir/pol.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Directory policies * diff --git a/src/ipcpd/unicast/dir/tests/CMakeLists.txt b/src/ipcpd/unicast/dir/tests/CMakeLists.txt index 482711d5..c850e41d 100644 --- a/src/ipcpd/unicast/dir/tests/CMakeLists.txt +++ b/src/ipcpd/unicast/dir/tests/CMakeLists.txt @@ -20,10 +20,9 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c dht_test.c ) -protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS ../kademlia.proto) - +protobuf_generate_c(DHT_PROTO_SRCS KAD_PROTO_HDRS ../dht.proto) add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests} - ${KAD_PROTO_SRCS}) + ${DHT_PROTO_SRCS}) target_link_libraries(${PARENT_DIR}_test ouroboros-common) add_dependencies(check ${PARENT_DIR}_test) diff --git a/src/ipcpd/unicast/dir/tests/dht_test.c b/src/ipcpd/unicast/dir/tests/dht_test.c index c62d6624..bea2c3e7 100644 --- a/src/ipcpd/unicast/dir/tests/dht_test.c +++ b/src/ipcpd/unicast/dir/tests/dht_test.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Unit tests of the DHT * diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c index 815cc359..2bb5ed2f 100644 --- a/src/ipcpd/unicast/dt.c +++ b/src/ipcpd/unicast/dt.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Data Transfer Component * @@ -399,6 +399,7 @@ static void handle_event(void * self, const void * o) { struct conn * c; + int fd; (void) self; @@ -406,19 +407,20 @@ static void handle_event(void * self, switch (event) { case NOTIFY_DT_CONN_ADD: + fd = c->flow_info.fd; #ifdef IPCP_FLOW_STATS - stat_used(c->flow_info.fd, c->conn_info.addr); + stat_used(fd, c->conn_info.addr); #endif - psched_add(dt.psched, c->flow_info.fd); - log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd); + psched_add(dt.psched, fd); + log_dbg("Added fd %d to packet scheduler.", fd); break; case NOTIFY_DT_CONN_DEL: + fd = c->flow_info.fd; #ifdef IPCP_FLOW_STATS - stat_used(c->flow_info.fd, INVALID_ADDR); + stat_used(fd, INVALID_ADDR); #endif - psched_del(dt.psched, c->flow_info.fd); - log_dbg("Removed fd %d from " - "packet scheduler.", c->flow_info.fd); + psched_del(dt.psched, fd); + log_dbg("Removed fd %d from packet scheduler.", fd); break; default: break; @@ -563,10 +565,7 @@ static void * dt_conn_handle(void * o) return 0; } -int dt_init(enum pol_routing pr, - uint8_t addr_size, - uint8_t eid_size, - uint8_t max_ttl) +int dt_init(struct dt_config cfg) { int i; int j; @@ -582,14 +581,14 @@ int dt_init(enum pol_routing pr, info.pref_syntax = PROTO_FIXED; info.addr = ipcpi.dt_addr; - if (eid_size != 8) { /* only support 64 bits from now */ + if (cfg.eid_size != 8) { /* only support 64 bits from now */ log_warn("Invalid EID size. Only 64 bit is supported."); - eid_size = 8; + cfg.eid_size = 8; } - dt_pci_info.addr_size = addr_size; - dt_pci_info.eid_size = eid_size; - dt_pci_info.max_ttl = max_ttl; + dt_pci_info.addr_size = cfg.addr_size; + dt_pci_info.eid_size = cfg.eid_size; + dt_pci_info.max_ttl = cfg.max_ttl; dt_pci_info.qc_o = dt_pci_info.addr_size; dt_pci_info.ttl_o = dt_pci_info.qc_o + QOS_LEN; @@ -597,17 +596,12 @@ int dt_init(enum pol_routing pr, dt_pci_info.eid_o = dt_pci_info.ecn_o + ECN_LEN; dt_pci_info.head_size = dt_pci_info.eid_o + dt_pci_info.eid_size; - if (notifier_reg(handle_event, NULL)) { - log_err("Failed to register with notifier."); - goto fail_notifier_reg; - } - if (connmgr_comp_init(COMPID_DT, &info)) { log_err("Failed to register with connmgr."); goto fail_connmgr_comp_init; } - pp = routing_init(pr); + pp = routing_init(cfg.routing_type); if (pp < 0) { log_err("Failed to init routing."); goto fail_routing; @@ -645,6 +639,7 @@ int dt_init(enum pol_routing pr, for (i = 0; i < PROG_MAX_FLOWS; ++i) if (pthread_mutex_init(&dt.stat[i].lock, NULL)) { + log_err("Failed to init mutex for flow %d.", i); for (j = 0; j < i; ++j) pthread_mutex_destroy(&dt.stat[j].lock); goto fail_stat_lock; @@ -653,8 +648,10 @@ int dt_init(enum pol_routing pr, dt.n_flows = 0; #endif sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr); - if (rib_reg(dtstr, &r_ops)) + if (rib_reg(dtstr, &r_ops)) { + log_err("Failed to register RIB."); goto fail_rib_reg; + } return 0; @@ -678,8 +675,6 @@ int dt_init(enum pol_routing pr, fail_routing: connmgr_comp_fini(COMPID_DT); fail_connmgr_comp_init: - notifier_unreg(&handle_event); - fail_notifier_reg: return -1; } @@ -707,8 +702,6 @@ void dt_fini(void) routing_fini(); connmgr_comp_fini(COMPID_DT); - - notifier_unreg(&handle_event); } int dt_start(void) @@ -716,7 +709,12 @@ int dt_start(void) dt.psched = psched_create(packet_handler, ipcp_flow_read); if (dt.psched == NULL) { log_err("Failed to create N-1 packet scheduler."); - return -1; + goto fail_psched; + } + + if (notifier_reg(handle_event, NULL)) { + log_err("Failed to register with notifier."); + goto fail_notifier_reg; } if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) { @@ -726,12 +724,21 @@ int dt_start(void) } return 0; + + fail_notifier_reg: + psched_destroy(dt.psched); + fail_psched: + return -1; + } void dt_stop(void) { pthread_cancel(dt.listener); pthread_join(dt.listener, NULL); + + notifier_unreg(&handle_event); + psched_destroy(dt.psched); } @@ -747,7 +754,7 @@ int dt_reg_comp(void * comp, eid = bmp_allocate(dt.res_fds); if (!bmp_is_id_valid(dt.res_fds, eid)) { - log_warn("Reserved EIDs depleted."); + log_err("Cannot allocate EID."); pthread_rwlock_unlock(&dt.lock); return -EBADF; } diff --git a/src/ipcpd/unicast/dt.h b/src/ipcpd/unicast/dt.h index 58f8244e..7198a013 100644 --- a/src/ipcpd/unicast/dt.h +++ b/src/ipcpd/unicast/dt.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Data Transfer component * @@ -31,11 +31,7 @@ #define DT_PROTO "dtp" #define INVALID_ADDR 0 -int dt_init(enum pol_routing pr, - uint8_t addr_size, - uint8_t eid_size, - uint8_t max_ttl -); +int dt_init(struct dt_config cfg); void dt_fini(void); diff --git a/src/ipcpd/unicast/enroll.c b/src/ipcpd/unicast/enroll.c deleted file mode 100644 index 500a3895..00000000 --- a/src/ipcpd/unicast/enroll.c +++ /dev/null @@ -1,3 +0,0 @@ -#define BUILD_IPCP_UNICAST - -#include "common/enroll.c" diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index 7d5e6549..3631fd7b 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Flow allocator of the IPC Process * @@ -31,6 +31,7 @@ #define FA "flow-allocator" #define OUROBOROS_PREFIX FA +#include <ouroboros/endian.h> #include <ouroboros/logs.h> #include <ouroboros/fqueue.h> #include <ouroboros/errno.h> @@ -55,7 +56,7 @@ #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -#define TIMEOUT 10000 /* nanoseconds */ +#define TIMEOUT 10 * MILLION /* nanoseconds */ #define FLOW_REQ 0 #define FLOW_REPLY 1 @@ -358,7 +359,7 @@ static void packet_handler(int fd, if (dt_write_packet(r_addr, qc, r_eid, sdb)) { ipcp_sdb_release(sdb); - log_warn("Failed to forward packet."); + log_dbg("Failed to forward packet."); #ifdef IPCP_FLOW_STATS pthread_rwlock_wrlock(&fa.flows_lock); ++flow->p_snd_f; @@ -455,7 +456,7 @@ static size_t fa_wait_for_fa_msg(struct fa_msg * msg) len = shm_du_buff_len(cmd->sdb); if (len > MSGBUFSZ || len < sizeof(*msg)) { - log_warn("Invalid flow allocation message (len: %zd)\n", len); + log_warn("Invalid flow allocation message (len: %zd).", len); free(cmd); return 0; /* No valid message */ } @@ -469,81 +470,6 @@ static size_t fa_wait_for_fa_msg(struct fa_msg * msg) return len; } -static int fa_wait_irmd_alloc(uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len) -{ - struct timespec ts = {0, TIMEOUT * 1000}; - struct timespec abstime; - int fd; - time_t mpl = IPCP_UNICAST_MPL; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_dbg("Won't allocate over non-operational IPCP."); - return -EIPCPSTATE; - } - - assert(ipcpi.alloc_id == -1); - - fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len); - if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_dbg("Failed to get fd for flow."); - return -ENOTALLOC; - } - - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - - return fd; -} - -static int fa_wait_irmd_alloc_resp(int fd) -{ - struct timespec ts = {0, TIMEOUT * 1000}; - struct timespec abstime; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; - } - - assert(ipcpi.alloc_id == fd); - - ipcpi.alloc_id = -1; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - - return 0; -} - static int fa_handle_flow_req(struct fa_msg * msg, size_t len) { @@ -551,8 +477,8 @@ static int fa_handle_flow_req(struct fa_msg * msg, int fd; qosspec_t qs; struct fa_flow * flow; - uint8_t * data; /* Piggbacked data on flow alloc request. */ - size_t dlen; /* Length of piggybacked data. */ + uint8_t * dst; + buffer_t data; /* Piggbacked data on flow alloc request. */ msg_len = sizeof(*msg) + ipcp_dir_hash_len(); if (len < msg_len) { @@ -560,8 +486,9 @@ static int fa_handle_flow_req(struct fa_msg * msg, return -EPERM; } - data = (uint8_t *) msg + msg_len; - dlen = len - msg_len; + dst = (uint8_t *)(msg + 1); + data.data = (uint8_t *) msg + msg_len; + data.len = len - msg_len; qs.delay = ntoh32(msg->delay); qs.bandwidth = ntoh64(msg->bandwidth); @@ -573,7 +500,7 @@ static int fa_handle_flow_req(struct fa_msg * msg, qs.cypher_s = ntoh16(msg->cypher_s); qs.timeout = ntoh32(msg->timeout); - fd = fa_wait_irmd_alloc((uint8_t *) (msg + 1), qs, data, dlen); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, &data); if (fd < 0) return fd; @@ -597,20 +524,21 @@ static int fa_handle_flow_reply(struct fa_msg * msg, { int fd; struct fa_flow * flow; - uint8_t * data; /* Piggbacked data on flow alloc request. */ - size_t dlen; /* Length of piggybacked data. */ + buffer_t data; /* Piggbacked data on flow alloc request. */ time_t mpl = IPCP_UNICAST_MPL; assert(len >= sizeof(*msg)); - data = (uint8_t *) msg + sizeof(*msg); - dlen = len - sizeof(*msg); + data.data = (uint8_t *) msg + sizeof(*msg); + data.len = len - sizeof(*msg); pthread_rwlock_wrlock(&fa.flows_lock); fd = eid_to_fd(ntoh64(msg->r_eid)); if (fd < 0) { pthread_rwlock_unlock(&fa.flows_lock); + log_err("Flow reply for unknown EID %" PRIu64 ".", + ntoh64(msg->r_eid)); return -ENOTALLOC; } @@ -625,8 +553,10 @@ static int fa_handle_flow_reply(struct fa_msg * msg, pthread_rwlock_unlock(&fa.flows_lock); - if (ipcp_flow_alloc_reply(fd, msg->response, mpl, data, dlen)) + if (ipcp_flow_alloc_reply(fd, msg->response, mpl, &data) < 0) { + log_err("Failed to reply for flow allocation on fd %d.", fd); return -EIRMD; + } return 0; } @@ -645,6 +575,8 @@ static int fa_handle_flow_update(struct fa_msg * msg, fd = eid_to_fd(ntoh64(msg->r_eid)); if (fd < 0) { pthread_rwlock_unlock(&fa.flows_lock); + log_err("Flow update for unknown EID %" PRIu64 ".", + ntoh64(msg->r_eid)); return -EPERM; } @@ -737,7 +669,7 @@ int fa_init(void) fail_mtx: pthread_rwlock_destroy(&fa.flows_lock); fail_rwlock: - log_err("Failed to initialize flow allocator."); + return -1; } @@ -793,7 +725,6 @@ int fa_start(void) fail_thread: psched_destroy(fa.psched); fail_psched: - log_err("Failed to start flow allocator."); return -1; } @@ -805,11 +736,10 @@ void fa_stop(void) psched_destroy(fa.psched); } -int fa_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t dlen) +int fa_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { struct fa_msg * msg; struct shm_du_buff * sdb; @@ -825,7 +755,7 @@ int fa_alloc(int fd, len = sizeof(*msg) + ipcp_dir_hash_len(); - if (ipcp_sdb_reserve(&sdb, len + dlen)) + if (ipcp_sdb_reserve(&sdb, len + data->len)) return -1; msg = (struct fa_msg *) shm_du_buff_head(sdb); @@ -847,10 +777,11 @@ int fa_alloc(int fd, msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); - if (dlen > 0) - memcpy(shm_du_buff_head(sdb) + len, data, dlen); + if (data->len > 0) + memcpy(shm_du_buff_head(sdb) + len, data->data, data->len); if (dt_write_packet(addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow allocation request packet."); ipcp_sdb_release(sdb); return -1; } @@ -868,10 +799,9 @@ int fa_alloc(int fd, return 0; } -int fa_alloc_resp(int fd, - int response, - const void * data, - size_t len) +int fa_alloc_resp(int fd, + int response, + const buffer_t * data) { struct fa_msg * msg; struct shm_du_buff * sdb; @@ -880,44 +810,55 @@ int fa_alloc_resp(int fd, flow = &fa.flows[fd]; - if (fa_wait_irmd_alloc_resp(fd) < 0) - return -1; + if (ipcp_wait_flow_resp(fd) < 0) { + log_err("Failed to wait for flow response."); + goto fail_alloc_resp; + } - if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) { - fa_flow_fini(flow); - return -1; + if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + data->len)) { + log_err("Failed to reserve sdb (%zu bytes).", + sizeof(*msg) + data->len); + goto fail_reserve; } msg = (struct fa_msg *) shm_du_buff_head(sdb); memset(msg, 0, sizeof(*msg)); - pthread_rwlock_wrlock(&fa.flows_lock); - msg->code = FLOW_REPLY; + msg->response = response; + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); + + pthread_rwlock_rdlock(&fa.flows_lock); + msg->r_eid = hton64(flow->r_eid); msg->s_eid = hton64(flow->s_eid); - msg->response = response; - if (len > 0) - memcpy(msg + 1, data, len); + pthread_rwlock_unlock(&fa.flows_lock); + + if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow allocation response packet."); + goto fail_packet; + } if (response < 0) { + pthread_rwlock_rdlock(&fa.flows_lock); fa_flow_fini(flow); - ipcp_sdb_release(sdb); + pthread_rwlock_unlock(&fa.flows_lock); } else { psched_add(fa.psched, fd); } - if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) { - fa_flow_fini(flow); - pthread_rwlock_unlock(&fa.flows_lock); - ipcp_sdb_release(sdb); - return -1; - } + return 0; + fail_packet: + ipcp_sdb_release(sdb); + fail_reserve: + pthread_rwlock_wrlock(&fa.flows_lock); + fa_flow_fini(flow); pthread_rwlock_unlock(&fa.flows_lock); - - return 0; + fail_alloc_resp: + return -1; } int fa_dealloc(int fd) @@ -933,7 +874,7 @@ int fa_dealloc(int fd) pthread_rwlock_unlock(&fa.flows_lock); - flow_dealloc(fd); + ipcp_flow_dealloc(fd); return 0; } @@ -948,6 +889,7 @@ static int fa_update_remote(int fd, uint64_t r_addr; if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) { + log_err("Failed to reserve sdb (%zu bytes).", sizeof(*msg)); return -1; } @@ -971,6 +913,7 @@ static int fa_update_remote(int fd, if (dt_write_packet(r_addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow update packet."); ipcp_sdb_release(sdb); return -1; } @@ -995,6 +938,7 @@ void fa_np1_rcv(uint64_t eid, fd = eid_to_fd(eid); if (fd < 0) { pthread_rwlock_unlock(&fa.flows_lock); + log_dbg("Received packet for unknown EID %" PRIu64 ".", eid); ipcp_sdb_release(sdb); return; } @@ -1010,6 +954,7 @@ void fa_np1_rcv(uint64_t eid, pthread_rwlock_unlock(&fa.flows_lock); if (ipcp_flow_write(fd, sdb) < 0) { + log_dbg("Failed to write to flow %d.", fd); ipcp_sdb_release(sdb); #ifdef IPCP_FLOW_STATS pthread_rwlock_wrlock(&fa.flows_lock); diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h index 1d4200ec..1e716966 100644 --- a/src/ipcpd/unicast/fa.h +++ b/src/ipcpd/unicast/fa.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Flow allocator of the IPC Process * @@ -34,16 +34,14 @@ int fa_start(void); void fa_stop(void); -int fa_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len); +int fa_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data); -int fa_alloc_resp(int fd, - int response, - const void * data, - size_t len); +int fa_alloc_resp(int fd, + int response, + const buffer_t * data); int fa_dealloc(int fd); diff --git a/src/ipcpd/unicast/main.c b/src/ipcpd/unicast/main.c index b56c218c..e6cb2994 100644 --- a/src/ipcpd/unicast/main.c +++ b/src/ipcpd/unicast/main.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Unicast IPC Process * @@ -32,12 +32,12 @@ #define THIS_TYPE IPCP_UNICAST #include <ouroboros/errno.h> -#include <ouroboros/hash.h> #include <ouroboros/ipcp-dev.h> #include <ouroboros/logs.h> #include <ouroboros/notifier.h> +#include <ouroboros/random.h> #include <ouroboros/rib.h> -#include <ouroboros/time_utils.h> +#include <ouroboros/time.h> #include "common/connmgr.h" #include "common/enroll.h" @@ -59,18 +59,13 @@ struct ipcp ipcpi; static int initialize_components(const struct ipcp_config * conf) { - ipcpi.layer_name = strdup(conf->layer_info.layer_name); - if (ipcpi.layer_name == NULL) { - log_err("Failed to set layer name."); - goto fail_layer_name; - } - - ipcpi.dir_hash_algo = conf->layer_info.dir_hash_algo; + strcpy(ipcpi.layer_name, conf->layer_info.name); + ipcpi.dir_hash_algo = (enum hash_algo) conf->layer_info.dir_hash_algo; assert(ipcp_dir_hash_len() != 0); - if (addr_auth_init(conf->addr_auth_type, - &conf->addr_size)) { + if (addr_auth_init(conf->unicast.addr_auth_type, + &conf->unicast.dt.addr_size)) { log_err("Failed to init address authority."); goto fail_addr_auth; } @@ -81,17 +76,14 @@ static int initialize_components(const struct ipcp_config * conf) goto fail_addr_auth; } - log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr); + log_info("IPCP got address %" PRIu64 ".", ipcpi.dt_addr); - if (ca_init(conf->cong_avoid)) { + if (ca_init(conf->unicast.cong_avoid)) { log_err("Failed to initialize congestion avoidance."); goto fail_ca; } - if (dt_init(conf->routing_type, - conf->addr_size, - conf->eid_size, - conf->max_ttl)) { + if (dt_init(conf->unicast.dt)) { log_err("Failed to initialize data transfer component."); goto fail_dt; } @@ -119,8 +111,6 @@ static int initialize_components(const struct ipcp_config * conf) fail_ca: addr_auth_fini(); fail_addr_auth: - free(ipcpi.layer_name); - fail_layer_name: return -1; } @@ -135,32 +125,26 @@ static void finalize_components(void) ca_fini(); addr_auth_fini(); - - free(ipcpi.layer_name); } static int start_components(void) { - assert(ipcp_get_state() == IPCP_INIT); - - ipcp_set_state(IPCP_OPERATIONAL); - - if (dt_start()) { + if (dt_start() < 0) { log_err("Failed to start data transfer."); goto fail_dt_start; } - if (fa_start()) { + if (fa_start() < 0) { log_err("Failed to start flow allocator."); goto fail_fa_start; } - if (enroll_start()) { + if (enroll_start() < 0) { log_err("Failed to start enrollment."); goto fail_enroll_start; } - if (connmgr_start()) { + if (connmgr_start() < 0) { log_err("Failed to start AP connection manager."); goto fail_connmgr_start; } @@ -180,9 +164,6 @@ static int start_components(void) static void stop_components(void) { - assert(ipcp_get_state() == IPCP_OPERATIONAL || - ipcp_get_state() == IPCP_SHUTDOWN); - connmgr_stop(); enroll_stop(); @@ -208,38 +189,46 @@ static int unicast_ipcp_enroll(const char * dst, struct layer_info * info) { struct conn conn; + uint8_t id[ENROLL_ID_LEN]; + + if (random_buffer(id, ENROLL_ID_LEN) < 0) { + log_err("Failed to generate enrollment ID."); + goto fail_id; + } - if (connmgr_alloc(COMPID_ENROLL, dst, NULL, &conn)) { - log_err("Failed to get connection."); - goto fail_er_flow; + log_info_id(id, "Requesting enrollment."); + + if (connmgr_alloc(COMPID_ENROLL, dst, NULL, &conn) < 0) { + log_err_id(id, "Failed to get connection."); + goto fail_id; } /* Get boot state from peer. */ - if (enroll_boot(&conn)) { - log_err("Failed to get boot information."); + if (enroll_boot(&conn, id) < 0) { + log_err_id(id, "Failed to get boot information."); goto fail_enroll_boot; } - if (initialize_components(enroll_get_conf())) { - log_err("Failed to initialize IPCP components."); + if (initialize_components(enroll_get_conf()) < 0) { + log_err_id(id, "Failed to initialize components."); goto fail_enroll_boot; } - if (start_components()) { - log_err("Failed to start components."); + if (start_components() < 0) { + log_err_id(id, "Failed to start components."); goto fail_start_comp; } - if (enroll_done(&conn, 0)) - log_warn("Failed to confirm enrollment with peer."); + if (enroll_ack(&conn, id, 0) < 0) + log_err_id(id, "Failed to confirm enrollment."); - if (connmgr_dealloc(COMPID_ENROLL, &conn)) - log_warn("Failed to deallocate enrollment flow."); + if (connmgr_dealloc(COMPID_ENROLL, &conn) < 0) + log_warn_id(id, "Failed to dealloc enrollment flow."); - log_info("Enrolled with %s.", dst); + log_info_id(id, "Enrolled with %s.", dst); - info->dir_hash_algo = ipcpi.dir_hash_algo; - strcpy(info->layer_name, ipcpi.layer_name); + info->dir_hash_algo = (enum pol_dir_hash) ipcpi.dir_hash_algo; + strcpy(info->name, ipcpi.layer_name); return 0; @@ -247,7 +236,7 @@ static int unicast_ipcp_enroll(const char * dst, finalize_components(); fail_enroll_boot: connmgr_dealloc(COMPID_ENROLL, &conn); - fail_er_flow: + fail_id: return -1; } @@ -258,23 +247,21 @@ static int unicast_ipcp_bootstrap(const struct ipcp_config * conf) enroll_bootstrap(conf); - if (initialize_components(conf)) { + if (initialize_components(conf) < 0) { log_err("Failed to init IPCP components."); goto fail_init; } - if (start_components()) { + if (start_components() < 0) { log_err("Failed to init IPCP components."); goto fail_start; } - if (bootstrap_components()) { + if (bootstrap_components() < 0) { log_err("Failed to bootstrap IPCP components."); goto fail_bootstrap; } - log_dbg("Bootstrapped in layer %s.", conf->layer_info.layer_name); - return 0; fail_bootstrap: @@ -312,39 +299,35 @@ int main(int argc, goto fail_init; } - if (notifier_init()) { + if (notifier_init() < 0) { log_err("Failed to initialize notifier component."); goto fail_notifier_init; } - if (connmgr_init()) { + if (connmgr_init() < 0) { log_err("Failed to initialize connection manager."); goto fail_connmgr_init; } - if (enroll_init()) { + if (enroll_init() < 0) { log_err("Failed to initialize enrollment component."); goto fail_enroll_init; } - if (ipcp_boot() < 0) { - log_err("Failed to boot IPCP."); - goto fail_boot; - } - - if (ipcp_create_r(0)) { - log_err("Failed to notify IRMd we are initialized."); - ipcp_set_state(IPCP_NULL); - goto fail_create_r; + if (ipcp_start() < 0) { + log_err("Failed to start IPCP."); + goto fail_start; } - ipcp_shutdown(); + ipcp_sigwait(); if (ipcp_get_state() == IPCP_SHUTDOWN) { stop_components(); finalize_components(); } + ipcp_stop(); + enroll_fini(); connmgr_fini(); @@ -355,17 +338,14 @@ int main(int argc, exit(EXIT_SUCCESS); - fail_create_r: - ipcp_shutdown(); - fail_boot: + fail_start: enroll_fini(); fail_enroll_init: connmgr_fini(); fail_connmgr_init: notifier_fini(); fail_notifier_init: - ipcp_fini(); + ipcp_fini(); fail_init: - ipcp_create_r(-1); exit(EXIT_FAILURE); } diff --git a/src/ipcpd/unicast/pff.c b/src/ipcpd/unicast/pff.c index 362b5177..9b2aa2b4 100644 --- a/src/ipcpd/unicast/pff.c +++ b/src/ipcpd/unicast/pff.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * PDU Forwarding Function * @@ -59,8 +59,10 @@ struct pff * pff_create(enum pol_pff pol) } pff->pff_i = pff->ops->create(); - if (pff->pff_i == NULL) + if (pff->pff_i == NULL) { + log_err("Failed to create PFF instance."); goto err; + } return pff; err: diff --git a/src/ipcpd/unicast/pff.h b/src/ipcpd/unicast/pff.h index b9fbf298..f44e5531 100644 --- a/src/ipcpd/unicast/pff.h +++ b/src/ipcpd/unicast/pff.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * PDU Forwarding Function * diff --git a/src/ipcpd/unicast/pff/alternate.c b/src/ipcpd/unicast/pff/alternate.c index d0148674..85e85914 100644 --- a/src/ipcpd/unicast/pff/alternate.c +++ b/src/ipcpd/unicast/pff/alternate.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Policy for PFF with alternate next hops * diff --git a/src/ipcpd/unicast/pff/alternate.h b/src/ipcpd/unicast/pff/alternate.h index 49ed89fd..96207e74 100644 --- a/src/ipcpd/unicast/pff/alternate.h +++ b/src/ipcpd/unicast/pff/alternate.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Policy for PFF with alternate next hops * diff --git a/src/ipcpd/unicast/pff/multipath.c b/src/ipcpd/unicast/pff/multipath.c index b20133ec..cbab0f5f 100644 --- a/src/ipcpd/unicast/pff/multipath.c +++ b/src/ipcpd/unicast/pff/multipath.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Policy for PFF supporting multipath routing * diff --git a/src/ipcpd/unicast/pff/multipath.h b/src/ipcpd/unicast/pff/multipath.h index 048ed761..0eb03476 100644 --- a/src/ipcpd/unicast/pff/multipath.h +++ b/src/ipcpd/unicast/pff/multipath.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Policy for PFF supporting multipath routing * diff --git a/src/ipcpd/unicast/pff/ops.h b/src/ipcpd/unicast/pff/ops.h index 1db227aa..16a31273 100644 --- a/src/ipcpd/unicast/pff/ops.h +++ b/src/ipcpd/unicast/pff/ops.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Pff policy ops * diff --git a/src/ipcpd/unicast/pff/pft.c b/src/ipcpd/unicast/pff/pft.c index 9c5d941d..8c436113 100644 --- a/src/ipcpd/unicast/pff/pft.c +++ b/src/ipcpd/unicast/pff/pft.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Packet forwarding table (PFT) with chaining on collisions * diff --git a/src/ipcpd/unicast/pff/pft.h b/src/ipcpd/unicast/pff/pft.h index 9093d73e..711dabcb 100644 --- a/src/ipcpd/unicast/pff/pft.h +++ b/src/ipcpd/unicast/pff/pft.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Packet forwarding table (PFT) with chaining on collisions * diff --git a/src/ipcpd/unicast/pff/pol.h b/src/ipcpd/unicast/pff/pol.h index 046505a0..245b03c4 100644 --- a/src/ipcpd/unicast/pff/pol.h +++ b/src/ipcpd/unicast/pff/pol.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * PDU Forwarding Function policies * diff --git a/src/ipcpd/unicast/pff/simple.c b/src/ipcpd/unicast/pff/simple.c index 36d657ce..5f95e3ce 100644 --- a/src/ipcpd/unicast/pff/simple.c +++ b/src/ipcpd/unicast/pff/simple.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Simple PDU Forwarding Function * diff --git a/src/ipcpd/unicast/pff/simple.h b/src/ipcpd/unicast/pff/simple.h index a7e4f799..0966a186 100644 --- a/src/ipcpd/unicast/pff/simple.h +++ b/src/ipcpd/unicast/pff/simple.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Simple policy for PFF * diff --git a/src/ipcpd/unicast/pff/tests/pft_test.c b/src/ipcpd/unicast/pff/tests/pft_test.c index 00442f36..18287fb8 100644 --- a/src/ipcpd/unicast/pff/tests/pft_test.c +++ b/src/ipcpd/unicast/pff/tests/pft_test.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Test of the hash table * diff --git a/src/ipcpd/unicast/psched.c b/src/ipcpd/unicast/psched.c index 8e15945a..7e12148b 100644 --- a/src/ipcpd/unicast/psched.c +++ b/src/ipcpd/unicast/psched.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Packet scheduler component * diff --git a/src/ipcpd/unicast/psched.h b/src/ipcpd/unicast/psched.h index da4aa980..831f8084 100644 --- a/src/ipcpd/unicast/psched.h +++ b/src/ipcpd/unicast/psched.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Packet scheduler component * diff --git a/src/ipcpd/unicast/routing.c b/src/ipcpd/unicast/routing.c index da228c2f..f5417c24 100644 --- a/src/ipcpd/unicast/routing.c +++ b/src/ipcpd/unicast/routing.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Routing component of the IPCP * diff --git a/src/ipcpd/unicast/routing.h b/src/ipcpd/unicast/routing.h index 717fbed9..d5d833ae 100644 --- a/src/ipcpd/unicast/routing.h +++ b/src/ipcpd/unicast/routing.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Routing component of the IPCP * diff --git a/src/ipcpd/unicast/routing/graph.c b/src/ipcpd/unicast/routing/graph.c index b04b99a7..32f3e6fb 100644 --- a/src/ipcpd/unicast/routing/graph.c +++ b/src/ipcpd/unicast/routing/graph.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Undirected graph structure * diff --git a/src/ipcpd/unicast/routing/graph.h b/src/ipcpd/unicast/routing/graph.h index aaf1d947..8190cc6c 100644 --- a/src/ipcpd/unicast/routing/graph.h +++ b/src/ipcpd/unicast/routing/graph.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Undirected graph structure * diff --git a/src/ipcpd/unicast/routing/link-state.c b/src/ipcpd/unicast/routing/link-state.c index 6d386e29..57c0c7cb 100644 --- a/src/ipcpd/unicast/routing/link-state.c +++ b/src/ipcpd/unicast/routing/link-state.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Link state routing policy * diff --git a/src/ipcpd/unicast/routing/link-state.h b/src/ipcpd/unicast/routing/link-state.h index dc531b03..d77d72df 100644 --- a/src/ipcpd/unicast/routing/link-state.h +++ b/src/ipcpd/unicast/routing/link-state.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Link state routing policy * diff --git a/src/ipcpd/unicast/routing/ops.h b/src/ipcpd/unicast/routing/ops.h index cf885566..8a79b7ec 100644 --- a/src/ipcpd/unicast/routing/ops.h +++ b/src/ipcpd/unicast/routing/ops.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Routing policy ops * diff --git a/src/ipcpd/unicast/routing/pol.h b/src/ipcpd/unicast/routing/pol.h index 35250d5f..b6a6f150 100644 --- a/src/ipcpd/unicast/routing/pol.h +++ b/src/ipcpd/unicast/routing/pol.h @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Routing policies * diff --git a/src/ipcpd/unicast/routing/tests/graph_test.c b/src/ipcpd/unicast/routing/tests/graph_test.c index aecd4262..d805640c 100644 --- a/src/ipcpd/unicast/routing/tests/graph_test.c +++ b/src/ipcpd/unicast/routing/tests/graph_test.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Test of the graph structure * |