summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/fa.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/fa.c')
-rw-r--r--src/ipcpd/unicast/fa.c222
1 files changed, 170 insertions, 52 deletions
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index e154d785..8f268a9d 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -42,6 +42,7 @@
#include "psched.h"
#include "ipcp.h"
#include "dt.h"
+#include "ca.h"
#include <pthread.h>
#include <stdlib.h>
@@ -49,9 +50,10 @@
#define TIMEOUT 10000 /* nanoseconds */
-#define FLOW_REQ 0
-#define FLOW_REPLY 1
-#define MSGBUFSZ 2048
+#define FLOW_REQ 0
+#define FLOW_REPLY 1
+#define FLOW_UPDATE 2
+#define MSGBUFSZ 2048
struct fa_msg {
uint64_t s_addr;
@@ -59,6 +61,7 @@ struct fa_msg {
uint32_t s_eid;
uint8_t code;
int8_t response;
+ uint16_t ece;
/* QoS parameters from spec, aligned */
uint8_t availability;
uint8_t in_order;
@@ -75,10 +78,16 @@ struct cmd {
struct shm_du_buff * sdb;
};
+struct fa_flow {
+ int r_eid; /* remote endpoint id */
+ uint64_t r_addr; /* remote address */
+ void * ctx; /* congestion avoidance context */
+};
+
struct {
pthread_rwlock_t flows_lock;
- int r_eid[PROG_MAX_FLOWS];
- uint64_t r_addr[PROG_MAX_FLOWS];
+ struct fa_flow flows[PROG_MAX_FLOWS];
+
int fd;
struct list_head cmds;
@@ -93,22 +102,56 @@ static void packet_handler(int fd,
qoscube_t qc,
struct shm_du_buff * sdb)
{
- pthread_rwlock_rdlock(&fa.flows_lock);
+ struct fa_flow * flow;
+ uint64_t r_addr;
+ uint32_t r_eid;
+ ca_wnd_t wnd;
+ size_t len;
- if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) {
- pthread_rwlock_unlock(&fa.flows_lock);
+ flow = &fa.flows[fd];
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+
+ wnd = ca_ctx_update_snd(flow->ctx, len);
+
+ r_addr = flow->r_addr;
+ r_eid = flow->r_eid;
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ ca_wnd_wait(wnd);
+
+ if (dt_write_packet(r_addr, qc, r_eid, sdb)) {
ipcp_sdb_release(sdb);
log_warn("Failed to forward packet.");
return;
}
+}
- pthread_rwlock_unlock(&fa.flows_lock);
+static int fa_flow_init(struct fa_flow * flow)
+{
+ memset(flow, 0, sizeof(*flow));
+
+ flow->r_eid = -1;
+ flow->r_addr = INVALID_ADDR;
+
+ flow->ctx = ca_ctx_create();
+ if (flow->ctx == NULL)
+ return -1;
+
+ return 0;
}
-static void destroy_conn(int fd)
+static void fa_flow_fini(struct fa_flow * flow)
{
- fa.r_eid[fd] = -1;
- fa.r_addr[fd] = INVALID_ADDR;
+ ca_ctx_destroy(flow->ctx);
+
+ memset(flow, 0, sizeof(*flow));
+
+ flow->r_eid = -1;
+ flow->r_addr = INVALID_ADDR;
}
static void fa_post_packet(void * comp,
@@ -145,14 +188,15 @@ static void * fa_handle_packet(void * o)
(void) o;
while (true) {
- struct timespec abstime;
- int fd;
- uint8_t buf[MSGBUFSZ];
- struct fa_msg * msg;
- qosspec_t qs;
- struct cmd * cmd;
- size_t len;
- size_t msg_len;
+ struct timespec abstime;
+ int fd;
+ uint8_t buf[MSGBUFSZ];
+ struct fa_msg * msg;
+ qosspec_t qs;
+ struct cmd * cmd;
+ size_t len;
+ size_t msg_len;
+ struct fa_flow * flow;
pthread_mutex_lock(&fa.mtx);
@@ -232,10 +276,14 @@ static void * fa_handle_packet(void * o)
continue;
}
+ flow = &fa.flows[fd];
+
pthread_rwlock_wrlock(&fa.flows_lock);
- fa.r_eid[fd] = ntoh32(msg->s_eid);
- fa.r_addr[fd] = ntoh64(msg->s_addr);
+ fa_flow_init(flow);
+
+ flow->r_eid = ntoh32(msg->s_eid);
+ flow->r_addr = ntoh64(msg->s_addr);
pthread_rwlock_unlock(&fa.flows_lock);
@@ -248,19 +296,32 @@ static void * fa_handle_packet(void * o)
case FLOW_REPLY:
assert(len >= sizeof(*msg));
+ flow = &fa.flows[ntoh32(msg->r_eid)];
+
pthread_rwlock_wrlock(&fa.flows_lock);
- fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid);
+ flow->r_eid = ntoh32(msg->s_eid);
+
+ if (msg->response < 0)
+ fa_flow_fini(flow);
+ else
+ psched_add(fa.psched, ntoh32(msg->r_eid));
+
+ pthread_rwlock_unlock(&fa.flows_lock);
ipcp_flow_alloc_reply(ntoh32(msg->r_eid),
msg->response,
buf + sizeof(*msg),
len - sizeof(*msg));
+ break;
+ case FLOW_UPDATE:
+ assert(len >= sizeof(*msg));
- if (msg->response < 0)
- destroy_conn(ntoh32(msg->r_eid));
- else
- psched_add(fa.psched, ntoh32(msg->r_eid));
+ flow = &fa.flows[ntoh32(msg->r_eid)];
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece));
pthread_rwlock_unlock(&fa.flows_lock);
@@ -275,10 +336,6 @@ static void * fa_handle_packet(void * o)
int fa_init(void)
{
pthread_condattr_t cattr;
- int i;
-
- for (i = 0; i < PROG_MAX_FLOWS; ++i)
- destroy_conn(i);
if (pthread_rwlock_init(&fa.flows_lock, NULL))
goto fail_rwlock;
@@ -383,9 +440,10 @@ int fa_alloc(int fd,
size_t dlen)
{
struct fa_msg * msg;
- uint64_t addr;
struct shm_du_buff * sdb;
- qoscube_t qc;
+ struct fa_flow * flow;
+ uint64_t addr;
+ qoscube_t qc = QOS_CUBE_BE;
size_t len;
addr = dir_query(dst);
@@ -397,7 +455,9 @@ int fa_alloc(int fd,
if (ipcp_sdb_reserve(&sdb, len + dlen))
return -1;
- msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ memset(msg, 0, sizeof(*msg));
+
msg->code = FLOW_REQ;
msg->s_eid = hton32(fd);
msg->s_addr = hton64(ipcpi.dt_addr);
@@ -413,17 +473,17 @@ int fa_alloc(int fd,
memcpy(msg + 1, dst, ipcp_dir_hash_len());
memcpy(shm_du_buff_head(sdb) + len, data, dlen);
- qc = qos_spec_to_cube(qs);
-
if (dt_write_packet(addr, qc, fa.fd, sdb)) {
ipcp_sdb_release(sdb);
return -1;
}
+ flow = &fa.flows[fd];
+
pthread_rwlock_wrlock(&fa.flows_lock);
- assert(fa.r_eid[fd] == -1);
- fa.r_addr[fd] = addr;
+ fa_flow_init(flow);
+ flow->r_addr = addr;
pthread_rwlock_unlock(&fa.flows_lock);
@@ -439,10 +499,13 @@ int fa_alloc_resp(int fd,
struct timespec abstime;
struct fa_msg * msg;
struct shm_du_buff * sdb;
- qoscube_t qc;
+ struct fa_flow * flow;
+ qoscube_t qc = QOS_CUBE_BE;
clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ flow = &fa.flows[fd];
+
pthread_mutex_lock(&ipcpi.alloc_lock);
while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) {
@@ -463,33 +526,31 @@ int fa_alloc_resp(int fd,
pthread_mutex_unlock(&ipcpi.alloc_lock);
if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) {
- destroy_conn(fd);
+ fa_flow_fini(flow);
return -1;
}
+ msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ memset(msg, 0, sizeof(*msg));
+
pthread_rwlock_wrlock(&fa.flows_lock);
- msg = (struct fa_msg *) shm_du_buff_head(sdb);
msg->code = FLOW_REPLY;
- msg->r_eid = hton32(fa.r_eid[fd]);
+ msg->r_eid = hton32(flow->r_eid);
msg->s_eid = hton32(fd);
msg->response = response;
memcpy(msg + 1, data, len);
if (response < 0) {
- destroy_conn(fd);
+ fa_flow_fini(flow);
ipcp_sdb_release(sdb);
} else {
psched_add(fa.psched, fd);
}
- ipcp_flow_get_qoscube(fd, &qc);
-
- assert(qc >= 0 && qc < QOS_CUBE_MAX);
-
- if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) {
- destroy_conn(fd);
+ if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) {
+ fa_flow_fini(flow);
pthread_rwlock_unlock(&fa.flows_lock);
ipcp_sdb_release(sdb);
return -1;
@@ -505,11 +566,11 @@ int fa_dealloc(int fd)
if (ipcp_flow_fini(fd) < 0)
return 0;
- pthread_rwlock_wrlock(&fa.flows_lock);
-
psched_del(fa.psched, fd);
- destroy_conn(fd);
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fa_flow_fini(&fa.flows[fd]);
pthread_rwlock_unlock(&fa.flows_lock);
@@ -517,3 +578,60 @@ int fa_dealloc(int fd)
return 0;
}
+
+static int fa_update_remote(int fd,
+ uint16_t ece)
+{
+ struct fa_msg * msg;
+ struct shm_du_buff * sdb;
+ qoscube_t qc = QOS_CUBE_BE;
+ struct fa_flow * flow;
+
+ if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) {
+ return -1;
+ }
+
+ msg = (struct fa_msg *) shm_du_buff_head(sdb);
+
+ memset(msg, 0, sizeof(*msg));
+
+ flow = &fa.flows[fd];
+
+ pthread_rwlock_rdlock(&fa.flows_lock);
+
+ msg->code = FLOW_UPDATE;
+ msg->r_eid = hton32(flow->r_eid);
+ msg->ece = hton16(ece);
+
+ if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ ipcp_sdb_release(sdb);
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+
+ return 0;
+}
+
+void fa_ecn_update(int eid,
+ uint8_t ecn,
+ size_t len)
+{
+ struct fa_flow * flow;
+ bool update;
+ uint16_t ece;
+
+ flow = &fa.flows[eid];
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ update = ca_ctx_update_rcv(flow->ctx, len, ecn, &ece);
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ if (update)
+ fa_update_remote(eid, ece);
+
+}