summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/ribmgr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/normal/ribmgr.c')
-rw-r--r--src/ipcpd/normal/ribmgr.c501
1 files changed, 195 insertions, 306 deletions
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index b0738a0c..1e9bcc18 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -42,7 +42,6 @@
#include "dt_const.h"
#include "frct.h"
#include "ipcp.h"
-#include "cdap_request.h"
#include "ro.h"
#include "path.h"
#include "dir.h"
@@ -85,22 +84,28 @@ struct rnode {
};
struct mgmt_flow {
+ struct list_head next;
+
struct cdap * instance;
int fd;
- struct list_head next;
+
+ pthread_t handler;
};
struct ro_sub {
+ struct list_head next;
+
int sid;
+
char * name;
struct ro_sub_ops * ops;
- struct list_head next;
};
struct ro_id {
+ struct list_head next;
+
uint64_t seqno;
char * full_name;
- struct list_head next;
};
struct {
@@ -124,9 +129,6 @@ struct {
struct list_head flows;
pthread_rwlock_t flows_lock;
- struct list_head cdap_reqs;
- pthread_mutex_t cdap_reqs_lock;
-
struct addr_auth * addr_auth;
enum pol_addr_auth addr_auth_type;
} rib;
@@ -173,7 +175,7 @@ void ribmgr_ro_created(const char * name,
pthread_rwlock_unlock(&ipcpi.state_lock);
}
-/* We only have a create operation for now */
+/* We only have a create operation for now. */
static struct ro_sub_ops ribmgr_sub_ops = {
.ro_created = ribmgr_ro_created,
.ro_updated = NULL,
@@ -303,9 +305,12 @@ static void ro_delete_timer(void * o)
{
char * name = (char *) o;
- if (ribmgr_ro_delete(name)) {
+ pthread_mutex_lock(&rib.ro_lock);
+
+ if (ribmgr_ro_delete(name))
LOG_ERR("Failed to delete %s.", name);
- }
+
+ pthread_mutex_unlock(&rib.ro_lock);
}
static struct rnode * ribmgr_ro_create(const char * name,
@@ -342,7 +347,7 @@ static struct rnode * ribmgr_ro_create(const char * name,
node = node->child;
sibling = false;
- /* Search horizontally */
+ /* Search horizontally. */
while (node != NULL) {
if (strcmp(node->name, token) == 0) {
break;
@@ -400,15 +405,12 @@ static struct rnode * ribmgr_ro_create(const char * name,
LOG_DBG("Created RO with name %s.", name);
- if (!(attr.expiry.tv_sec == 0 &&
- attr.expiry.tv_nsec == 0)) {
+ if (!(attr.expiry.tv_sec == 0 && attr.expiry.tv_nsec == 0)) {
timeout = attr.expiry.tv_sec * 1000 +
attr.expiry.tv_nsec / MILLION;
- if (timerwheel_add(rib.wheel, ro_delete_timer,
- new->full_name, strlen(new->full_name) + 1,
- timeout)) {
+ if (timerwheel_add(rib.wheel, ro_delete_timer, new->full_name,
+ strlen(new->full_name) + 1, timeout))
LOG_ERR("Failed to add deletion timer of RO.");
- }
}
return new;
@@ -434,51 +436,6 @@ static struct rnode * ribmgr_ro_write(const char * name,
return node;
}
-/* Call while holding cdap_reqs_lock */
-/* FIXME: better not to call blocking functions under any lock */
-int cdap_result_wait(struct cdap * instance,
- enum cdap_opcode code,
- char * name,
- int invoke_id)
-{
- struct cdap_request * req;
- int ret;
- char * name_dup = strdup(name);
- if (name_dup == NULL)
- return -1;
-
- req = cdap_request_create(code, name_dup, invoke_id, instance);
- if (req == NULL) {
- free(name_dup);
- return -1;
- }
-
- list_add(&req->next, &rib.cdap_reqs);
-
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
- ret = cdap_request_wait(req);
-
- pthread_mutex_lock(&rib.cdap_reqs_lock);
-
- if (ret == -1) /* should only be on ipcp shutdown */
- LOG_DBG("Waiting CDAP request destroyed.");
-
- if (ret == -ETIMEDOUT)
- LOG_ERR("CDAP Request timed out.");
-
- if (ret)
- LOG_DBG("Unknown error code: %d.", ret);
-
- if (!ret)
- ret = req->result;
-
- list_del(&req->next);
- cdap_request_destroy(req);
-
- return ret;
-}
-
static int write_ro_msg(struct cdap * neighbor,
ro_msg_t * msg,
char * name,
@@ -486,7 +443,8 @@ static int write_ro_msg(struct cdap * neighbor,
{
uint8_t * data;
size_t len;
- int iid = 0;
+ cdap_key_t key;
+ int ret;
len = ro_msg__get_packed_size(msg);
if (len == 0)
@@ -498,23 +456,21 @@ static int write_ro_msg(struct cdap * neighbor,
ro_msg__pack(msg, data);
- pthread_mutex_lock(&rib.cdap_reqs_lock);
- iid = cdap_send_request(neighbor, code,
- name, data, len, 0);
- if (iid < 0) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ key = cdap_request_send(neighbor, code, name, data, len, 0);
+ if (key < 0) {
+ LOG_ERR("Failed to send CDAP request.");
free(data);
return -1;
}
- if (cdap_result_wait(neighbor, code, name, iid)) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
- free(data);
- LOG_ERR("Remote did not receive RIB object.");
+ free(data);
+
+ ret = cdap_reply_wait(neighbor, key, NULL, NULL);
+ if (ret < 0) {
+ LOG_ERR("CDAP command with code %d and name %s failed: %d.",
+ code, name, ret);
return -1;
}
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
- free(data);
return 0;
}
@@ -522,7 +478,6 @@ static int write_ro_msg(struct cdap * neighbor,
int ribmgr_init()
{
INIT_LIST_HEAD(&rib.flows);
- INIT_LIST_HEAD(&rib.cdap_reqs);
INIT_LIST_HEAD(&rib.subs);
INIT_LIST_HEAD(&rib.ro_ids);
@@ -540,17 +495,9 @@ int ribmgr_init()
return -1;
}
- if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) {
- LOG_ERR("Failed to initialize mutex.");
- pthread_rwlock_destroy(&rib.flows_lock);
- free(rib.root);
- return -1;
- }
-
if (pthread_mutex_init(&rib.ro_lock, NULL)) {
LOG_ERR("Failed to initialize mutex.");
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
free(rib.root);
return -1;
}
@@ -558,7 +505,6 @@ int ribmgr_init()
if (pthread_mutex_init(&rib.subs_lock, NULL)) {
LOG_ERR("Failed to initialize mutex.");
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
free(rib.root);
return -1;
@@ -567,7 +513,6 @@ int ribmgr_init()
if (pthread_mutex_init(&rib.ro_ids_lock, NULL)) {
LOG_ERR("Failed to initialize mutex.");
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
free(rib.root);
@@ -578,7 +523,6 @@ int ribmgr_init()
if (rib.sids == NULL) {
LOG_ERR("Failed to create bitmap.");
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
pthread_mutex_destroy(&rib.ro_ids_lock);
@@ -591,7 +535,6 @@ int ribmgr_init()
LOG_ERR("Failed to create timerwheel.");
bmp_destroy(rib.sids);
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
pthread_mutex_destroy(&rib.ro_ids_lock);
@@ -605,7 +548,6 @@ int ribmgr_init()
timerwheel_destroy(rib.wheel);
bmp_destroy(rib.sids);
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
pthread_mutex_destroy(&rib.ro_ids_lock);
@@ -633,16 +575,6 @@ int ribmgr_fini()
struct list_head * pos = NULL;
struct list_head * n = NULL;
- pthread_mutex_lock(&rib.cdap_reqs_lock);
- list_for_each_safe(pos, n, &rib.cdap_reqs) {
- struct cdap_request * req =
- list_entry(pos, struct cdap_request, next);
- free(req->name);
- list_del(&req->next);
- free(req);
- }
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
pthread_rwlock_wrlock(&rib.flows_lock);
list_for_each_safe(pos, n, &rib.flows) {
struct mgmt_flow * flow =
@@ -668,7 +600,6 @@ int ribmgr_fini()
timerwheel_destroy(rib.wheel);
pthread_mutex_destroy(&rib.subs_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_rwlock_destroy(&rib.flows_lock);
pthread_mutex_destroy(&rib.ro_ids_lock);
@@ -676,49 +607,8 @@ int ribmgr_fini()
return 0;
}
-static int ribmgr_cdap_reply(struct cdap * instance,
- int invoke_id,
- int result,
- uint8_t * data,
- size_t len)
-{
- struct list_head * pos, * n = NULL;
-
- /* We never perform reads on other RIBs */
- (void) data;
- (void) len;
-
- pthread_mutex_lock(&rib.cdap_reqs_lock);
-
- list_for_each_safe(pos, n, &rib.cdap_reqs) {
- struct cdap_request * req =
- list_entry(pos, struct cdap_request, next);
- if (req->instance == instance &&
- req->invoke_id == invoke_id &&
- req->state == REQ_PENDING) {
- if (result != 0)
- LOG_ERR("CDAP command with code %d and name %s "
- "failed with error %d",
- req->code, req->name, result);
- else
- LOG_DBG("CDAP command with code %d and name %s "
- "executed succesfully",
- req->code, req->name);
-
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
- cdap_request_respond(req, result);
-
- pthread_mutex_lock(&rib.cdap_reqs_lock);
- }
- }
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
- return 0;
-}
-
static int ribmgr_cdap_create(struct cdap * instance,
- int invoke_id,
+ cdap_key_t key,
char * name,
ro_msg_t * msg)
{
@@ -729,6 +619,8 @@ static int ribmgr_cdap_create(struct cdap * instance,
struct ro_attr attr;
struct rnode * node;
+ assert(instance);
+
ro_attr_init(&attr);
attr.expiry.tv_sec = msg->sec;
attr.expiry.tv_nsec = msg->nsec;
@@ -740,7 +632,7 @@ static int ribmgr_cdap_create(struct cdap * instance,
ro_data = malloc(msg->value.len);
if (ro_data == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
return -1;
}
memcpy(ro_data, msg->value.data, msg->value.len);
@@ -748,7 +640,7 @@ static int ribmgr_cdap_create(struct cdap * instance,
node = ribmgr_ro_create(name, attr, ro_data, msg->value.len);
if (node == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
free(ro_data);
return -1;
}
@@ -778,7 +670,7 @@ static int ribmgr_cdap_create(struct cdap * instance,
pthread_mutex_unlock(&rib.subs_lock);
pthread_mutex_unlock(&rib.ro_lock);
- if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {
+ if (cdap_reply_send(instance, key, ret, NULL, 0)) {
LOG_ERR("Failed to send reply to create request.");
return -1;
}
@@ -787,7 +679,7 @@ static int ribmgr_cdap_create(struct cdap * instance,
}
static int ribmgr_cdap_delete(struct cdap * instance,
- int invoke_id,
+ cdap_key_t key,
char * name)
{
struct list_head * p = NULL;
@@ -798,7 +690,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,
if (ribmgr_ro_delete(name)) {
pthread_mutex_unlock(&rib.ro_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
return -1;
}
@@ -823,7 +715,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,
pthread_mutex_unlock(&rib.subs_lock);
pthread_mutex_unlock(&rib.ro_lock);
- if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) {
+ if (cdap_reply_send(instance, key, 0, NULL, 0)) {
LOG_ERR("Failed to send reply to create request.");
return -1;
}
@@ -832,7 +724,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,
}
static int ribmgr_cdap_write(struct cdap * instance,
- int invoke_id,
+ cdap_key_t key,
char * name,
ro_msg_t * msg,
uint32_t flags)
@@ -851,7 +743,7 @@ static int ribmgr_cdap_write(struct cdap * instance,
ro_data = malloc(msg->value.len);
if (ro_data == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
return -1;
}
memcpy(ro_data, msg->value.data, msg->value.len);
@@ -860,7 +752,7 @@ static int ribmgr_cdap_write(struct cdap * instance,
if (node == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
free(ro_data);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
return -1;
}
node->seqno = msg->seqno;
@@ -891,7 +783,7 @@ static int ribmgr_cdap_write(struct cdap * instance,
pthread_mutex_unlock(&rib.subs_lock);
pthread_mutex_unlock(&rib.ro_lock);
- if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {
+ if (cdap_reply_send(instance, key, ret, NULL, 0)) {
LOG_ERR("Failed to send reply to write request.");
return -1;
}
@@ -899,8 +791,7 @@ static int ribmgr_cdap_write(struct cdap * instance,
return 0;
}
-static int ribmgr_enrol_sync(struct cdap * instance,
- struct rnode * node)
+static int ribmgr_enrol_sync(struct cdap * instance, struct rnode * node)
{
int ret = 0;
@@ -931,24 +822,28 @@ static int ribmgr_enrol_sync(struct cdap * instance,
}
static int ribmgr_cdap_start(struct cdap * instance,
- int invoke_id,
+ cdap_key_t key,
char * name)
{
- int iid = 0;
-
- pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state() == IPCP_OPERATIONAL &&
- strcmp(name, ENROLLMENT) == 0) {
+ if (strcmp(name, ENROLLMENT) == 0) {
LOG_DBG("New enrollment request.");
- if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) {
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("IPCP in wrong state.");
+ return -1;
+ }
+
+ if (cdap_reply_send(instance, key, 0, NULL, 0)) {
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send reply to enrollment request.");
return -1;
}
- /* Loop through rtree and send correct objects */
- LOG_DBGF("Sending ROs that need to be sent on enrolment...");
+ /* Loop through rtree and send correct objects. */
+ LOG_DBG("Sending ROs that need to be sent on enrolment...");
pthread_mutex_lock(&rib.ro_lock);
if (ribmgr_enrol_sync(instance, rib.root->child)) {
@@ -957,57 +852,48 @@ static int ribmgr_cdap_start(struct cdap * instance,
LOG_ERR("Failed to sync part of the RIB.");
return -1;
}
+
pthread_mutex_unlock(&rib.ro_lock);
LOG_DBGF("Sending stop enrollment...");
- pthread_mutex_lock(&rib.cdap_reqs_lock);
-
- iid = cdap_send_request(instance, CDAP_STOP, ENROLLMENT,
+ key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT,
NULL, 0, 0);
- if (iid < 0) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ if (key < 0) {
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send stop of enrollment.");
return -1;
}
- if (cdap_result_wait(instance, CDAP_STOP,
- ENROLLMENT, iid)) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ if (cdap_reply_wait(instance, key, NULL, NULL)) {
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Remote failed to complete enrollment.");
return -1;
}
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+
+ pthread_rwlock_unlock(&ipcpi.state_lock);
} else {
- if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("Failed to send reply to start request.");
- return -1;
- }
+ LOG_WARN("Request to start unknown operation.");
+ if (cdap_reply_send(instance, key, -1, NULL, 0))
+ LOG_ERR("Failed to send negative reply.");
}
- pthread_rwlock_unlock(&ipcpi.state_lock);
return 0;
}
-static int ribmgr_cdap_stop(struct cdap * instance,
- int invoke_id,
- char * name)
+static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name)
{
int ret = 0;
pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state() == IPCP_CONFIG &&
- strcmp(name, ENROLLMENT) == 0) {
+ if (ipcp_get_state() == IPCP_CONFIG && strcmp(name, ENROLLMENT) == 0) {
LOG_DBG("Stop enrollment received.");
ipcp_set_state(IPCP_BOOTING);
} else
ret = -1;
- if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {
+ if (cdap_reply_send(instance, key, ret, NULL, 0)) {
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send reply to stop request.");
return -1;
@@ -1028,8 +914,7 @@ static void ro_id_delete(void * o)
pthread_mutex_unlock(&rib.ro_ids_lock);
}
-static int ro_id_create(char * name,
- ro_msg_t * msg)
+static int ro_id_create(char * name, ro_msg_t * msg)
{
struct ro_id * tmp;
@@ -1062,105 +947,113 @@ static int ro_id_create(char * name,
return 0;
}
-static int ribmgr_cdap_request(struct cdap * instance,
- int invoke_id,
- enum cdap_opcode opcode,
- char * name,
- uint8_t * data,
- size_t len,
- uint32_t flags)
+static void * cdap_req_handler(void * o)
{
+ struct cdap * instance = (struct cdap *) o;
+ enum cdap_opcode opcode;
+ char * name;
+ uint8_t * data;
+ size_t len;
+ uint32_t flags;
ro_msg_t * msg;
- int ret = -1;
struct list_head * p = NULL;
- if (opcode == CDAP_START)
- return ribmgr_cdap_start(instance,
- invoke_id,
- name);
- else if (opcode == CDAP_STOP)
- return ribmgr_cdap_stop(instance,
- invoke_id,
- name);
-
- msg = ro_msg__unpack(NULL, len, data);
- if (msg == NULL) {
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
- LOG_ERR("Failed to unpack RO message");
- return -1;
- }
+ assert(instance);
- pthread_mutex_lock(&rib.ro_ids_lock);
- list_for_each(p, &rib.ro_ids) {
- struct ro_id * e = list_entry(p, struct ro_id, next);
+ while (true) {
+ cdap_key_t key = cdap_request_wait(instance,
+ &opcode,
+ &name,
+ &data,
+ &len,
+ &flags);
+ assert(key >= 0);
- if (strcmp(e->full_name, name) == 0 &&
- e->seqno == msg->seqno) {
- pthread_mutex_unlock(&rib.ro_ids_lock);
- ro_msg__free_unpacked(msg, NULL);
- cdap_send_reply(instance, invoke_id, 0, NULL, 0);
- LOG_DBG("Already received this RO.");
- return 0;
+ if (opcode == CDAP_START) {
+ if (ribmgr_cdap_start(instance, key, name))
+ LOG_WARN("CDAP start failed.");
+ continue;
+ }
+ else if (opcode == CDAP_STOP) {
+ if (ribmgr_cdap_stop(instance, key, name))
+ LOG_WARN("CDAP stop failed.");
+ continue;
}
- }
- pthread_mutex_unlock(&rib.ro_ids_lock);
-
- if (opcode == CDAP_CREATE) {
- ret = ribmgr_cdap_create(instance,
- invoke_id,
- name,
- msg);
- } else if (opcode == CDAP_WRITE) {
- ret = ribmgr_cdap_write(instance,
- invoke_id,
- name, msg,
- flags);
-
- } else if (opcode == CDAP_DELETE) {
- ret = ribmgr_cdap_delete(instance,
- invoke_id,
- name);
- } else {
- LOG_INFO("Unsupported opcode received.");
- ro_msg__free_unpacked(msg, NULL);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
- return -1;
- }
- if (ro_id_create(name, msg)) {
- LOG_ERR("Failed to create RO id.");
- return -1;
- }
+ msg = ro_msg__unpack(NULL, len, data);
+ if (msg == NULL) {
+ cdap_reply_send(instance, key, -1, NULL, 0);
+ LOG_WARN("Failed to unpack RO message");
+ continue;
+ }
- if (msg->recv_set == ALL_MEMBERS) {
- pthread_rwlock_rdlock(&rib.flows_lock);
- list_for_each(p, &rib.flows) {
- struct mgmt_flow * e =
- list_entry(p, struct mgmt_flow, next);
+ pthread_mutex_lock(&rib.ro_ids_lock);
+ list_for_each(p, &rib.ro_ids) {
+ struct ro_id * e = list_entry(p, struct ro_id, next);
- /* Don't send it back */
- if (e->instance == instance)
+ if (strcmp(e->full_name, name) == 0 &&
+ e->seqno == msg->seqno) {
+ pthread_mutex_unlock(&rib.ro_ids_lock);
+ ro_msg__free_unpacked(msg, NULL);
+ cdap_reply_send(instance, key, 0, NULL, 0);
+ LOG_DBG("Already received this RO.");
continue;
+ }
+ }
+ pthread_mutex_unlock(&rib.ro_ids_lock);
- if (write_ro_msg(e->instance, msg, name, opcode)) {
- LOG_ERR("Failed to send to a neighbor.");
- pthread_rwlock_unlock(&rib.flows_lock);
+ if (opcode == CDAP_CREATE) {
+ if (ribmgr_cdap_create(instance, key, name, msg)) {
+ LOG_WARN("CDAP create failed.");
ro_msg__free_unpacked(msg, NULL);
- return -1;
+ continue;
}
+ } else if (opcode == CDAP_WRITE) {
+ if (ribmgr_cdap_write(instance, key, name,
+ msg, flags)) {
+ LOG_WARN("CDAP write failed.");
+ ro_msg__free_unpacked(msg, NULL);
+ continue;
+ }
+ } else if (opcode == CDAP_DELETE) {
+ if (ribmgr_cdap_delete(instance, key, name)) {
+ LOG_WARN("CDAP delete failed.");
+ ro_msg__free_unpacked(msg, NULL);
+ continue;
+ }
+ } else {
+ LOG_INFO("Unsupported opcode received.");
+ ro_msg__free_unpacked(msg, NULL);
+ cdap_reply_send(instance, key, -1, NULL, 0);
+ continue;
}
- pthread_rwlock_unlock(&rib.flows_lock);
- }
- ro_msg__free_unpacked(msg, NULL);
+ if (ro_id_create(name, msg)) {
+ LOG_WARN("Failed to create RO id.");
+ ro_msg__free_unpacked(msg, NULL);
+ continue;
+ }
- return ret;
-}
+ if (msg->recv_set == ALL_MEMBERS) {
+ pthread_rwlock_rdlock(&rib.flows_lock);
+ list_for_each(p, &rib.flows) {
+ struct mgmt_flow * e =
+ list_entry(p, struct mgmt_flow, next);
-static struct cdap_ops ribmgr_cdap_ops = {
- .cdap_reply = ribmgr_cdap_reply,
- .cdap_request = ribmgr_cdap_request
-};
+ /* Don't send it back. */
+ if (e->instance == instance)
+ continue;
+
+ if (write_ro_msg(e->instance, msg,
+ name, opcode))
+ LOG_WARN("Failed to send to neighbor.");
+ }
+ pthread_rwlock_unlock(&rib.flows_lock);
+ }
+
+ ro_msg__free_unpacked(msg, NULL);
+ }
+}
int ribmgr_add_flow(int fd)
{
@@ -1169,9 +1062,9 @@ int ribmgr_add_flow(int fd)
flow = malloc(sizeof(*flow));
if (flow == NULL)
- return -1;
+ return -ENOMEM;
- instance = cdap_create(&ribmgr_cdap_ops, fd);
+ instance = cdap_create(fd);
if (instance == NULL) {
LOG_ERR("Failed to create CDAP instance");
free(flow);
@@ -1182,8 +1075,17 @@ int ribmgr_add_flow(int fd)
flow->instance = instance;
flow->fd = fd;
+ if (pthread_create(&flow->handler, NULL,
+ cdap_req_handler, instance)) {
+ LOG_ERR("Failed to start handler thread for mgt flow.");
+ free(flow);
+ return -1;
+ }
+
pthread_rwlock_wrlock(&rib.flows_lock);
+
list_add(&flow->next, &rib.flows);
+
pthread_rwlock_unlock(&rib.flows_lock);
return 0;
@@ -1198,6 +1100,7 @@ int ribmgr_remove_flow(int fd)
struct mgmt_flow * flow =
list_entry(pos, struct mgmt_flow, next);
if (flow->fd == fd) {
+ pthread_cancel(flow->handler);
if (cdap_destroy(flow->instance))
LOG_ERR("Failed to destroy CDAP instance.");
list_del(&flow->next);
@@ -1218,10 +1121,9 @@ int ribmgr_bootstrap(struct dif_config * conf)
size_t len = 0;
struct ro_attr attr;
- if (conf == NULL ||
- conf->type != IPCP_NORMAL) {
+ if (conf == NULL || conf->type != IPCP_NORMAL) {
LOG_ERR("Bad DIF configuration.");
- return -1;
+ return -EINVAL;
}
ro_attr_init(&attr);
@@ -1246,7 +1148,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
len = static_info_msg__get_packed_size(&stat_info);
if (len == 0) {
LOG_ERR("Failed to get size of static information.");
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1254,7 +1155,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
data = malloc(len);
if (data == NULL) {
LOG_ERR("Failed to allocate memory.");
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1265,7 +1165,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
attr, data, len) == NULL) {
LOG_ERR("Failed to create static info RO.");
free(data);
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1273,7 +1172,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
if (dir_init()) {
LOG_ERR("Failed to init directory");
ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO);
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1282,7 +1180,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
LOG_ERR("Failed to initialize FRCT.");
dir_fini();
ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO);
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1296,12 +1193,14 @@ int ribmgr_enrol(void)
{
struct cdap * instance = NULL;
struct mgmt_flow * flow;
- int iid = 0;
+ cdap_key_t key;
+ int ret;
pthread_rwlock_wrlock(&ipcpi.state_lock);
if (ipcp_get_state() != IPCP_INIT) {
pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("IPCP in wrong state.");
return -1;
}
@@ -1312,36 +1211,31 @@ int ribmgr_enrol(void)
ipcp_set_state(IPCP_INIT);
pthread_rwlock_unlock(&rib.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("No flows in RIB.");
return -1;
}
- flow = list_entry((&rib.flows)->next, struct mgmt_flow, next);
+ flow = list_first_entry((&rib.flows), struct mgmt_flow, next);
instance = flow->instance;
- pthread_mutex_lock(&rib.cdap_reqs_lock);
- iid = cdap_send_request(instance,
- CDAP_START,
- ENROLLMENT,
- NULL, 0, 0);
- if (iid < 0) {
+ key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0);
+ if (key < 0) {
ipcp_set_state(IPCP_INIT);
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
pthread_rwlock_unlock(&rib.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to start enrollment.");
return -1;
}
- if (cdap_result_wait(instance, CDAP_START,
- ENROLLMENT, iid)) {
+ ret = cdap_reply_wait(instance, key, NULL, NULL);
+ if (ret) {
ipcp_set_state(IPCP_INIT);
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
pthread_rwlock_unlock(&rib.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("Failed to start enrollment.");
+ LOG_ERR("Failed to enroll: %d.", ret);
return -1;
}
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+
pthread_rwlock_unlock(&rib.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1351,9 +1245,10 @@ int ribmgr_enrol(void)
int ribmgr_start_policies(void)
{
pthread_rwlock_rdlock(&ipcpi.state_lock);
+
if (ipcp_get_state() != IPCP_BOOTING) {
pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("Cannot start policies in wrong state");
+ LOG_ERR("Cannot start policies in wrong state.");
return -1;
}
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1365,7 +1260,7 @@ int ribmgr_start_policies(void)
}
rib.address = rib.addr_auth->address();
- LOG_DBG("IPCP has address %lu", (unsigned long) rib.address);
+ LOG_DBG("IPCP has address %lu.", (unsigned long) rib.address);
return 0;
}
@@ -1380,22 +1275,21 @@ uint64_t ribmgr_address()
return rib.address;
}
-static int send_neighbors_ro(char * name,
- ro_msg_t * msg,
- enum cdap_opcode code)
+static int send_neighbors_ro(char * name, ro_msg_t * msg, enum cdap_opcode code)
{
struct list_head * p = NULL;
pthread_rwlock_rdlock(&rib.flows_lock);
+
list_for_each(p, &rib.flows) {
struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next);
-
if (write_ro_msg(e->instance, msg, name, code)) {
- LOG_ERR("Failed to send to a neighbor.");
pthread_rwlock_unlock(&rib.flows_lock);
+ LOG_ERR("Failed to send to a neighbor.");
return -1;
}
}
+
pthread_rwlock_unlock(&rib.flows_lock);
return 0;
@@ -1499,9 +1393,7 @@ int ro_delete(const char * name)
return 0;
}
-int ro_write(const char * name,
- uint8_t * data,
- size_t len)
+int ro_write(const char * name, uint8_t * data, size_t len)
{
struct rnode * node;
ro_msg_t msg = RO_MSG__INIT;
@@ -1541,8 +1433,7 @@ int ro_write(const char * name,
return 0;
}
-ssize_t ro_read(const char * name,
- uint8_t ** data)
+ssize_t ro_read(const char * name, uint8_t ** data)
{
struct rnode * node;
ssize_t len;
@@ -1572,8 +1463,7 @@ ssize_t ro_read(const char * name,
return len;
}
-ssize_t ro_children(const char * name,
- char *** children)
+ssize_t ro_children(const char * name, char *** children)
{
struct rnode * node;
struct rnode * child;
@@ -1640,8 +1530,7 @@ bool ro_exists(const char * name)
return found;
}
-int ro_subscribe(const char * name,
- struct ro_sub_ops * ops)
+int ro_subscribe(const char * name, struct ro_sub_ops * ops)
{
struct ro_sub * sub;
int sid;