diff options
Diffstat (limited to 'src/ipcpd/unicast')
-rw-r--r-- | src/ipcpd/unicast/dht.c | 5 | ||||
-rw-r--r-- | src/ipcpd/unicast/dt.c | 40 | ||||
-rw-r--r-- | src/ipcpd/unicast/dt.h | 2 | ||||
-rw-r--r-- | src/ipcpd/unicast/fa.c | 120 | ||||
-rw-r--r-- | src/ipcpd/unicast/fa.h | 2 |
5 files changed, 116 insertions, 53 deletions
diff --git a/src/ipcpd/unicast/dht.c b/src/ipcpd/unicast/dht.c index b964ca08..b048580d 100644 --- a/src/ipcpd/unicast/dht.c +++ b/src/ipcpd/unicast/dht.c @@ -239,7 +239,7 @@ struct dht { pthread_rwlock_t lock; - uint32_t eid; + uint64_t eid; struct tpm * tpm; @@ -2815,6 +2815,9 @@ struct dht * dht_create(uint64_t addr) goto fail_tpm_start; dht->eid = dt_reg_comp(dht, &dht_post_packet, DHT); + if ((int) dht->eid < 0) + goto fail_tpm_start; + notifier_reg(handle_event, dht); #else (void) handle_event; diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c index c8aadabb..90318487 100644 --- a/src/ipcpd/unicast/dt.c +++ b/src/ipcpd/unicast/dt.c @@ -81,7 +81,7 @@ struct dt_pci { qoscube_t qc; uint8_t ttl; uint8_t ecn; - uint32_t eid; + uint64_t eid; }; struct { @@ -111,7 +111,7 @@ static void dt_pci_ser(uint8_t * head, memcpy(head, &dt_pci->dst_addr, dt_pci_info.addr_size); memcpy(head + dt_pci_info.qc_o, &dt_pci->qc, QOS_LEN); memcpy(head + dt_pci_info.ttl_o, &ttl, TTL_LEN); - memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN); + memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN); memcpy(head + dt_pci_info.eid_o, &dt_pci->eid, dt_pci_info.eid_size); } @@ -466,7 +466,8 @@ static void packet_handler(int fd, /* FIXME: Use qoscube from PCI instead of incoming flow. */ ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr); if (ofd < 0) { - log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr); + log_dbg("No next hop for %" PRIu64 ".", + dt_pci.dst_addr); ipcp_sdb_release(sdb); #ifdef IPCP_FLOW_STATS pthread_mutex_lock(&dt.stat[fd].lock); @@ -514,7 +515,7 @@ static void packet_handler(int fd, } if (dt.comps[dt_pci.eid].post_packet == NULL) { - log_err("No registered component on eid %d.", + log_err("No registered component on eid %" PRIu64 ".", dt_pci.eid); ipcp_sdb_release(sdb); return; @@ -577,6 +578,11 @@ int dt_init(enum pol_routing pr, info.pref_syntax = PROTO_FIXED; info.addr = ipcpi.dt_addr; + if (eid_size != 8) { /* only support 64 bits from now */ + log_warn("Invalid EID size. Only 64 bit is supported."); + eid_size = 8; + } + dt_pci_info.addr_size = addr_size; dt_pci_info.eid_size = eid_size; dt_pci_info.max_ttl = max_ttl; @@ -727,37 +733,37 @@ int dt_reg_comp(void * comp, void (* func)(void * func, struct shm_du_buff *), char * name) { - int res_fd; + int eid; assert(func); pthread_rwlock_wrlock(&dt.lock); - res_fd = bmp_allocate(dt.res_fds); - if (!bmp_is_id_valid(dt.res_fds, res_fd)) { - log_warn("Reserved fds depleted."); + eid = bmp_allocate(dt.res_fds); + if (!bmp_is_id_valid(dt.res_fds, eid)) { + log_warn("Reserved EIDs depleted."); pthread_rwlock_unlock(&dt.lock); return -EBADF; } - assert(dt.comps[res_fd].post_packet == NULL); - assert(dt.comps[res_fd].comp == NULL); - assert(dt.comps[res_fd].name == NULL); + assert(dt.comps[eid].post_packet == NULL); + assert(dt.comps[eid].comp == NULL); + assert(dt.comps[eid].name == NULL); - dt.comps[res_fd].post_packet = func; - dt.comps[res_fd].comp = comp; - dt.comps[res_fd].name = name; + dt.comps[eid].post_packet = func; + dt.comps[eid].comp = comp; + dt.comps[eid].name = name; pthread_rwlock_unlock(&dt.lock); #ifdef IPCP_FLOW_STATS - stat_used(res_fd, ipcpi.dt_addr); + stat_used(eid, ipcpi.dt_addr); #endif - return res_fd; + return eid; } int dt_write_packet(uint64_t dst_addr, qoscube_t qc, - uint32_t eid, + uint64_t eid, struct shm_du_buff * sdb) { struct dt_pci dt_pci; diff --git a/src/ipcpd/unicast/dt.h b/src/ipcpd/unicast/dt.h index 3e569dfe..1fd8f295 100644 --- a/src/ipcpd/unicast/dt.h +++ b/src/ipcpd/unicast/dt.h @@ -49,7 +49,7 @@ int dt_reg_comp(void * comp, int dt_write_packet(uint64_t dst_addr, qoscube_t qc, - uint32_t eid, + uint64_t eid, struct shm_du_buff * sdb); #endif /* OUROBOROS_IPCPD_UNICAST_DT_H */ diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index 08c7a930..0d132619 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -37,6 +37,7 @@ #include <ouroboros/dev.h> #include <ouroboros/ipcp-dev.h> #include <ouroboros/rib.h> +#include <ouroboros/random.h> #include "dir.h" #include "fa.h" @@ -61,8 +62,8 @@ struct fa_msg { uint64_t s_addr; - uint32_t r_eid; - uint32_t s_eid; + uint64_t r_eid; + uint64_t s_eid; uint8_t code; int8_t response; uint16_t ece; @@ -94,9 +95,10 @@ struct fa_flow { size_t b_rcv; /* Bytes received */ size_t b_rcv_f; /* Bytes received fail */ #endif - int r_eid; /* remote endpoint id */ - uint64_t r_addr; /* remote address */ - void * ctx; /* congestion avoidance context */ + uint64_t s_eid; /* Local endpoint id */ + uint64_t r_eid; /* Remote endpoint id */ + uint64_t r_addr; /* Remote address */ + void * ctx; /* Congestion avoidance context */ }; struct { @@ -123,6 +125,8 @@ static int fa_stat_read(const char * path, struct fa_flow * flow; int fd; char r_addrstr[20]; + char s_eidstr[20]; + char r_eidstr[20]; char tmstr[20]; char castr[1024]; struct tm * tm; @@ -147,6 +151,8 @@ static int fa_stat_read(const char * path, } sprintf(r_addrstr, "%" PRIu64, flow->r_addr); + sprintf(s_eidstr, "%" PRIu64, flow->s_eid); + sprintf(r_eidstr, "%" PRIu64, flow->r_eid); tm = localtime(&flow->stamp); strftime(tmstr, sizeof(tmstr), "%F %T", tm); @@ -156,8 +162,8 @@ static int fa_stat_read(const char * path, sprintf(buf, "Flow established at: %20s\n" "Remote address: %20s\n" - "Local endpoint ID: %20d\n" - "Remote endpoint ID: %20d\n" + "Local endpoint ID: %20s\n" + "Remote endpoint ID: %20s\n" "Sent (packets): %20zu\n" "Sent (bytes): %20zu\n" "Send failed (packets): %20zu\n" @@ -167,7 +173,8 @@ static int fa_stat_read(const char * path, "Receive failed (packets): %20zu\n" "Receive failed (bytes): %20zu\n" "%s", - tmstr, r_addrstr, fd, flow->r_eid, + tmstr, r_addrstr, + s_eidstr, r_eidstr, flow->p_snd, flow->b_snd, flow->p_snd_f, flow->b_snd_f, flow->p_rcv, flow->b_rcv, @@ -277,13 +284,43 @@ static struct rib_ops r_ops = { .getattr = fa_stat_getattr }; +static int eid_to_fd(uint64_t eid) +{ + struct fa_flow * flow; + int fd; + + fd = eid & 0xFFFFFFFF; + + if (fd < 0 || fd > PROG_MAX_FLOWS) + return -1; + + flow = &fa.flows[fd]; + + if (flow->s_eid == eid) + return fd; + + return -1; +} + +static uint64_t gen_eid(int fd) +{ + uint32_t rnd; + + if (random_buffer(&rnd, sizeof(rnd)) < 0) + return fa.eid; /* INVALID */ + + fd &= 0xFFFFFFFF; + + return ((uint64_t) rnd << 32) + fd; +} + static void packet_handler(int fd, qoscube_t qc, struct shm_du_buff * sdb) { struct fa_flow * flow; uint64_t r_addr; - uint32_t r_eid; + uint64_t r_eid; ca_wnd_t wnd; size_t len; @@ -327,6 +364,7 @@ static int fa_flow_init(struct fa_flow * flow) memset(flow, 0, sizeof(*flow)); flow->r_eid = -1; + flow->s_eid = -1; flow->r_addr = INVALID_ADDR; flow->ctx = ca_ctx_create(); @@ -350,6 +388,7 @@ static void fa_flow_fini(struct fa_flow * flow) memset(flow, 0, sizeof(*flow)); flow->r_eid = -1; + flow->s_eid = -1; flow->r_addr = INVALID_ADDR; #ifdef IPCP_FLOW_STATS @@ -485,7 +524,8 @@ static void * fa_handle_packet(void * o) fa_flow_init(flow); - flow->r_eid = ntoh32(msg->s_eid); + flow->s_eid = gen_eid(fd); + flow->r_eid = ntoh64(msg->s_eid); flow->r_addr = ntoh64(msg->s_addr); pthread_rwlock_unlock(&fa.flows_lock); @@ -499,20 +539,26 @@ static void * fa_handle_packet(void * o) case FLOW_REPLY: assert(len >= sizeof(*msg)); - flow = &fa.flows[ntoh32(msg->r_eid)]; - pthread_rwlock_wrlock(&fa.flows_lock); - flow->r_eid = ntoh32(msg->s_eid); + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + break; + } + + flow = &fa.flows[fd]; + + flow->r_eid = ntoh64(msg->s_eid); if (msg->response < 0) fa_flow_fini(flow); else - psched_add(fa.psched, ntoh32(msg->r_eid)); + psched_add(fa.psched, fd); pthread_rwlock_unlock(&fa.flows_lock); - ipcp_flow_alloc_reply(ntoh32(msg->r_eid), + ipcp_flow_alloc_reply(fd, msg->response, buf + sizeof(*msg), len - sizeof(*msg)); @@ -520,10 +566,16 @@ static void * fa_handle_packet(void * o) case FLOW_UPDATE: assert(len >= sizeof(*msg)); - flow = &fa.flows[ntoh32(msg->r_eid)]; - pthread_rwlock_wrlock(&fa.flows_lock); + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + break; + } + + flow = &fa.flows[fd]; + ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); pthread_rwlock_unlock(&fa.flows_lock); @@ -560,12 +612,14 @@ int fa_init(void) list_head_init(&fa.cmds); - fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA); - sprintf(fastr, "%s", FA); if (rib_reg(fastr, &r_ops)) goto fail_rib_reg; + fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA); + if ((int) fa.eid < 0) + goto fail_rib_reg; + return 0; fail_rib_reg: @@ -655,6 +709,7 @@ int fa_alloc(int fd, uint64_t addr; qoscube_t qc = QOS_CUBE_BE; size_t len; + uint64_t eid; addr = dir_query(dst); if (addr == 0) @@ -668,8 +723,10 @@ int fa_alloc(int fd, msg = (struct fa_msg *) shm_du_buff_head(sdb); memset(msg, 0, sizeof(*msg)); + eid = gen_eid(fd); + msg->code = FLOW_REQ; - msg->s_eid = hton32(fd); + msg->s_eid = hton64(eid); msg->s_addr = hton64(ipcpi.dt_addr); msg->delay = hton32(qs.delay); msg->bandwidth = hton64(qs.bandwidth); @@ -694,6 +751,7 @@ int fa_alloc(int fd, fa_flow_init(flow); flow->r_addr = addr; + flow->s_eid = eid; pthread_rwlock_unlock(&fa.flows_lock); @@ -746,8 +804,8 @@ int fa_alloc_resp(int fd, pthread_rwlock_wrlock(&fa.flows_lock); msg->code = FLOW_REPLY; - msg->r_eid = hton32(flow->r_eid); - msg->s_eid = hton32(fd); + msg->r_eid = hton64(flow->r_eid); + msg->s_eid = hton64(flow->s_eid); msg->response = response; memcpy(msg + 1, data, len); @@ -811,7 +869,7 @@ static int fa_update_remote(int fd, pthread_rwlock_rdlock(&fa.flows_lock); msg->code = FLOW_UPDATE; - msg->r_eid = hton32(flow->r_eid); + msg->r_eid = hton64(flow->r_eid); msg->ece = hton16(ece); r_addr = flow->r_addr; @@ -827,7 +885,7 @@ static int fa_update_remote(int fd, return 0; } -void fa_np1_rcv(uint32_t eid, +void fa_np1_rcv(uint64_t eid, uint8_t ecn, struct shm_du_buff * sdb) { @@ -837,23 +895,19 @@ void fa_np1_rcv(uint32_t eid, int fd; size_t len; - fd = (int) eid; - if (fd < 0) { - ipcp_sdb_release(sdb); - return; - } - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); - flow = &fa.flows[fd]; - pthread_rwlock_wrlock(&fa.flows_lock); - if (flow->r_eid == -1) { + fd = eid_to_fd(eid); + if (fd < 0) { pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); return; } + + flow = &fa.flows[fd]; + #ifdef IPCP_FLOW_STATS ++flow->p_rcv; flow->b_rcv += len; diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h index c5c1baec..ea06a64e 100644 --- a/src/ipcpd/unicast/fa.h +++ b/src/ipcpd/unicast/fa.h @@ -47,7 +47,7 @@ int fa_alloc_resp(int fd, int fa_dealloc(int fd); -void fa_np1_rcv(uint32_t eid, +void fa_np1_rcv(uint64_t eid, uint8_t ecn, struct shm_du_buff * sdb); |