summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/pol/link_state.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/normal/pol/link_state.c')
-rw-r--r--src/ipcpd/normal/pol/link_state.c620
1 files changed, 454 insertions, 166 deletions
diff --git a/src/ipcpd/normal/pol/link_state.c b/src/ipcpd/normal/pol/link_state.c
index 512ced7f..7df09bce 100644
--- a/src/ipcpd/normal/pol/link_state.c
+++ b/src/ipcpd/normal/pol/link_state.c
@@ -27,14 +27,16 @@
#include <ouroboros/errno.h>
#include <ouroboros/list.h>
#include <ouroboros/logs.h>
-#include <ouroboros/rib.h>
-#include <ouroboros/rqueue.h>
+#include <ouroboros/utils.h>
+#include <ouroboros/notifier.h>
+#include <ouroboros/dev.h>
+#include <ouroboros/fqueue.h>
-#include "ribmgr.h"
-#include "ribconfig.h"
+#include "ae.h"
+#include "connmgr.h"
#include "graph.h"
-#include "neighbors.h"
#include "ipcp.h"
+#include "link_state.h"
#include "pff.h"
#include <assert.h>
@@ -43,39 +45,230 @@
#include <string.h>
#include <pthread.h>
-#include "fso.pb-c.h"
-typedef Fso fso_t;
+#include "link_state.pb-c.h"
+typedef LinkStateMsg link_state_msg_t;
-#define BUF_SIZE 256
-#define RECALC_TIME 4
+#define RECALC_TIME 4
+#define LS_UPDATE_TIME 15
+#define LS_TIMEO 60
+#define LSA_MAX_LEN 128
+
+#ifndef CLOCK_REALTIME_COARSE
+#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
+#endif
struct routing_i {
struct pff * pff;
pthread_t calculator;
};
+/* TODO: link weight support. */
+struct adjacency {
+ struct list_head next;
+
+ uint64_t dst;
+ uint64_t src;
+
+ time_t stamp;
+};
+
+enum nb_type {
+ NB_DT = 0,
+ NB_MGMT
+};
+
+struct nb {
+ struct list_head next;
+
+ uint64_t addr;
+ int fd;
+ enum nb_type type;
+};
+
struct {
- struct nbs * nbs;
- struct nb_notifier nb_notifier;
+ struct list_head nbs;
+ fset_t * mgmt_set;
- struct graph * graph;
+ struct list_head db;
- ro_set_t * set;
- rqueue_t * queue;
- pthread_t rib_listener;
-} link_state;
+ pthread_rwlock_t db_lock;
-/* Take under neighbors lock */
-static int addr_to_fd(uint64_t addr)
+ struct graph * graph;
+
+ pthread_t lsupdate;
+ pthread_t lsreader;
+ pthread_t listener;
+} ls;
+
+struct pol_routing_ops link_state_ops = {
+ .init = link_state_init,
+ .fini = link_state_fini,
+ .routing_i_create = link_state_routing_i_create,
+ .routing_i_destroy = link_state_routing_i_destroy
+};
+
+static int lsdb_add_nb(uint64_t addr,
+ int fd,
+ enum nb_type type)
{
- struct list_head * p = NULL;
+ struct list_head * p;
+ struct nb * nb;
+
+ pthread_rwlock_wrlock(&ls.db_lock);
+
+ list_for_each(p, &ls.nbs) {
+ struct nb * el = list_entry(p, struct nb, next);
+ if (el->addr == addr && el->type == type) {
+ log_dbg("Already know %s neighbor %" PRIu64 ".",
+ type == NB_DT ? "dt" : "mgmt", addr);
+ if (el->fd != fd) {
+ log_warn("Existing neighbor assigned new fd.");
+ el->fd = fd;
+ }
+ pthread_rwlock_unlock(&ls.db_lock);
+ return -EPERM;
+ }
- list_for_each(p, &link_state.nbs->list) {
- struct nb * e = list_entry(p, struct nb, next);
- if (e->conn.conn_info.addr == addr)
- return e->conn.flow_info.fd;
+ if (addr > el->addr)
+ break;
}
+ nb = malloc(sizeof(*nb));
+ if (nb == NULL) {
+ pthread_rwlock_unlock(&ls.db_lock);
+ return -ENOMEM;
+ }
+
+ nb->addr = addr;
+ nb->fd = fd;
+ nb->type = type;
+
+ list_add_tail(&nb->next, p);
+
+ log_dbg("Type %s neighbor %" PRIu64 " added.",
+ nb->type == NB_DT ? "dt" : "mgmt", addr);
+
+ pthread_rwlock_unlock(&ls.db_lock);
+
+ return 0;
+}
+
+static int lsdb_del_nb(uint64_t addr,
+ int fd)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ pthread_rwlock_wrlock(&ls.db_lock);
+
+ list_for_each_safe(p, h, &ls.nbs) {
+ struct nb * nb = list_entry(p, struct nb, next);
+ if (nb->addr == addr && nb->fd == fd) {
+ list_del(&nb->next);
+ pthread_rwlock_unlock(&ls.db_lock);
+ log_dbg("Type %s neighbor %" PRIu64 " deleted.",
+ nb->type == NB_DT ? "dt" : "mgmt", addr);
+ free(nb);
+ return 0;
+ }
+ }
+
+ pthread_rwlock_unlock(&ls.db_lock);
+
+ return -EPERM;
+}
+
+static int lsdb_add_link(uint64_t src,
+ uint64_t dst,
+ qosspec_t * qs)
+{
+ struct list_head * p;
+ struct adjacency * adj;
+ struct timespec now;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_rwlock_wrlock(&ls.db_lock);
+
+ list_for_each(p, &ls.db) {
+ struct adjacency * a = list_entry(p, struct adjacency, next);
+ if (a->dst == dst && a->src == src) {
+ a->stamp = now.tv_sec;
+ pthread_rwlock_unlock(&ls.db_lock);
+ return 0;
+ }
+
+ if (a->dst > dst || (a->dst == dst && a->src > src))
+ break;
+ }
+
+ adj = malloc(sizeof(*adj));
+ if (adj == NULL) {
+ pthread_rwlock_unlock(&ls.db_lock);
+ return -ENOMEM;
+ }
+
+ adj->dst = dst;
+ adj->src = src;
+ adj->stamp = now.tv_sec;
+
+ list_add_tail(&adj->next, p);
+
+ if (graph_update_edge(ls.graph, src, dst, *qs))
+ log_warn("Failed to add edge to graph.");
+
+ log_dbg("Added %" PRIu64 " - %" PRIu64" to lsdb.", adj->src, adj->dst);
+
+ pthread_rwlock_unlock(&ls.db_lock);
+
+ return 0;
+}
+
+static int lsdb_del_link(uint64_t src,
+ uint64_t dst)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ pthread_rwlock_wrlock(&ls.db_lock);
+
+ list_for_each_safe(p, h, &ls.db) {
+ struct adjacency * a = list_entry(p, struct adjacency, next);
+ if (a->dst == dst && a->src == src) {
+ list_del(&a->next);
+ if (graph_del_edge(ls.graph, src, dst))
+ log_warn("Failed to delete edge from graph.");
+
+ log_dbg("Removed %" PRIu64 " - %" PRIu64" from lsdb.",
+ a->src, a->dst);
+
+ pthread_rwlock_unlock(&ls.db_lock);
+ free(a);
+ return 0;
+ }
+ }
+
+ pthread_rwlock_unlock(&ls.db_lock);
+
+ return -EPERM;
+}
+
+static int nbr_to_fd(uint64_t addr)
+{
+ struct list_head * p;
+
+ pthread_rwlock_rdlock(&ls.db_lock);
+
+ list_for_each(p, &ls.nbs) {
+ struct nb * nb = list_entry(p, struct nb, next);
+ if (nb->addr == addr && nb->type == NB_DT) {
+ pthread_rwlock_unlock(&ls.db_lock);
+ return nb->fd;
+ }
+ }
+
+ pthread_rwlock_unlock(&ls.db_lock);
+
return -1;
}
@@ -91,20 +284,19 @@ static void * calculate_pff(void * o)
while (true) {
table = NULL;
- n_table = graph_routing_table(link_state.graph,
+ n_table = graph_routing_table(ls.graph,
ipcpi.dt_addr, &table);
if (n_table < 0) {
sleep(RECALC_TIME);
continue;
}
- pthread_mutex_lock(&link_state.nbs->list_lock);
pff_lock(instance->pff);
pff_flush(instance->pff);
for (i = 0; i < n_table; i++) {
- fd = addr_to_fd(table[i]->nhop);
+ fd = nbr_to_fd(table[i]->nhop);
if (fd == -1)
continue;
@@ -112,7 +304,6 @@ static void * calculate_pff(void * o)
}
pff_unlock(instance->pff);
- pthread_mutex_unlock(&link_state.nbs->list_lock);
freepp(struct routing_table, table, n_table);
sleep(RECALC_TIME);
@@ -121,154 +312,209 @@ static void * calculate_pff(void * o)
return (void *) 0;
}
-static int link_state_neighbor_event(enum nb_event event,
- struct conn conn)
+static void send_lsa(uint64_t dst,
+ uint64_t src)
{
- char path[RIB_MAX_PATH_LEN + 1];
- char fso_name[RIB_MAX_PATH_LEN + 1];
- fso_t fso = FSO__INIT;
- size_t len;
- uint8_t * data;
+ uint8_t buf[LSA_MAX_LEN];
+ link_state_msg_t lsa = LINK_STATE_MSG__INIT;
+ size_t len;
+ struct list_head * p;
- path[0] = '\0';
- sprintf(fso_name, "%" PRIu64 "-%" PRIu64,
- ipcpi.dt_addr, conn.conn_info.addr);
- rib_path_append(rib_path_append(path, ROUTING_NAME), fso_name);
+ lsa.d_addr = dst;
+ lsa.s_addr = src;
- switch (event) {
- case NEIGHBOR_ADDED:
- fso.s_addr = ipcpi.dt_addr;
- fso.d_addr = conn.conn_info.addr;
+ len = link_state_msg__get_packed_size(&lsa);
- len = fso__get_packed_size(&fso);
- if (len == 0)
- return -1;
+ assert(len <= LSA_MAX_LEN);
- data = malloc(len);
- if (data == NULL)
- return -1;
+ link_state_msg__pack(&lsa, buf);
- fso__pack(&fso, data);
+ list_for_each(p, &ls.nbs) {
+ struct nb * nb = list_entry(p, struct nb, next);
+ if (nb->type == NB_MGMT)
+ flow_write(nb->fd, buf, len);
+ }
+}
- if (rib_add(ROUTING_PATH, fso_name)) {
- log_err("Failed to add FSO.");
- free(data);
- return -1;
- }
+static void * lsupdate(void * o)
+{
+ struct list_head * p;
+ struct list_head * h;
+ struct timespec now;
- if (rib_put(path, data, len)) {
- log_err("Failed to put FSO in RIB.");
- rib_del(path);
- free(data);
- return -1;
- }
+ (void) o;
- log_dbg("Added %s to RIB.", path);
+ while (true) {
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_rwlock_rdlock(&ls.db_lock);
+
+ pthread_cleanup_push((void (*) (void *)) pthread_rwlock_unlock,
+ (void *) &ls.db_lock);
+
+ list_for_each_safe(p, h, &ls.db) {
+ struct adjacency * adj;
+ adj = list_entry(p, struct adjacency, next);
+ if (now.tv_sec - adj->stamp > LS_TIMEO) {
+ list_del(&adj->next);
+ log_dbg("%" PRIu64 " - %" PRIu64" timed out.",
+ adj->src, adj->dst);
+ if (graph_del_edge(ls.graph, adj->src,
+ adj->dst))
+ log_dbg("Failed to delete edge.");
+ free(adj);
+ continue;
+ }
- break;
- case NEIGHBOR_REMOVED:
- if (rib_del(path)) {
- log_err("Failed to remove FSO.");
- return -1;
+ if (adj->src == ipcpi.dt_addr) {
+ send_lsa(adj->src, adj->dst);
+ adj->stamp = now.tv_sec;
+ }
}
- log_dbg("Removed %s from RIB.", path);
+ pthread_cleanup_pop(true);
- break;
- case NEIGHBOR_QOS_CHANGE:
- log_info("Not currently supported.");
- break;
- default:
- log_info("Unsupported event for routing.");
- break;
+ sleep(LS_UPDATE_TIME);
}
- return 0;
+ return (void *) 0;
}
-static int read_fso(char * path,
- int32_t flag)
+static void * ls_conn_handle(void * o)
{
- ssize_t len;
- uint8_t ro[BUF_SIZE];
- fso_t * fso;
- qosspec_t qs;
+ struct conn conn;
- memset(&qs, 0, sizeof(qs));
+ (void) o;
- len = rib_read(path, ro, BUF_SIZE);
- if (len < 0) {
- log_err("Failed to read FSO.");
- return -1;
- }
+ while (true) {
+ if (connmgr_wait(AEID_MGMT, &conn)) {
+ log_err("Failed to get next MGMT connection.");
+ continue;
+ }
- fso = fso__unpack(NULL, len, ro);
- if (fso == NULL) {
- log_err("Failed to unpack.");
- return -1;
- }
+ /* NOTE: connection acceptance policy could be here. */
- if (flag & RO_MODIFY) {
- if (graph_update_edge(link_state.graph,
- fso->s_addr, fso->d_addr, qs)) {
- fso__free_unpacked(fso, NULL);
- return -1;
- }
- } else if (flag & RO_DELETE) {
- if (graph_del_edge(link_state.graph, fso->s_addr, fso->d_addr)) {
- fso__free_unpacked(fso, NULL);
- return -1;
- }
+ notifier_event(NOTIFY_MGMT_CONN_ADD, &conn);
}
- fso__free_unpacked(fso, NULL);
-
return 0;
}
-static void * rib_listener(void * o)
+
+static void forward_lsm(uint8_t * buf,
+ size_t len,
+ int in_fd)
{
- int32_t flag;
- char path[RIB_MAX_PATH_LEN + 1];
- char ** children;
- ssize_t len;
- int i;
+ struct list_head * p;
- (void) o;
+ pthread_rwlock_rdlock(&ls.db_lock);
- if (ro_set_add(link_state.set, ROUTING_PATH, RO_MODIFY | RO_DELETE)) {
- log_err("Failed to add to RO set");
- return (void * ) -1;
+ list_for_each(p, &ls.nbs) {
+ struct nb * nb = list_entry(p, struct nb, next);
+ if (nb->type == NB_MGMT && nb->fd != in_fd)
+ flow_write(nb->fd, buf, len);
}
- len = rib_children(ROUTING_PATH, &children);
- if (len < 0) {
- log_err("Failed to retrieve children.");
+ pthread_rwlock_unlock(&ls.db_lock);
+}
+
+static void * lsreader(void * o)
+{
+ fqueue_t * fq;
+ int ret;
+ uint8_t buf[LSA_MAX_LEN];
+ size_t len;
+ int fd;
+ qosspec_t qs;
+
+ (void) o;
+
+ memset(&qs, 0, sizeof(qs));
+
+ fq = fqueue_create();
+ if (fq == NULL)
return (void *) -1;
- }
- for (i = 0; i < len; i++) {
- if (read_fso(children[i], RO_CREATE)) {
- log_err("Failed to parse FSO.");
+ pthread_cleanup_push((void (*) (void *)) fqueue_destroy,
+ (void *) fq);
+
+ while (true) {
+ ret = fevent(ls.mgmt_set, fq, NULL);
+ if (ret < 0) {
+ log_warn("Event error: %d.", ret);
continue;
}
- }
- while (rib_event_wait(link_state.set, link_state.queue, NULL) == 0) {
- path[0] = '\0';
- flag = rqueue_next(link_state.queue, path);
- if (flag < 0)
- continue;
+ while ((fd = fqueue_next(fq)) >= 0) {
+ link_state_msg_t * msg;
+ len = flow_read(fd, buf, LSA_MAX_LEN);
+ if (len <= 0)
+ continue;
- if (read_fso(path, flag)) {
- log_err("Failed to parse FSO.");
- continue;
+ msg = link_state_msg__unpack(NULL, len, buf);
+ if (msg == NULL) {
+ log_dbg("Failed to unpack link state message.");
+ continue;
+ }
+
+ lsdb_add_link(msg->s_addr, msg->d_addr, &qs);
+
+ link_state_msg__free_unpacked(msg, NULL);
+
+ forward_lsm(buf, len, fd);
}
}
+ pthread_cleanup_pop(true);
+
return (void *) 0;
}
+static void handle_event(int event,
+ const void * o)
+{
+ /* FIXME: Apply correct QoS on graph */
+ struct conn * c;
+ qosspec_t qs;
+
+ c = (struct conn *) o;
+
+ memset(&qs, 0, sizeof(qs));
+
+ switch (event) {
+ case NOTIFY_DT_CONN_ADD:
+ if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_DT))
+ log_dbg("Failed to add neighbor to LSDB.");
+
+ if (lsdb_add_link(ipcpi.dt_addr, c->conn_info.addr, &qs))
+ log_dbg("Failed to add adjacency to LSDB.");
+ break;
+ case NOTIFY_DT_CONN_DEL:
+ if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd))
+ log_dbg("Failed to delete neighbor from LSDB.");
+
+ if (lsdb_del_link(ipcpi.dt_addr, c->conn_info.addr))
+ log_dbg("Local link was not in LSDB.");
+ break;
+ case NOTIFY_DT_CONN_QOS:
+ log_dbg("QoS changes currently unsupported.");
+ break;
+ case NOTIFY_MGMT_CONN_ADD:
+ fset_add(ls.mgmt_set, c->flow_info.fd);
+ if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_MGMT))
+ log_warn("Failed to add mgmt neighbor to LSDB.");
+ break;
+ case NOTIFY_MGMT_CONN_DEL:
+ fset_del(ls.mgmt_set, c->flow_info.fd);
+ if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd))
+ log_warn("Failed to add mgmt neighbor to LSDB.");
+ break;
+ default:
+ log_info("Unknown routing event.");
+ break;
+ }
+}
+
struct routing_i * link_state_routing_i_create(struct pff * pff)
{
struct routing_i * tmp;
@@ -281,7 +527,10 @@ struct routing_i * link_state_routing_i_create(struct pff * pff)
tmp->pff = pff;
- pthread_create(&tmp->calculator, NULL, calculate_pff, (void *) tmp);
+ if (pthread_create(&tmp->calculator, NULL, calculate_pff, tmp)) {
+ free(tmp);
+ return NULL;
+ }
return tmp;
}
@@ -297,61 +546,100 @@ void link_state_routing_i_destroy(struct routing_i * instance)
free(instance);
}
-int link_state_init(struct nbs * nbs)
+int link_state_init(void)
{
- link_state.graph = graph_create();
- if (link_state.graph == NULL)
+ struct conn_info info;
+
+ memset(&info, 0, sizeof(info));
+
+ strcpy(info.ae_name, LS_AE);
+ strcpy(info.protocol, LS_PROTO);
+ info.pref_version = 1;
+ info.pref_syntax = PROTO_GPB;
+ info.addr = ipcpi.dt_addr;
+
+ ls.graph = graph_create();
+ if (ls.graph == NULL)
goto fail_graph;
- if (rib_add(RIB_ROOT, ROUTING_NAME))
- goto fail_rib_add;
+ if (notifier_reg(handle_event))
+ goto fail_notifier_reg;
+
+ if (pthread_rwlock_init(&ls.db_lock, NULL))
+ goto fail_db_lock_init;
+
+ if (connmgr_ae_init(AEID_MGMT, &info))
+ goto fail_connmgr_ae_init;
- link_state.nbs = nbs;
+ ls.mgmt_set = fset_create();
+ if (ls.mgmt_set == NULL)
+ goto fail_fset_create;
- link_state.nb_notifier.notify_call = link_state_neighbor_event;
- if (nbs_reg_notifier(link_state.nbs, &link_state.nb_notifier))
- goto fail_nbs_reg_notifier;
+ list_head_init(&ls.db);
+ list_head_init(&ls.nbs);
- link_state.set = ro_set_create();
- if (link_state.set == NULL)
- goto fail_ro_set_create;
+ if (pthread_create(&ls.lsupdate, NULL, lsupdate, NULL))
+ goto fail_pthread_create_lsupdate;
- link_state.queue = rqueue_create();
- if (link_state.queue == NULL)
- goto fail_rqueue_create;
+ if (pthread_create(&ls.lsreader, NULL, lsreader, NULL))
+ goto fail_pthread_create_lsreader;
- if (pthread_create(&link_state.rib_listener, NULL, rib_listener, NULL))
- goto fail_listener_create;
+ if (pthread_create(&ls.listener, NULL, ls_conn_handle, NULL))
+ goto fail_pthread_create_listener;
return 0;
- fail_listener_create:
- ro_set_destroy(link_state.set);
- fail_rqueue_create:
- ro_set_destroy(link_state.set);
- fail_ro_set_create:
- nbs_unreg_notifier(link_state.nbs, &link_state.nb_notifier);
- fail_nbs_reg_notifier:
- rib_del(ROUTING_PATH);
- fail_rib_add:
- graph_destroy(link_state.graph);
+ fail_pthread_create_listener:
+ pthread_cancel(ls.lsreader);
+ pthread_join(ls.lsreader, NULL);
+ fail_pthread_create_lsreader:
+ pthread_cancel(ls.lsupdate);
+ pthread_join(ls.lsupdate, NULL);
+ fail_pthread_create_lsupdate:
+ fset_destroy(ls.mgmt_set);
+ fail_fset_create:
+ connmgr_ae_fini(AEID_MGMT);
+ fail_connmgr_ae_init:
+ pthread_rwlock_destroy(&ls.db_lock);
+ fail_db_lock_init:
+ notifier_unreg(handle_event);
+ fail_notifier_reg:
+ graph_destroy(ls.graph);
fail_graph:
return -1;
}
void link_state_fini(void)
{
- pthread_cancel(link_state.rib_listener);
+ struct list_head * p;
+ struct list_head * h;
+
+ pthread_cancel(ls.listener);
+ pthread_join(ls.listener, NULL);
+
+ pthread_cancel(ls.lsreader);
+ pthread_join(ls.lsreader, NULL);
- pthread_join(link_state.rib_listener, NULL);
+ pthread_cancel(ls.lsupdate);
+ pthread_join(ls.lsupdate, NULL);
- rqueue_destroy(link_state.queue);
+ fset_destroy(ls.mgmt_set);
- ro_set_destroy(link_state.set);
+ connmgr_ae_fini(AEID_MGMT);
+
+ graph_destroy(ls.graph);
+
+ pthread_rwlock_wrlock(&ls.db_lock);
+
+ list_for_each_safe(p, h, &ls.db) {
+ struct adjacency * a = list_entry(p, struct adjacency, next);
+ list_del(&a->next);
+ free(a);
+ }
- graph_destroy(link_state.graph);
+ pthread_rwlock_unlock(&ls.db_lock);
- rib_del(ROUTING_PATH);
+ pthread_rwlock_destroy(&ls.db_lock);
- nbs_unreg_notifier(link_state.nbs, &link_state.nb_notifier);
+ notifier_unreg(handle_event);
}