From 43e2f332770007a3fcea011ffb35e8fbb24a6205 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 12 Oct 2016 14:54:18 +0200 Subject: ipcpd: normal: Improve upon the internal design This commit will remove the RMT component from the normal IPCP, as some of its functionality would else be duplicated in the FMGR. Now all reading from flows, either N-1 or N+1 is done in the FMGR, then either passed to the FRCT or a lookup is performed in the PFF (not there yet) and the PDU is forwarded. --- src/ipcpd/normal/CMakeLists.txt | 1 - src/ipcpd/normal/config.h | 28 -- src/ipcpd/normal/fmgr.c | 591 ++++++++++++++++++++++++---------------- src/ipcpd/normal/fmgr.h | 43 ++- src/ipcpd/normal/frct.c | 33 +-- src/ipcpd/normal/frct.h | 9 +- src/ipcpd/normal/main.c | 8 +- src/ipcpd/normal/ribmgr.c | 26 +- src/ipcpd/normal/ribmgr.h | 5 + src/ipcpd/normal/rmt.c | 195 ------------- src/ipcpd/normal/rmt.h | 46 ---- 11 files changed, 411 insertions(+), 574 deletions(-) delete mode 100644 src/ipcpd/normal/config.h delete mode 100644 src/ipcpd/normal/rmt.c delete mode 100644 src/ipcpd/normal/rmt.h (limited to 'src/ipcpd/normal') diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 654bb127..151721a2 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -28,7 +28,6 @@ set(SOURCE_FILES frct.c main.c ribmgr.c - rmt.c shm_pci.c ) diff --git a/src/ipcpd/normal/config.h b/src/ipcpd/normal/config.h deleted file mode 100644 index 0febf3fd..00000000 --- a/src/ipcpd/normal/config.h +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Normal IPCP configuration constants - * - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_IPCP_CONFIG_H -#define OUROBOROS_IPCP_CONFIG_H - -#define FD_UPDATE_TIMEOUT 100 /* microseconds */ - -#endif diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 25898661..8c627641 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -39,46 +39,46 @@ #include "ribmgr.h" #include "frct.h" #include "ipcp.h" -#include "rmt.h" #include "shm_pci.h" -#include "config.h" #include "flow_alloc.pb-c.h" typedef FlowAllocMsg flow_alloc_msg_t; -struct n_flow { +#define FD_UPDATE_TIMEOUT 100 /* microseconds */ + +struct np1_flow { int fd; cep_id_t cep_id; enum qos_cube qos; - - struct list_head next; }; -struct n_1_flow { - int fd; - char * ae_name; - struct list_head next; +struct nm1_flow { + int fd; + char * ae_name; + enum qos_cube qos; }; struct { - pthread_t n_1_flow_acceptor; - - /* FIXME: Make this a table */ - struct list_head n_1_flows; - pthread_mutex_t n_1_flows_lock; - - /* FIXME: Make this a table */ - struct list_head n_flows; - /* FIXME: Make this a read/write lock */ - pthread_mutex_t n_flows_lock; - - struct flow_set * set; - pthread_t n_reader; + pthread_t nm1_flow_acceptor; + struct nm1_flow ** nm1_flows; + pthread_rwlock_t nm1_flows_lock; + flow_set_t * nm1_set; + + struct np1_flow ** np1_flows; + struct np1_flow ** np1_flows_cep; + pthread_rwlock_t np1_flows_lock; + flow_set_t * np1_set; + pthread_t np1_sdu_reader; + + /* FIXME: Replace with PFF */ + int fd; } fmgr; -static int add_n_1_fd(int fd, char * ae_name) +static int add_nm1_fd(int fd, + char * ae_name, + enum qos_cube qos) { - struct n_1_flow * tmp; + struct nm1_flow * tmp; if (ae_name == NULL) return -1; @@ -89,45 +89,39 @@ static int add_n_1_fd(int fd, char * ae_name) tmp->fd = fd; tmp->ae_name = ae_name; + tmp->qos = qos; - INIT_LIST_HEAD(&tmp->next); + pthread_rwlock_wrlock(&fmgr.nm1_flows_lock); + fmgr.nm1_flows[fd] = tmp; + pthread_rwlock_unlock(&fmgr.nm1_flows_lock); - pthread_mutex_lock(&fmgr.n_1_flows_lock); - list_add(&tmp->next, &fmgr.n_1_flows); - pthread_mutex_unlock(&fmgr.n_1_flows_lock); + /* FIXME: Temporary, until we have a PFF */ + fmgr.fd = fd; return 0; } -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_fd(int fd) +static int add_np1_fd(int fd, + cep_id_t cep_id, + enum qos_cube qos) { - struct list_head * pos = NULL; + struct np1_flow * flow; - list_for_each(pos, &fmgr.n_flows) { - struct n_flow * e = list_entry(pos, struct n_flow, next); - if (e->fd == fd) - return e; - } - - return NULL; -} + flow = malloc(sizeof(*flow)); + if (flow == NULL) + return -1; -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_cep_id(cep_id_t cep_id) -{ - struct list_head * pos = NULL; + flow->cep_id = cep_id; + flow->qos = qos; + flow->fd = fd; - list_for_each(pos, &fmgr.n_flows) { - struct n_flow * e = list_entry(pos, struct n_flow, next); - if (e->cep_id == cep_id) - return e; - } + fmgr.np1_flows[fd] = flow; + fmgr.np1_flows_cep[fd] = flow; - return NULL; + return 0; } -static void * fmgr_n_1_acceptor(void * o) +static void * fmgr_nm1_acceptor(void * o) { int fd; char * ae_name; @@ -175,16 +169,8 @@ static void * fmgr_n_1_acceptor(void * o) } } - if (strcmp(ae_name, DT_AE) == 0) { - /* FIXME: Pass correct QoS cube */ - if (rmt_dt_flow(fd, 0)) { - LOG_ERR("Failed to hand fd to FRCT."); - flow_dealloc(fd); - continue; - } - } - - if (add_n_1_fd(fd, ae_name)) { + /* FIXME: Pass correct QoS cube */ + if (add_nm1_fd(fd, ae_name, QOS_CUBE_BE)) { LOG_ERR("Failed to add file descriptor to list."); flow_dealloc(fd); continue; @@ -194,14 +180,14 @@ static void * fmgr_n_1_acceptor(void * o) return (void *) 0; } -static void * fmgr_n_reader(void * o) +static void * fmgr_np1_sdu_reader(void * o) { struct shm_du_buff * sdb; struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - struct n_flow * flow; + struct np1_flow * flow; while (true) { - int fd = flow_select(fmgr.set, &timeout); + int fd = flow_select(fmgr.np1_set, &timeout); if (fd == -ETIMEDOUT) continue; @@ -215,172 +201,194 @@ static void * fmgr_n_reader(void * o) continue; } - pthread_mutex_lock(&fmgr.n_flows_lock); - flow = get_n_flow_by_fd(fd); + pthread_rwlock_rdlock(&fmgr.np1_flows_lock); + flow = fmgr.np1_flows[fd]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); ipcp_flow_del(sdb); LOG_ERR("Failed to retrieve flow."); continue; } if (frct_i_write_sdu(flow->cep_id, sdb)) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); ipcp_flow_del(sdb); LOG_ERR("Failed to hand SDU to FRCT."); continue; } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); } return (void *) 0; } -int fmgr_init() +void * fmgr_nm1_sdu_reader(void * o) { - INIT_LIST_HEAD(&fmgr.n_1_flows); - INIT_LIST_HEAD(&fmgr.n_flows); - - pthread_mutex_init(&fmgr.n_1_flows_lock, NULL); - pthread_mutex_init(&fmgr.n_flows_lock, NULL); + struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; + struct shm_du_buff * sdb; + struct pci * pci; - fmgr.set = flow_set_create(); - if (fmgr.set == NULL) - return -1; + while (true) { + int fd = flow_select(fmgr.nm1_set, &timeout); + if (fd == -ETIMEDOUT) + continue; - pthread_create(&fmgr.n_1_flow_acceptor, NULL, fmgr_n_1_acceptor, NULL); - pthread_create(&fmgr.n_reader, NULL, fmgr_n_reader, NULL); + if (fd < 0) { + LOG_ERR("Failed to get active fd."); + continue; + } - return 0; -} + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } -int fmgr_fini() -{ - struct list_head * pos = NULL; + pci = shm_pci_des(sdb); + if (pci == NULL) { + LOG_ERR("Failed to get PCI."); + ipcp_flow_del(sdb); + continue; + } - pthread_cancel(fmgr.n_1_flow_acceptor); - pthread_cancel(fmgr.n_reader); + if (pci->dst_addr != ribmgr_address()) { + LOG_DBG("PDU needs to be forwarded."); - pthread_join(fmgr.n_1_flow_acceptor, NULL); - pthread_join(fmgr.n_reader, NULL); + if (pci->ttl == 0) { + LOG_DBG("TTL was zero."); + ipcp_flow_del(sdb); + free(pci); + continue; + } - list_for_each(pos, &fmgr.n_1_flows) { - struct n_1_flow * e = list_entry(pos, struct n_1_flow, next); - if (e->ae_name != NULL) - free(e->ae_name); - if (ribmgr_remove_flow(e->fd)) - LOG_ERR("Failed to remove management flow."); - } + if (shm_pci_dec_ttl(sdb)) { + LOG_ERR("Failed to decrease TTL."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + /* + * FIXME: Dropping for now, since + * we don't have a PFF yet + */ + ipcp_flow_del(sdb); + free(pci); + continue; + } - pthread_mutex_destroy(&fmgr.n_1_flows_lock); - pthread_mutex_destroy(&fmgr.n_flows_lock); + if (shm_pci_shrink(sdb)) { + LOG_ERR("Failed to shrink PDU."); + ipcp_flow_del(sdb); + free(pci); + continue; + } - flow_set_destroy(fmgr.set); + if (frct_nm1_post_sdu(pci, sdb)) { + LOG_ERR("Failed to hand PDU to FRCT."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + } - return 0; + return (void *) 0; } -int fmgr_mgmt_flow(char * dst_name) +int fmgr_init() { - int fd; - int result; - char * ae_name; + int i; - ae_name = strdup(MGMT_AE); - if (ae_name == NULL) + fmgr.nm1_flows = malloc(sizeof(*(fmgr.nm1_flows)) * IRMD_MAX_FLOWS); + if (fmgr.nm1_flows == NULL) return -1; - /* FIXME: Request retransmission. */ - fd = flow_alloc(dst_name, MGMT_AE, NULL); - if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s", dst_name); - free(ae_name); + fmgr.np1_flows = malloc(sizeof(*(fmgr.np1_flows)) * IRMD_MAX_FLOWS); + if (fmgr.np1_flows == NULL) { + free(fmgr.nm1_flows); return -1; } - result = flow_alloc_res(fd); - if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d", - dst_name, result); - free(ae_name); + fmgr.np1_flows_cep = + malloc(sizeof(*(fmgr.np1_flows_cep)) * IRMD_MAX_FLOWS); + if (fmgr.np1_flows_cep == NULL) { + free(fmgr.np1_flows); + free(fmgr.nm1_flows); return -1; } - if (ribmgr_add_flow(fd)) { - LOG_ERR("Failed to hand file descriptor to RIB manager"); - flow_dealloc(fd); - free(ae_name); + for (i = 0; i < IRMD_MAX_FLOWS; i++) { + fmgr.nm1_flows[i] = NULL; + fmgr.np1_flows[i] = NULL; + fmgr.np1_flows_cep[i] = NULL; + } + + pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL); + pthread_rwlock_init(&fmgr.np1_flows_lock, NULL); + + fmgr.np1_set = flow_set_create(); + if (fmgr.np1_set == NULL) { + free(fmgr.np1_flows_cep); + free(fmgr.np1_flows); + free(fmgr.nm1_flows); return -1; } - if (add_n_1_fd(fd, ae_name)) { - LOG_ERR("Failed to add file descriptor to list."); - flow_dealloc(fd); + fmgr.nm1_set = flow_set_create(); + if (fmgr.nm1_set == NULL) { + flow_set_destroy(fmgr.np1_set); + free(fmgr.np1_flows_cep); + free(fmgr.np1_flows); + free(fmgr.nm1_flows); return -1; } + pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL); + pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL); + return 0; } -int fmgr_dt_flow(char * dst_name, enum qos_cube qos) +int fmgr_fini() { - int fd; - int result; - char * ae_name; + int i; - ae_name = strdup(DT_AE); - if (ae_name == NULL) - return -1; + pthread_cancel(fmgr.nm1_flow_acceptor); + pthread_cancel(fmgr.np1_sdu_reader); - /* FIXME: Map qos cube on correct QoS. */ - fd = flow_alloc(dst_name, DT_AE, NULL); - if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s", dst_name); - free(ae_name); - return -1; - } + pthread_join(fmgr.nm1_flow_acceptor, NULL); + pthread_join(fmgr.np1_sdu_reader, NULL); - result = flow_alloc_res(fd); - if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d", - dst_name, result); - free(ae_name); - return -1; + for (i = 0; i < IRMD_MAX_FLOWS; i++) { + if (fmgr.nm1_flows[i] == NULL) + continue; + if (fmgr.nm1_flows[i]->ae_name != NULL) + free(fmgr.nm1_flows[i]->ae_name); + if (ribmgr_remove_flow(fmgr.nm1_flows[i]->fd)) + LOG_ERR("Failed to remove management flow."); } - if (rmt_dt_flow(fd, qos)) { - LOG_ERR("Failed to hand file descriptor to FRCT"); - flow_dealloc(fd); - free(ae_name); - return -1; - } + pthread_rwlock_destroy(&fmgr.nm1_flows_lock); + pthread_rwlock_destroy(&fmgr.np1_flows_lock); - if (add_n_1_fd(fd, ae_name)) { - LOG_ERR("Failed to add file descriptor to list."); - flow_dealloc(fd); - free(ae_name); - return -1; - } + flow_set_destroy(fmgr.nm1_set); + flow_set_destroy(fmgr.np1_set); + free(fmgr.np1_flows_cep); + free(fmgr.np1_flows); + free(fmgr.nm1_flows); return 0; } -int fmgr_flow_alloc(int fd, +int fmgr_np1_alloc(int fd, char * dst_ap_name, char * src_ae_name, enum qos_cube qos) { - struct n_flow * flow; cep_id_t cep_id; uint32_t address = 0; buffer_t buf; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - flow = malloc(sizeof(*flow)); - if (flow == NULL) - return -1; - /* FIXME: Obtain correct address here from DIF NSM */ msg.code = FLOW_ALLOC_CODE__FLOW_REQ; @@ -390,55 +398,47 @@ int fmgr_flow_alloc(int fd, msg.has_qos_cube = true; buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) { - free(flow); + if (buf.len == 0) return -1; - } buf.data = malloc(buf.len); - if (buf.data == NULL) { - free(flow); + if (buf.data == NULL) return -1; - } flow_alloc_msg__pack(&msg, buf.data); - pthread_mutex_lock(&fmgr.n_flows_lock); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); cep_id = frct_i_create(address, &buf, qos); if (cep_id == INVALID_CEP_ID) { free(buf.data); - free(flow); - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } free(buf.data); - flow->fd = fd; - flow->cep_id = cep_id; - flow->qos = qos; - - INIT_LIST_HEAD(&flow->next); - - list_add(&flow->next, &fmgr.n_flows); + if (add_np1_fd(fd, cep_id, qos)) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + return -1; + } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return 0; } -/* Call under n_flows lock */ -static int n_flow_dealloc(int fd) +/* Call under np1_flows lock */ +static int np1_flow_dealloc(int fd) { - struct n_flow * flow; + struct np1_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; int ret; - flow_set_del(fmgr.set, fd); + flow_set_del(fmgr.np1_set, fd); - flow = get_n_flow_by_fd(fd); + flow = fmgr.np1_flows[fd]; if (flow == NULL) return -1; @@ -455,7 +455,9 @@ static int n_flow_dealloc(int fd) flow_alloc_msg__pack(&msg, buf.data); ret = frct_i_destroy(flow->cep_id, &buf); - list_del(&flow->next); + + fmgr.np1_flows[fd] = NULL; + fmgr.np1_flows_cep[flow->cep_id] = NULL; free(flow); free(buf.data); @@ -463,17 +465,17 @@ static int n_flow_dealloc(int fd) return ret; } -int fmgr_flow_alloc_resp(int fd, int response) +int fmgr_np1_alloc_resp(int fd, int response) { - struct n_flow * flow; + struct np1_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; - pthread_mutex_lock(&fmgr.n_flows_lock); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - flow = get_n_flow_by_fd(fd); + flow = fmgr.np1_flows[fd]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } @@ -483,13 +485,13 @@ int fmgr_flow_alloc_resp(int fd, int response) buf.len = flow_alloc_msg__get_packed_size(&msg); if (buf.len == 0) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } buf.data = malloc(buf.len); if (buf.data == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } @@ -498,84 +500,76 @@ int fmgr_flow_alloc_resp(int fd, int response) if (response < 0) { frct_i_destroy(flow->cep_id, &buf); free(buf.data); - list_del(&flow->next); + fmgr.np1_flows[fd] = NULL; + fmgr.np1_flows_cep[flow->cep_id] = NULL; free(flow); } else { if (frct_i_accept(flow->cep_id, &buf, flow->qos)) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } - flow_set_add(fmgr.set, fd); + flow_set_add(fmgr.np1_set, fd); } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return 0; } -int fmgr_flow_dealloc(int fd) +int fmgr_np1_dealloc(int fd) { int ret; - pthread_mutex_lock(&fmgr.n_flows_lock); - ret = n_flow_dealloc(fd); - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + ret = np1_flow_dealloc(fd); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return ret; } -int fmgr_frct_post_buf(cep_id_t cep_id, - buffer_t * buf) +int fmgr_np1_post_buf(cep_id_t cep_id, + buffer_t * buf) { - struct n_flow * flow; + struct np1_flow * flow; int ret = 0; int fd; flow_alloc_msg_t * msg; - pthread_mutex_lock(&fmgr.n_flows_lock); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); /* Depending on the message call the function in ipcp-dev.h */ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); if (msg == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to unpack flow alloc message"); return -1; } switch (msg->code) { case FLOW_ALLOC_CODE__FLOW_REQ: - flow = malloc(sizeof(*flow)); - if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - return -1; - } - - flow->cep_id = cep_id; - flow->qos = msg->qos_cube; - fd = ipcp_flow_req_arr(getpid(), msg->dst_name, msg->src_ae_name); if (fd < 0) { - pthread_mutex_unlock(&fmgr.n_flows_lock); - free(flow); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("Failed to get fd for flow."); return -1; } - flow->fd = fd; - - INIT_LIST_HEAD(&flow->next); + if (add_np1_fd(fd, cep_id, msg->qos_cube)) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + flow_alloc_msg__free_unpacked(msg, NULL); + LOG_ERR("Failed to add np1 flow."); + return -1; + } - list_add(&flow->next, &fmgr.n_flows); break; case FLOW_ALLOC_CODE__FLOW_REPLY: - flow = get_n_flow_by_cep_id(cep_id); + flow = fmgr.np1_flows_cep[cep_id]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("No such flow in flow manager."); return -1; @@ -583,23 +577,24 @@ int fmgr_frct_post_buf(cep_id_t cep_id, ret = ipcp_flow_alloc_reply(flow->fd, msg->response); if (msg->response < 0) { - list_del(&flow->next); + fmgr.np1_flows[flow->fd] = NULL; + fmgr.np1_flows_cep[cep_id] = NULL; free(flow); } else { - flow_set_add(fmgr.set, flow->fd); + flow_set_add(fmgr.np1_set, flow->fd); } break; case FLOW_ALLOC_CODE__FLOW_DEALLOC: - flow = get_n_flow_by_cep_id(cep_id); + flow = fmgr.np1_flows_cep[cep_id]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("No such flow in flow manager."); return -1; } - flow_set_del(fmgr.set, flow->fd); + flow_set_del(fmgr.np1_set, flow->fd); ret = flow_dealloc(flow->fd); break; @@ -609,34 +604,160 @@ int fmgr_frct_post_buf(cep_id_t cep_id, break; } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); return ret; } -int fmgr_frct_post_sdu(cep_id_t cep_id, - struct shm_du_buff * sdb) +int fmgr_np1_post_sdu(cep_id_t cep_id, + struct shm_du_buff * sdb) { - struct n_flow * flow; + struct np1_flow * flow; - pthread_mutex_lock(&fmgr.n_flows_lock); + pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = get_n_flow_by_cep_id(cep_id); + flow = fmgr.np1_flows_cep[cep_id]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to find N flow."); return -1; } if (ipcp_flow_write(flow->fd, sdb)) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to hand SDU to N flow."); return -1; } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + + return 0; +} + +int fmgr_nm1_mgmt_flow(char * dst_name) +{ + int fd; + int result; + char * ae_name; + + ae_name = strdup(MGMT_AE); + if (ae_name == NULL) + return -1; + + /* FIXME: Request retransmission. */ + fd = flow_alloc(dst_name, MGMT_AE, NULL); + if (fd < 0) { + LOG_ERR("Failed to allocate flow to %s", dst_name); + free(ae_name); + return -1; + } + + result = flow_alloc_res(fd); + if (result < 0) { + LOG_ERR("Result of flow allocation to %s is %d", + dst_name, result); + free(ae_name); + return -1; + } + + if (ribmgr_add_flow(fd)) { + LOG_ERR("Failed to hand file descriptor to RIB manager"); + flow_dealloc(fd); + free(ae_name); + return -1; + } + + /* FIXME: Pass correct QoS cube */ + if (add_nm1_fd(fd, ae_name, QOS_CUBE_BE)) { + LOG_ERR("Failed to add file descriptor to list."); + flow_dealloc(fd); + return -1; + } + + return 0; +} + +int fmgr_nm1_dt_flow(char * dst_name, + enum qos_cube qos) +{ + int fd; + int result; + char * ae_name; + + ae_name = strdup(DT_AE); + if (ae_name == NULL) + return -1; + + /* FIXME: Map qos cube on correct QoS. */ + fd = flow_alloc(dst_name, DT_AE, NULL); + if (fd < 0) { + LOG_ERR("Failed to allocate flow to %s", dst_name); + free(ae_name); + return -1; + } + + result = flow_alloc_res(fd); + if (result < 0) { + LOG_ERR("Result of flow allocation to %s is %d", + dst_name, result); + free(ae_name); + return -1; + } + + if (add_nm1_fd(fd, ae_name, qos)) { + LOG_ERR("Failed to add file descriptor to list."); + flow_dealloc(fd); + free(ae_name); + return -1; + } + + return 0; +} + +int fmgr_nm1_write_sdu(struct pci * pci, + struct shm_du_buff * sdb) +{ + if (pci == NULL || sdb == NULL) + return -1; + + if (shm_pci_ser(sdb, pci)) { + LOG_ERR("Failed to serialize PDU."); + ipcp_flow_del(sdb); + return -1; + } + + if (ipcp_flow_write(fmgr.fd, sdb)) { + LOG_ERR("Failed to write SDU to fd %d.", fmgr.fd); + ipcp_flow_del(sdb); + return -1; + } + + return 0; +} + +int fmgr_nm1_write_buf(struct pci * pci, + buffer_t * buf) +{ + buffer_t * buffer; + + if (pci == NULL || buf == NULL || buf->data == NULL) + return -1; + + buffer = shm_pci_ser_buf(buf, pci); + if (buffer == NULL) { + LOG_ERR("Failed to serialize buffer."); + free(buf->data); + return -1; + } + + if (flow_write(fmgr.fd, buffer->data, buffer->len) == -1) { + LOG_ERR("Failed to write buffer to fd."); + free(buffer); + return -1; + } + free(buffer); return 0; } diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index 0f2cd045..f97cf858 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -37,29 +37,26 @@ int fmgr_init(); int fmgr_fini(); -/* N-flow ops */ -int fmgr_mgmt_flow(char * dst_name); - -int fmgr_dt_flow(char * dst_name, - enum qos_cube qos); - -/* N+1-flow ops, local */ -int fmgr_flow_alloc(int fd, - char * dst_ap_name, - char * src_ae_name, - enum qos_cube qos); - -int fmgr_flow_alloc_resp(int fd, - int response); - -int fmgr_flow_dealloc(int fd); - -/* N+1-flow ops, remote */ -int fmgr_frct_post_buf(cep_id_t id, - buffer_t * buf); - -/* SDU for N+1-flow */ -int fmgr_frct_post_sdu(cep_id_t id, +int fmgr_np1_alloc(int fd, + char * dst_ap_name, + char * src_ae_name, + enum qos_cube qos); +int fmgr_np1_alloc_resp(int fd, + int response); +int fmgr_np1_dealloc(int fd); + +int fmgr_np1_post_buf(cep_id_t id, + buffer_t * buf); +int fmgr_np1_post_sdu(cep_id_t id, + struct shm_du_buff * sdb); + +int fmgr_nm1_mgmt_flow(char * dst_name); +int fmgr_nm1_dt_flow(char * dst_name, + enum qos_cube qos); + +int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb); +int fmgr_nm1_write_buf(struct pci * pci, + buffer_t * buf); #endif diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index abbde779..9daf8755 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -32,8 +32,8 @@ #include #include "frct.h" -#include "rmt.h" #include "fmgr.h" +#include "ribmgr.h" enum conn_state { CONN_PENDING = 0, @@ -51,8 +51,6 @@ struct frct_i { }; struct { - uint32_t address; - pthread_mutex_t instances_lock; struct frct_i ** instances; @@ -82,10 +80,9 @@ static int release_cep_id(int id) return ret; } -int frct_init(uint32_t address) +int frct_init() { int i; - frct.address = address; if (pthread_mutex_init(&frct.cep_ids_lock, NULL)) return -1; @@ -142,7 +139,7 @@ static struct frct_i * create_frct_i(uint32_t address, return instance; } -int frct_rmt_post_sdu(struct pci * pci, +int frct_nm1_post_sdu(struct pci * pci, struct shm_du_buff * sdb) { struct frct_i * instance; @@ -167,14 +164,14 @@ int frct_rmt_post_sdu(struct pci * pci, buf.len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); buf.data = shm_du_buff_head(sdb); - if (fmgr_frct_post_buf(id, &buf)) { + if (fmgr_np1_post_buf(id, &buf)) { LOG_ERR("Failed to hand buffer to FMGR."); free(pci); return -1; } } else { /* FIXME: Known cep-ids are delivered to FMGR (minimal DTP) */ - if (fmgr_frct_post_sdu(pci->dst_cep_id, sdb)) { + if (fmgr_np1_post_sdu(pci->dst_cep_id, sdb)) { LOG_ERR("Failed to hand SDU to FMGR."); free(pci); return -1; @@ -217,15 +214,15 @@ cep_id_t frct_i_create(uint32_t address, pci.pdu_type = PDU_TYPE_MGMT; pci.dst_addr = address; - pci.src_addr = frct.address; + pci.src_addr = ribmgr_address(); pci.dst_cep_id = 0; pci.src_cep_id = id; pci.seqno = 0; pci.qos_id = cube; - if (rmt_frct_write_buf(&pci, buf)) { + if (fmgr_nm1_write_buf(&pci, buf)) { free(instance); - LOG_ERR("Failed to hand PDU to RMT."); + LOG_ERR("Failed to hand PDU to FMGR."); return INVALID_CEP_ID; } @@ -262,7 +259,7 @@ int frct_i_accept(cep_id_t id, pci.pdu_type = PDU_TYPE_MGMT; pci.dst_addr = instance->r_address; - pci.src_addr = frct.address; + pci.src_addr = ribmgr_address(); pci.dst_cep_id = instance->r_cep_id; pci.src_cep_id = instance->cep_id; pci.seqno = 0; @@ -270,7 +267,7 @@ int frct_i_accept(cep_id_t id, pthread_mutex_unlock(&frct.instances_lock); - if (rmt_frct_write_buf(&pci, buf)) + if (fmgr_nm1_write_buf(&pci, buf)) return -1; return 0; @@ -299,7 +296,7 @@ int frct_i_destroy(cep_id_t id, pci.pdu_type = PDU_TYPE_MGMT; pci.dst_addr = instance->r_address; - pci.src_addr = frct.address; + pci.src_addr = ribmgr_address(); pci.dst_cep_id = instance->r_cep_id; pci.src_cep_id = instance->cep_id; pci.seqno = 0; @@ -309,7 +306,7 @@ int frct_i_destroy(cep_id_t id, pthread_mutex_unlock(&frct.instances_lock); if (buf != NULL && buf->data != NULL) - if (rmt_frct_write_buf(&pci, buf)) + if (fmgr_nm1_write_buf(&pci, buf)) return -1; return 0; @@ -341,15 +338,15 @@ int frct_i_write_sdu(cep_id_t id, pci.pdu_type = PDU_TYPE_DTP; pci.dst_addr = instance->r_address; - pci.src_addr = frct.address; + pci.src_addr = ribmgr_address(); pci.dst_cep_id = instance->r_cep_id; pci.src_cep_id = instance->cep_id; pci.seqno = (instance->seqno)++; pci.qos_id = instance->cube; - if (rmt_frct_write_sdu(&pci, sdb)) { + if (fmgr_nm1_write_sdu(&pci, sdb)) { pthread_mutex_unlock(&frct.instances_lock); - LOG_ERR("Failed to hand SDU to RMT."); + LOG_ERR("Failed to hand SDU to FMGR."); return -1; } diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h index 2b86f5bd..b9e70d0f 100644 --- a/src/ipcpd/normal/frct.h +++ b/src/ipcpd/normal/frct.h @@ -30,13 +30,9 @@ struct frct_i; -int frct_init(uint32_t address); +int frct_init(); int frct_fini(); -/* Called by RMT upon receipt of a PDU for us */ -int frct_rmt_post_sdu(struct pci * pci, - struct shm_du_buff * sdb); - cep_id_t frct_i_create(uint32_t address, buffer_t * buf, enum qos_cube cube); @@ -51,4 +47,7 @@ int frct_i_destroy(cep_id_t id, int frct_i_write_sdu(cep_id_t id, struct shm_du_buff * sdb); +int frct_nm1_post_sdu(struct pci * pci, + struct shm_du_buff * sdb); + #endif diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 4611408d..0339eaf4 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -109,7 +109,7 @@ static int normal_ipcp_enroll(char * dif_name) pthread_rwlock_unlock(&ipcpi.state_lock); - if (fmgr_mgmt_flow(dif_name)) { + if (fmgr_nm1_mgmt_flow(dif_name)) { LOG_ERR("Failed to establish management flow."); return -1; } @@ -163,9 +163,9 @@ static struct ipcp_ops normal_ops = { .ipcp_enroll = normal_ipcp_enroll, .ipcp_name_reg = normal_ipcp_name_reg, .ipcp_name_unreg = normal_ipcp_name_unreg, - .ipcp_flow_alloc = fmgr_flow_alloc, - .ipcp_flow_alloc_resp = fmgr_flow_alloc_resp, - .ipcp_flow_dealloc = fmgr_flow_dealloc + .ipcp_flow_alloc = fmgr_np1_alloc, + .ipcp_flow_alloc_resp = fmgr_np1_alloc_resp, + .ipcp_flow_dealloc = fmgr_np1_dealloc }; int main(int argc, char * argv[]) diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index dd17f9bd..c69a59ce 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -39,7 +39,6 @@ #include "frct.h" #include "ipcp.h" #include "cdap_request.h" -#include "rmt.h" #include "static_info.pb-c.h" typedef StaticInfoMsg static_info_msg_t; @@ -242,7 +241,7 @@ int ribmgr_cdap_write(struct cdap * instance, rib.address = msg->address; - if (frct_init(rib.address)) { + if (frct_init()) { ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&ipcpi.state_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); @@ -251,16 +250,6 @@ int ribmgr_cdap_write(struct cdap * instance, return -1; } - if (rmt_init(rib.address)) { - ipcp_set_state(IPCP_INIT); - pthread_rwlock_unlock(&ipcpi.state_lock); - frct_fini(); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); - static_info_msg__free_unpacked(msg, NULL); - LOG_ERR("Failed to init RMT"); - return -1; - } - static_info_msg__free_unpacked(msg, NULL); } else { ret = -1; @@ -540,17 +529,11 @@ int ribmgr_bootstrap(struct dif_config * conf) /* FIXME: Set correct address. */ rib.address = 0; - if (frct_init(rib.address)) { + if (frct_init()) { LOG_ERR("Failed to initialize FRCT."); return -1; } - if (rmt_init(rib.address)) { - LOG_ERR("Failed to initialize RMT."); - frct_fini(); - return -1; - } - LOG_DBG("Bootstrapped RIB Manager."); return 0; @@ -560,3 +543,8 @@ struct dt_const * ribmgr_dt_const() { return &(rib.dtc); } + +uint32_t ribmgr_address() +{ + return rib.address; +} diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h index f776f7eb..ed8bae03 100644 --- a/src/ipcpd/normal/ribmgr.h +++ b/src/ipcpd/normal/ribmgr.h @@ -35,6 +35,11 @@ int ribmgr_remove_flow(int fd); int ribmgr_bootstrap(struct dif_config * conf); +/* + * FIXME: Should we expose the RIB? + * Else we may end up with a lot of getters and setters + */ struct dt_const * ribmgr_dt_const(); +uint32_t ribmgr_address(); #endif diff --git a/src/ipcpd/normal/rmt.c b/src/ipcpd/normal/rmt.c deleted file mode 100644 index fa4c7edd..00000000 --- a/src/ipcpd/normal/rmt.c +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * The Relaying and Multiplexing task - * - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#define OUROBOROS_PREFIX "flow-manager" - -#include -#include -#include -#include -#include -#include - -#include - -#include "rmt.h" -#include "config.h" -#include "frct.h" - -struct { - pthread_t sdu_reader; - struct flow_set * set; - uint32_t address; - - /* - * FIXME: Normally the PFF is held here, - * for now we keep 1 fd to forward a PDU on - */ - int fd; -} rmt; - -int rmt_init(uint32_t address) -{ - rmt.set = flow_set_create(); - if (rmt.set == NULL) - return -1; - - rmt.address = address; - - return 0; -} - -int rmt_fini() -{ - flow_set_destroy(rmt.set); - - return 0; -} - -void * rmt_sdu_reader(void * o) -{ - struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - struct shm_du_buff * sdb; - struct pci * pci; - - while (true) { - int fd = flow_select(rmt.set, &timeout); - if (fd == -ETIMEDOUT) - continue; - - if (fd < 0) { - LOG_ERR("Failed to get active fd."); - continue; - } - - if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); - continue; - } - - pci = shm_pci_des(sdb); - if (pci == NULL) { - LOG_ERR("Failed to get PCI."); - ipcp_flow_del(sdb); - continue; - } - - if (pci->dst_addr != rmt.address) { - LOG_DBG("PDU needs to be forwarded."); - - if (pci->ttl == 0) { - LOG_DBG("TTL was zero."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (shm_pci_dec_ttl(sdb)) { - LOG_ERR("Failed to decrease TTL."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - /* - * FIXME: Dropping for now, since - * we don't have a PFF yet - */ - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (shm_pci_shrink(sdb)) { - LOG_ERR("Failed to shrink PDU."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (frct_rmt_post_sdu(pci, sdb)) { - LOG_ERR("Failed to hand PDU to FRCT."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - } - - return (void *) 0; -} - -int rmt_dt_flow(int fd, - enum qos_cube qos) -{ - struct flow_set * set = rmt.set; - if (set == NULL) - return -1; - - flow_set_add(set, fd); - - /* FIXME: This will be removed once we have a PFF */ - rmt.fd = fd; - - return 0; -} - -int rmt_frct_write_sdu(struct pci * pci, - struct shm_du_buff * sdb) -{ - if (shm_pci_ser(sdb, pci)) { - LOG_ERR("Failed to serialize PDU."); - ipcp_flow_del(sdb); - return -1; - } - - if (ipcp_flow_write(rmt.fd, sdb)) { - LOG_ERR("Failed to write SDU to fd %d.", rmt.fd); - ipcp_flow_del(sdb); - return -1; - } - - return 0; -} - -int rmt_frct_write_buf(struct pci * pci, - buffer_t * buf) -{ - buffer_t * buffer; - - if (pci == NULL || buf == NULL || buf->data == NULL) - return -1; - - buffer = shm_pci_ser_buf(buf, pci); - if (buffer == NULL) { - LOG_ERR("Failed to serialize buffer."); - free(buf->data); - return -1; - } - - if (flow_write(rmt.fd, buffer->data, buffer->len) == -1) { - LOG_ERR("Failed to write buffer to fd."); - free(buffer); - return -1; - } - - free(buffer); - return 0; -} diff --git a/src/ipcpd/normal/rmt.h b/src/ipcpd/normal/rmt.h deleted file mode 100644 index 6ce7a7d7..00000000 --- a/src/ipcpd/normal/rmt.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * The Relaying and Multiplexing task - * - * Sander Vrijders - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_IPCP_RMT_H -#define OUROBOROS_IPCP_RMT_H - -#include -#include - -#include "dt_const.h" -#include "shm_pci.h" - -int rmt_init(uint32_t address); -int rmt_fini(); - -int rmt_dt_flow(int fd, - enum qos_cube qos); - -/* Hand PDU to RMT, SDU from N+1 */ -int rmt_frct_write_sdu(struct pci * pci, - struct shm_du_buff * sdb); - -/* Hand PDU to RMT, SDU from N */ -int rmt_frct_write_buf(struct pci * pci, - buffer_t * buf); - -#endif -- cgit v1.2.3