From c7025b1060ba250caa81f589c19e38d46949a3e7 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Sat, 19 Feb 2022 16:20:22 +0100 Subject: ipcpd: Refactor flow allocator message handling This refactors the single long function that handles incoming packets destined for the flow allocator. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/ipcpd/unicast/fa.c | 283 +++++++++++++++++++++++++++++-------------------- 1 file changed, 170 insertions(+), 113 deletions(-) diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index ef6adae6..7143a346 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -435,167 +435,224 @@ static void fa_post_packet(void * comp, pthread_mutex_unlock(&fa.mtx); } -static void * fa_handle_packet(void * o) +static size_t fa_wait_for_fa_msg(struct fa_msg * msg) { - struct timespec ts = {0, TIMEOUT * 1000}; + struct cmd * cmd; + size_t len; - (void) o; + pthread_mutex_lock(&fa.mtx); - while (true) { - struct timespec abstime; - int fd; - uint8_t buf[MSGBUFSZ]; - struct fa_msg * msg; - qosspec_t qs; - struct cmd * cmd; - size_t len; - size_t msg_len; - struct fa_flow * flow; + pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx); - pthread_mutex_lock(&fa.mtx); + while (list_is_empty(&fa.cmds)) + pthread_cond_wait(&fa.cond, &fa.mtx); - pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx); + cmd = list_last_entry(&fa.cmds, struct cmd, next); + list_del(&cmd->next); - while (list_is_empty(&fa.cmds)) - pthread_cond_wait(&fa.cond, &fa.mtx); + pthread_cleanup_pop(true); - cmd = list_last_entry(&fa.cmds, struct cmd, next); - list_del(&cmd->next); + len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); + if (len > MSGBUFSZ || len < sizeof(*msg)) { + log_warn("Invalid flow allocation message (len: %zd)\n", len); + free(cmd); + return 0; /* No valid message */ + } - pthread_cleanup_pop(true); + memcpy(msg, shm_du_buff_head(cmd->sdb), len); - len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); + ipcp_sdb_release(cmd->sdb); - if (len > MSGBUFSZ) { - log_err("Message over buffer size."); - free(cmd); - continue; - } + free(cmd); - msg = (struct fa_msg *) buf; + return len; +} - /* Depending on the message call the function in ipcp-dev.h */ +static int fa_wait_irmd_alloc(uint8_t * dst, + qosspec_t qs, + const void * data, + size_t len) +{ + struct timespec ts = {0, TIMEOUT * 1000}; + struct timespec abstime; + int fd; - memcpy(msg, shm_du_buff_head(cmd->sdb), len); + clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ipcp_sdb_release(cmd->sdb); + pthread_mutex_lock(&ipcpi.alloc_lock); - free(cmd); + while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) { + ts_add(&abstime, &ts, &abstime); + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &abstime); + } - switch (msg->code) { - case FLOW_REQ: - msg_len = sizeof(*msg) + ipcp_dir_hash_len(); + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + log_dbg("Won't allocate over non-operational IPCP."); + return -EIPCPSTATE; + } - assert(len >= msg_len); + assert(ipcpi.alloc_id == -1); - clock_gettime(PTHREAD_COND_CLOCK, &abstime); + fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, data, len); + if (fd < 0) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + log_dbg("Failed to get fd for flow."); + return -ENOTALLOC; + } + + ipcpi.alloc_id = fd; + pthread_cond_broadcast(&ipcpi.alloc_cond); - pthread_mutex_lock(&ipcpi.alloc_lock); + pthread_mutex_unlock(&ipcpi.alloc_lock); - while (ipcpi.alloc_id != -1 && - ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } + return fd; +} - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_dbg("Won't allocate over non-operational" - "IPCP."); - continue; - } +static int fa_handle_flow_req(struct fa_msg * msg, + size_t len) +{ + size_t msg_len; + int fd; + qosspec_t qs; + struct fa_flow * flow; + uint8_t * data; /* Piggbacked data on flow alloc request. */ + size_t dlen; /* Length of piggybacked data. */ - assert(ipcpi.alloc_id == -1); + msg_len = sizeof(*msg) + ipcp_dir_hash_len(); + if (len < msg_len) { + log_err("Invalid flow allocation request"); + return -EPERM; + } - qs.delay = ntoh32(msg->delay); - qs.bandwidth = ntoh64(msg->bandwidth); - qs.availability = msg->availability; - qs.loss = ntoh32(msg->loss); - qs.ber = ntoh32(msg->ber); - qs.in_order = msg->in_order; - qs.max_gap = ntoh32(msg->max_gap); - qs.cypher_s = ntoh16(msg->cypher_s); + data = (uint8_t *) msg + msg_len; + dlen = len - msg_len; - fd = ipcp_flow_req_arr((uint8_t *) (msg + 1), - ipcp_dir_hash_len(), - qs, - buf + msg_len, - len - msg_len); - if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_err("Failed to get fd for flow."); - continue; - } + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); + qs.cypher_s = ntoh16(msg->cypher_s); - flow = &fa.flows[fd]; + fd = fa_wait_irmd_alloc((uint8_t *) (msg + 1), qs, data, dlen); + if (fd < 0) + return fd; - pthread_rwlock_wrlock(&fa.flows_lock); + flow = &fa.flows[fd]; - fa_flow_init(flow); + pthread_rwlock_wrlock(&fa.flows_lock); - flow->s_eid = gen_eid(fd); - flow->r_eid = ntoh64(msg->s_eid); - flow->r_addr = ntoh64(msg->s_addr); + fa_flow_init(flow); - pthread_rwlock_unlock(&fa.flows_lock); + flow->s_eid = gen_eid(fd); + flow->r_eid = ntoh64(msg->s_eid); + flow->r_addr = ntoh64(msg->s_addr); - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); + pthread_rwlock_unlock(&fa.flows_lock); - pthread_mutex_unlock(&ipcpi.alloc_lock); + return fd; +} - break; - case FLOW_REPLY: - assert(len >= sizeof(*msg)); +static int fa_handle_flow_reply(struct fa_msg * msg, + size_t len) +{ + int fd; + struct fa_flow * flow; + uint8_t * data; /* Piggbacked data on flow alloc request. */ + size_t dlen; /* Length of piggybacked data. */ - pthread_rwlock_wrlock(&fa.flows_lock); + assert(len >= sizeof(*msg)); - fd = eid_to_fd(ntoh64(msg->r_eid)); - if (fd < 0) { - pthread_rwlock_unlock(&fa.flows_lock); - break; - } + data = (uint8_t *) msg + sizeof(*msg); + dlen = len - sizeof(*msg); - flow = &fa.flows[fd]; + pthread_rwlock_wrlock(&fa.flows_lock); - flow->r_eid = ntoh64(msg->s_eid); + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + return -ENOTALLOC; + } - if (msg->response < 0) - fa_flow_fini(flow); - else - psched_add(fa.psched, fd); + flow = &fa.flows[fd]; - pthread_rwlock_unlock(&fa.flows_lock); + flow->r_eid = ntoh64(msg->s_eid); - ipcp_flow_alloc_reply(fd, - msg->response, - buf + sizeof(*msg), - len - sizeof(*msg)); - break; - case FLOW_UPDATE: - assert(len >= sizeof(*msg)); + if (msg->response < 0) + fa_flow_fini(flow); + else + psched_add(fa.psched, fd); + + pthread_rwlock_unlock(&fa.flows_lock); + + if (ipcp_flow_alloc_reply(fd, msg->response, data, dlen)) + return -EIRMD; - pthread_rwlock_wrlock(&fa.flows_lock); + return 0; +} - fd = eid_to_fd(ntoh64(msg->r_eid)); - if (fd < 0) { - pthread_rwlock_unlock(&fa.flows_lock); - break; - } +static int fa_handle_flow_update(struct fa_msg * msg, + size_t len) +{ + struct fa_flow * flow; + int fd; + + assert(len >= sizeof(*msg)); + + pthread_rwlock_wrlock(&fa.flows_lock); + + fd = eid_to_fd(ntoh64(msg->r_eid)); + if (fd < 0) { + pthread_rwlock_unlock(&fa.flows_lock); + return -EPERM; + } - flow = &fa.flows[fd]; + flow = &fa.flows[fd]; #ifdef IPCP_FLOW_STATS - flow->u_rcv++; + flow->u_rcv++; #endif - ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); + ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); - pthread_rwlock_unlock(&fa.flows_lock); + pthread_rwlock_unlock(&fa.flows_lock); + + return 0; +} + +static void * fa_handle_packet(void * o) +{ + (void) o; + + while (true) { + uint8_t buf[MSGBUFSZ]; + struct fa_msg * msg; + size_t len; + + msg = (struct fa_msg *) buf; + + len = fa_wait_for_fa_msg(msg); + if (len == 0) + continue; + switch (msg->code) { + case FLOW_REQ: + if (fa_handle_flow_req(msg, len) < 0) + log_err("Error handling flow alloc request."); + break; + case FLOW_REPLY: + if (fa_handle_flow_reply(msg, len) < 0) + log_err("Error handling flow reply."); + break; + case FLOW_UPDATE: + if (fa_handle_flow_update(msg, len) < 0) + log_err("Error handling flow update."); break; default: - log_err("Got an unknown flow allocation message."); + log_warn("Recieved unknown flow allocation message."); break; } } -- cgit v1.2.3