diff options
Diffstat (limited to 'src/ipcpd/unicast/fa.c')
-rw-r--r-- | src/ipcpd/unicast/fa.c | 191 |
1 files changed, 68 insertions, 123 deletions
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index 7d5e6549..3631fd7b 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2022 + * Ouroboros - Copyright (C) 2016 - 2024 * * Flow allocator of the IPC Process * @@ -31,6 +31,7 @@ #define FA "flow-allocator" #define OUROBOROS_PREFIX FA +#include <ouroboros/endian.h> #include <ouroboros/logs.h> #include <ouroboros/fqueue.h> #include <ouroboros/errno.h> @@ -55,7 +56,7 @@ #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -#define TIMEOUT 10000 /* nanoseconds */ +#define TIMEOUT 10 * MILLION /* nanoseconds */ #define FLOW_REQ 0 #define FLOW_REPLY 1 @@ -358,7 +359,7 @@ static void packet_handler(int fd, if (dt_write_packet(r_addr, qc, r_eid, sdb)) { ipcp_sdb_release(sdb); - log_warn("Failed to forward packet."); + log_dbg("Failed to forward packet."); #ifdef IPCP_FLOW_STATS pthread_rwlock_wrlock(&fa.flows_lock); ++flow->p_snd_f; @@ -455,7 +456,7 @@ static size_t fa_wait_for_fa_msg(struct fa_msg * msg) len = shm_du_buff_len(cmd->sdb); if (len > MSGBUFSZ || len < sizeof(*msg)) { - log_warn("Invalid flow allocation message (len: %zd)\n", len); + log_warn("Invalid flow allocation message (len: %zd).", len); free(cmd); return 0; /* No valid message */ } @@ -469,81 +470,6 @@ static size_t fa_wait_for_fa_msg(struct fa_msg * msg) return len; } -static int fa_wait_irmd_alloc(uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len) -{ - struct timespec ts = {0, TIMEOUT * 1000}; - struct timespec abstime; - int fd; - time_t mpl = IPCP_UNICAST_MPL; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_dbg("Won't allocate over non-operational IPCP."); - return -EIPCPSTATE; - } - - assert(ipcpi.alloc_id == -1); - - fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len); - if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_dbg("Failed to get fd for flow."); - return -ENOTALLOC; - } - - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - - return fd; -} - -static int fa_wait_irmd_alloc_resp(int fd) -{ - struct timespec ts = {0, TIMEOUT * 1000}; - struct timespec abstime; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; - } - - assert(ipcpi.alloc_id == fd); - - ipcpi.alloc_id = -1; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - - return 0; -} - static int fa_handle_flow_req(struct fa_msg * msg, size_t len) { @@ -551,8 +477,8 @@ static int fa_handle_flow_req(struct fa_msg * msg, int fd; qosspec_t qs; struct fa_flow * flow; - uint8_t * data; /* Piggbacked data on flow alloc request. */ - size_t dlen; /* Length of piggybacked data. */ + uint8_t * dst; + buffer_t data; /* Piggbacked data on flow alloc request. */ msg_len = sizeof(*msg) + ipcp_dir_hash_len(); if (len < msg_len) { @@ -560,8 +486,9 @@ static int fa_handle_flow_req(struct fa_msg * msg, return -EPERM; } - data = (uint8_t *) msg + msg_len; - dlen = len - msg_len; + dst = (uint8_t *)(msg + 1); + data.data = (uint8_t *) msg + msg_len; + data.len = len - msg_len; qs.delay = ntoh32(msg->delay); qs.bandwidth = ntoh64(msg->bandwidth); @@ -573,7 +500,7 @@ static int fa_handle_flow_req(struct fa_msg * msg, qs.cypher_s = ntoh16(msg->cypher_s); qs.timeout = ntoh32(msg->timeout); - fd = fa_wait_irmd_alloc((uint8_t *) (msg + 1), qs, data, dlen); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, &data); if (fd < 0) return fd; @@ -597,20 +524,21 @@ static int fa_handle_flow_reply(struct fa_msg * msg, { int fd; struct fa_flow * flow; - uint8_t * data; /* Piggbacked data on flow alloc request. */ - size_t dlen; /* Length of piggybacked data. */ + buffer_t data; /* Piggbacked data on flow alloc request. */ time_t mpl = IPCP_UNICAST_MPL; assert(len >= sizeof(*msg)); - data = (uint8_t *) msg + sizeof(*msg); - dlen = len - sizeof(*msg); + data.data = (uint8_t *) msg + sizeof(*msg); + data.len = len - sizeof(*msg); pthread_rwlock_wrlock(&fa.flows_lock); fd = eid_to_fd(ntoh64(msg->r_eid)); if (fd < 0) { pthread_rwlock_unlock(&fa.flows_lock); + log_err("Flow reply for unknown EID %" PRIu64 ".", + ntoh64(msg->r_eid)); return -ENOTALLOC; } @@ -625,8 +553,10 @@ static int fa_handle_flow_reply(struct fa_msg * msg, pthread_rwlock_unlock(&fa.flows_lock); - if (ipcp_flow_alloc_reply(fd, msg->response, mpl, data, dlen)) + if (ipcp_flow_alloc_reply(fd, msg->response, mpl, &data) < 0) { + log_err("Failed to reply for flow allocation on fd %d.", fd); return -EIRMD; + } return 0; } @@ -645,6 +575,8 @@ static int fa_handle_flow_update(struct fa_msg * msg, fd = eid_to_fd(ntoh64(msg->r_eid)); if (fd < 0) { pthread_rwlock_unlock(&fa.flows_lock); + log_err("Flow update for unknown EID %" PRIu64 ".", + ntoh64(msg->r_eid)); return -EPERM; } @@ -737,7 +669,7 @@ int fa_init(void) fail_mtx: pthread_rwlock_destroy(&fa.flows_lock); fail_rwlock: - log_err("Failed to initialize flow allocator."); + return -1; } @@ -793,7 +725,6 @@ int fa_start(void) fail_thread: psched_destroy(fa.psched); fail_psched: - log_err("Failed to start flow allocator."); return -1; } @@ -805,11 +736,10 @@ void fa_stop(void) psched_destroy(fa.psched); } -int fa_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t dlen) +int fa_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { struct fa_msg * msg; struct shm_du_buff * sdb; @@ -825,7 +755,7 @@ int fa_alloc(int fd, len = sizeof(*msg) + ipcp_dir_hash_len(); - if (ipcp_sdb_reserve(&sdb, len + dlen)) + if (ipcp_sdb_reserve(&sdb, len + data->len)) return -1; msg = (struct fa_msg *) shm_du_buff_head(sdb); @@ -847,10 +777,11 @@ int fa_alloc(int fd, msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); - if (dlen > 0) - memcpy(shm_du_buff_head(sdb) + len, data, dlen); + if (data->len > 0) + memcpy(shm_du_buff_head(sdb) + len, data->data, data->len); if (dt_write_packet(addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow allocation request packet."); ipcp_sdb_release(sdb); return -1; } @@ -868,10 +799,9 @@ int fa_alloc(int fd, return 0; } -int fa_alloc_resp(int fd, - int response, - const void * data, - size_t len) +int fa_alloc_resp(int fd, + int response, + const buffer_t * data) { struct fa_msg * msg; struct shm_du_buff * sdb; @@ -880,44 +810,55 @@ int fa_alloc_resp(int fd, flow = &fa.flows[fd]; - if (fa_wait_irmd_alloc_resp(fd) < 0) - return -1; + if (ipcp_wait_flow_resp(fd) < 0) { + log_err("Failed to wait for flow response."); + goto fail_alloc_resp; + } - if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) { - fa_flow_fini(flow); - return -1; + if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + data->len)) { + log_err("Failed to reserve sdb (%zu bytes).", + sizeof(*msg) + data->len); + goto fail_reserve; } msg = (struct fa_msg *) shm_du_buff_head(sdb); memset(msg, 0, sizeof(*msg)); - pthread_rwlock_wrlock(&fa.flows_lock); - msg->code = FLOW_REPLY; + msg->response = response; + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); + + pthread_rwlock_rdlock(&fa.flows_lock); + msg->r_eid = hton64(flow->r_eid); msg->s_eid = hton64(flow->s_eid); - msg->response = response; - if (len > 0) - memcpy(msg + 1, data, len); + pthread_rwlock_unlock(&fa.flows_lock); + + if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow allocation response packet."); + goto fail_packet; + } if (response < 0) { + pthread_rwlock_rdlock(&fa.flows_lock); fa_flow_fini(flow); - ipcp_sdb_release(sdb); + pthread_rwlock_unlock(&fa.flows_lock); } else { psched_add(fa.psched, fd); } - if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) { - fa_flow_fini(flow); - pthread_rwlock_unlock(&fa.flows_lock); - ipcp_sdb_release(sdb); - return -1; - } + return 0; + fail_packet: + ipcp_sdb_release(sdb); + fail_reserve: + pthread_rwlock_wrlock(&fa.flows_lock); + fa_flow_fini(flow); pthread_rwlock_unlock(&fa.flows_lock); - - return 0; + fail_alloc_resp: + return -1; } int fa_dealloc(int fd) @@ -933,7 +874,7 @@ int fa_dealloc(int fd) pthread_rwlock_unlock(&fa.flows_lock); - flow_dealloc(fd); + ipcp_flow_dealloc(fd); return 0; } @@ -948,6 +889,7 @@ static int fa_update_remote(int fd, uint64_t r_addr; if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) { + log_err("Failed to reserve sdb (%zu bytes).", sizeof(*msg)); return -1; } @@ -971,6 +913,7 @@ static int fa_update_remote(int fd, if (dt_write_packet(r_addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow update packet."); ipcp_sdb_release(sdb); return -1; } @@ -995,6 +938,7 @@ void fa_np1_rcv(uint64_t eid, fd = eid_to_fd(eid); if (fd < 0) { pthread_rwlock_unlock(&fa.flows_lock); + log_dbg("Received packet for unknown EID %" PRIu64 ".", eid); ipcp_sdb_release(sdb); return; } @@ -1010,6 +954,7 @@ void fa_np1_rcv(uint64_t eid, pthread_rwlock_unlock(&fa.flows_lock); if (ipcp_flow_write(fd, sdb) < 0) { + log_dbg("Failed to write to flow %d.", fd); ipcp_sdb_release(sdb); #ifdef IPCP_FLOW_STATS pthread_rwlock_wrlock(&fa.flows_lock); |