From 69ef99bb2dc05337e8189acc42dc9122f4182ead Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Sat, 8 Oct 2016 19:42:51 +0200 Subject: ipcpd: normal: First version of the fast path bootstrap This is the first version of the fast path bootstrap in the normal IPCP. It sets up a connection with the other end, and creates the appropriate data structures. N+1 and N-1 SDUs are read and written and passed through the right components. --- src/ipcpd/normal/config.h | 28 +++++ src/ipcpd/normal/fmgr.c | 203 ++++++++++++++++++++++-------- src/ipcpd/normal/fmgr.h | 8 +- src/ipcpd/normal/frct.c | 300 +++++++++++++++++++++++++++++++-------------- src/ipcpd/normal/frct.h | 41 +++---- src/ipcpd/normal/ribmgr.c | 26 +++- src/ipcpd/normal/ribmgr.h | 14 ++- src/ipcpd/normal/rmt.c | 163 ++++++++++++++++++++++-- src/ipcpd/normal/rmt.h | 18 ++- src/ipcpd/normal/shm_pci.c | 83 ++++++++++--- src/ipcpd/normal/shm_pci.h | 22 +++- 11 files changed, 697 insertions(+), 209 deletions(-) create mode 100644 src/ipcpd/normal/config.h diff --git a/src/ipcpd/normal/config.h b/src/ipcpd/normal/config.h new file mode 100644 index 00000000..0febf3fd --- /dev/null +++ b/src/ipcpd/normal/config.h @@ -0,0 +1,28 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Normal IPCP configuration constants + * + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCP_CONFIG_H +#define OUROBOROS_IPCP_CONFIG_H + +#define FD_UPDATE_TIMEOUT 100 /* microseconds */ + +#endif diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index b6ec1984..25898661 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -27,6 +27,8 @@ #include #include #include +#include +#include #include #include @@ -37,33 +39,41 @@ #include "ribmgr.h" #include "frct.h" #include "ipcp.h" +#include "rmt.h" +#include "shm_pci.h" +#include "config.h" #include "flow_alloc.pb-c.h" typedef FlowAllocMsg flow_alloc_msg_t; struct n_flow { - int fd; - struct frct_i * frct_i; + int fd; + cep_id_t cep_id; enum qos_cube qos; struct list_head next; }; struct n_1_flow { - int fd; - char * ae_name; + int fd; + char * ae_name; struct list_head next; }; struct { - pthread_t listen_thread; + pthread_t n_1_flow_acceptor; + /* FIXME: Make this a table */ struct list_head n_1_flows; pthread_mutex_t n_1_flows_lock; + /* FIXME: Make this a table */ struct list_head n_flows; /* FIXME: Make this a read/write lock */ pthread_mutex_t n_flows_lock; + + struct flow_set * set; + pthread_t n_reader; } fmgr; static int add_n_1_fd(int fd, char * ae_name) @@ -89,9 +99,37 @@ static int add_n_1_fd(int fd, char * ae_name) return 0; } -static void * fmgr_listen(void * o) +/* Call under n_flows lock */ +static struct n_flow * get_n_flow_by_fd(int fd) { - int fd; + struct list_head * pos = NULL; + + list_for_each(pos, &fmgr.n_flows) { + struct n_flow * e = list_entry(pos, struct n_flow, next); + if (e->fd == fd) + return e; + } + + return NULL; +} + +/* Call under n_flows lock */ +static struct n_flow * get_n_flow_by_cep_id(cep_id_t cep_id) +{ + struct list_head * pos = NULL; + + list_for_each(pos, &fmgr.n_flows) { + struct n_flow * e = list_entry(pos, struct n_flow, next); + if (e->cep_id == cep_id) + return e; + } + + return NULL; +} + +static void * fmgr_n_1_acceptor(void * o) +{ + int fd; char * ae_name; while (true) { @@ -139,7 +177,7 @@ static void * fmgr_listen(void * o) if (strcmp(ae_name, DT_AE) == 0) { /* FIXME: Pass correct QoS cube */ - if (frct_dt_flow(fd, 0)) { + if (rmt_dt_flow(fd, 0)) { LOG_ERR("Failed to hand fd to FRCT."); flow_dealloc(fd); continue; @@ -156,6 +194,49 @@ static void * fmgr_listen(void * o) return (void *) 0; } +static void * fmgr_n_reader(void * o) +{ + struct shm_du_buff * sdb; + struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; + struct n_flow * flow; + + while (true) { + int fd = flow_select(fmgr.set, &timeout); + if (fd == -ETIMEDOUT) + continue; + + if (fd < 0) { + LOG_ERR("Failed to get active fd."); + continue; + } + + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } + + pthread_mutex_lock(&fmgr.n_flows_lock); + flow = get_n_flow_by_fd(fd); + if (flow == NULL) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to retrieve flow."); + continue; + } + + if (frct_i_write_sdu(flow->cep_id, sdb)) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to hand SDU to FRCT."); + continue; + } + + pthread_mutex_unlock(&fmgr.n_flows_lock); + } + + return (void *) 0; +} + int fmgr_init() { INIT_LIST_HEAD(&fmgr.n_1_flows); @@ -164,7 +245,12 @@ int fmgr_init() pthread_mutex_init(&fmgr.n_1_flows_lock, NULL); pthread_mutex_init(&fmgr.n_flows_lock, NULL); - pthread_create(&fmgr.listen_thread, NULL, fmgr_listen, NULL); + fmgr.set = flow_set_create(); + if (fmgr.set == NULL) + return -1; + + pthread_create(&fmgr.n_1_flow_acceptor, NULL, fmgr_n_1_acceptor, NULL); + pthread_create(&fmgr.n_reader, NULL, fmgr_n_reader, NULL); return 0; } @@ -173,9 +259,11 @@ int fmgr_fini() { struct list_head * pos = NULL; - pthread_cancel(fmgr.listen_thread); + pthread_cancel(fmgr.n_1_flow_acceptor); + pthread_cancel(fmgr.n_reader); - pthread_join(fmgr.listen_thread, NULL); + pthread_join(fmgr.n_1_flow_acceptor, NULL); + pthread_join(fmgr.n_reader, NULL); list_for_each(pos, &fmgr.n_1_flows) { struct n_1_flow * e = list_entry(pos, struct n_1_flow, next); @@ -188,6 +276,8 @@ int fmgr_fini() pthread_mutex_destroy(&fmgr.n_1_flows_lock); pthread_mutex_destroy(&fmgr.n_flows_lock); + flow_set_destroy(fmgr.set); + return 0; } @@ -259,7 +349,7 @@ int fmgr_dt_flow(char * dst_name, enum qos_cube qos) return -1; } - if (frct_dt_flow(fd, qos)) { + if (rmt_dt_flow(fd, qos)) { LOG_ERR("Failed to hand file descriptor to FRCT"); flow_dealloc(fd); free(ae_name); @@ -276,41 +366,13 @@ int fmgr_dt_flow(char * dst_name, enum qos_cube qos) return 0; } -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_fd(int fd) -{ - struct list_head * pos = NULL; - - list_for_each(pos, &fmgr.n_flows) { - struct n_flow * e = list_entry(pos, struct n_flow, next); - if (e->fd == fd) - return e; - } - - return NULL; -} - -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i) -{ - struct list_head * pos = NULL; - - list_for_each(pos, &fmgr.n_flows) { - struct n_flow * e = list_entry(pos, struct n_flow, next); - if (e->frct_i == frct_i) - return e; - } - - return NULL; -} - int fmgr_flow_alloc(int fd, char * dst_ap_name, char * src_ae_name, enum qos_cube qos) { struct n_flow * flow; - struct frct_i * frct_i; + cep_id_t cep_id; uint32_t address = 0; buffer_t buf; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; @@ -343,8 +405,8 @@ int fmgr_flow_alloc(int fd, pthread_mutex_lock(&fmgr.n_flows_lock); - frct_i = frct_i_create(address, &buf, qos); - if (frct_i == NULL) { + cep_id = frct_i_create(address, &buf, qos); + if (cep_id == INVALID_CEP_ID) { free(buf.data); free(flow); pthread_mutex_unlock(&fmgr.n_flows_lock); @@ -354,7 +416,7 @@ int fmgr_flow_alloc(int fd, free(buf.data); flow->fd = fd; - flow->frct_i = frct_i; + flow->cep_id = cep_id; flow->qos = qos; INIT_LIST_HEAD(&flow->next); @@ -374,6 +436,8 @@ static int n_flow_dealloc(int fd) buffer_t buf; int ret; + flow_set_del(fmgr.set, fd); + flow = get_n_flow_by_fd(fd); if (flow == NULL) return -1; @@ -390,7 +454,7 @@ static int n_flow_dealloc(int fd) flow_alloc_msg__pack(&msg, buf.data); - ret = frct_i_destroy(flow->frct_i, &buf); + ret = frct_i_destroy(flow->cep_id, &buf); list_del(&flow->next); free(flow); @@ -432,13 +496,16 @@ int fmgr_flow_alloc_resp(int fd, int response) flow_alloc_msg__pack(&msg, buf.data); if (response < 0) { - frct_i_destroy(flow->frct_i, &buf); + frct_i_destroy(flow->cep_id, &buf); free(buf.data); list_del(&flow->next); free(flow); - } else if (frct_i_accept(flow->frct_i, &buf)) { - pthread_mutex_unlock(&fmgr.n_flows_lock); - return -1; + } else { + if (frct_i_accept(flow->cep_id, &buf, flow->qos)) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + return -1; + } + flow_set_add(fmgr.set, fd); } pthread_mutex_unlock(&fmgr.n_flows_lock); @@ -457,7 +524,8 @@ int fmgr_flow_dealloc(int fd) return ret; } -int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) +int fmgr_frct_post_buf(cep_id_t cep_id, + buffer_t * buf) { struct n_flow * flow; int ret = 0; @@ -484,7 +552,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) return -1; } - flow->frct_i = frct_i; + flow->cep_id = cep_id; flow->qos = msg->qos_cube; fd = ipcp_flow_req_arr(getpid(), @@ -505,7 +573,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) list_add(&flow->next, &fmgr.n_flows); break; case FLOW_ALLOC_CODE__FLOW_REPLY: - flow = get_n_flow_by_frct_i(frct_i); + flow = get_n_flow_by_cep_id(cep_id); if (flow == NULL) { pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); @@ -517,11 +585,13 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) if (msg->response < 0) { list_del(&flow->next); free(flow); + } else { + flow_set_add(fmgr.set, flow->fd); } break; case FLOW_ALLOC_CODE__FLOW_DEALLOC: - flow = get_n_flow_by_frct_i(frct_i); + flow = get_n_flow_by_cep_id(cep_id); if (flow == NULL) { pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); @@ -529,6 +599,8 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) return -1; } + flow_set_del(fmgr.set, flow->fd); + ret = flow_dealloc(flow->fd); break; default: @@ -543,3 +615,28 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) return ret; } + +int fmgr_frct_post_sdu(cep_id_t cep_id, + struct shm_du_buff * sdb) +{ + struct n_flow * flow; + + pthread_mutex_lock(&fmgr.n_flows_lock); + + flow = get_n_flow_by_cep_id(cep_id); + if (flow == NULL) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + LOG_ERR("Failed to find N flow."); + return -1; + } + + if (ipcp_flow_write(flow->fd, sdb)) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + LOG_ERR("Failed to hand SDU to N flow."); + return -1; + } + + pthread_mutex_unlock(&fmgr.n_flows_lock); + + return 0; +} diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index 7e3ef5f4..0f2cd045 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -35,7 +35,6 @@ #define DT_AE "Data transfer" int fmgr_init(); - int fmgr_fini(); /* N-flow ops */ @@ -56,8 +55,11 @@ int fmgr_flow_alloc_resp(int fd, int fmgr_flow_dealloc(int fd); /* N+1-flow ops, remote */ -int fmgr_flow_alloc_msg(struct frct_i * frct_i, - buffer_t * buf); +int fmgr_frct_post_buf(cep_id_t id, + buffer_t * buf); +/* SDU for N+1-flow */ +int fmgr_frct_post_sdu(cep_id_t id, + struct shm_du_buff * sdb); #endif diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index 417815b7..abbde779 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -22,8 +22,6 @@ #define OUROBOROS_PREFIX "flow-rtx-control" -#define IDS_SIZE 2048 - #include #include #include @@ -34,7 +32,8 @@ #include #include "frct.h" - +#include "rmt.h" +#include "fmgr.h" enum conn_state { CONN_PENDING = 0, @@ -45,29 +44,29 @@ struct frct_i { uint32_t cep_id; uint32_t r_address; uint32_t r_cep_id; + enum qos_cube cube; + uint64_t seqno; enum conn_state state; - struct list_head next; }; -struct frct { - struct dt_const * dtc; +struct { uint32_t address; - struct list_head instances; pthread_mutex_t instances_lock; + struct frct_i ** instances; struct bmp * cep_ids; pthread_mutex_t cep_ids_lock; -} * frct = NULL; +} frct; static int next_cep_id() { int ret; - pthread_mutex_lock(&frct->cep_ids_lock); - ret = bmp_allocate(frct->cep_ids); - pthread_mutex_unlock(&frct->cep_ids_lock); + pthread_mutex_lock(&frct.cep_ids_lock); + ret = bmp_allocate(frct.cep_ids); + pthread_mutex_unlock(&frct.cep_ids_lock); return ret; } @@ -76,40 +75,34 @@ static int release_cep_id(int id) { int ret; - pthread_mutex_lock(&frct->cep_ids_lock); - ret = bmp_release(frct->cep_ids, id); - pthread_mutex_unlock(&frct->cep_ids_lock); + pthread_mutex_lock(&frct.cep_ids_lock); + ret = bmp_release(frct.cep_ids, id); + pthread_mutex_unlock(&frct.cep_ids_lock); return ret; } -int frct_init(struct dt_const * dtc, uint32_t address) +int frct_init(uint32_t address) { - if (dtc == NULL) - return -1; + int i; + frct.address = address; - frct = malloc(sizeof(*frct)); - if (frct == NULL) + if (pthread_mutex_init(&frct.cep_ids_lock, NULL)) return -1; - frct->dtc = dtc; - frct->address = address; - - INIT_LIST_HEAD(&frct->instances); - - if (pthread_mutex_init(&frct->cep_ids_lock, NULL)) { - free(frct); + if (pthread_mutex_init(&frct.instances_lock, NULL)) return -1; - } - if (pthread_mutex_init(&frct->instances_lock, NULL)) { - free(frct); + frct.instances = malloc(sizeof(*(frct.instances)) * IRMD_MAX_FLOWS); + if (frct.instances == NULL) return -1; - } - frct->cep_ids = bmp_create(IDS_SIZE, 0); - if (frct->cep_ids == NULL) { - free(frct); + for (i = 0; i < IRMD_MAX_FLOWS; i++) + frct.instances[i] = NULL; + + frct.cep_ids = bmp_create(IRMD_MAX_FLOWS, (INVALID_CEP_ID + 1)); + if (frct.cep_ids == NULL) { + free(frct.instances); return -1; } @@ -118,117 +111,246 @@ int frct_init(struct dt_const * dtc, uint32_t address) int frct_fini() { - pthread_mutex_lock(&frct->cep_ids_lock); - bmp_destroy(frct->cep_ids); - pthread_mutex_unlock(&frct->cep_ids_lock); - free(frct); + pthread_mutex_lock(&frct.cep_ids_lock); + bmp_destroy(frct.cep_ids); + pthread_mutex_unlock(&frct.cep_ids_lock); + + free(frct.instances); return 0; } -struct dt_const * frct_dt_const() +static struct frct_i * create_frct_i(uint32_t address, + cep_id_t r_cep_id) { - if (frct == NULL) + struct frct_i * instance; + cep_id_t id; + + instance = malloc(sizeof(*instance)); + if (instance == NULL) return NULL; - return frct->dtc; -} + id = next_cep_id(); + instance->r_address = address; + instance->cep_id = id; + instance->r_cep_id = r_cep_id; + instance->state = CONN_PENDING; + instance->seqno = 0; -int frct_dt_flow(int fd, - enum qos_cube qos) -{ - LOG_MISSING; + frct.instances[id] = instance; - return -1; + return instance; } -int frct_rmt_post() +int frct_rmt_post_sdu(struct pci * pci, + struct shm_du_buff * sdb) { - LOG_MISSING; + struct frct_i * instance; + buffer_t buf; + cep_id_t id; - return -1; + if (pci == NULL || sdb == NULL) + return -1; + + if (pci->dst_cep_id == INVALID_CEP_ID) { + pthread_mutex_lock(&frct.instances_lock); + instance = create_frct_i(pci->src_addr, + pci->src_cep_id); + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + return -1; + } + id = instance->cep_id; + instance->r_cep_id = pci->src_cep_id; + pthread_mutex_unlock(&frct.instances_lock); + + buf.len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + buf.data = shm_du_buff_head(sdb); + + if (fmgr_frct_post_buf(id, &buf)) { + LOG_ERR("Failed to hand buffer to FMGR."); + free(pci); + return -1; + } + } else { + /* FIXME: Known cep-ids are delivered to FMGR (minimal DTP) */ + if (fmgr_frct_post_sdu(pci->dst_cep_id, sdb)) { + LOG_ERR("Failed to hand SDU to FMGR."); + free(pci); + return -1; + } + } + + free(pci); + + return 0; } /* Call under instances lock */ static void destroy_frct_i(struct frct_i * instance) { release_cep_id(instance->cep_id); - list_del(&instance->next); + frct.instances[instance->cep_id] = NULL; free(instance); } -struct frct_i * frct_i_create(uint32_t address, - buffer_t * buf, - enum qos_cube cube) +cep_id_t frct_i_create(uint32_t address, + buffer_t * buf, + enum qos_cube cube) { struct frct_i * instance; + struct pci pci; + cep_id_t id; - if (buf == NULL || - buf->data == NULL) - return NULL; + if (buf == NULL || buf->data == NULL) + return INVALID_CEP_ID; - instance = malloc(sizeof(*instance)); - if (instance == NULL) - return NULL; - - pthread_mutex_lock(&frct->instances_lock); - - instance->r_address = address; - instance->cep_id = next_cep_id(); - instance->state = CONN_PENDING; + pthread_mutex_lock(&frct.instances_lock); + instance = create_frct_i(address, INVALID_CEP_ID); + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + return INVALID_CEP_ID; + } + id = instance->cep_id; + instance->cube = cube; + pthread_mutex_unlock(&frct.instances_lock); + + pci.pdu_type = PDU_TYPE_MGMT; + pci.dst_addr = address; + pci.src_addr = frct.address; + pci.dst_cep_id = 0; + pci.src_cep_id = id; + pci.seqno = 0; + pci.qos_id = cube; + + if (rmt_frct_write_buf(&pci, buf)) { + free(instance); + LOG_ERR("Failed to hand PDU to RMT."); + return INVALID_CEP_ID; + } - INIT_LIST_HEAD(&instance->next); - list_add(&instance->next, &frct->instances); + return id; +} - pthread_mutex_unlock(&frct->instances_lock); +int frct_i_accept(cep_id_t id, + buffer_t * buf, + enum qos_cube cube) +{ + struct pci pci; + struct frct_i * instance; - /* FIXME: Pack into FRCT header and hand SDU to RMT */ + if (buf == NULL || buf->data == NULL) + return -1; - return instance; -} + pthread_mutex_lock(&frct.instances_lock); -int frct_i_accept(struct frct_i * instance, - buffer_t * buf) -{ - if (instance == NULL || buf == NULL || buf->data == NULL) + instance = frct.instances[id]; + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Invalid instance."); return -1; + } - pthread_mutex_lock(&frct->instances_lock); if (instance->state != CONN_PENDING) { - pthread_mutex_unlock(&frct->instances_lock); + pthread_mutex_unlock(&frct.instances_lock); return -1; } instance->state = CONN_ESTABLISHED; - instance->cep_id = next_cep_id(); + instance->cube = cube; + instance->seqno = 0; - pthread_mutex_unlock(&frct->instances_lock); + pci.pdu_type = PDU_TYPE_MGMT; + pci.dst_addr = instance->r_address; + pci.src_addr = frct.address; + pci.dst_cep_id = instance->r_cep_id; + pci.src_cep_id = instance->cep_id; + pci.seqno = 0; + pci.qos_id = cube; - /* FIXME: Pack into FRCT header and hand SDU to RMT */ + pthread_mutex_unlock(&frct.instances_lock); + + if (rmt_frct_write_buf(&pci, buf)) + return -1; return 0; } -int frct_i_destroy(struct frct_i * instance, - buffer_t * buf) +int frct_i_destroy(cep_id_t id, + buffer_t * buf) { - if (instance == NULL) - return -1; + struct pci pci; + struct frct_i * instance; + + pthread_mutex_lock(&frct.instances_lock); - pthread_mutex_lock(&frct->instances_lock); + instance = frct.instances[id]; + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Invalid instance."); + return -1; + } if (!(instance->state == CONN_PENDING || instance->state == CONN_ESTABLISHED)) { - pthread_mutex_unlock(&frct->instances_lock); + pthread_mutex_unlock(&frct.instances_lock); return -1; } + pci.pdu_type = PDU_TYPE_MGMT; + pci.dst_addr = instance->r_address; + pci.src_addr = frct.address; + pci.dst_cep_id = instance->r_cep_id; + pci.src_cep_id = instance->cep_id; + pci.seqno = 0; + pci.qos_id = instance->cube; + destroy_frct_i(instance); - pthread_mutex_unlock(&frct->instances_lock); + pthread_mutex_unlock(&frct.instances_lock); + + if (buf != NULL && buf->data != NULL) + if (rmt_frct_write_buf(&pci, buf)) + return -1; + + return 0; +} - if (buf != NULL && buf->data != NULL) { +int frct_i_write_sdu(cep_id_t id, + struct shm_du_buff * sdb) +{ + struct pci pci; + struct frct_i * instance; + + if (sdb == NULL) + return -1; - /* FIXME: Pack into FRCT header and hand SDU to RMT */ + pthread_mutex_lock(&frct.instances_lock); + + instance = frct.instances[id]; + if (instance == NULL) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Invalid instance."); + return -1; + } + + if (instance->state != CONN_ESTABLISHED) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Connection is not established."); + return -1; + } + + pci.pdu_type = PDU_TYPE_DTP; + pci.dst_addr = instance->r_address; + pci.src_addr = frct.address; + pci.dst_cep_id = instance->r_cep_id; + pci.src_cep_id = instance->cep_id; + pci.seqno = (instance->seqno)++; + pci.qos_id = instance->cube; + + if (rmt_frct_write_sdu(&pci, sdb)) { + pthread_mutex_unlock(&frct.instances_lock); + LOG_ERR("Failed to hand SDU to RMT."); + return -1; } return 0; diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h index 0ee87004..2b86f5bd 100644 --- a/src/ipcpd/normal/frct.h +++ b/src/ipcpd/normal/frct.h @@ -26,34 +26,29 @@ #include #include -#include "dt_const.h" +#include "shm_pci.h" struct frct_i; -int frct_init(struct dt_const * dtc, - uint32_t address); -int frct_fini(); +int frct_init(uint32_t address); +int frct_fini(); -struct dt_const * frct_dt_const(); +/* Called by RMT upon receipt of a PDU for us */ +int frct_rmt_post_sdu(struct pci * pci, + struct shm_du_buff * sdb); -int frct_dt_flow(int fd, - enum qos_cube qos); +cep_id_t frct_i_create(uint32_t address, + buffer_t * buf, + enum qos_cube cube); -/* - * FIXME: Will take the index in the DU map, - * possibly cep-ids and address - */ -int frct_rmt_post(); - -struct frct_i * frct_i_create(uint32_t address, - buffer_t * buf, - enum qos_cube cube); -/* FIXME: Hand QoS cube here too? We received it in the flow alloc message. */ -int frct_i_accept(struct frct_i * instance, - buffer_t * buf); -int frct_i_destroy(struct frct_i * instance, - buffer_t * buf); - -/* FIXME: Add read/write ops for frct instances */ +int frct_i_accept(cep_id_t id, + buffer_t * buf, + enum qos_cube cube); + +int frct_i_destroy(cep_id_t id, + buffer_t * buf); + +int frct_i_write_sdu(cep_id_t id, + struct shm_du_buff * sdb); #endif diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 99d156f5..dd17f9bd 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -39,6 +39,7 @@ #include "frct.h" #include "ipcp.h" #include "cdap_request.h" +#include "rmt.h" #include "static_info.pb-c.h" typedef StaticInfoMsg static_info_msg_t; @@ -241,7 +242,7 @@ int ribmgr_cdap_write(struct cdap * instance, rib.address = msg->address; - if (frct_init(&rib.dtc, rib.address)) { + if (frct_init(rib.address)) { ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&ipcpi.state_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); @@ -250,6 +251,16 @@ int ribmgr_cdap_write(struct cdap * instance, return -1; } + if (rmt_init(rib.address)) { + ipcp_set_state(IPCP_INIT); + pthread_rwlock_unlock(&ipcpi.state_lock); + frct_fini(); + cdap_send_reply(instance, invoke_id, -1, NULL, 0); + static_info_msg__free_unpacked(msg, NULL); + LOG_ERR("Failed to init RMT"); + return -1; + } + static_info_msg__free_unpacked(msg, NULL); } else { ret = -1; @@ -529,12 +540,23 @@ int ribmgr_bootstrap(struct dif_config * conf) /* FIXME: Set correct address. */ rib.address = 0; - if (frct_init(&rib.dtc, rib.address)) { + if (frct_init(rib.address)) { LOG_ERR("Failed to initialize FRCT."); return -1; } + if (rmt_init(rib.address)) { + LOG_ERR("Failed to initialize RMT."); + frct_fini(); + return -1; + } + LOG_DBG("Bootstrapped RIB Manager."); return 0; } + +struct dt_const * ribmgr_dt_const() +{ + return &(rib.dtc); +} diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h index e85c65be..f776f7eb 100644 --- a/src/ipcpd/normal/ribmgr.h +++ b/src/ipcpd/normal/ribmgr.h @@ -25,12 +25,16 @@ #include -int ribmgr_init(); -int ribmgr_fini(); +#include "dt_const.h" -int ribmgr_add_flow(int fd); -int ribmgr_remove_flow(int fd); +int ribmgr_init(); +int ribmgr_fini(); -int ribmgr_bootstrap(struct dif_config * conf); +int ribmgr_add_flow(int fd); +int ribmgr_remove_flow(int fd); + +int ribmgr_bootstrap(struct dif_config * conf); + +struct dt_const * ribmgr_dt_const(); #endif diff --git a/src/ipcpd/normal/rmt.c b/src/ipcpd/normal/rmt.c index ee92c3e3..fa4c7edd 100644 --- a/src/ipcpd/normal/rmt.c +++ b/src/ipcpd/normal/rmt.c @@ -24,29 +24,172 @@ #include #include +#include +#include +#include +#include + +#include #include "rmt.h" +#include "config.h" +#include "frct.h" + +struct { + pthread_t sdu_reader; + struct flow_set * set; + uint32_t address; -struct rmt { -}; + /* + * FIXME: Normally the PFF is held here, + * for now we keep 1 fd to forward a PDU on + */ + int fd; +} rmt; -int rmt_init(struct dt_const * dtc) +int rmt_init(uint32_t address) { - LOG_MISSING; + rmt.set = flow_set_create(); + if (rmt.set == NULL) + return -1; - return -1; + rmt.address = address; + + return 0; } int rmt_fini() { - LOG_MISSING; + flow_set_destroy(rmt.set); + + return 0; +} + +void * rmt_sdu_reader(void * o) +{ + struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; + struct shm_du_buff * sdb; + struct pci * pci; + + while (true) { + int fd = flow_select(rmt.set, &timeout); + if (fd == -ETIMEDOUT) + continue; + + if (fd < 0) { + LOG_ERR("Failed to get active fd."); + continue; + } + + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } + + pci = shm_pci_des(sdb); + if (pci == NULL) { + LOG_ERR("Failed to get PCI."); + ipcp_flow_del(sdb); + continue; + } + + if (pci->dst_addr != rmt.address) { + LOG_DBG("PDU needs to be forwarded."); + + if (pci->ttl == 0) { + LOG_DBG("TTL was zero."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + + if (shm_pci_dec_ttl(sdb)) { + LOG_ERR("Failed to decrease TTL."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + /* + * FIXME: Dropping for now, since + * we don't have a PFF yet + */ + ipcp_flow_del(sdb); + free(pci); + continue; + } + + if (shm_pci_shrink(sdb)) { + LOG_ERR("Failed to shrink PDU."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + + if (frct_rmt_post_sdu(pci, sdb)) { + LOG_ERR("Failed to hand PDU to FRCT."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + } + + return (void *) 0; +} + +int rmt_dt_flow(int fd, + enum qos_cube qos) +{ + struct flow_set * set = rmt.set; + if (set == NULL) + return -1; + + flow_set_add(set, fd); - return -1; + /* FIXME: This will be removed once we have a PFF */ + rmt.fd = fd; + + return 0; } -int rmt_frct_post() +int rmt_frct_write_sdu(struct pci * pci, + struct shm_du_buff * sdb) { - LOG_MISSING; + if (shm_pci_ser(sdb, pci)) { + LOG_ERR("Failed to serialize PDU."); + ipcp_flow_del(sdb); + return -1; + } + + if (ipcp_flow_write(rmt.fd, sdb)) { + LOG_ERR("Failed to write SDU to fd %d.", rmt.fd); + ipcp_flow_del(sdb); + return -1; + } + + return 0; +} + +int rmt_frct_write_buf(struct pci * pci, + buffer_t * buf) +{ + buffer_t * buffer; + + if (pci == NULL || buf == NULL || buf->data == NULL) + return -1; + + buffer = shm_pci_ser_buf(buf, pci); + if (buffer == NULL) { + LOG_ERR("Failed to serialize buffer."); + free(buf->data); + return -1; + } + + if (flow_write(rmt.fd, buffer->data, buffer->len) == -1) { + LOG_ERR("Failed to write buffer to fd."); + free(buffer); + return -1; + } - return -1; + free(buffer); + return 0; } diff --git a/src/ipcpd/normal/rmt.h b/src/ipcpd/normal/rmt.h index cdd86a0b..6ce7a7d7 100644 --- a/src/ipcpd/normal/rmt.h +++ b/src/ipcpd/normal/rmt.h @@ -23,12 +23,24 @@ #ifndef OUROBOROS_IPCP_RMT_H #define OUROBOROS_IPCP_RMT_H +#include +#include + #include "dt_const.h" +#include "shm_pci.h" -int rmt_init(struct dt_const * dtc); +int rmt_init(uint32_t address); int rmt_fini(); -/* FIXME: Will take index in DU map */ -int rmt_frct_post(); +int rmt_dt_flow(int fd, + enum qos_cube qos); + +/* Hand PDU to RMT, SDU from N+1 */ +int rmt_frct_write_sdu(struct pci * pci, + struct shm_du_buff * sdb); + +/* Hand PDU to RMT, SDU from N */ +int rmt_frct_write_buf(struct pci * pci, + buffer_t * buf); #endif diff --git a/src/ipcpd/normal/shm_pci.c b/src/ipcpd/normal/shm_pci.c index 94629790..3a16a2da 100644 --- a/src/ipcpd/normal/shm_pci.c +++ b/src/ipcpd/normal/shm_pci.c @@ -32,6 +32,7 @@ #include "shm_pci.h" #include "frct.h" #include "crc32.h" +#include "ribmgr.h" #define QOS_ID_SIZE 1 #define DEFAULT_TTL 60 @@ -57,23 +58,13 @@ static int shm_pci_tail_size(struct dt_const * dtc) return dtc->has_chk ? CHK_SIZE : 0; } -int shm_pci_ser(struct shm_du_buff * sdb, - struct pci * pci) +static void ser_pci_head(uint8_t * head, + struct pci * pci, + struct dt_const * dtc) { - uint8_t * head; - uint8_t * tail; int offset = 0; - struct dt_const * dtc; uint8_t ttl = DEFAULT_TTL; - dtc = frct_dt_const(); - if (dtc == NULL) - return -1; - - head = shm_du_buff_head_alloc(sdb, shm_pci_head_size(dtc)); - if (head == NULL) - return -1; - memcpy(head, &pci->dst_addr, dtc->addr_size); offset += dtc->addr_size; memcpy(head + offset, &pci->src_addr, dtc->addr_size); @@ -90,6 +81,24 @@ int shm_pci_ser(struct shm_du_buff * sdb, offset += QOS_ID_SIZE; if (dtc->has_ttl) memcpy(head + offset, &ttl, TTL_SIZE); +} + +int shm_pci_ser(struct shm_du_buff * sdb, + struct pci * pci) +{ + uint8_t * head; + uint8_t * tail; + struct dt_const * dtc; + + dtc = ribmgr_dt_const(); + if (dtc == NULL) + return -1; + + head = shm_du_buff_head_alloc(sdb, shm_pci_head_size(dtc)); + if (head == NULL) + return -1; + + ser_pci_head(head, pci, dtc); if (dtc->has_chk) { tail = shm_du_buff_tail_alloc(sdb, shm_pci_tail_size(dtc)); @@ -104,6 +113,48 @@ int shm_pci_ser(struct shm_du_buff * sdb, return 0; } +buffer_t * shm_pci_ser_buf(buffer_t * buf, + struct pci * pci) +{ + buffer_t * buffer; + struct dt_const * dtc; + + if (buf == NULL || pci == NULL) + return NULL; + + dtc = ribmgr_dt_const(); + if (dtc == NULL) + return NULL; + + buffer = malloc(sizeof(*buffer)); + if (buffer == NULL) + return NULL; + + buffer->len = buf->len + + shm_pci_head_size(dtc) + + shm_pci_tail_size(dtc); + + buffer->data = malloc(buffer->len); + if (buffer->data == NULL) { + free(buffer); + return NULL; + } + + ser_pci_head(buffer->data, pci, dtc); + memcpy(buffer->data + shm_pci_head_size(dtc), + buf->data, buf->len); + + free(buf->data); + + if (dtc->has_chk) + crc32((uint32_t *) buffer->data + + shm_pci_head_size(dtc) + buf->len, + buffer->data, + shm_pci_head_size(dtc) + buf->len); + + return buffer; +} + struct pci * shm_pci_des(struct shm_du_buff * sdb) { uint8_t * head; @@ -115,7 +166,7 @@ struct pci * shm_pci_des(struct shm_du_buff * sdb) if (head == NULL) return NULL; - dtc = frct_dt_const(); + dtc = ribmgr_dt_const(); if (dtc == NULL) return NULL; @@ -150,7 +201,7 @@ int shm_pci_shrink(struct shm_du_buff * sdb) if (sdb == NULL) return -1; - dtc = frct_dt_const(); + dtc = ribmgr_dt_const(); if (dtc == NULL) return -1; @@ -174,7 +225,7 @@ int shm_pci_dec_ttl(struct shm_du_buff * sdb) uint8_t * head; uint8_t * tail; - dtc = frct_dt_const(); + dtc = ribmgr_dt_const(); if (dtc == NULL) return -1; diff --git a/src/ipcpd/normal/shm_pci.h b/src/ipcpd/normal/shm_pci.h index aa9770b4..2836737c 100644 --- a/src/ipcpd/normal/shm_pci.h +++ b/src/ipcpd/normal/shm_pci.h @@ -25,22 +25,34 @@ #define OUROBOROS_IPCP_SHM_PCI_H #include +#include -#include +#include "dt_const.h" + +#define PDU_TYPE_MGMT 0x40 +#define PDU_TYPE_DTP 0x80 + +typedef uint32_t cep_id_t; +#define INVALID_CEP_ID 0 struct pci { + uint8_t pdu_type; uint64_t dst_addr; uint64_t src_addr; - uint32_t dst_cep_id; - uint32_t src_cep_id; + cep_id_t dst_cep_id; + cep_id_t src_cep_id; + uint8_t qos_id; uint32_t pdu_length; uint64_t seqno; - uint8_t qos_id; uint8_t ttl; + uint8_t flags; }; int shm_pci_ser(struct shm_du_buff * sdb, - struct pci * pci); + struct pci * pci); + +buffer_t * shm_pci_ser_buf(buffer_t * buf, + struct pci * pci); struct pci * shm_pci_des(struct shm_du_buff * sdb); -- cgit v1.2.3 From 43e2f332770007a3fcea011ffb35e8fbb24a6205 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 12 Oct 2016 14:54:18 +0200 Subject: ipcpd: normal: Improve upon the internal design This commit will remove the RMT component from the normal IPCP, as some of its functionality would else be duplicated in the FMGR. Now all reading from flows, either N-1 or N+1 is done in the FMGR, then either passed to the FRCT or a lookup is performed in the PFF (not there yet) and the PDU is forwarded. --- src/ipcpd/normal/CMakeLists.txt | 1 - src/ipcpd/normal/config.h | 28 -- src/ipcpd/normal/fmgr.c | 591 ++++++++++++++++++++++++---------------- src/ipcpd/normal/fmgr.h | 43 ++- src/ipcpd/normal/frct.c | 33 +-- src/ipcpd/normal/frct.h | 9 +- src/ipcpd/normal/main.c | 8 +- src/ipcpd/normal/ribmgr.c | 26 +- src/ipcpd/normal/ribmgr.h | 5 + src/ipcpd/normal/rmt.c | 195 ------------- src/ipcpd/normal/rmt.h | 46 ---- 11 files changed, 411 insertions(+), 574 deletions(-) delete mode 100644 src/ipcpd/normal/config.h delete mode 100644 src/ipcpd/normal/rmt.c delete mode 100644 src/ipcpd/normal/rmt.h diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 654bb127..151721a2 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -28,7 +28,6 @@ set(SOURCE_FILES frct.c main.c ribmgr.c - rmt.c shm_pci.c ) diff --git a/src/ipcpd/normal/config.h b/src/ipcpd/normal/config.h deleted file mode 100644 index 0febf3fd..00000000 --- a/src/ipcpd/normal/config.h +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Normal IPCP configuration constants - * - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_IPCP_CONFIG_H -#define OUROBOROS_IPCP_CONFIG_H - -#define FD_UPDATE_TIMEOUT 100 /* microseconds */ - -#endif diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 25898661..8c627641 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -39,46 +39,46 @@ #include "ribmgr.h" #include "frct.h" #include "ipcp.h" -#include "rmt.h" #include "shm_pci.h" -#include "config.h" #include "flow_alloc.pb-c.h" typedef FlowAllocMsg flow_alloc_msg_t; -struct n_flow { +#define FD_UPDATE_TIMEOUT 100 /* microseconds */ + +struct np1_flow { int fd; cep_id_t cep_id; enum qos_cube qos; - - struct list_head next; }; -struct n_1_flow { - int fd; - char * ae_name; - struct list_head next; +struct nm1_flow { + int fd; + char * ae_name; + enum qos_cube qos; }; struct { - pthread_t n_1_flow_acceptor; - - /* FIXME: Make this a table */ - struct list_head n_1_flows; - pthread_mutex_t n_1_flows_lock; - - /* FIXME: Make this a table */ - struct list_head n_flows; - /* FIXME: Make this a read/write lock */ - pthread_mutex_t n_flows_lock; - - struct flow_set * set; - pthread_t n_reader; + pthread_t nm1_flow_acceptor; + struct nm1_flow ** nm1_flows; + pthread_rwlock_t nm1_flows_lock; + flow_set_t * nm1_set; + + struct np1_flow ** np1_flows; + struct np1_flow ** np1_flows_cep; + pthread_rwlock_t np1_flows_lock; + flow_set_t * np1_set; + pthread_t np1_sdu_reader; + + /* FIXME: Replace with PFF */ + int fd; } fmgr; -static int add_n_1_fd(int fd, char * ae_name) +static int add_nm1_fd(int fd, + char * ae_name, + enum qos_cube qos) { - struct n_1_flow * tmp; + struct nm1_flow * tmp; if (ae_name == NULL) return -1; @@ -89,45 +89,39 @@ static int add_n_1_fd(int fd, char * ae_name) tmp->fd = fd; tmp->ae_name = ae_name; + tmp->qos = qos; - INIT_LIST_HEAD(&tmp->next); + pthread_rwlock_wrlock(&fmgr.nm1_flows_lock); + fmgr.nm1_flows[fd] = tmp; + pthread_rwlock_unlock(&fmgr.nm1_flows_lock); - pthread_mutex_lock(&fmgr.n_1_flows_lock); - list_add(&tmp->next, &fmgr.n_1_flows); - pthread_mutex_unlock(&fmgr.n_1_flows_lock); + /* FIXME: Temporary, until we have a PFF */ + fmgr.fd = fd; return 0; } -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_fd(int fd) +static int add_np1_fd(int fd, + cep_id_t cep_id, + enum qos_cube qos) { - struct list_head * pos = NULL; + struct np1_flow * flow; - list_for_each(pos, &fmgr.n_flows) { - struct n_flow * e = list_entry(pos, struct n_flow, next); - if (e->fd == fd) - return e; - } - - return NULL; -} + flow = malloc(sizeof(*flow)); + if (flow == NULL) + return -1; -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_cep_id(cep_id_t cep_id) -{ - struct list_head * pos = NULL; + flow->cep_id = cep_id; + flow->qos = qos; + flow->fd = fd; - list_for_each(pos, &fmgr.n_flows) { - struct n_flow * e = list_entry(pos, struct n_flow, next); - if (e->cep_id == cep_id) - return e; - } + fmgr.np1_flows[fd] = flow; + fmgr.np1_flows_cep[fd] = flow; - return NULL; + return 0; } -static void * fmgr_n_1_acceptor(void * o) +static void * fmgr_nm1_acceptor(void * o) { int fd; char * ae_name; @@ -175,16 +169,8 @@ static void * fmgr_n_1_acceptor(void * o) } } - if (strcmp(ae_name, DT_AE) == 0) { - /* FIXME: Pass correct QoS cube */ - if (rmt_dt_flow(fd, 0)) { - LOG_ERR("Failed to hand fd to FRCT."); - flow_dealloc(fd); - continue; - } - } - - if (add_n_1_fd(fd, ae_name)) { + /* FIXME: Pass correct QoS cube */ + if (add_nm1_fd(fd, ae_name, QOS_CUBE_BE)) { LOG_ERR("Failed to add file descriptor to list."); flow_dealloc(fd); continue; @@ -194,14 +180,14 @@ static void * fmgr_n_1_acceptor(void * o) return (void *) 0; } -static void * fmgr_n_reader(void * o) +static void * fmgr_np1_sdu_reader(void * o) { struct shm_du_buff * sdb; struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - struct n_flow * flow; + struct np1_flow * flow; while (true) { - int fd = flow_select(fmgr.set, &timeout); + int fd = flow_select(fmgr.np1_set, &timeout); if (fd == -ETIMEDOUT) continue; @@ -215,172 +201,194 @@ static void * fmgr_n_reader(void * o) continue; } - pthread_mutex_lock(&fmgr.n_flows_lock); - flow = get_n_flow_by_fd(fd); + pthread_rwlock_rdlock(&fmgr.np1_flows_lock); + flow = fmgr.np1_flows[fd]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); ipcp_flow_del(sdb); LOG_ERR("Failed to retrieve flow."); continue; } if (frct_i_write_sdu(flow->cep_id, sdb)) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); ipcp_flow_del(sdb); LOG_ERR("Failed to hand SDU to FRCT."); continue; } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); } return (void *) 0; } -int fmgr_init() +void * fmgr_nm1_sdu_reader(void * o) { - INIT_LIST_HEAD(&fmgr.n_1_flows); - INIT_LIST_HEAD(&fmgr.n_flows); - - pthread_mutex_init(&fmgr.n_1_flows_lock, NULL); - pthread_mutex_init(&fmgr.n_flows_lock, NULL); + struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; + struct shm_du_buff * sdb; + struct pci * pci; - fmgr.set = flow_set_create(); - if (fmgr.set == NULL) - return -1; + while (true) { + int fd = flow_select(fmgr.nm1_set, &timeout); + if (fd == -ETIMEDOUT) + continue; - pthread_create(&fmgr.n_1_flow_acceptor, NULL, fmgr_n_1_acceptor, NULL); - pthread_create(&fmgr.n_reader, NULL, fmgr_n_reader, NULL); + if (fd < 0) { + LOG_ERR("Failed to get active fd."); + continue; + } - return 0; -} + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } -int fmgr_fini() -{ - struct list_head * pos = NULL; + pci = shm_pci_des(sdb); + if (pci == NULL) { + LOG_ERR("Failed to get PCI."); + ipcp_flow_del(sdb); + continue; + } - pthread_cancel(fmgr.n_1_flow_acceptor); - pthread_cancel(fmgr.n_reader); + if (pci->dst_addr != ribmgr_address()) { + LOG_DBG("PDU needs to be forwarded."); - pthread_join(fmgr.n_1_flow_acceptor, NULL); - pthread_join(fmgr.n_reader, NULL); + if (pci->ttl == 0) { + LOG_DBG("TTL was zero."); + ipcp_flow_del(sdb); + free(pci); + continue; + } - list_for_each(pos, &fmgr.n_1_flows) { - struct n_1_flow * e = list_entry(pos, struct n_1_flow, next); - if (e->ae_name != NULL) - free(e->ae_name); - if (ribmgr_remove_flow(e->fd)) - LOG_ERR("Failed to remove management flow."); - } + if (shm_pci_dec_ttl(sdb)) { + LOG_ERR("Failed to decrease TTL."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + /* + * FIXME: Dropping for now, since + * we don't have a PFF yet + */ + ipcp_flow_del(sdb); + free(pci); + continue; + } - pthread_mutex_destroy(&fmgr.n_1_flows_lock); - pthread_mutex_destroy(&fmgr.n_flows_lock); + if (shm_pci_shrink(sdb)) { + LOG_ERR("Failed to shrink PDU."); + ipcp_flow_del(sdb); + free(pci); + continue; + } - flow_set_destroy(fmgr.set); + if (frct_nm1_post_sdu(pci, sdb)) { + LOG_ERR("Failed to hand PDU to FRCT."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + } - return 0; + return (void *) 0; } -int fmgr_mgmt_flow(char * dst_name) +int fmgr_init() { - int fd; - int result; - char * ae_name; + int i; - ae_name = strdup(MGMT_AE); - if (ae_name == NULL) + fmgr.nm1_flows = malloc(sizeof(*(fmgr.nm1_flows)) * IRMD_MAX_FLOWS); + if (fmgr.nm1_flows == NULL) return -1; - /* FIXME: Request retransmission. */ - fd = flow_alloc(dst_name, MGMT_AE, NULL); - if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s", dst_name); - free(ae_name); + fmgr.np1_flows = malloc(sizeof(*(fmgr.np1_flows)) * IRMD_MAX_FLOWS); + if (fmgr.np1_flows == NULL) { + free(fmgr.nm1_flows); return -1; } - result = flow_alloc_res(fd); - if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d", - dst_name, result); - free(ae_name); + fmgr.np1_flows_cep = + malloc(sizeof(*(fmgr.np1_flows_cep)) * IRMD_MAX_FLOWS); + if (fmgr.np1_flows_cep == NULL) { + free(fmgr.np1_flows); + free(fmgr.nm1_flows); return -1; } - if (ribmgr_add_flow(fd)) { - LOG_ERR("Failed to hand file descriptor to RIB manager"); - flow_dealloc(fd); - free(ae_name); + for (i = 0; i < IRMD_MAX_FLOWS; i++) { + fmgr.nm1_flows[i] = NULL; + fmgr.np1_flows[i] = NULL; + fmgr.np1_flows_cep[i] = NULL; + } + + pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL); + pthread_rwlock_init(&fmgr.np1_flows_lock, NULL); + + fmgr.np1_set = flow_set_create(); + if (fmgr.np1_set == NULL) { + free(fmgr.np1_flows_cep); + free(fmgr.np1_flows); + free(fmgr.nm1_flows); return -1; } - if (add_n_1_fd(fd, ae_name)) { - LOG_ERR("Failed to add file descriptor to list."); - flow_dealloc(fd); + fmgr.nm1_set = flow_set_create(); + if (fmgr.nm1_set == NULL) { + flow_set_destroy(fmgr.np1_set); + free(fmgr.np1_flows_cep); + free(fmgr.np1_flows); + free(fmgr.nm1_flows); return -1; } + pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL); + pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL); + return 0; } -int fmgr_dt_flow(char * dst_name, enum qos_cube qos) +int fmgr_fini() { - int fd; - int result; - char * ae_name; + int i; - ae_name = strdup(DT_AE); - if (ae_name == NULL) - return -1; + pthread_cancel(fmgr.nm1_flow_acceptor); + pthread_cancel(fmgr.np1_sdu_reader); - /* FIXME: Map qos cube on correct QoS. */ - fd = flow_alloc(dst_name, DT_AE, NULL); - if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s", dst_name); - free(ae_name); - return -1; - } + pthread_join(fmgr.nm1_flow_acceptor, NULL); + pthread_join(fmgr.np1_sdu_reader, NULL); - result = flow_alloc_res(fd); - if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d", - dst_name, result); - free(ae_name); - return -1; + for (i = 0; i < IRMD_MAX_FLOWS; i++) { + if (fmgr.nm1_flows[i] == NULL) + continue; + if (fmgr.nm1_flows[i]->ae_name != NULL) + free(fmgr.nm1_flows[i]->ae_name); + if (ribmgr_remove_flow(fmgr.nm1_flows[i]->fd)) + LOG_ERR("Failed to remove management flow."); } - if (rmt_dt_flow(fd, qos)) { - LOG_ERR("Failed to hand file descriptor to FRCT"); - flow_dealloc(fd); - free(ae_name); - return -1; - } + pthread_rwlock_destroy(&fmgr.nm1_flows_lock); + pthread_rwlock_destroy(&fmgr.np1_flows_lock); - if (add_n_1_fd(fd, ae_name)) { - LOG_ERR("Failed to add file descriptor to list."); - flow_dealloc(fd); - free(ae_name); - return -1; - } + flow_set_destroy(fmgr.nm1_set); + flow_set_destroy(fmgr.np1_set); + free(fmgr.np1_flows_cep); + free(fmgr.np1_flows); + free(fmgr.nm1_flows); return 0; } -int fmgr_flow_alloc(int fd, +int fmgr_np1_alloc(int fd, char * dst_ap_name, char * src_ae_name, enum qos_cube qos) { - struct n_flow * flow; cep_id_t cep_id; uint32_t address = 0; buffer_t buf; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - flow = malloc(sizeof(*flow)); - if (flow == NULL) - return -1; - /* FIXME: Obtain correct address here from DIF NSM */ msg.code = FLOW_ALLOC_CODE__FLOW_REQ; @@ -390,55 +398,47 @@ int fmgr_flow_alloc(int fd, msg.has_qos_cube = true; buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) { - free(flow); + if (buf.len == 0) return -1; - } buf.data = malloc(buf.len); - if (buf.data == NULL) { - free(flow); + if (buf.data == NULL) return -1; - } flow_alloc_msg__pack(&msg, buf.data); - pthread_mutex_lock(&fmgr.n_flows_lock); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); cep_id = frct_i_create(address, &buf, qos); if (cep_id == INVALID_CEP_ID) { free(buf.data); - free(flow); - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } free(buf.data); - flow->fd = fd; - flow->cep_id = cep_id; - flow->qos = qos; - - INIT_LIST_HEAD(&flow->next); - - list_add(&flow->next, &fmgr.n_flows); + if (add_np1_fd(fd, cep_id, qos)) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + return -1; + } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return 0; } -/* Call under n_flows lock */ -static int n_flow_dealloc(int fd) +/* Call under np1_flows lock */ +static int np1_flow_dealloc(int fd) { - struct n_flow * flow; + struct np1_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; int ret; - flow_set_del(fmgr.set, fd); + flow_set_del(fmgr.np1_set, fd); - flow = get_n_flow_by_fd(fd); + flow = fmgr.np1_flows[fd]; if (flow == NULL) return -1; @@ -455,7 +455,9 @@ static int n_flow_dealloc(int fd) flow_alloc_msg__pack(&msg, buf.data); ret = frct_i_destroy(flow->cep_id, &buf); - list_del(&flow->next); + + fmgr.np1_flows[fd] = NULL; + fmgr.np1_flows_cep[flow->cep_id] = NULL; free(flow); free(buf.data); @@ -463,17 +465,17 @@ static int n_flow_dealloc(int fd) return ret; } -int fmgr_flow_alloc_resp(int fd, int response) +int fmgr_np1_alloc_resp(int fd, int response) { - struct n_flow * flow; + struct np1_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; - pthread_mutex_lock(&fmgr.n_flows_lock); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - flow = get_n_flow_by_fd(fd); + flow = fmgr.np1_flows[fd]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } @@ -483,13 +485,13 @@ int fmgr_flow_alloc_resp(int fd, int response) buf.len = flow_alloc_msg__get_packed_size(&msg); if (buf.len == 0) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } buf.data = malloc(buf.len); if (buf.data == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } @@ -498,84 +500,76 @@ int fmgr_flow_alloc_resp(int fd, int response) if (response < 0) { frct_i_destroy(flow->cep_id, &buf); free(buf.data); - list_del(&flow->next); + fmgr.np1_flows[fd] = NULL; + fmgr.np1_flows_cep[flow->cep_id] = NULL; free(flow); } else { if (frct_i_accept(flow->cep_id, &buf, flow->qos)) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } - flow_set_add(fmgr.set, fd); + flow_set_add(fmgr.np1_set, fd); } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return 0; } -int fmgr_flow_dealloc(int fd) +int fmgr_np1_dealloc(int fd) { int ret; - pthread_mutex_lock(&fmgr.n_flows_lock); - ret = n_flow_dealloc(fd); - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + ret = np1_flow_dealloc(fd); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return ret; } -int fmgr_frct_post_buf(cep_id_t cep_id, - buffer_t * buf) +int fmgr_np1_post_buf(cep_id_t cep_id, + buffer_t * buf) { - struct n_flow * flow; + struct np1_flow * flow; int ret = 0; int fd; flow_alloc_msg_t * msg; - pthread_mutex_lock(&fmgr.n_flows_lock); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); /* Depending on the message call the function in ipcp-dev.h */ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); if (msg == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to unpack flow alloc message"); return -1; } switch (msg->code) { case FLOW_ALLOC_CODE__FLOW_REQ: - flow = malloc(sizeof(*flow)); - if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - return -1; - } - - flow->cep_id = cep_id; - flow->qos = msg->qos_cube; - fd = ipcp_flow_req_arr(getpid(), msg->dst_name, msg->src_ae_name); if (fd < 0) { - pthread_mutex_unlock(&fmgr.n_flows_lock); - free(flow); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("Failed to get fd for flow."); return -1; } - flow->fd = fd; - - INIT_LIST_HEAD(&flow->next); + if (add_np1_fd(fd, cep_id, msg->qos_cube)) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + flow_alloc_msg__free_unpacked(msg, NULL); + LOG_ERR("Failed to add np1 flow."); + return -1; + } - list_add(&flow->next, &fmgr.n_flows); break; case FLOW_ALLOC_CODE__FLOW_REPLY: - flow = get_n_flow_by_cep_id(cep_id); + flow = fmgr.np1_flows_cep[cep_id]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("No such flow in flow manager."); return -1; @@ -583,23 +577,24 @@ int fmgr_frct_post_buf(cep_id_t cep_id, ret = ipcp_flow_alloc_reply(flow->fd, msg->response); if (msg->response < 0) { - list_del(&flow->next); + fmgr.np1_flows[flow->fd] = NULL; + fmgr.np1_flows_cep[cep_id] = NULL; free(flow); } else { - flow_set_add(fmgr.set, flow->fd); + flow_set_add(fmgr.np1_set, flow->fd); } break; case FLOW_ALLOC_CODE__FLOW_DEALLOC: - flow = get_n_flow_by_cep_id(cep_id); + flow = fmgr.np1_flows_cep[cep_id]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("No such flow in flow manager."); return -1; } - flow_set_del(fmgr.set, flow->fd); + flow_set_del(fmgr.np1_set, flow->fd); ret = flow_dealloc(flow->fd); break; @@ -609,34 +604,160 @@ int fmgr_frct_post_buf(cep_id_t cep_id, break; } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); return ret; } -int fmgr_frct_post_sdu(cep_id_t cep_id, - struct shm_du_buff * sdb) +int fmgr_np1_post_sdu(cep_id_t cep_id, + struct shm_du_buff * sdb) { - struct n_flow * flow; + struct np1_flow * flow; - pthread_mutex_lock(&fmgr.n_flows_lock); + pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = get_n_flow_by_cep_id(cep_id); + flow = fmgr.np1_flows_cep[cep_id]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to find N flow."); return -1; } if (ipcp_flow_write(flow->fd, sdb)) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to hand SDU to N flow."); return -1; } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + + return 0; +} + +int fmgr_nm1_mgmt_flow(char * dst_name) +{ + int fd; + int result; + char * ae_name; + + ae_name = strdup(MGMT_AE); + if (ae_name == NULL) + return -1; + + /* FIXME: Request retransmission. */ + fd = flow_alloc(dst_name, MGMT_AE, NULL); + if (fd < 0) { + LOG_ERR("Failed to allocate flow to %s", dst_name); + free(ae_name); + return -1; + } + + result = flow_alloc_res(fd); + if (result < 0) { + LOG_ERR("Result of flow allocation to %s is %d", + dst_name, result); + free(ae_name); + return -1; + } + + if (ribmgr_add_flow(fd)) { + LOG_ERR("Failed to hand file descriptor to RIB manager"); + flow_dealloc(fd); + free(ae_name); + return -1; + } + + /* FIXME: Pass correct QoS cube */ + if (add_nm1_fd(fd, ae_name, QOS_CUBE_BE)) { + LOG_ERR("Failed to add file descriptor to list."); + flow_dealloc(fd); + return -1; + } + + return 0; +} + +int fmgr_nm1_dt_flow(char * dst_name, + enum qos_cube qos) +{ + int fd; + int result; + char * ae_name; + + ae_name = strdup(DT_AE); + if (ae_name == NULL) + return -1; + + /* FIXME: Map qos cube on correct QoS. */ + fd = flow_alloc(dst_name, DT_AE, NULL); + if (fd < 0) { + LOG_ERR("Failed to allocate flow to %s", dst_name); + free(ae_name); + return -1; + } + + result = flow_alloc_res(fd); + if (result < 0) { + LOG_ERR("Result of flow allocation to %s is %d", + dst_name, result); + free(ae_name); + return -1; + } + + if (add_nm1_fd(fd, ae_name, qos)) { + LOG_ERR("Failed to add file descriptor to list."); + flow_dealloc(fd); + free(ae_name); + return -1; + } + + return 0; +} + +int fmgr_nm1_write_sdu(struct pci * pci, + struct shm_du_buff * sdb) +{ + if (pci == NULL || sdb == NULL) + return -1; + + if (shm_pci_ser(sdb, pci)) { + LOG_ERR("Failed to serialize PDU."); + ipcp_flow_del(sdb); + return -1; + } + + if (ipcp_flow_write(fmgr.fd, sdb)) { + LOG_ERR("Failed to write SDU to fd %d.", fmgr.fd); + ipcp_flow_del(sdb); + return -1; + } + + return 0; +} + +int fmgr_nm1_write_buf(struct pci * pci, + buffer_t * buf) +{ + buffer_t * buffer; + + if (pci == NULL || buf == NULL || buf->data == NULL) + return -1; + + buffer = shm_pci_ser_buf(buf, pci); + if (buffer == NULL) { + LOG_ERR("Failed to serialize buffer."); + free(buf->data); + return -1; + } + + if (flow_write(fmgr.fd, buffer->data, buffer->len) == -1) { + LOG_ERR("Failed to write buffer to fd."); + free(buffer); + return -1; + } + free(buffer); return 0; } diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index 0f2cd045..f97cf858 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -37,29 +37,26 @@ int fmgr_init(); int fmgr_fini(); -/* N-flow ops */ -int fmgr_mgmt_flow(char * dst_name); - -int fmgr_dt_flow(char * dst_name, - enum qos_cube qos); - -/* N+1-flow ops, local */ -int fmgr_flow_alloc(int fd, - char * dst_ap_name, - char * src_ae_name, - enum qos_cube qos); - -int fmgr_flow_alloc_resp(int fd, - int response); - -int fmgr_flow_dealloc(int fd); - -/* N+1-flow ops, remote */ -int fmgr_frct_post_buf(cep_id_t id, - buffer_t * buf); - -/* SDU for N+1-flow */ -int fmgr_frct_post_sdu(cep_id_t id, +int fmgr_np1_alloc(int fd, + char * dst_ap_name, + char * src_ae_name, + enum qos_cube qos); +int fmgr_np1_alloc_resp(int fd, + int response); +int fmgr_np1_dealloc(int fd); + +int fmgr_np1_post_buf(cep_id_t id, + buffer_t * buf); +int fmgr_np1_post_sdu(cep_id_t id, + struct shm_du_buff * sdb); + +int fmgr_nm1_mgmt_flow(char * dst_name); +int fmgr_nm1_dt_flow(char * dst_name, + enum qos_cube qos); + +int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb); +int fmgr_nm1_write_buf(struct pci * pci, + buffer_t * buf); #endif diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index abbde779..9daf8755 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -32,8 +32,8 @@ #include #include "frct.h" -#include "rmt.h" #include "fmgr.h" +#include "ribmgr.h" enum conn_state { CONN_PENDING = 0, @@ -51,8 +51,6 @@ struct frct_i { }; struct { - uint32_t address; - pthread_mutex_t instances_lock; struct frct_i ** instances; @@ -82,10 +80,9 @@ static int release_cep_id(int id) return ret; } -int frct_init(uint32_t address) +int frct_init() { int i; - frct.address = address; if (pthread_mutex_init(&frct.cep_ids_lock, NULL)) return -1; @@ -142,7 +139,7 @@ static struct frct_i * create_frct_i(uint32_t address, return instance; } -int frct_rmt_post_sdu(struct pci * pci, +int frct_nm1_post_sdu(struct pci * pci, struct shm_du_buff * sdb) { struct frct_i * instance; @@ -167,14 +164,14 @@ int frct_rmt_post_sdu(struct pci * pci, buf.len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); buf.data = shm_du_buff_head(sdb); - if (fmgr_frct_post_buf(id, &buf)) { + if (fmgr_np1_post_buf(id, &buf)) { LOG_ERR("Failed to hand buffer to FMGR."); free(pci); return -1; } } else { /* FIXME: Known cep-ids are delivered to FMGR (minimal DTP) */ - if (fmgr_frct_post_sdu(pci->dst_cep_id, sdb)) { + if (fmgr_np1_post_sdu(pci->dst_cep_id, sdb)) { LOG_ERR("Failed to hand SDU to FMGR."); free(pci); return -1; @@ -217,15 +214,15 @@ cep_id_t frct_i_create(uint32_t address, pci.pdu_type = PDU_TYPE_MGMT; pci.dst_addr = address; - pci.src_addr = frct.address; + pci.src_addr = ribmgr_address(); pci.dst_cep_id = 0; pci.src_cep_id = id; pci.seqno = 0; pci.qos_id = cube; - if (rmt_frct_write_buf(&pci, buf)) { + if (fmgr_nm1_write_buf(&pci, buf)) { free(instance); - LOG_ERR("Failed to hand PDU to RMT."); + LOG_ERR("Failed to hand PDU to FMGR."); return INVALID_CEP_ID; } @@ -262,7 +259,7 @@ int frct_i_accept(cep_id_t id, pci.pdu_type = PDU_TYPE_MGMT; pci.dst_addr = instance->r_address; - pci.src_addr = frct.address; + pci.src_addr = ribmgr_address(); pci.dst_cep_id = instance->r_cep_id; pci.src_cep_id = instance->cep_id; pci.seqno = 0; @@ -270,7 +267,7 @@ int frct_i_accept(cep_id_t id, pthread_mutex_unlock(&frct.instances_lock); - if (rmt_frct_write_buf(&pci, buf)) + if (fmgr_nm1_write_buf(&pci, buf)) return -1; return 0; @@ -299,7 +296,7 @@ int frct_i_destroy(cep_id_t id, pci.pdu_type = PDU_TYPE_MGMT; pci.dst_addr = instance->r_address; - pci.src_addr = frct.address; + pci.src_addr = ribmgr_address(); pci.dst_cep_id = instance->r_cep_id; pci.src_cep_id = instance->cep_id; pci.seqno = 0; @@ -309,7 +306,7 @@ int frct_i_destroy(cep_id_t id, pthread_mutex_unlock(&frct.instances_lock); if (buf != NULL && buf->data != NULL) - if (rmt_frct_write_buf(&pci, buf)) + if (fmgr_nm1_write_buf(&pci, buf)) return -1; return 0; @@ -341,15 +338,15 @@ int frct_i_write_sdu(cep_id_t id, pci.pdu_type = PDU_TYPE_DTP; pci.dst_addr = instance->r_address; - pci.src_addr = frct.address; + pci.src_addr = ribmgr_address(); pci.dst_cep_id = instance->r_cep_id; pci.src_cep_id = instance->cep_id; pci.seqno = (instance->seqno)++; pci.qos_id = instance->cube; - if (rmt_frct_write_sdu(&pci, sdb)) { + if (fmgr_nm1_write_sdu(&pci, sdb)) { pthread_mutex_unlock(&frct.instances_lock); - LOG_ERR("Failed to hand SDU to RMT."); + LOG_ERR("Failed to hand SDU to FMGR."); return -1; } diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h index 2b86f5bd..b9e70d0f 100644 --- a/src/ipcpd/normal/frct.h +++ b/src/ipcpd/normal/frct.h @@ -30,13 +30,9 @@ struct frct_i; -int frct_init(uint32_t address); +int frct_init(); int frct_fini(); -/* Called by RMT upon receipt of a PDU for us */ -int frct_rmt_post_sdu(struct pci * pci, - struct shm_du_buff * sdb); - cep_id_t frct_i_create(uint32_t address, buffer_t * buf, enum qos_cube cube); @@ -51,4 +47,7 @@ int frct_i_destroy(cep_id_t id, int frct_i_write_sdu(cep_id_t id, struct shm_du_buff * sdb); +int frct_nm1_post_sdu(struct pci * pci, + struct shm_du_buff * sdb); + #endif diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 4611408d..0339eaf4 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -109,7 +109,7 @@ static int normal_ipcp_enroll(char * dif_name) pthread_rwlock_unlock(&ipcpi.state_lock); - if (fmgr_mgmt_flow(dif_name)) { + if (fmgr_nm1_mgmt_flow(dif_name)) { LOG_ERR("Failed to establish management flow."); return -1; } @@ -163,9 +163,9 @@ static struct ipcp_ops normal_ops = { .ipcp_enroll = normal_ipcp_enroll, .ipcp_name_reg = normal_ipcp_name_reg, .ipcp_name_unreg = normal_ipcp_name_unreg, - .ipcp_flow_alloc = fmgr_flow_alloc, - .ipcp_flow_alloc_resp = fmgr_flow_alloc_resp, - .ipcp_flow_dealloc = fmgr_flow_dealloc + .ipcp_flow_alloc = fmgr_np1_alloc, + .ipcp_flow_alloc_resp = fmgr_np1_alloc_resp, + .ipcp_flow_dealloc = fmgr_np1_dealloc }; int main(int argc, char * argv[]) diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index dd17f9bd..c69a59ce 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -39,7 +39,6 @@ #include "frct.h" #include "ipcp.h" #include "cdap_request.h" -#include "rmt.h" #include "static_info.pb-c.h" typedef StaticInfoMsg static_info_msg_t; @@ -242,7 +241,7 @@ int ribmgr_cdap_write(struct cdap * instance, rib.address = msg->address; - if (frct_init(rib.address)) { + if (frct_init()) { ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&ipcpi.state_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); @@ -251,16 +250,6 @@ int ribmgr_cdap_write(struct cdap * instance, return -1; } - if (rmt_init(rib.address)) { - ipcp_set_state(IPCP_INIT); - pthread_rwlock_unlock(&ipcpi.state_lock); - frct_fini(); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); - static_info_msg__free_unpacked(msg, NULL); - LOG_ERR("Failed to init RMT"); - return -1; - } - static_info_msg__free_unpacked(msg, NULL); } else { ret = -1; @@ -540,17 +529,11 @@ int ribmgr_bootstrap(struct dif_config * conf) /* FIXME: Set correct address. */ rib.address = 0; - if (frct_init(rib.address)) { + if (frct_init()) { LOG_ERR("Failed to initialize FRCT."); return -1; } - if (rmt_init(rib.address)) { - LOG_ERR("Failed to initialize RMT."); - frct_fini(); - return -1; - } - LOG_DBG("Bootstrapped RIB Manager."); return 0; @@ -560,3 +543,8 @@ struct dt_const * ribmgr_dt_const() { return &(rib.dtc); } + +uint32_t ribmgr_address() +{ + return rib.address; +} diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h index f776f7eb..ed8bae03 100644 --- a/src/ipcpd/normal/ribmgr.h +++ b/src/ipcpd/normal/ribmgr.h @@ -35,6 +35,11 @@ int ribmgr_remove_flow(int fd); int ribmgr_bootstrap(struct dif_config * conf); +/* + * FIXME: Should we expose the RIB? + * Else we may end up with a lot of getters and setters + */ struct dt_const * ribmgr_dt_const(); +uint32_t ribmgr_address(); #endif diff --git a/src/ipcpd/normal/rmt.c b/src/ipcpd/normal/rmt.c deleted file mode 100644 index fa4c7edd..00000000 --- a/src/ipcpd/normal/rmt.c +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * The Relaying and Multiplexing task - * - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#define OUROBOROS_PREFIX "flow-manager" - -#include -#include -#include -#include -#include -#include - -#include - -#include "rmt.h" -#include "config.h" -#include "frct.h" - -struct { - pthread_t sdu_reader; - struct flow_set * set; - uint32_t address; - - /* - * FIXME: Normally the PFF is held here, - * for now we keep 1 fd to forward a PDU on - */ - int fd; -} rmt; - -int rmt_init(uint32_t address) -{ - rmt.set = flow_set_create(); - if (rmt.set == NULL) - return -1; - - rmt.address = address; - - return 0; -} - -int rmt_fini() -{ - flow_set_destroy(rmt.set); - - return 0; -} - -void * rmt_sdu_reader(void * o) -{ - struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - struct shm_du_buff * sdb; - struct pci * pci; - - while (true) { - int fd = flow_select(rmt.set, &timeout); - if (fd == -ETIMEDOUT) - continue; - - if (fd < 0) { - LOG_ERR("Failed to get active fd."); - continue; - } - - if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); - continue; - } - - pci = shm_pci_des(sdb); - if (pci == NULL) { - LOG_ERR("Failed to get PCI."); - ipcp_flow_del(sdb); - continue; - } - - if (pci->dst_addr != rmt.address) { - LOG_DBG("PDU needs to be forwarded."); - - if (pci->ttl == 0) { - LOG_DBG("TTL was zero."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (shm_pci_dec_ttl(sdb)) { - LOG_ERR("Failed to decrease TTL."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - /* - * FIXME: Dropping for now, since - * we don't have a PFF yet - */ - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (shm_pci_shrink(sdb)) { - LOG_ERR("Failed to shrink PDU."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (frct_rmt_post_sdu(pci, sdb)) { - LOG_ERR("Failed to hand PDU to FRCT."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - } - - return (void *) 0; -} - -int rmt_dt_flow(int fd, - enum qos_cube qos) -{ - struct flow_set * set = rmt.set; - if (set == NULL) - return -1; - - flow_set_add(set, fd); - - /* FIXME: This will be removed once we have a PFF */ - rmt.fd = fd; - - return 0; -} - -int rmt_frct_write_sdu(struct pci * pci, - struct shm_du_buff * sdb) -{ - if (shm_pci_ser(sdb, pci)) { - LOG_ERR("Failed to serialize PDU."); - ipcp_flow_del(sdb); - return -1; - } - - if (ipcp_flow_write(rmt.fd, sdb)) { - LOG_ERR("Failed to write SDU to fd %d.", rmt.fd); - ipcp_flow_del(sdb); - return -1; - } - - return 0; -} - -int rmt_frct_write_buf(struct pci * pci, - buffer_t * buf) -{ - buffer_t * buffer; - - if (pci == NULL || buf == NULL || buf->data == NULL) - return -1; - - buffer = shm_pci_ser_buf(buf, pci); - if (buffer == NULL) { - LOG_ERR("Failed to serialize buffer."); - free(buf->data); - return -1; - } - - if (flow_write(rmt.fd, buffer->data, buffer->len) == -1) { - LOG_ERR("Failed to write buffer to fd."); - free(buffer); - return -1; - } - - free(buffer); - return 0; -} diff --git a/src/ipcpd/normal/rmt.h b/src/ipcpd/normal/rmt.h deleted file mode 100644 index 6ce7a7d7..00000000 --- a/src/ipcpd/normal/rmt.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * The Relaying and Multiplexing task - * - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_IPCP_RMT_H -#define OUROBOROS_IPCP_RMT_H - -#include -#include - -#include "dt_const.h" -#include "shm_pci.h" - -int rmt_init(uint32_t address); -int rmt_fini(); - -int rmt_dt_flow(int fd, - enum qos_cube qos); - -/* Hand PDU to RMT, SDU from N+1 */ -int rmt_frct_write_sdu(struct pci * pci, - struct shm_du_buff * sdb); - -/* Hand PDU to RMT, SDU from N */ -int rmt_frct_write_buf(struct pci * pci, - buffer_t * buf); - -#endif -- cgit v1.2.3