Diffstat (limited to 'src/ipcpd/unicast/fa.c')
-rw-r--r--  src/ipcpd/unicast/fa.c  377
1 file changed, 199 insertions(+), 178 deletions(-)
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index 6e6d52f0..3631fd7b 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2021
+ * Ouroboros - Copyright (C) 2016 - 2024
*
* Flow allocator of the IPC Process
*
@@ -31,6 +31,7 @@
#define FA "flow-allocator"
#define OUROBOROS_PREFIX FA
+#include <ouroboros/endian.h>
#include <ouroboros/logs.h>
#include <ouroboros/fqueue.h>
#include <ouroboros/errno.h>
@@ -55,7 +56,7 @@
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
#endif
-#define TIMEOUT 10000 /* nanoseconds */
+#define TIMEOUT 10 * MILLION /* nanoseconds */
#define FLOW_REQ 0
#define FLOW_REPLY 1
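Assuming MILLION is the project's usual 10^6 time constant, the new TIMEOUT spells out in nanoseconds the same 10 ms that the old 10000 value only reached once callers scaled it by 1000. A minimal sketch of how the value now drops straight into a timespec:

/* Sketch only: no scaling needed with the new define.
 * The old callers wrote { 0, TIMEOUT * 1000 } to get the same 10 ms. */
struct timespec ts = { 0, TIMEOUT };   /* 10 * MILLION ns == 10 ms */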
@@ -72,14 +73,15 @@ struct fa_msg {
int8_t response;
uint16_t ece;
/* QoS parameters from spec, aligned */
- uint8_t availability;
- uint8_t in_order;
uint32_t delay;
uint64_t bandwidth;
uint32_t loss;
uint32_t ber;
uint32_t max_gap;
+ uint32_t timeout;
uint16_t cypher_s;
+ uint8_t availability;
+ uint8_t in_order;
} __attribute__((packed));
struct cmd {
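The reordered struct fa_msg above puts the widest fields first so the packed layout carries no hidden padding, and gains a timeout field for the QoS spec. A minimal sketch of how the QoS part goes on the wire, assuming the hton16/32/64 helpers from the newly included ouroboros/endian.h; fa_msg_pack_qos is a hypothetical helper name, the real packing happens inline in fa_alloc() further down:

/* Sketch: serializing a qosspec_t into the packed fa_msg shown above.
 * Multi-byte fields travel in network byte order; the two single-byte
 * fields need no conversion. */
static void fa_msg_pack_qos(struct fa_msg *   msg,
                            const qosspec_t * qs)
{
        msg->delay        = hton32(qs->delay);
        msg->bandwidth    = hton64(qs->bandwidth);
        msg->loss         = hton32(qs->loss);
        msg->ber          = hton32(qs->ber);
        msg->max_gap      = hton32(qs->max_gap);
        msg->timeout      = hton32(qs->timeout);   /* new in this revision */
        msg->cypher_s     = hton16(qs->cypher_s);
        msg->availability = qs->availability;
        msg->in_order     = qs->in_order;
}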
@@ -143,7 +145,7 @@ static int fa_rib_read(const char * path,
fd = atoi(entry);
- if (fd < 0 || fd > PROG_MAX_FLOWS)
+ if (fd < 0 || fd >= PROG_MAX_FLOWS)
return -1;
if (len < 1536)
@@ -238,7 +240,7 @@ static int fa_rib_readdir(char *** buf)
if ((*buf)[idx] == NULL) {
while (idx-- > 0)
free((*buf)[idx]);
- free(buf);
+ free(*buf);
pthread_rwlock_unlock(&fa.flows_lock);
return -ENOMEM;
}
@@ -303,7 +305,7 @@ static int eid_to_fd(uint64_t eid)
fd = eid & 0xFFFFFFFF;
- if (fd < 0 || fd > PROG_MAX_FLOWS)
+ if (fd < 0 || fd >= PROG_MAX_FLOWS)
return -1;
flow = &fa.flows[fd];
@@ -340,7 +342,7 @@ static void packet_handler(int fd,
pthread_rwlock_wrlock(&fa.flows_lock);
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+ len = shm_du_buff_len(sdb);
#ifdef IPCP_FLOW_STATS
++flow->p_snd;
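The length computation switches from pointer arithmetic on the buffer head and tail to shm_du_buff_len(); presumably the helper wraps exactly the expression it replaces, along the lines of:

/* Assumed equivalence only; the real helper lives in the shm_du_buff code. */
size_t shm_du_buff_len(struct shm_du_buff * sdb)
{
        return shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
}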
@@ -357,7 +359,7 @@ static void packet_handler(int fd,
if (dt_write_packet(r_addr, qc, r_eid, sdb)) {
ipcp_sdb_release(sdb);
- log_warn("Failed to forward packet.");
+ log_dbg("Failed to forward packet.");
#ifdef IPCP_FLOW_STATS
pthread_rwlock_wrlock(&fa.flows_lock);
++flow->p_snd_f;
@@ -435,167 +437,190 @@ static void fa_post_packet(void * comp,
pthread_mutex_unlock(&fa.mtx);
}
-static void * fa_handle_packet(void * o)
+static size_t fa_wait_for_fa_msg(struct fa_msg * msg)
{
- struct timespec ts = {0, TIMEOUT * 1000};
-
- (void) o;
+ struct cmd * cmd;
+ size_t len;
- while (true) {
- struct timespec abstime;
- int fd;
- uint8_t buf[MSGBUFSZ];
- struct fa_msg * msg;
- qosspec_t qs;
- struct cmd * cmd;
- size_t len;
- size_t msg_len;
- struct fa_flow * flow;
+ pthread_mutex_lock(&fa.mtx);
- pthread_mutex_lock(&fa.mtx);
+ pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx);
- pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx);
+ while (list_is_empty(&fa.cmds))
+ pthread_cond_wait(&fa.cond, &fa.mtx);
- while (list_is_empty(&fa.cmds))
- pthread_cond_wait(&fa.cond, &fa.mtx);
+ cmd = list_last_entry(&fa.cmds, struct cmd, next);
+ list_del(&cmd->next);
- cmd = list_last_entry(&fa.cmds, struct cmd, next);
- list_del(&cmd->next);
+ pthread_cleanup_pop(true);
- pthread_cleanup_pop(true);
+ len = shm_du_buff_len(cmd->sdb);
+ if (len > MSGBUFSZ || len < sizeof(*msg)) {
+ log_warn("Invalid flow allocation message (len: %zu).", len);
+ free(cmd);
+ return 0; /* No valid message */
+ }
- len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb);
+ memcpy(msg, shm_du_buff_head(cmd->sdb), len);
- if (len > MSGBUFSZ) {
- log_err("Message over buffer size.");
- free(cmd);
- continue;
- }
+ ipcp_sdb_release(cmd->sdb);
- msg = (struct fa_msg *) buf;
+ free(cmd);
- /* Depending on the message call the function in ipcp-dev.h */
+ return len;
+}
- memcpy(msg, shm_du_buff_head(cmd->sdb), len);
+static int fa_handle_flow_req(struct fa_msg * msg,
+ size_t len)
+{
+ size_t msg_len;
+ int fd;
+ qosspec_t qs;
+ struct fa_flow * flow;
+ uint8_t * dst;
+ buffer_t data; /* Piggybacked data on flow alloc request. */
- ipcp_sdb_release(cmd->sdb);
+ msg_len = sizeof(*msg) + ipcp_dir_hash_len();
+ if (len < msg_len) {
+ log_err("Invalid flow allocation request");
+ return -EPERM;
+ }
- free(cmd);
+ dst = (uint8_t *)(msg + 1);
+ data.data = (uint8_t *) msg + msg_len;
+ data.len = len - msg_len;
+
+ qs.delay = ntoh32(msg->delay);
+ qs.bandwidth = ntoh64(msg->bandwidth);
+ qs.availability = msg->availability;
+ qs.loss = ntoh32(msg->loss);
+ qs.ber = ntoh32(msg->ber);
+ qs.in_order = msg->in_order;
+ qs.max_gap = ntoh32(msg->max_gap);
+ qs.cypher_s = ntoh16(msg->cypher_s);
+ qs.timeout = ntoh32(msg->timeout);
+
+ fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, &data);
+ if (fd < 0)
+ return fd;
- switch (msg->code) {
- case FLOW_REQ:
- msg_len = sizeof(*msg) + ipcp_dir_hash_len();
+ flow = &fa.flows[fd];
- assert(len >= msg_len);
+ pthread_rwlock_wrlock(&fa.flows_lock);
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ fa_flow_init(flow);
- pthread_mutex_lock(&ipcpi.alloc_lock);
+ flow->s_eid = gen_eid(fd);
+ flow->r_eid = ntoh64(msg->s_eid);
+ flow->r_addr = ntoh64(msg->s_addr);
- while (ipcpi.alloc_id != -1 &&
- ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
- }
+ pthread_rwlock_unlock(&fa.flows_lock);
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- log_dbg("Won't allocate over non-operational"
- "IPCP.");
- continue;
- }
+ return fd;
+}
- assert(ipcpi.alloc_id == -1);
+static int fa_handle_flow_reply(struct fa_msg * msg,
+ size_t len)
+{
+ int fd;
+ struct fa_flow * flow;
+ buffer_t data; /* Piggybacked data on flow alloc reply. */
+ time_t mpl = IPCP_UNICAST_MPL;
- qs.delay = ntoh32(msg->delay);
- qs.bandwidth = ntoh64(msg->bandwidth);
- qs.availability = msg->availability;
- qs.loss = ntoh32(msg->loss);
- qs.ber = ntoh32(msg->ber);
- qs.in_order = msg->in_order;
- qs.max_gap = ntoh32(msg->max_gap);
- qs.cypher_s = ntoh16(msg->cypher_s);
+ assert(len >= sizeof(*msg));
- fd = ipcp_flow_req_arr((uint8_t *) (msg + 1),
- ipcp_dir_hash_len(),
- qs,
- buf + msg_len,
- len - msg_len);
- if (fd < 0) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- log_err("Failed to get fd for flow.");
- continue;
- }
+ data.data = (uint8_t *) msg + sizeof(*msg);
+ data.len = len - sizeof(*msg);
- flow = &fa.flows[fd];
+ pthread_rwlock_wrlock(&fa.flows_lock);
- pthread_rwlock_wrlock(&fa.flows_lock);
+ fd = eid_to_fd(ntoh64(msg->r_eid));
+ if (fd < 0) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ log_err("Flow reply for unknown EID %" PRIu64 ".",
+ ntoh64(msg->r_eid));
+ return -ENOTALLOC;
+ }
- fa_flow_init(flow);
+ flow = &fa.flows[fd];
- flow->s_eid = gen_eid(fd);
- flow->r_eid = ntoh64(msg->s_eid);
- flow->r_addr = ntoh64(msg->s_addr);
+ flow->r_eid = ntoh64(msg->s_eid);
- pthread_rwlock_unlock(&fa.flows_lock);
+ if (msg->response < 0)
+ fa_flow_fini(flow);
+ else
+ psched_add(fa.psched, fd);
- ipcpi.alloc_id = fd;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
+ pthread_rwlock_unlock(&fa.flows_lock);
- pthread_mutex_unlock(&ipcpi.alloc_lock);
+ if (ipcp_flow_alloc_reply(fd, msg->response, mpl, &data) < 0) {
+ log_err("Failed to reply for flow allocation on fd %d.", fd);
+ return -EIRMD;
+ }
- break;
- case FLOW_REPLY:
- assert(len >= sizeof(*msg));
+ return 0;
+}
- pthread_rwlock_wrlock(&fa.flows_lock);
+static int fa_handle_flow_update(struct fa_msg * msg,
+ size_t len)
+{
+ struct fa_flow * flow;
+ int fd;
- fd = eid_to_fd(ntoh64(msg->r_eid));
- if (fd < 0) {
- pthread_rwlock_unlock(&fa.flows_lock);
- break;
- }
+ (void) len;
+ assert(len >= sizeof(*msg));
- flow = &fa.flows[fd];
+ pthread_rwlock_wrlock(&fa.flows_lock);
- flow->r_eid = ntoh64(msg->s_eid);
+ fd = eid_to_fd(ntoh64(msg->r_eid));
+ if (fd < 0) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ log_err("Flow update for unknown EID %" PRIu64 ".",
+ ntoh64(msg->r_eid));
+ return -EPERM;
+ }
- if (msg->response < 0)
- fa_flow_fini(flow);
- else
- psched_add(fa.psched, fd);
+ flow = &fa.flows[fd];
+#ifdef IPCP_FLOW_STATS
+ flow->u_rcv++;
+#endif
+ ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece));
- pthread_rwlock_unlock(&fa.flows_lock);
+ pthread_rwlock_unlock(&fa.flows_lock);
- ipcp_flow_alloc_reply(fd,
- msg->response,
- buf + sizeof(*msg),
- len - sizeof(*msg));
- break;
- case FLOW_UPDATE:
- assert(len >= sizeof(*msg));
+ return 0;
+}
- pthread_rwlock_wrlock(&fa.flows_lock);
+static void * fa_handle_packet(void * o)
+{
+ (void) o;
- fd = eid_to_fd(ntoh64(msg->r_eid));
- if (fd < 0) {
- pthread_rwlock_unlock(&fa.flows_lock);
- break;
- }
+ while (true) {
+ uint8_t buf[MSGBUFSZ];
+ struct fa_msg * msg;
+ size_t len;
- flow = &fa.flows[fd];
-#ifdef IPCP_FLOW_STATS
- flow->u_rcv++;
-#endif
- ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece));
+ msg = (struct fa_msg *) buf;
- pthread_rwlock_unlock(&fa.flows_lock);
+ len = fa_wait_for_fa_msg(msg);
+ if (len == 0)
+ continue;
+ switch (msg->code) {
+ case FLOW_REQ:
+ if (fa_handle_flow_req(msg, len) < 0)
+ log_err("Error handling flow alloc request.");
+ break;
+ case FLOW_REPLY:
+ if (fa_handle_flow_reply(msg, len) < 0)
+ log_err("Error handling flow reply.");
+ break;
+ case FLOW_UPDATE:
+ if (fa_handle_flow_update(msg, len) < 0)
+ log_err("Error handling flow update.");
break;
default:
- log_err("Got an unknown flow allocation message.");
+ log_warn("Recieved unknown flow allocation message.");
break;
}
}
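The monolithic receive loop is now split into fa_wait_for_fa_msg() plus one handler per message code, and the open-coded wait on ipcpi.alloc_lock/alloc_cond that used to guard an arriving flow request has been folded into ipcp_wait_flow_req_arr() (its counterpart ipcp_wait_flow_resp() appears in fa_alloc_resp() below). A sketch of what the request-side helper presumably does, reconstructed from the code removed above; the real helper lives in the common IPCP code, also carries the IPCP_UNICAST_MPL argument, and may differ in detail:

/* Sketch based on the removed open-coded version. */
static int wait_flow_req_arr_sketch(const uint8_t *  dst,
                                    qosspec_t        qs,
                                    const buffer_t * data)
{
        struct timespec ts = { 0, TIMEOUT };
        struct timespec abstime;
        int             fd;

        clock_gettime(PTHREAD_COND_CLOCK, &abstime);

        pthread_mutex_lock(&ipcpi.alloc_lock);

        /* Wait until the previous allocation has been picked up. */
        while (ipcpi.alloc_id != -1 &&
               ipcp_get_state() == IPCP_OPERATIONAL) {
                ts_add(&abstime, &ts, &abstime);
                pthread_cond_timedwait(&ipcpi.alloc_cond,
                                       &ipcpi.alloc_lock,
                                       &abstime);
        }

        if (ipcp_get_state() != IPCP_OPERATIONAL) {
                pthread_mutex_unlock(&ipcpi.alloc_lock);
                return -1;
        }

        fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs,
                               data->data, data->len);
        if (fd >= 0) {
                ipcpi.alloc_id = fd;
                pthread_cond_broadcast(&ipcpi.alloc_cond);
        }

        pthread_mutex_unlock(&ipcpi.alloc_lock);

        return fd;
}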
@@ -644,7 +669,7 @@ int fa_init(void)
fail_mtx:
pthread_rwlock_destroy(&fa.flows_lock);
fail_rwlock:
- log_err("Failed to initialize flow allocator.");
+
return -1;
}
@@ -663,7 +688,7 @@ int fa_start(void)
int pol;
int max;
- fa.psched = psched_create(packet_handler);
+ fa.psched = psched_create(packet_handler, np1_flow_read);
if (fa.psched == NULL) {
log_err("Failed to start packet scheduler.");
goto fail_psched;
@@ -700,7 +725,6 @@ int fa_start(void)
fail_thread:
psched_destroy(fa.psched);
fail_psched:
- log_err("Failed to start flow allocator.");
return -1;
}
@@ -712,11 +736,10 @@ void fa_stop(void)
psched_destroy(fa.psched);
}
-int fa_alloc(int fd,
- const uint8_t * dst,
- qosspec_t qs,
- const void * data,
- size_t dlen)
+int fa_alloc(int fd,
+ const uint8_t * dst,
+ qosspec_t qs,
+ const buffer_t * data)
{
struct fa_msg * msg;
struct shm_du_buff * sdb;
@@ -732,7 +755,7 @@ int fa_alloc(int fd,
len = sizeof(*msg) + ipcp_dir_hash_len();
- if (ipcp_sdb_reserve(&sdb, len + dlen))
+ if (ipcp_sdb_reserve(&sdb, len + data->len))
return -1;
msg = (struct fa_msg *) shm_du_buff_head(sdb);
@@ -751,11 +774,14 @@ int fa_alloc(int fd,
msg->in_order = qs.in_order;
msg->max_gap = hton32(qs.max_gap);
msg->cypher_s = hton16(qs.cypher_s);
+ msg->timeout = hton32(qs.timeout);
memcpy(msg + 1, dst, ipcp_dir_hash_len());
- memcpy(shm_du_buff_head(sdb) + len, data, dlen);
+ if (data->len > 0)
+ memcpy(shm_du_buff_head(sdb) + len, data->data, data->len);
if (dt_write_packet(addr, qc, fa.eid, sdb)) {
+ log_err("Failed to send flow allocation request packet.");
ipcp_sdb_release(sdb);
return -1;
}
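fa_alloc() and fa_alloc_resp() now take any piggybacked payload as a single buffer_t instead of a separate pointer and length. A hypothetical call site for the new interface (every name except fa_alloc and buffer_t is made up for illustration):

/* Hypothetical caller of the buffer_t-based fa_alloc(). */
static int alloc_with_piggyback(int             fd,
                                const uint8_t * dst_hash,
                                qosspec_t       qs,
                                const void *    payload,
                                size_t          payload_len)
{
        buffer_t data;

        data.data = (uint8_t *) payload; /* may be NULL ...          */
        data.len  = payload_len;         /* ... as long as len is 0. */

        return fa_alloc(fd, dst_hash, qs, &data);
}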
@@ -773,75 +799,66 @@ int fa_alloc(int fd,
return 0;
}
-int fa_alloc_resp(int fd,
- int response,
- const void * data,
- size_t len)
+int fa_alloc_resp(int fd,
+ int response,
+ const buffer_t * data)
{
- struct timespec ts = {0, TIMEOUT * 1000};
- struct timespec abstime;
struct fa_msg * msg;
struct shm_du_buff * sdb;
struct fa_flow * flow;
qoscube_t qc = QOS_CUBE_BE;
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-
flow = &fa.flows[fd];
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
+ if (ipcp_wait_flow_resp(fd) < 0) {
+ log_err("Failed to wait for flow response.");
+ goto fail_alloc_resp;
}
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- return -1;
- }
-
- ipcpi.alloc_id = -1;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
- if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) {
- fa_flow_fini(flow);
- return -1;
+ if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + data->len)) {
+ log_err("Failed to reserve sdb (%zu bytes).",
+ sizeof(*msg) + data->len);
+ goto fail_reserve;
}
msg = (struct fa_msg *) shm_du_buff_head(sdb);
memset(msg, 0, sizeof(*msg));
- pthread_rwlock_wrlock(&fa.flows_lock);
-
msg->code = FLOW_REPLY;
+ msg->response = response;
+ if (data->len > 0)
+ memcpy(msg + 1, data->data, data->len);
+
+ pthread_rwlock_rdlock(&fa.flows_lock);
+
msg->r_eid = hton64(flow->r_eid);
msg->s_eid = hton64(flow->s_eid);
- msg->response = response;
- memcpy(msg + 1, data, len);
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) {
+ log_err("Failed to send flow allocation response packet.");
+ goto fail_packet;
+ }
if (response < 0) {
+ pthread_rwlock_wrlock(&fa.flows_lock);
fa_flow_fini(flow);
- ipcp_sdb_release(sdb);
+ pthread_rwlock_unlock(&fa.flows_lock);
} else {
psched_add(fa.psched, fd);
}
- if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) {
- fa_flow_fini(flow);
- pthread_rwlock_unlock(&fa.flows_lock);
- ipcp_sdb_release(sdb);
- return -1;
- }
+ return 0;
+ fail_packet:
+ ipcp_sdb_release(sdb);
+ fail_reserve:
+ pthread_rwlock_wrlock(&fa.flows_lock);
+ fa_flow_fini(flow);
pthread_rwlock_unlock(&fa.flows_lock);
-
- return 0;
+ fail_alloc_resp:
+ return -1;
}
int fa_dealloc(int fd)
@@ -857,7 +874,7 @@ int fa_dealloc(int fd)
pthread_rwlock_unlock(&fa.flows_lock);
- flow_dealloc(fd);
+ ipcp_flow_dealloc(fd);
return 0;
}
@@ -872,6 +889,7 @@ static int fa_update_remote(int fd,
uint64_t r_addr;
if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) {
+ log_err("Failed to reserve sdb (%zu bytes).", sizeof(*msg));
return -1;
}
@@ -895,6 +913,7 @@ static int fa_update_remote(int fd,
if (dt_write_packet(r_addr, qc, fa.eid, sdb)) {
+ log_err("Failed to send flow update packet.");
ipcp_sdb_release(sdb);
return -1;
}
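For context, the FLOW_UPDATE packet sent here carries the congestion (ECE) value back to the peer, where fa_handle_flow_update() above feeds it into ca_ctx_update_ece(). The message body, built outside this hunk, presumably amounts to no more than:

/* Presumed shape of the update message; field names from struct fa_msg,
 * the actual assignments in fa_update_remote() are not part of this diff. */
msg->code  = FLOW_UPDATE;
msg->r_eid = hton64(flow->r_eid);
msg->ece   = hton16(ece);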
@@ -912,13 +931,14 @@ void fa_np1_rcv(uint64_t eid,
int fd;
size_t len;
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+ len = shm_du_buff_len(sdb);
pthread_rwlock_wrlock(&fa.flows_lock);
fd = eid_to_fd(eid);
if (fd < 0) {
pthread_rwlock_unlock(&fa.flows_lock);
+ log_dbg("Received packet for unknown EID %" PRIu64 ".", eid);
ipcp_sdb_release(sdb);
return;
}
@@ -934,6 +954,7 @@ void fa_np1_rcv(uint64_t eid,
pthread_rwlock_unlock(&fa.flows_lock);
if (ipcp_flow_write(fd, sdb) < 0) {
+ log_dbg("Failed to write to flow %d.", fd);
ipcp_sdb_release(sdb);
#ifdef IPCP_FLOW_STATS
pthread_rwlock_wrlock(&fa.flows_lock);