summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/dt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/dt.c')
-rw-r--r--src/ipcpd/unicast/dt.c363
1 files changed, 160 insertions, 203 deletions
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index d6a3ddc9..2bb5ed2f 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -1,10 +1,10 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2020
+ * Ouroboros - Copyright (C) 2016 - 2024
*
* Data Transfer Component
*
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -41,13 +41,14 @@
#include <ouroboros/fccntl.h>
#endif
-#include "connmgr.h"
+#include "common/comp.h"
+#include "common/connmgr.h"
+#include "ca.h"
#include "ipcp.h"
#include "dt.h"
#include "pff.h"
#include "routing.h"
#include "psched.h"
-#include "comp.h"
#include "fa.h"
#include <stdlib.h>
@@ -57,8 +58,9 @@
#include <inttypes.h>
#include <assert.h>
-#define QOS_BLOCK_LEN 672
-#define STAT_FILE_LEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX)
+#define QOS_BLOCK_LEN 672
+#define RIB_FILE_STRLEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX)
+#define RIB_NAME_STRLEN 256
#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
@@ -80,7 +82,7 @@ struct dt_pci {
qoscube_t qc;
uint8_t ttl;
uint8_t ecn;
- uint32_t eid;
+ uint64_t eid;
};
struct {
@@ -98,39 +100,29 @@ struct {
uint8_t max_ttl;
} dt_pci_info;
-static int dt_pci_ser(struct shm_du_buff * sdb,
- struct dt_pci * dt_pci)
+static void dt_pci_ser(uint8_t * head,
+ struct dt_pci * dt_pci)
{
- uint8_t * head;
- uint8_t ttl = dt_pci_info.max_ttl;
+ uint8_t ttl = dt_pci_info.max_ttl;
- assert(sdb);
+ assert(head);
assert(dt_pci);
- head = shm_du_buff_head_alloc(sdb, dt_pci_info.head_size);
- if (head == NULL)
- return -EPERM;
-
/* FIXME: Add check and operations for Big Endian machines. */
memcpy(head, &dt_pci->dst_addr, dt_pci_info.addr_size);
memcpy(head + dt_pci_info.qc_o, &dt_pci->qc, QOS_LEN);
memcpy(head + dt_pci_info.ttl_o, &ttl, TTL_LEN);
- memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN);
+ memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN);
memcpy(head + dt_pci_info.eid_o, &dt_pci->eid, dt_pci_info.eid_size);
- return 0;
}
-static void dt_pci_des(struct shm_du_buff * sdb,
- struct dt_pci * dt_pci)
+static void dt_pci_des(uint8_t * head,
+ struct dt_pci * dt_pci)
{
- uint8_t * head;
-
- assert(sdb);
+ assert(head);
assert(dt_pci);
- head = shm_du_buff_head(sdb);
-
/* Decrease TTL */
--*(head + dt_pci_info.ttl_o);
@@ -184,24 +176,28 @@ struct {
pthread_t listener;
} dt;
-static int dt_stat_read(const char * path,
- char * buf,
- size_t len)
+static int dt_rib_read(const char * path,
+ char * buf,
+ size_t len)
{
#ifdef IPCP_FLOW_STATS
int fd;
int i;
char str[QOS_BLOCK_LEN + 1];
char addrstr[20];
+ char * entry;
char tmstr[20];
size_t rxqlen = 0;
size_t txqlen = 0;
struct tm * tm;
/* NOTE: we may need stronger checks. */
- fd = atoi(path);
+ entry = strstr(path, RIB_SEPARATOR) + 1;
+ assert(entry);
+
+ fd = atoi(entry);
- if (len < STAT_FILE_LEN)
+ if (len < RIB_FILE_STRLEN)
return 0;
buf[0] = '\0';
@@ -232,7 +228,6 @@ static int dt_stat_read(const char * path,
"Queued packets (rx): %20zu\n"
"Queued packets (tx): %20zu\n\n",
tmstr, addrstr, rxqlen, txqlen);
-
for (i = 0; i < QOS_CUBE_MAX; ++i) {
sprintf(str,
"Qos cube %3d:\n"
@@ -271,7 +266,7 @@ static int dt_stat_read(const char * path,
pthread_mutex_unlock(&dt.stat[fd].lock);
- return STAT_FILE_LEN;
+ return RIB_FILE_STRLEN;
#else
(void) path;
(void) buf;
@@ -280,7 +275,7 @@ static int dt_stat_read(const char * path,
#endif
}
-static int dt_stat_readdir(char *** buf)
+static int dt_rib_readdir(char *** buf)
{
#ifdef IPCP_FLOW_STATS
char entry[RIB_PATH_LEN + 1];
@@ -317,7 +312,7 @@ static int dt_stat_readdir(char *** buf)
if ((*buf)[idx] == NULL) {
while (idx-- > 0)
free((*buf)[idx]);
- free(buf);
+ free(*buf);
pthread_mutex_unlock(&dt.stat[i].lock);
pthread_rwlock_unlock(&dt.lock);
return -ENOMEM;
@@ -327,11 +322,10 @@ static int dt_stat_readdir(char *** buf)
pthread_mutex_unlock(&dt.stat[i].lock);
}
+ assert((size_t) idx == dt.n_flows);
pthread_rwlock_unlock(&dt.lock);
- assert((size_t) idx == dt.n_flows);
-
return idx;
#else
(void) buf;
@@ -339,45 +333,43 @@ static int dt_stat_readdir(char *** buf)
#endif
}
-static int dt_stat_getattr(const char * path,
- struct stat * st)
+static int dt_rib_getattr(const char * path,
+ struct rib_attr * attr)
{
#ifdef IPCP_FLOW_STATS
- int fd;
+ int fd;
+ char * entry;
- fd = atoi(path);
+ entry = strstr(path, RIB_SEPARATOR) + 1;
+ assert(entry);
- st->st_mode = S_IFREG | 0755;
- st->st_nlink = 1;
- st->st_uid = getuid();
- st->st_gid = getgid();
+ fd = atoi(entry);
pthread_mutex_lock(&dt.stat[fd].lock);
if (dt.stat[fd].stamp != -1) {
- st->st_size = STAT_FILE_LEN;
- st->st_mtime = dt.stat[fd].stamp;
+ attr->size = RIB_FILE_STRLEN;
+ attr->mtime = dt.stat[fd].stamp;
} else {
- st->st_size = 0;
- st->st_mtime = 0;
+ attr->size = 0;
+ attr->mtime = 0;
}
pthread_mutex_unlock(&dt.stat[fd].lock);
#else
(void) path;
- (void) st;
+ (void) attr;
#endif
return 0;
}
static struct rib_ops r_ops = {
- .read = dt_stat_read,
- .readdir = dt_stat_readdir,
- .getattr = dt_stat_getattr
+ .read = dt_rib_read,
+ .readdir = dt_rib_readdir,
+ .getattr = dt_rib_getattr
};
#ifdef IPCP_FLOW_STATS
-
static void stat_used(int fd,
uint64_t addr)
{
@@ -407,6 +399,7 @@ static void handle_event(void * self,
const void * o)
{
struct conn * c;
+ int fd;
(void) self;
@@ -414,19 +407,20 @@ static void handle_event(void * self,
switch (event) {
case NOTIFY_DT_CONN_ADD:
+ fd = c->flow_info.fd;
#ifdef IPCP_FLOW_STATS
- stat_used(c->flow_info.fd, c->conn_info.addr);
+ stat_used(fd, c->conn_info.addr);
#endif
- psched_add(dt.psched, c->flow_info.fd);
- log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd);
+ psched_add(dt.psched, fd);
+ log_dbg("Added fd %d to packet scheduler.", fd);
break;
case NOTIFY_DT_CONN_DEL:
+ fd = c->flow_info.fd;
#ifdef IPCP_FLOW_STATS
- stat_used(c->flow_info.fd, INVALID_ADDR);
+ stat_used(fd, INVALID_ADDR);
#endif
- psched_del(dt.psched, c->flow_info.fd);
- log_dbg("Removed fd %d from "
- "packet scheduler.", c->flow_info.fd);
+ psched_del(dt.psched, fd);
+ log_dbg("Removed fd %d from packet scheduler.", fd);
break;
default:
break;
@@ -440,17 +434,26 @@ static void packet_handler(int fd,
struct dt_pci dt_pci;
int ret;
int ofd;
-#ifdef IPCP_FLOW_STATS
+ uint8_t * head;
size_t len;
-#else
+
+ len = shm_du_buff_len(sdb);
+
+#ifndef IPCP_FLOW_STATS
(void) fd;
-#endif
+#else
+ pthread_mutex_lock(&dt.stat[fd].lock);
-#ifdef IPCP_FLOW_STATS
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
#endif
memset(&dt_pci, 0, sizeof(dt_pci));
- dt_pci_des(sdb, &dt_pci);
+
+ head = shm_du_buff_head(sdb);
+
+ dt_pci_des(head, &dt_pci);
if (dt_pci.dst_addr != ipcpi.dt_addr) {
if (dt_pci.ttl == 0) {
log_dbg("TTL was zero.");
@@ -458,8 +461,6 @@ static void packet_handler(int fd,
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
++dt.stat[fd].r_drp_pkt[qc];
dt.stat[fd].r_drp_bytes[qc] += len;
@@ -471,13 +472,12 @@ static void packet_handler(int fd,
/* FIXME: Use qoscube from PCI instead of incoming flow. */
ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr);
if (ofd < 0) {
- log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr);
+ log_dbg("No next hop for %" PRIu64 ".",
+ dt_pci.dst_addr);
ipcp_sdb_release(sdb);
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
++dt.stat[fd].f_nhp_pkt[qc];
dt.stat[fd].f_nhp_bytes[qc] += len;
@@ -486,6 +486,8 @@ static void packet_handler(int fd,
return;
}
+ (void) ca_calc_ecn(ofd, head + dt_pci_info.ecn_o, qc, len);
+
ret = ipcp_flow_write(ofd, sdb);
if (ret < 0) {
log_dbg("Failed to write packet to fd %d.", ofd);
@@ -493,12 +495,6 @@ static void packet_handler(int fd,
notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd);
ipcp_sdb_release(sdb);
#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
pthread_mutex_lock(&dt.stat[ofd].lock);
++dt.stat[ofd].w_drp_pkt[qc];
@@ -509,12 +505,6 @@ static void packet_handler(int fd,
return;
}
#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
pthread_mutex_lock(&dt.stat[ofd].lock);
++dt.stat[ofd].snd_pkt[qc];
@@ -525,65 +515,20 @@ static void packet_handler(int fd,
} else {
dt_pci_shrink(sdb);
if (dt_pci.eid >= PROG_RES_FDS) {
- if (ipcp_flow_write(dt_pci.eid, sdb)) {
- ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[fd].lock);
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
- pthread_mutex_unlock(&dt.stat[fd].lock);
-
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
- ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
- dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
-
- }
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].rcv_pkt[qc];
- dt.stat[dt_pci.eid].rcv_bytes[qc] += len;
- ++dt.stat[dt_pci.eid].lcl_r_pkt[qc];
- dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
+ uint8_t ecn = *(head + dt_pci_info.ecn_o);
+ fa_np1_rcv(dt_pci.eid, ecn, sdb);
return;
}
if (dt.comps[dt_pci.eid].post_packet == NULL) {
- log_err("No registered component on eid %d.",
+ log_err("No registered component on eid %" PRIu64 ".",
dt_pci.eid);
ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[fd].lock);
-
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[fd].lock);
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
- dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
return;
}
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
- ++dt.stat[fd].rcv_pkt[qc];
- dt.stat[fd].rcv_bytes[qc] += len;
++dt.stat[fd].lcl_r_pkt[qc];
dt.stat[fd].lcl_r_bytes[qc] += len;
@@ -620,14 +565,11 @@ static void * dt_conn_handle(void * o)
return 0;
}
-int dt_init(enum pol_routing pr,
- uint8_t addr_size,
- uint8_t eid_size,
- uint8_t max_ttl)
+int dt_init(struct dt_config cfg)
{
int i;
int j;
- char dtstr[256];
+ char dtstr[RIB_NAME_STRLEN + 1];
int pp;
struct conn_info info;
@@ -639,9 +581,14 @@ int dt_init(enum pol_routing pr,
info.pref_syntax = PROTO_FIXED;
info.addr = ipcpi.dt_addr;
- dt_pci_info.addr_size = addr_size;
- dt_pci_info.eid_size = eid_size;
- dt_pci_info.max_ttl = max_ttl;
+ if (cfg.eid_size != 8) { /* only support 64 bits from now */
+ log_warn("Invalid EID size. Only 64 bit is supported.");
+ cfg.eid_size = 8;
+ }
+
+ dt_pci_info.addr_size = cfg.addr_size;
+ dt_pci_info.eid_size = cfg.eid_size;
+ dt_pci_info.max_ttl = cfg.max_ttl;
dt_pci_info.qc_o = dt_pci_info.addr_size;
dt_pci_info.ttl_o = dt_pci_info.qc_o + QOS_LEN;
@@ -649,17 +596,12 @@ int dt_init(enum pol_routing pr,
dt_pci_info.eid_o = dt_pci_info.ecn_o + ECN_LEN;
dt_pci_info.head_size = dt_pci_info.eid_o + dt_pci_info.eid_size;
- if (notifier_reg(handle_event, NULL)) {
- log_err("Failed to register with notifier.");
- goto fail_notifier_reg;
- }
-
if (connmgr_comp_init(COMPID_DT, &info)) {
log_err("Failed to register with connmgr.");
goto fail_connmgr_comp_init;
}
- pp = routing_init(pr);
+ pp = routing_init(cfg.routing_type);
if (pp < 0) {
log_err("Failed to init routing.");
goto fail_routing;
@@ -697,6 +639,7 @@ int dt_init(enum pol_routing pr,
for (i = 0; i < PROG_MAX_FLOWS; ++i)
if (pthread_mutex_init(&dt.stat[i].lock, NULL)) {
+ log_err("Failed to init mutex for flow %d.", i);
for (j = 0; j < i; ++j)
pthread_mutex_destroy(&dt.stat[j].lock);
goto fail_stat_lock;
@@ -705,8 +648,10 @@ int dt_init(enum pol_routing pr,
dt.n_flows = 0;
#endif
sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr);
- if (rib_reg(dtstr, &r_ops))
+ if (rib_reg(dtstr, &r_ops)) {
+ log_err("Failed to register RIB.");
goto fail_rib_reg;
+ }
return 0;
@@ -730,16 +675,16 @@ int dt_init(enum pol_routing pr,
fail_routing:
connmgr_comp_fini(COMPID_DT);
fail_connmgr_comp_init:
- notifier_unreg(&handle_event);
- fail_notifier_reg:
return -1;
}
void dt_fini(void)
{
+ char dtstr[RIB_NAME_STRLEN + 1];
int i;
- rib_unreg(DT);
+ sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr);
+ rib_unreg(dtstr);
#ifdef IPCP_FLOW_STATS
for (i = 0; i < PROG_MAX_FLOWS; ++i)
pthread_mutex_destroy(&dt.stat[i].lock);
@@ -757,16 +702,19 @@ void dt_fini(void)
routing_fini();
connmgr_comp_fini(COMPID_DT);
-
- notifier_unreg(&handle_event);
}
int dt_start(void)
{
- dt.psched = psched_create(packet_handler);
+ dt.psched = psched_create(packet_handler, ipcp_flow_read);
if (dt.psched == NULL) {
log_err("Failed to create N-1 packet scheduler.");
- return -1;
+ goto fail_psched;
+ }
+
+ if (notifier_reg(handle_event, NULL)) {
+ log_err("Failed to register with notifier.");
+ goto fail_notifier_reg;
}
if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) {
@@ -776,12 +724,21 @@ int dt_start(void)
}
return 0;
+
+ fail_notifier_reg:
+ psched_destroy(dt.psched);
+ fail_psched:
+ return -1;
+
}
void dt_stop(void)
{
pthread_cancel(dt.listener);
pthread_join(dt.listener, NULL);
+
+ notifier_unreg(&handle_event);
+
psched_destroy(dt.psched);
}
@@ -789,81 +746,93 @@ int dt_reg_comp(void * comp,
void (* func)(void * func, struct shm_du_buff *),
char * name)
{
- int res_fd;
+ int eid;
assert(func);
pthread_rwlock_wrlock(&dt.lock);
- res_fd = bmp_allocate(dt.res_fds);
- if (!bmp_is_id_valid(dt.res_fds, res_fd)) {
- log_warn("Reserved fds depleted.");
+ eid = bmp_allocate(dt.res_fds);
+ if (!bmp_is_id_valid(dt.res_fds, eid)) {
+ log_err("Cannot allocate EID.");
pthread_rwlock_unlock(&dt.lock);
return -EBADF;
}
- assert(dt.comps[res_fd].post_packet == NULL);
- assert(dt.comps[res_fd].comp == NULL);
- assert(dt.comps[res_fd].name == NULL);
+ assert(dt.comps[eid].post_packet == NULL);
+ assert(dt.comps[eid].comp == NULL);
+ assert(dt.comps[eid].name == NULL);
- dt.comps[res_fd].post_packet = func;
- dt.comps[res_fd].comp = comp;
- dt.comps[res_fd].name = name;
+ dt.comps[eid].post_packet = func;
+ dt.comps[eid].comp = comp;
+ dt.comps[eid].name = name;
pthread_rwlock_unlock(&dt.lock);
#ifdef IPCP_FLOW_STATS
- stat_used(res_fd, ipcpi.dt_addr);
+ stat_used(eid, ipcpi.dt_addr);
#endif
- return res_fd;
+ return eid;
}
int dt_write_packet(uint64_t dst_addr,
qoscube_t qc,
- int np1_fd,
+ uint64_t eid,
struct shm_du_buff * sdb)
{
- int fd;
struct dt_pci dt_pci;
+ int fd;
int ret;
-#ifdef IPCP_FLOW_STATS
+ uint8_t * head;
size_t len;
-#endif
+
assert(sdb);
assert(dst_addr != ipcpi.dt_addr);
+ len = shm_du_buff_len(sdb);
+
+#ifdef IPCP_FLOW_STATS
+ if (eid < PROG_RES_FDS) {
+ pthread_mutex_lock(&dt.stat[eid].lock);
+
+ ++dt.stat[eid].lcl_r_pkt[qc];
+ dt.stat[eid].lcl_r_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[eid].lock);
+ }
+#endif
fd = pff_nhop(dt.pff[qc], dst_addr);
if (fd < 0) {
log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
#ifdef IPCP_FLOW_STATS
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
-
- pthread_mutex_lock(&dt.stat[np1_fd].lock);
+ if (eid < PROG_RES_FDS) {
+ pthread_mutex_lock(&dt.stat[eid].lock);
- ++dt.stat[np1_fd].lcl_r_pkt[qc];
- dt.stat[np1_fd].lcl_r_bytes[qc] += len;
- ++dt.stat[np1_fd].f_nhp_pkt[qc];
- dt.stat[np1_fd].f_nhp_bytes[qc] += len;
+ ++dt.stat[eid].lcl_r_pkt[qc];
+ dt.stat[eid].lcl_r_bytes[qc] += len;
- pthread_mutex_unlock(&dt.stat[np1_fd].lock);
+ pthread_mutex_unlock(&dt.stat[eid].lock);
+ }
#endif
- return -1;
+ return -EPERM;
+ }
+
+ head = shm_du_buff_head_alloc(sdb, dt_pci_info.head_size);
+ if (head == NULL) {
+ log_dbg("Failed to allocate DT header.");
+ goto fail_write;
}
+ len = shm_du_buff_len(sdb);
+
dt_pci.dst_addr = dst_addr;
dt_pci.qc = qc;
- dt_pci.eid = np1_fd;
+ dt_pci.eid = eid;
dt_pci.ecn = 0;
- if (dt_pci_ser(sdb, &dt_pci)) {
- log_dbg("Failed to serialize PDU.");
-#ifdef IPCP_FLOW_STATS
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
-#endif
- goto fail_write;
- }
-#ifdef IPCP_FLOW_STATS
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
-#endif
+ (void) ca_calc_ecn(fd, &dt_pci.ecn, qc, len);
+
+ dt_pci_ser(head, &dt_pci);
+
ret = ipcp_flow_write(fd, sdb);
if (ret < 0) {
log_dbg("Failed to write packet to fd %d.", fd);
@@ -872,12 +841,6 @@ int dt_write_packet(uint64_t dst_addr,
goto fail_write;
}
#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[np1_fd].lock);
-
- ++dt.stat[np1_fd].lcl_r_pkt[qc];
- dt.stat[np1_fd].lcl_r_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[np1_fd].lock);
pthread_mutex_lock(&dt.stat[fd].lock);
if (dt_pci.eid < PROG_RES_FDS) {
@@ -893,15 +856,9 @@ int dt_write_packet(uint64_t dst_addr,
fail_write:
#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[np1_fd].lock);
-
- ++dt.stat[np1_fd].lcl_w_pkt[qc];
- dt.stat[np1_fd].lcl_w_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[np1_fd].lock);
pthread_mutex_lock(&dt.stat[fd].lock);
- if (dt_pci.eid < PROG_RES_FDS) {
+ if (eid < PROG_RES_FDS) {
++dt.stat[fd].lcl_w_pkt[qc];
dt.stat[fd].lcl_w_bytes[qc] += len;
}