summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/fa.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/fa.c')
-rw-r--r--src/ipcpd/unicast/fa.c191
1 files changed, 68 insertions, 123 deletions
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index 7d5e6549..3631fd7b 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2022
+ * Ouroboros - Copyright (C) 2016 - 2024
*
* Flow allocator of the IPC Process
*
@@ -31,6 +31,7 @@
#define FA "flow-allocator"
#define OUROBOROS_PREFIX FA
+#include <ouroboros/endian.h>
#include <ouroboros/logs.h>
#include <ouroboros/fqueue.h>
#include <ouroboros/errno.h>
@@ -55,7 +56,7 @@
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
#endif
-#define TIMEOUT 10000 /* nanoseconds */
+#define TIMEOUT 10 * MILLION /* nanoseconds */
#define FLOW_REQ 0
#define FLOW_REPLY 1
@@ -358,7 +359,7 @@ static void packet_handler(int fd,
if (dt_write_packet(r_addr, qc, r_eid, sdb)) {
ipcp_sdb_release(sdb);
- log_warn("Failed to forward packet.");
+ log_dbg("Failed to forward packet.");
#ifdef IPCP_FLOW_STATS
pthread_rwlock_wrlock(&fa.flows_lock);
++flow->p_snd_f;
@@ -455,7 +456,7 @@ static size_t fa_wait_for_fa_msg(struct fa_msg * msg)
len = shm_du_buff_len(cmd->sdb);
if (len > MSGBUFSZ || len < sizeof(*msg)) {
- log_warn("Invalid flow allocation message (len: %zd)\n", len);
+ log_warn("Invalid flow allocation message (len: %zd).", len);
free(cmd);
return 0; /* No valid message */
}
@@ -469,81 +470,6 @@ static size_t fa_wait_for_fa_msg(struct fa_msg * msg)
return len;
}
-static int fa_wait_irmd_alloc(uint8_t * dst,
- qosspec_t qs,
- const void * data,
- size_t len)
-{
- struct timespec ts = {0, TIMEOUT * 1000};
- struct timespec abstime;
- int fd;
- time_t mpl = IPCP_UNICAST_MPL;
-
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
- }
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- log_dbg("Won't allocate over non-operational IPCP.");
- return -EIPCPSTATE;
- }
-
- assert(ipcpi.alloc_id == -1);
-
- fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len);
- if (fd < 0) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- log_dbg("Failed to get fd for flow.");
- return -ENOTALLOC;
- }
-
- ipcpi.alloc_id = fd;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
- return fd;
-}
-
-static int fa_wait_irmd_alloc_resp(int fd)
-{
- struct timespec ts = {0, TIMEOUT * 1000};
- struct timespec abstime;
-
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
- }
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- return -1;
- }
-
- assert(ipcpi.alloc_id == fd);
-
- ipcpi.alloc_id = -1;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
- return 0;
-}
-
static int fa_handle_flow_req(struct fa_msg * msg,
size_t len)
{
@@ -551,8 +477,8 @@ static int fa_handle_flow_req(struct fa_msg * msg,
int fd;
qosspec_t qs;
struct fa_flow * flow;
- uint8_t * data; /* Piggbacked data on flow alloc request. */
- size_t dlen; /* Length of piggybacked data. */
+ uint8_t * dst;
+ buffer_t data; /* Piggbacked data on flow alloc request. */
msg_len = sizeof(*msg) + ipcp_dir_hash_len();
if (len < msg_len) {
@@ -560,8 +486,9 @@ static int fa_handle_flow_req(struct fa_msg * msg,
return -EPERM;
}
- data = (uint8_t *) msg + msg_len;
- dlen = len - msg_len;
+ dst = (uint8_t *)(msg + 1);
+ data.data = (uint8_t *) msg + msg_len;
+ data.len = len - msg_len;
qs.delay = ntoh32(msg->delay);
qs.bandwidth = ntoh64(msg->bandwidth);
@@ -573,7 +500,7 @@ static int fa_handle_flow_req(struct fa_msg * msg,
qs.cypher_s = ntoh16(msg->cypher_s);
qs.timeout = ntoh32(msg->timeout);
- fd = fa_wait_irmd_alloc((uint8_t *) (msg + 1), qs, data, dlen);
+ fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, &data);
if (fd < 0)
return fd;
@@ -597,20 +524,21 @@ static int fa_handle_flow_reply(struct fa_msg * msg,
{
int fd;
struct fa_flow * flow;
- uint8_t * data; /* Piggbacked data on flow alloc request. */
- size_t dlen; /* Length of piggybacked data. */
+ buffer_t data; /* Piggbacked data on flow alloc request. */
time_t mpl = IPCP_UNICAST_MPL;
assert(len >= sizeof(*msg));
- data = (uint8_t *) msg + sizeof(*msg);
- dlen = len - sizeof(*msg);
+ data.data = (uint8_t *) msg + sizeof(*msg);
+ data.len = len - sizeof(*msg);
pthread_rwlock_wrlock(&fa.flows_lock);
fd = eid_to_fd(ntoh64(msg->r_eid));
if (fd < 0) {
pthread_rwlock_unlock(&fa.flows_lock);
+ log_err("Flow reply for unknown EID %" PRIu64 ".",
+ ntoh64(msg->r_eid));
return -ENOTALLOC;
}
@@ -625,8 +553,10 @@ static int fa_handle_flow_reply(struct fa_msg * msg,
pthread_rwlock_unlock(&fa.flows_lock);
- if (ipcp_flow_alloc_reply(fd, msg->response, mpl, data, dlen))
+ if (ipcp_flow_alloc_reply(fd, msg->response, mpl, &data) < 0) {
+ log_err("Failed to reply for flow allocation on fd %d.", fd);
return -EIRMD;
+ }
return 0;
}
@@ -645,6 +575,8 @@ static int fa_handle_flow_update(struct fa_msg * msg,
fd = eid_to_fd(ntoh64(msg->r_eid));
if (fd < 0) {
pthread_rwlock_unlock(&fa.flows_lock);
+ log_err("Flow update for unknown EID %" PRIu64 ".",
+ ntoh64(msg->r_eid));
return -EPERM;
}
@@ -737,7 +669,7 @@ int fa_init(void)
fail_mtx:
pthread_rwlock_destroy(&fa.flows_lock);
fail_rwlock:
- log_err("Failed to initialize flow allocator.");
+
return -1;
}
@@ -793,7 +725,6 @@ int fa_start(void)
fail_thread:
psched_destroy(fa.psched);
fail_psched:
- log_err("Failed to start flow allocator.");
return -1;
}
@@ -805,11 +736,10 @@ void fa_stop(void)
psched_destroy(fa.psched);
}
-int fa_alloc(int fd,
- const uint8_t * dst,
- qosspec_t qs,
- const void * data,
- size_t dlen)
+int fa_alloc(int fd,
+ const uint8_t * dst,
+ qosspec_t qs,
+ const buffer_t * data)
{
struct fa_msg * msg;
struct shm_du_buff * sdb;
@@ -825,7 +755,7 @@ int fa_alloc(int fd,
len = sizeof(*msg) + ipcp_dir_hash_len();
- if (ipcp_sdb_reserve(&sdb, len + dlen))
+ if (ipcp_sdb_reserve(&sdb, len + data->len))
return -1;
msg = (struct fa_msg *) shm_du_buff_head(sdb);
@@ -847,10 +777,11 @@ int fa_alloc(int fd,
msg->timeout = hton32(qs.timeout);
memcpy(msg + 1, dst, ipcp_dir_hash_len());
- if (dlen > 0)
- memcpy(shm_du_buff_head(sdb) + len, data, dlen);
+ if (data->len > 0)
+ memcpy(shm_du_buff_head(sdb) + len, data->data, data->len);
if (dt_write_packet(addr, qc, fa.eid, sdb)) {
+ log_err("Failed to send flow allocation request packet.");
ipcp_sdb_release(sdb);
return -1;
}
@@ -868,10 +799,9 @@ int fa_alloc(int fd,
return 0;
}
-int fa_alloc_resp(int fd,
- int response,
- const void * data,
- size_t len)
+int fa_alloc_resp(int fd,
+ int response,
+ const buffer_t * data)
{
struct fa_msg * msg;
struct shm_du_buff * sdb;
@@ -880,44 +810,55 @@ int fa_alloc_resp(int fd,
flow = &fa.flows[fd];
- if (fa_wait_irmd_alloc_resp(fd) < 0)
- return -1;
+ if (ipcp_wait_flow_resp(fd) < 0) {
+ log_err("Failed to wait for flow response.");
+ goto fail_alloc_resp;
+ }
- if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) {
- fa_flow_fini(flow);
- return -1;
+ if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + data->len)) {
+ log_err("Failed to reserve sdb (%zu bytes).",
+ sizeof(*msg) + data->len);
+ goto fail_reserve;
}
msg = (struct fa_msg *) shm_du_buff_head(sdb);
memset(msg, 0, sizeof(*msg));
- pthread_rwlock_wrlock(&fa.flows_lock);
-
msg->code = FLOW_REPLY;
+ msg->response = response;
+ if (data->len > 0)
+ memcpy(msg + 1, data->data, data->len);
+
+ pthread_rwlock_rdlock(&fa.flows_lock);
+
msg->r_eid = hton64(flow->r_eid);
msg->s_eid = hton64(flow->s_eid);
- msg->response = response;
- if (len > 0)
- memcpy(msg + 1, data, len);
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) {
+ log_err("Failed to send flow allocation response packet.");
+ goto fail_packet;
+ }
if (response < 0) {
+ pthread_rwlock_rdlock(&fa.flows_lock);
fa_flow_fini(flow);
- ipcp_sdb_release(sdb);
+ pthread_rwlock_unlock(&fa.flows_lock);
} else {
psched_add(fa.psched, fd);
}
- if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) {
- fa_flow_fini(flow);
- pthread_rwlock_unlock(&fa.flows_lock);
- ipcp_sdb_release(sdb);
- return -1;
- }
+ return 0;
+ fail_packet:
+ ipcp_sdb_release(sdb);
+ fail_reserve:
+ pthread_rwlock_wrlock(&fa.flows_lock);
+ fa_flow_fini(flow);
pthread_rwlock_unlock(&fa.flows_lock);
-
- return 0;
+ fail_alloc_resp:
+ return -1;
}
int fa_dealloc(int fd)
@@ -933,7 +874,7 @@ int fa_dealloc(int fd)
pthread_rwlock_unlock(&fa.flows_lock);
- flow_dealloc(fd);
+ ipcp_flow_dealloc(fd);
return 0;
}
@@ -948,6 +889,7 @@ static int fa_update_remote(int fd,
uint64_t r_addr;
if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) {
+ log_err("Failed to reserve sdb (%zu bytes).", sizeof(*msg));
return -1;
}
@@ -971,6 +913,7 @@ static int fa_update_remote(int fd,
if (dt_write_packet(r_addr, qc, fa.eid, sdb)) {
+ log_err("Failed to send flow update packet.");
ipcp_sdb_release(sdb);
return -1;
}
@@ -995,6 +938,7 @@ void fa_np1_rcv(uint64_t eid,
fd = eid_to_fd(eid);
if (fd < 0) {
pthread_rwlock_unlock(&fa.flows_lock);
+ log_dbg("Received packet for unknown EID %" PRIu64 ".", eid);
ipcp_sdb_release(sdb);
return;
}
@@ -1010,6 +954,7 @@ void fa_np1_rcv(uint64_t eid,
pthread_rwlock_unlock(&fa.flows_lock);
if (ipcp_flow_write(fd, sdb) < 0) {
+ log_dbg("Failed to write to flow %d.", fd);
ipcp_sdb_release(sdb);
#ifdef IPCP_FLOW_STATS
pthread_rwlock_wrlock(&fa.flows_lock);