summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2016-10-08 19:42:51 +0200
committerSander Vrijders <sander.vrijders@intec.ugent.be>2016-10-11 16:15:33 +0200
commit69ef99bb2dc05337e8189acc42dc9122f4182ead (patch)
tree2808e55f80991cb9b59266c15726c982f3a4ca50
parentdbf3ab8dfb2cbe8167c464e3cf6a9aa757bfff6a (diff)
downloadouroboros-69ef99bb2dc05337e8189acc42dc9122f4182ead.tar.gz
ouroboros-69ef99bb2dc05337e8189acc42dc9122f4182ead.zip
ipcpd: normal: First version of the fast path bootstrap
This is the first version of the fast path bootstrap in the normal IPCP. It sets up a connection with the other end, and creates the appropriate data structures. N+1 and N-1 SDUs are read and written and passed through the right components.
-rw-r--r--src/ipcpd/normal/config.h28
-rw-r--r--src/ipcpd/normal/fmgr.c203
-rw-r--r--src/ipcpd/normal/fmgr.h8
-rw-r--r--src/ipcpd/normal/frct.c300
-rw-r--r--src/ipcpd/normal/frct.h41
-rw-r--r--src/ipcpd/normal/ribmgr.c26
-rw-r--r--src/ipcpd/normal/ribmgr.h14
-rw-r--r--src/ipcpd/normal/rmt.c163
-rw-r--r--src/ipcpd/normal/rmt.h18
-rw-r--r--src/ipcpd/normal/shm_pci.c83
-rw-r--r--src/ipcpd/normal/shm_pci.h22
11 files changed, 697 insertions, 209 deletions
diff --git a/src/ipcpd/normal/config.h b/src/ipcpd/normal/config.h
new file mode 100644
index 00000000..0febf3fd
--- /dev/null
+++ b/src/ipcpd/normal/config.h
@@ -0,0 +1,28 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * Normal IPCP configuration constants
+ *
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCP_CONFIG_H
+#define OUROBOROS_IPCP_CONFIG_H
+
+#define FD_UPDATE_TIMEOUT 100 /* microseconds */
+
+#endif
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index b6ec1984..25898661 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -27,6 +27,8 @@
#include <ouroboros/dev.h>
#include <ouroboros/list.h>
#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/select.h>
+#include <ouroboros/errno.h>
#include <stdlib.h>
#include <stdbool.h>
@@ -37,33 +39,41 @@
#include "ribmgr.h"
#include "frct.h"
#include "ipcp.h"
+#include "rmt.h"
+#include "shm_pci.h"
+#include "config.h"
#include "flow_alloc.pb-c.h"
typedef FlowAllocMsg flow_alloc_msg_t;
struct n_flow {
- int fd;
- struct frct_i * frct_i;
+ int fd;
+ cep_id_t cep_id;
enum qos_cube qos;
struct list_head next;
};
struct n_1_flow {
- int fd;
- char * ae_name;
+ int fd;
+ char * ae_name;
struct list_head next;
};
struct {
- pthread_t listen_thread;
+ pthread_t n_1_flow_acceptor;
+ /* FIXME: Make this a table */
struct list_head n_1_flows;
pthread_mutex_t n_1_flows_lock;
+ /* FIXME: Make this a table */
struct list_head n_flows;
/* FIXME: Make this a read/write lock */
pthread_mutex_t n_flows_lock;
+
+ struct flow_set * set;
+ pthread_t n_reader;
} fmgr;
static int add_n_1_fd(int fd, char * ae_name)
@@ -89,9 +99,37 @@ static int add_n_1_fd(int fd, char * ae_name)
return 0;
}
-static void * fmgr_listen(void * o)
+/* Call under n_flows lock */
+static struct n_flow * get_n_flow_by_fd(int fd)
{
- int fd;
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &fmgr.n_flows) {
+ struct n_flow * e = list_entry(pos, struct n_flow, next);
+ if (e->fd == fd)
+ return e;
+ }
+
+ return NULL;
+}
+
+/* Call under n_flows lock */
+static struct n_flow * get_n_flow_by_cep_id(cep_id_t cep_id)
+{
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &fmgr.n_flows) {
+ struct n_flow * e = list_entry(pos, struct n_flow, next);
+ if (e->cep_id == cep_id)
+ return e;
+ }
+
+ return NULL;
+}
+
+static void * fmgr_n_1_acceptor(void * o)
+{
+ int fd;
char * ae_name;
while (true) {
@@ -139,7 +177,7 @@ static void * fmgr_listen(void * o)
if (strcmp(ae_name, DT_AE) == 0) {
/* FIXME: Pass correct QoS cube */
- if (frct_dt_flow(fd, 0)) {
+ if (rmt_dt_flow(fd, 0)) {
LOG_ERR("Failed to hand fd to FRCT.");
flow_dealloc(fd);
continue;
@@ -156,6 +194,49 @@ static void * fmgr_listen(void * o)
return (void *) 0;
}
+static void * fmgr_n_reader(void * o)
+{
+ struct shm_du_buff * sdb;
+ struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
+ struct n_flow * flow;
+
+ while (true) {
+ int fd = flow_select(fmgr.set, &timeout);
+ if (fd == -ETIMEDOUT)
+ continue;
+
+ if (fd < 0) {
+ LOG_ERR("Failed to get active fd.");
+ continue;
+ }
+
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Failed to read SDU from fd %d.", fd);
+ continue;
+ }
+
+ pthread_mutex_lock(&fmgr.n_flows_lock);
+ flow = get_n_flow_by_fd(fd);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ ipcp_flow_del(sdb);
+ LOG_ERR("Failed to retrieve flow.");
+ continue;
+ }
+
+ if (frct_i_write_sdu(flow->cep_id, sdb)) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ ipcp_flow_del(sdb);
+ LOG_ERR("Failed to hand SDU to FRCT.");
+ continue;
+ }
+
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ }
+
+ return (void *) 0;
+}
+
int fmgr_init()
{
INIT_LIST_HEAD(&fmgr.n_1_flows);
@@ -164,7 +245,12 @@ int fmgr_init()
pthread_mutex_init(&fmgr.n_1_flows_lock, NULL);
pthread_mutex_init(&fmgr.n_flows_lock, NULL);
- pthread_create(&fmgr.listen_thread, NULL, fmgr_listen, NULL);
+ fmgr.set = flow_set_create();
+ if (fmgr.set == NULL)
+ return -1;
+
+ pthread_create(&fmgr.n_1_flow_acceptor, NULL, fmgr_n_1_acceptor, NULL);
+ pthread_create(&fmgr.n_reader, NULL, fmgr_n_reader, NULL);
return 0;
}
@@ -173,9 +259,11 @@ int fmgr_fini()
{
struct list_head * pos = NULL;
- pthread_cancel(fmgr.listen_thread);
+ pthread_cancel(fmgr.n_1_flow_acceptor);
+ pthread_cancel(fmgr.n_reader);
- pthread_join(fmgr.listen_thread, NULL);
+ pthread_join(fmgr.n_1_flow_acceptor, NULL);
+ pthread_join(fmgr.n_reader, NULL);
list_for_each(pos, &fmgr.n_1_flows) {
struct n_1_flow * e = list_entry(pos, struct n_1_flow, next);
@@ -188,6 +276,8 @@ int fmgr_fini()
pthread_mutex_destroy(&fmgr.n_1_flows_lock);
pthread_mutex_destroy(&fmgr.n_flows_lock);
+ flow_set_destroy(fmgr.set);
+
return 0;
}
@@ -259,7 +349,7 @@ int fmgr_dt_flow(char * dst_name, enum qos_cube qos)
return -1;
}
- if (frct_dt_flow(fd, qos)) {
+ if (rmt_dt_flow(fd, qos)) {
LOG_ERR("Failed to hand file descriptor to FRCT");
flow_dealloc(fd);
free(ae_name);
@@ -276,41 +366,13 @@ int fmgr_dt_flow(char * dst_name, enum qos_cube qos)
return 0;
}
-/* Call under n_flows lock */
-static struct n_flow * get_n_flow_by_fd(int fd)
-{
- struct list_head * pos = NULL;
-
- list_for_each(pos, &fmgr.n_flows) {
- struct n_flow * e = list_entry(pos, struct n_flow, next);
- if (e->fd == fd)
- return e;
- }
-
- return NULL;
-}
-
-/* Call under n_flows lock */
-static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i)
-{
- struct list_head * pos = NULL;
-
- list_for_each(pos, &fmgr.n_flows) {
- struct n_flow * e = list_entry(pos, struct n_flow, next);
- if (e->frct_i == frct_i)
- return e;
- }
-
- return NULL;
-}
-
int fmgr_flow_alloc(int fd,
char * dst_ap_name,
char * src_ae_name,
enum qos_cube qos)
{
struct n_flow * flow;
- struct frct_i * frct_i;
+ cep_id_t cep_id;
uint32_t address = 0;
buffer_t buf;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
@@ -343,8 +405,8 @@ int fmgr_flow_alloc(int fd,
pthread_mutex_lock(&fmgr.n_flows_lock);
- frct_i = frct_i_create(address, &buf, qos);
- if (frct_i == NULL) {
+ cep_id = frct_i_create(address, &buf, qos);
+ if (cep_id == INVALID_CEP_ID) {
free(buf.data);
free(flow);
pthread_mutex_unlock(&fmgr.n_flows_lock);
@@ -354,7 +416,7 @@ int fmgr_flow_alloc(int fd,
free(buf.data);
flow->fd = fd;
- flow->frct_i = frct_i;
+ flow->cep_id = cep_id;
flow->qos = qos;
INIT_LIST_HEAD(&flow->next);
@@ -374,6 +436,8 @@ static int n_flow_dealloc(int fd)
buffer_t buf;
int ret;
+ flow_set_del(fmgr.set, fd);
+
flow = get_n_flow_by_fd(fd);
if (flow == NULL)
return -1;
@@ -390,7 +454,7 @@ static int n_flow_dealloc(int fd)
flow_alloc_msg__pack(&msg, buf.data);
- ret = frct_i_destroy(flow->frct_i, &buf);
+ ret = frct_i_destroy(flow->cep_id, &buf);
list_del(&flow->next);
free(flow);
@@ -432,13 +496,16 @@ int fmgr_flow_alloc_resp(int fd, int response)
flow_alloc_msg__pack(&msg, buf.data);
if (response < 0) {
- frct_i_destroy(flow->frct_i, &buf);
+ frct_i_destroy(flow->cep_id, &buf);
free(buf.data);
list_del(&flow->next);
free(flow);
- } else if (frct_i_accept(flow->frct_i, &buf)) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
- return -1;
+ } else {
+ if (frct_i_accept(flow->cep_id, &buf, flow->qos)) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ return -1;
+ }
+ flow_set_add(fmgr.set, fd);
}
pthread_mutex_unlock(&fmgr.n_flows_lock);
@@ -457,7 +524,8 @@ int fmgr_flow_dealloc(int fd)
return ret;
}
-int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
+int fmgr_frct_post_buf(cep_id_t cep_id,
+ buffer_t * buf)
{
struct n_flow * flow;
int ret = 0;
@@ -484,7 +552,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
return -1;
}
- flow->frct_i = frct_i;
+ flow->cep_id = cep_id;
flow->qos = msg->qos_cube;
fd = ipcp_flow_req_arr(getpid(),
@@ -505,7 +573,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
list_add(&flow->next, &fmgr.n_flows);
break;
case FLOW_ALLOC_CODE__FLOW_REPLY:
- flow = get_n_flow_by_frct_i(frct_i);
+ flow = get_n_flow_by_cep_id(cep_id);
if (flow == NULL) {
pthread_mutex_unlock(&fmgr.n_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
@@ -517,11 +585,13 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
if (msg->response < 0) {
list_del(&flow->next);
free(flow);
+ } else {
+ flow_set_add(fmgr.set, flow->fd);
}
break;
case FLOW_ALLOC_CODE__FLOW_DEALLOC:
- flow = get_n_flow_by_frct_i(frct_i);
+ flow = get_n_flow_by_cep_id(cep_id);
if (flow == NULL) {
pthread_mutex_unlock(&fmgr.n_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
@@ -529,6 +599,8 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
return -1;
}
+ flow_set_del(fmgr.set, flow->fd);
+
ret = flow_dealloc(flow->fd);
break;
default:
@@ -543,3 +615,28 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
return ret;
}
+
+int fmgr_frct_post_sdu(cep_id_t cep_id,
+ struct shm_du_buff * sdb)
+{
+ struct n_flow * flow;
+
+ pthread_mutex_lock(&fmgr.n_flows_lock);
+
+ flow = get_n_flow_by_cep_id(cep_id);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ LOG_ERR("Failed to find N flow.");
+ return -1;
+ }
+
+ if (ipcp_flow_write(flow->fd, sdb)) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ LOG_ERR("Failed to hand SDU to N flow.");
+ return -1;
+ }
+
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+
+ return 0;
+}
diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h
index 7e3ef5f4..0f2cd045 100644
--- a/src/ipcpd/normal/fmgr.h
+++ b/src/ipcpd/normal/fmgr.h
@@ -35,7 +35,6 @@
#define DT_AE "Data transfer"
int fmgr_init();
-
int fmgr_fini();
/* N-flow ops */
@@ -56,8 +55,11 @@ int fmgr_flow_alloc_resp(int fd,
int fmgr_flow_dealloc(int fd);
/* N+1-flow ops, remote */
-int fmgr_flow_alloc_msg(struct frct_i * frct_i,
- buffer_t * buf);
+int fmgr_frct_post_buf(cep_id_t id,
+ buffer_t * buf);
+/* SDU for N+1-flow */
+int fmgr_frct_post_sdu(cep_id_t id,
+ struct shm_du_buff * sdb);
#endif
diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c
index 417815b7..abbde779 100644
--- a/src/ipcpd/normal/frct.c
+++ b/src/ipcpd/normal/frct.c
@@ -22,8 +22,6 @@
#define OUROBOROS_PREFIX "flow-rtx-control"
-#define IDS_SIZE 2048
-
#include <ouroboros/config.h>
#include <ouroboros/logs.h>
#include <ouroboros/bitmap.h>
@@ -34,7 +32,8 @@
#include <pthread.h>
#include "frct.h"
-
+#include "rmt.h"
+#include "fmgr.h"
enum conn_state {
CONN_PENDING = 0,
@@ -45,29 +44,29 @@ struct frct_i {
uint32_t cep_id;
uint32_t r_address;
uint32_t r_cep_id;
+ enum qos_cube cube;
+ uint64_t seqno;
enum conn_state state;
- struct list_head next;
};
-struct frct {
- struct dt_const * dtc;
+struct {
uint32_t address;
- struct list_head instances;
pthread_mutex_t instances_lock;
+ struct frct_i ** instances;
struct bmp * cep_ids;
pthread_mutex_t cep_ids_lock;
-} * frct = NULL;
+} frct;
static int next_cep_id()
{
int ret;
- pthread_mutex_lock(&frct->cep_ids_lock);
- ret = bmp_allocate(frct->cep_ids);
- pthread_mutex_unlock(&frct->cep_ids_lock);
+ pthread_mutex_lock(&frct.cep_ids_lock);
+ ret = bmp_allocate(frct.cep_ids);
+ pthread_mutex_unlock(&frct.cep_ids_lock);
return ret;
}
@@ -76,40 +75,34 @@ static int release_cep_id(int id)
{
int ret;
- pthread_mutex_lock(&frct->cep_ids_lock);
- ret = bmp_release(frct->cep_ids, id);
- pthread_mutex_unlock(&frct->cep_ids_lock);
+ pthread_mutex_lock(&frct.cep_ids_lock);
+ ret = bmp_release(frct.cep_ids, id);
+ pthread_mutex_unlock(&frct.cep_ids_lock);
return ret;
}
-int frct_init(struct dt_const * dtc, uint32_t address)
+int frct_init(uint32_t address)
{
- if (dtc == NULL)
- return -1;
+ int i;
+ frct.address = address;
- frct = malloc(sizeof(*frct));
- if (frct == NULL)
+ if (pthread_mutex_init(&frct.cep_ids_lock, NULL))
return -1;
- frct->dtc = dtc;
- frct->address = address;
-
- INIT_LIST_HEAD(&frct->instances);
-
- if (pthread_mutex_init(&frct->cep_ids_lock, NULL)) {
- free(frct);
+ if (pthread_mutex_init(&frct.instances_lock, NULL))
return -1;
- }
- if (pthread_mutex_init(&frct->instances_lock, NULL)) {
- free(frct);
+ frct.instances = malloc(sizeof(*(frct.instances)) * IRMD_MAX_FLOWS);
+ if (frct.instances == NULL)
return -1;
- }
- frct->cep_ids = bmp_create(IDS_SIZE, 0);
- if (frct->cep_ids == NULL) {
- free(frct);
+ for (i = 0; i < IRMD_MAX_FLOWS; i++)
+ frct.instances[i] = NULL;
+
+ frct.cep_ids = bmp_create(IRMD_MAX_FLOWS, (INVALID_CEP_ID + 1));
+ if (frct.cep_ids == NULL) {
+ free(frct.instances);
return -1;
}
@@ -118,117 +111,246 @@ int frct_init(struct dt_const * dtc, uint32_t address)
int frct_fini()
{
- pthread_mutex_lock(&frct->cep_ids_lock);
- bmp_destroy(frct->cep_ids);
- pthread_mutex_unlock(&frct->cep_ids_lock);
- free(frct);
+ pthread_mutex_lock(&frct.cep_ids_lock);
+ bmp_destroy(frct.cep_ids);
+ pthread_mutex_unlock(&frct.cep_ids_lock);
+
+ free(frct.instances);
return 0;
}
-struct dt_const * frct_dt_const()
+static struct frct_i * create_frct_i(uint32_t address,
+ cep_id_t r_cep_id)
{
- if (frct == NULL)
+ struct frct_i * instance;
+ cep_id_t id;
+
+ instance = malloc(sizeof(*instance));
+ if (instance == NULL)
return NULL;
- return frct->dtc;
-}
+ id = next_cep_id();
+ instance->r_address = address;
+ instance->cep_id = id;
+ instance->r_cep_id = r_cep_id;
+ instance->state = CONN_PENDING;
+ instance->seqno = 0;
-int frct_dt_flow(int fd,
- enum qos_cube qos)
-{
- LOG_MISSING;
+ frct.instances[id] = instance;
- return -1;
+ return instance;
}
-int frct_rmt_post()
+int frct_rmt_post_sdu(struct pci * pci,
+ struct shm_du_buff * sdb)
{
- LOG_MISSING;
+ struct frct_i * instance;
+ buffer_t buf;
+ cep_id_t id;
- return -1;
+ if (pci == NULL || sdb == NULL)
+ return -1;
+
+ if (pci->dst_cep_id == INVALID_CEP_ID) {
+ pthread_mutex_lock(&frct.instances_lock);
+ instance = create_frct_i(pci->src_addr,
+ pci->src_cep_id);
+ if (instance == NULL) {
+ pthread_mutex_unlock(&frct.instances_lock);
+ return -1;
+ }
+ id = instance->cep_id;
+ instance->r_cep_id = pci->src_cep_id;
+ pthread_mutex_unlock(&frct.instances_lock);
+
+ buf.len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+ buf.data = shm_du_buff_head(sdb);
+
+ if (fmgr_frct_post_buf(id, &buf)) {
+ LOG_ERR("Failed to hand buffer to FMGR.");
+ free(pci);
+ return -1;
+ }
+ } else {
+ /* FIXME: Known cep-ids are delivered to FMGR (minimal DTP) */
+ if (fmgr_frct_post_sdu(pci->dst_cep_id, sdb)) {
+ LOG_ERR("Failed to hand SDU to FMGR.");
+ free(pci);
+ return -1;
+ }
+ }
+
+ free(pci);
+
+ return 0;
}
/* Call under instances lock */
static void destroy_frct_i(struct frct_i * instance)
{
release_cep_id(instance->cep_id);
- list_del(&instance->next);
+ frct.instances[instance->cep_id] = NULL;
free(instance);
}
-struct frct_i * frct_i_create(uint32_t address,
- buffer_t * buf,
- enum qos_cube cube)
+cep_id_t frct_i_create(uint32_t address,
+ buffer_t * buf,
+ enum qos_cube cube)
{
struct frct_i * instance;
+ struct pci pci;
+ cep_id_t id;
- if (buf == NULL ||
- buf->data == NULL)
- return NULL;
+ if (buf == NULL || buf->data == NULL)
+ return INVALID_CEP_ID;
- instance = malloc(sizeof(*instance));
- if (instance == NULL)
- return NULL;
-
- pthread_mutex_lock(&frct->instances_lock);
-
- instance->r_address = address;
- instance->cep_id = next_cep_id();
- instance->state = CONN_PENDING;
+ pthread_mutex_lock(&frct.instances_lock);
+ instance = create_frct_i(address, INVALID_CEP_ID);
+ if (instance == NULL) {
+ pthread_mutex_unlock(&frct.instances_lock);
+ return INVALID_CEP_ID;
+ }
+ id = instance->cep_id;
+ instance->cube = cube;
+ pthread_mutex_unlock(&frct.instances_lock);
+
+ pci.pdu_type = PDU_TYPE_MGMT;
+ pci.dst_addr = address;
+ pci.src_addr = frct.address;
+ pci.dst_cep_id = 0;
+ pci.src_cep_id = id;
+ pci.seqno = 0;
+ pci.qos_id = cube;
+
+ if (rmt_frct_write_buf(&pci, buf)) {
+ free(instance);
+ LOG_ERR("Failed to hand PDU to RMT.");
+ return INVALID_CEP_ID;
+ }
- INIT_LIST_HEAD(&instance->next);
- list_add(&instance->next, &frct->instances);
+ return id;
+}
- pthread_mutex_unlock(&frct->instances_lock);
+int frct_i_accept(cep_id_t id,
+ buffer_t * buf,
+ enum qos_cube cube)
+{
+ struct pci pci;
+ struct frct_i * instance;
- /* FIXME: Pack into FRCT header and hand SDU to RMT */
+ if (buf == NULL || buf->data == NULL)
+ return -1;
- return instance;
-}
+ pthread_mutex_lock(&frct.instances_lock);
-int frct_i_accept(struct frct_i * instance,
- buffer_t * buf)
-{
- if (instance == NULL || buf == NULL || buf->data == NULL)
+ instance = frct.instances[id];
+ if (instance == NULL) {
+ pthread_mutex_unlock(&frct.instances_lock);
+ LOG_ERR("Invalid instance.");
return -1;
+ }
- pthread_mutex_lock(&frct->instances_lock);
if (instance->state != CONN_PENDING) {
- pthread_mutex_unlock(&frct->instances_lock);
+ pthread_mutex_unlock(&frct.instances_lock);
return -1;
}
instance->state = CONN_ESTABLISHED;
- instance->cep_id = next_cep_id();
+ instance->cube = cube;
+ instance->seqno = 0;
- pthread_mutex_unlock(&frct->instances_lock);
+ pci.pdu_type = PDU_TYPE_MGMT;
+ pci.dst_addr = instance->r_address;
+ pci.src_addr = frct.address;
+ pci.dst_cep_id = instance->r_cep_id;
+ pci.src_cep_id = instance->cep_id;
+ pci.seqno = 0;
+ pci.qos_id = cube;
- /* FIXME: Pack into FRCT header and hand SDU to RMT */
+ pthread_mutex_unlock(&frct.instances_lock);
+
+ if (rmt_frct_write_buf(&pci, buf))
+ return -1;
return 0;
}
-int frct_i_destroy(struct frct_i * instance,
- buffer_t * buf)
+int frct_i_destroy(cep_id_t id,
+ buffer_t * buf)
{
- if (instance == NULL)
- return -1;
+ struct pci pci;
+ struct frct_i * instance;
+
+ pthread_mutex_lock(&frct.instances_lock);
- pthread_mutex_lock(&frct->instances_lock);
+ instance = frct.instances[id];
+ if (instance == NULL) {
+ pthread_mutex_unlock(&frct.instances_lock);
+ LOG_ERR("Invalid instance.");
+ return -1;
+ }
if (!(instance->state == CONN_PENDING ||
instance->state == CONN_ESTABLISHED)) {
- pthread_mutex_unlock(&frct->instances_lock);
+ pthread_mutex_unlock(&frct.instances_lock);
return -1;
}
+ pci.pdu_type = PDU_TYPE_MGMT;
+ pci.dst_addr = instance->r_address;
+ pci.src_addr = frct.address;
+ pci.dst_cep_id = instance->r_cep_id;
+ pci.src_cep_id = instance->cep_id;
+ pci.seqno = 0;
+ pci.qos_id = instance->cube;
+
destroy_frct_i(instance);
- pthread_mutex_unlock(&frct->instances_lock);
+ pthread_mutex_unlock(&frct.instances_lock);
+
+ if (buf != NULL && buf->data != NULL)
+ if (rmt_frct_write_buf(&pci, buf))
+ return -1;
+
+ return 0;
+}
- if (buf != NULL && buf->data != NULL) {
+int frct_i_write_sdu(cep_id_t id,
+ struct shm_du_buff * sdb)
+{
+ struct pci pci;
+ struct frct_i * instance;
+
+ if (sdb == NULL)
+ return -1;
- /* FIXME: Pack into FRCT header and hand SDU to RMT */
+ pthread_mutex_lock(&frct.instances_lock);
+
+ instance = frct.instances[id];
+ if (instance == NULL) {
+ pthread_mutex_unlock(&frct.instances_lock);
+ LOG_ERR("Invalid instance.");
+ return -1;
+ }
+
+ if (instance->state != CONN_ESTABLISHED) {
+ pthread_mutex_unlock(&frct.instances_lock);
+ LOG_ERR("Connection is not established.");
+ return -1;
+ }
+
+ pci.pdu_type = PDU_TYPE_DTP;
+ pci.dst_addr = instance->r_address;
+ pci.src_addr = frct.address;
+ pci.dst_cep_id = instance->r_cep_id;
+ pci.src_cep_id = instance->cep_id;
+ pci.seqno = (instance->seqno)++;
+ pci.qos_id = instance->cube;
+
+ if (rmt_frct_write_sdu(&pci, sdb)) {
+ pthread_mutex_unlock(&frct.instances_lock);
+ LOG_ERR("Failed to hand SDU to RMT.");
+ return -1;
}
return 0;
diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h
index 0ee87004..2b86f5bd 100644
--- a/src/ipcpd/normal/frct.h
+++ b/src/ipcpd/normal/frct.h
@@ -26,34 +26,29 @@
#include <ouroboros/shared.h>
#include <ouroboros/utils.h>
-#include "dt_const.h"
+#include "shm_pci.h"
struct frct_i;
-int frct_init(struct dt_const * dtc,
- uint32_t address);
-int frct_fini();
+int frct_init(uint32_t address);
+int frct_fini();
-struct dt_const * frct_dt_const();
+/* Called by RMT upon receipt of a PDU for us */
+int frct_rmt_post_sdu(struct pci * pci,
+ struct shm_du_buff * sdb);
-int frct_dt_flow(int fd,
- enum qos_cube qos);
+cep_id_t frct_i_create(uint32_t address,
+ buffer_t * buf,
+ enum qos_cube cube);
-/*
- * FIXME: Will take the index in the DU map,
- * possibly cep-ids and address
- */
-int frct_rmt_post();
-
-struct frct_i * frct_i_create(uint32_t address,
- buffer_t * buf,
- enum qos_cube cube);
-/* FIXME: Hand QoS cube here too? We received it in the flow alloc message. */
-int frct_i_accept(struct frct_i * instance,
- buffer_t * buf);
-int frct_i_destroy(struct frct_i * instance,
- buffer_t * buf);
-
-/* FIXME: Add read/write ops for frct instances */
+int frct_i_accept(cep_id_t id,
+ buffer_t * buf,
+ enum qos_cube cube);
+
+int frct_i_destroy(cep_id_t id,
+ buffer_t * buf);
+
+int frct_i_write_sdu(cep_id_t id,
+ struct shm_du_buff * sdb);
#endif
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index 99d156f5..dd17f9bd 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -39,6 +39,7 @@
#include "frct.h"
#include "ipcp.h"
#include "cdap_request.h"
+#include "rmt.h"
#include "static_info.pb-c.h"
typedef StaticInfoMsg static_info_msg_t;
@@ -241,7 +242,7 @@ int ribmgr_cdap_write(struct cdap * instance,
rib.address = msg->address;
- if (frct_init(&rib.dtc, rib.address)) {
+ if (frct_init(rib.address)) {
ipcp_set_state(IPCP_INIT);
pthread_rwlock_unlock(&ipcpi.state_lock);
cdap_send_reply(instance, invoke_id, -1, NULL, 0);
@@ -250,6 +251,16 @@ int ribmgr_cdap_write(struct cdap * instance,
return -1;
}
+ if (rmt_init(rib.address)) {
+ ipcp_set_state(IPCP_INIT);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ frct_fini();
+ cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ static_info_msg__free_unpacked(msg, NULL);
+ LOG_ERR("Failed to init RMT");
+ return -1;
+ }
+
static_info_msg__free_unpacked(msg, NULL);
} else {
ret = -1;
@@ -529,12 +540,23 @@ int ribmgr_bootstrap(struct dif_config * conf)
/* FIXME: Set correct address. */
rib.address = 0;
- if (frct_init(&rib.dtc, rib.address)) {
+ if (frct_init(rib.address)) {
LOG_ERR("Failed to initialize FRCT.");
return -1;
}
+ if (rmt_init(rib.address)) {
+ LOG_ERR("Failed to initialize RMT.");
+ frct_fini();
+ return -1;
+ }
+
LOG_DBG("Bootstrapped RIB Manager.");
return 0;
}
+
+struct dt_const * ribmgr_dt_const()
+{
+ return &(rib.dtc);
+}
diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h
index e85c65be..f776f7eb 100644
--- a/src/ipcpd/normal/ribmgr.h
+++ b/src/ipcpd/normal/ribmgr.h
@@ -25,12 +25,16 @@
#include <ouroboros/irm_config.h>
-int ribmgr_init();
-int ribmgr_fini();
+#include "dt_const.h"
-int ribmgr_add_flow(int fd);
-int ribmgr_remove_flow(int fd);
+int ribmgr_init();
+int ribmgr_fini();
-int ribmgr_bootstrap(struct dif_config * conf);
+int ribmgr_add_flow(int fd);
+int ribmgr_remove_flow(int fd);
+
+int ribmgr_bootstrap(struct dif_config * conf);
+
+struct dt_const * ribmgr_dt_const();
#endif
diff --git a/src/ipcpd/normal/rmt.c b/src/ipcpd/normal/rmt.c
index ee92c3e3..fa4c7edd 100644
--- a/src/ipcpd/normal/rmt.c
+++ b/src/ipcpd/normal/rmt.c
@@ -24,29 +24,172 @@
#include <ouroboros/config.h>
#include <ouroboros/logs.h>
+#include <ouroboros/select.h>
+#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/dev.h>
+
+#include <stdlib.h>
#include "rmt.h"
+#include "config.h"
+#include "frct.h"
+
+struct {
+ pthread_t sdu_reader;
+ struct flow_set * set;
+ uint32_t address;
-struct rmt {
-};
+ /*
+ * FIXME: Normally the PFF is held here,
+ * for now we keep 1 fd to forward a PDU on
+ */
+ int fd;
+} rmt;
-int rmt_init(struct dt_const * dtc)
+int rmt_init(uint32_t address)
{
- LOG_MISSING;
+ rmt.set = flow_set_create();
+ if (rmt.set == NULL)
+ return -1;
- return -1;
+ rmt.address = address;
+
+ return 0;
}
int rmt_fini()
{
- LOG_MISSING;
+ flow_set_destroy(rmt.set);
+
+ return 0;
+}
+
+void * rmt_sdu_reader(void * o)
+{
+ struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
+ struct shm_du_buff * sdb;
+ struct pci * pci;
+
+ while (true) {
+ int fd = flow_select(rmt.set, &timeout);
+ if (fd == -ETIMEDOUT)
+ continue;
+
+ if (fd < 0) {
+ LOG_ERR("Failed to get active fd.");
+ continue;
+ }
+
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Failed to read SDU from fd %d.", fd);
+ continue;
+ }
+
+ pci = shm_pci_des(sdb);
+ if (pci == NULL) {
+ LOG_ERR("Failed to get PCI.");
+ ipcp_flow_del(sdb);
+ continue;
+ }
+
+ if (pci->dst_addr != rmt.address) {
+ LOG_DBG("PDU needs to be forwarded.");
+
+ if (pci->ttl == 0) {
+ LOG_DBG("TTL was zero.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+
+ if (shm_pci_dec_ttl(sdb)) {
+ LOG_ERR("Failed to decrease TTL.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+ /*
+ * FIXME: Dropping for now, since
+ * we don't have a PFF yet
+ */
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+
+ if (shm_pci_shrink(sdb)) {
+ LOG_ERR("Failed to shrink PDU.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+
+ if (frct_rmt_post_sdu(pci, sdb)) {
+ LOG_ERR("Failed to hand PDU to FRCT.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+ }
+
+ return (void *) 0;
+}
+
+int rmt_dt_flow(int fd,
+ enum qos_cube qos)
+{
+ struct flow_set * set = rmt.set;
+ if (set == NULL)
+ return -1;
+
+ flow_set_add(set, fd);
- return -1;
+ /* FIXME: This will be removed once we have a PFF */
+ rmt.fd = fd;
+
+ return 0;
}
-int rmt_frct_post()
+int rmt_frct_write_sdu(struct pci * pci,
+ struct shm_du_buff * sdb)
{
- LOG_MISSING;
+ if (shm_pci_ser(sdb, pci)) {
+ LOG_ERR("Failed to serialize PDU.");
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+
+ if (ipcp_flow_write(rmt.fd, sdb)) {
+ LOG_ERR("Failed to write SDU to fd %d.", rmt.fd);
+ ipcp_flow_del(sdb);
+ return -1;
+ }
+
+ return 0;
+}
+
+int rmt_frct_write_buf(struct pci * pci,
+ buffer_t * buf)
+{
+ buffer_t * buffer;
+
+ if (pci == NULL || buf == NULL || buf->data == NULL)
+ return -1;
+
+ buffer = shm_pci_ser_buf(buf, pci);
+ if (buffer == NULL) {
+ LOG_ERR("Failed to serialize buffer.");
+ free(buf->data);
+ return -1;
+ }
+
+ if (flow_write(rmt.fd, buffer->data, buffer->len) == -1) {
+ LOG_ERR("Failed to write buffer to fd.");
+ free(buffer);
+ return -1;
+ }
- return -1;
+ free(buffer);
+ return 0;
}
diff --git a/src/ipcpd/normal/rmt.h b/src/ipcpd/normal/rmt.h
index cdd86a0b..6ce7a7d7 100644
--- a/src/ipcpd/normal/rmt.h
+++ b/src/ipcpd/normal/rmt.h
@@ -23,12 +23,24 @@
#ifndef OUROBOROS_IPCP_RMT_H
#define OUROBOROS_IPCP_RMT_H
+#include <ouroboros/shm_rdrbuff.h>
+#include <ouroboros/utils.h>
+
#include "dt_const.h"
+#include "shm_pci.h"
-int rmt_init(struct dt_const * dtc);
+int rmt_init(uint32_t address);
int rmt_fini();
-/* FIXME: Will take index in DU map */
-int rmt_frct_post();
+int rmt_dt_flow(int fd,
+ enum qos_cube qos);
+
+/* Hand PDU to RMT, SDU from N+1 */
+int rmt_frct_write_sdu(struct pci * pci,
+ struct shm_du_buff * sdb);
+
+/* Hand PDU to RMT, SDU from N */
+int rmt_frct_write_buf(struct pci * pci,
+ buffer_t * buf);
#endif
diff --git a/src/ipcpd/normal/shm_pci.c b/src/ipcpd/normal/shm_pci.c
index 94629790..3a16a2da 100644
--- a/src/ipcpd/normal/shm_pci.c
+++ b/src/ipcpd/normal/shm_pci.c
@@ -32,6 +32,7 @@
#include "shm_pci.h"
#include "frct.h"
#include "crc32.h"
+#include "ribmgr.h"
#define QOS_ID_SIZE 1
#define DEFAULT_TTL 60
@@ -57,23 +58,13 @@ static int shm_pci_tail_size(struct dt_const * dtc)
return dtc->has_chk ? CHK_SIZE : 0;
}
-int shm_pci_ser(struct shm_du_buff * sdb,
- struct pci * pci)
+static void ser_pci_head(uint8_t * head,
+ struct pci * pci,
+ struct dt_const * dtc)
{
- uint8_t * head;
- uint8_t * tail;
int offset = 0;
- struct dt_const * dtc;
uint8_t ttl = DEFAULT_TTL;
- dtc = frct_dt_const();
- if (dtc == NULL)
- return -1;
-
- head = shm_du_buff_head_alloc(sdb, shm_pci_head_size(dtc));
- if (head == NULL)
- return -1;
-
memcpy(head, &pci->dst_addr, dtc->addr_size);
offset += dtc->addr_size;
memcpy(head + offset, &pci->src_addr, dtc->addr_size);
@@ -90,6 +81,24 @@ int shm_pci_ser(struct shm_du_buff * sdb,
offset += QOS_ID_SIZE;
if (dtc->has_ttl)
memcpy(head + offset, &ttl, TTL_SIZE);
+}
+
+int shm_pci_ser(struct shm_du_buff * sdb,
+ struct pci * pci)
+{
+ uint8_t * head;
+ uint8_t * tail;
+ struct dt_const * dtc;
+
+ dtc = ribmgr_dt_const();
+ if (dtc == NULL)
+ return -1;
+
+ head = shm_du_buff_head_alloc(sdb, shm_pci_head_size(dtc));
+ if (head == NULL)
+ return -1;
+
+ ser_pci_head(head, pci, dtc);
if (dtc->has_chk) {
tail = shm_du_buff_tail_alloc(sdb, shm_pci_tail_size(dtc));
@@ -104,6 +113,48 @@ int shm_pci_ser(struct shm_du_buff * sdb,
return 0;
}
+buffer_t * shm_pci_ser_buf(buffer_t * buf,
+ struct pci * pci)
+{
+ buffer_t * buffer;
+ struct dt_const * dtc;
+
+ if (buf == NULL || pci == NULL)
+ return NULL;
+
+ dtc = ribmgr_dt_const();
+ if (dtc == NULL)
+ return NULL;
+
+ buffer = malloc(sizeof(*buffer));
+ if (buffer == NULL)
+ return NULL;
+
+ buffer->len = buf->len +
+ shm_pci_head_size(dtc) +
+ shm_pci_tail_size(dtc);
+
+ buffer->data = malloc(buffer->len);
+ if (buffer->data == NULL) {
+ free(buffer);
+ return NULL;
+ }
+
+ ser_pci_head(buffer->data, pci, dtc);
+ memcpy(buffer->data + shm_pci_head_size(dtc),
+ buf->data, buf->len);
+
+ free(buf->data);
+
+ if (dtc->has_chk)
+ crc32((uint32_t *) buffer->data +
+ shm_pci_head_size(dtc) + buf->len,
+ buffer->data,
+ shm_pci_head_size(dtc) + buf->len);
+
+ return buffer;
+}
+
struct pci * shm_pci_des(struct shm_du_buff * sdb)
{
uint8_t * head;
@@ -115,7 +166,7 @@ struct pci * shm_pci_des(struct shm_du_buff * sdb)
if (head == NULL)
return NULL;
- dtc = frct_dt_const();
+ dtc = ribmgr_dt_const();
if (dtc == NULL)
return NULL;
@@ -150,7 +201,7 @@ int shm_pci_shrink(struct shm_du_buff * sdb)
if (sdb == NULL)
return -1;
- dtc = frct_dt_const();
+ dtc = ribmgr_dt_const();
if (dtc == NULL)
return -1;
@@ -174,7 +225,7 @@ int shm_pci_dec_ttl(struct shm_du_buff * sdb)
uint8_t * head;
uint8_t * tail;
- dtc = frct_dt_const();
+ dtc = ribmgr_dt_const();
if (dtc == NULL)
return -1;
diff --git a/src/ipcpd/normal/shm_pci.h b/src/ipcpd/normal/shm_pci.h
index aa9770b4..2836737c 100644
--- a/src/ipcpd/normal/shm_pci.h
+++ b/src/ipcpd/normal/shm_pci.h
@@ -25,22 +25,34 @@
#define OUROBOROS_IPCP_SHM_PCI_H
#include <ouroboros/shm_rdrbuff.h>
+#include <ouroboros/utils.h>
-#include <dt_const.h>
+#include "dt_const.h"
+
+#define PDU_TYPE_MGMT 0x40
+#define PDU_TYPE_DTP 0x80
+
+typedef uint32_t cep_id_t;
+#define INVALID_CEP_ID 0
struct pci {
+ uint8_t pdu_type;
uint64_t dst_addr;
uint64_t src_addr;
- uint32_t dst_cep_id;
- uint32_t src_cep_id;
+ cep_id_t dst_cep_id;
+ cep_id_t src_cep_id;
+ uint8_t qos_id;
uint32_t pdu_length;
uint64_t seqno;
- uint8_t qos_id;
uint8_t ttl;
+ uint8_t flags;
};
int shm_pci_ser(struct shm_du_buff * sdb,
- struct pci * pci);
+ struct pci * pci);
+
+buffer_t * shm_pci_ser_buf(buffer_t * buf,
+ struct pci * pci);
struct pci * shm_pci_des(struct shm_du_buff * sdb);