From 3d887b172c37c4418c6173048e6a317eb0c36e57 Mon Sep 17 00:00:00 2001
From: dimitri staessens
Date: Tue, 23 May 2017 13:38:03 +0200
Subject: ipcpd: Allow registering protocol machines with DT

Other protocol machines now have to register on top of the DT AE.
This allows multiple instances of the same protocol machine and
avoids preallocating fds for each protocol machine instance.
---
 src/ipcpd/normal/dt.c |  91 +++++++++++++++++------
 src/ipcpd/normal/dt.h |   5 +-
 src/ipcpd/normal/fa.c | 197 ++++++++++++++++++++++++++------------------------
 src/ipcpd/normal/fa.h |   2 -
 4 files changed, 176 insertions(+), 119 deletions(-)

diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
index b22fb59c..1867c13b 100644
--- a/src/ipcpd/normal/dt.c
+++ b/src/ipcpd/normal/dt.c
@@ -23,13 +23,15 @@
 #define OUROBOROS_PREFIX "dt-ae"
 
 #include
+#include
+#include
 #include
 #include
 #include
 
-#include "dt.h"
 #include "connmgr.h"
 #include "ipcp.h"
+#include "dt.h"
 #include "dt_pci.h"
 #include "pff.h"
 #include "neighbors.h"
@@ -47,12 +49,21 @@
 #include
 #include
 
+struct ae_info {
+        int (*post_sdu)(void * ae, struct shm_du_buff * sdb);
+        void * ae;
+};
+
 struct {
         struct sdu_sched * sdu_sched;
 
         struct pff *       pff[QOS_CUBE_MAX];
         struct routing_i * routing[QOS_CUBE_MAX];
 
+        struct bmp *       res_fds;
+        struct ae_info     aes[AP_RES_FDS];
+        pthread_rwlock_t   lock;
+
         struct gam *       gam;
         struct nbs *       nbs;
         struct ae *        ae;
@@ -120,18 +131,12 @@ static int sdu_handler(int fd,
                 return 0;
         }
 
-        switch (dt_pci.fd) {
-        case FD_FA:
-                if (fa_post_sdu(sdb)) {
-                        ipcp_sdb_release(sdb);
-                        return -1;
-                }
-                return 0;
-        default:
-                log_err("Unknown PDU type received.");
+        if (dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb)) {
                 ipcp_sdb_release(sdb);
                 return -1;
         }
+
+        return 0;
 }
 
 /* silence compiler */
@@ -167,24 +172,24 @@ int dt_init(void)
         dt.ae = connmgr_ae_create(info);
         if (dt.ae == NULL) {
                 log_err("Failed to create AE struct.");
-                return -1;
+                goto fail_connmgr;
         }
 
         dt.nbs = nbs_create();
         if (dt.nbs == NULL) {
                 log_err("Failed to create neighbors struct.");
-                goto fail_connmgr;
+                goto fail_nbs;
         }
 
         dt.nb_notifier.notify_call = dt_neighbor_event;
         if (nbs_reg_notifier(dt.nbs, &dt.nb_notifier)) {
                 log_err("Failed to register notifier.");
-                goto fail_nbs;
+                goto fail_nbs_notifier;
         }
 
         if (routing_init(pr, dt.nbs)) {
                 log_err("Failed to init routing.");
-                goto fail_nbs_notifier;
+                goto fail_routing;
         }
 
         for (i = 0; i < QOS_CUBE_MAX; ++i) {
@@ -192,7 +197,7 @@ int dt_init(void)
                 if (dt.pff[i] == NULL) {
                         for (j = 0; j < i; ++j)
                                 pff_destroy(dt.pff[j]);
-                        goto fail_routing;
+                        goto fail_pff;
                 }
         }
 
@@ -201,22 +206,39 @@ int dt_init(void)
                 if (dt.routing[i] == NULL) {
                         for (j = 0; j < i; ++j)
                                 routing_i_destroy(dt.routing[j]);
-                        goto fail_pff;
+                        goto fail_routing_i;
                 }
         }
 
+        if (pthread_rwlock_init(&dt.lock, NULL)) {
+                log_err("Failed to init rwlock.");
+                goto fail_rwlock_init;
+        }
+
+        dt.res_fds = bmp_create(AP_RES_FDS, 0);
+        if (dt.res_fds == NULL)
+                goto fail_res_fds;
+
         return 0;
- fail_pff:
+
+ fail_res_fds:
+        pthread_rwlock_destroy(&dt.lock);
+ fail_rwlock_init:
+        for (j = 0; j < QOS_CUBE_MAX; ++j)
+                routing_i_destroy(dt.routing[j]);
+ fail_routing_i:
         for (i = 0; i < QOS_CUBE_MAX; ++i)
                 pff_destroy(dt.pff[i]);
- fail_routing:
+ fail_pff:
         routing_fini();
- fail_nbs_notifier:
+ fail_routing:
         nbs_unreg_notifier(dt.nbs, &dt.nb_notifier);
- fail_nbs:
+ fail_nbs_notifier:
         nbs_destroy(dt.nbs);
- fail_connmgr:
+ fail_nbs:
         connmgr_ae_destroy(dt.ae);
+ fail_connmgr:
+        dt_pci_fini();
         return -1;
 }
 
@@ -272,6 +294,33 @@ void dt_stop(void)
         sdu_sched_destroy(dt.sdu_sched);
 }
 
+int dt_reg_ae(void * ae,
+              int (* func)(void * func, struct shm_du_buff *))
+{
+        int res_fd;
+
+        assert(func);
+
+        pthread_rwlock_wrlock(&dt.lock);
+
+        res_fd = bmp_allocate(dt.res_fds);
+        if (!bmp_is_id_valid(dt.res_fds, res_fd)) {
+                log_warn("Reserved fds depleted.");
+                pthread_rwlock_unlock(&dt.lock);
+                return -EBADF;
+        }
+
+        assert(dt.aes[res_fd].post_sdu == NULL);
+        assert(dt.aes[res_fd].ae == NULL);
+
+        dt.aes[res_fd].post_sdu = func;
+        dt.aes[res_fd].ae       = ae;
+
+        pthread_rwlock_unlock(&dt.lock);
+
+        return res_fd;
+}
+
 int dt_write_sdu(uint64_t             dst_addr,
                  qoscube_t            qc,
                  int                  np1_fd,
diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h
index 52760154..0e1a8cc3 100644
--- a/src/ipcpd/normal/dt.h
+++ b/src/ipcpd/normal/dt.h
@@ -37,9 +37,12 @@ int  dt_start(void);
 
 void dt_stop(void);
 
+int  dt_reg_ae(void * ae,
+               int (* func)(void * ae, struct shm_du_buff * sdb));
+
 int  dt_write_sdu(uint64_t             dst_addr,
                   qoscube_t            qc,
-                  int                  np1_fd,
+                  int                  res_fd,
                   struct shm_du_buff * sdb);
 
 #endif /* OUROBOROS_IPCPD_NORMAL_DT_H */
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
index d6c36a17..d7073617 100644
--- a/src/ipcpd/normal/fa.c
+++ b/src/ipcpd/normal/fa.c
@@ -50,6 +50,7 @@ struct {
         pthread_rwlock_t   flows_lock;
         int                r_fd[AP_MAX_FLOWS];
         uint64_t           r_addr[AP_MAX_FLOWS];
+        int                fd;
 
         struct sdu_sched * sdu_sched;
 } fa;
@@ -78,6 +79,104 @@ static void destroy_conn(int fd)
         fa.r_addr[fd] = INVALID_ADDR;
 }
 
+static int fa_post_sdu(void *               ae,
+                       struct shm_du_buff * sdb)
+{
+        struct timespec    ts = {0, TIMEOUT * 1000};
+        int                fd;
+        flow_alloc_msg_t * msg;
+
+        (void) ae;
+
+        assert(ae == &fa);
+        assert(sdb);
+
+        /* Depending on the message call the function in ipcp-dev.h */
+
+        msg = flow_alloc_msg__unpack(NULL,
+                                     shm_du_buff_tail(sdb) -
+                                     shm_du_buff_head(sdb),
+                                     shm_du_buff_head(sdb));
+        if (msg == NULL) {
+                log_err("Failed to unpack flow alloc message.");
+                return -1;
+        }
+
+        switch (msg->code) {
+        case FLOW_ALLOC_CODE__FLOW_REQ:
+                pthread_mutex_lock(&ipcpi.alloc_lock);
+
+                if (!msg->has_hash || !msg->has_s_fd || !msg->has_s_addr) {
+                        log_err("Bad flow request.");
+                        pthread_mutex_unlock(&ipcpi.alloc_lock);
+                        flow_alloc_msg__free_unpacked(msg, NULL);
+                        return -1;
+                }
+
+                while (ipcpi.alloc_id != -1 &&
+                       ipcp_get_state() == IPCP_OPERATIONAL)
+                        pthread_cond_timedwait(&ipcpi.alloc_cond,
+                                               &ipcpi.alloc_lock,
+                                               &ts);
+
+                if (ipcp_get_state() != IPCP_OPERATIONAL) {
+                        log_dbg("Won't allocate over non-operational IPCP.");
+                        pthread_mutex_unlock(&ipcpi.alloc_lock);
+                        flow_alloc_msg__free_unpacked(msg, NULL);
+                        return -1;
+                }
+
+                assert(ipcpi.alloc_id == -1);
+
+                fd = ipcp_flow_req_arr(getpid(),
+                                       msg->hash.data,
+                                       ipcp_dir_hash_len(),
+                                       msg->qc);
+                if (fd < 0) {
+                        pthread_mutex_unlock(&ipcpi.alloc_lock);
+                        flow_alloc_msg__free_unpacked(msg, NULL);
+                        log_err("Failed to get fd for flow.");
+                        return -1;
+                }
+
+                pthread_rwlock_wrlock(&fa.flows_lock);
+
+                fa.r_fd[fd]   = msg->s_fd;
+                fa.r_addr[fd] = msg->s_addr;
+
+                pthread_rwlock_unlock(&fa.flows_lock);
+
+                ipcpi.alloc_id = fd;
+                pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+                pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+                break;
+        case FLOW_ALLOC_CODE__FLOW_REPLY:
+                pthread_rwlock_wrlock(&fa.flows_lock);
+
+                ipcp_flow_alloc_reply(msg->r_fd, msg->response);
+
+                if (msg->response < 0)
+                        destroy_conn(msg->r_fd);
+                else
+                        sdu_sched_add(fa.sdu_sched, fa.r_fd[msg->r_fd]);
+
+                pthread_rwlock_unlock(&fa.flows_lock);
+
+                break;
+        default:
+                log_err("Got an unknown flow allocation message.");
+                flow_alloc_msg__free_unpacked(msg, NULL);
+                return -1;
+        }
+
+        flow_alloc_msg__free_unpacked(msg, NULL);
+        ipcp_sdb_release(sdb);
+
+        return 0;
+}
+
 int fa_init(void)
 {
         int i;
@@ -88,6 +187,8 @@ int fa_init(void)
         if (pthread_rwlock_init(&fa.flows_lock, NULL))
                 return -1;
 
+        fa.fd = dt_reg_ae(&fa, &fa_post_sdu);
+
         return 0;
 }
 
@@ -191,7 +292,7 @@ int fa_alloc(int fd,
         if (sdb == NULL)
                 return -1;
 
-        if (dt_write_sdu(addr, qc, FD_FA, sdb)) {
+        if (dt_write_sdu(addr, qc, fa.fd, sdb)) {
                 ipcp_sdb_release(sdb);
                 return -1;
         }
 
@@ -287,97 +388,3 @@ int fa_dealloc(int fd)
 
         return 0;
 }
-
-int fa_post_sdu(struct shm_du_buff * sdb)
-{
-        struct timespec    ts = {0, TIMEOUT * 1000};
-        int                fd;
-        flow_alloc_msg_t * msg;
-
-        assert(sdb);
-
-        /* Depending on the message call the function in ipcp-dev.h */
-
-        msg = flow_alloc_msg__unpack(NULL,
-                                     shm_du_buff_tail(sdb) -
-                                     shm_du_buff_head(sdb),
-                                     shm_du_buff_head(sdb));
-        if (msg == NULL) {
-                log_err("Failed to unpack flow alloc message");
-                return -1;
-        }
-
-        switch (msg->code) {
-        case FLOW_ALLOC_CODE__FLOW_REQ:
-                pthread_mutex_lock(&ipcpi.alloc_lock);
-
-                if (!msg->has_hash || !msg->has_s_fd || !msg->has_s_addr) {
-                        log_err("Bad flow request.");
-                        pthread_mutex_unlock(&ipcpi.alloc_lock);
-                        flow_alloc_msg__free_unpacked(msg, NULL);
-                        return -1;
-                }
-
-                while (ipcpi.alloc_id != -1 &&
-                       ipcp_get_state() == IPCP_OPERATIONAL)
-                        pthread_cond_timedwait(&ipcpi.alloc_cond,
-                                               &ipcpi.alloc_lock,
-                                               &ts);
-
-                if (ipcp_get_state() != IPCP_OPERATIONAL) {
-                        log_dbg("Won't allocate over non-operational IPCP.");
-                        pthread_mutex_unlock(&ipcpi.alloc_lock);
-                        flow_alloc_msg__free_unpacked(msg, NULL);
-                        return -1;
-                }
-
-                assert(ipcpi.alloc_id == -1);
-
-                fd = ipcp_flow_req_arr(getpid(),
-                                       msg->hash.data,
-                                       ipcp_dir_hash_len(),
-                                       msg->qc);
-                if (fd < 0) {
-                        pthread_mutex_unlock(&ipcpi.alloc_lock);
-                        flow_alloc_msg__free_unpacked(msg, NULL);
-                        log_err("Failed to get fd for flow.");
-                        return -1;
-                }
-
-                pthread_rwlock_wrlock(&fa.flows_lock);
-
-                fa.r_fd[fd]   = msg->s_fd;
-                fa.r_addr[fd] = msg->s_addr;
-
-                pthread_rwlock_unlock(&fa.flows_lock);
-
-                ipcpi.alloc_id = fd;
-                pthread_cond_broadcast(&ipcpi.alloc_cond);
-
-                pthread_mutex_unlock(&ipcpi.alloc_lock);
-
-                break;
-        case FLOW_ALLOC_CODE__FLOW_REPLY:
-                pthread_rwlock_wrlock(&fa.flows_lock);
-
-                ipcp_flow_alloc_reply(msg->r_fd, msg->response);
-
-                if (msg->response < 0)
-                        destroy_conn(msg->r_fd);
-                else
-                        sdu_sched_add(fa.sdu_sched, fa.r_fd[msg->r_fd]);
-
-                pthread_rwlock_unlock(&fa.flows_lock);
-
-                break;
-        default:
-                log_err("Got an unknown flow allocation message.");
-                flow_alloc_msg__free_unpacked(msg, NULL);
-                return -1;
-        }
-
-        flow_alloc_msg__free_unpacked(msg, NULL);
-        ipcp_sdb_release(sdb);
-
-        return 0;
-}
diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h
index 264d45ea..a77dc723 100644
--- a/src/ipcpd/normal/fa.h
+++ b/src/ipcpd/normal/fa.h
@@ -43,6 +43,4 @@ int  fa_alloc_resp(int fd,
 
 int  fa_dealloc(int fd);
 
-int  fa_post_sdu(struct shm_du_buff * sdb);
-
 #endif /* OUROBOROS_IPCPD_NORMAL_FA_H */
-- 
cgit v1.2.3
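
Usage sketch (illustrative only, not part of the patch): after this change a
protocol machine registers a post_sdu callback with the DT AE and uses the
reserved fd it gets back when writing SDUs, instead of a fixed preallocated
fd such as FD_FA. The sketch below mirrors the pattern fa.c follows above;
only dt_reg_ae() and dt_write_sdu() come from dt.h, the "foo" names are
hypothetical, and includes are abridged.

/* Hypothetical protocol machine "foo" built on top of the DT AE. */

#include "dt.h"        /* other ouroboros headers omitted for brevity */

static struct {
        int fd;        /* reserved fd returned by dt_reg_ae() */
} foo;

/* Called from the DT AE's sdu_handler() when an SDU arrives whose
 * PCI carries this AE's reserved fd. */
static int foo_post_sdu(void * ae, struct shm_du_buff * sdb)
{
        (void) ae;     /* &foo, as registered below */

        /* ... parse and handle the SDU ... */

        ipcp_sdb_release(sdb);

        return 0;
}

int foo_init(void)
{
        /* Register on top of DT; the returned fd identifies this
         * instance, so nothing needs to be preallocated per AE. */
        foo.fd = dt_reg_ae(&foo, &foo_post_sdu);
        if (foo.fd < 0)
                return -1;

        return 0;
}

int foo_send(uint64_t dst_addr, qoscube_t qc, struct shm_du_buff * sdb)
{
        /* The reserved fd takes the place of the old FD_FA constant. */
        return dt_write_sdu(dst_addr, qc, foo.fd, sdb);
}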