summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/dt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/dt.c')
-rw-r--r--src/ipcpd/unicast/dt.c119
1 files changed, 76 insertions, 43 deletions
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index 2bb5ed2f..e2679ffe 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -41,6 +41,7 @@
#include <ouroboros/fccntl.h>
#endif
+#include "addr-auth.h"
#include "common/comp.h"
#include "common/connmgr.h"
#include "ca.h"
@@ -59,7 +60,7 @@
#include <assert.h>
#define QOS_BLOCK_LEN 672
-#define RIB_FILE_STRLEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX)
+#define RIB_FILE_STRLEN (169 + RIB_TM_STRLEN + QOS_BLOCK_LEN * QOS_CUBE_MAX)
#define RIB_NAME_STRLEN 256
#ifndef CLOCK_REALTIME_COARSE
@@ -144,6 +145,8 @@ static void dt_pci_shrink(struct shm_du_buff * sdb)
struct {
struct psched * psched;
+ uint64_t addr;
+
struct pff * pff[QOS_CUBE_MAX];
struct routing_i * routing[QOS_CUBE_MAX];
#ifdef IPCP_FLOW_STATS
@@ -186,7 +189,7 @@ static int dt_rib_read(const char * path,
char str[QOS_BLOCK_LEN + 1];
char addrstr[20];
char * entry;
- char tmstr[20];
+ char tmstr[RIB_TM_STRLEN];
size_t rxqlen = 0;
size_t txqlen = 0;
struct tm * tm;
@@ -209,13 +212,13 @@ static int dt_rib_read(const char * path,
return 0;
}
- if (dt.stat[fd].addr == ipcpi.dt_addr)
+ if (dt.stat[fd].addr == dt.addr)
sprintf(addrstr, "%s", dt.comps[fd].name);
else
- sprintf(addrstr, "%" PRIu64, dt.stat[fd].addr);
+ sprintf(addrstr, ADDR_FMT32, ADDR_VAL32(&dt.stat[fd].addr));
- tm = localtime(&dt.stat[fd].stamp);
- strftime(tmstr, sizeof(tmstr), "%F %T", tm);
+ tm = gmtime(&dt.stat[fd].stamp);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
if (fd >= PROG_RES_FDS) {
fccntl(fd, FLOWGRXQLEN, &rxqlen);
@@ -223,11 +226,11 @@ static int dt_rib_read(const char * path,
}
sprintf(buf,
- "Flow established at: %20s\n"
+ "Flow established at: %.*s\n"
"Endpoint address: %20s\n"
"Queued packets (rx): %20zu\n"
"Queued packets (tx): %20zu\n\n",
- tmstr, addrstr, rxqlen, txqlen);
+ RIB_TM_STRLEN - 1, tmstr, addrstr, rxqlen, txqlen);
for (i = 0; i < QOS_CUBE_MAX; ++i) {
sprintf(str,
"Qos cube %3d:\n"
@@ -285,48 +288,45 @@ static int dt_rib_readdir(char *** buf)
pthread_rwlock_rdlock(&dt.lock);
if (dt.n_flows < 1) {
- pthread_rwlock_unlock(&dt.lock);
- return 0;
+ *buf = NULL;
+ goto no_flows;
}
*buf = malloc(sizeof(**buf) * dt.n_flows);
- if (*buf == NULL) {
- pthread_rwlock_unlock(&dt.lock);
- return -ENOMEM;
- }
+ if (*buf == NULL)
+ goto fail_entries;
for (i = 0; i < PROG_MAX_FLOWS; ++i) {
pthread_mutex_lock(&dt.stat[i].lock);
if (dt.stat[i].stamp == 0) {
pthread_mutex_unlock(&dt.stat[i].lock);
- /* Optimization: skip unused res_fds. */
- if (i < PROG_RES_FDS)
- i = PROG_RES_FDS;
- continue;
+ break;
}
+ pthread_mutex_unlock(&dt.stat[i].lock);
+
sprintf(entry, "%zu", i);
(*buf)[idx] = malloc(strlen(entry) + 1);
- if ((*buf)[idx] == NULL) {
- while (idx-- > 0)
- free((*buf)[idx]);
- free(*buf);
- pthread_mutex_unlock(&dt.stat[i].lock);
- pthread_rwlock_unlock(&dt.lock);
- return -ENOMEM;
- }
+ if ((*buf)[idx] == NULL)
+ goto fail_entry;
strcpy((*buf)[idx++], entry);
- pthread_mutex_unlock(&dt.stat[i].lock);
}
- assert((size_t) idx == dt.n_flows);
-
+ no_flows:
pthread_rwlock_unlock(&dt.lock);
return idx;
+
+ fail_entry:
+ while (idx-- > 0)
+ free((*buf)[idx]);
+ free(*buf);
+ fail_entries:
+ pthread_rwlock_unlock(&dt.lock);
+ return -ENOMEM;
#else
(void) buf;
return 0;
@@ -454,7 +454,7 @@ static void packet_handler(int fd,
head = shm_du_buff_head(sdb);
dt_pci_des(head, &dt_pci);
- if (dt_pci.dst_addr != ipcpi.dt_addr) {
+ if (dt_pci.dst_addr != dt.addr) {
if (dt_pci.ttl == 0) {
log_dbg("TTL was zero.");
ipcp_sdb_release(sdb);
@@ -570,16 +570,22 @@ int dt_init(struct dt_config cfg)
int i;
int j;
char dtstr[RIB_NAME_STRLEN + 1];
- int pp;
+ enum pol_pff pp;
struct conn_info info;
memset(&info, 0, sizeof(info));
+ dt.addr = addr_auth_address();
+ if (dt.addr == INVALID_ADDR) {
+ log_err("Failed to get address");
+ return -1;
+ }
+
strcpy(info.comp_name, DT_COMP);
strcpy(info.protocol, DT_PROTO);
info.pref_version = 1;
info.pref_syntax = PROTO_FIXED;
- info.addr = ipcpi.dt_addr;
+ info.addr = dt.addr;
if (cfg.eid_size != 8) { /* only support 64 bits from now */
log_warn("Invalid EID size. Only 64 bit is supported.");
@@ -601,8 +607,7 @@ int dt_init(struct dt_config cfg)
goto fail_connmgr_comp_init;
}
- pp = routing_init(cfg.routing_type);
- if (pp < 0) {
+ if (routing_init(&cfg.routing, &pp) < 0) {
log_err("Failed to init routing.");
goto fail_routing;
}
@@ -647,7 +652,7 @@ int dt_init(struct dt_config cfg)
dt.n_flows = 0;
#endif
- sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr);
+ sprintf(dtstr, "%s." ADDR_FMT32, DT, ADDR_VAL32(&dt.addr));
if (rib_reg(dtstr, &r_ops)) {
log_err("Failed to register RIB.");
goto fail_rib_reg;
@@ -683,7 +688,7 @@ void dt_fini(void)
char dtstr[RIB_NAME_STRLEN + 1];
int i;
- sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr);
+ sprintf(dtstr, "%s.%" PRIu64, DT, dt.addr);
rib_unreg(dtstr);
#ifdef IPCP_FLOW_STATS
for (i = 0; i < PROG_MAX_FLOWS; ++i)
@@ -719,21 +724,31 @@ int dt_start(void)
if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) {
log_err("Failed to create listener thread.");
- psched_destroy(dt.psched);
- return -1;
+ goto fail_listener;
+ }
+
+ if (routing_start() < 0) {
+ log_err("Failed to start routing.");
+ goto fail_routing;
}
return 0;
+ fail_routing:
+ pthread_cancel(dt.listener);
+ pthread_join(dt.listener, NULL);
+ fail_listener:
+ notifier_unreg(&handle_event);
fail_notifier_reg:
psched_destroy(dt.psched);
fail_psched:
return -1;
-
}
void dt_stop(void)
{
+ routing_stop();
+
pthread_cancel(dt.listener);
pthread_join(dt.listener, NULL);
@@ -748,7 +763,7 @@ int dt_reg_comp(void * comp,
{
int eid;
- assert(func);
+ assert(func != NULL);
pthread_rwlock_wrlock(&dt.lock);
@@ -769,11 +784,28 @@ int dt_reg_comp(void * comp,
pthread_rwlock_unlock(&dt.lock);
#ifdef IPCP_FLOW_STATS
- stat_used(eid, ipcpi.dt_addr);
+ stat_used(eid, dt.addr);
#endif
return eid;
}
+void dt_unreg_comp(int eid)
+{
+ assert(eid >= 0 && eid < PROG_RES_FDS);
+
+ pthread_rwlock_wrlock(&dt.lock);
+
+ assert(dt.comps[eid].post_packet != NULL);
+
+ dt.comps[eid].post_packet = NULL;
+ dt.comps[eid].comp = NULL;
+ dt.comps[eid].name = NULL;
+
+ pthread_rwlock_unlock(&dt.lock);
+
+ return;
+}
+
int dt_write_packet(uint64_t dst_addr,
qoscube_t qc,
uint64_t eid,
@@ -786,7 +818,7 @@ int dt_write_packet(uint64_t dst_addr,
size_t len;
assert(sdb);
- assert(dst_addr != ipcpi.dt_addr);
+ assert(dst_addr != dt.addr);
len = shm_du_buff_len(sdb);
@@ -802,7 +834,8 @@ int dt_write_packet(uint64_t dst_addr,
#endif
fd = pff_nhop(dt.pff[qc], dst_addr);
if (fd < 0) {
- log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
+ log_dbg("Could not get nhop for " ADDR_FMT32 ".",
+ ADDR_VAL32(&dst_addr));
#ifdef IPCP_FLOW_STATS
if (eid < PROG_RES_FDS) {
pthread_mutex_lock(&dt.stat[eid].lock);