summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2016-12-24 12:54:16 +0100
committerSander Vrijders <sander.vrijders@intec.ugent.be>2016-12-24 12:54:16 +0100
commit7348a7d587adc3cb1bc11ef7513ef7b03bc40349 (patch)
tree54c4406c135621d44fd9468c12a6b26e3e5d4f74 /src/ipcpd/normal
parent55eaed508c9f68b350f29cf2c4e96be4e57b0b37 (diff)
parent8910bd28e2b6269f0900c8215352ab5d177a3b54 (diff)
downloadouroboros-7348a7d587adc3cb1bc11ef7513ef7b03bc40349.tar.gz
ouroboros-7348a7d587adc3cb1bc11ef7513ef7b03bc40349.zip
Merged in dstaesse/ouroboros/be-normal (pull request #325)
Be normal
Diffstat (limited to 'src/ipcpd/normal')
-rw-r--r--src/ipcpd/normal/CMakeLists.txt1
-rw-r--r--src/ipcpd/normal/cdap_request.c157
-rw-r--r--src/ipcpd/normal/cdap_request.h68
-rw-r--r--src/ipcpd/normal/fmgr.c390
-rw-r--r--src/ipcpd/normal/ribmgr.c501
5 files changed, 347 insertions, 770 deletions
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 67a7953b..5f85dd89 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -25,7 +25,6 @@ protobuf_generate_c(RO_SRCS RO_HDRS ro.proto)
set(SOURCE_FILES
# Add source files here
addr_auth.c
- cdap_request.c
crc32.c
dir.c
fmgr.c
diff --git a/src/ipcpd/normal/cdap_request.c b/src/ipcpd/normal/cdap_request.c
deleted file mode 100644
index 8409b508..00000000
--- a/src/ipcpd/normal/cdap_request.c
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016
- *
- * Normal IPCP - RIB Manager - CDAP request
- *
- * Sander Vrijders <sander.vrijders@intec.ugent.be>
- * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#include <ouroboros/config.h>
-#include <ouroboros/time_utils.h>
-#include <ouroboros/errno.h>
-
-#include "cdap_request.h"
-
-#include <stdlib.h>
-
-struct cdap_request * cdap_request_create(enum cdap_opcode code,
- char * name,
- int invoke_id,
- struct cdap * instance)
-{
- struct cdap_request * creq = malloc(sizeof(*creq));
- pthread_condattr_t cattr;
-
- if (creq == NULL)
- return NULL;
-
- creq->code = code;
- creq->name = name;
- creq->invoke_id = invoke_id;
- creq->instance = instance;
- creq->state = REQ_INIT;
- creq->result = -1;
-
- pthread_condattr_init(&cattr);
-#ifndef __APPLE__
- pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
-#endif
- pthread_cond_init(&creq->cond, &cattr);
- pthread_mutex_init(&creq->lock, NULL);
-
- INIT_LIST_HEAD(&creq->next);
-
- return creq;
-}
-
-void cdap_request_destroy(struct cdap_request * creq)
-{
- if (creq == NULL)
- return;
-
- pthread_mutex_lock(&creq->lock);
-
- if (creq->state == REQ_DESTROY) {
- pthread_mutex_unlock(&creq->lock);
- return;
- }
-
- if (creq->state == REQ_INIT)
- creq->state = REQ_DONE;
-
- if (creq->state == REQ_PENDING) {
- creq->state = REQ_DESTROY;
- pthread_cond_broadcast(&creq->cond);
- }
-
- while (creq->state != REQ_DONE)
- pthread_cond_wait(&creq->cond, &creq->lock);
-
- pthread_mutex_unlock(&creq->lock);
-
- pthread_cond_destroy(&creq->cond);
- pthread_mutex_destroy(&creq->lock);
-
- if (creq->name != NULL)
- free(creq->name);
-
- free(creq);
-}
-
-int cdap_request_wait(struct cdap_request * creq)
-{
- struct timespec timeout = {(CDAP_REPLY_TIMEOUT / 1000),
- (CDAP_REPLY_TIMEOUT % 1000) * MILLION};
- struct timespec abstime;
- int ret = -1;
-
- if (creq == NULL)
- return -EINVAL;
-
- clock_gettime(CLOCK_REALTIME, &abstime);
- ts_add(&abstime, &timeout, &abstime);
-
- pthread_mutex_lock(&creq->lock);
-
- if (creq->state != REQ_INIT) {
- pthread_mutex_unlock(&creq->lock);
- return -EINVAL;
- }
-
- creq->state = REQ_PENDING;
-
- while (creq->state == REQ_PENDING) {
- if ((ret = -pthread_cond_timedwait(&creq->cond,
- &creq->lock,
- &abstime)) == -ETIMEDOUT) {
- break;
- }
- }
-
- if (creq->state == REQ_DESTROY)
- ret = -1;
-
- creq->state = REQ_DONE;
- pthread_cond_broadcast(&creq->cond);
-
- pthread_mutex_unlock(&creq->lock);
-
- return ret;
-}
-
-void cdap_request_respond(struct cdap_request * creq, int response)
-{
- if (creq == NULL)
- return;
-
- pthread_mutex_lock(&creq->lock);
-
- if (creq->state != REQ_PENDING) {
- pthread_mutex_unlock(&creq->lock);
- return;
- }
-
- creq->state = REQ_RESPONSE;
- creq->result = response;
- pthread_cond_broadcast(&creq->cond);
-
- while (creq->state == REQ_RESPONSE)
- pthread_cond_wait(&creq->cond, &creq->lock);
-
- pthread_mutex_unlock(&creq->lock);
-}
diff --git a/src/ipcpd/normal/cdap_request.h b/src/ipcpd/normal/cdap_request.h
deleted file mode 100644
index 9cccfda5..00000000
--- a/src/ipcpd/normal/cdap_request.h
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016
- *
- * Normal IPCP - RIB Manager - CDAP request
- *
- * Sander Vrijders <sander.vrijders@intec.ugent.be>
- * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#ifndef OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H
-#define OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H
-
-#include <ouroboros/config.h>
-#include <ouroboros/cdap.h>
-#include <ouroboros/list.h>
-
-#include <pthread.h>
-
-enum creq_state {
- REQ_INIT = 0,
- REQ_PENDING,
- REQ_RESPONSE,
- REQ_DONE,
- REQ_DESTROY
-};
-
-struct cdap_request {
- struct list_head next;
-
- enum cdap_opcode code;
- char * name;
- int invoke_id;
- struct cdap * instance;
-
- int result;
-
- enum creq_state state;
- pthread_cond_t cond;
- pthread_mutex_t lock;
-};
-
-struct cdap_request * cdap_request_create(enum cdap_opcode code,
- char * name,
- int invoke_id,
- struct cdap * instance);
-
-void cdap_request_destroy(struct cdap_request * creq);
-
-int cdap_request_wait(struct cdap_request * creq);
-
-void cdap_request_respond(struct cdap_request * creq,
- int response);
-
-#endif /* OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H */
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 8e416aa4..6684db7c 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -49,82 +49,31 @@ typedef FlowAllocMsg flow_alloc_msg_t;
#define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */
-struct np1_flow {
- int fd;
- cep_id_t cep_id;
- enum qos_cube qos;
-};
-
-struct nm1_flow {
- int fd;
- enum qos_cube qos;
-};
-
struct {
- pthread_t nm1_flow_acceptor;
- struct nm1_flow ** nm1_flows;
+ flow_set_t * nm1_set[QOS_CUBE_MAX];
+ fqueue_t * nm1_fqs[QOS_CUBE_MAX];
pthread_rwlock_t nm1_flows_lock;
- flow_set_t * nm1_set;
- pthread_t nm1_sdu_reader;
- struct np1_flow ** np1_flows;
- struct np1_flow ** np1_flows_cep;
+ flow_set_t * np1_set[QOS_CUBE_MAX];
+ fqueue_t * np1_fqs[QOS_CUBE_MAX];
pthread_rwlock_t np1_flows_lock;
- flow_set_t * np1_set;
+
+ cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS];
+ int np1_cep_id_to_fd[IPCPD_MAX_CONNS];
+
+ pthread_t nm1_flow_acceptor;
+ pthread_t nm1_sdu_reader;
pthread_t np1_sdu_reader;
/* FIXME: Replace with PFF */
int fd;
} fmgr;
-static int add_nm1_fd(int fd,
- enum qos_cube qos)
-{
- struct nm1_flow * tmp;
-
- tmp = malloc(sizeof(*tmp));
- if (tmp == NULL)
- return -1;
-
- tmp->fd = fd;
- tmp->qos = qos;
-
- pthread_rwlock_wrlock(&fmgr.nm1_flows_lock);
- fmgr.nm1_flows[fd] = tmp;
- pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
-
- flow_set_add(fmgr.nm1_set, fd);
-
- /* FIXME: Temporary, until we have a PFF */
- fmgr.fd = fd;
-
- return 0;
-}
-
-static int add_np1_fd(int fd,
- cep_id_t cep_id,
- enum qos_cube qos)
-{
- struct np1_flow * flow;
-
- flow = malloc(sizeof(*flow));
- if (flow == NULL)
- return -1;
-
- flow->cep_id = cep_id;
- flow->qos = qos;
- flow->fd = fd;
-
- fmgr.np1_flows[fd] = flow;
- fmgr.np1_flows_cep[cep_id] = flow;
-
- return 0;
-}
-
static void * fmgr_nm1_acceptor(void * o)
{
int fd;
char * ae_name;
+ qoscube_t cube;
qosspec_t qs;
(void) o;
@@ -150,13 +99,13 @@ static void * fmgr_nm1_acceptor(void * o)
if (!(strcmp(ae_name, MGMT_AE) == 0 ||
strcmp(ae_name, DT_AE) == 0)) {
if (flow_alloc_resp(fd, -1))
- LOG_ERR("Failed to reply to flow allocation.");
+ LOG_WARN("Failed to reply to flow allocation.");
flow_dealloc(fd);
continue;
}
if (flow_alloc_resp(fd, 0)) {
- LOG_ERR("Failed to reply to flow allocation.");
+ LOG_WARN("Failed to reply to flow allocation.");
flow_dealloc(fd);
continue;
}
@@ -166,17 +115,19 @@ static void * fmgr_nm1_acceptor(void * o)
if (strcmp(ae_name, MGMT_AE) == 0) {
if (ribmgr_add_flow(fd)) {
- LOG_ERR("Failed to hand fd to RIB.");
+ LOG_WARN("Failed to hand fd to RIB.");
flow_dealloc(fd);
continue;
}
} else {
- /* FIXME: Pass correct QoS cube */
- if (add_nm1_fd(fd, QOS_CUBE_BE)) {
- LOG_ERR("Failed to add fd to list.");
+ ipcp_flow_get_qoscube(fd, &cube);
+ if (flow_set_add(fmgr.nm1_set[cube], fd)) {
+ LOG_WARN("Failed to add fd.");
flow_dealloc(fd);
continue;
}
+ /* FIXME: Temporary, until we have a PFF */
+ fmgr.fd = fd;
}
free(ae_name);
@@ -189,44 +140,40 @@ static void * fmgr_np1_sdu_reader(void * o)
{
struct shm_du_buff * sdb;
struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
- struct np1_flow * flow;
int fd;
- fqueue_t * fq = fqueue_create();
- if (fq == NULL)
- return (void *) 1;
+
+ int i = 0;
+ int ret;
(void) o;
while (true) {
- int ret = flow_event_wait(fmgr.np1_set, fq, &timeout);
+ /* FIXME: replace with scheduling policy call */
+ i = (i + 1) % QOS_CUBE_MAX;
+
+ ret = flow_event_wait(fmgr.np1_set[i],
+ fmgr.np1_fqs[i],
+ &timeout);
if (ret == -ETIMEDOUT)
continue;
if (ret < 0) {
- LOG_ERR("Event error: %d.", ret);
+ LOG_WARN("Event error: %d.", ret);
continue;
}
- while ((fd = fqueue_next(fq)) >= 0) {
+ while ((fd = fqueue_next(fmgr.np1_fqs[i])) >= 0) {
if (ipcp_flow_read(fd, &sdb)) {
- LOG_ERR("Failed to read SDU from fd %d.", fd);
+ LOG_WARN("Failed to read SDU from fd %d.", fd);
continue;
}
pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
- flow = fmgr.np1_flows[fd];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- LOG_ERR("Failed to retrieve flow.");
- continue;
- }
-
- if (frct_i_write_sdu(flow->cep_id, sdb)) {
+ if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) {
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
ipcp_flow_del(sdb);
- LOG_ERR("Failed to hand SDU to FRCT.");
+ LOG_WARN("Failed to hand SDU to FRCT.");
continue;
}
@@ -244,14 +191,18 @@ void * fmgr_nm1_sdu_reader(void * o)
struct shm_du_buff * sdb;
struct pci * pci;
int fd;
- fqueue_t * fq = fqueue_create();
- if (fq == NULL)
- return (void *) 1;
+ int i = 0;
+ int ret;
(void) o;
while (true) {
- int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout);
+ /* FIXME: replace with scheduling policy call */
+ i = (i + 1) % QOS_CUBE_MAX;
+
+ ret = flow_event_wait(fmgr.nm1_set[i],
+ fmgr.nm1_fqs[i],
+ &timeout);
if (ret == -ETIMEDOUT)
continue;
@@ -260,7 +211,7 @@ void * fmgr_nm1_sdu_reader(void * o)
continue;
}
- while ((fd = fqueue_next(fq)) >= 0) {
+ while ((fd = fqueue_next(fmgr.nm1_fqs[i])) >= 0) {
if (ipcp_flow_read(fd, &sdb)) {
LOG_ERR("Failed to read SDU from fd %d.", fd);
continue;
@@ -320,52 +271,55 @@ void * fmgr_nm1_sdu_reader(void * o)
return (void *) 0;
}
-int fmgr_init()
+static void fmgr_destroy_flows(void)
{
int i;
- fmgr.nm1_flows = malloc(sizeof(*(fmgr.nm1_flows)) * IRMD_MAX_FLOWS);
- if (fmgr.nm1_flows == NULL)
- return -1;
-
- fmgr.np1_flows = malloc(sizeof(*(fmgr.np1_flows)) * IRMD_MAX_FLOWS);
- if (fmgr.np1_flows == NULL) {
- free(fmgr.nm1_flows);
- return -1;
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ flow_set_destroy(fmgr.nm1_set[i]);
+ flow_set_destroy(fmgr.np1_set[i]);
+ fqueue_destroy(fmgr.nm1_fqs[i]);
+ fqueue_destroy(fmgr.np1_fqs[i]);
}
+}
- fmgr.np1_flows_cep =
- malloc(sizeof(*(fmgr.np1_flows_cep)) * IRMD_MAX_FLOWS);
- if (fmgr.np1_flows_cep == NULL) {
- free(fmgr.np1_flows);
- free(fmgr.nm1_flows);
- return -1;
- }
+int fmgr_init()
+{
+ int i;
- for (i = 0; i < IRMD_MAX_FLOWS; i++) {
- fmgr.nm1_flows[i] = NULL;
- fmgr.np1_flows[i] = NULL;
- fmgr.np1_flows_cep[i] = NULL;
- }
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID;
+
+ for (i = 0; i < IPCPD_MAX_CONNS; ++i)
+ fmgr.np1_cep_id_to_fd[i] = -1;
pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL);
pthread_rwlock_init(&fmgr.np1_flows_lock, NULL);
- fmgr.np1_set = flow_set_create();
- if (fmgr.np1_set == NULL) {
- free(fmgr.np1_flows_cep);
- free(fmgr.np1_flows);
- free(fmgr.nm1_flows);
- return -1;
- }
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ fmgr.np1_set[i] = flow_set_create();
+ if (fmgr.np1_set == NULL) {
+ fmgr_destroy_flows();
+ return -1;
+ }
- fmgr.nm1_set = flow_set_create();
- if (fmgr.nm1_set == NULL) {
- flow_set_destroy(fmgr.np1_set);
- free(fmgr.np1_flows_cep);
- free(fmgr.np1_flows);
- free(fmgr.nm1_flows);
- return -1;
+ fmgr.np1_fqs[i] = fqueue_create();
+ if (fmgr.np1_fqs[i] == NULL) {
+ fmgr_destroy_flows();
+ return -1;
+ }
+
+ fmgr.nm1_set[i] = flow_set_create();
+ if (fmgr.nm1_set == NULL) {
+ fmgr_destroy_flows();
+ return -1;
+ }
+
+ fmgr.nm1_fqs[i] = fqueue_create();
+ if (fmgr.nm1_fqs[i] == NULL) {
+ fmgr_destroy_flows();
+ return -1;
+ }
}
pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL);
@@ -378,6 +332,7 @@ int fmgr_init()
int fmgr_fini()
{
int i;
+ int j;
pthread_cancel(fmgr.nm1_flow_acceptor);
pthread_cancel(fmgr.np1_sdu_reader);
@@ -387,29 +342,25 @@ int fmgr_fini()
pthread_join(fmgr.np1_sdu_reader, NULL);
pthread_join(fmgr.nm1_sdu_reader, NULL);
- for (i = 0; i < IRMD_MAX_FLOWS; i++) {
- if (fmgr.nm1_flows[i] == NULL)
- continue;
- flow_dealloc(fmgr.nm1_flows[i]->fd);
- free(fmgr.nm1_flows[i]);
- }
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ for (j = 0; j < QOS_CUBE_MAX; ++j)
+ if (flow_set_has(fmgr.nm1_set[j], i)) {
+ flow_dealloc(i);
+ flow_set_del(fmgr.nm1_set[j], i);
+ }
pthread_rwlock_destroy(&fmgr.nm1_flows_lock);
pthread_rwlock_destroy(&fmgr.np1_flows_lock);
- flow_set_destroy(fmgr.nm1_set);
- flow_set_destroy(fmgr.np1_set);
- free(fmgr.np1_flows_cep);
- free(fmgr.np1_flows);
- free(fmgr.nm1_flows);
+ fmgr_destroy_flows();
return 0;
}
-int fmgr_np1_alloc(int fd,
- char * dst_ap_name,
- char * src_ae_name,
- enum qos_cube qos)
+int fmgr_np1_alloc(int fd,
+ char * dst_ap_name,
+ char * src_ae_name,
+ qoscube_t qos)
{
cep_id_t cep_id;
buffer_t buf;
@@ -478,10 +429,8 @@ int fmgr_np1_alloc(int fd,
free(ro_data);
- if (add_np1_fd(fd, cep_id, qos)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- return -1;
- }
+ fmgr.np1_fd_to_cep_id[fd] = cep_id;
+ fmgr.np1_cep_id_to_fd[cep_id] = fd;
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
@@ -491,16 +440,13 @@ int fmgr_np1_alloc(int fd,
/* Call under np1_flows lock */
static int np1_flow_dealloc(int fd)
{
- struct np1_flow * flow;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
buffer_t buf;
int ret;
+ qoscube_t cube;
- flow_set_del(fmgr.np1_set, fd);
-
- flow = fmgr.np1_flows[fd];
- if (flow == NULL)
- return -1;
+ ipcp_flow_get_qoscube(fd, &cube);
+ flow_set_del(fmgr.np1_set[cube], fd);
msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC;
@@ -510,16 +456,15 @@ static int np1_flow_dealloc(int fd)
buf.data = malloc(buf.len);
if (buf.data == NULL)
- return -1;
+ return -ENOMEM;
flow_alloc_msg__pack(&msg, buf.data);
- ret = frct_i_destroy(flow->cep_id, &buf);
+ ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);
- fmgr.np1_flows[fd] = NULL;
- fmgr.np1_flows_cep[flow->cep_id] = NULL;
+ fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID;
+ fmgr.np1_fd_to_cep_id[fd] = -1;
- free(flow);
free(buf.data);
return ret;
@@ -527,48 +472,39 @@ static int np1_flow_dealloc(int fd)
int fmgr_np1_alloc_resp(int fd, int response)
{
- struct np1_flow * flow;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
buffer_t buf;
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- flow = fmgr.np1_flows[fd];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- return -1;
- }
-
msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
msg.response = response;
msg.has_response = true;
buf.len = flow_alloc_msg__get_packed_size(&msg);
- if (buf.len == 0) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ if (buf.len == 0)
return -1;
- }
buf.data = malloc(buf.len);
- if (buf.data == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- return -1;
- }
+ if (buf.data == NULL)
+ return -ENOMEM;
flow_alloc_msg__pack(&msg, buf.data);
+ pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
+
if (response < 0) {
- frct_i_destroy(flow->cep_id, &buf);
+ frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);
free(buf.data);
- fmgr.np1_flows[fd] = NULL;
- fmgr.np1_flows_cep[flow->cep_id] = NULL;
- free(flow);
+ fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]]
+ = INVALID_CEP_ID;
+ fmgr.np1_fd_to_cep_id[fd] = -1;
} else {
- if (frct_i_accept(flow->cep_id, &buf, flow->qos)) {
+ qoscube_t cube;
+ ipcp_flow_get_qoscube(fd, &cube);
+ if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) {
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return -1;
}
- flow_set_add(fmgr.np1_set, fd);
+ flow_set_add(fmgr.np1_set[cube], fd);
}
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
@@ -581,27 +517,25 @@ int fmgr_np1_dealloc(int fd)
int ret;
pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
+
ret = np1_flow_dealloc(fd);
+
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return ret;
}
-int fmgr_np1_post_buf(cep_id_t cep_id,
- buffer_t * buf)
+int fmgr_np1_post_buf(cep_id_t cep_id, buffer_t * buf)
{
- struct np1_flow * flow;
int ret = 0;
int fd;
flow_alloc_msg_t * msg;
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
+ qoscube_t cube;
/* Depending on the message call the function in ipcp-dev.h */
msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
if (msg == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
LOG_ERR("Failed to unpack flow alloc message");
return -1;
}
@@ -612,51 +546,41 @@ int fmgr_np1_post_buf(cep_id_t cep_id,
msg->dst_name,
msg->src_ae_name);
if (fd < 0) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
LOG_ERR("Failed to get fd for flow.");
return -1;
}
- if (add_np1_fd(fd, cep_id, msg->qos_cube)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- LOG_ERR("Failed to add np1 flow.");
- return -1;
- }
+ pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
+
+ fmgr.np1_fd_to_cep_id[fd] = cep_id;
+ fmgr.np1_cep_id_to_fd[cep_id] = fd;
+
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
break;
case FLOW_ALLOC_CODE__FLOW_REPLY:
- flow = fmgr.np1_flows_cep[cep_id];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- LOG_ERR("No such flow in flow manager.");
- return -1;
- }
+ pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
- ret = ipcp_flow_alloc_reply(flow->fd, msg->response);
+ fd = fmgr.np1_cep_id_to_fd[cep_id];
+ ret = ipcp_flow_alloc_reply(fd, msg->response);
if (msg->response < 0) {
- fmgr.np1_flows[flow->fd] = NULL;
- fmgr.np1_flows_cep[cep_id] = NULL;
- free(flow);
+ fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID;
+ fmgr.np1_cep_id_to_fd[cep_id] = -1;
} else {
- flow_set_add(fmgr.np1_set, flow->fd);
+ ipcp_flow_get_qoscube(fd, &cube);
+ flow_set_add(fmgr.np1_set[cube],
+ fmgr.np1_cep_id_to_fd[cep_id]);
}
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+
break;
case FLOW_ALLOC_CODE__FLOW_DEALLOC:
- flow = fmgr.np1_flows_cep[cep_id];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- LOG_ERR("No such flow in flow manager.");
- return -1;
- }
-
- flow_set_del(fmgr.np1_set, flow->fd);
-
- ret = flow_dealloc(flow->fd);
+ fd = fmgr.np1_cep_id_to_fd[cep_id];
+ ipcp_flow_get_qoscube(fd, &cube);
+ flow_set_del(fmgr.np1_set[cube], fd);
+ ret = flow_dealloc(fd);
break;
default:
LOG_ERR("Got an unknown flow allocation message.");
@@ -674,18 +598,12 @@ int fmgr_np1_post_buf(cep_id_t cep_id,
int fmgr_np1_post_sdu(cep_id_t cep_id,
struct shm_du_buff * sdb)
{
- struct np1_flow * flow;
+ int fd;
pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
- flow = fmgr.np1_flows_cep[cep_id];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- LOG_ERR("Failed to find N flow.");
- return -1;
- }
-
- if (ipcp_flow_write(flow->fd, sdb)) {
+ fd = fmgr.np1_cep_id_to_fd[cep_id];
+ if (ipcp_flow_write(fd, sdb)) {
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
LOG_ERR("Failed to hand SDU to N flow.");
return -1;
@@ -704,19 +622,19 @@ int fmgr_nm1_mgmt_flow(char * dst_name)
/* FIXME: Request retransmission. */
fd = flow_alloc(dst_name, MGMT_AE, NULL);
if (fd < 0) {
- LOG_ERR("Failed to allocate flow to %s", dst_name);
+ LOG_ERR("Failed to allocate flow to %s.", dst_name);
return -1;
}
result = flow_alloc_res(fd);
if (result < 0) {
- LOG_ERR("Result of flow allocation to %s is %d",
+ LOG_ERR("Result of flow allocation to %s is %d.",
dst_name, result);
return -1;
}
if (ribmgr_add_flow(fd)) {
- LOG_ERR("Failed to hand file descriptor to RIB manager");
+ LOG_ERR("Failed to hand file descriptor to RIB manager.");
flow_dealloc(fd);
return -1;
}
@@ -724,8 +642,7 @@ int fmgr_nm1_mgmt_flow(char * dst_name)
return 0;
}
-int fmgr_nm1_dt_flow(char * dst_name,
- enum qos_cube qos)
+int fmgr_nm1_dt_flow(char * dst_name, qoscube_t qos)
{
int fd;
int result;
@@ -733,28 +650,25 @@ int fmgr_nm1_dt_flow(char * dst_name,
/* FIXME: Map qos cube on correct QoS. */
fd = flow_alloc(dst_name, DT_AE, NULL);
if (fd < 0) {
- LOG_ERR("Failed to allocate flow to %s", dst_name);
+ LOG_ERR("Failed to allocate flow to %s.", dst_name);
return -1;
}
result = flow_alloc_res(fd);
if (result < 0) {
- LOG_ERR("Result of flow allocation to %s is %d",
- dst_name, result);
+ LOG_ERR("Allocate flow to %s result %d.", dst_name, result);
return -1;
}
- if (add_nm1_fd(fd, qos)) {
- LOG_ERR("Failed to add file descriptor to list.");
- flow_dealloc(fd);
- return -1;
- }
+ flow_set_add(fmgr.nm1_set[qos], fd);
+
+ /* FIXME: Temporary, until we have a PFF */
+ fmgr.fd = fd;
return 0;
}
-int fmgr_nm1_write_sdu(struct pci * pci,
- struct shm_du_buff * sdb)
+int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb)
{
if (pci == NULL || sdb == NULL)
return -1;
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index b0738a0c..1e9bcc18 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -42,7 +42,6 @@
#include "dt_const.h"
#include "frct.h"
#include "ipcp.h"
-#include "cdap_request.h"
#include "ro.h"
#include "path.h"
#include "dir.h"
@@ -85,22 +84,28 @@ struct rnode {
};
struct mgmt_flow {
+ struct list_head next;
+
struct cdap * instance;
int fd;
- struct list_head next;
+
+ pthread_t handler;
};
struct ro_sub {
+ struct list_head next;
+
int sid;
+
char * name;
struct ro_sub_ops * ops;
- struct list_head next;
};
struct ro_id {
+ struct list_head next;
+
uint64_t seqno;
char * full_name;
- struct list_head next;
};
struct {
@@ -124,9 +129,6 @@ struct {
struct list_head flows;
pthread_rwlock_t flows_lock;
- struct list_head cdap_reqs;
- pthread_mutex_t cdap_reqs_lock;
-
struct addr_auth * addr_auth;
enum pol_addr_auth addr_auth_type;
} rib;
@@ -173,7 +175,7 @@ void ribmgr_ro_created(const char * name,
pthread_rwlock_unlock(&ipcpi.state_lock);
}
-/* We only have a create operation for now */
+/* We only have a create operation for now. */
static struct ro_sub_ops ribmgr_sub_ops = {
.ro_created = ribmgr_ro_created,
.ro_updated = NULL,
@@ -303,9 +305,12 @@ static void ro_delete_timer(void * o)
{
char * name = (char *) o;
- if (ribmgr_ro_delete(name)) {
+ pthread_mutex_lock(&rib.ro_lock);
+
+ if (ribmgr_ro_delete(name))
LOG_ERR("Failed to delete %s.", name);
- }
+
+ pthread_mutex_unlock(&rib.ro_lock);
}
static struct rnode * ribmgr_ro_create(const char * name,
@@ -342,7 +347,7 @@ static struct rnode * ribmgr_ro_create(const char * name,
node = node->child;
sibling = false;
- /* Search horizontally */
+ /* Search horizontally. */
while (node != NULL) {
if (strcmp(node->name, token) == 0) {
break;
@@ -400,15 +405,12 @@ static struct rnode * ribmgr_ro_create(const char * name,
LOG_DBG("Created RO with name %s.", name);
- if (!(attr.expiry.tv_sec == 0 &&
- attr.expiry.tv_nsec == 0)) {
+ if (!(attr.expiry.tv_sec == 0 && attr.expiry.tv_nsec == 0)) {
timeout = attr.expiry.tv_sec * 1000 +
attr.expiry.tv_nsec / MILLION;
- if (timerwheel_add(rib.wheel, ro_delete_timer,
- new->full_name, strlen(new->full_name) + 1,
- timeout)) {
+ if (timerwheel_add(rib.wheel, ro_delete_timer, new->full_name,
+ strlen(new->full_name) + 1, timeout))
LOG_ERR("Failed to add deletion timer of RO.");
- }
}
return new;
@@ -434,51 +436,6 @@ static struct rnode * ribmgr_ro_write(const char * name,
return node;
}
-/* Call while holding cdap_reqs_lock */
-/* FIXME: better not to call blocking functions under any lock */
-int cdap_result_wait(struct cdap * instance,
- enum cdap_opcode code,
- char * name,
- int invoke_id)
-{
- struct cdap_request * req;
- int ret;
- char * name_dup = strdup(name);
- if (name_dup == NULL)
- return -1;
-
- req = cdap_request_create(code, name_dup, invoke_id, instance);
- if (req == NULL) {
- free(name_dup);
- return -1;
- }
-
- list_add(&req->next, &rib.cdap_reqs);
-
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
- ret = cdap_request_wait(req);
-
- pthread_mutex_lock(&rib.cdap_reqs_lock);
-
- if (ret == -1) /* should only be on ipcp shutdown */
- LOG_DBG("Waiting CDAP request destroyed.");
-
- if (ret == -ETIMEDOUT)
- LOG_ERR("CDAP Request timed out.");
-
- if (ret)
- LOG_DBG("Unknown error code: %d.", ret);
-
- if (!ret)
- ret = req->result;
-
- list_del(&req->next);
- cdap_request_destroy(req);
-
- return ret;
-}
-
static int write_ro_msg(struct cdap * neighbor,
ro_msg_t * msg,
char * name,
@@ -486,7 +443,8 @@ static int write_ro_msg(struct cdap * neighbor,
{
uint8_t * data;
size_t len;
- int iid = 0;
+ cdap_key_t key;
+ int ret;
len = ro_msg__get_packed_size(msg);
if (len == 0)
@@ -498,23 +456,21 @@ static int write_ro_msg(struct cdap * neighbor,
ro_msg__pack(msg, data);
- pthread_mutex_lock(&rib.cdap_reqs_lock);
- iid = cdap_send_request(neighbor, code,
- name, data, len, 0);
- if (iid < 0) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ key = cdap_request_send(neighbor, code, name, data, len, 0);
+ if (key < 0) {
+ LOG_ERR("Failed to send CDAP request.");
free(data);
return -1;
}
- if (cdap_result_wait(neighbor, code, name, iid)) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
- free(data);
- LOG_ERR("Remote did not receive RIB object.");
+ free(data);
+
+ ret = cdap_reply_wait(neighbor, key, NULL, NULL);
+ if (ret < 0) {
+ LOG_ERR("CDAP command with code %d and name %s failed: %d.",
+ code, name, ret);
return -1;
}
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
- free(data);
return 0;
}
@@ -522,7 +478,6 @@ static int write_ro_msg(struct cdap * neighbor,
int ribmgr_init()
{
INIT_LIST_HEAD(&rib.flows);
- INIT_LIST_HEAD(&rib.cdap_reqs);
INIT_LIST_HEAD(&rib.subs);
INIT_LIST_HEAD(&rib.ro_ids);
@@ -540,17 +495,9 @@ int ribmgr_init()
return -1;
}
- if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) {
- LOG_ERR("Failed to initialize mutex.");
- pthread_rwlock_destroy(&rib.flows_lock);
- free(rib.root);
- return -1;
- }
-
if (pthread_mutex_init(&rib.ro_lock, NULL)) {
LOG_ERR("Failed to initialize mutex.");
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
free(rib.root);
return -1;
}
@@ -558,7 +505,6 @@ int ribmgr_init()
if (pthread_mutex_init(&rib.subs_lock, NULL)) {
LOG_ERR("Failed to initialize mutex.");
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
free(rib.root);
return -1;
@@ -567,7 +513,6 @@ int ribmgr_init()
if (pthread_mutex_init(&rib.ro_ids_lock, NULL)) {
LOG_ERR("Failed to initialize mutex.");
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
free(rib.root);
@@ -578,7 +523,6 @@ int ribmgr_init()
if (rib.sids == NULL) {
LOG_ERR("Failed to create bitmap.");
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
pthread_mutex_destroy(&rib.ro_ids_lock);
@@ -591,7 +535,6 @@ int ribmgr_init()
LOG_ERR("Failed to create timerwheel.");
bmp_destroy(rib.sids);
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
pthread_mutex_destroy(&rib.ro_ids_lock);
@@ -605,7 +548,6 @@ int ribmgr_init()
timerwheel_destroy(rib.wheel);
bmp_destroy(rib.sids);
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
pthread_mutex_destroy(&rib.ro_ids_lock);
@@ -633,16 +575,6 @@ int ribmgr_fini()
struct list_head * pos = NULL;
struct list_head * n = NULL;
- pthread_mutex_lock(&rib.cdap_reqs_lock);
- list_for_each_safe(pos, n, &rib.cdap_reqs) {
- struct cdap_request * req =
- list_entry(pos, struct cdap_request, next);
- free(req->name);
- list_del(&req->next);
- free(req);
- }
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
pthread_rwlock_wrlock(&rib.flows_lock);
list_for_each_safe(pos, n, &rib.flows) {
struct mgmt_flow * flow =
@@ -668,7 +600,6 @@ int ribmgr_fini()
timerwheel_destroy(rib.wheel);
pthread_mutex_destroy(&rib.subs_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_rwlock_destroy(&rib.flows_lock);
pthread_mutex_destroy(&rib.ro_ids_lock);
@@ -676,49 +607,8 @@ int ribmgr_fini()
return 0;
}
-static int ribmgr_cdap_reply(struct cdap * instance,
- int invoke_id,
- int result,
- uint8_t * data,
- size_t len)
-{
- struct list_head * pos, * n = NULL;
-
- /* We never perform reads on other RIBs */
- (void) data;
- (void) len;
-
- pthread_mutex_lock(&rib.cdap_reqs_lock);
-
- list_for_each_safe(pos, n, &rib.cdap_reqs) {
- struct cdap_request * req =
- list_entry(pos, struct cdap_request, next);
- if (req->instance == instance &&
- req->invoke_id == invoke_id &&
- req->state == REQ_PENDING) {
- if (result != 0)
- LOG_ERR("CDAP command with code %d and name %s "
- "failed with error %d",
- req->code, req->name, result);
- else
- LOG_DBG("CDAP command with code %d and name %s "
- "executed succesfully",
- req->code, req->name);
-
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
- cdap_request_respond(req, result);
-
- pthread_mutex_lock(&rib.cdap_reqs_lock);
- }
- }
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
- return 0;
-}
-
static int ribmgr_cdap_create(struct cdap * instance,
- int invoke_id,
+ cdap_key_t key,
char * name,
ro_msg_t * msg)
{
@@ -729,6 +619,8 @@ static int ribmgr_cdap_create(struct cdap * instance,
struct ro_attr attr;
struct rnode * node;
+ assert(instance);
+
ro_attr_init(&attr);
attr.expiry.tv_sec = msg->sec;
attr.expiry.tv_nsec = msg->nsec;
@@ -740,7 +632,7 @@ static int ribmgr_cdap_create(struct cdap * instance,
ro_data = malloc(msg->value.len);
if (ro_data == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
return -1;
}
memcpy(ro_data, msg->value.data, msg->value.len);
@@ -748,7 +640,7 @@ static int ribmgr_cdap_create(struct cdap * instance,
node = ribmgr_ro_create(name, attr, ro_data, msg->value.len);
if (node == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
free(ro_data);
return -1;
}
@@ -778,7 +670,7 @@ static int ribmgr_cdap_create(struct cdap * instance,
pthread_mutex_unlock(&rib.subs_lock);
pthread_mutex_unlock(&rib.ro_lock);
- if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {
+ if (cdap_reply_send(instance, key, ret, NULL, 0)) {
LOG_ERR("Failed to send reply to create request.");
return -1;
}
@@ -787,7 +679,7 @@ static int ribmgr_cdap_create(struct cdap * instance,
}
static int ribmgr_cdap_delete(struct cdap * instance,
- int invoke_id,
+ cdap_key_t key,
char * name)
{
struct list_head * p = NULL;
@@ -798,7 +690,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,
if (ribmgr_ro_delete(name)) {
pthread_mutex_unlock(&rib.ro_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
return -1;
}
@@ -823,7 +715,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,
pthread_mutex_unlock(&rib.subs_lock);
pthread_mutex_unlock(&rib.ro_lock);
- if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) {
+ if (cdap_reply_send(instance, key, 0, NULL, 0)) {
LOG_ERR("Failed to send reply to create request.");
return -1;
}
@@ -832,7 +724,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,
}
static int ribmgr_cdap_write(struct cdap * instance,
- int invoke_id,
+ cdap_key_t key,
char * name,
ro_msg_t * msg,
uint32_t flags)
@@ -851,7 +743,7 @@ static int ribmgr_cdap_write(struct cdap * instance,
ro_data = malloc(msg->value.len);
if (ro_data == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
return -1;
}
memcpy(ro_data, msg->value.data, msg->value.len);
@@ -860,7 +752,7 @@ static int ribmgr_cdap_write(struct cdap * instance,
if (node == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
free(ro_data);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
return -1;
}
node->seqno = msg->seqno;
@@ -891,7 +783,7 @@ static int ribmgr_cdap_write(struct cdap * instance,
pthread_mutex_unlock(&rib.subs_lock);
pthread_mutex_unlock(&rib.ro_lock);
- if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {
+ if (cdap_reply_send(instance, key, ret, NULL, 0)) {
LOG_ERR("Failed to send reply to write request.");
return -1;
}
@@ -899,8 +791,7 @@ static int ribmgr_cdap_write(struct cdap * instance,
return 0;
}
-static int ribmgr_enrol_sync(struct cdap * instance,
- struct rnode * node)
+static int ribmgr_enrol_sync(struct cdap * instance, struct rnode * node)
{
int ret = 0;
@@ -931,24 +822,28 @@ static int ribmgr_enrol_sync(struct cdap * instance,
}
static int ribmgr_cdap_start(struct cdap * instance,
- int invoke_id,
+ cdap_key_t key,
char * name)
{
- int iid = 0;
-
- pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state() == IPCP_OPERATIONAL &&
- strcmp(name, ENROLLMENT) == 0) {
+ if (strcmp(name, ENROLLMENT) == 0) {
LOG_DBG("New enrollment request.");
- if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) {
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("IPCP in wrong state.");
+ return -1;
+ }
+
+ if (cdap_reply_send(instance, key, 0, NULL, 0)) {
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send reply to enrollment request.");
return -1;
}
- /* Loop through rtree and send correct objects */
- LOG_DBGF("Sending ROs that need to be sent on enrolment...");
+ /* Loop through rtree and send correct objects. */
+ LOG_DBG("Sending ROs that need to be sent on enrolment...");
pthread_mutex_lock(&rib.ro_lock);
if (ribmgr_enrol_sync(instance, rib.root->child)) {
@@ -957,57 +852,48 @@ static int ribmgr_cdap_start(struct cdap * instance,
LOG_ERR("Failed to sync part of the RIB.");
return -1;
}
+
pthread_mutex_unlock(&rib.ro_lock);
LOG_DBGF("Sending stop enrollment...");
- pthread_mutex_lock(&rib.cdap_reqs_lock);
-
- iid = cdap_send_request(instance, CDAP_STOP, ENROLLMENT,
+ key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT,
NULL, 0, 0);
- if (iid < 0) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ if (key < 0) {
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send stop of enrollment.");
return -1;
}
- if (cdap_result_wait(instance, CDAP_STOP,
- ENROLLMENT, iid)) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ if (cdap_reply_wait(instance, key, NULL, NULL)) {
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Remote failed to complete enrollment.");
return -1;
}
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+
+ pthread_rwlock_unlock(&ipcpi.state_lock);
} else {
- if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("Failed to send reply to start request.");
- return -1;
- }
+ LOG_WARN("Request to start unknown operation.");
+ if (cdap_reply_send(instance, key, -1, NULL, 0))
+ LOG_ERR("Failed to send negative reply.");
}
- pthread_rwlock_unlock(&ipcpi.state_lock);
return 0;
}
-static int ribmgr_cdap_stop(struct cdap * instance,
- int invoke_id,
- char * name)
+static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name)
{
int ret = 0;
pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state() == IPCP_CONFIG &&
- strcmp(name, ENROLLMENT) == 0) {
+ if (ipcp_get_state() == IPCP_CONFIG && strcmp(name, ENROLLMENT) == 0) {
LOG_DBG("Stop enrollment received.");
ipcp_set_state(IPCP_BOOTING);
} else
ret = -1;
- if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {
+ if (cdap_reply_send(instance, key, ret, NULL, 0)) {
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send reply to stop request.");
return -1;
@@ -1028,8 +914,7 @@ static void ro_id_delete(void * o)
pthread_mutex_unlock(&rib.ro_ids_lock);
}
-static int ro_id_create(char * name,
- ro_msg_t * msg)
+static int ro_id_create(char * name, ro_msg_t * msg)
{
struct ro_id * tmp;
@@ -1062,105 +947,113 @@ static int ro_id_create(char * name,
return 0;
}
-static int ribmgr_cdap_request(struct cdap * instance,
- int invoke_id,
- enum cdap_opcode opcode,
- char * name,
- uint8_t * data,
- size_t len,
- uint32_t flags)
+static void * cdap_req_handler(void * o)
{
+ struct cdap * instance = (struct cdap *) o;
+ enum cdap_opcode opcode;
+ char * name;
+ uint8_t * data;
+ size_t len;
+ uint32_t flags;
ro_msg_t * msg;
- int ret = -1;
struct list_head * p = NULL;
- if (opcode == CDAP_START)
- return ribmgr_cdap_start(instance,
- invoke_id,
- name);
- else if (opcode == CDAP_STOP)
- return ribmgr_cdap_stop(instance,
- invoke_id,
- name);
-
- msg = ro_msg__unpack(NULL, len, data);
- if (msg == NULL) {
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
- LOG_ERR("Failed to unpack RO message");
- return -1;
- }
+ assert(instance);
- pthread_mutex_lock(&rib.ro_ids_lock);
- list_for_each(p, &rib.ro_ids) {
- struct ro_id * e = list_entry(p, struct ro_id, next);
+ while (true) {
+ cdap_key_t key = cdap_request_wait(instance,
+ &opcode,
+ &name,
+ &data,
+ &len,
+ &flags);
+ assert(key >= 0);
- if (strcmp(e->full_name, name) == 0 &&
- e->seqno == msg->seqno) {
- pthread_mutex_unlock(&rib.ro_ids_lock);
- ro_msg__free_unpacked(msg, NULL);
- cdap_send_reply(instance, invoke_id, 0, NULL, 0);
- LOG_DBG("Already received this RO.");
- return 0;
+ if (opcode == CDAP_START) {
+ if (ribmgr_cdap_start(instance, key, name))
+ LOG_WARN("CDAP start failed.");
+ continue;
+ }
+ else if (opcode == CDAP_STOP) {
+ if (ribmgr_cdap_stop(instance, key, name))
+ LOG_WARN("CDAP stop failed.");
+ continue;
}
- }
- pthread_mutex_unlock(&rib.ro_ids_lock);
-
- if (opcode == CDAP_CREATE) {
- ret = ribmgr_cdap_create(instance,
- invoke_id,
- name,
- msg);
- } else if (opcode == CDAP_WRITE) {
- ret = ribmgr_cdap_write(instance,
- invoke_id,
- name, msg,
- flags);
-
- } else if (opcode == CDAP_DELETE) {
- ret = ribmgr_cdap_delete(instance,
- invoke_id,
- name);
- } else {
- LOG_INFO("Unsupported opcode received.");
- ro_msg__free_unpacked(msg, NULL);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
- return -1;
- }
- if (ro_id_create(name, msg)) {
- LOG_ERR("Failed to create RO id.");
- return -1;
- }
+ msg = ro_msg__unpack(NULL, len, data);
+ if (msg == NULL) {
+ cdap_reply_send(instance, key, -1, NULL, 0);
+ LOG_WARN("Failed to unpack RO message");
+ continue;
+ }
- if (msg->recv_set == ALL_MEMBERS) {
- pthread_rwlock_rdlock(&rib.flows_lock);
- list_for_each(p, &rib.flows) {
- struct mgmt_flow * e =
- list_entry(p, struct mgmt_flow, next);
+ pthread_mutex_lock(&rib.ro_ids_lock);
+ list_for_each(p, &rib.ro_ids) {
+ struct ro_id * e = list_entry(p, struct ro_id, next);
- /* Don't send it back */
- if (e->instance == instance)
+ if (strcmp(e->full_name, name) == 0 &&
+ e->seqno == msg->seqno) {
+ pthread_mutex_unlock(&rib.ro_ids_lock);
+ ro_msg__free_unpacked(msg, NULL);
+ cdap_reply_send(instance, key, 0, NULL, 0);
+ LOG_DBG("Already received this RO.");
continue;
+ }
+ }
+ pthread_mutex_unlock(&rib.ro_ids_lock);
- if (write_ro_msg(e->instance, msg, name, opcode)) {
- LOG_ERR("Failed to send to a neighbor.");
- pthread_rwlock_unlock(&rib.flows_lock);
+ if (opcode == CDAP_CREATE) {
+ if (ribmgr_cdap_create(instance, key, name, msg)) {
+ LOG_WARN("CDAP create failed.");
ro_msg__free_unpacked(msg, NULL);
- return -1;
+ continue;
}
+ } else if (opcode == CDAP_WRITE) {
+ if (ribmgr_cdap_write(instance, key, name,
+ msg, flags)) {
+ LOG_WARN("CDAP write failed.");
+ ro_msg__free_unpacked(msg, NULL);
+ continue;
+ }
+ } else if (opcode == CDAP_DELETE) {
+ if (ribmgr_cdap_delete(instance, key, name)) {
+ LOG_WARN("CDAP delete failed.");
+ ro_msg__free_unpacked(msg, NULL);
+ continue;
+ }
+ } else {
+ LOG_INFO("Unsupported opcode received.");
+ ro_msg__free_unpacked(msg, NULL);
+ cdap_reply_send(instance, key, -1, NULL, 0);
+ continue;
}
- pthread_rwlock_unlock(&rib.flows_lock);
- }
- ro_msg__free_unpacked(msg, NULL);
+ if (ro_id_create(name, msg)) {
+ LOG_WARN("Failed to create RO id.");
+ ro_msg__free_unpacked(msg, NULL);
+ continue;
+ }
- return ret;
-}
+ if (msg->recv_set == ALL_MEMBERS) {
+ pthread_rwlock_rdlock(&rib.flows_lock);
+ list_for_each(p, &rib.flows) {
+ struct mgmt_flow * e =
+ list_entry(p, struct mgmt_flow, next);
-static struct cdap_ops ribmgr_cdap_ops = {
- .cdap_reply = ribmgr_cdap_reply,
- .cdap_request = ribmgr_cdap_request
-};
+ /* Don't send it back. */
+ if (e->instance == instance)
+ continue;
+
+ if (write_ro_msg(e->instance, msg,
+ name, opcode))
+ LOG_WARN("Failed to send to neighbor.");
+ }
+ pthread_rwlock_unlock(&rib.flows_lock);
+ }
+
+ ro_msg__free_unpacked(msg, NULL);
+ }
+}
int ribmgr_add_flow(int fd)
{
@@ -1169,9 +1062,9 @@ int ribmgr_add_flow(int fd)
flow = malloc(sizeof(*flow));
if (flow == NULL)
- return -1;
+ return -ENOMEM;
- instance = cdap_create(&ribmgr_cdap_ops, fd);
+ instance = cdap_create(fd);
if (instance == NULL) {
LOG_ERR("Failed to create CDAP instance");
free(flow);
@@ -1182,8 +1075,17 @@ int ribmgr_add_flow(int fd)
flow->instance = instance;
flow->fd = fd;
+ if (pthread_create(&flow->handler, NULL,
+ cdap_req_handler, instance)) {
+ LOG_ERR("Failed to start handler thread for mgt flow.");
+ free(flow);
+ return -1;
+ }
+
pthread_rwlock_wrlock(&rib.flows_lock);
+
list_add(&flow->next, &rib.flows);
+
pthread_rwlock_unlock(&rib.flows_lock);
return 0;
@@ -1198,6 +1100,7 @@ int ribmgr_remove_flow(int fd)
struct mgmt_flow * flow =
list_entry(pos, struct mgmt_flow, next);
if (flow->fd == fd) {
+ pthread_cancel(flow->handler);
if (cdap_destroy(flow->instance))
LOG_ERR("Failed to destroy CDAP instance.");
list_del(&flow->next);
@@ -1218,10 +1121,9 @@ int ribmgr_bootstrap(struct dif_config * conf)
size_t len = 0;
struct ro_attr attr;
- if (conf == NULL ||
- conf->type != IPCP_NORMAL) {
+ if (conf == NULL || conf->type != IPCP_NORMAL) {
LOG_ERR("Bad DIF configuration.");
- return -1;
+ return -EINVAL;
}
ro_attr_init(&attr);
@@ -1246,7 +1148,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
len = static_info_msg__get_packed_size(&stat_info);
if (len == 0) {
LOG_ERR("Failed to get size of static information.");
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1254,7 +1155,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
data = malloc(len);
if (data == NULL) {
LOG_ERR("Failed to allocate memory.");
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1265,7 +1165,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
attr, data, len) == NULL) {
LOG_ERR("Failed to create static info RO.");
free(data);
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1273,7 +1172,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
if (dir_init()) {
LOG_ERR("Failed to init directory");
ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO);
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1282,7 +1180,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
LOG_ERR("Failed to initialize FRCT.");
dir_fini();
ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO);
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1296,12 +1193,14 @@ int ribmgr_enrol(void)
{
struct cdap * instance = NULL;
struct mgmt_flow * flow;
- int iid = 0;
+ cdap_key_t key;
+ int ret;
pthread_rwlock_wrlock(&ipcpi.state_lock);
if (ipcp_get_state() != IPCP_INIT) {
pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("IPCP in wrong state.");
return -1;
}
@@ -1312,36 +1211,31 @@ int ribmgr_enrol(void)
ipcp_set_state(IPCP_INIT);
pthread_rwlock_unlock(&rib.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("No flows in RIB.");
return -1;
}
- flow = list_entry((&rib.flows)->next, struct mgmt_flow, next);
+ flow = list_first_entry((&rib.flows), struct mgmt_flow, next);
instance = flow->instance;
- pthread_mutex_lock(&rib.cdap_reqs_lock);
- iid = cdap_send_request(instance,
- CDAP_START,
- ENROLLMENT,
- NULL, 0, 0);
- if (iid < 0) {
+ key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0);
+ if (key < 0) {
ipcp_set_state(IPCP_INIT);
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
pthread_rwlock_unlock(&rib.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to start enrollment.");
return -1;
}
- if (cdap_result_wait(instance, CDAP_START,
- ENROLLMENT, iid)) {
+ ret = cdap_reply_wait(instance, key, NULL, NULL);
+ if (ret) {
ipcp_set_state(IPCP_INIT);
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
pthread_rwlock_unlock(&rib.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("Failed to start enrollment.");
+ LOG_ERR("Failed to enroll: %d.", ret);
return -1;
}
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+
pthread_rwlock_unlock(&rib.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1351,9 +1245,10 @@ int ribmgr_enrol(void)
int ribmgr_start_policies(void)
{
pthread_rwlock_rdlock(&ipcpi.state_lock);
+
if (ipcp_get_state() != IPCP_BOOTING) {
pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("Cannot start policies in wrong state");
+ LOG_ERR("Cannot start policies in wrong state.");
return -1;
}
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1365,7 +1260,7 @@ int ribmgr_start_policies(void)
}
rib.address = rib.addr_auth->address();
- LOG_DBG("IPCP has address %lu", (unsigned long) rib.address);
+ LOG_DBG("IPCP has address %lu.", (unsigned long) rib.address);
return 0;
}
@@ -1380,22 +1275,21 @@ uint64_t ribmgr_address()
return rib.address;
}
-static int send_neighbors_ro(char * name,
- ro_msg_t * msg,
- enum cdap_opcode code)
+static int send_neighbors_ro(char * name, ro_msg_t * msg, enum cdap_opcode code)
{
struct list_head * p = NULL;
pthread_rwlock_rdlock(&rib.flows_lock);
+
list_for_each(p, &rib.flows) {
struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next);
-
if (write_ro_msg(e->instance, msg, name, code)) {
- LOG_ERR("Failed to send to a neighbor.");
pthread_rwlock_unlock(&rib.flows_lock);
+ LOG_ERR("Failed to send to a neighbor.");
return -1;
}
}
+
pthread_rwlock_unlock(&rib.flows_lock);
return 0;
@@ -1499,9 +1393,7 @@ int ro_delete(const char * name)
return 0;
}
-int ro_write(const char * name,
- uint8_t * data,
- size_t len)
+int ro_write(const char * name, uint8_t * data, size_t len)
{
struct rnode * node;
ro_msg_t msg = RO_MSG__INIT;
@@ -1541,8 +1433,7 @@ int ro_write(const char * name,
return 0;
}
-ssize_t ro_read(const char * name,
- uint8_t ** data)
+ssize_t ro_read(const char * name, uint8_t ** data)
{
struct rnode * node;
ssize_t len;
@@ -1572,8 +1463,7 @@ ssize_t ro_read(const char * name,
return len;
}
-ssize_t ro_children(const char * name,
- char *** children)
+ssize_t ro_children(const char * name, char *** children)
{
struct rnode * node;
struct rnode * child;
@@ -1640,8 +1530,7 @@ bool ro_exists(const char * name)
return found;
}
-int ro_subscribe(const char * name,
- struct ro_sub_ops * ops)
+int ro_subscribe(const char * name, struct ro_sub_ops * ops)
{
struct ro_sub * sub;
int sid;