diff options
Diffstat (limited to 'src/ipcpd/unicast/fa.c')
-rw-r--r-- | src/ipcpd/unicast/fa.c | 120 |
1 files changed, 87 insertions, 33 deletions
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index 08c7a930..0d132619 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -37,6 +37,7 @@ #include <ouroboros/dev.h> #include <ouroboros/ipcp-dev.h> #include <ouroboros/rib.h> +#include <ouroboros/random.h> #include "dir.h" #include "fa.h" @@ -61,8 +62,8 @@ struct fa_msg { uint64_t s_addr; - uint32_t r_eid; - uint32_t s_eid; + uint64_t r_eid; + uint64_t s_eid; uint8_t code; int8_t response; uint16_t ece; @@ -94,9 +95,10 @@ struct fa_flow { size_t b_rcv; /* Bytes received */ size_t b_rcv_f; /* Bytes received fail */ #endif - int r_eid; /* remote endpoint id */ - uint64_t r_addr; /* remote address */ - void * ctx; /* congestion avoidance context */ + uint64_t s_eid; /* Local endpoint id */ + uint64_t r_eid; /* Remote endpoint id */ + uint64_t r_addr; /* Remote address */ + void * ctx; /* Congestion avoidance context */ }; struct { @@ -123,6 +125,8 @@ static int fa_stat_read(const char * path, struct fa_flow * flow; int fd; char r_addrstr[20]; + char s_eidstr[20]; + char r_eidstr[20]; char tmstr[20]; char castr[1024]; struct tm * tm; @@ -147,6 +151,8 @@ static int fa_stat_read(const char * path, } sprintf(r_addrstr, "%" PRIu64, flow->r_addr); + sprintf(s_eidstr, "%" PRIu64, flow->s_eid); + sprintf(r_eidstr, "%" PRIu64, flow->r_eid); tm = localtime(&flow->stamp); strftime(tmstr, sizeof(tmstr), "%F %T", tm); @@ -156,8 +162,8 @@ static int fa_stat_read(const char * path, sprintf(buf, "Flow established at: %20s\n" "Remote address: %20s\n" - "Local endpoint ID: %20d\n" - "Remote endpoint ID: %20d\n" + "Local endpoint ID: %20s\n" + "Remote endpoint ID: %20s\n" "Sent (packets): %20zu\n" "Sent (bytes): %20zu\n" "Send failed (packets): %20zu\n" @@ -167,7 +173,8 @@ static int fa_stat_read(const char * path, "Receive failed (packets): %20zu\n" "Receive failed (bytes): %20zu\n" "%s", - tmstr, r_addrstr, fd, flow->r_eid, + tmstr, r_addrstr, + s_eidstr, r_eidstr, flow->p_snd, flow->b_snd, flow->p_snd_f, flow->b_snd_f, flow->p_rcv, flow->b_rcv, @@ -277,13 +284,43 @@ static struct rib_ops r_ops = { .getattr = fa_stat_getattr }; +static int eid_to_fd(uint64_t eid) +{ + struct fa_flow * flow; + int fd; + + fd = eid & 0xFFFFFFFF; + + if (fd < 0 || fd > PROG_MAX_FLOWS) + return -1; + + flow = &fa.flows[fd]; + + if (flow->s_eid == eid) + return fd; + + return -1; +} + +static uint64_t gen_eid(int fd) +{ + uint32_t rnd; + + if (random_buffer(&rnd, sizeof(rnd)) < 0) + return fa.eid; /* INVALID */ + + fd &= 0xFFFFFFFF; + + return ((uint64_t) rnd << 32) + fd; +} + static void packet_handler(int fd, qoscube_t qc, struct shm_du_buff * sdb) { struct fa_flow * flow; uint64_t r_addr; - uint32_t r_eid; + uint64_t r_eid; ca_wnd_t wnd; size_t len; @@ -327,6 +364,7 @@ static int fa_flow_init(struct fa_flow * flow) memset(flow, 0, sizeof(*flow)); flow->r_eid = -1; + flow->s_eid = -1; flow->r_addr = INVALID_ADDR; flow->ctx = ca_ctx_create(); @@ -350,6 +388,7 @@ static void fa_flow_fini(struct fa_flow * flow) memset(flow, 0, sizeof(*flow)); flow->r_eid = -1; + flow->s_eid = -1; flow->r_addr = INVALID_ADDR; #ifdef IPCP_FLOW_STATS @@ -485,7 +524,8 @@ static void * fa_handle_packet(void * o) fa_flow_init(flow); - flow->r_eid = ntoh32(msg->s_eid); + flow->s_eid = gen_eid(fd); + flow->r_eid = ntoh64(msg->s_eid); flow->r_addr = ntoh64(msg->s_addr); pthread_rwlock_unlock(&fa.flows_lock); @@ -499,20 +539,26 @@ static void * fa_handle_packet(void * o) case FLOW_REPLY: assert(len >= sizeof(*msg)); - flow = &fa.flows[ntoh32(msg->r_eid)]; - pthread_rwlock_wrlock(&fa.flows_lock); - flow->r_eid = ntoh32(msg->s_eid); + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + break; + } + + flow = &fa.flows[fd]; + + flow->r_eid = ntoh64(msg->s_eid); if (msg->response < 0) fa_flow_fini(flow); else - psched_add(fa.psched, ntoh32(msg->r_eid)); + psched_add(fa.psched, fd); pthread_rwlock_unlock(&fa.flows_lock); - ipcp_flow_alloc_reply(ntoh32(msg->r_eid), + ipcp_flow_alloc_reply(fd, msg->response, buf + sizeof(*msg), len - sizeof(*msg)); @@ -520,10 +566,16 @@ static void * fa_handle_packet(void * o) case FLOW_UPDATE: assert(len >= sizeof(*msg)); - flow = &fa.flows[ntoh32(msg->r_eid)]; - pthread_rwlock_wrlock(&fa.flows_lock); + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + break; + } + + flow = &fa.flows[fd]; + ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); pthread_rwlock_unlock(&fa.flows_lock); @@ -560,12 +612,14 @@ int fa_init(void) list_head_init(&fa.cmds); - fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA); - sprintf(fastr, "%s", FA); if (rib_reg(fastr, &r_ops)) goto fail_rib_reg; + fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA); + if ((int) fa.eid < 0) + goto fail_rib_reg; + return 0; fail_rib_reg: @@ -655,6 +709,7 @@ int fa_alloc(int fd, uint64_t addr; qoscube_t qc = QOS_CUBE_BE; size_t len; + uint64_t eid; addr = dir_query(dst); if (addr == 0) @@ -668,8 +723,10 @@ int fa_alloc(int fd, msg = (struct fa_msg *) shm_du_buff_head(sdb); memset(msg, 0, sizeof(*msg)); + eid = gen_eid(fd); + msg->code = FLOW_REQ; - msg->s_eid = hton32(fd); + msg->s_eid = hton64(eid); msg->s_addr = hton64(ipcpi.dt_addr); msg->delay = hton32(qs.delay); msg->bandwidth = hton64(qs.bandwidth); @@ -694,6 +751,7 @@ int fa_alloc(int fd, fa_flow_init(flow); flow->r_addr = addr; + flow->s_eid = eid; pthread_rwlock_unlock(&fa.flows_lock); @@ -746,8 +804,8 @@ int fa_alloc_resp(int fd, pthread_rwlock_wrlock(&fa.flows_lock); msg->code = FLOW_REPLY; - msg->r_eid = hton32(flow->r_eid); - msg->s_eid = hton32(fd); + msg->r_eid = hton64(flow->r_eid); + msg->s_eid = hton64(flow->s_eid); msg->response = response; memcpy(msg + 1, data, len); @@ -811,7 +869,7 @@ static int fa_update_remote(int fd, pthread_rwlock_rdlock(&fa.flows_lock); msg->code = FLOW_UPDATE; - msg->r_eid = hton32(flow->r_eid); + msg->r_eid = hton64(flow->r_eid); msg->ece = hton16(ece); r_addr = flow->r_addr; @@ -827,7 +885,7 @@ static int fa_update_remote(int fd, return 0; } -void fa_np1_rcv(uint32_t eid, +void fa_np1_rcv(uint64_t eid, uint8_t ecn, struct shm_du_buff * sdb) { @@ -837,23 +895,19 @@ void fa_np1_rcv(uint32_t eid, int fd; size_t len; - fd = (int) eid; - if (fd < 0) { - ipcp_sdb_release(sdb); - return; - } - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); - flow = &fa.flows[fd]; - pthread_rwlock_wrlock(&fa.flows_lock); - if (flow->r_eid == -1) { + fd = eid_to_fd(eid); + if (fd < 0) { pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); return; } + + flow = &fa.flows[fd]; + #ifdef IPCP_FLOW_STATS ++flow->p_rcv; flow->b_rcv += len; |