diff options
Diffstat (limited to 'src/irmd')
| -rw-r--r-- | src/irmd/main.c | 66 | 
1 files changed, 60 insertions, 6 deletions
| diff --git a/src/irmd/main.c b/src/irmd/main.c index db96b6ed..c20d63db 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -37,6 +37,7 @@  #include <ouroboros/flow.h>  #include <ouroboros/qos.h>  #include <ouroboros/rw_lock.h> +#include <ouroboros/time_utils.h>  #include "utils.h" @@ -50,13 +51,19 @@  #include <pthread.h>  #ifndef IRMD_MAX_FLOWS -  #define IRMD_MAX_FLOWS 4096 +#define IRMD_MAX_FLOWS 4096  #endif  #ifndef IRMD_THREADPOOL_SIZE -  #define IRMD_THREADPOOL_SIZE 3 +#define IRMD_THREADPOOL_SIZE 3  #endif +#ifndef IRMD_FLOW_TIMEOUT +#define IRMD_FLOW_TIMEOUT 5000 /* ms */ +#endif + +#define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */ +  struct ipcp_entry {          struct list_head  next;          instance_name_t * api; @@ -98,6 +105,7 @@ struct port_map_entry {          pthread_mutex_t res_lock;          enum flow_state state; +        struct timespec t0;  };  struct irm { @@ -116,6 +124,8 @@ struct irm {          pthread_t *         threadpool;          int                 sockfd;          rw_lock_t           state_lock; + +        pthread_t           cleanup_flows;  } * instance = NULL;  static struct port_map_entry * port_map_entry_create() @@ -139,6 +149,9 @@ static struct port_map_entry * port_map_entry_create()                  return NULL;          } +        e->t0.tv_sec  = 0; +        e->t0.tv_nsec = 0; +          return e;  } @@ -945,6 +958,8 @@ static struct port_map_entry * flow_alloc(pid_t  pid,          pme->n_pid   = pid;          pme->state   = FLOW_PENDING; +        if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0) +                LOG_WARN("Failed to set timestamp.");          rw_lock_rdlock(&instance->state_lock);          rw_lock_rdlock(&instance->reg_lock); @@ -1314,6 +1329,8 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c)                  } +                pthread_cancel(instance->cleanup_flows); +                  rw_lock_unlock(&instance->state_lock);          case SIGPIPE: @@ -1323,6 +1340,43 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c)          }  } +void * irm_flow_cleaner() +{ +        struct timespec now; +        struct list_head * pos = NULL; +        struct list_head * n   = NULL; +        struct timespec timeout = {IRMD_CLEANUP_TIMER / BILLION, +                                   IRMD_CLEANUP_TIMER % BILLION}; + +        while (true) { +                if(clock_gettime(CLOCK_MONOTONIC, &now) < 0) +                        LOG_WARN("Failed to get time."); +                /* cleanup stale PENDING flows */ +                rw_lock_rdlock(&instance->state_lock); + +                list_for_each_safe(pos, n, &(instance->port_map)) { +                        struct port_map_entry * e = +                                list_entry(pos, struct port_map_entry, next); + +                        pthread_mutex_lock(&e->res_lock); + +                        if (e->state == FLOW_PENDING && +                            ts_diff_ms(&e->t0, &now) > IRMD_FLOW_TIMEOUT) { +                                LOG_DBGF("Flow time exceeded on port ID %d.", +                                         e->port_id); +                                e->state = FLOW_NULL; +                                pthread_cond_broadcast(&e->res_signal); +                        } + +                        pthread_mutex_unlock(&e->res_lock); +                } + +                rw_lock_unlock(&instance->state_lock); + +                nanosleep(&timeout, NULL); +        } +} +  void * mainloop()  {          uint8_t buf[IRM_MSG_BUF_SIZE]; @@ -1579,10 +1633,8 @@ int main()          if (instance == NULL)                  return 1; -        /* -         * FIXME: we need a main loop that delegates messages to subthreads in a -         * way that avoids all possible deadlocks for local apps -         */ + +        pthread_create(&instance->cleanup_flows, NULL, irm_flow_cleaner, NULL);          for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)                  pthread_create(&instance->threadpool[t], NULL, mainloop, NULL); @@ -1591,6 +1643,8 @@ int main()          for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)                  pthread_join(instance->threadpool[t], NULL); +        pthread_join(instance->cleanup_flows, NULL); +          irm_destroy(instance);          return 0; | 
