summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ipcpd/normal/dt.c91
-rw-r--r--src/ipcpd/normal/dt.h5
-rw-r--r--src/ipcpd/normal/fa.c197
-rw-r--r--src/ipcpd/normal/fa.h2
4 files changed, 176 insertions, 119 deletions
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
index b22fb59c..1867c13b 100644
--- a/src/ipcpd/normal/dt.c
+++ b/src/ipcpd/normal/dt.c
@@ -23,13 +23,15 @@
#define OUROBOROS_PREFIX "dt-ae"
#include <ouroboros/config.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/errno.h>
#include <ouroboros/logs.h>
#include <ouroboros/rib.h>
#include <ouroboros/dev.h>
-#include "dt.h"
#include "connmgr.h"
#include "ipcp.h"
+#include "dt.h"
#include "dt_pci.h"
#include "pff.h"
#include "neighbors.h"
@@ -47,12 +49,21 @@
#include <inttypes.h>
#include <assert.h>
+struct ae_info {
+ int (*post_sdu)(void * ae, struct shm_du_buff * sdb);
+ void * ae;
+};
+
struct {
struct sdu_sched * sdu_sched;
struct pff * pff[QOS_CUBE_MAX];
struct routing_i * routing[QOS_CUBE_MAX];
+ struct bmp * res_fds;
+ struct ae_info aes[AP_RES_FDS];
+ pthread_rwlock_t lock;
+
struct gam * gam;
struct nbs * nbs;
struct ae * ae;
@@ -120,18 +131,12 @@ static int sdu_handler(int fd,
return 0;
}
- switch (dt_pci.fd) {
- case FD_FA:
- if (fa_post_sdu(sdb)) {
- ipcp_sdb_release(sdb);
- return -1;
- }
- return 0;
- default:
- log_err("Unknown PDU type received.");
+ if (dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb)) {
ipcp_sdb_release(sdb);
return -1;
}
+
+ return 0;
}
/* silence compiler */
@@ -167,24 +172,24 @@ int dt_init(void)
dt.ae = connmgr_ae_create(info);
if (dt.ae == NULL) {
log_err("Failed to create AE struct.");
- return -1;
+ goto fail_connmgr;
}
dt.nbs = nbs_create();
if (dt.nbs == NULL) {
log_err("Failed to create neighbors struct.");
- goto fail_connmgr;
+ goto fail_nbs;
}
dt.nb_notifier.notify_call = dt_neighbor_event;
if (nbs_reg_notifier(dt.nbs, &dt.nb_notifier)) {
log_err("Failed to register notifier.");
- goto fail_nbs;
+ goto fail_nbs_notifier;
}
if (routing_init(pr, dt.nbs)) {
log_err("Failed to init routing.");
- goto fail_nbs_notifier;
+ goto fail_routing;
}
for (i = 0; i < QOS_CUBE_MAX; ++i) {
@@ -192,7 +197,7 @@ int dt_init(void)
if (dt.pff[i] == NULL) {
for (j = 0; j < i; ++j)
pff_destroy(dt.pff[j]);
- goto fail_routing;
+ goto fail_pff;
}
}
@@ -201,22 +206,39 @@ int dt_init(void)
if (dt.routing[i] == NULL) {
for (j = 0; j < i; ++j)
routing_i_destroy(dt.routing[j]);
- goto fail_pff;
+ goto fail_routing_i;
}
}
+ if (pthread_rwlock_init(&dt.lock, NULL)) {
+ log_err("Failed to init rwlock.");
+ goto fail_rwlock_init;
+ }
+
+ dt.res_fds = bmp_create(AP_RES_FDS, 0);
+ if (dt.res_fds == NULL)
+ goto fail_res_fds;
+
return 0;
- fail_pff:
+
+ fail_res_fds:
+ pthread_rwlock_destroy(&dt.lock);
+ fail_rwlock_init:
+ for (j = 0; j < QOS_CUBE_MAX; ++j)
+ routing_i_destroy(dt.routing[j]);
+ fail_routing_i:
for (i = 0; i < QOS_CUBE_MAX; ++i)
pff_destroy(dt.pff[i]);
- fail_routing:
+ fail_pff:
routing_fini();
- fail_nbs_notifier:
+ fail_routing:
nbs_unreg_notifier(dt.nbs, &dt.nb_notifier);
- fail_nbs:
+ fail_nbs_notifier:
nbs_destroy(dt.nbs);
- fail_connmgr:
+ fail_nbs:
connmgr_ae_destroy(dt.ae);
+ fail_connmgr:
+ dt_pci_fini();
return -1;
}
@@ -272,6 +294,33 @@ void dt_stop(void)
sdu_sched_destroy(dt.sdu_sched);
}
+int dt_reg_ae(void * ae,
+ int (* func)(void * func, struct shm_du_buff *))
+{
+ int res_fd;
+
+ assert(func);
+
+ pthread_rwlock_wrlock(&dt.lock);
+
+ res_fd = bmp_allocate(dt.res_fds);
+ if (!bmp_is_id_valid(dt.res_fds, res_fd)) {
+ log_warn("Reserved fds depleted.");
+ pthread_rwlock_unlock(&dt.lock);
+ return -EBADF;
+ }
+
+ assert(dt.aes[res_fd].post_sdu == NULL);
+ assert(dt.aes[res_fd].ae == NULL);
+
+ dt.aes[res_fd].post_sdu = func;
+ dt.aes[res_fd].ae = ae;
+
+ pthread_rwlock_unlock(&dt.lock);
+
+ return res_fd;
+}
+
int dt_write_sdu(uint64_t dst_addr,
qoscube_t qc,
int np1_fd,
diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h
index 52760154..0e1a8cc3 100644
--- a/src/ipcpd/normal/dt.h
+++ b/src/ipcpd/normal/dt.h
@@ -37,9 +37,12 @@ int dt_start(void);
void dt_stop(void);
+int dt_reg_ae(void * ae,
+ int (* func)(void * ae, struct shm_du_buff * sdb));
+
int dt_write_sdu(uint64_t dst_addr,
qoscube_t qc,
- int np1_fd,
+ int res_fd,
struct shm_du_buff * sdb);
#endif /* OUROBOROS_IPCPD_NORMAL_DT_H */
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
index d6c36a17..d7073617 100644
--- a/src/ipcpd/normal/fa.c
+++ b/src/ipcpd/normal/fa.c
@@ -50,6 +50,7 @@ struct {
pthread_rwlock_t flows_lock;
int r_fd[AP_MAX_FLOWS];
uint64_t r_addr[AP_MAX_FLOWS];
+ int fd;
struct sdu_sched * sdu_sched;
} fa;
@@ -78,6 +79,104 @@ static void destroy_conn(int fd)
fa.r_addr[fd] = INVALID_ADDR;
}
+static int fa_post_sdu(void * ae,
+ struct shm_du_buff * sdb)
+{
+ struct timespec ts = {0, TIMEOUT * 1000};
+ int fd;
+ flow_alloc_msg_t * msg;
+
+ (void) ae;
+
+ assert(ae == &fa);
+ assert(sdb);
+
+ /* Depending on the message call the function in ipcp-dev.h */
+
+ msg = flow_alloc_msg__unpack(NULL,
+ shm_du_buff_tail(sdb) -
+ shm_du_buff_head(sdb),
+ shm_du_buff_head(sdb));
+ if (msg == NULL) {
+ log_err("Failed to unpack flow alloc message.");
+ return -1;
+ }
+
+ switch (msg->code) {
+ case FLOW_ALLOC_CODE__FLOW_REQ:
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ if (!msg->has_hash || !msg->has_s_fd || !msg->has_s_addr) {
+ log_err("Bad flow request.");
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ return -1;
+ }
+
+ while (ipcpi.alloc_id != -1 &&
+ ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ log_dbg("Won't allocate over non-operational IPCP.");
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ return -1;
+ }
+
+ assert(ipcpi.alloc_id == -1);
+
+ fd = ipcp_flow_req_arr(getpid(),
+ msg->hash.data,
+ ipcp_dir_hash_len(),
+ msg->qc);
+ if (fd < 0) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ log_err("Failed to get fd for flow.");
+ return -1;
+ }
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fa.r_fd[fd] = msg->s_fd;
+ fa.r_addr[fd] = msg->s_addr;
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ ipcpi.alloc_id = fd;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+ break;
+ case FLOW_ALLOC_CODE__FLOW_REPLY:
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ ipcp_flow_alloc_reply(msg->r_fd, msg->response);
+
+ if (msg->response < 0)
+ destroy_conn(msg->r_fd);
+ else
+ sdu_sched_add(fa.sdu_sched, fa.r_fd[msg->r_fd]);
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ break;
+ default:
+ log_err("Got an unknown flow allocation message.");
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ return -1;
+ }
+
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ ipcp_sdb_release(sdb);
+
+ return 0;
+}
+
int fa_init(void)
{
int i;
@@ -88,6 +187,8 @@ int fa_init(void)
if (pthread_rwlock_init(&fa.flows_lock, NULL))
return -1;
+ fa.fd = dt_reg_ae(&fa, &fa_post_sdu);
+
return 0;
}
@@ -191,7 +292,7 @@ int fa_alloc(int fd,
if (sdb == NULL)
return -1;
- if (dt_write_sdu(addr, qc, FD_FA, sdb)) {
+ if (dt_write_sdu(addr, qc, fa.fd, sdb)) {
ipcp_sdb_release(sdb);
return -1;
}
@@ -287,97 +388,3 @@ int fa_dealloc(int fd)
return 0;
}
-
-int fa_post_sdu(struct shm_du_buff * sdb)
-{
- struct timespec ts = {0, TIMEOUT * 1000};
- int fd;
- flow_alloc_msg_t * msg;
-
- assert(sdb);
-
- /* Depending on the message call the function in ipcp-dev.h */
-
- msg = flow_alloc_msg__unpack(NULL,
- shm_du_buff_tail(sdb) -
- shm_du_buff_head(sdb),
- shm_du_buff_head(sdb));
- if (msg == NULL) {
- log_err("Failed to unpack flow alloc message");
- return -1;
- }
-
- switch (msg->code) {
- case FLOW_ALLOC_CODE__FLOW_REQ:
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- if (!msg->has_hash || !msg->has_s_fd || !msg->has_s_addr) {
- log_err("Bad flow request.");
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
- }
-
- while (ipcpi.alloc_id != -1 &&
- ipcp_get_state() == IPCP_OPERATIONAL)
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &ts);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- log_dbg("Won't allocate over non-operational IPCP.");
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
- }
-
- assert(ipcpi.alloc_id == -1);
-
- fd = ipcp_flow_req_arr(getpid(),
- msg->hash.data,
- ipcp_dir_hash_len(),
- msg->qc);
- if (fd < 0) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- log_err("Failed to get fd for flow.");
- return -1;
- }
-
- pthread_rwlock_wrlock(&fa.flows_lock);
-
- fa.r_fd[fd] = msg->s_fd;
- fa.r_addr[fd] = msg->s_addr;
-
- pthread_rwlock_unlock(&fa.flows_lock);
-
- ipcpi.alloc_id = fd;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
- break;
- case FLOW_ALLOC_CODE__FLOW_REPLY:
- pthread_rwlock_wrlock(&fa.flows_lock);
-
- ipcp_flow_alloc_reply(msg->r_fd, msg->response);
-
- if (msg->response < 0)
- destroy_conn(msg->r_fd);
- else
- sdu_sched_add(fa.sdu_sched, fa.r_fd[msg->r_fd]);
-
- pthread_rwlock_unlock(&fa.flows_lock);
-
- break;
- default:
- log_err("Got an unknown flow allocation message.");
- flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
- }
-
- flow_alloc_msg__free_unpacked(msg, NULL);
- ipcp_sdb_release(sdb);
-
- return 0;
-}
diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h
index 264d45ea..a77dc723 100644
--- a/src/ipcpd/normal/fa.h
+++ b/src/ipcpd/normal/fa.h
@@ -43,6 +43,4 @@ int fa_alloc_resp(int fd,
int fa_dealloc(int fd);
-int fa_post_sdu(struct shm_du_buff * sdb);
-
#endif /* OUROBOROS_IPCPD_NORMAL_FA_H */