From f97dee45d3c1b0088aa8010a1c9d59593c3d0df0 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 23 Dec 2016 18:13:40 +0100 Subject: ipcpd, lib: Refactor normal ipcp and cdap Refactors the normal IPCP fmgr and ribmgr, and modifies the API for cdap so that no callbacks are needed. --- src/ipcpd/normal/ribmgr.c | 501 ++++++++++++++++++---------------------------- 1 file changed, 195 insertions(+), 306 deletions(-) (limited to 'src/ipcpd/normal/ribmgr.c') diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index b0738a0c..1e9bcc18 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -42,7 +42,6 @@ #include "dt_const.h" #include "frct.h" #include "ipcp.h" -#include "cdap_request.h" #include "ro.h" #include "path.h" #include "dir.h" @@ -85,22 +84,28 @@ struct rnode { }; struct mgmt_flow { + struct list_head next; + struct cdap * instance; int fd; - struct list_head next; + + pthread_t handler; }; struct ro_sub { + struct list_head next; + int sid; + char * name; struct ro_sub_ops * ops; - struct list_head next; }; struct ro_id { + struct list_head next; + uint64_t seqno; char * full_name; - struct list_head next; }; struct { @@ -124,9 +129,6 @@ struct { struct list_head flows; pthread_rwlock_t flows_lock; - struct list_head cdap_reqs; - pthread_mutex_t cdap_reqs_lock; - struct addr_auth * addr_auth; enum pol_addr_auth addr_auth_type; } rib; @@ -173,7 +175,7 @@ void ribmgr_ro_created(const char * name, pthread_rwlock_unlock(&ipcpi.state_lock); } -/* We only have a create operation for now */ +/* We only have a create operation for now. */ static struct ro_sub_ops ribmgr_sub_ops = { .ro_created = ribmgr_ro_created, .ro_updated = NULL, @@ -303,9 +305,12 @@ static void ro_delete_timer(void * o) { char * name = (char *) o; - if (ribmgr_ro_delete(name)) { + pthread_mutex_lock(&rib.ro_lock); + + if (ribmgr_ro_delete(name)) LOG_ERR("Failed to delete %s.", name); - } + + pthread_mutex_unlock(&rib.ro_lock); } static struct rnode * ribmgr_ro_create(const char * name, @@ -342,7 +347,7 @@ static struct rnode * ribmgr_ro_create(const char * name, node = node->child; sibling = false; - /* Search horizontally */ + /* Search horizontally. */ while (node != NULL) { if (strcmp(node->name, token) == 0) { break; @@ -400,15 +405,12 @@ static struct rnode * ribmgr_ro_create(const char * name, LOG_DBG("Created RO with name %s.", name); - if (!(attr.expiry.tv_sec == 0 && - attr.expiry.tv_nsec == 0)) { + if (!(attr.expiry.tv_sec == 0 && attr.expiry.tv_nsec == 0)) { timeout = attr.expiry.tv_sec * 1000 + attr.expiry.tv_nsec / MILLION; - if (timerwheel_add(rib.wheel, ro_delete_timer, - new->full_name, strlen(new->full_name) + 1, - timeout)) { + if (timerwheel_add(rib.wheel, ro_delete_timer, new->full_name, + strlen(new->full_name) + 1, timeout)) LOG_ERR("Failed to add deletion timer of RO."); - } } return new; @@ -434,51 +436,6 @@ static struct rnode * ribmgr_ro_write(const char * name, return node; } -/* Call while holding cdap_reqs_lock */ -/* FIXME: better not to call blocking functions under any lock */ -int cdap_result_wait(struct cdap * instance, - enum cdap_opcode code, - char * name, - int invoke_id) -{ - struct cdap_request * req; - int ret; - char * name_dup = strdup(name); - if (name_dup == NULL) - return -1; - - req = cdap_request_create(code, name_dup, invoke_id, instance); - if (req == NULL) { - free(name_dup); - return -1; - } - - list_add(&req->next, &rib.cdap_reqs); - - pthread_mutex_unlock(&rib.cdap_reqs_lock); - - ret = cdap_request_wait(req); - - pthread_mutex_lock(&rib.cdap_reqs_lock); - - if (ret == -1) /* should only be on ipcp shutdown */ - LOG_DBG("Waiting CDAP request destroyed."); - - if (ret == -ETIMEDOUT) - LOG_ERR("CDAP Request timed out."); - - if (ret) - LOG_DBG("Unknown error code: %d.", ret); - - if (!ret) - ret = req->result; - - list_del(&req->next); - cdap_request_destroy(req); - - return ret; -} - static int write_ro_msg(struct cdap * neighbor, ro_msg_t * msg, char * name, @@ -486,7 +443,8 @@ static int write_ro_msg(struct cdap * neighbor, { uint8_t * data; size_t len; - int iid = 0; + cdap_key_t key; + int ret; len = ro_msg__get_packed_size(msg); if (len == 0) @@ -498,23 +456,21 @@ static int write_ro_msg(struct cdap * neighbor, ro_msg__pack(msg, data); - pthread_mutex_lock(&rib.cdap_reqs_lock); - iid = cdap_send_request(neighbor, code, - name, data, len, 0); - if (iid < 0) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); + key = cdap_request_send(neighbor, code, name, data, len, 0); + if (key < 0) { + LOG_ERR("Failed to send CDAP request."); free(data); return -1; } - if (cdap_result_wait(neighbor, code, name, iid)) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); - free(data); - LOG_ERR("Remote did not receive RIB object."); + free(data); + + ret = cdap_reply_wait(neighbor, key, NULL, NULL); + if (ret < 0) { + LOG_ERR("CDAP command with code %d and name %s failed: %d.", + code, name, ret); return -1; } - pthread_mutex_unlock(&rib.cdap_reqs_lock); - free(data); return 0; } @@ -522,7 +478,6 @@ static int write_ro_msg(struct cdap * neighbor, int ribmgr_init() { INIT_LIST_HEAD(&rib.flows); - INIT_LIST_HEAD(&rib.cdap_reqs); INIT_LIST_HEAD(&rib.subs); INIT_LIST_HEAD(&rib.ro_ids); @@ -540,17 +495,9 @@ int ribmgr_init() return -1; } - if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) { - LOG_ERR("Failed to initialize mutex."); - pthread_rwlock_destroy(&rib.flows_lock); - free(rib.root); - return -1; - } - if (pthread_mutex_init(&rib.ro_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); free(rib.root); return -1; } @@ -558,7 +505,6 @@ int ribmgr_init() if (pthread_mutex_init(&rib.subs_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); free(rib.root); return -1; @@ -567,7 +513,6 @@ int ribmgr_init() if (pthread_mutex_init(&rib.ro_ids_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); free(rib.root); @@ -578,7 +523,6 @@ int ribmgr_init() if (rib.sids == NULL) { LOG_ERR("Failed to create bitmap."); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); pthread_mutex_destroy(&rib.ro_ids_lock); @@ -591,7 +535,6 @@ int ribmgr_init() LOG_ERR("Failed to create timerwheel."); bmp_destroy(rib.sids); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); pthread_mutex_destroy(&rib.ro_ids_lock); @@ -605,7 +548,6 @@ int ribmgr_init() timerwheel_destroy(rib.wheel); bmp_destroy(rib.sids); pthread_rwlock_destroy(&rib.flows_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); pthread_mutex_destroy(&rib.ro_ids_lock); @@ -633,16 +575,6 @@ int ribmgr_fini() struct list_head * pos = NULL; struct list_head * n = NULL; - pthread_mutex_lock(&rib.cdap_reqs_lock); - list_for_each_safe(pos, n, &rib.cdap_reqs) { - struct cdap_request * req = - list_entry(pos, struct cdap_request, next); - free(req->name); - list_del(&req->next); - free(req); - } - pthread_mutex_unlock(&rib.cdap_reqs_lock); - pthread_rwlock_wrlock(&rib.flows_lock); list_for_each_safe(pos, n, &rib.flows) { struct mgmt_flow * flow = @@ -668,7 +600,6 @@ int ribmgr_fini() timerwheel_destroy(rib.wheel); pthread_mutex_destroy(&rib.subs_lock); - pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_rwlock_destroy(&rib.flows_lock); pthread_mutex_destroy(&rib.ro_ids_lock); @@ -676,49 +607,8 @@ int ribmgr_fini() return 0; } -static int ribmgr_cdap_reply(struct cdap * instance, - int invoke_id, - int result, - uint8_t * data, - size_t len) -{ - struct list_head * pos, * n = NULL; - - /* We never perform reads on other RIBs */ - (void) data; - (void) len; - - pthread_mutex_lock(&rib.cdap_reqs_lock); - - list_for_each_safe(pos, n, &rib.cdap_reqs) { - struct cdap_request * req = - list_entry(pos, struct cdap_request, next); - if (req->instance == instance && - req->invoke_id == invoke_id && - req->state == REQ_PENDING) { - if (result != 0) - LOG_ERR("CDAP command with code %d and name %s " - "failed with error %d", - req->code, req->name, result); - else - LOG_DBG("CDAP command with code %d and name %s " - "executed succesfully", - req->code, req->name); - - pthread_mutex_unlock(&rib.cdap_reqs_lock); - - cdap_request_respond(req, result); - - pthread_mutex_lock(&rib.cdap_reqs_lock); - } - } - pthread_mutex_unlock(&rib.cdap_reqs_lock); - - return 0; -} - static int ribmgr_cdap_create(struct cdap * instance, - int invoke_id, + cdap_key_t key, char * name, ro_msg_t * msg) { @@ -729,6 +619,8 @@ static int ribmgr_cdap_create(struct cdap * instance, struct ro_attr attr; struct rnode * node; + assert(instance); + ro_attr_init(&attr); attr.expiry.tv_sec = msg->sec; attr.expiry.tv_nsec = msg->nsec; @@ -740,7 +632,7 @@ static int ribmgr_cdap_create(struct cdap * instance, ro_data = malloc(msg->value.len); if (ro_data == NULL) { pthread_mutex_unlock(&rib.ro_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); return -1; } memcpy(ro_data, msg->value.data, msg->value.len); @@ -748,7 +640,7 @@ static int ribmgr_cdap_create(struct cdap * instance, node = ribmgr_ro_create(name, attr, ro_data, msg->value.len); if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); free(ro_data); return -1; } @@ -778,7 +670,7 @@ static int ribmgr_cdap_create(struct cdap * instance, pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); - if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { + if (cdap_reply_send(instance, key, ret, NULL, 0)) { LOG_ERR("Failed to send reply to create request."); return -1; } @@ -787,7 +679,7 @@ static int ribmgr_cdap_create(struct cdap * instance, } static int ribmgr_cdap_delete(struct cdap * instance, - int invoke_id, + cdap_key_t key, char * name) { struct list_head * p = NULL; @@ -798,7 +690,7 @@ static int ribmgr_cdap_delete(struct cdap * instance, if (ribmgr_ro_delete(name)) { pthread_mutex_unlock(&rib.ro_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); return -1; } @@ -823,7 +715,7 @@ static int ribmgr_cdap_delete(struct cdap * instance, pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); - if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { + if (cdap_reply_send(instance, key, 0, NULL, 0)) { LOG_ERR("Failed to send reply to create request."); return -1; } @@ -832,7 +724,7 @@ static int ribmgr_cdap_delete(struct cdap * instance, } static int ribmgr_cdap_write(struct cdap * instance, - int invoke_id, + cdap_key_t key, char * name, ro_msg_t * msg, uint32_t flags) @@ -851,7 +743,7 @@ static int ribmgr_cdap_write(struct cdap * instance, ro_data = malloc(msg->value.len); if (ro_data == NULL) { pthread_mutex_unlock(&rib.ro_lock); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); return -1; } memcpy(ro_data, msg->value.data, msg->value.len); @@ -860,7 +752,7 @@ static int ribmgr_cdap_write(struct cdap * instance, if (node == NULL) { pthread_mutex_unlock(&rib.ro_lock); free(ro_data); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); + cdap_reply_send(instance, key, -1, NULL, 0); return -1; } node->seqno = msg->seqno; @@ -891,7 +783,7 @@ static int ribmgr_cdap_write(struct cdap * instance, pthread_mutex_unlock(&rib.subs_lock); pthread_mutex_unlock(&rib.ro_lock); - if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { + if (cdap_reply_send(instance, key, ret, NULL, 0)) { LOG_ERR("Failed to send reply to write request."); return -1; } @@ -899,8 +791,7 @@ static int ribmgr_cdap_write(struct cdap * instance, return 0; } -static int ribmgr_enrol_sync(struct cdap * instance, - struct rnode * node) +static int ribmgr_enrol_sync(struct cdap * instance, struct rnode * node) { int ret = 0; @@ -931,24 +822,28 @@ static int ribmgr_enrol_sync(struct cdap * instance, } static int ribmgr_cdap_start(struct cdap * instance, - int invoke_id, + cdap_key_t key, char * name) { - int iid = 0; - - pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state() == IPCP_OPERATIONAL && - strcmp(name, ENROLLMENT) == 0) { + if (strcmp(name, ENROLLMENT) == 0) { LOG_DBG("New enrollment request."); - if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { + pthread_rwlock_wrlock(&ipcpi.state_lock); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("IPCP in wrong state."); + return -1; + } + + if (cdap_reply_send(instance, key, 0, NULL, 0)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to enrollment request."); return -1; } - /* Loop through rtree and send correct objects */ - LOG_DBGF("Sending ROs that need to be sent on enrolment..."); + /* Loop through rtree and send correct objects. */ + LOG_DBG("Sending ROs that need to be sent on enrolment..."); pthread_mutex_lock(&rib.ro_lock); if (ribmgr_enrol_sync(instance, rib.root->child)) { @@ -957,57 +852,48 @@ static int ribmgr_cdap_start(struct cdap * instance, LOG_ERR("Failed to sync part of the RIB."); return -1; } + pthread_mutex_unlock(&rib.ro_lock); LOG_DBGF("Sending stop enrollment..."); - pthread_mutex_lock(&rib.cdap_reqs_lock); - - iid = cdap_send_request(instance, CDAP_STOP, ENROLLMENT, + key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT, NULL, 0, 0); - if (iid < 0) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); + if (key < 0) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send stop of enrollment."); return -1; } - if (cdap_result_wait(instance, CDAP_STOP, - ENROLLMENT, iid)) { - pthread_mutex_unlock(&rib.cdap_reqs_lock); + if (cdap_reply_wait(instance, key, NULL, NULL)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Remote failed to complete enrollment."); return -1; } - pthread_mutex_unlock(&rib.cdap_reqs_lock); + + pthread_rwlock_unlock(&ipcpi.state_lock); } else { - if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) { - pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("Failed to send reply to start request."); - return -1; - } + LOG_WARN("Request to start unknown operation."); + if (cdap_reply_send(instance, key, -1, NULL, 0)) + LOG_ERR("Failed to send negative reply."); } - pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } -static int ribmgr_cdap_stop(struct cdap * instance, - int invoke_id, - char * name) +static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name) { int ret = 0; pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state() == IPCP_CONFIG && - strcmp(name, ENROLLMENT) == 0) { + if (ipcp_get_state() == IPCP_CONFIG && strcmp(name, ENROLLMENT) == 0) { LOG_DBG("Stop enrollment received."); ipcp_set_state(IPCP_BOOTING); } else ret = -1; - if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { + if (cdap_reply_send(instance, key, ret, NULL, 0)) { pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to stop request."); return -1; @@ -1028,8 +914,7 @@ static void ro_id_delete(void * o) pthread_mutex_unlock(&rib.ro_ids_lock); } -static int ro_id_create(char * name, - ro_msg_t * msg) +static int ro_id_create(char * name, ro_msg_t * msg) { struct ro_id * tmp; @@ -1062,105 +947,113 @@ static int ro_id_create(char * name, return 0; } -static int ribmgr_cdap_request(struct cdap * instance, - int invoke_id, - enum cdap_opcode opcode, - char * name, - uint8_t * data, - size_t len, - uint32_t flags) +static void * cdap_req_handler(void * o) { + struct cdap * instance = (struct cdap *) o; + enum cdap_opcode opcode; + char * name; + uint8_t * data; + size_t len; + uint32_t flags; ro_msg_t * msg; - int ret = -1; struct list_head * p = NULL; - if (opcode == CDAP_START) - return ribmgr_cdap_start(instance, - invoke_id, - name); - else if (opcode == CDAP_STOP) - return ribmgr_cdap_stop(instance, - invoke_id, - name); - - msg = ro_msg__unpack(NULL, len, data); - if (msg == NULL) { - cdap_send_reply(instance, invoke_id, -1, NULL, 0); - LOG_ERR("Failed to unpack RO message"); - return -1; - } + assert(instance); - pthread_mutex_lock(&rib.ro_ids_lock); - list_for_each(p, &rib.ro_ids) { - struct ro_id * e = list_entry(p, struct ro_id, next); + while (true) { + cdap_key_t key = cdap_request_wait(instance, + &opcode, + &name, + &data, + &len, + &flags); + assert(key >= 0); - if (strcmp(e->full_name, name) == 0 && - e->seqno == msg->seqno) { - pthread_mutex_unlock(&rib.ro_ids_lock); - ro_msg__free_unpacked(msg, NULL); - cdap_send_reply(instance, invoke_id, 0, NULL, 0); - LOG_DBG("Already received this RO."); - return 0; + if (opcode == CDAP_START) { + if (ribmgr_cdap_start(instance, key, name)) + LOG_WARN("CDAP start failed."); + continue; + } + else if (opcode == CDAP_STOP) { + if (ribmgr_cdap_stop(instance, key, name)) + LOG_WARN("CDAP stop failed."); + continue; } - } - pthread_mutex_unlock(&rib.ro_ids_lock); - - if (opcode == CDAP_CREATE) { - ret = ribmgr_cdap_create(instance, - invoke_id, - name, - msg); - } else if (opcode == CDAP_WRITE) { - ret = ribmgr_cdap_write(instance, - invoke_id, - name, msg, - flags); - - } else if (opcode == CDAP_DELETE) { - ret = ribmgr_cdap_delete(instance, - invoke_id, - name); - } else { - LOG_INFO("Unsupported opcode received."); - ro_msg__free_unpacked(msg, NULL); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); - return -1; - } - if (ro_id_create(name, msg)) { - LOG_ERR("Failed to create RO id."); - return -1; - } + msg = ro_msg__unpack(NULL, len, data); + if (msg == NULL) { + cdap_reply_send(instance, key, -1, NULL, 0); + LOG_WARN("Failed to unpack RO message"); + continue; + } - if (msg->recv_set == ALL_MEMBERS) { - pthread_rwlock_rdlock(&rib.flows_lock); - list_for_each(p, &rib.flows) { - struct mgmt_flow * e = - list_entry(p, struct mgmt_flow, next); + pthread_mutex_lock(&rib.ro_ids_lock); + list_for_each(p, &rib.ro_ids) { + struct ro_id * e = list_entry(p, struct ro_id, next); - /* Don't send it back */ - if (e->instance == instance) + if (strcmp(e->full_name, name) == 0 && + e->seqno == msg->seqno) { + pthread_mutex_unlock(&rib.ro_ids_lock); + ro_msg__free_unpacked(msg, NULL); + cdap_reply_send(instance, key, 0, NULL, 0); + LOG_DBG("Already received this RO."); continue; + } + } + pthread_mutex_unlock(&rib.ro_ids_lock); - if (write_ro_msg(e->instance, msg, name, opcode)) { - LOG_ERR("Failed to send to a neighbor."); - pthread_rwlock_unlock(&rib.flows_lock); + if (opcode == CDAP_CREATE) { + if (ribmgr_cdap_create(instance, key, name, msg)) { + LOG_WARN("CDAP create failed."); ro_msg__free_unpacked(msg, NULL); - return -1; + continue; } + } else if (opcode == CDAP_WRITE) { + if (ribmgr_cdap_write(instance, key, name, + msg, flags)) { + LOG_WARN("CDAP write failed."); + ro_msg__free_unpacked(msg, NULL); + continue; + } + } else if (opcode == CDAP_DELETE) { + if (ribmgr_cdap_delete(instance, key, name)) { + LOG_WARN("CDAP delete failed."); + ro_msg__free_unpacked(msg, NULL); + continue; + } + } else { + LOG_INFO("Unsupported opcode received."); + ro_msg__free_unpacked(msg, NULL); + cdap_reply_send(instance, key, -1, NULL, 0); + continue; } - pthread_rwlock_unlock(&rib.flows_lock); - } - ro_msg__free_unpacked(msg, NULL); + if (ro_id_create(name, msg)) { + LOG_WARN("Failed to create RO id."); + ro_msg__free_unpacked(msg, NULL); + continue; + } - return ret; -} + if (msg->recv_set == ALL_MEMBERS) { + pthread_rwlock_rdlock(&rib.flows_lock); + list_for_each(p, &rib.flows) { + struct mgmt_flow * e = + list_entry(p, struct mgmt_flow, next); -static struct cdap_ops ribmgr_cdap_ops = { - .cdap_reply = ribmgr_cdap_reply, - .cdap_request = ribmgr_cdap_request -}; + /* Don't send it back. */ + if (e->instance == instance) + continue; + + if (write_ro_msg(e->instance, msg, + name, opcode)) + LOG_WARN("Failed to send to neighbor."); + } + pthread_rwlock_unlock(&rib.flows_lock); + } + + ro_msg__free_unpacked(msg, NULL); + } +} int ribmgr_add_flow(int fd) { @@ -1169,9 +1062,9 @@ int ribmgr_add_flow(int fd) flow = malloc(sizeof(*flow)); if (flow == NULL) - return -1; + return -ENOMEM; - instance = cdap_create(&ribmgr_cdap_ops, fd); + instance = cdap_create(fd); if (instance == NULL) { LOG_ERR("Failed to create CDAP instance"); free(flow); @@ -1182,8 +1075,17 @@ int ribmgr_add_flow(int fd) flow->instance = instance; flow->fd = fd; + if (pthread_create(&flow->handler, NULL, + cdap_req_handler, instance)) { + LOG_ERR("Failed to start handler thread for mgt flow."); + free(flow); + return -1; + } + pthread_rwlock_wrlock(&rib.flows_lock); + list_add(&flow->next, &rib.flows); + pthread_rwlock_unlock(&rib.flows_lock); return 0; @@ -1198,6 +1100,7 @@ int ribmgr_remove_flow(int fd) struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); if (flow->fd == fd) { + pthread_cancel(flow->handler); if (cdap_destroy(flow->instance)) LOG_ERR("Failed to destroy CDAP instance."); list_del(&flow->next); @@ -1218,10 +1121,9 @@ int ribmgr_bootstrap(struct dif_config * conf) size_t len = 0; struct ro_attr attr; - if (conf == NULL || - conf->type != IPCP_NORMAL) { + if (conf == NULL || conf->type != IPCP_NORMAL) { LOG_ERR("Bad DIF configuration."); - return -1; + return -EINVAL; } ro_attr_init(&attr); @@ -1246,7 +1148,6 @@ int ribmgr_bootstrap(struct dif_config * conf) len = static_info_msg__get_packed_size(&stat_info); if (len == 0) { LOG_ERR("Failed to get size of static information."); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1254,7 +1155,6 @@ int ribmgr_bootstrap(struct dif_config * conf) data = malloc(len); if (data == NULL) { LOG_ERR("Failed to allocate memory."); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1265,7 +1165,6 @@ int ribmgr_bootstrap(struct dif_config * conf) attr, data, len) == NULL) { LOG_ERR("Failed to create static info RO."); free(data); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1273,7 +1172,6 @@ int ribmgr_bootstrap(struct dif_config * conf) if (dir_init()) { LOG_ERR("Failed to init directory"); ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1282,7 +1180,6 @@ int ribmgr_bootstrap(struct dif_config * conf) LOG_ERR("Failed to initialize FRCT."); dir_fini(); ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); - addr_auth_destroy(rib.addr_auth); ribmgr_ro_delete(RIBMGR_PREFIX); return -1; } @@ -1296,12 +1193,14 @@ int ribmgr_enrol(void) { struct cdap * instance = NULL; struct mgmt_flow * flow; - int iid = 0; + cdap_key_t key; + int ret; pthread_rwlock_wrlock(&ipcpi.state_lock); if (ipcp_get_state() != IPCP_INIT) { pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("IPCP in wrong state."); return -1; } @@ -1312,36 +1211,31 @@ int ribmgr_enrol(void) ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("No flows in RIB."); return -1; } - flow = list_entry((&rib.flows)->next, struct mgmt_flow, next); + flow = list_first_entry((&rib.flows), struct mgmt_flow, next); instance = flow->instance; - pthread_mutex_lock(&rib.cdap_reqs_lock); - iid = cdap_send_request(instance, - CDAP_START, - ENROLLMENT, - NULL, 0, 0); - if (iid < 0) { + key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0); + if (key < 0) { ipcp_set_state(IPCP_INIT); - pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start enrollment."); return -1; } - if (cdap_result_wait(instance, CDAP_START, - ENROLLMENT, iid)) { + ret = cdap_reply_wait(instance, key, NULL, NULL); + if (ret) { ipcp_set_state(IPCP_INIT); - pthread_mutex_unlock(&rib.cdap_reqs_lock); pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("Failed to start enrollment."); + LOG_ERR("Failed to enroll: %d.", ret); return -1; } - pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&rib.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1351,9 +1245,10 @@ int ribmgr_enrol(void) int ribmgr_start_policies(void) { pthread_rwlock_rdlock(&ipcpi.state_lock); + if (ipcp_get_state() != IPCP_BOOTING) { pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_ERR("Cannot start policies in wrong state"); + LOG_ERR("Cannot start policies in wrong state."); return -1; } pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1365,7 +1260,7 @@ int ribmgr_start_policies(void) } rib.address = rib.addr_auth->address(); - LOG_DBG("IPCP has address %lu", (unsigned long) rib.address); + LOG_DBG("IPCP has address %lu.", (unsigned long) rib.address); return 0; } @@ -1380,22 +1275,21 @@ uint64_t ribmgr_address() return rib.address; } -static int send_neighbors_ro(char * name, - ro_msg_t * msg, - enum cdap_opcode code) +static int send_neighbors_ro(char * name, ro_msg_t * msg, enum cdap_opcode code) { struct list_head * p = NULL; pthread_rwlock_rdlock(&rib.flows_lock); + list_for_each(p, &rib.flows) { struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next); - if (write_ro_msg(e->instance, msg, name, code)) { - LOG_ERR("Failed to send to a neighbor."); pthread_rwlock_unlock(&rib.flows_lock); + LOG_ERR("Failed to send to a neighbor."); return -1; } } + pthread_rwlock_unlock(&rib.flows_lock); return 0; @@ -1499,9 +1393,7 @@ int ro_delete(const char * name) return 0; } -int ro_write(const char * name, - uint8_t * data, - size_t len) +int ro_write(const char * name, uint8_t * data, size_t len) { struct rnode * node; ro_msg_t msg = RO_MSG__INIT; @@ -1541,8 +1433,7 @@ int ro_write(const char * name, return 0; } -ssize_t ro_read(const char * name, - uint8_t ** data) +ssize_t ro_read(const char * name, uint8_t ** data) { struct rnode * node; ssize_t len; @@ -1572,8 +1463,7 @@ ssize_t ro_read(const char * name, return len; } -ssize_t ro_children(const char * name, - char *** children) +ssize_t ro_children(const char * name, char *** children) { struct rnode * node; struct rnode * child; @@ -1640,8 +1530,7 @@ bool ro_exists(const char * name) return found; } -int ro_subscribe(const char * name, - struct ro_sub_ops * ops) +int ro_subscribe(const char * name, struct ro_sub_ops * ops) { struct ro_sub * sub; int sid; -- cgit v1.2.3