From e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Fri, 23 Feb 2024 09:29:47 +0100 Subject: lib: Revise app flow allocation This revises the application flow allocator to use the flow_info struct/message between the components. Revises the messaging to move the use protocol buffers to its own source (serdes-irm). Adds a timeout to the IRMd flow allocator to make sure flow allocations don't hang forever (this was previously taken care of by the sanitize thread). Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/ipcpd/broadcast/main.c | 3 +- src/ipcpd/eth/eth.c | 111 ++++++++++++++++++++++----------------------- src/ipcpd/ipcp.c | 53 ++++++++++++---------- src/ipcpd/ipcp.h | 25 +++++----- src/ipcpd/local/main.c | 20 ++++---- src/ipcpd/udp/main.c | 68 +++++++++++++-------------- src/ipcpd/unicast/fa.c | 50 ++++++++++---------- src/ipcpd/unicast/fa.h | 18 ++++---- 8 files changed, 169 insertions(+), 179 deletions(-) (limited to 'src/ipcpd') diff --git a/src/ipcpd/broadcast/main.c b/src/ipcpd/broadcast/main.c index d2cbed6f..f51fc629 100644 --- a/src/ipcpd/broadcast/main.c +++ b/src/ipcpd/broadcast/main.c @@ -212,6 +212,7 @@ static int broadcast_ipcp_join(int fd, { struct conn conn; time_t mpl = IPCP_BROADCAST_MPL; + buffer_t data = {NULL, 0}; (void) qs; @@ -226,7 +227,7 @@ static int broadcast_ipcp_join(int fd, notifier_event(NOTIFY_DT_CONN_ADD, &conn); - ipcp_flow_alloc_reply(fd, 0, mpl, NULL, 0); + ipcp_flow_alloc_reply(fd, 0, mpl, &data); return 0; } diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index c0aaf711..ea6e0f1c 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -455,16 +455,15 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, return 0; } -static int eth_ipcp_alloc(const uint8_t * dst_addr, +static int eth_ipcp_alloc(const uint8_t * dst_addr, #if defined(BUILD_ETH_DIX) - uint16_t eid, + uint16_t eid, #elif defined(BUILD_ETH_LLC) - uint8_t ssap, + uint8_t ssap, #endif - const uint8_t * hash, - qosspec_t qs, - const void * data, - size_t dlen) + const uint8_t * hash, + qosspec_t qs, + const buffer_t * data) { uint8_t * buf; struct mgmt_msg * msg; @@ -473,7 +472,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, len = sizeof(*msg) + ipcp_dir_hash_len(); - buf = malloc(len + ETH_HEADER_TOT_SIZE + dlen); + buf = malloc(len + ETH_HEADER_TOT_SIZE + data->len); if (buf == NULL) return -1; @@ -496,8 +495,8 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, msg->timeout = hton32(qs.timeout); memcpy(msg + 1, hash, ipcp_dir_hash_len()); - if (dlen > 0) - memcpy(buf + len + ETH_HEADER_TOT_SIZE, data, dlen); + if (data->len > 0) + memcpy(buf + len + ETH_HEADER_TOT_SIZE, data->data, data->len); ret = eth_ipcp_send_frame(dst_addr, #if defined(BUILD_ETH_DIX) @@ -506,28 +505,27 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, reverse_bits(MGMT_SAP), reverse_bits(MGMT_SAP), #endif - buf, len + dlen); + buf, len + data->len); free(buf); return ret; } -static int eth_ipcp_alloc_resp(uint8_t * dst_addr, +static int eth_ipcp_alloc_resp(uint8_t * dst_addr, #if defined(BUILD_ETH_DIX) - uint16_t seid, - uint16_t deid, + uint16_t seid, + uint16_t deid, #elif defined(BUILD_ETH_LLC) - uint8_t ssap, - uint8_t dsap, + uint8_t ssap, + uint8_t dsap, #endif - int response, - const void * data, - size_t len) + int response, + const buffer_t * data) { struct mgmt_msg * msg; uint8_t * buf; - buf = malloc(sizeof(*msg) + ETH_HEADER_TOT_SIZE + len); + buf = malloc(sizeof(*msg) + ETH_HEADER_TOT_SIZE + data->len); if (buf == NULL) return -1; @@ -543,8 +541,8 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, #endif msg->response = response; - if (len > 0) - memcpy(msg + 1, data, len); + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); if (eth_ipcp_send_frame(dst_addr, #if defined(BUILD_ETH_DIX) @@ -553,7 +551,7 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, reverse_bits(MGMT_SAP), reverse_bits(MGMT_SAP), #endif - buf, sizeof(*msg) + len)) { + buf, sizeof(*msg) + data->len)) { free(buf); return -1; } @@ -563,20 +561,19 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, return 0; } -static int eth_ipcp_req(uint8_t * r_addr, +static int eth_ipcp_req(uint8_t * r_addr, #if defined(BUILD_ETH_DIX) - uint16_t r_eid, + uint16_t r_eid, #elif defined(BUILD_ETH_LLC) - uint8_t r_sap, + uint8_t r_sap, #endif - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len) + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { int fd; - fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_ETH_MPL, data, len); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_ETH_MPL, data); if (fd < 0) { log_err("Could not get new flow from IRMd."); return -1; @@ -600,17 +597,16 @@ static int eth_ipcp_req(uint8_t * r_addr, return 0; } -static int eth_ipcp_alloc_reply(uint8_t * r_addr, +static int eth_ipcp_alloc_reply(uint8_t * r_addr, #if defined(BUILD_ETH_DIX) - uint16_t seid, - uint16_t deid, + uint16_t seid, + uint16_t deid, #elif defined(BUILD_ETH_LLC) - uint8_t ssap, - int dsap, + uint8_t ssap, + int dsap, #endif - int response, - const void * data, - size_t len) + int response, + const buffer_t * data) { int ret = 0; int fd = -1; @@ -649,7 +645,7 @@ static int eth_ipcp_alloc_reply(uint8_t * r_addr, #elif defined(BUILD_ETH_LLC) log_dbg("Flow reply, fd %d, SSAP %d, DSAP %d.", fd, ssap, dsap); #endif - if ((ret = ipcp_flow_alloc_reply(fd, response, mpl, data, len)) < 0) { + if ((ret = ipcp_flow_alloc_reply(fd, response, mpl, data)) < 0) { log_err("Failed to reply to flow allocation."); return -1; } @@ -716,6 +712,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, struct mgmt_msg * msg; size_t msg_len; qosspec_t qs; + buffer_t data; msg = (struct mgmt_msg *) buf; @@ -735,6 +732,9 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, qs.cypher_s = ntoh16(msg->cypher_s); qs.timeout = ntoh32(msg->timeout); + data.data = (uint8_t *) buf + msg_len; + data.len = len - msg_len; + if (shim_data_reg_has(eth_data.shim_data, buf + sizeof(*msg))) { eth_ipcp_req(r_addr, @@ -745,13 +745,15 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, #endif buf + sizeof(*msg), qs, - buf + msg_len, - len - msg_len); + &data); } break; case FLOW_REPLY: assert(len >= sizeof(*msg)); + data.data = (uint8_t *) buf + sizeof(*msg); + data.len = len - sizeof(*msg); + eth_ipcp_alloc_reply(r_addr, #if defined(BUILD_ETH_DIX) ntohs(msg->seid), @@ -761,8 +763,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, msg->dsap, #endif msg->response, - buf + sizeof(*msg), - len - sizeof(*msg)); + &data); break; case NAME_QUERY_REQ: eth_ipcp_name_query_req(buf + sizeof(*msg), r_addr); @@ -1589,11 +1590,10 @@ static int eth_ipcp_query(const uint8_t * hash) return ret; } -static int eth_ipcp_flow_alloc(int fd, - const uint8_t * hash, - qosspec_t qs, - const void * data, - size_t len) +static int eth_ipcp_flow_alloc(int fd, + const uint8_t * hash, + qosspec_t qs, + const buffer_t * data) { #ifdef BUILD_ETH_LLC uint8_t ssap = 0; @@ -1634,8 +1634,7 @@ static int eth_ipcp_flow_alloc(int fd, #endif hash, qs, - data, - len) < 0) { + data) < 0) { #ifdef BUILD_ETH_LLC pthread_rwlock_wrlock(ð_data.flows_lock); bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap); @@ -1654,10 +1653,9 @@ static int eth_ipcp_flow_alloc(int fd, return 0; } -static int eth_ipcp_flow_alloc_resp(int fd, - int response, - const void * data, - size_t len) +static int eth_ipcp_flow_alloc_resp(int fd, + int response, + const buffer_t * data) { #if defined(BUILD_ETH_DIX) uint16_t r_eid; @@ -1698,8 +1696,7 @@ static int eth_ipcp_flow_alloc_resp(int fd, ssap, r_sap, #endif response, - data, - len) < 0) { + data) < 0) { #ifdef BUILD_ETH_LLC pthread_rwlock_wrlock(ð_data.flows_lock); bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap); diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 0215cdaa..966c4920 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -261,15 +261,18 @@ static void * acceptloop(void * o) return (void *) 0; } -int ipcp_wait_flow_req_arr(const uint8_t * dst, - qosspec_t qs, - time_t mpl, - const void * data, - size_t len) +int ipcp_wait_flow_req_arr(const uint8_t * dst, + qosspec_t qs, + time_t mpl, + const buffer_t * data) { struct timespec ts = TIMESPEC_INIT_MS(ALLOC_TIMEOUT); struct timespec abstime; int fd; + buffer_t hash; + + hash.data = (uint8_t *) dst; + hash.len = ipcp_dir_hash_len(); clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -290,7 +293,7 @@ int ipcp_wait_flow_req_arr(const uint8_t * dst, assert(ipcpi.alloc_id == -1); - fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len); + fd = ipcp_flow_req_arr(&hash, qs, mpl, data); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Failed to get fd for flow."); @@ -492,13 +495,12 @@ static void do_query(const uint8_t * hash, ret_msg->result = ipcpi.ops->ipcp_query(hash); } -static void do_flow_alloc(pid_t pid, - int flow_id, - uint8_t * dst, - qosspec_t qs, - void * data, - size_t len, - ipcp_msg_t * ret_msg) +static void do_flow_alloc(pid_t pid, + int flow_id, + uint8_t * dst, + qosspec_t qs, + const buffer_t * data, + ipcp_msg_t * ret_msg) { int fd; @@ -525,7 +527,7 @@ static void do_flow_alloc(pid_t pid, goto finish; } - ret_msg->result = ipcpi.ops->ipcp_flow_alloc(fd, dst, qs, data, len); + ret_msg->result = ipcpi.ops->ipcp_flow_alloc(fd, dst, qs, data); finish: log_info("Finished allocating flow %d to " HASH_FMT32 ": %d.", flow_id, HASH_VAL32(dst), ret_msg->result); @@ -566,11 +568,10 @@ static void do_flow_join(pid_t pid, log_info("Finished joining layer " HASH_FMT32 ".", HASH_VAL32(dst)); } -static void do_flow_alloc_resp(int resp, - int flow_id, - const void * data, - size_t len, - ipcp_msg_t * ret_msg) +static void do_flow_alloc_resp(int resp, + int flow_id, + const buffer_t * data, + ipcp_msg_t * ret_msg) { int fd = -1; @@ -597,7 +598,7 @@ static void do_flow_alloc_resp(int resp, } } - ret_msg->result = ipcpi.ops->ipcp_flow_alloc_resp(fd, resp, data, len); + ret_msg->result = ipcpi.ops->ipcp_flow_alloc_resp(fd, resp, data); finish: log_info("Finished responding to allocation request: %d", ret_msg->result); @@ -648,6 +649,7 @@ static void * mainloop(void * o) ipcp_msg_t ret_msg = IPCP_MSG__INIT; qosspec_t qs; struct cmd * cmd; + buffer_t data; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -710,11 +712,12 @@ static void * mainloop(void * o) assert(msg->hash.len == ipcp_dir_hash_len()); assert(msg->pk.len > 0 ? msg->pk.data != NULL : msg->pk.data == NULL); + data.len = msg->pk.len; + data.data = msg->pk.data; qs = qos_spec_msg_to_s(msg->qosspec); do_flow_alloc(msg->pid, msg->flow_id, msg->hash.data, qs, - msg->pk.data, msg->pk.len, - &ret_msg); + &data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_FLOW_JOIN: assert(msg->hash.len == ipcp_dir_hash_len()); @@ -725,10 +728,10 @@ static void * mainloop(void * o) case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP: assert(msg->pk.len > 0 ? msg->pk.data != NULL : msg->pk.data == NULL); - + data.len = msg->pk.len; + data.data = msg->pk.data; do_flow_alloc_resp(msg->response, msg->flow_id, - msg->pk.data, msg->pk.len, - &ret_msg); + &data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: do_flow_dealloc(msg->flow_id, msg->timeo_sec, &ret_msg); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 1ce07c57..aab490c7 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -53,20 +53,18 @@ struct ipcp_ops { int (* ipcp_query)(const uint8_t * hash); - int (* ipcp_flow_alloc)(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len); + int (* ipcp_flow_alloc)(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data); int (* ipcp_flow_join)(int fd, const uint8_t * dst, qosspec_t qs); - int (* ipcp_flow_alloc_resp)(int fd, - int response, - const void * data, - size_t len); + int (* ipcp_flow_alloc_resp)(int fd, + int response, + const buffer_t * data); int (* ipcp_flow_dealloc)(int fd); }; @@ -129,11 +127,10 @@ int ipcp_parse_arg(int argc, char * argv[]); /* Helper functions to handle races during flow allocation */ -int ipcp_wait_flow_req_arr(const uint8_t * dst, - qosspec_t qs, - time_t mpl, - const void * data, - size_t len); +int ipcp_wait_flow_req_arr(const uint8_t * dst, + qosspec_t qs, + time_t mpl, + const buffer_t * data); int ipcp_wait_flow_resp(const int fd); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index dd2c7209..5a53dec5 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -186,11 +186,10 @@ static int local_ipcp_query(const uint8_t * hash) return ret; } -static int local_ipcp_flow_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len) +static int local_ipcp_flow_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { int out_fd = -1; @@ -198,7 +197,7 @@ static int local_ipcp_flow_alloc(int fd, HASH_VAL32(dst), fd); assert(dst); - out_fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_LOCAL_MPL, data, len); + out_fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_LOCAL_MPL, data); if (out_fd < 0) { log_dbg("Flow allocation failed: %d", out_fd); return -1; @@ -218,10 +217,9 @@ static int local_ipcp_flow_alloc(int fd, return 0; } -static int local_ipcp_flow_alloc_resp(int fd, - int response, - const void * data, - size_t len) +static int local_ipcp_flow_alloc_resp(int fd, + int response, + const buffer_t * data) { int out_fd; time_t mpl = IPCP_LOCAL_MPL; @@ -249,7 +247,7 @@ static int local_ipcp_flow_alloc_resp(int fd, fset_add(local_data.flows, fd); - if (ipcp_flow_alloc_reply(out_fd, response, mpl, data, len) < 0) + if (ipcp_flow_alloc_reply(out_fd, response, mpl, data) < 0) return -1; log_info("Flow allocation completed, fds (%d, %d).", out_fd, fd); diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 909ca0a5..2e8d84ce 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -203,18 +203,17 @@ static int udp_ipcp_port_alloc(const struct sockaddr_in * r_saddr, uint32_t s_eid, const uint8_t * dst, qosspec_t qs, - const void * data, - size_t dlen) + const buffer_t * data) { uint8_t * buf; struct mgmt_msg * msg; size_t len; - assert(dlen > 0 ? data != NULL : data == NULL); + assert(data->len > 0 ? data->data != NULL : data->data == NULL); len = sizeof(*msg) + ipcp_dir_hash_len(); - buf = malloc(len + dlen); + buf = malloc(len + data->len); if (buf == NULL) return -1; @@ -233,10 +232,10 @@ static int udp_ipcp_port_alloc(const struct sockaddr_in * r_saddr, msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); - if (dlen > 0) - memcpy(buf + len, data, dlen); + if (data->len > 0) + memcpy(buf + len, data->data, data->len); - if (sendto(udp_data.s_fd, msg, len + dlen, + if (sendto(udp_data.s_fd, msg, len + data->len, SENDTO_FLAGS, (const struct sockaddr *) r_saddr, sizeof(*r_saddr)) < 0) { free(buf); @@ -252,12 +251,11 @@ static int udp_ipcp_port_alloc_resp(const struct sockaddr_in * r_saddr, uint32_t s_eid, uint32_t d_eid, int8_t response, - const void * data, - size_t len) + const buffer_t * data) { struct mgmt_msg * msg; - msg = malloc(sizeof(*msg) + len); + msg = malloc(sizeof(*msg) + data->len); if (msg == NULL) return -1; @@ -267,10 +265,10 @@ static int udp_ipcp_port_alloc_resp(const struct sockaddr_in * r_saddr, msg->d_eid = hton32(d_eid); msg->response = response; - if (len > 0) - memcpy(msg + 1, data, len); + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); - if (sendto(udp_data.s_fd, msg, sizeof(*msg) + len, + if (sendto(udp_data.s_fd, msg, sizeof(*msg) + data->len, SENDTO_FLAGS, (const struct sockaddr *) r_saddr, sizeof(*r_saddr)) < 0 ) { free(msg); @@ -286,12 +284,11 @@ static int udp_ipcp_port_req(struct sockaddr_in * c_saddr, int d_eid, const uint8_t * dst, qosspec_t qs, - const void * data, - size_t len) + const buffer_t * data) { int fd; - fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UDP_MPL, data, len); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UDP_MPL, data); if (fd < 0) { log_err("Could not get new flow from IRMd."); return -1; @@ -314,8 +311,7 @@ static int udp_ipcp_port_alloc_reply(const struct sockaddr_in * saddr, uint32_t s_eid, uint32_t d_eid, int8_t response, - const void * data, - size_t len) + const buffer_t * data) { time_t mpl = IPCP_UDP_MPL; @@ -333,7 +329,7 @@ static int udp_ipcp_port_alloc_reply(const struct sockaddr_in * saddr, pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_flow_alloc_reply(s_eid, response, mpl, data, len) < 0) { + if (ipcp_flow_alloc_reply(s_eid, response, mpl, data) < 0) { log_err("Failed to reply to flow allocation."); return -1; } @@ -351,6 +347,7 @@ static int udp_ipcp_mgmt_frame(const uint8_t * buf, struct mgmt_msg * msg; size_t msg_len; qosspec_t qs; + buffer_t data; msg = (struct mgmt_msg *) buf; @@ -360,6 +357,10 @@ static int udp_ipcp_mgmt_frame(const uint8_t * buf, assert(len >= msg_len); + data.len = len - msg_len; + data.data = (uint8_t *) buf + msg_len; + + qs.delay = ntoh32(msg->delay); qs.bandwidth = ntoh64(msg->bandwidth); qs.availability = msg->availability; @@ -372,17 +373,18 @@ static int udp_ipcp_mgmt_frame(const uint8_t * buf, return udp_ipcp_port_req(&c_saddr, ntoh32(msg->s_eid), (uint8_t *) (msg + 1), qs, - buf + msg_len, - len - msg_len); + &data); case FLOW_REPLY: assert(len >= sizeof(*msg)); + data.len = len - sizeof(*msg); + data.data = (uint8_t *) buf + sizeof(*msg); + return udp_ipcp_port_alloc_reply(&c_saddr, ntoh32(msg->s_eid), ntoh32(msg->d_eid), msg->response, - buf + sizeof(*msg), - len - sizeof(*msg)); + &data); default: log_err("Unknown message received %d.", msg->code); return -1; @@ -983,11 +985,10 @@ static int udp_ipcp_query(const uint8_t * hash) return 0; } -static int udp_ipcp_flow_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len) +static int udp_ipcp_flow_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { struct sockaddr_in r_saddr; /* Server address */ uint32_t ip_addr = 0; @@ -1017,7 +1018,7 @@ static int udp_ipcp_flow_alloc(int fd, r_saddr.sin_addr.s_addr = ip_addr; r_saddr.sin_port = udp_data.s_saddr.sin_port; - if (udp_ipcp_port_alloc(&r_saddr, fd, dst, qs, data, len) < 0) { + if (udp_ipcp_port_alloc(&r_saddr, fd, dst, qs, data) < 0) { log_err("Could not allocate port."); return -1; } @@ -1034,10 +1035,9 @@ static int udp_ipcp_flow_alloc(int fd, return 0; } -static int udp_ipcp_flow_alloc_resp(int fd, - int resp, - const void * data, - size_t len) +static int udp_ipcp_flow_alloc_resp(int fd, + int resp, + const buffer_t * data) { struct sockaddr_in saddr; int d_eid; @@ -1054,7 +1054,7 @@ static int udp_ipcp_flow_alloc_resp(int fd, pthread_rwlock_unlock(&udp_data.flows_lock); - if (udp_ipcp_port_alloc_resp(&saddr, d_eid, fd, resp, data, len) < 0) { + if (udp_ipcp_port_alloc_resp(&saddr, d_eid, fd, resp, data) < 0) { fset_del(udp_data.np1_flows, fd); log_err("Failed to respond to flow request."); return -1; diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index cea9483e..3631fd7b 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -478,8 +478,7 @@ static int fa_handle_flow_req(struct fa_msg * msg, qosspec_t qs; struct fa_flow * flow; uint8_t * dst; - uint8_t * data; /* Piggbacked data on flow alloc request. */ - size_t dlen; /* Length of piggybacked data. */ + buffer_t data; /* Piggbacked data on flow alloc request. */ msg_len = sizeof(*msg) + ipcp_dir_hash_len(); if (len < msg_len) { @@ -487,9 +486,9 @@ static int fa_handle_flow_req(struct fa_msg * msg, return -EPERM; } - dst = (uint8_t *)(msg + 1); - data = (uint8_t *) msg + msg_len; - dlen = len - msg_len; + dst = (uint8_t *)(msg + 1); + data.data = (uint8_t *) msg + msg_len; + data.len = len - msg_len; qs.delay = ntoh32(msg->delay); qs.bandwidth = ntoh64(msg->bandwidth); @@ -501,7 +500,7 @@ static int fa_handle_flow_req(struct fa_msg * msg, qs.cypher_s = ntoh16(msg->cypher_s); qs.timeout = ntoh32(msg->timeout); - fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, data, dlen); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, &data); if (fd < 0) return fd; @@ -525,14 +524,13 @@ static int fa_handle_flow_reply(struct fa_msg * msg, { int fd; struct fa_flow * flow; - uint8_t * data; /* Piggbacked data on flow alloc request. */ - size_t dlen; /* Length of piggybacked data. */ + buffer_t data; /* Piggbacked data on flow alloc request. */ time_t mpl = IPCP_UNICAST_MPL; assert(len >= sizeof(*msg)); - data = (uint8_t *) msg + sizeof(*msg); - dlen = len - sizeof(*msg); + data.data = (uint8_t *) msg + sizeof(*msg); + data.len = len - sizeof(*msg); pthread_rwlock_wrlock(&fa.flows_lock); @@ -555,7 +553,7 @@ static int fa_handle_flow_reply(struct fa_msg * msg, pthread_rwlock_unlock(&fa.flows_lock); - if (ipcp_flow_alloc_reply(fd, msg->response, mpl, data, dlen) < 0) { + if (ipcp_flow_alloc_reply(fd, msg->response, mpl, &data) < 0) { log_err("Failed to reply for flow allocation on fd %d.", fd); return -EIRMD; } @@ -738,11 +736,10 @@ void fa_stop(void) psched_destroy(fa.psched); } -int fa_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t dlen) +int fa_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { struct fa_msg * msg; struct shm_du_buff * sdb; @@ -758,7 +755,7 @@ int fa_alloc(int fd, len = sizeof(*msg) + ipcp_dir_hash_len(); - if (ipcp_sdb_reserve(&sdb, len + dlen)) + if (ipcp_sdb_reserve(&sdb, len + data->len)) return -1; msg = (struct fa_msg *) shm_du_buff_head(sdb); @@ -780,8 +777,8 @@ int fa_alloc(int fd, msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); - if (dlen > 0) - memcpy(shm_du_buff_head(sdb) + len, data, dlen); + if (data->len > 0) + memcpy(shm_du_buff_head(sdb) + len, data->data, data->len); if (dt_write_packet(addr, qc, fa.eid, sdb)) { log_err("Failed to send flow allocation request packet."); @@ -802,10 +799,9 @@ int fa_alloc(int fd, return 0; } -int fa_alloc_resp(int fd, - int response, - const void * data, - size_t len) +int fa_alloc_resp(int fd, + int response, + const buffer_t * data) { struct fa_msg * msg; struct shm_du_buff * sdb; @@ -819,9 +815,9 @@ int fa_alloc_resp(int fd, goto fail_alloc_resp; } - if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) { + if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + data->len)) { log_err("Failed to reserve sdb (%zu bytes).", - sizeof(*msg) + len); + sizeof(*msg) + data->len); goto fail_reserve; } @@ -830,8 +826,8 @@ int fa_alloc_resp(int fd, msg->code = FLOW_REPLY; msg->response = response; - if (len > 0) - memcpy(msg + 1, data, len); + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); pthread_rwlock_rdlock(&fa.flows_lock); diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h index 6d559a22..1e716966 100644 --- a/src/ipcpd/unicast/fa.h +++ b/src/ipcpd/unicast/fa.h @@ -34,16 +34,14 @@ int fa_start(void); void fa_stop(void); -int fa_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len); - -int fa_alloc_resp(int fd, - int response, - const void * data, - size_t len); +int fa_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data); + +int fa_alloc_resp(int fd, + int response, + const buffer_t * data); int fa_dealloc(int fd); -- cgit v1.2.3