summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/dt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/dt.c')
-rw-r--r--src/ipcpd/unicast/dt.c230
1 files changed, 135 insertions, 95 deletions
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index 0f504daa..252477f4 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2021
+ * Ouroboros - Copyright (C) 2016 - 2026
*
* Data Transfer Component
*
@@ -41,6 +41,7 @@
#include <ouroboros/fccntl.h>
#endif
+#include "addr-auth.h"
#include "common/comp.h"
#include "common/connmgr.h"
#include "ca.h"
@@ -59,7 +60,7 @@
#include <assert.h>
#define QOS_BLOCK_LEN 672
-#define RIB_FILE_STRLEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX)
+#define RIB_FILE_STRLEN (169 + RIB_TM_STRLEN + QOS_BLOCK_LEN * QOS_CUBE_MAX)
#define RIB_NAME_STRLEN 256
#ifndef CLOCK_REALTIME_COARSE
@@ -67,7 +68,7 @@
#endif
struct comp_info {
- void (* post_packet)(void * comp, struct shm_du_buff * sdb);
+ void (* post_packet)(void * comp, struct ssm_pk_buff * spb);
void * comp;
char * name;
};
@@ -134,16 +135,18 @@ static void dt_pci_des(uint8_t * head,
memcpy(&dt_pci->eid, head + dt_pci_info.eid_o, dt_pci_info.eid_size);
}
-static void dt_pci_shrink(struct shm_du_buff * sdb)
+static void dt_pci_shrink(struct ssm_pk_buff * spb)
{
- assert(sdb);
+ assert(spb);
- shm_du_buff_head_release(sdb, dt_pci_info.head_size);
+ ssm_pk_buff_head_release(spb, dt_pci_info.head_size);
}
struct {
struct psched * psched;
+ uint64_t addr;
+
struct pff * pff[QOS_CUBE_MAX];
struct routing_i * routing[QOS_CUBE_MAX];
#ifdef IPCP_FLOW_STATS
@@ -186,7 +189,7 @@ static int dt_rib_read(const char * path,
char str[QOS_BLOCK_LEN + 1];
char addrstr[20];
char * entry;
- char tmstr[20];
+ char tmstr[RIB_TM_STRLEN];
size_t rxqlen = 0;
size_t txqlen = 0;
struct tm * tm;
@@ -209,13 +212,13 @@ static int dt_rib_read(const char * path,
return 0;
}
- if (dt.stat[fd].addr == ipcpi.dt_addr)
+ if (dt.stat[fd].addr == dt.addr)
sprintf(addrstr, "%s", dt.comps[fd].name);
else
- sprintf(addrstr, "%" PRIu64, dt.stat[fd].addr);
+ sprintf(addrstr, ADDR_FMT32, ADDR_VAL32(&dt.stat[fd].addr));
- tm = localtime(&dt.stat[fd].stamp);
- strftime(tmstr, sizeof(tmstr), "%F %T", tm);
+ tm = gmtime(&dt.stat[fd].stamp);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
if (fd >= PROG_RES_FDS) {
fccntl(fd, FLOWGRXQLEN, &rxqlen);
@@ -223,11 +226,11 @@ static int dt_rib_read(const char * path,
}
sprintf(buf,
- "Flow established at: %20s\n"
+ "Flow established at: %.*s\n"
"Endpoint address: %20s\n"
"Queued packets (rx): %20zu\n"
"Queued packets (tx): %20zu\n\n",
- tmstr, addrstr, rxqlen, txqlen);
+ RIB_TM_STRLEN - 1, tmstr, addrstr, rxqlen, txqlen);
for (i = 0; i < QOS_CUBE_MAX; ++i) {
sprintf(str,
"Qos cube %3d:\n"
@@ -285,48 +288,45 @@ static int dt_rib_readdir(char *** buf)
pthread_rwlock_rdlock(&dt.lock);
if (dt.n_flows < 1) {
- pthread_rwlock_unlock(&dt.lock);
- return 0;
+ *buf = NULL;
+ goto no_flows;
}
*buf = malloc(sizeof(**buf) * dt.n_flows);
- if (*buf == NULL) {
- pthread_rwlock_unlock(&dt.lock);
- return -ENOMEM;
- }
+ if (*buf == NULL)
+ goto fail_entries;
for (i = 0; i < PROG_MAX_FLOWS; ++i) {
pthread_mutex_lock(&dt.stat[i].lock);
if (dt.stat[i].stamp == 0) {
pthread_mutex_unlock(&dt.stat[i].lock);
- /* Optimization: skip unused res_fds. */
- if (i < PROG_RES_FDS)
- i = PROG_RES_FDS;
- continue;
+ break;
}
+ pthread_mutex_unlock(&dt.stat[i].lock);
+
sprintf(entry, "%zu", i);
(*buf)[idx] = malloc(strlen(entry) + 1);
- if ((*buf)[idx] == NULL) {
- while (idx-- > 0)
- free((*buf)[idx]);
- free(buf);
- pthread_mutex_unlock(&dt.stat[i].lock);
- pthread_rwlock_unlock(&dt.lock);
- return -ENOMEM;
- }
+ if ((*buf)[idx] == NULL)
+ goto fail_entry;
strcpy((*buf)[idx++], entry);
- pthread_mutex_unlock(&dt.stat[i].lock);
}
- assert((size_t) idx == dt.n_flows);
-
+ no_flows:
pthread_rwlock_unlock(&dt.lock);
return idx;
+
+ fail_entry:
+ while (idx-- > 0)
+ free((*buf)[idx]);
+ free(*buf);
+ fail_entries:
+ pthread_rwlock_unlock(&dt.lock);
+ return -ENOMEM;
#else
(void) buf;
return 0;
@@ -399,6 +399,7 @@ static void handle_event(void * self,
const void * o)
{
struct conn * c;
+ int fd;
(void) self;
@@ -406,19 +407,20 @@ static void handle_event(void * self,
switch (event) {
case NOTIFY_DT_CONN_ADD:
+ fd = c->flow_info.fd;
#ifdef IPCP_FLOW_STATS
- stat_used(c->flow_info.fd, c->conn_info.addr);
+ stat_used(fd, c->conn_info.addr);
#endif
- psched_add(dt.psched, c->flow_info.fd);
- log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd);
+ psched_add(dt.psched, fd);
+ log_dbg("Added fd %d to packet scheduler.", fd);
break;
case NOTIFY_DT_CONN_DEL:
+ fd = c->flow_info.fd;
#ifdef IPCP_FLOW_STATS
- stat_used(c->flow_info.fd, INVALID_ADDR);
+ stat_used(fd, INVALID_ADDR);
#endif
- psched_del(dt.psched, c->flow_info.fd);
- log_dbg("Removed fd %d from "
- "packet scheduler.", c->flow_info.fd);
+ psched_del(dt.psched, fd);
+ log_dbg("Removed fd %d from packet scheduler.", fd);
break;
default:
break;
@@ -427,7 +429,7 @@ static void handle_event(void * self,
static void packet_handler(int fd,
qoscube_t qc,
- struct shm_du_buff * sdb)
+ struct ssm_pk_buff * spb)
{
struct dt_pci dt_pci;
int ret;
@@ -435,7 +437,7 @@ static void packet_handler(int fd,
uint8_t * head;
size_t len;
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+ len = ssm_pk_buff_len(spb);
#ifndef IPCP_FLOW_STATS
(void) fd;
@@ -449,13 +451,13 @@ static void packet_handler(int fd,
#endif
memset(&dt_pci, 0, sizeof(dt_pci));
- head = shm_du_buff_head(sdb);
+ head = ssm_pk_buff_head(spb);
dt_pci_des(head, &dt_pci);
- if (dt_pci.dst_addr != ipcpi.dt_addr) {
+ if (dt_pci.dst_addr != dt.addr) {
if (dt_pci.ttl == 0) {
log_dbg("TTL was zero.");
- ipcp_sdb_release(sdb);
+ ipcp_spb_release(spb);
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
@@ -472,7 +474,7 @@ static void packet_handler(int fd,
if (ofd < 0) {
log_dbg("No next hop for %" PRIu64 ".",
dt_pci.dst_addr);
- ipcp_sdb_release(sdb);
+ ipcp_spb_release(spb);
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
@@ -486,12 +488,12 @@ static void packet_handler(int fd,
(void) ca_calc_ecn(ofd, head + dt_pci_info.ecn_o, qc, len);
- ret = ipcp_flow_write(ofd, sdb);
+ ret = ipcp_flow_write(ofd, spb);
if (ret < 0) {
log_dbg("Failed to write packet to fd %d.", ofd);
if (ret == -EFLOWDOWN)
notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd);
- ipcp_sdb_release(sdb);
+ ipcp_spb_release(spb);
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[ofd].lock);
@@ -511,17 +513,17 @@ static void packet_handler(int fd,
pthread_mutex_unlock(&dt.stat[ofd].lock);
#endif
} else {
- dt_pci_shrink(sdb);
+ dt_pci_shrink(spb);
if (dt_pci.eid >= PROG_RES_FDS) {
uint8_t ecn = *(head + dt_pci_info.ecn_o);
- fa_np1_rcv(dt_pci.eid, ecn, sdb);
+ fa_np1_rcv(dt_pci.eid, ecn, spb);
return;
}
if (dt.comps[dt_pci.eid].post_packet == NULL) {
log_err("No registered component on eid %" PRIu64 ".",
dt_pci.eid);
- ipcp_sdb_release(sdb);
+ ipcp_spb_release(spb);
return;
}
#ifdef IPCP_FLOW_STATS
@@ -539,7 +541,7 @@ static void packet_handler(int fd,
pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
#endif
dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp,
- sdb);
+ spb);
}
}
@@ -563,33 +565,36 @@ static void * dt_conn_handle(void * o)
return 0;
}
-int dt_init(enum pol_routing pr,
- uint8_t addr_size,
- uint8_t eid_size,
- uint8_t max_ttl)
+int dt_init(struct dt_config cfg)
{
int i;
int j;
char dtstr[RIB_NAME_STRLEN + 1];
- int pp;
+ enum pol_pff pp;
struct conn_info info;
memset(&info, 0, sizeof(info));
+ dt.addr = addr_auth_address();
+ if (dt.addr == INVALID_ADDR) {
+ log_err("Failed to get address");
+ return -1;
+ }
+
strcpy(info.comp_name, DT_COMP);
strcpy(info.protocol, DT_PROTO);
info.pref_version = 1;
info.pref_syntax = PROTO_FIXED;
- info.addr = ipcpi.dt_addr;
+ info.addr = dt.addr;
- if (eid_size != 8) { /* only support 64 bits from now */
+ if (cfg.eid_size != 8) { /* only support 64 bits from now */
log_warn("Invalid EID size. Only 64 bit is supported.");
- eid_size = 8;
+ cfg.eid_size = 8;
}
- dt_pci_info.addr_size = addr_size;
- dt_pci_info.eid_size = eid_size;
- dt_pci_info.max_ttl = max_ttl;
+ dt_pci_info.addr_size = cfg.addr_size;
+ dt_pci_info.eid_size = cfg.eid_size;
+ dt_pci_info.max_ttl = cfg.max_ttl;
dt_pci_info.qc_o = dt_pci_info.addr_size;
dt_pci_info.ttl_o = dt_pci_info.qc_o + QOS_LEN;
@@ -597,18 +602,12 @@ int dt_init(enum pol_routing pr,
dt_pci_info.eid_o = dt_pci_info.ecn_o + ECN_LEN;
dt_pci_info.head_size = dt_pci_info.eid_o + dt_pci_info.eid_size;
- if (notifier_reg(handle_event, NULL)) {
- log_err("Failed to register with notifier.");
- goto fail_notifier_reg;
- }
-
if (connmgr_comp_init(COMPID_DT, &info)) {
log_err("Failed to register with connmgr.");
goto fail_connmgr_comp_init;
}
- pp = routing_init(pr);
- if (pp < 0) {
+ if (routing_init(&cfg.routing, &pp) < 0) {
log_err("Failed to init routing.");
goto fail_routing;
}
@@ -645,6 +644,7 @@ int dt_init(enum pol_routing pr,
for (i = 0; i < PROG_MAX_FLOWS; ++i)
if (pthread_mutex_init(&dt.stat[i].lock, NULL)) {
+ log_err("Failed to init mutex for flow %d.", i);
for (j = 0; j < i; ++j)
pthread_mutex_destroy(&dt.stat[j].lock);
goto fail_stat_lock;
@@ -652,9 +652,11 @@ int dt_init(enum pol_routing pr,
dt.n_flows = 0;
#endif
- sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr);
- if (rib_reg(dtstr, &r_ops))
+ sprintf(dtstr, "%s." ADDR_FMT32, DT, ADDR_VAL32(&dt.addr));
+ if (rib_reg(dtstr, &r_ops)) {
+ log_err("Failed to register RIB.");
goto fail_rib_reg;
+ }
return 0;
@@ -678,8 +680,6 @@ int dt_init(enum pol_routing pr,
fail_routing:
connmgr_comp_fini(COMPID_DT);
fail_connmgr_comp_init:
- notifier_unreg(&handle_event);
- fail_notifier_reg:
return -1;
}
@@ -688,7 +688,7 @@ void dt_fini(void)
char dtstr[RIB_NAME_STRLEN + 1];
int i;
- sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr);
+ sprintf(dtstr, "%s.%" PRIu64, DT, dt.addr);
rib_unreg(dtstr);
#ifdef IPCP_FLOW_STATS
for (i = 0; i < PROG_MAX_FLOWS; ++i)
@@ -707,47 +707,69 @@ void dt_fini(void)
routing_fini();
connmgr_comp_fini(COMPID_DT);
-
- notifier_unreg(&handle_event);
}
int dt_start(void)
{
- dt.psched = psched_create(packet_handler);
+ dt.psched = psched_create(packet_handler, ipcp_flow_read);
if (dt.psched == NULL) {
log_err("Failed to create N-1 packet scheduler.");
- return -1;
+ goto fail_psched;
+ }
+
+ if (notifier_reg(handle_event, NULL)) {
+ log_err("Failed to register with notifier.");
+ goto fail_notifier_reg;
}
if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) {
log_err("Failed to create listener thread.");
- psched_destroy(dt.psched);
- return -1;
+ goto fail_listener;
+ }
+
+ if (routing_start() < 0) {
+ log_err("Failed to start routing.");
+ goto fail_routing;
}
return 0;
+
+ fail_routing:
+ pthread_cancel(dt.listener);
+ pthread_join(dt.listener, NULL);
+ fail_listener:
+ notifier_unreg(&handle_event);
+ fail_notifier_reg:
+ psched_destroy(dt.psched);
+ fail_psched:
+ return -1;
}
void dt_stop(void)
{
+ routing_stop();
+
pthread_cancel(dt.listener);
pthread_join(dt.listener, NULL);
+
+ notifier_unreg(&handle_event);
+
psched_destroy(dt.psched);
}
int dt_reg_comp(void * comp,
- void (* func)(void * func, struct shm_du_buff *),
+ void (* func)(void * func, struct ssm_pk_buff *),
char * name)
{
int eid;
- assert(func);
+ assert(func != NULL);
pthread_rwlock_wrlock(&dt.lock);
eid = bmp_allocate(dt.res_fds);
if (!bmp_is_id_valid(dt.res_fds, eid)) {
- log_warn("Reserved EIDs depleted.");
+ log_err("Cannot allocate EID.");
pthread_rwlock_unlock(&dt.lock);
return -EBADF;
}
@@ -762,15 +784,32 @@ int dt_reg_comp(void * comp,
pthread_rwlock_unlock(&dt.lock);
#ifdef IPCP_FLOW_STATS
- stat_used(eid, ipcpi.dt_addr);
+ stat_used(eid, dt.addr);
#endif
return eid;
}
+void dt_unreg_comp(int eid)
+{
+ assert(eid >= 0 && eid < PROG_RES_FDS);
+
+ pthread_rwlock_wrlock(&dt.lock);
+
+ assert(dt.comps[eid].post_packet != NULL);
+
+ dt.comps[eid].post_packet = NULL;
+ dt.comps[eid].comp = NULL;
+ dt.comps[eid].name = NULL;
+
+ pthread_rwlock_unlock(&dt.lock);
+
+ return;
+}
+
int dt_write_packet(uint64_t dst_addr,
qoscube_t qc,
uint64_t eid,
- struct shm_du_buff * sdb)
+ struct ssm_pk_buff * spb)
{
struct dt_pci dt_pci;
int fd;
@@ -778,12 +817,12 @@ int dt_write_packet(uint64_t dst_addr,
uint8_t * head;
size_t len;
- assert(sdb);
- assert(dst_addr != ipcpi.dt_addr);
-
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+ assert(spb);
+ assert(dst_addr != dt.addr);
#ifdef IPCP_FLOW_STATS
+ len = ssm_pk_buff_len(spb);
+
if (eid < PROG_RES_FDS) {
pthread_mutex_lock(&dt.stat[eid].lock);
@@ -795,7 +834,8 @@ int dt_write_packet(uint64_t dst_addr,
#endif
fd = pff_nhop(dt.pff[qc], dst_addr);
if (fd < 0) {
- log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
+ log_dbg("Could not get nhop for " ADDR_FMT32 ".",
+ ADDR_VAL32(&dst_addr));
#ifdef IPCP_FLOW_STATS
if (eid < PROG_RES_FDS) {
pthread_mutex_lock(&dt.stat[eid].lock);
@@ -809,13 +849,13 @@ int dt_write_packet(uint64_t dst_addr,
return -EPERM;
}
- head = shm_du_buff_head_alloc(sdb, dt_pci_info.head_size);
+ head = ssm_pk_buff_head_alloc(spb, dt_pci_info.head_size);
if (head == NULL) {
log_dbg("Failed to allocate DT header.");
goto fail_write;
}
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+ len = ssm_pk_buff_len(spb);
dt_pci.dst_addr = dst_addr;
dt_pci.qc = qc;
@@ -826,7 +866,7 @@ int dt_write_packet(uint64_t dst_addr,
dt_pci_ser(head, &dt_pci);
- ret = ipcp_flow_write(fd, sdb);
+ ret = ipcp_flow_write(fd, spb);
if (ret < 0) {
log_dbg("Failed to write packet to fd %d.", fd);
if (ret == -EFLOWDOWN)