From e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Fri, 23 Feb 2024 09:29:47 +0100 Subject: lib: Revise app flow allocation This revises the application flow allocator to use the flow_info struct/message between the components. Revises the messaging to move the use protocol buffers to its own source (serdes-irm). Adds a timeout to the IRMd flow allocator to make sure flow allocations don't hang forever (this was previously taken care of by the sanitize thread). Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/irmd/CMakeLists.txt | 9 +++-- src/irmd/config.h.in | 45 +++++++++++---------- src/irmd/ipcp.c | 12 +++++- src/irmd/main.c | 105 +++++++++++++++++++----------------------------- src/irmd/reg/reg.c | 2 +- 5 files changed, 82 insertions(+), 91 deletions(-) (limited to 'src/irmd') diff --git a/src/irmd/CMakeLists.txt b/src/irmd/CMakeLists.txt index 3a5be324..7eaa0ce6 100644 --- a/src/irmd/CMakeLists.txt +++ b/src/irmd/CMakeLists.txt @@ -40,18 +40,19 @@ endif () set(IRMD_REQ_ARR_TIMEOUT 1000 CACHE STRING "Timeout for an application to respond to a new flow (ms)") -set(IRMD_FLOW_TIMEOUT 5000 CACHE STRING - "Timeout for a flow allocation response (ms)") + set(BOOTSTRAP_TIMEOUT 5000 CACHE STRING "Timeout for an IPCP to bootstrap (ms)") set(ENROLL_TIMEOUT 60000 CACHE STRING "Timeout for an IPCP to enroll (ms)") -set(REG_TIMEOUT 10000 CACHE STRING +set(REG_TIMEOUT 60000 CACHE STRING "Timeout for registering a name (ms)") -set(QUERY_TIMEOUT 3000 CACHE STRING +set(QUERY_TIMEOUT 60000 CACHE STRING "Timeout to query a name with an IPCP (ms)") set(CONNECT_TIMEOUT 60000 CACHE STRING "Timeout to connect an IPCP to another IPCP (ms)") +set(FLOW_ALLOC_TIMEOUT 5000 CACHE STRING + "Timeout for a flow allocation response (ms)") set(IRMD_MIN_THREADS 8 CACHE STRING "Minimum number of worker threads in the IRMd") set(IRMD_ADD_THREADS 8 CACHE STRING diff --git a/src/irmd/config.h.in b/src/irmd/config.h.in index b25053f7..fa1156b9 100644 --- a/src/irmd/config.h.in +++ b/src/irmd/config.h.in @@ -21,38 +21,39 @@ */ -#define IPCP_UDP_EXEC "@IPCP_UDP_TARGET@" -#define IPCP_ETH_LLC_EXEC "@IPCP_ETH_LLC_TARGET@" -#define IPCP_ETH_DIX_EXEC "@IPCP_ETH_DIX_TARGET@" -#define IPCP_UNICAST_EXEC "@IPCP_UNICAST_TARGET@" -#define IPCP_BROADCAST_EXEC "@IPCP_BROADCAST_TARGET@" -#define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@" +#define IPCP_UDP_EXEC "@IPCP_UDP_TARGET@" +#define IPCP_ETH_LLC_EXEC "@IPCP_ETH_LLC_TARGET@" +#define IPCP_ETH_DIX_EXEC "@IPCP_ETH_DIX_TARGET@" +#define IPCP_UNICAST_EXEC "@IPCP_UNICAST_TARGET@" +#define IPCP_BROADCAST_EXEC "@IPCP_BROADCAST_TARGET@" +#define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@" -#define INSTALL_PREFIX "@CMAKE_INSTALL_PREFIX@" -#define INSTALL_SBINDIR "@CMAKE_INSTALL_SBINDIR@" +#define INSTALL_PREFIX "@CMAKE_INSTALL_PREFIX@" +#define INSTALL_SBINDIR "@CMAKE_INSTALL_SBINDIR@" -#define PTHREAD_COND_CLOCK @PTHREAD_COND_CLOCK@ +#define PTHREAD_COND_CLOCK @PTHREAD_COND_CLOCK@ -#define SOCKET_TIMEOUT @SOCKET_TIMEOUT@ +#define SOCKET_TIMEOUT @SOCKET_TIMEOUT@ -#define IRMD_REQ_ARR_TIMEOUT @IRMD_REQ_ARR_TIMEOUT@ -#define IRMD_FLOW_TIMEOUT @IRMD_FLOW_TIMEOUT@ +#define IRMD_REQ_ARR_TIMEOUT @IRMD_REQ_ARR_TIMEOUT@ -#define BOOTSTRAP_TIMEOUT @BOOTSTRAP_TIMEOUT@ -#define ENROLL_TIMEOUT @ENROLL_TIMEOUT@ -#define REG_TIMEOUT @REG_TIMEOUT@ -#define QUERY_TIMEOUT @QUERY_TIMEOUT@ -#define CONNECT_TIMEOUT @CONNECT_TIMEOUT@ +#define FLOW_ALLOC_TIMEOUT @FLOW_ALLOC_TIMEOUT@ +#define FLOW_DEALLOC_TIMEOUT @FLOW_DEALLOC_TIMEOUT@ -#define SYS_MAX_FLOWS @SYS_MAX_FLOWS@ +#define BOOTSTRAP_TIMEOUT @BOOTSTRAP_TIMEOUT@ +#define ENROLL_TIMEOUT @ENROLL_TIMEOUT@ +#define REG_TIMEOUT @REG_TIMEOUT@ +#define QUERY_TIMEOUT @QUERY_TIMEOUT@ +#define CONNECT_TIMEOUT @CONNECT_TIMEOUT@ -#define IRMD_MIN_THREADS @IRMD_MIN_THREADS@ -#define IRMD_ADD_THREADS @IRMD_ADD_THREADS@ +#define SYS_MAX_FLOWS @SYS_MAX_FLOWS@ +#define IRMD_MIN_THREADS @IRMD_MIN_THREADS@ +#define IRMD_ADD_THREADS @IRMD_ADD_THREADS@ #cmakedefine HAVE_FUSE #ifdef HAVE_FUSE -#define FUSE_PREFIX "@FUSE_PREFIX@" +#define FUSE_PREFIX "@FUSE_PREFIX@" #endif #cmakedefine HAVE_TOML @@ -61,7 +62,7 @@ #define OUROBOROS_CONFIG_FILE "@OUROBOROS_CONFIG_FILE@" #endif -#define IRMD_PKILL_TIMEOUT @IRMD_PKILL_TIMEOUT@ +#define IRMD_PKILL_TIMEOUT @IRMD_PKILL_TIMEOUT@ #cmakedefine IRMD_KILL_ALL_PROCESSES #cmakedefine HAVE_LIBGCRYPT diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 3253a8f3..c8055aa1 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -58,6 +58,7 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t pid, struct timeval tv; struct timespec tic; struct timespec toc; + bool dealloc = false; if (kill(pid, 0) < 0) return NULL; @@ -101,6 +102,15 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t pid, tv.tv_sec = CONNECT_TIMEOUT / 1000; tv.tv_usec = (CONNECT_TIMEOUT % 1000) * 1000; break; + case IPCP_MSG_CODE__IPCP_FLOW_ALLOC: + tv.tv_sec = FLOW_ALLOC_TIMEOUT / 1000; + tv.tv_usec = (FLOW_ALLOC_TIMEOUT % 1000) * 1000; + break; + case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: + dealloc = true; + tv.tv_sec = 0; /* FIX DEALLOC: don't wait for dealloc */ + tv.tv_usec = 500; + break; default: tv.tv_sec = SOCKET_TIMEOUT / 1000; tv.tv_usec = (SOCKET_TIMEOUT % 1000) * 1000; @@ -127,7 +137,7 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t pid, if (len > 0) recv_msg = ipcp_msg__unpack(NULL, len, buf); else { - if (errno == EAGAIN) { + if (errno == EAGAIN && !dealloc) { int diff = ts_diff_ms(&tic, &toc); log_warn("IPCP command timed out after %d ms.", diff); } diff --git a/src/irmd/main.c b/src/irmd/main.c index 2cbe8ed4..32f41ab2 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1234,14 +1234,14 @@ static int flow_alloc_reply(struct flow_info * flow, } static int flow_dealloc(struct flow_info * flow, - time_t timeo) + struct timespec * ts) { log_info("Deallocating flow %d for process %d.", flow->id, flow->n_pid); reg_dealloc_flow(flow); - if (ipcp_flow_dealloc(flow->n_1_pid, flow->id, timeo) < 0) { + if (ipcp_flow_dealloc(flow->n_1_pid, flow->id, ts->tv_sec) < 0) { log_err("Failed to request dealloc from %d.", flow->n_1_pid); return -EIPCP; } @@ -1324,14 +1324,27 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg) struct flow_info flow; struct proc_info proc; struct name_info name; - struct timespec * abstime = NULL; - struct timespec ts; + struct timespec * abstime; + struct timespec max = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT); + struct timespec now; + struct timespec ts = TIMESPEC_INIT_S(0); /* static analysis */ int res; irm_msg_t * ret_msg; buffer_t data; memset(&flow, 0, sizeof(flow)); + clock_gettime(PTHREAD_COND_CLOCK, &now); + + if (msg->timeo != NULL) { + ts = timespec_msg_to_s(msg->timeo); + ts_add(&ts, &now, &ts); + abstime = &ts; + } else { + ts_add(&max, &now, &max); + abstime = NULL; + } + ret_msg = malloc(sizeof(*ret_msg)); if (ret_msg == NULL) { log_err("Failed to malloc return msg."); @@ -1342,20 +1355,6 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg) ret_msg->code = IRM_MSG_CODE__IRM_REPLY; - if (msg->has_timeo_sec) { - struct timespec now; - - clock_gettime(PTHREAD_COND_CLOCK, &now); - assert(msg->has_timeo_nsec); - - ts.tv_sec = msg->timeo_sec; - ts.tv_nsec = msg->timeo_nsec; - - ts_add(&ts, &now, &ts); - - abstime = &ts; - } - pthread_cleanup_push(free_msg, ret_msg); switch (msg->code) { @@ -1430,20 +1429,12 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg) case IRM_MSG_CODE__IRM_FLOW_ACCEPT: data.len = msg->pk.len; data.data = msg->pk.data; + msg->has_pk = false; assert(data.len > 0 ? data.data != NULL : data.data == NULL); - flow.n_pid = msg->pid; - flow.qs = qos_raw; + flow = flow_info_msg_to_s(msg->flow_info); res = flow_accept(&flow, &data, abstime); if (res == 0) { - qosspec_msg_t * qs_msg; - qs_msg = qos_spec_s_to_msg(&flow.qs); - ret_msg->has_flow_id = true; - ret_msg->flow_id = flow.id; - ret_msg->has_pid = true; - ret_msg->pid = flow.n_1_pid; - ret_msg->has_mpl = true; - ret_msg->qosspec = qs_msg; - ret_msg->mpl = flow.mpl; + ret_msg->flow_info = flow_info_s_to_msg(&flow); ret_msg->has_symmkey = data.len != 0; ret_msg->symmkey.data = data.data; ret_msg->symmkey.len = data.len; @@ -1453,17 +1444,12 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg) data.len = msg->pk.len; data.data = msg->pk.data; msg->has_pk = false; - flow.n_pid = msg->pid; - flow.qs = qos_spec_msg_to_s(msg->qosspec); assert(data.len > 0 ? data.data != NULL : data.data == NULL); + flow = flow_info_msg_to_s(msg->flow_info); + abstime = abstime == NULL ? &max : abstime; res = flow_alloc(&flow, msg->dst, &data, abstime); if (res == 0) { - ret_msg->has_flow_id = true; - ret_msg->flow_id = flow.id; - ret_msg->has_pid = true; - ret_msg->pid = flow.n_1_pid; - ret_msg->has_mpl = true; - ret_msg->mpl = flow.mpl; + ret_msg->flow_info = flow_info_s_to_msg(&flow); ret_msg->has_symmkey = data.len != 0; ret_msg->symmkey.data = data.data; ret_msg->symmkey.len = data.len; @@ -1471,46 +1457,38 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg) break; case IRM_MSG_CODE__IRM_FLOW_JOIN: assert(msg->pk.len == 0 && msg->pk.data == NULL); - flow.qs = qos_spec_msg_to_s(msg->qosspec); + flow = flow_info_msg_to_s(msg->flow_info); + abstime = abstime == NULL ? &max : abstime; res = flow_join(&flow, msg->dst, abstime); + if (res == 0) + ret_msg->flow_info = flow_info_s_to_msg(&flow); break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: - flow.n_pid = msg->pid; - flow.id = msg->flow_id; - res = flow_dealloc(&flow, msg->timeo_sec); + flow = flow_info_msg_to_s(msg->flow_info); + res = flow_dealloc(&flow, &ts); break; case IRM_MSG_CODE__IPCP_FLOW_DEALLOC: - flow.n_1_pid = msg->pid; - flow.id = msg->flow_id; + flow = flow_info_msg_to_s(msg->flow_info); res = flow_dealloc_resp(&flow); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: data.len = msg->pk.len; data.data = msg->pk.data; - msg->has_pk = false; /* pass data */ - msg->pk.data = NULL; + msg->pk.data = NULL; /* pass data */ msg->pk.len = 0; assert(data.len > 0 ? data.data != NULL : data.data == NULL); - flow.n_1_pid = msg->pid; - flow.mpl = msg->mpl; - flow.qs = qos_spec_msg_to_s(msg->qosspec); + flow = flow_info_msg_to_s(msg->flow_info); res = flow_req_arr(&flow, msg->hash.data, &data); - if (res == 0) { - ret_msg->has_flow_id = true; - ret_msg->flow_id = flow.id; - ret_msg->has_pid = true; - ret_msg->pid = flow.n_pid; - } + if (res == 0) + ret_msg->flow_info = flow_info_s_to_msg(&flow); break; case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY: data.len = msg->pk.len; data.data = msg->pk.data; - msg->has_pk = false; /* pass data */ - msg->pk.data = NULL; + msg->pk.data = NULL; /* pass data */ msg->pk.len = 0; assert(data.len > 0 ? data.data != NULL : data.data == NULL); - flow.id = msg->flow_id; - flow.mpl = msg->mpl; + flow = flow_info_msg_to_s(msg->flow_info); res = flow_alloc_reply(&flow, msg->response, &data); break; default: @@ -1522,7 +1500,10 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg) pthread_cleanup_pop(false); ret_msg->has_result = true; - ret_msg->result = res; + if (abstime == &max && res == -ETIMEDOUT) + ret_msg->result = -EPERM; /* No timeout requested */ + else + ret_msg->result = res; return ret_msg; } @@ -1664,8 +1645,6 @@ static void destroy_mount(char * mnt) { struct stat st; - log_dbg("Destroying mountpoint %s.", mnt); - if (stat(mnt, &st) == -1){ switch(errno) { case ENOENT: @@ -1719,7 +1698,7 @@ static void cleanup_pid(pid_t pid) void * irm_sanitize(void * o) { pid_t pid; - struct timespec ts = TIMESPEC_INIT_MS(IRMD_FLOW_TIMEOUT / 20); + struct timespec ts = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT / 20); (void) o; @@ -2003,7 +1982,7 @@ static void * kill_dash_nine(void * o) { time_t slept = 0; #ifdef IRMD_KILL_ALL_PROCESSES - struct timespec ts = TIMESPEC_INIT_MS(IRMD_FLOW_TIMEOUT / 19); + struct timespec ts = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT / 19); #endif (void) o; diff --git a/src/irmd/reg/reg.c b/src/irmd/reg/reg.c index f486c1cc..731e44b6 100644 --- a/src/irmd/reg/reg.c +++ b/src/irmd/reg/reg.c @@ -1490,7 +1490,7 @@ int reg_get_exec(enum hash_algo algo, exec = __reg_get_exec(algo, hash); if (exec == NULL) { - ret = 0; + ret = -EPERM; goto finish; } -- cgit v1.2.3