summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/dt.c
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2019-07-25 12:50:46 +0200
committerSander Vrijders <sander@ouroboros.rocks>2019-07-29 19:36:45 +0200
commitdae15c284248d49079ad5f8a3d8ff30e217f419e (patch)
treeea7942e940396c0c78304fef8b43fb25c5aebba8 /src/ipcpd/normal/dt.c
parentc9232acef855b51d1bc199a68c03c0695ac11192 (diff)
downloadouroboros-dae15c284248d49079ad5f8a3d8ff30e217f419e.tar.gz
ouroboros-dae15c284248d49079ad5f8a3d8ff30e217f419e.zip
build: Refactor normal to unicast
This completes the renaming of the normal IPCP to the unicast IPCP in the sources, to get everything consistent with the documentation. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/ipcpd/normal/dt.c')
-rw-r--r--src/ipcpd/normal/dt.c913
1 files changed, 0 insertions, 913 deletions
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
deleted file mode 100644
index 2fd3c060..00000000
--- a/src/ipcpd/normal/dt.c
+++ /dev/null
@@ -1,913 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2019
- *
- * Data Transfer Component
- *
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#if defined(__linux__) || defined(__CYGWIN__)
-#define _DEFAULT_SOURCE
-#else
-#define _POSIX_C_SOURCE 200112L
-#endif
-
-#include "config.h"
-
-#define DT "dt"
-#define OUROBOROS_PREFIX DT
-
-#include <ouroboros/bitmap.h>
-#include <ouroboros/errno.h>
-#include <ouroboros/logs.h>
-#include <ouroboros/dev.h>
-#include <ouroboros/notifier.h>
-#include <ouroboros/rib.h>
-#ifdef IPCP_FLOW_STATS
-#include <ouroboros/fccntl.h>
-#endif
-
-#include "connmgr.h"
-#include "ipcp.h"
-#include "dt.h"
-#include "pff.h"
-#include "routing.h"
-#include "psched.h"
-#include "comp.h"
-#include "fa.h"
-
-#include <stdlib.h>
-#include <stdbool.h>
-#include <pthread.h>
-#include <string.h>
-#include <inttypes.h>
-#include <assert.h>
-
-#define QOS_BLOCK_LEN 672
-#define STAT_FILE_LEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX)
-
-#ifndef CLOCK_REALTIME_COARSE
-#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
-#endif
-
-struct comp_info {
- void (* post_packet)(void * comp, struct shm_du_buff * sdb);
- void * comp;
- char * name;
-};
-
-/* Fixed field lengths */
-#define TTL_LEN 1
-#define QOS_LEN 1
-#define ECN_LEN 1
-
-struct dt_pci {
- uint64_t dst_addr;
- qoscube_t qc;
- uint8_t ttl;
- uint8_t ecn;
- uint32_t eid;
-};
-
-struct {
- uint8_t addr_size;
- uint8_t eid_size;
- size_t head_size;
-
- /* Offsets */
- size_t qc_o;
- size_t ttl_o;
- size_t ecn_o;
- size_t eid_o;
-
- /* Initial TTL value */
- uint8_t max_ttl;
-} dt_pci_info;
-
-static int dt_pci_ser(struct shm_du_buff * sdb,
- struct dt_pci * dt_pci)
-{
- uint8_t * head;
- uint8_t ttl = dt_pci_info.max_ttl;
-
- assert(sdb);
- assert(dt_pci);
-
- head = shm_du_buff_head_alloc(sdb, dt_pci_info.head_size);
- if (head == NULL)
- return -EPERM;
-
- /* FIXME: Add check and operations for Big Endian machines. */
- memcpy(head, &dt_pci->dst_addr, dt_pci_info.addr_size);
- memcpy(head + dt_pci_info.qc_o, &dt_pci->qc, QOS_LEN);
- memcpy(head + dt_pci_info.ttl_o, &ttl, TTL_LEN);
- memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN);
- memcpy(head + dt_pci_info.eid_o, &dt_pci->eid, dt_pci_info.eid_size);
-
- return 0;
-}
-
-static void dt_pci_des(struct shm_du_buff * sdb,
- struct dt_pci * dt_pci)
-{
- uint8_t * head;
-
- assert(sdb);
- assert(dt_pci);
-
- head = shm_du_buff_head(sdb);
-
- /* Decrease TTL */
- --*(head + dt_pci_info.ttl_o);
-
- /* FIXME: Add check and operations for Big Endian machines. */
- memcpy(&dt_pci->dst_addr, head, dt_pci_info.addr_size);
- memcpy(&dt_pci->qc, head + dt_pci_info.qc_o, QOS_LEN);
- memcpy(&dt_pci->ttl, head + dt_pci_info.ttl_o, TTL_LEN);
- memcpy(&dt_pci->ecn, head + dt_pci_info.ecn_o, ECN_LEN);
- memcpy(&dt_pci->eid, head + dt_pci_info.eid_o, dt_pci_info.eid_size);
-}
-
-static void dt_pci_shrink(struct shm_du_buff * sdb)
-{
- assert(sdb);
-
- shm_du_buff_head_release(sdb, dt_pci_info.head_size);
-}
-
-struct {
- struct psched * psched;
-
- struct pff * pff[QOS_CUBE_MAX];
- struct routing_i * routing[QOS_CUBE_MAX];
-#ifdef IPCP_FLOW_STATS
- struct {
- time_t stamp;
- uint64_t addr;
- size_t snd_pkt[QOS_CUBE_MAX];
- size_t rcv_pkt[QOS_CUBE_MAX];
- size_t snd_bytes[QOS_CUBE_MAX];
- size_t rcv_bytes[QOS_CUBE_MAX];
- size_t lcl_r_pkt[QOS_CUBE_MAX];
- size_t lcl_r_bytes[QOS_CUBE_MAX];
- size_t lcl_w_pkt[QOS_CUBE_MAX];
- size_t lcl_w_bytes[QOS_CUBE_MAX];
- size_t r_drp_pkt[QOS_CUBE_MAX];
- size_t r_drp_bytes[QOS_CUBE_MAX];
- size_t w_drp_pkt[QOS_CUBE_MAX];
- size_t w_drp_bytes[QOS_CUBE_MAX];
- size_t f_nhp_pkt[QOS_CUBE_MAX];
- size_t f_nhp_bytes[QOS_CUBE_MAX];
- pthread_mutex_t lock;
- } stat[PROG_MAX_FLOWS];
-
- size_t n_flows;
-#endif
- struct bmp * res_fds;
- struct comp_info comps[PROG_RES_FDS];
- pthread_rwlock_t lock;
-
- pthread_t listener;
-} dt;
-
-static int dt_stat_read(const char * path,
- char * buf,
- size_t len)
-{
-#ifdef IPCP_FLOW_STATS
- int fd;
- int i;
- char str[QOS_BLOCK_LEN + 1];
- char addrstr[20];
- char tmstr[20];
- size_t rxqlen = 0;
- size_t txqlen = 0;
- struct tm * tm;
-
- /* NOTE: we may need stronger checks. */
- fd = atoi(path);
-
- if (len < STAT_FILE_LEN)
- return 0;
-
- buf[0] = '\0';
-
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- if (dt.stat[fd].stamp == 0) {
- pthread_mutex_unlock(&dt.stat[fd].lock);
- return 0;
- }
-
- if (dt.stat[fd].addr == ipcpi.dt_addr)
- sprintf(addrstr, "%s", dt.comps[fd].name);
- else
- sprintf(addrstr, "%" PRIu64, dt.stat[fd].addr);
-
- tm = localtime(&dt.stat[fd].stamp);
- strftime(tmstr, sizeof(tmstr), "%F %T", tm);
-
- if (fd >= PROG_RES_FDS) {
- fccntl(fd, FLOWGRXQLEN, &rxqlen);
- fccntl(fd, FLOWGTXQLEN, &txqlen);
- }
-
- sprintf(buf,
- "Flow established at: %20s\n"
- "Endpoint address: %20s\n"
- "Queued packets (rx): %20zu\n"
- "Queued packets (tx): %20zu\n\n",
- tmstr, addrstr, rxqlen, txqlen);
-
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- sprintf(str,
- "Qos cube %3d:\n"
- " sent (packets): %20zu\n"
- " sent (bytes): %20zu\n"
- " rcvd (packets): %20zu\n"
- " rcvd (bytes): %20zu\n"
- " local sent (packets): %20zu\n"
- " local sent (bytes): %20zu\n"
- " local rcvd (packets): %20zu\n"
- " local rcvd (bytes): %20zu\n"
- " dropped ttl (packets): %20zu\n"
- " dropped ttl (bytes): %20zu\n"
- " failed writes (packets): %20zu\n"
- " failed writes (bytes): %20zu\n"
- " failed nhop (packets): %20zu\n"
- " failed nhop (bytes): %20zu\n",
- i,
- dt.stat[fd].snd_pkt[i],
- dt.stat[fd].snd_bytes[i],
- dt.stat[fd].rcv_pkt[i],
- dt.stat[fd].rcv_bytes[i],
- dt.stat[fd].lcl_w_pkt[i],
- dt.stat[fd].lcl_w_bytes[i],
- dt.stat[fd].lcl_r_pkt[i],
- dt.stat[fd].lcl_r_bytes[i],
- dt.stat[fd].r_drp_pkt[i],
- dt.stat[fd].r_drp_bytes[i],
- dt.stat[fd].w_drp_pkt[i],
- dt.stat[fd].w_drp_bytes[i],
- dt.stat[fd].f_nhp_pkt[i],
- dt.stat[fd].f_nhp_bytes[i]
- );
- strcat(buf, str);
- }
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
-
- return STAT_FILE_LEN;
-#else
- (void) path;
- (void) buf;
- (void) len;
- return 0;
-#endif
-}
-
-static int dt_stat_readdir(char *** buf)
-{
-#ifdef IPCP_FLOW_STATS
- char entry[RIB_PATH_LEN + 1];
- size_t i;
- int idx = 0;
-
- pthread_rwlock_rdlock(&dt.lock);
-
- if (dt.n_flows < 1) {
- pthread_rwlock_unlock(&dt.lock);
- return 0;
- }
-
- *buf = malloc(sizeof(**buf) * dt.n_flows);
- if (*buf == NULL) {
- pthread_rwlock_unlock(&dt.lock);
- return -ENOMEM;
- }
-
- for (i = 0; i < PROG_MAX_FLOWS; ++i) {
- pthread_mutex_lock(&dt.stat[i].lock);
-
- if (dt.stat[i].stamp == 0) {
- pthread_mutex_unlock(&dt.stat[i].lock);
- /* Optimization: skip unused res_fds. */
- if (i < PROG_RES_FDS)
- i = PROG_RES_FDS;
- continue;
- }
-
- sprintf(entry, "%zu", i);
-
- (*buf)[idx] = malloc(strlen(entry) + 1);
- if ((*buf)[idx] == NULL) {
- while (idx-- > 0)
- free((*buf)[idx]);
- free(buf);
- pthread_mutex_unlock(&dt.stat[i].lock);
- pthread_rwlock_unlock(&dt.lock);
- return -ENOMEM;
- }
-
- strcpy((*buf)[idx++], entry);
-
- pthread_mutex_unlock(&dt.stat[i].lock);
- }
-
- pthread_rwlock_unlock(&dt.lock);
-
- assert((size_t) idx == dt.n_flows);
-
- return idx;
-#else
- (void) buf;
- return 0;
-#endif
-}
-
-static int dt_stat_getattr(const char * path,
- struct stat * st)
-{
-#ifdef IPCP_FLOW_STATS
- int fd;
-
- fd = atoi(path);
-
- st->st_mode = S_IFREG | 0755;
- st->st_nlink = 1;
- st->st_uid = getuid();
- st->st_gid = getgid();
-
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- if (dt.stat[fd].stamp != -1) {
- st->st_size = STAT_FILE_LEN;
- st->st_mtime = dt.stat[fd].stamp;
- } else {
- st->st_size = 0;
- st->st_mtime = 0;
- }
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
-#else
- (void) path;
- (void) st;
-#endif
- return 0;
-}
-
-static struct rib_ops r_ops = {
- .read = dt_stat_read,
- .readdir = dt_stat_readdir,
- .getattr = dt_stat_getattr
-};
-
-#ifdef IPCP_FLOW_STATS
-
-static void stat_used(int fd,
- uint64_t addr)
-{
- struct timespec now;
-
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
-
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- memset(&dt.stat[fd], 0, sizeof(dt.stat[fd]));
-
- dt.stat[fd].stamp = (addr != INVALID_ADDR) ? now.tv_sec : 0;
- dt.stat[fd].addr = addr;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
-
- pthread_rwlock_wrlock(&dt.lock);
-
- (addr != INVALID_ADDR) ? ++dt.n_flows : --dt.n_flows;
-
- pthread_rwlock_unlock(&dt.lock);
-}
-#endif
-
-static void handle_event(void * self,
- int event,
- const void * o)
-{
- struct conn * c;
-
- (void) self;
-
- c = (struct conn *) o;
-
- switch (event) {
- case NOTIFY_DT_CONN_ADD:
-#ifdef IPCP_FLOW_STATS
- stat_used(c->flow_info.fd, c->conn_info.addr);
-#endif
- psched_add(dt.psched, c->flow_info.fd);
- log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd);
- break;
- case NOTIFY_DT_CONN_DEL:
-#ifdef IPCP_FLOW_STATS
- stat_used(c->flow_info.fd, INVALID_ADDR);
-#endif
- psched_del(dt.psched, c->flow_info.fd);
- log_dbg("Removed fd %d from "
- "packet scheduler.", c->flow_info.fd);
- break;
- default:
- break;
- }
-}
-
-static void packet_handler(int fd,
- qoscube_t qc,
- struct shm_du_buff * sdb)
-{
- struct dt_pci dt_pci;
- int ret;
- int ofd;
-#ifdef IPCP_FLOW_STATS
- size_t len;
-#else
- (void) fd;
-#endif
-
-#ifdef IPCP_FLOW_STATS
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
-#endif
- memset(&dt_pci, 0, sizeof(dt_pci));
- dt_pci_des(sdb, &dt_pci);
- if (dt_pci.dst_addr != ipcpi.dt_addr) {
- if (dt_pci.ttl == 0) {
- log_dbg("TTL was zero.");
- ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
- ++dt.stat[fd].r_drp_pkt[qc];
- dt.stat[fd].r_drp_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
-#endif
- return;
- }
-
- /* FIXME: Use qoscube from PCI instead of incoming flow. */
- ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr);
- if (ofd < 0) {
- log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr);
- ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
- ++dt.stat[fd].f_nhp_pkt[qc];
- dt.stat[fd].f_nhp_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
-#endif
- return;
- }
-
- ret = ipcp_flow_write(ofd, sdb);
- if (ret < 0) {
- log_dbg("Failed to write packet to fd %d.", ofd);
- if (ret == -EFLOWDOWN)
- notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd);
- ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
- pthread_mutex_lock(&dt.stat[ofd].lock);
-
- ++dt.stat[ofd].w_drp_pkt[qc];
- dt.stat[ofd].w_drp_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[ofd].lock);
-#endif
- return;
- }
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
- pthread_mutex_lock(&dt.stat[ofd].lock);
-
- ++dt.stat[ofd].snd_pkt[qc];
- dt.stat[ofd].snd_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[ofd].lock);
-#endif
- } else {
- dt_pci_shrink(sdb);
- if (dt_pci.eid >= PROG_RES_FDS) {
- if (ipcp_flow_write(dt_pci.eid, sdb)) {
- ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[fd].lock);
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
- pthread_mutex_unlock(&dt.stat[fd].lock);
-
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
- ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
- dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
-
- }
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].rcv_pkt[qc];
- dt.stat[dt_pci.eid].rcv_bytes[qc] += len;
- ++dt.stat[dt_pci.eid].lcl_r_pkt[qc];
- dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
- return;
- }
-
- if (dt.comps[dt_pci.eid].post_packet == NULL) {
- log_err("No registered component on eid %d.",
- dt_pci.eid);
- ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
- dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
- return;
- }
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
- ++dt.stat[fd].lcl_r_pkt[qc];
- dt.stat[fd].lcl_r_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].snd_pkt[qc];
- dt.stat[dt_pci.eid].snd_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
- dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp,
- sdb);
- }
-}
-
-static void * dt_conn_handle(void * o)
-{
- struct conn conn;
-
- (void) o;
-
- while (true) {
- if (connmgr_wait(COMPID_DT, &conn)) {
- log_err("Failed to get next DT connection.");
- continue;
- }
-
- /* NOTE: connection acceptance policy could be here. */
-
- notifier_event(NOTIFY_DT_CONN_ADD, &conn);
- }
-
- return 0;
-}
-
-int dt_init(enum pol_routing pr,
- enum pol_pff pp,
- uint8_t addr_size,
- uint8_t eid_size,
- uint8_t max_ttl)
-{
- int i;
- int j;
- char dtstr[256];
- struct conn_info info;
-
- memset(&info, 0, sizeof(info));
-
- strcpy(info.comp_name, DT_COMP);
- strcpy(info.protocol, DT_PROTO);
- info.pref_version = 1;
- info.pref_syntax = PROTO_FIXED;
- info.addr = ipcpi.dt_addr;
-
- dt_pci_info.addr_size = addr_size;
- dt_pci_info.eid_size = eid_size;
- dt_pci_info.max_ttl = max_ttl;
-
- dt_pci_info.qc_o = dt_pci_info.addr_size;
- dt_pci_info.ttl_o = dt_pci_info.qc_o + QOS_LEN;
- dt_pci_info.ecn_o = dt_pci_info.ttl_o + TTL_LEN;
- dt_pci_info.eid_o = dt_pci_info.ecn_o + ECN_LEN;
- dt_pci_info.head_size = dt_pci_info.eid_o + dt_pci_info.eid_size;
-
- if (notifier_reg(handle_event, NULL)) {
- log_err("Failed to register with notifier.");
- goto fail_notifier_reg;
- }
-
- if (connmgr_comp_init(COMPID_DT, &info)) {
- log_err("Failed to register with connmgr.");
- goto fail_connmgr_comp_init;
- }
-
- if (routing_init(pr)) {
- log_err("Failed to init routing.");
- goto fail_routing;
- }
-
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- dt.pff[i] = pff_create(pp);
- if (dt.pff[i] == NULL) {
- log_err("Failed to create a PFF.");
- for (j = 0; j < i; ++j)
- pff_destroy(dt.pff[j]);
- goto fail_pff;
- }
- }
-
- for (i = 0; i < QOS_CUBE_MAX; ++i) {
- dt.routing[i] = routing_i_create(dt.pff[i]);
- if (dt.routing[i] == NULL) {
- for (j = 0; j < i; ++j)
- routing_i_destroy(dt.routing[j]);
- goto fail_routing_i;
- }
- }
-
- if (pthread_rwlock_init(&dt.lock, NULL)) {
- log_err("Failed to init rwlock.");
- goto fail_rwlock_init;
- }
-
- dt.res_fds = bmp_create(PROG_RES_FDS, 0);
- if (dt.res_fds == NULL)
- goto fail_res_fds;
-#ifdef IPCP_FLOW_STATS
- memset(dt.stat, 0, sizeof(dt.stat));
-
- for (i = 0; i < PROG_MAX_FLOWS; ++i)
- if (pthread_mutex_init(&dt.stat[i].lock, NULL)) {
- for (j = 0; j < i; ++j)
- pthread_mutex_destroy(&dt.stat[j].lock);
- goto fail_stat_lock;
- }
-
- dt.n_flows = 0;
-#endif
- sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr);
- if (rib_reg(dtstr, &r_ops))
- goto fail_rib_reg;
-
- return 0;
-
- fail_rib_reg:
-#ifdef IPCP_FLOW_STATS
- for (i = 0; i < PROG_MAX_FLOWS; ++i)
- pthread_mutex_destroy(&dt.stat[i].lock);
- fail_stat_lock:
-#endif
- bmp_destroy(dt.res_fds);
- fail_res_fds:
- pthread_rwlock_destroy(&dt.lock);
- fail_rwlock_init:
- for (j = 0; j < QOS_CUBE_MAX; ++j)
- routing_i_destroy(dt.routing[j]);
- fail_routing_i:
- for (i = 0; i < QOS_CUBE_MAX; ++i)
- pff_destroy(dt.pff[i]);
- fail_pff:
- routing_fini();
- fail_routing:
- connmgr_comp_fini(COMPID_DT);
- fail_connmgr_comp_init:
- notifier_unreg(&handle_event);
- fail_notifier_reg:
- return -1;
-}
-
-void dt_fini(void)
-{
- int i;
-
- rib_unreg(DT);
-#ifdef IPCP_FLOW_STATS
- for (i = 0; i < PROG_MAX_FLOWS; ++i)
- pthread_mutex_destroy(&dt.stat[i].lock);
-#endif
- bmp_destroy(dt.res_fds);
-
- pthread_rwlock_destroy(&dt.lock);
-
- for (i = 0; i < QOS_CUBE_MAX; ++i)
- routing_i_destroy(dt.routing[i]);
-
- for (i = 0; i < QOS_CUBE_MAX; ++i)
- pff_destroy(dt.pff[i]);
-
- routing_fini();
-
- connmgr_comp_fini(COMPID_DT);
-
- notifier_unreg(&handle_event);
-}
-
-int dt_start(void)
-{
- dt.psched = psched_create(packet_handler);
- if (dt.psched == NULL) {
- log_err("Failed to create N-1 packet scheduler.");
- return -1;
- }
-
- if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) {
- log_err("Failed to create listener thread.");
- psched_destroy(dt.psched);
- return -1;
- }
-
- return 0;
-}
-
-void dt_stop(void)
-{
- pthread_cancel(dt.listener);
- pthread_join(dt.listener, NULL);
- psched_destroy(dt.psched);
-}
-
-int dt_reg_comp(void * comp,
- void (* func)(void * func, struct shm_du_buff *),
- char * name)
-{
- int res_fd;
-
- assert(func);
-
- pthread_rwlock_wrlock(&dt.lock);
-
- res_fd = bmp_allocate(dt.res_fds);
- if (!bmp_is_id_valid(dt.res_fds, res_fd)) {
- log_warn("Reserved fds depleted.");
- pthread_rwlock_unlock(&dt.lock);
- return -EBADF;
- }
-
- assert(dt.comps[res_fd].post_packet == NULL);
- assert(dt.comps[res_fd].comp == NULL);
- assert(dt.comps[res_fd].name == NULL);
-
- dt.comps[res_fd].post_packet = func;
- dt.comps[res_fd].comp = comp;
- dt.comps[res_fd].name = name;
-
- pthread_rwlock_unlock(&dt.lock);
-#ifdef IPCP_FLOW_STATS
- stat_used(res_fd, ipcpi.dt_addr);
-#endif
- return res_fd;
-}
-
-int dt_write_packet(uint64_t dst_addr,
- qoscube_t qc,
- int np1_fd,
- struct shm_du_buff * sdb)
-{
- int fd;
- struct dt_pci dt_pci;
- int ret;
-#ifdef IPCP_FLOW_STATS
- size_t len;
-#endif
- assert(sdb);
- assert(dst_addr != ipcpi.dt_addr);
-
- fd = pff_nhop(dt.pff[qc], dst_addr);
- if (fd < 0) {
- log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
-#ifdef IPCP_FLOW_STATS
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
-
- pthread_mutex_lock(&dt.stat[np1_fd].lock);
-
- ++dt.stat[np1_fd].lcl_r_pkt[qc];
- dt.stat[np1_fd].lcl_r_bytes[qc] += len;
- ++dt.stat[np1_fd].f_nhp_pkt[qc];
- dt.stat[np1_fd].f_nhp_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[np1_fd].lock);
-#endif
- return -1;
- }
-
- dt_pci.dst_addr = dst_addr;
- dt_pci.qc = qc;
- dt_pci.eid = np1_fd;
- dt_pci.ecn = 0;
-
- if (dt_pci_ser(sdb, &dt_pci)) {
- log_dbg("Failed to serialize PDU.");
-#ifdef IPCP_FLOW_STATS
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
-#endif
- goto fail_write;
- }
-#ifdef IPCP_FLOW_STATS
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
-#endif
- ret = ipcp_flow_write(fd, sdb);
- if (ret < 0) {
- log_dbg("Failed to write packet to fd %d.", fd);
- if (ret == -EFLOWDOWN)
- notifier_event(NOTIFY_DT_FLOW_DOWN, &fd);
- goto fail_write;
- }
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[np1_fd].lock);
-
- ++dt.stat[np1_fd].lcl_r_pkt[qc];
- dt.stat[np1_fd].lcl_r_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[np1_fd].lock);
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- if (dt_pci.eid < PROG_RES_FDS) {
- ++dt.stat[fd].lcl_w_pkt[qc];
- dt.stat[fd].lcl_w_bytes[qc] += len;
- }
- ++dt.stat[fd].snd_pkt[qc];
- dt.stat[fd].snd_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
-#endif
- return 0;
-
- fail_write:
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[np1_fd].lock);
-
- ++dt.stat[np1_fd].lcl_w_pkt[qc];
- dt.stat[np1_fd].lcl_w_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[np1_fd].lock);
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- if (dt_pci.eid < PROG_RES_FDS) {
- ++dt.stat[fd].lcl_w_pkt[qc];
- dt.stat[fd].lcl_w_bytes[qc] += len;
- }
- ++dt.stat[fd].w_drp_pkt[qc];
- dt.stat[fd].w_drp_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
-#endif
- return -1;
-}