From e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Fri, 23 Feb 2024 09:29:47 +0100 Subject: lib: Revise app flow allocation This revises the application flow allocator to use the flow_info struct/message between the components. Revises the messaging to move the use protocol buffers to its own source (serdes-irm). Adds a timeout to the IRMd flow allocator to make sure flow allocations don't hang forever (this was previously taken care of by the sanitize thread). Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- include/ouroboros/ipcp-dev.h | 26 +- include/ouroboros/irm.h | 4 - include/ouroboros/protobuf.h | 46 +-- include/ouroboros/pthread.h | 2 + include/ouroboros/serdes-irm.h | 79 +++++ include/ouroboros/sockets.h.in | 16 +- src/ipcpd/broadcast/main.c | 3 +- src/ipcpd/eth/eth.c | 111 ++++--- src/ipcpd/ipcp.c | 53 ++-- src/ipcpd/ipcp.h | 25 +- src/ipcpd/local/main.c | 20 +- src/ipcpd/udp/main.c | 68 ++--- src/ipcpd/unicast/fa.c | 50 ++-- src/ipcpd/unicast/fa.h | 18 +- src/irmd/CMakeLists.txt | 9 +- src/irmd/config.h.in | 45 +-- src/irmd/ipcp.c | 12 +- src/irmd/main.c | 105 +++---- src/irmd/reg/reg.c | 2 +- src/lib/CMakeLists.txt | 7 +- src/lib/config.h.in | 1 + src/lib/dev.c | 665 +++++++++++++++++------------------------ src/lib/frct.c | 4 +- src/lib/pb/ipcp.proto | 3 +- src/lib/pb/ipcp_config.proto | 5 +- src/lib/pb/irm.proto | 55 ++-- src/lib/pb/model.proto | 61 ++++ src/lib/pb/qos.proto | 35 --- src/lib/protobuf.c | 78 +++++ src/lib/serdes-irm.c | 478 +++++++++++++++++++++++++++++ src/lib/serdes-oep.c | 1 - src/lib/sockets.c | 27 +- src/lib/timerwheel.c | 10 +- 33 files changed, 1332 insertions(+), 792 deletions(-) create mode 100644 include/ouroboros/serdes-irm.h create mode 100644 src/lib/pb/model.proto delete mode 100644 src/lib/pb/qos.proto create mode 100644 src/lib/serdes-irm.c diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h index 5ee78905..378d724a 100644 --- a/include/ouroboros/ipcp-dev.h +++ b/include/ouroboros/ipcp-dev.h @@ -20,27 +20,25 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#include -#include -#include - #ifndef OUROBOROS_LIB_IPCP_DEV_H #define OUROBOROS_LIB_IPCP_DEV_H +#include +#include +#include +#include + int ipcp_create_r(const struct ipcp_info * info); -int ipcp_flow_req_arr(const uint8_t * dst, - size_t len, +int ipcp_flow_req_arr(const buffer_t * dst, qosspec_t qs, time_t mpl, - const void * data, - size_t dlen); - -int ipcp_flow_alloc_reply(int fd, - int response, - time_t mpl, - const void * data, - size_t len); + const buffer_t * data); + +int ipcp_flow_alloc_reply(int fd, + int response, + time_t mpl, + const buffer_t * data); int ipcp_flow_read(int fd, struct shm_du_buff ** sdb); diff --git a/include/ouroboros/irm.h b/include/ouroboros/irm.h index b27343e8..0105f88e 100644 --- a/include/ouroboros/irm.h +++ b/include/ouroboros/irm.h @@ -30,10 +30,6 @@ #include -/* Name binding options. */ -#define BIND_AUTO 0x01 -#define NAME_SIZE 255 - struct ipcp_list_info { pid_t pid; enum ipcp_type type; diff --git a/include/ouroboros/protobuf.h b/include/ouroboros/protobuf.h index eb292721..9d38afb1 100644 --- a/include/ouroboros/protobuf.h +++ b/include/ouroboros/protobuf.h @@ -23,44 +23,56 @@ #ifndef OUROBOROS_LIB_PROTOBUF_H #define OUROBOROS_LIB_PROTOBUF_H +#include #include #include #include +#include #include #include "ipcp_config.pb-c.h" typedef IpcpConfigMsg ipcp_config_msg_t; -typedef LayerInfoMsg layer_info_msg_t; -typedef DtConfigMsg dt_config_msg_t; -typedef EthConfigMsg eth_config_msg_t; -typedef UdpConfigMsg udp_config_msg_t; -typedef UniConfigMsg uni_config_msg_t; +typedef DtConfigMsg dt_config_msg_t; +typedef EthConfigMsg eth_config_msg_t; +typedef UdpConfigMsg udp_config_msg_t; +typedef UniConfigMsg uni_config_msg_t; #include "ipcp.pb-c.h" -typedef IpcpMsg ipcp_msg_t; +typedef IpcpMsg ipcp_msg_t; #include "irm.pb-c.h" -typedef IpcpInfoMsg ipcp_info_msg_t; -typedef IpcpListMsg ipcp_list_msg_t; -typedef NameInfoMsg name_info_msg_t; +typedef IrmMsg irm_msg_t; +typedef TimespecMsg timespec_msg_t; +typedef IpcpInfoMsg ipcp_info_msg_t; +typedef IpcpListMsg ipcp_list_msg_t; -#include "qos.pb-c.h" -typedef QosspecMsg qosspec_msg_t; +#include "model.pb-c.h" +typedef FlowInfoMsg flow_info_msg_t; +typedef LayerInfoMsg layer_info_msg_t; +typedef NameInfoMsg name_info_msg_t; +typedef QosspecMsg qosspec_msg_t; #include "enroll.pb-c.h" -typedef EnrollReqMsg enroll_req_msg_t; -typedef EnrollRespMsg enroll_resp_msg_t; -typedef EnrollAckMsg enroll_ack_msg_t; +typedef EnrollReqMsg enroll_req_msg_t; +typedef EnrollRespMsg enroll_resp_msg_t; +typedef EnrollAckMsg enroll_ack_msg_t; /* IPCP configuration */ +timespec_msg_t * timespec_s_to_msg(const struct timespec * s); + +struct timespec timespec_msg_to_s(timespec_msg_t * msg); + +flow_info_msg_t * flow_info_s_to_msg(const struct flow_info * s); + +struct flow_info flow_info_msg_to_s(const flow_info_msg_t * msg); layer_info_msg_t * layer_info_s_to_msg(const struct layer_info * s); struct layer_info layer_info_msg_to_s(const layer_info_msg_t * msg); -ipcp_info_msg_t * ipcp_info_s_to_msg(const struct ipcp_info * s); +ipcp_info_msg_t * ipcp_info_s_to_msg(const struct ipcp_info * s); -struct ipcp_info ipcp_info_msg_to_s(const ipcp_info_msg_t * msg); +struct ipcp_info ipcp_info_msg_to_s(const ipcp_info_msg_t * msg); dt_config_msg_t * dt_config_s_to_msg(const struct dt_config * s); @@ -102,4 +114,4 @@ enroll_ack_msg_t * enroll_ack_s_to_msg(const struct enroll_ack * s); struct enroll_ack enroll_ack_msg_to_s(const enroll_ack_msg_t * msg); -#endif /* OUROBOROS_LIB_PROTOBUF_H */ \ No newline at end of file +#endif /* OUROBOROS_LIB_PROTOBUF_H */ diff --git a/include/ouroboros/pthread.h b/include/ouroboros/pthread.h index df9213c0..7044cb5e 100644 --- a/include/ouroboros/pthread.h +++ b/include/ouroboros/pthread.h @@ -35,11 +35,13 @@ static int __attribute__((unused)) __timedwait(pthread_cond_t * cond, return pthread_cond_timedwait(cond, mtx, abstime); } +#if defined (_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 200112L /* various cleanup functions for pthread_cleanup_push */ static void __attribute__((unused)) __cleanup_rwlock_unlock(void * rwlock) { pthread_rwlock_unlock((pthread_rwlock_t *) rwlock); } +#endif static void __attribute__((unused)) __cleanup_mutex_unlock(void * mutex) { diff --git a/include/ouroboros/serdes-irm.h b/include/ouroboros/serdes-irm.h new file mode 100644 index 00000000..1d041541 --- /dev/null +++ b/include/ouroboros/serdes-irm.h @@ -0,0 +1,79 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Ouroboros IRM Protocol - serialization/deserialization + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_LIB_SERDES_IRM_H +#define OUROBOROS_LIB_SERDES_IRM_H + +#include +#include +#include +#include + +#include + +int flow_alloc__irm_req_ser(buffer_t * buf, + const struct flow_info * flow, + const char * dst, + const struct timespec * timeo); + +int flow_join__irm_req_ser(buffer_t * buf, + const struct flow_info * flow, + const char * dst, + const struct timespec * timeo); + +int flow_accept__irm_req_ser(buffer_t * buf, + const struct flow_info * flow, + const struct timespec * timeo); + +int ipcp_flow_req_arr__irm_req_ser(buffer_t * buf, + const buffer_t * dst, + const struct flow_info * flow, + const buffer_t * data); + +int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf, + const struct flow_info * flow, + int response, + const buffer_t * data); + +/* response to alloc / join / accept / flow_req_arr */ +int flow__irm_result_des(buffer_t * buf, + struct flow_info * flow, + buffer_t * sk); + +int flow_dealloc__irm_req_ser(buffer_t * buf, + const struct flow_info * flow, + const struct timespec * timeo); + +int ipcp_flow_dealloc__irm_req_ser(buffer_t * buf, + const struct flow_info * info); + +int ipcp_create_r__irm_req_ser(buffer_t * buf, + const struct ipcp_info * ipcp); + +int proc_announce__irm_req_ser(buffer_t * buf, + const char * prog); + +int proc_exit__irm_req_ser(buffer_t * buf); + +int irm__irm_result_des(buffer_t * buf); + +#endif /* OUROBOROS_LIB_SERDES_IRM_H*/ diff --git a/include/ouroboros/sockets.h.in b/include/ouroboros/sockets.h.in index cd198781..095674a9 100644 --- a/include/ouroboros/sockets.h.in +++ b/include/ouroboros/sockets.h.in @@ -23,18 +23,17 @@ #ifndef OUROBOROS_LIB_SOCKETS_H #define OUROBOROS_LIB_SOCKETS_H -#include +#include -#include "irm.pb-c.h" -typedef IrmMsg irm_msg_t; +#include -#define SOCK_PATH "/var/run/ouroboros/" -#define SOCK_PATH_SUFFIX ".sock" +#define SOCK_PATH "/var/run/ouroboros/" +#define SOCK_PATH_SUFFIX ".sock" -#define IRM_SOCK_PATH SOCK_PATH "irm" SOCK_PATH_SUFFIX +#define IRM_SOCK_PATH SOCK_PATH "irm" SOCK_PATH_SUFFIX #define IPCP_SOCK_PATH_PREFIX SOCK_PATH "ipcp" -#define SOCK_BUF_SIZE @SOCK_BUF_SIZE@ +#define SOCK_BUF_SIZE @SOCK_BUF_SIZE@ /* Returns the full socket path of an IPCP */ char * ipcp_sock_path(pid_t pid); @@ -43,8 +42,9 @@ int server_socket_open(char * file_name); int client_socket_open(char * file_name); -irm_msg_t * send_recv_irm_msg(irm_msg_t * msg); +int send_recv_msg(buffer_t * buf); +irm_msg_t * send_recv_irm_msg(irm_msg_t * msg); /* cleanup socket when cancelling thread */ void __cleanup_close_ptr(void * o); diff --git a/src/ipcpd/broadcast/main.c b/src/ipcpd/broadcast/main.c index d2cbed6f..f51fc629 100644 --- a/src/ipcpd/broadcast/main.c +++ b/src/ipcpd/broadcast/main.c @@ -212,6 +212,7 @@ static int broadcast_ipcp_join(int fd, { struct conn conn; time_t mpl = IPCP_BROADCAST_MPL; + buffer_t data = {NULL, 0}; (void) qs; @@ -226,7 +227,7 @@ static int broadcast_ipcp_join(int fd, notifier_event(NOTIFY_DT_CONN_ADD, &conn); - ipcp_flow_alloc_reply(fd, 0, mpl, NULL, 0); + ipcp_flow_alloc_reply(fd, 0, mpl, &data); return 0; } diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index c0aaf711..ea6e0f1c 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -455,16 +455,15 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, return 0; } -static int eth_ipcp_alloc(const uint8_t * dst_addr, +static int eth_ipcp_alloc(const uint8_t * dst_addr, #if defined(BUILD_ETH_DIX) - uint16_t eid, + uint16_t eid, #elif defined(BUILD_ETH_LLC) - uint8_t ssap, + uint8_t ssap, #endif - const uint8_t * hash, - qosspec_t qs, - const void * data, - size_t dlen) + const uint8_t * hash, + qosspec_t qs, + const buffer_t * data) { uint8_t * buf; struct mgmt_msg * msg; @@ -473,7 +472,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, len = sizeof(*msg) + ipcp_dir_hash_len(); - buf = malloc(len + ETH_HEADER_TOT_SIZE + dlen); + buf = malloc(len + ETH_HEADER_TOT_SIZE + data->len); if (buf == NULL) return -1; @@ -496,8 +495,8 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, msg->timeout = hton32(qs.timeout); memcpy(msg + 1, hash, ipcp_dir_hash_len()); - if (dlen > 0) - memcpy(buf + len + ETH_HEADER_TOT_SIZE, data, dlen); + if (data->len > 0) + memcpy(buf + len + ETH_HEADER_TOT_SIZE, data->data, data->len); ret = eth_ipcp_send_frame(dst_addr, #if defined(BUILD_ETH_DIX) @@ -506,28 +505,27 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, reverse_bits(MGMT_SAP), reverse_bits(MGMT_SAP), #endif - buf, len + dlen); + buf, len + data->len); free(buf); return ret; } -static int eth_ipcp_alloc_resp(uint8_t * dst_addr, +static int eth_ipcp_alloc_resp(uint8_t * dst_addr, #if defined(BUILD_ETH_DIX) - uint16_t seid, - uint16_t deid, + uint16_t seid, + uint16_t deid, #elif defined(BUILD_ETH_LLC) - uint8_t ssap, - uint8_t dsap, + uint8_t ssap, + uint8_t dsap, #endif - int response, - const void * data, - size_t len) + int response, + const buffer_t * data) { struct mgmt_msg * msg; uint8_t * buf; - buf = malloc(sizeof(*msg) + ETH_HEADER_TOT_SIZE + len); + buf = malloc(sizeof(*msg) + ETH_HEADER_TOT_SIZE + data->len); if (buf == NULL) return -1; @@ -543,8 +541,8 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, #endif msg->response = response; - if (len > 0) - memcpy(msg + 1, data, len); + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); if (eth_ipcp_send_frame(dst_addr, #if defined(BUILD_ETH_DIX) @@ -553,7 +551,7 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, reverse_bits(MGMT_SAP), reverse_bits(MGMT_SAP), #endif - buf, sizeof(*msg) + len)) { + buf, sizeof(*msg) + data->len)) { free(buf); return -1; } @@ -563,20 +561,19 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, return 0; } -static int eth_ipcp_req(uint8_t * r_addr, +static int eth_ipcp_req(uint8_t * r_addr, #if defined(BUILD_ETH_DIX) - uint16_t r_eid, + uint16_t r_eid, #elif defined(BUILD_ETH_LLC) - uint8_t r_sap, + uint8_t r_sap, #endif - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len) + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { int fd; - fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_ETH_MPL, data, len); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_ETH_MPL, data); if (fd < 0) { log_err("Could not get new flow from IRMd."); return -1; @@ -600,17 +597,16 @@ static int eth_ipcp_req(uint8_t * r_addr, return 0; } -static int eth_ipcp_alloc_reply(uint8_t * r_addr, +static int eth_ipcp_alloc_reply(uint8_t * r_addr, #if defined(BUILD_ETH_DIX) - uint16_t seid, - uint16_t deid, + uint16_t seid, + uint16_t deid, #elif defined(BUILD_ETH_LLC) - uint8_t ssap, - int dsap, + uint8_t ssap, + int dsap, #endif - int response, - const void * data, - size_t len) + int response, + const buffer_t * data) { int ret = 0; int fd = -1; @@ -649,7 +645,7 @@ static int eth_ipcp_alloc_reply(uint8_t * r_addr, #elif defined(BUILD_ETH_LLC) log_dbg("Flow reply, fd %d, SSAP %d, DSAP %d.", fd, ssap, dsap); #endif - if ((ret = ipcp_flow_alloc_reply(fd, response, mpl, data, len)) < 0) { + if ((ret = ipcp_flow_alloc_reply(fd, response, mpl, data)) < 0) { log_err("Failed to reply to flow allocation."); return -1; } @@ -716,6 +712,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, struct mgmt_msg * msg; size_t msg_len; qosspec_t qs; + buffer_t data; msg = (struct mgmt_msg *) buf; @@ -735,6 +732,9 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, qs.cypher_s = ntoh16(msg->cypher_s); qs.timeout = ntoh32(msg->timeout); + data.data = (uint8_t *) buf + msg_len; + data.len = len - msg_len; + if (shim_data_reg_has(eth_data.shim_data, buf + sizeof(*msg))) { eth_ipcp_req(r_addr, @@ -745,13 +745,15 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, #endif buf + sizeof(*msg), qs, - buf + msg_len, - len - msg_len); + &data); } break; case FLOW_REPLY: assert(len >= sizeof(*msg)); + data.data = (uint8_t *) buf + sizeof(*msg); + data.len = len - sizeof(*msg); + eth_ipcp_alloc_reply(r_addr, #if defined(BUILD_ETH_DIX) ntohs(msg->seid), @@ -761,8 +763,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf, msg->dsap, #endif msg->response, - buf + sizeof(*msg), - len - sizeof(*msg)); + &data); break; case NAME_QUERY_REQ: eth_ipcp_name_query_req(buf + sizeof(*msg), r_addr); @@ -1589,11 +1590,10 @@ static int eth_ipcp_query(const uint8_t * hash) return ret; } -static int eth_ipcp_flow_alloc(int fd, - const uint8_t * hash, - qosspec_t qs, - const void * data, - size_t len) +static int eth_ipcp_flow_alloc(int fd, + const uint8_t * hash, + qosspec_t qs, + const buffer_t * data) { #ifdef BUILD_ETH_LLC uint8_t ssap = 0; @@ -1634,8 +1634,7 @@ static int eth_ipcp_flow_alloc(int fd, #endif hash, qs, - data, - len) < 0) { + data) < 0) { #ifdef BUILD_ETH_LLC pthread_rwlock_wrlock(ð_data.flows_lock); bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap); @@ -1654,10 +1653,9 @@ static int eth_ipcp_flow_alloc(int fd, return 0; } -static int eth_ipcp_flow_alloc_resp(int fd, - int response, - const void * data, - size_t len) +static int eth_ipcp_flow_alloc_resp(int fd, + int response, + const buffer_t * data) { #if defined(BUILD_ETH_DIX) uint16_t r_eid; @@ -1698,8 +1696,7 @@ static int eth_ipcp_flow_alloc_resp(int fd, ssap, r_sap, #endif response, - data, - len) < 0) { + data) < 0) { #ifdef BUILD_ETH_LLC pthread_rwlock_wrlock(ð_data.flows_lock); bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap); diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 0215cdaa..966c4920 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -261,15 +261,18 @@ static void * acceptloop(void * o) return (void *) 0; } -int ipcp_wait_flow_req_arr(const uint8_t * dst, - qosspec_t qs, - time_t mpl, - const void * data, - size_t len) +int ipcp_wait_flow_req_arr(const uint8_t * dst, + qosspec_t qs, + time_t mpl, + const buffer_t * data) { struct timespec ts = TIMESPEC_INIT_MS(ALLOC_TIMEOUT); struct timespec abstime; int fd; + buffer_t hash; + + hash.data = (uint8_t *) dst; + hash.len = ipcp_dir_hash_len(); clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -290,7 +293,7 @@ int ipcp_wait_flow_req_arr(const uint8_t * dst, assert(ipcpi.alloc_id == -1); - fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len); + fd = ipcp_flow_req_arr(&hash, qs, mpl, data); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); log_err("Failed to get fd for flow."); @@ -492,13 +495,12 @@ static void do_query(const uint8_t * hash, ret_msg->result = ipcpi.ops->ipcp_query(hash); } -static void do_flow_alloc(pid_t pid, - int flow_id, - uint8_t * dst, - qosspec_t qs, - void * data, - size_t len, - ipcp_msg_t * ret_msg) +static void do_flow_alloc(pid_t pid, + int flow_id, + uint8_t * dst, + qosspec_t qs, + const buffer_t * data, + ipcp_msg_t * ret_msg) { int fd; @@ -525,7 +527,7 @@ static void do_flow_alloc(pid_t pid, goto finish; } - ret_msg->result = ipcpi.ops->ipcp_flow_alloc(fd, dst, qs, data, len); + ret_msg->result = ipcpi.ops->ipcp_flow_alloc(fd, dst, qs, data); finish: log_info("Finished allocating flow %d to " HASH_FMT32 ": %d.", flow_id, HASH_VAL32(dst), ret_msg->result); @@ -566,11 +568,10 @@ static void do_flow_join(pid_t pid, log_info("Finished joining layer " HASH_FMT32 ".", HASH_VAL32(dst)); } -static void do_flow_alloc_resp(int resp, - int flow_id, - const void * data, - size_t len, - ipcp_msg_t * ret_msg) +static void do_flow_alloc_resp(int resp, + int flow_id, + const buffer_t * data, + ipcp_msg_t * ret_msg) { int fd = -1; @@ -597,7 +598,7 @@ static void do_flow_alloc_resp(int resp, } } - ret_msg->result = ipcpi.ops->ipcp_flow_alloc_resp(fd, resp, data, len); + ret_msg->result = ipcpi.ops->ipcp_flow_alloc_resp(fd, resp, data); finish: log_info("Finished responding to allocation request: %d", ret_msg->result); @@ -648,6 +649,7 @@ static void * mainloop(void * o) ipcp_msg_t ret_msg = IPCP_MSG__INIT; qosspec_t qs; struct cmd * cmd; + buffer_t data; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -710,11 +712,12 @@ static void * mainloop(void * o) assert(msg->hash.len == ipcp_dir_hash_len()); assert(msg->pk.len > 0 ? msg->pk.data != NULL : msg->pk.data == NULL); + data.len = msg->pk.len; + data.data = msg->pk.data; qs = qos_spec_msg_to_s(msg->qosspec); do_flow_alloc(msg->pid, msg->flow_id, msg->hash.data, qs, - msg->pk.data, msg->pk.len, - &ret_msg); + &data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_FLOW_JOIN: assert(msg->hash.len == ipcp_dir_hash_len()); @@ -725,10 +728,10 @@ static void * mainloop(void * o) case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP: assert(msg->pk.len > 0 ? msg->pk.data != NULL : msg->pk.data == NULL); - + data.len = msg->pk.len; + data.data = msg->pk.data; do_flow_alloc_resp(msg->response, msg->flow_id, - msg->pk.data, msg->pk.len, - &ret_msg); + &data, &ret_msg); break; case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: do_flow_dealloc(msg->flow_id, msg->timeo_sec, &ret_msg); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 1ce07c57..aab490c7 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -53,20 +53,18 @@ struct ipcp_ops { int (* ipcp_query)(const uint8_t * hash); - int (* ipcp_flow_alloc)(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len); + int (* ipcp_flow_alloc)(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data); int (* ipcp_flow_join)(int fd, const uint8_t * dst, qosspec_t qs); - int (* ipcp_flow_alloc_resp)(int fd, - int response, - const void * data, - size_t len); + int (* ipcp_flow_alloc_resp)(int fd, + int response, + const buffer_t * data); int (* ipcp_flow_dealloc)(int fd); }; @@ -129,11 +127,10 @@ int ipcp_parse_arg(int argc, char * argv[]); /* Helper functions to handle races during flow allocation */ -int ipcp_wait_flow_req_arr(const uint8_t * dst, - qosspec_t qs, - time_t mpl, - const void * data, - size_t len); +int ipcp_wait_flow_req_arr(const uint8_t * dst, + qosspec_t qs, + time_t mpl, + const buffer_t * data); int ipcp_wait_flow_resp(const int fd); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index dd2c7209..5a53dec5 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -186,11 +186,10 @@ static int local_ipcp_query(const uint8_t * hash) return ret; } -static int local_ipcp_flow_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len) +static int local_ipcp_flow_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { int out_fd = -1; @@ -198,7 +197,7 @@ static int local_ipcp_flow_alloc(int fd, HASH_VAL32(dst), fd); assert(dst); - out_fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_LOCAL_MPL, data, len); + out_fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_LOCAL_MPL, data); if (out_fd < 0) { log_dbg("Flow allocation failed: %d", out_fd); return -1; @@ -218,10 +217,9 @@ static int local_ipcp_flow_alloc(int fd, return 0; } -static int local_ipcp_flow_alloc_resp(int fd, - int response, - const void * data, - size_t len) +static int local_ipcp_flow_alloc_resp(int fd, + int response, + const buffer_t * data) { int out_fd; time_t mpl = IPCP_LOCAL_MPL; @@ -249,7 +247,7 @@ static int local_ipcp_flow_alloc_resp(int fd, fset_add(local_data.flows, fd); - if (ipcp_flow_alloc_reply(out_fd, response, mpl, data, len) < 0) + if (ipcp_flow_alloc_reply(out_fd, response, mpl, data) < 0) return -1; log_info("Flow allocation completed, fds (%d, %d).", out_fd, fd); diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index 909ca0a5..2e8d84ce 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -203,18 +203,17 @@ static int udp_ipcp_port_alloc(const struct sockaddr_in * r_saddr, uint32_t s_eid, const uint8_t * dst, qosspec_t qs, - const void * data, - size_t dlen) + const buffer_t * data) { uint8_t * buf; struct mgmt_msg * msg; size_t len; - assert(dlen > 0 ? data != NULL : data == NULL); + assert(data->len > 0 ? data->data != NULL : data->data == NULL); len = sizeof(*msg) + ipcp_dir_hash_len(); - buf = malloc(len + dlen); + buf = malloc(len + data->len); if (buf == NULL) return -1; @@ -233,10 +232,10 @@ static int udp_ipcp_port_alloc(const struct sockaddr_in * r_saddr, msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); - if (dlen > 0) - memcpy(buf + len, data, dlen); + if (data->len > 0) + memcpy(buf + len, data->data, data->len); - if (sendto(udp_data.s_fd, msg, len + dlen, + if (sendto(udp_data.s_fd, msg, len + data->len, SENDTO_FLAGS, (const struct sockaddr *) r_saddr, sizeof(*r_saddr)) < 0) { free(buf); @@ -252,12 +251,11 @@ static int udp_ipcp_port_alloc_resp(const struct sockaddr_in * r_saddr, uint32_t s_eid, uint32_t d_eid, int8_t response, - const void * data, - size_t len) + const buffer_t * data) { struct mgmt_msg * msg; - msg = malloc(sizeof(*msg) + len); + msg = malloc(sizeof(*msg) + data->len); if (msg == NULL) return -1; @@ -267,10 +265,10 @@ static int udp_ipcp_port_alloc_resp(const struct sockaddr_in * r_saddr, msg->d_eid = hton32(d_eid); msg->response = response; - if (len > 0) - memcpy(msg + 1, data, len); + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); - if (sendto(udp_data.s_fd, msg, sizeof(*msg) + len, + if (sendto(udp_data.s_fd, msg, sizeof(*msg) + data->len, SENDTO_FLAGS, (const struct sockaddr *) r_saddr, sizeof(*r_saddr)) < 0 ) { free(msg); @@ -286,12 +284,11 @@ static int udp_ipcp_port_req(struct sockaddr_in * c_saddr, int d_eid, const uint8_t * dst, qosspec_t qs, - const void * data, - size_t len) + const buffer_t * data) { int fd; - fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UDP_MPL, data, len); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UDP_MPL, data); if (fd < 0) { log_err("Could not get new flow from IRMd."); return -1; @@ -314,8 +311,7 @@ static int udp_ipcp_port_alloc_reply(const struct sockaddr_in * saddr, uint32_t s_eid, uint32_t d_eid, int8_t response, - const void * data, - size_t len) + const buffer_t * data) { time_t mpl = IPCP_UDP_MPL; @@ -333,7 +329,7 @@ static int udp_ipcp_port_alloc_reply(const struct sockaddr_in * saddr, pthread_rwlock_unlock(&udp_data.flows_lock); - if (ipcp_flow_alloc_reply(s_eid, response, mpl, data, len) < 0) { + if (ipcp_flow_alloc_reply(s_eid, response, mpl, data) < 0) { log_err("Failed to reply to flow allocation."); return -1; } @@ -351,6 +347,7 @@ static int udp_ipcp_mgmt_frame(const uint8_t * buf, struct mgmt_msg * msg; size_t msg_len; qosspec_t qs; + buffer_t data; msg = (struct mgmt_msg *) buf; @@ -360,6 +357,10 @@ static int udp_ipcp_mgmt_frame(const uint8_t * buf, assert(len >= msg_len); + data.len = len - msg_len; + data.data = (uint8_t *) buf + msg_len; + + qs.delay = ntoh32(msg->delay); qs.bandwidth = ntoh64(msg->bandwidth); qs.availability = msg->availability; @@ -372,17 +373,18 @@ static int udp_ipcp_mgmt_frame(const uint8_t * buf, return udp_ipcp_port_req(&c_saddr, ntoh32(msg->s_eid), (uint8_t *) (msg + 1), qs, - buf + msg_len, - len - msg_len); + &data); case FLOW_REPLY: assert(len >= sizeof(*msg)); + data.len = len - sizeof(*msg); + data.data = (uint8_t *) buf + sizeof(*msg); + return udp_ipcp_port_alloc_reply(&c_saddr, ntoh32(msg->s_eid), ntoh32(msg->d_eid), msg->response, - buf + sizeof(*msg), - len - sizeof(*msg)); + &data); default: log_err("Unknown message received %d.", msg->code); return -1; @@ -983,11 +985,10 @@ static int udp_ipcp_query(const uint8_t * hash) return 0; } -static int udp_ipcp_flow_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len) +static int udp_ipcp_flow_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { struct sockaddr_in r_saddr; /* Server address */ uint32_t ip_addr = 0; @@ -1017,7 +1018,7 @@ static int udp_ipcp_flow_alloc(int fd, r_saddr.sin_addr.s_addr = ip_addr; r_saddr.sin_port = udp_data.s_saddr.sin_port; - if (udp_ipcp_port_alloc(&r_saddr, fd, dst, qs, data, len) < 0) { + if (udp_ipcp_port_alloc(&r_saddr, fd, dst, qs, data) < 0) { log_err("Could not allocate port."); return -1; } @@ -1034,10 +1035,9 @@ static int udp_ipcp_flow_alloc(int fd, return 0; } -static int udp_ipcp_flow_alloc_resp(int fd, - int resp, - const void * data, - size_t len) +static int udp_ipcp_flow_alloc_resp(int fd, + int resp, + const buffer_t * data) { struct sockaddr_in saddr; int d_eid; @@ -1054,7 +1054,7 @@ static int udp_ipcp_flow_alloc_resp(int fd, pthread_rwlock_unlock(&udp_data.flows_lock); - if (udp_ipcp_port_alloc_resp(&saddr, d_eid, fd, resp, data, len) < 0) { + if (udp_ipcp_port_alloc_resp(&saddr, d_eid, fd, resp, data) < 0) { fset_del(udp_data.np1_flows, fd); log_err("Failed to respond to flow request."); return -1; diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index cea9483e..3631fd7b 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -478,8 +478,7 @@ static int fa_handle_flow_req(struct fa_msg * msg, qosspec_t qs; struct fa_flow * flow; uint8_t * dst; - uint8_t * data; /* Piggbacked data on flow alloc request. */ - size_t dlen; /* Length of piggybacked data. */ + buffer_t data; /* Piggbacked data on flow alloc request. */ msg_len = sizeof(*msg) + ipcp_dir_hash_len(); if (len < msg_len) { @@ -487,9 +486,9 @@ static int fa_handle_flow_req(struct fa_msg * msg, return -EPERM; } - dst = (uint8_t *)(msg + 1); - data = (uint8_t *) msg + msg_len; - dlen = len - msg_len; + dst = (uint8_t *)(msg + 1); + data.data = (uint8_t *) msg + msg_len; + data.len = len - msg_len; qs.delay = ntoh32(msg->delay); qs.bandwidth = ntoh64(msg->bandwidth); @@ -501,7 +500,7 @@ static int fa_handle_flow_req(struct fa_msg * msg, qs.cypher_s = ntoh16(msg->cypher_s); qs.timeout = ntoh32(msg->timeout); - fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, data, dlen); + fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, &data); if (fd < 0) return fd; @@ -525,14 +524,13 @@ static int fa_handle_flow_reply(struct fa_msg * msg, { int fd; struct fa_flow * flow; - uint8_t * data; /* Piggbacked data on flow alloc request. */ - size_t dlen; /* Length of piggybacked data. */ + buffer_t data; /* Piggbacked data on flow alloc request. */ time_t mpl = IPCP_UNICAST_MPL; assert(len >= sizeof(*msg)); - data = (uint8_t *) msg + sizeof(*msg); - dlen = len - sizeof(*msg); + data.data = (uint8_t *) msg + sizeof(*msg); + data.len = len - sizeof(*msg); pthread_rwlock_wrlock(&fa.flows_lock); @@ -555,7 +553,7 @@ static int fa_handle_flow_reply(struct fa_msg * msg, pthread_rwlock_unlock(&fa.flows_lock); - if (ipcp_flow_alloc_reply(fd, msg->response, mpl, data, dlen) < 0) { + if (ipcp_flow_alloc_reply(fd, msg->response, mpl, &data) < 0) { log_err("Failed to reply for flow allocation on fd %d.", fd); return -EIRMD; } @@ -738,11 +736,10 @@ void fa_stop(void) psched_destroy(fa.psched); } -int fa_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t dlen) +int fa_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data) { struct fa_msg * msg; struct shm_du_buff * sdb; @@ -758,7 +755,7 @@ int fa_alloc(int fd, len = sizeof(*msg) + ipcp_dir_hash_len(); - if (ipcp_sdb_reserve(&sdb, len + dlen)) + if (ipcp_sdb_reserve(&sdb, len + data->len)) return -1; msg = (struct fa_msg *) shm_du_buff_head(sdb); @@ -780,8 +777,8 @@ int fa_alloc(int fd, msg->timeout = hton32(qs.timeout); memcpy(msg + 1, dst, ipcp_dir_hash_len()); - if (dlen > 0) - memcpy(shm_du_buff_head(sdb) + len, data, dlen); + if (data->len > 0) + memcpy(shm_du_buff_head(sdb) + len, data->data, data->len); if (dt_write_packet(addr, qc, fa.eid, sdb)) { log_err("Failed to send flow allocation request packet."); @@ -802,10 +799,9 @@ int fa_alloc(int fd, return 0; } -int fa_alloc_resp(int fd, - int response, - const void * data, - size_t len) +int fa_alloc_resp(int fd, + int response, + const buffer_t * data) { struct fa_msg * msg; struct shm_du_buff * sdb; @@ -819,9 +815,9 @@ int fa_alloc_resp(int fd, goto fail_alloc_resp; } - if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) { + if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + data->len)) { log_err("Failed to reserve sdb (%zu bytes).", - sizeof(*msg) + len); + sizeof(*msg) + data->len); goto fail_reserve; } @@ -830,8 +826,8 @@ int fa_alloc_resp(int fd, msg->code = FLOW_REPLY; msg->response = response; - if (len > 0) - memcpy(msg + 1, data, len); + if (data->len > 0) + memcpy(msg + 1, data->data, data->len); pthread_rwlock_rdlock(&fa.flows_lock); diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h index 6d559a22..1e716966 100644 --- a/src/ipcpd/unicast/fa.h +++ b/src/ipcpd/unicast/fa.h @@ -34,16 +34,14 @@ int fa_start(void); void fa_stop(void); -int fa_alloc(int fd, - const uint8_t * dst, - qosspec_t qs, - const void * data, - size_t len); - -int fa_alloc_resp(int fd, - int response, - const void * data, - size_t len); +int fa_alloc(int fd, + const uint8_t * dst, + qosspec_t qs, + const buffer_t * data); + +int fa_alloc_resp(int fd, + int response, + const buffer_t * data); int fa_dealloc(int fd); diff --git a/src/irmd/CMakeLists.txt b/src/irmd/CMakeLists.txt index 3a5be324..7eaa0ce6 100644 --- a/src/irmd/CMakeLists.txt +++ b/src/irmd/CMakeLists.txt @@ -40,18 +40,19 @@ endif () set(IRMD_REQ_ARR_TIMEOUT 1000 CACHE STRING "Timeout for an application to respond to a new flow (ms)") -set(IRMD_FLOW_TIMEOUT 5000 CACHE STRING - "Timeout for a flow allocation response (ms)") + set(BOOTSTRAP_TIMEOUT 5000 CACHE STRING "Timeout for an IPCP to bootstrap (ms)") set(ENROLL_TIMEOUT 60000 CACHE STRING "Timeout for an IPCP to enroll (ms)") -set(REG_TIMEOUT 10000 CACHE STRING +set(REG_TIMEOUT 60000 CACHE STRING "Timeout for registering a name (ms)") -set(QUERY_TIMEOUT 3000 CACHE STRING +set(QUERY_TIMEOUT 60000 CACHE STRING "Timeout to query a name with an IPCP (ms)") set(CONNECT_TIMEOUT 60000 CACHE STRING "Timeout to connect an IPCP to another IPCP (ms)") +set(FLOW_ALLOC_TIMEOUT 5000 CACHE STRING + "Timeout for a flow allocation response (ms)") set(IRMD_MIN_THREADS 8 CACHE STRING "Minimum number of worker threads in the IRMd") set(IRMD_ADD_THREADS 8 CACHE STRING diff --git a/src/irmd/config.h.in b/src/irmd/config.h.in index b25053f7..fa1156b9 100644 --- a/src/irmd/config.h.in +++ b/src/irmd/config.h.in @@ -21,38 +21,39 @@ */ -#define IPCP_UDP_EXEC "@IPCP_UDP_TARGET@" -#define IPCP_ETH_LLC_EXEC "@IPCP_ETH_LLC_TARGET@" -#define IPCP_ETH_DIX_EXEC "@IPCP_ETH_DIX_TARGET@" -#define IPCP_UNICAST_EXEC "@IPCP_UNICAST_TARGET@" -#define IPCP_BROADCAST_EXEC "@IPCP_BROADCAST_TARGET@" -#define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@" +#define IPCP_UDP_EXEC "@IPCP_UDP_TARGET@" +#define IPCP_ETH_LLC_EXEC "@IPCP_ETH_LLC_TARGET@" +#define IPCP_ETH_DIX_EXEC "@IPCP_ETH_DIX_TARGET@" +#define IPCP_UNICAST_EXEC "@IPCP_UNICAST_TARGET@" +#define IPCP_BROADCAST_EXEC "@IPCP_BROADCAST_TARGET@" +#define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@" -#define INSTALL_PREFIX "@CMAKE_INSTALL_PREFIX@" -#define INSTALL_SBINDIR "@CMAKE_INSTALL_SBINDIR@" +#define INSTALL_PREFIX "@CMAKE_INSTALL_PREFIX@" +#define INSTALL_SBINDIR "@CMAKE_INSTALL_SBINDIR@" -#define PTHREAD_COND_CLOCK @PTHREAD_COND_CLOCK@ +#define PTHREAD_COND_CLOCK @PTHREAD_COND_CLOCK@ -#define SOCKET_TIMEOUT @SOCKET_TIMEOUT@ +#define SOCKET_TIMEOUT @SOCKET_TIMEOUT@ -#define IRMD_REQ_ARR_TIMEOUT @IRMD_REQ_ARR_TIMEOUT@ -#define IRMD_FLOW_TIMEOUT @IRMD_FLOW_TIMEOUT@ +#define IRMD_REQ_ARR_TIMEOUT @IRMD_REQ_ARR_TIMEOUT@ -#define BOOTSTRAP_TIMEOUT @BOOTSTRAP_TIMEOUT@ -#define ENROLL_TIMEOUT @ENROLL_TIMEOUT@ -#define REG_TIMEOUT @REG_TIMEOUT@ -#define QUERY_TIMEOUT @QUERY_TIMEOUT@ -#define CONNECT_TIMEOUT @CONNECT_TIMEOUT@ +#define FLOW_ALLOC_TIMEOUT @FLOW_ALLOC_TIMEOUT@ +#define FLOW_DEALLOC_TIMEOUT @FLOW_DEALLOC_TIMEOUT@ -#define SYS_MAX_FLOWS @SYS_MAX_FLOWS@ +#define BOOTSTRAP_TIMEOUT @BOOTSTRAP_TIMEOUT@ +#define ENROLL_TIMEOUT @ENROLL_TIMEOUT@ +#define REG_TIMEOUT @REG_TIMEOUT@ +#define QUERY_TIMEOUT @QUERY_TIMEOUT@ +#define CONNECT_TIMEOUT @CONNECT_TIMEOUT@ -#define IRMD_MIN_THREADS @IRMD_MIN_THREADS@ -#define IRMD_ADD_THREADS @IRMD_ADD_THREADS@ +#define SYS_MAX_FLOWS @SYS_MAX_FLOWS@ +#define IRMD_MIN_THREADS @IRMD_MIN_THREADS@ +#define IRMD_ADD_THREADS @IRMD_ADD_THREADS@ #cmakedefine HAVE_FUSE #ifdef HAVE_FUSE -#define FUSE_PREFIX "@FUSE_PREFIX@" +#define FUSE_PREFIX "@FUSE_PREFIX@" #endif #cmakedefine HAVE_TOML @@ -61,7 +62,7 @@ #define OUROBOROS_CONFIG_FILE "@OUROBOROS_CONFIG_FILE@" #endif -#define IRMD_PKILL_TIMEOUT @IRMD_PKILL_TIMEOUT@ +#define IRMD_PKILL_TIMEOUT @IRMD_PKILL_TIMEOUT@ #cmakedefine IRMD_KILL_ALL_PROCESSES #cmakedefine HAVE_LIBGCRYPT diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 3253a8f3..c8055aa1 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -58,6 +58,7 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t pid, struct timeval tv; struct timespec tic; struct timespec toc; + bool dealloc = false; if (kill(pid, 0) < 0) return NULL; @@ -101,6 +102,15 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t pid, tv.tv_sec = CONNECT_TIMEOUT / 1000; tv.tv_usec = (CONNECT_TIMEOUT % 1000) * 1000; break; + case IPCP_MSG_CODE__IPCP_FLOW_ALLOC: + tv.tv_sec = FLOW_ALLOC_TIMEOUT / 1000; + tv.tv_usec = (FLOW_ALLOC_TIMEOUT % 1000) * 1000; + break; + case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: + dealloc = true; + tv.tv_sec = 0; /* FIX DEALLOC: don't wait for dealloc */ + tv.tv_usec = 500; + break; default: tv.tv_sec = SOCKET_TIMEOUT / 1000; tv.tv_usec = (SOCKET_TIMEOUT % 1000) * 1000; @@ -127,7 +137,7 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t pid, if (len > 0) recv_msg = ipcp_msg__unpack(NULL, len, buf); else { - if (errno == EAGAIN) { + if (errno == EAGAIN && !dealloc) { int diff = ts_diff_ms(&tic, &toc); log_warn("IPCP command timed out after %d ms.", diff); } diff --git a/src/irmd/main.c b/src/irmd/main.c index 2cbe8ed4..32f41ab2 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1234,14 +1234,14 @@ static int flow_alloc_reply(struct flow_info * flow, } static int flow_dealloc(struct flow_info * flow, - time_t timeo) + struct timespec * ts) { log_info("Deallocating flow %d for process %d.", flow->id, flow->n_pid); reg_dealloc_flow(flow); - if (ipcp_flow_dealloc(flow->n_1_pid, flow->id, timeo) < 0) { + if (ipcp_flow_dealloc(flow->n_1_pid, flow->id, ts->tv_sec) < 0) { log_err("Failed to request dealloc from %d.", flow->n_1_pid); return -EIPCP; } @@ -1324,14 +1324,27 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg) struct flow_info flow; struct proc_info proc; struct name_info name; - struct timespec * abstime = NULL; - struct timespec ts; + struct timespec * abstime; + struct timespec max = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT); + struct timespec now; + struct timespec ts = TIMESPEC_INIT_S(0); /* static analysis */ int res; irm_msg_t * ret_msg; buffer_t data; memset(&flow, 0, sizeof(flow)); + clock_gettime(PTHREAD_COND_CLOCK, &now); + + if (msg->timeo != NULL) { + ts = timespec_msg_to_s(msg->timeo); + ts_add(&ts, &now, &ts); + abstime = &ts; + } else { + ts_add(&max, &now, &max); + abstime = NULL; + } + ret_msg = malloc(sizeof(*ret_msg)); if (ret_msg == NULL) { log_err("Failed to malloc return msg."); @@ -1342,20 +1355,6 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg) ret_msg->code = IRM_MSG_CODE__IRM_REPLY; - if (msg->has_timeo_sec) { - struct timespec now; - - clock_gettime(PTHREAD_COND_CLOCK, &now); - assert(msg->has_timeo_nsec); - - ts.tv_sec = msg->timeo_sec; - ts.tv_nsec = msg->timeo_nsec; - - ts_add(&ts, &now, &ts); - - abstime = &ts; - } - pthread_cleanup_push(free_msg, ret_msg); switch (msg->code) { @@ -1430,20 +1429,12 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg) case IRM_MSG_CODE__IRM_FLOW_ACCEPT: data.len = msg->pk.len; data.data = msg->pk.data; + msg->has_pk = false; assert(data.len > 0 ? data.data != NULL : data.data == NULL); - flow.n_pid = msg->pid; - flow.qs = qos_raw; + flow = flow_info_msg_to_s(msg->flow_info); res = flow_accept(&flow, &data, abstime); if (res == 0) { - qosspec_msg_t * qs_msg; - qs_msg = qos_spec_s_to_msg(&flow.qs); - ret_msg->has_flow_id = true; - ret_msg->flow_id = flow.id; - ret_msg->has_pid = true; - ret_msg->pid = flow.n_1_pid; - ret_msg->has_mpl = true; - ret_msg->qosspec = qs_msg; - ret_msg->mpl = flow.mpl; + ret_msg->flow_info = flow_info_s_to_msg(&flow); ret_msg->has_symmkey = data.len != 0; ret_msg->symmkey.data = data.data; ret_msg->symmkey.len = data.len; @@ -1453,17 +1444,12 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg) data.len = msg->pk.len; data.data = msg->pk.data; msg->has_pk = false; - flow.n_pid = msg->pid; - flow.qs = qos_spec_msg_to_s(msg->qosspec); assert(data.len > 0 ? data.data != NULL : data.data == NULL); + flow = flow_info_msg_to_s(msg->flow_info); + abstime = abstime == NULL ? &max : abstime; res = flow_alloc(&flow, msg->dst, &data, abstime); if (res == 0) { - ret_msg->has_flow_id = true; - ret_msg->flow_id = flow.id; - ret_msg->has_pid = true; - ret_msg->pid = flow.n_1_pid; - ret_msg->has_mpl = true; - ret_msg->mpl = flow.mpl; + ret_msg->flow_info = flow_info_s_to_msg(&flow); ret_msg->has_symmkey = data.len != 0; ret_msg->symmkey.data = data.data; ret_msg->symmkey.len = data.len; @@ -1471,46 +1457,38 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg) break; case IRM_MSG_CODE__IRM_FLOW_JOIN: assert(msg->pk.len == 0 && msg->pk.data == NULL); - flow.qs = qos_spec_msg_to_s(msg->qosspec); + flow = flow_info_msg_to_s(msg->flow_info); + abstime = abstime == NULL ? &max : abstime; res = flow_join(&flow, msg->dst, abstime); + if (res == 0) + ret_msg->flow_info = flow_info_s_to_msg(&flow); break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: - flow.n_pid = msg->pid; - flow.id = msg->flow_id; - res = flow_dealloc(&flow, msg->timeo_sec); + flow = flow_info_msg_to_s(msg->flow_info); + res = flow_dealloc(&flow, &ts); break; case IRM_MSG_CODE__IPCP_FLOW_DEALLOC: - flow.n_1_pid = msg->pid; - flow.id = msg->flow_id; + flow = flow_info_msg_to_s(msg->flow_info); res = flow_dealloc_resp(&flow); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: data.len = msg->pk.len; data.data = msg->pk.data; - msg->has_pk = false; /* pass data */ - msg->pk.data = NULL; + msg->pk.data = NULL; /* pass data */ msg->pk.len = 0; assert(data.len > 0 ? data.data != NULL : data.data == NULL); - flow.n_1_pid = msg->pid; - flow.mpl = msg->mpl; - flow.qs = qos_spec_msg_to_s(msg->qosspec); + flow = flow_info_msg_to_s(msg->flow_info); res = flow_req_arr(&flow, msg->hash.data, &data); - if (res == 0) { - ret_msg->has_flow_id = true; - ret_msg->flow_id = flow.id; - ret_msg->has_pid = true; - ret_msg->pid = flow.n_pid; - } + if (res == 0) + ret_msg->flow_info = flow_info_s_to_msg(&flow); break; case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY: data.len = msg->pk.len; data.data = msg->pk.data; - msg->has_pk = false; /* pass data */ - msg->pk.data = NULL; + msg->pk.data = NULL; /* pass data */ msg->pk.len = 0; assert(data.len > 0 ? data.data != NULL : data.data == NULL); - flow.id = msg->flow_id; - flow.mpl = msg->mpl; + flow = flow_info_msg_to_s(msg->flow_info); res = flow_alloc_reply(&flow, msg->response, &data); break; default: @@ -1522,7 +1500,10 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg) pthread_cleanup_pop(false); ret_msg->has_result = true; - ret_msg->result = res; + if (abstime == &max && res == -ETIMEDOUT) + ret_msg->result = -EPERM; /* No timeout requested */ + else + ret_msg->result = res; return ret_msg; } @@ -1664,8 +1645,6 @@ static void destroy_mount(char * mnt) { struct stat st; - log_dbg("Destroying mountpoint %s.", mnt); - if (stat(mnt, &st) == -1){ switch(errno) { case ENOENT: @@ -1719,7 +1698,7 @@ static void cleanup_pid(pid_t pid) void * irm_sanitize(void * o) { pid_t pid; - struct timespec ts = TIMESPEC_INIT_MS(IRMD_FLOW_TIMEOUT / 20); + struct timespec ts = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT / 20); (void) o; @@ -2003,7 +1982,7 @@ static void * kill_dash_nine(void * o) { time_t slept = 0; #ifdef IRMD_KILL_ALL_PROCESSES - struct timespec ts = TIMESPEC_INIT_MS(IRMD_FLOW_TIMEOUT / 19); + struct timespec ts = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT / 19); #endif (void) o; diff --git a/src/irmd/reg/reg.c b/src/irmd/reg/reg.c index f486c1cc..731e44b6 100644 --- a/src/irmd/reg/reg.c +++ b/src/irmd/reg/reg.c @@ -1490,7 +1490,7 @@ int reg_get_exec(enum hash_algo algo, exec = __reg_get_exec(algo, hash); if (exec == NULL) { - ret = 0; + ret = -EPERM; goto finish; } diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 4a4684b0..b0bdb5ce 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -4,8 +4,8 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR}) include_directories(${CMAKE_SOURCE_DIR}/include) include_directories(${CMAKE_BINARY_DIR}/include) -protobuf_generate_c(QOSSPEC_PROTO_SRCS QOSSPEC_PROTO_HDRS - pb/qos.proto) +protobuf_generate_c(MODEL_PROTO_SRCS MODEL_PROTO_HDRS + pb/model.proto) protobuf_generate_c(IPCP_CONFIG_PROTO_SRCS IPCP_CONFIG_PROTO_HDRS pb/ipcp_config.proto) protobuf_generate_c(ENROLL_PROTO_SRCS ENROLL_PROTO_HDRS @@ -264,6 +264,7 @@ set(SOURCE_FILES_COMMON qoscube.c random.c rib.c + serdes-irm.c serdes-oep.c sha3.c shm_flow_set.c @@ -278,7 +279,7 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/config.h.in" "${CMAKE_CURRENT_BINARY_DIR}/config.h" @ONLY) add_library(ouroboros-common SHARED ${SOURCE_FILES_COMMON} ${IRM_PROTO_SRCS} - ${IPCP_PROTO_SRCS} ${IPCP_CONFIG_PROTO_SRCS} ${QOSSPEC_PROTO_SRCS} + ${IPCP_PROTO_SRCS} ${IPCP_CONFIG_PROTO_SRCS} ${MODEL_PROTO_SRCS} ${ENROLL_PROTO_SRCS}) add_library(ouroboros-dev SHARED ${SOURCE_FILES_DEV} ${CACEP_PROTO_SRCS}) diff --git a/src/lib/config.h.in b/src/lib/config.h.in index d1eb9a54..604038b4 100644 --- a/src/lib/config.h.in +++ b/src/lib/config.h.in @@ -42,6 +42,7 @@ #define SHM_RDRB_BLOCK_SIZE @SHM_RDRB_BLOCK_SIZE@ #define SHM_BUFFER_SIZE @SHM_BUFFER_SIZE@ #define SHM_RBUFF_SIZE @SHM_RBUFF_SIZE@ +#define FLOW_ALLOC_TIMEOUT @FLOW_ALLOC_TIMEOUT@ #if defined(__linux__) || (defined(__MACH__) && !defined(__APPLE__)) /* Avoid a bug in robust mutex implementation of glibc 2.25 */ diff --git a/src/lib/dev.c b/src/lib/dev.c index 9e37978c..a7f20e88 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -42,9 +42,9 @@ #include #include #include -#include #include #include +#include #include #include #include @@ -57,6 +57,7 @@ #ifdef HAVE_LIBGCRYPT #include #endif +#include #include #include #include @@ -79,6 +80,7 @@ /* map flow_ids to flow descriptors; track state of the flow */ struct fmap { int fd; + /* TODO: use actual flow state */ enum flow_state state; }; @@ -88,12 +90,13 @@ struct fmap { struct flow { struct list_head next; + struct flow_info info; + struct shm_rbuff * rx_rb; struct shm_rbuff * tx_rb; struct shm_flow_set * set; - int flow_id; + uint16_t oflags; - qosspec_t qs; ssize_t part_idx; struct crypt_info crypt; @@ -221,53 +224,32 @@ static enum flow_state flow_wait_assign(int flow_id) return state; } -static int proc_announce(char * prog) +static int proc_announce(const char * prog) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int ret = -1; - - msg.code = IRM_MSG_CODE__IRM_PROC_ANNOUNCE; - msg.has_pid = true; - msg.pid = getpid(); - msg.prog = prog; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_result || (ret = recv_msg->result)) { - irm_msg__free_unpacked(recv_msg, NULL); - return ret; - } + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + int err; - irm_msg__free_unpacked(recv_msg, NULL); + if (proc_announce__irm_req_ser(&msg, prog) < 0) + return -ENOMEM; - return ret; + err = send_recv_msg(&msg); + if (err < 0) + return err; + + return irm__irm_result_des(&msg); } +/* IRMd will clean up the mess if this fails */ static void proc_exit(void) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int ret = -1; - - msg.code = IRM_MSG_CODE__IRM_PROC_EXIT; - msg.has_pid = true; - msg.pid = getpid(); + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) + if (proc_exit__irm_req_ser(&msg) < 0) return; - if (!recv_msg->has_result || (ret = recv_msg->result)) { - irm_msg__free_unpacked(recv_msg, NULL); - return; - } - - irm_msg__free_unpacked(recv_msg, NULL); - - return; + send_recv_msg(&msg); } #include "frct.c" @@ -305,7 +287,7 @@ static void flow_send_keepalive(struct flow * flow, if (shm_rbuff_write(flow->tx_rb, idx)) shm_rdrbuff_remove(ai.rdrb, idx); else - shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); } @@ -323,8 +305,8 @@ static void _flow_keepalive(struct flow * flow) s_act = flow->snd_act; r_act = flow->rcv_act; - flow_id = flow->flow_id; - timeo = flow->qs.timeout; + flow_id = flow->info.id; + timeo = flow->info.qs.timeout; acl = shm_rbuff_get_acl(flow->rx_rb); if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN)) @@ -400,10 +382,10 @@ static void flow_clear(int fd) { memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); - ai.flows[fd].flow_id = -1; + ai.flows[fd].info.id = -1; } -static void flow_fini(int fd) +static void __flow_fini(int fd) { assert(fd >= 0 && fd < SYS_MAX_FLOWS); @@ -414,13 +396,13 @@ static void flow_fini(int fd) pthread_join(ai.tx, NULL); } - shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id); + shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id); frcti_destroy(ai.flows[fd].frcti); } - if (ai.flows[fd].flow_id != -1) { - flow_destroy(&ai.id_to_fd[ai.flows[fd].flow_id]); + if (ai.flows[fd].info.id != -1) { + flow_destroy(&ai.id_to_fd[ai.flows[fd].info.id]); bmp_release(ai.fds, fd); } @@ -436,7 +418,7 @@ static void flow_fini(int fd) if (ai.flows[fd].set != NULL) { shm_flow_set_notify(ai.flows[fd].set, - ai.flows[fd].flow_id, + ai.flows[fd].info.id, FLOW_DEALLOC); shm_flow_set_close(ai.flows[fd].set); } @@ -448,11 +430,17 @@ static void flow_fini(int fd) flow_clear(fd); } -static int flow_init(int flow_id, - pid_t pid, - qosspec_t qs, - uint8_t * s, - time_t mpl) +static void flow_fini(int fd) +{ + pthread_rwlock_wrlock(&ai.lock); + + __flow_fini(fd); + + pthread_rwlock_unlock(&ai.lock); +} + +static int flow_init(struct flow_info * info, + buffer_t * sk) { struct timespec now; struct flow * flow; @@ -471,43 +459,43 @@ static int flow_init(int flow_id, flow = &ai.flows[fd]; - flow->rx_rb = shm_rbuff_open(getpid(), flow_id); + flow->info = *info; + + flow->rx_rb = shm_rbuff_open(info->n_pid, info->id); if (flow->rx_rb == NULL) goto fail_rx_rb; - flow->tx_rb = shm_rbuff_open(pid, flow_id); + flow->tx_rb = shm_rbuff_open(info->n_1_pid, info->id); if (flow->tx_rb == NULL) goto fail_tx_rb; - flow->set = shm_flow_set_open(pid); + flow->set = shm_flow_set_open(info->n_1_pid); if (flow->set == NULL) goto fail_set; - flow->flow_id = flow_id; flow->oflags = FLOWFDEFAULT; flow->part_idx = NO_PART; - flow->qs = qs; flow->snd_act = now; flow->rcv_act = now; - flow->crypt.flags = qs.cypher_s; /* TODO: remove cypher_s from qos */ + flow->crypt.flags = info->qs.cypher_s; /* TODO: move cypher_s */ - if (flow->crypt.flags > 0 && s != NULL) /* static analyzer s != NULL */ - memcpy(flow->crypt.key, s ,SYMMKEYSZ); - else - memset(flow->crypt.key, 0, SYMMKEYSZ); + memset(flow->crypt.key, 0, SYMMKEYSZ); + + if (flow->crypt.flags > 0 && sk!= NULL && sk->data != NULL) + memcpy(flow->crypt.key, sk->data , sk->len); if (crypt_init(&flow->crypt) < 0) goto fail_crypt; assert(flow->frcti == NULL); - if (flow->qs.in_order != 0) { - flow->frcti = frcti_create(fd, DELT_A, DELT_R, mpl); + if (info->qs.in_order != 0) { + flow->frcti = frcti_create(fd, DELT_A, DELT_R, info->mpl); if (flow->frcti == NULL) goto fail_frcti; - if (shm_flow_set_add(ai.fqset, 0, flow_id)) + if (shm_flow_set_add(ai.fqset, 0, info->id)) goto fail_flow_set_add; ++ai.n_frcti; @@ -518,16 +506,16 @@ static int flow_init(int flow_id, list_add_tail(&flow->next, &ai.flow_list); - ai.id_to_fd[flow_id].fd = fd; + ai.id_to_fd[info->id].fd = fd; - flow_set_state(&ai.id_to_fd[flow_id], FLOW_ALLOCATED); + flow_set_state(&ai.id_to_fd[info->id], FLOW_ALLOCATED); pthread_rwlock_unlock(&ai.lock); return fd; fail_tx_thread: - shm_flow_set_del(ai.fqset, 0, flow_id); + shm_flow_set_del(ai.fqset, 0, info->id); fail_flow_set_add: frcti_destroy(flow->frcti); fail_frcti: @@ -722,12 +710,12 @@ static void fini(void) pthread_rwlock_wrlock(&ai.lock); for (i = 0; i < PROG_MAX_FLOWS; ++i) { - if (ai.flows[i].flow_id != -1) { + if (ai.flows[i].info.id != -1) { ssize_t idx; shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN); while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) shm_rdrbuff_remove(ai.rdrb, idx); - flow_fini(i); + __flow_fini(i); } } @@ -774,142 +762,94 @@ __attribute__((section(FINI_SECTION))) __typeof__(fini) * __fini = fini; int flow_accept(qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int fd; - int err = -EIRMD; - uint8_t * symmkey; - - msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; - msg.has_pid = true; - msg.pid = getpid(); + struct flow_info flow; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + buffer_t sk; + int fd; + int err; - if (timeo != NULL) { - msg.has_timeo_sec = true; - msg.has_timeo_nsec = true; - msg.timeo_sec = timeo->tv_sec; - msg.timeo_nsec = timeo->tv_nsec; - } - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - goto fail_recv; - - if (!recv_msg->has_result) - goto fail_msg; - - if (recv_msg->result != 0) { - err = recv_msg->result; - goto fail_msg; - } +#ifdef QOS_DISABLE_CRC + if (qs != NULL) + qs->ber = 1; +#endif + memset(&flow, 0, sizeof(flow)); - if (!recv_msg->has_pid || !recv_msg->has_flow_id || - !recv_msg->has_mpl || recv_msg->qosspec == NULL) - goto fail_msg; + flow.n_pid = getpid(); + flow.qs = qs == NULL ? qos_raw : *qs; - symmkey = recv_msg->has_symmkey ? recv_msg->symmkey.data : NULL; + if (flow_accept__irm_req_ser(&msg, &flow, timeo)) + return -ENOMEM; - fd = flow_init(recv_msg->flow_id, recv_msg->pid, - qos_spec_msg_to_s(recv_msg->qosspec), - symmkey, - recv_msg->mpl); + err = send_recv_msg(&msg); + if (err < 0) + return err; - irm_msg__free_unpacked(recv_msg, NULL); + err = flow__irm_result_des(&msg, &flow, &sk); + if (err < 0) + return err; - if (fd < 0) - return fd; + fd = flow_init(&flow, &sk); - pthread_rwlock_rdlock(&ai.lock); + freebuf(sk); if (qs != NULL) - *qs = ai.flows[fd].qs; - - pthread_rwlock_unlock(&ai.lock); + *qs = flow.qs; return fd; - - fail_msg: - irm_msg__free_unpacked(recv_msg, NULL); - fail_recv: - return err; } int flow_alloc(const char * dst, qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int fd; - int err = -EIRMD; + struct flow_info flow; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + buffer_t sk; /* symmetric key */ + int fd; + int err; #ifdef QOS_DISABLE_CRC if (qs != NULL) qs->ber = 1; #endif - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; - msg.dst = (char *) dst; - msg.has_pid = true; - msg.pid = getpid(); - msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs); - - if (timeo != NULL) { - msg.has_timeo_sec = true; - msg.has_timeo_nsec = true; - msg.timeo_sec = timeo->tv_sec; - msg.timeo_nsec = timeo->tv_nsec; - } - recv_msg = send_recv_irm_msg(&msg); - qosspec_msg__free_unpacked(msg.qosspec, NULL); - if (recv_msg == NULL) - goto fail_send_recv; + memset(&flow, 0, sizeof(flow)); - if (!recv_msg->has_result) - goto fail_result; + flow.n_pid = getpid(); + flow.qs = qs == NULL ? qos_raw : *qs; - if (recv_msg->result != 0) { - err = recv_msg->result; - goto fail_result; - } + if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo)) + return -ENOMEM; - if (!recv_msg->has_pid || !recv_msg->has_flow_id || - !recv_msg->has_mpl) - goto fail_result; + err = send_recv_msg(&msg); + if (err < 0) + return err; - if ((qs != NULL && qs->cypher_s != 0) && - (!recv_msg->has_symmkey || recv_msg->symmkey.len != SYMMKEYSZ)) { - err = -ECRYPT; - goto fail_result; - } + err = flow__irm_result_des(&msg, &flow, &sk); + if (err < 0) + return err; - /* TODO: Make sure qosspec is set in msg */ - if (qs != NULL && recv_msg->qosspec != NULL) - *qs = qos_spec_msg_to_s(recv_msg->qosspec); + fd = flow_init(&flow, &sk); - fd = flow_init(recv_msg->flow_id, recv_msg->pid, - qs == NULL ? qos_raw : *qs, recv_msg->symmkey.data, - recv_msg->mpl); + freebuf(sk); - irm_msg__free_unpacked(recv_msg, NULL); + if (qs != NULL) + *qs = flow.qs; return fd; - - fail_result: - irm_msg__free_unpacked(recv_msg, NULL); - fail_send_recv: - return err; } int flow_join(const char * dst, qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - uint8_t s[SYMMKEYSZ]; - int fd; - int err = -EIRMD; + struct flow_info flow; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + int fd; + int err; #ifdef QOS_DISABLE_CRC if (qs != NULL) @@ -918,184 +858,145 @@ int flow_join(const char * dst, if (qs != NULL && qs->cypher_s > 0) return -ENOTSUP; /* TODO: Encrypted broadcast */ - memset(s, 0, SYMMKEYSZ); + memset(&flow, 0, sizeof(flow)); - msg.code = IRM_MSG_CODE__IRM_FLOW_JOIN; - msg.dst = (char *) dst; - msg.has_pid = true; - msg.pid = getpid(); - msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs); + flow.n_pid = getpid(); + flow.qs = qs == NULL ? qos_raw : *qs; - if (timeo != NULL) { - msg.has_timeo_sec = true; - msg.has_timeo_nsec = true; - msg.timeo_sec = timeo->tv_sec; - msg.timeo_nsec = timeo->tv_nsec; - } + if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo)) + return -ENOMEM; - recv_msg = send_recv_irm_msg(&msg); - qosspec_msg__free_unpacked(msg.qosspec, NULL); + err = send_recv_msg(&msg); + if (err < 0) + return err; - if (recv_msg == NULL) - goto fail_send; + err = flow__irm_result_des(&msg, &flow, NULL); + if (err < 0) + return err; - if (!recv_msg->has_result) - goto fail_result; + fd = flow_init(&flow, NULL); - if (recv_msg->result != 0) { - err = recv_msg->result; - goto fail_result; - } - - if (!recv_msg->has_pid || !recv_msg->has_flow_id || - !recv_msg->has_mpl) - goto fail_result; - - fd = flow_init(recv_msg->flow_id, recv_msg->pid, - qs == NULL ? qos_raw : *qs, s, - recv_msg->mpl); - - irm_msg__free_unpacked(recv_msg, NULL); + if (qs != NULL) + *qs = flow.qs; return fd; - - fail_result: - irm_msg__free_unpacked(recv_msg, NULL); - fail_send: - return err; } +#define PKT_BUF_LEN 2048 int flow_dealloc(int fd) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - uint8_t buf[128]; - struct timespec tic = TIMESPEC_INIT_NS(TICTIME); - struct flow * f; - time_t timeo; + struct flow_info info; + uint8_t pkt[PKT_BUF_LEN]; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + struct timespec tic = TIMESPEC_INIT_NS(TICTIME); + struct timespec timeo = TIMESPEC_INIT_S(0); + struct flow * flow; + int err; if (fd < 0 || fd >= SYS_MAX_FLOWS ) return -EINVAL; - msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; - msg.has_flow_id = true; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_timeo_sec = true; - msg.has_timeo_nsec = true; - msg.timeo_nsec = 0; + memset(&info, 0, sizeof(flow)); - f = &ai.flows[fd]; + flow = &ai.flows[fd]; pthread_rwlock_rdlock(&ai.lock); - if (f->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - msg.flow_id = f->flow_id; - - f->oflags = FLOWFDEFAULT | FLOWFRNOPART; + flow->oflags = FLOWFDEFAULT | FLOWFRNOPART; - f->rcv_timesout = true; - f->rcv_timeo = tic; + flow->rcv_timesout = true; + flow->rcv_timeo = tic; pthread_rwlock_unlock(&ai.lock); - flow_read(fd, buf, 128); + flow_read(fd, buf, SOCK_BUF_SIZE); pthread_rwlock_rdlock(&ai.lock); - timeo = frcti_dealloc(f->frcti); - while (timeo < 0) { /* keep the flow active for rtx */ + timeo.tv_sec = frcti_dealloc(flow->frcti); + while (timeo.tv_sec < 0) { /* keep the flow active for rtx */ ssize_t ret; pthread_rwlock_unlock(&ai.lock); - ret = flow_read(fd, buf, 128); + ret = flow_read(fd, pkt, PKT_BUF_LEN); pthread_rwlock_rdlock(&ai.lock); - timeo = frcti_dealloc(f->frcti); + timeo.tv_sec = frcti_dealloc(flow->frcti); - if (ret == -EFLOWDOWN && timeo < 0) - timeo = -timeo; + if (ret == -EFLOWDOWN && timeo.tv_sec < 0) + timeo.tv_sec = -timeo.tv_sec; } - msg.timeo_sec = timeo; - pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock); - shm_rbuff_fini(ai.flows[fd].tx_rb); + shm_rbuff_fini(flow->tx_rb); pthread_cleanup_pop(true); - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; + info.id = flow->info.id; + info.n_pid = getpid(); - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -EIRMD; - } + if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0) + return -ENOMEM; - irm_msg__free_unpacked(recv_msg, NULL); + err = send_recv_msg(&msg); + if (err < 0) + return err; - pthread_rwlock_wrlock(&ai.lock); + err = irm__irm_result_des(&msg); flow_fini(fd); - pthread_rwlock_unlock(&ai.lock); - - return 0; + return err; } int ipcp_flow_dealloc(int fd) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - struct flow * f; + struct flow_info info; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + struct flow * flow; + int err; if (fd < 0 || fd >= SYS_MAX_FLOWS ) return -EINVAL; - msg.code = IRM_MSG_CODE__IPCP_FLOW_DEALLOC; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_flow_id = true; + flow = &ai.flows[fd]; - f = &ai.flows[fd]; + memset(&info, 0, sizeof(flow)); pthread_rwlock_rdlock(&ai.lock); - if (f->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - msg.flow_id = f->flow_id; + info.id = flow->info.id; + info.n_1_pid = flow->info.n_1_pid; pthread_rwlock_unlock(&ai.lock); - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -EIRMD; - } + if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0) + return -ENOMEM; - irm_msg__free_unpacked(recv_msg, NULL); + err = send_recv_msg(&msg); + if (err < 0) + return err; - pthread_rwlock_wrlock(&ai.lock); + err = irm__irm_result_des(&msg); flow_fini(fd); - pthread_rwlock_unlock(&ai.lock); - - return 0; + return err; } int fccntl(int fd, @@ -1122,7 +1023,7 @@ int fccntl(int fd, pthread_rwlock_wrlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); va_end(l); return -ENOTALLOC; @@ -1167,7 +1068,7 @@ int fccntl(int fd, qs = va_arg(l, qosspec_t *); if (qs == NULL) goto einval; - *qs = flow->qs; + *qs = flow->info.qs; break; case FLOWGRXQLEN: qlen = va_arg(l, size_t *); @@ -1194,13 +1095,13 @@ int fccntl(int fd, rx_acl |= ACL_FLOWDOWN; tx_acl |= ACL_FLOWDOWN; shm_flow_set_notify(flow->set, - flow->flow_id, + flow->info.id, FLOW_DOWN); } else { rx_acl &= ~ACL_FLOWDOWN; tx_acl &= ~ACL_FLOWDOWN; shm_flow_set_notify(flow->set, - flow->flow_id, + flow->info.id, FLOW_UP); } @@ -1302,7 +1203,7 @@ static int flow_tx_sdb(struct flow * flow, if (crypt_encrypt(&flow->crypt, sdb) < 0) goto enomem; - if (flow->qs.ber == 0 && add_crc(sdb) != 0) + if (flow->info.qs.ber == 0 && add_crc(sdb) != 0) goto enomem; } @@ -1316,7 +1217,7 @@ static int flow_tx_sdb(struct flow * flow, if (ret < 0) shm_rdrbuff_remove(ai.rdrb, idx); else - shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); pthread_cleanup_pop(true); @@ -1353,7 +1254,7 @@ ssize_t flow_write(int fd, pthread_rwlock_wrlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -1398,7 +1299,7 @@ static bool invalid_pkt(struct flow * flow, if (shm_du_buff_len(sdb) == 0) return true; - if (flow->qs.ber == 0 && chk_crc(sdb) != 0) + if (flow->info.qs.ber == 0 && chk_crc(sdb) != 0) return true; if (crypt_decrypt(&flow->crypt, sdb) < 0) @@ -1461,7 +1362,7 @@ ssize_t flow_read(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -1627,20 +1528,20 @@ int fset_add(struct flow_set * set, pthread_rwlock_rdlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { ret = -EINVAL; goto fail; } if (flow->frcti != NULL) - shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id); + shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id); - ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id); + ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].info.id); if (ret < 0) goto fail; if (shm_rbuff_queued(ai.flows[fd].rx_rb)) - shm_flow_set_notify(ai.fqset, ai.flows[fd].flow_id, FLOW_PKT); + shm_flow_set_notify(ai.fqset, ai.flows[fd].info.id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1663,11 +1564,11 @@ void fset_del(struct flow_set * set, pthread_rwlock_rdlock(&ai.lock); - if (flow->flow_id >= 0) - shm_flow_set_del(ai.fqset, set->idx, flow->flow_id); + if (flow->info.id >= 0) + shm_flow_set_del(ai.fqset, set->idx, flow->info.id); if (flow->frcti != NULL) - shm_flow_set_add(ai.fqset, 0, ai.flows[fd].flow_id); + shm_flow_set_add(ai.fqset, 0, ai.flows[fd].info.id); pthread_rwlock_unlock(&ai.lock); } @@ -1682,12 +1583,12 @@ bool fset_has(const struct flow_set * set, pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].flow_id < 0) { + if (ai.flows[fd].info.id < 0) { pthread_rwlock_unlock(&ai.lock); return false; } - ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].flow_id) == 1); + ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].info.id) == 1); pthread_rwlock_unlock(&ai.lock); @@ -1828,10 +1729,20 @@ ssize_t fevent(struct flow_set * set, /* ipcp-dev functions. */ -int np1_flow_alloc(pid_t n_pid, - int flow_id) +int np1_flow_alloc(pid_t n_pid, + int flow_id) { - return flow_init(flow_id, n_pid, qos_np1, NULL, 0); + struct flow_info flow; + + memset(&flow, 0, sizeof(flow)); + + flow.id = flow_id; + flow.n_pid = getpid(); + flow.qs = qos_np1; + flow.mpl = 0; + flow.n_1_pid = n_pid; /* This "flow" is upside-down! */ + + return flow_init(&flow, NULL); } int np1_flow_dealloc(int flow_id, @@ -1874,123 +1785,85 @@ int np1_flow_resp(int flow_id) int ipcp_create_r(const struct ipcp_info * info) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int ret; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + int err; - msg.code = IRM_MSG_CODE__IPCP_CREATE_R; - msg.ipcp_info = ipcp_info_s_to_msg(info); + if (ipcp_create_r__irm_req_ser(&msg,info) < 0) + return -ENOMEM; - recv_msg = send_recv_irm_msg(&msg); + err = send_recv_msg(&msg); + if (err < 0) + return err; - ipcp_info_msg__free_unpacked(msg.ipcp_info, NULL); + return irm__irm_result_des(&msg); +} - if (recv_msg == NULL) - return -EIRMD; +int ipcp_flow_req_arr(const buffer_t * dst, + qosspec_t qs, + time_t mpl, + const buffer_t * data) +{ + struct flow_info flow; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + int err; - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } + memset(&flow, 0, sizeof(flow)); - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); + assert(dst != NULL && dst->len != 0 && dst->data != NULL); - return ret; -} + flow.n_1_pid = getpid(); + flow.qs = qs; + flow.mpl = mpl; -int ipcp_flow_req_arr(const uint8_t * dst, - size_t len, - qosspec_t qs, - time_t mpl, - const void * data, - size_t dlen) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int fd; - - assert(dst != NULL); - - msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_hash = true; - msg.hash.len = len; - msg.hash.data = (uint8_t *) dst; - msg.qosspec = qos_spec_s_to_msg(&qs); - msg.has_mpl = true; - msg.mpl = mpl; - msg.has_pk = true; - msg.pk.data = (uint8_t *) data; - msg.pk.len = dlen; - - recv_msg = send_recv_irm_msg(&msg); - qosspec_msg__free_unpacked(msg.qosspec, NULL); - - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_flow_id || !recv_msg->has_pid) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } + if (ipcp_flow_req_arr__irm_req_ser(&msg, dst, &flow, data) < 0) + return -ENOMEM; - if (recv_msg->has_result && recv_msg->result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } + err = send_recv_msg(&msg); + if (err < 0) + return err; - fd = flow_init(recv_msg->flow_id, recv_msg->pid, qos_np1, NULL, 0); + err = flow__irm_result_des(&msg, &flow, NULL); + if (err < 0) + return err; - irm_msg__free_unpacked(recv_msg, NULL); + /* inverted for np1_flow */ + flow.n_1_pid = flow.n_pid; + flow.n_pid = getpid(); + flow.mpl = 0; - return fd; + return flow_init(&flow, NULL); } -int ipcp_flow_alloc_reply(int fd, - int response, - time_t mpl, - const void * data, - size_t len) +int ipcp_flow_alloc_reply(int fd, + int response, + time_t mpl, + const buffer_t * data) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int ret; + struct flow_info flow; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + int err; assert(fd >= 0 && fd < SYS_MAX_FLOWS); - msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; - msg.has_flow_id = true; - msg.has_pk = true; - msg.pk.data = (uint8_t *) data; - msg.pk.len = (uint32_t) len; - msg.has_mpl = true; - msg.mpl = mpl; - pthread_rwlock_rdlock(&ai.lock); - msg.flow_id = ai.flows[fd].flow_id; + flow.id = ai.flows[fd].info.id; pthread_rwlock_unlock(&ai.lock); - msg.has_response = true; - msg.response = response; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } + flow.mpl = mpl; - ret = recv_msg->result; + if (ipcp_flow_alloc_reply__irm_msg_ser(&msg, &flow, response, data) < 0) + return -ENOMEM; - irm_msg__free_unpacked(recv_msg, NULL); + err = send_recv_msg(&msg); + if (err < 0) + return err; - return ret; + return irm__irm_result_des(&msg); } int ipcp_flow_read(int fd, @@ -2006,7 +1879,7 @@ int ipcp_flow_read(int fd, pthread_rwlock_rdlock(&ai.lock); - assert(flow->flow_id >= 0); + assert(flow->info.id >= 0); while (frcti_queued_pdu(flow->frcti) < 0) { pthread_rwlock_unlock(&ai.lock); @@ -2038,7 +1911,7 @@ int ipcp_flow_write(int fd, pthread_rwlock_wrlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -2066,7 +1939,7 @@ int np1_flow_read(int fd, flow = &ai.flows[fd]; - assert(flow->flow_id >= 0); + assert(flow->info.id >= 0); pthread_rwlock_rdlock(&ai.lock); @@ -2097,7 +1970,7 @@ int np1_flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -2115,7 +1988,7 @@ int np1_flow_write(int fd, if (ret < 0) shm_rdrbuff_remove(ai.rdrb, idx); else - shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); return ret; } @@ -2139,7 +2012,7 @@ int ipcp_flow_fini(int fd) pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].flow_id < 0) { + if (ai.flows[fd].info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -1; } @@ -2148,7 +2021,7 @@ int ipcp_flow_fini(int fd) shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN); shm_flow_set_notify(ai.flows[fd].set, - ai.flows[fd].flow_id, + ai.flows[fd].info.id, FLOW_DEALLOC); rx_rb = ai.flows[fd].rx_rb; @@ -2169,9 +2042,9 @@ int ipcp_flow_get_qoscube(int fd, pthread_rwlock_rdlock(&ai.lock); - assert(ai.flows[fd].flow_id >= 0); + assert(ai.flows[fd].info.id >= 0); - *cube = qos_spec_to_cube(ai.flows[fd].qs); + *cube = qos_spec_to_cube(ai.flows[fd].info.qs); pthread_rwlock_unlock(&ai.lock); @@ -2184,7 +2057,7 @@ size_t ipcp_flow_queued(int fd) pthread_rwlock_rdlock(&ai.lock); - assert(ai.flows[fd].flow_id >= 0); + assert(ai.flows[fd].info.id >= 0); q = shm_rbuff_queued(ai.flows[fd].tx_rb); @@ -2220,14 +2093,14 @@ int local_flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL); if (ret == 0) - shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); else shm_rdrbuff_remove(ai.rdrb, idx); diff --git a/src/lib/frct.c b/src/lib/frct.c index 604e8a62..c6fef35c 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -271,7 +271,7 @@ static void __send_frct_pkt(int fd, #endif goto fail; - shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + shm_flow_set_notify(f->set, f->info.id, FLOW_PKT); return; @@ -398,7 +398,7 @@ static struct frcti * frcti_create(int fd, frcti->n_out = 0; frcti->n_rqo = 0; #endif - if (ai.flows[fd].qs.loss == 0) { + if (ai.flows[fd].info.qs.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER; frcti->rcv_cr.cflags |= FRCTFRTX; } diff --git a/src/lib/pb/ipcp.proto b/src/lib/pb/ipcp.proto index 71bf90b8..c2c7f48b 100644 --- a/src/lib/pb/ipcp.proto +++ b/src/lib/pb/ipcp.proto @@ -23,7 +23,8 @@ syntax = "proto2"; import "ipcp_config.proto"; -import "qos.proto"; +import "model.proto"; + enum ipcp_msg_code { IPCP_BOOTSTRAP = 1; diff --git a/src/lib/pb/ipcp_config.proto b/src/lib/pb/ipcp_config.proto index ca4d55aa..28528b0c 100644 --- a/src/lib/pb/ipcp_config.proto +++ b/src/lib/pb/ipcp_config.proto @@ -22,10 +22,7 @@ syntax = "proto2"; -message layer_info_msg { - required string name = 1; - required uint32 dir_hash_algo = 2; -} +import "model.proto"; message dt_config_msg { required uint32 addr_size = 1; diff --git a/src/lib/pb/irm.proto b/src/lib/pb/irm.proto index c962e5e5..da3bd982 100644 --- a/src/lib/pb/irm.proto +++ b/src/lib/pb/irm.proto @@ -23,7 +23,7 @@ syntax = "proto2"; import "ipcp_config.proto"; -import "qos.proto"; +import "model.proto"; enum irm_msg_code { IRM_CREATE_IPCP = 1; @@ -55,11 +55,9 @@ enum irm_msg_code { IRM_REPLY = 27; } -message ipcp_info_msg { - required uint32 type = 1; - required string name = 2; - required uint32 pid = 3; - required uint32 state = 4; +message timespec_msg { + required uint64 tv_sec = 1; + required uint32 tv_nsec = 2; } message ipcp_list_msg { @@ -70,33 +68,30 @@ message ipcp_list_msg { required uint32 hash_algo = 5; } -message name_info_msg { - required string name = 1; - required uint32 pol_lb = 2; -} - message irm_msg { required irm_msg_code code = 1; optional string prog = 2; optional sint32 pid = 3; optional string name = 4; - optional ipcp_info_msg ipcp_info = 5; - optional string layer = 6; - repeated string exec = 7; - optional sint32 response = 8; - optional string dst = 9; - optional bytes hash = 10; - optional sint32 flow_id = 11; - optional qosspec_msg qosspec = 12; - optional ipcp_config_msg conf = 13; - optional uint32 opts = 14; - repeated ipcp_list_msg ipcps = 15; - repeated name_info_msg names = 16; - optional uint32 timeo_sec = 17; - optional uint32 timeo_nsec = 18; - optional sint32 mpl = 19; - optional string comp = 20; - optional bytes pk = 21; /* piggyback */ - optional bytes symmkey = 22; - optional sint32 result = 23; + optional flow_info_msg flow_info = 5; + optional ipcp_info_msg ipcp_info = 6; + optional string layer = 7; + repeated string exec = 8; + optional sint32 response = 9; + optional string dst = 10; + optional bytes hash = 11; + optional sint32 flow_id = 12; + optional qosspec_msg qosspec = 13; + optional ipcp_config_msg conf = 14; + optional uint32 opts = 15; + repeated ipcp_list_msg ipcps = 16; + repeated name_info_msg names = 17; + optional timespec_msg timeo = 18; + optional sint32 mpl = 20; + optional string comp = 21; + optional bytes pk = 22; /* piggyback */ + optional bytes symmkey = 23; + optional uint32 timeo_sec = 24; + optional uint32 timeo_nsec = 25; + optional sint32 result = 26; } diff --git a/src/lib/pb/model.proto b/src/lib/pb/model.proto new file mode 100644 index 00000000..f1e401f9 --- /dev/null +++ b/src/lib/pb/model.proto @@ -0,0 +1,61 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Model description messages + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +syntax = "proto2"; + +message qosspec_msg { + required uint32 delay = 1; /* In ms. */ + required uint64 bandwidth = 2; /* In bits/s. */ + required uint32 availability = 3; /* Class of 9s. */ + required uint32 loss = 4; /* Packet loss. */ + required uint32 ber = 5; /* Bit error rate, ppb. */ + required uint32 in_order = 6; /* In-order delivery. */ + required uint32 max_gap = 7; /* In ms. */ + required uint32 cypher_s = 8; /* Crypto strength in bits. */ + required uint32 timeout = 9; /* Timeout in ms. */ +} + +message flow_info_msg { + required uint32 id = 1; + required uint32 n_pid = 2; + required uint32 n_1_pid = 3; + required uint32 mpl = 4; + required uint32 state = 5; + required qosspec_msg qos = 6; +} + +message name_info_msg { + required string name = 1; + required uint32 pol_lb = 2; +} + +message layer_info_msg { + required string name = 1; + required uint32 dir_hash_algo = 2; +} + +message ipcp_info_msg { + required uint32 type = 1; + required string name = 2; + required uint32 pid = 3; + required uint32 state = 4; +} diff --git a/src/lib/pb/qos.proto b/src/lib/pb/qos.proto deleted file mode 100644 index 64f5a285..00000000 --- a/src/lib/pb/qos.proto +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2024 - * - * QoS specification message - * - * Dimitri Staessens - * Sander Vrijders - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -syntax = "proto2"; - -message qosspec_msg { - required uint32 delay = 1; /* In ms. */ - required uint64 bandwidth = 2; /* In bits/s. */ - required uint32 availability = 3; /* Class of 9s. */ - required uint32 loss = 4; /* Packet loss. */ - required uint32 ber = 5; /* Bit error rate, ppb. */ - required uint32 in_order = 6; /* In-order delivery. */ - required uint32 max_gap = 7; /* In ms. */ - required uint32 cypher_s = 8; /* Crypto strength in bits. */ - required uint32 timeout = 9; /* Timeout in ms. */ -} diff --git a/src/lib/protobuf.c b/src/lib/protobuf.c index 2135d57e..b586168c 100644 --- a/src/lib/protobuf.c +++ b/src/lib/protobuf.c @@ -28,6 +28,84 @@ #include #include +timespec_msg_t * timespec_s_to_msg(const struct timespec * s) +{ + timespec_msg_t * msg; + + assert(s != NULL); + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + timespec_msg__init(msg); + + msg->tv_sec = s->tv_sec; + msg->tv_nsec = s->tv_nsec; + + return msg; + + fail_malloc: + return NULL; +} + +struct timespec timespec_msg_to_s(timespec_msg_t * msg) +{ + struct timespec s; + + assert(msg != NULL); + + s.tv_sec = msg->tv_sec; + s.tv_nsec = msg->tv_nsec; + + return s; +} + +flow_info_msg_t * flow_info_s_to_msg(const struct flow_info * s) +{ + flow_info_msg_t * msg; + + assert(s != NULL); + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + flow_info_msg__init(msg); + + msg->id = s->id; + msg->n_pid = s->n_pid; + msg->n_1_pid = s->n_1_pid; + msg->mpl = s->mpl; + msg->state = s->state; + msg->qos = qos_spec_s_to_msg(&s->qs); + if (msg->qos == NULL) + goto fail_msg; + + return msg; + + fail_msg: + flow_info_msg__free_unpacked(msg, NULL); + fail_malloc: + return NULL; +} + +struct flow_info flow_info_msg_to_s(const flow_info_msg_t * msg) +{ + struct flow_info s; + + assert(msg != NULL); + + s.id = msg->id; + s.n_pid = msg->n_pid; + s.n_1_pid = msg->n_1_pid; + s.mpl = msg->mpl; + s.state = msg->state; + s.qs = qos_spec_msg_to_s(msg->qos); + + return s; +} + layer_info_msg_t * layer_info_s_to_msg(const struct layer_info * s) { layer_info_msg_t * msg; diff --git a/src/lib/serdes-irm.c b/src/lib/serdes-irm.c new file mode 100644 index 00000000..c4ba3053 --- /dev/null +++ b/src/lib/serdes-irm.c @@ -0,0 +1,478 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Ouroboros IRM Protocol - serialization/deserialization + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200809L + +#include "config.h" + +#include +#include +#include + +#include +#include + +int flow_accept__irm_req_ser(buffer_t * buf, + const struct flow_info * flow, + const struct timespec * timeo) +{ + irm_msg_t * msg; + size_t len; + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + irm_msg__init(msg); + + msg->code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; + msg->flow_info = flow_info_s_to_msg(flow); + if (msg->flow_info == NULL) + goto fail_msg; + + msg->timeo = timeo == NULL ? NULL : timespec_s_to_msg(timeo); + if (timeo != NULL && msg->timeo == NULL) + goto fail_msg; + + len = irm_msg__get_packed_size(msg); + if (len == 0 || len > buf->len) + goto fail_msg; + + buf->len = len; + + irm_msg__pack(msg, buf->data); + irm_msg__free_unpacked(msg, NULL); + + return 0; + + fail_msg: + irm_msg__free_unpacked(msg, NULL); + fail_malloc: + return -ENOMEM; +} + +static int __flow_alloc_ser(buffer_t * buf, + const struct flow_info * flow, + const char * dst, + const struct timespec * timeo, + int msg_code) +{ + irm_msg_t * msg; + size_t len; + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + irm_msg__init(msg); + + msg->code = msg_code; + msg->flow_info = flow_info_s_to_msg(flow); + if (msg->flow_info == NULL) + goto fail_msg; + + msg->dst = strdup(dst); + if (msg->dst == NULL) + goto fail_msg; + + msg->timeo = timeo == NULL ? NULL : timespec_s_to_msg(timeo); + if (timeo != NULL && msg->timeo == NULL) + goto fail_msg; + + len = irm_msg__get_packed_size(msg); + if (len == 0 || len > buf->len) + goto fail_msg; + + buf->len = len; + + irm_msg__pack(msg, buf->data); + irm_msg__free_unpacked(msg, NULL); + + return 0; + + fail_msg: + irm_msg__free_unpacked(msg, NULL); + fail_malloc: + return -ENOMEM; +} + +int flow_alloc__irm_req_ser(buffer_t * buf, + const struct flow_info * flow, + const char * dst, + const struct timespec * timeo) +{ + return __flow_alloc_ser(buf, flow, dst, timeo, + IRM_MSG_CODE__IRM_FLOW_ALLOC); +} + +int flow_join__irm_req_ser(buffer_t * buf, + const struct flow_info * flow, + const char * dst, + const struct timespec * timeo) +{ + return __flow_alloc_ser(buf, flow, dst, timeo, + IRM_MSG_CODE__IRM_FLOW_JOIN); +} + +int flow__irm_result_des(buffer_t * buf, + struct flow_info * flow, + buffer_t * sk) +{ + irm_msg_t * msg; + int err; + + if (sk != NULL) + sk->data = NULL; + + msg = irm_msg__unpack(NULL, buf->len, buf->data); + if (msg == NULL) { + err = -EIRMD; + goto fail_msg; + } + + if (!msg->has_result) { + err = -EIRMD; + goto fail; + } + + if (msg->result < 0) { + err = msg->result; + goto fail; + } + + if (msg->flow_info == NULL) { + err = -EBADF; + goto fail; + } + + *flow = flow_info_msg_to_s(msg->flow_info); + + if (flow->qs.cypher_s > 0 && sk != NULL) { + if (msg->symmkey.data == NULL || msg->symmkey.len == 0) { + err = -ECRYPT; + goto fail; + } + + sk->len = msg->symmkey.len; + sk->data = msg->symmkey.data; + + msg->symmkey.data = NULL; + msg->symmkey.len = 0; + } + + irm_msg__free_unpacked(msg, NULL); + + return 0; + fail: + irm_msg__free_unpacked(msg, NULL); + fail_msg: + return err; +} + +int flow_dealloc__irm_req_ser(buffer_t * buf, + const struct flow_info * flow, + const struct timespec * timeo) +{ + irm_msg_t * msg; + size_t len; + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + irm_msg__init(msg); + + msg->code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; + msg->flow_info = flow_info_s_to_msg(flow); + if (msg->flow_info == NULL) + goto fail_msg; + + msg->timeo = timespec_s_to_msg(timeo); + if (msg->timeo == NULL) + goto fail_msg; + + len = irm_msg__get_packed_size(msg); + if (len == 0 || len > buf->len) + goto fail_msg; + + buf->len = len; + + irm_msg__pack(msg, buf->data); + irm_msg__free_unpacked(msg, NULL); + + return 0; + + fail_msg: + irm_msg__free_unpacked(msg, NULL); + fail_malloc: + return -ENOMEM; +} + +int ipcp_flow_dealloc__irm_req_ser(buffer_t * buf, + const struct flow_info * flow) +{ + irm_msg_t * msg; + size_t len; + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + irm_msg__init(msg); + + msg->code = IRM_MSG_CODE__IPCP_FLOW_DEALLOC; + msg->flow_info = flow_info_s_to_msg(flow); + if (msg->flow_info == NULL) + goto fail_msg; + + len = irm_msg__get_packed_size(msg); + if (len == 0 || len > buf->len) + goto fail_msg; + + buf->len = len; + + irm_msg__pack(msg, buf->data); + irm_msg__free_unpacked(msg, NULL); + + return 0; + fail_msg: + irm_msg__free_unpacked(msg, NULL); + fail_malloc: + return -ENOMEM; +} + + +int ipcp_create_r__irm_req_ser(buffer_t * buf, + const struct ipcp_info * ipcp) +{ + irm_msg_t * msg; + size_t len; + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + irm_msg__init(msg); + + msg->code = IRM_MSG_CODE__IPCP_CREATE_R; + msg->ipcp_info = ipcp_info_s_to_msg(ipcp); + if (msg->ipcp_info == NULL) + goto fail_msg; + + len = irm_msg__get_packed_size(msg); + if (len == 0 || len > buf->len) + goto fail_msg; + + buf->len = len; + + irm_msg__pack(msg, buf->data); + irm_msg__free_unpacked(msg, NULL); + + return 0; + fail_msg: + irm_msg__free_unpacked(msg, NULL); + fail_malloc: + return -ENOMEM; +} + +int proc_announce__irm_req_ser(buffer_t * buf, + const char * prog) +{ + irm_msg_t * msg; + size_t len; + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + irm_msg__init(msg); + + msg->code = IRM_MSG_CODE__IRM_PROC_ANNOUNCE; + msg->has_pid = true; + msg->pid = getpid(); + msg->prog = strdup(prog); + if (msg->prog == NULL) + goto fail_msg; + + len = irm_msg__get_packed_size(msg); + if (len == 0 || len > buf->len) + goto fail_msg; + + buf->len = len; + + irm_msg__pack(msg, buf->data); + irm_msg__free_unpacked(msg, NULL); + + return 0; + fail_msg: + irm_msg__free_unpacked(msg, NULL); + fail_malloc: + return -ENOMEM; +} + +int proc_exit__irm_req_ser(buffer_t * buf) +{ + irm_msg_t * msg; + size_t len; + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + irm_msg__init(msg); + + msg->code = IRM_MSG_CODE__IRM_PROC_EXIT; + msg->has_pid = true; + msg->pid = getpid(); + + len = irm_msg__get_packed_size(msg); + if (len == 0 || len > buf->len) + goto fail_msg; + + buf->len = len; + + irm_msg__pack(msg, buf->data); + irm_msg__free_unpacked(msg, NULL); + + return 0; + fail_msg: + irm_msg__free_unpacked(msg, NULL); + fail_malloc: + return -ENOMEM; +} + +int ipcp_flow_req_arr__irm_req_ser(buffer_t * buf, + const buffer_t * dst, + const struct flow_info * flow, + const buffer_t * data) +{ + irm_msg_t * msg; + size_t len; + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + irm_msg__init(msg); + + msg->code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg->flow_info = flow_info_s_to_msg(flow); + if (msg->flow_info == NULL) + goto fail_msg; + + msg->has_hash = true; + msg->hash.len = dst->len; + msg->hash.data = dst->data; + msg->has_pk = true; + msg->pk.len = data->len; + msg->pk.data = data->data; + + len = irm_msg__get_packed_size(msg); + if (len == 0 || len > buf->len) + goto fail_msg; + + buf->len = len; + + irm_msg__pack(msg, buf->data); + + /* Don't free * dst or data! */ + msg->hash.len = 0; + msg->hash.data = NULL; + msg->pk.len = 0; + msg->pk.data = NULL; + irm_msg__free_unpacked(msg, NULL); + + return 0; + fail_msg: + irm_msg__free_unpacked(msg, NULL); + fail_malloc: + return -ENOMEM; +} + +int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf, + const struct flow_info * flow, + int response, + const buffer_t * data) +{ + irm_msg_t * msg; + size_t len; + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + irm_msg__init(msg); + + msg->code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; + msg->flow_info = flow_info_s_to_msg(flow); + if (msg->flow_info == NULL) + goto fail_msg; + + msg->has_pk = true; + msg->pk.len = data->len; + msg->pk.data = data->data; + msg->has_response = true; + msg->response = response; + + len = irm_msg__get_packed_size(msg); + if (len == 0 || len > buf->len) + goto fail_msg; + + buf->len = len; + + irm_msg__pack(msg, buf->data); + + /* Don't free * data! */ + msg->pk.len = 0; + msg->pk.data = NULL; + + irm_msg__free_unpacked(msg, NULL); + + return 0; + fail_msg: + irm_msg__free_unpacked(msg, NULL); + fail_malloc: + return -ENOMEM; +} + +int irm__irm_result_des(buffer_t * buf) +{ + irm_msg_t * msg; + int err; + + msg = irm_msg__unpack(NULL, buf->len, buf->data); + if (msg == NULL) { + err = -EIRMD; + goto fail_msg; + } + + if (!msg->has_result) { + err = -EIRMD; + goto fail; + } + + err = msg->result; + fail: + irm_msg__free_unpacked(msg, NULL); + fail_msg: + return err; +} diff --git a/src/lib/serdes-oep.c b/src/lib/serdes-oep.c index f92011c5..8a836b3b 100644 --- a/src/lib/serdes-oep.c +++ b/src/lib/serdes-oep.c @@ -25,7 +25,6 @@ #include #include - ssize_t enroll_req_ser(const struct enroll_req * req, buffer_t buf) { diff --git a/src/lib/sockets.c b/src/lib/sockets.c index 9c5b7a51..13219db0 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -21,6 +21,7 @@ */ #include +#include #include #include @@ -29,7 +30,6 @@ #include #include #include -#include #include /* Apple doesn't support SEQPACKET. */ @@ -118,7 +118,8 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg) pthread_cleanup_push(__cleanup_close_ptr, &sockfd); - if (write(sockfd, buf, len) != -1) + len = write(sockfd, buf, len); + if (len >= 0) len = read(sockfd, buf, SOCK_BUF_SIZE); pthread_cleanup_pop(true); @@ -131,6 +132,28 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg) return NULL; } +int send_recv_msg(buffer_t * msg) +{ + int sockfd; + ssize_t len = 0; + + sockfd = client_socket_open(IRM_SOCK_PATH); + if (sockfd < 0) + return -1; + + pthread_cleanup_push(__cleanup_close_ptr, &sockfd); + + len = write(sockfd, msg->data, msg->len); + if (len >= 0) + len = read(sockfd, msg->data, SOCK_BUF_SIZE); + + pthread_cleanup_pop(true); + + msg->len = (size_t) len; + + return len < 0 ? -1 : 0; +} + char * ipcp_sock_path(pid_t pid) { char * full_name = NULL; diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index 40dfbb19..96f4ac47 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -178,7 +178,7 @@ static void timerwheel_move(void) shm_du_buff_ack(r->sdb); #endif if (f->frcti == NULL - || f->flow_id != r->flow_id) + || f->info.id != r->flow_id) goto cleanup; pthread_rwlock_rdlock(&r->frcti->lock); @@ -249,7 +249,7 @@ static void timerwheel_move(void) if (shm_rbuff_write(f->tx_rb, idx) < 0) #endif goto flow_down; - shm_flow_set_notify(f->set, f->flow_id, + shm_flow_set_notify(f->set, f->info.id, FLOW_PKT); reschedule: list_add(&r->next, &rw.rxms[lvl][rslot]); @@ -292,7 +292,7 @@ static void timerwheel_move(void) rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false; - if (f->flow_id == a->flow_id && f->frcti != NULL) + if (f->info.id == a->flow_id && f->frcti != NULL) send_frct_pkt(a->frcti); free(a); @@ -341,7 +341,7 @@ static int timerwheel_rxm(struct frcti * frcti, slot = r->t0 >> RXMQ_RES; r->fd = frcti->fd; - r->flow_id = ai.flows[r->fd].flow_id; + r->flow_id = ai.flows[r->fd].info.id; pthread_rwlock_unlock(&r->frcti->lock); @@ -394,7 +394,7 @@ static int timerwheel_delayed_ack(int fd, a->fd = fd; a->frcti = frcti; - a->flow_id = ai.flows[fd].flow_id; + a->flow_id = ai.flows[fd].info.id; pthread_mutex_lock(&rw.lock); -- cgit v1.2.3