From 69ef99bb2dc05337e8189acc42dc9122f4182ead Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Sat, 8 Oct 2016 19:42:51 +0200 Subject: ipcpd: normal: First version of the fast path bootstrap This is the first version of the fast path bootstrap in the normal IPCP. It sets up a connection with the other end, and creates the appropriate data structures. N+1 and N-1 SDUs are read and written and passed through the right components. --- src/ipcpd/normal/config.h | 28 +++++ src/ipcpd/normal/fmgr.c | 203 ++++++++++++++++++++++-------- src/ipcpd/normal/fmgr.h | 8 +- src/ipcpd/normal/frct.c | 300 +++++++++++++++++++++++++++++++-------------- src/ipcpd/normal/frct.h | 41 +++---- src/ipcpd/normal/ribmgr.c | 26 +++- src/ipcpd/normal/ribmgr.h | 14 ++- src/ipcpd/normal/rmt.c | 163 ++++++++++++++++++++++-- src/ipcpd/normal/rmt.h | 18 ++- src/ipcpd/normal/shm_pci.c | 83 ++++++++++--- src/ipcpd/normal/shm_pci.h | 22 +++- 11 files changed, 697 insertions(+), 209 deletions(-) create mode 100644 src/ipcpd/normal/config.h (limited to 'src') diff --git a/src/ipcpd/normal/config.h b/src/ipcpd/normal/config.h new file mode 100644 index 00000000..0febf3fd --- /dev/null +++ b/src/ipcpd/normal/config.h @@ -0,0 +1,28 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Normal IPCP configuration constants + * + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCP_CONFIG_H +#define OUROBOROS_IPCP_CONFIG_H + +#define FD_UPDATE_TIMEOUT 100 /* microseconds */ + +#endif diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index b6ec1984..25898661 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -27,6 +27,8 @@ #include #include #include +#include +#include #include #include @@ -37,33 +39,41 @@ #include "ribmgr.h" #include "frct.h" #include "ipcp.h" +#include "rmt.h" +#include "shm_pci.h" +#include "config.h" #include "flow_alloc.pb-c.h" typedef FlowAllocMsg flow_alloc_msg_t; struct n_flow { - int fd; - struct frct_i * frct_i; + int fd; + cep_id_t cep_id; enum qos_cube qos; struct list_head next; }; struct n_1_flow { - int fd; - char * ae_name; + int fd; + char * ae_name; struct list_head next; }; struct { - pthread_t listen_thread; + pthread_t n_1_flow_acceptor; + /* FIXME: Make this a table */ struct list_head n_1_flows; pthread_mutex_t n_1_flows_lock; + /* FIXME: Make this a table */ struct list_head n_flows; /* FIXME: Make this a read/write lock */ pthread_mutex_t n_flows_lock; + + struct flow_set * set; + pthread_t n_reader; } fmgr; static int add_n_1_fd(int fd, char * ae_name) @@ -89,9 +99,37 @@ static int add_n_1_fd(int fd, char * ae_name) return 0; } -static void * fmgr_listen(void * o) +/* Call under n_flows lock */ +static struct n_flow * get_n_flow_by_fd(int fd) { - int fd; + struct list_head * pos = NULL; + + list_for_each(pos, &fmgr.n_flows) { + struct n_flow * e = list_entry(pos, struct n_flow, next); + if (e->fd == fd) + return e; + } + + return NULL; +} + +/* Call under n_flows lock */ +static struct n_flow * get_n_flow_by_cep_id(cep_id_t cep_id) +{ + struct list_head * pos = NULL; + + list_for_each(pos, &fmgr.n_flows) { + struct n_flow * e = list_entry(pos, struct n_flow, next); + if (e->cep_id == cep_id) + return e; + } + + return NULL; +} + +static void * fmgr_n_1_acceptor(void * o) +{ + int fd; char * ae_name; while (true) { @@ -139,7 +177,7 @@ static void * fmgr_listen(void * o) if (strcmp(ae_name, DT_AE) == 0) { /* FIXME: Pass correct QoS cube */ - if (frct_dt_flow(fd, 0)) { + if (rmt_dt_flow(fd, 0)) { LOG_ERR("Failed to hand fd to FRCT."); flow_dealloc(fd); continue; @@ -156,6 +194,49 @@ static void * fmgr_listen(void * o) return (void *) 0; } +static void * fmgr_n_reader(void * o) +{ + struct shm_du_buff * sdb; + struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; + struct n_flow * flow; + + while (true) { + int fd = flow_select(fmgr.set, &timeout); + if (fd == -ETIMEDOUT) + continue; + + if (fd < 0) { + LOG_ERR("Failed to get active fd."); + continue; + } + + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } + + pthread_mutex_lock(&fmgr.n_flows_lock); + flow = get_n_flow_by_fd(fd); + if (flow == NULL) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to retrieve flow."); + continue; + } + + if (frct_i_write_sdu(flow->cep_id, sdb)) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to hand SDU to FRCT."); + continue; + } + + pthread_mutex_unlock(&fmgr.n_flows_lock); + } + + return (void *) 0; +} + int fmgr_init() { INIT_LIST_HEAD(&fmgr.n_1_flows); @@ -164,7 +245,12 @@ int fmgr_init() pthread_mutex_init(&fmgr.n_1_flows_lock, NULL); pthread_mutex_init(&fmgr.n_flows_lock, NULL); - pthread_create(&fmgr.listen_thread, NULL, fmgr_listen, NULL); + fmgr.set = flow_set_create(); + if (fmgr.set == NULL) + return -1; + + pthread_create(&fmgr.n_1_flow_acceptor, NULL, fmgr_n_1_acceptor, NULL); + pthread_create(&fmgr.n_reader, NULL, fmgr_n_reader, NULL); return 0; } @@ -173,9 +259,11 @@ int fmgr_fini() { struct list_head * pos = NULL; - pthread_cancel(fmgr.listen_thread); + pthread_cancel(fmgr.n_1_flow_acceptor); + pthread_cancel(fmgr.n_reader); - pthread_join(fmgr.listen_thread, NULL); + pthread_join(fmgr.n_1_flow_acceptor, NULL); + pthread_join(fmgr.n_reader, NULL); list_for_each(pos, &fmgr.n_1_flows) { struct n_1_flow * e = list_entry(pos, struct n_1_flow, next); @@ -188,6 +276,8 @@ int fmgr_fini() pthread_mutex_destroy(&fmgr.n_1_flows_lock); pthread_mutex_destroy(&fmgr.n_flows_lock); + flow_set_destroy(fmgr.set); + return 0; } @@ -259,7 +349,7 @@ int fmgr_dt_flow(char * dst_name, enum qos_cube qos) return -1; } - if (frct_dt_flow(fd, qos)) { + if (rmt_dt_flow(fd, qos)) { LOG_ERR("Failed to hand file descriptor to FRCT"); flow_dealloc(fd); free(ae_name); @@ -276,41 +366,13 @@ int fmgr_dt_flow(char * dst_name, enum qos_cube qos) return 0; } -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_fd(int fd) -{ - struct list_head * pos = NULL; - - list_for_each(pos, &fmgr.n_flows) { - struct n_flow * e = list_entry(pos, struct n_flow, next); - if (e->fd == fd) - return e; - } - - return NULL; -} - -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i) -{ - struct list_head * pos = NULL; - - list_for_each(pos, &fmgr.n_flows) { - struct n_flow * e = list_entry(pos, struct n_flow, next); - if (e->frct_i == frct_i) - return e; - } - - return NULL; -} - int fmgr_flow_alloc(int fd, char * dst_ap_name, char * src_ae_name, enum qos_cube qos) { struct n_flow * flow; - struct frct_i * frct_i; + cep_id_t cep_id; uint32_t address = 0; buffer_t buf; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; @@ -343,8 +405,8 @@ int fmgr_flow_alloc(int fd, pthread_mutex_lock(&fmgr.n_flows_lock); - frct_i = frct_i_create(address, &buf, qos); - if (frct_i == NULL) { + cep_id = frct_i_create(address, &buf, qos); + if (cep_id == INVALID_CEP_ID) { free(buf.data); free(flow); pthread_mutex_unlock(&fmgr.n_flows_lock); @@ -354,7 +416,7 @@ int fmgr_flow_alloc(int fd, free(buf.data); flow->fd = fd; - flow->frct_i = frct_i; + flow->cep_id = cep_id; flow->qos = qos; INIT_LIST_HEAD(&flow->next); @@ -374,6 +436,8 @@ static int n_flow_dealloc(int fd) buffer_t buf; int ret; + flow_set_del(fmgr.set, fd); + flow = get_n_flow_by_fd(fd); if (flow == NULL) return -1; @@ -390,7 +454,7 @@ static int n_flow_dealloc(int fd) flow_alloc_msg__pack(&msg, buf.data); - ret = frct_i_destroy(flow->frct_i, &buf); + ret = frct_i_destroy(flow->cep_id, &buf); list_del(&flow->next); free(flow); @@ -432,13 +496,16 @@ int fmgr_flow_alloc_resp(int fd, int response) flow_alloc_msg__pack(&msg, buf.data); if (response < 0) { - frct_i_destroy(flow->frct_i, &buf); + frct_i_destroy(flow->cep_id, &buf); free(buf.data); list_del(&flow->next); free(flow); - } else if (frct_i_accept(flow->frct_i, &buf)) { - pthread_mutex_unlock(&fmgr.n_flows_lock); - return -1; + } else { + if (frct_i_accept(flow->cep_id, &buf, flow->qos)) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + return -1; + } + flow_set_add(fmgr.set, fd); } pthread_mutex_unlock(&fmgr.n_flows_lock); @@ -457,7 +524,8 @@ int fmgr_flow_dealloc(int fd) return ret; } -int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) +int fmgr_frct_post_buf(cep_id_t cep_id, + buffer_t * buf) { struct n_flow * flow; int ret = 0; @@ -484,7 +552,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) return -1; } - flow->frct_i = frct_i; + flow->cep_id = cep_id; flow->qos = msg->qos_cube; fd = ipcp_flow_req_arr(getpid(), @@ -505,7 +573,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) list_add(&flow->next, &fmgr.n_flows); break; case FLOW_ALLOC_CODE__FLOW_REPLY: - flow = get_n_flow_by_frct_i(frct_i); + flow = get_n_flow_by_cep_id(cep_id); if (flow == NULL) { pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); @@ -517,11 +585,13 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) if (msg->response < 0) { list_del(&flow->next); free(flow); + } else { + flow_set_add(fmgr.set, flow->fd); } break; case FLOW_ALLOC_CODE__FLOW_DEALLOC: - flow = get_n_flow_by_frct_i(frct_i); + flow = get_n_flow_by_cep_id(cep_id); if (flow == NULL) { pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); @@ -529,6 +599,8 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) return -1; } + flow_set_del(fmgr.set, flow->fd); + ret = flow_dealloc(flow->fd); break; default: @@ -543,3 +615,28 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) return ret; } + +int fmgr_frct_post_sdu(cep_id_t cep_id, + struct shm_du_buff * sdb) +{ + struct n_flow * flow; + + pthread_mutex_lock(&fmgr.n_flows_lock); + + flow = get_n_flow_by_cep_id(cep_id); + if (flow == NULL) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + LOG_ERR("Failed to find N flow."); + return -1; + } + + if (ipcp_flow_write(flow->fd, sdb)) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + LOG_ERR("Failed to hand SDU to N flow."); + return -1; + } + + pthread_mutex_unlock(&fmgr.n_flows_lock); + + return 0; +} diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index 7e3ef5f4..0f2cd045 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -35,7 +35,6 @@ #define DT_AE "Data transfer" int fmgr_init(); - int fmgr_fini(); /* N-flow ops */ @@ -56,8 +55,11 @@ int fmgr_flow_alloc_resp(int fd, int fmgr_flow_dealloc(int fd); /* N+1-flow ops, remote */ -int fmgr_flow_alloc_msg(struct frct_i * frct_i, - buffer_t * buf); +int fmgr_frct_post_buf(cep_id_t id, + buffer_t * buf); +/* SDU for N+1-flow */ +int fmgr_frct_post_sdu(cep_id_t id, + struct shm_du_buff * sdb); #endif diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index 417815b7..abbde779 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -22,8 +22,6 @@ #define OUROBOROS_PREFIX "flow-rtx-control" -#define IDS_SIZE 2048 - #include #include #include @@ -34,7 +32,8 @@ #include #include "frct.h" - +#include "rmt.h" +#include "fmgr.h" enum conn_state { CONN_PENDING = 0, @@ -45,29 +44,29 @@ struct frct_i { uint32_t cep_id; uint32_t r_address; uint32_t r_cep_id; + enum qos_cube cube; + uint64_t seqno; enum conn_state state; - struct list_head next; }; -struct frct { - struct dt_const * dtc; +struct { uint32_t address; - struct list_head instances; pthread_mutex_t instances_lock; + struct frct_i ** instances; struct bmp * cep_ids; pthread_mutex_t cep_ids_lock; -} * frct = NULL; +} frct; static int next_cep_id() { int ret; - pthread_mutex_lock(&frct->cep_ids_lock); - ret = bmp_allocate(frct->cep_ids); - pthread_mutex_unlock(&frct->cep_ids_lock); + pthread_mutex_lock(&frct.cep_ids_lock); + ret = bmp_allocate(frct.cep_ids); + pthread_mutex_unlock(&frct.cep_ids_lock); return ret; } @@ -76,40 +75,34 @@ static int release_cep_id(int id) { int ret; - pthread_mutex_lock(&frct->cep_ids_lock); - ret = bmp_release(frct->cep_ids, id); - pthread_mutex_unlock(&frct->cep_ids_lock); + pthread_mutex_lock(&frct.cep_ids_lock); + ret = bmp_release(frct.cep_ids, id); + pthread_mutex_unlock(&frct.cep_ids_lock); return ret; } -int frct_init(struct dt_const * dtc, uint32_t address) +int frct_init(uint32_t address) { - if (dtc == NULL) - return -1; + int i; + frct.address = address; - frct = malloc(sizeof(*frct)); - if (frct == NULL) + if (pthread_mutex_init(&frct.cep_ids_lock, NULL)) return -1; - frct->dtc = dtc; - frct->address = address; - - INIT_LIST_HEAD(&frct->instances); - - if (pthread_mutex_init(&frct->cep_ids_lock, NULL)) { - free(frct); + if (pthread_mutex_init(&frct.instances_lock, NULL)) return -1; - } - if (pthread_mutex_init(&frct->instances_lock, NULL)) { - free(frct); + frct.instances = malloc(sizeof(*(frct.instances)) * IRMD_MAX_FLOWS); + if (frct.instances == NULL) return -1; - } - frct->cep_ids = bmp_create(IDS_SIZE, 0); - if (frct->cep_ids == NULL) { - free(frct); + for (i = 0; i < IRMD_MAX_FLOWS; i++) + frct.instances[i] = NULL; + + frct.cep_ids = bmp_create(IRMD_MAX_FLOWS, (INVALID_CEP_ID + 1)); + if (frct.cep_ids == NULL) { + free(frct.instances); return -1; } @@ -118,117 +111,246 @@ int frct_init(struct dt_const * dtc, uint32_t address) int frct_fini() { - pthread_mutex_lock(&frct->cep_ids_lock); - bmp_destroy(frct->cep_ids); - pthread_mutex_unlock(&frct->cep_ids_lock); - free(frct); + pthread_mutex_lock(&frct.cep_ids_lock); + bmp_destroy(frct.cep_ids); + pthread_mutex_unlock(&frct.cep_ids_lock); + + free(frct.instances); return 0; } -struct dt_const * frct_dt_const() +static struct frct_i * create_frct_i(uint32_t address, + cep_id_t r_cep_id) { - if (frct == NULL) + struct frct_i * instance; + cep_id_t id; + + instance = malloc(sizeof(*instance)); + if (instance == NULL) return NULL; - return frct->dtc; -} + id = next_cep_id(); + instance->r_address = address; + instance->cep_id = id; + instance->r_cep_id = r_cep_id; + instance->state = CONN_PENDING; + instance->seqno = 0; -int frct_dt_flow(int fd, - enum qos_cube qos) -{ - LOG_MISSING; + frct.instances[id] = instance; - return -1; + return instance; } -int frct_rmt_post() +int frct_rmt_post_sdu(struct pci * pci, + struct shm_du_buff * sdb) { - LOG_MISSING; + struct frct_i * instance; + buffer_t buf; + cep_id_t id; - return -1; + if (pci == NULL || sdb == NULL) + return -1; + + if (pci->dst_cep_id == INVALID_CEP_ID) { + pthread_mutex_lock(&frct.instances_lock); + instance = create_frct_i(pci->src_addr, + pci->src_cep_id); + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + return -1; + } + id = instance->cep_id; + instance->r_cep_id = pci->src_cep_id; + pthread_mutex_unlock(&frct.instances_lock); + + buf.len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + buf.data = shm_du_buff_head(sdb); + + if (fmgr_frct_post_buf(id, &buf)) { + LOG_ERR("Failed to hand buffer to FMGR."); + free(pci); + return -1; + } + } else { + /* FIXME: Known cep-ids are delivered to FMGR (minimal DTP) */ + if (fmgr_frct_post_sdu(pci->dst_cep_id, sdb)) { + LOG_ERR("Failed to hand SDU to FMGR."); + free(pci); + return -1; + } + } + + free(pci); + + return 0; } /* Call under instances lock */ static void destroy_frct_i(struct frct_i * instance) { release_cep_id(instance->cep_id); - list_del(&instance->next); + frct.instances[instance->cep_id] = NULL; free(instance); } -struct frct_i * frct_i_create(uint32_t address, - buffer_t * buf, - enum qos_cube cube) +cep_id_t frct_i_create(uint32_t address, + buffer_t * buf, + enum qos_cube cube) { struct frct_i * instance; + struct pci pci; + cep_id_t id; - if (buf == NULL || - buf->data == NULL) - return NULL; + if (buf == NULL || buf->data == NULL) + return INVALID_CEP_ID; - instance = malloc(sizeof(*instance)); - if (instance == NULL) - return NULL; - - pthread_mutex_lock(&frct->instances_lock); - - instance->r_address = address; - instance->cep_id = next_cep_id(); - instance->state = CONN_PENDING; + pthread_mutex_lock(&frct.instances_lock); + instance = create_frct_i(address, INVALID_CEP_ID); + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + return INVALID_CEP_ID; + } + id = instance->cep_id; + instance->cube = cube; + pthread_mutex_unlock(&frct.instances_lock); + + pci.pdu_type = PDU_TYPE_MGMT; + pci.dst_addr = address; + pci.src_addr = frct.address; + pci.dst_cep_id = 0; + pci.src_cep_id = id; + pci.seqno = 0; + pci.qos_id = cube; + + if (rmt_frct_write_buf(&pci, buf)) { + free(instance); + LOG_ERR("Failed to hand PDU to RMT."); + return INVALID_CEP_ID; + } - INIT_LIST_HEAD(&instance->next); - list_add(&instance->next, &frct->instances); + return id; +} - pthread_mutex_unlock(&frct->instances_lock); +int frct_i_accept(cep_id_t id, + buffer_t * buf, + enum qos_cube cube) +{ + struct pci pci; + struct frct_i * instance; - /* FIXME: Pack into FRCT header and hand SDU to RMT */ + if (buf == NULL || buf->data == NULL) + return -1; - return instance; -} + pthread_mutex_lock(&frct.instances_lock); -int frct_i_accept(struct frct_i * instance, - buffer_t * buf) -{ - if (instance == NULL || buf == NULL || buf->data == NULL) + instance = frct.instances[id]; + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Invalid instance."); return -1; + } - pthread_mutex_lock(&frct->instances_lock); if (instance->state != CONN_PENDING) { - pthread_mutex_unlock(&frct->instances_lock); + pthread_mutex_unlock(&frct.instances_lock); return -1; } instance->state = CONN_ESTABLISHED; - instance->cep_id = next_cep_id(); + instance->cube = cube; + instance->seqno = 0; - pthread_mutex_unlock(&frct->instances_lock); + pci.pdu_type = PDU_TYPE_MGMT; + pci.dst_addr = instance->r_address; + pci.src_addr = frct.address; + pci.dst_cep_id = instance->r_cep_id; + pci.src_cep_id = instance->cep_id; + pci.seqno = 0; + pci.qos_id = cube; - /* FIXME: Pack into FRCT header and hand SDU to RMT */ + pthread_mutex_unlock(&frct.instances_lock); + + if (rmt_frct_write_buf(&pci, buf)) + return -1; return 0; } -int frct_i_destroy(struct frct_i * instance, - buffer_t * buf) +int frct_i_destroy(cep_id_t id, + buffer_t * buf) { - if (instance == NULL) - return -1; + struct pci pci; + struct frct_i * instance; + + pthread_mutex_lock(&frct.instances_lock); - pthread_mutex_lock(&frct->instances_lock); + instance = frct.instances[id]; + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Invalid instance."); + return -1; + } if (!(instance->state == CONN_PENDING || instance->state == CONN_ESTABLISHED)) { - pthread_mutex_unlock(&frct->instances_lock); + pthread_mutex_unlock(&frct.instances_lock); return -1; } + pci.pdu_type = PDU_TYPE_MGMT; + pci.dst_addr = instance->r_address; + pci.src_addr = frct.address; + pci.dst_cep_id = instance->r_cep_id; + pci.src_cep_id = instance->cep_id; + pci.seqno = 0; + pci.qos_id = instance->cube; + destroy_frct_i(instance); - pthread_mutex_unlock(&frct->instances_lock); + pthread_mutex_unlock(&frct.instances_lock); + + if (buf != NULL && buf->data != NULL) + if (rmt_frct_write_buf(&pci, buf)) + return -1; + + return 0; +} - if (buf != NULL && buf->data != NULL) { +int frct_i_write_sdu(cep_id_t id, + struct shm_du_buff * sdb) +{ + struct pci pci; + struct frct_i * instance; + + if (sdb == NULL) + return -1; - /* FIXME: Pack into FRCT header and hand SDU to RMT */ + pthread_mutex_lock(&frct.instances_lock); + + instance = frct.instances[id]; + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Invalid instance."); + return -1; + } + + if (instance->state != CONN_ESTABLISHED) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Connection is not established."); + return -1; + } + + pci.pdu_type = PDU_TYPE_DTP; + pci.dst_addr = instance->r_address; + pci.src_addr = frct.address; + pci.dst_cep_id = instance->r_cep_id; + pci.src_cep_id = instance->cep_id; + pci.seqno = (instance->seqno)++; + pci.qos_id = instance->cube; + + if (rmt_frct_write_sdu(&pci, sdb)) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Failed to hand SDU to RMT."); + return -1; } return 0; diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h index 0ee87004..2b86f5bd 100644 --- a/src/ipcpd/normal/frct.h +++ b/src/ipcpd/normal/frct.h @@ -26,34 +26,29 @@ #include #include -#include "dt_const.h" +#include "shm_pci.h" struct frct_i; -int frct_init(struct dt_const * dtc, - uint32_t address); -int frct_fini(); +int frct_init(uint32_t address); +int frct_fini(); -struct dt_const * frct_dt_const(); +/* Called by RMT upon receipt of a PDU for us */ +int frct_rmt_post_sdu(struct pci * pci, + struct shm_du_buff * sdb); -int frct_dt_flow(int fd, - enum qos_cube qos); +cep_id_t frct_i_create(uint32_t address, + buffer_t * buf, + enum qos_cube cube); -/* - * FIXME: Will take the index in the DU map, - * possibly cep-ids and address - */ -int frct_rmt_post(); - -struct frct_i * frct_i_create(uint32_t address, - buffer_t * buf, - enum qos_cube cube); -/* FIXME: Hand QoS cube here too? We received it in the flow alloc message. */ -int frct_i_accept(struct frct_i * instance, - buffer_t * buf); -int frct_i_destroy(struct frct_i * instance, - buffer_t * buf); - -/* FIXME: Add read/write ops for frct instances */ +int frct_i_accept(cep_id_t id, + buffer_t * buf, + enum qos_cube cube); + +int frct_i_destroy(cep_id_t id, + buffer_t * buf); + +int frct_i_write_sdu(cep_id_t id, + struct shm_du_buff * sdb); #endif diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 99d156f5..dd17f9bd 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -39,6 +39,7 @@ #include "frct.h" #include "ipcp.h" #include "cdap_request.h" +#include "rmt.h" #include "static_info.pb-c.h" typedef StaticInfoMsg static_info_msg_t; @@ -241,7 +242,7 @@ int ribmgr_cdap_write(struct cdap * instance, rib.address = msg->address; - if (frct_init(&rib.dtc, rib.address)) { + if (frct_init(rib.address)) { ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&ipcpi.state_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); @@ -250,6 +251,16 @@ int ribmgr_cdap_write(struct cdap * instance, return -1; } + if (rmt_init(rib.address)) { + ipcp_set_state(IPCP_INIT); + pthread_rwlock_unlock(&ipcpi.state_lock); + frct_fini(); + cdap_send_reply(instance, invoke_id, -1, NULL, 0); + static_info_msg__free_unpacked(msg, NULL); + LOG_ERR("Failed to init RMT"); + return -1; + } + static_info_msg__free_unpacked(msg, NULL); } else { ret = -1; @@ -529,12 +540,23 @@ int ribmgr_bootstrap(struct dif_config * conf) /* FIXME: Set correct address. */ rib.address = 0; - if (frct_init(&rib.dtc, rib.address)) { + if (frct_init(rib.address)) { LOG_ERR("Failed to initialize FRCT."); return -1; } + if (rmt_init(rib.address)) { + LOG_ERR("Failed to initialize RMT."); + frct_fini(); + return -1; + } + LOG_DBG("Bootstrapped RIB Manager."); return 0; } + +struct dt_const * ribmgr_dt_const() +{ + return &(rib.dtc); +} diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h index e85c65be..f776f7eb 100644 --- a/src/ipcpd/normal/ribmgr.h +++ b/src/ipcpd/normal/ribmgr.h @@ -25,12 +25,16 @@ #include -int ribmgr_init(); -int ribmgr_fini(); +#include "dt_const.h" -int ribmgr_add_flow(int fd); -int ribmgr_remove_flow(int fd); +int ribmgr_init(); +int ribmgr_fini(); -int ribmgr_bootstrap(struct dif_config * conf); +int ribmgr_add_flow(int fd); +int ribmgr_remove_flow(int fd); + +int ribmgr_bootstrap(struct dif_config * conf); + +struct dt_const * ribmgr_dt_const(); #endif diff --git a/src/ipcpd/normal/rmt.c b/src/ipcpd/normal/rmt.c index ee92c3e3..fa4c7edd 100644 --- a/src/ipcpd/normal/rmt.c +++ b/src/ipcpd/normal/rmt.c @@ -24,29 +24,172 @@ #include #include +#include +#include +#include +#include + +#include #include "rmt.h" +#include "config.h" +#include "frct.h" + +struct { + pthread_t sdu_reader; + struct flow_set * set; + uint32_t address; -struct rmt { -}; + /* + * FIXME: Normally the PFF is held here, + * for now we keep 1 fd to forward a PDU on + */ + int fd; +} rmt; -int rmt_init(struct dt_const * dtc) +int rmt_init(uint32_t address) { - LOG_MISSING; + rmt.set = flow_set_create(); + if (rmt.set == NULL) + return -1; - return -1; + rmt.address = address; + + return 0; } int rmt_fini() { - LOG_MISSING; + flow_set_destroy(rmt.set); + + return 0; +} + +void * rmt_sdu_reader(void * o) +{ + struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; + struct shm_du_buff * sdb; + struct pci * pci; + + while (true) { + int fd = flow_select(rmt.set, &timeout); + if (fd == -ETIMEDOUT) + continue; + + if (fd < 0) { + LOG_ERR("Failed to get active fd."); + continue; + } + + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } + + pci = shm_pci_des(sdb); + if (pci == NULL) { + LOG_ERR("Failed to get PCI."); + ipcp_flow_del(sdb); + continue; + } + + if (pci->dst_addr != rmt.address) { + LOG_DBG("PDU needs to be forwarded."); + + if (pci->ttl == 0) { + LOG_DBG("TTL was zero."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + + if (shm_pci_dec_ttl(sdb)) { + LOG_ERR("Failed to decrease TTL."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + /* + * FIXME: Dropping for now, since + * we don't have a PFF yet + */ + ipcp_flow_del(sdb); + free(pci); + continue; + } + + if (shm_pci_shrink(sdb)) { + LOG_ERR("Failed to shrink PDU."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + + if (frct_rmt_post_sdu(pci, sdb)) { + LOG_ERR("Failed to hand PDU to FRCT."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + } + + return (void *) 0; +} + +int rmt_dt_flow(int fd, + enum qos_cube qos) +{ + struct flow_set * set = rmt.set; + if (set == NULL) + return -1; + + flow_set_add(set, fd); - return -1; + /* FIXME: This will be removed once we have a PFF */ + rmt.fd = fd; + + return 0; } -int rmt_frct_post() +int rmt_frct_write_sdu(struct pci * pci, + struct shm_du_buff * sdb) { - LOG_MISSING; + if (shm_pci_ser(sdb, pci)) { + LOG_ERR("Failed to serialize PDU."); + ipcp_flow_del(sdb); + return -1; + } + + if (ipcp_flow_write(rmt.fd, sdb)) { + LOG_ERR("Failed to write SDU to fd %d.", rmt.fd); + ipcp_flow_del(sdb); + return -1; + } + + return 0; +} + +int rmt_frct_write_buf(struct pci * pci, + buffer_t * buf) +{ + buffer_t * buffer; + + if (pci == NULL || buf == NULL || buf->data == NULL) + return -1; + + buffer = shm_pci_ser_buf(buf, pci); + if (buffer == NULL) { + LOG_ERR("Failed to serialize buffer."); + free(buf->data); + return -1; + } + + if (flow_write(rmt.fd, buffer->data, buffer->len) == -1) { + LOG_ERR("Failed to write buffer to fd."); + free(buffer); + return -1; + } - return -1; + free(buffer); + return 0; } diff --git a/src/ipcpd/normal/rmt.h b/src/ipcpd/normal/rmt.h index cdd86a0b..6ce7a7d7 100644 --- a/src/ipcpd/normal/rmt.h +++ b/src/ipcpd/normal/rmt.h @@ -23,12 +23,24 @@ #ifndef OUROBOROS_IPCP_RMT_H #define OUROBOROS_IPCP_RMT_H +#include +#include + #include "dt_const.h" +#include "shm_pci.h" -int rmt_init(struct dt_const * dtc); +int rmt_init(uint32_t address); int rmt_fini(); -/* FIXME: Will take index in DU map */ -int rmt_frct_post(); +int rmt_dt_flow(int fd, + enum qos_cube qos); + +/* Hand PDU to RMT, SDU from N+1 */ +int rmt_frct_write_sdu(struct pci * pci, + struct shm_du_buff * sdb); + +/* Hand PDU to RMT, SDU from N */ +int rmt_frct_write_buf(struct pci * pci, + buffer_t * buf); #endif diff --git a/src/ipcpd/normal/shm_pci.c b/src/ipcpd/normal/shm_pci.c index 94629790..3a16a2da 100644 --- a/src/ipcpd/normal/shm_pci.c +++ b/src/ipcpd/normal/shm_pci.c @@ -32,6 +32,7 @@ #include "shm_pci.h" #include "frct.h" #include "crc32.h" +#include "ribmgr.h" #define QOS_ID_SIZE 1 #define DEFAULT_TTL 60 @@ -57,23 +58,13 @@ static int shm_pci_tail_size(struct dt_const * dtc) return dtc->has_chk ? CHK_SIZE : 0; } -int shm_pci_ser(struct shm_du_buff * sdb, - struct pci * pci) +static void ser_pci_head(uint8_t * head, + struct pci * pci, + struct dt_const * dtc) { - uint8_t * head; - uint8_t * tail; int offset = 0; - struct dt_const * dtc; uint8_t ttl = DEFAULT_TTL; - dtc = frct_dt_const(); - if (dtc == NULL) - return -1; - - head = shm_du_buff_head_alloc(sdb, shm_pci_head_size(dtc)); - if (head == NULL) - return -1; - memcpy(head, &pci->dst_addr, dtc->addr_size); offset += dtc->addr_size; memcpy(head + offset, &pci->src_addr, dtc->addr_size); @@ -90,6 +81,24 @@ int shm_pci_ser(struct shm_du_buff * sdb, offset += QOS_ID_SIZE; if (dtc->has_ttl) memcpy(head + offset, &ttl, TTL_SIZE); +} + +int shm_pci_ser(struct shm_du_buff * sdb, + struct pci * pci) +{ + uint8_t * head; + uint8_t * tail; + struct dt_const * dtc; + + dtc = ribmgr_dt_const(); + if (dtc == NULL) + return -1; + + head = shm_du_buff_head_alloc(sdb, shm_pci_head_size(dtc)); + if (head == NULL) + return -1; + + ser_pci_head(head, pci, dtc); if (dtc->has_chk) { tail = shm_du_buff_tail_alloc(sdb, shm_pci_tail_size(dtc)); @@ -104,6 +113,48 @@ int shm_pci_ser(struct shm_du_buff * sdb, return 0; } +buffer_t * shm_pci_ser_buf(buffer_t * buf, + struct pci * pci) +{ + buffer_t * buffer; + struct dt_const * dtc; + + if (buf == NULL || pci == NULL) + return NULL; + + dtc = ribmgr_dt_const(); + if (dtc == NULL) + return NULL; + + buffer = malloc(sizeof(*buffer)); + if (buffer == NULL) + return NULL; + + buffer->len = buf->len + + shm_pci_head_size(dtc) + + shm_pci_tail_size(dtc); + + buffer->data = malloc(buffer->len); + if (buffer->data == NULL) { + free(buffer); + return NULL; + } + + ser_pci_head(buffer->data, pci, dtc); + memcpy(buffer->data + shm_pci_head_size(dtc), + buf->data, buf->len); + + free(buf->data); + + if (dtc->has_chk) + crc32((uint32_t *) buffer->data + + shm_pci_head_size(dtc) + buf->len, + buffer->data, + shm_pci_head_size(dtc) + buf->len); + + return buffer; +} + struct pci * shm_pci_des(struct shm_du_buff * sdb) { uint8_t * head; @@ -115,7 +166,7 @@ struct pci * shm_pci_des(struct shm_du_buff * sdb) if (head == NULL) return NULL; - dtc = frct_dt_const(); + dtc = ribmgr_dt_const(); if (dtc == NULL) return NULL; @@ -150,7 +201,7 @@ int shm_pci_shrink(struct shm_du_buff * sdb) if (sdb == NULL) return -1; - dtc = frct_dt_const(); + dtc = ribmgr_dt_const(); if (dtc == NULL) return -1; @@ -174,7 +225,7 @@ int shm_pci_dec_ttl(struct shm_du_buff * sdb) uint8_t * head; uint8_t * tail; - dtc = frct_dt_const(); + dtc = ribmgr_dt_const(); if (dtc == NULL) return -1; diff --git a/src/ipcpd/normal/shm_pci.h b/src/ipcpd/normal/shm_pci.h index aa9770b4..2836737c 100644 --- a/src/ipcpd/normal/shm_pci.h +++ b/src/ipcpd/normal/shm_pci.h @@ -25,22 +25,34 @@ #define OUROBOROS_IPCP_SHM_PCI_H #include +#include -#include +#include "dt_const.h" + +#define PDU_TYPE_MGMT 0x40 +#define PDU_TYPE_DTP 0x80 + +typedef uint32_t cep_id_t; +#define INVALID_CEP_ID 0 struct pci { + uint8_t pdu_type; uint64_t dst_addr; uint64_t src_addr; - uint32_t dst_cep_id; - uint32_t src_cep_id; + cep_id_t dst_cep_id; + cep_id_t src_cep_id; + uint8_t qos_id; uint32_t pdu_length; uint64_t seqno; - uint8_t qos_id; uint8_t ttl; + uint8_t flags; }; int shm_pci_ser(struct shm_du_buff * sdb, - struct pci * pci); + struct pci * pci); + +buffer_t * shm_pci_ser_buf(buffer_t * buf, + struct pci * pci); struct pci * shm_pci_des(struct shm_du_buff * sdb); -- cgit v1.2.3