diff options
author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-05-24 09:54:12 +0000 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-05-24 09:54:12 +0000 |
commit | be49633c7069c2a2273add8c1dd0425a8f25dd1e (patch) | |
tree | 4647e7be5bf83ca853b32b35d926cbc0450ddb48 | |
parent | 6cbe31eb5a14eaa993200fe7fe5710aab4b999f3 (diff) | |
parent | 3d887b172c37c4418c6173048e6a317eb0c36e57 (diff) | |
download | ouroboros-be49633c7069c2a2273add8c1dd0425a8f25dd1e.tar.gz ouroboros-be49633c7069c2a2273add8c1dd0425a8f25dd1e.zip |
Merged in dstaesse/ouroboros/be-normal-dt-reg (pull request #510)
ipcpd: Allow registering protocol machines with DT
-rw-r--r-- | src/ipcpd/normal/dt.c | 91 | ||||
-rw-r--r-- | src/ipcpd/normal/dt.h | 5 | ||||
-rw-r--r-- | src/ipcpd/normal/fa.c | 197 | ||||
-rw-r--r-- | src/ipcpd/normal/fa.h | 2 |
4 files changed, 176 insertions, 119 deletions
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index b22fb59c..1867c13b 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -23,13 +23,15 @@ #define OUROBOROS_PREFIX "dt-ae" #include <ouroboros/config.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/errno.h> #include <ouroboros/logs.h> #include <ouroboros/rib.h> #include <ouroboros/dev.h> -#include "dt.h" #include "connmgr.h" #include "ipcp.h" +#include "dt.h" #include "dt_pci.h" #include "pff.h" #include "neighbors.h" @@ -47,12 +49,21 @@ #include <inttypes.h> #include <assert.h> +struct ae_info { + int (*post_sdu)(void * ae, struct shm_du_buff * sdb); + void * ae; +}; + struct { struct sdu_sched * sdu_sched; struct pff * pff[QOS_CUBE_MAX]; struct routing_i * routing[QOS_CUBE_MAX]; + struct bmp * res_fds; + struct ae_info aes[AP_RES_FDS]; + pthread_rwlock_t lock; + struct gam * gam; struct nbs * nbs; struct ae * ae; @@ -120,18 +131,12 @@ static int sdu_handler(int fd, return 0; } - switch (dt_pci.fd) { - case FD_FA: - if (fa_post_sdu(sdb)) { - ipcp_sdb_release(sdb); - return -1; - } - return 0; - default: - log_err("Unknown PDU type received."); + if (dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb)) { ipcp_sdb_release(sdb); return -1; } + + return 0; } /* silence compiler */ @@ -167,24 +172,24 @@ int dt_init(void) dt.ae = connmgr_ae_create(info); if (dt.ae == NULL) { log_err("Failed to create AE struct."); - return -1; + goto fail_connmgr; } dt.nbs = nbs_create(); if (dt.nbs == NULL) { log_err("Failed to create neighbors struct."); - goto fail_connmgr; + goto fail_nbs; } dt.nb_notifier.notify_call = dt_neighbor_event; if (nbs_reg_notifier(dt.nbs, &dt.nb_notifier)) { log_err("Failed to register notifier."); - goto fail_nbs; + goto fail_nbs_notifier; } if (routing_init(pr, dt.nbs)) { log_err("Failed to init routing."); - goto fail_nbs_notifier; + goto fail_routing; } for (i = 0; i < QOS_CUBE_MAX; ++i) { @@ -192,7 +197,7 @@ int dt_init(void) if (dt.pff[i] == NULL) { for (j = 0; j < i; ++j) pff_destroy(dt.pff[j]); - goto fail_routing; + goto fail_pff; } } @@ -201,22 +206,39 @@ int dt_init(void) if (dt.routing[i] == NULL) { for (j = 0; j < i; ++j) routing_i_destroy(dt.routing[j]); - goto fail_pff; + goto fail_routing_i; } } + if (pthread_rwlock_init(&dt.lock, NULL)) { + log_err("Failed to init rwlock."); + goto fail_rwlock_init; + } + + dt.res_fds = bmp_create(AP_RES_FDS, 0); + if (dt.res_fds == NULL) + goto fail_res_fds; + return 0; - fail_pff: + + fail_res_fds: + pthread_rwlock_destroy(&dt.lock); + fail_rwlock_init: + for (j = 0; j < QOS_CUBE_MAX; ++j) + routing_i_destroy(dt.routing[j]); + fail_routing_i: for (i = 0; i < QOS_CUBE_MAX; ++i) pff_destroy(dt.pff[i]); - fail_routing: + fail_pff: routing_fini(); - fail_nbs_notifier: + fail_routing: nbs_unreg_notifier(dt.nbs, &dt.nb_notifier); - fail_nbs: + fail_nbs_notifier: nbs_destroy(dt.nbs); - fail_connmgr: + fail_nbs: connmgr_ae_destroy(dt.ae); + fail_connmgr: + dt_pci_fini(); return -1; } @@ -272,6 +294,33 @@ void dt_stop(void) sdu_sched_destroy(dt.sdu_sched); } +int dt_reg_ae(void * ae, + int (* func)(void * func, struct shm_du_buff *)) +{ + int res_fd; + + assert(func); + + pthread_rwlock_wrlock(&dt.lock); + + res_fd = bmp_allocate(dt.res_fds); + if (!bmp_is_id_valid(dt.res_fds, res_fd)) { + log_warn("Reserved fds depleted."); + pthread_rwlock_unlock(&dt.lock); + return -EBADF; + } + + assert(dt.aes[res_fd].post_sdu == NULL); + assert(dt.aes[res_fd].ae == NULL); + + dt.aes[res_fd].post_sdu = func; + dt.aes[res_fd].ae = ae; + + pthread_rwlock_unlock(&dt.lock); + + return res_fd; +} + int dt_write_sdu(uint64_t dst_addr, qoscube_t qc, int np1_fd, diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h index 52760154..0e1a8cc3 100644 --- a/src/ipcpd/normal/dt.h +++ b/src/ipcpd/normal/dt.h @@ -37,9 +37,12 @@ int dt_start(void); void dt_stop(void); +int dt_reg_ae(void * ae, + int (* func)(void * ae, struct shm_du_buff * sdb)); + int dt_write_sdu(uint64_t dst_addr, qoscube_t qc, - int np1_fd, + int res_fd, struct shm_du_buff * sdb); #endif /* OUROBOROS_IPCPD_NORMAL_DT_H */ diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index d6c36a17..d7073617 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -50,6 +50,7 @@ struct { pthread_rwlock_t flows_lock; int r_fd[AP_MAX_FLOWS]; uint64_t r_addr[AP_MAX_FLOWS]; + int fd; struct sdu_sched * sdu_sched; } fa; @@ -78,6 +79,104 @@ static void destroy_conn(int fd) fa.r_addr[fd] = INVALID_ADDR; } +static int fa_post_sdu(void * ae, + struct shm_du_buff * sdb) +{ + struct timespec ts = {0, TIMEOUT * 1000}; + int fd; + flow_alloc_msg_t * msg; + + (void) ae; + + assert(ae == &fa); + assert(sdb); + + /* Depending on the message call the function in ipcp-dev.h */ + + msg = flow_alloc_msg__unpack(NULL, + shm_du_buff_tail(sdb) - + shm_du_buff_head(sdb), + shm_du_buff_head(sdb)); + if (msg == NULL) { + log_err("Failed to unpack flow alloc message."); + return -1; + } + + switch (msg->code) { + case FLOW_ALLOC_CODE__FLOW_REQ: + pthread_mutex_lock(&ipcpi.alloc_lock); + + if (!msg->has_hash || !msg->has_s_fd || !msg->has_s_addr) { + log_err("Bad flow request."); + pthread_mutex_unlock(&ipcpi.alloc_lock); + flow_alloc_msg__free_unpacked(msg, NULL); + return -1; + } + + while (ipcpi.alloc_id != -1 && + ipcp_get_state() == IPCP_OPERATIONAL) + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &ts); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_dbg("Won't allocate over non-operational IPCP."); + pthread_mutex_unlock(&ipcpi.alloc_lock); + flow_alloc_msg__free_unpacked(msg, NULL); + return -1; + } + + assert(ipcpi.alloc_id == -1); + + fd = ipcp_flow_req_arr(getpid(), + msg->hash.data, + ipcp_dir_hash_len(), + msg->qc); + if (fd < 0) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + flow_alloc_msg__free_unpacked(msg, NULL); + log_err("Failed to get fd for flow."); + return -1; + } + + pthread_rwlock_wrlock(&fa.flows_lock); + + fa.r_fd[fd] = msg->s_fd; + fa.r_addr[fd] = msg->s_addr; + + pthread_rwlock_unlock(&fa.flows_lock); + + ipcpi.alloc_id = fd; + pthread_cond_broadcast(&ipcpi.alloc_cond); + + pthread_mutex_unlock(&ipcpi.alloc_lock); + + break; + case FLOW_ALLOC_CODE__FLOW_REPLY: + pthread_rwlock_wrlock(&fa.flows_lock); + + ipcp_flow_alloc_reply(msg->r_fd, msg->response); + + if (msg->response < 0) + destroy_conn(msg->r_fd); + else + sdu_sched_add(fa.sdu_sched, fa.r_fd[msg->r_fd]); + + pthread_rwlock_unlock(&fa.flows_lock); + + break; + default: + log_err("Got an unknown flow allocation message."); + flow_alloc_msg__free_unpacked(msg, NULL); + return -1; + } + + flow_alloc_msg__free_unpacked(msg, NULL); + ipcp_sdb_release(sdb); + + return 0; +} + int fa_init(void) { int i; @@ -88,6 +187,8 @@ int fa_init(void) if (pthread_rwlock_init(&fa.flows_lock, NULL)) return -1; + fa.fd = dt_reg_ae(&fa, &fa_post_sdu); + return 0; } @@ -191,7 +292,7 @@ int fa_alloc(int fd, if (sdb == NULL) return -1; - if (dt_write_sdu(addr, qc, FD_FA, sdb)) { + if (dt_write_sdu(addr, qc, fa.fd, sdb)) { ipcp_sdb_release(sdb); return -1; } @@ -287,97 +388,3 @@ int fa_dealloc(int fd) return 0; } - -int fa_post_sdu(struct shm_du_buff * sdb) -{ - struct timespec ts = {0, TIMEOUT * 1000}; - int fd; - flow_alloc_msg_t * msg; - - assert(sdb); - - /* Depending on the message call the function in ipcp-dev.h */ - - msg = flow_alloc_msg__unpack(NULL, - shm_du_buff_tail(sdb) - - shm_du_buff_head(sdb), - shm_du_buff_head(sdb)); - if (msg == NULL) { - log_err("Failed to unpack flow alloc message"); - return -1; - } - - switch (msg->code) { - case FLOW_ALLOC_CODE__FLOW_REQ: - pthread_mutex_lock(&ipcpi.alloc_lock); - - if (!msg->has_hash || !msg->has_s_fd || !msg->has_s_addr) { - log_err("Bad flow request."); - pthread_mutex_unlock(&ipcpi.alloc_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - return -1; - } - - while (ipcpi.alloc_id != -1 && - ipcp_get_state() == IPCP_OPERATIONAL) - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &ts); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_dbg("Won't allocate over non-operational IPCP."); - pthread_mutex_unlock(&ipcpi.alloc_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - return -1; - } - - assert(ipcpi.alloc_id == -1); - - fd = ipcp_flow_req_arr(getpid(), - msg->hash.data, - ipcp_dir_hash_len(), - msg->qc); - if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - log_err("Failed to get fd for flow."); - return -1; - } - - pthread_rwlock_wrlock(&fa.flows_lock); - - fa.r_fd[fd] = msg->s_fd; - fa.r_addr[fd] = msg->s_addr; - - pthread_rwlock_unlock(&fa.flows_lock); - - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - - break; - case FLOW_ALLOC_CODE__FLOW_REPLY: - pthread_rwlock_wrlock(&fa.flows_lock); - - ipcp_flow_alloc_reply(msg->r_fd, msg->response); - - if (msg->response < 0) - destroy_conn(msg->r_fd); - else - sdu_sched_add(fa.sdu_sched, fa.r_fd[msg->r_fd]); - - pthread_rwlock_unlock(&fa.flows_lock); - - break; - default: - log_err("Got an unknown flow allocation message."); - flow_alloc_msg__free_unpacked(msg, NULL); - return -1; - } - - flow_alloc_msg__free_unpacked(msg, NULL); - ipcp_sdb_release(sdb); - - return 0; -} diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h index 264d45ea..a77dc723 100644 --- a/src/ipcpd/normal/fa.h +++ b/src/ipcpd/normal/fa.h @@ -43,6 +43,4 @@ int fa_alloc_resp(int fd, int fa_dealloc(int fd); -int fa_post_sdu(struct shm_du_buff * sdb); - #endif /* OUROBOROS_IPCPD_NORMAL_FA_H */ |