summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast')
-rw-r--r--src/ipcpd/unicast/dht.c5
-rw-r--r--src/ipcpd/unicast/dt.c40
-rw-r--r--src/ipcpd/unicast/dt.h2
-rw-r--r--src/ipcpd/unicast/fa.c120
-rw-r--r--src/ipcpd/unicast/fa.h2
5 files changed, 116 insertions, 53 deletions
diff --git a/src/ipcpd/unicast/dht.c b/src/ipcpd/unicast/dht.c
index b964ca08..b048580d 100644
--- a/src/ipcpd/unicast/dht.c
+++ b/src/ipcpd/unicast/dht.c
@@ -239,7 +239,7 @@ struct dht {
pthread_rwlock_t lock;
- uint32_t eid;
+ uint64_t eid;
struct tpm * tpm;
@@ -2815,6 +2815,9 @@ struct dht * dht_create(uint64_t addr)
goto fail_tpm_start;
dht->eid = dt_reg_comp(dht, &dht_post_packet, DHT);
+ if ((int) dht->eid < 0)
+ goto fail_tpm_start;
+
notifier_reg(handle_event, dht);
#else
(void) handle_event;
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index c8aadabb..90318487 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -81,7 +81,7 @@ struct dt_pci {
qoscube_t qc;
uint8_t ttl;
uint8_t ecn;
- uint32_t eid;
+ uint64_t eid;
};
struct {
@@ -111,7 +111,7 @@ static void dt_pci_ser(uint8_t * head,
memcpy(head, &dt_pci->dst_addr, dt_pci_info.addr_size);
memcpy(head + dt_pci_info.qc_o, &dt_pci->qc, QOS_LEN);
memcpy(head + dt_pci_info.ttl_o, &ttl, TTL_LEN);
- memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN);
+ memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN);
memcpy(head + dt_pci_info.eid_o, &dt_pci->eid, dt_pci_info.eid_size);
}
@@ -466,7 +466,8 @@ static void packet_handler(int fd,
/* FIXME: Use qoscube from PCI instead of incoming flow. */
ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr);
if (ofd < 0) {
- log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr);
+ log_dbg("No next hop for %" PRIu64 ".",
+ dt_pci.dst_addr);
ipcp_sdb_release(sdb);
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
@@ -514,7 +515,7 @@ static void packet_handler(int fd,
}
if (dt.comps[dt_pci.eid].post_packet == NULL) {
- log_err("No registered component on eid %d.",
+ log_err("No registered component on eid %" PRIu64 ".",
dt_pci.eid);
ipcp_sdb_release(sdb);
return;
@@ -577,6 +578,11 @@ int dt_init(enum pol_routing pr,
info.pref_syntax = PROTO_FIXED;
info.addr = ipcpi.dt_addr;
+ if (eid_size != 8) { /* only support 64 bits from now */
+ log_warn("Invalid EID size. Only 64 bit is supported.");
+ eid_size = 8;
+ }
+
dt_pci_info.addr_size = addr_size;
dt_pci_info.eid_size = eid_size;
dt_pci_info.max_ttl = max_ttl;
@@ -727,37 +733,37 @@ int dt_reg_comp(void * comp,
void (* func)(void * func, struct shm_du_buff *),
char * name)
{
- int res_fd;
+ int eid;
assert(func);
pthread_rwlock_wrlock(&dt.lock);
- res_fd = bmp_allocate(dt.res_fds);
- if (!bmp_is_id_valid(dt.res_fds, res_fd)) {
- log_warn("Reserved fds depleted.");
+ eid = bmp_allocate(dt.res_fds);
+ if (!bmp_is_id_valid(dt.res_fds, eid)) {
+ log_warn("Reserved EIDs depleted.");
pthread_rwlock_unlock(&dt.lock);
return -EBADF;
}
- assert(dt.comps[res_fd].post_packet == NULL);
- assert(dt.comps[res_fd].comp == NULL);
- assert(dt.comps[res_fd].name == NULL);
+ assert(dt.comps[eid].post_packet == NULL);
+ assert(dt.comps[eid].comp == NULL);
+ assert(dt.comps[eid].name == NULL);
- dt.comps[res_fd].post_packet = func;
- dt.comps[res_fd].comp = comp;
- dt.comps[res_fd].name = name;
+ dt.comps[eid].post_packet = func;
+ dt.comps[eid].comp = comp;
+ dt.comps[eid].name = name;
pthread_rwlock_unlock(&dt.lock);
#ifdef IPCP_FLOW_STATS
- stat_used(res_fd, ipcpi.dt_addr);
+ stat_used(eid, ipcpi.dt_addr);
#endif
- return res_fd;
+ return eid;
}
int dt_write_packet(uint64_t dst_addr,
qoscube_t qc,
- uint32_t eid,
+ uint64_t eid,
struct shm_du_buff * sdb)
{
struct dt_pci dt_pci;
diff --git a/src/ipcpd/unicast/dt.h b/src/ipcpd/unicast/dt.h
index 3e569dfe..1fd8f295 100644
--- a/src/ipcpd/unicast/dt.h
+++ b/src/ipcpd/unicast/dt.h
@@ -49,7 +49,7 @@ int dt_reg_comp(void * comp,
int dt_write_packet(uint64_t dst_addr,
qoscube_t qc,
- uint32_t eid,
+ uint64_t eid,
struct shm_du_buff * sdb);
#endif /* OUROBOROS_IPCPD_UNICAST_DT_H */
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index 08c7a930..0d132619 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -37,6 +37,7 @@
#include <ouroboros/dev.h>
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/rib.h>
+#include <ouroboros/random.h>
#include "dir.h"
#include "fa.h"
@@ -61,8 +62,8 @@
struct fa_msg {
uint64_t s_addr;
- uint32_t r_eid;
- uint32_t s_eid;
+ uint64_t r_eid;
+ uint64_t s_eid;
uint8_t code;
int8_t response;
uint16_t ece;
@@ -94,9 +95,10 @@ struct fa_flow {
size_t b_rcv; /* Bytes received */
size_t b_rcv_f; /* Bytes received fail */
#endif
- int r_eid; /* remote endpoint id */
- uint64_t r_addr; /* remote address */
- void * ctx; /* congestion avoidance context */
+ uint64_t s_eid; /* Local endpoint id */
+ uint64_t r_eid; /* Remote endpoint id */
+ uint64_t r_addr; /* Remote address */
+ void * ctx; /* Congestion avoidance context */
};
struct {
@@ -123,6 +125,8 @@ static int fa_stat_read(const char * path,
struct fa_flow * flow;
int fd;
char r_addrstr[20];
+ char s_eidstr[20];
+ char r_eidstr[20];
char tmstr[20];
char castr[1024];
struct tm * tm;
@@ -147,6 +151,8 @@ static int fa_stat_read(const char * path,
}
sprintf(r_addrstr, "%" PRIu64, flow->r_addr);
+ sprintf(s_eidstr, "%" PRIu64, flow->s_eid);
+ sprintf(r_eidstr, "%" PRIu64, flow->r_eid);
tm = localtime(&flow->stamp);
strftime(tmstr, sizeof(tmstr), "%F %T", tm);
@@ -156,8 +162,8 @@ static int fa_stat_read(const char * path,
sprintf(buf,
"Flow established at: %20s\n"
"Remote address: %20s\n"
- "Local endpoint ID: %20d\n"
- "Remote endpoint ID: %20d\n"
+ "Local endpoint ID: %20s\n"
+ "Remote endpoint ID: %20s\n"
"Sent (packets): %20zu\n"
"Sent (bytes): %20zu\n"
"Send failed (packets): %20zu\n"
@@ -167,7 +173,8 @@ static int fa_stat_read(const char * path,
"Receive failed (packets): %20zu\n"
"Receive failed (bytes): %20zu\n"
"%s",
- tmstr, r_addrstr, fd, flow->r_eid,
+ tmstr, r_addrstr,
+ s_eidstr, r_eidstr,
flow->p_snd, flow->b_snd,
flow->p_snd_f, flow->b_snd_f,
flow->p_rcv, flow->b_rcv,
@@ -277,13 +284,43 @@ static struct rib_ops r_ops = {
.getattr = fa_stat_getattr
};
+static int eid_to_fd(uint64_t eid)
+{
+ struct fa_flow * flow;
+ int fd;
+
+ fd = eid & 0xFFFFFFFF;
+
+ if (fd < 0 || fd > PROG_MAX_FLOWS)
+ return -1;
+
+ flow = &fa.flows[fd];
+
+ if (flow->s_eid == eid)
+ return fd;
+
+ return -1;
+}
+
+static uint64_t gen_eid(int fd)
+{
+ uint32_t rnd;
+
+ if (random_buffer(&rnd, sizeof(rnd)) < 0)
+ return fa.eid; /* INVALID */
+
+ fd &= 0xFFFFFFFF;
+
+ return ((uint64_t) rnd << 32) + fd;
+}
+
static void packet_handler(int fd,
qoscube_t qc,
struct shm_du_buff * sdb)
{
struct fa_flow * flow;
uint64_t r_addr;
- uint32_t r_eid;
+ uint64_t r_eid;
ca_wnd_t wnd;
size_t len;
@@ -327,6 +364,7 @@ static int fa_flow_init(struct fa_flow * flow)
memset(flow, 0, sizeof(*flow));
flow->r_eid = -1;
+ flow->s_eid = -1;
flow->r_addr = INVALID_ADDR;
flow->ctx = ca_ctx_create();
@@ -350,6 +388,7 @@ static void fa_flow_fini(struct fa_flow * flow)
memset(flow, 0, sizeof(*flow));
flow->r_eid = -1;
+ flow->s_eid = -1;
flow->r_addr = INVALID_ADDR;
#ifdef IPCP_FLOW_STATS
@@ -485,7 +524,8 @@ static void * fa_handle_packet(void * o)
fa_flow_init(flow);
- flow->r_eid = ntoh32(msg->s_eid);
+ flow->s_eid = gen_eid(fd);
+ flow->r_eid = ntoh64(msg->s_eid);
flow->r_addr = ntoh64(msg->s_addr);
pthread_rwlock_unlock(&fa.flows_lock);
@@ -499,20 +539,26 @@ static void * fa_handle_packet(void * o)
case FLOW_REPLY:
assert(len >= sizeof(*msg));
- flow = &fa.flows[ntoh32(msg->r_eid)];
-
pthread_rwlock_wrlock(&fa.flows_lock);
- flow->r_eid = ntoh32(msg->s_eid);
+ fd = eid_to_fd(ntoh64(msg->r_eid));
+ if (fd < 0) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ break;
+ }
+
+ flow = &fa.flows[fd];
+
+ flow->r_eid = ntoh64(msg->s_eid);
if (msg->response < 0)
fa_flow_fini(flow);
else
- psched_add(fa.psched, ntoh32(msg->r_eid));
+ psched_add(fa.psched, fd);
pthread_rwlock_unlock(&fa.flows_lock);
- ipcp_flow_alloc_reply(ntoh32(msg->r_eid),
+ ipcp_flow_alloc_reply(fd,
msg->response,
buf + sizeof(*msg),
len - sizeof(*msg));
@@ -520,10 +566,16 @@ static void * fa_handle_packet(void * o)
case FLOW_UPDATE:
assert(len >= sizeof(*msg));
- flow = &fa.flows[ntoh32(msg->r_eid)];
-
pthread_rwlock_wrlock(&fa.flows_lock);
+ fd = eid_to_fd(ntoh64(msg->r_eid));
+ if (fd < 0) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ break;
+ }
+
+ flow = &fa.flows[fd];
+
ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece));
pthread_rwlock_unlock(&fa.flows_lock);
@@ -560,12 +612,14 @@ int fa_init(void)
list_head_init(&fa.cmds);
- fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA);
-
sprintf(fastr, "%s", FA);
if (rib_reg(fastr, &r_ops))
goto fail_rib_reg;
+ fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA);
+ if ((int) fa.eid < 0)
+ goto fail_rib_reg;
+
return 0;
fail_rib_reg:
@@ -655,6 +709,7 @@ int fa_alloc(int fd,
uint64_t addr;
qoscube_t qc = QOS_CUBE_BE;
size_t len;
+ uint64_t eid;
addr = dir_query(dst);
if (addr == 0)
@@ -668,8 +723,10 @@ int fa_alloc(int fd,
msg = (struct fa_msg *) shm_du_buff_head(sdb);
memset(msg, 0, sizeof(*msg));
+ eid = gen_eid(fd);
+
msg->code = FLOW_REQ;
- msg->s_eid = hton32(fd);
+ msg->s_eid = hton64(eid);
msg->s_addr = hton64(ipcpi.dt_addr);
msg->delay = hton32(qs.delay);
msg->bandwidth = hton64(qs.bandwidth);
@@ -694,6 +751,7 @@ int fa_alloc(int fd,
fa_flow_init(flow);
flow->r_addr = addr;
+ flow->s_eid = eid;
pthread_rwlock_unlock(&fa.flows_lock);
@@ -746,8 +804,8 @@ int fa_alloc_resp(int fd,
pthread_rwlock_wrlock(&fa.flows_lock);
msg->code = FLOW_REPLY;
- msg->r_eid = hton32(flow->r_eid);
- msg->s_eid = hton32(fd);
+ msg->r_eid = hton64(flow->r_eid);
+ msg->s_eid = hton64(flow->s_eid);
msg->response = response;
memcpy(msg + 1, data, len);
@@ -811,7 +869,7 @@ static int fa_update_remote(int fd,
pthread_rwlock_rdlock(&fa.flows_lock);
msg->code = FLOW_UPDATE;
- msg->r_eid = hton32(flow->r_eid);
+ msg->r_eid = hton64(flow->r_eid);
msg->ece = hton16(ece);
r_addr = flow->r_addr;
@@ -827,7 +885,7 @@ static int fa_update_remote(int fd,
return 0;
}
-void fa_np1_rcv(uint32_t eid,
+void fa_np1_rcv(uint64_t eid,
uint8_t ecn,
struct shm_du_buff * sdb)
{
@@ -837,23 +895,19 @@ void fa_np1_rcv(uint32_t eid,
int fd;
size_t len;
- fd = (int) eid;
- if (fd < 0) {
- ipcp_sdb_release(sdb);
- return;
- }
-
len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
- flow = &fa.flows[fd];
-
pthread_rwlock_wrlock(&fa.flows_lock);
- if (flow->r_eid == -1) {
+ fd = eid_to_fd(eid);
+ if (fd < 0) {
pthread_rwlock_unlock(&fa.flows_lock);
ipcp_sdb_release(sdb);
return;
}
+
+ flow = &fa.flows[fd];
+
#ifdef IPCP_FLOW_STATS
++flow->p_rcv;
flow->b_rcv += len;
diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h
index c5c1baec..ea06a64e 100644
--- a/src/ipcpd/unicast/fa.h
+++ b/src/ipcpd/unicast/fa.h
@@ -47,7 +47,7 @@ int fa_alloc_resp(int fd,
int fa_dealloc(int fd);
-void fa_np1_rcv(uint32_t eid,
+void fa_np1_rcv(uint64_t eid,
uint8_t ecn,
struct shm_du_buff * sdb);