diff options
| author | Sander Vrijders <sander.vrijders@ugent.be> | 2017-04-21 11:21:44 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-04-21 12:46:37 +0200 | 
| commit | a9d71381a84886007625958b9daea6b2d4a50563 (patch) | |
| tree | 67b8576e9747d7815c7eed7170f49a10e5a4e0e0 /src/ipcpd/normal | |
| parent | 4bfd6c07281847405e127e9588376fcf20d07a7e (diff) | |
| download | ouroboros-a9d71381a84886007625958b9daea6b2d4a50563.tar.gz ouroboros-a9d71381a84886007625958b9daea6b2d4a50563.zip | |
ipcpd: normal: Split flow manager into DT and FA
This splits the flow manager into the Data Transfer AE, which is in
charge of routing SDUs, and the Flow Allocator AE, which handles flow
allocations.
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | src/ipcpd/normal/connmgr.c | 1 | ||||
| -rw-r--r-- | src/ipcpd/normal/dt.c | 351 | ||||
| -rw-r--r-- | src/ipcpd/normal/dt.h | 45 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.c | 438 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.h | 54 | ||||
| -rw-r--r-- | src/ipcpd/normal/fmgr.c | 748 | ||||
| -rw-r--r-- | src/ipcpd/normal/fmgr.h | 61 | ||||
| -rw-r--r-- | src/ipcpd/normal/frct.c | 29 | ||||
| -rw-r--r-- | src/ipcpd/normal/frct.h | 4 | ||||
| -rw-r--r-- | src/ipcpd/normal/main.c | 122 | 
11 files changed, 972 insertions, 884 deletions
| diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 2045b8df..9a220ba6 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -22,8 +22,9 @@ set(SOURCE_FILES    addr_auth.c    connmgr.c    dir.c +  dt.c    enroll.c -  fmgr.c +  fa.c    frct.c    gam.c    graph.c diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c index 421bc5b0..56fe9164 100644 --- a/src/ipcpd/normal/connmgr.c +++ b/src/ipcpd/normal/connmgr.c @@ -32,7 +32,6 @@  #include "ae.h"  #include "connmgr.h"  #include "enroll.h" -#include "fmgr.h"  #include "frct.h"  #include "ipcp.h"  #include "ribmgr.h" diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c new file mode 100644 index 00000000..72e0195e --- /dev/null +++ b/src/ipcpd/normal/dt.c @@ -0,0 +1,351 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Data Transfer AE + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define OUROBOROS_PREFIX "dt-ae" + +#include <ouroboros/config.h> +#include <ouroboros/logs.h> +#include <ouroboros/rib.h> +#include <ouroboros/dev.h> + +#include "dt.h" +#include "connmgr.h" +#include "ipcp.h" +#include "shm_pci.h" +#include "pff.h" +#include "neighbors.h" +#include "gam.h" +#include "routing.h" +#include "sdu_sched.h" +#include "frct.h" +#include "ae.h" +#include "ribconfig.h" + +#include <stdlib.h> +#include <stdbool.h> +#include <pthread.h> +#include <string.h> +#include <inttypes.h> +#include <assert.h> + +struct { +        flow_set_t *       set[QOS_CUBE_MAX]; +        struct sdu_sched * sdu_sched; + +        struct pff *       pff[QOS_CUBE_MAX]; +        struct routing_i * routing[QOS_CUBE_MAX]; + +        struct gam *       gam; +        struct nbs *       nbs; +        struct ae *        ae; + +        struct nb_notifier nb_notifier; +} dt; + +static int dt_neighbor_event(enum nb_event event, +                             struct conn   conn) +{ +        qoscube_t cube; + +        /* We are only interested in neighbors being added and removed. */ +        switch (event) { +        case NEIGHBOR_ADDED: +                ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); +                flow_set_add(dt.set[cube], conn.flow_info.fd); +                log_dbg("Added fd %d to flow set.", conn.flow_info.fd); +                break; +        case NEIGHBOR_REMOVED: +                ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); +                flow_set_del(dt.set[cube], conn.flow_info.fd); +                log_dbg("Removed fd %d from flow set.", conn.flow_info.fd); +                break; +        default: +                break; +        } + +        return 0; +} + +static int sdu_handler(int                  fd, +                       qoscube_t            qc, +                       struct shm_du_buff * sdb) +{ +        struct pci pci; + +        memset(&pci, 0, sizeof(pci)); + +        shm_pci_des(sdb, &pci); + +        if (pci.dst_addr != ipcpi.dt_addr) { +                if (pci.ttl == 0) { +                        log_dbg("TTL was zero."); +                        ipcp_flow_del(sdb); +                        return 0; +                } + +                pff_lock(dt.pff[qc]); + +                fd = pff_nhop(dt.pff[qc], pci.dst_addr); +                if (fd < 0) { +                        pff_unlock(dt.pff[qc]); +                        log_err("No next hop for %" PRIu64, pci.dst_addr); +                        ipcp_flow_del(sdb); +                        return -1; +                } + +                pff_unlock(dt.pff[qc]); + +                if (ipcp_flow_write(fd, sdb)) { +                        log_err("Failed to write SDU to fd %d.", fd); +                        ipcp_flow_del(sdb); +                        return -1; +                } +        } else { +                shm_pci_shrink(sdb); + +                if (frct_post_sdu(&pci, sdb)) { +                        log_err("Failed to hand PDU to FRCT."); +                        return -1; +                } +        } + +        return 0; +} + +int dt_init(void) +{ +        int              i; +        int              j; +        struct conn_info info; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                dt.set[i] = flow_set_create(); +                if (dt.set[i] == NULL) { +                        goto fail_flows; +                        return -1; +                } +        } + +        if (shm_pci_init()) { +                log_err("Failed to init shm pci."); +                goto fail_flows; +                return -1; +        } + +        memset(&info, 0, sizeof(info)); + +        strcpy(info.ae_name, DT_AE); +        strcpy(info.protocol, FRCT_PROTO); +        info.pref_version = 1; +        info.pref_syntax = PROTO_FIXED; +        info.addr = ipcpi.dt_addr; + +        dt.ae = connmgr_ae_create(info); +        if (dt.ae == NULL) { +                log_err("Failed to create AE struct."); +                goto fail_flows; +        } + +        dt.nbs = nbs_create(); +        if (dt.nbs == NULL) { +                log_err("Failed to create neighbors struct."); +                goto fail_connmgr; +        } + +        dt.nb_notifier.notify_call = dt_neighbor_event; +        if (nbs_reg_notifier(dt.nbs, &dt.nb_notifier)) { +                log_err("Failed to register notifier."); +                goto fail_nbs; +        } + +        if (routing_init(dt.nbs)) { +                log_err("Failed to init routing."); +                goto fail_nbs_notifier; +        } + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                dt.pff[i] = pff_create(); +                if (dt.pff[i] == NULL) { +                        for (j = 0; j < i; ++j) +                                pff_destroy(dt.pff[j]); +                        goto fail_routing; +                } +        } + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                dt.routing[i] = routing_i_create(dt.pff[i]); +                if (dt.routing[i] == NULL) { +                        for (j = 0; j < i; ++j) +                                routing_i_destroy(dt.routing[j]); +                        goto fail_pff; +                } +        } + +        return 0; + fail_pff: +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                pff_destroy(dt.pff[i]); + fail_routing: +        routing_fini(); + fail_nbs_notifier: +        nbs_unreg_notifier(dt.nbs, &dt.nb_notifier); + fail_nbs: +        nbs_destroy(dt.nbs); + fail_connmgr: +        connmgr_ae_destroy(dt.ae); + fail_flows: +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                flow_set_destroy(dt.set[i]); + +        return -1; +} + +void dt_fini(void) +{ +        int i; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                routing_i_destroy(dt.routing[i]); + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                pff_destroy(dt.pff[i]); + +        routing_fini(); + +        nbs_unreg_notifier(dt.nbs, &dt.nb_notifier); + +        nbs_destroy(dt.nbs); + +        connmgr_ae_destroy(dt.ae); + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                flow_set_destroy(dt.set[i]); +} + +int dt_start(void) +{ +        enum pol_gam pg; + +        if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg)) +            != sizeof(pg)) { +                log_err("Failed to read policy for ribmgr gam."); +                return -1; +        } + +        dt.gam = gam_create(pg, dt.nbs, dt.ae); +        if (dt.gam == NULL) { +                log_err("Failed to init dt graph adjacency manager."); +                return -1; +        } + +        dt.sdu_sched = sdu_sched_create(dt.set, sdu_handler); +        if (dt.sdu_sched == NULL) { +                log_err("Failed to create N-1 SDU scheduler."); +                gam_destroy(dt.gam); +                return -1; +        } + +        return 0; +} + +void dt_stop(void) +{ +        sdu_sched_destroy(dt.sdu_sched); + +        gam_destroy(dt.gam); +} + +int dt_write_sdu(struct pci *         pci, +                 struct shm_du_buff * sdb) +{ +        int fd; + +        assert(pci); +        assert(sdb); + +        pff_lock(dt.pff[pci->qos_id]); + +        fd = pff_nhop(dt.pff[pci->qos_id], pci->dst_addr); +        if (fd < 0) { +                pff_unlock(dt.pff[pci->qos_id]); +                log_err("Could not get nhop for address %" PRIu64, +                        pci->dst_addr); +                ipcp_flow_del(sdb); +                return -1; +        } + +        pff_unlock(dt.pff[pci->qos_id]); + +        if (shm_pci_ser(sdb, pci)) { +                log_err("Failed to serialize PDU."); +                ipcp_flow_del(sdb); +                return -1; +        } + +        if (ipcp_flow_write(fd, sdb)) { +                log_err("Failed to write SDU to fd %d.", fd); +                ipcp_flow_del(sdb); +                return -1; +        } + +        return 0; +} + +int dt_write_buf(struct pci * pci, +                 buffer_t *   buf) +{ +        buffer_t * buffer; +        int        fd; + +        assert(pci); +        assert(buf); +        assert(buf->data); + +        pff_lock(dt.pff[pci->qos_id]); + +        fd = pff_nhop(dt.pff[pci->qos_id], pci->dst_addr); +        if (fd < 0) { +                pff_unlock(dt.pff[pci->qos_id]); +                log_err("Could not get nhop for address %" PRIu64, +                        pci->dst_addr); +                return -1; +        } + +        pff_unlock(dt.pff[pci->qos_id]); + +        buffer = shm_pci_ser_buf(buf, pci); +        if (buffer == NULL) { +                log_err("Failed to serialize buffer."); +                return -1; +        } + +        if (flow_write(fd, buffer->data, buffer->len) == -1) { +                log_err("Failed to write buffer to fd."); +                free(buffer); +                return -1; +        } + +        free(buffer->data); +        free(buffer); + +        return 0; +} diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h new file mode 100644 index 00000000..dea9b91f --- /dev/null +++ b/src/ipcpd/normal/dt.h @@ -0,0 +1,45 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Data Transfer AE + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_DT_H +#define OUROBOROS_IPCPD_NORMAL_DT_H + +#include <ouroboros/shm_rdrbuff.h> +#include <ouroboros/utils.h> + +#include "shm_pci.h" + +int  dt_init(void); + +void dt_fini(void); + +int  dt_start(void); + +void dt_stop(void); + +int  dt_write_sdu(struct pci *         pci, +                  struct shm_du_buff * sdb); + +int  dt_write_buf(struct pci * pci, +                  buffer_t *   buf); + +#endif /* OUROBOROS_IPCPD_NORMAL_DT_H */ diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c new file mode 100644 index 00000000..be1080b1 --- /dev/null +++ b/src/ipcpd/normal/fa.c @@ -0,0 +1,438 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Flow allocator of the IPC Process + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define OUROBOROS_PREFIX "flow-allocator" + +#include <ouroboros/config.h> +#include <ouroboros/logs.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/rib.h> +#include <ouroboros/errno.h> +#include <ouroboros/dev.h> + +#include "fa.h" +#include "sdu_sched.h" +#include "ipcp.h" +#include "ribconfig.h" + +#include <pthread.h> +#include <stdlib.h> +#include <string.h> + +#include "flow_alloc.pb-c.h" +typedef FlowAllocMsg flow_alloc_msg_t; + +#define TIMEOUT 10000 /* nanoseconds */ + +struct { +        pthread_rwlock_t   flows_lock; +        cep_id_t           fd_to_cep_id[AP_MAX_FLOWS]; +        int                cep_id_to_fd[IPCPD_MAX_CONNS]; + +        flow_set_t *       set[QOS_CUBE_MAX]; +        struct sdu_sched * sdu_sched; +} fa; + +static int sdu_handler(int                  fd, +                       qoscube_t            qc, +                       struct shm_du_buff * sdb) +{ +        (void) qc; + +        pthread_rwlock_rdlock(&fa.flows_lock); + +        if (frct_i_write_sdu(fa.fd_to_cep_id[fd], sdb)) { +                pthread_rwlock_unlock(&fa.flows_lock); +                ipcp_flow_del(sdb); +                log_warn("Failed to hand SDU to FRCT."); +                return -1; +        } + +        pthread_rwlock_unlock(&fa.flows_lock); + +        return 0; +} + +int fa_init(void) +{ +        int i; + +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                fa.fd_to_cep_id[i] = INVALID_CEP_ID; + +        for (i = 0; i < IPCPD_MAX_CONNS; ++i) +                fa.cep_id_to_fd[i] = -1; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                fa.set[i] = flow_set_create(); +                if (fa.set[i] == NULL) +                        goto fail_flows; +        } + +        if (pthread_rwlock_init(&fa.flows_lock, NULL)) +                goto fail_flows; + +        return 0; +fail_flows: +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                flow_set_destroy(fa.set[i]); + +        return -1; +} + +void fa_fini(void) +{ +        int i; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                flow_set_destroy(fa.set[i]); + +        pthread_rwlock_destroy(&fa.flows_lock); +} + +int fa_start(void) +{ +        fa.sdu_sched = sdu_sched_create(fa.set, sdu_handler); +        if (fa.sdu_sched == NULL) { +                log_err("Failed to create SDU scheduler."); +                return -1; +        } + +        return 0; +} + +void fa_stop(void) +{ +        sdu_sched_destroy(fa.sdu_sched); +} + +int fa_alloc(int             fd, +             const uint8_t * dst, +             qoscube_t       qc) +{ +        cep_id_t         cep_id; +        buffer_t         buf; +        flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; +        char             path[RIB_MAX_PATH_LEN + 1]; +        uint64_t         addr; +        ssize_t          ch; +        ssize_t          i; +        char **          children; +        char             hashstr[ipcp_dir_hash_strlen() + 1]; +        char *           dst_ipcp = NULL; + +        ipcp_hash_str(hashstr, dst); + +        assert(strlen(hashstr) + strlen(DIR_PATH) + 1 +               < RIB_MAX_PATH_LEN); + +        strcpy(path, DIR_PATH); + +        rib_path_append(path, hashstr); + +        ch = rib_children(path, &children); +        if (ch <= 0) +                return -1; + +        for (i = 0; i < ch; ++i) +                if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0) +                        dst_ipcp = children[i]; +                else +                        free(children[i]); + +        free(children); + +        if (dst_ipcp == NULL) +                return -1; + +        strcpy(path, MEMBERS_PATH); + +        rib_path_append(path, dst_ipcp); + +        free(dst_ipcp); + +        if (rib_read(path, &addr, sizeof(addr)) < 0) +                return -1; + +        msg.code        = FLOW_ALLOC_CODE__FLOW_REQ; +        msg.has_hash    = true; +        msg.hash.len    = ipcp_dir_hash_len(); +        msg.hash.data   = (uint8_t *) dst; +        msg.has_qoscube = true; +        msg.qoscube     = qc; + +        buf.len = flow_alloc_msg__get_packed_size(&msg); +        if (buf.len == 0) +                return -1; + +        buf.data = malloc(buf.len); +        if (buf.data == NULL) +                return -1; + +        flow_alloc_msg__pack(&msg, buf.data); + +        pthread_rwlock_wrlock(&fa.flows_lock); + +        cep_id = frct_i_create(addr, &buf, qc); +        if (cep_id == INVALID_CEP_ID) { +                pthread_rwlock_unlock(&fa.flows_lock); +                free(buf.data); +                return -1; +        } + +        free(buf.data); + +        fa.fd_to_cep_id[fd] = cep_id; +        fa.cep_id_to_fd[cep_id] = fd; + +        pthread_rwlock_unlock(&fa.flows_lock); + +        return 0; +} + +/* Call under flows lock */ +static int fa_flow_dealloc(int fd) +{ +        flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; +        buffer_t         buf; +        int              ret; +        qoscube_t        qc; + +        ipcp_flow_get_qoscube(fd, &qc); +        flow_set_del(fa.set[qc], fd); + +        msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; + +        buf.len = flow_alloc_msg__get_packed_size(&msg); +        if (buf.len == 0) +                return -1; + +        buf.data = malloc(buf.len); +        if (buf.data == NULL) +                return -ENOMEM; + +        flow_alloc_msg__pack(&msg, buf.data); + +        ret = frct_i_destroy(fa.fd_to_cep_id[fd], &buf); + +        fa.cep_id_to_fd[fa.fd_to_cep_id[fd]] = -1; +        fa.fd_to_cep_id[fd] = INVALID_CEP_ID; + +        free(buf.data); + +        return ret; +} + +int fa_alloc_resp(int fd, +                  int response) +{ +        struct timespec ts = {0, TIMEOUT * 1000}; +        flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; +        buffer_t         buf; + +        msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; +        msg.response = response; +        msg.has_response = true; + +        pthread_mutex_lock(&ipcpi.alloc_lock); + +        while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) +                pthread_cond_timedwait(&ipcpi.alloc_cond, +                                       &ipcpi.alloc_lock, +                                       &ts); + +        if (ipcp_get_state() != IPCP_OPERATIONAL) { +                pthread_mutex_unlock(&ipcpi.alloc_lock); +                return -1; +        } + +        ipcpi.alloc_id = -1; +        pthread_cond_broadcast(&ipcpi.alloc_cond); + +        pthread_mutex_unlock(&ipcpi.alloc_lock); + +        buf.len = flow_alloc_msg__get_packed_size(&msg); +        if (buf.len == 0) +                return -1; + +        buf.data = malloc(buf.len); +        if (buf.data == NULL) +                return -ENOMEM; + +        flow_alloc_msg__pack(&msg, buf.data); + +        pthread_rwlock_wrlock(&fa.flows_lock); + +        if (response < 0) { +                frct_i_destroy(fa.fd_to_cep_id[fd], &buf); +                free(buf.data); +                fa.cep_id_to_fd[fa.fd_to_cep_id[fd]] +                        = INVALID_CEP_ID; +                fa.fd_to_cep_id[fd] = -1; +        } else { +                qoscube_t qc; +                ipcp_flow_get_qoscube(fd, &qc); +                if (frct_i_accept(fa.fd_to_cep_id[fd], &buf, qc)) { +                        pthread_rwlock_unlock(&fa.flows_lock); +                        free(buf.data); +                        return -1; +                } +                flow_set_add(fa.set[qc], fd); +        } + +        pthread_rwlock_unlock(&fa.flows_lock); + +        free(buf.data); + +        return 0; +} + +int fa_dealloc(int fd) +{ +        int ret; + +        pthread_rwlock_wrlock(&fa.flows_lock); + +        ret = fa_flow_dealloc(fd); + +        pthread_rwlock_unlock(&fa.flows_lock); + +        return ret; +} + +int fa_post_buf(cep_id_t   cep_id, +                buffer_t * buf) +{ +        struct timespec    ts  = {0, TIMEOUT * 1000}; +        int                ret = 0; +        int                fd; +        flow_alloc_msg_t * msg; +        qoscube_t          qc; + +        /* Depending on the message call the function in ipcp-dev.h */ + +        msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); +        if (msg == NULL) { +                log_err("Failed to unpack flow alloc message"); +                return -1; +        } + +        switch (msg->code) { +        case FLOW_ALLOC_CODE__FLOW_REQ: +                pthread_mutex_lock(&ipcpi.alloc_lock); + +                if (!msg->has_hash) { +                        log_err("Bad flow request."); +                        pthread_mutex_unlock(&ipcpi.alloc_lock); +                        return -1; +                } + +                while (ipcpi.alloc_id != -1 && +                       ipcp_get_state() == IPCP_OPERATIONAL) +                        pthread_cond_timedwait(&ipcpi.alloc_cond, +                                               &ipcpi.alloc_lock, +                                               &ts); + +                if (ipcp_get_state() != IPCP_OPERATIONAL) { +                        log_dbg("Won't allocate over non-operational IPCP."); +                        pthread_mutex_unlock(&ipcpi.alloc_lock); +                        return -1; +                } + +                assert(ipcpi.alloc_id == -1); + +                fd = ipcp_flow_req_arr(getpid(), +                                       msg->hash.data, +                                       ipcp_dir_hash_len(), +                                       msg->qoscube); +                if (fd < 0) { +                        pthread_mutex_unlock(&ipcpi.alloc_lock); +                        flow_alloc_msg__free_unpacked(msg, NULL); +                        log_err("Failed to get fd for flow."); +                        return -1; +                } + +                pthread_rwlock_wrlock(&fa.flows_lock); + +                fa.fd_to_cep_id[fd] = cep_id; +                fa.cep_id_to_fd[cep_id] = fd; + +                pthread_rwlock_unlock(&fa.flows_lock); + +                ipcpi.alloc_id = fd; +                pthread_cond_broadcast(&ipcpi.alloc_cond); + +                pthread_mutex_unlock(&ipcpi.alloc_lock); + +                break; +        case FLOW_ALLOC_CODE__FLOW_REPLY: +                pthread_rwlock_wrlock(&fa.flows_lock); + +                fd = fa.cep_id_to_fd[cep_id]; +                ret = ipcp_flow_alloc_reply(fd, msg->response); +                if (msg->response < 0) { +                        fa.fd_to_cep_id[fd] = INVALID_CEP_ID; +                        fa.cep_id_to_fd[cep_id] = -1; +                } else { +                        ipcp_flow_get_qoscube(fd, &qc); +                        flow_set_add(fa.set[qc], +                                     fa.cep_id_to_fd[cep_id]); +                } + +                pthread_rwlock_unlock(&fa.flows_lock); + +                break; +        case FLOW_ALLOC_CODE__FLOW_DEALLOC: +                fd = fa.cep_id_to_fd[cep_id]; +                ipcp_flow_get_qoscube(fd, &qc); +                flow_set_del(fa.set[qc], fd); +                ret = flow_dealloc(fd); +                break; +        default: +                log_err("Got an unknown flow allocation message."); +                ret = -1; +                break; +        } + +        flow_alloc_msg__free_unpacked(msg, NULL); + +        return ret; +} + +int fa_post_sdu(cep_id_t             cep_id, +                struct shm_du_buff * sdb) +{ +        int fd; + +        pthread_rwlock_rdlock(&fa.flows_lock); + +        fd = fa.cep_id_to_fd[cep_id]; +        if (ipcp_flow_write(fd, sdb)) { +                pthread_rwlock_unlock(&fa.flows_lock); +                log_err("Failed to hand SDU to N flow."); +                return -1; +        } + +        pthread_rwlock_unlock(&fa.flows_lock); + +        return 0; +} diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h new file mode 100644 index 00000000..d370a381 --- /dev/null +++ b/src/ipcpd/normal/fa.h @@ -0,0 +1,54 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Flow allocator of the IPC Process + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_FA_H +#define OUROBOROS_IPCPD_NORMAL_FA_H + +#include <ouroboros/shared.h> +#include <ouroboros/utils.h> + +#include "frct.h" + +int  fa_init(void); + +void fa_fini(void); + +int  fa_start(void); + +void fa_stop(void); + +int  fa_alloc(int             fd, +              const uint8_t * dst, +              qoscube_t       qos); + +int  fa_alloc_resp(int fd, +                   int response); + +int  fa_dealloc(int fd); + +int  fa_post_buf(cep_id_t   cep_id, +                 buffer_t * buf); + +int  fa_post_sdu(cep_id_t             cep_id, +                 struct shm_du_buff * sdb); + +#endif /* OUROBOROS_IPCPD_NORMAL_FA_H */ diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c deleted file mode 100644 index d055b311..00000000 --- a/src/ipcpd/normal/fmgr.c +++ /dev/null @@ -1,748 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Flow manager of the IPC Process - * - *    Dimitri Staessens <dimitri.staessens@ugent.be> - *    Sander Vrijders   <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#define OUROBOROS_PREFIX "flow-manager" - -#include <ouroboros/config.h> -#include <ouroboros/logs.h> -#include <ouroboros/dev.h> -#include <ouroboros/list.h> -#include <ouroboros/ipcp-dev.h> -#include <ouroboros/fqueue.h> -#include <ouroboros/errno.h> -#include <ouroboros/cacep.h> -#include <ouroboros/rib.h> - -#include "connmgr.h" -#include "fmgr.h" -#include "frct.h" -#include "ipcp.h" -#include "shm_pci.h" -#include "ribconfig.h" -#include "pff.h" -#include "neighbors.h" -#include "gam.h" -#include "routing.h" -#include "sdu_sched.h" - -#include <stdlib.h> -#include <stdbool.h> -#include <pthread.h> -#include <string.h> -#include <inttypes.h> - -#include "flow_alloc.pb-c.h" -typedef FlowAllocMsg flow_alloc_msg_t; - -#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */ - -struct { -        pthread_rwlock_t   np1_flows_lock; -        cep_id_t           np1_fd_to_cep_id[AP_MAX_FLOWS]; -        int                np1_cep_id_to_fd[IPCPD_MAX_CONNS]; - -        flow_set_t *       np1_set[QOS_CUBE_MAX]; -        struct sdu_sched * np1_sdu_sched; - -        flow_set_t *       nm1_set[QOS_CUBE_MAX]; -        struct sdu_sched * nm1_sdu_sched; - -        struct pff *       pff[QOS_CUBE_MAX]; -        struct routing_i * routing[QOS_CUBE_MAX]; - -        struct gam *       gam; -        struct nbs *       nbs; -        struct ae *        ae; - -        struct nb_notifier nb_notifier; -} fmgr; - -static int fmgr_neighbor_event(enum nb_event event, -                               struct conn   conn) -{ -        qoscube_t cube; - -        /* We are only interested in neighbors being added and removed. */ -        switch (event) { -        case NEIGHBOR_ADDED: -                ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); -                flow_set_add(fmgr.nm1_set[cube], conn.flow_info.fd); -                log_dbg("Added fd %d to flow set.", conn.flow_info.fd); -                break; -        case NEIGHBOR_REMOVED: -                ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); -                flow_set_del(fmgr.nm1_set[cube], conn.flow_info.fd); -                log_dbg("Removed fd %d from flow set.", conn.flow_info.fd); -                break; -        default: -                break; -        } - -        return 0; -} - -static int np1_sdu_handler(int                  fd, -                           qoscube_t            qc, -                           struct shm_du_buff * sdb) -{ -        (void) qc; - -        pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - -        if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                ipcp_flow_del(sdb); -                log_warn("Failed to hand SDU to FRCT."); -                return -1; -        } - -        pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -        return 0; -} - -static int nm1_sdu_handler(int                  fd, -                           qoscube_t            qc, -                           struct shm_du_buff * sdb) -{ -        struct pci pci; - -        memset(&pci, 0, sizeof(pci)); - -        shm_pci_des(sdb, &pci); - -        if (pci.dst_addr != ipcpi.dt_addr) { -                if (pci.ttl == 0) { -                        log_dbg("TTL was zero."); -                        ipcp_flow_del(sdb); -                        return 0; -                } - -                pff_lock(fmgr.pff[qc]); - -                fd = pff_nhop(fmgr.pff[qc], pci.dst_addr); -                if (fd < 0) { -                        pff_unlock(fmgr.pff[qc]); -                        log_err("No next hop for %" PRIu64, pci.dst_addr); -                        ipcp_flow_del(sdb); -                        return -1; -                } - -                pff_unlock(fmgr.pff[qc]); - -                if (ipcp_flow_write(fd, sdb)) { -                        log_err("Failed to write SDU to fd %d.", fd); -                        ipcp_flow_del(sdb); -                        return -1; -                } -        } else { -                shm_pci_shrink(sdb); - -                if (frct_nm1_post_sdu(&pci, sdb)) { -                        log_err("Failed to hand PDU to FRCT."); -                        return -1; -                } -        } - -        return 0; -} - -static void fmgr_destroy_flows(void) -{ -        int i; - -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                flow_set_destroy(fmgr.nm1_set[i]); -                flow_set_destroy(fmgr.np1_set[i]); -        } -} - -static void fmgr_destroy_routing(void) -{ -        int i; - -        for (i = 0; i < QOS_CUBE_MAX; ++i) -                routing_i_destroy(fmgr.routing[i]); -} - -static void fmgr_destroy_pff(void) -{ -        int i; - -        for (i = 0; i < QOS_CUBE_MAX; ++i) -                pff_destroy(fmgr.pff[i]); -} - -int fmgr_init(void) -{ -        int              i; -        int              j; -        struct conn_info info; - -        for (i = 0; i < AP_MAX_FLOWS; ++i) -                fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID; - -        for (i = 0; i < IPCPD_MAX_CONNS; ++i) -                fmgr.np1_cep_id_to_fd[i] = -1; - -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                fmgr.np1_set[i] = flow_set_create(); -                if (fmgr.np1_set[i] == NULL) { -                        fmgr_destroy_flows(); -                        return -1; -                } - -                fmgr.nm1_set[i] = flow_set_create(); -                if (fmgr.nm1_set[i] == NULL) { -                        fmgr_destroy_flows(); -                        return -1; -                } -        } - -        if (shm_pci_init()) { -                log_err("Failed to init shm pci."); -                fmgr_destroy_flows(); -                return -1; -        } - -        memset(&info, 0, sizeof(info)); - -        strcpy(info.ae_name, DT_AE); -        strcpy(info.protocol, FRCT_PROTO); -        info.pref_version = 1; -        info.pref_syntax = PROTO_FIXED; -        info.addr = ipcpi.dt_addr; - -        fmgr.ae = connmgr_ae_create(info); -        if (fmgr.ae == NULL) { -                log_err("Failed to create AE struct."); -                fmgr_destroy_flows(); -                return -1; -        } - -        fmgr.nbs = nbs_create(); -        if (fmgr.nbs == NULL) { -                log_err("Failed to create neighbors struct."); -                fmgr_destroy_flows(); -                connmgr_ae_destroy(fmgr.ae); -                return -1; -        } - -        fmgr.nb_notifier.notify_call = fmgr_neighbor_event; -        if (nbs_reg_notifier(fmgr.nbs, &fmgr.nb_notifier)) { -                log_err("Failed to register notifier."); -                nbs_destroy(fmgr.nbs); -                fmgr_destroy_flows(); -                connmgr_ae_destroy(fmgr.ae); -                return -1; -        } - -        if (routing_init(fmgr.nbs)) { -                log_err("Failed to init routing."); -                nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); -                nbs_destroy(fmgr.nbs); -                fmgr_destroy_flows(); -                connmgr_ae_destroy(fmgr.ae); -                return -1; -        } - -        if (pthread_rwlock_init(&fmgr.np1_flows_lock, NULL)) { -                routing_fini(); -                nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); -                nbs_destroy(fmgr.nbs); -                fmgr_destroy_flows(); -                connmgr_ae_destroy(fmgr.ae); -                return -1; -        } - -        for (i = 0; i < QOS_CUBE_MAX; ++i) { -                fmgr.pff[i] = pff_create(); -                if (fmgr.pff[i] == NULL) { -                        for (j = 0; j < i; ++j) -                                pff_destroy(fmgr.pff[j]); -                        pthread_rwlock_destroy(&fmgr.np1_flows_lock); -                        routing_fini(); -                        nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); -                        nbs_destroy(fmgr.nbs); -                        fmgr_destroy_flows(); -                        connmgr_ae_destroy(fmgr.ae); -                        return -1; -                } - -                fmgr.routing[i] = routing_i_create(fmgr.pff[i]); -                if (fmgr.routing[i] == NULL) { -                        for (j = 0; j < i; ++j) -                                routing_i_destroy(fmgr.routing[j]); -                        fmgr_destroy_pff(); -                        pthread_rwlock_destroy(&fmgr.np1_flows_lock); -                        routing_fini(); -                        nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); -                        nbs_destroy(fmgr.nbs); -                        fmgr_destroy_flows(); -                        connmgr_ae_destroy(fmgr.ae); -                        return -1; -                } -        } - -        return 0; -} - -void fmgr_fini() -{ -        nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); - -        fmgr_destroy_routing(); - -        fmgr_destroy_pff(); - -        routing_fini(); - -        fmgr_destroy_flows(); - -        connmgr_ae_destroy(fmgr.ae); - -        nbs_destroy(fmgr.nbs); -} - -int fmgr_start(void) -{ -        enum pol_gam pg; - -        if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg)) -            != sizeof(pg)) { -                log_err("Failed to read policy for ribmgr gam."); -                return -1; -        } - -        fmgr.gam = gam_create(pg, fmgr.nbs, fmgr.ae); -        if (fmgr.gam == NULL) { -                log_err("Failed to init dt graph adjacency manager."); -                return -1; -        } - -        fmgr.nm1_sdu_sched = sdu_sched_create(fmgr.nm1_set, nm1_sdu_handler); -        if (fmgr.nm1_sdu_sched == NULL) { -                log_err("Failed to create N-1 SDU scheduler."); -                gam_destroy(fmgr.gam); -                return -1; -        } - -        fmgr.np1_sdu_sched = sdu_sched_create(fmgr.np1_set, np1_sdu_handler); -        if (fmgr.np1_sdu_sched == NULL) { -                log_err("Failed to create N+1 SDU scheduler."); -                sdu_sched_destroy(fmgr.nm1_sdu_sched); -                gam_destroy(fmgr.gam); -                return -1; -        } - -        return 0; -} - -void fmgr_stop(void) -{ -        sdu_sched_destroy(fmgr.np1_sdu_sched); - -        sdu_sched_destroy(fmgr.nm1_sdu_sched); - -        gam_destroy(fmgr.gam); -} - -int fmgr_np1_alloc(int             fd, -                   const uint8_t * dst, -                   qoscube_t       cube) -{ -        cep_id_t         cep_id; -        buffer_t         buf; -        flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; -        char             path[RIB_MAX_PATH_LEN + 1]; -        uint64_t         addr; -        ssize_t          ch; -        ssize_t          i; -        char **          children; -        char             hashstr[ipcp_dir_hash_strlen() + 1]; -        char *           dst_ipcp = NULL; - -        ipcp_hash_str(hashstr, dst); - -        assert(strlen(hashstr) + strlen(DIR_PATH) + 1 -               < RIB_MAX_PATH_LEN); - -        strcpy(path, DIR_PATH); - -        rib_path_append(path, hashstr); - -        ch = rib_children(path, &children); -        if (ch <= 0) -                return -1; - -        for (i = 0; i < ch; ++i) -                if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0) -                        dst_ipcp = children[i]; -                else -                        free(children[i]); - -        free(children); - -        if (dst_ipcp == NULL) -                return -1; - -        strcpy(path, MEMBERS_PATH); - -        rib_path_append(path, dst_ipcp); - -        free(dst_ipcp); - -        if (rib_read(path, &addr, sizeof(addr)) < 0) -                return -1; - -        msg.code        = FLOW_ALLOC_CODE__FLOW_REQ; -        msg.has_hash    = true; -        msg.hash.len    = ipcp_dir_hash_len(); -        msg.hash.data   = (uint8_t *) dst; -        msg.has_qoscube = true; -        msg.qoscube     = cube; - -        buf.len = flow_alloc_msg__get_packed_size(&msg); -        if (buf.len == 0) -                return -1; - -        buf.data = malloc(buf.len); -        if (buf.data == NULL) -                return -1; - -        flow_alloc_msg__pack(&msg, buf.data); - -        pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - -        cep_id = frct_i_create(addr, &buf, cube); -        if (cep_id == INVALID_CEP_ID) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                free(buf.data); -                return -1; -        } - -        free(buf.data); - -        fmgr.np1_fd_to_cep_id[fd] = cep_id; -        fmgr.np1_cep_id_to_fd[cep_id] = fd; - -        pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -        return 0; -} - -/* Call under np1_flows lock */ -static int np1_flow_dealloc(int fd) -{ -        flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; -        buffer_t         buf; -        int              ret; -        qoscube_t        cube; - -        ipcp_flow_get_qoscube(fd, &cube); -        flow_set_del(fmgr.np1_set[cube], fd); - -        msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; - -        buf.len = flow_alloc_msg__get_packed_size(&msg); -        if (buf.len == 0) -                return -1; - -        buf.data = malloc(buf.len); -        if (buf.data == NULL) -                return -ENOMEM; - -        flow_alloc_msg__pack(&msg, buf.data); - -        ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); - -        fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID; -        fmgr.np1_fd_to_cep_id[fd] = -1; - -        free(buf.data); - -        return ret; -} - -int fmgr_np1_alloc_resp(int fd, -                        int response) -{ -        struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; -        flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; -        buffer_t         buf; - -        msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; -        msg.response = response; -        msg.has_response = true; - -        pthread_mutex_lock(&ipcpi.alloc_lock); - -        while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) -                pthread_cond_timedwait(&ipcpi.alloc_cond, -                                       &ipcpi.alloc_lock, -                                       &ts); - -        if (ipcp_get_state() != IPCP_OPERATIONAL) { -                pthread_mutex_unlock(&ipcpi.alloc_lock); -                return -1; -        } - -        ipcpi.alloc_id = -1; -        pthread_cond_broadcast(&ipcpi.alloc_cond); - -        pthread_mutex_unlock(&ipcpi.alloc_lock); - -        buf.len = flow_alloc_msg__get_packed_size(&msg); -        if (buf.len == 0) -                return -1; - -        buf.data = malloc(buf.len); -        if (buf.data == NULL) -                return -ENOMEM; - -        flow_alloc_msg__pack(&msg, buf.data); - -        pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - -        if (response < 0) { -                frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); -                free(buf.data); -                fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] -                        = INVALID_CEP_ID; -                fmgr.np1_fd_to_cep_id[fd] = -1; -        } else { -                qoscube_t cube; -                ipcp_flow_get_qoscube(fd, &cube); -                if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) { -                        pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                        free(buf.data); -                        return -1; -                } -                flow_set_add(fmgr.np1_set[cube], fd); -        } - -        pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -        free(buf.data); - -        return 0; -} - -int fmgr_np1_dealloc(int fd) -{ -        int ret; - -        pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - -        ret = np1_flow_dealloc(fd); - -        pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -        return ret; -} - -int fmgr_np1_post_buf(cep_id_t   cep_id, -                      buffer_t * buf) -{ -        struct timespec    ts  = {0, FD_UPDATE_TIMEOUT * 1000}; -        int                ret = 0; -        int                fd; -        flow_alloc_msg_t * msg; -        qoscube_t          cube; - -        /* Depending on the message call the function in ipcp-dev.h */ - -        msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); -        if (msg == NULL) { -                log_err("Failed to unpack flow alloc message"); -                return -1; -        } - -        switch (msg->code) { -        case FLOW_ALLOC_CODE__FLOW_REQ: -                pthread_mutex_lock(&ipcpi.alloc_lock); - -                if (!msg->has_hash) { -                        log_err("Bad flow request."); -                        return -1; -                } - -                while (ipcpi.alloc_id != -1 && -                       ipcp_get_state() == IPCP_OPERATIONAL) -                        pthread_cond_timedwait(&ipcpi.alloc_cond, -                                               &ipcpi.alloc_lock, -                                               &ts); - -                if (ipcp_get_state() != IPCP_OPERATIONAL) { -                        log_dbg("Won't allocate over non-operational IPCP."); -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        return -1; -                } - -                assert(ipcpi.alloc_id == -1); - -                fd = ipcp_flow_req_arr(getpid(), -                                       msg->hash.data, -                                       ipcp_dir_hash_len(), -                                       msg->qoscube); -                if (fd < 0) { -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        log_err("Failed to get fd for flow."); -                        return -1; -                } - -                pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - -                fmgr.np1_fd_to_cep_id[fd] = cep_id; -                fmgr.np1_cep_id_to_fd[cep_id] = fd; - -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -                ipcpi.alloc_id = fd; -                pthread_cond_broadcast(&ipcpi.alloc_cond); - -                pthread_mutex_unlock(&ipcpi.alloc_lock); - -                break; -        case FLOW_ALLOC_CODE__FLOW_REPLY: -                pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - -                fd = fmgr.np1_cep_id_to_fd[cep_id]; -                ret = ipcp_flow_alloc_reply(fd, msg->response); -                if (msg->response < 0) { -                        fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID; -                        fmgr.np1_cep_id_to_fd[cep_id] = -1; -                } else { -                        ipcp_flow_get_qoscube(fd, &cube); -                        flow_set_add(fmgr.np1_set[cube], -                                     fmgr.np1_cep_id_to_fd[cep_id]); -                } - -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -                break; -        case FLOW_ALLOC_CODE__FLOW_DEALLOC: -                fd = fmgr.np1_cep_id_to_fd[cep_id]; -                ipcp_flow_get_qoscube(fd, &cube); -                flow_set_del(fmgr.np1_set[cube], fd); -                ret = flow_dealloc(fd); -                break; -        default: -                log_err("Got an unknown flow allocation message."); -                ret = -1; -                break; -        } - -        flow_alloc_msg__free_unpacked(msg, NULL); - -        return ret; -} - -int fmgr_np1_post_sdu(cep_id_t             cep_id, -                      struct shm_du_buff * sdb) -{ -        int fd; - -        pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - -        fd = fmgr.np1_cep_id_to_fd[cep_id]; -        if (ipcp_flow_write(fd, sdb)) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                log_err("Failed to hand SDU to N flow."); -                return -1; -        } - -        pthread_rwlock_unlock(&fmgr.np1_flows_lock); - -        return 0; -} - -int fmgr_nm1_write_sdu(struct pci *         pci, -                       struct shm_du_buff * sdb) -{ -        int fd; - -        if (pci == NULL || sdb == NULL) -                return -EINVAL; - -        pff_lock(fmgr.pff[pci->qos_id]); -        fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr); -        if (fd < 0) { -                pff_unlock(fmgr.pff[pci->qos_id]); -                log_err("Could not get nhop for address %" PRIu64, -                        pci->dst_addr); -                ipcp_flow_del(sdb); -                return -1; -        } -        pff_unlock(fmgr.pff[pci->qos_id]); - -        if (shm_pci_ser(sdb, pci)) { -                log_err("Failed to serialize PDU."); -                ipcp_flow_del(sdb); -                return -1; -        } - -        if (ipcp_flow_write(fd, sdb)) { -                log_err("Failed to write SDU to fd %d.", fd); -                ipcp_flow_del(sdb); -                return -1; -        } - -        return 0; -} - -int fmgr_nm1_write_buf(struct pci * pci, -                       buffer_t *   buf) -{ -        buffer_t * buffer; -        int        fd; - -        if (pci == NULL || buf == NULL || buf->data == NULL) -                return -EINVAL; - -        pff_lock(fmgr.pff[pci->qos_id]); -        fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr); -        if (fd < 0) { -                pff_unlock(fmgr.pff[pci->qos_id]); -                log_err("Could not get nhop for address %" PRIu64, -                        pci->dst_addr); -                return -1; -        } -        pff_unlock(fmgr.pff[pci->qos_id]); - -        buffer = shm_pci_ser_buf(buf, pci); -        if (buffer == NULL) { -                log_err("Failed to serialize buffer."); -                return -1; -        } - -        if (flow_write(fd, buffer->data, buffer->len) == -1) { -                log_err("Failed to write buffer to fd."); -                free(buffer); -                return -1; -        } - -        free(buffer->data); -        free(buffer); -        return 0; -} diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h deleted file mode 100644 index c59c0875..00000000 --- a/src/ipcpd/normal/fmgr.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Flow manager of the IPC Process - * - *    Dimitri Staessens <dimitri.staessens@ugent.be> - *    Sander Vrijders   <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_FMGR_H -#define OUROBOROS_IPCPD_NORMAL_FMGR_H - -#include <ouroboros/shared.h> -#include <ouroboros/qos.h> - -#include "ae.h" -#include "frct.h" - -int  fmgr_init(void); - -void fmgr_fini(void); - -int  fmgr_start(void); - -void fmgr_stop(void); - -int  fmgr_np1_alloc(int             fd, -                    const uint8_t * dst, -                    qoscube_t       qos); - -int  fmgr_np1_alloc_resp(int fd, -                         int response); - -int  fmgr_np1_dealloc(int fd); - -int  fmgr_np1_post_buf(cep_id_t   id, -                       buffer_t * buf); - -int  fmgr_np1_post_sdu(cep_id_t             id, -                       struct shm_du_buff * sdb); - -int  fmgr_nm1_write_sdu(struct pci *         pci, -                        struct shm_du_buff * sdb); - -int  fmgr_nm1_write_buf(struct pci * pci, -                        buffer_t *   buf); - -#endif /* OUROBOROS_IPCPD_NORMAL_FMGR_H */ diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index d873beae..bfcde1b3 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -30,8 +30,9 @@  #include <ouroboros/errno.h>  #include "frct.h" -#include "fmgr.h"  #include "ipcp.h" +#include "dt.h" +#include "fa.h"  #include <stdlib.h>  #include <stdbool.h> @@ -210,8 +211,8 @@ int frct_fini()          return 0;  } -int frct_nm1_post_sdu(struct pci *         pci, -                      struct shm_du_buff * sdb) +int frct_post_sdu(struct pci *         pci, +                  struct shm_du_buff * sdb)  {          struct frct_i * instance;          buffer_t        buf; @@ -250,17 +251,17 @@ int frct_nm1_post_sdu(struct pci *         pci,                  buf.len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);                  buf.data = shm_du_buff_head(sdb); -                if (fmgr_np1_post_buf(id, &buf)) { -                        log_err("Failed to hand buffer to Flow Manager."); +                if (fa_post_buf(id, &buf)) { +                        log_err("Failed to hand buffer to FA.");                          ipcp_flow_del(sdb);                          return -1;                  }                  ipcp_flow_del(sdb);          } else { -                /* FIXME: Known cep-ids are delivered to FMGR (minimal DTP) */ -                if (fmgr_np1_post_sdu(pci->dst_cep_id, sdb)) { -                        log_err("Failed to hand SDU to FMGR."); +                /* FIXME: Known cep-ids are delivered to FA (minimal DTP) */ +                if (fa_post_sdu(pci->dst_cep_id, sdb)) { +                        log_err("Failed to hand SDU to FA.");                          ipcp_flow_del(sdb);                          return -1;                  } @@ -301,11 +302,11 @@ cep_id_t frct_i_create(uint64_t   address,          pci.seqno = 0;          pci.qos_id = cube; -        if (fmgr_nm1_write_buf(&pci, buf)) { +        if (dt_write_buf(&pci, buf)) {                  pthread_mutex_lock(&frct.instances_lock);                  destroy_frct_i(id);                  pthread_mutex_unlock(&frct.instances_lock); -                log_err("Failed to hand PDU to FMGR."); +                log_err("Failed to hand PDU to DT.");                  return INVALID_CEP_ID;          } @@ -350,7 +351,7 @@ int frct_i_accept(cep_id_t   id,          pthread_mutex_unlock(&frct.instances_lock); -        if (fmgr_nm1_write_buf(&pci, buf)) +        if (dt_write_buf(&pci, buf))                  return -1;          return 0; @@ -390,7 +391,7 @@ int frct_i_destroy(cep_id_t   id,          pthread_mutex_unlock(&frct.instances_lock);          if (buf != NULL && buf->data != NULL) -                if (fmgr_nm1_write_buf(&pci, buf)) +                if (dt_write_buf(&pci, buf))                          return -1;          return 0; @@ -427,9 +428,9 @@ int frct_i_write_sdu(cep_id_t             id,          pci.seqno = (instance->seqno)++;          pci.qos_id = instance->cube; -        if (fmgr_nm1_write_sdu(&pci, sdb)) { +        if (dt_write_sdu(&pci, sdb)) {                  pthread_mutex_unlock(&frct.instances_lock); -                log_err("Failed to hand SDU to FMGR."); +                log_err("Failed to hand SDU to DT.");                  return -1;          } diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h index a1dcb151..b179e36b 100644 --- a/src/ipcpd/normal/frct.h +++ b/src/ipcpd/normal/frct.h @@ -50,7 +50,7 @@ int         frct_i_destroy(cep_id_t   id,  int         frct_i_write_sdu(cep_id_t             id,                               struct shm_du_buff * sdb); -int         frct_nm1_post_sdu(struct pci *         pci, -                              struct shm_du_buff * sdb); +int         frct_post_sdu(struct pci *         pci, +                          struct shm_du_buff * sdb);  #endif /* OUROBOROS_IPCPD_NORMAL_FRCT_H */ diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 67424914..ab8cf387 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -36,7 +36,8 @@  #include "connmgr.h"  #include "dir.h"  #include "enroll.h" -#include "fmgr.h" +#include "fa.h" +#include "dt.h"  #include "ipcp.h"  #include "ribconfig.h"  #include "ribmgr.h" @@ -73,7 +74,7 @@ static int boot_components(void)                         &ipcpi.dir_hash_algo, sizeof(ipcpi.dir_hash_algo));          if (len < 0) {                  log_err("Failed to read hash length: %zd.", len); -                return -1; +                goto fail_name;          }          ipcpi.dir_hash_algo = ntoh32(ipcpi.dir_hash_algo); @@ -82,7 +83,7 @@ static int boot_components(void)          if (rib_add(MEMBERS_PATH, ipcpi.name)) {                  log_err("Failed to add name to " MEMBERS_PATH); -                return -1; +                goto fail_name;          }          log_dbg("Starting components."); @@ -90,27 +91,25 @@ static int boot_components(void)          if (rib_read(BOOT_PATH "/addr_auth/type", &pa, sizeof(pa))              != sizeof(pa)) {                  log_err("Failed to read policy for address authority."); -                return -1; +                goto fail_name;          }          if (addr_auth_init(pa)) {                  log_err("Failed to init address authority."); -                return -1; +                goto fail_name;          }          ipcpi.dt_addr = addr_auth_address();          if (ipcpi.dt_addr == 0) {                  log_err("Failed to get a valid address."); -                addr_auth_fini(); -                return -1; +                goto fail_addr_auth;          }          path[0] = '\0';          rib_path_append(rib_path_append(path, MEMBERS_NAME), ipcpi.name);          if (rib_write(path, &ipcpi.dt_addr, sizeof(&ipcpi.dt_addr))) {                  log_err("Failed to write address to member object."); -                addr_auth_fini(); -                return -1; +                goto fail_addr_auth;          }          log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr); @@ -119,91 +118,100 @@ static int boot_components(void)          if (ribmgr_init()) {                  log_err("Failed to initialize RIB manager."); -                addr_auth_fini(); -                return -1; +                goto fail_addr_auth;          }          if (dir_init()) {                  log_err("Failed to initialize directory."); -                ribmgr_fini(); -                addr_auth_fini(); -                return -1; +                goto fail_ribmgr;          }          log_dbg("Ribmgr started.");          if (frct_init()) { -                dir_fini(); -                ribmgr_fini(); -                addr_auth_fini();                  log_err("Failed to initialize FRCT."); -                return -1; +                goto fail_dir;          } -        if (fmgr_init()) { -                frct_fini(); -                dir_fini(); -                ribmgr_fini(); -                addr_auth_fini(); -                log_err("Failed to initialize flow manager component."); -                return -1; +        if (fa_init()) { +                log_err("Failed to initialize flow allocator ae."); +                goto fail_frct;          } -        if (fmgr_start()) { -                fmgr_fini(); -                frct_fini(); -                dir_fini(); -                ribmgr_fini(); -                addr_auth_fini(); -                log_err("Failed to start flow manager."); -                return -1; +        if (dt_init()) { +                log_err("Failed to initialize data transfer ae."); +                goto fail_fa; +        } + +        if (fa_start()) { +                log_err("Failed to start flow allocator."); +                goto fail_dt; +        } + +        if (dt_start()) { +                log_err("Failed to start data transfer ae."); +                goto fail_fa_start;          }          if (enroll_start()) { -                fmgr_stop(); -                fmgr_fini(); -                frct_fini(); -                dir_fini(); -                ribmgr_fini(); -                addr_auth_fini();                  log_err("Failed to start enroll."); -                return -1; +                goto fail_dt_start;          }          ipcp_set_state(IPCP_OPERATIONAL);          if (connmgr_start()) { -                ipcp_set_state(IPCP_INIT); -                enroll_stop(); -                fmgr_stop(); -                fmgr_fini(); -                frct_fini(); -                dir_fini(); -                ribmgr_fini(); -                addr_auth_fini();                  log_err("Failed to start AP connection manager."); -                return -1; +                goto fail_enroll;          }          return 0; + + fail_enroll: +        ipcp_set_state(IPCP_INIT); +        enroll_stop(); + fail_dt_start: +        dt_stop(); + fail_fa_start: +        fa_stop(); + fail_dt: +        dt_fini(); + fail_fa: +        fa_fini(); + fail_frct: +        frct_fini(); + fail_dir: +        dir_fini(); + fail_ribmgr: +        ribmgr_fini(); + fail_addr_auth: +        addr_auth_fini(); + fail_name: +        free(ipcpi.dif_name); + +        return -1;  }  void shutdown_components(void)  { -        ribmgr_fini(); -          connmgr_stop();          enroll_stop(); -        frct_fini(); +        dt_stop(); + +        fa_stop(); -        fmgr_stop(); +        dt_fini(); -        fmgr_fini(); +        fa_fini(); + +        frct_fini();          dir_fini(); +        ribmgr_fini(); +          addr_auth_fini();          free(ipcpi.dif_name); @@ -366,9 +374,9 @@ static struct ipcp_ops normal_ops = {          .ipcp_reg             = dir_reg,          .ipcp_unreg           = dir_unreg,          .ipcp_query           = dir_query, -        .ipcp_flow_alloc      = fmgr_np1_alloc, -        .ipcp_flow_alloc_resp = fmgr_np1_alloc_resp, -        .ipcp_flow_dealloc    = fmgr_np1_dealloc +        .ipcp_flow_alloc      = fa_alloc, +        .ipcp_flow_alloc_resp = fa_alloc_resp, +        .ipcp_flow_dealloc    = fa_dealloc  };  int main(int    argc, | 
