diff options
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/normal/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | src/ipcpd/normal/connmgr.c | 1 | ||||
| -rw-r--r-- | src/ipcpd/normal/dt.c | 351 | ||||
| -rw-r--r-- | src/ipcpd/normal/dt.h | 45 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.c | 438 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.h | 54 | ||||
| -rw-r--r-- | src/ipcpd/normal/fmgr.c | 748 | ||||
| -rw-r--r-- | src/ipcpd/normal/fmgr.h | 61 | ||||
| -rw-r--r-- | src/ipcpd/normal/frct.c | 29 | ||||
| -rw-r--r-- | src/ipcpd/normal/frct.h | 4 | ||||
| -rw-r--r-- | src/ipcpd/normal/main.c | 122 | 
11 files changed, 972 insertions, 884 deletions
| diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 2045b8df..9a220ba6 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -22,8 +22,9 @@ set(SOURCE_FILES    addr_auth.c    connmgr.c    dir.c +  dt.c    enroll.c -  fmgr.c +  fa.c    frct.c    gam.c    graph.c diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c index 421bc5b0..56fe9164 100644 --- a/src/ipcpd/normal/connmgr.c +++ b/src/ipcpd/normal/connmgr.c @@ -32,7 +32,6 @@  #include "ae.h"  #include "connmgr.h"  #include "enroll.h" -#include "fmgr.h"  #include "frct.h"  #include "ipcp.h"  #include "ribmgr.h" diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c new file mode 100644 index 00000000..72e0195e --- /dev/null +++ b/src/ipcpd/normal/dt.c @@ -0,0 +1,351 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Data Transfer AE + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define OUROBOROS_PREFIX "dt-ae" + +#include <ouroboros/config.h> +#include <ouroboros/logs.h> +#include <ouroboros/rib.h> +#include <ouroboros/dev.h> + +#include "dt.h" +#include "connmgr.h" +#include "ipcp.h" +#include "shm_pci.h" +#include "pff.h" +#include "neighbors.h" +#include "gam.h" +#include "routing.h" +#include "sdu_sched.h" +#include "frct.h" +#include "ae.h" +#include "ribconfig.h" + +#include <stdlib.h> +#include <stdbool.h> +#include <pthread.h> +#include <string.h> +#include <inttypes.h> +#include <assert.h> + +struct { +        flow_set_t *       set[QOS_CUBE_MAX]; +        struct sdu_sched * sdu_sched; + +        struct pff *       pff[QOS_CUBE_MAX]; +        struct routing_i * routing[QOS_CUBE_MAX]; + +        struct gam *       gam; +        struct nbs *       nbs; +        struct ae *        ae; + +        struct nb_notifier nb_notifier; +} dt; + +static int dt_neighbor_event(enum nb_event event, +                             struct conn   conn) +{ +        qoscube_t cube; + +        /* We are only interested in neighbors being added and removed. */ +        switch (event) { +        case NEIGHBOR_ADDED: +                ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); +                flow_set_add(dt.set[cube], conn.flow_info.fd); +                log_dbg("Added fd %d to flow set.", conn.flow_info.fd); +                break; +        case NEIGHBOR_REMOVED: +                ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); +                flow_set_del(dt.set[cube], conn.flow_info.fd); +                log_dbg("Removed fd %d from flow set.", conn.flow_info.fd); +                break; +        default: +                break; +        } + +        return 0; +} + +static int sdu_handler(int                  fd, +                       qoscube_t            qc, +                       struct shm_du_buff * sdb) +{ +        struct pci pci; + +        memset(&pci, 0, sizeof(pci)); + +        shm_pci_des(sdb, &pci); + +        if (pci.dst_addr != ipcpi.dt_addr) { +                if (pci.ttl == 0) { +                        log_dbg("TTL was zero."); +                        ipcp_flow_del(sdb); +                        return 0; +                } + +                pff_lock(dt.pff[qc]); + +                fd = pff_nhop(dt.pff[qc], pci.dst_addr); +                if (fd < 0) { +                        pff_unlock(dt.pff[qc]); +                        log_err("No next hop for %" PRIu64, pci.dst_addr); +                        ipcp_flow_del(sdb); +                        return -1; +                } + +                pff_unlock(dt.pff[qc]); + +                if (ipcp_flow_write(fd, sdb)) { +                        log_err("Failed to write SDU to fd %d.", fd); +                        ipcp_flow_del(sdb); +                        return -1; +                } +        } else { +                shm_pci_shrink(sdb); + +                if (frct_post_sdu(&pci, sdb)) { +                        log_err("Failed to hand PDU to FRCT."); +                        return -1; +                } +        } + +        return 0; +} + +int dt_init(void) +{ +        int              i; +        int              j; +        struct conn_info info; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                dt.set[i] = flow_set_create(); +                if (dt.set[i] == NULL) { +                        goto fail_flows; +                        return -1; +                } +        } + +        if (shm_pci_init()) { +                log_err("Failed to init shm pci."); +                goto fail_flows; +                return -1; +        } + +        memset(&info, 0, sizeof(info)); + +        strcpy(info.ae_name, DT_AE); +        strcpy(info.protocol, FRCT_PROTO); +        info.pref_version = 1; +        info.pref_syntax = PROTO_FIXED; +        info.addr = ipcpi.dt_addr; + +        dt.ae = connmgr_ae_create(info); +        if (dt.ae == NULL) { +                log_err("Failed to create AE struct."); +                goto fail_flows; +        } + +        dt.nbs = nbs_create(); +        if (dt.nbs == NULL) { +                log_err("Failed to create neighbors struct."); +                goto fail_connmgr; +        } + +        dt.nb_notifier.notify_call = dt_neighbor_event; +        if (nbs_reg_notifier(dt.nbs, &dt.nb_notifier)) { +                log_err("Failed to register notifier."); +                goto fail_nbs; +        } + +        if (routing_init(dt.nbs)) { +                log_err("Failed to init routing."); +                goto fail_nbs_notifier; +        } + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                dt.pff[i] = pff_create(); +                if (dt.pff[i] == NULL) { +                        for (j = 0; j < i; ++j) +                                pff_destroy(dt.pff[j]); +                        goto fail_routing; +                } +        } + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                dt.routing[i] = routing_i_create(dt.pff[i]); +                if (dt.routing[i] == NULL) { +                        for (j = 0; j < i; ++j) +                                routing_i_destroy(dt.routing[j]); +                        goto fail_pff; +                } +        } + +        return 0; + fail_pff: +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                pff_destroy(dt.pff[i]); + fail_routing: +        routing_fini(); + fail_nbs_notifier: +        nbs_unreg_notifier(dt.nbs, &dt.nb_notifier); + fail_nbs: +        nbs_destroy(dt.nbs); + fail_connmgr: +        connmgr_ae_destroy(dt.ae); + fail_flows: +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                flow_set_destroy(dt.set[i]); + +        return -1; +} + +void dt_fini(void) +{ +        int i; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                routing_i_destroy(dt.routing[i]); + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                pff_destroy(dt.pff[i]); + +        routing_fini(); + +        nbs_unreg_notifier(dt.nbs, &dt.nb_notifier); + +        nbs_destroy(dt.nbs); + +        connmgr_ae_destroy(dt.ae); + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                flow_set_destroy(dt.set[i]); +} + +int dt_start(void) +{ +        enum pol_gam pg; + +        if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg)) +            != sizeof(pg)) { +                log_err("Failed to read policy for ribmgr gam."); +                return -1; +        } + +        dt.gam = gam_create(pg, dt.nbs, dt.ae); +        if (dt.gam == NULL) { +                log_err("Failed to init dt graph adjacency manager."); +                return -1; +        } + +        dt.sdu_sched = sdu_sched_create(dt.set, sdu_handler); +        if (dt.sdu_sched == NULL) { +                log_err("Failed to create N-1 SDU scheduler."); +                gam_destroy(dt.gam); +                return -1; +        } + +        return 0; +} + +void dt_stop(void) +{ +        sdu_sched_destroy(dt.sdu_sched); + +        gam_destroy(dt.gam); +} + +int dt_write_sdu(struct pci *         pci, +                 struct shm_du_buff * sdb) +{ +        int fd; + +        assert(pci); +        assert(sdb); + +        pff_lock(dt.pff[pci->qos_id]); + +        fd = pff_nhop(dt.pff[pci->qos_id], pci->dst_addr); +        if (fd < 0) { +                pff_unlock(dt.pff[pci->qos_id]); +                log_err("Could not get nhop for address %" PRIu64, +                        pci->dst_addr); +                ipcp_flow_del(sdb); +                return -1; +        } + +        pff_unlock(dt.pff[pci->qos_id]); + +        if (shm_pci_ser(sdb, pci)) { +                log_err("Failed to serialize PDU."); +                ipcp_flow_del(sdb); +                return -1; +        } + +        if (ipcp_flow_write(fd, sdb)) { +                log_err("Failed to write SDU to fd %d.", fd); +                ipcp_flow_del(sdb); +                return -1; +        } + +        return 0; +} + +int dt_write_buf(struct pci * pci, +                 buffer_t *   buf) +{ +        buffer_t * buffer; +        int        fd; + +        assert(pci); +        assert(buf); +        assert(buf->data); + +        pff_lock(dt.pff[pci->qos_id]); + +        fd = pff_nhop(dt.pff[pci->qos_id], pci->dst_addr); +        if (fd < 0) { +                pff_unlock(dt.pff[pci->qos_id]); +                log_err("Could not get nhop for address %" PRIu64, +                        pci->dst_addr); +                return -1; +        } + +        pff_unlock(dt.pff[pci->qos_id]); + +        buffer = shm_pci_ser_buf(buf, pci); +        if (buffer == NULL) { +                log_err("Failed to serialize buffer."); +                return -1; +        } + +        if (flow_write(fd, buffer->data, buffer->len) == -1) { +                log_err("Failed to write buffer to fd."); +                free(buffer); +                return -1; +        } + +        free(buffer->data); +        free(buffer); + +        return 0; +} diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h new file mode 100644 index 00000000..dea9b91f --- /dev/null +++ b/src/ipcpd/normal/dt.h @@ -0,0 +1,45 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Data Transfer AE + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_DT_H +#define OUROBOROS_IPCPD_NORMAL_DT_H + +#include <ouroboros/shm_rdrbuff.h> +#include <ouroboros/utils.h> + +#include "shm_pci.h" + +int  dt_init(void); + +void dt_fini(void); + +int  dt_start(void); + +void dt_stop(void); + +int  dt_write_sdu(struct pci *         pci, +                  struct shm_du_buff * sdb); + +int  dt_write_buf(struct pci * pci, +                  buffer_t *   buf); + +#endif /* OUROBOROS_IPCPD_NORMAL_DT_H */ diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c new file mode 100644 index 00000000..be1080b1 --- /dev/null +++ b/src/ipcpd/normal/fa.c @@ -0,0 +1,438 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Flow allocator of the IPC Process + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define OUROBOROS_PREFIX "flow-allocator" + +#include <ouroboros/config.h> +#include <ouroboros/logs.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/rib.h> +#include <ouroboros/errno.h> +#include <ouroboros/dev.h> + +#include "fa.h" +#include "sdu_sched.h" +#include "ipcp.h" +#include "ribconfig.h" + +#include <pthread.h> +#include <stdlib.h> +#include <string.h> + +#include "flow_alloc.pb-c.h" +typedef FlowAllocMsg flow_alloc_msg_t; + +#define TIMEOUT 10000 /* nanoseconds */ + +struct { +        pthread_rwlock_t   flows_lock; +        cep_id_t           fd_to_cep_id[AP_MAX_FLOWS]; +        int                cep_id_to_fd[IPCPD_MAX_CONNS]; + +        flow_set_t *       set[QOS_CUBE_MAX]; +        struct sdu_sched * sdu_sched; +} fa; + +static int sdu_handler(int                  fd, +                       qoscube_t            qc, +                       struct shm_du_buff * sdb) +{ +        (void) qc; + +        pthread_rwlock_rdlock(&fa.flows_lock); + +        if (frct_i_write_sdu(fa.fd_to_cep_id[fd], sdb)) { +                pthread_rwlock_unlock(&fa.flows_lock); +                ipcp_flow_del(sdb); +                log_warn("Failed to hand SDU to FRCT."); +                return -1; +        } + +        pthread_rwlock_unlock(&fa.flows_lock); + +        return 0; +} + +int fa_init(void) +{ +        int i; + +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                fa.fd_to_cep_id[i] = INVALID_CEP_ID; + +        for (i = 0; i < IPCPD_MAX_CONNS; ++i) +                fa.cep_id_to_fd[i] = -1; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                fa.set[i] = flow_set_create(); +                if (fa.set[i] == NULL) +                        goto fail_flows; +        } + +        if (pthread_rwlock_init(&fa.flows_lock, NULL)) +                goto fail_flows; + +        return 0; +fail_flows: +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                flow_set_destroy(fa.set[i]); + +        return -1; +} + +void fa_fini(void) +{ +        int i; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                flow_set_destroy(fa.set[i]); + +        pthread_rwlock_destroy(&fa.flows_lock); +} + +int fa_start(void) +{ +        fa.sdu_sched = sdu_sched_create(fa.set, sdu_handler); +        if (fa.sdu_sched == NULL) { +                log_err("Failed to create SDU scheduler."); +                return -1; +        } + +        return 0; +} + +void fa_stop(void) +{ +        sdu_sched_destroy(fa.sdu_sched); +} + +int fa_alloc(int             fd, +             const uint8_t * dst, +             qoscube_t       qc) +{ +        cep_id_t         cep_id; +        buffer_t         buf; +        flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; +        char             path[RIB_MAX_PATH_LEN + 1]; +        uint64_t         addr; +        ssize_t          ch; +        ssize_t          i; +        char **          children; +        char             hashstr[ipcp_dir_hash_strlen() + 1]; +        char *           dst_ipcp = NULL; + +        ipcp_hash_str(hashstr, dst); + +        assert(strlen(hashstr) + strlen(DIR_PATH) + 1 +               < RIB_MAX_PATH_LEN); + +        strcpy(path, DIR_PATH); + +        rib_path_append(path, hashstr); + +        ch = rib_children(path, &children); +        if (ch <= 0) +                return -1; + +        for (i = 0; i < ch; ++i) +                if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0) +                        dst_ipcp = children[i]; +                else +                        free(children[i]); + +        free(children); + +        if (dst_ipcp == NULL) +                return -1; + +        strcpy(path, MEMBERS_PATH); + +        rib_path_append(path, dst_ipcp); + +        free(dst_ipcp); + +        if (rib_read(path, &addr, sizeof(addr)) < 0) +                return -1; + +        msg.code        = FLOW_ALLOC_CODE__FLOW_REQ; +        msg.has_hash    = true; +        msg.hash.len    = ipcp_dir_hash_len(); +        msg.hash.data   = (uint8_t *) dst; +        msg.has_qoscube = true; +        msg.qoscube     = qc; + +        buf.len = flow_alloc_msg__get_packed_size(&msg); +        if (buf.len == 0) +                return -1; + +        buf.data = malloc(buf.len); +        if (buf.data == NULL) +                return -1; + +        flow_alloc_msg__pack(&msg, buf.data); + +        pthread_rwlock_wrlock(&fa.flows_lock); + +        cep_id = frct_i_create(addr, &buf, qc); +        if (cep_id == INVALID_CEP_ID) { +                pthread_rwlock_unlock(&fa.flows_lock); +                free(buf.data); +                return -1; +        } + +        free(buf.data); + +        fa.fd_to_cep_id[fd] = cep_id; +        fa.cep_id_to_fd[cep_id] = fd; + +        pthread_rwlock_unlock(&fa.flows_lock); + +        return 0; +} + +/* Call under flows lock */ +static int fa_flow_dealloc(int fd) +{ +        flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; +        buffer_t         buf; +        int              ret; +        qoscube_t        qc; + +        ipcp_flow_get_qoscube(fd, &qc); +        flow_set_del(fa.set[qc], fd); + +        msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; + +        buf.len = flow_alloc_msg__get_packed_size(&msg); +        if (buf.len == 0) +                return -1; + +        buf.data = malloc(buf.len); +        if (buf.data == NULL) +                return -ENOMEM; + +        flow_alloc_msg__pack(&msg, buf.data); + +        ret = frct_i_destroy(fa.fd_to_cep_id[fd], &buf); + +        fa.cep_id_to_fd[fa.fd_to_cep_id[fd]] = -1; +        fa.fd_to_cep_id[fd] = INVALID_CEP_ID; + +        free(buf.data); + +        return ret; +} + +int fa_alloc_resp(int fd, +                  int response) +{ +        struct timespec ts = {0, TIMEOUT * 1000}; +        flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; +        buffer_t         buf; + +        msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; +        msg.response = response; +        msg.has_response = true; + +        pthread_mutex_lock(&ipcpi.alloc_lock); + +        while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) +                pthread_cond_timedwait(&ipcpi.alloc_cond, +                                       &ipcpi.alloc_lock, +                                       &ts); + +        if (ipcp_get_state() != IPCP_OPERATIONAL) { +                pthread_mutex_unlock(&ipcpi.alloc_lock); +                return -1; +        } + +        ipcpi.alloc_id = -1; +        pthread_cond_broadcast(&ipcpi.alloc_cond); + +        pthread_mutex_unlock(&ipcpi.alloc_lock); + +        buf.len = flow_alloc_msg__get_packed_size(&msg); +        if (buf.len == 0) +                return -1; + +        buf.data = malloc(buf.len); +        if (buf.data == NULL) +                return -ENOMEM; + +        flow_alloc_msg__pack(&msg, buf.data); + +        pthread_rwlock_wrlock(&fa.flows_lock); + +        if (response < 0) { +                frct_i_destroy(fa.fd_to_cep_id[fd], &buf); +                free(buf.data); +                fa.cep_id_to_fd[fa.fd_to_cep_id[fd]] +                        = INVALID_CEP_ID; +                fa.fd_to_cep_id[fd] = -1; +        } else { +                qoscube_t qc; +                ipcp_flow_get_qoscube(fd, &qc); +                if (frct_i_accept(fa.fd_to_cep_id[fd], &buf, qc)) { +                        pthread_rwlock_unlock(&fa.flows_lock); +                        free(buf.data); +                        return -1; +                } +                flow_set_add(fa.set[qc], fd); +        } + +        pthread_rwlock_unlock(&fa.flows_lock); + +        free(buf.data); + +        return 0; +} + +int fa_dealloc(int fd) +{ +        int ret; + +        pthread_rwlock_wrlock(&fa.flows_lock); + +        ret = fa_flow_dealloc(fd); + +        pthread_rwlock_unlock(&fa.flows_lock); + +        return ret; +} + +int fa_post_buf(cep_id_t   cep_id, +                buffer_t * buf) +{ +        struct timespec    ts  = {0, TIMEOUT * 1000}; +        int                ret = 0; +        int                fd; +        flow_alloc_msg_t * msg; +        qoscube_t          qc; + +        /* Depending on the message call the function in ipcp-dev.h */ + +        msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); +        if (msg == NULL) { +                log_err("Failed to unpack flow alloc message"); +                return -1; +        } + +        switch (msg->code) { +        case FLOW_ALLOC_CODE__FLOW_REQ: +                pthread_mutex_lock(&ipcpi.alloc_lock); + +                if (!msg->has_hash) { +                        log_err("Bad flow request."); +                        pthread_mutex_unlock(&ipcpi.alloc_lock); +                        return -1; +                } + +                while (ipcpi.alloc_id != -1 && +                       ipcp_get_state() == IPCP_OPERATIONAL) +                        pthread_cond_timedwait(&ipcpi.alloc_cond, +                                               &ipcpi.alloc_lock, +                                               &ts); + +                if (ipcp_get_state() != IPCP_OPERATIONAL) { +                        log_dbg("Won't allocate over non-operational IPCP."); +                        pthread_mutex_unlock(&ipcpi.alloc_lock); +                        return -1; +                } + +                assert(ipcpi.alloc_id == -1); + +                fd = ipcp_flow_req_arr(getpid(), +                                       msg->hash.data, +                                       ipcp_dir_hash_len(), +                                       msg->qoscube); +                if (fd < 0) { +                        pthread_mutex_unlock(&ipcpi.alloc_lock); +                        flow_alloc_msg__free_unpacked(msg, NULL); +                        log_err("Failed to get fd for flow."); +                        return -1; +                } + +                pthread_rwlock_wrlock(&fa.flows_lock); + +                fa.fd_to_cep_id[fd] = cep_id; +                fa.cep_id_to_fd[cep_id] = fd; + +                pthread_rwlock_unlock(&fa.flows_lock); + +                ipcpi.alloc_id = fd; +                pthread_cond_broadcast(&ipcpi.alloc_cond); + +                pthread_mutex_unlock(&ipcpi.alloc_lock); + +                break; +        case FLOW_ALLOC_CODE__FLOW_REPLY: +                pthread_rwlock_wrlock(&fa.flows_lock); + +                fd = fa.cep_id_to_fd[cep_id]; +                ret = ipcp_flow_alloc_reply(fd, msg->response); +                if (msg->response < 0) { +                        fa.fd_to_cep_id[fd] = INVALID_CEP_ID; +                        fa.cep_id_to_fd[cep_id] = -1; +                } else { +                        ipcp_flow_get_qoscube(fd, &qc); +                        flow_set_add(fa.set[qc], +                                     fa.cep_id_to_fd[cep_id]); +                } + +                pthread_rwlock_unlock(&fa.flows_lock); + +                break; +        case FLOW_ALLOC_CODE__FLOW_DEALLOC: +                fd = fa.cep_id_to_fd[cep_id]; +                ipcp_flow_get_qoscube(fd, &qc); +                flow_set_del(fa.set[qc], fd); +                ret = flow_dealloc(fd); +                break; +        default: +                log_err("Got an unknown flow allocation message."); +                ret = -1; +                break; +        } + +        flow_alloc_msg__free_unpacked(msg, NULL); + +        return ret; +} + +int fa_post_sdu(cep_id_t             cep_id, +                struct shm_du_buff * sdb) +{ +        int fd; + +        pthread_rwlock_rdlock(&fa.flows_lock); + +        fd = fa.cep_id_to_fd[cep_id]; +        if (ipcp_flow_write(fd, sdb)) { +                pthread_rwlock_unlock(&fa.flows_lock); +                log_err("Failed to hand SDU to N flow."); +                return -1; +        } + +        pthread_rwlock_unlock(&fa.flows_lock); + +        return 0; +} diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h new file mode 100644 index 00000000..d370a381 --- /dev/null +++ b/src/ipcpd/normal/fa.h @@ -0,0 +1,54 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Flow allocator of the IPC Process + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_FA_H +#define OUROBOROS_IPCPD_NORMAL_FA_H + +#include <ouroboros/shared.h> +#include <ouroboros/utils.h> + +#include "frct.h" + +int  fa_init(void); + +void fa_fini(void); + +int  fa_start(void); + +void fa_stop(void); + +int  fa_alloc(int             fd, +              const uint8_t * dst, +              qoscube_t       qos); + +int  fa_alloc_resp(int fd, +                   int response); + +int  fa_dealloc(int fd); + +int  fa_post_buf(cep_id_t   cep_id, +                 buffer_t * buf); + +int  fa_post_sdu(cep_id_t             cep_id, +                 struct shm_du_buff * sdb); + +#endif /* OUROBOROS_IPCPD_NORMAL_FA_H */ diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c deleted file mode 100644 index d055b311..00000000 --- a/src/ipcpd/normal/fmgr.c +++ /dev/null @@ -1,748 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Flow manager of the IPC Process - * - *    Dimitri Staessens <dimitri.staessens@ugent.be> - *    Sander Vrijders   <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#define OUROBOROS_PREFIX "flow-manager" - -#include <ouroboros/config.h> -#include <ouroboros/logs.h> -#include <ouroboros/dev.h> -#include <ouroboros/list.h> -#include <ouroboros/ipcp-dev.h> -#include <ouroboros/fqueue.h> -#include <ouroboros/errno.h> -#include <ouroboros/cacep.h> -#include <ouroboros/rib.h> - -#include "connmgr.h" -#include "fmgr.h" -#include "frct.h" -#include "ipcp.h" -#include "shm_pci.h" -#include "ribconfig.h" -#include "pff.h" -#include "neighbors.h" -#include "gam.h" -#include "routing.h" -#include "sdu_sched.h" - -#include <stdlib.h> -#include <stdbool.h> -#include <pthread.h> -#include <string.h> -#include <inttypes.h> - -#include "flow_alloc.pb-c.h" -typedef FlowAllocMsg flow_alloc_msg_t; - -#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */ - -struct { -        pthread_rwlock_t   np1_flows_lock; -        cep_id_t           np1_fd_to_cep_id[AP_MAX_FLOWS]; -        int                np1_cep_id_to_fd[IPCPD_MAX_CONNS]; - -        flow_set_t *       np1_set[QOS_CUBE_MAX]; -        struct sdu_sched * np1_sdu_sched; - -        flow_set_t *       nm1_set[QOS_CUBE_MAX]; -        struct sdu_sched * nm1_sdu_sched; - -        struct pff *       pff[QOS_CUBE_MAX]; -        struct routing_i * routing[QOS_CUBE_MAX]; - -        struct gam *       gam; -        struct nbs *       nbs; -        struct ae *        ae; - -        struct nb_notifier nb_notifier; -} fmgr; - -static int fmgr_neighbor_event(enum nb_event event, -                               struct conn   conn) -{ -        qoscube_t cube; - -        /* We are only interested in neighbors being added and removed. */ -        switch (event) { -        case NEIGHBOR_ADDED: -                ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); -                flow_set_add(fmgr.nm1_set[cube], conn.flow_info.fd); -                log_dbg("Added fd %d to flow set.", conn.flow_info.fd); -                break; -        case NEIGHBOR_REMOVED: -                ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); -                flow_set_del(fmgr.nm1_set[cube], conn.flow_info.fd); -                log_dbg("Removed fd %d from flow set.", conn.flow_info.fd); -                break; -        default: -                break; -        } - -        return 0; -} - -static int np1_sdu_handler(int                  fd, -                           qoscube_t            qc, -                           struct shm_du_buff * sdb) -{ -        (void) qc; - -        pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - -        if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                ipcp_flow_del(sdb); -                log_warn("Failed to hand SDU to FRCT."); -                return -1; -        } - -        pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -        return 0; -} - -static int nm1_sdu_handler(int                  fd, -                           qoscube_t            qc, -                           struct shm_du_buff * sdb) -{ -        struct pci pci; - -        memset(&pci, 0, sizeof(pci)); - -        shm_pci_des(sdb, &pci); - -        if (pci.dst_addr != ipcpi.dt_addr) { -                if (pci.ttl == 0) { -                        log_dbg("TTL was zero."); -                        ipcp_flow_del(sdb); -                        return 0; -                } - -                pff_lock(fmgr.pff[qc]); - -                fd = pff_nhop(fmgr.pff[qc], pci.dst_addr); -                if (fd < 0) { -                        pff_unlock(fmgr.pff[qc]); -                        log_err("No next hop for %" PRIu64, pci.dst_addr); -                        ipcp_flow_del(sdb); -                        return -1; -                } - -                pff_unlock(fmgr.pff[qc]); - -                if (ipcp_flow_write(fd, sdb)) { -                        log_err("Failed to write SDU to fd %d.", fd); -                        ipcp_flow_del(sdb); -                        return -1; -                } -        } else { -                shm_pci_shrink(sdb); - -                if (frct_nm1_post_sdu(&pci, sdb)) { -                        log_err("Failed to hand PDU to FRCT."); -                        return -1; -                } -        } - -        return 0; -} - -static void fmgr_destroy_flows(void) -{ -        int i; - -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                flow_set_destroy(fmgr.nm1_set[i]); -                flow_set_destroy(fmgr.np1_set[i]); -        } -} - -static void fmgr_destroy_routing(void) -{ -        int i; - -        for (i = 0; i < QOS_CUBE_MAX; ++i) -                routing_i_destroy(fmgr.routing[i]); -} - -static void fmgr_destroy_pff(void) -{ -        int i; - -        for (i = 0; i < QOS_CUBE_MAX; ++i) -                pff_destroy(fmgr.pff[i]); -} - -int fmgr_init(void) -{ -        int              i; -        int              j; -        struct conn_info info; - -        for (i = 0; i < AP_MAX_FLOWS; ++i) -                fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID; - -        for (i = 0; i < IPCPD_MAX_CONNS; ++i) -                fmgr.np1_cep_id_to_fd[i] = -1; - -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                fmgr.np1_set[i] = flow_set_create(); -                if (fmgr.np1_set[i] == NULL) { -                        fmgr_destroy_flows(); -                        return -1; -                } - -                fmgr.nm1_set[i] = flow_set_create(); -                if (fmgr.nm1_set[i] == NULL) { -                        fmgr_destroy_flows(); -                        return -1; -                } -        } - -        if (shm_pci_init()) { -                log_err("Failed to init shm pci."); -                fmgr_destroy_flows(); -                return -1; -        } - -        memset(&info, 0, sizeof(info)); - -        strcpy(info.ae_name, DT_AE); -        strcpy(info.protocol, FRCT_PROTO); -        info.pref_version = 1; -        info.pref_syntax = PROTO_FIXED; -        info.addr = ipcpi.dt_addr; - -        fmgr.ae = connmgr_ae_create(info); -        if (fmgr.ae == NULL) { -                log_err("Failed to create AE struct."); -                fmgr_destroy_flows(); -                return -1; -        } - -        fmgr.nbs = nbs_create(); -        if (fmgr.nbs == NULL) { -                log_err("Failed to create neighbors struct."); -                fmgr_destroy_flows(); -                connmgr_ae_destroy(fmgr.ae); -                return -1; -        } - -        fmgr.nb_notifier.notify_call = fmgr_neighbor_event; -        if (nbs_reg_notifier(fmgr.nbs, &fmgr.nb_notifier)) { -                log_err("Failed to register notifier."); -                nbs_destroy(fmgr.nbs); -                fmgr_destroy_flows(); -                connmgr_ae_destroy(fmgr.ae); -                return -1; -        } - -        if (routing_init(fmgr.nbs)) { -                log_err("Failed to init routing."); -                nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); -                nbs_destroy(fmgr.nbs); -                fmgr_destroy_flows(); -                connmgr_ae_destroy(fmgr.ae); -                return -1; -        } - -        if (pthread_rwlock_init(&fmgr.np1_flows_lock, NULL)) { -                routing_fini(); -                nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); -                nbs_destroy(fmgr.nbs); -                fmgr_destroy_flows(); -                connmgr_ae_destroy(fmgr.ae); -                return -1; -        } - -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                fmgr.pff[i] = pff_create(); -                if (fmgr.pff[i] == NULL) { -                        for (j = 0; j < i; ++j) -                                pff_destroy(fmgr.pff[j]); -                        pthread_rwlock_destroy(&fmgr.np1_flows_lock); -                        routing_fini(); -                        nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); -                        nbs_destroy(fmgr.nbs); -                        fmgr_destroy_flows(); -                        connmgr_ae_destroy(fmgr.ae); -                        return -1; -                } - -                fmgr.routing[i] = routing_i_create(fmgr.pff[i]); -                if (fmgr.routing[i] == NULL) { -                        for (j = 0; j < i; ++j) -                                routing_i_destroy(fmgr.routing[j]); -                        fmgr_destroy_pff(); -                        pthread_rwlock_destroy(&fmgr.np1_flows_lock); -                        routing_fini(); -                        nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); -                        nbs_destroy(fmgr.nbs); -                        fmgr_destroy_flows(); -                        connmgr_ae_destroy(fmgr.ae); -                        return -1; -                } -        } - -        return 0; -} - -void fmgr_fini() -{ -        nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); - -        fmgr_destroy_routing(); - -        fmgr_destroy_pff(); - -        routing_fini(); - -        fmgr_destroy_flows(); - -        connmgr_ae_destroy(fmgr.ae); - -        nbs_destroy(fmgr.nbs); -} - -int fmgr_start(void) -{ -        enum pol_gam pg; - -        if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg)) -            != sizeof(pg)) { -                log_err("Failed to read policy for ribmgr gam."); -                return -1; -        } - -        fmgr.gam = gam_create(pg, fmgr.nbs, fmgr.ae); -        if (fmgr.gam == NULL) { -                log_err("Failed to init dt graph adjacency manager."); -                return -1; -        } - -        fmgr.nm1_sdu_sched = sdu_sched_create(fmgr.nm1_set, nm1_sdu_handler); -        if (fmgr.nm1_sdu_sched == NULL) { -                log_err("Failed to create N-1 SDU scheduler."); -                gam_destroy(fmgr.gam); -                return -1; -        } - -        fmgr.np1_sdu_sched = sdu_sched_create(fmgr.np1_set, np1_sdu_handler); -        if (fmgr.np1_sdu_sched == NULL) { -                log_err("Failed to create N+1 SDU scheduler."); -                sdu_sched_destroy(fmgr.nm1_sdu_sched); -                gam_destroy(fmgr.gam); -                return -1; -        } - -        return 0; -} - -void fmgr_stop(void) -{ -        sdu_sched_destroy(fmgr.np1_sdu_sched); - -        sdu_sched_destroy(fmgr.nm1_sdu_sched); - -        gam_destroy(fmgr.gam); -} - -int fmgr_np1_alloc(int             fd, -                   const uint8_t * dst, -                   qoscube_t       cube) -{ -        cep_id_t         cep_id; -        buffer_t         buf; -        flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; -        char             path[RIB_MAX_PATH_LEN + 1]; -        uint64_t         addr; -        ssize_t          ch; -        ssize_t          i; -        char **          children; -        char             hashstr[ipcp_dir_hash_strlen() + 1]; -        char *           dst_ipcp = NULL; - -        ipcp_hash_str(hashstr, dst); - -        assert(strlen(hashstr) + strlen(DIR_PATH) + 1 -               < RIB_MAX_PATH_LEN); - -        strcpy(path, DIR_PATH); - -        rib_path_append(path, hashstr); - -        ch = rib_children(path, &children); -        if (ch <= 0) -                return -1; - -        for (i = 0; i < ch; ++i) -                if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0) -                        dst_ipcp = children[i]; -                else -                        free(children[i]); - -        free(children); - -        if (dst_ipcp == NULL) -                return -1; - -        strcpy(path, MEMBERS_PATH); - -        rib_path_append(path, dst_ipcp); - -        free(dst_ipcp); - -        if (rib_read(path, &addr, sizeof(addr)) < 0) -                return -1; - -        msg.code        = FLOW_ALLOC_CODE__FLOW_REQ; -        msg.has_hash    = true; -        msg.hash.len    = ipcp_dir_hash_len(); -        msg.hash.data   = (uint8_t *) dst; -        msg.has_qoscube = true; -        msg.qoscube     = cube; - -        buf.len = flow_alloc_msg__get_packed_size(&msg); -        if (buf.len == 0) -                return -1; - -        buf.data = malloc(buf.len); -        if (buf.data == NULL) -                return -1; - -        flow_alloc_msg__pack(&msg, buf.data); - -        pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - -        cep_id = frct_i_create(addr, &buf, cube); -        if (cep_id == INVALID_CEP_ID) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                free(buf.data); -                return -1; -        } - -        free(buf.data); - -        fmgr.np1_fd_to_cep_id[fd] = cep_id; -        fmgr.np1_cep_id_to_fd[cep_id] = fd; - -        pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -        return 0; -} - -/* Call under np1_flows lock */ -static int np1_flow_dealloc(int fd) -{ -        flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; -        buffer_t         buf; -        int              ret; -        qoscube_t        cube; - -        ipcp_flow_get_qoscube(fd, &cube); -        flow_set_del(fmgr.np1_set[cube], fd); - -        msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; - -        buf.len = flow_alloc_msg__get_packed_size(&msg); -        if (buf.len == 0) -                return -1; - -        buf.data = malloc(buf.len); -        if (buf.data == NULL) -                return -ENOMEM; - -        flow_alloc_msg__pack(&msg, buf.data); - -        ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); - -        fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID; -        fmgr.np1_fd_to_cep_id[fd] = -1; - -        free(buf.data); - -        return ret; -} - -int fmgr_np1_alloc_resp(int fd, -                        int response) -{ -        struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; -        flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; -        buffer_t         buf; - -        msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; -        msg.response = response; -        msg.has_response = true; - -        pthread_mutex_lock(&ipcpi.alloc_lock); - -        while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) -                pthread_cond_timedwait(&ipcpi.alloc_cond, -                                       &ipcpi.alloc_lock, -                                       &ts); - -        if (ipcp_get_state() != IPCP_OPERATIONAL) { -                pthread_mutex_unlock(&ipcpi.alloc_lock); -                return -1; -        } - -        ipcpi.alloc_id = -1; -        pthread_cond_broadcast(&ipcpi.alloc_cond); - -        pthread_mutex_unlock(&ipcpi.alloc_lock); - -        buf.len = flow_alloc_msg__get_packed_size(&msg); -        if (buf.len == 0) -                return -1; - -        buf.data = malloc(buf.len); -        if (buf.data == NULL) -                return -ENOMEM; - -        flow_alloc_msg__pack(&msg, buf.data); - -        pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - -        if (response < 0) { -                frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); -                free(buf.data); -                fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] -                        = INVALID_CEP_ID; -                fmgr.np1_fd_to_cep_id[fd] = -1; -        } else { -                qoscube_t cube; -                ipcp_flow_get_qoscube(fd, &cube); -                if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) { -                        pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                        free(buf.data); -                        return -1; -                } -                flow_set_add(fmgr.np1_set[cube], fd); -        } - -        pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -        free(buf.data); - -        return 0; -} - -int fmgr_np1_dealloc(int fd) -{ -        int ret; - -        pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - -        ret = np1_flow_dealloc(fd); - -        pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -        return ret; -} - -int fmgr_np1_post_buf(cep_id_t   cep_id, -                      buffer_t * buf) -{ -        struct timespec    ts  = {0, FD_UPDATE_TIMEOUT * 1000}; -        int                ret = 0; -        int                fd; -        flow_alloc_msg_t * msg; -        qoscube_t          cube; - -        /* Depending on the message call the function in ipcp-dev.h */ - -        msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); -        if (msg == NULL) { -                log_err("Failed to unpack flow alloc message"); -                return -1; -        } - -        switch (msg->code) { -        case FLOW_ALLOC_CODE__FLOW_REQ: -                pthread_mutex_lock(&ipcpi.alloc_lock); - -                if (!msg->has_hash) { -                        log_err("Bad flow request."); -                        return -1; -                } - -                while (ipcpi.alloc_id != -1 && -                       ipcp_get_state() == IPCP_OPERATIONAL) -                        pthread_cond_timedwait(&ipcpi.alloc_cond, -                                               &ipcpi.alloc_lock, -                                               &ts); - -                if (ipcp_get_state() != IPCP_OPERATIONAL) { -                        log_dbg("Won't allocate over non-operational IPCP."); -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        return -1; -                } - -                assert(ipcpi.alloc_id == -1); - -                fd = ipcp_flow_req_arr(getpid(), -                                       msg->hash.data, -                                       ipcp_dir_hash_len(), -                                       msg->qoscube); -                if (fd < 0) { -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        log_err("Failed to get fd for flow."); -                        return -1; -                } - -                pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - -                fmgr.np1_fd_to_cep_id[fd] = cep_id; -                fmgr.np1_cep_id_to_fd[cep_id] = fd; - -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -                ipcpi.alloc_id = fd; -                pthread_cond_broadcast(&ipcpi.alloc_cond); - -                pthread_mutex_unlock(&ipcpi.alloc_lock); - -                break; -        case FLOW_ALLOC_CODE__FLOW_REPLY: -                pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - -                fd = fmgr.np1_cep_id_to_fd[cep_id]; -                ret = ipcp_flow_alloc_reply(fd, msg->response); -                if (msg->response < 0) { -                        fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID; -                        fmgr.np1_cep_id_to_fd[cep_id] = -1; -                } else { -                        ipcp_flow_get_qoscube(fd, &cube); -                        flow_set_add(fmgr.np1_set[cube], -                                     fmgr.np1_cep_id_to_fd[cep_id]); -                } - -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -                break; -        case FLOW_ALLOC_CODE__FLOW_DEALLOC: -                fd = fmgr.np1_cep_id_to_fd[cep_id]; -                ipcp_flow_get_qoscube(fd, &cube); -                flow_set_del(fmgr.np1_set[cube], fd); -                ret = flow_dealloc(fd); -                break; -        default: -                log_err("Got an unknown flow allocation message."); -                ret = -1; -                break; -        } - -        flow_alloc_msg__free_unpacked(msg, NULL); - -        return ret; -} - -int fmgr_np1_post_sdu(cep_id_t             cep_id, -                      struct shm_du_buff * sdb) -{ -        int fd; - -        pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - -        fd = fmgr.np1_cep_id_to_fd[cep_id]; -        if (ipcp_flow_write(fd, sdb)) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                log_err("Failed to hand SDU to N flow."); -                return -1; -        } - -        pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -        return 0; -} - -int fmgr_nm1_write_sdu(struct pci *         pci, -                       struct shm_du_buff * sdb) -{ -        int fd; - -        if (pci == NULL || sdb == NULL) -                return -EINVAL; - -        pff_lock(fmgr.pff[pci->qos_id]); -        fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr); -        if (fd < 0) { -                pff_unlock(fmgr.pff[pci->qos_id]); -                log_err("Could not get nhop for address %" PRIu64, -                        pci->dst_addr); -                ipcp_flow_del(sdb); -                return -1; -        } -        pff_unlock(fmgr.pff[pci->qos_id]); - -        if (shm_pci_ser(sdb, pci)) { -                log_err("Failed to serialize PDU."); -                ipcp_flow_del(sdb); -                return -1; -        } - -        if (ipcp_flow_write(fd, sdb)) { -                log_err("Failed to write SDU to fd %d.", fd); -                ipcp_flow_del(sdb); -                return -1; -        } - -        return 0; -} - -int fmgr_nm1_write_buf(struct pci * pci, -                       buffer_t *   buf) -{ -        buffer_t * buffer; -        int        fd; - -        if (pci == NULL || buf == NULL || buf->data == NULL) -                return -EINVAL; - -        pff_lock(fmgr.pff[pci->qos_id]); -        fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr); -        if (fd < 0) { -                pff_unlock(fmgr.pff[pci->qos_id]); -                log_err("Could not get nhop for address %" PRIu64, -                        pci->dst_addr); -                return -1; -        } -        pff_unlock(fmgr.pff[pci->qos_id]); - -        buffer = shm_pci_ser_buf(buf, pci); -        if (buffer == NULL) { -                log_err("Failed to serialize buffer."); -                return -1; -        } - -        if (flow_write(fd, buffer->data, buffer->len) == -1) { -                log_err("Failed to write buffer to fd."); -                free(buffer); -                return -1; -        } - -        free(buffer->data); -        free(buffer); -        return 0; -} diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h deleted file mode 100644 index c59c0875..00000000 --- a/src/ipcpd/normal/fmgr.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Flow manager of the IPC Process - * - *    Dimitri Staessens <dimitri.staessens@ugent.be> - *    Sander Vrijders   <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_FMGR_H -#define OUROBOROS_IPCPD_NORMAL_FMGR_H - -#include <ouroboros/shared.h> -#include <ouroboros/qos.h> - -#include "ae.h" -#include "frct.h" - -int  fmgr_init(void); - -void fmgr_fini(void); - -int  fmgr_start(void); - -void fmgr_stop(void); - -int  fmgr_np1_alloc(int             fd, -                    const uint8_t * dst, -                    qoscube_t       qos); - -int  fmgr_np1_alloc_resp(int fd, -                         int response); - -int  fmgr_np1_dealloc(int fd); - -int  fmgr_np1_post_buf(cep_id_t   id, -                       buffer_t * buf); - -int  fmgr_np1_post_sdu(cep_id_t             id, -                       struct shm_du_buff * sdb); - -int  fmgr_nm1_write_sdu(struct pci *         pci, -                        struct shm_du_buff * sdb); - -int  fmgr_nm1_write_buf(struct pci * pci, -                        buffer_t *   buf); - -#endif /* OUROBOROS_IPCPD_NORMAL_FMGR_H */ diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index d873beae..bfcde1b3 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -30,8 +30,9 @@  #include <ouroboros/errno.h>  #include "frct.h" -#include "fmgr.h"  #include "ipcp.h" +#include "dt.h" +#include "fa.h"  #include <stdlib.h>  #include <stdbool.h> @@ -210,8 +211,8 @@ int frct_fini()          return 0;  } -int frct_nm1_post_sdu(struct pci *         pci, -                      struct shm_du_buff * sdb) +int frct_post_sdu(struct pci *         pci, +                  struct shm_du_buff * sdb)  {          struct frct_i * instance;          buffer_t        buf; @@ -250,17 +251,17 @@ int frct_nm1_post_sdu(struct pci *         pci,                  buf.len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);                  buf.data = shm_du_buff_head(sdb); -                if (fmgr_np1_post_buf(id, &buf)) { -                        log_err("Failed to hand buffer to Flow Manager."); +                if (fa_post_buf(id, &buf)) { +                        log_err("Failed to hand buffer to FA.");                          ipcp_flow_del(sdb);                          return -1;                  }                  ipcp_flow_del(sdb);          } else { -                /* FIXME: Known cep-ids are delivered to FMGR (minimal DTP) */ -                if (fmgr_np1_post_sdu(pci->dst_cep_id, sdb)) { -                        log_err("Failed to hand SDU to FMGR."); +                /* FIXME: Known cep-ids are delivered to FA (minimal DTP) */ +                if (fa_post_sdu(pci->dst_cep_id, sdb)) { +                        log_err("Failed to hand SDU to FA.");                          ipcp_flow_del(sdb);                          return -1;                  } @@ -301,11 +302,11 @@ cep_id_t frct_i_create(uint64_t   address,          pci.seqno = 0;          pci.qos_id = cube; -        if (fmgr_nm1_write_buf(&pci, buf)) { +        if (dt_write_buf(&pci, buf)) {                  pthread_mutex_lock(&frct.instances_lock);                  destroy_frct_i(id);                  pthread_mutex_unlock(&frct.instances_lock); -                log_err("Failed to hand PDU to FMGR."); +                log_err("Failed to hand PDU to DT.");                  return INVALID_CEP_ID;          } @@ -350,7 +351,7 @@ int frct_i_accept(cep_id_t   id,          pthread_mutex_unlock(&frct.instances_lock); -        if (fmgr_nm1_write_buf(&pci, buf)) +        if (dt_write_buf(&pci, buf))                  return -1;          return 0; @@ -390,7 +391,7 @@ int frct_i_destroy(cep_id_t   id,          pthread_mutex_unlock(&frct.instances_lock);          if (buf != NULL && buf->data != NULL) -                if (fmgr_nm1_write_buf(&pci, buf)) +                if (dt_write_buf(&pci, buf))                          return -1;          return 0; @@ -427,9 +428,9 @@ int frct_i_write_sdu(cep_id_t             id,          pci.seqno = (instance->seqno)++;          pci.qos_id = instance->cube; -        if (fmgr_nm1_write_sdu(&pci, sdb)) { +        if (dt_write_sdu(&pci, sdb)) {                  pthread_mutex_unlock(&frct.instances_lock); -                log_err("Failed to hand SDU to FMGR."); +                log_err("Failed to hand SDU to DT.");                  return -1;          } diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h index a1dcb151..b179e36b 100644 --- a/src/ipcpd/normal/frct.h +++ b/src/ipcpd/normal/frct.h @@ -50,7 +50,7 @@ int         frct_i_destroy(cep_id_t   id,  int         frct_i_write_sdu(cep_id_t             id,                               struct shm_du_buff * sdb); -int         frct_nm1_post_sdu(struct pci *         pci, -                              struct shm_du_buff * sdb); +int         frct_post_sdu(struct pci *         pci, +                          struct shm_du_buff * sdb);  #endif /* OUROBOROS_IPCPD_NORMAL_FRCT_H */ diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 67424914..ab8cf387 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -36,7 +36,8 @@  #include "connmgr.h"  #include "dir.h"  #include "enroll.h" -#include "fmgr.h" +#include "fa.h" +#include "dt.h"  #include "ipcp.h"  #include "ribconfig.h"  #include "ribmgr.h" @@ -73,7 +74,7 @@ static int boot_components(void)                         &ipcpi.dir_hash_algo, sizeof(ipcpi.dir_hash_algo));          if (len < 0) {                  log_err("Failed to read hash length: %zd.", len); -                return -1; +                goto fail_name;          }          ipcpi.dir_hash_algo = ntoh32(ipcpi.dir_hash_algo); @@ -82,7 +83,7 @@ static int boot_components(void)          if (rib_add(MEMBERS_PATH, ipcpi.name)) {                  log_err("Failed to add name to " MEMBERS_PATH); -                return -1; +                goto fail_name;          }          log_dbg("Starting components."); @@ -90,27 +91,25 @@ static int boot_components(void)          if (rib_read(BOOT_PATH "/addr_auth/type", &pa, sizeof(pa))              != sizeof(pa)) {                  log_err("Failed to read policy for address authority."); -                return -1; +                goto fail_name;          }          if (addr_auth_init(pa)) {                  log_err("Failed to init address authority."); -                return -1; +                goto fail_name;          }          ipcpi.dt_addr = addr_auth_address();          if (ipcpi.dt_addr == 0) {                  log_err("Failed to get a valid address."); -                addr_auth_fini(); -                return -1; +                goto fail_addr_auth;          }          path[0] = '\0';          rib_path_append(rib_path_append(path, MEMBERS_NAME), ipcpi.name);          if (rib_write(path, &ipcpi.dt_addr, sizeof(&ipcpi.dt_addr))) {                  log_err("Failed to write address to member object."); -                addr_auth_fini(); -                return -1; +                goto fail_addr_auth;          }          log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr); @@ -119,91 +118,100 @@ static int boot_components(void)          if (ribmgr_init()) {                  log_err("Failed to initialize RIB manager."); -                addr_auth_fini(); -                return -1; +                goto fail_addr_auth;          }          if (dir_init()) {                  log_err("Failed to initialize directory."); -                ribmgr_fini(); -                addr_auth_fini(); -                return -1; +                goto fail_ribmgr;          }          log_dbg("Ribmgr started.");          if (frct_init()) { -                dir_fini(); -                ribmgr_fini(); -                addr_auth_fini();                  log_err("Failed to initialize FRCT."); -                return -1; +                goto fail_dir;          } -        if (fmgr_init()) { -                frct_fini(); -                dir_fini(); -                ribmgr_fini(); -                addr_auth_fini(); -                log_err("Failed to initialize flow manager component."); -                return -1; +        if (fa_init()) { +                log_err("Failed to initialize flow allocator ae."); +                goto fail_frct;          } -        if (fmgr_start()) { -                fmgr_fini(); -                frct_fini(); -                dir_fini(); -                ribmgr_fini(); -                addr_auth_fini(); -                log_err("Failed to start flow manager."); -                return -1; +        if (dt_init()) { +                log_err("Failed to initialize data transfer ae."); +                goto fail_fa; +        } + +        if (fa_start()) { +                log_err("Failed to start flow allocator."); +                goto fail_dt; +        } + +        if (dt_start()) { +                log_err("Failed to start data transfer ae."); +                goto fail_fa_start;          }          if (enroll_start()) { -                fmgr_stop(); -                fmgr_fini(); -                frct_fini(); -                dir_fini(); -                ribmgr_fini(); -                addr_auth_fini();                  log_err("Failed to start enroll."); -                return -1; +                goto fail_dt_start;          }          ipcp_set_state(IPCP_OPERATIONAL);          if (connmgr_start()) { -                ipcp_set_state(IPCP_INIT); -                enroll_stop(); -                fmgr_stop(); -                fmgr_fini(); -                frct_fini(); -                dir_fini(); -                ribmgr_fini(); -                addr_auth_fini();                  log_err("Failed to start AP connection manager."); -                return -1; +                goto fail_enroll;          }          return 0; + + fail_enroll: +        ipcp_set_state(IPCP_INIT); +        enroll_stop(); + fail_dt_start: +        dt_stop(); + fail_fa_start: +        fa_stop(); + fail_dt: +        dt_fini(); + fail_fa: +        fa_fini(); + fail_frct: +        frct_fini(); + fail_dir: +        dir_fini(); + fail_ribmgr: +        ribmgr_fini(); + fail_addr_auth: +        addr_auth_fini(); + fail_name: +        free(ipcpi.dif_name); + +        return -1;  }  void shutdown_components(void)  { -        ribmgr_fini(); -          connmgr_stop();          enroll_stop(); -        frct_fini(); +        dt_stop(); + +        fa_stop(); -        fmgr_stop(); +        dt_fini(); -        fmgr_fini(); +        fa_fini(); + +        frct_fini();          dir_fini(); +        ribmgr_fini(); +          addr_auth_fini();          free(ipcpi.dif_name); @@ -366,9 +374,9 @@ static struct ipcp_ops normal_ops = {          .ipcp_reg             = dir_reg,          .ipcp_unreg           = dir_unreg,          .ipcp_query           = dir_query, -        .ipcp_flow_alloc      = fmgr_np1_alloc, -        .ipcp_flow_alloc_resp = fmgr_np1_alloc_resp, -        .ipcp_flow_dealloc    = fmgr_np1_dealloc +        .ipcp_flow_alloc      = fa_alloc, +        .ipcp_flow_alloc_resp = fa_alloc_resp, +        .ipcp_flow_dealloc    = fa_dealloc  };  int main(int    argc, | 
