From 45c6615484ffe347654c34decb72ff1ef9bde0f3 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sat, 9 Sep 2017 13:50:47 +0200 Subject: ipcpd: Revise internals of normal IPCP This removes the RIB as a datastructure and CDAP as the protocol between IPCPs. CDAP, the rib and related sources are deprecated. The link-state protocol policy is udpated to use its own protocol based on a simple broadcast strategy along a tree. The neighbors struct is deprecated and moved to the library as a generic notifier component. --- src/ipcpd/normal/pol/link_state.c | 620 ++++++++++++++++++++++++++++---------- 1 file changed, 454 insertions(+), 166 deletions(-) (limited to 'src/ipcpd/normal/pol/link_state.c') diff --git a/src/ipcpd/normal/pol/link_state.c b/src/ipcpd/normal/pol/link_state.c index 512ced7f..7df09bce 100644 --- a/src/ipcpd/normal/pol/link_state.c +++ b/src/ipcpd/normal/pol/link_state.c @@ -27,14 +27,16 @@ #include #include #include -#include -#include +#include +#include +#include +#include -#include "ribmgr.h" -#include "ribconfig.h" +#include "ae.h" +#include "connmgr.h" #include "graph.h" -#include "neighbors.h" #include "ipcp.h" +#include "link_state.h" #include "pff.h" #include @@ -43,39 +45,230 @@ #include #include -#include "fso.pb-c.h" -typedef Fso fso_t; +#include "link_state.pb-c.h" +typedef LinkStateMsg link_state_msg_t; -#define BUF_SIZE 256 -#define RECALC_TIME 4 +#define RECALC_TIME 4 +#define LS_UPDATE_TIME 15 +#define LS_TIMEO 60 +#define LSA_MAX_LEN 128 + +#ifndef CLOCK_REALTIME_COARSE +#define CLOCK_REALTIME_COARSE CLOCK_REALTIME +#endif struct routing_i { struct pff * pff; pthread_t calculator; }; +/* TODO: link weight support. */ +struct adjacency { + struct list_head next; + + uint64_t dst; + uint64_t src; + + time_t stamp; +}; + +enum nb_type { + NB_DT = 0, + NB_MGMT +}; + +struct nb { + struct list_head next; + + uint64_t addr; + int fd; + enum nb_type type; +}; + struct { - struct nbs * nbs; - struct nb_notifier nb_notifier; + struct list_head nbs; + fset_t * mgmt_set; - struct graph * graph; + struct list_head db; - ro_set_t * set; - rqueue_t * queue; - pthread_t rib_listener; -} link_state; + pthread_rwlock_t db_lock; -/* Take under neighbors lock */ -static int addr_to_fd(uint64_t addr) + struct graph * graph; + + pthread_t lsupdate; + pthread_t lsreader; + pthread_t listener; +} ls; + +struct pol_routing_ops link_state_ops = { + .init = link_state_init, + .fini = link_state_fini, + .routing_i_create = link_state_routing_i_create, + .routing_i_destroy = link_state_routing_i_destroy +}; + +static int lsdb_add_nb(uint64_t addr, + int fd, + enum nb_type type) { - struct list_head * p = NULL; + struct list_head * p; + struct nb * nb; + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each(p, &ls.nbs) { + struct nb * el = list_entry(p, struct nb, next); + if (el->addr == addr && el->type == type) { + log_dbg("Already know %s neighbor %" PRIu64 ".", + type == NB_DT ? "dt" : "mgmt", addr); + if (el->fd != fd) { + log_warn("Existing neighbor assigned new fd."); + el->fd = fd; + } + pthread_rwlock_unlock(&ls.db_lock); + return -EPERM; + } - list_for_each(p, &link_state.nbs->list) { - struct nb * e = list_entry(p, struct nb, next); - if (e->conn.conn_info.addr == addr) - return e->conn.flow_info.fd; + if (addr > el->addr) + break; } + nb = malloc(sizeof(*nb)); + if (nb == NULL) { + pthread_rwlock_unlock(&ls.db_lock); + return -ENOMEM; + } + + nb->addr = addr; + nb->fd = fd; + nb->type = type; + + list_add_tail(&nb->next, p); + + log_dbg("Type %s neighbor %" PRIu64 " added.", + nb->type == NB_DT ? "dt" : "mgmt", addr); + + pthread_rwlock_unlock(&ls.db_lock); + + return 0; +} + +static int lsdb_del_nb(uint64_t addr, + int fd) +{ + struct list_head * p; + struct list_head * h; + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each_safe(p, h, &ls.nbs) { + struct nb * nb = list_entry(p, struct nb, next); + if (nb->addr == addr && nb->fd == fd) { + list_del(&nb->next); + pthread_rwlock_unlock(&ls.db_lock); + log_dbg("Type %s neighbor %" PRIu64 " deleted.", + nb->type == NB_DT ? "dt" : "mgmt", addr); + free(nb); + return 0; + } + } + + pthread_rwlock_unlock(&ls.db_lock); + + return -EPERM; +} + +static int lsdb_add_link(uint64_t src, + uint64_t dst, + qosspec_t * qs) +{ + struct list_head * p; + struct adjacency * adj; + struct timespec now; + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each(p, &ls.db) { + struct adjacency * a = list_entry(p, struct adjacency, next); + if (a->dst == dst && a->src == src) { + a->stamp = now.tv_sec; + pthread_rwlock_unlock(&ls.db_lock); + return 0; + } + + if (a->dst > dst || (a->dst == dst && a->src > src)) + break; + } + + adj = malloc(sizeof(*adj)); + if (adj == NULL) { + pthread_rwlock_unlock(&ls.db_lock); + return -ENOMEM; + } + + adj->dst = dst; + adj->src = src; + adj->stamp = now.tv_sec; + + list_add_tail(&adj->next, p); + + if (graph_update_edge(ls.graph, src, dst, *qs)) + log_warn("Failed to add edge to graph."); + + log_dbg("Added %" PRIu64 " - %" PRIu64" to lsdb.", adj->src, adj->dst); + + pthread_rwlock_unlock(&ls.db_lock); + + return 0; +} + +static int lsdb_del_link(uint64_t src, + uint64_t dst) +{ + struct list_head * p; + struct list_head * h; + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each_safe(p, h, &ls.db) { + struct adjacency * a = list_entry(p, struct adjacency, next); + if (a->dst == dst && a->src == src) { + list_del(&a->next); + if (graph_del_edge(ls.graph, src, dst)) + log_warn("Failed to delete edge from graph."); + + log_dbg("Removed %" PRIu64 " - %" PRIu64" from lsdb.", + a->src, a->dst); + + pthread_rwlock_unlock(&ls.db_lock); + free(a); + return 0; + } + } + + pthread_rwlock_unlock(&ls.db_lock); + + return -EPERM; +} + +static int nbr_to_fd(uint64_t addr) +{ + struct list_head * p; + + pthread_rwlock_rdlock(&ls.db_lock); + + list_for_each(p, &ls.nbs) { + struct nb * nb = list_entry(p, struct nb, next); + if (nb->addr == addr && nb->type == NB_DT) { + pthread_rwlock_unlock(&ls.db_lock); + return nb->fd; + } + } + + pthread_rwlock_unlock(&ls.db_lock); + return -1; } @@ -91,20 +284,19 @@ static void * calculate_pff(void * o) while (true) { table = NULL; - n_table = graph_routing_table(link_state.graph, + n_table = graph_routing_table(ls.graph, ipcpi.dt_addr, &table); if (n_table < 0) { sleep(RECALC_TIME); continue; } - pthread_mutex_lock(&link_state.nbs->list_lock); pff_lock(instance->pff); pff_flush(instance->pff); for (i = 0; i < n_table; i++) { - fd = addr_to_fd(table[i]->nhop); + fd = nbr_to_fd(table[i]->nhop); if (fd == -1) continue; @@ -112,7 +304,6 @@ static void * calculate_pff(void * o) } pff_unlock(instance->pff); - pthread_mutex_unlock(&link_state.nbs->list_lock); freepp(struct routing_table, table, n_table); sleep(RECALC_TIME); @@ -121,154 +312,209 @@ static void * calculate_pff(void * o) return (void *) 0; } -static int link_state_neighbor_event(enum nb_event event, - struct conn conn) +static void send_lsa(uint64_t dst, + uint64_t src) { - char path[RIB_MAX_PATH_LEN + 1]; - char fso_name[RIB_MAX_PATH_LEN + 1]; - fso_t fso = FSO__INIT; - size_t len; - uint8_t * data; + uint8_t buf[LSA_MAX_LEN]; + link_state_msg_t lsa = LINK_STATE_MSG__INIT; + size_t len; + struct list_head * p; - path[0] = '\0'; - sprintf(fso_name, "%" PRIu64 "-%" PRIu64, - ipcpi.dt_addr, conn.conn_info.addr); - rib_path_append(rib_path_append(path, ROUTING_NAME), fso_name); + lsa.d_addr = dst; + lsa.s_addr = src; - switch (event) { - case NEIGHBOR_ADDED: - fso.s_addr = ipcpi.dt_addr; - fso.d_addr = conn.conn_info.addr; + len = link_state_msg__get_packed_size(&lsa); - len = fso__get_packed_size(&fso); - if (len == 0) - return -1; + assert(len <= LSA_MAX_LEN); - data = malloc(len); - if (data == NULL) - return -1; + link_state_msg__pack(&lsa, buf); - fso__pack(&fso, data); + list_for_each(p, &ls.nbs) { + struct nb * nb = list_entry(p, struct nb, next); + if (nb->type == NB_MGMT) + flow_write(nb->fd, buf, len); + } +} - if (rib_add(ROUTING_PATH, fso_name)) { - log_err("Failed to add FSO."); - free(data); - return -1; - } +static void * lsupdate(void * o) +{ + struct list_head * p; + struct list_head * h; + struct timespec now; - if (rib_put(path, data, len)) { - log_err("Failed to put FSO in RIB."); - rib_del(path); - free(data); - return -1; - } + (void) o; - log_dbg("Added %s to RIB.", path); + while (true) { + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_rdlock(&ls.db_lock); + + pthread_cleanup_push((void (*) (void *)) pthread_rwlock_unlock, + (void *) &ls.db_lock); + + list_for_each_safe(p, h, &ls.db) { + struct adjacency * adj; + adj = list_entry(p, struct adjacency, next); + if (now.tv_sec - adj->stamp > LS_TIMEO) { + list_del(&adj->next); + log_dbg("%" PRIu64 " - %" PRIu64" timed out.", + adj->src, adj->dst); + if (graph_del_edge(ls.graph, adj->src, + adj->dst)) + log_dbg("Failed to delete edge."); + free(adj); + continue; + } - break; - case NEIGHBOR_REMOVED: - if (rib_del(path)) { - log_err("Failed to remove FSO."); - return -1; + if (adj->src == ipcpi.dt_addr) { + send_lsa(adj->src, adj->dst); + adj->stamp = now.tv_sec; + } } - log_dbg("Removed %s from RIB.", path); + pthread_cleanup_pop(true); - break; - case NEIGHBOR_QOS_CHANGE: - log_info("Not currently supported."); - break; - default: - log_info("Unsupported event for routing."); - break; + sleep(LS_UPDATE_TIME); } - return 0; + return (void *) 0; } -static int read_fso(char * path, - int32_t flag) +static void * ls_conn_handle(void * o) { - ssize_t len; - uint8_t ro[BUF_SIZE]; - fso_t * fso; - qosspec_t qs; + struct conn conn; - memset(&qs, 0, sizeof(qs)); + (void) o; - len = rib_read(path, ro, BUF_SIZE); - if (len < 0) { - log_err("Failed to read FSO."); - return -1; - } + while (true) { + if (connmgr_wait(AEID_MGMT, &conn)) { + log_err("Failed to get next MGMT connection."); + continue; + } - fso = fso__unpack(NULL, len, ro); - if (fso == NULL) { - log_err("Failed to unpack."); - return -1; - } + /* NOTE: connection acceptance policy could be here. */ - if (flag & RO_MODIFY) { - if (graph_update_edge(link_state.graph, - fso->s_addr, fso->d_addr, qs)) { - fso__free_unpacked(fso, NULL); - return -1; - } - } else if (flag & RO_DELETE) { - if (graph_del_edge(link_state.graph, fso->s_addr, fso->d_addr)) { - fso__free_unpacked(fso, NULL); - return -1; - } + notifier_event(NOTIFY_MGMT_CONN_ADD, &conn); } - fso__free_unpacked(fso, NULL); - return 0; } -static void * rib_listener(void * o) + +static void forward_lsm(uint8_t * buf, + size_t len, + int in_fd) { - int32_t flag; - char path[RIB_MAX_PATH_LEN + 1]; - char ** children; - ssize_t len; - int i; + struct list_head * p; - (void) o; + pthread_rwlock_rdlock(&ls.db_lock); - if (ro_set_add(link_state.set, ROUTING_PATH, RO_MODIFY | RO_DELETE)) { - log_err("Failed to add to RO set"); - return (void * ) -1; + list_for_each(p, &ls.nbs) { + struct nb * nb = list_entry(p, struct nb, next); + if (nb->type == NB_MGMT && nb->fd != in_fd) + flow_write(nb->fd, buf, len); } - len = rib_children(ROUTING_PATH, &children); - if (len < 0) { - log_err("Failed to retrieve children."); + pthread_rwlock_unlock(&ls.db_lock); +} + +static void * lsreader(void * o) +{ + fqueue_t * fq; + int ret; + uint8_t buf[LSA_MAX_LEN]; + size_t len; + int fd; + qosspec_t qs; + + (void) o; + + memset(&qs, 0, sizeof(qs)); + + fq = fqueue_create(); + if (fq == NULL) return (void *) -1; - } - for (i = 0; i < len; i++) { - if (read_fso(children[i], RO_CREATE)) { - log_err("Failed to parse FSO."); + pthread_cleanup_push((void (*) (void *)) fqueue_destroy, + (void *) fq); + + while (true) { + ret = fevent(ls.mgmt_set, fq, NULL); + if (ret < 0) { + log_warn("Event error: %d.", ret); continue; } - } - while (rib_event_wait(link_state.set, link_state.queue, NULL) == 0) { - path[0] = '\0'; - flag = rqueue_next(link_state.queue, path); - if (flag < 0) - continue; + while ((fd = fqueue_next(fq)) >= 0) { + link_state_msg_t * msg; + len = flow_read(fd, buf, LSA_MAX_LEN); + if (len <= 0) + continue; - if (read_fso(path, flag)) { - log_err("Failed to parse FSO."); - continue; + msg = link_state_msg__unpack(NULL, len, buf); + if (msg == NULL) { + log_dbg("Failed to unpack link state message."); + continue; + } + + lsdb_add_link(msg->s_addr, msg->d_addr, &qs); + + link_state_msg__free_unpacked(msg, NULL); + + forward_lsm(buf, len, fd); } } + pthread_cleanup_pop(true); + return (void *) 0; } +static void handle_event(int event, + const void * o) +{ + /* FIXME: Apply correct QoS on graph */ + struct conn * c; + qosspec_t qs; + + c = (struct conn *) o; + + memset(&qs, 0, sizeof(qs)); + + switch (event) { + case NOTIFY_DT_CONN_ADD: + if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_DT)) + log_dbg("Failed to add neighbor to LSDB."); + + if (lsdb_add_link(ipcpi.dt_addr, c->conn_info.addr, &qs)) + log_dbg("Failed to add adjacency to LSDB."); + break; + case NOTIFY_DT_CONN_DEL: + if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd)) + log_dbg("Failed to delete neighbor from LSDB."); + + if (lsdb_del_link(ipcpi.dt_addr, c->conn_info.addr)) + log_dbg("Local link was not in LSDB."); + break; + case NOTIFY_DT_CONN_QOS: + log_dbg("QoS changes currently unsupported."); + break; + case NOTIFY_MGMT_CONN_ADD: + fset_add(ls.mgmt_set, c->flow_info.fd); + if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_MGMT)) + log_warn("Failed to add mgmt neighbor to LSDB."); + break; + case NOTIFY_MGMT_CONN_DEL: + fset_del(ls.mgmt_set, c->flow_info.fd); + if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd)) + log_warn("Failed to add mgmt neighbor to LSDB."); + break; + default: + log_info("Unknown routing event."); + break; + } +} + struct routing_i * link_state_routing_i_create(struct pff * pff) { struct routing_i * tmp; @@ -281,7 +527,10 @@ struct routing_i * link_state_routing_i_create(struct pff * pff) tmp->pff = pff; - pthread_create(&tmp->calculator, NULL, calculate_pff, (void *) tmp); + if (pthread_create(&tmp->calculator, NULL, calculate_pff, tmp)) { + free(tmp); + return NULL; + } return tmp; } @@ -297,61 +546,100 @@ void link_state_routing_i_destroy(struct routing_i * instance) free(instance); } -int link_state_init(struct nbs * nbs) +int link_state_init(void) { - link_state.graph = graph_create(); - if (link_state.graph == NULL) + struct conn_info info; + + memset(&info, 0, sizeof(info)); + + strcpy(info.ae_name, LS_AE); + strcpy(info.protocol, LS_PROTO); + info.pref_version = 1; + info.pref_syntax = PROTO_GPB; + info.addr = ipcpi.dt_addr; + + ls.graph = graph_create(); + if (ls.graph == NULL) goto fail_graph; - if (rib_add(RIB_ROOT, ROUTING_NAME)) - goto fail_rib_add; + if (notifier_reg(handle_event)) + goto fail_notifier_reg; + + if (pthread_rwlock_init(&ls.db_lock, NULL)) + goto fail_db_lock_init; + + if (connmgr_ae_init(AEID_MGMT, &info)) + goto fail_connmgr_ae_init; - link_state.nbs = nbs; + ls.mgmt_set = fset_create(); + if (ls.mgmt_set == NULL) + goto fail_fset_create; - link_state.nb_notifier.notify_call = link_state_neighbor_event; - if (nbs_reg_notifier(link_state.nbs, &link_state.nb_notifier)) - goto fail_nbs_reg_notifier; + list_head_init(&ls.db); + list_head_init(&ls.nbs); - link_state.set = ro_set_create(); - if (link_state.set == NULL) - goto fail_ro_set_create; + if (pthread_create(&ls.lsupdate, NULL, lsupdate, NULL)) + goto fail_pthread_create_lsupdate; - link_state.queue = rqueue_create(); - if (link_state.queue == NULL) - goto fail_rqueue_create; + if (pthread_create(&ls.lsreader, NULL, lsreader, NULL)) + goto fail_pthread_create_lsreader; - if (pthread_create(&link_state.rib_listener, NULL, rib_listener, NULL)) - goto fail_listener_create; + if (pthread_create(&ls.listener, NULL, ls_conn_handle, NULL)) + goto fail_pthread_create_listener; return 0; - fail_listener_create: - ro_set_destroy(link_state.set); - fail_rqueue_create: - ro_set_destroy(link_state.set); - fail_ro_set_create: - nbs_unreg_notifier(link_state.nbs, &link_state.nb_notifier); - fail_nbs_reg_notifier: - rib_del(ROUTING_PATH); - fail_rib_add: - graph_destroy(link_state.graph); + fail_pthread_create_listener: + pthread_cancel(ls.lsreader); + pthread_join(ls.lsreader, NULL); + fail_pthread_create_lsreader: + pthread_cancel(ls.lsupdate); + pthread_join(ls.lsupdate, NULL); + fail_pthread_create_lsupdate: + fset_destroy(ls.mgmt_set); + fail_fset_create: + connmgr_ae_fini(AEID_MGMT); + fail_connmgr_ae_init: + pthread_rwlock_destroy(&ls.db_lock); + fail_db_lock_init: + notifier_unreg(handle_event); + fail_notifier_reg: + graph_destroy(ls.graph); fail_graph: return -1; } void link_state_fini(void) { - pthread_cancel(link_state.rib_listener); + struct list_head * p; + struct list_head * h; + + pthread_cancel(ls.listener); + pthread_join(ls.listener, NULL); + + pthread_cancel(ls.lsreader); + pthread_join(ls.lsreader, NULL); - pthread_join(link_state.rib_listener, NULL); + pthread_cancel(ls.lsupdate); + pthread_join(ls.lsupdate, NULL); - rqueue_destroy(link_state.queue); + fset_destroy(ls.mgmt_set); - ro_set_destroy(link_state.set); + connmgr_ae_fini(AEID_MGMT); + + graph_destroy(ls.graph); + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each_safe(p, h, &ls.db) { + struct adjacency * a = list_entry(p, struct adjacency, next); + list_del(&a->next); + free(a); + } - graph_destroy(link_state.graph); + pthread_rwlock_unlock(&ls.db_lock); - rib_del(ROUTING_PATH); + pthread_rwlock_destroy(&ls.db_lock); - nbs_unreg_notifier(link_state.nbs, &link_state.nb_notifier); + notifier_unreg(handle_event); } -- cgit v1.2.3