diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-09-09 13:50:47 +0200 |
---|---|---|
committer | Dimitri Staessens <dimitri.staessens@ugent.be> | 2017-09-12 08:33:26 -0600 |
commit | 45c6615484ffe347654c34decb72ff1ef9bde0f3 (patch) | |
tree | f912e0eef256371f61b87a5a78e7604d9b623194 /src | |
parent | 7c69c0f6b25a199bb3632eea66ccb7de1db06ccc (diff) | |
download | ouroboros-45c6615484ffe347654c34decb72ff1ef9bde0f3.tar.gz ouroboros-45c6615484ffe347654c34decb72ff1ef9bde0f3.zip |
ipcpd: Revise internals of normal IPCP
This removes the RIB as a datastructure and CDAP as the protocol
between IPCPs. CDAP, the rib and related sources are deprecated. The
link-state protocol policy is udpated to use its own protocol based on
a simple broadcast strategy along a tree. The neighbors struct is
deprecated and moved to the library as a generic notifier component.
Diffstat (limited to 'src')
35 files changed, 701 insertions, 4111 deletions
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index aebc6c35..e5fc33da 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -20,7 +20,7 @@ protobuf_generate_c(ENROLL_PROTO_SRCS ENROLL_PROTO_HDRS enroll.proto protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS kademlia.proto) # Add GPB sources of policies last -protobuf_generate_c(FSO_SRCS FSO_HDRS pol/fso.proto) +protobuf_generate_c(LS_PROTO_SRCS LS_PROTO_HDRS pol/link_state.proto) math(EXPR PFT_EXPR "1 << 12") set(PFT_SIZE ${PFT_EXPR} CACHE STRING @@ -37,9 +37,7 @@ set(SOURCE_FILES enroll.c fa.c main.c - neighbors.c pff.c - ribmgr.c routing.c sdu_sched.c # Add policies last @@ -49,7 +47,7 @@ set(SOURCE_FILES ) add_executable(ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES} - ${FLOW_ALLOC_SRCS} ${FSO_SRCS} ${KAD_PROTO_SRCS} ${ENROLL_PROTO_SRCS}) + ${FLOW_ALLOC_SRCS} ${LS_PROTO_SRCS} ${KAD_PROTO_SRCS} ${ENROLL_PROTO_SRCS}) target_link_libraries(ipcpd-normal LINK_PUBLIC ouroboros) include(AddCompileFlags) diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c index 9feac0f6..8d3da709 100644 --- a/src/ipcpd/normal/connmgr.c +++ b/src/ipcpd/normal/connmgr.c @@ -26,16 +26,15 @@ #include <ouroboros/dev.h> #include <ouroboros/cacep.h> -#include <ouroboros/cdap.h> #include <ouroboros/errno.h> #include <ouroboros/list.h> #include <ouroboros/logs.h> +#include <ouroboros/notifier.h> #include "ae.h" #include "connmgr.h" #include "enroll.h" #include "ipcp.h" -#include "ribmgr.h" #include <pthread.h> #include <string.h> @@ -198,8 +197,7 @@ void connmgr_stop(void) } int connmgr_ae_init(enum ae_id id, - const struct conn_info * info, - struct nbs * nbs) + const struct conn_info * info) { struct ae * ae; @@ -220,8 +218,6 @@ int connmgr_ae_init(enum ae_id id, memcpy(&connmgr.aes[id].info, info, sizeof(connmgr.aes[id].info)); - connmgr.aes[id].nbs = nbs; - return 0; } @@ -258,8 +254,6 @@ void connmgr_ae_fini(enum ae_id id) pthread_mutex_destroy(&ae->lock); memset(&connmgr.aes[id].info, 0, sizeof(connmgr.aes[id].info)); - - connmgr.aes[id].nbs = NULL; } int connmgr_ipcp_connect(const char * dst, @@ -394,8 +388,16 @@ int connmgr_alloc(enum ae_id id, return -1; } - if (connmgr.aes[id].nbs != NULL) - nbs_add(connmgr.aes[id].nbs, *conn); + switch (id) { + case AEID_DT: + notifier_event(NOTIFY_DT_CONN_ADD, conn); + break; + case AEID_MGMT: + notifier_event(NOTIFY_MGMT_CONN_ADD, conn); + break; + default: + break; + } return 0; } @@ -403,8 +405,16 @@ int connmgr_alloc(enum ae_id id, int connmgr_dealloc(enum ae_id id, struct conn * conn) { - if (connmgr.aes[id].nbs != NULL) - nbs_del(connmgr.aes[id].nbs, conn->flow_info.fd); + switch (id) { + case AEID_DT: + notifier_event(NOTIFY_DT_CONN_DEL, conn); + break; + case AEID_MGMT: + notifier_event(NOTIFY_MGMT_CONN_DEL, conn); + break; + default: + break; + } return flow_dealloc(conn->flow_info.fd); } diff --git a/src/ipcpd/normal/connmgr.h b/src/ipcpd/normal/connmgr.h index 379877e6..ca5288ae 100644 --- a/src/ipcpd/normal/connmgr.h +++ b/src/ipcpd/normal/connmgr.h @@ -27,7 +27,13 @@ #include <ouroboros/qos.h> #include "ae.h" -#include "neighbors.h" + +#define NOTIFY_DT_CONN_ADD 0x00D0 +#define NOTIFY_DT_CONN_DEL 0x00D1 +#define NOTIFY_DT_CONN_QOS 0x00D2 + +#define NOTIFY_MGMT_CONN_ADD 0x00F0 +#define NOTIFY_MGMT_CONN_DEL 0x00F1 int connmgr_init(void); @@ -38,8 +44,7 @@ int connmgr_start(void); void connmgr_stop(void); int connmgr_ae_init(enum ae_id id, - const struct conn_info * info, - struct nbs * nbs); + const struct conn_info * info); void connmgr_ae_fini(enum ae_id id); diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index d139cb91..b1ba44a8 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -328,9 +328,6 @@ static void kad_req_destroy(struct kad_req * req) { assert(req); - if (req->key != NULL) - free(req->key); - pthread_mutex_lock(&req->lock); switch (req->state) { @@ -351,7 +348,7 @@ static void kad_req_destroy(struct kad_req * req) break; } - while (req->state != REQ_NULL) + while (req->state != REQ_NULL && req->state != REQ_DONE) pthread_cond_wait(&req->cond, &req->lock); pthread_mutex_unlock(&req->lock); @@ -359,6 +356,9 @@ static void kad_req_destroy(struct kad_req * req) pthread_cond_destroy(&req->cond); pthread_mutex_destroy(&req->lock); + if (req->key != NULL) + free(req->key); + free(req); } @@ -391,7 +391,7 @@ static int kad_req_wait(struct kad_req * req, case REQ_PENDING: /* ETIMEDOUT */ case REQ_RESPONSE: req->state = REQ_DONE; - pthread_cond_signal(&req->cond); + pthread_cond_broadcast(&req->cond); break; default: break; @@ -1859,7 +1859,7 @@ static void * work(void * o) if (now.tv_sec > v->t_exp) { list_del(&v->next); val_destroy(v); - } + } if (now.tv_sec > v->t_rep) { kad_publish(dht, e->key, v->addr, @@ -2018,7 +2018,7 @@ static void kad_handle_response(struct dht * dht, case KAD_FIND_VALUE: case KAD_FIND_NODE: if (dht_get_state(dht) != DHT_RUNNING) - return; + break; kad_handle_find_resp(dht, req, msg); break; default: diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c index d2cda4f9..6d04c66a 100644 --- a/src/ipcpd/normal/dir.c +++ b/src/ipcpd/normal/dir.c @@ -27,13 +27,11 @@ #include <ouroboros/endian.h> #include <ouroboros/errno.h> #include <ouroboros/logs.h> -#include <ouroboros/rib.h> #include <ouroboros/utils.h> #include "dir.h" #include "dht.h" #include "ipcp.h" -#include "ribconfig.h" #include <stdlib.h> #include <string.h> diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index 282f6bee..2df17163 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -29,19 +29,17 @@ #include <ouroboros/bitmap.h> #include <ouroboros/errno.h> #include <ouroboros/logs.h> -#include <ouroboros/rib.h> #include <ouroboros/dev.h> +#include <ouroboros/notifier.h> #include "connmgr.h" #include "ipcp.h" #include "dt.h" #include "dt_pci.h" #include "pff.h" -#include "neighbors.h" #include "routing.h" #include "sdu_sched.h" #include "ae.h" -#include "ribconfig.h" #include "fa.h" #include <stdlib.h> @@ -66,36 +64,33 @@ struct { struct ae_info aes[AP_RES_FDS]; pthread_rwlock_t lock; - struct nbs * nbs; - - struct nb_notifier nb_notifier; - pthread_t listener; } dt; -static int dt_neighbor_event(enum nb_event event, - struct conn conn) +static void handle_event(int event, + const void * o) { - /* We are only interested in neighbors being added and removed. */ + struct conn * c; + + c = (struct conn *) o; + switch (event) { - case NEIGHBOR_ADDED: - sdu_sched_add(dt.sdu_sched, conn.flow_info.fd); - log_dbg("Added fd %d to SDU scheduler.", conn.flow_info.fd); + case NOTIFY_DT_CONN_ADD: + sdu_sched_add(dt.sdu_sched, c->flow_info.fd); + log_dbg("Added fd %d to SDU scheduler.", c->flow_info.fd); break; - case NEIGHBOR_REMOVED: - sdu_sched_del(dt.sdu_sched, conn.flow_info.fd); - log_dbg("Removed fd %d from SDU scheduler.", conn.flow_info.fd); + case NOTIFY_DT_CONN_DEL: + sdu_sched_del(dt.sdu_sched, c->flow_info.fd); + log_dbg("Removed fd %d from SDU scheduler.", c->flow_info.fd); break; default: break; } - - return 0; } -static int sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) +static void sdu_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { struct dt_pci dt_pci; @@ -107,45 +102,38 @@ static int sdu_handler(int fd, if (dt_pci.ttl == 0) { log_dbg("TTL was zero."); ipcp_sdb_release(sdb); - return 0; + return; } fd = pff_nhop(dt.pff[qc], dt_pci.dst_addr); if (fd < 0) { log_err("No next hop for %" PRIu64, dt_pci.dst_addr); ipcp_sdb_release(sdb); - return -1; + return; } if (ipcp_flow_write(fd, sdb)) { log_err("Failed to write SDU to fd %d.", fd); ipcp_sdb_release(sdb); - return -1; + return; } } else { dt_pci_shrink(sdb); if (dt_pci.fd > AP_RES_FDS) { - if (ipcp_flow_write(dt_pci.fd, sdb)) { + if (ipcp_flow_write(dt_pci.fd, sdb)) ipcp_sdb_release(sdb); - return -1; - } - return 0; + return; } if (dt.aes[dt_pci.fd].post_sdu == NULL) { log_err("No registered AE on fd %d.", dt_pci.fd); ipcp_sdb_release(sdb); - return -EPERM; + return; } dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb); - - return 0; } - - /* silence compiler */ - return 0; } static void * dt_conn_handle(void * o) @@ -160,11 +148,9 @@ static void * dt_conn_handle(void * o) continue; } - log_dbg("Got new connection."); - /* NOTE: connection acceptance policy could be here. */ - nbs_add(dt.nbs, conn); + notifier_event(NOTIFY_DT_CONN_ADD, &conn); } return 0; @@ -192,24 +178,17 @@ int dt_init(enum pol_routing pr, goto fail_pci_init; } - dt.nbs = nbs_create(); - if (dt.nbs == NULL) { - log_err("Failed to create neighbors struct."); - goto fail_nbs; + if (notifier_reg(handle_event)) { + log_err("Failed to register with notifier."); + goto fail_notifier_reg; } - dt.nb_notifier.notify_call = dt_neighbor_event; - if (nbs_reg_notifier(dt.nbs, &dt.nb_notifier)) { - log_err("Failed to register notifier."); - goto fail_nbs_notifier; - } - - if (connmgr_ae_init(AEID_DT, &info, dt.nbs)) { + if (connmgr_ae_init(AEID_DT, &info)) { log_err("Failed to register with connmgr."); goto fail_connmgr_ae_init; } - if (routing_init(pr, dt.nbs)) { + if (routing_init(pr)) { log_err("Failed to init routing."); goto fail_routing; } @@ -249,20 +228,17 @@ int dt_init(enum pol_routing pr, for (j = 0; j < QOS_CUBE_MAX; ++j) routing_i_destroy(dt.routing[j]); fail_routing_i: - connmgr_ae_fini(AEID_DT); - fail_connmgr_ae_init: for (i = 0; i < QOS_CUBE_MAX; ++i) pff_destroy(dt.pff[i]); fail_pff: routing_fini(); fail_routing: - nbs_unreg_notifier(dt.nbs, &dt.nb_notifier); - fail_nbs_notifier: - nbs_destroy(dt.nbs); - fail_nbs: + connmgr_ae_fini(AEID_DT); + fail_connmgr_ae_init: + notifier_unreg(&handle_event); + fail_notifier_reg: dt_pci_fini(); fail_pci_init: - connmgr_ae_fini(AEID_DT); return -1; } @@ -282,11 +258,11 @@ void dt_fini(void) routing_fini(); - nbs_unreg_notifier(dt.nbs, &dt.nb_notifier); + connmgr_ae_fini(AEID_DT); - nbs_destroy(dt.nbs); + notifier_unreg(&handle_event); - connmgr_ae_fini(AEID_DT); + dt_pci_fini(); } int dt_start(void) diff --git a/src/ipcpd/normal/dt_pci.c b/src/ipcpd/normal/dt_pci.c index 5704a09a..4684265d 100644 --- a/src/ipcpd/normal/dt_pci.c +++ b/src/ipcpd/normal/dt_pci.c @@ -21,10 +21,8 @@ */ #include <ouroboros/errno.h> -#include <ouroboros/rib.h> #include "dt_pci.h" -#include "ribconfig.h" #include <stdlib.h> #include <string.h> diff --git a/src/ipcpd/normal/enroll.c b/src/ipcpd/normal/enroll.c index ad229f40..d14c62ac 100644 --- a/src/ipcpd/normal/enroll.c +++ b/src/ipcpd/normal/enroll.c @@ -29,14 +29,12 @@ #include <ouroboros/time_utils.h> #include <ouroboros/dev.h> #include <ouroboros/logs.h> -#include <ouroboros/rib.h> #include <ouroboros/errno.h> #include <ouroboros/sockets.h> #include "connmgr.h" #include "enroll.h" #include "ipcp.h" -#include "ribconfig.h" #include <assert.h> #include <stdlib.h> @@ -270,6 +268,8 @@ static void * enroll_handle(void * o) else log_dbg("Neigbor reported failed enrollment."); + enroll_msg__free_unpacked(msg, NULL); + connmgr_dealloc(AEID_ENROLL, &conn); } @@ -339,7 +339,7 @@ int enroll_init(void) info.pref_syntax = PROTO_GPB; info.addr = 0; - if (connmgr_ae_init(AEID_ENROLL, &info, NULL)) { + if (connmgr_ae_init(AEID_ENROLL, &info)) { log_err("Failed to register with connmgr."); return -1; } diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 682dc5c6..e684abd2 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -28,7 +28,6 @@ #include <ouroboros/logs.h> #include <ouroboros/fqueue.h> -#include <ouroboros/rib.h> #include <ouroboros/errno.h> #include <ouroboros/dev.h> #include <ouroboros/ipcp-dev.h> @@ -38,7 +37,6 @@ #include "fa.h" #include "sdu_sched.h" #include "ipcp.h" -#include "ribconfig.h" #include "dt.h" #include <pthread.h> @@ -59,9 +57,9 @@ struct { struct sdu_sched * sdu_sched; } fa; -static int sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) +static void sdu_handler(int fd, + qoscube_t qc, + struct shm_du_buff * sdb) { pthread_rwlock_rdlock(&fa.flows_lock); @@ -69,12 +67,10 @@ static int sdu_handler(int fd, pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); log_warn("Failed to forward SDU."); - return -1; + return; } pthread_rwlock_unlock(&fa.flows_lock); - - return 0; } static void destroy_conn(int fd) diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 22b6e718..2b35a04a 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -31,9 +31,9 @@ #include <ouroboros/ipcp-dev.h> #include <ouroboros/time_utils.h> #include <ouroboros/irm.h> -#include <ouroboros/rib.h> #include <ouroboros/hash.h> #include <ouroboros/errno.h> +#include <ouroboros/notifier.h> #include "addr_auth.h" #include "connmgr.h" @@ -42,8 +42,6 @@ #include "fa.h" #include "dt.h" #include "ipcp.h" -#include "ribconfig.h" -#include "ribmgr.h" #include <stdbool.h> #include <signal.h> @@ -56,11 +54,6 @@ static int initialize_components(const struct ipcp_config * conf) { - if (rib_init()) { - log_err("Failed to initialize RIB."); - goto fail_rib_init; - } - ipcpi.dif_name = strdup(conf->dif_info.dif_name); if (ipcpi.dif_name == NULL) { log_err("Failed to set DIF name."); @@ -85,11 +78,6 @@ static int initialize_components(const struct ipcp_config * conf) log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr); - if (ribmgr_init()) { - log_err("Failed to initialize RIB manager."); - goto fail_ribmgr; - } - if (dt_init(conf->routing_type, conf->addr_size, conf->fd_size, @@ -117,14 +105,10 @@ static int initialize_components(const struct ipcp_config * conf) fail_fa: dt_fini(); fail_dt: - ribmgr_fini(); - fail_ribmgr: addr_auth_fini(); fail_addr_auth: free(ipcpi.dif_name); fail_dif_name: - rib_fini(); - fail_rib_init: return -1; } @@ -136,13 +120,9 @@ static void finalize_components(void) dt_fini(); - ribmgr_fini(); - addr_auth_fini(); free(ipcpi.dif_name); - - rib_fini(); } static int start_components(void) @@ -151,11 +131,6 @@ static int start_components(void) ipcp_set_state(IPCP_OPERATIONAL); - if (ribmgr_start()) { - log_err("Failed to start RIB manager."); - goto fail_ribmgr_start; - } - if (fa_start()) { log_err("Failed to start flow allocator."); goto fail_fa_start; @@ -178,8 +153,6 @@ static int start_components(void) fail_enroll_start: fa_stop(); fail_fa_start: - ribmgr_stop(); - fail_ribmgr_start: ipcp_set_state(IPCP_INIT); return -1; } @@ -195,8 +168,6 @@ static void stop_components(void) fa_stop(); - ribmgr_stop(); - ipcp_set_state(IPCP_INIT); } @@ -377,6 +348,11 @@ int main(int argc, goto fail_enroll_init; } + if (notifier_init()) { + log_err("Failed to initialize notifier component."); + goto fail_notifier_init; + } + if (ipcp_boot() < 0) { log_err("Failed to boot IPCP."); goto fail_boot; @@ -396,6 +372,8 @@ int main(int argc, finalize_components(); } + notifier_fini(); + enroll_fini(); connmgr_fini(); @@ -409,6 +387,8 @@ int main(int argc, fail_create_r: ipcp_shutdown(); fail_boot: + notifier_fini(); + fail_notifier_init: enroll_fini(); fail_enroll_init: connmgr_fini(); diff --git a/src/ipcpd/normal/neighbors.c b/src/ipcpd/normal/neighbors.c deleted file mode 100644 index c32e9aa2..00000000 --- a/src/ipcpd/normal/neighbors.c +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Neighbors - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#define _POSIX_C_SOURCE 199309L - -#define OUROBOROS_PREFIX "neighbors" - -#include <ouroboros/qoscube.h> -#include <ouroboros/ipcp-dev.h> -#include <ouroboros/errno.h> -#include <ouroboros/logs.h> - -#include "neighbors.h" - -#include <stdlib.h> -#include <assert.h> -#include <inttypes.h> - -static void notify_listeners(enum nb_event event, - struct nb * nb, - struct nbs * nbs) -{ - struct list_head * p = NULL; - - pthread_mutex_lock(&nbs->notifiers_lock); - - list_for_each(p, &nbs->notifiers) { - struct nb_notifier * e = - list_entry(p, struct nb_notifier, next); - if (e->notify_call(event, nb->conn)) - log_err("Listener reported an error."); - } - - pthread_mutex_unlock(&nbs->notifiers_lock); -} - -struct nbs * nbs_create(void) -{ - struct nbs * nbs; - - nbs = malloc(sizeof(*nbs)); - if (nbs == NULL) - return NULL; - - list_head_init(&nbs->list); - list_head_init(&nbs->notifiers); - - if (pthread_mutex_init(&nbs->list_lock, NULL)) - return NULL; - - if (pthread_mutex_init(&nbs->notifiers_lock, NULL)) { - pthread_mutex_destroy(&nbs->list_lock); - return NULL; - } - - return nbs; -} - -void nbs_destroy(struct nbs * nbs) -{ - struct list_head * p = NULL; - struct list_head * n = NULL; - - assert(nbs); - - pthread_mutex_lock(&nbs->list_lock); - - list_for_each_safe(p, n, &nbs->list) { - struct nb * e = list_entry(p, struct nb, next); - list_del(&e->next); - free(e); - } - - pthread_mutex_unlock(&nbs->list_lock); - - pthread_mutex_destroy(&nbs->list_lock); - pthread_mutex_destroy(&nbs->notifiers_lock); - - free(nbs); -} - -int nbs_add(struct nbs * nbs, - struct conn conn) -{ - struct nb * nb; - - assert(nbs); - - nb = malloc(sizeof(*nb)); - if (nb == NULL) - return -ENOMEM; - - nb->conn = conn; - - pthread_mutex_lock(&nbs->list_lock); - - list_add(&nb->next, &nbs->list); - - notify_listeners(NEIGHBOR_ADDED, nb, nbs); - - pthread_mutex_unlock(&nbs->list_lock); - - log_info("Added neighbor with fd %d and address %" PRIu64 " to list.", - conn.flow_info.fd, conn.conn_info.addr); - - return 0; -} - -int nbs_update_qos(struct nbs * nbs, - int fd, - qosspec_t qs) -{ - struct list_head * p = NULL; - - assert(nbs); - - pthread_mutex_lock(&nbs->list_lock); - - list_for_each(p, &nbs->list) { - struct nb * e = list_entry(p, struct nb, next); - if (e->conn.flow_info.fd == fd) { - e->conn.flow_info.qs = qs; - - notify_listeners(NEIGHBOR_QOS_CHANGE, e, nbs); - - pthread_mutex_unlock(&nbs->list_lock); - return 0; - } - } - - pthread_mutex_unlock(&nbs->list_lock); - - return -1; -} - -int nbs_del(struct nbs * nbs, - int fd) -{ - struct list_head * p = NULL; - struct list_head * n = NULL; - - assert(nbs); - - pthread_mutex_lock(&nbs->list_lock); - - list_for_each_safe(p, n, &nbs->list) { - struct nb * e = list_entry(p, struct nb, next); - if (e->conn.flow_info.fd == fd) { - notify_listeners(NEIGHBOR_REMOVED, e, nbs); - list_del(&e->next); - free(e); - pthread_mutex_unlock(&nbs->list_lock); - return 0; - } - } - - pthread_mutex_unlock(&nbs->list_lock); - - return -1; -} - -bool nbs_has(struct nbs * nbs, - uint64_t addr) -{ - struct list_head * p = NULL; - - assert(nbs); - - pthread_mutex_lock(&nbs->list_lock); - - list_for_each(p, &nbs->list) { - struct nb * e = list_entry(p, struct nb, next); - if (e->conn.conn_info.addr == addr) { - pthread_mutex_unlock(&nbs->list_lock); - return true; - } - } - - pthread_mutex_unlock(&nbs->list_lock); - - return false; -} - -int nbs_reg_notifier(struct nbs * nbs, - struct nb_notifier * notify) -{ - assert(nbs); - assert(notify); - - pthread_mutex_lock(&nbs->notifiers_lock); - - list_add(¬ify->next, &nbs->notifiers); - - pthread_mutex_unlock(&nbs->notifiers_lock); - - return 0; -} - -int nbs_unreg_notifier(struct nbs * nbs, - struct nb_notifier * notify) -{ - struct list_head * p = NULL; - struct list_head * n = NULL; - - pthread_mutex_lock(&nbs->notifiers_lock); - - list_for_each_safe(p, n, &nbs->notifiers) { - struct nb_notifier * e = - list_entry(p, struct nb_notifier, next); - if (e == notify) { - list_del(&e->next); - pthread_mutex_unlock(&nbs->notifiers_lock); - return 0; - } - } - - pthread_mutex_unlock(&nbs->notifiers_lock); - - return -1; -} diff --git a/src/ipcpd/normal/neighbors.h b/src/ipcpd/normal/neighbors.h deleted file mode 100644 index 9c5a6e50..00000000 --- a/src/ipcpd/normal/neighbors.h +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Neighbors - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_NEIGHBORS_H -#define OUROBOROS_IPCPD_NORMAL_NEIGHBORS_H - -#include <ouroboros/ipcp.h> -#include <ouroboros/list.h> -#include <ouroboros/qos.h> -#include <ouroboros/fqueue.h> -#include <ouroboros/cacep.h> - -#include "ae.h" - -enum nb_event { - NEIGHBOR_ADDED, - NEIGHBOR_REMOVED, - NEIGHBOR_QOS_CHANGE -}; - -typedef int (* nb_notify_t)(enum nb_event event, - struct conn conn); - -struct nb { - struct list_head next; - struct conn conn; -}; - -struct nb_notifier { - struct list_head next; - nb_notify_t notify_call; -}; - -struct nbs { - struct list_head notifiers; - pthread_mutex_t notifiers_lock; - - struct list_head list; - pthread_mutex_t list_lock; -}; - -struct nbs * nbs_create(void); - -void nbs_destroy(struct nbs * nbs); - -int nbs_add(struct nbs * nbs, - struct conn conn); - -int nbs_update_qos(struct nbs * nbs, - int fd, - qosspec_t qs); - -int nbs_del(struct nbs * nbs, - int fd); - -bool nbs_has(struct nbs * nbs, - uint64_t addr); - -int nbs_reg_notifier(struct nbs * nbs, - struct nb_notifier * notify); - -int nbs_unreg_notifier(struct nbs * nbs, - struct nb_notifier * notify); - -#endif diff --git a/src/ipcpd/normal/pol-routing-ops.h b/src/ipcpd/normal/pol-routing-ops.h index 0fec10fc..9804d5ad 100644 --- a/src/ipcpd/normal/pol-routing-ops.h +++ b/src/ipcpd/normal/pol-routing-ops.h @@ -26,7 +26,7 @@ #include "pff.h" struct pol_routing_ops { - int (* init)(struct nbs * nbs); + int (* init)(void); void (* fini)(void); diff --git a/src/ipcpd/normal/pol/flat.c b/src/ipcpd/normal/pol/flat.c index 7a5a785e..0c4648c5 100644 --- a/src/ipcpd/normal/pol/flat.c +++ b/src/ipcpd/normal/pol/flat.c @@ -27,11 +27,9 @@ #include <ouroboros/logs.h> #include <ouroboros/errno.h> #include <ouroboros/time_utils.h> -#include <ouroboros/rib.h> #include <ouroboros/utils.h> #include "ipcp.h" -#include "ribconfig.h" #include <time.h> #include <stdlib.h> diff --git a/src/ipcpd/normal/pol/link_state.c b/src/ipcpd/normal/pol/link_state.c index 512ced7f..7df09bce 100644 --- a/src/ipcpd/normal/pol/link_state.c +++ b/src/ipcpd/normal/pol/link_state.c @@ -27,14 +27,16 @@ #include <ouroboros/errno.h> #include <ouroboros/list.h> #include <ouroboros/logs.h> -#include <ouroboros/rib.h> -#include <ouroboros/rqueue.h> +#include <ouroboros/utils.h> +#include <ouroboros/notifier.h> +#include <ouroboros/dev.h> +#include <ouroboros/fqueue.h> -#include "ribmgr.h" -#include "ribconfig.h" +#include "ae.h" +#include "connmgr.h" #include "graph.h" -#include "neighbors.h" #include "ipcp.h" +#include "link_state.h" #include "pff.h" #include <assert.h> @@ -43,39 +45,230 @@ #include <string.h> #include <pthread.h> -#include "fso.pb-c.h" -typedef Fso fso_t; +#include "link_state.pb-c.h" +typedef LinkStateMsg link_state_msg_t; -#define BUF_SIZE 256 -#define RECALC_TIME 4 +#define RECALC_TIME 4 +#define LS_UPDATE_TIME 15 +#define LS_TIMEO 60 +#define LSA_MAX_LEN 128 + +#ifndef CLOCK_REALTIME_COARSE +#define CLOCK_REALTIME_COARSE CLOCK_REALTIME +#endif struct routing_i { struct pff * pff; pthread_t calculator; }; +/* TODO: link weight support. */ +struct adjacency { + struct list_head next; + + uint64_t dst; + uint64_t src; + + time_t stamp; +}; + +enum nb_type { + NB_DT = 0, + NB_MGMT +}; + +struct nb { + struct list_head next; + + uint64_t addr; + int fd; + enum nb_type type; +}; + struct { - struct nbs * nbs; - struct nb_notifier nb_notifier; + struct list_head nbs; + fset_t * mgmt_set; - struct graph * graph; + struct list_head db; - ro_set_t * set; - rqueue_t * queue; - pthread_t rib_listener; -} link_state; + pthread_rwlock_t db_lock; -/* Take under neighbors lock */ -static int addr_to_fd(uint64_t addr) + struct graph * graph; + + pthread_t lsupdate; + pthread_t lsreader; + pthread_t listener; +} ls; + +struct pol_routing_ops link_state_ops = { + .init = link_state_init, + .fini = link_state_fini, + .routing_i_create = link_state_routing_i_create, + .routing_i_destroy = link_state_routing_i_destroy +}; + +static int lsdb_add_nb(uint64_t addr, + int fd, + enum nb_type type) { - struct list_head * p = NULL; + struct list_head * p; + struct nb * nb; + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each(p, &ls.nbs) { + struct nb * el = list_entry(p, struct nb, next); + if (el->addr == addr && el->type == type) { + log_dbg("Already know %s neighbor %" PRIu64 ".", + type == NB_DT ? "dt" : "mgmt", addr); + if (el->fd != fd) { + log_warn("Existing neighbor assigned new fd."); + el->fd = fd; + } + pthread_rwlock_unlock(&ls.db_lock); + return -EPERM; + } - list_for_each(p, &link_state.nbs->list) { - struct nb * e = list_entry(p, struct nb, next); - if (e->conn.conn_info.addr == addr) - return e->conn.flow_info.fd; + if (addr > el->addr) + break; } + nb = malloc(sizeof(*nb)); + if (nb == NULL) { + pthread_rwlock_unlock(&ls.db_lock); + return -ENOMEM; + } + + nb->addr = addr; + nb->fd = fd; + nb->type = type; + + list_add_tail(&nb->next, p); + + log_dbg("Type %s neighbor %" PRIu64 " added.", + nb->type == NB_DT ? "dt" : "mgmt", addr); + + pthread_rwlock_unlock(&ls.db_lock); + + return 0; +} + +static int lsdb_del_nb(uint64_t addr, + int fd) +{ + struct list_head * p; + struct list_head * h; + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each_safe(p, h, &ls.nbs) { + struct nb * nb = list_entry(p, struct nb, next); + if (nb->addr == addr && nb->fd == fd) { + list_del(&nb->next); + pthread_rwlock_unlock(&ls.db_lock); + log_dbg("Type %s neighbor %" PRIu64 " deleted.", + nb->type == NB_DT ? "dt" : "mgmt", addr); + free(nb); + return 0; + } + } + + pthread_rwlock_unlock(&ls.db_lock); + + return -EPERM; +} + +static int lsdb_add_link(uint64_t src, + uint64_t dst, + qosspec_t * qs) +{ + struct list_head * p; + struct adjacency * adj; + struct timespec now; + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each(p, &ls.db) { + struct adjacency * a = list_entry(p, struct adjacency, next); + if (a->dst == dst && a->src == src) { + a->stamp = now.tv_sec; + pthread_rwlock_unlock(&ls.db_lock); + return 0; + } + + if (a->dst > dst || (a->dst == dst && a->src > src)) + break; + } + + adj = malloc(sizeof(*adj)); + if (adj == NULL) { + pthread_rwlock_unlock(&ls.db_lock); + return -ENOMEM; + } + + adj->dst = dst; + adj->src = src; + adj->stamp = now.tv_sec; + + list_add_tail(&adj->next, p); + + if (graph_update_edge(ls.graph, src, dst, *qs)) + log_warn("Failed to add edge to graph."); + + log_dbg("Added %" PRIu64 " - %" PRIu64" to lsdb.", adj->src, adj->dst); + + pthread_rwlock_unlock(&ls.db_lock); + + return 0; +} + +static int lsdb_del_link(uint64_t src, + uint64_t dst) +{ + struct list_head * p; + struct list_head * h; + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each_safe(p, h, &ls.db) { + struct adjacency * a = list_entry(p, struct adjacency, next); + if (a->dst == dst && a->src == src) { + list_del(&a->next); + if (graph_del_edge(ls.graph, src, dst)) + log_warn("Failed to delete edge from graph."); + + log_dbg("Removed %" PRIu64 " - %" PRIu64" from lsdb.", + a->src, a->dst); + + pthread_rwlock_unlock(&ls.db_lock); + free(a); + return 0; + } + } + + pthread_rwlock_unlock(&ls.db_lock); + + return -EPERM; +} + +static int nbr_to_fd(uint64_t addr) +{ + struct list_head * p; + + pthread_rwlock_rdlock(&ls.db_lock); + + list_for_each(p, &ls.nbs) { + struct nb * nb = list_entry(p, struct nb, next); + if (nb->addr == addr && nb->type == NB_DT) { + pthread_rwlock_unlock(&ls.db_lock); + return nb->fd; + } + } + + pthread_rwlock_unlock(&ls.db_lock); + return -1; } @@ -91,20 +284,19 @@ static void * calculate_pff(void * o) while (true) { table = NULL; - n_table = graph_routing_table(link_state.graph, + n_table = graph_routing_table(ls.graph, ipcpi.dt_addr, &table); if (n_table < 0) { sleep(RECALC_TIME); continue; } - pthread_mutex_lock(&link_state.nbs->list_lock); pff_lock(instance->pff); pff_flush(instance->pff); for (i = 0; i < n_table; i++) { - fd = addr_to_fd(table[i]->nhop); + fd = nbr_to_fd(table[i]->nhop); if (fd == -1) continue; @@ -112,7 +304,6 @@ static void * calculate_pff(void * o) } pff_unlock(instance->pff); - pthread_mutex_unlock(&link_state.nbs->list_lock); freepp(struct routing_table, table, n_table); sleep(RECALC_TIME); @@ -121,154 +312,209 @@ static void * calculate_pff(void * o) return (void *) 0; } -static int link_state_neighbor_event(enum nb_event event, - struct conn conn) +static void send_lsa(uint64_t dst, + uint64_t src) { - char path[RIB_MAX_PATH_LEN + 1]; - char fso_name[RIB_MAX_PATH_LEN + 1]; - fso_t fso = FSO__INIT; - size_t len; - uint8_t * data; + uint8_t buf[LSA_MAX_LEN]; + link_state_msg_t lsa = LINK_STATE_MSG__INIT; + size_t len; + struct list_head * p; - path[0] = '\0'; - sprintf(fso_name, "%" PRIu64 "-%" PRIu64, - ipcpi.dt_addr, conn.conn_info.addr); - rib_path_append(rib_path_append(path, ROUTING_NAME), fso_name); + lsa.d_addr = dst; + lsa.s_addr = src; - switch (event) { - case NEIGHBOR_ADDED: - fso.s_addr = ipcpi.dt_addr; - fso.d_addr = conn.conn_info.addr; + len = link_state_msg__get_packed_size(&lsa); - len = fso__get_packed_size(&fso); - if (len == 0) - return -1; + assert(len <= LSA_MAX_LEN); - data = malloc(len); - if (data == NULL) - return -1; + link_state_msg__pack(&lsa, buf); - fso__pack(&fso, data); + list_for_each(p, &ls.nbs) { + struct nb * nb = list_entry(p, struct nb, next); + if (nb->type == NB_MGMT) + flow_write(nb->fd, buf, len); + } +} - if (rib_add(ROUTING_PATH, fso_name)) { - log_err("Failed to add FSO."); - free(data); - return -1; - } +static void * lsupdate(void * o) +{ + struct list_head * p; + struct list_head * h; + struct timespec now; - if (rib_put(path, data, len)) { - log_err("Failed to put FSO in RIB."); - rib_del(path); - free(data); - return -1; - } + (void) o; - log_dbg("Added %s to RIB.", path); + while (true) { + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_rdlock(&ls.db_lock); + + pthread_cleanup_push((void (*) (void *)) pthread_rwlock_unlock, + (void *) &ls.db_lock); + + list_for_each_safe(p, h, &ls.db) { + struct adjacency * adj; + adj = list_entry(p, struct adjacency, next); + if (now.tv_sec - adj->stamp > LS_TIMEO) { + list_del(&adj->next); + log_dbg("%" PRIu64 " - %" PRIu64" timed out.", + adj->src, adj->dst); + if (graph_del_edge(ls.graph, adj->src, + adj->dst)) + log_dbg("Failed to delete edge."); + free(adj); + continue; + } - break; - case NEIGHBOR_REMOVED: - if (rib_del(path)) { - log_err("Failed to remove FSO."); - return -1; + if (adj->src == ipcpi.dt_addr) { + send_lsa(adj->src, adj->dst); + adj->stamp = now.tv_sec; + } } - log_dbg("Removed %s from RIB.", path); + pthread_cleanup_pop(true); - break; - case NEIGHBOR_QOS_CHANGE: - log_info("Not currently supported."); - break; - default: - log_info("Unsupported event for routing."); - break; + sleep(LS_UPDATE_TIME); } - return 0; + return (void *) 0; } -static int read_fso(char * path, - int32_t flag) +static void * ls_conn_handle(void * o) { - ssize_t len; - uint8_t ro[BUF_SIZE]; - fso_t * fso; - qosspec_t qs; + struct conn conn; - memset(&qs, 0, sizeof(qs)); + (void) o; - len = rib_read(path, ro, BUF_SIZE); - if (len < 0) { - log_err("Failed to read FSO."); - return -1; - } + while (true) { + if (connmgr_wait(AEID_MGMT, &conn)) { + log_err("Failed to get next MGMT connection."); + continue; + } - fso = fso__unpack(NULL, len, ro); - if (fso == NULL) { - log_err("Failed to unpack."); - return -1; - } + /* NOTE: connection acceptance policy could be here. */ - if (flag & RO_MODIFY) { - if (graph_update_edge(link_state.graph, - fso->s_addr, fso->d_addr, qs)) { - fso__free_unpacked(fso, NULL); - return -1; - } - } else if (flag & RO_DELETE) { - if (graph_del_edge(link_state.graph, fso->s_addr, fso->d_addr)) { - fso__free_unpacked(fso, NULL); - return -1; - } + notifier_event(NOTIFY_MGMT_CONN_ADD, &conn); } - fso__free_unpacked(fso, NULL); - return 0; } -static void * rib_listener(void * o) + +static void forward_lsm(uint8_t * buf, + size_t len, + int in_fd) { - int32_t flag; - char path[RIB_MAX_PATH_LEN + 1]; - char ** children; - ssize_t len; - int i; + struct list_head * p; - (void) o; + pthread_rwlock_rdlock(&ls.db_lock); - if (ro_set_add(link_state.set, ROUTING_PATH, RO_MODIFY | RO_DELETE)) { - log_err("Failed to add to RO set"); - return (void * ) -1; + list_for_each(p, &ls.nbs) { + struct nb * nb = list_entry(p, struct nb, next); + if (nb->type == NB_MGMT && nb->fd != in_fd) + flow_write(nb->fd, buf, len); } - len = rib_children(ROUTING_PATH, &children); - if (len < 0) { - log_err("Failed to retrieve children."); + pthread_rwlock_unlock(&ls.db_lock); +} + +static void * lsreader(void * o) +{ + fqueue_t * fq; + int ret; + uint8_t buf[LSA_MAX_LEN]; + size_t len; + int fd; + qosspec_t qs; + + (void) o; + + memset(&qs, 0, sizeof(qs)); + + fq = fqueue_create(); + if (fq == NULL) return (void *) -1; - } - for (i = 0; i < len; i++) { - if (read_fso(children[i], RO_CREATE)) { - log_err("Failed to parse FSO."); + pthread_cleanup_push((void (*) (void *)) fqueue_destroy, + (void *) fq); + + while (true) { + ret = fevent(ls.mgmt_set, fq, NULL); + if (ret < 0) { + log_warn("Event error: %d.", ret); continue; } - } - while (rib_event_wait(link_state.set, link_state.queue, NULL) == 0) { - path[0] = '\0'; - flag = rqueue_next(link_state.queue, path); - if (flag < 0) - continue; + while ((fd = fqueue_next(fq)) >= 0) { + link_state_msg_t * msg; + len = flow_read(fd, buf, LSA_MAX_LEN); + if (len <= 0) + continue; - if (read_fso(path, flag)) { - log_err("Failed to parse FSO."); - continue; + msg = link_state_msg__unpack(NULL, len, buf); + if (msg == NULL) { + log_dbg("Failed to unpack link state message."); + continue; + } + + lsdb_add_link(msg->s_addr, msg->d_addr, &qs); + + link_state_msg__free_unpacked(msg, NULL); + + forward_lsm(buf, len, fd); } } + pthread_cleanup_pop(true); + return (void *) 0; } +static void handle_event(int event, + const void * o) +{ + /* FIXME: Apply correct QoS on graph */ + struct conn * c; + qosspec_t qs; + + c = (struct conn *) o; + + memset(&qs, 0, sizeof(qs)); + + switch (event) { + case NOTIFY_DT_CONN_ADD: + if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_DT)) + log_dbg("Failed to add neighbor to LSDB."); + + if (lsdb_add_link(ipcpi.dt_addr, c->conn_info.addr, &qs)) + log_dbg("Failed to add adjacency to LSDB."); + break; + case NOTIFY_DT_CONN_DEL: + if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd)) + log_dbg("Failed to delete neighbor from LSDB."); + + if (lsdb_del_link(ipcpi.dt_addr, c->conn_info.addr)) + log_dbg("Local link was not in LSDB."); + break; + case NOTIFY_DT_CONN_QOS: + log_dbg("QoS changes currently unsupported."); + break; + case NOTIFY_MGMT_CONN_ADD: + fset_add(ls.mgmt_set, c->flow_info.fd); + if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_MGMT)) + log_warn("Failed to add mgmt neighbor to LSDB."); + break; + case NOTIFY_MGMT_CONN_DEL: + fset_del(ls.mgmt_set, c->flow_info.fd); + if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd)) + log_warn("Failed to add mgmt neighbor to LSDB."); + break; + default: + log_info("Unknown routing event."); + break; + } +} + struct routing_i * link_state_routing_i_create(struct pff * pff) { struct routing_i * tmp; @@ -281,7 +527,10 @@ struct routing_i * link_state_routing_i_create(struct pff * pff) tmp->pff = pff; - pthread_create(&tmp->calculator, NULL, calculate_pff, (void *) tmp); + if (pthread_create(&tmp->calculator, NULL, calculate_pff, tmp)) { + free(tmp); + return NULL; + } return tmp; } @@ -297,61 +546,100 @@ void link_state_routing_i_destroy(struct routing_i * instance) free(instance); } -int link_state_init(struct nbs * nbs) +int link_state_init(void) { - link_state.graph = graph_create(); - if (link_state.graph == NULL) + struct conn_info info; + + memset(&info, 0, sizeof(info)); + + strcpy(info.ae_name, LS_AE); + strcpy(info.protocol, LS_PROTO); + info.pref_version = 1; + info.pref_syntax = PROTO_GPB; + info.addr = ipcpi.dt_addr; + + ls.graph = graph_create(); + if (ls.graph == NULL) goto fail_graph; - if (rib_add(RIB_ROOT, ROUTING_NAME)) - goto fail_rib_add; + if (notifier_reg(handle_event)) + goto fail_notifier_reg; + + if (pthread_rwlock_init(&ls.db_lock, NULL)) + goto fail_db_lock_init; + + if (connmgr_ae_init(AEID_MGMT, &info)) + goto fail_connmgr_ae_init; - link_state.nbs = nbs; + ls.mgmt_set = fset_create(); + if (ls.mgmt_set == NULL) + goto fail_fset_create; - link_state.nb_notifier.notify_call = link_state_neighbor_event; - if (nbs_reg_notifier(link_state.nbs, &link_state.nb_notifier)) - goto fail_nbs_reg_notifier; + list_head_init(&ls.db); + list_head_init(&ls.nbs); - link_state.set = ro_set_create(); - if (link_state.set == NULL) - goto fail_ro_set_create; + if (pthread_create(&ls.lsupdate, NULL, lsupdate, NULL)) + goto fail_pthread_create_lsupdate; - link_state.queue = rqueue_create(); - if (link_state.queue == NULL) - goto fail_rqueue_create; + if (pthread_create(&ls.lsreader, NULL, lsreader, NULL)) + goto fail_pthread_create_lsreader; - if (pthread_create(&link_state.rib_listener, NULL, rib_listener, NULL)) - goto fail_listener_create; + if (pthread_create(&ls.listener, NULL, ls_conn_handle, NULL)) + goto fail_pthread_create_listener; return 0; - fail_listener_create: - ro_set_destroy(link_state.set); - fail_rqueue_create: - ro_set_destroy(link_state.set); - fail_ro_set_create: - nbs_unreg_notifier(link_state.nbs, &link_state.nb_notifier); - fail_nbs_reg_notifier: - rib_del(ROUTING_PATH); - fail_rib_add: - graph_destroy(link_state.graph); + fail_pthread_create_listener: + pthread_cancel(ls.lsreader); + pthread_join(ls.lsreader, NULL); + fail_pthread_create_lsreader: + pthread_cancel(ls.lsupdate); + pthread_join(ls.lsupdate, NULL); + fail_pthread_create_lsupdate: + fset_destroy(ls.mgmt_set); + fail_fset_create: + connmgr_ae_fini(AEID_MGMT); + fail_connmgr_ae_init: + pthread_rwlock_destroy(&ls.db_lock); + fail_db_lock_init: + notifier_unreg(handle_event); + fail_notifier_reg: + graph_destroy(ls.graph); fail_graph: return -1; } void link_state_fini(void) { - pthread_cancel(link_state.rib_listener); + struct list_head * p; + struct list_head * h; + + pthread_cancel(ls.listener); + pthread_join(ls.listener, NULL); + + pthread_cancel(ls.lsreader); + pthread_join(ls.lsreader, NULL); - pthread_join(link_state.rib_listener, NULL); + pthread_cancel(ls.lsupdate); + pthread_join(ls.lsupdate, NULL); - rqueue_destroy(link_state.queue); + fset_destroy(ls.mgmt_set); - ro_set_destroy(link_state.set); + connmgr_ae_fini(AEID_MGMT); + + graph_destroy(ls.graph); + + pthread_rwlock_wrlock(&ls.db_lock); + + list_for_each_safe(p, h, &ls.db) { + struct adjacency * a = list_entry(p, struct adjacency, next); + list_del(&a->next); + free(a); + } - graph_destroy(link_state.graph); + pthread_rwlock_unlock(&ls.db_lock); - rib_del(ROUTING_PATH); + pthread_rwlock_destroy(&ls.db_lock); - nbs_unreg_notifier(link_state.nbs, &link_state.nb_notifier); + notifier_unreg(handle_event); } diff --git a/src/ipcpd/normal/pol/link_state.h b/src/ipcpd/normal/pol/link_state.h index 9b96bcab..58f90d91 100644 --- a/src/ipcpd/normal/pol/link_state.h +++ b/src/ipcpd/normal/pol/link_state.h @@ -23,9 +23,12 @@ #ifndef OUROBOROS_IPCPD_NORMAL_POL_LINK_STATE_H #define OUROBOROS_IPCPD_NORMAL_POL_LINK_STATE_H +#define LS_AE "Management" +#define LS_PROTO "LSP" + #include "pol-routing-ops.h" -int link_state_init(struct nbs * nbs); +int link_state_init(void); void link_state_fini(void); @@ -33,11 +36,6 @@ struct routing_i * link_state_routing_i_create(struct pff * pff); void link_state_routing_i_destroy(struct routing_i * instance); -struct pol_routing_ops link_state_ops = { - .init = link_state_init, - .fini = link_state_fini, - .routing_i_create = link_state_routing_i_create, - .routing_i_destroy = link_state_routing_i_destroy -}; +struct pol_routing_ops link_state_ops; #endif /* OUROBOROS_IPCPD_NORMAL_POL_LINK_STATE_H */ diff --git a/src/ipcpd/normal/pol/fso.proto b/src/ipcpd/normal/pol/link_state.proto index 27a78efd..4e2280b0 100644 --- a/src/ipcpd/normal/pol/fso.proto +++ b/src/ipcpd/normal/pol/link_state.proto @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2017 * - * Flow State Object message + * Link State message * * Dimitri Staessens <dimitri.staessens@ugent.be> * Sander Vrijders <sander.vrijders@ugent.be> @@ -22,8 +22,8 @@ syntax = "proto2"; -message fso { - required uint64 s_addr = 1; - required uint64 d_addr = 2; +message link_state_msg { + required uint64 d_addr = 1; + required uint64 s_addr = 2; /* Add QoS parameters of link here */ }; diff --git a/src/ipcpd/normal/ribconfig.h b/src/ipcpd/normal/ribconfig.h deleted file mode 100644 index f6d10133..00000000 --- a/src/ipcpd/normal/ribconfig.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Normal IPC Process - RIB configuration - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_RIB_CONFIG_H -#define OUROBOROS_IPCPD_NORMAL_RIB_CONFIG_H - -/* RIB configuration for normal */ -#define RIB_MAX_PATH_LEN 256 - -#define DLR "/" -#define ROUTING_NAME "fsdb" -#define ROUTING_PATH DLR ROUTING_NAME - -#endif /* OUROBOROS_IPCPD_NORMAL_RIB_CONFIG_H */ diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c deleted file mode 100644 index a5e7d6ce..00000000 --- a/src/ipcpd/normal/ribmgr.c +++ /dev/null @@ -1,423 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * RIB manager of the IPC Process - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#define _POSIX_C_SOURCE 200112L - -#define OUROBOROS_PREFIX "rib-manager" - -#include <ouroboros/logs.h> -#include <ouroboros/cdap.h> -#include <ouroboros/list.h> -#include <ouroboros/time_utils.h> -#include <ouroboros/ipcp-dev.h> -#include <ouroboros/errno.h> -#include <ouroboros/dev.h> -#include <ouroboros/fqueue.h> -#include <ouroboros/rib.h> - -#include "ae.h" -#include "connmgr.h" -#include "ipcp.h" -#include "neighbors.h" -#include "ribconfig.h" -#include "ribmgr.h" - -#include <stdlib.h> -#include <pthread.h> -#include <string.h> -#include <errno.h> -#include <assert.h> - -#define MGMT_AE "Management" -#define RIB_SYNC_TIMEOUT 1 - -enum ribmgr_state { - RIBMGR_NULL = 0, - RIBMGR_INIT, - RIBMGR_OPERATIONAL, - RIBMGR_SHUTDOWN -}; - -struct { - struct cdap * cdap; - - pthread_t reader; - pthread_t sync; - - struct nbs * nbs; - struct ae * ae; - - struct nb_notifier nb_notifier; - - pthread_rwlock_t state_lock; - enum ribmgr_state state; -} ribmgr; - -static int ribmgr_neighbor_event(enum nb_event event, - struct conn conn) -{ - switch (event) { - case NEIGHBOR_ADDED: - cdap_add_flow(ribmgr.cdap, conn.flow_info.fd); - break; - case NEIGHBOR_REMOVED: - cdap_del_flow(ribmgr.cdap, conn.flow_info.fd); - break; - default: - /* Don't care about other events */ - break; - } - - return 0; -} - -static enum ribmgr_state ribmgr_get_state(void) -{ - enum ribmgr_state state; - - pthread_rwlock_rdlock(&ribmgr.state_lock); - - state = ribmgr.state; - - pthread_rwlock_unlock(&ribmgr.state_lock); - - return state; -} - -static void ribmgr_set_state(enum ribmgr_state state) -{ - pthread_rwlock_wrlock(&ribmgr.state_lock); - - ribmgr.state = state; - - pthread_rwlock_unlock(&ribmgr.state_lock); -} - -static void * reader(void * o) -{ - cdap_key_t key; - enum cdap_opcode oc; - char * name; - uint8_t * data; - size_t len; - ssize_t slen; - uint32_t flags; - uint8_t * buf; - int rval; - - (void) o; - - while (ribmgr_get_state() == RIBMGR_OPERATIONAL) { - key = cdap_request_wait(ribmgr.cdap, &oc, &name, &data, - (size_t *) &len , &flags); - assert(key != -EINVAL); - - if (key == INVALID_CDAP_KEY) { - log_warn("Bad CDAP request."); - continue; - } - - assert(name); - assert(strlen(name)); - - switch (oc) { - case CDAP_READ: - assert(len == 0); - slen = rib_pack(name, &buf, PACK_HASH_ROOT); - if (slen < 0) { - log_err("Failed to pack %s.", name); - cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0); - free(name); - continue; - } - - log_dbg("Packed %s (%zu bytes).", name, slen); - - free(name); - - if (cdap_reply_send(ribmgr.cdap, key, 0, buf, slen)) { - log_err("Failed to send CDAP reply."); - free(buf); - continue; - } - - free(buf); - break; - case CDAP_WRITE: - assert(len); - assert(data); - - rval = rib_unpack(data, len, 0); - switch(rval) { - case 0: - break; - case -EFAULT: - log_warn("Hash mismatch, not in sync."); - free(data); - break; - default: - log_warn("Error unpacking %s.", name); - cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0); - free(name); - free(data); - continue; - } - - free(name); - - if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) { - log_err("Failed to send CDAP reply."); - continue; - } - break; - case CDAP_CREATE: - assert(len); - assert(data); - - rval = rib_unpack(data, len, UNPACK_CREATE); - switch(rval) { - case 0: - break; - case -EFAULT: - log_warn("Hash mismatch, not yet in sync."); - free(data); - break; - default: - log_warn("Error unpacking %s.", name); - cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0); - free(name); - free(data); - continue; - } - - free(name); - - if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) { - log_err("Failed to send CDAP reply."); - continue; - } - break; - case CDAP_DELETE: - assert(len == 0); - if (rib_del(name)) { - log_warn("Failed deleting %s.", name); - cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0); - } - - free(name); - - if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) { - log_err("Failed to send CDAP reply."); - continue; - } - break; - case CDAP_START: - case CDAP_STOP: - log_warn("Unsupported CDAP command."); - if (len) - free(data); - break; - default: - log_err("Bad CDAP command."); - if (len) - free(data); - break; - } - } - - return (void *) 0; -} - -char path[RIB_MAX_PATH_LEN + 1]; - -static void path_reset(void) { - path[strlen(RIB_ROOT)] = '\0'; - assert(strcmp(path, RIB_ROOT) == 0); -} - -static int ribmgr_sync(const char * path) -{ - uint8_t * buf; - ssize_t len; - cdap_key_t * keys; - - len = rib_pack(path, &buf, PACK_HASH_ALL); - if (len < 0) { - log_warn("Failed to pack %s.", path); - return -1; - } - - keys = cdap_request_send(ribmgr.cdap, CDAP_CREATE, path, buf, len, 0); - if (keys != NULL) { - cdap_key_t * key = keys; - while (*key != INVALID_CDAP_KEY) - cdap_reply_wait(ribmgr.cdap, *(key++), NULL, NULL); - free(keys); - } - - free(buf); - - return 0; -} - -/* FIXME: Temporary thread, syncs rib with neighbors every second */ -static void * sync_rib(void *o) -{ - char ** children; - ssize_t ch; - - (void) o; - - strcpy(path, RIB_ROOT); - - while (ribmgr_get_state() == RIBMGR_OPERATIONAL) { - sleep(RIB_SYNC_TIMEOUT); - - ch = rib_children(RIB_ROOT, &children); - if (ch <= 0) - continue; - - while (ch > 0) { - path_reset(); - - rib_path_append(path, children[--ch]); - free(children[ch]); - - /* Sync fsdb */ - if (strcmp(path, ROUTING_PATH) == 0) - ribmgr_sync(path); - } - - free(children); - } - - return (void *) 0; -} - -int ribmgr_init(void) -{ - struct conn_info info; - - memset(&info, 0, sizeof(info)); - - strcpy(info.ae_name, MGMT_AE); - strcpy(info.protocol, CDAP_PROTO); - info.pref_version = 1; - info.pref_syntax = PROTO_GPB; - info.addr = 0; - - ribmgr.nbs = nbs_create(); - if (ribmgr.nbs == NULL) { - log_err("Failed to create neighbors."); - goto fail_nbs_create; - } - - if (connmgr_ae_init(AEID_MGMT, &info, ribmgr.nbs)) { - log_err("Failed to register with connmgr."); - goto fail_connmgr_ae_init; - }; - - ribmgr.cdap = cdap_create(); - if (ribmgr.cdap == NULL) { - log_err("Failed to create CDAP instance."); - goto fail_cdap_create; - } - - ribmgr.nb_notifier.notify_call = ribmgr_neighbor_event; - if (nbs_reg_notifier(ribmgr.nbs, &ribmgr.nb_notifier)) { - log_err("Failed to register notifier."); - goto fail_nbs_reg_notifier; - } - - if (pthread_rwlock_init(&ribmgr.state_lock, NULL)) { - log_err("Failed to init rwlock."); - goto fail_rwlock_init; - } - - ribmgr.state = RIBMGR_INIT; - - return 0; - - fail_rwlock_init: - nbs_unreg_notifier(ribmgr.nbs, &ribmgr.nb_notifier); - fail_nbs_reg_notifier: - cdap_destroy(ribmgr.cdap); - fail_cdap_create: - connmgr_ae_fini(AEID_MGMT); - fail_connmgr_ae_init: - nbs_destroy(ribmgr.nbs); - fail_nbs_create: - return -1; -} - -void ribmgr_fini(void) -{ - if (ribmgr_get_state() == RIBMGR_SHUTDOWN) { - pthread_join(ribmgr.reader, NULL); - pthread_join(ribmgr.sync, NULL); - } - - nbs_unreg_notifier(ribmgr.nbs, &ribmgr.nb_notifier); - cdap_destroy(ribmgr.cdap); - nbs_destroy(ribmgr.nbs); - - connmgr_ae_fini(AEID_MGMT); -} - -int ribmgr_start(void) -{ - ribmgr_set_state(RIBMGR_OPERATIONAL); - - if (pthread_create(&ribmgr.sync, NULL, sync_rib, NULL)) { - ribmgr_set_state(RIBMGR_NULL); - return -1; - } - - if (pthread_create(&ribmgr.reader, NULL, reader, NULL)) { - ribmgr_set_state(RIBMGR_SHUTDOWN); - pthread_cancel(ribmgr.reader); - return -1; - } - - return 0; -} - -void ribmgr_stop(void) -{ - if (ribmgr_get_state() == RIBMGR_OPERATIONAL) { - ribmgr_set_state(RIBMGR_SHUTDOWN); - pthread_cancel(ribmgr.reader); - } -} - -int ribmgr_disseminate(char * path, - enum diss_target target, - enum diss_freq freq, - size_t delay) -{ - (void) path; - (void) target; - (void) freq; - (void) delay; - - return 0; -} diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h deleted file mode 100644 index 20f87548..00000000 --- a/src/ipcpd/normal/ribmgr.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * RIB manager of the IPC Process - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_RIBMGR_H -#define OUROBOROS_IPCPD_NORMAL_RIBMGR_H - -#include <ouroboros/ipcp.h> -#include <ouroboros/utils.h> -#include <ouroboros/qos.h> - -enum diss_target { - NONE = 0, - NEIGHBORS, - ALL_MEMBERS -}; - -enum diss_freq { - SINGLE = 0, - PERIODIC -}; - -int ribmgr_init(void); - -void ribmgr_fini(void); - -int ribmgr_start(void); - -void ribmgr_stop(void); - -int ribmgr_disseminate(char * path, - enum diss_target target, - enum diss_freq freq, - size_t delay); - -#endif /* OUROBOROS_IPCPD_NORMAL_RIBMGR_H */ diff --git a/src/ipcpd/normal/routing.c b/src/ipcpd/normal/routing.c index 04e6fd76..47ce3518 100644 --- a/src/ipcpd/normal/routing.c +++ b/src/ipcpd/normal/routing.c @@ -22,29 +22,24 @@ #define _POSIX_C_SOURCE 200112L -#define OUROBOROS_PREFIX "routing" - -#include <ouroboros/logs.h> +#include <ouroboros/errno.h> #include "routing.h" #include "pol/link_state.h" - struct pol_routing_ops * r_ops; -int routing_init(enum pol_routing pr, - struct nbs * nbs) +int routing_init(enum pol_routing pr) { switch (pr) { case LINK_STATE: r_ops = &link_state_ops; break; default: - log_err("Unknown routing type."); - return -1; + return -ENOTSUP; } - return r_ops->init(nbs); + return r_ops->init(); } struct routing_i * routing_i_create(struct pff * pff) diff --git a/src/ipcpd/normal/routing.h b/src/ipcpd/normal/routing.h index 0ef11020..6c8cae76 100644 --- a/src/ipcpd/normal/routing.h +++ b/src/ipcpd/normal/routing.h @@ -27,12 +27,10 @@ #include <ouroboros/qos.h> #include "pff.h" -#include "neighbors.h" #include <stdint.h> -int routing_init(enum pol_routing pr, - struct nbs * nbs); +int routing_init(enum pol_routing pr); void routing_fini(void); diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c index c7e799e2..7a82a874 100644 --- a/src/ipcpd/normal/sdu_sched.c +++ b/src/ipcpd/normal/sdu_sched.c @@ -38,9 +38,9 @@ #define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */ struct sdu_sched { - fset_t * set[QOS_CUBE_MAX]; - next_sdu_t callback; - pthread_t sdu_readers[IPCP_SCHED_THREADS]; + fset_t * set[QOS_CUBE_MAX]; + next_sdu_fn_t callback; + pthread_t sdu_readers[IPCP_SCHED_THREADS]; }; static void cleanup_reader(void * o) @@ -95,10 +95,7 @@ static void * sdu_reader(void * o) continue; } - if (sched->callback(fd, i, sdb)) { - log_warn("Callback reported an error."); - continue; - } + sched->callback(fd, i, sdb); } } @@ -107,7 +104,7 @@ static void * sdu_reader(void * o) return (void *) 0; } -struct sdu_sched * sdu_sched_create(next_sdu_t callback) +struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback) { struct sdu_sched * sdu_sched; int i; diff --git a/src/ipcpd/normal/sdu_sched.h b/src/ipcpd/normal/sdu_sched.h index 05371452..733f5648 100644 --- a/src/ipcpd/normal/sdu_sched.h +++ b/src/ipcpd/normal/sdu_sched.h @@ -26,11 +26,11 @@ #include <ouroboros/ipcp-dev.h> #include <ouroboros/fqueue.h> -typedef int (* next_sdu_t)(int fd, - qoscube_t qc, - struct shm_du_buff * sdb); +typedef void (* next_sdu_fn_t)(int fd, + qoscube_t qc, + struct shm_du_buff * sdb); -struct sdu_sched * sdu_sched_create(next_sdu_t callback); +struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback); void sdu_sched_destroy(struct sdu_sched * sdu_sched); diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index eeb7966b..f126a52a 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -8,8 +8,6 @@ protobuf_generate_c(IRM_PROTO_SRCS IRM_PROTO_HDRS irmd_messages.proto) protobuf_generate_c(IPCP_PROTO_SRCS IPCP_PROTO_HDRS ipcpd_messages.proto) protobuf_generate_c(DIF_CONFIG_PROTO_SRCS DIF_CONFIG_PROTO_HDRS ipcp_config.proto) -protobuf_generate_c(CDAP_PROTO_SRCS CDAP_PROTO_HDRS cdap.proto) -protobuf_generate_c(RO_PROTO_SRCS RO_PROTO_HDRS ro.proto) protobuf_generate_c(CACEP_PROTO_SRCS CACEP_PROTO_HDRS cacep.proto) if (NOT APPLE) @@ -134,8 +132,6 @@ else () endif () set(SOCKET_TIMEOUT 1000 CACHE STRING "Default timeout for responses from IPCPs (ms)") -set(CDAP_REPLY_TIMEOUT 6000 CACHE STRING - "Timeout for CDAP to wait for reply") set(SHM_PREFIX "ouroboros" CACHE STRING "String to prepend to POSIX shared memory filenames") set(SHM_RBUFF_PREFIX "/${SHM_PREFIX}.rbuff." CACHE INTERNAL @@ -154,8 +150,6 @@ set(SOURCE_FILES bitmap.c btree.c cacep.c - cdap.c - cdap_req.c crc32.c dev.c frct_pci.c @@ -166,10 +160,10 @@ set(SOURCE_FILES lockfile.c logs.c md5.c + notifier.c qos.c qoscube.c random.c - rib.c sha3.c shm_flow_set.c shm_rbuff.c @@ -185,8 +179,7 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/config.h.in" "${CMAKE_CURRENT_BINARY_DIR}/config.h" @ONLY) add_library(ouroboros SHARED ${SOURCE_FILES} ${IRM_PROTO_SRCS} - ${IPCP_PROTO_SRCS} ${DIF_CONFIG_PROTO_SRCS} ${CDAP_PROTO_SRCS} - ${CACEP_PROTO_SRCS} ${RO_PROTO_SRCS}) + ${IPCP_PROTO_SRCS} ${DIF_CONFIG_PROTO_SRCS} ${CACEP_PROTO_SRCS}) include(AddCompileFlags) if (CMAKE_BUILD_TYPE MATCHES Debug) diff --git a/src/lib/cdap.c b/src/lib/cdap.c deleted file mode 100644 index d9cb2036..00000000 --- a/src/lib/cdap.c +++ /dev/null @@ -1,868 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * The Common Distributed Application Protocol - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#define _POSIX_C_SOURCE 200809L - -#include <ouroboros/cdap.h> -#include <ouroboros/bitmap.h> -#include <ouroboros/dev.h> -#include <ouroboros/fqueue.h> -#include <ouroboros/errno.h> - -#include "cdap_req.h" - -#include <stdlib.h> -#include <pthread.h> -#include <string.h> -#include <assert.h> - -#include "cdap.pb-c.h" -typedef Cdap cdap_t; - -#define CDAP_REPLY (CDAP_DELETE + 1) - -#define INVALID_ID -1 -#define IDS_SIZE 2048 -#define BUF_SIZE 2048 - -struct fd_el { - struct list_head next; - - int fd; -}; - -struct cdap { - fset_t * set; - fqueue_t * fq; - - bool proc; - pthread_mutex_t mtx; - pthread_cond_t cond; - - size_t n_flows; - struct list_head flows; - pthread_rwlock_t flows_lock; - - struct bmp * ids; - pthread_mutex_t ids_lock; - - struct list_head sent; - pthread_rwlock_t sent_lock; - - struct list_head rcvd; - pthread_cond_t rcvd_cond; - pthread_mutex_t rcvd_lock; - - pthread_t reader; -}; - -struct cdap_rcvd { - struct list_head next; - - int fd; - bool proc; - - invoke_id_t iid; - cdap_key_t key; - - enum cdap_opcode opcode; - char * name; - void * data; - size_t len; - uint32_t flags; -}; - -static int next_id(struct cdap * instance) -{ - int ret; - - assert(instance); - - pthread_mutex_lock(&instance->ids_lock); - - ret = bmp_allocate(instance->ids); - if (!bmp_is_id_valid(instance->ids, ret)) - ret = INVALID_ID; - - pthread_mutex_unlock(&instance->ids_lock); - - return ret; -} - -static int release_id(struct cdap * instance, - int32_t id) -{ - int ret; - - assert(instance); - - pthread_mutex_lock(&instance->ids_lock); - - ret = bmp_release(instance->ids, id); - - pthread_mutex_unlock(&instance->ids_lock); - - return ret; -} - -#define cdap_sent_has_key(i, key) (cdap_sent_get_by_key(i, key) != NULL) - -static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance, - cdap_key_t key) -{ - struct list_head * p = NULL; - struct cdap_req * req = NULL; - - assert(instance); - - pthread_rwlock_rdlock(&instance->sent_lock); - - list_for_each(p, &instance->sent) { - req = list_entry(p, struct cdap_req, next); - if (req->key == key) { - pthread_rwlock_unlock(&instance->sent_lock); - return req; - } - } - - pthread_rwlock_unlock(&instance->sent_lock); - - return NULL; -} - -static struct cdap_req * cdap_sent_get_by_iid(struct cdap * instance, - invoke_id_t iid) -{ - struct list_head * p = NULL; - struct cdap_req * req = NULL; - - assert(instance); - - pthread_rwlock_rdlock(&instance->sent_lock); - - list_for_each(p, &instance->sent) { - req = list_entry(p, struct cdap_req, next); - if (req->iid == iid) { - pthread_rwlock_unlock(&instance->sent_lock); - return req; - } - } - - pthread_rwlock_unlock(&instance->sent_lock); - - return NULL; -} - -static struct cdap_rcvd * cdap_rcvd_get_by_key(struct cdap * instance, - cdap_key_t key) -{ - struct list_head * p = NULL; - struct list_head * h = NULL; - struct cdap_rcvd * rcvd = NULL; - - assert(instance); - - pthread_mutex_lock(&instance->rcvd_lock); - - list_for_each_safe(p, h, &instance->rcvd) { - rcvd = list_entry(p, struct cdap_rcvd, next); - if (rcvd->key == key) { - list_del(&rcvd->next); - pthread_mutex_unlock(&instance->rcvd_lock); - return rcvd; - } - } - - pthread_mutex_unlock(&instance->rcvd_lock); - - assert(false); - - return NULL; -} - -static struct cdap_req * cdap_sent_add(struct cdap * instance, - int fd, - invoke_id_t iid, - cdap_key_t key) -{ - struct cdap_req * req; - - assert(instance); - assert(!cdap_sent_has_key(instance, key)); - - req = cdap_req_create(fd, iid, key); - if (req == NULL) - return NULL; - - pthread_rwlock_wrlock(&instance->sent_lock); - - list_add(&req->next, &instance->sent); - - pthread_rwlock_unlock(&instance->sent_lock); - - return req; -} - -static void cdap_sent_del(struct cdap * instance, - struct cdap_req * req) -{ - assert(instance); - assert(req); - - assert(cdap_sent_has_key(instance, req->key)); - - pthread_rwlock_wrlock(&instance->sent_lock); - - list_del(&req->next); - - pthread_rwlock_unlock(&instance->sent_lock); - - cdap_req_destroy(req); -} - -static void cdap_sent_destroy(struct cdap * instance) -{ - struct list_head * p = NULL; - struct list_head * h = NULL; - - assert(instance); - - pthread_rwlock_wrlock(&instance->sent_lock); - - list_for_each_safe(p, h, &instance->sent) { - struct cdap_req * req = list_entry(p, struct cdap_req, next); - list_del(&req->next); - cdap_req_cancel(req); - cdap_req_destroy(req); - } - - pthread_rwlock_unlock(&instance->sent_lock); -} - -static void cdap_rcvd_destroy(struct cdap * instance) -{ - struct list_head * p = NULL; - struct list_head * h = NULL; - - assert(instance); - - pthread_mutex_lock(&instance->rcvd_lock); - - list_for_each_safe(p, h, &instance->rcvd) { - struct cdap_rcvd * r = list_entry(p, struct cdap_rcvd, next); - list_del(&r->next); - if (r->data != NULL) - free(r->data); - if (r->name != NULL) - free(r->name); - free(r); - } - - pthread_cond_broadcast(&instance->rcvd_cond); - - pthread_mutex_unlock(&instance->rcvd_lock); -} - -static void set_proc(struct cdap * instance, - bool status) -{ - pthread_mutex_lock(&instance->mtx); - - instance->proc = status; - pthread_cond_signal(&instance->cond); - - pthread_mutex_unlock(&instance->mtx); -} - -static void * sdu_reader(void * o) -{ - struct cdap * instance = (struct cdap *) o; - struct cdap_req * req; - struct cdap_rcvd * rcvd; - cdap_t * msg; - uint8_t buf[BUF_SIZE]; - ssize_t len; - buffer_t data; - - while (fevent(instance->set, instance->fq, NULL)) { - int fd; - set_proc(instance, true); - fd = fqueue_next(instance->fq); - len = flow_read(fd, buf, BUF_SIZE); - if (len < 0) { - set_proc(instance, false); - continue; - } - - msg = cdap__unpack(NULL, len, buf); - if (msg == NULL) { - set_proc(instance, false); - continue; - } - - if (msg->opcode != CDAP_REPLY) { - rcvd = malloc(sizeof(*rcvd)); - if (rcvd == NULL) { - cdap__free_unpacked(msg, NULL); - set_proc(instance, false); - continue; - } - - assert(msg->name); - - rcvd->opcode = msg->opcode; - rcvd->fd = fd; - rcvd->iid = msg->invoke_id; - rcvd->key = next_id(instance); - if (rcvd->key == INVALID_ID) { - cdap__free_unpacked(msg, NULL); - set_proc(instance, false); - free(rcvd); - continue; - } - - rcvd->flags = msg->flags; - rcvd->proc = false; - rcvd->name = strdup(msg->name); - if (rcvd->name == NULL) { - release_id(instance, rcvd->key); - cdap__free_unpacked(msg, NULL); - set_proc(instance, false); - free(rcvd); - continue; - } - - if (msg->has_value) { - rcvd->len = msg->value.len; - rcvd->data = malloc(rcvd->len); - if (rcvd->data == NULL) { - release_id(instance, rcvd->key); - cdap__free_unpacked(msg, NULL); - set_proc(instance, false); - free(rcvd->name); - free(rcvd); - continue; - } - memcpy(rcvd->data, msg->value.data, rcvd->len); - } else { - rcvd->len = 0; - rcvd->data = NULL; - } - - pthread_mutex_lock(&instance->rcvd_lock); - - list_add(&rcvd->next, &instance->rcvd); - - pthread_cond_signal(&instance->rcvd_cond); - pthread_mutex_unlock(&instance->rcvd_lock); - } else { - req = cdap_sent_get_by_iid(instance, msg->invoke_id); - if (req == NULL) { - cdap__free_unpacked(msg, NULL); - set_proc(instance, false); - continue; - } - - if (msg->has_value) { - data.len = msg->value.len; - data.data = malloc(data.len); - if (data.data == NULL) { - cdap__free_unpacked(msg, NULL); - set_proc(instance, false); - continue; - } - memcpy(data.data, msg->value.data, data.len); - } else { - data.len = 0; - data.data = NULL; - } - - cdap_req_respond(req, msg->result, data); - } - - cdap__free_unpacked(msg, NULL); - set_proc(instance, false); - } - - return (void *) 0; -} - -struct cdap * cdap_create() -{ - struct cdap * instance = NULL; - - instance = malloc(sizeof(*instance)); - if (instance == NULL) - goto fail_malloc; - - if (pthread_rwlock_init(&instance->flows_lock, NULL)) - goto fail_flows_lock; - - if (pthread_mutex_init(&instance->ids_lock, NULL)) - goto fail_ids_lock; - - if (pthread_mutex_init(&instance->rcvd_lock, NULL)) - goto fail_rcvd_lock; - - if (pthread_rwlock_init(&instance->sent_lock, NULL)) - goto fail_sent_lock; - - if (pthread_cond_init(&instance->rcvd_cond, NULL)) - goto fail_rcvd_cond; - - if (pthread_mutex_init(&instance->mtx, NULL)) - goto fail_mtx; - - if (pthread_cond_init(&instance->cond, NULL)) - goto fail_cond; - - instance->ids = bmp_create(IDS_SIZE, 0); - if (instance->ids == NULL) - goto fail_bmp_create; - - instance->set = fset_create(); - if (instance->set == NULL) - goto fail_set_create; - - instance->fq = fqueue_create(); - if (instance->fq == NULL) - goto fail_fqueue_create; - - instance->n_flows = 0; - instance->proc = false; - - list_head_init(&instance->flows); - list_head_init(&instance->sent); - list_head_init(&instance->rcvd); - - if (pthread_create(&instance->reader, NULL, sdu_reader, instance)) - goto fail_pthread_create; - - return instance; - - fail_pthread_create: - fqueue_destroy(instance->fq); - fail_fqueue_create: - fset_destroy(instance->set); - fail_set_create: - bmp_destroy(instance->ids); - fail_bmp_create: - pthread_cond_destroy(&instance->cond); - fail_cond: - pthread_mutex_destroy(&instance->mtx); - fail_mtx: - pthread_cond_destroy(&instance->rcvd_cond); - fail_rcvd_cond: - pthread_rwlock_destroy(&instance->sent_lock); - fail_sent_lock: - pthread_mutex_destroy(&instance->rcvd_lock); - fail_rcvd_lock: - pthread_mutex_destroy(&instance->ids_lock); - fail_ids_lock: - pthread_rwlock_destroy(&instance->flows_lock); - fail_flows_lock: - free(instance); - fail_malloc: - return NULL; -} - -int cdap_destroy(struct cdap * instance) -{ - struct list_head * p; - struct list_head * h; - - if (instance == NULL) - return 0; - - pthread_cancel(instance->reader); - pthread_join(instance->reader, NULL); - - fqueue_destroy(instance->fq); - - fset_destroy(instance->set); - - pthread_cond_destroy(&instance->cond); - pthread_mutex_destroy(&instance->mtx); - - pthread_rwlock_wrlock(&instance->flows_lock); - - list_for_each_safe(p,h, &instance->flows) { - struct fd_el * e = list_entry(p, struct fd_el, next); - list_del(&e->next); - free(e); - } - - pthread_rwlock_unlock(&instance->flows_lock); - - pthread_rwlock_destroy(&instance->flows_lock); - - pthread_mutex_lock(&instance->ids_lock); - - bmp_destroy(instance->ids); - - pthread_mutex_unlock(&instance->ids_lock); - - pthread_mutex_destroy(&instance->ids_lock); - - cdap_sent_destroy(instance); - - pthread_rwlock_destroy(&instance->sent_lock); - - cdap_rcvd_destroy(instance); - - pthread_mutex_destroy(&instance->rcvd_lock); - - free(instance); - - return 0; -} - -int cdap_add_flow(struct cdap * instance, - int fd) -{ - struct fd_el * e; - - if (fd < 0) - return -EINVAL; - - e = malloc(sizeof(*e)); - if (e == NULL) - return -ENOMEM; - - e->fd = fd; - - pthread_rwlock_wrlock(&instance->flows_lock); - - if (fset_add(instance->set, fd)) { - pthread_rwlock_unlock(&instance->flows_lock); - free(e); - return -1; - } - - list_add(&e->next, &instance->flows); - - ++instance->n_flows; - - pthread_rwlock_unlock(&instance->flows_lock); - - return 0; -} - -int cdap_del_flow(struct cdap * instance, - int fd) -{ - struct list_head * p; - struct list_head * h; - - if (fd < 0) - return -EINVAL; - - pthread_rwlock_wrlock(&instance->flows_lock); - - fset_del(instance->set, fd); - - list_for_each_safe(p, h, &instance->flows) { - struct fd_el * e = list_entry(p, struct fd_el, next); - if (e->fd == fd) { - list_del(&e->next); - free(e); - break; - } - } - - --instance->n_flows; - - pthread_rwlock_unlock(&instance->flows_lock); - - pthread_mutex_lock(&instance->mtx); - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) &instance->mtx); - - while (instance->proc) - pthread_cond_wait(&instance->cond, &instance->mtx); - - pthread_cleanup_pop(true); - - return 0; -} - -static int write_msg(int fd, - cdap_t * msg) -{ - uint8_t * data; - size_t len; - - assert(msg); - - len = cdap__get_packed_size(msg); - if (len == 0) - return -1; - - data = malloc(len); - if (data == NULL) - return -ENOMEM; - - cdap__pack(msg, data); - - if (flow_write(fd, data, len)) { - free(data); - return -1; - } - - free(data); - - return 0; -} - -cdap_key_t * cdap_request_send(struct cdap * instance, - enum cdap_opcode code, - const char * name, - const void * data, - size_t len, - uint32_t flags) -{ - cdap_key_t * keys; - cdap_key_t * key; - cdap_t msg = CDAP__INIT; - struct list_head * p; - int ret; - - if (instance == NULL || name == NULL || code > CDAP_DELETE) - return NULL; - - pthread_rwlock_rdlock(&instance->flows_lock); - - keys = malloc(sizeof(*keys) * (instance->n_flows + 1)); - if (keys == NULL) { - pthread_rwlock_unlock(&instance->flows_lock); - return NULL; - } - - memset(keys, INVALID_CDAP_KEY, sizeof(*keys) * (instance->n_flows + 1)); - - key = keys; - - cdap__init(&msg); - - msg.opcode = code; - msg.name = (char *) name; - msg.has_flags = true; - msg.flags = flags; - - if (data != NULL) { - msg.has_value = true; - msg.value.data = (uint8_t *) data; - msg.value.len = len; - } - - list_for_each(p, &instance->flows) { - struct cdap_req * req; - invoke_id_t iid; - struct fd_el * e; - - iid = next_id(instance); - if (iid == INVALID_ID) { - pthread_rwlock_unlock(&instance->flows_lock); - return keys; - } - - msg.invoke_id = iid; - - e = list_entry(p, struct fd_el, next); - - *key = next_id(instance); - if (*key == INVALID_ID) { - release_id(instance, iid); - pthread_rwlock_unlock(&instance->flows_lock); - return keys; - } - - req = cdap_sent_add(instance, e->fd, iid, *key); - if (req == NULL) { - release_id(instance, *key); - release_id(instance, iid); - pthread_rwlock_unlock(&instance->flows_lock); - *key = INVALID_CDAP_KEY; - return keys; - } - - ret = write_msg(e->fd, &msg); - if (ret == -ENOMEM) { - cdap_sent_del(instance, req); - release_id(instance, *key); - release_id(instance, iid); - pthread_rwlock_unlock(&instance->flows_lock); - *key = INVALID_CDAP_KEY; - return keys; - } - - if (ret < 0) { - cdap_sent_del(instance, req); - release_id(instance, *key); - release_id(instance, iid); - pthread_rwlock_unlock(&instance->flows_lock); - *key = INVALID_CDAP_KEY; - return keys; - } - - ++key; - } - - pthread_rwlock_unlock(&instance->flows_lock); - - return keys; -} - -int cdap_reply_wait(struct cdap * instance, - cdap_key_t key, - uint8_t ** data, - size_t * len) -{ - int ret; - struct cdap_req * r; - invoke_id_t iid; - - if (instance == NULL || (data != NULL && len == NULL)) - return -EINVAL; - - r = cdap_sent_get_by_key(instance, key); - if (r == NULL) - return -EINVAL; - - iid = r->iid; - - ret = cdap_req_wait(r); - if (ret < 0) { - cdap_sent_del(instance, r); - release_id(instance, iid); - release_id(instance, key); - return ret; - } - - assert(ret == 0); - - if (data != NULL) { - *data = r->data.data; - *len = r->data.len; - } - - ret = r->response; - - cdap_sent_del(instance, r); - release_id(instance, iid); - release_id(instance, key); - - return ret; -} - -cdap_key_t cdap_request_wait(struct cdap * instance, - enum cdap_opcode * opcode, - char ** name, - uint8_t ** data, - size_t * len, - uint32_t * flags) -{ - struct cdap_rcvd * rcv = NULL; - - if (instance == NULL || opcode == NULL || name == NULL || data == NULL - || len == NULL || flags == NULL) - return -EINVAL; - - pthread_mutex_lock(&instance->rcvd_lock); - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) &instance->rcvd_lock); - - while (rcv == NULL) { - while (list_is_empty(&instance->rcvd)) - pthread_cond_wait(&instance->rcvd_cond, - &instance->rcvd_lock); - - rcv = list_first_entry(&instance->rcvd, struct cdap_rcvd, next); - if (rcv->proc) { - rcv = NULL; - pthread_cond_wait(&instance->rcvd_cond, - &instance->rcvd_lock); - } - } - - assert(rcv->proc == false); - - rcv->proc = true; - list_del(&rcv->next); - list_add_tail(&rcv->next, &instance->rcvd); - - pthread_cleanup_pop(true); - - *opcode = rcv->opcode; - *name = rcv->name; - *data = rcv->data; - *len = rcv->len; - *flags = rcv->flags; - - rcv->name = NULL; - rcv->data = NULL; - - return rcv->key; -} - -int cdap_reply_send(struct cdap * instance, - cdap_key_t key, - int result, - const void * data, - size_t len) -{ - int fd; - cdap_t msg = CDAP__INIT; - struct cdap_rcvd * rcvd; - - if (instance == NULL) - return -EINVAL; - - rcvd = cdap_rcvd_get_by_key(instance, key); - if (rcvd == NULL) - return -1; - - msg.opcode = CDAP_REPLY; - msg.invoke_id = rcvd->iid; - msg.has_result = true; - msg.result = result; - - if (data != NULL) { - msg.has_value = true; - msg.value.data = (uint8_t *) data; - msg.value.len = len; - } - - fd = rcvd->fd; - - release_id(instance, rcvd->key); - - assert(rcvd->data == NULL); - assert(rcvd->name == NULL); - assert(rcvd->proc); - - free(rcvd); - - return write_msg(fd, &msg); -} diff --git a/src/lib/cdap.proto b/src/lib/cdap.proto deleted file mode 100644 index 29effc9a..00000000 --- a/src/lib/cdap.proto +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016- 2017 - * - * CDAP message - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -syntax = "proto2"; - -message cdap { - required uint32 opcode = 1; - required uint32 invoke_id = 2; - optional uint32 flags = 3; - optional string name = 4; - optional bytes value = 5; - optional int32 result = 6; -} diff --git a/src/lib/cdap_req.c b/src/lib/cdap_req.c deleted file mode 100644 index a9b85525..00000000 --- a/src/lib/cdap_req.c +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * CDAP - CDAP request management - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#define _POSIX_C_SOURCE 200809L - -#include "config.h" - -#include <ouroboros/time_utils.h> -#include <ouroboros/errno.h> - -#include "cdap_req.h" - -#include <stdlib.h> -#include <assert.h> - -struct cdap_req * cdap_req_create(int fd, - invoke_id_t iid, - cdap_key_t key) -{ - struct cdap_req * creq = malloc(sizeof(*creq)); - pthread_condattr_t cattr; - - if (creq == NULL) - return NULL; - - creq->fd = fd; - creq->iid = iid; - creq->key = key; - creq->state = REQ_INIT; - creq->response = -1; - creq->data.data = NULL; - creq->data.len = 0; - - if (pthread_mutex_init(&creq->lock, NULL)) { - free(creq); - return NULL; - } - - pthread_condattr_init(&cattr); -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif - if (pthread_cond_init(&creq->cond, &cattr)) { - pthread_condattr_destroy(&cattr); - pthread_mutex_destroy(&creq->lock); - free(creq); - return NULL; - } - - pthread_condattr_destroy(&cattr); - - list_head_init(&creq->next); - - clock_gettime(PTHREAD_COND_CLOCK, &creq->birth); - - return creq; -} - -void cdap_req_destroy(struct cdap_req * creq) -{ - assert(creq); - - pthread_mutex_lock(&creq->lock); - - switch(creq->state) { - case REQ_DESTROY: - pthread_mutex_unlock(&creq->lock); - return; - case REQ_INIT: - creq->state = REQ_NULL; - pthread_cond_broadcast(&creq->cond); - break; - case REQ_INIT_PENDING: - case REQ_PENDING: - case REQ_RESPONSE: - creq->state = REQ_DESTROY; - pthread_cond_broadcast(&creq->cond); - break; - default: - break; - } - - while (creq->state != REQ_NULL) - pthread_cond_wait(&creq->cond, &creq->lock); - - pthread_mutex_unlock(&creq->lock); - - pthread_cond_destroy(&creq->cond); - pthread_mutex_destroy(&creq->lock); - - free(creq); -} - -int cdap_req_wait(struct cdap_req * creq) -{ - struct timespec timeout = {(CDAP_REPLY_TIMEOUT / 1000), - (CDAP_REPLY_TIMEOUT % 1000) * MILLION}; - struct timespec abstime; - int ret = -1; - - assert(creq); - - ts_add(&creq->birth, &timeout, &abstime); - - pthread_mutex_lock(&creq->lock); - - if (creq->state != REQ_INIT) { - pthread_mutex_unlock(&creq->lock); - return -EINVAL; - } - - creq->state = REQ_PENDING; - pthread_cond_broadcast(&creq->cond); - - while (creq->state == REQ_PENDING && ret != -ETIMEDOUT) - ret = -pthread_cond_timedwait(&creq->cond, - &creq->lock, - &abstime); - - switch(creq->state) { - case REQ_DESTROY: - ret = -1; - /* FALLTHRU */ - case REQ_PENDING: - creq->state = REQ_NULL; - pthread_cond_broadcast(&creq->cond); - break; - case REQ_RESPONSE: - creq->state = REQ_DONE; - pthread_cond_broadcast(&creq->cond); - break; - default: - assert(false); - break; - } - - pthread_mutex_unlock(&creq->lock); - - return ret; -} - -void cdap_req_respond(struct cdap_req * creq, - int response, - buffer_t data) -{ - assert(creq); - - pthread_mutex_lock(&creq->lock); - - if (creq->state == REQ_INIT) - creq->state = REQ_INIT_PENDING; - - while (creq->state == REQ_INIT_PENDING) - pthread_cond_wait(&creq->cond, &creq->lock); - - if (creq->state != REQ_PENDING) { - creq->state = REQ_NULL; - pthread_cond_broadcast(&creq->cond); - pthread_mutex_unlock(&creq->lock); - return; - } - - creq->state = REQ_RESPONSE; - creq->response = response; - creq->data = data; - - pthread_cond_broadcast(&creq->cond); - - while (creq->state == REQ_RESPONSE) - pthread_cond_wait(&creq->cond, &creq->lock); - - creq->state = REQ_NULL; - pthread_cond_broadcast(&creq->cond); - - pthread_mutex_unlock(&creq->lock); -} - - -void cdap_req_cancel(struct cdap_req * creq) -{ - assert(creq); - - pthread_mutex_lock(&creq->lock); - - creq->state = REQ_NULL; - pthread_cond_broadcast(&creq->cond); - - pthread_mutex_unlock(&creq->lock); -} diff --git a/src/lib/cdap_req.h b/src/lib/cdap_req.h deleted file mode 100644 index 4c9cd15b..00000000 --- a/src/lib/cdap_req.h +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * CDAP - CDAP request management - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#ifndef OUROBOROS_CDAP_REQ_H -#define OUROBOROS_CDAP_REQ_H - -#include <ouroboros/cdap.h> -#include <ouroboros/list.h> -#include <ouroboros/utils.h> - -#include <pthread.h> - -typedef cdap_key_t invoke_id_t; - -enum creq_state { - REQ_NULL = 0, - REQ_INIT, - REQ_INIT_PENDING, - REQ_PENDING, - REQ_RESPONSE, - REQ_DONE, - REQ_DESTROY -}; - -struct cdap_req { - struct list_head next; - - int fd; - struct timespec birth; - cdap_key_t key; - invoke_id_t iid; - - int response; - buffer_t data; - - enum creq_state state; - pthread_cond_t cond; - pthread_mutex_t lock; -}; - -struct cdap_req * cdap_req_create(int fd, - cdap_key_t key, - invoke_id_t iid); - -void cdap_req_destroy(struct cdap_req * creq); - -int cdap_req_wait(struct cdap_req * creq); - -void cdap_req_respond(struct cdap_req * creq, - int response, - buffer_t data); - -void cdap_req_cancel(struct cdap_req * creq); - -#endif /* OUROBOROS_CDAP_REQ_H */ diff --git a/src/lib/hashtable.c b/src/lib/hashtable.c index 75cdee84..2aa248ba 100644 --- a/src/lib/hashtable.c +++ b/src/lib/hashtable.c @@ -38,7 +38,8 @@ struct htable { uint64_t buckets_size; }; -struct htable * htable_create(uint64_t buckets, bool hash_key) +struct htable * htable_create(uint64_t buckets, + bool hash_key) { struct htable * tmp; unsigned int i; diff --git a/src/lib/notifier.c b/src/lib/notifier.c new file mode 100644 index 00000000..cfd383d4 --- /dev/null +++ b/src/lib/notifier.c @@ -0,0 +1,128 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Notifier event system using callbacks + * + * Dimitri Staessens <dimitri.staessens@ugent.be> + * Sander Vrijders <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include <ouroboros/errno.h> +#include <ouroboros/notifier.h> +#include <ouroboros/list.h> + +#include <pthread.h> +#include <stdlib.h> + +struct listener { + struct list_head next; + notifier_fn_t callback; +}; + +struct { + struct list_head listeners; + pthread_mutex_t lock; +} notifier; + +int notifier_init(void) +{ + if (pthread_mutex_init(¬ifier.lock, NULL)) + return -1; + + list_head_init(¬ifier.listeners); + + return 0; +} + +void notifier_fini(void) +{ + struct list_head * p; + struct list_head * h; + + pthread_mutex_lock(¬ifier.lock); + + list_for_each_safe(p, h, ¬ifier.listeners) { + struct listener * l = list_entry(p, struct listener, next); + list_del(&l->next); + free(l); + } + + pthread_mutex_unlock(¬ifier.lock); + + pthread_mutex_destroy(¬ifier.lock); +} + +void notifier_event(int event, + const void * o) +{ + struct list_head * p; + + pthread_mutex_lock(¬ifier.lock); + + list_for_each(p, ¬ifier.listeners) + list_entry(p, struct listener, next)->callback(event, o); + + pthread_mutex_unlock(¬ifier.lock); +} + +int notifier_reg(notifier_fn_t callback) +{ + struct listener * l; + struct list_head * p; + + pthread_mutex_lock(¬ifier.lock); + + list_for_each(p, ¬ifier.listeners) { + struct listener * l = list_entry(p, struct listener, next); + if (l->callback == callback) { + pthread_mutex_unlock(¬ifier.lock); + return -EPERM; + } + } + + l = malloc(sizeof(*l)); + if (l == NULL) { + pthread_mutex_unlock(¬ifier.lock); + return -ENOMEM; + } + + l->callback = callback; + + list_add(&l->next, ¬ifier.listeners); + + pthread_mutex_unlock(¬ifier.lock); + + return 0; +} + +void notifier_unreg(notifier_fn_t callback) +{ + struct list_head * p; + struct list_head * h; + + pthread_mutex_lock(¬ifier.lock); + + list_for_each_safe(p, h, ¬ifier.listeners) { + struct listener * l = list_entry(p, struct listener, next); + if (l->callback == callback) { + list_del(&l->next); + free(l); + break; + } + } + + pthread_mutex_unlock(¬ifier.lock); +} diff --git a/src/lib/rib.c b/src/lib/rib.c deleted file mode 100644 index 9e45a302..00000000 --- a/src/lib/rib.c +++ /dev/null @@ -1,1431 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Resource Information Base - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#define _POSIX_C_SOURCE 200809L - -#include "config.h" - -#include <ouroboros/errno.h> -#include <ouroboros/list.h> -#include <ouroboros/rib.h> -#include <ouroboros/rqueue.h> -#include <ouroboros/bitmap.h> -#include <ouroboros/crc32.h> -#include <ouroboros/time_utils.h> -#include <ouroboros/sha3.h> -#include <ouroboros/btree.h> - -#include "ro.pb-c.h" -typedef RoMsg ro_msg_t; - -#include <pthread.h> -#include <string.h> -#include <assert.h> -#include <stdlib.h> -#include <stdio.h> - -#define RIB_PATH_DLR "/" -#define RIB_BTREE_ORDER 64 -#define GEN_NAME_SIZE 8 - -struct revent { - struct list_head next; - - char * path; - int32_t flags; -}; - -struct rqueue { - struct list_head events; -}; - -struct ro_set { - uint32_t sid; -}; - -struct rn_ptr { - struct list_head next; - - struct rnode * node; -}; - -struct rib_sub { - struct list_head next; - - uint32_t sid; - - struct list_head rnodes; - - struct list_head events; - - pthread_cond_t cond; - pthread_mutex_t lock; -}; - -struct rn_sub { - struct list_head next; - - struct rib_sub * sub; - int32_t flags; -}; - -struct rnode { - char * path; - char * name; - - uint8_t * data; - size_t len; - - uint8_t sha3[SHA3_256_HASH_LEN]; - - struct rnode * parent; - - size_t chlen; - struct list_head children; - - struct list_head subs; -}; - -struct child { - struct list_head next; - - struct rnode * node; -}; - -struct rib { - struct rnode * root; - - struct btree * idx; - - pthread_rwlock_t lock; - - struct bmp * sids; - - struct list_head subs; - - pthread_rwlock_t s_lock; -} rib; - -static void rnode_hash(struct rnode * node) -{ - struct sha3_ctx ctx; - struct list_head * p; - - assert(node); - assert(node->path); - assert(node->name); - - rhash_sha3_256_init(&ctx); - - rhash_sha3_update(&ctx, (uint8_t *) node->path, strlen(node->path)); - - if (node->data != NULL) - rhash_sha3_update(&ctx, node->data, node->len); - - list_for_each(p, &node->children) { - struct child * c = list_entry(p, struct child, next); - rhash_sha3_update(&ctx, c->node->sha3, SHA3_256_HASH_LEN); - } - - rhash_sha3_final(&ctx, node->sha3); -} - -static void branch_hash(struct rnode * node) -{ - assert(node); - - do { - rnode_hash(node); - node = node->parent; - } while (node != NULL); -} - -static struct revent * revent_dup(struct revent * ev) -{ - struct revent * re; - - assert(ev); - assert(ev->path); - - re = malloc(sizeof(*re)); - if (re == NULL) - return NULL; - - re->path = strdup(ev->path); - if (re->path == NULL) { - free(re); - return NULL; - } - - re->flags = ev->flags; - - list_head_init(&re->next); - - return re; -} - -/* defined below but needed here */ -static void rib_sub_del_rnode(struct rib_sub * sub, - struct rnode * node); - -static void rnode_notify_subs(struct rnode * node, - struct rnode * ch, - struct revent * ev) -{ - struct list_head * p; - - assert(node); - - list_for_each(p, &node->subs) { - struct rn_sub * s = list_entry(p, struct rn_sub, next); - if (s->flags & ev->flags) { - struct revent * e = revent_dup(ev); - if (e == NULL) - continue; - - pthread_mutex_lock(&s->sub->lock); - list_add_tail(&e->next, &s->sub->events); - pthread_cond_signal(&s->sub->cond); - pthread_mutex_unlock(&s->sub->lock); - } - - if (ev->flags & RO_DELETE) - rib_sub_del_rnode(s->sub, ch); - } -} - -static int rnode_throw_event(struct rnode * node, - int32_t flags) -{ - struct revent * ev = malloc(sizeof(*ev)); - struct rnode * rn = node; - - assert(node); - assert(node->path); - - if (ev == NULL) - return -ENOMEM; - - list_head_init(&ev->next); - - ev->path = strdup(node->path); - if (ev->path == NULL) { - free(ev); - return -ENOMEM; - } - - ev->flags = flags; - - do { - rnode_notify_subs(rn, node, ev); - rn = rn->parent; - } while (rn != NULL); - - free(ev->path); - free(ev); - - return 0; -} - -static int rnode_add_child(struct rnode * node, - struct rnode * child) -{ - struct child * c; - struct list_head * p; - struct child * n; - - assert(node); - assert(child); - - c = malloc(sizeof(*c)); - if (c == NULL) - return -ENOMEM; - - c->node = child; - - list_for_each(p, &node->children) { - n = list_entry(p, struct child, next); - if (strcmp(n->node->name, child->name) > 0) - break; - } - - list_add_tail(&c->next, p); - - ++node->chlen; - - return 0; -} - -static void rnode_remove_child(struct rnode * node, - struct rnode * child) -{ - struct list_head * p; - struct list_head * h; - - assert(node); - assert(child); - - list_for_each_safe(p, h, &node->children) { - struct child * c = list_entry(p, struct child, next); - if (c->node == child) { - list_del(&c->next); - free(c); - --node->chlen; - return; - } - } -} - -static struct rnode * rnode_create(struct rnode * parent, - const char * name) -{ - struct rnode * node; - char * parent_path; - - uint32_t crc = 0; - - assert(name); - - node = malloc(sizeof(*node)); - if (node == NULL) - return NULL; - - list_head_init(&node->children); - list_head_init(&node->subs); - - if (parent == NULL) - parent_path = ""; - else - parent_path = parent->path; - - node->path = malloc(strlen(parent_path) - + strlen(RIB_PATH_DLR) - + strlen(name) - + 1); - if (node->path == NULL) { - free(node); - return NULL; - } - - strcpy(node->path, parent_path); - node->name = node->path + strlen(parent_path); - if (parent != NULL) { - strcpy(node->name, RIB_PATH_DLR); - node->name += strlen(RIB_PATH_DLR); - } - - strcpy(node->name, name); - - if (parent != NULL) { - if (rnode_add_child(parent, node)) { - free(node->path); - free(node); - return NULL; - } - } - - node->data = NULL; - node->len = 0; - - node->parent = parent; - - node->chlen = 0; - - crc32(&crc, node->path, strlen(node->path)); - btree_insert(rib.idx, crc, node); - - branch_hash(node); - rnode_throw_event(node, RO_CREATE); - - return node; -} - -static void destroy_rnode(struct rnode * node) -{ - struct list_head * p; - struct list_head * h; - - uint32_t crc = 0; - - assert(node); - - if (node != rib.root) { - rnode_remove_child(node->parent, node); - branch_hash(node->parent); - } - - if (node->parent != NULL) - rnode_throw_event(node->parent, RO_DELETE); - - list_for_each_safe(p, h, &node->subs) { - struct rn_sub * s = list_entry(p, struct rn_sub, next); - list_del(&s->next); - free(s); - } - - crc32(&crc, node->path, strlen(node->path)); - btree_remove(rib.idx, crc); - - free(node->path); - if (node->data != NULL) - free(node->data); - - free(node); -} - -static void destroy_rtree(struct rnode * node) -{ - struct list_head * p; - struct list_head * h; - - assert(node); - - list_for_each_safe(p, h, &node->children) { - struct child * c = list_entry(p, struct child, next); - destroy_rtree(c->node); - } - - destroy_rnode(node); -} - -static void rnode_update(struct rnode * node, - uint8_t * data, - size_t len) -{ - assert(node); - assert(!(data == NULL && len != 0)); - assert(!(data != NULL && len == 0)); - - if (node->data != NULL) - free(node->data); - - node->data = data; - node->len = len; - - rnode_throw_event(node, RO_MODIFY); - - branch_hash(node); -} - -static struct rn_sub * rnode_get_sub(struct rnode * node, - struct rib_sub * sub) -{ - struct list_head * p; - - list_for_each(p, &node->subs) { - struct rn_sub * r = list_entry(p, struct rn_sub, next); - if (r->sub == sub) - return r; - } - - return NULL; -} - -static int rnode_add_sub(struct rnode * node, - struct rib_sub * sub, - int32_t flags) -{ - struct rn_sub * rs; - - assert(node); - assert(sub); - - rs = rnode_get_sub(node, sub); - if (rs != NULL) - return -EPERM; - - rs = malloc(sizeof(*rs)); - if (rs == NULL) - return -ENOMEM; - - rs->sub = sub; - rs->flags = flags; - - list_add(&rs->next, &node->subs); - - return 0; -} - -static int rnode_del_sub(struct rnode * node, - struct rib_sub * sub) -{ - struct rn_sub * rs; - - assert(node); - assert(sub); - - rs = rnode_get_sub(node, sub); - if (rs == NULL) - return 0; - - list_del(&rs->next); - free(rs); - - return 0; -} - -static struct rnode * find_rnode_by_path(const char * path) -{ - uint32_t crc = 0; - - if (strcmp(path, RIB_ROOT) == 0) - return rib.root; - - crc32(&crc, path, strlen(path)); - - return (struct rnode *) btree_search(rib.idx, crc); -} - -int rib_init(void) -{ - if (rib.root != NULL) - return -EPERM; - - rib.idx = btree_create(RIB_BTREE_ORDER); - if (rib.idx == NULL) { - destroy_rtree(rib.root); - rib.root = NULL; - return -1; - } - - rib.root = rnode_create(NULL, ""); - if (rib.root == NULL) - return -ENOMEM; - - rib.sids = bmp_create(32, 1); - if (rib.sids == NULL) { - btree_destroy(rib.idx); - destroy_rtree(rib.root); - rib.root = NULL; - return -1; - } - - if (pthread_rwlock_init(&rib.lock, NULL)) { - bmp_destroy(rib.sids); - btree_destroy(rib.idx); - destroy_rtree(rib.root); - rib.root = NULL; - return -1; - } - - if (pthread_rwlock_init(&rib.s_lock, NULL)) { - pthread_rwlock_destroy(&rib.lock); - bmp_destroy(rib.sids); - btree_destroy(rib.idx); - destroy_rtree(rib.root); - rib.root = NULL; - return -1; - } - - list_head_init(&rib.subs); - - assert(rib.root); - - return 0; -} - -void rib_fini(void) -{ - if (rib.root == NULL) - return; - - bmp_destroy(rib.sids); - - destroy_rtree(rib.root); - rib.root = NULL; - - btree_destroy(rib.idx); - - pthread_rwlock_destroy(&rib.lock); -} - -int rib_add(const char * path, - const char * name) -{ - struct rnode * parent; - struct rnode * node; - - if (name == NULL) - return -EINVAL; - - pthread_rwlock_wrlock(&rib.lock); - - parent = find_rnode_by_path(path); - if (parent == NULL) { - pthread_rwlock_unlock(&rib.lock); - return -EPERM; - } - - node = rnode_create(parent, name); - if (node == NULL) { - pthread_rwlock_unlock(&rib.lock); - return -ENOMEM; - } - - pthread_rwlock_unlock(&rib.lock); - - return 0; -} - -int rib_del(char * path) -{ - struct rnode * node; - - if (path == NULL) - return -EINVAL; - - pthread_rwlock_wrlock(&rib.lock); - - node = find_rnode_by_path(path); - if (node == NULL) { - pthread_rwlock_unlock(&rib.lock); - return -EINVAL; - } - - destroy_rtree(node); - - pthread_rwlock_unlock(&rib.lock); - - return 0; -} - -ssize_t rib_read(const char * path, - void * data, - size_t len) -{ - struct rnode * node; - ssize_t rlen; - - if (path == NULL || data == NULL) - return -EINVAL; - - pthread_rwlock_rdlock(&rib.lock); - - node = find_rnode_by_path(path); - if (node == NULL) { - pthread_rwlock_unlock(&rib.lock); - return -EPERM; - } - - if (len < node->len) { - pthread_rwlock_unlock(&rib.lock); - return -EFBIG; - } - - if (node->data == NULL) { - pthread_rwlock_unlock(&rib.lock); - return 0; - } - - assert(node->len > 0); - - memcpy(data, node->data, node->len); - rlen = node->len; - - rnode_throw_event(node, RO_READ); - - pthread_rwlock_unlock(&rib.lock); - - return rlen; -} - -int rib_write(const char * path, - const void * data, - size_t len) -{ - struct rnode * node; - - uint8_t * cdata; - - if (path == NULL || data == NULL || len == 0) - return -EINVAL; - - cdata = malloc(len); - if (cdata == NULL) - return -ENOMEM; - - memcpy(cdata, data, len); - - pthread_rwlock_rdlock(&rib.lock); - - node = find_rnode_by_path(path); - if (node == NULL) { - pthread_rwlock_unlock(&rib.lock); - free(cdata); - return -1; - } - - rnode_update(node, cdata, len); - - pthread_rwlock_unlock(&rib.lock); - - return 0; -} - -int rib_put(const char * path, - void * data, - size_t len) -{ - struct rnode * node; - - if (path == NULL) - return -EINVAL; - - pthread_rwlock_rdlock(&rib.lock); - - node = find_rnode_by_path(path); - if (node != NULL) - rnode_update(node, (uint8_t *) data, len); - - pthread_rwlock_unlock(&rib.lock); - - return 0; -} - -bool rib_has(const char * path) -{ - struct rnode * node; - - if (path == NULL) - return -EINVAL; - - pthread_rwlock_rdlock(&rib.lock); - - node = find_rnode_by_path(path); - - pthread_rwlock_unlock(&rib.lock); - - return node != NULL; -} - -ssize_t rib_children(const char * path, - char *** children) -{ - struct list_head * p; - - struct rnode * node; - - ssize_t i = 0; - - if (path == NULL) - return -EINVAL; - - pthread_rwlock_rdlock(&rib.lock); - - node = find_rnode_by_path(path); - if (node == NULL) { - pthread_rwlock_unlock(&rib.lock); - return -EPERM; - } - - if (children == NULL) { - pthread_rwlock_unlock(&rib.lock); - assert((ssize_t) node->chlen >= 0); - return (ssize_t) node->chlen; - } - - if (node->chlen == 0) { - pthread_rwlock_unlock(&rib.lock); - *children = NULL; - return 0; - } - - *children = malloc(sizeof(**children) * node->chlen); - if (*children == NULL) { - pthread_rwlock_unlock(&rib.lock); - return -ENOMEM; - } - - list_for_each(p, &node->children) { - struct child * c = list_entry(p, struct child, next); - (*children)[i] = strdup(c->node->name); - if ((*children)[i] == NULL) { - ssize_t j; - pthread_rwlock_unlock(&rib.lock); - for (j = 0; j < i; ++j) - free((*children)[j]); - free(*children); - return -ENOMEM; - } - ++i; - } - - assert(i > 0); - assert((size_t) i == node->chlen); - - pthread_rwlock_unlock(&rib.lock); - - return i; -} - -static struct rib_sub * rib_get_sub(uint32_t sid) -{ - struct list_head * p; - struct list_head * h; - - list_for_each_safe(p, h, &rib.subs) { - struct rib_sub * r = list_entry(p, struct rib_sub, next); - if (r->sid == sid) - return r; - } - - return NULL; -} - -static struct rib_sub * rib_sub_create(uint32_t sid) -{ - pthread_condattr_t cattr; - struct rib_sub * sub = malloc(sizeof(*sub)); - if (sub == NULL) - return NULL; - - if (pthread_condattr_init(&cattr)) { - free(sub); - return NULL; - } -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif - if (pthread_cond_init(&sub->cond, &cattr)) { - free(sub); - return NULL; - } - - if (pthread_mutex_init(&sub->lock, NULL)) { - pthread_cond_destroy(&sub->cond); - free(sub); - return NULL; - } - - list_head_init(&sub->rnodes); - list_head_init(&sub->events); - - sub->sid = sid; - - return sub; -} - -static void rib_sub_zero(struct rib_sub * sub) -{ - struct list_head * p; - struct list_head * h; - - assert(sub); - - list_for_each_safe(p, h, &sub->rnodes) { - struct rn_ptr * r = list_entry(p, struct rn_ptr, next); - assert(r->node); - rnode_del_sub(r->node, sub); - list_del(&r->next); - free(r); - } - - list_for_each_safe(p, h, &sub->events) { - struct revent * r = list_entry(p, struct revent, next); - list_del(&r->next); - assert(r->path); - free(r->path); - free(r); - } -} - -static struct rn_ptr * rib_sub_get_rn_ptr(struct rib_sub * sub, - struct rnode * node) -{ - struct list_head * p; - - list_for_each(p, &sub->rnodes) { - struct rn_ptr * r = list_entry(p, struct rn_ptr, next); - assert(r->node); - if (r->node == node) - return r; - } - - return NULL; -} - -static int rib_sub_add_rnode(struct rib_sub * sub, - struct rnode * node) -{ - struct rn_ptr * rn; - - assert(sub); - assert(node); - - if (rib_sub_get_rn_ptr(sub, node) != NULL) - return 0; - - rn = malloc(sizeof(*rn)); - if (rn == NULL) - return -ENOMEM; - - rn->node = node; - - list_add(&rn->next, &sub->rnodes); - - return 0; -} - -static void rib_sub_del_rnode(struct rib_sub * sub, - struct rnode * node) -{ - struct rn_ptr * rn; - - assert(sub); - assert(node); - - rn = rib_sub_get_rn_ptr(sub, node); - if (rn == NULL) - return; - - list_del(&rn->next); - - free(rn); -} - -static void rib_sub_destroy(struct rib_sub * sub) -{ - assert(sub); - - rib_sub_zero(sub); - - free(sub); -} - -/* Event calls from rqueue.h. */ -ro_set_t * ro_set_create(void) -{ - ro_set_t * set; - struct rib_sub * sub; - - set = malloc(sizeof(*set)); - if (set == NULL) - return NULL; - - pthread_rwlock_wrlock(&rib.s_lock); - - set->sid = bmp_allocate(rib.sids); - if (!bmp_is_id_valid(rib.sids, set->sid)) { - pthread_rwlock_unlock(&rib.s_lock); - free(set); - return NULL; - } - - pthread_rwlock_unlock(&rib.s_lock); - - pthread_rwlock_wrlock(&rib.lock); - - sub = rib_sub_create(set->sid); - if (sub == NULL) { - pthread_rwlock_unlock(&rib.lock); - free(set); - return NULL; - } - - list_add(&sub->next, &rib.subs); - - pthread_rwlock_unlock(&rib.lock); - - return set; -} - -void ro_set_destroy(ro_set_t * set) -{ - struct rib_sub * sub = NULL; - - struct list_head * p; - struct list_head * h; - - pthread_rwlock_wrlock(&rib.lock); - - list_for_each_safe(p, h, &rib.subs) { - struct rib_sub * r = list_entry(p, struct rib_sub, next); - if (r->sid == set->sid) { - sub = r; - break; - } - } - - if (sub != NULL) - rib_sub_destroy(sub); - - pthread_rwlock_unlock(&rib.lock); - - pthread_rwlock_wrlock(&rib.s_lock); - - bmp_release(rib.sids, set->sid); - - pthread_rwlock_unlock(&rib.s_lock); - - free(set); -} - -rqueue_t * rqueue_create(void) -{ - rqueue_t * rq = malloc(sizeof(*rq)); - if (rq == NULL) - return NULL; - - list_head_init(&rq->events); - - return rq; -} - -int rqueue_destroy(struct rqueue * rq) -{ - struct list_head * p; - struct list_head * h; - - list_for_each_safe(p, h, &rq->events) { - struct revent * e = list_entry(p, struct revent, next); - list_del(&e->next); - free(e->path); - free(e); - } - - free(rq); - - return 0; -} - -int ro_set_zero(ro_set_t * set) -{ - struct rib_sub * sub; - - if (set == NULL) - return -EINVAL; - - pthread_rwlock_wrlock(&rib.lock); - - sub = rib_get_sub(set->sid); - - assert(sub); - - rib_sub_zero(sub); - - pthread_rwlock_unlock(&rib.lock); - - return 0; -} - -int ro_set_add(ro_set_t * set, - const char * path, - int32_t flags) -{ - struct rib_sub * sub; - struct rnode * node; - - if (set == NULL) - return -EINVAL; - - pthread_rwlock_wrlock(&rib.lock); - - sub = rib_get_sub(set->sid); - - assert(sub); - - node = find_rnode_by_path(path); - if (node == NULL) { - pthread_rwlock_unlock(&rib.lock); - return -1; - } - - if (rnode_add_sub(node, sub, flags)) { - pthread_rwlock_unlock(&rib.lock); - return -ENOMEM; - } - - if (rib_sub_add_rnode(sub, node)) { - pthread_rwlock_unlock(&rib.lock); - return -ENOMEM; - } - - pthread_rwlock_unlock(&rib.lock); - - return 0; -} - -int ro_set_del(ro_set_t * set, - const char * path) -{ - struct rib_sub * sub; - struct rnode * node; - - if (set == NULL) - return -EINVAL; - - pthread_rwlock_wrlock(&rib.lock); - - sub = rib_get_sub(set->sid); - - assert(sub); - - node = find_rnode_by_path(path); - if (node == NULL) { - pthread_rwlock_unlock(&rib.lock); - return -1; - } - - rnode_del_sub(node, sub); - - rib_sub_del_rnode(sub, node); - - pthread_rwlock_unlock(&rib.lock); - - return 0; -} - -int32_t rqueue_next(rqueue_t * rq, - char * path) -{ - struct revent * ev; - int32_t ret; - - if (list_is_empty(&rq->events)) - return -1; - - ev = list_first_entry(&rq->events, struct revent, next); - list_del(&ev->next); - - strcpy(path, ev->path); - ret = ev->flags; - - free(ev->path); - free(ev); - - return ret; -} - -int rib_event_wait(ro_set_t * set, - rqueue_t * rq, - const struct timespec * timeout) -{ - struct rib_sub * sub; - struct timespec abstime; - struct revent * ev; - - ssize_t ret = 0; - - if (set == NULL || rq == NULL) - return -EINVAL; - - if (!list_is_empty(&rq->events)) - return 0; - - if (timeout != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - - pthread_rwlock_rdlock(&rib.lock); - - sub = rib_get_sub(set->sid); - - assert(sub); - - pthread_rwlock_unlock(&rib.lock); - - pthread_mutex_lock(&sub->lock); - - pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock, - (void *) &sub->lock); - - while (list_is_empty(&sub->events) && ret != -ETIMEDOUT) { - if (timeout != NULL) - ret = -pthread_cond_timedwait(&sub->cond, - &sub->lock, - &abstime); - else - ret = -pthread_cond_wait(&sub->cond, &sub->lock); - } - - pthread_cleanup_pop(true); - - pthread_rwlock_wrlock(&rib.lock); - - if (ret != -ETIMEDOUT) { - ev = list_first_entry(&sub->events, struct revent, next); - list_move(&ev->next, &rq->events); - } - - pthread_rwlock_unlock(&rib.lock); - - return ret; -} - -/* Path name management. */ -char * rib_path_append(char * path, - const char * name) -{ - char * pos; - - if (path == NULL || name == NULL || strstr(name, RIB_PATH_DLR)) - return NULL; - - pos = path + strlen(path); - memcpy(pos++, RIB_PATH_DLR, 1); - strcpy(pos, name); - - return path; -} - -char * rib_name_gen(void * data, - size_t len) -{ - uint32_t crc = 0; - char * name; - - if (data == NULL || len == 0) - return NULL; - - name= malloc(GEN_NAME_SIZE + 1); - if (name == NULL) - return NULL; - - crc32(&crc, data, len); - - sprintf(name, "%08x", crc); - - return name; -} - -static ro_msg_t * rnode_pack(struct rnode * node, - uint32_t flags, - bool root) -{ - ro_msg_t * msg; - - assert(node); - - if (node->parent == NULL) - return NULL; - - msg = malloc(sizeof(*msg)); - if (msg == NULL) - return NULL; - - ro_msg__init(msg); - - msg->name = node->name; - if (root) { - assert(node->parent->path); - msg->parent = node->parent->path; - } - - if ((root && (flags & PACK_HASH_ROOT)) || - (flags & PACK_HASH_ALL)) { - msg->has_hash = true; - msg->hash.data = node->sha3; - msg->hash.len = SHA3_256_HASH_LEN; - } - - if (node->data != NULL) { - msg->has_data = true; - msg->data.data = node->data; - msg->data.len = node->len; - } - - if (node->chlen > 0) { - int n = 0; - struct list_head * p; - ro_msg_t ** msgs = malloc(sizeof(*msgs) * node->chlen); - if (msgs == NULL) { - free(msg); - return NULL; - } - - msg->n_children = node->chlen; - - list_for_each(p, &node->children) { - struct child * c = list_entry(p, struct child, next); - msgs[n] = rnode_pack(c->node, flags, false); - if (msgs[n] == NULL) { - int i; - for (i = 0; i < n; ++i) - free(msgs[i]); - free(msgs); - free(msg); - return NULL; - } - ++n; - } - msg->children = msgs; - } - - return msg; -} - -static void free_ro_msg(ro_msg_t * msg) -{ - size_t n = 0; - while (n < msg->n_children) - free_ro_msg(msg->children[n++]); - - free(msg->children); - free(msg); -} - -ssize_t rib_pack(const char * path, - uint8_t ** buf, - uint32_t flags) -{ - struct rnode * node; - ro_msg_t * msg; - ssize_t len; - - if (path == NULL) - return -EINVAL; - - pthread_rwlock_rdlock(&rib.lock); - - node = find_rnode_by_path(path); - if (node == NULL) { - pthread_rwlock_unlock(&rib.lock); - return -EPERM; - } - - msg = rnode_pack(node, flags, true); - if (msg == NULL) { - pthread_rwlock_unlock(&rib.lock); - return -EPERM; - } - - len = ro_msg__get_packed_size(msg); - if (len == 0) { - pthread_rwlock_unlock(&rib.lock); - free_ro_msg(msg); - return 0; - } - - *buf = malloc(len); - if (*buf == NULL) { - pthread_rwlock_unlock(&rib.lock); - free_ro_msg(msg); - return -ENOMEM; - } - - ro_msg__pack(msg, *buf); - - pthread_rwlock_unlock(&rib.lock); - - free_ro_msg(msg); - - return len; -} - -static struct rnode * rnode_get_child(struct rnode * node, - const char * name) -{ - struct list_head * p; - - list_for_each(p, &node->children) { - struct child * c = list_entry(p, struct child, next); - if (strcmp(c->node->name, name) == 0) - return c->node; - } - - return NULL; -} - -static int rnode_unpack(ro_msg_t * msg, - struct rnode * parent, - uint32_t flags) -{ - struct rnode * node; - - size_t i; - - assert(msg); - assert(parent); - - node = rnode_get_child(parent, msg->name); - if (node == NULL) { - if (flags & UNPACK_CREATE) - node = rnode_create(parent, msg->name); - else - return -EPERM; - } - - if (node == NULL) - return -ENOMEM; - - /* Unpack in reverse order for faster insertion */ - i = msg->n_children; - while (i > 0) - rnode_unpack(msg->children[--i], node, flags); - - if (msg->has_data) { - uint8_t * data = malloc(msg->data.len); - if (data == NULL) - return -ENOMEM; - - memcpy(data, msg->data.data, msg->data.len); - rnode_update(node, data, msg->data.len); - } - - return 0; -} - -int rib_unpack(uint8_t * packed, - size_t len, - uint32_t flags) -{ - ro_msg_t * msg; - struct rnode * root; - int ret; - - if (packed == NULL) - return -EINVAL; - - msg = ro_msg__unpack(NULL, len, packed); - if (msg == NULL) - return -EPERM; - - assert(msg->parent); - - pthread_rwlock_wrlock(&rib.lock); - - root = find_rnode_by_path(msg->parent); - if (root == NULL) { - pthread_rwlock_unlock(&rib.lock); - return -EPERM; - } - - ret = rnode_unpack(msg, root, flags); - - if (ret == 0 && msg->has_hash) { - root = rnode_get_child(root, msg->name); - if (memcmp(msg->hash.data, root->sha3, SHA3_256_HASH_LEN)) { - ro_msg__free_unpacked(msg, NULL); - pthread_rwlock_unlock(&rib.lock); - return -EFAULT; - } - } - - pthread_rwlock_unlock(&rib.lock); - - ro_msg__free_unpacked(msg, NULL); - - free(packed); - - return ret; -} diff --git a/src/lib/ro.proto b/src/lib/ro.proto deleted file mode 100644 index 8c547f14..00000000 --- a/src/lib/ro.proto +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * RIB object message - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -syntax = "proto2"; - -message ro_msg { - required string name = 1; - optional string parent = 2; - optional bytes data = 3; - optional bytes hash = 4; - repeated ro_msg children = 5; -}
\ No newline at end of file diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt index 0223262a..a93bf321 100644 --- a/src/lib/tests/CMakeLists.txt +++ b/src/lib/tests/CMakeLists.txt @@ -14,7 +14,6 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c crc32_test.c hashtable_test.c md5_test.c - rib_test.c sha3_test.c time_utils_test.c ${TIMERWHEEL_TEST} diff --git a/src/lib/tests/rib_test.c b/src/lib/tests/rib_test.c deleted file mode 100644 index 6a2446b9..00000000 --- a/src/lib/tests/rib_test.c +++ /dev/null @@ -1,289 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Test of the RIB - * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#define _POSIX_C_SOURCE 199309L - -#include <ouroboros/time_utils.h> -#include <ouroboros/rib.h> -#include <ouroboros/rqueue.h> -#include <ouroboros/errno.h> - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -#define RIB_MAX_PATH_LEN 256 - -int rib_test(int argc, - char ** argv) -{ - uint64_t * address; - - size_t addr_size = 8; - size_t addr_chk; - - char * addr_name; - - ro_set_t * set; - rqueue_t * rq; - - int ret; - - char tmp[RIB_MAX_PATH_LEN]; - - char ** kids; - ssize_t ch; - - uint8_t * buf; - ssize_t buf_len; - - struct timespec t = {0, 100 * MILLION}; - - (void) argc; - (void) argv; - - address = malloc(sizeof(*address)); - if (address == NULL) - return -ENOMEM; - - if (rib_init()) { - printf("Failed to initialize rib.\n"); - return -1; - } - - rib_fini(); - - if (rib_init()) { - printf("Failed to re-initialize rib.\n"); - return -1; - } - - if (rib_add(RIB_ROOT, "static_info")) { - printf("Failed to add element to rib.\n"); - rib_fini(); - return -1; - } - - ch = rib_children("/static_info", &kids); - if (ch != 0) { - printf("Wrong number of children returned.\n"); - rib_fini(); - while (ch > 0) - free(kids[--ch]); - free(kids); - return -1; - } - - if (!rib_has("/static_info")) { - printf("Failed to find added element.\n"); - rib_fini(); - return -1; - } - - if (rib_add(RIB_ROOT, "dynamic_info")) { - printf("Failed to add element to rib.\n"); - rib_fini(); - return -1; - } - - if (rib_add("/static_info", "addr_size")) { - printf("Failed to add sub-element to rib.\n"); - rib_fini(); - return -1; - } - - if (rib_write("/static_info/addr_size", - &addr_size, sizeof(addr_size))) { - printf("Failed to add sub-element to rib.\n"); - rib_fini(); - return -1; - } - - if (rib_add("/static_info", "addresses")) { - printf("Failed to add sub-element to rib.\n"); - rib_fini(); - return -1; - } - - if (!rib_has("/static_info/addr_size")) { - printf("Failed to find added subelement.\n"); - rib_fini(); - return -1; - } - - if (rib_read("/static_info/addr_size", - &addr_chk, sizeof(addr_chk)) - != sizeof(addr_chk)) { - printf("Failed to read added element.\n"); - rib_fini(); - return -1; - } - - ch = rib_children("/static_info", &kids); - if (ch != 2) { - printf("Wrong number of children returned.\n"); - rib_fini(); - return -1; - } - - while (ch > 0) - free(kids[--ch]); - free(kids); - - if (addr_chk != addr_size) { - printf("Failed to verify added element contents.\n"); - rib_fini(); - return -1; - } - - addr_size = 16; - - if (rib_write("/static_info/addr_size", - &addr_size, sizeof(addr_size))) { - printf("Failed to write into added element.\n"); - rib_fini(); - return -1; - } - - if (rib_read("/static_info/addr_size", - &addr_chk, sizeof(addr_chk)) - != sizeof(addr_chk)) { - printf("Failed to verify added element update size.\n"); - rib_fini(); - return -1; - } - - if (addr_chk != addr_size) { - printf("Failed to verify added element update size.\n"); - rib_fini(); - return -1; - } - - addr_name = rib_name_gen(address, sizeof(*address)); - if (addr_name == NULL) { - printf("Failed to create a name.\n"); - rib_fini(); - return -1; - } - - strcpy(tmp, "/dynamic_info"); - - if (rib_add(tmp, addr_name)) { - free(addr_name); - printf("Failed to add address.\n"); - rib_fini(); - return -1; - } - - rib_path_append(tmp, addr_name); - - if (rib_put(tmp, address, sizeof(*address))) { - free(addr_name); - printf("Failed to add address.\n"); - rib_fini(); - return -1; - } - - free(addr_name); - - buf_len = rib_pack("/static_info", &buf, PACK_HASH_ALL); - if (buf_len < 0) { - printf("Failed pack.\n"); - rib_fini(); - return -1; - } - - if (rib_del("/static_info")) { - printf("Failed to delete.\n"); - rib_fini(); - return -1; - } - - if (rib_unpack(buf, buf_len, UNPACK_CREATE)) { - printf("Failed to unpack.\n"); - rib_fini(); - return -1; - } - - if (!rib_has("/static_info")) { - printf("Failed to find unpacked element.\n"); - rib_fini(); - return -1; - } - - ch = rib_children("/static_info", &kids); - if (ch != 2) { - printf("Wrong number of children returned.\n"); - rib_fini(); - return -1; - } - - while (ch > 0) - free(kids[--ch]); - free(kids); - - set = ro_set_create(); - if (set == NULL) { - printf("Failed to create ro_set.\n"); - rib_fini(); - return -1; - } - - rq = rqueue_create(); - if (rq == NULL) { - printf("Failed to create rqueue.\n"); - ro_set_destroy(set); - rib_fini(); - return -1; - } - - if (ro_set_add(set, "/static_info", RO_ALL_OPS)) { - printf("Failed to add to rqueue.\n"); - ro_set_destroy(set); - rqueue_destroy(rq); - rib_fini(); - return -1; - } - - ret = rib_event_wait(set, rq, &t); - if (ret != -ETIMEDOUT) { - printf("Wait failed to timeout: %d.\n", ret); - ro_set_destroy(set); - rqueue_destroy(rq); - rib_fini(); - return -1; - } - - if (rib_del("/static_info")) { - printf("Failed to delete rib subtree.\n"); - rib_fini(); - return -1; - } - - ro_set_destroy(set); - - rqueue_destroy(rq); - - rib_fini(); - - return 0; -} |