diff options
Diffstat (limited to 'src/ipcpd/unicast/fa.c')
-rw-r--r-- | src/ipcpd/unicast/fa.c | 222 |
1 files changed, 170 insertions, 52 deletions
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index e154d785..8f268a9d 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -42,6 +42,7 @@ #include "psched.h" #include "ipcp.h" #include "dt.h" +#include "ca.h" #include <pthread.h> #include <stdlib.h> @@ -49,9 +50,10 @@ #define TIMEOUT 10000 /* nanoseconds */ -#define FLOW_REQ 0 -#define FLOW_REPLY 1 -#define MSGBUFSZ 2048 +#define FLOW_REQ 0 +#define FLOW_REPLY 1 +#define FLOW_UPDATE 2 +#define MSGBUFSZ 2048 struct fa_msg { uint64_t s_addr; @@ -59,6 +61,7 @@ struct fa_msg { uint32_t s_eid; uint8_t code; int8_t response; + uint16_t ece; /* QoS parameters from spec, aligned */ uint8_t availability; uint8_t in_order; @@ -75,10 +78,16 @@ struct cmd { struct shm_du_buff * sdb; }; +struct fa_flow { + int r_eid; /* remote endpoint id */ + uint64_t r_addr; /* remote address */ + void * ctx; /* congestion avoidance context */ +}; + struct { pthread_rwlock_t flows_lock; - int r_eid[PROG_MAX_FLOWS]; - uint64_t r_addr[PROG_MAX_FLOWS]; + struct fa_flow flows[PROG_MAX_FLOWS]; + int fd; struct list_head cmds; @@ -93,22 +102,56 @@ static void packet_handler(int fd, qoscube_t qc, struct shm_du_buff * sdb) { - pthread_rwlock_rdlock(&fa.flows_lock); + struct fa_flow * flow; + uint64_t r_addr; + uint32_t r_eid; + ca_wnd_t wnd; + size_t len; - if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) { - pthread_rwlock_unlock(&fa.flows_lock); + flow = &fa.flows[fd]; + + pthread_rwlock_wrlock(&fa.flows_lock); + + len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + + wnd = ca_ctx_update_snd(flow->ctx, len); + + r_addr = flow->r_addr; + r_eid = flow->r_eid; + + pthread_rwlock_unlock(&fa.flows_lock); + + ca_wnd_wait(wnd); + + if (dt_write_packet(r_addr, qc, r_eid, sdb)) { ipcp_sdb_release(sdb); log_warn("Failed to forward packet."); return; } +} - pthread_rwlock_unlock(&fa.flows_lock); +static int fa_flow_init(struct fa_flow * flow) +{ + memset(flow, 0, sizeof(*flow)); + + flow->r_eid = -1; + flow->r_addr = INVALID_ADDR; + + flow->ctx = ca_ctx_create(); + if (flow->ctx == NULL) + return -1; + + return 0; } -static void destroy_conn(int fd) +static void fa_flow_fini(struct fa_flow * flow) { - fa.r_eid[fd] = -1; - fa.r_addr[fd] = INVALID_ADDR; + ca_ctx_destroy(flow->ctx); + + memset(flow, 0, sizeof(*flow)); + + flow->r_eid = -1; + flow->r_addr = INVALID_ADDR; } static void fa_post_packet(void * comp, @@ -145,14 +188,15 @@ static void * fa_handle_packet(void * o) (void) o; while (true) { - struct timespec abstime; - int fd; - uint8_t buf[MSGBUFSZ]; - struct fa_msg * msg; - qosspec_t qs; - struct cmd * cmd; - size_t len; - size_t msg_len; + struct timespec abstime; + int fd; + uint8_t buf[MSGBUFSZ]; + struct fa_msg * msg; + qosspec_t qs; + struct cmd * cmd; + size_t len; + size_t msg_len; + struct fa_flow * flow; pthread_mutex_lock(&fa.mtx); @@ -232,10 +276,14 @@ static void * fa_handle_packet(void * o) continue; } + flow = &fa.flows[fd]; + pthread_rwlock_wrlock(&fa.flows_lock); - fa.r_eid[fd] = ntoh32(msg->s_eid); - fa.r_addr[fd] = ntoh64(msg->s_addr); + fa_flow_init(flow); + + flow->r_eid = ntoh32(msg->s_eid); + flow->r_addr = ntoh64(msg->s_addr); pthread_rwlock_unlock(&fa.flows_lock); @@ -248,19 +296,32 @@ static void * fa_handle_packet(void * o) case FLOW_REPLY: assert(len >= sizeof(*msg)); + flow = &fa.flows[ntoh32(msg->r_eid)]; + pthread_rwlock_wrlock(&fa.flows_lock); - fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); + flow->r_eid = ntoh32(msg->s_eid); + + if (msg->response < 0) + fa_flow_fini(flow); + else + psched_add(fa.psched, ntoh32(msg->r_eid)); + + pthread_rwlock_unlock(&fa.flows_lock); ipcp_flow_alloc_reply(ntoh32(msg->r_eid), msg->response, buf + sizeof(*msg), len - sizeof(*msg)); + break; + case FLOW_UPDATE: + assert(len >= sizeof(*msg)); - if (msg->response < 0) - destroy_conn(ntoh32(msg->r_eid)); - else - psched_add(fa.psched, ntoh32(msg->r_eid)); + flow = &fa.flows[ntoh32(msg->r_eid)]; + + pthread_rwlock_wrlock(&fa.flows_lock); + + ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); pthread_rwlock_unlock(&fa.flows_lock); @@ -275,10 +336,6 @@ static void * fa_handle_packet(void * o) int fa_init(void) { pthread_condattr_t cattr; - int i; - - for (i = 0; i < PROG_MAX_FLOWS; ++i) - destroy_conn(i); if (pthread_rwlock_init(&fa.flows_lock, NULL)) goto fail_rwlock; @@ -383,9 +440,10 @@ int fa_alloc(int fd, size_t dlen) { struct fa_msg * msg; - uint64_t addr; struct shm_du_buff * sdb; - qoscube_t qc; + struct fa_flow * flow; + uint64_t addr; + qoscube_t qc = QOS_CUBE_BE; size_t len; addr = dir_query(dst); @@ -397,7 +455,9 @@ int fa_alloc(int fd, if (ipcp_sdb_reserve(&sdb, len + dlen)) return -1; - msg = (struct fa_msg *) shm_du_buff_head(sdb); + msg = (struct fa_msg *) shm_du_buff_head(sdb); + memset(msg, 0, sizeof(*msg)); + msg->code = FLOW_REQ; msg->s_eid = hton32(fd); msg->s_addr = hton64(ipcpi.dt_addr); @@ -413,17 +473,17 @@ int fa_alloc(int fd, memcpy(msg + 1, dst, ipcp_dir_hash_len()); memcpy(shm_du_buff_head(sdb) + len, data, dlen); - qc = qos_spec_to_cube(qs); - if (dt_write_packet(addr, qc, fa.fd, sdb)) { ipcp_sdb_release(sdb); return -1; } + flow = &fa.flows[fd]; + pthread_rwlock_wrlock(&fa.flows_lock); - assert(fa.r_eid[fd] == -1); - fa.r_addr[fd] = addr; + fa_flow_init(flow); + flow->r_addr = addr; pthread_rwlock_unlock(&fa.flows_lock); @@ -439,10 +499,13 @@ int fa_alloc_resp(int fd, struct timespec abstime; struct fa_msg * msg; struct shm_du_buff * sdb; - qoscube_t qc; + struct fa_flow * flow; + qoscube_t qc = QOS_CUBE_BE; clock_gettime(PTHREAD_COND_CLOCK, &abstime); + flow = &fa.flows[fd]; + pthread_mutex_lock(&ipcpi.alloc_lock); while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { @@ -463,33 +526,31 @@ int fa_alloc_resp(int fd, pthread_mutex_unlock(&ipcpi.alloc_lock); if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) { - destroy_conn(fd); + fa_flow_fini(flow); return -1; } + msg = (struct fa_msg *) shm_du_buff_head(sdb); + memset(msg, 0, sizeof(*msg)); + pthread_rwlock_wrlock(&fa.flows_lock); - msg = (struct fa_msg *) shm_du_buff_head(sdb); msg->code = FLOW_REPLY; - msg->r_eid = hton32(fa.r_eid[fd]); + msg->r_eid = hton32(flow->r_eid); msg->s_eid = hton32(fd); msg->response = response; memcpy(msg + 1, data, len); if (response < 0) { - destroy_conn(fd); + fa_flow_fini(flow); ipcp_sdb_release(sdb); } else { psched_add(fa.psched, fd); } - ipcp_flow_get_qoscube(fd, &qc); - - assert(qc >= 0 && qc < QOS_CUBE_MAX); - - if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) { - destroy_conn(fd); + if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) { + fa_flow_fini(flow); pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); return -1; @@ -505,11 +566,11 @@ int fa_dealloc(int fd) if (ipcp_flow_fini(fd) < 0) return 0; - pthread_rwlock_wrlock(&fa.flows_lock); - psched_del(fa.psched, fd); - destroy_conn(fd); + pthread_rwlock_wrlock(&fa.flows_lock); + + fa_flow_fini(&fa.flows[fd]); pthread_rwlock_unlock(&fa.flows_lock); @@ -517,3 +578,60 @@ int fa_dealloc(int fd) return 0; } + +static int fa_update_remote(int fd, + uint16_t ece) +{ + struct fa_msg * msg; + struct shm_du_buff * sdb; + qoscube_t qc = QOS_CUBE_BE; + struct fa_flow * flow; + + if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) { + return -1; + } + + msg = (struct fa_msg *) shm_du_buff_head(sdb); + + memset(msg, 0, sizeof(*msg)); + + flow = &fa.flows[fd]; + + pthread_rwlock_rdlock(&fa.flows_lock); + + msg->code = FLOW_UPDATE; + msg->r_eid = hton32(flow->r_eid); + msg->ece = hton16(ece); + + if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) { + pthread_rwlock_unlock(&fa.flows_lock); + ipcp_sdb_release(sdb); + return -1; + } + + pthread_rwlock_unlock(&fa.flows_lock); + + + return 0; +} + +void fa_ecn_update(int eid, + uint8_t ecn, + size_t len) +{ + struct fa_flow * flow; + bool update; + uint16_t ece; + + flow = &fa.flows[eid]; + + pthread_rwlock_wrlock(&fa.flows_lock); + + update = ca_ctx_update_rcv(flow->ctx, len, ecn, &ece); + + pthread_rwlock_unlock(&fa.flows_lock); + + if (update) + fa_update_remote(eid, ece); + +} |