summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/fa.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/fa.c')
-rw-r--r--src/ipcpd/unicast/fa.c491
1 files changed, 491 insertions, 0 deletions
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
new file mode 100644
index 00000000..fbcbc6fa
--- /dev/null
+++ b/src/ipcpd/unicast/fa.c
@@ -0,0 +1,491 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * Flow allocator of the IPC Process
+ *
+ * Dimitri Staessens <dimitri.staessens@ugent.be>
+ * Sander Vrijders <sander.vrijders@ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include "config.h"
+
+#define FA "flow-allocator"
+#define OUROBOROS_PREFIX FA
+
+#include <ouroboros/logs.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/dev.h>
+#include <ouroboros/ipcp-dev.h>
+
+#include "dir.h"
+#include "fa.h"
+#include "psched.h"
+#include "ipcp.h"
+#include "dt.h"
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define TIMEOUT 10000 /* nanoseconds */
+
+#define FLOW_REQ 0
+#define FLOW_REPLY 1
+
+struct fa_msg {
+ uint64_t s_addr;
+ uint32_t r_eid;
+ uint32_t s_eid;
+ uint8_t code;
+ int8_t response;
+ /* QoS parameters from spec, aligned */
+ uint8_t availability;
+ uint8_t in_order;
+ uint32_t delay;
+ uint64_t bandwidth;
+ uint32_t loss;
+ uint32_t ber;
+ uint32_t max_gap;
+} __attribute__((packed));
+
+struct cmd {
+ struct list_head next;
+ struct shm_du_buff * sdb;
+};
+
+struct {
+ pthread_rwlock_t flows_lock;
+ int r_eid[PROG_MAX_FLOWS];
+ uint64_t r_addr[PROG_MAX_FLOWS];
+ int fd;
+
+ struct list_head cmds;
+ pthread_cond_t cond;
+ pthread_mutex_t mtx;
+ pthread_t worker;
+
+ struct psched * psched;
+} fa;
+
+static void packet_handler(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb)
+{
+ pthread_rwlock_rdlock(&fa.flows_lock);
+
+ if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ ipcp_sdb_release(sdb);
+ log_warn("Failed to forward packet.");
+ return;
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+}
+
+static void destroy_conn(int fd)
+{
+ fa.r_eid[fd] = -1;
+ fa.r_addr[fd] = INVALID_ADDR;
+}
+
+static void fa_post_packet(void * comp,
+ struct shm_du_buff * sdb)
+{
+ struct cmd * cmd;
+
+ assert(comp == &fa);
+
+ (void) comp;
+
+ cmd = malloc(sizeof(*cmd));
+ if (cmd == NULL) {
+ log_err("Command failed. Out of memory.");
+ ipcp_sdb_release(sdb);
+ return;
+ }
+
+ cmd->sdb = sdb;
+
+ pthread_mutex_lock(&fa.mtx);
+
+ list_add(&cmd->next, &fa.cmds);
+
+ pthread_cond_signal(&fa.cond);
+
+ pthread_mutex_unlock(&fa.mtx);
+}
+
+static void * fa_handle_packet(void * o)
+{
+ struct timespec ts = {0, TIMEOUT * 1000};
+
+ (void) o;
+
+ while (true) {
+ struct timespec abstime;
+ int fd;
+ uint8_t * buf;
+ struct fa_msg * msg;
+ qosspec_t qs;
+ struct cmd * cmd;
+
+ pthread_mutex_lock(&fa.mtx);
+
+ pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
+ &fa.mtx);
+
+ while (list_is_empty(&fa.cmds))
+ pthread_cond_wait(&fa.cond, &fa.mtx);
+
+ cmd = list_last_entry(&fa.cmds, struct cmd, next);
+ list_del(&cmd->next);
+
+ pthread_cleanup_pop(true);
+
+ buf = malloc(sizeof(*msg) + ipcp_dir_hash_len());
+ if (buf == NULL) {
+ log_err("Failed to allocate memory.");
+ ipcp_sdb_release(cmd->sdb);
+ free(cmd);
+ continue;
+ }
+
+ msg = (struct fa_msg *) buf;
+
+ /* Depending on the message call the function in ipcp-dev.h */
+
+ assert(sizeof(*msg) + ipcp_dir_hash_len() >=
+ (unsigned long int) (shm_du_buff_tail(cmd->sdb) -
+ shm_du_buff_head(cmd->sdb)));
+
+ memcpy(msg, shm_du_buff_head(cmd->sdb),
+ shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb));
+
+ ipcp_sdb_release(cmd->sdb);
+
+ free(cmd);
+
+ switch (msg->code) {
+ case FLOW_REQ:
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ while (ipcpi.alloc_id != -1 &&
+ ipcp_get_state() == IPCP_OPERATIONAL) {
+ ts_add(&abstime, &ts, &abstime);
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &abstime);
+ }
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ log_dbg("Won't allocate over non-operational"
+ "IPCP.");
+ free(msg);
+ continue;
+ }
+
+ assert(ipcpi.alloc_id == -1);
+
+ qs.delay = ntoh32(msg->delay);
+ qs.bandwidth = ntoh64(msg->bandwidth);
+ qs.availability = msg->availability;
+ qs.loss = ntoh32(msg->loss);
+ qs.ber = ntoh32(msg->ber);
+ qs.in_order = msg->in_order;
+ qs.max_gap = ntoh32(msg->max_gap);
+
+ fd = ipcp_flow_req_arr((uint8_t *) (msg + 1),
+ ipcp_dir_hash_len(),
+ qs);
+ if (fd < 0) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ log_err("Failed to get fd for flow.");
+ free(msg);
+ continue;
+ }
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fa.r_eid[fd] = ntoh32(msg->s_eid);
+ fa.r_addr[fd] = ntoh64(msg->s_addr);
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ ipcpi.alloc_id = fd;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+ break;
+ case FLOW_REPLY:
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid);
+
+ ipcp_flow_alloc_reply(ntoh32(msg->r_eid),
+ msg->response);
+
+ if (msg->response < 0)
+ destroy_conn(ntoh32(msg->r_eid));
+ else
+ psched_add(fa.psched, ntoh32(msg->r_eid));
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ break;
+ default:
+ log_err("Got an unknown flow allocation message.");
+ break;
+ }
+
+ free(msg);
+ }
+}
+
+int fa_init(void)
+{
+ int i;
+
+ for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ destroy_conn(i);
+
+ if (pthread_rwlock_init(&fa.flows_lock, NULL))
+ goto fail_rwlock;
+
+ if (pthread_mutex_init(&fa.mtx, NULL))
+ goto fail_mtx;
+
+ if (pthread_cond_init(&fa.cond, NULL))
+ goto fail_cond;
+
+ list_head_init(&fa.cmds);
+
+ fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA);
+
+ return 0;
+
+ fail_cond:
+ pthread_mutex_destroy(&fa.mtx);
+ fail_mtx:
+ pthread_rwlock_destroy(&fa.flows_lock);
+ fail_rwlock:
+ log_err("Failed to initialize flow allocator.");
+ return -1;
+}
+
+void fa_fini(void)
+{
+ pthread_cond_destroy(&fa.cond);;
+ pthread_mutex_destroy(&fa.mtx);
+ pthread_rwlock_destroy(&fa.flows_lock);
+}
+
+int fa_start(void)
+{
+ struct sched_param par;
+ int pol;
+ int max;
+
+ fa.psched = psched_create(packet_handler);
+ if (fa.psched == NULL) {
+ log_err("Failed to start packet scheduler.");
+ goto fail_psched;
+ }
+
+ if (pthread_create(&fa.worker, NULL, fa_handle_packet, NULL)) {
+ log_err("Failed to create worker thread.");
+ goto fail_thread;
+ }
+
+ if (pthread_getschedparam(fa.worker, &pol, &par)) {
+ log_err("Failed to get worker thread scheduling parameters.");
+ goto fail_sched;
+ }
+
+ max = sched_get_priority_max(pol);
+ if (max < 0) {
+ log_err("Failed to get max priority for scheduler.");
+ goto fail_sched;
+ }
+
+ par.sched_priority = max;
+
+ if (pthread_setschedparam(fa.worker, pol, &par)) {
+ log_err("Failed to set scheduler priority to maximum.");
+ goto fail_sched;
+ }
+
+ return 0;
+
+ fail_sched:
+ pthread_cancel(fa.worker);
+ pthread_join(fa.worker, NULL);
+ fail_thread:
+ psched_destroy(fa.psched);
+ fail_psched:
+ log_err("Failed to start flow allocator.");
+ return -1;
+}
+
+void fa_stop(void)
+{
+ pthread_cancel(fa.worker);
+ pthread_join(fa.worker, NULL);
+
+ psched_destroy(fa.psched);
+}
+
+int fa_alloc(int fd,
+ const uint8_t * dst,
+ qosspec_t qs)
+{
+ struct fa_msg * msg;
+ uint64_t addr;
+ struct shm_du_buff * sdb;
+ qoscube_t qc;
+
+ addr = dir_query(dst);
+ if (addr == 0)
+ return -1;
+
+ if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len()))
+ return -1;
+
+ msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ msg->code = FLOW_REQ;
+ msg->s_eid = hton32(fd);
+ msg->s_addr = hton64(ipcpi.dt_addr);
+ msg->delay = hton32(qs.delay);
+ msg->bandwidth = hton64(qs.bandwidth);
+ msg->availability = qs.availability;
+ msg->loss = hton32(qs.loss);
+ msg->ber = hton32(qs.ber);
+ msg->in_order = qs.in_order;
+ msg->max_gap = hton32(qs.max_gap);
+
+ memcpy(msg + 1, dst, ipcp_dir_hash_len());
+
+ qc = qos_spec_to_cube(qs);
+
+ if (dt_write_packet(addr, qc, fa.fd, sdb)) {
+ ipcp_sdb_release(sdb);
+ return -1;
+ }
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ assert(fa.r_eid[fd] == -1);
+ fa.r_addr[fd] = addr;
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return 0;
+}
+
+int fa_alloc_resp(int fd,
+ int response)
+{
+ struct timespec ts = {0, TIMEOUT * 1000};
+ struct timespec abstime;
+ struct fa_msg * msg;
+ struct shm_du_buff * sdb;
+ qoscube_t qc;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) {
+ ts_add(&abstime, &ts, &abstime);
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &abstime);
+ }
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ ipcpi.alloc_id = -1;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+ if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len())) {
+ destroy_conn(fd);
+ return -1;
+ }
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ msg->code = FLOW_REPLY;
+ msg->r_eid = hton32(fa.r_eid[fd]);
+ msg->s_eid = hton32(fd);
+ msg->response = response;
+
+ if (response < 0) {
+ destroy_conn(fd);
+ ipcp_sdb_release(sdb);
+ } else {
+ psched_add(fa.psched, fd);
+ }
+
+ ipcp_flow_get_qoscube(fd, &qc);
+
+ assert(qc >= 0 && qc < QOS_CUBE_MAX);
+
+ if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) {
+ destroy_conn(fd);
+ pthread_rwlock_unlock(&fa.flows_lock);
+ ipcp_sdb_release(sdb);
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return 0;
+}
+
+int fa_dealloc(int fd)
+{
+ if (ipcp_flow_fini(fd) < 0)
+ return 0;
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ psched_del(fa.psched, fd);
+
+ destroy_conn(fd);
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ flow_dealloc(fd);
+
+ return 0;
+}