| field | value | date |
|---|---|---|
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2019-07-25 12:50:46 +0200 |
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2019-07-29 19:36:45 +0200 |
| commit | dae15c284248d49079ad5f8a3d8ff30e217f419e (patch) | |
| tree | ea7942e940396c0c78304fef8b43fb25c5aebba8 /src/ipcpd/unicast | |
| parent | c9232acef855b51d1bc199a68c03c0695ac11192 (diff) | |
| download | ouroboros-dae15c284248d49079ad5f8a3d8ff30e217f419e.tar.gz ouroboros-dae15c284248d49079ad5f8a3d8ff30e217f419e.zip | |
build: Refactor normal to unicast
This completes the renaming of the normal IPCP to the unicast IPCP in
the sources, to get everything consistent with the documentation.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/ipcpd/unicast')
41 files changed, 10036 insertions, 0 deletions
diff --git a/src/ipcpd/unicast/CMakeLists.txt b/src/ipcpd/unicast/CMakeLists.txt
new file mode 100644
index 00000000..c9344f89
--- /dev/null
+++ b/src/ipcpd/unicast/CMakeLists.txt
@@ -0,0 +1,69 @@
+get_filename_component(CURRENT_SOURCE_PARENT_DIR
+  ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
+get_filename_component(CURRENT_BINARY_PARENT_DIR
+  ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY)
+
+include_directories(${CMAKE_CURRENT_SOURCE_DIR})
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+include_directories(${CURRENT_SOURCE_PARENT_DIR})
+include_directories(${CURRENT_BINARY_PARENT_DIR})
+
+include_directories(${CMAKE_SOURCE_DIR}/include)
+include_directories(${CMAKE_BINARY_DIR}/include)
+
+set(IPCP_UNICAST_TARGET ipcpd-unicast CACHE INTERNAL "")
+
+protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS kademlia.proto)
+
+math(EXPR PFT_EXPR "1 << 12")
+set(PFT_SIZE ${PFT_EXPR} CACHE STRING
+  "Size of the PDU forwarding table")
+if (HAVE_FUSE)
+  set(IPCP_FLOW_STATS TRUE CACHE BOOL
+    "Enable flow statistics tracking in IPCP")
+    if (IPCP_FLOW_STATS)
+       message(STATUS "IPCP flow statistics enabled")
+    else ()
+       message(STATUS "IPCP flow statistics disabled")
+    endif ()
+endif ()
+
+
+set(SOURCE_FILES
+  # Add source files here
+  addr_auth.c
+  connmgr.c
+  dht.c
+  dir.c
+  dt.c
+  enroll.c
+  fa.c
+  main.c
+  pff.c
+  routing.c
+  psched.c
+  # Add policies last
+  pol/alternate_pff.c
+  pol/flat.c
+  pol/link_state.c
+  pol/graph.c
+  pol/simple_pff.c
+  )
+
+add_executable(ipcpd-unicast ${SOURCE_FILES} ${IPCP_SOURCES}
+  ${KAD_PROTO_SRCS} ${LAYER_CONFIG_PROTO_SRCS})
+target_link_libraries(ipcpd-unicast LINK_PUBLIC ouroboros-dev)
+
+include(AddCompileFlags)
+if (CMAKE_BUILD_TYPE MATCHES "Debug*")
+  add_compile_flags(ipcpd-unicast -DCONFIG_OUROBOROS_DEBUG)
+endif ()
+
+install(TARGETS ipcpd-unicast RUNTIME DESTINATION ${CMAKE_INSTALL_SBINDIR})
+
+add_subdirectory(pol/tests)
+
+if (NOT GNU)
+  add_subdirectory(tests)
+endif ()
diff --git a/src/ipcpd/unicast/addr_auth.c b/src/ipcpd/unicast/addr_auth.c
new file mode 100644
index 00000000..50c56055
--- /dev/null
+++ b/src/ipcpd/unicast/addr_auth.c
@@ -0,0 +1,58 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * Address authority
+ *
+ *    Dimitri Staessens <dimitri.staessens@ugent.be>
+ *    Sander Vrijders   <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define OUROBOROS_PREFIX "addr_auth"
+
+#include <ouroboros/logs.h>
+
+#include "addr_auth.h"
+#include "pol-addr-auth-ops.h"
+#include "pol/flat.h"
+
+#include <stdlib.h>
+
+struct pol_addr_auth_ops * ops;
+
+int addr_auth_init(enum pol_addr_auth type,
+                   const void *       info)
+{
+        switch (type) {
+        case ADDR_AUTH_FLAT_RANDOM:
+                ops = &flat_ops;
+                break;
+        default:
+                log_err("Unknown address authority type.");
+                return -1;
+        }
+
+        return ops->init(info);
+}
+
+uint64_t addr_auth_address(void)
+{
+        return ops->address();
+}
+
+int addr_auth_fini(void)
+{
+        return ops->fini();
+}
diff --git a/src/ipcpd/unicast/addr_auth.h b/src/ipcpd/unicast/addr_auth.h
new file mode 100644
index 00000000..cf1509ab
--- /dev/null
+++ b/src/ipcpd/unicast/addr_auth.h
@@ -0,0 +1,37 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * Address authority
+ *
+ *    Dimitri Staessens <dimitri.staessens@ugent.be>
+ *    Sander Vrijders   <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_ADDR_AUTH_H
+#define OUROBOROS_IPCPD_UNICAST_ADDR_AUTH_H
+
+#include <ouroboros/ipcp.h>
+
+#include <stdint.h>
+
+int      addr_auth_init(enum pol_addr_auth type,
+                        const void *       info);
+
+int      addr_auth_fini(void);
+
+uint64_t addr_auth_address(void);
+
+#endif /* OUROBOROS_IPCPD_UNICAST_ADDR_AUTH_H */
diff --git a/src/ipcpd/unicast/comp.h b/src/ipcpd/unicast/comp.h
new file mode 100644
index 00000000..b8294c3b
--- /dev/null
+++ b/src/ipcpd/unicast/comp.h
@@ -0,0 +1,48 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * Components for the unicast IPC process
+ *
+ *    Dimitri Staessens <dimitri.staessens@ugent.be>
+ *    Sander Vrijders   <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_COMP_H
+#define OUROBOROS_IPCPD_UNICAST_COMP_H
+
+#include <ouroboros/cacep.h>
+
+#include "dt.h"
+
+#define DST_MAX_STRLEN 64
+
+enum comp_id {
+        COMPID_DT = 0,
+        COMPID_ENROLL,
+        COMPID_MGMT,
+        COMPID_MAX
+};
+
+struct conn {
+        struct conn_info conn_info;
+        struct {
+                char      dst[DST_MAX_STRLEN + 1];
+                int       fd;
+                qosspec_t qs;
+        } flow_info;
+};
+
+#endif /* OUROBOROS_IPCPD_UNICAST_COMP_H */
diff --git a/src/ipcpd/unicast/connmgr.c b/src/ipcpd/unicast/connmgr.c
new file mode 100644
index 00000000..7c1280c3
--- /dev/null
+++ b/src/ipcpd/unicast/connmgr.c
@@ -0,0 +1,525 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * Handles connections between components
+ *
+ *    Dimitri Staessens <dimitri.staessens@ugent.be>
+ *    Sander Vrijders   <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#define OUROBOROS_PREFIX "connection-manager"
+
+#include <ouroboros/dev.h>
+#include <ouroboros/cacep.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/list.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/notifier.h>
+
+#include "comp.h"
+#include "connmgr.h"
+#include "dir.h"
+#include "enroll.h"
+#include "ipcp.h"
+
+#include <pthread.h>
+#include <string.h>
+#include <stdlib.h>
+#include <assert.h>
+
+enum connmgr_state {
+        CONNMGR_NULL = 0,
+        CONNMGR_INIT,
+        CONNMGR_RUNNING
+};
+
+struct conn_el {
+        struct list_head next;
+        struct conn      conn;
+};
+
+struct comp {
+        struct conn_info info;
+
+        struct list_head conns;
+        struct list_head pending;
+
+        pthread_cond_t   cond;
+        pthread_mutex_t  lock;
+};
+
+struct {
+        struct comp        comps[COMPID_MAX];
+        enum connmgr_state state;
+
+        pthread_t          acceptor;
+} connmgr;
+
+static int get_id_by_name(const char * name)
+{
+        enum comp_id i;
+
+        for (i = 0; i < COMPID_MAX; ++i)
+                if (strcmp(name, connmgr.comps[i].info.comp_name) == 0)
+                        return i;
+
+        return -1;
+}
+
+static int get_conn_by_fd(int           fd,
+                          enum comp_id  id,
+                          struct conn * conn)
+{
+        struct list_head * p;
+
+        pthread_mutex_lock(&connmgr.comps[id].lock);
+
+        list_for_each(p, &connmgr.comps[id].conns) {
+                struct conn_el * c =
+                        list_entry(p, struct conn_el, next);
+                if (c->conn.flow_info.fd == fd) {
+                        *conn = c->conn;
+                        pthread_mutex_unlock(&connmgr.comps[id].lock);
+                        return 0;
+                }
+        }
+
+        pthread_mutex_unlock(&connmgr.comps[id].lock);
+
+        return -1;
+}
+
+static int add_comp_conn(enum comp_id       id,
+                         int                fd,
+                         qosspec_t          qs,
+                         struct conn_info * rcv_info)
+{
+        struct conn_el * el;
+
+        el = malloc(sizeof(*el));
+        if (el == NULL) {
+                log_err("Not enough memory.");
+                return -1;
+        }
+
+        el->conn.conn_info    = *rcv_info;
+        el->conn.flow_info.fd = fd;
+        el->conn.flow_info.qs = qs;
+
+        pthread_mutex_lock(&connmgr.comps[id].lock);
+
+        list_add(&el->next, &connmgr.comps[id].pending);
+        pthread_cond_signal(&connmgr.comps[id].cond);
+
+        pthread_mutex_unlock(&connmgr.comps[id].lock);
+
+        return 0;
+}
+
+static void * flow_acceptor(void * o)
+{
+        int               fd;
+        qosspec_t         qs;
+        struct conn_info  rcv_info;
+        struct conn_info  fail_info;
+
+        (void) o;
+
+        memset(&fail_info, 0, sizeof(fail_info));
+
+        while (true) {
+                int id;
+
+                fd = flow_accept(&qs, NULL);
+                if (fd < 0) {
+                        if (fd != -EIRMD)
+                                log_warn("Flow accept failed: %d", fd);
+                        continue;
+                }
+
+                if (cacep_rcv(fd, &rcv_info)) {
+                        log_dbg("Error establishing application connection.");
+                        flow_dealloc(fd);
+                        continue;
+                }
+
+                id = get_id_by_name(rcv_info.comp_name);
+                if (id < 0) {
+                        log_dbg("Connection request for unknown component %s.",
+                                rcv_info.comp_name);
+                        cacep_snd(fd, &fail_info);
+                        flow_dealloc(fd);
+                        continue;
+                }
+
+                assert(id < COMPID_MAX);
+
+                if (cacep_snd(fd, &connmgr.comps[id].info)) {
+                        log_dbg("Failed to respond to request.");
+                        flow_dealloc(fd);
+                        continue;
+                }
+
+                if (add_comp_conn(id, fd, qs, &rcv_info)) {
+                        log_dbg("Failed to add new connection.");
+                        flow_dealloc(fd);
+                        continue;
+                }
+        }
+
+        return (void *) 0;
+}
+
+static void handle_event(void *       self,
+                         int          event,
+                         const void * o)
+{
+        struct conn conn;
+
+        (void) self;
+
+        if (!(event == NOTIFY_DT_FLOW_UP ||
+              event == NOTIFY_DT_FLOW_DOWN ||
+              event == NOTIFY_DT_FLOW_DEALLOC))
+                return;
+
+        if (get_conn_by_fd(*((int *) o), COMPID_DT, &conn))
+                return;
+
+        switch (event) {
+        case NOTIFY_DT_FLOW_UP:
+                notifier_event(NOTIFY_DT_CONN_UP, &conn);
+                break;
+        case NOTIFY_DT_FLOW_DOWN:
+                notifier_event(NOTIFY_DT_CONN_DOWN, &conn);
+                break;
+        case NOTIFY_DT_FLOW_DEALLOC:
+                notifier_event(NOTIFY_DT_CONN_DEL, &conn);
+                break;
+        default:
+                break;
+        }
+}
+
+int connmgr_init(void)
+{
+        connmgr.state = CONNMGR_INIT;
+
+        if (notifier_reg(handle_event, NULL))
+                return -1;
+
+        return 0;
+}
+
+void connmgr_fini(void)
+{
+        int i;
+
+        notifier_unreg(handle_event);
+
+        if (connmgr.state == CONNMGR_RUNNING)
+                pthread_join(connmgr.acceptor, NULL);
+
+        for (i = 0; i < COMPID_MAX; ++i)
+                connmgr_comp_fini(i);
+}
+
+int connmgr_start(void)
+{
+        if (pthread_create(&connmgr.acceptor, NULL, flow_acceptor, NULL))
+                return -1;
+
+        connmgr.state = CONNMGR_RUNNING;
+
+        return 0;
+}
+
+void connmgr_stop(void)
+{
+        if (connmgr.state == CONNMGR_RUNNING)
+                pthread_cancel(connmgr.acceptor);
+}
+
+int connmgr_comp_init(enum comp_id             id,
+                      const struct conn_info * info)
+{
+        struct comp * comp;
+
+        assert(id >= 0 && id < COMPID_MAX);
+
+        comp = connmgr.comps + id;
+
+        if (pthread_mutex_init(&comp->lock, NULL))
+                return -1;
+
+        if (pthread_cond_init(&comp->cond, NULL)) {
+                pthread_mutex_destroy(&comp->lock);
+                return -1;
+        }
+
+        list_head_init(&comp->conns);
+        list_head_init(&comp->pending);
+
+        memcpy(&connmgr.comps[id].info, info, sizeof(connmgr.comps[id].info));
+
+        return 0;
+}
+
+void connmgr_comp_fini(enum comp_id id)
+{
+        struct list_head * p;
+        struct list_head * h;
+        struct comp *      comp;
+
+        assert(id >= 0 && id < COMPID_MAX);
+
+        if (strlen(connmgr.comps[id].info.comp_name) == 0)
+                return;
+
+        comp = connmgr.comps + id;
+
+        pthread_mutex_lock(&comp->lock);
+
+        list_for_each_safe(p, h, &comp->conns) {
+                struct conn_el * e = list_entry(p, struct conn_el, next);
+                list_del(&e->next);
+                free(e);
+        }
+
+        list_for_each_safe(p, h, &comp->pending) {
+                struct conn_el * e = list_entry(p, struct conn_el, next);
+                list_del(&e->next);
+                free(e);
+        }
+
+        pthread_mutex_unlock(&comp->lock);
+
+        pthread_cond_destroy(&comp->cond);
+        pthread_mutex_destroy(&comp->lock);
+
+        memset(&connmgr.comps[id].info, 0, sizeof(connmgr.comps[id].info));
+}
+
+int connmgr_ipcp_connect(const char * dst,
+                         const char * component,
+                         qosspec_t    qs)
+{
+        struct conn_el * ce;
+        int              id;
+
+        assert(dst);
+        assert(component);
+
+        ce = malloc(sizeof(*ce));
+        if (ce == NULL) {
+                log_dbg("Out of memory.");
+                return -1;
+        }
+
+        id = get_id_by_name(component);
+        if (id < 0) {
+                log_dbg("No such component: %s", component);
+                free(ce);
+                return -1;
+        }
+
+        if (connmgr_alloc(id, dst, &qs, &ce->conn)) {
+                free(ce);
+                return -1;
+        }
+
+        if (strlen(dst) > DST_MAX_STRLEN) {
+                log_warn("Truncating dst length for connection.");
+                memcpy(ce->conn.flow_info.dst, dst, DST_MAX_STRLEN);
+                ce->conn.flow_info.dst[DST_MAX_STRLEN] = '\0';
+        } else {
+                strcpy(ce->conn.flow_info.dst, dst);
+        }
+
+        pthread_mutex_lock(&connmgr.comps[id].lock);
+
+        list_add(&ce->next, &connmgr.comps[id].conns);
+
+        pthread_mutex_unlock(&connmgr.comps[id].lock);
+
+        return 0;
+}
+
+int connmgr_ipcp_disconnect(const char * dst,
+                            const char * component)
+{
+        struct list_head * p;
+        struct list_head * h;
+        int                id;
+
+        assert(dst);
+        assert(component);
+
+        id = get_id_by_name(component);
+        if (id < 0)
+                return -1;
+
+        pthread_mutex_lock(&connmgr.comps[id].lock);
+
+        list_for_each_safe(p,h, &connmgr.comps[id].conns) {
+                struct conn_el * el = list_entry(p, struct conn_el, next);
+                if (strcmp(el->conn.flow_info.dst, dst) == 0) {
+                        int ret;
+                        pthread_mutex_unlock(&connmgr.comps[id].lock);
+                        list_del(&el->next);
+                        ret = connmgr_dealloc(id, &el->conn);
+                        free(el);
+                        return ret;
+                }
+        }
+
+        pthread_mutex_unlock(&connmgr.comps[id].lock);
+
+        return 0;
+}
+
+int connmgr_alloc(enum comp_id  id,
+                  const char *  dst,
+                  qosspec_t *   qs,
+                  struct conn * conn)
+{
+        assert(id >= 0 && id < COMPID_MAX);
+        assert(dst);
+
+        conn->flow_info.fd = flow_alloc(dst, qs, NULL);
+        if (conn->flow_info.fd < 0) {
+                log_dbg("Failed to allocate flow to %s.", dst);
+                return -1;
+        }
+
+        if (qs != NULL)
+                conn->flow_info.qs = *qs;
+        else
+                memset(&conn->flow_info.qs, 0, sizeof(conn->flow_info.qs));
+
+        log_dbg("Sending cacep info for protocol %s to fd %d.",
+                connmgr.comps[id].info.protocol, conn->flow_info.fd);
+
+        if (cacep_snd(conn->flow_info.fd, &connmgr.comps[id].info)) {
+                log_dbg("Failed to create application connection.");
+                flow_dealloc(conn->flow_info.fd);
+                return -1;
+        }
+
+        if (cacep_rcv(conn->flow_info.fd, &conn->conn_info)) {
+                log_dbg("Failed to connect to application.");
+                flow_dealloc(conn->flow_info.fd);
+                return -1;
+        }
+
+        if (strcmp(connmgr.comps[id].info.protocol, conn->conn_info.protocol)) {
+                log_dbg("Unknown protocol (requested %s, got %s).",
+                        connmgr.comps[id].info.protocol,
+                        conn->conn_info.protocol);
+                flow_dealloc(conn->flow_info.fd);
+                return -1;
+        }
+
+        if (connmgr.comps[id].info.pref_version !=
+            conn->conn_info.pref_version) {
+                log_dbg("Unknown protocol version.");
+                flow_dealloc(conn->flow_info.fd);
+                return -1;
+        }
+
+        if (connmgr.comps[id].info.pref_syntax != conn->conn_info.pref_syntax) {
+                log_dbg("Unknown protocol syntax.");
+                flow_dealloc(conn->flow_info.fd);
+                return -1;
+        }
+
+        switch (id) {
+        case COMPID_DT:
+                notifier_event(NOTIFY_DT_CONN_ADD, conn);
+#ifdef IPCP_CONN_WAIT_DIR
+                dir_wait_running();
+#endif
+                break;
+        case COMPID_MGMT:
+                notifier_event(NOTIFY_MGMT_CONN_ADD, conn);
+                break;
+        default:
+                break;
+        }
+
+        return 0;
+}
+
+int connmgr_dealloc(enum comp_id  id,
+                    struct conn * conn)
+{
+        switch (id) {
+        case COMPID_DT:
+                notifier_event(NOTIFY_DT_CONN_DEL, conn);
+                break;
+        case COMPID_MGMT:
+                notifier_event(NOTIFY_MGMT_CONN_DEL, conn);
+                break;
+        default:
+                break;
+        }
+
+        return flow_dealloc(conn->flow_info.fd);
+}
+
+
+int connmgr_wait(enum comp_id  id,
+                 struct conn * conn)
+{
+        struct conn_el * el;
+        struct comp *    comp;
+
+        assert(id >= 0 && id < COMPID_MAX);
+        assert(conn);
+
+        comp = connmgr.comps + id;
+
+        pthread_mutex_lock(&comp->lock);
+
+        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+                             (void *) &comp->lock);
+
+        while (list_is_empty(&comp->pending))
+                pthread_cond_wait(&comp->cond, &comp->lock);
+
+        pthread_cleanup_pop(false);
+
+        el = list_first_entry((&comp->pending), struct conn_el, next);
+        if (el == NULL) {
+                pthread_mutex_unlock(&comp->lock);
+                return -1;
+        }
+
+        *conn = el->conn;
+
+        list_del(&el->next);
+        list_add(&el->next, &connmgr.comps[id].conns);
+
+        pthread_mutex_unlock(&comp->lock);
+
+        return 0;
+}
diff --git a/src/ipcpd/unicast/connmgr.h b/src/ipcpd/unicast/connmgr.h
new file mode 100644
index 00000000..44a3fe39
--- /dev/null
+++ b/src/ipcpd/unicast/connmgr.h
@@ -0,0 +1,74 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * Handles the different AP connections
+ *
+ *    Dimitri Staessens <dimitri.staessens@ugent.be>
+ *    Sander Vrijders   <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_CONNMGR_H
+#define OUROBOROS_IPCPD_UNICAST_CONNMGR_H
+
+#include <ouroboros/cacep.h>
+#include <ouroboros/qos.h>
+
+#include "comp.h"
+
+#define NOTIFY_DT_CONN_ADD     0x00D0
+#define NOTIFY_DT_CONN_DEL     0x00D1
+#define NOTIFY_DT_CONN_QOS     0x00D2
+#define NOTIFY_DT_CONN_UP      0x00D3
+#define NOTIFY_DT_CONN_DOWN    0x00D4
+#define NOTIFY_DT_FLOW_UP      0x00D5
+#define NOTIFY_DT_FLOW_DOWN    0x00D6
+#define NOTIFY_DT_FLOW_DEALLOC 0x00D7
+
+#define NOTIFY_MGMT_CONN_ADD   0x00F0
+#define NOTIFY_MGMT_CONN_DEL   0x00F1
+
+int         connmgr_init(void);
+
+void        connmgr_fini(void);
+
+int         connmgr_start(void);
+
+void        connmgr_stop(void);
+
+int         connmgr_comp_init(enum comp_id             id,
+                              const struct conn_info * info);
+
+void        connmgr_comp_fini(enum comp_id id);
+
+int         connmgr_ipcp_connect(const char * dst,
+                                 const char * component,
+                                 qosspec_t    qs);
+
+int         connmgr_ipcp_disconnect(const char * dst,
+                                    const char * component);
+
+int         connmgr_alloc(enum comp_id  id,
+                          const char *  dst,
+                          qosspec_t *   qs,
+                          struct conn * conn);
+
+int         connmgr_dealloc(enum comp_id  id,
+                            struct conn * conn);
+
+int         connmgr_wait(enum comp_id  id,
+                         struct conn * conn);
+
+#endif /* OUROBOROS_IPCPD_UNICAST_CONNMGR_H */
diff --git a/src/ipcpd/unicast/dht.c b/src/ipcpd/unicast/dht.c
new file mode 100644
index 00000000..f24a2c51
--- /dev/null
+++ b/src/ipcpd/unicast/dht.c
@@ -0,0 +1,2840 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * Distributed Hash Table based on Kademlia
+ *
+ *    Dimitri Staessens <dimitri.staessens@ugent.be>
+ *    Sander Vrijders   <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include "config.h"
+
+#define DHT              "dht"
+#define OUROBOROS_PREFIX DHT
+
+#include <ouroboros/hash.h>
+#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/list.h>
+#include <ouroboros/notifier.h>
+#include <ouroboros/random.h>
+#include <ouroboros/time_utils.h>
+#include <ouroboros/tpm.h>
+#include <ouroboros/utils.h>
+
+#include "connmgr.h"
+#include "dht.h"
+#include "dt.h"
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <inttypes.h>
+#include <limits.h>
+
+#include "kademlia.pb-c.h"
+typedef KadMsg kad_msg_t;
+typedef KadContactMsg kad_contact_msg_t;
+
+#ifndef CLOCK_REALTIME_COARSE
+#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
+#endif
+
+#define DHT_MAX_REQS  2048 /* KAD recommends rnd(), bmp can be changed.    */
+#define KAD_ALPHA     3    /* Parallel factor, proven optimal value.       */
+#define KAD_K         8    /* Replication factor, MDHT value.              */
+#define KAD_T_REPL    900  /* Replication time, tied to k. MDHT value.     */
+#define KAD_T_REFR    900  /* Refresh time stale bucket, MDHT value.       */
+#define KAD_T_JOIN    8    /* Response time to wait for a join.            */
+#define KAD_T_RESP    5    /* Response time to wait for a response.        */
+#define KAD_R_PING    2    /* Ping retries before declaring peer dead.     */
+#define KAD_QUEER     15   /* Time to declare peer questionable.           */
+#define KAD_BETA      8    /* Bucket split factor, must be 1, 2, 4 or 8.   */
+#define KAD_RESP_RETR 6    /* Number of retries on sending a response.     */
+#define KAD_JOIN_RETR 8    /* Number of retries sending a join.            */
+#define KAD_JOIN_INTV 1    /* Time (seconds) between join retries.         */
+#define HANDLE_TIMEO  1000 /* Timeout for dht_handle_packet tpm check (ms) */
+#define DHT_RETR_ADDR 1    /* Number of addresses to return on retrieve    */
+
+enum dht_state {
+        DHT_INIT = 0,
+        DHT_SHUTDOWN,
+        DHT_JOINING,
+        DHT_RUNNING,
+};
+
+enum kad_code {
+        KAD_JOIN = 0,
+        KAD_FIND_NODE,
+        KAD_FIND_VALUE,
+        /* Messages without a response below. */
+        KAD_STORE,
+        KAD_RESPONSE
+};
+
+enum kad_req_state {
+        REQ_NULL = 0,
+        REQ_INIT,
+        REQ_PENDING,
+        REQ_RESPONSE,
+        REQ_DONE,
+        REQ_DESTROY
+};
+
+enum lookup_state {
+        LU_NULL = 0,
+        LU_INIT,
+        LU_PENDING,
+        LU_UPDATE,
+        LU_COMPLETE,
+        LU_DESTROY
+};
+
+struct kad_req {
+        struct list_head   next;
+
+        uint32_t           cookie;
+        enum kad_code      code;
+        uint8_t *          key;
+        uint64_t           addr;
+
+        enum kad_req_state state;
+        pthread_cond_t     cond;
+        pthread_mutex_t    lock;
+
+        time_t             t_exp;
+};
+
+struct cookie_el {
+        struct list_head next;
+
+        uint32_t         cookie;
+};
+
+struct lookup {
+        struct list_head  next;
+
+        struct list_head  cookies;
+
+        uint8_t *         key;
+
+        struct list_head  contacts;
+        size_t            n_contacts;
+
+        uint64_t *        addrs;
+        size_t            n_addrs;
+
+        enum lookup_state state;
+        pthread_cond_t    cond;
+        pthread_mutex_t   lock;
+};
+
+struct val {
+        struct list_head next;
+
+        uint64_t         addr;
+
+        time_t           t_exp;
+        time_t           t_rep;
+};
+
+struct ref_entry {
+        struct list_head next;
+
+        uint8_t *        key;
+
+        time_t           t_rep;
+};
+
+struct dht_entry {
+        struct list_head next;
+
+        uint8_t *        key;
+        size_t           n_vals;
+        struct list_head vals;
+};
+
+struct contact {
+        struct list_head next;
+
+        uint8_t *        id;
+        uint64_t         addr;
+
+        size_t           fails;
+        time_t           t_seen;
+};
+
+struct bucket {
+        struct list_head contacts;
+        size_t           n_contacts;
+
+        struct list_head alts;
+        size_t           n_alts;
+
+        time_t           t_refr;
+
+        size_t           depth;
+        uint8_t          mask;
+
+        struct bucket *  parent;
+        struct bucket *  children[1L << KAD_BETA];
+};
+
+struct cmd {
+        struct list_head     next;
+
+        struct shm_du_buff * sdb;
+};
+
+struct dht {
+        size_t           alpha;
+        size_t           b;
+        size_t           k;
+
+        time_t           t_expire;
+        time_t           t_refresh;
+        time_t           t_replic;
+        time_t           t_repub;
+
+        uint8_t *        id;
+        uint64_t         addr;
+
+        struct bucket *  buckets;
+
+        struct list_head entries;
+
+        struct list_head refs;
+
+        struct list_head lookups;
+
+        struct list_head requests;
+        struct bmp *     cookies;
+
+        enum dht_state   state;
+        struct list_head cmds;
+        pthread_cond_t   cond;
+        pthread_mutex_t  mtx;
+
+        pthread_rwlock_t lock;
+
+        int              fd;
+
+        struct tpm *     tpm;
+
+        pthread_t        worker;
+};
+
+struct join_info {
+        struct dht * dht;
+        uint64_t     addr;
+};
+
+struct packet_info {
+        struct dht *         dht;
+        struct shm_du_buff * sdb;
+};
+
+static uint8_t * dht_dup_key(const uint8_t * key,
+                             size_t          len)
+{
+        uint8_t * dup;
+
+        dup = malloc(sizeof(*dup) * len);
+        if (dup == NULL)
+                return NULL;
+
+        memcpy(dup, key, len);
+
+        return dup;
+}
+
+static enum dht_state dht_get_state(struct dht * dht)
+{
+        enum dht_state state;
+
+        pthread_mutex_lock(&dht->mtx);
+
+        state = dht->state;
+
+        pthread_mutex_unlock(&dht->mtx);
+
+        return state;
+}
+
+static int dht_set_state(struct dht *   dht,
+                         enum dht_state state)
+{
+        pthread_mutex_lock(&dht->mtx);
+
+        if (state == DHT_JOINING && dht->state != DHT_INIT) {
+                 pthread_mutex_unlock(&dht->mtx);
+                 return -1;
+        }
+
+        dht->state = state;
+
+        pthread_cond_broadcast(&dht->cond);
+
+        pthread_mutex_unlock(&dht->mtx);
+
+        return 0;
+}
+
+int dht_wait_running(struct dht * dht)
+{
+        int ret = 0;
+
+        pthread_mutex_lock(&dht->mtx);
+
+        pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+                             &dht->mtx);
+
+        while (dht->state == DHT_JOINING)
+                pthread_cond_wait(&dht->cond, &dht->mtx);
+
+        if (dht->state != DHT_RUNNING)
+                ret = -1;
+
+        pthread_cleanup_pop(true);
+
+        return ret;
+}
+
+static uint8_t * create_id(size_t len)
+{
+        uint8_t * id;
+
+        id = malloc(len);
+        if (id == NULL)
+                return NULL;
+
+        if (random_buffer(id, len) < 0) {
+                free(id);
+                return NULL;
+        }
+
+        return id;
+}
+
+static void kad_req_create(struct dht * dht,
+                           kad_msg_t *  msg,
+                           uint64_t     addr)
+{
+        struct kad_req *   req;
+        pthread_condattr_t cattr;
+        struct timespec    t;
+        size_t             b;
+
+        req = malloc(sizeof(*req));
+        if (req == NULL)
+                return;
+
+        list_head_init(&req->next);
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+        req->t_exp  = t.tv_sec + KAD_T_RESP;
+        req->addr   = addr;
+        req->state  = REQ_INIT;
+        req->cookie = msg->cookie;
+        req->code   = msg->code;
+        req->key    = NULL;
+
+        pthread_rwlock_rdlock(&dht->lock);
+        b = dht->b;
+        pthread_rwlock_unlock(&dht->lock);
+
+        if (msg->has_key) {
+                req->key = dht_dup_key(msg->key.data, b);
+                if (req->key == NULL) {
+                        free(req);
+                        return;
+                }
+        }
+
+        if (pthread_mutex_init(&req->lock, NULL)) {
+                free(req->key);
+                free(req);
+                return;
+        }
+
+        pthread_condattr_init(&cattr);
+#ifndef __APPLE__
+        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+
+        if (pthread_cond_init(&req->cond, &cattr)) {
+                pthread_condattr_destroy(&cattr);
+                pthread_mutex_destroy(&req->lock);
+                free(req->key);
+                free(req);
+                return;
+        }
+
+        pthread_condattr_destroy(&cattr);
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        list_add(&req->next, &dht->requests);
+
+        pthread_rwlock_unlock(&dht->lock);
+}
+
+static void cancel_req_destroy(void * o)
+{
+        struct kad_req * req = (struct kad_req *) o;
+
+        pthread_mutex_unlock(&req->lock);
+
+        pthread_cond_destroy(&req->cond);
+        pthread_mutex_destroy(&req->lock);
+
+        if (req->key != NULL)
+                free(req->key);
+
+        free(req);
+}
+
+static void kad_req_destroy(struct kad_req * req)
+{
+        assert(req);
+
+        pthread_mutex_lock(&req->lock);
+
+        switch (req->state) {
+        case REQ_DESTROY:
+                pthread_mutex_unlock(&req->lock);
+                return;
+        case REQ_PENDING:
+                req->state = REQ_DESTROY;
+                pthread_cond_signal(&req->cond);
+                break;
+        case REQ_INIT:
+        case REQ_DONE:
+                req->state = REQ_NULL;
+                break;
+        case REQ_RESPONSE:
+        case REQ_NULL:
+        default:
+                break;
+        }
+
+        pthread_cleanup_push(cancel_req_destroy, req);
+
+        while (req->state != REQ_NULL && req->state != REQ_DONE)
+                pthread_cond_wait(&req->cond, &req->lock);
+
+        pthread_cleanup_pop(true);
+}
+
+static int kad_req_wait(struct kad_req * req,
+                        time_t           t)
+{
+        struct timespec timeo = {t, 0};
+        struct timespec abs;
+        int ret = 0;
+
+        assert(req);
+
+        clock_gettime(PTHREAD_COND_CLOCK, &abs);
+
+        ts_add(&abs, &timeo, &abs);
+
+        pthread_mutex_lock(&req->lock);
+
+        req->state = REQ_PENDING;
+
+        pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+                             &req->lock);
+
+        while (req->state == REQ_PENDING && ret != -ETIMEDOUT)
+                ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs);
+
+        switch(req->state) {
+        case REQ_DESTROY:
+                ret = -1;
+                req->state = REQ_NULL;
+                pthread_cond_signal(&req->cond);
+                break;
+        case REQ_PENDING: /* ETIMEDOUT */
+        case REQ_RESPONSE:
+                req->state = REQ_DONE;
+                pthread_cond_broadcast(&req->cond);
+                break;
+        default:
+                break;
+        }
+
+        pthread_cleanup_pop(true);
+
+        return ret;
+}
+
+static void kad_req_respond(struct kad_req * req)
+{
+        pthread_mutex_lock(&req->lock);
+
+        req->state = REQ_RESPONSE;
+        pthread_cond_signal(&req->cond);
+
+        pthread_mutex_unlock(&req->lock);
+}
+
+static struct contact * contact_create(const uint8_t * id,
+                                       size_t          len,
+                                       uint64_t        addr)
+{
+        struct contact * c;
+        struct timespec  t;
+
+        c = malloc(sizeof(*c));
+        if (c == NULL)
+                return NULL;
+
+        list_head_init(&c->next);
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+        c->addr   = addr;
+        c->fails  = 0;
+        c->t_seen = t.tv_sec;
+        c->id     = dht_dup_key(id, len);
+        if (c->id == NULL) {
+                free(c);
+                return NULL;
+        }
+
+        return c;
+}
+
+static void contact_destroy(struct contact * c)
+{
+        if (c != NULL)
+                free(c->id);
+
+        free(c);
+}
+
+static struct bucket * iter_bucket(struct bucket * b,
+                                   const uint8_t * id)
+{
+        uint8_t byte;
+        uint8_t mask;
+
+        assert(b);
+
+        if (b->children[0] == NULL)
+                return b;
+
+        byte = id[(b->depth * KAD_BETA) / CHAR_BIT];
+
+        mask = ((1L << KAD_BETA) - 1) & 0xFF;
+
+        byte >>= (CHAR_BIT - KAD_BETA) -
+                (((b->depth) * KAD_BETA) & (CHAR_BIT - 1));
+
+        return iter_bucket(b->children[(byte & mask)], id);
+}
+
+static struct bucket * dht_get_bucket(struct dht *    dht,
+                                      const uint8_t * id)
+{
+        assert(dht->buckets);
+
+        return iter_bucket(dht->buckets, id);
+}
+
+/*
+ * If someone builds a network where the n (n > k) closest nodes all
+ * have IDs starting with the same 64 bits: by all means, change this.
+ */
+static uint64_t dist(const uint8_t * src,
+                     const uint8_t * dst)
+{
+        return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst));
+}
+
+static size_t list_add_sorted(struct list_head * l,
+                              struct contact *   c,
+                              const uint8_t *    key)
+{
+        struct list_head * p;
+
+        assert(l);
+        assert(c);
+        assert(key);
+        assert(c->id);
+
+        list_for_each(p, l) {
+                struct contact * e = list_entry(p, struct contact, next);
+                if (dist(c->id, key) > dist(e->id, key))
+                        break;
+        }
+
+        list_add_tail(&c->next, p);
+
+        return 1;
+}
+
+static size_t dht_contact_list(struct dht *       dht,
+                               struct list_head * l,
+                               const uint8_t *    key)
+{
+        struct list_head * p;
+        struct bucket *    b;
+        size_t             len = 0;
+        size_t             i;
+        struct timespec    t;
+
+        assert(l);
+        assert(dht);
+        assert(key);
+        assert(list_is_empty(l));
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+        b = dht_get_bucket(dht, key);
+        if (b == NULL)
+                return 0;
+
+        b->t_refr = t.tv_sec + KAD_T_REFR;
+
+        if (b->n_contacts == dht->k || b->parent == NULL) {
+                list_for_each(p, &b->contacts) {
+                        struct contact * c;
+                        c = list_entry(p, struct contact, next);
+                        c = contact_create(c->id, dht->b, c->addr);
+                        if (list_add_sorted(l, c, key) == 1)
+                                if (++len == dht->k)
+                                        break;
+                }
+        } else {
+                struct bucket * d = b->parent;
+                for (i = 0; i < (1L << KAD_BETA) && len < dht->k; ++i) {
+                        list_for_each(p, &d->children[i]->contacts) {
+                                struct contact * c;
+                                c = list_entry(p, struct contact, next);
+                                c = contact_create(c->id, dht->b, c->addr);
+                                if (c == NULL)
+                                        continue;
+                                if (list_add_sorted(l, c, key) == 1)
+                                        if (++len == dht->k)
+                                                break;
+                        }
+                }
+        }
+
+        assert(len == dht->k || b->parent == NULL);
+
+        return len;
+}
+
+static struct lookup * lookup_create(struct dht *    dht,
+                                     const uint8_t * id)
+{
+        struct lookup *    lu;
+        pthread_condattr_t cattr;
+
+        assert(dht);
+        assert(id);
+
+        lu = malloc(sizeof(*lu));
+        if (lu == NULL)
+                goto fail_malloc;
+
+        list_head_init(&lu->contacts);
+        list_head_init(&lu->cookies);
+
+        lu->state   = LU_INIT;
+        lu->addrs   = NULL;
+        lu->n_addrs = 0;
+        lu->key     = dht_dup_key(id, dht->b);
+        if (lu->key == NULL)
+                goto fail_id;
+
+        if (pthread_mutex_init(&lu->lock, NULL))
+                goto fail_mutex;
+
+        pthread_condattr_init(&cattr);
+#ifndef __APPLE__
+        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+
+        if (pthread_cond_init(&lu->cond, &cattr))
+                goto fail_cond;
+
+        pthread_condattr_destroy(&cattr);
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        list_add(&lu->next, &dht->lookups);
+
+        lu->n_contacts = dht_contact_list(dht, &lu->contacts, id);
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        return lu;
+
+ fail_cond:
+        pthread_condattr_destroy(&cattr);
+        pthread_mutex_destroy(&lu->lock);
+ fail_mutex:
+        free(lu->key);
+ fail_id:
+        free(lu);
+ fail_malloc:
+        return NULL;
+}
+
+static void cancel_lookup_destroy(void * o)
+{
+        struct lookup *    lu;
+        struct list_head * p;
+        struct list_head * h;
+
+        lu = (struct lookup *) o;
+
+        if (lu->key != NULL)
+                free(lu->key);
+        if (lu->addrs != NULL)
+                free(lu->addrs);
+
+        list_for_each_safe(p, h, &lu->contacts) {
+                struct contact * c = list_entry(p, struct contact, next);
+                list_del(&c->next);
+                contact_destroy(c);
+        }
+
+        list_for_each_safe(p, h, &lu->cookies) {
+                struct cookie_el * c = list_entry(p, struct cookie_el, next);
+                list_del(&c->next);
+                free(c);
+        }
+
+        pthread_mutex_unlock(&lu->lock);
+
+        pthread_mutex_destroy(&lu->lock);
+
+        free(lu);
+}
+
+static void lookup_destroy(struct lookup * lu)
+{
+        assert(lu);
+
+        pthread_mutex_lock(&lu->lock);
+
+        switch (lu->state) {
+        case LU_DESTROY:
+                pthread_mutex_unlock(&lu->lock);
+                return;
+        case LU_PENDING:
+                lu->state = LU_DESTROY;
+                pthread_cond_broadcast(&lu->cond);
+                break;
+        case LU_INIT:
+        case LU_UPDATE:
+        case LU_COMPLETE:
+                lu->state = LU_NULL;
+                break;
+        case LU_NULL:
+        default:
+                break;
+        }
+
+        pthread_cleanup_push(cancel_lookup_destroy, lu);
+
+        while (lu->state != LU_NULL)
+                pthread_cond_wait(&lu->cond, &lu->lock);
+
+        pthread_cleanup_pop(true);
+}
+
+static void lookup_update(struct dht *    dht,
+                          struct lookup * lu,
+                          kad_msg_t *     msg)
+{
+        struct list_head * p = NULL;
+        struct list_head * h;
+        struct contact *   c = NULL;
+        size_t             n;
+        size_t             pos = 0;
+        bool               mod = false;
+
+        assert(lu);
+        assert(msg);
+
+        if (dht_get_state(dht) != DHT_RUNNING)
+                return;
+
+        pthread_mutex_lock(&lu->lock);
+
+        list_for_each_safe(p, h, &lu->cookies) {
+                struct cookie_el * e = list_entry(p, struct cookie_el, next);
+                if (e->cookie == msg->cookie) {
+                        list_del(&e->next);
+                        free(e);
+                        break;
+                }
+        }
+
+        if (lu->state == LU_COMPLETE) {
+                pthread_mutex_unlock(&lu->lock);
+                return;
+        }
+
+        if (msg->n_addrs > 0) {
+                if (lu->addrs == NULL) {
+                        lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs);
+                        for (n = 0; n < msg->n_addrs; ++n)
+                                lu->addrs[n] = msg->addrs[n];
+                        lu->n_addrs = msg->n_addrs;
+                }
+
+                lu->state = LU_COMPLETE;
+                pthread_cond_broadcast(&lu->cond);
+                pthread_mutex_unlock(&lu->lock);
+                return;
+        }
+
+        pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+                             &lu->lock);
+
+        while (lu->state == LU_INIT) {
+                pthread_rwlock_unlock(&dht->lock);
+                pthread_cond_wait(&lu->cond, &lu->lock);
+                pthread_rwlock_rdlock(&dht->lock);
+        }
+
+        pthread_cleanup_pop(false);
+
+        for (n = 0; n < msg->n_contacts; ++n) {
+                c = contact_create(msg->contacts[n]->id.data,
+                                   dht->b, msg->contacts[n]->addr);
+                if (c == NULL)
+                        continue;
+
+                pos = 0;
+
+                list_for_each(p, &lu->contacts) {
+                        struct contact * e;
+                        e = list_entry(p, struct contact, next);
+                        if (!memcmp(e->id, c->id, dht->b)) {
+                                contact_destroy(c);
+                                c = NULL;
+                                break;
+                        }
+
+                        if (dist(c->id, lu->key) > dist(e->id, lu->key))
+                                break;
+
+                        pos++;
+                }
+
+                if (c == NULL)
+                        continue;
+
+                if (lu->n_contacts < dht->k) {
+                        list_add_tail(&c->next, p);
+                        ++lu->n_contacts;
+                        mod = true;
+                } else if (pos == dht->k) {
+                        contact_destroy(c);
+                } else {
+                        struct contact * d;
+                        list_add_tail(&c->next, p);
+                        d = list_last_entry(&lu->contacts,
+                                            struct contact, next);
+                        list_del(&d->next);
+                        assert(lu->contacts.prv != &d->next);
+                        contact_destroy(d);
+                        mod = true;
+                }
+        }
+
+        if (list_is_empty(&lu->cookies) && !mod)
+                lu->state = LU_COMPLETE;
+        else
+                lu->state = LU_UPDATE;
+
+        pthread_cond_broadcast(&lu->cond);
+        pthread_mutex_unlock(&lu->lock);
+        return;
+}
+
+static ssize_t lookup_get_addrs(struct lookup * lu,
+                                uint64_t *      addrs)
+{
+        ssize_t n;
+
+        assert(lu);
+
+        pthread_mutex_lock(&lu->lock);
+
+        for (n = 0; (size_t) n < lu->n_addrs; ++n)
+                addrs[n] = lu->addrs[n];
+
+        assert((size_t) n == lu->n_addrs);
+
+        pthread_mutex_unlock(&lu->lock);
+
+        return n;
+}
+
+static ssize_t lookup_contact_addrs(struct lookup * lu,
+                                    uint64_t *      addrs)
+{
+        struct list_head * p;
+        ssize_t            n = 0;
+
+        assert(lu);
+        assert(addrs);
+
+        pthread_mutex_lock(&lu->lock);
+
+        list_for_each(p, &lu->contacts) {
+                struct contact * c = list_entry(p, struct contact, next);
+                addrs[n] = c->addr;
+                n++;
+        }
+
+        pthread_mutex_unlock(&lu->lock);
+
+        return n;
+}
+
+static void lookup_new_addrs(struct lookup * lu,
+                             uint64_t *      addrs)
+{
+        struct list_head * p;
+        size_t             n = 0;
+
+        assert(lu);
+        assert(addrs);
+
+        pthread_mutex_lock(&lu->lock);
+
+        /* Uses fails to check if the contact has been contacted. */
+        list_for_each(p, &lu->contacts) {
+                struct contact * c = list_entry(p, struct contact, next);
+                if (c->fails == 0) {
+                        c->fails = 1;
+                        addrs[n] = c->addr;
+                        n++;
+                }
+
+                if (n == KAD_ALPHA)
+                        break;
+        }
+
+        assert(n <= KAD_ALPHA);
+
+        addrs[n] = 0;
+
+        pthread_mutex_unlock(&lu->lock);
+}
+
+static void lookup_set_state(struct lookup *   lu,
+                             enum lookup_state state)
+{
+        pthread_mutex_lock(&lu->lock);
+
+        lu->state = state;
+        pthread_cond_broadcast(&lu->cond);
+
+        pthread_mutex_unlock(&lu->lock);
+}
+
+static void cleanup_wait(void * o)
+{
+        struct lookup * lu = (struct lookup *) o;
+        lu->state = LU_NULL;
+        pthread_mutex_unlock(&lu->lock);
+        lookup_destroy(lu);
+}
+
+static enum lookup_state lookup_wait(struct lookup * lu)
+{
+        struct timespec   timeo = {KAD_T_RESP, 0};
+        struct timespec   abs;
+        enum lookup_state state;
+        int               ret = 0;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &abs);
+
+        ts_add(&abs, &timeo, &abs);
+
+        pthread_mutex_lock(&lu->lock);
+
+        if (lu->state == LU_INIT || lu->state == LU_UPDATE)
+                lu->state = LU_PENDING;
+
+        pthread_cleanup_push(cleanup_wait, lu);
+
+        while (lu->state == LU_PENDING && ret != -ETIMEDOUT)
+                ret = -pthread_cond_timedwait(&lu->cond, &lu->lock, &abs);
+
+        pthread_cleanup_pop(false);
+
+        if (ret == -ETIMEDOUT)
+                lu->state = LU_COMPLETE;
+
+        state = lu->state;
+
+        pthread_mutex_unlock(&lu->lock);
+
+        return state;
+}
+
+static struct kad_req * dht_find_request(struct dht * dht,
+                                         kad_msg_t *  msg)
+{
+        struct list_head * p;
+
+        assert(dht);
+        assert(msg);
+
+        list_for_each(p, &dht->requests) {
+                struct kad_req * r = list_entry(p, struct kad_req, next);
+                if (r->cookie == msg->cookie)
+                        return r;
+        }
+
+        return NULL;
+}
+
+static struct lookup * dht_find_lookup(struct dht *    dht,
+                                       uint32_t        cookie)
+{
+        struct list_head * p;
+        struct list_head * p2;
+        struct list_head * h2;
+
+        assert(dht);
+        assert(cookie > 0);
+
+        list_for_each(p, &dht->lookups) {
+                struct lookup * l = list_entry(p, struct lookup, next);
+                pthread_mutex_lock(&l->lock);
+                list_for_each_safe(p2, h2, &l->cookies) {
+                        struct cookie_el * e;
+                        e = list_entry(p2, struct cookie_el, next);
+                        if (e->cookie == cookie) {
+                                list_del(&e->next);
+                                free(e);
+                                pthread_mutex_unlock(&l->lock);
+                                return l;
+                        }
+                }
+                pthread_mutex_unlock(&l->lock);
+        }
+
+        return NULL;
+}
+
+static struct val * val_create(uint64_t addr,
+                               time_t   exp)
+{
+        struct val *    v;
+        struct timespec t;
+
+        v = malloc(sizeof(*v));
+        if (v == NULL)
+                return NULL;
+
+        list_head_init(&v->next);
+        v->addr = addr;
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+        v->t_exp = t.tv_sec + exp;
+        v->t_rep = t.tv_sec + KAD_T_REPL;
+
+        return v;
+}
+
+static void val_destroy(struct val * v)
+{
+        assert(v);
+
+        free(v);
+}
+
+static struct ref_entry * ref_entry_create(struct dht *    dht,
+                                           const uint8_t * key)
+{
+        struct ref_entry * e;
+        struct timespec    t;
+
+        assert(dht);
+        assert(key);
+
+        e = malloc(sizeof(*e));
+        if (e == NULL)
+                return NULL;
+
+        e->key = dht_dup_key(key, dht->b);
+        if (e->key == NULL) {
+                free(e);
+                return NULL;
+        }
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+        e->t_rep = t.tv_sec + dht->t_repub;
+
+        return e;
+}
+
+static void ref_entry_destroy(struct ref_entry * e)
+{
+        free(e->key);
+        free(e);
+}
+
+static struct dht_entry * dht_entry_create(struct dht *    dht,
+                                           const uint8_t * key)
+{
+        struct dht_entry * e;
+
+        assert(dht);
+        assert(key);
+
+        e = malloc(sizeof(*e));
+        if (e == NULL)
+                return NULL;
+
+        list_head_init(&e->next);
+        list_head_init(&e->vals);
+
+        e->n_vals = 0;
+
+        e->key = dht_dup_key(key, dht->b);
+        if (e->key == NULL) {
+                free(e);
+                return NULL;
+        }
+
+        return e;
+}
+
+static void dht_entry_destroy(struct dht_entry * e)
+{
+        struct list_head * p;
+        struct list_head * h;
+
+        assert(e);
+
+        list_for_each_safe(p, h, &e->vals) {
+                struct val * v = list_entry(p, struct val, next);
+                list_del(&v->next);
+                val_destroy(v);
+        }
+
+        free(e->key);
+
+        free(e);
+}
+
+static int dht_entry_add_addr(struct dht_entry * e,
+                              uint64_t           addr,
+                              time_t             exp)
+{
+        struct list_head * p;
+        struct val * val;
+        struct timespec t;
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &t);
+
+        list_for_each(p, &e->vals) {
+                struct val * v = list_entry(p, struct val, next);
+                if (v->addr == addr) {
+                        if (v->t_exp < t.tv_sec + exp) {
+                                v->t_exp = t.tv_sec + exp;
+                                v->t_rep = t.tv_sec + KAD_T_REPL;
+                        }
+
+                        return 0;
+                }
+        }
+
+        val = val_create(addr, exp);
+        if (val == NULL)
+                return -ENOMEM;
+
+        list_add(&val->next, &e->vals);
+        ++e->n_vals;
+
+        return 0;
+}
+
+
+static void dht_entry_del_addr(struct dht_entry * e,
+                               uint64_t           addr)
+{
+        struct list_head * p;
+        struct list_head * h;
+
+        assert(e);
+
+        list_for_each_safe(p, h, &e->vals) {
+                struct val * v = list_entry(p, struct val, next);
+                if (v->addr == addr) {
+                        list_del(&v->next);
+                        val_destroy(v);
+                        --e->n_vals;
+                }
+        }
+
+        if (e->n_vals == 0) {
+                list_del(&e->next);
+                dht_entry_destroy(e);
+        }
+}
+
+static uint64_t dht_entry_get_addr(struct dht *       dht,
+                                   struct dht_entry * e)
+{
+        struct list_head * p;
+
+        assert(e);
+        assert(!list_is_empty(&e->vals));
+
+        list_for_each(p, &e->vals) {
+                struct val * v = list_entry(p, struct val, next);
+                if (v->addr != dht->addr)
+                        return v->addr;
+        }
+
+        return 0;
+}
+
+/* Forward declaration. */
+static struct lookup * kad_lookup(struct dht *    dht,
+                                  const uint8_t * key,
+                                  enum kad_code   code);
+
+
+/* Build a refresh list. */
+static void bucket_refresh(struct dht *       dht,
+                           struct bucket *    b,
+                           time_t             t,
+                           struct list_head * r)
+{
+        size_t i;
+
+        if (*b->children != NULL)
+                for (i = 0; i < (1L << KAD_BETA); ++i)
+                        bucket_refresh(dht, b->children[i], t, r);
+
+        if (b->n_contacts == 0)
+                return;
+
+        if (t > b->t_refr) {
+                struct contact * c;
+                struct contact * d;
+                c = list_first_entry(&b->contacts, struct contact, next);
+                d = contact_create(c->id, dht->b, c->addr);
+                if (c != NULL)
+                        list_add(&d->next, r);
+                return;
+        }
+}
+
+
+static struct bucket * bucket_create(void)
+{
+        struct bucket * b;
+        struct timespec t;
+        size_t          i;
+
+        b = malloc(sizeof(*b));
+        if (b == NULL)
+                return NULL;
+
+        list_head_init(&b->contacts);
+        b->n_contacts = 0;
+
+        list_head_init(&b->alts);
+        b->n_alts = 0;
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &t);
+        b->t_refr = t.tv_sec + KAD_T_REFR;
+
+        for (i = 0; i < (1L << KAD_BETA); ++i)
+                b->children[i]  = NULL;
+
+        b->parent = NULL;
+        b->depth = 0;
+
+        return b;
+}
+
+static void bucket_destroy(struct bucket * b)
+{
+        struct list_head * p;
+        struct list_head * h;
+        size_t             i;
+
+        assert(b);
+
+        for (i = 0; i < (1L << KAD_BETA); ++i)
+                if (b->children[i] != NULL)
+                        bucket_destroy(b->children[i]);
+
+        list_for_each_safe(p, h, &b->contacts) {
+                struct contact * c = list_entry(p, struct contact, next);
+                list_del(&c->next);
+                contact_destroy(c);
+                --b->n_contacts;
+        }
+
+        list_for_each_safe(p, h, &b->alts) {
+                struct contact * c = list_entry(p, struct contact, next);
+                list_del(&c->next);
+                contact_destroy(c);
+                --b->n_contacts;
+        }
+
+        free(b);
+}
+
+static bool bucket_has_id(struct bucket * b,
+                          const uint8_t * id)
+{
+        uint8_t mask;
+        uint8_t byte;
+
+        if (b->depth == 0)
+                return true;
+
+        byte = id[(b->depth * KAD_BETA) / CHAR_BIT];
+
+        mask = ((1L << KAD_BETA) - 1) & 0xFF;
+
+        byte >>= (CHAR_BIT - KAD_BETA) -
+                (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1));
+
+        return ((byte & mask) == b->mask);
+}
+
+static int split_bucket(struct bucket * b)
+{
+        struct list_head * p;
+        struct list_head * h;
+        uint8_t mask = 0;
+        size_t i;
+        size_t c;
+
+        assert(b);
+        assert(b->n_alts == 0);
+        assert(b->n_contacts);
+        assert(b->children[0] == NULL);
+
+        c = b->n_contacts;
+
+        for (i = 0; i < (1L << KAD_BETA); ++i) {
+                b->children[i] = bucket_create();
+                if (b->children[i] == NULL) {
+                        size_t j;
+                        for (j = 0; j < i; ++j)
+                                bucket_destroy(b->children[j]);
+                        return -1;
+                }
+
+                b->children[i]->depth  = b->depth + 1;
+                b->children[i]->mask   = mask;
+                b->children[i]->parent = b;
+
+                list_for_each_safe(p, h, &b->contacts) {
+                        struct contact * c;
+                        c = list_entry(p, struct contact, next);
+                        if (bucket_has_id(b->children[i], c->id)) {
+                                list_del(&c->next);
+                                --b->n_contacts;
+                                list_add(&c->next, &b->children[i]->contacts);
+                                ++b->children[i]->n_contacts;
+                        }
+                }
+
+                mask++;
+        }
+
+        for (i = 0; i < (1L << KAD_BETA); ++i)
+                if (b->children[i]->n_contacts == c)
+                        split_bucket(b->children[i]);
+
+        return 0;
+}
+
+/* Locked externally to mandate update as (final) part of join transaction. */
+static int dht_update_bucket(struct dht *    dht,
+                             const uint8_t * id,
+                             uint64_t        addr)
+{
+        struct list_head * p;
+        struct list_head * h;
+        struct bucket *    b;
+        struct contact *   c;
+
+        assert(dht);
+
+        b = dht_get_bucket(dht, id);
+        if (b == NULL)
+                return -1;
+
+        c = contact_create(id, dht->b, addr);
+        if (c == NULL)
+                return -1;
+
+        list_for_each_safe(p, h, &b->contacts) {
+                struct contact * d = list_entry(p, struct contact, next);
+                if (d->addr == addr) {
+                        list_del(&d->next);
+                        contact_destroy(d);
+                        --b->n_contacts;
+                }
+        }
+
+        if (b->n_contacts == dht->k) {
+                if (bucket_has_id(b, dht->id)) {
+                        list_add_tail(&c->next, &b->contacts);
+                        ++b->n_contacts;
+                        if (split_bucket(b)) {
+                                list_del(&c->next);
+                                contact_destroy(c);
+                                --b->n_contacts;
+                        }
+                } else if (b->n_alts == dht->k) {
+                        struct contact * d;
+                        d = list_first_entry(&b->alts, struct contact, next);
+                        list_del(&d->next);
+                        contact_destroy(d);
+                        list_add_tail(&c->next, &b->alts);
+                } else {
+                        list_add_tail(&c->next, &b->alts);
+                        ++b->n_alts;
+                }
+        } else {
+                list_add_tail(&c->next, &b->contacts);
+                ++b->n_contacts;
+        }
+
+        return 0;
+}
+
+static int send_msg(struct dht * dht,
+                    kad_msg_t *  msg,
+                    uint64_t     addr)
+{
+#ifndef __DHT_TEST__
+        struct shm_du_buff * sdb;
+        size_t               len;
+#endif
+        int                  retr = 0;
+
+        if (msg->code == KAD_RESPONSE)
+                retr = KAD_RESP_RETR;
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        if (dht->id != NULL) {
+                msg->has_s_id = true;
+                msg->s_id.data = dht->id;
+                msg->s_id.len  = dht->b;
+        }
+
+        msg->s_addr = dht->addr;
+
+        if (msg->code < KAD_STORE) {
+                msg->cookie = bmp_allocate(dht->cookies);
+                if (!bmp_is_id_valid(dht->cookies, msg->cookie)) {
+                        pthread_rwlock_unlock(&dht->lock);
+                        goto fail_bmp_alloc;
+                }
+        }
+
+        pthread_rwlock_unlock(&dht->lock);
+
+#ifndef __DHT_TEST__
+        len = kad_msg__get_packed_size(msg);
+        if (len == 0)
+                goto fail_msg;
+
+        while (true) {
+                if (ipcp_sdb_reserve(&sdb, len))
+                        goto fail_msg;
+
+                kad_msg__pack(msg, shm_du_buff_head(sdb));
+
+                if (dt_write_packet(addr, QOS_CUBE_BE, dht->fd, sdb) == 0)
+                        break;
+
+                ipcp_sdb_release(sdb);
+
+                sleep(1);
+
+                if (--retr < 0)
+                        goto fail_msg;
+        }
+
+#else
+        (void) addr;
+        (void) retr;
+#endif /* __DHT_TEST__ */
+
+        if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN)
 kad_req_create(dht, msg, addr); + +        return msg->cookie; +#ifndef __DHT_TEST__ + fail_msg: +        pthread_rwlock_wrlock(&dht->lock); +        bmp_release(dht->cookies, msg->cookie); +        pthread_rwlock_unlock(&dht->lock); +#endif /* !__DHT_TEST__ */ + fail_bmp_alloc: +        return -1; +} + +static struct dht_entry * dht_find_entry(struct dht *    dht, +                                         const uint8_t * key) +{ +        struct list_head * p; + +        list_for_each(p, &dht->entries) { +                struct dht_entry * e = list_entry(p, struct dht_entry, next); +                if (!memcmp(key, e->key, dht->b)) +                        return e; +        } + +        return NULL; +} + +static int kad_add(struct dht *              dht, +                   const kad_contact_msg_t * contacts, +                   ssize_t                   n, +                   time_t                    exp) +{ +        struct dht_entry * e; + +        pthread_rwlock_wrlock(&dht->lock); + +        while (n-- > 0) { +                if (contacts[n].id.len != dht->b) +                        log_warn("Bad key length in contact data."); + +                e = dht_find_entry(dht, contacts[n].id.data); +                if (e != NULL) { +                        if (dht_entry_add_addr(e, contacts[n].addr, exp)) +                                goto fail; +                } else { +                        e = dht_entry_create(dht, contacts[n].id.data); +                        if (e == NULL) +                                goto fail; + +                        if (dht_entry_add_addr(e, contacts[n].addr, exp)) { +                                dht_entry_destroy(e); +                                goto fail; +                        } + +                        list_add(&e->next, &dht->entries); +                } +        } + +        pthread_rwlock_unlock(&dht->lock); +        return 0; + + fail: +        pthread_rwlock_unlock(&dht->lock); +        return -ENOMEM; +} + +static int wait_resp(struct dht * dht, +                     kad_msg_t *  msg, +                     time_t       timeo) +{ +        struct kad_req * req; + +        assert(dht); +        assert(msg); + +        pthread_rwlock_rdlock(&dht->lock); + +        req = dht_find_request(dht, msg); +        if (req == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return -EPERM; +        } + +        pthread_rwlock_unlock(&dht->lock); + +        return kad_req_wait(req, timeo); +} + +static int kad_store(struct dht *    dht, +                     const uint8_t * key, +                     uint64_t        addr, +                     uint64_t        r_addr, +                     time_t          ttl) +{ +        kad_msg_t msg = KAD_MSG__INIT; +        kad_contact_msg_t cmsg = KAD_CONTACT_MSG__INIT; +        kad_contact_msg_t * cmsgp[1]; + +        cmsg.id.data = (uint8_t *) key; +        cmsg.addr    = addr; + +        pthread_rwlock_rdlock(&dht->lock); + +        cmsg.id.len  = dht->b; + +        pthread_rwlock_unlock(&dht->lock); + +        cmsgp[0] = &cmsg; + +        msg.code         = KAD_STORE; +        msg.has_t_expire = true; +        msg.t_expire     = ttl; +        msg.n_contacts   = 1; +        msg.contacts     = cmsgp; + +        if (send_msg(dht, &msg, r_addr) < 0) +                return -1; + +        return 0; +} + +static ssize_t kad_find(struct dht *     dht, +                        struct lookup *  lu, +                        const uint64_t * addrs, +                        enum kad_code  
code)
+{
+        kad_msg_t msg  = KAD_MSG__INIT;
+        ssize_t   sent = 0;
+
+        assert(dht);
+        assert(lu->key);
+
+        msg.code = code;
+
+        msg.has_key       = true;
+        msg.key.data      = (uint8_t *) lu->key;
+        msg.key.len       = dht->b;
+
+        while (*addrs != 0) {
+                struct cookie_el * c;
+                int ret;
+
+                if (*addrs == dht->addr) {
+                        ++addrs;
+                        continue;
+                }
+
+                ret = send_msg(dht, &msg, *addrs);
+                if (ret < 0)
+                        break;
+
+                c = malloc(sizeof(*c));
+                if (c == NULL)
+                        break;
+
+                c->cookie = (uint32_t) ret;
+
+                pthread_mutex_lock(&lu->lock);
+
+                list_add_tail(&c->next, &lu->cookies);
+
+                pthread_mutex_unlock(&lu->lock);
+
+                ++sent;
+                ++addrs;
+        }
+
+        return sent;
+}
+
+static void lookup_detach(struct dht *    dht,
+                          struct lookup * lu)
+{
+        pthread_rwlock_wrlock(&dht->lock);
+
+        list_del(&lu->next);
+
+        pthread_rwlock_unlock(&dht->lock);
+}
+
+static struct lookup * kad_lookup(struct dht *    dht,
+                                  const uint8_t * id,
+                                  enum kad_code   code)
+{
+        uint64_t          addrs[KAD_ALPHA + 1];
+        enum lookup_state state;
+        struct lookup *   lu;
+
+        lu = lookup_create(dht, id);
+        if (lu == NULL)
+                return NULL;
+
+        lookup_new_addrs(lu, addrs);
+
+        if (addrs[0] == 0) {
+                lookup_detach(dht, lu);
+                lookup_destroy(lu);
+                return NULL;
+        }
+
+        if (kad_find(dht, lu, addrs, code) == 0) {
+                lookup_detach(dht, lu);
+                return lu;
+        }
+
+        while ((state = lookup_wait(lu)) != LU_COMPLETE) {
+                switch (state) {
+                case LU_UPDATE:
+                        lookup_new_addrs(lu, addrs);
+                        if (addrs[0] == 0)
+                                break;
+
+                        kad_find(dht, lu, addrs, code);
+                        break;
+                case LU_DESTROY:
+                        lookup_detach(dht, lu);
+                        lookup_set_state(lu, LU_NULL);
+                        return NULL;
+                default:
+                        break;
+                }
+        }
+
+        assert(state == LU_COMPLETE);
+
+        lookup_detach(dht, lu);
+
+        return lu;
+}
+
+static void kad_publish(struct dht *    dht,
+                        const uint8_t * key,
+                        uint64_t        addr,
+                        time_t          exp)
+{
+        struct lookup * lu;
+        uint64_t      * addrs;
+        ssize_t         n;
+        size_t          k;
+        time_t          t_expire;
+
+
+        assert(dht);
+        assert(key);
+
+        pthread_rwlock_rdlock(&dht->lock);
+
+        k        = dht->k;
+        t_expire = dht->t_expire;
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        addrs = malloc(k * sizeof(*addrs));
+        if (addrs == NULL)
+                return;
+
+        lu = kad_lookup(dht, key, KAD_FIND_NODE);
+        if (lu == NULL) {
+                free(addrs);
+                return;
+        }
+
+        n = lookup_contact_addrs(lu, addrs);
+
+        while (n--
> 0) { +                if (addrs[n] == dht->addr) { +                        kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT; +                        msg.id.data = (uint8_t *) key; +                        msg.id.len  = dht->b; +                        msg.addr    = addr; +                        kad_add(dht, &msg, 1, exp); +                } else { +                        if (kad_store(dht, key, addr, addrs[n], t_expire)) +                                log_warn("Failed to send store message."); +                } +        } + +        lookup_destroy(lu); + +        free(addrs); +} + +static int kad_join(struct dht * dht, +                    uint64_t     addr) +{ +        kad_msg_t       msg = KAD_MSG__INIT; + +        msg.code = KAD_JOIN; + +        msg.has_alpha       = true; +        msg.has_b           = true; +        msg.has_k           = true; +        msg.has_t_refresh   = true; +        msg.has_t_replicate = true; +        msg.alpha           = KAD_ALPHA; +        msg.k               = KAD_K; +        msg.t_refresh       = KAD_T_REFR; +        msg.t_replicate     = KAD_T_REPL; + +        pthread_rwlock_rdlock(&dht->lock); + +        msg.b               = dht->b; + +        pthread_rwlock_unlock(&dht->lock); + +        if (send_msg(dht, &msg, addr) < 0) +                return -1; + +        if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) +                return -1; + +        dht->id = create_id(dht->b); +        if (dht->id == NULL) +                return -1; + +        pthread_rwlock_wrlock(&dht->lock); + +        dht_update_bucket(dht, dht->id, dht->addr); + +        pthread_rwlock_unlock(&dht->lock); + +        return 0; +} + +static void dht_dead_peer(struct dht * dht, +                          uint8_t *    key, +                          uint64_t     addr) +{ +        struct list_head * p; +        struct list_head * h; +        struct bucket *    b; + +        b = dht_get_bucket(dht, key); + +        list_for_each_safe(p, h, &b->contacts) { +                struct contact * c = list_entry(p, struct contact, next); +                if (b->n_contacts + b->n_alts <= dht->k) { +                        ++c->fails; +                        return; +                } + +                if (c->addr == addr) { +                        list_del(&c->next); +                        contact_destroy(c); +                        --b->n_contacts; +                        break; +                } +        } + +        while (b->n_contacts < dht->k && b->n_alts > 0) { +                struct contact * c; +                c = list_first_entry(&b->alts, struct contact, next); +                list_del(&c->next); +                --b->n_alts; +                list_add(&c->next, &b->contacts); +                ++b->n_contacts; +        } +} + +static int dht_del(struct dht *    dht, +                   const uint8_t * key, +                   uint64_t        addr) +{ +        struct dht_entry * e; + +        pthread_rwlock_wrlock(&dht->lock); + +        e = dht_find_entry(dht, key); +        if (e == NULL) { +                pthread_rwlock_unlock(&dht->lock); +                return -EPERM; +        } + +        dht_entry_del_addr(e, addr); + +        pthread_rwlock_unlock(&dht->lock); + +        return 0; +} + +static buffer_t dht_retrieve(struct dht *    dht, +                             const uint8_t * key) +{ +        struct dht_entry * e; +        struct list_head * p; +        buffer_t           buf; +        uint64_t *         pos; +        size_t             addrs = 0; + +        
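+        /* Collect up to DHT_RETR_ADDR addresses stored under this key. */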
pthread_rwlock_rdlock(&dht->lock);
+
+        e = dht_find_entry(dht, key);
+        if (e == NULL)
+                goto fail;
+
+        buf.len = MIN(DHT_RETR_ADDR, e->n_vals);
+        if (buf.len == 0)
+                goto fail;
+
+        pos = malloc(sizeof(dht->addr) * buf.len);
+        if (pos == NULL)
+                goto fail;
+
+        buf.data = (uint8_t *) pos;
+
+        list_for_each(p, &e->vals) {
+                struct val * v = list_entry(p, struct val, next);
+                *pos++ = v->addr;
+                if (++addrs >= buf.len)
+                        break;
+        }
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        return buf;
+
+ fail:
+        pthread_rwlock_unlock(&dht->lock);
+        buf.len = 0;
+
+        return buf;
+}
+
+static ssize_t dht_get_contacts(struct dht *          dht,
+                                const uint8_t *       key,
+                                kad_contact_msg_t *** msgs)
+{
+        struct list_head   l;
+        struct list_head * p;
+        struct list_head * h;
+        size_t             len;
+        size_t             i = 0;
+
+        list_head_init(&l);
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        len = dht_contact_list(dht, &l, key);
+        if (len == 0) {
+                pthread_rwlock_unlock(&dht->lock);
+                return 0;
+        }
+
+        *msgs = malloc(len * sizeof(**msgs));
+        if (*msgs == NULL) {
+                pthread_rwlock_unlock(&dht->lock);
+                return 0;
+        }
+
+        list_for_each_safe(p, h, &l) {
+                struct contact * c = list_entry(p, struct contact, next);
+                (*msgs)[i] = malloc(sizeof(***msgs));
+                if ((*msgs)[i] == NULL) {
+                        pthread_rwlock_unlock(&dht->lock);
+                        while (i > 0)
+                                free((*msgs)[--i]);
+                        free(*msgs);
+                        return 0;
+                }
+
+                kad_contact_msg__init((*msgs)[i]);
+
+                (*msgs)[i]->id.data = c->id;
+                (*msgs)[i]->id.len  = dht->b;
+                (*msgs)[i++]->addr  = c->addr;
+                list_del(&c->next);
+                free(c);
+        }
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        return i;
+}
+
+static time_t gcd(time_t a,
+                  time_t b)
+{
+        if (a == 0)
+                return b;
+
+        return gcd(b % a, a);
+}
+
+static void * work(void * o)
+{
+        struct dht *       dht;
+        struct timespec    now;
+        struct list_head * p;
+        struct list_head * h;
+        struct list_head   reflist;
+        time_t             intv;
+        struct lookup *    lu;
+
+        dht = (struct dht *) o;
+
+        pthread_rwlock_rdlock(&dht->lock);
+
+        intv = gcd(dht->t_expire, dht->t_repub);
+        intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2;
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        list_head_init(&reflist);
+
+        while (true) {
+                clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+                pthread_rwlock_wrlock(&dht->lock);
+
+                /* Republish registered hashes.
*/
+                list_for_each(p, &dht->refs) {
+                        struct ref_entry * e;
+                        uint8_t *          key;
+                        uint64_t           addr;
+                        time_t             t_expire;
+                        e = list_entry(p, struct ref_entry, next);
+                        if (now.tv_sec > e->t_rep) {
+                                key = dht_dup_key(e->key, dht->b);
+                                if (key == NULL)
+                                        continue;
+                                addr = dht->addr;
+                                t_expire = dht->t_expire;
+                                e->t_rep = now.tv_sec + dht->t_repub;
+
+                                pthread_rwlock_unlock(&dht->lock);
+                                kad_publish(dht, key, addr, t_expire);
+                                pthread_rwlock_wrlock(&dht->lock);
+                                free(key);
+                        }
+                }
+
+                /* Remove stale entries and republish if necessary. */
+                list_for_each_safe(p, h, &dht->entries) {
+                        struct list_head * p1;
+                        struct list_head * h1;
+                        struct dht_entry * e;
+                        uint8_t *          key;
+                        time_t             t_expire;
+                        e = list_entry(p, struct dht_entry, next);
+                        list_for_each_safe(p1, h1, &e->vals) {
+                                struct val * v;
+                                uint64_t     addr;
+                                v = list_entry(p1, struct val, next);
+                                if (now.tv_sec > v->t_exp) {
+                                        list_del(&v->next);
+                                        val_destroy(v);
+                                        continue;
+                                }
+
+                                if (now.tv_sec > v->t_rep) {
+                                        key  = dht_dup_key(e->key, dht->b);
+                                        if (key == NULL)
+                                                continue;
+                                        addr = v->addr;
+                                        t_expire = dht->t_expire;
+                                        v->t_rep = now.tv_sec + dht->t_replic;
+                                        pthread_rwlock_unlock(&dht->lock);
+                                        kad_publish(dht, key, addr, t_expire);
+                                        pthread_rwlock_wrlock(&dht->lock);
+                                        free(key);
+                                }
+                        }
+                }
+
+                /* Check the requests list for unresponsive nodes. */
+                list_for_each_safe(p, h, &dht->requests) {
+                        struct kad_req * r;
+                        r = list_entry(p, struct kad_req, next);
+                        if (now.tv_sec > r->t_exp) {
+                                list_del(&r->next);
+                                bmp_release(dht->cookies, r->cookie);
+                                dht_dead_peer(dht, r->key, r->addr);
+                                kad_req_destroy(r);
+                        }
+                }
+
+                /* Refresh unaccessed buckets.
*/
+                bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist);
+
+                pthread_rwlock_unlock(&dht->lock);
+
+                list_for_each_safe(p, h, &reflist) {
+                        struct contact * c;
+                        c = list_entry(p, struct contact, next);
+                        lu = kad_lookup(dht, c->id, KAD_FIND_NODE);
+                        if (lu != NULL)
+                                lookup_destroy(lu);
+                        list_del(&c->next);
+                        contact_destroy(c);
+                }
+
+                sleep(intv);
+        }
+
+        return (void *) 0;
+}
+
+static int kad_handle_join_resp(struct dht *     dht,
+                                struct kad_req * req,
+                                kad_msg_t *      msg)
+{
+        assert(dht);
+        assert(req);
+        assert(msg);
+
+        /* We might send version numbers later to warn of updates if needed. */
+        if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire &&
+              msg->has_t_refresh && msg->has_t_replicate)) {
+                log_warn("Join refused by remote.");
+                return -1;
+        }
+
+        if (msg->b < sizeof(uint64_t)) {
+                log_err("Hash sizes of less than 8 bytes are unsupported.");
+                return -1;
+        }
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        dht->buckets = bucket_create();
+        if (dht->buckets == NULL) {
+                pthread_rwlock_unlock(&dht->lock);
+                return -1;
+        }
+
+        /* Likely a corrupt packet: the remote member will refuse these
+         * parameters, and we may want to refuse them here too. */
+        if (msg->alpha != KAD_ALPHA || msg->k != KAD_K)
+                log_warn("Different kademlia parameters detected.");
+
+        if (msg->t_replicate != KAD_T_REPL)
+                log_warn("Different kademlia replication time detected.");
+
+        if (msg->t_refresh != KAD_T_REFR)
+                log_warn("Different kademlia refresh time detected.");
+
+        dht->k        = msg->k;
+        dht->b        = msg->b;
+        dht->t_expire = msg->t_expire;
+        dht->t_repub  = MAX(1, dht->t_expire - 10);
+
+        if (pthread_create(&dht->worker, NULL, work, dht)) {
+                bucket_destroy(dht->buckets);
+                pthread_rwlock_unlock(&dht->lock);
+                return -1;
+        }
+
+        kad_req_respond(req);
+
+        dht_update_bucket(dht, msg->s_id.data, msg->s_addr);
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        log_dbg("Enrollment of DHT completed.");
+
+        return 0;
+}
+
+static int kad_handle_find_resp(struct dht *     dht,
+                                struct kad_req * req,
+                                kad_msg_t *      msg)
+{
+        struct lookup * lu;
+
+        assert(dht);
+        assert(req);
+        assert(msg);
+
+        pthread_rwlock_rdlock(&dht->lock);
+
+        lu = dht_find_lookup(dht, req->cookie);
+        if (lu == NULL) {
+                pthread_rwlock_unlock(&dht->lock);
+                return -1;
+        }
+
+        lookup_update(dht, lu, msg);
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        return 0;
+}
+
+static void kad_handle_response(struct dht * dht,
+                                kad_msg_t *  msg)
+{
+        struct kad_req * req;
+
+        assert(dht);
+        assert(msg);
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        req = dht_find_request(dht, msg);
+        if (req == NULL) {
+                pthread_rwlock_unlock(&dht->lock);
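+                /* No matching request: the response is stale or a duplicate. */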
+                return;
+        }
+
+        bmp_release(dht->cookies, req->cookie);
+        list_del(&req->next);
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        switch (req->code) {
+        case KAD_JOIN:
+                if (kad_handle_join_resp(dht, req, msg))
+                        log_err("Enrollment of DHT failed.");
+                break;
+        case KAD_FIND_VALUE:
+        case KAD_FIND_NODE:
+                if (dht_get_state(dht) != DHT_RUNNING)
+                        break;
+                kad_handle_find_resp(dht, req, msg);
+                break;
+        default:
+                break;
+        }
+
+        kad_req_destroy(req);
+}
+
+int dht_bootstrap(struct dht * dht,
+                  size_t       b,
+                  time_t       t_expire)
+{
+        assert(dht);
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        dht->id = create_id(b);
+        if (dht->id == NULL)
+                goto fail_id;
+
+        dht->buckets = bucket_create();
+        if (dht->buckets == NULL)
+                goto fail_buckets;
+
+        dht->buckets->depth = 0;
+        dht->buckets->mask  = 0;
+
+        dht->b        = b / CHAR_BIT;
+        dht->t_expire = MAX(2, t_expire);
+        dht->t_repub  = MAX(1, t_expire - 10);
+        dht->k        = KAD_K;
+
+        if (pthread_create(&dht->worker, NULL, work, dht))
+                goto fail_pthread_create;
+
+        dht->state = DHT_RUNNING;
+
+        dht_update_bucket(dht, dht->id, dht->addr);
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        return 0;
+
+ fail_pthread_create:
+        bucket_destroy(dht->buckets);
+        dht->buckets = NULL;
+ fail_buckets:
+        free(dht->id);
+        dht->id = NULL;
+ fail_id:
+        pthread_rwlock_unlock(&dht->lock);
+        return -1;
+}
+
+static struct ref_entry * ref_entry_get(struct dht *    dht,
+                                        const uint8_t * key)
+{
+        struct list_head * p;
+
+        list_for_each(p, &dht->refs) {
+                struct ref_entry * r = list_entry(p, struct ref_entry, next);
+                if (!memcmp(key, r->key, dht->b))
+                        return r;
+        }
+
+        return NULL;
+}
+
+int dht_reg(struct dht *    dht,
+            const uint8_t * key)
+{
+        struct ref_entry * e;
+        uint64_t           addr;
+        time_t             t_expire;
+
+        assert(dht);
+        assert(key);
+        assert(dht->addr != 0);
+
+        if (dht_wait_running(dht))
+                return -1;
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        if (ref_entry_get(dht, key) != NULL) {
+                log_dbg("Name already registered.");
+                pthread_rwlock_unlock(&dht->lock);
+                return 0;
+        }
+
+        e = ref_entry_create(dht, key);
+        if (e == NULL) {
+                pthread_rwlock_unlock(&dht->lock);
+                return -ENOMEM;
+        }
+
+        list_add(&e->next, &dht->refs);
+
+        t_expire = dht->t_expire;
+        addr = dht->addr;
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        kad_publish(dht, key, addr, t_expire);
+
+        return 0;
+}
+
+int dht_unreg(struct dht *    dht,
+              const uint8_t * key)
+{
+        struct list_head * p;
+        struct list_head * h;
+
+        assert(dht);
+        assert(key);
+
+        if (dht_get_state(dht) != DHT_RUNNING)
+                return -1;
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        list_for_each_safe(p, h, &dht->refs) {
+                struct ref_entry * r = list_entry(p, struct ref_entry, next);
+                if (!memcmp(key, r->key, dht->b)) {
+                        list_del(&r->next);
+                        ref_entry_destroy(r);
+                }
+        }
+
+        dht_del(dht, key, dht->addr);
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        return 0;
+}
+
+uint64_t dht_query(struct dht *    dht,
+                   const uint8_t * key)
+{
+        struct dht_entry * e;
+        struct lookup *    lu;
+        uint64_t           addrs[KAD_K];
+        size_t             n;
+
+        addrs[0] = 0;
+
+        if (dht_wait_running(dht))
+                return 0;
+
+        pthread_rwlock_rdlock(&dht->lock);
+
+        e = dht_find_entry(dht, key);
+        if (e != NULL)
+                addrs[0] = dht_entry_get_addr(dht, e);
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        if (addrs[0] != 0)
+                return addrs[0];
+
+        lu = kad_lookup(dht, key, KAD_FIND_VALUE);
+        if (lu == NULL)
+                return 0;
+
+        n = lookup_get_addrs(lu, addrs);
+        if (n == 0) {
+                lookup_destroy(lu);
+                return 0;
+        }
+
+        lookup_destroy(lu);
+
+        /* Current behaviour is anycast: return the first peer address. */
+        if (addrs[0] != dht->addr)
+                return addrs[0];
+
+        if (n > 1)
+                return addrs[1];
+
+        return 0;
+}
+
+static void * dht_handle_packet(void * o)
+{
+        struct dht * dht = (struct dht *) o;
+
+        assert(dht);
+
+        while (true) {
+                kad_msg_t *          msg;
+                kad_contact_msg_t ** cmsgs;
+                kad_msg_t            resp_msg = KAD_MSG__INIT;
+                uint64_t             addr;
+                buffer_t             buf;
+                size_t               i;
+                size_t               b;
+                size_t               t_expire;
+                struct cmd *         cmd;
+
+                pthread_mutex_lock(&dht->mtx);
+
+                pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
+                                     &dht->mtx);
+
+                while (list_is_empty(&dht->cmds))
+                        pthread_cond_wait(&dht->cond, &dht->mtx);
+
+                cmd = list_last_entry(&dht->cmds, struct cmd, next);
+                list_del(&cmd->next);
+
+                pthread_cleanup_pop(true);
+
+                i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb);
+
+                msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb));
+#ifndef __DHT_TEST__
+                ipcp_sdb_release(cmd->sdb);
+#endif
+                free(cmd);
+
+                if (msg == NULL) {
+                        log_err("Failed to unpack message.");
+                        continue;
+                }
+
+                if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
+                        kad_msg__free_unpacked(msg, NULL);
+                        log_dbg("Got a request message when not running.");
+                        continue;
+                }
+
+                pthread_rwlock_rdlock(&dht->lock);
+
+                b        = dht->b;
+                t_expire = dht->t_expire;
+
+                pthread_rwlock_unlock(&dht->lock);
+
+                if (msg->has_key && msg->key.len != b) {
+                        kad_msg__free_unpacked(msg, NULL);
+                        log_warn("Bad key in message.");
+                        continue;
+                }
+
+                if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) {
+                        kad_msg__free_unpacked(msg, NULL);
+                        log_warn("Bad source ID in message of type %d.",
+                                 msg->code);
+                        continue;
+                }
+
+                tpm_dec(dht->tpm);
+
+                addr = msg->s_addr;
+
+                resp_msg.code   = KAD_RESPONSE;
+                resp_msg.cookie = msg->cookie;
+
+                switch (msg->code) {
+                case KAD_JOIN:
+                        /* Refuse the enrollee if a parameter check fails. */
+                        if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) {
+                                log_warn("Parameter mismatch. "
+                                         "DHT enrollment refused.");
+                                break;
+                        }
+
+                        if (msg->t_replicate != KAD_T_REPL) {
+                                log_warn("Replication time mismatch. "
+                                         "DHT enrollment refused.");
+                                break;
+                        }
+
+                        if (msg->t_refresh != KAD_T_REFR) {
+                                log_warn("Refresh time mismatch. "
+                                         "DHT enrollment refused.");
+                                break;
+                        }
+
+                        resp_msg.has_alpha       = true;
+                        resp_msg.has_b           = true;
+                        resp_msg.has_k           = true;
+                        resp_msg.has_t_expire    = true;
+                        resp_msg.has_t_refresh   = true;
+                        resp_msg.has_t_replicate = true;
+                        resp_msg.alpha           = KAD_ALPHA;
+                        resp_msg.b               = b;
+                        resp_msg.k               = KAD_K;
+                        resp_msg.t_expire        = t_expire;
+                        resp_msg.t_refresh       = KAD_T_REFR;
+                        resp_msg.t_replicate     = KAD_T_REPL;
+                        break;
+                case KAD_FIND_VALUE:
+                        buf = dht_retrieve(dht, msg->key.data);
+                        if (buf.len != 0) {
+                                resp_msg.n_addrs = buf.len;
+                                resp_msg.addrs   = (uint64_t *) buf.data;
+                                break;
+                        }
+                        /* FALLTHRU */
+                case KAD_FIND_NODE:
+                        /* Return k closest contacts.
*/ +                        resp_msg.n_contacts = +                                dht_get_contacts(dht, msg->key.data, &cmsgs); +                        resp_msg.contacts = cmsgs; +                        break; +                case KAD_STORE: +                        if (msg->n_contacts < 1) { +                                log_warn("No contacts in store message."); +                                break; +                        } + +                        if (!msg->has_t_expire) { +                                log_warn("No expiry time in store message."); +                                break; +                        } + +                        kad_add(dht, *msg->contacts, msg->n_contacts, +                                msg->t_expire); +                        break; +                case KAD_RESPONSE: +                        kad_handle_response(dht, msg); +                        break; +                default: +                        assert(false); +                        break; +                } + +                if (msg->code != KAD_JOIN) { +                        pthread_rwlock_wrlock(&dht->lock); +                        if (dht_get_state(dht) == DHT_JOINING && +                            dht->buckets == NULL) { +                                pthread_rwlock_unlock(&dht->lock); +                                break; +                        } + +                        if (dht_update_bucket(dht, msg->s_id.data, addr)) +                                log_warn("Failed to update bucket."); +                        pthread_rwlock_unlock(&dht->lock); +                } + +                if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0) +                                log_warn("Failed to send response."); + +                kad_msg__free_unpacked(msg, NULL); + +                if (resp_msg.n_addrs > 0) +                        free(resp_msg.addrs); + +                if (resp_msg.n_contacts == 0) { +                        tpm_inc(dht->tpm); +                        continue; +                } + +                for (i = 0; i < resp_msg.n_contacts; ++i) +                        kad_contact_msg__free_unpacked(resp_msg.contacts[i], +                                                       NULL); +                free(resp_msg.contacts); + +                tpm_inc(dht->tpm); +        } + +        return (void *) 0; +} + +static void dht_post_packet(void *               comp, +                            struct shm_du_buff * sdb) +{ +        struct cmd * cmd; +        struct dht * dht = (struct dht *) comp; + +        if (dht_get_state(dht) == DHT_SHUTDOWN) { +#ifndef __DHT_TEST__ +                ipcp_sdb_release(sdb); +#endif +                return; +        } + +        cmd = malloc(sizeof(*cmd)); +        if (cmd == NULL) { +                log_err("Command failed. 
Out of memory.");
+#ifndef __DHT_TEST__
+                /* Do not leak the du buff when the command cannot be queued. */
+                ipcp_sdb_release(sdb);
+#endif
+                return;
+        }
+
+        cmd->sdb = sdb;
+
+        pthread_mutex_lock(&dht->mtx);
+
+        list_add(&cmd->next, &dht->cmds);
+
+        pthread_cond_signal(&dht->cond);
+
+        pthread_mutex_unlock(&dht->mtx);
+}
+
+void dht_destroy(struct dht * dht)
+{
+        struct list_head * p;
+        struct list_head * h;
+
+        if (dht == NULL)
+                return;
+
+#ifndef __DHT_TEST__
+        tpm_stop(dht->tpm);
+
+        tpm_destroy(dht->tpm);
+#endif
+        if (dht_get_state(dht) == DHT_RUNNING) {
+                dht_set_state(dht, DHT_SHUTDOWN);
+                pthread_cancel(dht->worker);
+                pthread_join(dht->worker, NULL);
+        }
+
+        pthread_rwlock_wrlock(&dht->lock);
+
+        list_for_each_safe(p, h, &dht->cmds) {
+                struct cmd * c = list_entry(p, struct cmd, next);
+                list_del(&c->next);
+#ifndef __DHT_TEST__
+                ipcp_sdb_release(c->sdb);
+#endif
+                free(c);
+        }
+
+        list_for_each_safe(p, h, &dht->entries) {
+                struct dht_entry * e = list_entry(p, struct dht_entry, next);
+                list_del(&e->next);
+                dht_entry_destroy(e);
+        }
+
+        list_for_each_safe(p, h, &dht->requests) {
+                struct kad_req * r = list_entry(p, struct kad_req, next);
+                list_del(&r->next);
+                kad_req_destroy(r);
+        }
+
+        list_for_each_safe(p, h, &dht->refs) {
+                struct ref_entry * e = list_entry(p, struct ref_entry, next);
+                list_del(&e->next);
+                ref_entry_destroy(e);
+        }
+
+        list_for_each_safe(p, h, &dht->lookups) {
+                struct lookup * l = list_entry(p, struct lookup, next);
+                list_del(&l->next);
+                lookup_destroy(l);
+        }
+
+        pthread_rwlock_unlock(&dht->lock);
+
+        if (dht->buckets != NULL)
+                bucket_destroy(dht->buckets);
+
+        bmp_destroy(dht->cookies);
+
+        pthread_mutex_destroy(&dht->mtx);
+
+        pthread_rwlock_destroy(&dht->lock);
+
+        free(dht->id);
+
+        free(dht);
+}
+
+static void * join_thr(void * o)
+{
+        struct join_info * info = (struct join_info *) o;
+        struct lookup *    lu;
+        size_t             retr = 0;
+
+        assert(info);
+
+        while (kad_join(info->dht, info->addr)) {
+                if (dht_get_state(info->dht) == DHT_SHUTDOWN) {
+                        log_dbg("DHT enrollment aborted.");
+                        goto finish;
+                }
+
+                if (retr++ == KAD_JOIN_RETR) {
+                        dht_set_state(info->dht, DHT_INIT);
+                        log_warn("DHT enrollment attempt failed.");
+                        goto finish;
+                }
+
+                sleep(KAD_JOIN_INTV);
+        }
+
+        dht_set_state(info->dht, DHT_RUNNING);
+
+        lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE);
+        if (lu != NULL)
+                lookup_destroy(lu);
+
+ finish:
+        free(info);
+
+        return (void *) 0;
+}
+
+static void handle_event(void *       self,
+                         int          event,
+                         const void * o)
+{
+        struct dht * dht = (struct dht *) self;
+
+        if (event == NOTIFY_DT_CONN_ADD) {
+                pthread_t          thr;
+                struct join_info * inf;
+                struct conn *      c     = (struct conn *) o;
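+                /* A new dt connection is available: try to enroll over it. */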
+                struct timespec    slack = {0, DHT_ENROLL_SLACK * MILLION};
+
+                /* Give the pff some time to update for the new link. */
+                nanosleep(&slack, NULL);
+
+                switch (dht_get_state(dht)) {
+                case DHT_INIT:
+                        inf = malloc(sizeof(*inf));
+                        if (inf == NULL)
+                                break;
+
+                        inf->dht  = dht;
+                        inf->addr = c->conn_info.addr;
+
+                        if (dht_set_state(dht, DHT_JOINING) == 0 ||
+                            dht_wait_running(dht)) {
+                                if (pthread_create(&thr, NULL, join_thr, inf)) {
+                                        dht_set_state(dht, DHT_INIT);
+                                        free(inf);
+                                        return;
+                                }
+                                pthread_detach(thr);
+                        } else {
+                                free(inf);
+                        }
+                        break;
+                case DHT_RUNNING:
+                        /*
+                         * FIXME: this lookup, which is done for efficiency,
+                         * causes a SEGV when stressed with rapid
+                         * enrollments.
+                         * lu = kad_lookup(dht, dht->id, KAD_FIND_NODE);
+                         * if (lu != NULL)
+                         *         lookup_destroy(lu);
+                         */
+                        break;
+                default:
+                        break;
+                }
+        }
+}
+
+struct dht * dht_create(uint64_t addr)
+{
+        struct dht * dht;
+
+        dht = malloc(sizeof(*dht));
+        if (dht == NULL)
+                goto fail_malloc;
+
+        dht->buckets = NULL;
+
+        list_head_init(&dht->entries);
+        list_head_init(&dht->requests);
+        list_head_init(&dht->refs);
+        list_head_init(&dht->lookups);
+        list_head_init(&dht->cmds);
+
+        if (pthread_rwlock_init(&dht->lock, NULL))
+                goto fail_rwlock;
+
+        if (pthread_mutex_init(&dht->mtx, NULL))
+                goto fail_mutex;
+
+        if (pthread_cond_init(&dht->cond, NULL))
+                goto fail_cond;
+
+        dht->cookies = bmp_create(DHT_MAX_REQS, 1);
+        if (dht->cookies == NULL)
+                goto fail_bmp;
+
+        dht->b    = 0;
+        dht->addr = addr;
+        dht->id   = NULL;
+#ifndef __DHT_TEST__
+        dht->tpm = tpm_create(2, 1, dht_handle_packet, dht);
+        if (dht->tpm == NULL)
+                goto fail_tpm_create;
+
+        if (tpm_start(dht->tpm))
+                goto fail_tpm_start;
+
+        dht->fd   = dt_reg_comp(dht, &dht_post_packet, DHT);
+        notifier_reg(handle_event, dht);
+#else
+        (void) handle_event;
+        (void) dht_handle_packet;
+        (void) dht_post_packet;
+#endif
+        dht->state = DHT_INIT;
+
+        return dht;
+#ifndef __DHT_TEST__
+ fail_tpm_start:
+        tpm_destroy(dht->tpm);
+ fail_tpm_create:
+        bmp_destroy(dht->cookies);
+#endif
+ fail_bmp:
+        pthread_cond_destroy(&dht->cond);
+ fail_cond:
+        pthread_mutex_destroy(&dht->mtx);
+ fail_mutex:
+        pthread_rwlock_destroy(&dht->lock);
+ fail_rwlock:
+        free(dht);
+ fail_malloc:
+        return NULL;
+}
diff --git a/src/ipcpd/unicast/dht.h b/src/ipcpd/unicast/dht.h new file mode 100644 index 00000000..7d7601f5 --- /dev/null +++ 
b/src/ipcpd/unicast/dht.h @@ -0,0 +1,52 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Distributed Hash Table based on Kademlia + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_DHT_H +#define OUROBOROS_IPCPD_UNICAST_DHT_H + +#include <ouroboros/ipcp-dev.h> + +#include <stdint.h> +#include <sys/types.h> + +struct dht; + +struct dht * dht_create(uint64_t addr); + +int          dht_bootstrap(struct dht * dht, +                           size_t       b, +                           time_t       t_expire); + +void         dht_destroy(struct dht * dht); + +int          dht_reg(struct dht *    dht, +                     const uint8_t * key); + +int          dht_unreg(struct dht *    dht, +                       const uint8_t * key); + +uint64_t     dht_query(struct dht *    dht, +                       const uint8_t * key); + +int          dht_wait_running(struct dht * dht); + +#endif /* OUROBOROS_IPCPD_UNICAST_DHT_H */ diff --git a/src/ipcpd/unicast/dir.c b/src/ipcpd/unicast/dir.c new file mode 100644 index 00000000..3b0446fa --- /dev/null +++ b/src/ipcpd/unicast/dir.c @@ -0,0 +1,101 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Directory + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#define OUROBOROS_PREFIX "directory" + +#include <ouroboros/endian.h> +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/utils.h> + +#include "dir.h" +#include "dht.h" +#include "ipcp.h" + +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <inttypes.h> +#include <limits.h> + +#define KAD_B (hash_len(ipcpi.dir_hash_algo) * CHAR_BIT) + +struct dht * dht; + +int dir_init(void) +{ +        dht = dht_create(ipcpi.dt_addr); +        if (dht == NULL) +                return -ENOMEM; + +        return 0; +} + +void dir_fini(void) +{ +        dht_destroy(dht); +} + +int dir_bootstrap(void) { +        log_dbg("Bootstrapping directory."); + +        /* TODO: get parameters for bootstrap from IRM tool. */ +        if (dht_bootstrap(dht, KAD_B, 86400)) { +                dht_destroy(dht); +                return -ENOMEM; +        } + +        log_info("Directory bootstrapped."); + +        return 0; +} + +int dir_reg(const uint8_t * hash) +{ +        return dht_reg(dht, hash); +} + +int dir_unreg(const uint8_t * hash) +{ +        return dht_unreg(dht, hash); +} + +uint64_t dir_query(const uint8_t * hash) +{ +        return dht_query(dht, hash); +} + +int dir_wait_running(void) +{ +        if (dht_wait_running(dht)) { +                log_warn("Directory did not bootstrap."); +                return -1; +        } + +        return 0; +} diff --git a/src/ipcpd/unicast/dir.h b/src/ipcpd/unicast/dir.h new file mode 100644 index 00000000..8aba8b1d --- /dev/null +++ b/src/ipcpd/unicast/dir.h @@ -0,0 +1,40 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Directory + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_DIR_H +#define OUROBOROS_IPCPD_UNICAST_DIR_H + +int      dir_init(void); + +void     dir_fini(void); + +int      dir_bootstrap(void); + +int      dir_reg(const uint8_t * hash); + +int      dir_unreg(const uint8_t * hash); + +uint64_t dir_query(const uint8_t * hash); + +int      dir_wait_running(void); + +#endif /* OUROBOROS_IPCPD_UNICAST_DIR_H */ diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c new file mode 100644 index 00000000..2fd3c060 --- /dev/null +++ b/src/ipcpd/unicast/dt.c @@ -0,0 +1,913 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Data Transfer Component + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#define DT               "dt" +#define OUROBOROS_PREFIX DT + +#include <ouroboros/bitmap.h> +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/dev.h> +#include <ouroboros/notifier.h> +#include <ouroboros/rib.h> +#ifdef IPCP_FLOW_STATS +#include <ouroboros/fccntl.h> +#endif + +#include "connmgr.h" +#include "ipcp.h" +#include "dt.h" +#include "pff.h" +#include "routing.h" +#include "psched.h" +#include "comp.h" +#include "fa.h" + +#include <stdlib.h> +#include <stdbool.h> +#include <pthread.h> +#include <string.h> +#include <inttypes.h> +#include <assert.h> + +#define QOS_BLOCK_LEN 672 +#define STAT_FILE_LEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX) + +#ifndef CLOCK_REALTIME_COARSE +#define CLOCK_REALTIME_COARSE CLOCK_REALTIME +#endif + +struct comp_info { +        void (* post_packet)(void * comp, struct shm_du_buff * sdb); +        void * comp; +        char * name; +}; + +/* Fixed field lengths */ +#define TTL_LEN 1 +#define QOS_LEN 1 +#define ECN_LEN 1 + +struct dt_pci { +        uint64_t  dst_addr; +        qoscube_t qc; +        uint8_t   ttl; +        uint8_t   ecn; +        uint32_t  eid; +}; + +struct { +        uint8_t         addr_size; +        uint8_t         eid_size; +        size_t          head_size; + +        /* Offsets */ +        size_t          qc_o; +        size_t          ttl_o; +        size_t          ecn_o; +        size_t          eid_o; + +        /* Initial TTL value */ +        uint8_t         max_ttl; +} dt_pci_info; + +static int dt_pci_ser(struct shm_du_buff * sdb, +                      struct dt_pci *      dt_pci) +{ +        uint8_t * head; +        uint8_t   ttl = dt_pci_info.max_ttl; + +        assert(sdb); +        assert(dt_pci); + +        head = shm_du_buff_head_alloc(sdb, dt_pci_info.head_size); +        if (head == NULL) +                return -EPERM; + +        /* FIXME: Add check and operations for Big Endian machines. */ +        memcpy(head, &dt_pci->dst_addr, dt_pci_info.addr_size); +        memcpy(head + dt_pci_info.qc_o, &dt_pci->qc, QOS_LEN); +        memcpy(head + dt_pci_info.ttl_o, &ttl, TTL_LEN); +        memcpy(head + dt_pci_info.ecn_o,  &dt_pci->ecn, ECN_LEN); +        memcpy(head + dt_pci_info.eid_o, &dt_pci->eid, dt_pci_info.eid_size); + +        return 0; +} + +static void dt_pci_des(struct shm_du_buff * sdb, +                       struct dt_pci *      dt_pci) +{ +        uint8_t * head; + +        assert(sdb); +        assert(dt_pci); + +        head = shm_du_buff_head(sdb); + +        /* Decrease TTL */ +        --*(head + dt_pci_info.ttl_o); + +        /* FIXME: Add check and operations for Big Endian machines. 
*/ +        memcpy(&dt_pci->dst_addr, head, dt_pci_info.addr_size); +        memcpy(&dt_pci->qc, head + dt_pci_info.qc_o, QOS_LEN); +        memcpy(&dt_pci->ttl, head + dt_pci_info.ttl_o, TTL_LEN); +        memcpy(&dt_pci->ecn, head + dt_pci_info.ecn_o, ECN_LEN); +        memcpy(&dt_pci->eid, head + dt_pci_info.eid_o, dt_pci_info.eid_size); +} + +static void dt_pci_shrink(struct shm_du_buff * sdb) +{ +        assert(sdb); + +        shm_du_buff_head_release(sdb, dt_pci_info.head_size); +} + +struct { +        struct psched *    psched; + +        struct pff *       pff[QOS_CUBE_MAX]; +        struct routing_i * routing[QOS_CUBE_MAX]; +#ifdef IPCP_FLOW_STATS +        struct { +                time_t          stamp; +                uint64_t        addr; +                size_t          snd_pkt[QOS_CUBE_MAX]; +                size_t          rcv_pkt[QOS_CUBE_MAX]; +                size_t          snd_bytes[QOS_CUBE_MAX]; +                size_t          rcv_bytes[QOS_CUBE_MAX]; +                size_t          lcl_r_pkt[QOS_CUBE_MAX]; +                size_t          lcl_r_bytes[QOS_CUBE_MAX]; +                size_t          lcl_w_pkt[QOS_CUBE_MAX]; +                size_t          lcl_w_bytes[QOS_CUBE_MAX]; +                size_t          r_drp_pkt[QOS_CUBE_MAX]; +                size_t          r_drp_bytes[QOS_CUBE_MAX]; +                size_t          w_drp_pkt[QOS_CUBE_MAX]; +                size_t          w_drp_bytes[QOS_CUBE_MAX]; +                size_t          f_nhp_pkt[QOS_CUBE_MAX]; +                size_t          f_nhp_bytes[QOS_CUBE_MAX]; +                pthread_mutex_t lock; +        } stat[PROG_MAX_FLOWS]; + +        size_t             n_flows; +#endif +        struct bmp *       res_fds; +        struct comp_info   comps[PROG_RES_FDS]; +        pthread_rwlock_t   lock; + +        pthread_t          listener; +} dt; + +static int dt_stat_read(const char * path, +                        char *       buf, +                        size_t       len) +{ +#ifdef IPCP_FLOW_STATS +        int         fd; +        int         i; +        char        str[QOS_BLOCK_LEN + 1]; +        char        addrstr[20]; +        char        tmstr[20]; +        size_t      rxqlen = 0; +        size_t      txqlen = 0; +        struct tm * tm; + +        /* NOTE: we may need stronger checks. 
*/ +        fd = atoi(path); + +        if (len < STAT_FILE_LEN) +                return 0; + +        buf[0] = '\0'; + +        pthread_mutex_lock(&dt.stat[fd].lock); + +        if (dt.stat[fd].stamp == 0) { +                pthread_mutex_unlock(&dt.stat[fd].lock); +                return 0; +        } + +        if (dt.stat[fd].addr == ipcpi.dt_addr) +                sprintf(addrstr, "%s", dt.comps[fd].name); +        else +                sprintf(addrstr, "%" PRIu64, dt.stat[fd].addr); + +        tm = localtime(&dt.stat[fd].stamp); +        strftime(tmstr, sizeof(tmstr), "%F %T", tm); + +        if (fd >= PROG_RES_FDS) { +                fccntl(fd, FLOWGRXQLEN, &rxqlen); +                fccntl(fd, FLOWGTXQLEN, &txqlen); +        } + +        sprintf(buf, +                "Flow established at:      %20s\n" +                "Endpoint address:         %20s\n" +                "Queued packets (rx):      %20zu\n" +                "Queued packets (tx):      %20zu\n\n", +                tmstr, addrstr, rxqlen, txqlen); + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                sprintf(str, +                        "Qos cube %3d:\n" +                        " sent (packets):          %20zu\n" +                        " sent (bytes):            %20zu\n" +                        " rcvd (packets):          %20zu\n" +                        " rcvd (bytes):            %20zu\n" +                        " local sent (packets):    %20zu\n" +                        " local sent (bytes):      %20zu\n" +                        " local rcvd (packets):    %20zu\n" +                        " local rcvd (bytes):      %20zu\n" +                        " dropped ttl (packets):   %20zu\n" +                        " dropped ttl (bytes):     %20zu\n" +                        " failed writes (packets): %20zu\n" +                        " failed writes (bytes):   %20zu\n" +                        " failed nhop (packets):   %20zu\n" +                        " failed nhop (bytes):     %20zu\n", +                        i, +                        dt.stat[fd].snd_pkt[i], +                        dt.stat[fd].snd_bytes[i], +                        dt.stat[fd].rcv_pkt[i], +                        dt.stat[fd].rcv_bytes[i], +                        dt.stat[fd].lcl_w_pkt[i], +                        dt.stat[fd].lcl_w_bytes[i], +                        dt.stat[fd].lcl_r_pkt[i], +                        dt.stat[fd].lcl_r_bytes[i], +                        dt.stat[fd].r_drp_pkt[i], +                        dt.stat[fd].r_drp_bytes[i], +                        dt.stat[fd].w_drp_pkt[i], +                        dt.stat[fd].w_drp_bytes[i], +                        dt.stat[fd].f_nhp_pkt[i], +                        dt.stat[fd].f_nhp_bytes[i] +                        ); +                strcat(buf, str); +        } + +        pthread_mutex_unlock(&dt.stat[fd].lock); + +        return STAT_FILE_LEN; +#else +        (void) path; +        (void) buf; +        (void) len; +        return 0; +#endif +} + +static int dt_stat_readdir(char *** buf) +{ +#ifdef IPCP_FLOW_STATS +        char   entry[RIB_PATH_LEN + 1]; +        size_t i; +        int    idx = 0; + +        pthread_rwlock_rdlock(&dt.lock); + +        if (dt.n_flows < 1) { +                pthread_rwlock_unlock(&dt.lock); +                return 0; +        } + +        *buf = malloc(sizeof(**buf) * dt.n_flows); +        if (*buf == NULL) { +                pthread_rwlock_unlock(&dt.lock); +                return -ENOMEM; +        } + +        for (i = 0; i < 
PROG_MAX_FLOWS; ++i) {
+                pthread_mutex_lock(&dt.stat[i].lock);
+
+                if (dt.stat[i].stamp == 0) {
+                        pthread_mutex_unlock(&dt.stat[i].lock);
+                        /* Optimization: skip unused res_fds. */
+                        if (i < PROG_RES_FDS)
+                                i = PROG_RES_FDS;
+                        continue;
+                }
+
+                sprintf(entry, "%zu", i);
+
+                (*buf)[idx] = malloc(strlen(entry) + 1);
+                if ((*buf)[idx] == NULL) {
+                        while (idx-- > 0)
+                                free((*buf)[idx]);
+                        free(*buf);
+                        pthread_mutex_unlock(&dt.stat[i].lock);
+                        pthread_rwlock_unlock(&dt.lock);
+                        return -ENOMEM;
+                }
+
+                strcpy((*buf)[idx++], entry);
+
+                pthread_mutex_unlock(&dt.stat[i].lock);
+        }
+
+        pthread_rwlock_unlock(&dt.lock);
+
+        assert((size_t) idx == dt.n_flows);
+
+        return idx;
+#else
+        (void) buf;
+        return 0;
+#endif
+}
+
+static int dt_stat_getattr(const char *  path,
+                           struct stat * st)
+{
+#ifdef IPCP_FLOW_STATS
+        int fd;
+
+        fd = atoi(path);
+
+        st->st_mode  = S_IFREG | 0755;
+        st->st_nlink = 1;
+        st->st_uid   = getuid();
+        st->st_gid   = getgid();
+
+        pthread_mutex_lock(&dt.stat[fd].lock);
+
+        if (dt.stat[fd].stamp != 0) {
+                st->st_size  = STAT_FILE_LEN;
+                st->st_mtime = dt.stat[fd].stamp;
+        } else {
+                st->st_size  = 0;
+                st->st_mtime = 0;
+        }
+
+        pthread_mutex_unlock(&dt.stat[fd].lock);
+#else
+        (void) path;
+        (void) st;
+#endif
+        return 0;
+}
+
+static struct rib_ops r_ops = {
+        .read    = dt_stat_read,
+        .readdir = dt_stat_readdir,
+        .getattr = dt_stat_getattr
+};
+
+#ifdef IPCP_FLOW_STATS
+
+static void stat_used(int      fd,
+                      uint64_t addr)
+{
+        struct timespec now;
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+        pthread_mutex_lock(&dt.stat[fd].lock);
+
+        /* Reset the counters, but leave the held lock intact. */
+        memset(&dt.stat[fd], 0,
+               sizeof(dt.stat[fd]) - sizeof(pthread_mutex_t));
+
+        dt.stat[fd].stamp = (addr != INVALID_ADDR) ? now.tv_sec : 0;
+        dt.stat[fd].addr  = addr;
+
+        pthread_mutex_unlock(&dt.stat[fd].lock);
+
+        pthread_rwlock_wrlock(&dt.lock);
+
+        (addr != INVALID_ADDR) ? 
++dt.n_flows : --dt.n_flows; + +        pthread_rwlock_unlock(&dt.lock); +} +#endif + +static void handle_event(void *       self, +                         int          event, +                         const void * o) +{ +        struct conn * c; + +        (void) self; + +        c = (struct conn *) o; + +        switch (event) { +        case NOTIFY_DT_CONN_ADD: +#ifdef IPCP_FLOW_STATS +                stat_used(c->flow_info.fd, c->conn_info.addr); +#endif +                psched_add(dt.psched, c->flow_info.fd); +                log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd); +                break; +        case NOTIFY_DT_CONN_DEL: +#ifdef IPCP_FLOW_STATS +                stat_used(c->flow_info.fd, INVALID_ADDR); +#endif +                psched_del(dt.psched, c->flow_info.fd); +                log_dbg("Removed fd %d from " +                        "packet scheduler.", c->flow_info.fd); +                break; +        default: +                break; +        } +} + +static void packet_handler(int                  fd, +                           qoscube_t            qc, +                           struct shm_du_buff * sdb) +{ +        struct dt_pci dt_pci; +        int           ret; +        int           ofd; +#ifdef IPCP_FLOW_STATS +        size_t        len; +#else +        (void)        fd; +#endif + +#ifdef IPCP_FLOW_STATS +        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +#endif +        memset(&dt_pci, 0, sizeof(dt_pci)); +        dt_pci_des(sdb, &dt_pci); +        if (dt_pci.dst_addr != ipcpi.dt_addr) { +                if (dt_pci.ttl == 0) { +                        log_dbg("TTL was zero."); +                        ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[fd].lock); + +                        ++dt.stat[fd].rcv_pkt[qc]; +                        dt.stat[fd].rcv_bytes[qc] += len; +                        ++dt.stat[fd].r_drp_pkt[qc]; +                        dt.stat[fd].r_drp_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[fd].lock); +#endif +                        return; +                } + +                /* FIXME: Use qoscube from PCI instead of incoming flow. 
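+                 *        The qc deserialized into dt_pci above could
+                 *        select the forwarding table directly, i.e.
+                 *        (sketch):
+                 *
+                 *          ofd = pff_nhop(dt.pff[dt_pci.qc],
+                 *                         dt_pci.dst_addr);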
*/ +                ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr); +                if (ofd < 0) { +                        log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr); +                        ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[fd].lock); + +                        ++dt.stat[fd].rcv_pkt[qc]; +                        dt.stat[fd].rcv_bytes[qc] += len; +                        ++dt.stat[fd].f_nhp_pkt[qc]; +                        dt.stat[fd].f_nhp_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[fd].lock); +#endif +                        return; +                } + +                ret = ipcp_flow_write(ofd, sdb); +                if (ret < 0) { +                        log_dbg("Failed to write packet to fd %d.", ofd); +                        if (ret == -EFLOWDOWN) +                                notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd); +                        ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[fd].lock); + +                        ++dt.stat[fd].rcv_pkt[qc]; +                        dt.stat[fd].rcv_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[fd].lock); +                        pthread_mutex_lock(&dt.stat[ofd].lock); + +                        ++dt.stat[ofd].w_drp_pkt[qc]; +                        dt.stat[ofd].w_drp_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[ofd].lock); +#endif +                        return; +                } +#ifdef IPCP_FLOW_STATS +                pthread_mutex_lock(&dt.stat[fd].lock); + +                ++dt.stat[fd].rcv_pkt[qc]; +                dt.stat[fd].rcv_bytes[qc] += len; + +                pthread_mutex_unlock(&dt.stat[fd].lock); +                pthread_mutex_lock(&dt.stat[ofd].lock); + +                ++dt.stat[ofd].snd_pkt[qc]; +                dt.stat[ofd].snd_bytes[qc] += len; + +                pthread_mutex_unlock(&dt.stat[ofd].lock); +#endif +        } else { +                dt_pci_shrink(sdb); +                if (dt_pci.eid >= PROG_RES_FDS) { +                        if (ipcp_flow_write(dt_pci.eid, sdb)) { +                                ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                                pthread_mutex_lock(&dt.stat[fd].lock); +                                ++dt.stat[fd].rcv_pkt[qc]; +                                dt.stat[fd].rcv_bytes[qc] += len; +                                pthread_mutex_unlock(&dt.stat[fd].lock); + +                                pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); +                                ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; +                                dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; +                                pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif + +                        } +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[fd].lock); + +                        ++dt.stat[fd].rcv_pkt[qc]; +                        dt.stat[fd].rcv_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[fd].lock); +                        pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + +                        ++dt.stat[dt_pci.eid].rcv_pkt[qc]; +                        dt.stat[dt_pci.eid].rcv_bytes[qc] += len; +                        ++dt.stat[dt_pci.eid].lcl_r_pkt[qc]; +                        dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len; + +                        
pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif +                        return; +                } + +                if (dt.comps[dt_pci.eid].post_packet == NULL) { +                        log_err("No registered component on eid %d.", +                                dt_pci.eid); +                        ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[fd].lock); + +                        ++dt.stat[fd].rcv_pkt[qc]; +                        dt.stat[fd].rcv_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[fd].lock); +                        pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + +                        ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; +                        dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif +                        return; +                } +#ifdef IPCP_FLOW_STATS +                pthread_mutex_lock(&dt.stat[fd].lock); + +                ++dt.stat[fd].rcv_pkt[qc]; +                dt.stat[fd].rcv_bytes[qc] += len; +                ++dt.stat[fd].lcl_r_pkt[qc]; +                dt.stat[fd].lcl_r_bytes[qc] += len; + +                pthread_mutex_unlock(&dt.stat[fd].lock); +                pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + +                ++dt.stat[dt_pci.eid].snd_pkt[qc]; +                dt.stat[dt_pci.eid].snd_bytes[qc] += len; + +                pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif +                dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp, +                                                 sdb); +        } +} + +static void * dt_conn_handle(void * o) +{ +        struct conn conn; + +        (void) o; + +        while (true) { +                if (connmgr_wait(COMPID_DT, &conn)) { +                        log_err("Failed to get next DT connection."); +                        continue; +                } + +                /* NOTE: connection acceptance policy could be here. 
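+                 *       For instance (sketch; dt_conn_ok() is a
+                 *       hypothetical predicate, implemented nowhere in
+                 *       this tree):
+                 *
+                 *         if (!dt_conn_ok(&conn)) {
+                 *                 connmgr_dealloc(COMPID_DT, &conn);
+                 *                 continue;
+                 *         }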
*/ + +                notifier_event(NOTIFY_DT_CONN_ADD, &conn); +        } + +        return 0; +} + +int dt_init(enum pol_routing pr, +            enum pol_pff     pp, +            uint8_t          addr_size, +            uint8_t          eid_size, +            uint8_t          max_ttl) +{ +        int              i; +        int              j; +        char             dtstr[256]; +        struct conn_info info; + +        memset(&info, 0, sizeof(info)); + +        strcpy(info.comp_name, DT_COMP); +        strcpy(info.protocol, DT_PROTO); +        info.pref_version = 1; +        info.pref_syntax  = PROTO_FIXED; +        info.addr         = ipcpi.dt_addr; + +        dt_pci_info.addr_size = addr_size; +        dt_pci_info.eid_size  = eid_size; +        dt_pci_info.max_ttl   = max_ttl; + +        dt_pci_info.qc_o      = dt_pci_info.addr_size; +        dt_pci_info.ttl_o     = dt_pci_info.qc_o + QOS_LEN; +        dt_pci_info.ecn_o     = dt_pci_info.ttl_o + TTL_LEN; +        dt_pci_info.eid_o     = dt_pci_info.ecn_o + ECN_LEN; +        dt_pci_info.head_size = dt_pci_info.eid_o + dt_pci_info.eid_size; + +        if (notifier_reg(handle_event, NULL)) { +                log_err("Failed to register with notifier."); +                goto fail_notifier_reg; +        } + +        if (connmgr_comp_init(COMPID_DT, &info)) { +                log_err("Failed to register with connmgr."); +                goto fail_connmgr_comp_init; +        } + +        if (routing_init(pr)) { +                log_err("Failed to init routing."); +                goto fail_routing; +        } + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                dt.pff[i] = pff_create(pp); +                if (dt.pff[i] == NULL) { +                        log_err("Failed to create a PFF."); +                        for (j = 0; j < i; ++j) +                                pff_destroy(dt.pff[j]); +                        goto fail_pff; +                } +        } + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                dt.routing[i] = routing_i_create(dt.pff[i]); +                if (dt.routing[i] == NULL) { +                        for (j = 0; j < i; ++j) +                                routing_i_destroy(dt.routing[j]); +                        goto fail_routing_i; +                } +        } + +        if (pthread_rwlock_init(&dt.lock, NULL)) { +                log_err("Failed to init rwlock."); +                goto fail_rwlock_init; +        } + +        dt.res_fds = bmp_create(PROG_RES_FDS, 0); +        if (dt.res_fds == NULL) +                goto fail_res_fds; +#ifdef IPCP_FLOW_STATS +        memset(dt.stat, 0, sizeof(dt.stat)); + +        for (i = 0; i < PROG_MAX_FLOWS; ++i) +                if (pthread_mutex_init(&dt.stat[i].lock, NULL)) { +                        for (j = 0; j < i; ++j) +                                pthread_mutex_destroy(&dt.stat[j].lock); +                        goto fail_stat_lock; +                } + +        dt.n_flows = 0; +#endif +        sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr); +        if (rib_reg(dtstr, &r_ops)) +                goto fail_rib_reg; + +        return 0; + + fail_rib_reg: +#ifdef IPCP_FLOW_STATS +        for (i = 0; i < PROG_MAX_FLOWS; ++i) +                pthread_mutex_destroy(&dt.stat[i].lock); + fail_stat_lock: +#endif +        bmp_destroy(dt.res_fds); + fail_res_fds: +        pthread_rwlock_destroy(&dt.lock); + fail_rwlock_init: +        for (j = 0; j < QOS_CUBE_MAX; ++j) +                routing_i_destroy(dt.routing[j]); + fail_routing_i: +     
   for (i = 0; i < QOS_CUBE_MAX; ++i)
+                pff_destroy(dt.pff[i]);
+ fail_pff:
+        routing_fini();
+ fail_routing:
+        connmgr_comp_fini(COMPID_DT);
+ fail_connmgr_comp_init:
+        notifier_unreg(&handle_event);
+ fail_notifier_reg:
+        return -1;
+}
+
+void dt_fini(void)
+{
+        char dtstr[256];
+        int  i;
+
+        /* Unregister the same RIB path that dt_init() registered. */
+        sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr);
+        rib_unreg(dtstr);
+#ifdef IPCP_FLOW_STATS
+        for (i = 0; i < PROG_MAX_FLOWS; ++i)
+                pthread_mutex_destroy(&dt.stat[i].lock);
+#endif
+        bmp_destroy(dt.res_fds);
+
+        pthread_rwlock_destroy(&dt.lock);
+
+        for (i = 0; i < QOS_CUBE_MAX; ++i)
+                routing_i_destroy(dt.routing[i]);
+
+        for (i = 0; i < QOS_CUBE_MAX; ++i)
+                pff_destroy(dt.pff[i]);
+
+        routing_fini();
+
+        connmgr_comp_fini(COMPID_DT);
+
+        notifier_unreg(&handle_event);
+}
+
+int dt_start(void)
+{
+        dt.psched = psched_create(packet_handler);
+        if (dt.psched == NULL) {
+                log_err("Failed to create N-1 packet scheduler.");
+                return -1;
+        }
+
+        if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) {
+                log_err("Failed to create listener thread.");
+                psched_destroy(dt.psched);
+                return -1;
+        }
+
+        return 0;
+}
+
+void dt_stop(void)
+{
+        pthread_cancel(dt.listener);
+        pthread_join(dt.listener, NULL);
+        psched_destroy(dt.psched);
+}
+
+int dt_reg_comp(void * comp,
+                void (* func)(void * comp, struct shm_du_buff *),
+                char * name)
+{
+        int res_fd;
+
+        assert(func);
+
+        pthread_rwlock_wrlock(&dt.lock);
+
+        res_fd = bmp_allocate(dt.res_fds);
+        if (!bmp_is_id_valid(dt.res_fds, res_fd)) {
+                log_warn("Reserved fds depleted.");
+                pthread_rwlock_unlock(&dt.lock);
+                return -EBADF;
+        }
+
+        assert(dt.comps[res_fd].post_packet == NULL);
+        assert(dt.comps[res_fd].comp == NULL);
+        assert(dt.comps[res_fd].name == NULL);
+
+        dt.comps[res_fd].post_packet = func;
+        dt.comps[res_fd].comp        = comp;
+        dt.comps[res_fd].name        = name;
+
+        pthread_rwlock_unlock(&dt.lock);
+#ifdef IPCP_FLOW_STATS
+        stat_used(res_fd, ipcpi.dt_addr);
+#endif
+        return res_fd;
+}
+
+int dt_write_packet(uint64_t             dst_addr,
+                    qoscube_t            qc,
+                    int                  np1_fd,
+                    struct shm_du_buff * sdb)
+{
+        int           fd;
+        struct dt_pci dt_pci;
+        int           ret;
+#ifdef IPCP_FLOW_STATS
+        size_t        len;
+#endif
+        assert(sdb);
+        assert(dst_addr != ipcpi.dt_addr);
+
+        fd = pff_nhop(dt.pff[qc], dst_addr);
+        if (fd < 0) {
+                log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
+#ifdef IPCP_FLOW_STATS
+                len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+
+                pthread_mutex_lock(&dt.stat[np1_fd].lock);
+
+                ++dt.stat[np1_fd].lcl_r_pkt[qc];
+                dt.stat[np1_fd].lcl_r_bytes[qc] += len;
+                ++dt.stat[np1_fd].f_nhp_pkt[qc];
+                dt.stat[np1_fd].f_nhp_bytes[qc] += len;
+
+                pthread_mutex_unlock(&dt.stat[np1_fd].lock);
+#endif
+                return -1;
+        }
+
+        dt_pci.dst_addr = dst_addr;
+        dt_pci.qc       = qc;
+        dt_pci.eid      = np1_fd;
+        dt_pci.ecn      = 0;
+
+    
    if (dt_pci_ser(sdb, &dt_pci)) { +                log_dbg("Failed to serialize PDU."); +#ifdef IPCP_FLOW_STATS +                len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +#endif +                goto fail_write; +        } +#ifdef IPCP_FLOW_STATS +        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +#endif +        ret = ipcp_flow_write(fd, sdb); +        if (ret < 0) { +                log_dbg("Failed to write packet to fd %d.", fd); +                if (ret == -EFLOWDOWN) +                        notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); +                goto fail_write; +        } +#ifdef IPCP_FLOW_STATS +        pthread_mutex_lock(&dt.stat[np1_fd].lock); + +        ++dt.stat[np1_fd].lcl_r_pkt[qc]; +        dt.stat[np1_fd].lcl_r_bytes[qc] += len; + +        pthread_mutex_unlock(&dt.stat[np1_fd].lock); +        pthread_mutex_lock(&dt.stat[fd].lock); + +        if (dt_pci.eid < PROG_RES_FDS) { +                ++dt.stat[fd].lcl_w_pkt[qc]; +                dt.stat[fd].lcl_w_bytes[qc] += len; +        } +        ++dt.stat[fd].snd_pkt[qc]; +        dt.stat[fd].snd_bytes[qc] += len; + +        pthread_mutex_unlock(&dt.stat[fd].lock); +#endif +        return 0; + + fail_write: +#ifdef IPCP_FLOW_STATS +        pthread_mutex_lock(&dt.stat[np1_fd].lock); + +        ++dt.stat[np1_fd].lcl_w_pkt[qc]; +        dt.stat[np1_fd].lcl_w_bytes[qc] += len; + +        pthread_mutex_unlock(&dt.stat[np1_fd].lock); +        pthread_mutex_lock(&dt.stat[fd].lock); + +        if (dt_pci.eid < PROG_RES_FDS) { +                ++dt.stat[fd].lcl_w_pkt[qc]; +                dt.stat[fd].lcl_w_bytes[qc] += len; +        } +        ++dt.stat[fd].w_drp_pkt[qc]; +        dt.stat[fd].w_drp_bytes[qc] += len; + +        pthread_mutex_unlock(&dt.stat[fd].lock); +#endif +        return -1; +} diff --git a/src/ipcpd/unicast/dt.h b/src/ipcpd/unicast/dt.h new file mode 100644 index 00000000..5f2ee1a5 --- /dev/null +++ b/src/ipcpd/unicast/dt.h @@ -0,0 +1,56 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Data Transfer component + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */ + +#ifndef OUROBOROS_IPCPD_UNICAST_DT_H +#define OUROBOROS_IPCPD_UNICAST_DT_H + +#include <ouroboros/ipcp.h> +#include <ouroboros/qoscube.h> +#include <ouroboros/shm_rdrbuff.h> + +#define DT_COMP      "Data Transfer" +#define DT_PROTO     "dtp" +#define INVALID_ADDR 0 + +int  dt_init(enum pol_routing pr, +             enum pol_pff     pp, +             uint8_t          addr_size, +             uint8_t          eid_size, +             uint8_t          max_ttl +); + +void dt_fini(void); + +int  dt_start(void); + +void dt_stop(void); + +int  dt_reg_comp(void * comp, +                 void (* func)(void * comp, struct shm_du_buff * sdb), +                 char * name); + +int  dt_write_packet(uint64_t             dst_addr, +                     qoscube_t            qc, +                     int                  res_fd, +                     struct shm_du_buff * sdb); + +#endif /* OUROBOROS_IPCPD_UNICAST_DT_H */ diff --git a/src/ipcpd/unicast/enroll.c b/src/ipcpd/unicast/enroll.c new file mode 100644 index 00000000..b572f89d --- /dev/null +++ b/src/ipcpd/unicast/enroll.c @@ -0,0 +1,382 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Enrollment Task + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 199309L
+#endif
+
+#define OUROBOROS_PREFIX "enrollment"
+
+#include <ouroboros/endian.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/time_utils.h>
+#include <ouroboros/dev.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/sockets.h>
+
+#include "connmgr.h"
+#include "enroll.h"
+#include "ipcp.h"
+
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+#include <pthread.h>
+
+#include "ipcp_config.pb-c.h"
+typedef EnrollMsg enroll_msg_t;
+
+#define ENROLL_COMP             "Enrollment"
+#define ENROLL_PROTO            "OEP" /* Ouroboros enrollment protocol */
+#define ENROLL_WARN_TIME_OFFSET 20
+#define ENROLL_BUF_LEN          1024
+
+enum enroll_state {
+        ENROLL_NULL = 0,
+        ENROLL_INIT,
+        ENROLL_RUNNING
+};
+
+struct {
+        struct ipcp_config conf;
+        enum enroll_state  state;
+        pthread_t          listener;
+} enroll;
+
+static int send_rcv_enroll_msg(int fd)
+{
+        enroll_msg_t    req = ENROLL_MSG__INIT;
+        enroll_msg_t *  reply;
+        uint8_t         buf[ENROLL_BUF_LEN];
+        ssize_t         len;
+        ssize_t         delta_t;
+        struct timespec t0;
+        struct timespec rtt;
+
+        req.code = ENROLL_CODE__ENROLL_REQ;
+
+        len = enroll_msg__get_packed_size(&req);
+        if (len < 0) {
+                log_dbg("Failed to pack request message.");
+                return -1;
+        }
+
+        enroll_msg__pack(&req, buf);
+
+        clock_gettime(CLOCK_REALTIME, &t0);
+
+        if (flow_write(fd, buf, len)) {
+                log_dbg("Failed to send request message.");
+                return -1;
+        }
+
+        len = flow_read(fd, buf, ENROLL_BUF_LEN);
+        if (len < 0) {
+                log_dbg("No enrollment reply received.");
+                return -1;
+        }
+
+        log_dbg("Received enrollment info (%zd bytes).", len);
+
+        reply = enroll_msg__unpack(NULL, len, buf);
+        if (reply == NULL) {
+                log_dbg("Failed to unpack enrollment response.");
+                return -1;
+        }
+
+        if (reply->code != ENROLL_CODE__ENROLL_BOOT) {
+                log_dbg("Unexpected code in enrollment response.");
+                enroll_msg__free_unpacked(reply, NULL);
+                return -1;
+        }
+
+        if (!(reply->has_t_sec && reply->has_t_nsec)) {
+                log_dbg("No time in response message.");
+                enroll_msg__free_unpacked(reply, NULL);
+                return -1;
+        }
+
+        clock_gettime(CLOCK_REALTIME, &rtt);
+
+        delta_t = ts_diff_ms(&t0, &rtt);
+
+        rtt.tv_sec  = reply->t_sec;
+        rtt.tv_nsec = reply->t_nsec;
+
+        if (labs(ts_diff_ms(&t0, &rtt)) - delta_t > ENROLL_WARN_TIME_OFFSET)
+                log_warn("Clock offset above threshold.");
+
+        strcpy(enroll.conf.layer_info.layer_name,
+               reply->conf->layer_info->layer_name);
+        enroll.conf.type           = reply->conf->ipcp_type;
+        enroll.conf.addr_size      = reply->conf->addr_size;
+        enroll.conf.eid_size       = reply->conf->eid_size;
+        enroll.conf.max_ttl        = reply->conf->max_ttl;
+        enroll.conf.addr_auth_type = reply->conf->addr_auth_type;
+        enroll.conf.routing_type   = reply->conf->routing_type;
+        enroll.conf.pff_type       = reply->conf->pff_type;
+        enroll.conf.layer_info.dir_hash_algo
+                = 
reply->conf->layer_info->dir_hash_algo; + +        enroll_msg__free_unpacked(reply, NULL); + +        return 0; +} + +static ssize_t enroll_pack(uint8_t ** buf) +{ +        enroll_msg_t      msg        = ENROLL_MSG__INIT; +        ipcp_config_msg_t config     = IPCP_CONFIG_MSG__INIT; +        layer_info_msg_t  layer_info = LAYER_INFO_MSG__INIT; +        struct timespec   now; +        ssize_t           len; + +        clock_gettime(CLOCK_REALTIME, &now); + +        msg.code       = ENROLL_CODE__ENROLL_BOOT; +        msg.has_t_sec  = true; +        msg.t_sec      = now.tv_sec; +        msg.has_t_nsec = true; +        msg.t_nsec     = now.tv_nsec; +        msg.conf       = &config; + +        config.ipcp_type          = enroll.conf.type; +        config.has_addr_size      = true; +        config.addr_size          = enroll.conf.addr_size; +        config.has_eid_size       = true; +        config.eid_size           = enroll.conf.eid_size; +        config.has_max_ttl        = true; +        config.max_ttl            = enroll.conf.max_ttl; +        config.has_addr_auth_type = true; +        config.addr_auth_type     = enroll.conf.addr_auth_type; +        config.has_routing_type   = true; +        config.routing_type       = enroll.conf.routing_type; +        config.has_pff_type       = true; +        config.pff_type           = enroll.conf.pff_type; +        config.layer_info         = &layer_info; + +        layer_info.layer_name     = (char *) enroll.conf.layer_info.layer_name; +        layer_info.dir_hash_algo  = enroll.conf.layer_info.dir_hash_algo; + +        len = enroll_msg__get_packed_size(&msg); + +        *buf = malloc(len); +        if (*buf == NULL) +                return -1; + +        enroll_msg__pack(&msg, *buf); + +        return len; +} + +static void * enroll_handle(void * o) +{ +        struct conn    conn; +        uint8_t        buf[ENROLL_BUF_LEN]; +        uint8_t *      reply; +        ssize_t        len; +        enroll_msg_t * msg; + +        (void) o; + +        while (true) { +                if (connmgr_wait(COMPID_ENROLL, &conn)) { +                        log_err("Failed to get next connection."); +                        continue; +                } + +                len = flow_read(conn.flow_info.fd, buf, ENROLL_BUF_LEN); +                if (len < 0) { +                        log_err("Failed to read from flow."); +                        connmgr_dealloc(COMPID_ENROLL, &conn); +                        continue; +                } + +                msg = enroll_msg__unpack(NULL, len, buf); +                if (msg == NULL) { +                        log_err("Failed to unpack message."); +                        connmgr_dealloc(COMPID_ENROLL, &conn); +                        continue; +                } + +                if (msg->code != ENROLL_CODE__ENROLL_REQ) { +                        log_err("Wrong message type."); +                        connmgr_dealloc(COMPID_ENROLL, &conn); +                        enroll_msg__free_unpacked(msg, NULL); +                        continue; +                } + +                log_dbg("Enrolling a new neighbor."); + +                enroll_msg__free_unpacked(msg, NULL); + +                len = enroll_pack(&reply); +                if (reply == NULL) { +                        log_err("Failed to pack enrollment message."); +                        connmgr_dealloc(COMPID_ENROLL, &conn); +                        continue; +                } + +                log_dbg("Sending enrollment info (%zd bytes).", len); + +      
          if (flow_write(conn.flow_info.fd, reply, len)) {
+                        log_err("Failed to respond to enrollment request.");
+                        connmgr_dealloc(COMPID_ENROLL, &conn);
+                        free(reply);
+                        continue;
+                }
+
+                free(reply);
+
+                len = flow_read(conn.flow_info.fd, buf, ENROLL_BUF_LEN);
+                if (len < 0) {
+                        log_err("Failed to read from flow.");
+                        connmgr_dealloc(COMPID_ENROLL, &conn);
+                        continue;
+                }
+
+                msg = enroll_msg__unpack(NULL, len, buf);
+                if (msg == NULL) {
+                        log_err("Failed to unpack message.");
+                        connmgr_dealloc(COMPID_ENROLL, &conn);
+                        continue;
+                }
+
+                if (msg->code != ENROLL_CODE__ENROLL_DONE || !msg->has_result) {
+                        log_err("Wrong message type.");
+                        enroll_msg__free_unpacked(msg, NULL);
+                        connmgr_dealloc(COMPID_ENROLL, &conn);
+                        continue;
+                }
+
+                if (msg->result == 0)
+                        log_dbg("Neighbor enrollment successful.");
+                else
+                        log_dbg("Neighbor reported failed enrollment.");
+
+                enroll_msg__free_unpacked(msg, NULL);
+
+                connmgr_dealloc(COMPID_ENROLL, &conn);
+        }
+
+        return 0;
+}
+
+int enroll_boot(struct conn * conn)
+{
+        log_dbg("Getting boot information.");
+
+        if (send_rcv_enroll_msg(conn->flow_info.fd)) {
+                log_err("Failed to enroll.");
+                return -1;
+        }
+
+        return 0;
+}
+
+int enroll_done(struct conn * conn,
+                int           result)
+{
+        enroll_msg_t msg = ENROLL_MSG__INIT;
+        uint8_t      buf[ENROLL_BUF_LEN];
+        ssize_t      len;
+
+        msg.code       = ENROLL_CODE__ENROLL_DONE;
+        msg.has_result = true;
+        msg.result     = result;
+
+        len = enroll_msg__get_packed_size(&msg);
+        if (len < 0) {
+                log_dbg("Failed to pack acknowledgment message.");
+                return -1;
+        }
+
+        enroll_msg__pack(&msg, buf);
+
+        if (flow_write(conn->flow_info.fd, buf, len)) {
+                log_dbg("Failed to send acknowledgment.");
+                return -1;
+        }
+
+        return 0;
+}
+
+void enroll_bootstrap(const struct ipcp_config * conf)
+{
+        assert(conf);
+
+        memcpy(&enroll.conf, conf, sizeof(enroll.conf));
+}
+
+struct ipcp_config * enroll_get_conf(void)
+{
+        return &enroll.conf;
+}
+
+int enroll_init(void)
+{
+        struct conn_info info;
+
+        memset(&info, 0, sizeof(info));
+
+        strcpy(info.comp_name, ENROLL_COMP);
+        strcpy(info.protocol, ENROLL_PROTO);
+        info.pref_version = 1;
+        info.pref_syntax  = PROTO_GPB;
+        info.addr         = 0;
+
+        if (connmgr_comp_init(COMPID_ENROLL, &info)) {
+                log_err("Failed to register with connmgr.");
+                return -1;
+        }
+
+        enroll.state = ENROLL_INIT;
+
+        return 0;
+}
+
+void enroll_fini(void)
+{
+        if (enroll.state == ENROLL_RUNNING)
+                pthread_join(enroll.listener, NULL);
+
+        connmgr_comp_fini(COMPID_ENROLL);
+}
+
+int enroll_start(void)
+{
+        if (pthread_create(&enroll.listener, NULL, 
enroll_handle, NULL)) +                return -1; + +        enroll.state = ENROLL_RUNNING; + +        return 0; +} + +void enroll_stop(void) +{ +        if (enroll.state == ENROLL_RUNNING) +                pthread_cancel(enroll.listener); +} diff --git a/src/ipcpd/unicast/enroll.h b/src/ipcpd/unicast/enroll.h new file mode 100644 index 00000000..0a1ce5fc --- /dev/null +++ b/src/ipcpd/unicast/enroll.h @@ -0,0 +1,47 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Enrollment Task + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_ENROLL_H +#define OUROBOROS_IPCPD_UNICAST_ENROLL_H + +#include <ouroboros/ipcp.h> + +#include "comp.h" + +int                  enroll_init(void); + +void                 enroll_fini(void); + +int                  enroll_start(void); + +void                 enroll_stop(void); + +void                 enroll_bootstrap(const struct ipcp_config * conf); + +int                  enroll_boot(struct conn * conn); + +int                  enroll_done(struct conn * conn, +                                 int           result); + +struct ipcp_config * enroll_get_conf(void); + +#endif /* OUROBOROS_IPCPD_UNICAST_ENROLL_H */ diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c new file mode 100644 index 00000000..fbcbc6fa --- /dev/null +++ b/src/ipcpd/unicast/fa.c @@ -0,0 +1,491 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Flow allocator of the IPC Process + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include "config.h"
+
+#define FA               "flow-allocator"
+#define OUROBOROS_PREFIX FA
+
+#include <ouroboros/logs.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/dev.h>
+#include <ouroboros/ipcp-dev.h>
+
+#include "dir.h"
+#include "fa.h"
+#include "psched.h"
+#include "ipcp.h"
+#include "dt.h"
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define TIMEOUT 10000 /* microseconds; scaled to ns where used */
+
+#define FLOW_REQ   0
+#define FLOW_REPLY 1
+
+struct fa_msg {
+        uint64_t s_addr;
+        uint32_t r_eid;
+        uint32_t s_eid;
+        uint8_t  code;
+        int8_t   response;
+        /* QoS parameters from spec, aligned */
+        uint8_t  availability;
+        uint8_t  in_order;
+        uint32_t delay;
+        uint64_t bandwidth;
+        uint32_t loss;
+        uint32_t ber;
+        uint32_t max_gap;
+} __attribute__((packed));
+
+struct cmd {
+        struct list_head     next;
+        struct shm_du_buff * sdb;
+};
+
+struct {
+        pthread_rwlock_t flows_lock;
+        int              r_eid[PROG_MAX_FLOWS];
+        uint64_t         r_addr[PROG_MAX_FLOWS];
+        int              fd;
+
+        struct list_head cmds;
+        pthread_cond_t   cond;
+        pthread_mutex_t  mtx;
+        pthread_t        worker;
+
+        struct psched *  psched;
+} fa;
+
+static void packet_handler(int                  fd,
+                           qoscube_t            qc,
+                           struct shm_du_buff * sdb)
+{
+        pthread_rwlock_rdlock(&fa.flows_lock);
+
+        if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) {
+                pthread_rwlock_unlock(&fa.flows_lock);
+                ipcp_sdb_release(sdb);
+                log_warn("Failed to forward packet.");
+                return;
+        }
+
+        pthread_rwlock_unlock(&fa.flows_lock);
+}
+
+static void destroy_conn(int fd)
+{
+        fa.r_eid[fd]  = -1;
+        fa.r_addr[fd] = INVALID_ADDR;
+}
+
+static void fa_post_packet(void *               comp,
+                           struct shm_du_buff * sdb)
+{
+        struct cmd * cmd;
+
+        assert(comp == &fa);
+
+        (void) comp;
+
+        cmd = malloc(sizeof(*cmd));
+        if (cmd == NULL) {
+                log_err("Command failed. 
Out of memory."); +                ipcp_sdb_release(sdb); +                return; +        } + +        cmd->sdb = sdb; + +        pthread_mutex_lock(&fa.mtx); + +        list_add(&cmd->next, &fa.cmds); + +        pthread_cond_signal(&fa.cond); + +        pthread_mutex_unlock(&fa.mtx); +} + +static void * fa_handle_packet(void * o) +{ +        struct timespec ts  = {0, TIMEOUT * 1000}; + +        (void) o; + +        while (true) { +                struct timespec abstime; +                int             fd; +                uint8_t *       buf; +                struct fa_msg * msg; +                qosspec_t       qs; +                struct cmd *    cmd; + +                pthread_mutex_lock(&fa.mtx); + +                pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, +                                     &fa.mtx); + +                while (list_is_empty(&fa.cmds)) +                        pthread_cond_wait(&fa.cond, &fa.mtx); + +                cmd = list_last_entry(&fa.cmds, struct cmd, next); +                list_del(&cmd->next); + +                pthread_cleanup_pop(true); + +                buf = malloc(sizeof(*msg) + ipcp_dir_hash_len()); +                if (buf == NULL) { +                        log_err("Failed to allocate memory."); +                        ipcp_sdb_release(cmd->sdb); +                        free(cmd); +                        continue; +                } + +                msg = (struct fa_msg *) buf; + +                /* Depending on the message call the function in ipcp-dev.h */ + +                assert(sizeof(*msg) + ipcp_dir_hash_len() >= +                       (unsigned long int) (shm_du_buff_tail(cmd->sdb) - +                                            shm_du_buff_head(cmd->sdb))); + +                memcpy(msg, shm_du_buff_head(cmd->sdb), +                       shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb)); + +                ipcp_sdb_release(cmd->sdb); + +                free(cmd); + +                switch (msg->code) { +                case FLOW_REQ: +                        clock_gettime(PTHREAD_COND_CLOCK, &abstime); + +                        pthread_mutex_lock(&ipcpi.alloc_lock); + +                        while (ipcpi.alloc_id != -1 && +                               ipcp_get_state() == IPCP_OPERATIONAL) { +                                ts_add(&abstime, &ts, &abstime); +                                pthread_cond_timedwait(&ipcpi.alloc_cond, +                                                       &ipcpi.alloc_lock, +                                                       &abstime); +                        } + +                        if (ipcp_get_state() != IPCP_OPERATIONAL) { +                                pthread_mutex_unlock(&ipcpi.alloc_lock); +                                log_dbg("Won't allocate over non-operational" +                                        "IPCP."); +                                free(msg); +                                continue; +                        } + +                        assert(ipcpi.alloc_id == -1); + +                        qs.delay        = ntoh32(msg->delay); +                        qs.bandwidth    = ntoh64(msg->bandwidth); +                        qs.availability = msg->availability; +                        qs.loss         = ntoh32(msg->loss); +                        qs.ber          = ntoh32(msg->ber); +                        qs.in_order     = msg->in_order; +                        qs.max_gap      = ntoh32(msg->max_gap); + +                        fd = 
ipcp_flow_req_arr((uint8_t *) (msg + 1),
+                                               ipcp_dir_hash_len(),
+                                               qs);
+                        if (fd < 0) {
+                                pthread_mutex_unlock(&ipcpi.alloc_lock);
+                                log_err("Failed to get fd for flow.");
+                                free(msg);
+                                continue;
+                        }
+
+                        pthread_rwlock_wrlock(&fa.flows_lock);
+
+                        fa.r_eid[fd]  = ntoh32(msg->s_eid);
+                        fa.r_addr[fd] = ntoh64(msg->s_addr);
+
+                        pthread_rwlock_unlock(&fa.flows_lock);
+
+                        ipcpi.alloc_id = fd;
+                        pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+                        pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+                        break;
+                case FLOW_REPLY:
+                        pthread_rwlock_wrlock(&fa.flows_lock);
+
+                        fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid);
+
+                        ipcp_flow_alloc_reply(ntoh32(msg->r_eid),
+                                              msg->response);
+
+                        if (msg->response < 0)
+                                destroy_conn(ntoh32(msg->r_eid));
+                        else
+                                psched_add(fa.psched, ntoh32(msg->r_eid));
+
+                        pthread_rwlock_unlock(&fa.flows_lock);
+
+                        break;
+                default:
+                        log_err("Got an unknown flow allocation message.");
+                        break;
+                }
+
+                free(msg);
+        }
+}
+
+int fa_init(void)
+{
+        int i;
+
+        for (i = 0; i < PROG_MAX_FLOWS; ++i)
+                destroy_conn(i);
+
+        if (pthread_rwlock_init(&fa.flows_lock, NULL))
+                goto fail_rwlock;
+
+        if (pthread_mutex_init(&fa.mtx, NULL))
+                goto fail_mtx;
+
+        if (pthread_cond_init(&fa.cond, NULL))
+                goto fail_cond;
+
+        list_head_init(&fa.cmds);
+
+        fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA);
+        if (fa.fd < 0)
+                goto fail_reg;
+
+        return 0;
+
+ fail_reg:
+        pthread_cond_destroy(&fa.cond);
+ fail_cond:
+        pthread_mutex_destroy(&fa.mtx);
+ fail_mtx:
+        pthread_rwlock_destroy(&fa.flows_lock);
+ fail_rwlock:
+        log_err("Failed to initialize flow allocator.");
+        return -1;
+}
+
+void fa_fini(void)
+{
+        pthread_cond_destroy(&fa.cond);
+        pthread_mutex_destroy(&fa.mtx);
+        pthread_rwlock_destroy(&fa.flows_lock);
+}
+
+int fa_start(void)
+{
+        struct sched_param  par;
+        int                 pol;
+        int                 max;
+
+        fa.psched = psched_create(packet_handler);
+        if (fa.psched == NULL) {
+                log_err("Failed to start packet scheduler.");
+                goto fail_psched;
+        }
+
+        if (pthread_create(&fa.worker, NULL, fa_handle_packet, NULL)) {
+                log_err("Failed to create worker thread.");
+                goto fail_thread;
+        }
+
+        if (pthread_getschedparam(fa.worker, &pol, &par)) {
+                log_err("Failed to get worker thread scheduling parameters.");
+                goto fail_sched;
+        }
+
+        max = sched_get_priority_max(pol);
+        if (max < 0) {
+                log_err("Failed to get max priority for scheduler.");
+                goto fail_sched;
+        }
+
+        par.sched_priority = max;
+ 
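+        /*
+         * Run the worker at the highest priority its scheduling
+         * policy allows, so that flow allocation requests are not
+         * starved by data traffic.
+         */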
+        if (pthread_setschedparam(fa.worker, pol, &par)) {
+                log_err("Failed to set scheduler priority to maximum.");
+                goto fail_sched;
+        }
+
+        return 0;
+
+ fail_sched:
+        pthread_cancel(fa.worker);
+        pthread_join(fa.worker, NULL);
+ fail_thread:
+        psched_destroy(fa.psched);
+ fail_psched:
+        log_err("Failed to start flow allocator.");
+        return -1;
+}
+
+void fa_stop(void)
+{
+        pthread_cancel(fa.worker);
+        pthread_join(fa.worker, NULL);
+
+        psched_destroy(fa.psched);
+}
+
+int fa_alloc(int             fd,
+             const uint8_t * dst,
+             qosspec_t       qs)
+{
+        struct fa_msg *      msg;
+        uint64_t             addr;
+        struct shm_du_buff * sdb;
+        qoscube_t            qc;
+
+        addr = dir_query(dst);
+        if (addr == 0)
+                return -1;
+
+        if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len()))
+                return -1;
+
+        msg               = (struct fa_msg *) shm_du_buff_head(sdb);
+        msg->code         = FLOW_REQ;
+        msg->s_eid        = hton32(fd);
+        msg->s_addr       = hton64(ipcpi.dt_addr);
+        msg->delay        = hton32(qs.delay);
+        msg->bandwidth    = hton64(qs.bandwidth);
+        msg->availability = qs.availability;
+        msg->loss         = hton32(qs.loss);
+        msg->ber          = hton32(qs.ber);
+        msg->in_order     = qs.in_order;
+        msg->max_gap      = hton32(qs.max_gap);
+
+        memcpy(msg + 1, dst, ipcp_dir_hash_len());
+
+        qc = qos_spec_to_cube(qs);
+
+        if (dt_write_packet(addr, qc, fa.fd, sdb)) {
+                ipcp_sdb_release(sdb);
+                return -1;
+        }
+
+        pthread_rwlock_wrlock(&fa.flows_lock);
+
+        assert(fa.r_eid[fd] == -1);
+        fa.r_addr[fd] = addr;
+
+        pthread_rwlock_unlock(&fa.flows_lock);
+
+        return 0;
+}
+
+int fa_alloc_resp(int fd,
+                  int response)
+{
+        struct timespec      ts = {0, TIMEOUT * 1000};
+        struct timespec      abstime;
+        struct fa_msg *      msg;
+        struct shm_du_buff * sdb;
+        qoscube_t            qc;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+
+        pthread_mutex_lock(&ipcpi.alloc_lock);
+
+        while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) {
+                ts_add(&abstime, &ts, &abstime);
+                pthread_cond_timedwait(&ipcpi.alloc_cond,
+                                       &ipcpi.alloc_lock,
+                                       &abstime);
+        }
+
+        if (ipcp_get_state() != IPCP_OPERATIONAL) {
+                pthread_mutex_unlock(&ipcpi.alloc_lock);
+                return -1;
+        }
+
+        ipcpi.alloc_id = -1;
+        pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+        pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+        if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len())) {
+                destroy_conn(fd);
+                return -1;
+        }
+
+        pthread_rwlock_wrlock(&fa.flows_lock);
+
+        msg           = (struct fa_msg *) shm_du_buff_head(sdb);
+        msg->code     = FLOW_REPLY;
+        msg->r_eid    = hton32(fa.r_eid[fd]);
+        msg->s_eid    = hton32(fd);
+        msg->response = response;
+
+        /*
+         * A negative response must still reach the peer, so keep the
+         * sdb alive and defer destroy_conn() until after the write;
+         * releasing the sdb here would free the buffer written below.
+         */
+        if (response >= 0)
+                psched_add(fa.psched, fd);
+
+        
ipcp_flow_get_qoscube(fd, &qc);
+
+        assert(qc >= 0 && qc < QOS_CUBE_MAX);
+
+        if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) {
+                destroy_conn(fd);
+                pthread_rwlock_unlock(&fa.flows_lock);
+                ipcp_sdb_release(sdb);
+                return -1;
+        }
+
+        if (response < 0)
+                destroy_conn(fd);
+
+        pthread_rwlock_unlock(&fa.flows_lock);
+
+        return 0;
+}
+
+int fa_dealloc(int fd)
+{
+        if (ipcp_flow_fini(fd) < 0)
+                return 0;
+
+        pthread_rwlock_wrlock(&fa.flows_lock);
+
+        psched_del(fa.psched, fd);
+
+        destroy_conn(fd);
+
+        pthread_rwlock_unlock(&fa.flows_lock);
+
+        flow_dealloc(fd);
+
+        return 0;
+}
diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h
new file mode 100644
index 00000000..ea055ec2
--- /dev/null
+++ b/src/ipcpd/unicast/fa.h
@@ -0,0 +1,46 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * Flow allocator of the IPC Process
+ *
+ *    Dimitri Staessens <dimitri.staessens@ugent.be>
+ *    Sander Vrijders   <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_FA_H
+#define OUROBOROS_IPCPD_UNICAST_FA_H
+
+#include <ouroboros/qos.h>
+#include <ouroboros/utils.h>
+
+int  fa_init(void);
+
+void fa_fini(void);
+
+int  fa_start(void);
+
+void fa_stop(void);
+
+int  fa_alloc(int             fd,
+              const uint8_t * dst,
+              qosspec_t       qs);
+
+int  fa_alloc_resp(int fd,
+                   int response);
+
+int  fa_dealloc(int fd);
+
+#endif /* OUROBOROS_IPCPD_UNICAST_FA_H */
diff --git a/src/ipcpd/unicast/kademlia.proto b/src/ipcpd/unicast/kademlia.proto
new file mode 100644
index 00000000..6bd78b38
--- /dev/null
+++ b/src/ipcpd/unicast/kademlia.proto
@@ -0,0 +1,45 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * KAD protocol
+ *
+ *    Dimitri Staessens <dimitri.staessens@ugent.be>
+ *    Sander Vrijders   <sander.vrijders@ugent.be>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */
+
+syntax = "proto2";
+
+message kad_contact_msg {
+        required bytes  id   = 1;
+        required uint64 addr = 2;
+};
+
+message kad_msg {
+        required uint32 code              =  1;
+        required uint32 cookie            =  2;
+        required uint64 s_addr            =  3;
+        optional bytes  s_id              =  4;
+        optional bytes  key               =  5;
+        repeated uint64 addrs             =  6;
+        repeated kad_contact_msg contacts =  7;
+        // enrollment parameters
+        optional uint32 alpha             =  8;
+        optional uint32 b                 =  9;
+        optional uint32 k                 = 10;
+        optional uint32 t_expire          = 11;
+        optional uint32 t_refresh         = 12;
+        optional uint32 t_replicate       = 13;
+};
\ No newline at end of file diff --git a/src/ipcpd/unicast/main.c b/src/ipcpd/unicast/main.c new file mode 100644 index 00000000..c159b26d --- /dev/null +++ b/src/ipcpd/unicast/main.c @@ -0,0 +1,378 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Unicast IPC Process + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include "config.h" + +#define OUROBOROS_PREFIX "unicast-ipcp" + +#include <ouroboros/errno.h> +#include <ouroboros/hash.h> +#include <ouroboros/ipcp-dev.h> +#include <ouroboros/logs.h> +#include <ouroboros/notifier.h> +#include <ouroboros/rib.h> +#include <ouroboros/time_utils.h> + +#include "addr_auth.h" +#include "connmgr.h" +#include "dir.h" +#include "dt.h" +#include "enroll.h" +#include "fa.h" +#include "ipcp.h" + +#include <stdbool.h> +#include <signal.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <inttypes.h> + +#define THIS_TYPE IPCP_UNICAST + +static int initialize_components(const struct ipcp_config * conf) +{ +        ipcpi.layer_name = strdup(conf->layer_info.layer_name); +        if (ipcpi.layer_name == NULL) { +                log_err("Failed to set layer name."); +                goto fail_layer_name; +        } + +        ipcpi.dir_hash_algo = conf->layer_info.dir_hash_algo; + +        assert(ipcp_dir_hash_len() != 0); + +        if (addr_auth_init(conf->addr_auth_type, +                           &conf->addr_size)) { +                log_err("Failed to init address authority."); +                goto fail_addr_auth; +        } + +        ipcpi.dt_addr = addr_auth_address(); +        if (ipcpi.dt_addr == 0) { +                log_err("Failed to get a valid address."); +                goto fail_addr_auth; +        } + +        log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr); + +        if (dt_init(conf->routing_type, +                    conf->pff_type, +                    conf->addr_size, +                    conf->eid_size, +                    conf->max_ttl)) { +                log_err("Failed to initialize data transfer component."); +                goto fail_dt; +        } + +        if (fa_init()) { +                log_err("Failed to initialize flow allocator component."); +                goto fail_fa; +        } + +        if (dir_init()) { +                log_err("Failed to initialize directory."); +                goto fail_dir; +        } + +        ipcp_set_state(IPCP_INIT); + +        return 0; + + fail_dir: +        fa_fini(); + fail_fa: +        dt_fini(); + fail_dt: +        addr_auth_fini(); + fail_addr_auth: +        free(ipcpi.layer_name); + fail_layer_name: +        return -1; +} + +static void finalize_components(void) +{ +        dir_fini(); + +        fa_fini(); + +        
dt_fini();
+
+        addr_auth_fini();
+
+        free(ipcpi.layer_name);
+}
+
+static int start_components(void)
+{
+        assert(ipcp_get_state() == IPCP_INIT);
+
+        ipcp_set_state(IPCP_OPERATIONAL);
+
+        if (fa_start()) {
+                log_err("Failed to start flow allocator.");
+                goto fail_fa_start;
+        }
+
+        if (enroll_start()) {
+                log_err("Failed to start enrollment.");
+                goto fail_enroll_start;
+        }
+
+        if (connmgr_start()) {
+                log_err("Failed to start AP connection manager.");
+                goto fail_connmgr_start;
+        }
+
+        return 0;
+
+ fail_connmgr_start:
+        enroll_stop();
+ fail_enroll_start:
+        fa_stop();
+ fail_fa_start:
+        ipcp_set_state(IPCP_INIT);
+        return -1;
+}
+
+static void stop_components(void)
+{
+        assert(ipcp_get_state() == IPCP_OPERATIONAL ||
+               ipcp_get_state() == IPCP_SHUTDOWN);
+
+        connmgr_stop();
+
+        enroll_stop();
+
+        fa_stop();
+
+        ipcp_set_state(IPCP_INIT);
+}
+
+static int bootstrap_components(void)
+{
+        if (dir_bootstrap()) {
+                log_err("Failed to bootstrap directory.");
+                dt_stop();
+                return -1;
+        }
+
+        return 0;
+}
+
+static int unicast_ipcp_enroll(const char *        dst,
+                               struct layer_info * info)
+{
+        struct conn conn;
+
+        if (connmgr_alloc(COMPID_ENROLL, dst, NULL, &conn)) {
+                log_err("Failed to get connection.");
+                goto fail_er_flow;
+        }
+
+        /* Get boot state from peer. */
+        if (enroll_boot(&conn)) {
+                log_err("Failed to get boot information.");
+                goto fail_enroll_boot;
+        }
+
+        if (initialize_components(enroll_get_conf())) {
+                log_err("Failed to initialize IPCP components.");
+                goto fail_enroll_boot;
+        }
+
+        if (dt_start()) {
+                log_err("Failed to start data transfer.");
+                goto fail_dt_start;
+        }
+
+        if (start_components()) {
+                log_err("Failed to start components.");
+                goto fail_start_comp;
+        }
+
+        if (enroll_done(&conn, 0))
+                log_warn("Failed to confirm enrollment with peer.");
+
+        if (connmgr_dealloc(COMPID_ENROLL, &conn))
+                log_warn("Failed to deallocate enrollment flow.");
+
+        log_info("Enrolled with %s.", dst);
+
+        info->dir_hash_algo = ipcpi.dir_hash_algo;
+        strcpy(info->layer_name, ipcpi.layer_name);
+
+        return 0;
+
+ fail_start_comp:
+        dt_stop();
+ fail_dt_start:
+        finalize_components();
+ fail_enroll_boot:
+        connmgr_dealloc(COMPID_ENROLL, &conn);
+ fail_er_flow:
+        return -1;
+}
+
+static int unicast_ipcp_bootstrap(const struct ipcp_config * conf)
+{
+        assert(conf);
+        assert(conf->type == THIS_TYPE);
+
+        enroll_bootstrap(conf);
+
+        if (initialize_components(conf)) {
+                log_err("Failed to init IPCP components.");
+                goto fail_init;
+        }
+
+        if (dt_start()) {
+                log_err("Failed to start data transfer.");
+                goto fail_dt_start;
+        }
+
+        if (start_components()) {
+                log_err("Failed to start IPCP components.");
+                goto fail_start;
+        }
+
+        if (bootstrap_components()) {
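+                /* NOTE: on failure, bootstrap_components() has
+                 * already called dt_stop() for the data transfer
+                 * component. */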
         log_err("Failed to bootstrap IPCP components."); +                goto fail_bootstrap; +        } + +        log_dbg("Bootstrapped in layer %s.", conf->layer_info.layer_name); + +        return 0; + + fail_bootstrap: +        stop_components(); + fail_start: +        dt_stop(); + fail_dt_start: +        finalize_components(); + fail_init: +        return -1; +} + +static int unicast_ipcp_query(const uint8_t * dst) +{ +        return dir_query(dst) ? 0 : -1; +} + +static struct ipcp_ops unicast_ops = { +        .ipcp_bootstrap       = unicast_ipcp_bootstrap, +        .ipcp_enroll          = unicast_ipcp_enroll, +        .ipcp_connect         = connmgr_ipcp_connect, +        .ipcp_disconnect      = connmgr_ipcp_disconnect, +        .ipcp_reg             = dir_reg, +        .ipcp_unreg           = dir_unreg, +        .ipcp_query           = unicast_ipcp_query, +        .ipcp_flow_alloc      = fa_alloc, +        .ipcp_flow_join       = NULL, +        .ipcp_flow_alloc_resp = fa_alloc_resp, +        .ipcp_flow_dealloc    = fa_dealloc +}; + +int main(int    argc, +         char * argv[]) +{ +        if (ipcp_init(argc, argv, &unicast_ops) < 0) { +                log_err("Failed to init IPCP."); +                goto fail_init; +        } + +        /* These components must be init at creation. */ +        if (rib_init(ipcpi.name)) { +                log_err("Failed to initialize RIB."); +                goto fail_rib_init; +        } + +        if (notifier_init()) { +                log_err("Failed to initialize notifier component."); +                goto fail_notifier_init; +        } + +        if (connmgr_init()) { +                log_err("Failed to initialize connection manager."); +                goto fail_connmgr_init; +        } + +        if (enroll_init()) { +                log_err("Failed to initialize enrollment component."); +                goto fail_enroll_init; +        } + +        if (ipcp_boot() < 0) { +                log_err("Failed to boot IPCP."); +                goto fail_boot; +        } + +        if (ipcp_create_r(0)) { +                log_err("Failed to notify IRMd we are initialized."); +                ipcp_set_state(IPCP_NULL); +                goto fail_create_r; +        } + +        ipcp_shutdown(); + +        if (ipcp_get_state() == IPCP_SHUTDOWN) { +                dt_stop(); +                stop_components(); +                finalize_components(); +        } + +        enroll_fini(); + +        connmgr_fini(); + +        notifier_fini(); + +        rib_fini(); + +        ipcp_fini(); + +        exit(EXIT_SUCCESS); + + fail_create_r: +        ipcp_shutdown(); + fail_boot: +        enroll_fini(); + fail_enroll_init: +        connmgr_fini(); + fail_connmgr_init: +        notifier_fini(); + fail_notifier_init: +        rib_fini(); + fail_rib_init: +       ipcp_fini(); + fail_init: +        ipcp_create_r(-1); +        exit(EXIT_FAILURE); +} diff --git a/src/ipcpd/unicast/pff.c b/src/ipcpd/unicast/pff.c new file mode 100644 index 00000000..5b1fa429 --- /dev/null +++ b/src/ipcpd/unicast/pff.c @@ -0,0 +1,127 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * PDU Forwarding Function + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define OUROBOROS_PREFIX "pff" + +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> + +#include "pff.h" +#include "pol-pff-ops.h" +#include "pol/alternate_pff.h" +#include "pol/simple_pff.h" + +struct pff { +        struct pol_pff_ops * ops; +        struct pff_i *       pff_i; +}; + +struct pff * pff_create(enum pol_pff pol) +{ +        struct pff * pff; + +        pff = malloc(sizeof(*pff)); +        if (pff == NULL) +                return NULL; + +        switch (pol) { +        case PFF_ALTERNATE: +                log_dbg("Using alternate PFF policy."); +                pff->ops = &alternate_pff_ops; +                break; +        case PFF_SIMPLE: +                log_dbg("Using simple PFF policy."); +                pff->ops = &simple_pff_ops; +                break; +        default: +                goto err; +        } + +        pff->pff_i = pff->ops->create(); +        if (pff->pff_i == NULL) +                goto err; + +        return pff; + err: +        free(pff); +        return NULL; +} + +void pff_destroy(struct pff * pff) +{ +        pff->ops->destroy(pff->pff_i); + +        free(pff); +} + +void pff_lock(struct pff * pff) +{ +        return pff->ops->lock(pff->pff_i); +} + +void pff_unlock(struct pff * pff) +{ +        return pff->ops->unlock(pff->pff_i); +} + +int pff_add(struct pff * pff, +            uint64_t     addr, +            int *        fd, +            size_t       len) +{ +        return pff->ops->add(pff->pff_i, addr, fd, len); +} + +int pff_update(struct pff * pff, +               uint64_t     addr, +               int *        fd, +               size_t       len) +{ +        return pff->ops->update(pff->pff_i, addr, fd, len); +} + +int pff_del(struct pff * pff, +            uint64_t     addr) +{ +        return pff->ops->del(pff->pff_i, addr); +} + +void pff_flush(struct pff * pff) +{ +        return pff->ops->flush(pff->pff_i); +} + +int pff_nhop(struct pff * pff, +             uint64_t     addr) +{ +        return pff->ops->nhop(pff->pff_i, addr); +} + +int pff_flow_state_change(struct pff * pff, +                          int          fd, +                          bool         up) +{ +        if (pff->ops->flow_state_change != NULL) +                return pff->ops->flow_state_change(pff->pff_i, fd, up); + +        return 0; +} diff --git a/src/ipcpd/unicast/pff.h b/src/ipcpd/unicast/pff.h new file mode 100644 index 00000000..9533395f --- /dev/null +++ b/src/ipcpd/unicast/pff.h @@ -0,0 +1,63 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * PDU Forwarding Function + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  
See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_PFF_H +#define OUROBOROS_IPCPD_UNICAST_PFF_H + +#include <ouroboros/ipcp.h> + +#include <stdint.h> +#include <stdlib.h> +#include <stdbool.h> + +struct pff * pff_create(enum pol_pff pol); + +void         pff_destroy(struct pff * pff); + +void         pff_lock(struct pff * pff); + +void         pff_unlock(struct pff * pff); + +int          pff_add(struct pff * pff, +                     uint64_t     addr, +                     int *        fd, +                     size_t       len); + +int          pff_update(struct pff * pff, +                        uint64_t     addr, +                        int *        fd, +                        size_t       len); + +int          pff_del(struct pff * pff, +                     uint64_t     addr); + +void         pff_flush(struct pff * pff); + +/* Returns fd towards next hop */ +int          pff_nhop(struct pff * pff, +                      uint64_t     addr); + +int          pff_flow_state_change(struct pff * pff, +                                   int          fd, +                                   bool         up); + +#endif /* OUROBOROS_IPCPD_UNICAST_PFF_H */ diff --git a/src/ipcpd/unicast/pol-addr-auth-ops.h b/src/ipcpd/unicast/pol-addr-auth-ops.h new file mode 100644 index 00000000..7b1a4514 --- /dev/null +++ b/src/ipcpd/unicast/pol-addr-auth-ops.h @@ -0,0 +1,34 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Address authority policy ops + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_POL_ADDR_AUTH_OPS_H +#define OUROBOROS_IPCPD_UNICAST_POL_ADDR_AUTH_OPS_H + +struct pol_addr_auth_ops { +        int      (* init)(const void * info); + +        int      (* fini)(void); + +        uint64_t (* address)(void); +}; + +#endif /* OUROBOROS_IPCPD_UNICAST_POL_ADDR_AUTH_OPS_H */ diff --git a/src/ipcpd/unicast/pol-pff-ops.h b/src/ipcpd/unicast/pol-pff-ops.h new file mode 100644 index 00000000..766bb5b8 --- /dev/null +++ b/src/ipcpd/unicast/pol-pff-ops.h @@ -0,0 +1,63 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Pff policy ops + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  
See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_POL_PFF_OPS_H +#define OUROBOROS_IPCPD_UNICAST_POL_PFF_OPS_H + +#include <stdbool.h> + +struct pff_i; + +struct pol_pff_ops { +        struct pff_i * (* create)(void); + +        void           (* destroy)(struct pff_i * pff_i); + +        void           (* lock)(struct pff_i * pff_i); + +        void           (* unlock)(struct pff_i * pff_i); + +        int            (* add)(struct pff_i * pff_i, +                               uint64_t       addr, +                               int *          fd, +                               size_t         len); + +        int            (* update)(struct pff_i * pff_i, +                                  uint64_t       addr, +                                  int *          fd, +                                  size_t         len); + +        int            (* del)(struct pff_i * pff_i, +                               uint64_t       addr); + +        void           (* flush)(struct pff_i * pff_i); + +        int            (* nhop)(struct pff_i * pff_i, +                                uint64_t       addr); + +        /* Optional operation. */ +        int            (* flow_state_change)(struct pff_i * pff_i, +                                             int            fd, +                                             bool           up); +}; + +#endif /* OUROBOROS_IPCPD_UNICAST_POL_PFF_OPS_H */ diff --git a/src/ipcpd/unicast/pol-routing-ops.h b/src/ipcpd/unicast/pol-routing-ops.h new file mode 100644 index 00000000..ae8e632d --- /dev/null +++ b/src/ipcpd/unicast/pol-routing-ops.h @@ -0,0 +1,38 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Routing policy ops + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */ + +#ifndef OUROBOROS_IPCPD_UNICAST_POL_ROUTING_OPS_H +#define OUROBOROS_IPCPD_UNICAST_POL_ROUTING_OPS_H + +#include "pff.h" + +struct pol_routing_ops { +        int                (* init)(enum pol_routing pr); + +        void               (* fini)(void); + +        struct routing_i * (* routing_i_create)(struct pff * pff); + +        void               (* routing_i_destroy)(struct routing_i * instance); +}; + +#endif /* OUROBOROS_IPCPD_UNICAST_POL_ROUTING_OPS_H */ diff --git a/src/ipcpd/unicast/pol/alternate_pff.c b/src/ipcpd/unicast/pol/alternate_pff.c new file mode 100644 index 00000000..38937297 --- /dev/null +++ b/src/ipcpd/unicast/pol/alternate_pff.c @@ -0,0 +1,403 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Policy for PFF with alternate next hops + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200112L + +#include "config.h" + +#include <ouroboros/hashtable.h> +#include <ouroboros/errno.h> +#include <ouroboros/list.h> + +#include <string.h> +#include <assert.h> +#include <pthread.h> + +#include "alternate_pff.h" + +struct nhop { +        struct list_head next; +        int              fd; +}; + +struct addr { +        struct list_head next; +        uint64_t         addr; +}; + +struct pff_i { +        struct htable *  table; + +        struct list_head addrs; + +        struct list_head nhops_down; + +        pthread_rwlock_t lock; +}; + +struct pol_pff_ops alternate_pff_ops = { +        .create            = alternate_pff_create, +        .destroy           = alternate_pff_destroy, +        .lock              = alternate_pff_lock, +        .unlock            = alternate_pff_unlock, +        .add               = alternate_pff_add, +        .update            = alternate_pff_update, +        .del               = alternate_pff_del, +        .flush             = alternate_pff_flush, +        .nhop              = alternate_pff_nhop, +        .flow_state_change = alternate_flow_state_change +}; + +static int add_addr(struct pff_i * pff_i, +                    uint64_t       addr) +{ +        struct addr * a; + +        a = malloc(sizeof(*a)); +        if (a == NULL) +                return -1; + +        a->addr = addr; + +        list_add(&a->next, &(pff_i->addrs)); + +        return 0; +} + +static void del_addr(struct pff_i * pff_i, +                     uint64_t       addr) +{ +        struct list_head * pos = NULL; +        struct list_head * n   = NULL; + +        list_for_each_safe(pos, n, &(pff_i->addrs)) { +                struct addr * e = list_entry(pos, struct addr, next); +                if (e->addr == addr) { +                        list_del(&e->next); +                        free(e); +                        return; +                } +        } +} + +static void del_addrs(struct pff_i * pff_i) +{ +        struct list_head * pos 
= NULL; +        struct list_head * n   = NULL; + +        list_for_each_safe(pos, n, &(pff_i->addrs)) { +                struct addr * e = list_entry(pos, struct addr, next); +                list_del(&e->next); +                free(e); +        } +} + +static void del_nhops_down(struct pff_i * pff_i) +{ +        struct list_head * pos = NULL; +        struct list_head * n   = NULL; + +        list_for_each_safe(pos, n, &(pff_i->nhops_down)) { +                struct nhop * e = list_entry(pos, struct nhop, next); +                list_del(&e->next); +                free(e); +        } +} + +static int del_nhop_down(struct pff_i * pff_i, +                         int            fd) +{ +        struct list_head * pos = NULL; +        struct list_head * n   = NULL; + +        list_for_each_safe(pos, n, &(pff_i->nhops_down)) { +                struct nhop * e = list_entry(pos, struct nhop, next); +                if (e->fd == fd) { +                        list_del(&e->next); +                        free(e); +                        return 0; +                } +        } + +        return -1; +} + +static int add_nhop_down(struct pff_i * pff_i, +                         int            fd) +{ +        struct nhop *      nhop; + +        nhop = malloc(sizeof(*nhop)); +        if (nhop == NULL) +                return -1; + +        nhop->fd = fd; + +        list_add(&nhop->next, &(pff_i->nhops_down)); + +        return 0; +} + +static bool nhops_down_has(struct pff_i * pff_i, +                           int            fd) +{ +        struct list_head * pos = NULL; + +        list_for_each(pos, &pff_i->nhops_down) { +                struct nhop * e = list_entry(pos, struct nhop, next); +                if (e->fd == fd) +                        return true; +        } + +        return false; +} + +static int add_to_htable(struct pff_i * pff_i, +                         uint64_t       addr, +                         int *          fd, +                         size_t         len) +{ +        int * val; + +        assert(pff_i); +        assert(len > 0); + +        val = malloc(sizeof(*val) * (len + 1)); +        if (val == NULL) +                goto fail_malloc; + +        memcpy(val, fd, len * sizeof(*val)); +        /* Put primary hop again at the end */ +        val[len] = val[0]; + +        if (htable_insert(pff_i->table, addr, val, len)) +                goto fail_insert; + +        return 0; + + fail_insert: +        free(val); + fail_malloc: +        return -1; +} + +struct pff_i * alternate_pff_create(void) +{ +        struct pff_i * tmp; + +        tmp = malloc(sizeof(*tmp)); +        if (tmp == NULL) +                goto fail_malloc; + +        if (pthread_rwlock_init(&tmp->lock, NULL)) +                goto fail_lock; + +        tmp->table = htable_create(PFT_SIZE, false); +        if (tmp->table == NULL) +                goto fail_table; + +        list_head_init(&tmp->nhops_down); +        list_head_init(&tmp->addrs); + +        return tmp; + + fail_table: +        pthread_rwlock_destroy(&tmp->lock); + fail_lock: +        free(tmp); + fail_malloc: +        return NULL; +} + +void alternate_pff_destroy(struct pff_i * pff_i) +{ +        assert(pff_i); + +        htable_destroy(pff_i->table); +        del_nhops_down(pff_i); +        del_addrs(pff_i); +        pthread_rwlock_destroy(&pff_i->lock); +        free(pff_i); +} + +void alternate_pff_lock(struct pff_i * pff_i) +{ +        pthread_rwlock_wrlock(&pff_i->lock); +} + +void alternate_pff_unlock(struct pff_i * pff_i) +{ +       
 pthread_rwlock_unlock(&pff_i->lock); +} + +int alternate_pff_add(struct pff_i * pff_i, +                      uint64_t       addr, +                      int *          fd, +                      size_t         len) +{ +        assert(pff_i); +        assert(len > 0); + +        if (add_to_htable(pff_i, addr, fd, len)) +                return -1; + +        if (add_addr(pff_i, addr)) { +                htable_delete(pff_i->table, addr); +                return -1; +        } + +        return 0; +} + +int alternate_pff_update(struct pff_i * pff_i, +                         uint64_t       addr, +                         int *          fd, +                         size_t         len) +{ +        assert(pff_i); +        assert(len > 0); + +        if (htable_delete(pff_i->table, addr)) +                return -1; + +        if (add_to_htable(pff_i, addr, fd, len)) +                return -1; + +        return 0; +} + +int alternate_pff_del(struct pff_i * pff_i, +                      uint64_t       addr) +{ +        assert(pff_i); + +        del_addr(pff_i, addr); + +        if (htable_delete(pff_i->table, addr)) +                return -1; + +        return 0; +} + +void alternate_pff_flush(struct pff_i * pff_i) +{ +        assert(pff_i); + +        htable_flush(pff_i->table); + +        del_nhops_down(pff_i); + +        del_addrs(pff_i); +} + +int alternate_pff_nhop(struct pff_i * pff_i, +                       uint64_t       addr) +{ +        int    fd = -1; +        size_t len; +        void * el; + +        assert(pff_i); + +        pthread_rwlock_rdlock(&pff_i->lock); + +        if (htable_lookup(pff_i->table, addr, &el, &len)) { +                pthread_rwlock_unlock(&pff_i->lock); +                return -1; +        } + +        fd = *((int *) el); + +        pthread_rwlock_unlock(&pff_i->lock); + +        return fd; +} + +int alternate_flow_state_change(struct pff_i * pff_i, +                                int            fd, +                                bool           up) +{ +        struct list_head * pos = NULL; +        size_t             len; +        void *             el; +        int *              fds; +        size_t             i; +        int                tmp; + +        assert(pff_i); + +        pthread_rwlock_wrlock(&pff_i->lock); + +        if (up) { +                if (del_nhop_down(pff_i, fd)) { +                        pthread_rwlock_unlock(&pff_i->lock); +                        return -1; +                } +        } else { +                if (add_nhop_down(pff_i, fd)) { +                        pthread_rwlock_unlock(&pff_i->lock); +                        return -1; +                } +        } + +        list_for_each(pos, &pff_i->addrs) { +                struct addr * e = list_entry(pos, struct addr, next); +                if (htable_lookup(pff_i->table, e->addr, &el, &len)) { +                        pthread_rwlock_unlock(&pff_i->lock); +                        return -1; +                } + +                fds = (int *) el; + +                if (up) { +                        /* It is using an alternate */ +                        if (fds[len] == fd && fds[0] != fd) { +                                for (i = 0 ; i < len; i++) { +                                        /* Found the primary */ +                                        if (fds[i] == fd) { +                                                tmp = fds[0]; +                                                fds[0] = fds[i]; +                                                fds[i] = tmp; 
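+                                                /* The old primary is
+                                                 * active again at
+                                                 * fds[0]. */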
+                                                break; +                                        } +                                } +                        } +                } else { +                        /* Need to switch to a (different) alternate */ +                        if (fds[0] == fd) { +                                for (i = 1; i < len; i++) { +                                        /* Usable alternate */ +                                        if (!nhops_down_has(pff_i, fds[i])) { +                                                tmp = fds[0]; +                                                fds[0] = fds[i]; +                                                fds[i] = tmp; +                                                break; +                                        } +                                } +                        } +                } +        } + +        pthread_rwlock_unlock(&pff_i->lock); + +        return 0; +} diff --git a/src/ipcpd/unicast/pol/alternate_pff.h b/src/ipcpd/unicast/pol/alternate_pff.h new file mode 100644 index 00000000..7bdf26de --- /dev/null +++ b/src/ipcpd/unicast/pol/alternate_pff.h @@ -0,0 +1,61 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Policy for PFF with alternate next hops + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */ + +#ifndef OUROBOROS_IPCPD_UNICAST_ALTERNATE_PFF_H +#define OUROBOROS_IPCPD_UNICAST_ALTERNATE_PFF_H + +#include "pol-pff-ops.h" + +struct pff_i * alternate_pff_create(void); + +void           alternate_pff_destroy(struct pff_i * pff_i); + +void           alternate_pff_lock(struct pff_i * pff_i); + +void           alternate_pff_unlock(struct pff_i * pff_i); + +int            alternate_pff_add(struct pff_i * pff_i, +                                 uint64_t       addr, +                                 int *          fd, +                                 size_t         len); + +int            alternate_pff_update(struct pff_i * pff_i, +                                    uint64_t       addr, +                                    int *          fd, +                                    size_t         len); + +int            alternate_pff_del(struct pff_i * pff_i, +                                 uint64_t       addr); + +void           alternate_pff_flush(struct pff_i * pff_i); + +/* Returns fd towards next hop */ +int            alternate_pff_nhop(struct pff_i * pff_i, +                                  uint64_t       addr); + +int            alternate_flow_state_change(struct pff_i * pff_i, +                                           int            fd, +                                           bool           up); + +struct pol_pff_ops alternate_pff_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_ALTERNATE_PFF_H */ diff --git a/src/ipcpd/unicast/pol/flat.c b/src/ipcpd/unicast/pol/flat.c new file mode 100644 index 00000000..157885f9 --- /dev/null +++ b/src/ipcpd/unicast/pol/flat.c @@ -0,0 +1,87 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Policy for flat addresses in a distributed way + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#define OUROBOROS_PREFIX "flat-addr-auth" + +#include <ouroboros/logs.h> +#include <ouroboros/errno.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/utils.h> + +#include "ipcp.h" +#include "flat.h" + +#include <time.h> +#include <stdlib.h> +#include <math.h> +#include <string.h> +#include <assert.h> + +#define NAME_LEN 8 + +struct { +        uint8_t addr_size; +} flat; + +#define INVALID_ADDRESS 0 + +struct pol_addr_auth_ops flat_ops = { +        .init    = flat_init, +        .fini    = flat_fini, +        .address = flat_address +}; + +int flat_init(const void * info) +{ +        flat.addr_size = *((uint8_t *) info); + +        if (flat.addr_size != 4) { +                log_err("Flat address policy mandates 4 byte addresses."); +                return -1; +        } + +        return 0; +} + +int flat_fini(void) +{ +        return 0; +} + +uint64_t flat_address(void) +{ +        struct timespec t; +        uint32_t        addr; + +        clock_gettime(CLOCK_REALTIME, &t); +        srand(t.tv_nsec); + +        addr = (rand() % (RAND_MAX - 1) + 1) & 0xFFFFFFFF; + +        return addr; +} diff --git a/src/ipcpd/unicast/pol/flat.h b/src/ipcpd/unicast/pol/flat.h new file mode 100644 index 00000000..64aa9ce0 --- /dev/null +++ b/src/ipcpd/unicast/pol/flat.h @@ -0,0 +1,36 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Policy for flat addresses in a distributed way + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_FLAT_H +#define OUROBOROS_IPCPD_UNICAST_FLAT_H + +#include "pol-addr-auth-ops.h" + +int      flat_init(const void * info); + +int      flat_fini(void); + +uint64_t flat_address(void); + +struct pol_addr_auth_ops flat_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_FLAT_H */ diff --git a/src/ipcpd/unicast/pol/graph.c b/src/ipcpd/unicast/pol/graph.c new file mode 100644 index 00000000..499dc2de --- /dev/null +++ b/src/ipcpd/unicast/pol/graph.c @@ -0,0 +1,695 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Undirected graph structure + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#define OUROBOROS_PREFIX "graph" + +#include <ouroboros/logs.h> +#include <ouroboros/errno.h> +#include <ouroboros/list.h> + +#include "graph.h" +#include "ipcp.h" + +#include <assert.h> +#include <pthread.h> +#include <stdlib.h> +#include <limits.h> +#include <string.h> + +struct edge { +        struct list_head next; +        struct vertex *  nb; +        qosspec_t        qs; +        int              announced; +}; + +struct vertex { +        struct list_head next; +        uint64_t         addr; +        struct list_head edges; +        int              index; +}; + +struct graph { +        size_t           nr_vertices; +        struct list_head vertices; +        pthread_mutex_t  lock; +}; + +static struct edge * find_edge_by_addr(struct vertex * vertex, +                                       uint64_t        dst_addr) +{ +        struct list_head * p = NULL; + +        list_for_each(p, &vertex->edges) { +                struct edge * e = list_entry(p, struct edge, next); +                if (e->nb->addr == dst_addr) +                        return e; +        } + +        return NULL; +} + +static struct vertex * find_vertex_by_addr(struct graph * graph, +                                           uint64_t       addr) +{ +        struct list_head * p = NULL; + +        list_for_each(p, &graph->vertices) { +                struct vertex * e = list_entry(p, struct vertex, next); +                if (e->addr == addr) +                        return e; +        } + +        return NULL; +} + +static struct edge * add_edge(struct vertex * vertex, +                              struct vertex * nb) +{ +        struct edge * edge; + +        edge = malloc(sizeof(*edge)); +        if (edge == NULL) +                return NULL; + +        list_head_init(&edge->next); +        edge->nb = nb; +        edge->announced = 0; + +        list_add(&edge->next, &vertex->edges); + +        return edge; +} + +static void del_edge(struct edge * edge) +{ +       list_del(&edge->next); +       free(edge); +} + +static struct vertex * add_vertex(struct graph * graph, +                                  uint64_t       addr) +{ +        struct vertex *    vertex; +        struct list_head * p; +        int                i = 0; + +        vertex = malloc(sizeof(*vertex)); +        if (vertex == NULL) +                return NULL; + +        list_head_init(&vertex->next); +        list_head_init(&vertex->edges); +        vertex->addr = addr; + +        /* Keep them ordered on address. */ +        list_for_each(p, &graph->vertices) { +                struct vertex * v = list_entry(p, struct vertex, next); +                if (v->addr > addr) +                        break; +                i++; +        } + +        vertex->index = i; + +        list_add_tail(&vertex->next, p); + +        /* Increase the index of the vertices to the right. 
*/ +        list_for_each(p, &graph->vertices) { +                struct vertex * v = list_entry(p, struct vertex, next); +                if (v->addr > addr) +                        v->index++; +        } + +        graph->nr_vertices++; + +        return vertex; +} + +static void del_vertex(struct graph * graph, +                       struct vertex * vertex) +{ +        struct list_head * p = NULL; +        struct list_head * n = NULL; + +        list_del(&vertex->next); + +        /* Decrease the index of the vertices to the right. */ +        list_for_each(p, &graph->vertices) { +                struct vertex * v = list_entry(p, struct vertex, next); +                if (v->addr > vertex->addr) +                        v->index--; +        } + +        list_for_each_safe(p, n, &vertex->edges) { +                struct edge * e = list_entry(p, struct edge, next); +                del_edge(e); +        } + +        free(vertex); + +        graph->nr_vertices--; +} + +struct graph * graph_create(void) +{ +        struct graph * graph; + +        graph = malloc(sizeof(*graph)); +        if (graph == NULL) +                return NULL; + +        if (pthread_mutex_init(&graph->lock, NULL)) { +                free(graph); +                return NULL; +        } + +        graph->nr_vertices = 0; +        list_head_init(&graph->vertices); + +        return graph; +} + +void graph_destroy(struct graph * graph) +{ +        struct list_head * p = NULL; +        struct list_head * n = NULL; + +        assert(graph); + +        pthread_mutex_lock(&graph->lock); + +        list_for_each_safe(p, n, &graph->vertices) { +                struct vertex * e = list_entry(p, struct vertex, next); +                del_vertex(graph, e); +        } + +        pthread_mutex_unlock(&graph->lock); + +        pthread_mutex_destroy(&graph->lock); + +        free(graph); +} + +int graph_update_edge(struct graph * graph, +                      uint64_t       s_addr, +                      uint64_t       d_addr, +                      qosspec_t      qs) +{ +        struct vertex * v; +        struct edge *   e; +        struct vertex * nb; +        struct edge *   nb_e; + +        assert(graph); + +        pthread_mutex_lock(&graph->lock); + +        v = find_vertex_by_addr(graph, s_addr); +        if (v == NULL) { +                v = add_vertex(graph, s_addr); +                if (v == NULL) { +                        pthread_mutex_unlock(&graph->lock); +                        log_err("Failed to add vertex."); +                        return -ENOMEM; +                } +        } + +        nb = find_vertex_by_addr(graph, d_addr); +        if (nb == NULL) { +                nb = add_vertex(graph, d_addr); +                if (nb == NULL) { +                        if (list_is_empty(&v->edges)) +                                del_vertex(graph, v); +                        pthread_mutex_unlock(&graph->lock); +                        log_err("Failed to add vertex."); +                        return -ENOMEM; +                } +        } + +        e = find_edge_by_addr(v, d_addr); +        if (e == NULL) { +                e = add_edge(v, nb); +                if (e == NULL) { +                        if (list_is_empty(&v->edges)) +                                del_vertex(graph, v); +                        if (list_is_empty(&nb->edges)) +                                del_vertex(graph, nb); +                        pthread_mutex_unlock(&graph->lock); +                        log_err("Failed to add edge."); +  
                      return -ENOMEM; +                } +        } + +        e->announced++; +        e->qs = qs; + +        nb_e = find_edge_by_addr(nb, s_addr); +        if (nb_e == NULL) { +                nb_e = add_edge(nb, v); +                if (nb_e == NULL) { +                        if (--e->announced == 0) +                                del_edge(e); +                        if (list_is_empty(&v->edges)) +                                del_vertex(graph, v); +                        if (list_is_empty(&nb->edges)) +                                del_vertex(graph, nb); +                        pthread_mutex_unlock(&graph->lock); +                        log_err("Failed to add edge."); +                        return -ENOMEM; +                } +        } + +        nb_e->announced++; +        nb_e->qs = qs; + +        pthread_mutex_unlock(&graph->lock); + +        return 0; +} + +int graph_del_edge(struct graph * graph, +                   uint64_t       s_addr, +                   uint64_t       d_addr) +{ +        struct vertex * v; +        struct edge *   e; +        struct vertex * nb; +        struct edge *   nb_e; + +        assert(graph); + +        pthread_mutex_lock(&graph->lock); + +        v = find_vertex_by_addr(graph, s_addr); +        if (v == NULL) { +                pthread_mutex_unlock(&graph->lock); +                log_err("No such source vertex."); +                return -1; +        } + +        nb = find_vertex_by_addr(graph, d_addr); +        if (nb == NULL) { +                pthread_mutex_unlock(&graph->lock); +                log_err("No such destination vertex."); +                return -1; +        } + +        e = find_edge_by_addr(v, d_addr); +        if (e == NULL) { +                pthread_mutex_unlock(&graph->lock); +                log_err("No such source edge."); +                return -1; +        } + +        nb_e = find_edge_by_addr(nb, s_addr); +        if (nb_e == NULL) { +                pthread_mutex_unlock(&graph->lock); +                log_err("No such destination edge."); +                return -1; +        } + +        if (--e->announced == 0) +                del_edge(e); +        if (--nb_e->announced == 0) +                del_edge(nb_e); + +        /* Removing vertex if it was the last edge */ +        if (list_is_empty(&v->edges)) +                del_vertex(graph, v); +        if (list_is_empty(&nb->edges)) +                del_vertex(graph, nb); + +        pthread_mutex_unlock(&graph->lock); + +        return 0; +} + +static int get_min_vertex(struct graph *   graph, +                          int *            dist, +                          bool *           used, +                          struct vertex ** v) +{ +        int                min = INT_MAX; +        int                index = -1; +        int                i = 0; +        struct list_head * p = NULL; + +        *v = NULL; + +        list_for_each(p, &graph->vertices) { +                if (!used[i] && dist[i] < min) { +                        min = dist[i]; +                        index = i; +                        *v = list_entry(p, struct vertex, next); +                } + +                i++; +        } + +        if (index != -1) +                used[index] = true; + +        return index; +} + +static int dijkstra(struct graph *    graph, +                    uint64_t          src, +                    struct vertex *** nhops, +                    int **            dist) +{ +        bool *             used; +        struct list_head * p = 
NULL; +        int                i = 0; +        struct vertex *    v = NULL; +        struct edge *      e = NULL; +        int                alt; + +        *nhops = malloc(sizeof(**nhops) * graph->nr_vertices); +        if (*nhops == NULL) +                goto fail_pnhops; + +        *dist = malloc(sizeof(**dist) * graph->nr_vertices); +        if (*dist == NULL) +                goto fail_pdist; + +        used = malloc(sizeof(*used) * graph->nr_vertices); +        if (used == NULL) +                goto fail_used; + +        /* Init the data structures */ +        memset(used, 0, sizeof(*used) * graph->nr_vertices); +        memset(*nhops, 0, sizeof(**nhops) * graph->nr_vertices); +        memset(*dist, 0, sizeof(**dist) * graph->nr_vertices); + +        list_for_each(p, &graph->vertices) { +                v = list_entry(p, struct vertex, next); +                (*dist)[i++]  = (v->addr == src) ? 0 : INT_MAX; +        } + +        /* Perform actual Dijkstra */ +        i = get_min_vertex(graph, *dist, used, &v); +        while (v != NULL) { +                list_for_each(p, &v->edges) { +                        e = list_entry(p, struct edge, next); + +                        /* Only include it if both sides announced it. */ +                        if (e->announced != 2) +                                continue; + +                        /* +                         * NOTE: Current weight is just hop count. +                         * Method could be extended to use a different +                         * weight for a different QoS cube. +                         */ +                        alt = (*dist)[i] + 1; +                        if (alt < (*dist)[e->nb->index]) { +                                (*dist)[e->nb->index] = alt; +                                if (v->addr == src) +                                        (*nhops)[e->nb->index] = e->nb; +                                else +                                        (*nhops)[e->nb->index] = (*nhops)[i]; +                        } +                } +                i = get_min_vertex(graph, *dist, used, &v); +        } + +        free(used); + +        return 0; + + fail_used: +        free(*dist); + fail_pdist: +        free(*nhops); + fail_pnhops: +        return -1; + +} + +static void free_routing_table(struct list_head * table) +{ +        struct list_head * h; +        struct list_head * p; +        struct list_head * q; +        struct list_head * i; + +        list_for_each_safe(p, h, table) { +                struct routing_table * t = +                        list_entry(p, struct routing_table, next); +                list_for_each_safe(q, i, &t->nhops) { +                        struct nhop * n = +                                list_entry(q, struct nhop, next); +                        list_del(&n->next); +                        free(n); +                } +                list_del(&t->next); +                free(t); +        } +} + +void graph_free_routing_table(struct graph *     graph, +                              struct list_head * table) +{ +        assert(table); + +        pthread_mutex_lock(&graph->lock); + +        free_routing_table(table); + +        pthread_mutex_unlock(&graph->lock); +} + +static int graph_routing_table_simple(struct graph *     graph, +                                      uint64_t           s_addr, +                                      struct list_head * table, +                                      int **             dist) +{ +        struct vertex **       
nhops; +        struct list_head *     p; +        int                    i = 0; +        struct vertex *        v; +        struct routing_table * t; +        struct nhop *          n; + +        /* We need at least 2 vertices for a table */ +        if (graph->nr_vertices < 2) +                goto fail_vertices; + +        if (dijkstra(graph, s_addr, &nhops, dist)) +                goto fail_vertices; + +        list_head_init(table); + +        /* Now construct the routing table from the nhops. */ +        list_for_each(p, &graph->vertices) { +                v = list_entry(p, struct vertex, next); + +                /* This is the src */ +                if (nhops[i] == NULL) { +                        i++; +                        continue; +                } + +                t = malloc(sizeof(*t)); +                if (t == NULL) +                        goto fail_t; + +                list_head_init(&t->nhops); + +                n = malloc(sizeof(*n)); +                if (n == NULL) +                        goto fail_n; + +                t->dst = v->addr; +                n->nhop = nhops[i]->addr; + +                list_add(&n->next, &t->nhops); +                list_add(&t->next, table); + +                i++; +        } + +        free(nhops); + +        return 0; + + fail_n: +        free(t); + fail_t: +        free_routing_table(table); +        free(nhops); +        free(*dist); + fail_vertices: +        *dist = NULL; +        return -1; +} + +static int add_lfa_to_table(struct list_head * table, +                            uint64_t           addr, +                            uint64_t           lfa) +{ +        struct list_head * p = NULL; +        struct nhop *      n; + +        n = malloc(sizeof(*n)); +        if (n == NULL) +                return -1; + +        n->nhop = lfa; + +        list_for_each(p, table) { +                struct routing_table * t = +                        list_entry(p, struct routing_table, next); +                if (t->dst == addr) { +                        list_add_tail(&n->next, &t->nhops); +                        return 0; +                } +        } + +        free(n); + +        return -1; +} + +int graph_routing_table(struct graph *     graph, +                        enum routing_algo  algo, +                        uint64_t           s_addr, +                        struct list_head * table) +{ +        int *              s_dist; +        int *              n_dist[PROG_MAX_FLOWS]; +        uint64_t           addrs[PROG_MAX_FLOWS]; +        int                n_index[PROG_MAX_FLOWS]; +        struct list_head * p; +        struct list_head * q; +        struct vertex *    v; +        struct edge *      e; +        struct vertex **   nhops; +        int                i = 0; +        int                j = 0; +        int                k; + +        assert(graph); +        assert(table); + +        pthread_mutex_lock(&graph->lock); + +        /* Get the simple next hops routing table. */ +        if (graph_routing_table_simple(graph, s_addr, table, &s_dist)) +                goto fail_table_simple; + +        /* Possibly augment the routing table. 
*/
+        switch (algo) {
+        case ROUTING_SIMPLE:
+                break;
+        case ROUTING_LFA:
+                for (j = 0; j < PROG_MAX_FLOWS; j++) {
+                        n_dist[j] = NULL;
+                        n_index[j] = -1;
+                        addrs[j] = -1;
+                }
+
+                list_for_each(p, &graph->vertices) {
+                        v = list_entry(p, struct vertex, next);
+
+                        if (v->addr != s_addr)
+                                continue;
+
+                        /*
+                         * Get the distances for every neighbor
+                         * of the source.
+                         */
+                        list_for_each(q, &v->edges) {
+                                e = list_entry(q, struct edge, next);
+
+                                addrs[i] = e->nb->addr;
+                                n_index[i] = e->nb->index;
+                                if (dijkstra(graph, e->nb->addr,
+                                             &nhops, &(n_dist[i++])))
+                                        goto fail_dijkstra;
+
+                                free(nhops);
+                        }
+
+                        break;
+                }
+
+                /* Loop through all nodes to see if we have an LFA for them. */
+                list_for_each(p, &graph->vertices) {
+                        v = list_entry(p, struct vertex, next);
+
+                        if (v->addr == s_addr)
+                                continue;
+
+                        /*
+                         * Check for every neighbor if
+                         * dist(neighbor, destination) <
+                         * dist(neighbor, source) + dist(source, destination).
+                         */
+                        for (j = 0; j < i; j++) {
+                                /* Exclude ourselves. 
*/ +                                if (addrs[j] == v->addr) +                                        continue; + +                                if (n_dist[j][v->index] < +                                    s_dist[n_index[j]] + s_dist[v->index]) +                                        if (add_lfa_to_table(table, v->addr, +                                                             addrs[j])) +                                                goto fail_add_lfa; +                        } +                } + +                for (j = 0; j < i; j++) +                        free(n_dist[j]); + +                break; +        default: +                log_err("Unsupported algorithm."); +                goto fail_algo; +        } + +        pthread_mutex_unlock(&graph->lock); + +        free(s_dist); + +        return 0; + + fail_add_lfa: +        for (k = j; k < i; k++) +                free(n_dist[k]); + fail_dijkstra: +        free_routing_table(table); + fail_algo: +        free(s_dist); + fail_table_simple: +        pthread_mutex_unlock(&graph->lock); + +        return -1; +} diff --git a/src/ipcpd/unicast/pol/graph.h b/src/ipcpd/unicast/pol/graph.h new file mode 100644 index 00000000..06a2bd0d --- /dev/null +++ b/src/ipcpd/unicast/pol/graph.h @@ -0,0 +1,68 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Undirected graph structure + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */ + +#ifndef OUROBOROS_IPCPD_UNICAST_GRAPH_H +#define OUROBOROS_IPCPD_UNICAST_GRAPH_H + +#include <ouroboros/list.h> +#include <ouroboros/qos.h> + +#include <inttypes.h> + +enum routing_algo { +         ROUTING_SIMPLE = 0, +         ROUTING_LFA +}; + +struct nhop { +        struct list_head next; +        uint64_t         nhop; +}; + +struct routing_table { +        struct list_head next; +        uint64_t         dst; +        struct list_head nhops; +}; + +struct graph * graph_create(void); + +void           graph_destroy(struct graph * graph); + +int            graph_update_edge(struct graph * graph, +                                 uint64_t       s_addr, +                                 uint64_t       d_addr, +                                 qosspec_t      qs); + +int            graph_del_edge(struct graph * graph, +                              uint64_t       s_addr, +                              uint64_t       d_addr); + +int            graph_routing_table(struct graph *     graph, +                                   enum routing_algo  algo, +                                   uint64_t           s_addr, +                                   struct list_head * table); + +void           graph_free_routing_table(struct graph *     graph, +                                        struct list_head * table); + +#endif /* OUROBOROS_IPCPD_UNICAST_GRAPH_H */ diff --git a/src/ipcpd/unicast/pol/link_state.c b/src/ipcpd/unicast/pol/link_state.c new file mode 100644 index 00000000..d8f0e263 --- /dev/null +++ b/src/ipcpd/unicast/pol/link_state.c @@ -0,0 +1,1022 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Link state routing policy + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#define OUROBOROS_PREFIX "link-state-routing" + +#include <ouroboros/endian.h> +#include <ouroboros/dev.h> +#include <ouroboros/errno.h> +#include <ouroboros/fccntl.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/list.h> +#include <ouroboros/logs.h> +#include <ouroboros/notifier.h> +#include <ouroboros/rib.h> +#include <ouroboros/utils.h> + +#include "comp.h" +#include "connmgr.h" +#include "graph.h" +#include "ipcp.h" +#include "link_state.h" +#include "pff.h" + +#include <assert.h> +#include <stdlib.h> +#include <inttypes.h> +#include <string.h> +#include <pthread.h> + +#define RECALC_TIME    4 +#define LS_UPDATE_TIME 15 +#define LS_TIMEO       60 +#define LS_ENTRY_SIZE  104 +#define LSDB           "lsdb" + +#ifndef CLOCK_REALTIME_COARSE +#define CLOCK_REALTIME_COARSE CLOCK_REALTIME +#endif + +struct lsa { +        uint64_t d_addr; +        uint64_t s_addr; +        uint64_t seqno; +} __attribute__((packed)); + +struct routing_i { +        struct list_head next; + +        struct pff *     pff; +        pthread_t        calculator; + +        bool             modified; +        pthread_mutex_t  lock; +}; + +/* TODO: link weight support. */ +struct adjacency { +        struct list_head next; + +        uint64_t         dst; +        uint64_t         src; + +        uint64_t         seqno; + +        time_t           stamp; +}; + +enum nb_type { +        NB_DT = 0, +        NB_MGMT +}; + +struct nb { +        struct list_head next; + +        uint64_t         addr; +        int              fd; +        enum nb_type     type; +}; + +struct { +        struct list_head  nbs; +        size_t            nbs_len; +        fset_t *          mgmt_set; + +        struct list_head  db; +        size_t            db_len; + +        pthread_rwlock_t  db_lock; + +        struct graph *    graph; + +        pthread_t         lsupdate; +        pthread_t         lsreader; +        pthread_t         listener; + +        struct list_head  routing_instances; +        pthread_mutex_t   routing_i_lock; + +        enum routing_algo routing_algo; +} ls; + +struct pol_routing_ops link_state_ops = { +        .init              = link_state_init, +        .fini              = link_state_fini, +        .routing_i_create  = link_state_routing_i_create, +        .routing_i_destroy = link_state_routing_i_destroy +}; + +static int str_adj(struct adjacency * adj, +                   char *             buf, +                   size_t             len) +{ +        char        tmbuf[64]; +        char        srcbuf[64]; +        char        dstbuf[64]; +        char        seqnobuf[64]; +        struct tm * tm; + +        if (len < LS_ENTRY_SIZE) +                return -1; + +        tm = localtime(&adj->stamp); +        strftime(tmbuf, sizeof(tmbuf), "%F %T", tm); /* 19 chars */ + +        sprintf(srcbuf, "%" PRIu64, adj->src); +        sprintf(dstbuf, "%" PRIu64, adj->dst); +        sprintf(seqnobuf, "%" PRIu64, adj->seqno); + +        sprintf(buf, "src: %20s\ndst: %20s\nseqno: %18s\nupd: %20s\n", +                srcbuf, dstbuf, seqnobuf, tmbuf); + +        return LS_ENTRY_SIZE; +} + +static struct adjacency * get_adj(const char * path) +{ +        struct list_head * p; +        char               entry[RIB_PATH_LEN + 1]; + +        assert(path); + +        list_for_each(p, &ls.db) { +                struct adjacency * a = list_entry(p, struct adjacency, next); 
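+                /* RIB entries are named "<src>.<dst>"; rebuild that name and compare. */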
+                sprintf(entry, "%" PRIu64 ".%" PRIu64, a->src, a->dst); +                if (strcmp(entry, path) == 0) +                        return a; +        } + +        return NULL; +} + +static int lsdb_getattr(const char *  path, +                        struct stat * st) +{ +        struct adjacency * adj; +        struct timespec    now; + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        pthread_rwlock_rdlock(&ls.db_lock); + +        adj = get_adj(path); +        if (adj != NULL) { +                st->st_mtime = adj->stamp; +                st->st_size  = LS_ENTRY_SIZE; +        } else { +                st->st_mtime = now.tv_sec; +                st->st_size  = 0; +        } + +        st->st_mode  = S_IFREG | 0755; +        st->st_nlink = 1; +        st->st_uid   = getuid(); +        st->st_gid   = getgid(); + +        pthread_rwlock_unlock(&ls.db_lock); + +        return 0; +} + +static int lsdb_read(const char * path, +                     char *       buf, +                     size_t       len) +{ +        struct adjacency * a; +        int                size; + +        pthread_rwlock_rdlock(&ls.db_lock); + +        if (ls.db_len + ls.nbs_len == 0) +                goto fail; + +        a = get_adj(path); +        if (a == NULL) +                goto fail; + +        size = str_adj(a, buf, len); +        if (size < 0) +                goto fail; + +        pthread_rwlock_unlock(&ls.db_lock); +        return size; + + fail: +        pthread_rwlock_unlock(&ls.db_lock); +        return -1; +} + +static int lsdb_readdir(char *** buf) +{ +        struct list_head * p; +        char               entry[RIB_PATH_LEN + 1]; +        ssize_t            idx = 0; + +        pthread_rwlock_rdlock(&ls.db_lock); + +        if (ls.db_len + ls.nbs_len == 0) { +                pthread_rwlock_unlock(&ls.db_lock); +                return 0; +        } + +        *buf = malloc(sizeof(**buf) * (ls.db_len + ls.nbs_len)); +        if (*buf == NULL) { +                pthread_rwlock_unlock(&ls.db_lock); +                return -ENOMEM; +        } + +        list_for_each(p, &ls.nbs) { +                struct nb * nb = list_entry(p, struct nb, next); +                char * str = (nb->type == NB_DT ? "dt." 
: "mgmt."); +                sprintf(entry, "%s%" PRIu64, str, nb->addr); +                (*buf)[idx] = malloc(strlen(entry) + 1); +                if ((*buf)[idx] == NULL) { +                        while (idx-- > 0) +                                free((*buf)[idx]); +                        free(*buf); +                        pthread_rwlock_unlock(&ls.db_lock); +                        return -ENOMEM; +                } + +                strcpy((*buf)[idx], entry); + +                idx++; +        } + +        list_for_each(p, &ls.db) { +                struct adjacency * a = list_entry(p, struct adjacency, next); +                sprintf(entry, "%" PRIu64 ".%" PRIu64, a->src, a->dst); +                (*buf)[idx] = malloc(strlen(entry) + 1); +                if ((*buf)[idx] == NULL) { +                        ssize_t j; +                        for (j = 0; j < idx; ++j) +                                free((*buf)[j]); +                        free(*buf); +                        pthread_rwlock_unlock(&ls.db_lock); +                        return -ENOMEM; +                } + +                strcpy((*buf)[idx], entry); + +                idx++; +        } + +        pthread_rwlock_unlock(&ls.db_lock); + +        return idx; +} + +static struct rib_ops r_ops = { +        .read    = lsdb_read, +        .readdir = lsdb_readdir, +        .getattr = lsdb_getattr +}; + +static int lsdb_add_nb(uint64_t     addr, +                       int          fd, +                       enum nb_type type) +{ +        struct list_head * p; +        struct nb *        nb; + +        pthread_rwlock_wrlock(&ls.db_lock); + +        list_for_each(p, &ls.nbs) { +                struct nb * el = list_entry(p, struct nb, next); +                if (el->addr == addr && el->type == type) { +                        log_dbg("Already know %s neighbor %" PRIu64 ".", +                                type == NB_DT ? "dt" : "mgmt", addr); +                        if (el->fd != fd) { +                                log_warn("Existing neighbor assigned new fd."); +                                el->fd = fd; +                        } +                        pthread_rwlock_unlock(&ls.db_lock); +                        return -EPERM; +                } + +                if (addr > el->addr) +                        break; +        } + +        nb = malloc(sizeof(*nb)); +        if (nb == NULL) { +                pthread_rwlock_unlock(&ls.db_lock); +                return -ENOMEM; +        } + +        nb->addr  = addr; +        nb->fd    = fd; +        nb->type  = type; + +        list_add_tail(&nb->next, p); + +        ++ls.nbs_len; + +        log_dbg("Type %s neighbor %" PRIu64 " added.", +                nb->type == NB_DT ? "dt" : "mgmt", addr); + +        pthread_rwlock_unlock(&ls.db_lock); + +        return 0; +} + +static int lsdb_del_nb(uint64_t     addr, +                       int          fd) +{ +        struct list_head * p; +        struct list_head * h; + +        pthread_rwlock_wrlock(&ls.db_lock); + +        list_for_each_safe(p, h, &ls.nbs) { +                struct nb * nb = list_entry(p, struct nb, next); +                if (nb->addr == addr && nb->fd == fd) { +                        list_del(&nb->next); +                        --ls.nbs_len; +                        pthread_rwlock_unlock(&ls.db_lock); +                        log_dbg("Type %s neighbor %" PRIu64 " deleted.", +                                nb->type == NB_DT ? 
"dt" : "mgmt", addr); +                        free(nb); +                        return 0; +                } +        } + +        pthread_rwlock_unlock(&ls.db_lock); + +        return -EPERM; +} + +static int nbr_to_fd(uint64_t addr) +{ +        struct list_head * p; + +        pthread_rwlock_rdlock(&ls.db_lock); + +        list_for_each(p, &ls.nbs) { +                struct nb * nb = list_entry(p, struct nb, next); +                if (nb->addr == addr && nb->type == NB_DT) { +                        pthread_rwlock_unlock(&ls.db_lock); +                        return nb->fd; +                } +        } + +        pthread_rwlock_unlock(&ls.db_lock); + +        return -1; +} + +static void calculate_pff(struct routing_i * instance) +{ +        int                fd; +        struct list_head   table; +        struct list_head * p; +        struct list_head * q; +        int                fds[PROG_MAX_FLOWS]; + +        if (graph_routing_table(ls.graph, ls.routing_algo, +                                ipcpi.dt_addr, &table)) +                return; + +        pff_lock(instance->pff); + +        pff_flush(instance->pff); + +        /* Calculate forwarding table from routing table. */ +        list_for_each(p, &table) { +                int                    i = 0; +                struct routing_table * t = +                        list_entry(p, struct routing_table, next); + +                list_for_each(q, &t->nhops) { +                        struct nhop * n = list_entry(q, struct nhop, next); + +                        fd = nbr_to_fd(n->nhop); +                        if (fd == -1) +                                continue; + +                        fds[i++] = fd; +                } +                if (i > 0) +                        pff_add(instance->pff, t->dst, fds, i); +        } + +        pff_unlock(instance->pff); + +        graph_free_routing_table(ls.graph, &table); +} + +static void set_pff_modified(bool calc) +{ +        struct list_head * p; + +        pthread_mutex_lock(&ls.routing_i_lock); +        list_for_each(p, &ls.routing_instances) { +                struct routing_i * inst = +                        list_entry(p, struct routing_i, next); +                pthread_mutex_lock(&inst->lock); +                inst->modified = true; +                pthread_mutex_unlock(&inst->lock); +                if (calc) +                        calculate_pff(inst); +        } +        pthread_mutex_unlock(&ls.routing_i_lock); +} + +static int lsdb_add_link(uint64_t    src, +                         uint64_t    dst, +                         uint64_t    seqno, +                         qosspec_t * qs) +{ +        struct list_head * p; +        struct adjacency * adj; +        struct timespec    now; +        int                ret = -1; + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        pthread_rwlock_wrlock(&ls.db_lock); + +        list_for_each(p, &ls.db) { +                struct adjacency * a = list_entry(p, struct adjacency, next); +                if (a->dst == dst && a->src == src) { +                        if (a->seqno < seqno) { +                                a->stamp = now.tv_sec; +                                a->seqno = seqno; +                                ret = 0; +                        } +                        pthread_rwlock_unlock(&ls.db_lock); +                        return ret; +                } + +                if (a->dst > dst || (a->dst == dst && a->src > src)) +                        break; +        } + +        adj = 
malloc(sizeof(*adj)); +        if (adj == NULL) { +                pthread_rwlock_unlock(&ls.db_lock); +                return -ENOMEM; +        } + +        adj->dst   = dst; +        adj->src   = src; +        adj->seqno = seqno; +        adj->stamp = now.tv_sec; + +        list_add_tail(&adj->next, p); + +        ls.db_len++; + +        if (graph_update_edge(ls.graph, src, dst, *qs)) +                log_warn("Failed to add edge to graph."); + +        pthread_rwlock_unlock(&ls.db_lock); + +        set_pff_modified(true); + +        return 0; +} + +static int lsdb_del_link(uint64_t src, +                         uint64_t dst) +{ +        struct list_head * p; +        struct list_head * h; + +        pthread_rwlock_wrlock(&ls.db_lock); + +        list_for_each_safe(p, h, &ls.db) { +                struct adjacency * a = list_entry(p, struct adjacency, next); +                if (a->dst == dst && a->src == src) { +                        list_del(&a->next); +                        if (graph_del_edge(ls.graph, src, dst)) +                                log_warn("Failed to delete edge from graph."); + +                        ls.db_len--; + +                        pthread_rwlock_unlock(&ls.db_lock); +                        set_pff_modified(false); +                        free(a); +                        return 0; +                } +        } + +        pthread_rwlock_unlock(&ls.db_lock); + +        return -EPERM; +} + +static void * periodic_recalc_pff(void * o) +{ +        bool               modified; +        struct routing_i * inst = (struct routing_i *) o; + +        while (true) { +                pthread_mutex_lock(&inst->lock); +                modified = inst->modified; +                inst->modified = false; +                pthread_mutex_unlock(&inst->lock); + +                if (modified) +                        calculate_pff(inst); +                sleep(RECALC_TIME); +        } + +        return (void *) 0; +} + +static void send_lsm(uint64_t src, +                     uint64_t dst, +                     uint64_t seqno) +{ +        struct lsa         lsm; +        struct list_head * p; + +        lsm.d_addr = hton64(dst); +        lsm.s_addr = hton64(src); +        lsm.seqno  = hton64(seqno); + +        list_for_each(p, &ls.nbs) { +                struct nb * nb = list_entry(p, struct nb, next); +                if (nb->type == NB_MGMT) +                        flow_write(nb->fd, &lsm, sizeof(lsm)); +        } +} + +/* replicate the lsdb to a mgmt neighbor */ +static void lsdb_replicate(int fd) +{ +        struct list_head * p; +        struct list_head * h; +        struct list_head   copy; + +        list_head_init(©); + +        /* Lock the lsdb, copy the lsms and send outside of lock. 
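+         * flow_write() can block, and blocking while holding db_lock would stall all other LSDB access, so work on a copy.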
*/ +        pthread_rwlock_rdlock(&ls.db_lock); + +        list_for_each(p, &ls.db) { +                struct adjacency * adj; +                struct adjacency * cpy; +                adj = list_entry(p, struct adjacency, next); +                cpy = malloc(sizeof(*cpy)); +                if (cpy == NULL) { +                        log_warn("Failed to replicate full lsdb."); +                        break; +                } + +                cpy->dst   = adj->dst; +                cpy->src   = adj->src; +                cpy->seqno = adj->seqno; + +                list_add_tail(&cpy->next, ©); +        } + +        pthread_rwlock_unlock(&ls.db_lock); + +        list_for_each_safe(p, h, ©) { +                struct lsa         lsm; +                struct adjacency * adj; +                adj = list_entry(p, struct adjacency, next); +                lsm.d_addr = hton64(adj->dst); +                lsm.s_addr = hton64(adj->src); +                lsm.seqno  = hton64(adj->seqno); +                list_del(&adj->next); +                free(adj); +                flow_write(fd, &lsm, sizeof(lsm)); +        } +} + +static void * lsupdate(void * o) +{ +        struct list_head * p; +        struct list_head * h; +        struct timespec    now; + +        (void) o; + +        while (true) { +                clock_gettime(CLOCK_REALTIME_COARSE, &now); + +                pthread_rwlock_rdlock(&ls.db_lock); + +                pthread_cleanup_push((void (*) (void *)) pthread_rwlock_unlock, +                                     (void *) &ls.db_lock); + +                list_for_each_safe(p, h, &ls.db) { +                        struct adjacency * adj; +                        adj = list_entry(p, struct adjacency, next); +                        if (now.tv_sec - adj->stamp > LS_TIMEO) { +                                list_del(&adj->next); +                                log_dbg("%" PRIu64 " - %" PRIu64" timed out.", +                                        adj->src, adj->dst); +                                if (graph_del_edge(ls.graph, adj->src, +                                                   adj->dst)) +                                        log_err("Failed to del edge."); +                                free(adj); +                                continue; +                        } + +                        if (adj->src == ipcpi.dt_addr) { +                                adj->seqno++; +                                send_lsm(adj->src, adj->dst, adj->seqno); +                                adj->stamp = now.tv_sec; +                        } +                } + +                pthread_cleanup_pop(true); + +                sleep(LS_UPDATE_TIME); +        } + +        return (void *) 0; +} + +static void * ls_conn_handle(void * o) +{ +        struct conn conn; + +        (void) o; + +        while (true) { +                if (connmgr_wait(COMPID_MGMT, &conn)) { +                        log_err("Failed to get next MGMT connection."); +                        continue; +                } + +                /* NOTE: connection acceptance policy could be here. 
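+                 * For now, every MGMT connection is accepted and passed straight to the notifier.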
*/ + +                notifier_event(NOTIFY_MGMT_CONN_ADD, &conn); +        } + +        return 0; +} + + +static void forward_lsm(uint8_t * buf, +                        size_t    len, +                        int       in_fd) +{ +        struct list_head * p; + +        pthread_rwlock_rdlock(&ls.db_lock); + +        pthread_cleanup_push((void (*) (void *)) pthread_rwlock_unlock, +                             &ls.db_lock); + +        list_for_each(p, &ls.nbs) { +                struct nb * nb = list_entry(p, struct nb, next); +                if (nb->type == NB_MGMT && nb->fd != in_fd) +                        flow_write(nb->fd, buf, len); +        } + +        pthread_cleanup_pop(true); +} + +static void * lsreader(void * o) +{ +        fqueue_t *   fq; +        int          ret; +        uint8_t      buf[sizeof(struct lsa)]; +        int          fd; +        qosspec_t    qs; +        struct lsa * msg; +        ssize_t      len; + +        (void) o; + +        memset(&qs, 0, sizeof(qs)); + +        fq = fqueue_create(); +        if (fq == NULL) +                return (void *) -1; + +        pthread_cleanup_push((void (*) (void *)) fqueue_destroy, +                             (void *) fq); + +        while (true) { +                ret = fevent(ls.mgmt_set, fq, NULL); +                if (ret < 0) { +                        log_warn("Event error: %d.", ret); +                        continue; +                } + +                while ((fd = fqueue_next(fq)) >= 0) { +                        if (fqueue_type(fq) != FLOW_PKT) +                                continue; + +                        len = flow_read(fd, buf, sizeof(*msg)); +                        if (len <= 0 || (size_t) len != sizeof(*msg)) +                                continue; + +                        msg = (struct lsa *) buf; + +                        if (lsdb_add_link(ntoh64(msg->s_addr), +                                          ntoh64(msg->d_addr), +                                          ntoh64(msg->seqno), +                                          &qs)) +                                continue; + +                        forward_lsm(buf, len, fd); +                } +        } + +        pthread_cleanup_pop(true); + +        return (void *) 0; +} + +static void flow_event(int  fd, +                       bool up) +{ + +        struct list_head * p; + +        log_dbg("Notifying routing instances of flow event."); + +        pthread_mutex_lock(&ls.routing_i_lock); + +        list_for_each(p, &ls.routing_instances) { +                struct routing_i * ri = list_entry(p, struct routing_i, next); +                pff_flow_state_change(ri->pff, fd, up); +        } + +        pthread_mutex_unlock(&ls.routing_i_lock); +} + +static void handle_event(void *       self, +                         int          event, +                         const void * o) +{ +        /* FIXME: Apply correct QoS on graph */ +        struct conn *      c; +        qosspec_t          qs; +        int                flags; + +        (void) self; + +        c = (struct conn *) o; + +        memset(&qs, 0, sizeof(qs)); + +        switch (event) { +        case NOTIFY_DT_CONN_ADD: +                pthread_rwlock_rdlock(&ls.db_lock); +                send_lsm(ipcpi.dt_addr, c->conn_info.addr, 0); +                pthread_rwlock_unlock(&ls.db_lock); + +                if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_DT)) +                        log_dbg("Failed to add neighbor to LSDB."); + +                if (lsdb_add_link(ipcpi.dt_addr, 
c->conn_info.addr, 0, &qs)) +                        log_dbg("Failed to add new adjacency to LSDB."); +                break; +        case NOTIFY_DT_CONN_DEL: +                flow_event(c->flow_info.fd, false); + +                if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd)) +                        log_dbg("Failed to delete neighbor from LSDB."); + +                if (lsdb_del_link(ipcpi.dt_addr, c->conn_info.addr)) +                        log_dbg("Local link was not in LSDB."); +                break; +        case NOTIFY_DT_CONN_QOS: +                log_dbg("QoS changes currently unsupported."); +                break; +        case NOTIFY_DT_CONN_UP: +                flow_event(c->flow_info.fd, true); +                break; +        case NOTIFY_DT_CONN_DOWN: +                flow_event(c->flow_info.fd, false); +                break; +        case NOTIFY_MGMT_CONN_ADD: +                fccntl(c->flow_info.fd, FLOWGFLAGS, &flags); +                fccntl(c->flow_info.fd, FLOWSFLAGS, flags | FLOWFRNOPART); +                fset_add(ls.mgmt_set, c->flow_info.fd); +                if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_MGMT)) +                        log_warn("Failed to add mgmt neighbor to LSDB."); +                /* replicate the entire lsdb */ +                lsdb_replicate(c->flow_info.fd); +                break; +        case NOTIFY_MGMT_CONN_DEL: +                fset_del(ls.mgmt_set, c->flow_info.fd); +                if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd)) +                        log_warn("Failed to delete mgmt neighbor from LSDB."); +                break; +        default: +                break; +        } +} + +struct routing_i * link_state_routing_i_create(struct pff * pff) +{ +        struct routing_i * tmp; + +        assert(pff); + +        tmp = malloc(sizeof(*tmp)); +        if (tmp == NULL) +                goto fail_tmp; + +        tmp->pff      = pff; +        tmp->modified = false; + +        if (pthread_mutex_init(&tmp->lock, NULL)) +                goto fail_instance_lock_init; + +        if (pthread_create(&tmp->calculator, NULL, +                           periodic_recalc_pff, tmp)) +                goto fail_pthread_create_lsupdate; + +        pthread_mutex_lock(&ls.routing_i_lock); + +        list_add(&tmp->next, &ls.routing_instances); + +        pthread_mutex_unlock(&ls.routing_i_lock); + +        return tmp; + + fail_pthread_create_lsupdate: +        pthread_mutex_destroy(&tmp->lock); + fail_instance_lock_init: +        free(tmp); + fail_tmp: +        return NULL; +} + +void link_state_routing_i_destroy(struct routing_i * instance) +{ +        assert(instance); + +        pthread_mutex_lock(&ls.routing_i_lock); + +        list_del(&instance->next); + +        pthread_mutex_unlock(&ls.routing_i_lock); + +        pthread_cancel(instance->calculator); + +        pthread_join(instance->calculator, NULL); + +        pthread_mutex_destroy(&instance->lock); + +        free(instance); +} + +int link_state_init(enum pol_routing pr) +{ +        struct conn_info info; + +        memset(&info, 0, sizeof(info)); + +        strcpy(info.comp_name, LS_COMP); +        strcpy(info.protocol, LS_PROTO); +        info.pref_version = 1; +        info.pref_syntax  = PROTO_GPB; +        info.addr         = ipcpi.dt_addr; + +        switch (pr) { +        case ROUTING_LINK_STATE: +                log_dbg("Using link state routing policy."); +                ls.routing_algo = ROUTING_SIMPLE; +                break; +        case 
ROUTING_LINK_STATE_LFA: +                log_dbg("Using Loop-Free Alternates policy."); +                ls.routing_algo = ROUTING_LFA; +                break; +        default: +                goto fail_graph; +        } + +        ls.graph = graph_create(); +        if (ls.graph == NULL) +                goto fail_graph; + +        if (notifier_reg(handle_event, NULL)) +                goto fail_notifier_reg; + +        if (pthread_rwlock_init(&ls.db_lock, NULL)) +                goto fail_db_lock_init; + +        if (pthread_mutex_init(&ls.routing_i_lock, NULL)) +                goto fail_routing_i_lock_init; + +        if (connmgr_comp_init(COMPID_MGMT, &info)) +                goto fail_connmgr_comp_init; + +        ls.mgmt_set = fset_create(); +        if (ls.mgmt_set == NULL) +                goto fail_fset_create; + +        list_head_init(&ls.db); +        list_head_init(&ls.nbs); +        list_head_init(&ls.routing_instances); + +        ls.db_len      = 0; +        ls.nbs_len     = 0; + +        if (pthread_create(&ls.lsupdate, NULL, lsupdate, NULL)) +                goto fail_pthread_create_lsupdate; + +        if (pthread_create(&ls.lsreader, NULL, lsreader, NULL)) +                goto fail_pthread_create_lsreader; + +        if (pthread_create(&ls.listener, NULL, ls_conn_handle, NULL)) +                goto fail_pthread_create_listener; + +        if (rib_reg(LSDB, &r_ops)) +                goto fail_rib_reg; + +        return 0; + + fail_rib_reg: +        pthread_cancel(ls.listener); +        pthread_join(ls.listener, NULL); + fail_pthread_create_listener: +        pthread_cancel(ls.lsreader); +        pthread_join(ls.lsreader, NULL); + fail_pthread_create_lsreader: +        pthread_cancel(ls.lsupdate); +        pthread_join(ls.lsupdate, NULL); + fail_pthread_create_lsupdate: +        fset_destroy(ls.mgmt_set); + fail_fset_create: +        connmgr_comp_fini(COMPID_MGMT); + fail_connmgr_comp_init: +        pthread_mutex_destroy(&ls.routing_i_lock); + fail_routing_i_lock_init: +        pthread_rwlock_destroy(&ls.db_lock); + fail_db_lock_init: +        notifier_unreg(handle_event); + fail_notifier_reg: +        graph_destroy(ls.graph); + fail_graph: +        return -1; +} + +void link_state_fini(void) +{ +        struct list_head * p; +        struct list_head * h; + +        rib_unreg(LSDB); + +        pthread_cancel(ls.listener); +        pthread_join(ls.listener, NULL); + +        pthread_cancel(ls.lsreader); +        pthread_join(ls.lsreader, NULL); + +        pthread_cancel(ls.lsupdate); +        pthread_join(ls.lsupdate, NULL); + +        fset_destroy(ls.mgmt_set); + +        connmgr_comp_fini(COMPID_MGMT); + +        graph_destroy(ls.graph); + +        pthread_rwlock_wrlock(&ls.db_lock); + +        list_for_each_safe(p, h, &ls.db) { +                struct adjacency * a = list_entry(p, struct adjacency, next); +                list_del(&a->next); +                free(a); +        } + +        pthread_rwlock_unlock(&ls.db_lock); + +        pthread_rwlock_destroy(&ls.db_lock); + +        pthread_mutex_destroy(&ls.routing_i_lock); + +        notifier_unreg(handle_event); +} diff --git a/src/ipcpd/unicast/pol/link_state.h b/src/ipcpd/unicast/pol/link_state.h new file mode 100644 index 00000000..a7b44b4e --- /dev/null +++ b/src/ipcpd/unicast/pol/link_state.h @@ -0,0 +1,41 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Link state routing policy + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * 
This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_POL_LINK_STATE_H +#define OUROBOROS_IPCPD_UNICAST_POL_LINK_STATE_H + +#define LS_COMP  "Management" +#define LS_PROTO "LSP" + +#include "pol-routing-ops.h" + +int                link_state_init(enum pol_routing pr); + +void               link_state_fini(void); + +struct routing_i * link_state_routing_i_create(struct pff * pff); + +void               link_state_routing_i_destroy(struct routing_i * instance); + +struct pol_routing_ops link_state_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_POL_LINK_STATE_H */ diff --git a/src/ipcpd/unicast/pol/simple_pff.c b/src/ipcpd/unicast/pol/simple_pff.c new file mode 100644 index 00000000..4338c53c --- /dev/null +++ b/src/ipcpd/unicast/pol/simple_pff.c @@ -0,0 +1,187 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Simple PDU Forwarding Function + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */ + +#define _POSIX_C_SOURCE 200112L + +#include "config.h" + +#include <ouroboros/hashtable.h> +#include <ouroboros/errno.h> + +#include <assert.h> +#include <pthread.h> + +#include "simple_pff.h" + +struct pff_i { +        struct htable *  table; +        pthread_rwlock_t lock; +}; + +struct pol_pff_ops simple_pff_ops = { +        .create            = simple_pff_create, +        .destroy           = simple_pff_destroy, +        .lock              = simple_pff_lock, +        .unlock            = simple_pff_unlock, +        .add               = simple_pff_add, +        .update            = simple_pff_update, +        .del               = simple_pff_del, +        .flush             = simple_pff_flush, +        .nhop              = simple_pff_nhop, +        .flow_state_change = NULL +}; + +struct pff_i * simple_pff_create(void) +{ +        struct pff_i * tmp; + +        tmp = malloc(sizeof(*tmp)); +        if (tmp == NULL) +                return NULL; + +        if (pthread_rwlock_init(&tmp->lock, NULL)) { +                free(tmp); +                return NULL; +        } + +        tmp->table = htable_create(PFT_SIZE, false); +        if (tmp->table == NULL) { +                pthread_rwlock_destroy(&tmp->lock); +                free(tmp); +                return NULL; +        } + +        return tmp; +} + +void simple_pff_destroy(struct pff_i * pff_i) +{ +        assert(pff_i); + +        htable_destroy(pff_i->table); + +        pthread_rwlock_destroy(&pff_i->lock); +        free(pff_i); +} + +void simple_pff_lock(struct pff_i * pff_i) +{ +        pthread_rwlock_wrlock(&pff_i->lock); +} + +void simple_pff_unlock(struct pff_i * pff_i) +{ +        pthread_rwlock_unlock(&pff_i->lock); +} + +int simple_pff_add(struct pff_i * pff_i, +                   uint64_t       addr, +                   int *          fd, +                   size_t         len) +{ +        int * val; + +        assert(pff_i); +        assert(len > 0); + +        (void) len; + +        val = malloc(sizeof(*val)); +        if (val == NULL) +                return -ENOMEM; + +        *val = fd[0]; + +        if (htable_insert(pff_i->table, addr, val, 1)) { +                free(val); +                return -1; +        } + +        return 0; +} + +int simple_pff_update(struct pff_i * pff_i, +                      uint64_t       addr, +                      int *          fd, +                      size_t         len) +{ +        int * val; + +        assert(pff_i); +        assert(len > 0); + +        (void) len; + +        val = malloc(sizeof(*val)); +        if (val == NULL) +                return -ENOMEM; +        *val = fd[0]; + +        if (htable_delete(pff_i->table, addr)) { +                free(val); +                return -1; +        } + +        if (htable_insert(pff_i->table, addr, val, 1)) { +                free(val); +                return -1; +        } + +        return 0; +} + +int simple_pff_del(struct pff_i * pff_i, +                   uint64_t       addr) +{ +        assert(pff_i); + +        if (htable_delete(pff_i->table, addr)) +                return -1; + +        return 0; +} + +void simple_pff_flush(struct pff_i * pff_i) +{ +        assert(pff_i); + +        htable_flush(pff_i->table); +} + +int simple_pff_nhop(struct pff_i * pff_i, +                    uint64_t       addr) +{ +        void * j; +        size_t len; +        int    fd = -1; + +        assert(pff_i); + +        pthread_rwlock_rdlock(&pff_i->lock); + +        if (!htable_lookup(pff_i->table, addr, &j, &len)) +            
    fd = *((int *) j); + +        pthread_rwlock_unlock(&pff_i->lock); + +        return fd; +} diff --git a/src/ipcpd/unicast/pol/simple_pff.h b/src/ipcpd/unicast/pol/simple_pff.h new file mode 100644 index 00000000..02c09a58 --- /dev/null +++ b/src/ipcpd/unicast/pol/simple_pff.h @@ -0,0 +1,57 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Simple policy for PFF + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_SIMPLE_PFF_H +#define OUROBOROS_IPCPD_UNICAST_SIMPLE_PFF_H + +#include "pol-pff-ops.h" + +struct pff_i * simple_pff_create(void); + +void           simple_pff_destroy(struct pff_i * pff_i); + +void           simple_pff_lock(struct pff_i * pff_i); + +void           simple_pff_unlock(struct pff_i * pff_i); + +int            simple_pff_add(struct pff_i * pff_i, +                              uint64_t       addr, +                              int *          fd, +                              size_t         len); + +int            simple_pff_update(struct pff_i * pff_i, +                                 uint64_t       addr, +                                 int *          fd, +                                 size_t         len); + +int            simple_pff_del(struct pff_i * pff_i, +                              uint64_t       addr); + +void           simple_pff_flush(struct pff_i * pff_i); + +/* Returns fd towards next hop */ +int            simple_pff_nhop(struct pff_i * pff_i, +                               uint64_t       addr); + +struct pol_pff_ops simple_pff_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_SIMPLE_PFF_H */ diff --git a/src/ipcpd/unicast/pol/tests/CMakeLists.txt b/src/ipcpd/unicast/pol/tests/CMakeLists.txt new file mode 100644 index 00000000..d0652533 --- /dev/null +++ b/src/ipcpd/unicast/pol/tests/CMakeLists.txt @@ -0,0 +1,34 @@ +get_filename_component(CURRENT_SOURCE_PARENT_DIR +  ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(CURRENT_BINARY_PARENT_DIR +  ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +include_directories(${CURRENT_SOURCE_PARENT_DIR}) +include_directories(${CURRENT_BINARY_PARENT_DIR}) + +include_directories(${CMAKE_SOURCE_DIR}/include) +include_directories(${CMAKE_BINARY_DIR}/include) + +get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) + +create_test_sourcelist(${PARENT_DIR}_tests test_suite.c +  # Add new tests here +  graph_test.c +  ) + +add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}) +target_link_libraries(${PARENT_DIR}_test ouroboros-common) + +add_dependencies(check ${PARENT_DIR}_test) + +set(tests_to_run ${${PARENT_DIR}_tests}) +remove(tests_to_run test_suite.c) + +foreach (test 
${tests_to_run}) +  get_filename_component(test_name ${test} NAME_WE) +  add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name}) +endforeach (test) diff --git a/src/ipcpd/unicast/pol/tests/graph_test.c b/src/ipcpd/unicast/pol/tests/graph_test.c new file mode 100644 index 00000000..a312c1a8 --- /dev/null +++ b/src/ipcpd/unicast/pol/tests/graph_test.c @@ -0,0 +1,300 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Test of the graph structure + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200112L + +#include <ouroboros/utils.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "graph.c" + +struct graph *   graph; +struct list_head table; +qosspec_t        qs; + +int graph_test_entries(int entries) +{ +        struct list_head * p; +        int                i = 0; + +        if (graph_routing_table(graph, ROUTING_SIMPLE, 1, &table)) { +                printf("Failed to get routing table.\n"); +                return -1; +        } + +        list_for_each(p, &table) +                i++; + +        if (i != entries) { +                printf("Wrong number of entries.\n"); +                graph_free_routing_table(graph, &table); +                return -1; +        } + +        graph_free_routing_table(graph, &table); + +        return 0; +} + +int graph_test_double_link(void) +{ +        struct list_head * p; +        int                i = 0; + +        if (graph_routing_table(graph, ROUTING_SIMPLE, 1, &table)) { +                printf("Failed to get routing table.\n"); +                return -1; +        } + +        list_for_each(p, &table) +                i++; + +        if (i != 2) { +                printf("Wrong number of entries.\n"); +                graph_free_routing_table(graph, &table); +                return -1; +        } + +        list_for_each(p, &table) { +                struct routing_table * t = +                        list_entry(p, struct routing_table, next); +                struct nhop *          n = +                        list_first_entry(&t->nhops, struct nhop, next); + +                if ((t->dst != 2 && n->nhop != 2) || +                    (t->dst != 3 && n->nhop != 2)) { +                        printf("Wrong routing entry.\n"); +                        graph_free_routing_table(graph, &table); +                        return -1; +                } +        } + +        graph_free_routing_table(graph, &table); + +        return 0; +} + +int graph_test_single_link(void) +{ +        struct list_head * p; +        int                i = 0; + +        if (graph_routing_table(graph, ROUTING_SIMPLE, 1, &table)) { +                printf("Failed to get routing table.\n"); +                return -1; +        } + +        list_for_each(p, &table) +                i++; + +        if (i != 
1) { +                printf("Wrong number of entries.\n"); +                graph_free_routing_table(graph, &table); +                return -1; +        } + +        list_for_each(p, &table) { +                struct routing_table * t = +                        list_entry(p, struct routing_table, next); +                struct nhop *          n = +                        list_first_entry(&t->nhops, struct nhop, next); + +                if (t->dst != 2 && n->nhop != 2) { +                        printf("Wrong routing entry.\n"); +                        graph_free_routing_table(graph, &table); +                        return -1; +                } +        } + +        graph_free_routing_table(graph, &table); + +        return 0; +} + +int graph_test(int     argc, +               char ** argv) +{ +        int                nhop; +        int                dst; +        struct list_head * p; + +        (void) argc; +        (void) argv; + +        memset(&qs, 0, sizeof(qs)); + +        graph = graph_create(); +        if (graph == NULL) { +                printf("Failed to create graph.\n"); +                return -1; +        } + +        graph_destroy(graph); + +        graph = graph_create(); +        if (graph == NULL) { +                printf("Failed to create graph.\n"); +                return -1; +        } + +        if (graph_update_edge(graph, 1, 2, qs)) { +                printf("Failed to add edge.\n"); +                graph_destroy(graph); +                return -1; +        } + +        if (graph_update_edge(graph, 2, 1, qs)) { +                printf("Failed to add edge.\n"); +                graph_destroy(graph); +                return -1; +        } + +        if (graph_test_single_link()) { +                graph_destroy(graph); +                return -1; +        } + +        if (graph_update_edge(graph, 2, 3, qs)) { +                printf("Failed to add edge.\n"); +                graph_destroy(graph); +                return -1; +        } + +        if (graph_update_edge(graph, 3, 2, qs)) { +                printf("Failed to add edge.\n"); +                graph_destroy(graph); +                return -1; +        } + + +        if (graph_test_double_link()) { +                graph_destroy(graph); +                return -1; +        } + +        if (graph_del_edge(graph, 2, 3)) { +                printf("Failed to delete edge.\n"); +                graph_destroy(graph); +                return -1; +        } + +        if (graph_del_edge(graph, 3, 2)) { +                printf("Failed to delete edge.\n"); +                graph_destroy(graph); +                return -1; +        } + +        if (graph_test_single_link()) { +                graph_destroy(graph); +                return -1; +        } + +        graph_update_edge(graph, 2, 3, qs); +        graph_update_edge(graph, 3, 2, qs); +        graph_update_edge(graph, 1, 3, qs); +        graph_update_edge(graph, 3, 1, qs); + +        if (graph_test_entries(2)) { +                graph_destroy(graph); +                return -1; +        } + +        graph_update_edge(graph, 3, 4, qs); +        graph_update_edge(graph, 4, 3, qs); +        graph_update_edge(graph, 4, 5, qs); +        graph_update_edge(graph, 5, 4, qs); + +        if (graph_test_entries(4)) { +                graph_destroy(graph); +                return -1; +        } + +        graph_update_edge(graph, 2, 6, qs); +        graph_update_edge(graph, 6, 2, qs); +        graph_update_edge(graph, 6, 7, qs); +        
graph_update_edge(graph, 7, 6, qs); +        graph_update_edge(graph, 3, 7, qs); +        graph_update_edge(graph, 7, 3, qs); + +        if (graph_test_entries(6)) { +                graph_destroy(graph); +                return -1; +        } + +        if (graph_routing_table(graph, ROUTING_SIMPLE, 1, &table)) { +                printf("Failed to get routing table.\n"); +                return -1; +        } + +        list_for_each(p, &table) { +                struct routing_table * t = +                        list_entry(p, struct routing_table, next); +                struct nhop *          n = +                        list_first_entry(&t->nhops, struct nhop, next); + +                dst = t->dst; +                nhop = n->nhop; + +                if (dst == 3 && nhop != 3) { +                        printf("Wrong entry."); +                        graph_free_routing_table(graph, &table); +                        return -1; +                } + +                if (dst == 2 && nhop != 2) { +                        printf("Wrong entry."); +                        graph_free_routing_table(graph, &table); +                        return -1; +                } + +                if (dst == 6 && nhop != 2) { +                        printf("Wrong entry."); +                        graph_free_routing_table(graph, &table); +                        return -1; +                } + +                if (dst == 4 && nhop != 3) { +                        printf("Wrong entry."); +                        graph_free_routing_table(graph, &table); +                        return -1; +                } + +                if (dst == 5 && nhop != 3) { +                        printf("Wrong entry."); +                        graph_free_routing_table(graph, &table); +                        return -1; +                } + +                if (dst == 7 && nhop != 3) { +                        printf("Wrong entry."); +                        graph_free_routing_table(graph, &table); +                        return -1; +                } +        } + +        graph_free_routing_table(graph, &table); + +        return 0; +} diff --git a/src/ipcpd/unicast/psched.c b/src/ipcpd/unicast/psched.c new file mode 100644 index 00000000..6e8c4e0e --- /dev/null +++ b/src/ipcpd/unicast/psched.c @@ -0,0 +1,239 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Packet scheduler component + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#include <ouroboros/errno.h> +#include <ouroboros/notifier.h> + +#include "ipcp.h" +#include "psched.h" +#include "connmgr.h" + +#include <assert.h> +#include <sched.h> +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> + +static int qos_prio [] = { +        QOS_PRIO_BE, +        QOS_PRIO_VIDEO, +        QOS_PRIO_VOICE, +}; + +struct psched { +        fset_t *         set[QOS_CUBE_MAX]; +        next_packet_fn_t callback; +        pthread_t        readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; +}; + +struct sched_info { +        struct psched * sch; +        qoscube_t       qc; +}; + +static void cleanup_reader(void * o) +{ +        fqueue_destroy((fqueue_t *) o); +} + +static void * packet_reader(void * o) +{ +        struct psched *       sched; +        struct shm_du_buff *  sdb; +        int                   fd; +        fqueue_t *            fq; +        qoscube_t             qc; + +        sched = ((struct sched_info *) o)->sch; +        qc    = ((struct sched_info *) o)->qc; + +        ipcp_lock_to_core(); + +        free(o); + +        fq = fqueue_create(); +        if (fq == NULL) +                return (void *) -1; + +        pthread_cleanup_push(cleanup_reader, fq); + +        while (true) { +                int ret = fevent(sched->set[qc], fq, NULL); +                if (ret < 0) +                        continue; + +                while ((fd = fqueue_next(fq)) >= 0) { +                        switch (fqueue_type(fq)) { +                        case FLOW_DEALLOC: +                                notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd); +                                break; +                        case FLOW_DOWN: +                                notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); +                                break; +                        case FLOW_UP: +                                notifier_event(NOTIFY_DT_FLOW_UP, &fd); +                                break; +                        case FLOW_PKT: +                                if (ipcp_flow_read(fd, &sdb)) +                                        continue; + +                                sched->callback(fd, qc, sdb); +                                break; +                        default: +                                break; +                        } +                } +        } + +        pthread_cleanup_pop(true); + +        return (void *) 0; +} + +struct psched * psched_create(next_packet_fn_t callback) +{ +        struct psched *       psched; +        struct sched_info *   infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL]; +        int                   i; +        int                   j; + +        assert(callback); + +        psched = malloc(sizeof(*psched)); +        if (psched == NULL) +                goto fail_malloc; + +        psched->callback = callback; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                psched->set[i] = fset_create(); +                if (psched->set[i] == NULL) { +                        for (j = 0; j < i; ++j) +                                fset_destroy(psched->set[j]); +                        goto fail_flow_set; +                } +        } + +        for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { +                infos[i] = malloc(sizeof(*infos[i])); +                if (infos[i] == NULL) { +                        for (j = 0; j < i; ++j) +                                
free(infos[j]); +                        goto fail_infos; +                } +                infos[i]->sch = psched; +                infos[i]->qc  = i % QOS_CUBE_MAX; +        } + +        for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { +                if (pthread_create(&psched->readers[i], NULL, +                                   packet_reader, infos[i])) { +                        for (j = 0; j < i; ++j) +                                pthread_cancel(psched->readers[j]); +                        for (j = 0; j < i; ++j) +                                pthread_join(psched->readers[j], NULL); +                        for (j = i; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) +                                free(infos[j]); +                        goto fail_infos; +                } +        } + +        for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { +                struct sched_param  par; +                int                 pol = SCHED_RR; +                int                 min; +                int                 max; + +                min = sched_get_priority_min(pol); +                max = sched_get_priority_max(pol); + +                min = (max - min) / 2; + +                par.sched_priority = min + +                        (qos_prio[i % QOS_CUBE_MAX] * (max - min) / 99); + +                if (pthread_setschedparam(psched->readers[i], pol, &par)) +                        goto fail_sched; +        } + +        return psched; + + fail_sched: +        for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) +                pthread_cancel(psched->readers[j]); +        for (j = 0; j < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++j) +                pthread_join(psched->readers[j], NULL); + fail_infos: +        for (j = 0; j < QOS_CUBE_MAX; ++j) +                fset_destroy(psched->set[j]); + fail_flow_set: +        free(psched); + fail_malloc: +        return NULL; +} + +void psched_destroy(struct psched * psched) +{ +        int i; + +        assert(psched); + +        for (i = 0; i < QOS_CUBE_MAX * IPCP_SCHED_THR_MUL; ++i) { +                pthread_cancel(psched->readers[i]); +                pthread_join(psched->readers[i], NULL); +        } + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                fset_destroy(psched->set[i]); + +        free(psched); +} + +void psched_add(struct psched * psched, +                int             fd) +{ +        qoscube_t qc; + +        assert(psched); + +        ipcp_flow_get_qoscube(fd, &qc); +        fset_add(psched->set[qc], fd); +} + +void psched_del(struct psched * psched, +                int             fd) +{ +        qoscube_t qc; + +        assert(psched); + +        ipcp_flow_get_qoscube(fd, &qc); +        fset_del(psched->set[qc], fd); +} diff --git a/src/ipcpd/unicast/psched.h b/src/ipcpd/unicast/psched.h new file mode 100644 index 00000000..bc98670b --- /dev/null +++ b/src/ipcpd/unicast/psched.h @@ -0,0 +1,43 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Packet scheduler component + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  
See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_PSCHED_H +#define OUROBOROS_IPCPD_UNICAST_PSCHED_H + +#include <ouroboros/ipcp-dev.h> +#include <ouroboros/fqueue.h> + +typedef void (* next_packet_fn_t)(int                  fd, +                                  qoscube_t            qc, +                                  struct shm_du_buff * sdb); + +struct psched * psched_create(next_packet_fn_t callback); + +void            psched_destroy(struct psched * psched); + +void            psched_add(struct psched * psched, +                           int             fd); + +void            psched_del(struct psched * psched, +                           int             fd); + +#endif /* OUROBOROS_IPCPD_UNICAST_PSCHED_H */ diff --git a/src/ipcpd/unicast/routing.c b/src/ipcpd/unicast/routing.c new file mode 100644 index 00000000..2cd24707 --- /dev/null +++ b/src/ipcpd/unicast/routing.c @@ -0,0 +1,59 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Routing component of the IPCP + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200112L + +#include <ouroboros/errno.h> + +#include "routing.h" +#include "pol/link_state.h" + +struct pol_routing_ops * r_ops; + +int routing_init(enum pol_routing pr) +{ +        switch (pr) { +        case ROUTING_LINK_STATE: +        case ROUTING_LINK_STATE_LFA: +                r_ops = &link_state_ops; +                break; +        default: +                return -ENOTSUP; +        } + +        return r_ops->init(pr); +} + +struct routing_i * routing_i_create(struct pff * pff) +{ +        return r_ops->routing_i_create(pff); +} + +void routing_i_destroy(struct routing_i * instance) +{ +        return r_ops->routing_i_destroy(instance); +} + +void routing_fini(void) +{ +        r_ops->fini(); +} diff --git a/src/ipcpd/unicast/routing.h b/src/ipcpd/unicast/routing.h new file mode 100644 index 00000000..ffded5bf --- /dev/null +++ b/src/ipcpd/unicast/routing.h @@ -0,0 +1,41 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2019 + * + * Routing component of the IPCP + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  
diff --git a/src/ipcpd/unicast/routing.h b/src/ipcpd/unicast/routing.h
new file mode 100644
index 00000000..ffded5bf
--- /dev/null
+++ b/src/ipcpd/unicast/routing.h
@@ -0,0 +1,41 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * Routing component of the IPCP
+ *
+ *    Dimitri Staessens <dimitri.staessens@ugent.be>
+ *    Sander Vrijders   <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_ROUTING_H
+#define OUROBOROS_IPCPD_UNICAST_ROUTING_H
+
+#include <ouroboros/ipcp.h>
+#include <ouroboros/qos.h>
+
+#include "pff.h"
+
+#include <stdint.h>
+
+int                routing_init(enum pol_routing pr);
+
+void               routing_fini(void);
+
+struct routing_i * routing_i_create(struct pff * pff);
+
+void               routing_i_destroy(struct routing_i * instance);
+
+#endif /* OUROBOROS_IPCPD_UNICAST_ROUTING_H */
diff --git a/src/ipcpd/unicast/tests/CMakeLists.txt b/src/ipcpd/unicast/tests/CMakeLists.txt
new file mode 100644
index 00000000..482711d5
--- /dev/null
+++ b/src/ipcpd/unicast/tests/CMakeLists.txt
@@ -0,0 +1,37 @@
+get_filename_component(CURRENT_SOURCE_PARENT_DIR
+  ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
+get_filename_component(CURRENT_BINARY_PARENT_DIR
+  ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY)
+
+include_directories(${CMAKE_CURRENT_SOURCE_DIR})
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+include_directories(${CURRENT_SOURCE_PARENT_DIR})
+include_directories(${CURRENT_BINARY_PARENT_DIR})
+
+include_directories(${CMAKE_SOURCE_DIR}/include)
+include_directories(${CMAKE_BINARY_DIR}/include)
+
+get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
+get_filename_component(PARENT_DIR ${PARENT_PATH} NAME)
+
+create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
+  # Add new tests here
+  dht_test.c
+  )
+
+protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS ../kademlia.proto)
+
+add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests}
+  ${KAD_PROTO_SRCS})
+target_link_libraries(${PARENT_DIR}_test ouroboros-common)
+
+add_dependencies(check ${PARENT_DIR}_test)
+
+set(tests_to_run ${${PARENT_DIR}_tests})
+remove(tests_to_run test_suite.c)
+
+foreach (test ${tests_to_run})
+  get_filename_component(test_name ${test} NAME_WE)
+  add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name})
+endforeach (test)
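[editor's note] The test harness leans on CMake's create_test_sourcelist(): it generates a test_suite.c driver that dispatches to one entry point per listed file, named after the file, and since PARENT_DIR resolves to unicast here the binary is unicast_test. Adding a test is therefore one line in the list above plus a skeleton like the following; my_test.c is a hypothetical example, not a file in this commit:

/* Hypothetical my_test.c, listed next to dht_test.c above; the
 * generated driver calls my_test() when the suite is run as:
 * unicast_test my_test */
int my_test(int     argc,
            char ** argv)
{
        (void) argc;
        (void) argv;

        return 0; /* zero reports success to CTest */
}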
diff --git a/src/ipcpd/unicast/tests/dht_test.c b/src/ipcpd/unicast/tests/dht_test.c
new file mode 100644
index 00000000..26e9102e
--- /dev/null
+++ b/src/ipcpd/unicast/tests/dht_test.c
@@ -0,0 +1,99 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * Unit tests of the DHT
+ *
+ *    Dimitri Staessens <dimitri.staessens@ugent.be>
+ *    Sander Vrijders   <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define __DHT_TEST__
+
+#include "dht.c"
+
+#include <pthread.h>
+#include <time.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#define KEY_LEN  32
+
+#define EXP      86400
+#define CONTACTS 1000
+
+int dht_test(int     argc,
+             char ** argv)
+{
+        struct dht * dht;
+        uint64_t     addr = 0x0D1F;
+        uint8_t      key[KEY_LEN];
+        size_t       i;
+
+        (void) argc;
+        (void) argv;
+
+        dht = dht_create(addr);
+        if (dht == NULL) {
+                printf("Failed to create dht.\n");
+                return -1;
+        }
+
+        dht_destroy(dht);
+
+        dht = dht_create(addr);
+        if (dht == NULL) {
+                printf("Failed to re-create dht.\n");
+                return -1;
+        }
+
+        if (dht_bootstrap(dht, KEY_LEN, EXP)) {
+                printf("Failed to bootstrap dht.\n");
+                dht_destroy(dht);
+                return -1;
+        }
+
+        dht_destroy(dht);
+
+        dht = dht_create(addr);
+        if (dht == NULL) {
+                printf("Failed to re-create dht.\n");
+                return -1;
+        }
+
+        if (dht_bootstrap(dht, KEY_LEN, EXP)) {
+                printf("Failed to bootstrap dht.\n");
+                dht_destroy(dht);
+                return -1;
+        }
+
+        for (i = 0; i < CONTACTS; ++i) {
+                uint64_t addr;
+                random_buffer(&addr, sizeof(addr));
+                random_buffer(key, KEY_LEN);
+                pthread_rwlock_wrlock(&dht->lock);
+                if (dht_update_bucket(dht, key, addr)) {
+                        pthread_rwlock_unlock(&dht->lock);
+                        printf("Failed to update bucket.\n");
+                        dht_destroy(dht);
+                        return -1;
+                }
+                pthread_rwlock_unlock(&dht->lock);
+        }
+
+        dht_destroy(dht);
+
+        return 0;
+}
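[editor's note] dht_test.c includes dht.c itself rather than a header: compiling the implementation into the test, guarded by __DHT_TEST__, is what lets the loop above take dht->lock and drive the internal dht_update_bucket() directly. Stripped of the repeated error paths, the lifecycle the test exercises looks like this; a recap using the same constants, not code from the commit:

/* Create/bootstrap/destroy lifecycle, as exercised by dht_test();
 * assumes the declarations that dht.c provides. */
static int dht_lifecycle(void)
{
        struct dht * dht;

        dht = dht_create(0x0D1F);            /* local node address      */
        if (dht == NULL)
                return -1;

        if (dht_bootstrap(dht, 32, 86400)) { /* 32-byte keys, 1-day TTL */
                dht_destroy(dht);
                return -1;
        }

        /* ... populate buckets, resolve names ... */

        dht_destroy(dht);

        return 0;
}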
