summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2016-08-22 11:13:31 +0200
committerSander Vrijders <sander.vrijders@intec.ugent.be>2016-08-23 22:40:01 +0200
commitbb0a01dbb52cb0a02ce684b6fef3ec85e6c1918a (patch)
treee378c45c9f834ae8155096387c90b905714abc81 /src/ipcpd/normal
parente2fd96f11b6a90d92f2d33627cb57ebf266e62ef (diff)
downloadouroboros-bb0a01dbb52cb0a02ce684b6fef3ec85e6c1918a.tar.gz
ouroboros-bb0a01dbb52cb0a02ce684b6fef3ec85e6c1918a.zip
ipcpd: normal: Add initial steps for N+1 flow allocation
This adds the initial framework for flow allocation between two N+1 endpoints. The FMGR will receive flow allocation requests and will create a connection as a result, addressed to the right address, it will also pass a flow allocation message to this address. Upon receipt on the other side, the FMGR will be receive a flow allocation message and a pointer to a new connection. The FMGR can then accept or destroy the connection. This commit also introduces the RMT function, which is needed by the FRCT to forward its SDUs on the right file descriptor.
Diffstat (limited to 'src/ipcpd/normal')
-rw-r--r--src/ipcpd/normal/CMakeLists.txt10
-rw-r--r--src/ipcpd/normal/flow_alloc.proto13
-rw-r--r--src/ipcpd/normal/fmgr.c373
-rw-r--r--src/ipcpd/normal/fmgr.h12
-rw-r--r--src/ipcpd/normal/frct.c165
-rw-r--r--src/ipcpd/normal/frct.h20
-rw-r--r--src/ipcpd/normal/rmt.c52
-rw-r--r--src/ipcpd/normal/rmt.h34
8 files changed, 646 insertions, 33 deletions
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 1e291d30..555260f1 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -12,26 +12,30 @@ include_directories(${CURRENT_BINARY_PARENT_DIR})
include_directories(${CMAKE_SOURCE_DIR}/include)
include_directories(${CMAKE_BINARY_DIR}/include)
-SET(IPCP_NORMAL_TARGET ipcpd-normal CACHE STRING "IPCP_NORMAL_TARGET")
+set(IPCP_NORMAL_TARGET ipcpd-normal CACHE STRING "IPCP_NORMAL_TARGET")
protobuf_generate_c(STATIC_INFO_SRCS STATIC_INFO_HDRS
static_info.proto)
+protobuf_generate_c(FLOW_ALLOC_SRCS FLOW_ALLOC_HDRS
+ flow_alloc.proto)
+
set(SOURCE_FILES
# Add source files here
main.c
fmgr.c
frct.c
ribmgr.c
+ rmt.c
)
add_executable (ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES}
- ${STATIC_INFO_SRCS})
+ ${STATIC_INFO_SRCS} ${FLOW_ALLOC_SRCS})
target_link_libraries (ipcpd-normal LINK_PUBLIC ouroboros)
include(MacroAddCompileFlags)
if (CMAKE_BUILD_TYPE MATCHES Debug)
- MACRO_ADD_COMPILE_FLAGS(ipcpd-normal -DCONFIG_OUROBOROS_DEBUG)
+ macro_add_compile_flags(ipcpd-normal -DCONFIG_OUROBOROS_DEBUG)
endif (CMAKE_BUILD_TYPE MATCHES Debug)
install(TARGETS ipcpd-normal RUNTIME DESTINATION sbin)
diff --git a/src/ipcpd/normal/flow_alloc.proto b/src/ipcpd/normal/flow_alloc.proto
new file mode 100644
index 00000000..b1ca7cb8
--- /dev/null
+++ b/src/ipcpd/normal/flow_alloc.proto
@@ -0,0 +1,13 @@
+enum flow_alloc_code {
+ FLOW_REQ = 1;
+ FLOW_REPLY = 2;
+ FLOW_DEALLOC = 3;
+};
+
+message flow_alloc_msg {
+ required flow_alloc_code code = 1;
+ optional string dst_name = 2;
+ optional string src_ae_name = 3;
+ optional uint32 qos_cube = 4;
+ optional sint32 response = 5;
+};
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 70afff37..58ae1dc8 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -26,6 +26,7 @@
#include <ouroboros/logs.h>
#include <ouroboros/dev.h>
#include <ouroboros/list.h>
+#include <ouroboros/ipcp.h>
#include <stdlib.h>
#include <stdbool.h>
@@ -37,8 +38,19 @@
#include "frct.h"
#include "ipcp.h"
+#include "flow_alloc.pb-c.h"
+typedef FlowAllocMsg flow_alloc_msg_t;
+
extern struct ipcp * _ipcp;
+struct n_flow {
+ struct flow flow;
+ struct frct_i * frct_i;
+ enum qos_cube qos;
+
+ struct list_head next;
+};
+
struct n_1_flow {
int fd;
char * ae_name;
@@ -51,6 +63,9 @@ struct fmgr {
struct list_head n_1_flows;
pthread_mutex_t n_1_flows_lock;
+ struct list_head n_flows;
+ /* FIXME: Make this a read/write lock */
+ pthread_mutex_t n_flows_lock;
} * fmgr = NULL;
static int add_n_1_fd(int fd,
@@ -68,6 +83,8 @@ static int add_n_1_fd(int fd,
tmp->fd = fd;
tmp->ae_name = ae_name;
+ INIT_LIST_HEAD(&tmp->next);
+
pthread_mutex_lock(&fmgr->n_1_flows_lock);
list_add(&tmp->next, &fmgr->n_1_flows);
pthread_mutex_unlock(&fmgr->n_1_flows_lock);
@@ -125,7 +142,8 @@ static void * fmgr_listen(void * o)
}
if (strcmp(ae_name, DT_AE) == 0) {
- if (frct_dt_flow(fd)) {
+ /* FIXME: Pass correct QoS cube */
+ if (frct_dt_flow(fd, 0)) {
LOG_ERR("Failed to hand fd to FRCT.");
flow_dealloc(fd);
continue;
@@ -149,13 +167,12 @@ int fmgr_init()
return -1;
INIT_LIST_HEAD(&fmgr->n_1_flows);
+ INIT_LIST_HEAD(&fmgr->n_flows);
pthread_mutex_init(&fmgr->n_1_flows_lock, NULL);
+ pthread_mutex_init(&fmgr->n_flows_lock, NULL);
- pthread_create(&fmgr->listen_thread,
- NULL,
- fmgr_listen,
- NULL);
+ pthread_create(&fmgr->listen_thread, NULL, fmgr_listen, NULL);
return 0;
}
@@ -187,11 +204,17 @@ int fmgr_mgmt_flow(char * dst_name)
{
int fd;
int result;
+ char * ae_name;
+
+ ae_name = strdup(MGMT_AE);
+ if (ae_name == NULL)
+ return -1;
/* FIXME: Request retransmission. */
fd = flow_alloc(dst_name, MGMT_AE, NULL);
if (fd < 0) {
LOG_ERR("Failed to allocate flow to %s", dst_name);
+ free(ae_name);
return -1;
}
@@ -199,29 +222,98 @@ int fmgr_mgmt_flow(char * dst_name)
if (result < 0) {
LOG_ERR("Result of flow allocation to %s is %d",
dst_name, result);
+ free(ae_name);
return -1;
}
if (ribmgr_add_flow(fd)) {
LOG_ERR("Failed to hand file descriptor to RIB manager");
flow_dealloc(fd);
+ free(ae_name);
+ return -1;
+ }
+
+ if (add_n_1_fd(fd, ae_name)) {
+ LOG_ERR("Failed to add file descriptor to list.");
+ flow_dealloc(fd);
+ return -1;
+ }
+
+ return 0;
+}
+
+int fmgr_dt_flow(char * dst_name,
+ enum qos_cube qos)
+{
+ int fd;
+ int result;
+ char * ae_name;
+
+ ae_name = strdup(DT_AE);
+ if (ae_name == NULL)
+ return -1;
+
+ /* FIXME: Map qos cube on correct QoS. */
+ fd = flow_alloc(dst_name, DT_AE, NULL);
+ if (fd < 0) {
+ LOG_ERR("Failed to allocate flow to %s", dst_name);
+ free(ae_name);
+ return -1;
+ }
+
+ result = flow_alloc_res(fd);
+ if (result < 0) {
+ LOG_ERR("Result of flow allocation to %s is %d",
+ dst_name, result);
+ free(ae_name);
+ return -1;
+ }
+
+ if (frct_dt_flow(fd, qos)) {
+ LOG_ERR("Failed to hand file descriptor to FRCT");
+ flow_dealloc(fd);
+ free(ae_name);
return -1;
}
- if (add_n_1_fd(fd, strdup(MGMT_AE))) {
+ if (add_n_1_fd(fd, ae_name)) {
LOG_ERR("Failed to add file descriptor to list.");
flow_dealloc(fd);
+ free(ae_name);
return -1;
}
return 0;
}
-int fmgr_dt_flow(char * dst_name)
+/* Call under n_flows lock */
+static struct n_flow * get_n_flow_by_port_id(int port_id)
{
- LOG_MISSING;
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &fmgr->n_flows) {
+ struct n_flow * e =
+ list_entry(pos, struct n_flow, next);
+ if (e->flow.port_id == port_id)
+ return e;
+ }
+
+ return NULL;
+}
+
+/* Call under n_flows lock */
+static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i)
+{
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &fmgr->n_flows) {
+ struct n_flow * e =
+ list_entry(pos, struct n_flow, next);
+ if (e->frct_i == frct_i)
+ return e;
+ }
- return -1;
+ return NULL;
}
int fmgr_flow_alloc(pid_t n_api,
@@ -230,23 +322,274 @@ int fmgr_flow_alloc(pid_t n_api,
char * src_ae_name,
enum qos_cube qos)
{
- LOG_MISSING;
+ struct n_flow * flow;
+ struct frct_i * frct_i;
+ uint32_t address = 0;
+ buffer_t buf;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+
+ flow = malloc(sizeof(*flow));
+ if (flow == NULL)
+ return -1;
+
+ /* FIXME: Obtain correct address here from DIF NSM */
- return -1;
+ msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
+ msg.dst_name = dst_ap_name;
+ msg.src_ae_name = src_ae_name;
+ msg.qos_cube = qos;
+ msg.has_qos_cube = true;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0) {
+ free(flow);
+ return -1;
+ }
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL) {
+ free(flow);
+ return -1;
+ }
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
+
+ frct_i = frct_i_create(address, &buf, qos);
+ if (frct_i == NULL) {
+ free(buf.data);
+ free(flow);
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ free(buf.data);
+
+ flow->flow.rb = shm_ap_rbuff_open(n_api);
+ if (flow->flow.rb == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ free(flow);
+ return -1;
+ }
+
+ flow->flow.api = n_api;
+ flow->flow.port_id = port_id;
+ flow->flow.state = FLOW_PENDING;
+ flow->frct_i = frct_i;
+ flow->qos = qos;
+
+ INIT_LIST_HEAD(&flow->next);
+
+ list_add(&flow->next, &fmgr->n_flows);
+
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ return 0;
+}
+
+/* Call under n_flows lock */
+static int n_flow_dealloc(int port_id)
+{
+ struct n_flow * flow;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ buffer_t buf;
+ int ret;
+
+ flow = get_n_flow_by_port_id(port_id);
+ if (flow == NULL)
+ return -1;
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL)
+ return -1;
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ ret = frct_i_destroy(flow->frct_i, &buf);
+ if (flow->flow.rb != NULL)
+ shm_ap_rbuff_close(flow->flow.rb);
+ list_del(&flow->next);
+
+ free(flow);
+ free(buf.data);
+
+ return ret;
}
int fmgr_flow_alloc_resp(pid_t n_api,
int port_id,
int response)
{
- LOG_MISSING;
+ struct n_flow * flow;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ int ret;
+ buffer_t buf;
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
- return -1;
+ flow = get_n_flow_by_port_id(port_id);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ if (flow->flow.state != FLOW_PENDING) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ LOG_ERR("Flow is not pending.");
+ return -1;
+ }
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
+ msg.response = response;
+ msg.has_response = true;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ if (response < 0) {
+ ret = frct_i_destroy(flow->frct_i, &buf);
+ free(buf.data);
+ list_del(&flow->next);
+ free(flow);
+ } else {
+ frct_i_accept(flow->frct_i, &buf);
+ flow->flow.state = FLOW_ALLOCATED;
+ flow->flow.api = n_api;
+
+ flow->flow.rb = shm_ap_rbuff_open(n_api);
+ if (flow->flow.rb == NULL) {
+ ret = n_flow_dealloc(port_id);
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return ret;
+ }
+ }
+
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ return ret;
}
int fmgr_flow_dealloc(int port_id)
{
- LOG_MISSING;
+ int ret;
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
+ ret = n_flow_dealloc(port_id);
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ return ret;
+}
+
+int fmgr_flow_alloc_msg(struct frct_i * frct_i,
+ buffer_t * buf)
+{
+ struct n_flow * flow;
+ int ret = 0;
+ int port_id;
+ flow_alloc_msg_t * msg;
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
+
+ /* Depending on what is in the message call the function in ipcp.h */
+
+ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
+ if (msg == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ LOG_ERR("Failed to unpack flow alloc message");
+ return -1;
+ }
+
+ switch (msg->code) {
+ case FLOW_ALLOC_CODE__FLOW_REQ:
+
+ flow = malloc(sizeof(*flow));
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ return -1;
+ }
+
+ flow->flow.state = FLOW_PENDING;
+ flow->frct_i = frct_i;
+ flow->qos = msg->qos_cube;
+ flow->flow.rb = NULL;
+ flow->flow.api = 0;
+
+ port_id = ipcp_flow_req_arr(getpid(),
+ msg->dst_name,
+ msg->src_ae_name);
+ if (port_id < 0) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ free(flow);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ LOG_ERR("Failed to get port-id from IRMd.");
+ return -1;
+ }
+
+ flow->flow.port_id = port_id;
+
+ INIT_LIST_HEAD(&flow->next);
+
+ list_add(&flow->next, &fmgr->n_flows);
+ break;
+ case FLOW_ALLOC_CODE__FLOW_REPLY:
+ flow = get_n_flow_by_frct_i(frct_i);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ LOG_ERR("No such flow in flow manager.");
+ return -1;
+ }
+
+ ret = ipcp_flow_alloc_reply(getpid(),
+ flow->flow.port_id,
+ msg->response);
+
+ if (msg->response < 0) {
+ shm_ap_rbuff_close(flow->flow.rb);
+ list_del(&flow->next);
+ free(flow);
+ }
+
+ break;
+ case FLOW_ALLOC_CODE__FLOW_DEALLOC:
+ flow = get_n_flow_by_frct_i(frct_i);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ LOG_ERR("No such flow in flow manager.");
+ return -1;
+ }
+
+ ret = ipcp_flow_dealloc(0, flow->flow.port_id);
+ break;
+ default:
+ LOG_ERR("Got an unknown flow allocation message.");
+ ret = -1;
+ break;
+ }
+
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
+ return ret;
}
diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h
index dc88bbdf..342410ca 100644
--- a/src/ipcpd/normal/fmgr.h
+++ b/src/ipcpd/normal/fmgr.h
@@ -29,6 +29,8 @@
#include <stdint.h>
#include <sys/types.h>
+#include "frct.h"
+
#define MGMT_AE "Management"
#define DT_AE "Data transfer"
@@ -37,9 +39,10 @@ int fmgr_fini();
/* N-flow ops */
int fmgr_mgmt_flow(char * dst_name);
-int fmgr_dt_flow(char * dst_name);
+int fmgr_dt_flow(char * dst_name,
+ enum qos_cube qos);
-/* N+1-flow ops */
+/* N+1-flow ops, local */
int fmgr_flow_alloc(pid_t n_api,
int port_id,
char * dst_ap_name,
@@ -52,4 +55,9 @@ int fmgr_flow_alloc_resp(pid_t n_api,
int fmgr_flow_dealloc(int port_id);
+/* N+1-flow ops, remote */
+int fmgr_flow_alloc_msg(struct frct_i * frct_i,
+ buffer_t * buf);
+
+
#endif
diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c
index 2de9422d..7c2eba61 100644
--- a/src/ipcpd/normal/frct.c
+++ b/src/ipcpd/normal/frct.c
@@ -22,20 +22,67 @@
#define OUROBOROS_PREFIX "flow-rtx-control"
-#include <stdlib.h>
+#define IDS_SIZE 2048
+#include <ouroboros/config.h>
#include <ouroboros/logs.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/list.h>
+
+#include <stdlib.h>
+#include <stdbool.h>
+#include <pthread.h>
#include "frct.h"
+
+enum conn_state {
+ CONN_PENDING = 0,
+ CONN_ESTABLISHED
+};
+
struct frct_i {
+ uint32_t cep_id;
+ uint32_t r_address;
+ uint32_t r_cep_id;
+
+ enum conn_state state;
+ struct list_head next;
};
struct frct {
struct dt_const * dtc;
uint32_t address;
+
+ struct list_head instances;
+ pthread_mutex_t instances_lock;
+
+ struct bmp * cep_ids;
+ pthread_mutex_t cep_ids_lock;
} * frct = NULL;
+static int next_cep_id()
+{
+ int ret;
+
+ pthread_mutex_lock(&frct->cep_ids_lock);
+ ret = bmp_allocate(frct->cep_ids);
+ pthread_mutex_unlock(&frct->cep_ids_lock);
+
+ return ret;
+}
+
+static int release_cep_id(int id)
+{
+ int ret;
+
+ pthread_mutex_lock(&frct->cep_ids_lock);
+ ret = bmp_release(frct->cep_ids, id);
+ pthread_mutex_unlock(&frct->cep_ids_lock);
+
+ return ret;
+}
+
int frct_init(struct dt_const * dtc, uint32_t address)
{
if (dtc == NULL)
@@ -48,35 +95,133 @@ int frct_init(struct dt_const * dtc, uint32_t address)
frct->dtc = dtc;
frct->address = address;
+ INIT_LIST_HEAD(&frct->instances);
+
+ if (pthread_mutex_init(&frct->cep_ids_lock, NULL)) {
+ free(frct);
+ return -1;
+ }
+
+ if (pthread_mutex_init(&frct->instances_lock, NULL)) {
+ free(frct);
+ return -1;
+ }
+
+ frct->cep_ids = bmp_create(IDS_SIZE, 0);
+ if (frct->cep_ids == NULL) {
+ free(frct);
+ return -1;
+ }
+
return 0;
}
int frct_fini()
{
- if (frct != NULL)
- free(frct);
+ pthread_mutex_lock(&frct->cep_ids_lock);
+ bmp_destroy(frct->cep_ids);
+ pthread_mutex_unlock(&frct->cep_ids_lock);
+ free(frct);
return 0;
}
-struct frct_i * frct_i_create(int port_id,
- enum qos_cube cube)
+int frct_dt_flow(int fd,
+ enum qos_cube qos)
{
LOG_MISSING;
- return NULL;
+ return -1;
}
-int frct_i_destroy(struct frct_i * instance)
+int frct_rmt_post()
{
LOG_MISSING;
return -1;
}
-int frct_dt_flow(int fd)
+/* Call under instances lock */
+static void destroy_frct_i(struct frct_i * instance)
{
- LOG_MISSING;
+ release_cep_id(instance->cep_id);
+ list_del(&instance->next);
+ free(instance);
+}
- return -1;
+struct frct_i * frct_i_create(uint32_t address,
+ buffer_t * buf,
+ enum qos_cube cube)
+{
+ struct frct_i * instance;
+
+ if (buf == NULL ||
+ buf->data == NULL)
+ return NULL;
+
+ instance = malloc(sizeof(*instance));
+ if (instance == NULL)
+ return NULL;
+
+ pthread_mutex_lock(&frct->instances_lock);
+
+ instance->r_address = address;
+ instance->cep_id = next_cep_id();
+ instance->state = CONN_PENDING;
+
+ INIT_LIST_HEAD(&instance->next);
+ list_add(&instance->next, &frct->instances);
+
+ pthread_mutex_unlock(&frct->instances_lock);
+
+ /* FIXME: Pack into FRCT header and hand SDU to RMT */
+
+ return instance;
+}
+
+int frct_i_accept(struct frct_i * instance,
+ buffer_t * buf)
+{
+ if (instance == NULL || buf == NULL || buf->data == NULL)
+ return -1;
+
+ pthread_mutex_lock(&frct->instances_lock);
+ if (instance->state != CONN_PENDING) {
+ pthread_mutex_unlock(&frct->instances_lock);
+ return -1;
+ }
+
+ instance->state = CONN_ESTABLISHED;
+ instance->cep_id = next_cep_id();
+
+ pthread_mutex_unlock(&frct->instances_lock);
+
+ /* FIXME: Pack into FRCT header and hand SDU to RMT */
+
+ return 0;
+}
+
+int frct_i_destroy(struct frct_i * instance,
+ buffer_t * buf)
+{
+ if (instance == NULL)
+ return -1;
+
+ pthread_mutex_lock(&frct->instances_lock);
+
+ if (!(instance->state == CONN_PENDING ||
+ instance->state == CONN_ESTABLISHED)) {
+ pthread_mutex_unlock(&frct->instances_lock);
+ return -1;
+ }
+
+ destroy_frct_i(instance);
+ pthread_mutex_unlock(&frct->instances_lock);
+
+ if (buf != NULL && buf->data != NULL) {
+
+ /* FIXME: Pack into FRCT header and hand SDU to RMT */
+ }
+
+ return 0;
}
diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h
index 2e965d38..91b2dfc7 100644
--- a/src/ipcpd/normal/frct.h
+++ b/src/ipcpd/normal/frct.h
@@ -24,6 +24,7 @@
#define OUROBOROS_IPCP_FRCT_H
#include <ouroboros/shared.h>
+#include <ouroboros/common.h>
#include "dt_const.h"
@@ -33,10 +34,23 @@ int frct_init(struct dt_const * dtc,
uint32_t address);
int frct_fini();
-struct frct_i * frct_i_create(int port_id,
+int frct_dt_flow(int fd,
+ enum qos_cube qos);
+/*
+ * FIXME: Will take the index in the DU map,
+ * possibly cep-ids and address
+ */
+int frct_rmt_post();
+
+struct frct_i * frct_i_create(uint32_t address,
+ buffer_t * buf,
enum qos_cube cube);
-int frct_i_destroy(struct frct_i * instance);
+/* FIXME: Hand QoS cube here too? We received it in the flow alloc message. */
+int frct_i_accept(struct frct_i * instance,
+ buffer_t * buf);
+int frct_i_destroy(struct frct_i * instance,
+ buffer_t * buf);
-int frct_dt_flow(int fd);
+/* FIXME: Add read/write ops for frct instances */
#endif
diff --git a/src/ipcpd/normal/rmt.c b/src/ipcpd/normal/rmt.c
new file mode 100644
index 00000000..ee92c3e3
--- /dev/null
+++ b/src/ipcpd/normal/rmt.c
@@ -0,0 +1,52 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * The Relaying and Multiplexing task
+ *
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "flow-manager"
+
+#include <ouroboros/config.h>
+#include <ouroboros/logs.h>
+
+#include "rmt.h"
+
+struct rmt {
+};
+
+int rmt_init(struct dt_const * dtc)
+{
+ LOG_MISSING;
+
+ return -1;
+}
+
+int rmt_fini()
+{
+ LOG_MISSING;
+
+ return -1;
+}
+
+int rmt_frct_post()
+{
+ LOG_MISSING;
+
+ return -1;
+}
diff --git a/src/ipcpd/normal/rmt.h b/src/ipcpd/normal/rmt.h
new file mode 100644
index 00000000..cdd86a0b
--- /dev/null
+++ b/src/ipcpd/normal/rmt.h
@@ -0,0 +1,34 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * The Relaying and Multiplexing task
+ *
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCP_RMT_H
+#define OUROBOROS_IPCP_RMT_H
+
+#include "dt_const.h"
+
+int rmt_init(struct dt_const * dtc);
+int rmt_fini();
+
+/* FIXME: Will take index in DU map */
+int rmt_frct_post();
+
+#endif