diff options
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 2 | ||||
-rw-r--r-- | src/irmd/main.c | 66 |
2 files changed, 61 insertions, 7 deletions
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index f583a998..186f0ebc 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -910,7 +910,7 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) SO_REUSEADDR, &enable, sizeof(int)) < 0) - LOG_WARN("Setsockopt(SO_REUSEADDR) failed."); + LOG_WARN("Failed to set SO_REUSEADDR."); memset((char *) &s_saddr, 0, sizeof(s_saddr)); shim_data(_ipcp)->s_saddr.sin_family = AF_INET; diff --git a/src/irmd/main.c b/src/irmd/main.c index db96b6ed..c20d63db 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -37,6 +37,7 @@ #include <ouroboros/flow.h> #include <ouroboros/qos.h> #include <ouroboros/rw_lock.h> +#include <ouroboros/time_utils.h> #include "utils.h" @@ -50,13 +51,19 @@ #include <pthread.h> #ifndef IRMD_MAX_FLOWS - #define IRMD_MAX_FLOWS 4096 +#define IRMD_MAX_FLOWS 4096 #endif #ifndef IRMD_THREADPOOL_SIZE - #define IRMD_THREADPOOL_SIZE 3 +#define IRMD_THREADPOOL_SIZE 3 #endif +#ifndef IRMD_FLOW_TIMEOUT +#define IRMD_FLOW_TIMEOUT 5000 /* ms */ +#endif + +#define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */ + struct ipcp_entry { struct list_head next; instance_name_t * api; @@ -98,6 +105,7 @@ struct port_map_entry { pthread_mutex_t res_lock; enum flow_state state; + struct timespec t0; }; struct irm { @@ -116,6 +124,8 @@ struct irm { pthread_t * threadpool; int sockfd; rw_lock_t state_lock; + + pthread_t cleanup_flows; } * instance = NULL; static struct port_map_entry * port_map_entry_create() @@ -139,6 +149,9 @@ static struct port_map_entry * port_map_entry_create() return NULL; } + e->t0.tv_sec = 0; + e->t0.tv_nsec = 0; + return e; } @@ -945,6 +958,8 @@ static struct port_map_entry * flow_alloc(pid_t pid, pme->n_pid = pid; pme->state = FLOW_PENDING; + if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0) + LOG_WARN("Failed to set timestamp."); rw_lock_rdlock(&instance->state_lock); rw_lock_rdlock(&instance->reg_lock); @@ -1314,6 +1329,8 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c) } + pthread_cancel(instance->cleanup_flows); + rw_lock_unlock(&instance->state_lock); case SIGPIPE: @@ -1323,6 +1340,43 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c) } } +void * irm_flow_cleaner() +{ + struct timespec now; + struct list_head * pos = NULL; + struct list_head * n = NULL; + struct timespec timeout = {IRMD_CLEANUP_TIMER / BILLION, + IRMD_CLEANUP_TIMER % BILLION}; + + while (true) { + if(clock_gettime(CLOCK_MONOTONIC, &now) < 0) + LOG_WARN("Failed to get time."); + /* cleanup stale PENDING flows */ + rw_lock_rdlock(&instance->state_lock); + + list_for_each_safe(pos, n, &(instance->port_map)) { + struct port_map_entry * e = + list_entry(pos, struct port_map_entry, next); + + pthread_mutex_lock(&e->res_lock); + + if (e->state == FLOW_PENDING && + ts_diff_ms(&e->t0, &now) > IRMD_FLOW_TIMEOUT) { + LOG_DBGF("Flow time exceeded on port ID %d.", + e->port_id); + e->state = FLOW_NULL; + pthread_cond_broadcast(&e->res_signal); + } + + pthread_mutex_unlock(&e->res_lock); + } + + rw_lock_unlock(&instance->state_lock); + + nanosleep(&timeout, NULL); + } +} + void * mainloop() { uint8_t buf[IRM_MSG_BUF_SIZE]; @@ -1579,10 +1633,8 @@ int main() if (instance == NULL) return 1; - /* - * FIXME: we need a main loop that delegates messages to subthreads in a - * way that avoids all possible deadlocks for local apps - */ + + pthread_create(&instance->cleanup_flows, NULL, irm_flow_cleaner, NULL); for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) pthread_create(&instance->threadpool[t], NULL, mainloop, NULL); @@ -1591,6 +1643,8 @@ int main() for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) pthread_join(instance->threadpool[t], NULL); + pthread_join(instance->cleanup_flows, NULL); + irm_destroy(instance); return 0; |