diff options
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/ae.h | 29 | ||||
| -rw-r--r-- | src/ipcpd/normal/dir.c | 49 | ||||
| -rw-r--r-- | src/ipcpd/normal/fmgr.c | 102 | ||||
| -rw-r--r-- | src/ipcpd/normal/fmgr.h | 12 | ||||
| -rw-r--r-- | src/ipcpd/normal/frct.c | 123 | ||||
| -rw-r--r-- | src/ipcpd/normal/main.c | 181 | ||||
| -rw-r--r-- | src/ipcpd/normal/ribmgr.c | 277 | ||||
| -rw-r--r-- | src/ipcpd/normal/ribmgr.h | 6 | ||||
| -rw-r--r-- | src/ipcpd/normal/ro.h | 6 | 
9 files changed, 453 insertions, 332 deletions
| diff --git a/src/ipcpd/normal/ae.h b/src/ipcpd/normal/ae.h new file mode 100644 index 00000000..229ff4aa --- /dev/null +++ b/src/ipcpd/normal/ae.h @@ -0,0 +1,29 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Application Entities for the normal IPC process + * + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> + *    Sander Vrijders   <sander.vrijders@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_AE_H +#define OUROBOROS_IPCPD_NORMAL_AE_H + +#define MGMT_AE "Management" +#define DT_AE   "Data transfer" + +#endif /* OUROBOROS_IPCPD_NORMAL_AE_H */ diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c index 47fb1f6e..c5bb03dd 100644 --- a/src/ipcpd/normal/dir.c +++ b/src/ipcpd/normal/dir.c @@ -1,5 +1,5 @@  /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017   *   * DIF directory   * @@ -26,7 +26,6 @@  #include <ouroboros/errno.h>  #include "dir.h" -#include "ipcp.h"  #include "ro.h"  #include "pathname.h"  #include "ribmgr.h" @@ -95,41 +94,27 @@ int dir_name_reg(char * name)          char * path;          uint64_t * addr; -        pthread_rwlock_rdlock(&ipcpi.state_lock); - -        if (ipcp_get_state() != IPCP_OPERATIONAL) { -                pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_ERR("IPCP is not in RUNNING state."); -                return -1; -        } -          ro_attr_init(&attr);          attr.enrol_sync = true;          attr.recv_set = ALL_MEMBERS;          path = create_path(name); -        if (path == NULL) { -                pthread_rwlock_unlock(&ipcpi.state_lock); +        if (path == NULL)                  return -ENOMEM; -        }          addr = malloc(sizeof(*addr));          if (addr == NULL) { -                pthread_rwlock_unlock(&ipcpi.state_lock);                  pathname_destroy(path);                  return -ENOMEM;          }          *addr = ribmgr_address();          if (ro_create(path, &attr, (uint8_t *) addr, sizeof(*addr))) { -                pthread_rwlock_unlock(&ipcpi.state_lock);                  pathname_destroy(path);                  LOG_ERR("Failed to create RIB object.");                  return -1;          } -        pthread_rwlock_unlock(&ipcpi.state_lock); -          LOG_DBG("Registered %s.", name);          pathname_destroy(path); @@ -140,29 +125,16 @@ int dir_name_unreg(char * name)  {          char * path; -        pthread_rwlock_rdlock(&ipcpi.state_lock); - -        if (ipcp_get_state() != IPCP_OPERATIONAL) { -                pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_ERR("IPCP is not in RUNNING state."); -                return -1; -        } -          path = create_path(name); -        if (path == NULL) { -                pthread_rwlock_unlock(&ipcpi.state_lock); +        if (path == NULL)                  return -ENOMEM; -        }          if (ro_delete(path)) { -                pthread_rwlock_unlock(&ipcpi.state_lock);                  pathname_destroy(path);                  LOG_ERR("No such RIB object exists.");                  return -1;          } -        pthread_rwlock_unlock(&ipcpi.state_lock); -          pathname_destroy(path);          return 0; @@ -176,18 +148,9 @@ int dir_name_query(char * name)          uint64_t addr;          struct dt_const * dtc; -        pthread_rwlock_rdlock(&ipcpi.state_lock); - -        if (ipcp_get_state() != IPCP_OPERATIONAL) { -                pthread_rwlock_unlock(&ipcpi.state_lock); -                return -1; -        } -          path = create_path(name); -        if (path == NULL) { -                pthread_rwlock_unlock(&ipcpi.state_lock); -                return -1; -        } +        if (path == NULL) +                return -ENOMEM;          if (ro_exists(path)) {                  if (ro_read(path, &ro_data) < 0) { @@ -206,8 +169,6 @@ int dir_name_query(char * name)                  ret = (addr == ribmgr_address()) ? -1 : 0;          } -        pthread_rwlock_unlock(&ipcpi.state_lock); -          pathname_destroy(path);          return ret; diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 4b24d5a1..64d9a5a9 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -1,5 +1,5 @@  /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017   *   * Flow manager of the IPC Process   * @@ -60,7 +60,6 @@ struct {          cep_id_t           np1_fd_to_cep_id[AP_MAX_FLOWS];          int                np1_cep_id_to_fd[IPCPD_MAX_CONNS]; -        pthread_t          nm1_flow_acceptor;          pthread_t          nm1_sdu_reader;          pthread_t          np1_sdu_reader; @@ -68,65 +67,6 @@ struct {          int fd;  } fmgr; -static void * fmgr_nm1_acceptor(void * o) -{ -        int       fd; -        char *    ae_name; -        qoscube_t cube; -        qosspec_t qs; - -        (void) o; - -        while (true) { -                if (ipcp_get_state() == IPCP_SHUTDOWN) -                        return 0; - -                fd = flow_accept(&ae_name, &qs); -                if (fd < 0) { -                        LOG_WARN("Flow accept failed."); -                        continue; -                } - -                if (!(strcmp(ae_name, MGMT_AE) == 0 || -                      strcmp(ae_name, DT_AE) == 0)) { -                        if (flow_alloc_resp(fd, -1)) -                                LOG_WARN("Failed to reply to flow allocation."); -                        flow_dealloc(fd); -                        continue; -                } - -                if (flow_alloc_resp(fd, 0)) { -                        LOG_WARN("Failed to reply to flow allocation."); -                        flow_dealloc(fd); -                        continue; -                } - -                LOG_DBG("Accepted new flow allocation request for AE %s.", -                        ae_name); - -                if (strcmp(ae_name, MGMT_AE) == 0) { -                        if (ribmgr_add_flow(fd)) { -                                LOG_WARN("Failed to hand fd to RIB."); -                                flow_dealloc(fd); -                                continue; -                        } -                } else { -                        ipcp_flow_get_qoscube(fd, &cube); -                        if (flow_set_add(fmgr.nm1_set[cube], fd)) { -                                LOG_WARN("Failed to add fd."); -                                flow_dealloc(fd); -                                continue; -                        } -                        /* FIXME: Temporary, until we have a PFF */ -                        fmgr.fd = fd; -                } - -                free(ae_name); -        } - -        return (void *) 0; -} -  static void * fmgr_np1_sdu_reader(void * o)  {          struct shm_du_buff * sdb; @@ -313,7 +253,6 @@ int fmgr_init()                  }          } -        pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL);          pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL);          pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL); @@ -325,11 +264,9 @@ int fmgr_fini()          int i;          int j; -        pthread_cancel(fmgr.nm1_flow_acceptor);          pthread_cancel(fmgr.np1_sdu_reader);          pthread_cancel(fmgr.nm1_sdu_reader); -        pthread_join(fmgr.nm1_flow_acceptor, NULL);          pthread_join(fmgr.np1_sdu_reader, NULL);          pthread_join(fmgr.nm1_sdu_reader, NULL); @@ -360,16 +297,6 @@ int fmgr_np1_alloc(int       fd,          uint8_t * ro_data;          uint64_t addr; -        pthread_rwlock_rdlock(&ipcpi.state_lock); - -        if (ipcp_get_state() != IPCP_OPERATIONAL) { -                pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_ERR("IPCP is not enrolled yet."); -                return -1; /* -ENOTINIT */ -        } - -        pthread_rwlock_unlock(&ipcpi.state_lock); -          path = pathname_create(RO_DIR);          if (path == NULL)                  return -1; @@ -605,30 +532,21 @@ int fmgr_np1_post_sdu(cep_id_t cep_id, struct shm_du_buff * sdb)          return 0;  } -int fmgr_nm1_mgmt_flow(char * dst_name) +/* FIXME: do this in a topologymanager instance */ +int fmgr_nm1_add_flow(int fd)  { -        int fd; -        int result; +        qoscube_t qos; -        /* FIXME: Request retransmission. */ -        fd = flow_alloc(dst_name, MGMT_AE, NULL); -        if (fd < 0) { -                LOG_ERR("Failed to allocate flow to %s.", dst_name); +        if (flow_alloc_resp(fd, 0) < 0) { +                LOG_ERR("Could not respond to new flow.");                  return -1;          } -        result = flow_alloc_res(fd); -        if (result < 0) { -                LOG_ERR("Result of flow allocation to %s is %d.", -                        dst_name, result); -                return -1; -        } +        ipcp_flow_get_qoscube(fd, &qos); +        flow_set_add(fmgr.nm1_set[qos], fd); -        if (ribmgr_add_flow(fd)) { -                LOG_ERR("Failed to hand file descriptor to RIB manager."); -                flow_dealloc(fd); -                return -1; -        } +        /* FIXME: Temporary, until we have a PFF */ +        fmgr.fd = fd;          return 0;  } diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index e17f3f55..85731081 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -1,5 +1,5 @@  /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017   *   * Flow manager of the IPC Process   * @@ -24,15 +24,9 @@  #include <ouroboros/shared.h> -#include <unistd.h> -#include <stdint.h> -#include <sys/types.h> - +#include "ae.h"  #include "frct.h" -#define MGMT_AE "Management" -#define DT_AE "Data transfer" -  int fmgr_init(void);  int fmgr_fini(void); @@ -53,7 +47,7 @@ int fmgr_np1_post_buf(cep_id_t   id,  int fmgr_np1_post_sdu(cep_id_t             id,                        struct shm_du_buff * sdb); -int fmgr_nm1_mgmt_flow(char * dst_name); +int fmgr_nm1_add_flow(int fd);  int fmgr_nm1_dt_flow(char *    dst_name,                       qoscube_t qos); diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index 33bd044b..6cd68f18 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -1,5 +1,5 @@  /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017   *   * The Flow and Retransmission control component   * @@ -40,21 +40,21 @@ enum conn_state {  };  struct frct_i { -        uint32_t cep_id; -        uint64_t r_address; -        uint32_t r_cep_id; +        uint32_t  cep_id; +        uint64_t  r_address; +        uint32_t  r_cep_id;          qoscube_t cube; -        uint64_t seqno; +        uint64_t  seqno;          enum conn_state state;  };  struct { -        pthread_mutex_t instances_lock;          struct frct_i ** instances; +        pthread_mutex_t  instances_lock; -        struct bmp * cep_ids; -        pthread_mutex_t cep_ids_lock; +        struct bmp *     cep_ids; +        pthread_mutex_t  cep_ids_lock;  } frct;  static cep_id_t next_cep_id(void) @@ -62,9 +62,11 @@ static cep_id_t next_cep_id(void)          cep_id_t ret;          pthread_mutex_lock(&frct.cep_ids_lock); +          ret = bmp_allocate(frct.cep_ids);          if (!bmp_is_id_valid(frct.cep_ids, ret))                  ret = INVALID_CEP_ID; +          pthread_mutex_unlock(&frct.cep_ids_lock);          return ret; @@ -75,51 +77,48 @@ static int release_cep_id(cep_id_t id)          int ret;          pthread_mutex_lock(&frct.cep_ids_lock); +          ret = bmp_release(frct.cep_ids, id); +          pthread_mutex_unlock(&frct.cep_ids_lock);          return ret;  } -int frct_init() +static int init_cep_ids(void)  { -        int i; -          if (pthread_mutex_init(&frct.cep_ids_lock, NULL))                  return -1; -        if (pthread_mutex_init(&frct.instances_lock, NULL)) -                return -1; - -        frct.instances = malloc(sizeof(*(frct.instances)) * IRMD_MAX_FLOWS); -        if (frct.instances == NULL) -                return -1; - -        for (i = 0; i < IRMD_MAX_FLOWS; i++) -                frct.instances[i] = NULL; -          frct.cep_ids = bmp_create(IRMD_MAX_FLOWS, (INVALID_CEP_ID + 1));          if (frct.cep_ids == NULL) { -                free(frct.instances); +                pthread_mutex_destroy(&frct.cep_ids_lock);                  return -1;          }          return 0;  } -int frct_fini() +static int init_instances(void)  { -        pthread_mutex_lock(&frct.cep_ids_lock); -        bmp_destroy(frct.cep_ids); -        pthread_mutex_unlock(&frct.cep_ids_lock); +        int i; -        free(frct.instances); +        if (pthread_mutex_init(&frct.instances_lock, NULL)) +                return -1; + +        frct.instances = malloc(sizeof(*(frct.instances)) * IRMD_MAX_FLOWS); +        if (frct.instances == NULL) { +                pthread_mutex_destroy(&frct.instances_lock); +                return -1; +        } + +        for (i = 0; i < IRMD_MAX_FLOWS; i++) +                frct.instances[i] = NULL;          return 0;  } -static struct frct_i * create_frct_i(uint64_t address, -                                     cep_id_t r_cep_id) +static struct frct_i * create_frct_i(uint64_t address, cep_id_t r_cep_id)  {          struct frct_i * instance;          cep_id_t        id; @@ -145,6 +144,60 @@ static struct frct_i * create_frct_i(uint64_t address,          return instance;  } +static void destroy_frct_i(struct frct_i * instance) +{ +        free(instance); +} + +static void fini_cep_ids(void) +{ +        pthread_mutex_lock(&frct.cep_ids_lock); + +        bmp_destroy(frct.cep_ids); + +        pthread_mutex_unlock(&frct.cep_ids_lock); + +        pthread_mutex_destroy(&frct.cep_ids_lock); +} + +static void fini_instances(void) +{ +        int i; + +        pthread_mutex_lock(&frct.instances_lock); + +        for (i = 0; i < IRMD_MAX_FLOWS; i++) +                if (frct.instances[i] != NULL) +                        destroy_frct_i(frct.instances[i]); + +        pthread_mutex_unlock(&frct.instances_lock); + +        pthread_mutex_destroy(&frct.instances_lock); + +        free(frct.instances); +} + +int frct_init() +{ +        if (init_cep_ids()) +                return -1; + +        if (init_instances()) { +                fini_cep_ids(); +                return -1; +        } + +        return 0; +} + +int frct_fini() +{ +        fini_cep_ids(); +        fini_instances(); + +        return 0; +} +  int frct_nm1_post_sdu(struct pci * pci,                        struct shm_du_buff * sdb)  { @@ -209,14 +262,6 @@ int frct_nm1_post_sdu(struct pci * pci,          return 0;  } -/* Call under instances lock */ -static void destroy_frct_i(struct frct_i * instance) -{ -        release_cep_id(instance->cep_id); -        frct.instances[instance->cep_id] = NULL; -        free(instance); -} -  cep_id_t frct_i_create(uint64_t   address,                         buffer_t * buf,                         qoscube_t  cube) @@ -328,7 +373,11 @@ int frct_i_destroy(cep_id_t   id,          pci.seqno = 0;          pci.qos_id = instance->cube; +        frct.instances[id] = NULL;          destroy_frct_i(instance); + +        release_cep_id(instance->cep_id); +          pthread_mutex_unlock(&frct.instances_lock);          if (buf != NULL && buf->data != NULL) diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 34ba52da..f6a88f29 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -1,5 +1,5 @@  /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017   *   * Normal IPC Process   * @@ -59,7 +59,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)                          pthread_rwlock_wrlock(&ipcpi.state_lock); -                        ipcp_set_state(IPCP_SHUTDOWN); +                        if (ipcp_get_state() == IPCP_INIT) +                                ipcp_set_state(IPCP_NULL); + +                        if (ipcp_get_state() == IPCP_OPERATIONAL) +                                ipcp_set_state(IPCP_SHUTDOWN);                          pthread_rwlock_unlock(&ipcpi.state_lock);                  } @@ -68,12 +72,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)          }  } -static int normal_ipcp_enroll(char * dif_name) +static int normal_ipcp_enroll(char * dst_name)  { -        struct timespec timeout = {(ENROLL_TIMEOUT / 1000), -                                   (ENROLL_TIMEOUT % 1000) * MILLION}; +        int ret; -        pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_wrlock(&ipcpi.state_lock);          if (ipcp_get_state() != IPCP_INIT) {                  pthread_rwlock_unlock(&ipcpi.state_lock); @@ -81,37 +84,62 @@ static int normal_ipcp_enroll(char * dif_name)                  return -1; /* -ENOTINIT */          } -        pthread_rwlock_unlock(&ipcpi.state_lock); - -        if (fmgr_nm1_mgmt_flow(dif_name)) { -                LOG_ERR("Failed to establish management flow."); +        if (ribmgr_init()) { +                LOG_ERR("Failed to initialise RIB manager."); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  return -1;          } -        if (ribmgr_enrol()) { -                LOG_ERR("Failed to enrol IPCP."); +        if (ribmgr_nm1_mgt_flow(dst_name)) { +                if (ribmgr_fini()) +                        LOG_WARN("Failed to finalize RIB manager."); +                LOG_ERR("Failed to establish management flow."); +                pthread_rwlock_unlock(&ipcpi.state_lock);                  return -1;          } -        if (ipcp_wait_state(IPCP_BOOTING, &timeout) == -ETIMEDOUT) { -                LOG_ERR("Enrollment timed out."); +        ret = ribmgr_enrol(); +        if (ret < 0) { +                if (ribmgr_fini()) +                        LOG_WARN("Failed to finalize RIB manager."); +                pthread_rwlock_unlock(&ipcpi.state_lock); +                if (ret == -ETIMEDOUT) +                        LOG_ERR("Enrollment timed out."); +                else +                        LOG_ERR("Failed to enrol IPCP: %d.", ret);                  return -1;          }          if (ribmgr_start_policies()) { -                pthread_rwlock_wrlock(&ipcpi.state_lock); -                ipcp_set_state(IPCP_INIT);                  pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to start policies.");                  return -1;          } -        pthread_rwlock_wrlock(&ipcpi.state_lock); +        if (fmgr_init()) { +                if (ribmgr_fini()) +                        LOG_WARN("Failed to finalize RIB manager."); +                pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_ERR("Failed to start flow manager."); +                return -1; +        } + +        if (frct_init()) { +                if (fmgr_fini()) +                        LOG_WARN("Failed to finalize flow manager."); +                if (ribmgr_fini()) +                        LOG_WARN("Failed to finalize RIB manager."); +                pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_ERR("Failed to initialize FRCT."); +                return -1; +        } +          ipcp_set_state(IPCP_OPERATIONAL); +          pthread_rwlock_unlock(&ipcpi.state_lock);          /* FIXME: Remove once we obtain neighbors during enrollment */ -        if (fmgr_nm1_dt_flow(dif_name, QOS_CUBE_BE)) { +        if (fmgr_nm1_dt_flow(dst_name, QOS_CUBE_BE)) {                  LOG_ERR("Failed to establish data transfer flow.");                  return -1;          } @@ -121,6 +149,11 @@ static int normal_ipcp_enroll(char * dif_name)  static int normal_ipcp_bootstrap(struct dif_config * conf)  { +        if (conf == NULL || conf->type != THIS_TYPE) { +                LOG_ERR("Bad DIF configuration."); +                return -EINVAL; +        } +          pthread_rwlock_wrlock(&ipcpi.state_lock);          if (ipcp_get_state() != IPCP_INIT) { @@ -129,26 +162,48 @@ static int normal_ipcp_bootstrap(struct dif_config * conf)                  return -1; /* -ENOTINIT */          } +        if (ribmgr_init()) { +                LOG_ERR("Failed to initialise RIB manager."); +                pthread_rwlock_unlock(&ipcpi.state_lock); +                return -1; +        } +          if (ribmgr_bootstrap(conf)) { +                if (ribmgr_fini()) +                        LOG_WARN("Failed to finalize RIB manager.");                  pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to bootstrap RIB manager.");                  return -1;          } -        ipcp_set_state(IPCP_BOOTING); -        pthread_rwlock_unlock(&ipcpi.state_lock); -          if (ribmgr_start_policies()) { -                pthread_rwlock_wrlock(&ipcpi.state_lock); -                ipcp_set_state(IPCP_INIT); +                if (ribmgr_fini()) +                        LOG_WARN("Failed to finalize RIB manager.");                  pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to start policies.");                  return -1;          } -        pthread_rwlock_wrlock(&ipcpi.state_lock); +        if (fmgr_init()) { +                if (ribmgr_fini()) +                        LOG_WARN("Failed to finalize RIB manager."); +                pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_ERR("Failed to start flow manager."); +                return -1; +        } + +        if (frct_init()) { +                if (fmgr_fini()) +                        LOG_WARN("Failed to finalize flow manager."); +                if (ribmgr_fini()) +                        LOG_WARN("Failed to finalize RIB manager."); +                pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_ERR("Failed to initialize FRCT."); +                return -1; +        }          ipcp_set_state(IPCP_OPERATIONAL); +          ipcpi.data->dif_name = conf->dif_name;          pthread_rwlock_unlock(&ipcpi.state_lock); @@ -169,10 +224,56 @@ static struct ipcp_ops normal_ops = {          .ipcp_flow_dealloc    = fmgr_np1_dealloc  }; +static void * flow_acceptor(void * o) +{ +        int       fd; +        char *    ae_name; +        qosspec_t qs; + +        (void) o; + +        while (true) { +                pthread_rwlock_rdlock(&ipcpi.state_lock); + +                if (ipcp_get_state() != IPCP_OPERATIONAL) { +                        pthread_rwlock_unlock(&ipcpi.state_lock); +                        LOG_INFO("Shutting down flow acceptor."); +                        return 0; +                } + +                pthread_rwlock_unlock(&ipcpi.state_lock); + +                fd = flow_accept(&ae_name, &qs); +                if (fd < 0) { +                        LOG_WARN("Flow accept failed."); +                        continue; +                } + +                LOG_DBG("New flow allocation request for AE %s.", ae_name); + +                if (strcmp(ae_name, MGMT_AE) == 0) { +                        ribmgr_add_nm1_flow(fd); +                } else if (strcmp(ae_name, DT_AE) == 0) { +                        fmgr_nm1_add_flow(fd); +                } else { +                        LOG_DBG("Flow allocation request for unknown AE %s.", +                                ae_name); +                        if (flow_alloc_resp(fd, -1)) +                                LOG_WARN("Failed to reply to flow allocation."); +                        flow_dealloc(fd); +                } + +                free(ae_name); +        } + +        return (void *) 0; +} +  int main(int argc, char * argv[])  {          struct sigaction sig_act;          sigset_t sigset; +        pthread_t acceptor;          if (ap_init(argv[0])) {                  LOG_ERR("Failed to init AP"); @@ -215,16 +316,8 @@ int main(int argc, char * argv[])          pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); -        if (ribmgr_init()) { -                fmgr_fini(); -                ipcp_fini(); -                close_logfile(); -                exit(EXIT_FAILURE); -        } -          if (ipcp_create_r(getpid())) {                  LOG_ERR("Failed to notify IRMd we are initialized."); -                fmgr_fini();                  ipcp_fini();                  close_logfile();                  exit(EXIT_FAILURE); @@ -232,22 +325,24 @@ int main(int argc, char * argv[])          ipcp_wait_state(IPCP_OPERATIONAL, NULL); -        if (fmgr_init()) { -                ipcp_fini(); -                close_logfile(); -                exit(EXIT_FAILURE); +        if (pthread_create(&acceptor, NULL, flow_acceptor, NULL)) { +                LOG_ERR("Failed to create acceptor thread."); +                ipcp_set_state(IPCP_SHUTDOWN);          }          ipcp_fini(); -        if (fmgr_fini()) -                LOG_ERR("Failed to finalize flow manager."); +        if (ipcp_get_state() == IPCP_SHUTDOWN) { +                pthread_cancel(acceptor); +                pthread_join(acceptor, NULL); -        if (ribmgr_fini()) -                LOG_ERR("Failed to finalize RIB manager."); - -        if (frct_fini()) -                LOG_ERR("Failed to finalize FRCT."); +                if (frct_fini()) +                        LOG_WARN("Failed to finalize FRCT."); +                if (fmgr_fini()) +                        LOG_WARN("Failed to finalize flow manager."); +                if (ribmgr_fini()) +                        LOG_WARN("Failed to finalize RIB manager."); +        }          close_logfile(); diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 3b4a5784..ab492e7f 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -1,5 +1,5 @@  /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017   *   * RIB manager of the IPC Process   * @@ -29,21 +29,21 @@  #include <ouroboros/ipcp-dev.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/errno.h> - -#include <stdlib.h> -#include <pthread.h> -#include <string.h> -#include <errno.h> +#include <ouroboros/dev.h>  #include "timerwheel.h"  #include "addr_auth.h"  #include "ribmgr.h"  #include "dt_const.h" -#include "frct.h" -#include "ipcp.h"  #include "ro.h"  #include "pathname.h"  #include "dir.h" +#include "ae.h" + +#include <stdlib.h> +#include <pthread.h> +#include <string.h> +#include <errno.h>  #include "static_info.pb-c.h"  typedef StaticInfoMsg static_info_msg_t; @@ -73,7 +73,7 @@ struct rnode {           * as an index in a B-tree           */ -        /* If there are no children, this is a leaf */ +        /* If there are no children, this is a leaf. */          struct rnode * child;          struct rnode * sibling; @@ -107,6 +107,14 @@ struct ro_id {          char *           full_name;  }; +enum ribmgr_state { +        RIBMGR_NULL, +        RIBMGR_INIT, +        RIBMGR_OPERATIONAL, +        RIBMGR_SHUTDOWN +}; + +/* FIXME: Extract rib from ribmgr. */  struct {          struct rnode *      root;          pthread_mutex_t     ro_lock; @@ -130,6 +138,10 @@ struct {          struct addr_auth *  addr_auth;          enum pol_addr_auth  addr_auth_type; + +        enum ribmgr_state   state; +        pthread_cond_t      state_cond; +        pthread_mutex_t     state_lock;  } rib;  void ribmgr_ro_created(const char * name, @@ -138,15 +150,11 @@ void ribmgr_ro_created(const char * name,  {          static_info_msg_t * stat_msg; -        pthread_rwlock_wrlock(&ipcpi.state_lock); -        if (ipcp_get_state() == IPCP_CONFIG && -            strcmp(name, RIBMGR_PREFIX STAT_INFO) == 0) { +        if (strcmp(name, RIBMGR_PREFIX STAT_INFO) == 0) {                  LOG_DBG("Received static DIF information.");                  stat_msg = static_info_msg__unpack(NULL, len, data);                  if (stat_msg == NULL) { -                        ipcp_set_state(IPCP_INIT); -                        pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to unpack static info message.");                          return;                  } @@ -161,17 +169,8 @@ void ribmgr_ro_created(const char * name,                  rib.dtc.max_pdu_size = stat_msg->max_pdu_size;                  rib.addr_auth_type = stat_msg->addr_auth_type; -                if (frct_init()) { -                        ipcp_set_state(IPCP_INIT); -                        pthread_rwlock_unlock(&ipcpi.state_lock); -                        static_info_msg__free_unpacked(stat_msg, NULL); -                        LOG_ERR("Failed to init FRCT"); -                        return; -                } -                  static_info_msg__free_unpacked(stat_msg, NULL);          } -        pthread_rwlock_unlock(&ipcpi.state_lock);  }  /* We only have a create operation for now. */ @@ -185,7 +184,6 @@ static struct rnode * find_rnode_by_name(const char * name)  {          char * str;          char * str1; -        char * saveptr;          char * token;          struct rnode * node; @@ -195,8 +193,8 @@ static struct rnode * find_rnode_by_name(const char * name)          node = rib.root; -        for (str1 = str; ; str1 = NULL) { -                token = strtok_r(str1, PATH_DELIMITER, &saveptr); +        for (str1 = str; node != NULL; str1 = NULL) { +                token = strtok(str1, PATH_DELIMITER);                  if (token == NULL)                          break; @@ -207,18 +205,13 @@ static struct rnode * find_rnode_by_name(const char * name)                                  break;                          else                                  node = node->sibling; - -                if (node == NULL) { -                        free(str); -                        return NULL; -                }          }          free(str);          return node;  } -/* Call under RIB object lock */ +/* Call under RIB object lock. */  static int ro_msg_create(struct rnode * node,                           ro_msg_t *     msg)  { @@ -334,7 +327,7 @@ static struct rnode * ribmgr_ro_create(const char *   name,          node = rib.root; -        for (str1 = str; ; str1 = NULL) { +        for (str1 = str; node != NULL; str1 = NULL) {                  token = strtok_r(str1, PATH_DELIMITER, &saveptr);                  if (token == NULL) {                          LOG_ERR("RO already exists."); @@ -356,9 +349,6 @@ static struct rnode * ribmgr_ro_create(const char *   name,                                  sibling = true;                          }                  } - -                if (node == NULL) -                        break;          }          token2 = strtok_r(NULL, PATH_DELIMITER, &saveptr); @@ -554,9 +544,88 @@ int ribmgr_init()                  return -1;          } +        if (pthread_cond_init(&rib.state_cond, NULL)) { +                LOG_ERR("Failed to init condvar."); +                timerwheel_destroy(rib.wheel); +                bmp_destroy(rib.sids); +                pthread_rwlock_destroy(&rib.flows_lock); +                pthread_mutex_destroy(&rib.ro_lock); +                pthread_mutex_destroy(&rib.subs_lock); +                pthread_mutex_destroy(&rib.ro_ids_lock); +                free(rib.root); +                return -1; +        } + +        if (pthread_mutex_init(&rib.state_lock, NULL)) { +                LOG_ERR("Failed to init mutex."); +                pthread_cond_destroy(&rib.state_cond); +                timerwheel_destroy(rib.wheel); +                bmp_destroy(rib.sids); +                pthread_rwlock_destroy(&rib.flows_lock); +                pthread_mutex_destroy(&rib.ro_lock); +                pthread_mutex_destroy(&rib.subs_lock); +                pthread_mutex_destroy(&rib.ro_ids_lock); +                free(rib.root); +                return -1; +        } + +        rib.state = RIBMGR_INIT; +          return 0;  } +static enum ribmgr_state ribmgr_get_state(void) +{ +        enum ribmgr_state state; + +        pthread_mutex_lock(&rib.state_lock); + +        state = rib.state; + +        pthread_mutex_unlock(&rib.state_lock); + +        return state; +} + +static void ribmgr_set_state(enum ribmgr_state state) +{ +        pthread_mutex_lock(&rib.state_lock); + +        rib.state = state; + +        pthread_cond_broadcast(&rib.state_cond); + +        pthread_mutex_unlock(&rib.state_lock); +} + +static int ribmgr_wait_state(enum ribmgr_state       state, +                             const struct timespec * timeout) +{ +        struct timespec abstime; +        int ret = 0; + +        clock_gettime(PTHREAD_COND_CLOCK, &abstime); +        ts_add(&abstime, timeout, &abstime); + +        pthread_mutex_lock(&rib.state_lock); + +        while (rib.state != state +               && rib.state != RIBMGR_SHUTDOWN +               && rib.state != RIBMGR_NULL) { +                if (timeout == NULL) +                        ret = -pthread_cond_wait(&rib.state_cond, +                                                 &rib.state_lock); +                else +                        ret = -pthread_cond_timedwait(&rib.state_cond, +                                                      &rib.state_lock, +                                                      &abstime); +        } + +        pthread_mutex_unlock(&rib.state_lock); + +        return ret; +} +  static void rtree_destroy(struct rnode * node)  {          if (node != NULL) { @@ -574,7 +643,13 @@ int ribmgr_fini()          struct list_head * pos = NULL;          struct list_head * n = NULL; +        pthread_mutex_lock(&rib.state_lock); +        rib.state = RIBMGR_SHUTDOWN; +        pthread_cond_broadcast(&rib.state_cond); +        pthread_mutex_unlock(&rib.state_lock); +          pthread_rwlock_wrlock(&rib.flows_lock); +          list_for_each_safe(pos, n, &rib.flows) {                  struct mgmt_flow * flow =                          list_entry(pos, struct mgmt_flow, next); @@ -583,6 +658,7 @@ int ribmgr_fini()                  list_del(&flow->next);                  free(flow);          } +          pthread_rwlock_unlock(&rib.flows_lock);          ro_unsubscribe(rib.ribmgr_sid); @@ -591,8 +667,10 @@ int ribmgr_fini()                  addr_auth_destroy(rib.addr_auth);          pthread_mutex_lock(&rib.ro_lock); +          rtree_destroy(rib.root->child);          free(rib.root); +          pthread_mutex_unlock(&rib.ro_lock);          bmp_destroy(rib.sids); @@ -603,6 +681,9 @@ int ribmgr_fini()          pthread_rwlock_destroy(&rib.flows_lock);          pthread_mutex_destroy(&rib.ro_ids_lock); +        pthread_cond_destroy(&rib.state_cond); +        pthread_mutex_destroy(&rib.state_lock); +          return 0;  } @@ -827,16 +908,7 @@ static int ribmgr_cdap_start(struct cdap * instance,          if (strcmp(name, ENROLLMENT) == 0) {                  LOG_DBG("New enrollment request."); -                pthread_rwlock_wrlock(&ipcpi.state_lock); - -                if (ipcp_get_state() != IPCP_OPERATIONAL) { -                        pthread_rwlock_unlock(&ipcpi.state_lock); -                        LOG_ERR("IPCP in wrong state."); -                        return -1; -                } -                  if (cdap_reply_send(instance, key, 0, NULL, 0)) { -                        pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to send reply to enrollment request.");                          return -1;                  } @@ -847,7 +919,6 @@ static int ribmgr_cdap_start(struct cdap * instance,                  pthread_mutex_lock(&rib.ro_lock);                  if (ribmgr_enrol_sync(instance, rib.root->child)) {                          pthread_mutex_unlock(&rib.ro_lock); -                        pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to sync part of the RIB.");                          return -1;                  } @@ -859,18 +930,14 @@ static int ribmgr_cdap_start(struct cdap * instance,                  key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT,                                          NULL, 0, 0);                  if (key < 0) { -                        pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to send stop of enrollment.");                          return -1;                  }                  if (cdap_reply_wait(instance, key, NULL, NULL)) { -                        pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Remote failed to complete enrollment.");                          return -1;                  } - -                pthread_rwlock_unlock(&ipcpi.state_lock);          } else {                  LOG_WARN("Request to start unknown operation.");                  if (cdap_reply_send(instance, key, -1, NULL, 0)) @@ -884,20 +951,18 @@ static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name)  {          int ret = 0; -        pthread_rwlock_wrlock(&ipcpi.state_lock); -        if (ipcp_get_state() == IPCP_CONFIG && strcmp(name, ENROLLMENT) == 0) { +        if (strcmp(name, ENROLLMENT) == 0) {                  LOG_DBG("Stop enrollment received."); - -                ipcp_set_state(IPCP_BOOTING); -        } else +                /* FIXME: don't use states to match start to stop. */ +                ribmgr_set_state(RIBMGR_OPERATIONAL); +        } else {                  ret = -1; +        }          if (cdap_reply_send(instance, key, ret, NULL, 0)) { -                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to send reply to stop request.");                  return -1;          } -        pthread_rwlock_unlock(&ipcpi.state_lock);          return 0;  } @@ -1068,7 +1133,7 @@ static void * cdap_req_handler(void * o)          return (void *) 0;  } -int ribmgr_add_flow(int fd) +static int ribmgr_add_flow(int fd)  {          struct cdap * instance = NULL;          struct mgmt_flow * flow; @@ -1127,6 +1192,46 @@ int ribmgr_remove_flow(int fd)          return -1;  } +/* FIXME: do this in a topologymanager instance */ +int ribmgr_add_nm1_flow(int fd) +{ +        if (flow_alloc_resp(fd, 0) < 0) { +                LOG_ERR("Could not respond to new flow."); +                return -1; +        } + +        return ribmgr_add_flow(fd); +} + +int ribmgr_nm1_mgt_flow(char * dst_name) +{ +        int fd; +        int result; + +        /* FIXME: Request retransmission. */ +        fd = flow_alloc(dst_name, MGMT_AE, NULL); +        if (fd < 0) { +                LOG_ERR("Failed to allocate flow to %s.", dst_name); +                return -1; +        } + +        result = flow_alloc_res(fd); +        if (result < 0) { +                LOG_ERR("Result of flow allocation to %s is %d.", +                        dst_name, result); +                flow_dealloc(fd); +                return -1; +        } + +        if (ribmgr_add_flow(fd)) { +                LOG_ERR("Failed to add file descriptor."); +                flow_dealloc(fd); +                return -1; +        } + +        return fd; +} +  int ribmgr_bootstrap(struct dif_config * conf)  {          static_info_msg_t stat_info = STATIC_INFO_MSG__INIT; @@ -1134,11 +1239,6 @@ int ribmgr_bootstrap(struct dif_config * conf)          size_t len = 0;          struct ro_attr attr; -        if (conf == NULL || conf->type != IPCP_NORMAL) { -                LOG_ERR("Bad DIF configuration."); -                return -EINVAL; -        } -          ro_attr_init(&attr);          attr.enrol_sync = true; @@ -1189,83 +1289,54 @@ int ribmgr_bootstrap(struct dif_config * conf)                  return -1;          } -        if (frct_init()) { -                LOG_ERR("Failed to initialize FRCT."); -                dir_fini(); -                ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); -                ribmgr_ro_delete(RIBMGR_PREFIX); -                return -1; -        } -          LOG_DBG("Bootstrapped RIB Manager.");          return 0;  } -int ribmgr_enrol(void) +int ribmgr_enrol()  {          struct cdap * instance = NULL;          struct mgmt_flow * flow;          cdap_key_t key;          int ret; - -        pthread_rwlock_wrlock(&ipcpi.state_lock); - -        if (ipcp_get_state() != IPCP_INIT) { -                pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_ERR("IPCP in wrong state."); -                return -1; -        } - -        ipcp_set_state(IPCP_CONFIG); +        struct timespec timeout = {(ENROLL_TIMEOUT / 1000), +                                   (ENROLL_TIMEOUT % 1000) * MILLION};          pthread_rwlock_wrlock(&rib.flows_lock); -        if (list_empty(&rib.flows)) { -                ipcp_set_state(IPCP_INIT); -                pthread_rwlock_unlock(&rib.flows_lock); -                pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_ERR("No flows in RIB."); -                return -1; -        } + +        assert(!list_empty(&rib.flows));          flow = list_first_entry((&rib.flows), struct mgmt_flow, next);          instance = flow->instance;          key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0);          if (key < 0) { -                ipcp_set_state(IPCP_INIT);                  pthread_rwlock_unlock(&rib.flows_lock); -                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to start enrollment.");                  return -1;          }          ret = cdap_reply_wait(instance, key, NULL, NULL);          if (ret) { -                ipcp_set_state(IPCP_INIT);                  pthread_rwlock_unlock(&rib.flows_lock); -                pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to enroll: %d.", ret);                  return -1;          }          pthread_rwlock_unlock(&rib.flows_lock); -        pthread_rwlock_unlock(&ipcpi.state_lock); + +        if (ribmgr_wait_state(RIBMGR_OPERATIONAL, &timeout) == -ETIMEDOUT) +                LOG_ERR("Enrollment of RIB timed out."); + +        if (ribmgr_get_state() != RIBMGR_OPERATIONAL) +                return -1;          return 0;  }  int ribmgr_start_policies(void)  { -        pthread_rwlock_rdlock(&ipcpi.state_lock); - -        if (ipcp_get_state() != IPCP_BOOTING) { -                pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_ERR("Cannot start policies in wrong state."); -                return -1; -        } -        pthread_rwlock_unlock(&ipcpi.state_lock); -          rib.addr_auth = addr_auth_create(rib.addr_auth_type);          if (rib.addr_auth == NULL) {                  LOG_ERR("Failed to create address authority."); diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h index 954b7a3c..ddc98e6e 100644 --- a/src/ipcpd/normal/ribmgr.h +++ b/src/ipcpd/normal/ribmgr.h @@ -1,5 +1,5 @@  /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017   *   * RIB manager of the IPC Process   * @@ -31,9 +31,9 @@ int               ribmgr_init(void);  int               ribmgr_fini(void); -int               ribmgr_add_flow(int fd); +int               ribmgr_add_nm1_flow(int fd); -int               ribmgr_remove_flow(int fd); +int               ribmgr_nm1_mgt_flow(char * dst_name);  int               ribmgr_bootstrap(struct dif_config * conf); diff --git a/src/ipcpd/normal/ro.h b/src/ipcpd/normal/ro.h index 43e432c3..6fda2adf 100644 --- a/src/ipcpd/normal/ro.h +++ b/src/ipcpd/normal/ro.h @@ -1,5 +1,5 @@  /* - * Ouroboros - Copyright (C) 2016 + * Ouroboros - Copyright (C) 2016 - 2017   *   * RIB objects   * @@ -22,6 +22,10 @@  #ifndef OUROBOROS_IPCPD_NORMAL_RO_H  #define OUROBOROS_IPCPD_NORMAL_RO_H +#include <stdbool.h> +#include <time.h> +#include <stdint.h> +  enum ro_recv_set {          NO_SYNC = 0,          NEIGHBORS, | 
