| author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-10-08 19:42:51 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-10-11 16:15:33 +0200 | 
| commit | 69ef99bb2dc05337e8189acc42dc9122f4182ead (patch) | |
| tree | 2808e55f80991cb9b59266c15726c982f3a4ca50 /src/ipcpd/normal | |
| parent | dbf3ab8dfb2cbe8167c464e3cf6a9aa757bfff6a (diff) | |
| download | ouroboros-69ef99bb2dc05337e8189acc42dc9122f4182ead.tar.gz ouroboros-69ef99bb2dc05337e8189acc42dc9122f4182ead.zip | |
ipcpd: normal: First version of the fast path bootstrap
This is the first version of the fast path bootstrap in the normal
IPCP. It sets up a connection with the remote IPCP and creates the
appropriate data structures. SDUs on the N+1 and N-1 flows are read,
written, and passed through the appropriate components.
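The N+1 side of the fast path is a dedicated reader thread that waits on the set of allocated N+1 flows and pushes each incoming SDU into FRCT. Below is a condensed sketch of that thread (fmgr_n_reader in the fmgr.c hunk further down); locking and most error reporting are stripped, so read it as an outline rather than the exact code.

```c
static void * fmgr_n_reader(void * o)
{
        struct shm_du_buff * sdb;
        struct n_flow *      flow;
        struct timespec      timeout = {0, FD_UPDATE_TIMEOUT};
        int                  fd;

        while (true) {
                /* Wait until one of the N+1 flows has data, or time out. */
                fd = flow_select(fmgr.set, &timeout);
                if (fd < 0)
                        continue;

                /* Take the SDU out of shared memory (no copy). */
                if (ipcp_flow_read(fd, &sdb))
                        continue;

                /* Map the fd to its connection endpoint and hand off to FRCT. */
                flow = get_n_flow_by_fd(fd);
                if (flow == NULL || frct_i_write_sdu(flow->cep_id, sdb))
                        ipcp_flow_del(sdb);
        }

        return (void *) 0;
}
```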
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/config.h | 28 |
| -rw-r--r-- | src/ipcpd/normal/fmgr.c | 203 |
| -rw-r--r-- | src/ipcpd/normal/fmgr.h | 8 |
| -rw-r--r-- | src/ipcpd/normal/frct.c | 300 |
| -rw-r--r-- | src/ipcpd/normal/frct.h | 41 |
| -rw-r--r-- | src/ipcpd/normal/ribmgr.c | 26 |
| -rw-r--r-- | src/ipcpd/normal/ribmgr.h | 14 |
| -rw-r--r-- | src/ipcpd/normal/rmt.c | 163 |
| -rw-r--r-- | src/ipcpd/normal/rmt.h | 18 |
| -rw-r--r-- | src/ipcpd/normal/shm_pci.c | 83 |
| -rw-r--r-- | src/ipcpd/normal/shm_pci.h | 22 |
11 files changed, 697 insertions, 209 deletions
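On the sending side, frct_i_write_sdu() stamps each SDU with a freshly built PCI and hands it to the RMT, which still forwards everything on a single N-1 fd because there is no PFF yet. A condensed sketch of that path, based on the frct.c and rmt.c hunks below (locking and error logging trimmed):

```c
int frct_i_write_sdu(cep_id_t             id,
                     struct shm_du_buff * sdb)
{
        struct frct_i * instance = frct.instances[id];
        struct pci      pci;

        if (instance == NULL || instance->state != CONN_ESTABLISHED)
                return -1;

        /* Build the PCI for a data transfer PDU on this connection. */
        pci.pdu_type   = PDU_TYPE_DTP;
        pci.dst_addr   = instance->r_address;
        pci.src_addr   = frct.address;
        pci.dst_cep_id = instance->r_cep_id;
        pci.src_cep_id = instance->cep_id;
        pci.seqno      = instance->seqno++;
        pci.qos_id     = instance->cube;

        /* The RMT serializes the PCI in front of the SDU and writes it
         * on the (single, pre-PFF) N-1 fd. */
        if (rmt_frct_write_sdu(&pci, sdb))
                return -1;

        return 0;
}
```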
| diff --git a/src/ipcpd/normal/config.h b/src/ipcpd/normal/config.h new file mode 100644 index 00000000..0febf3fd --- /dev/null +++ b/src/ipcpd/normal/config.h @@ -0,0 +1,28 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Normal IPCP configuration constants + * + *    Sander Vrijders <sander.vrijders@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCP_CONFIG_H +#define OUROBOROS_IPCP_CONFIG_H + +#define FD_UPDATE_TIMEOUT 100 /* microseconds */ + +#endif diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index b6ec1984..25898661 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -27,6 +27,8 @@  #include <ouroboros/dev.h>  #include <ouroboros/list.h>  #include <ouroboros/ipcp-dev.h> +#include <ouroboros/select.h> +#include <ouroboros/errno.h>  #include <stdlib.h>  #include <stdbool.h> @@ -37,33 +39,41 @@  #include "ribmgr.h"  #include "frct.h"  #include "ipcp.h" +#include "rmt.h" +#include "shm_pci.h" +#include "config.h"  #include "flow_alloc.pb-c.h"  typedef FlowAllocMsg flow_alloc_msg_t;  struct n_flow { -        int fd; -        struct frct_i * frct_i; +        int           fd; +        cep_id_t      cep_id;          enum qos_cube qos;          struct list_head next;  };  struct n_1_flow { -        int fd; -        char * ae_name; +        int              fd; +        char *           ae_name;          struct list_head next;  };  struct { -        pthread_t listen_thread; +        pthread_t n_1_flow_acceptor; +        /* FIXME: Make this a table */          struct list_head n_1_flows;          pthread_mutex_t n_1_flows_lock; +        /* FIXME: Make this a table */          struct list_head n_flows;          /* FIXME: Make this a read/write lock */          pthread_mutex_t n_flows_lock; + +        struct flow_set * set; +        pthread_t n_reader;  } fmgr;  static int add_n_1_fd(int fd, char * ae_name) @@ -89,9 +99,37 @@ static int add_n_1_fd(int fd, char * ae_name)          return 0;  } -static void * fmgr_listen(void * o) +/* Call under n_flows lock */ +static struct n_flow * get_n_flow_by_fd(int fd)  { -        int fd; +        struct list_head * pos = NULL; + +        list_for_each(pos, &fmgr.n_flows) { +                struct n_flow * e = list_entry(pos, struct n_flow, next); +                if (e->fd == fd) +                        return e; +        } + +        return NULL; +} + +/* Call under n_flows lock */ +static struct n_flow * get_n_flow_by_cep_id(cep_id_t cep_id) +{ +        struct list_head * pos = NULL; + +        list_for_each(pos, &fmgr.n_flows) { +                struct n_flow * e = list_entry(pos, struct n_flow, next); +                if (e->cep_id == cep_id) +                        return e; +        } + +        return NULL; +} + +static void * fmgr_n_1_acceptor(void * o) +{ +        int    fd;          char * ae_name;          
while (true) { @@ -139,7 +177,7 @@ static void * fmgr_listen(void * o)                  if (strcmp(ae_name, DT_AE) == 0) {                          /* FIXME: Pass correct QoS cube */ -                        if (frct_dt_flow(fd, 0)) { +                        if (rmt_dt_flow(fd, 0)) {                                  LOG_ERR("Failed to hand fd to FRCT.");                                  flow_dealloc(fd);                                  continue; @@ -156,6 +194,49 @@ static void * fmgr_listen(void * o)          return (void *) 0;  } +static void * fmgr_n_reader(void * o) +{ +        struct shm_du_buff * sdb; +        struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; +        struct n_flow * flow; + +        while (true) { +                int fd = flow_select(fmgr.set, &timeout); +                if (fd == -ETIMEDOUT) +                        continue; + +                if (fd < 0) { +                        LOG_ERR("Failed to get active fd."); +                        continue; +                } + +                if (ipcp_flow_read(fd, &sdb)) { +                        LOG_ERR("Failed to read SDU from fd %d.", fd); +                        continue; +                } + +                pthread_mutex_lock(&fmgr.n_flows_lock); +                flow = get_n_flow_by_fd(fd); +                if (flow == NULL) { +                        pthread_mutex_unlock(&fmgr.n_flows_lock); +                        ipcp_flow_del(sdb); +                        LOG_ERR("Failed to retrieve flow."); +                        continue; +                } + +                if (frct_i_write_sdu(flow->cep_id, sdb)) { +                        pthread_mutex_unlock(&fmgr.n_flows_lock); +                        ipcp_flow_del(sdb); +                        LOG_ERR("Failed to hand SDU to FRCT."); +                        continue; +                } + +                pthread_mutex_unlock(&fmgr.n_flows_lock); +        } + +        return (void *) 0; +} +  int fmgr_init()  {          INIT_LIST_HEAD(&fmgr.n_1_flows); @@ -164,7 +245,12 @@ int fmgr_init()          pthread_mutex_init(&fmgr.n_1_flows_lock, NULL);          pthread_mutex_init(&fmgr.n_flows_lock, NULL); -        pthread_create(&fmgr.listen_thread, NULL, fmgr_listen, NULL); +        fmgr.set = flow_set_create(); +        if (fmgr.set == NULL) +                return -1; + +        pthread_create(&fmgr.n_1_flow_acceptor, NULL, fmgr_n_1_acceptor, NULL); +        pthread_create(&fmgr.n_reader, NULL, fmgr_n_reader, NULL);          return 0;  } @@ -173,9 +259,11 @@ int fmgr_fini()  {          struct list_head * pos = NULL; -        pthread_cancel(fmgr.listen_thread); +        pthread_cancel(fmgr.n_1_flow_acceptor); +        pthread_cancel(fmgr.n_reader); -        pthread_join(fmgr.listen_thread, NULL); +        pthread_join(fmgr.n_1_flow_acceptor, NULL); +        pthread_join(fmgr.n_reader, NULL);          list_for_each(pos, &fmgr.n_1_flows) {                  struct n_1_flow * e = list_entry(pos, struct n_1_flow, next); @@ -188,6 +276,8 @@ int fmgr_fini()          pthread_mutex_destroy(&fmgr.n_1_flows_lock);          pthread_mutex_destroy(&fmgr.n_flows_lock); +        flow_set_destroy(fmgr.set); +          return 0;  } @@ -259,7 +349,7 @@ int fmgr_dt_flow(char * dst_name, enum qos_cube qos)                  return -1;          } -        if (frct_dt_flow(fd, qos)) { +        if (rmt_dt_flow(fd, qos)) {                  LOG_ERR("Failed to hand file descriptor to FRCT");                  flow_dealloc(fd);                  free(ae_name); @@ -276,41 +366,13 @@ int 
fmgr_dt_flow(char * dst_name, enum qos_cube qos)          return 0;  } -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_fd(int fd) -{ -        struct list_head * pos = NULL; - -        list_for_each(pos, &fmgr.n_flows) { -                struct n_flow * e = list_entry(pos, struct n_flow, next); -                if (e->fd == fd) -                        return e; -        } - -        return NULL; -} - -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i) -{ -        struct list_head * pos = NULL; - -        list_for_each(pos, &fmgr.n_flows) { -                struct n_flow * e = list_entry(pos, struct n_flow, next); -                if (e->frct_i == frct_i) -                        return e; -        } - -        return NULL; -} -  int fmgr_flow_alloc(int           fd,                      char *        dst_ap_name,                      char *        src_ae_name,                      enum qos_cube qos)  {          struct n_flow * flow; -        struct frct_i * frct_i; +        cep_id_t cep_id;          uint32_t address = 0;          buffer_t buf;          flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; @@ -343,8 +405,8 @@ int fmgr_flow_alloc(int           fd,          pthread_mutex_lock(&fmgr.n_flows_lock); -        frct_i = frct_i_create(address, &buf, qos); -        if (frct_i == NULL) { +        cep_id = frct_i_create(address, &buf, qos); +        if (cep_id == INVALID_CEP_ID) {                  free(buf.data);                  free(flow);                  pthread_mutex_unlock(&fmgr.n_flows_lock); @@ -354,7 +416,7 @@ int fmgr_flow_alloc(int           fd,          free(buf.data);          flow->fd     = fd; -        flow->frct_i = frct_i; +        flow->cep_id = cep_id;          flow->qos    = qos;          INIT_LIST_HEAD(&flow->next); @@ -374,6 +436,8 @@ static int n_flow_dealloc(int fd)          buffer_t buf;          int ret; +        flow_set_del(fmgr.set, fd); +          flow = get_n_flow_by_fd(fd);          if (flow == NULL)                  return -1; @@ -390,7 +454,7 @@ static int n_flow_dealloc(int fd)          flow_alloc_msg__pack(&msg, buf.data); -        ret = frct_i_destroy(flow->frct_i, &buf); +        ret = frct_i_destroy(flow->cep_id, &buf);          list_del(&flow->next);          free(flow); @@ -432,13 +496,16 @@ int fmgr_flow_alloc_resp(int fd, int response)          flow_alloc_msg__pack(&msg, buf.data);          if (response < 0) { -                frct_i_destroy(flow->frct_i, &buf); +                frct_i_destroy(flow->cep_id, &buf);                  free(buf.data);                  list_del(&flow->next);                  free(flow); -        } else if (frct_i_accept(flow->frct_i, &buf)) { -                pthread_mutex_unlock(&fmgr.n_flows_lock); -                return -1; +        } else { +                if (frct_i_accept(flow->cep_id, &buf, flow->qos)) { +                        pthread_mutex_unlock(&fmgr.n_flows_lock); +                        return -1; +                } +                flow_set_add(fmgr.set, fd);          }          pthread_mutex_unlock(&fmgr.n_flows_lock); @@ -457,7 +524,8 @@ int fmgr_flow_dealloc(int fd)          return ret;  } -int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) +int fmgr_frct_post_buf(cep_id_t   cep_id, +                       buffer_t * buf)  {          struct n_flow * flow;          int ret = 0; @@ -484,7 +552,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)                          return -1;                  } -          
      flow->frct_i = frct_i; +                flow->cep_id = cep_id;                  flow->qos = msg->qos_cube;                  fd = ipcp_flow_req_arr(getpid(), @@ -505,7 +573,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)                  list_add(&flow->next, &fmgr.n_flows);                  break;          case FLOW_ALLOC_CODE__FLOW_REPLY: -                flow = get_n_flow_by_frct_i(frct_i); +                flow = get_n_flow_by_cep_id(cep_id);                  if (flow == NULL) {                          pthread_mutex_unlock(&fmgr.n_flows_lock);                          flow_alloc_msg__free_unpacked(msg, NULL); @@ -517,11 +585,13 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)                  if (msg->response < 0) {                          list_del(&flow->next);                          free(flow); +                } else { +                        flow_set_add(fmgr.set, flow->fd);                  }                  break;          case FLOW_ALLOC_CODE__FLOW_DEALLOC: -                flow = get_n_flow_by_frct_i(frct_i); +                flow = get_n_flow_by_cep_id(cep_id);                  if (flow == NULL) {                          pthread_mutex_unlock(&fmgr.n_flows_lock);                          flow_alloc_msg__free_unpacked(msg, NULL); @@ -529,6 +599,8 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)                          return -1;                  } +                flow_set_del(fmgr.set, flow->fd); +                  ret = flow_dealloc(flow->fd);                  break;          default: @@ -543,3 +615,28 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)          return ret;  } + +int fmgr_frct_post_sdu(cep_id_t             cep_id, +                       struct shm_du_buff * sdb) +{ +        struct n_flow * flow; + +        pthread_mutex_lock(&fmgr.n_flows_lock); + +        flow = get_n_flow_by_cep_id(cep_id); +        if (flow == NULL) { +                pthread_mutex_unlock(&fmgr.n_flows_lock); +                LOG_ERR("Failed to find N flow."); +                return -1; +        } + +        if (ipcp_flow_write(flow->fd, sdb)) { +                pthread_mutex_unlock(&fmgr.n_flows_lock); +                LOG_ERR("Failed to hand SDU to N flow."); +                return -1; +        } + +        pthread_mutex_unlock(&fmgr.n_flows_lock); + +        return 0; +} diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index 7e3ef5f4..0f2cd045 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -35,7 +35,6 @@  #define DT_AE "Data transfer"  int fmgr_init(); -  int fmgr_fini();  /* N-flow ops */ @@ -56,8 +55,11 @@ int fmgr_flow_alloc_resp(int fd,  int fmgr_flow_dealloc(int fd);  /* N+1-flow ops, remote */ -int fmgr_flow_alloc_msg(struct frct_i * frct_i, -                        buffer_t *      buf); +int fmgr_frct_post_buf(cep_id_t   id, +                       buffer_t * buf); +/* SDU for N+1-flow */ +int fmgr_frct_post_sdu(cep_id_t             id, +                       struct shm_du_buff * sdb);  #endif diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index 417815b7..abbde779 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -22,8 +22,6 @@  #define OUROBOROS_PREFIX "flow-rtx-control" -#define IDS_SIZE 2048 -  #include <ouroboros/config.h>  #include <ouroboros/logs.h>  #include <ouroboros/bitmap.h> @@ -34,7 +32,8 @@  #include <pthread.h>  #include "frct.h" - +#include "rmt.h" +#include "fmgr.h"  enum conn_state {          
CONN_PENDING = 0, @@ -45,29 +44,29 @@ struct frct_i {          uint32_t cep_id;          uint32_t r_address;          uint32_t r_cep_id; +        enum qos_cube cube; +        uint64_t seqno;          enum conn_state state; -        struct list_head next;  }; -struct frct { -        struct dt_const * dtc; +struct {          uint32_t address; -        struct list_head instances;          pthread_mutex_t instances_lock; +        struct frct_i ** instances;          struct bmp * cep_ids;          pthread_mutex_t cep_ids_lock; -} * frct = NULL; +} frct;  static int next_cep_id()  {          int ret; -        pthread_mutex_lock(&frct->cep_ids_lock); -        ret = bmp_allocate(frct->cep_ids); -        pthread_mutex_unlock(&frct->cep_ids_lock); +        pthread_mutex_lock(&frct.cep_ids_lock); +        ret = bmp_allocate(frct.cep_ids); +        pthread_mutex_unlock(&frct.cep_ids_lock);          return ret;  } @@ -76,40 +75,34 @@ static int release_cep_id(int id)  {          int ret; -        pthread_mutex_lock(&frct->cep_ids_lock); -        ret = bmp_release(frct->cep_ids, id); -        pthread_mutex_unlock(&frct->cep_ids_lock); +        pthread_mutex_lock(&frct.cep_ids_lock); +        ret = bmp_release(frct.cep_ids, id); +        pthread_mutex_unlock(&frct.cep_ids_lock);          return ret;  } -int frct_init(struct dt_const * dtc, uint32_t address) +int frct_init(uint32_t address)  { -        if (dtc == NULL) -                return -1; +        int i; +        frct.address = address; -        frct = malloc(sizeof(*frct)); -        if (frct == NULL) +        if (pthread_mutex_init(&frct.cep_ids_lock, NULL))                  return -1; -        frct->dtc = dtc; -        frct->address = address; - -        INIT_LIST_HEAD(&frct->instances); - -        if (pthread_mutex_init(&frct->cep_ids_lock, NULL)) { -                free(frct); +        if (pthread_mutex_init(&frct.instances_lock, NULL))                  return -1; -        } -        if (pthread_mutex_init(&frct->instances_lock, NULL)) { -                free(frct); +        frct.instances = malloc(sizeof(*(frct.instances)) * IRMD_MAX_FLOWS); +        if (frct.instances == NULL)                  return -1; -        } -        frct->cep_ids = bmp_create(IDS_SIZE, 0); -        if (frct->cep_ids == NULL) { -                free(frct); +        for (i = 0; i < IRMD_MAX_FLOWS; i++) +                frct.instances[i] = NULL; + +        frct.cep_ids = bmp_create(IRMD_MAX_FLOWS, (INVALID_CEP_ID + 1)); +        if (frct.cep_ids == NULL) { +                free(frct.instances);                  return -1;          } @@ -118,117 +111,246 @@ int frct_init(struct dt_const * dtc, uint32_t address)  int frct_fini()  { -        pthread_mutex_lock(&frct->cep_ids_lock); -        bmp_destroy(frct->cep_ids); -        pthread_mutex_unlock(&frct->cep_ids_lock); -        free(frct); +        pthread_mutex_lock(&frct.cep_ids_lock); +        bmp_destroy(frct.cep_ids); +        pthread_mutex_unlock(&frct.cep_ids_lock); + +        free(frct.instances);          return 0;  } -struct dt_const * frct_dt_const() +static struct frct_i * create_frct_i(uint32_t address, +                                     cep_id_t r_cep_id)  { -        if (frct == NULL) +        struct frct_i * instance; +        cep_id_t        id; + +        instance = malloc(sizeof(*instance)); +        if (instance == NULL)                  return NULL; -        return frct->dtc; -} +        id = next_cep_id(); +        instance->r_address = address; +        instance->cep_id = id; +        
instance->r_cep_id = r_cep_id; +        instance->state = CONN_PENDING; +        instance->seqno = 0; -int frct_dt_flow(int fd, -                 enum qos_cube qos) -{ -        LOG_MISSING; +        frct.instances[id] = instance; -        return -1; +        return instance;  } -int frct_rmt_post() +int frct_rmt_post_sdu(struct pci * pci, +                      struct shm_du_buff * sdb)  { -        LOG_MISSING; +        struct frct_i * instance; +        buffer_t buf; +        cep_id_t id; -        return -1; +        if (pci == NULL || sdb == NULL) +                return -1; + +        if (pci->dst_cep_id == INVALID_CEP_ID) { +                pthread_mutex_lock(&frct.instances_lock); +                instance = create_frct_i(pci->src_addr, +                                         pci->src_cep_id); +                if (instance == NULL) { +                        pthread_mutex_unlock(&frct.instances_lock); +                        return -1; +                } +                id = instance->cep_id; +                instance->r_cep_id = pci->src_cep_id; +                pthread_mutex_unlock(&frct.instances_lock); + +                buf.len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +                buf.data = shm_du_buff_head(sdb); + +                if (fmgr_frct_post_buf(id, &buf)) { +                        LOG_ERR("Failed to hand buffer to FMGR."); +                        free(pci); +                        return -1; +                } +        } else { +                /* FIXME: Known cep-ids are delivered to FMGR (minimal DTP) */ +                if (fmgr_frct_post_sdu(pci->dst_cep_id, sdb)) { +                        LOG_ERR("Failed to hand SDU to FMGR."); +                        free(pci); +                        return -1; +                } +        } + +        free(pci); + +        return 0;  }  /* Call under instances lock */  static void destroy_frct_i(struct frct_i * instance)  {          release_cep_id(instance->cep_id); -        list_del(&instance->next); +        frct.instances[instance->cep_id] = NULL;          free(instance);  } -struct frct_i * frct_i_create(uint32_t      address, -                              buffer_t *    buf, -                              enum qos_cube cube) +cep_id_t frct_i_create(uint32_t      address, +                       buffer_t *    buf, +                       enum qos_cube cube)  {          struct frct_i * instance; +        struct pci pci; +        cep_id_t id; -        if (buf == NULL || -            buf->data == NULL) -                return NULL; +        if (buf == NULL || buf->data == NULL) +                return INVALID_CEP_ID; -        instance = malloc(sizeof(*instance)); -        if (instance == NULL) -                return NULL; - -        pthread_mutex_lock(&frct->instances_lock); - -        instance->r_address = address; -        instance->cep_id = next_cep_id(); -        instance->state = CONN_PENDING; +        pthread_mutex_lock(&frct.instances_lock); +        instance = create_frct_i(address, INVALID_CEP_ID); +        if (instance == NULL) { +                pthread_mutex_unlock(&frct.instances_lock); +                return INVALID_CEP_ID; +        } +        id = instance->cep_id; +        instance->cube = cube; +        pthread_mutex_unlock(&frct.instances_lock); + +        pci.pdu_type = PDU_TYPE_MGMT; +        pci.dst_addr = address; +        pci.src_addr = frct.address; +        pci.dst_cep_id = 0; +        pci.src_cep_id = id; +        pci.seqno = 0; +        pci.qos_id = cube; + +        if 
(rmt_frct_write_buf(&pci, buf)) { +                free(instance); +                LOG_ERR("Failed to hand PDU to RMT."); +                return INVALID_CEP_ID; +        } -        INIT_LIST_HEAD(&instance->next); -        list_add(&instance->next, &frct->instances); +        return id; +} -        pthread_mutex_unlock(&frct->instances_lock); +int frct_i_accept(cep_id_t       id, +                  buffer_t *     buf, +                  enum qos_cube  cube) +{ +        struct pci pci; +        struct frct_i * instance; -        /* FIXME: Pack into FRCT header and hand SDU to RMT */ +        if (buf == NULL || buf->data == NULL) +                return -1; -        return instance; -} +        pthread_mutex_lock(&frct.instances_lock); -int frct_i_accept(struct frct_i * instance, -                  buffer_t *      buf) -{ -        if (instance == NULL || buf == NULL || buf->data == NULL) +        instance = frct.instances[id]; +        if (instance == NULL) { +                pthread_mutex_unlock(&frct.instances_lock); +                LOG_ERR("Invalid instance.");                  return -1; +        } -        pthread_mutex_lock(&frct->instances_lock);          if (instance->state != CONN_PENDING) { -                pthread_mutex_unlock(&frct->instances_lock); +                pthread_mutex_unlock(&frct.instances_lock);                  return -1;          }          instance->state = CONN_ESTABLISHED; -        instance->cep_id = next_cep_id(); +        instance->cube = cube; +        instance->seqno = 0; -        pthread_mutex_unlock(&frct->instances_lock); +        pci.pdu_type = PDU_TYPE_MGMT; +        pci.dst_addr = instance->r_address; +        pci.src_addr = frct.address; +        pci.dst_cep_id = instance->r_cep_id; +        pci.src_cep_id = instance->cep_id; +        pci.seqno = 0; +        pci.qos_id = cube; -        /* FIXME: Pack into FRCT header and hand SDU to RMT */ +        pthread_mutex_unlock(&frct.instances_lock); + +        if (rmt_frct_write_buf(&pci, buf)) +                return -1;          return 0;  } -int frct_i_destroy(struct frct_i * instance, -                   buffer_t *      buf) +int frct_i_destroy(cep_id_t   id, +                   buffer_t * buf)  { -        if (instance == NULL) -                return -1; +        struct pci pci; +        struct frct_i * instance; + +        pthread_mutex_lock(&frct.instances_lock); -        pthread_mutex_lock(&frct->instances_lock); +        instance = frct.instances[id]; +        if (instance == NULL) { +                pthread_mutex_unlock(&frct.instances_lock); +                LOG_ERR("Invalid instance."); +                return -1; +        }          if (!(instance->state == CONN_PENDING ||                instance->state == CONN_ESTABLISHED)) { -                pthread_mutex_unlock(&frct->instances_lock); +                pthread_mutex_unlock(&frct.instances_lock);                  return -1;          } +        pci.pdu_type = PDU_TYPE_MGMT; +        pci.dst_addr = instance->r_address; +        pci.src_addr = frct.address; +        pci.dst_cep_id = instance->r_cep_id; +        pci.src_cep_id = instance->cep_id; +        pci.seqno = 0; +        pci.qos_id = instance->cube; +          destroy_frct_i(instance); -        pthread_mutex_unlock(&frct->instances_lock); +        pthread_mutex_unlock(&frct.instances_lock); + +        if (buf != NULL && buf->data != NULL) +                if (rmt_frct_write_buf(&pci, buf)) +                        return -1; + +        return 0; +} -        if (buf != NULL && buf->data 
!= NULL) { +int frct_i_write_sdu(cep_id_t             id, +                     struct shm_du_buff * sdb) +{ +        struct pci pci; +        struct frct_i * instance; + +        if (sdb == NULL) +                return -1; -                /* FIXME: Pack into FRCT header and hand SDU to RMT */ +        pthread_mutex_lock(&frct.instances_lock); + +        instance = frct.instances[id]; +        if (instance == NULL) { +                pthread_mutex_unlock(&frct.instances_lock); +                LOG_ERR("Invalid instance."); +                return -1; +        } + +        if (instance->state != CONN_ESTABLISHED) { +                pthread_mutex_unlock(&frct.instances_lock); +                LOG_ERR("Connection is not established."); +                return -1; +        } + +        pci.pdu_type = PDU_TYPE_DTP; +        pci.dst_addr = instance->r_address; +        pci.src_addr = frct.address; +        pci.dst_cep_id = instance->r_cep_id; +        pci.src_cep_id = instance->cep_id; +        pci.seqno = (instance->seqno)++; +        pci.qos_id = instance->cube; + +        if (rmt_frct_write_sdu(&pci, sdb)) { +                pthread_mutex_unlock(&frct.instances_lock); +                LOG_ERR("Failed to hand SDU to RMT."); +                return -1;          }          return 0; diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h index 0ee87004..2b86f5bd 100644 --- a/src/ipcpd/normal/frct.h +++ b/src/ipcpd/normal/frct.h @@ -26,34 +26,29 @@  #include <ouroboros/shared.h>  #include <ouroboros/utils.h> -#include "dt_const.h" +#include "shm_pci.h"  struct frct_i; -int               frct_init(struct dt_const * dtc, -                            uint32_t address); -int               frct_fini(); +int         frct_init(uint32_t address); +int         frct_fini(); -struct dt_const * frct_dt_const(); +/* Called by RMT upon receipt of a PDU for us */ +int         frct_rmt_post_sdu(struct pci *         pci, +                              struct shm_du_buff * sdb); -int               frct_dt_flow(int fd, -                               enum qos_cube qos); +cep_id_t    frct_i_create(uint32_t      address, +                          buffer_t *    buf, +                          enum qos_cube cube); -/* - * FIXME: Will take the index in the DU map, - * possibly cep-ids and address - */ -int                frct_rmt_post(); - -struct frct_i *    frct_i_create(uint32_t      address, -                                 buffer_t *    buf, -                                 enum qos_cube cube); -/* FIXME: Hand QoS cube here too? We received it in the flow alloc message. 
*/ -int                frct_i_accept(struct frct_i * instance, -                                 buffer_t *      buf); -int                frct_i_destroy(struct frct_i * instance, -                                  buffer_t *      buf); - -/* FIXME: Add read/write ops for frct instances */ +int         frct_i_accept(cep_id_t      id, +                          buffer_t *    buf, +                          enum qos_cube cube); + +int         frct_i_destroy(cep_id_t   id, +                           buffer_t * buf); + +int         frct_i_write_sdu(cep_id_t             id, +                             struct shm_du_buff * sdb);  #endif diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 99d156f5..dd17f9bd 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -39,6 +39,7 @@  #include "frct.h"  #include "ipcp.h"  #include "cdap_request.h" +#include "rmt.h"  #include "static_info.pb-c.h"  typedef StaticInfoMsg static_info_msg_t; @@ -241,7 +242,7 @@ int ribmgr_cdap_write(struct cdap * instance,                  rib.address = msg->address; -                if (frct_init(&rib.dtc, rib.address)) { +                if (frct_init(rib.address)) {                          ipcp_set_state(IPCP_INIT);                          pthread_rwlock_unlock(&ipcpi.state_lock);                          cdap_send_reply(instance, invoke_id, -1, NULL, 0); @@ -250,6 +251,16 @@ int ribmgr_cdap_write(struct cdap * instance,                          return -1;                  } +                if (rmt_init(rib.address)) { +                        ipcp_set_state(IPCP_INIT); +                        pthread_rwlock_unlock(&ipcpi.state_lock); +                        frct_fini(); +                        cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                        static_info_msg__free_unpacked(msg, NULL); +                        LOG_ERR("Failed to init RMT"); +                        return -1; +                } +                  static_info_msg__free_unpacked(msg, NULL);          } else {                  ret = -1; @@ -529,12 +540,23 @@ int ribmgr_bootstrap(struct dif_config * conf)          /* FIXME: Set correct address. 
*/          rib.address = 0; -        if (frct_init(&rib.dtc, rib.address)) { +        if (frct_init(rib.address)) {                  LOG_ERR("Failed to initialize FRCT.");                  return -1;          } +        if (rmt_init(rib.address)) { +                LOG_ERR("Failed to initialize RMT."); +                frct_fini(); +                return -1; +        } +          LOG_DBG("Bootstrapped RIB Manager.");          return 0;  } + +struct dt_const * ribmgr_dt_const() +{ +        return &(rib.dtc); +} diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h index e85c65be..f776f7eb 100644 --- a/src/ipcpd/normal/ribmgr.h +++ b/src/ipcpd/normal/ribmgr.h @@ -25,12 +25,16 @@  #include <ouroboros/irm_config.h> -int ribmgr_init(); -int ribmgr_fini(); +#include "dt_const.h" -int ribmgr_add_flow(int fd); -int ribmgr_remove_flow(int fd); +int               ribmgr_init(); +int               ribmgr_fini(); -int ribmgr_bootstrap(struct dif_config * conf); +int               ribmgr_add_flow(int fd); +int               ribmgr_remove_flow(int fd); + +int               ribmgr_bootstrap(struct dif_config * conf); + +struct dt_const * ribmgr_dt_const();  #endif diff --git a/src/ipcpd/normal/rmt.c b/src/ipcpd/normal/rmt.c index ee92c3e3..fa4c7edd 100644 --- a/src/ipcpd/normal/rmt.c +++ b/src/ipcpd/normal/rmt.c @@ -24,29 +24,172 @@  #include <ouroboros/config.h>  #include <ouroboros/logs.h> +#include <ouroboros/select.h> +#include <ouroboros/ipcp-dev.h> +#include <ouroboros/errno.h> +#include <ouroboros/dev.h> + +#include <stdlib.h>  #include "rmt.h" +#include "config.h" +#include "frct.h" + +struct { +        pthread_t         sdu_reader; +        struct flow_set * set; +        uint32_t          address; -struct rmt { -}; +        /* +         * FIXME: Normally the PFF is held here, +         * for now we keep 1 fd to forward a PDU on +         */ +        int fd; +} rmt; -int rmt_init(struct dt_const * dtc) +int rmt_init(uint32_t address)  { -        LOG_MISSING; +        rmt.set = flow_set_create(); +        if (rmt.set == NULL) +                return -1; -        return -1; +        rmt.address = address; + +        return 0;  }  int rmt_fini()  { -        LOG_MISSING; +        flow_set_destroy(rmt.set); + +        return 0; +} + +void * rmt_sdu_reader(void * o) +{ +        struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; +        struct shm_du_buff * sdb; +        struct pci * pci; + +        while (true) { +                int fd = flow_select(rmt.set, &timeout); +                if (fd == -ETIMEDOUT) +                        continue; + +                if (fd < 0) { +                        LOG_ERR("Failed to get active fd."); +                        continue; +                } + +                if (ipcp_flow_read(fd, &sdb)) { +                        LOG_ERR("Failed to read SDU from fd %d.", fd); +                        continue; +                } + +                pci = shm_pci_des(sdb); +                if (pci == NULL) { +                        LOG_ERR("Failed to get PCI."); +                        ipcp_flow_del(sdb); +                        continue; +                } + +                if (pci->dst_addr != rmt.address) { +                        LOG_DBG("PDU needs to be forwarded."); + +                        if (pci->ttl == 0) { +                                LOG_DBG("TTL was zero."); +                                ipcp_flow_del(sdb); +                                free(pci); +                                continue; +                        } + +       
                 if (shm_pci_dec_ttl(sdb)) { +                                LOG_ERR("Failed to decrease TTL."); +                                ipcp_flow_del(sdb); +                                free(pci); +                                continue; +                        } +                        /* +                         * FIXME: Dropping for now, since +                         * we don't have a PFF yet +                         */ +                        ipcp_flow_del(sdb); +                        free(pci); +                        continue; +                } + +                if (shm_pci_shrink(sdb)) { +                        LOG_ERR("Failed to shrink PDU."); +                        ipcp_flow_del(sdb); +                        free(pci); +                        continue; +                } + +                if (frct_rmt_post_sdu(pci, sdb)) { +                        LOG_ERR("Failed to hand PDU to FRCT."); +                        ipcp_flow_del(sdb); +                        free(pci); +                        continue; +                } +        } + +        return (void *) 0; +} + +int rmt_dt_flow(int           fd, +                enum qos_cube qos) +{ +        struct flow_set * set = rmt.set; +        if (set == NULL) +                return -1; + +        flow_set_add(set, fd); -        return -1; +        /* FIXME: This will be removed once we have a PFF */ +        rmt.fd = fd; + +        return 0;  } -int rmt_frct_post() +int rmt_frct_write_sdu(struct pci *         pci, +                       struct shm_du_buff * sdb)  { -        LOG_MISSING; +        if (shm_pci_ser(sdb, pci)) { +                LOG_ERR("Failed to serialize PDU."); +                ipcp_flow_del(sdb); +                return -1; +        } + +        if (ipcp_flow_write(rmt.fd, sdb)) { +                LOG_ERR("Failed to write SDU to fd %d.", rmt.fd); +                ipcp_flow_del(sdb); +                return -1; +        } + +        return 0; +} + +int rmt_frct_write_buf(struct pci * pci, +                       buffer_t *   buf) +{ +        buffer_t * buffer; + +        if (pci == NULL || buf == NULL || buf->data == NULL) +                return -1; + +        buffer = shm_pci_ser_buf(buf, pci); +        if (buffer == NULL) { +                LOG_ERR("Failed to serialize buffer."); +                free(buf->data); +                return -1; +        } + +        if (flow_write(rmt.fd, buffer->data, buffer->len) == -1) { +                LOG_ERR("Failed to write buffer to fd."); +                free(buffer); +                return -1; +        } -        return -1; +        free(buffer); +        return 0;  } diff --git a/src/ipcpd/normal/rmt.h b/src/ipcpd/normal/rmt.h index cdd86a0b..6ce7a7d7 100644 --- a/src/ipcpd/normal/rmt.h +++ b/src/ipcpd/normal/rmt.h @@ -23,12 +23,24 @@  #ifndef OUROBOROS_IPCP_RMT_H  #define OUROBOROS_IPCP_RMT_H +#include <ouroboros/shm_rdrbuff.h> +#include <ouroboros/utils.h> +  #include "dt_const.h" +#include "shm_pci.h" -int rmt_init(struct dt_const * dtc); +int rmt_init(uint32_t address);  int rmt_fini(); -/* FIXME: Will take index in DU map */ -int rmt_frct_post(); +int rmt_dt_flow(int           fd, +                enum qos_cube qos); + +/* Hand PDU to RMT, SDU from N+1 */ +int rmt_frct_write_sdu(struct pci *         pci, +                       struct shm_du_buff * sdb); + +/* Hand PDU to RMT, SDU from N */ +int rmt_frct_write_buf(struct pci * pci, +                       buffer_t *   buf);  #endif diff --git a/src/ipcpd/normal/shm_pci.c 
b/src/ipcpd/normal/shm_pci.c index 94629790..3a16a2da 100644 --- a/src/ipcpd/normal/shm_pci.c +++ b/src/ipcpd/normal/shm_pci.c @@ -32,6 +32,7 @@  #include "shm_pci.h"  #include "frct.h"  #include "crc32.h" +#include "ribmgr.h"  #define QOS_ID_SIZE 1  #define DEFAULT_TTL 60 @@ -57,23 +58,13 @@ static int shm_pci_tail_size(struct dt_const * dtc)          return dtc->has_chk ? CHK_SIZE : 0;  } -int shm_pci_ser(struct shm_du_buff * sdb, -                struct pci * pci) +static void ser_pci_head(uint8_t * head, +                         struct pci * pci, +                         struct dt_const * dtc)  { -        uint8_t * head; -        uint8_t * tail;          int offset = 0; -        struct dt_const * dtc;          uint8_t ttl = DEFAULT_TTL; -        dtc = frct_dt_const(); -        if (dtc == NULL) -                return -1; - -        head = shm_du_buff_head_alloc(sdb, shm_pci_head_size(dtc)); -        if (head == NULL) -                return -1; -          memcpy(head, &pci->dst_addr, dtc->addr_size);          offset += dtc->addr_size;          memcpy(head + offset, &pci->src_addr, dtc->addr_size); @@ -90,6 +81,24 @@ int shm_pci_ser(struct shm_du_buff * sdb,          offset += QOS_ID_SIZE;          if (dtc->has_ttl)                  memcpy(head + offset, &ttl, TTL_SIZE); +} + +int shm_pci_ser(struct shm_du_buff * sdb, +                struct pci * pci) +{ +        uint8_t * head; +        uint8_t * tail; +        struct dt_const * dtc; + +        dtc = ribmgr_dt_const(); +        if (dtc == NULL) +                return -1; + +        head = shm_du_buff_head_alloc(sdb, shm_pci_head_size(dtc)); +        if (head == NULL) +                return -1; + +        ser_pci_head(head, pci, dtc);          if (dtc->has_chk) {                  tail = shm_du_buff_tail_alloc(sdb, shm_pci_tail_size(dtc)); @@ -104,6 +113,48 @@ int shm_pci_ser(struct shm_du_buff * sdb,          return 0;  } +buffer_t * shm_pci_ser_buf(buffer_t *   buf, +                           struct pci * pci) +{ +        buffer_t * buffer; +        struct dt_const * dtc; + +        if (buf == NULL || pci == NULL) +                return NULL; + +        dtc = ribmgr_dt_const(); +        if (dtc == NULL) +                return NULL; + +        buffer = malloc(sizeof(*buffer)); +        if (buffer == NULL) +                return NULL; + +        buffer->len = buf->len + +                shm_pci_head_size(dtc) + +                shm_pci_tail_size(dtc); + +        buffer->data = malloc(buffer->len); +        if (buffer->data == NULL) { +                free(buffer); +                return NULL; +        } + +        ser_pci_head(buffer->data, pci, dtc); +        memcpy(buffer->data + shm_pci_head_size(dtc), +               buf->data, buf->len); + +        free(buf->data); + +        if (dtc->has_chk) +                crc32((uint32_t *) buffer->data + +                      shm_pci_head_size(dtc) + buf->len, +                      buffer->data, +                      shm_pci_head_size(dtc) + buf->len); + +        return buffer; +} +  struct pci * shm_pci_des(struct shm_du_buff * sdb)  {          uint8_t * head; @@ -115,7 +166,7 @@ struct pci * shm_pci_des(struct shm_du_buff * sdb)          if (head == NULL)                  return NULL; -        dtc = frct_dt_const(); +        dtc = ribmgr_dt_const();          if (dtc == NULL)                  return NULL; @@ -150,7 +201,7 @@ int shm_pci_shrink(struct shm_du_buff * sdb)          if (sdb == NULL)                  return -1; -        dtc = frct_dt_const(); +        dtc = 
ribmgr_dt_const();          if (dtc == NULL)                  return -1; @@ -174,7 +225,7 @@ int shm_pci_dec_ttl(struct shm_du_buff * sdb)          uint8_t * head;          uint8_t * tail; -        dtc = frct_dt_const(); +        dtc = ribmgr_dt_const();          if (dtc == NULL)                  return -1; diff --git a/src/ipcpd/normal/shm_pci.h b/src/ipcpd/normal/shm_pci.h index aa9770b4..2836737c 100644 --- a/src/ipcpd/normal/shm_pci.h +++ b/src/ipcpd/normal/shm_pci.h @@ -25,22 +25,34 @@  #define OUROBOROS_IPCP_SHM_PCI_H  #include <ouroboros/shm_rdrbuff.h> +#include <ouroboros/utils.h> -#include <dt_const.h> +#include "dt_const.h" + +#define PDU_TYPE_MGMT 0x40 +#define PDU_TYPE_DTP  0x80 + +typedef uint32_t cep_id_t; +#define INVALID_CEP_ID 0  struct pci { +        uint8_t  pdu_type;          uint64_t dst_addr;          uint64_t src_addr; -        uint32_t dst_cep_id; -        uint32_t src_cep_id; +        cep_id_t dst_cep_id; +        cep_id_t src_cep_id; +        uint8_t  qos_id;          uint32_t pdu_length;          uint64_t seqno; -        uint8_t  qos_id;          uint8_t  ttl; +        uint8_t  flags;  };  int          shm_pci_ser(struct shm_du_buff * sdb, -                         struct pci * pci); +                         struct pci *         pci); + +buffer_t *   shm_pci_ser_buf(buffer_t *   buf, +                             struct pci * pci);  struct pci * shm_pci_des(struct shm_du_buff * sdb); | 
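On receipt, the RMT deserializes the PCI and, for PDUs addressed to this IPCP, posts them to FRCT. FRCT demultiplexes on the destination cep_id: an invalid cep_id marks a management PDU carrying a flow allocation message, anything else is data for an established N+1 flow. A condensed sketch of frct_rmt_post_sdu() from the frct.c hunk above, with locking and memory management trimmed:

```c
int frct_rmt_post_sdu(struct pci *         pci,
                      struct shm_du_buff * sdb)
{
        struct frct_i * instance;
        buffer_t        buf;

        if (pci->dst_cep_id == INVALID_CEP_ID) {
                /* Management PDU: a new flow allocation request, so
                 * create a local connection endpoint for it. */
                instance = create_frct_i(pci->src_addr, pci->src_cep_id);
                if (instance == NULL)
                        return -1;

                /* Hand the raw payload to the flow manager, which unpacks
                 * the flow allocation message and raises the N+1 flow. */
                buf.len  = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
                buf.data = shm_du_buff_head(sdb);

                return fmgr_frct_post_buf(instance->cep_id, &buf);
        }

        /* Known destination cep_id: plain DTP, deliver to the N+1 flow. */
        return fmgr_frct_post_sdu(pci->dst_cep_id, sdb);
}
```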
