From 1e3a9e464cbb2f02c057e9f63c1f270ff27530f4 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Sun, 20 Sep 2020 13:04:52 +0200 Subject: lib: Complete retransmission logic This completes the retransmission (automated repeat-request, ARQ) logic, sending (delayed) ACK messages when needed. On deallocation, flows will ACK try to retransmit any remaining unacknowledged messages (unless the FRCTFLINGER flag is turned off; this is on by default). Applications can safely shut down as soon as everything is ACK'd (i.e. the current Delta-t run is done). The activity timeout is now passed to the IPCP for it to sleep before completing deallocation (and releasing the flow_id). That should be moved to the IRMd in due time. The timerwheel is revised to be multi-level to reduce memory consumption. The resolution bumps by a factor of 1 << RXMQ_BUMP (16) and each level has RXMQ_SLOTS (1 << 8) slots. The lowest level has a resolution of (1 << RXMQ_RES) (20) ns, which is roughly a millisecond. Currently, 3 levels are defined, so the largest delay we can schedule at each level is: Level 0: 256ms Level 1: 4s Level 2: about a minute. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/irmd/ipcp.c | 13 ++++++++----- src/irmd/ipcp.h | 5 +++-- src/irmd/main.c | 20 ++++++++++++-------- 3 files changed, 23 insertions(+), 15 deletions(-) (limited to 'src/irmd') diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 78408185..cbd9ee15 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -543,16 +543,19 @@ int ipcp_flow_alloc_resp(pid_t pid, return ret; } -int ipcp_flow_dealloc(pid_t pid, - int flow_id) +int ipcp_flow_dealloc(pid_t pid, + int flow_id, + time_t timeo) { ipcp_msg_t msg = IPCP_MSG__INIT; ipcp_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC; - msg.has_flow_id = true; - msg.flow_id = flow_id; + msg.code = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC; + msg.has_flow_id = true; + msg.flow_id = flow_id; + msg.has_timeo_sec = true; + msg.timeo_sec = timeo; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index ae00792b..652316ba 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -85,7 +85,8 @@ int ipcp_flow_alloc_resp(pid_t pid, const void * data, size_t len); -int ipcp_flow_dealloc(pid_t pid, - int flow_id); +int ipcp_flow_dealloc(pid_t pid, + int flow_id, + time_t timeo); #endif /* OUROBOROS_IRMD_IPCP_H */ diff --git a/src/irmd/main.c b/src/irmd/main.c index 3709a3e5..3a0ad544 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -68,10 +68,11 @@ #endif #define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */ -#define SHM_SAN_HOLDOFF 1000 /* ms */ -#define IPCP_HASH_LEN(e) hash_len(e->dir_hash_algo) -#define IB_LEN SOCK_BUF_SIZE -#define BIND_TIMEOUT 10 /* ms */ +#define SHM_SAN_HOLDOFF 1000 /* ms */ +#define IPCP_HASH_LEN(e) hash_len(e->dir_hash_algo) +#define IB_LEN SOCK_BUF_SIZE +#define BIND_TIMEOUT 10 /* ms */ +#define DEALLOC_TIME 300 /* s */ enum init_state { IPCP_NULL = 0, @@ -1475,7 +1476,8 @@ static int flow_alloc(pid_t pid, } static int flow_dealloc(pid_t pid, - int flow_id) + int flow_id, + time_t timeo) { pid_t n_1_pid = -1; int ret = 0; @@ -1521,7 +1523,7 @@ static int flow_dealloc(pid_t pid, pthread_rwlock_unlock(&irmd.flows_lock); if (n_1_pid != -1) - ret = ipcp_flow_dealloc(n_1_pid, flow_id); + ret = ipcp_flow_dealloc(n_1_pid, flow_id, timeo); return ret; } @@ -1927,7 +1929,7 @@ void * irm_sanitize(void * o) ipcpi = f->n_1_pid; flow_id = f->flow_id; pthread_rwlock_unlock(&irmd.flows_lock); - ipcp_flow_dealloc(ipcpi, flow_id); + ipcp_flow_dealloc(ipcpi, flow_id, DEALLOC_TIME); pthread_rwlock_wrlock(&irmd.flows_lock); continue; } @@ -2190,7 +2192,9 @@ static void * mainloop(void * o) } break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: - result = flow_dealloc(msg->pid, msg->flow_id); + result = flow_dealloc(msg->pid, + msg->flow_id, + msg->timeo_sec); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: assert(msg->pk.len > 0 ? msg->pk.data != NULL -- cgit v1.2.3