diff options
Diffstat (limited to 'src/ipcpd/unicast/fa.c')
-rw-r--r-- | src/ipcpd/unicast/fa.c | 377 |
1 files changed, 199 insertions, 178 deletions
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index 6e6d52f0..3631fd7b 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2021 + * Ouroboros - Copyright (C) 2016 - 2024 * * Flow allocator of the IPC Process * @@ -31,6 +31,7 @@ #define FA "flow-allocator" #define OUROBOROS_PREFIX FA +#include <ouroboros/endian.h> #include <ouroboros/logs.h> #include <ouroboros/fqueue.h> #include <ouroboros/errno.h> @@ -55,7 +56,7 @@ #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif -#define TIMEOUT 10000 /* nanoseconds */ +#define TIMEOUT 10 * MILLION /* nanoseconds */ #define FLOW_REQ 0 #define FLOW_REPLY 1 @@ -72,14 +73,15 @@ struct fa_msg { int8_t response; uint16_t ece; /* QoS parameters from spec, aligned */ - uint8_t availability; - uint8_t in_order; uint32_t delay; uint64_t bandwidth; uint32_t loss; uint32_t ber; uint32_t max_gap; + uint32_t timeout; uint16_t cypher_s; + uint8_t availability; + uint8_t in_order; } __attribute__((packed)); struct cmd { @@ -143,7 +145,7 @@ static int fa_rib_read(const char * path, fd = atoi(entry); - if (fd < 0 || fd > PROG_MAX_FLOWS) + if (fd < 0 || fd >= PROG_MAX_FLOWS) return -1; if (len < 1536) @@ -238,7 +240,7 @@ static int fa_rib_readdir(char *** buf) if ((*buf)[idx] == NULL) { while (idx-- > 0) free((*buf)[idx]); - free(buf); + free(*buf); pthread_rwlock_unlock(&fa.flows_lock); return -ENOMEM; } @@ -303,7 +305,7 @@ static int eid_to_fd(uint64_t eid) fd = eid & 0xFFFFFFFF; - if (fd < 0 || fd > PROG_MAX_FLOWS) + if (fd < 0 || fd >= PROG_MAX_FLOWS) return -1; flow = &fa.flows[fd]; @@ -340,7 +342,7 @@ static void packet_handler(int fd, pthread_rwlock_wrlock(&fa.flows_lock); - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + len = shm_du_buff_len(sdb); #ifdef IPCP_FLOW_STATS ++flow->p_snd; @@ -357,7 +359,7 @@ static void packet_handler(int fd, if (dt_write_packet(r_addr, qc, r_eid, sdb)) { ipcp_sdb_release(sdb); - log_warn("Failed to forward packet."); + log_dbg("Failed to forward packet."); #ifdef IPCP_FLOW_STATS pthread_rwlock_wrlock(&fa.flows_lock); ++flow->p_snd_f; @@ -435,167 +437,190 @@ static void fa_post_packet(void * comp, pthread_mutex_unlock(&fa.mtx); } -static void * fa_handle_packet(void * o) +static size_t fa_wait_for_fa_msg(struct fa_msg * msg) { - struct timespec ts = {0, TIMEOUT * 1000}; - - (void) o; + struct cmd * cmd; + size_t len; - while (true) { - struct timespec abstime; - int fd; - uint8_t buf[MSGBUFSZ]; - struct fa_msg * msg; - qosspec_t qs; - struct cmd * cmd; - size_t len; - size_t msg_len; - struct fa_flow * flow; + pthread_mutex_lock(&fa.mtx); - pthread_mutex_lock(&fa.mtx); + pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx); - pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx); + while (list_is_empty(&fa.cmds)) + pthread_cond_wait(&fa.cond, &fa.mtx); - while (list_is_empty(&fa.cmds)) - pthread_cond_wait(&fa.cond, &fa.mtx); + cmd = list_last_entry(&fa.cmds, struct cmd, next); + list_del(&cmd->next); - cmd = list_last_entry(&fa.cmds, struct cmd, next); - list_del(&cmd->next); + pthread_cleanup_pop(true); - pthread_cleanup_pop(true); + len = shm_du_buff_len(cmd->sdb); + if (len > MSGBUFSZ || len < sizeof(*msg)) { + log_warn("Invalid flow allocation message (len: %zd).", len); + free(cmd); + return 0; /* No valid message */ + } - len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); + memcpy(msg, shm_du_buff_head(cmd->sdb), len); - if (len > MSGBUFSZ) { - log_err("Message over buffer size."); - free(cmd); - continue; - } + ipcp_sdb_release(cmd->sdb); - msg = (struct fa_msg *) buf; + free(cmd); - /* Depending on the message call the function in ipcp-dev.h */ + return len; +} - memcpy(msg, shm_du_buff_head(cmd->sdb), len); +static int fa_handle_flow_req(struct fa_msg * msg, + size_t len) +{ + size_t msg_len; + int fd; + qosspec_t qs; + struct fa_flow * flow; + uint8_t * dst; + buffer_t data; /* Piggbacked data on flow alloc request. */ - ipcp_sdb_release(cmd->sdb); + msg_len = sizeof(*msg) + ipcp_dir_hash_len(); + if (len < msg_len) { + log_err("Invalid flow allocation request"); + return -EPERM; + } - free(cmd); + dst = (uint8_t *)(msg + 1); + data.data = (uint8_t *) msg + msg_len; + data.len = len - msg_len; + + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + qs.cypher_s = ntoh16(msg->cypher_s); + qs.timeout = ntoh32(msg->timeout); + + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, &data); + if (fd < 0) + return fd; - switch (msg->code) { - case FLOW_REQ: - msg_len = sizeof(*msg) + ipcp_dir_hash_len(); + flow = &fa.flows[fd]; - assert(len >= msg_len); + pthread_rwlock_wrlock(&fa.flows_lock); - clock_gettime(PTHREAD_COND_CLOCK, &abstime); + fa_flow_init(flow); - pthread_mutex_lock(&ipcpi.alloc_lock); + flow->s_eid = gen_eid(fd); + flow->r_eid = ntoh64(msg->s_eid); + flow->r_addr = ntoh64(msg->s_addr); - while (ipcpi.alloc_id != -1 && - ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } + pthread_rwlock_unlock(&fa.flows_lock); - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_dbg("Won't allocate over non-operational" - "IPCP."); - continue; - } + return fd; +} - assert(ipcpi.alloc_id == -1); +static int fa_handle_flow_reply(struct fa_msg * msg, + size_t len) +{ + int fd; + struct fa_flow * flow; + buffer_t data; /* Piggbacked data on flow alloc request. */ + time_t mpl = IPCP_UNICAST_MPL; - qs.delay = ntoh32(msg->delay); - qs.bandwidth = ntoh64(msg->bandwidth); - qs.availability = msg->availability; - qs.loss = ntoh32(msg->loss); - qs.ber = ntoh32(msg->ber); - qs.in_order = msg->in_order; - qs.max_gap = ntoh32(msg->max_gap); - qs.cypher_s = ntoh16(msg->cypher_s); + assert(len >= sizeof(*msg)); - fd = ipcp_flow_req_arr((uint8_t *) (msg + 1), - ipcp_dir_hash_len(), - qs, - buf + msg_len, - len - msg_len); - if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_err("Failed to get fd for flow."); - continue; - } + data.data = (uint8_t *) msg + sizeof(*msg); + data.len = len - sizeof(*msg); - flow = &fa.flows[fd]; + pthread_rwlock_wrlock(&fa.flows_lock); - pthread_rwlock_wrlock(&fa.flows_lock); + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + log_err("Flow reply for unknown EID %" PRIu64 ".", + ntoh64(msg->r_eid)); + return -ENOTALLOC; + } - fa_flow_init(flow); + flow = &fa.flows[fd]; - flow->s_eid = gen_eid(fd); - flow->r_eid = ntoh64(msg->s_eid); - flow->r_addr = ntoh64(msg->s_addr); + flow->r_eid = ntoh64(msg->s_eid); - pthread_rwlock_unlock(&fa.flows_lock); + if (msg->response < 0) + fa_flow_fini(flow); + else + psched_add(fa.psched, fd); - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); + pthread_rwlock_unlock(&fa.flows_lock); - pthread_mutex_unlock(&ipcpi.alloc_lock); + if (ipcp_flow_alloc_reply(fd, msg->response, mpl, &data) < 0) { + log_err("Failed to reply for flow allocation on fd %d.", fd); + return -EIRMD; + } - break; - case FLOW_REPLY: - assert(len >= sizeof(*msg)); + return 0; +} - pthread_rwlock_wrlock(&fa.flows_lock); +static int fa_handle_flow_update(struct fa_msg * msg, + size_t len) +{ + struct fa_flow * flow; + int fd; - fd = eid_to_fd(ntoh64(msg->r_eid)); - if (fd < 0) { - pthread_rwlock_unlock(&fa.flows_lock); - break; - } + (void) len; + assert(len >= sizeof(*msg)); - flow = &fa.flows[fd]; + pthread_rwlock_wrlock(&fa.flows_lock); - flow->r_eid = ntoh64(msg->s_eid); + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + log_err("Flow update for unknown EID %" PRIu64 ".", + ntoh64(msg->r_eid)); + return -EPERM; + } - if (msg->response < 0) - fa_flow_fini(flow); - else - psched_add(fa.psched, fd); + flow = &fa.flows[fd]; +#ifdef IPCP_FLOW_STATS + flow->u_rcv++; +#endif + ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); - pthread_rwlock_unlock(&fa.flows_lock); + pthread_rwlock_unlock(&fa.flows_lock); - ipcp_flow_alloc_reply(fd, - msg->response, - buf + sizeof(*msg), - len - sizeof(*msg)); - break; - case FLOW_UPDATE: - assert(len >= sizeof(*msg)); + return 0; +} - pthread_rwlock_wrlock(&fa.flows_lock); +static void * fa_handle_packet(void * o) +{ + (void) o; - fd = eid_to_fd(ntoh64(msg->r_eid)); - if (fd < 0) { - pthread_rwlock_unlock(&fa.flows_lock); - break; - } + while (true) { + uint8_t buf[MSGBUFSZ]; + struct fa_msg * msg; + size_t len; - flow = &fa.flows[fd]; -#ifdef IPCP_FLOW_STATS - flow->u_rcv++; -#endif - ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); + msg = (struct fa_msg *) buf; - pthread_rwlock_unlock(&fa.flows_lock); + len = fa_wait_for_fa_msg(msg); + if (len == 0) + continue; + switch (msg->code) { + case FLOW_REQ: + if (fa_handle_flow_req(msg, len) < 0) + log_err("Error handling flow alloc request."); + break; + case FLOW_REPLY: + if (fa_handle_flow_reply(msg, len) < 0) + log_err("Error handling flow reply."); + break; + case FLOW_UPDATE: + if (fa_handle_flow_update(msg, len) < 0) + log_err("Error handling flow update."); break; default: - log_err("Got an unknown flow allocation message."); + log_warn("Recieved unknown flow allocation message."); break; } } @@ -644,7 +669,7 @@ int fa_init(void) fail_mtx: pthread_rwlock_destroy(&fa.flows_lock); fail_rwlock: - log_err("Failed to initialize flow allocator."); + return -1; } @@ -663,7 +688,7 @@ int fa_start(void) int pol; int max; - fa.psched = psched_create(packet_handler); + fa.psched = psched_create(packet_handler, np1_flow_read); if (fa.psched == NULL) { log_err("Failed to start packet scheduler."); goto fail_psched; @@ -700,7 +725,6 @@ int fa_start(void) fail_thread: psched_destroy(fa.psched); fail_psched: - log_err("Failed to start flow allocator."); return -1; } @@ -712,11 +736,10 @@ void fa_stop(void) psched_destroy(fa.psched); } -int fa_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t dlen) +int fa_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { struct fa_msg * msg; struct shm_du_buff * sdb; @@ -732,7 +755,7 @@ int fa_alloc(int fd, len = sizeof(*msg) + ipcp_dir_hash_len(); - if (ipcp_sdb_reserve(&sdb, len + dlen)) + if (ipcp_sdb_reserve(&sdb, len + data->len)) return -1; msg = (struct fa_msg *) shm_du_buff_head(sdb); @@ -751,11 +774,14 @@ int fa_alloc(int fd, msg->in_order = qs.in_order; msg->max_gap = hton32(qs.max_gap); msg->cypher_s = hton16(qs.cypher_s); + msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); - memcpy(shm_du_buff_head(sdb) + len, data, dlen); + if (data->len > 0) + memcpy(shm_du_buff_head(sdb) + len, data->data, data->len); if (dt_write_packet(addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow allocation request packet."); ipcp_sdb_release(sdb); return -1; } @@ -773,75 +799,66 @@ int fa_alloc(int fd, return 0; } -int fa_alloc_resp(int fd, - int response, - const void * data, - size_t len) +int fa_alloc_resp(int fd, + int response, + const buffer_t * data) { - struct timespec ts = {0, TIMEOUT * 1000}; - struct timespec abstime; struct fa_msg * msg; struct shm_du_buff * sdb; struct fa_flow * flow; qoscube_t qc = QOS_CUBE_BE; - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - flow = &fa.flows[fd]; - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); + if (ipcp_wait_flow_resp(fd) < 0) { + log_err("Failed to wait for flow response."); + goto fail_alloc_resp; } - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; - } - - ipcpi.alloc_id = -1; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - - if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) { - fa_flow_fini(flow); - return -1; + if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + data->len)) { + log_err("Failed to reserve sdb (%zu bytes).", + sizeof(*msg) + data->len); + goto fail_reserve; } msg = (struct fa_msg *) shm_du_buff_head(sdb); memset(msg, 0, sizeof(*msg)); - pthread_rwlock_wrlock(&fa.flows_lock); - msg->code = FLOW_REPLY; + msg->response = response; + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); + + pthread_rwlock_rdlock(&fa.flows_lock); + msg->r_eid = hton64(flow->r_eid); msg->s_eid = hton64(flow->s_eid); - msg->response = response; - memcpy(msg + 1, data, len); + pthread_rwlock_unlock(&fa.flows_lock); + + if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow allocation response packet."); + goto fail_packet; + } if (response < 0) { + pthread_rwlock_rdlock(&fa.flows_lock); fa_flow_fini(flow); - ipcp_sdb_release(sdb); + pthread_rwlock_unlock(&fa.flows_lock); } else { psched_add(fa.psched, fd); } - if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) { - fa_flow_fini(flow); - pthread_rwlock_unlock(&fa.flows_lock); - ipcp_sdb_release(sdb); - return -1; - } + return 0; + fail_packet: + ipcp_sdb_release(sdb); + fail_reserve: + pthread_rwlock_wrlock(&fa.flows_lock); + fa_flow_fini(flow); pthread_rwlock_unlock(&fa.flows_lock); - - return 0; + fail_alloc_resp: + return -1; } int fa_dealloc(int fd) @@ -857,7 +874,7 @@ int fa_dealloc(int fd) pthread_rwlock_unlock(&fa.flows_lock); - flow_dealloc(fd); + ipcp_flow_dealloc(fd); return 0; } @@ -872,6 +889,7 @@ static int fa_update_remote(int fd, uint64_t r_addr; if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) { + log_err("Failed to reserve sdb (%zu bytes).", sizeof(*msg)); return -1; } @@ -895,6 +913,7 @@ static int fa_update_remote(int fd, if (dt_write_packet(r_addr, qc, fa.eid, sdb)) { + log_err("Failed to send flow update packet."); ipcp_sdb_release(sdb); return -1; } @@ -912,13 +931,14 @@ void fa_np1_rcv(uint64_t eid, int fd; size_t len; - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + len = shm_du_buff_len(sdb); pthread_rwlock_wrlock(&fa.flows_lock); fd = eid_to_fd(eid); if (fd < 0) { pthread_rwlock_unlock(&fa.flows_lock); + log_dbg("Received packet for unknown EID %" PRIu64 ".", eid); ipcp_sdb_release(sdb); return; } @@ -934,6 +954,7 @@ void fa_np1_rcv(uint64_t eid, pthread_rwlock_unlock(&fa.flows_lock); if (ipcp_flow_write(fd, sdb) < 0) { + log_dbg("Failed to write to flow %d.", fd); ipcp_sdb_release(sdb); #ifdef IPCP_FLOW_STATS pthread_rwlock_wrlock(&fa.flows_lock); |