diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2024-02-23 09:29:47 +0100 | 
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2024-02-23 16:41:37 +0100 | 
| commit | e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f (patch) | |
| tree | ad959d95f8fb1f6d4744c57c9027bf182bc3190b /src/irmd | |
| parent | dcefa07624926da23a559eedc3f7361ac36e8312 (diff) | |
| download | ouroboros-e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f.tar.gz ouroboros-e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f.zip | |
lib: Revise app flow allocation
This revises the application flow allocator to use the flow_info
struct/message between the components. Revises the messaging to move
the use protocol buffers to its own source (serdes-irm).
Adds a timeout to the IRMd flow allocator to make sure flow
allocations don't hang forever (this was previously taken care of by
the sanitize thread).
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/irmd')
| -rw-r--r-- | src/irmd/CMakeLists.txt | 9 | ||||
| -rw-r--r-- | src/irmd/config.h.in | 45 | ||||
| -rw-r--r-- | src/irmd/ipcp.c | 12 | ||||
| -rw-r--r-- | src/irmd/main.c | 105 | ||||
| -rw-r--r-- | src/irmd/reg/reg.c | 2 | 
5 files changed, 82 insertions, 91 deletions
| diff --git a/src/irmd/CMakeLists.txt b/src/irmd/CMakeLists.txt index 3a5be324..7eaa0ce6 100644 --- a/src/irmd/CMakeLists.txt +++ b/src/irmd/CMakeLists.txt @@ -40,18 +40,19 @@ endif ()  set(IRMD_REQ_ARR_TIMEOUT 1000 CACHE STRING    "Timeout for an application to respond to a new flow (ms)") -set(IRMD_FLOW_TIMEOUT 5000 CACHE STRING -  "Timeout for a flow allocation response (ms)") +  set(BOOTSTRAP_TIMEOUT 5000 CACHE STRING    "Timeout for an IPCP to bootstrap (ms)")  set(ENROLL_TIMEOUT 60000 CACHE STRING    "Timeout for an IPCP to enroll (ms)") -set(REG_TIMEOUT 10000 CACHE STRING +set(REG_TIMEOUT 60000 CACHE STRING    "Timeout for registering a name (ms)") -set(QUERY_TIMEOUT 3000 CACHE STRING +set(QUERY_TIMEOUT 60000 CACHE STRING    "Timeout to query a name with an IPCP (ms)")  set(CONNECT_TIMEOUT 60000 CACHE STRING    "Timeout to connect an IPCP to another IPCP (ms)") +set(FLOW_ALLOC_TIMEOUT 5000 CACHE STRING +  "Timeout for a flow allocation response (ms)")  set(IRMD_MIN_THREADS 8 CACHE STRING    "Minimum number of worker threads in the IRMd")  set(IRMD_ADD_THREADS 8 CACHE STRING diff --git a/src/irmd/config.h.in b/src/irmd/config.h.in index b25053f7..fa1156b9 100644 --- a/src/irmd/config.h.in +++ b/src/irmd/config.h.in @@ -21,38 +21,39 @@   */ -#define IPCP_UDP_EXEC           "@IPCP_UDP_TARGET@" -#define IPCP_ETH_LLC_EXEC       "@IPCP_ETH_LLC_TARGET@" -#define IPCP_ETH_DIX_EXEC       "@IPCP_ETH_DIX_TARGET@" -#define IPCP_UNICAST_EXEC       "@IPCP_UNICAST_TARGET@" -#define IPCP_BROADCAST_EXEC     "@IPCP_BROADCAST_TARGET@" -#define IPCP_LOCAL_EXEC         "@IPCP_LOCAL_TARGET@" +#define IPCP_UDP_EXEC          "@IPCP_UDP_TARGET@" +#define IPCP_ETH_LLC_EXEC      "@IPCP_ETH_LLC_TARGET@" +#define IPCP_ETH_DIX_EXEC      "@IPCP_ETH_DIX_TARGET@" +#define IPCP_UNICAST_EXEC      "@IPCP_UNICAST_TARGET@" +#define IPCP_BROADCAST_EXEC    "@IPCP_BROADCAST_TARGET@" +#define IPCP_LOCAL_EXEC        "@IPCP_LOCAL_TARGET@" -#define INSTALL_PREFIX          "@CMAKE_INSTALL_PREFIX@" -#define INSTALL_SBINDIR         "@CMAKE_INSTALL_SBINDIR@" +#define INSTALL_PREFIX         "@CMAKE_INSTALL_PREFIX@" +#define INSTALL_SBINDIR        "@CMAKE_INSTALL_SBINDIR@" -#define PTHREAD_COND_CLOCK      @PTHREAD_COND_CLOCK@ +#define PTHREAD_COND_CLOCK     @PTHREAD_COND_CLOCK@ -#define SOCKET_TIMEOUT          @SOCKET_TIMEOUT@ +#define SOCKET_TIMEOUT         @SOCKET_TIMEOUT@ -#define IRMD_REQ_ARR_TIMEOUT    @IRMD_REQ_ARR_TIMEOUT@ -#define IRMD_FLOW_TIMEOUT       @IRMD_FLOW_TIMEOUT@ +#define IRMD_REQ_ARR_TIMEOUT   @IRMD_REQ_ARR_TIMEOUT@ -#define BOOTSTRAP_TIMEOUT       @BOOTSTRAP_TIMEOUT@ -#define ENROLL_TIMEOUT          @ENROLL_TIMEOUT@ -#define REG_TIMEOUT             @REG_TIMEOUT@ -#define QUERY_TIMEOUT           @QUERY_TIMEOUT@ -#define CONNECT_TIMEOUT         @CONNECT_TIMEOUT@ +#define FLOW_ALLOC_TIMEOUT     @FLOW_ALLOC_TIMEOUT@ +#define FLOW_DEALLOC_TIMEOUT   @FLOW_DEALLOC_TIMEOUT@ -#define SYS_MAX_FLOWS           @SYS_MAX_FLOWS@ +#define BOOTSTRAP_TIMEOUT      @BOOTSTRAP_TIMEOUT@ +#define ENROLL_TIMEOUT         @ENROLL_TIMEOUT@ +#define REG_TIMEOUT            @REG_TIMEOUT@ +#define QUERY_TIMEOUT          @QUERY_TIMEOUT@ +#define CONNECT_TIMEOUT        @CONNECT_TIMEOUT@ -#define IRMD_MIN_THREADS        @IRMD_MIN_THREADS@ -#define IRMD_ADD_THREADS        @IRMD_ADD_THREADS@ +#define SYS_MAX_FLOWS          @SYS_MAX_FLOWS@ +#define IRMD_MIN_THREADS       @IRMD_MIN_THREADS@ +#define IRMD_ADD_THREADS       @IRMD_ADD_THREADS@  #cmakedefine HAVE_FUSE  #ifdef HAVE_FUSE -#define FUSE_PREFIX             "@FUSE_PREFIX@" +#define FUSE_PREFIX            "@FUSE_PREFIX@"  #endif  #cmakedefine HAVE_TOML @@ -61,7 +62,7 @@  #define OUROBOROS_CONFIG_FILE  "@OUROBOROS_CONFIG_FILE@"  #endif -#define IRMD_PKILL_TIMEOUT      @IRMD_PKILL_TIMEOUT@ +#define IRMD_PKILL_TIMEOUT     @IRMD_PKILL_TIMEOUT@  #cmakedefine IRMD_KILL_ALL_PROCESSES  #cmakedefine HAVE_LIBGCRYPT diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 3253a8f3..c8055aa1 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -58,6 +58,7 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t        pid,          struct timeval  tv;          struct timespec tic;          struct timespec toc; +        bool            dealloc = false;          if (kill(pid, 0) < 0)                  return NULL; @@ -101,6 +102,15 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t        pid,                  tv.tv_sec  = CONNECT_TIMEOUT / 1000;                  tv.tv_usec = (CONNECT_TIMEOUT % 1000) * 1000;                  break; +        case IPCP_MSG_CODE__IPCP_FLOW_ALLOC: +                tv.tv_sec  = FLOW_ALLOC_TIMEOUT / 1000; +                tv.tv_usec = (FLOW_ALLOC_TIMEOUT % 1000) * 1000; +                break; +        case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: +                dealloc = true; +                tv.tv_sec  = 0; /* FIX DEALLOC: don't wait for dealloc */ +                tv.tv_usec = 500; +                break;          default:                  tv.tv_sec  = SOCKET_TIMEOUT / 1000;                  tv.tv_usec = (SOCKET_TIMEOUT % 1000) * 1000; @@ -127,7 +137,7 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t        pid,          if (len > 0)                  recv_msg = ipcp_msg__unpack(NULL, len, buf);          else { -                if (errno == EAGAIN) { +                if (errno == EAGAIN && !dealloc) {                          int diff = ts_diff_ms(&tic, &toc);                          log_warn("IPCP command timed out after %d ms.", diff);                  } diff --git a/src/irmd/main.c b/src/irmd/main.c index 2cbe8ed4..32f41ab2 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1234,14 +1234,14 @@ static int flow_alloc_reply(struct flow_info * flow,  }  static int flow_dealloc(struct flow_info * flow, -                        time_t             timeo) +                        struct timespec *  ts)  {          log_info("Deallocating flow %d for process %d.",                   flow->id, flow->n_pid);          reg_dealloc_flow(flow); -         if (ipcp_flow_dealloc(flow->n_1_pid, flow->id, timeo) < 0) { +         if (ipcp_flow_dealloc(flow->n_1_pid, flow->id, ts->tv_sec) < 0) {                  log_err("Failed to request dealloc from %d.", flow->n_1_pid);                  return -EIPCP;           } @@ -1324,14 +1324,27 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)          struct flow_info   flow;          struct proc_info   proc;          struct name_info   name; -        struct timespec *  abstime  = NULL; -        struct timespec    ts; +        struct timespec *  abstime; +        struct timespec    max = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT); +        struct timespec    now; +        struct timespec    ts = TIMESPEC_INIT_S(0); /* static analysis */          int                res;          irm_msg_t *        ret_msg;          buffer_t           data;          memset(&flow, 0, sizeof(flow)); +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        if (msg->timeo != NULL) { +                ts = timespec_msg_to_s(msg->timeo); +                ts_add(&ts, &now, &ts); +                abstime = &ts; +        } else { +                ts_add(&max, &now, &max); +                abstime = NULL; +        } +          ret_msg = malloc(sizeof(*ret_msg));          if (ret_msg == NULL) {                  log_err("Failed to malloc return msg."); @@ -1342,20 +1355,6 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)          ret_msg->code = IRM_MSG_CODE__IRM_REPLY; -        if (msg->has_timeo_sec) { -                struct timespec now; - -                clock_gettime(PTHREAD_COND_CLOCK, &now); -                assert(msg->has_timeo_nsec); - -                ts.tv_sec  = msg->timeo_sec; -                ts.tv_nsec = msg->timeo_nsec; - -                ts_add(&ts, &now, &ts); - -                abstime = &ts; -        } -          pthread_cleanup_push(free_msg, ret_msg);          switch (msg->code) { @@ -1430,20 +1429,12 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)          case IRM_MSG_CODE__IRM_FLOW_ACCEPT:                  data.len  = msg->pk.len;                  data.data = msg->pk.data; +                msg->has_pk = false;                  assert(data.len > 0 ? data.data != NULL : data.data == NULL); -                flow.n_pid = msg->pid; -                flow.qs  = qos_raw; +                flow = flow_info_msg_to_s(msg->flow_info);                  res = flow_accept(&flow, &data, abstime);                  if (res == 0) { -                        qosspec_msg_t * qs_msg; -                        qs_msg = qos_spec_s_to_msg(&flow.qs); -                        ret_msg->has_flow_id  = true; -                        ret_msg->flow_id      = flow.id; -                        ret_msg->has_pid      = true; -                        ret_msg->pid          = flow.n_1_pid; -                        ret_msg->has_mpl      = true; -                        ret_msg->qosspec      = qs_msg; -                        ret_msg->mpl          = flow.mpl; +                        ret_msg->flow_info    = flow_info_s_to_msg(&flow);                          ret_msg->has_symmkey  = data.len != 0;                          ret_msg->symmkey.data = data.data;                          ret_msg->symmkey.len  = data.len; @@ -1453,17 +1444,12 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)                  data.len  = msg->pk.len;                  data.data = msg->pk.data;                  msg->has_pk = false; -                flow.n_pid = msg->pid; -                flow.qs = qos_spec_msg_to_s(msg->qosspec);                  assert(data.len > 0 ? data.data != NULL : data.data == NULL); +                flow = flow_info_msg_to_s(msg->flow_info); +                abstime = abstime == NULL ? &max : abstime;                  res = flow_alloc(&flow, msg->dst, &data, abstime);                  if (res == 0) { -                        ret_msg->has_flow_id  = true; -                        ret_msg->flow_id      = flow.id; -                        ret_msg->has_pid      = true; -                        ret_msg->pid          = flow.n_1_pid; -                        ret_msg->has_mpl      = true; -                        ret_msg->mpl          = flow.mpl; +                        ret_msg->flow_info    = flow_info_s_to_msg(&flow);                          ret_msg->has_symmkey  = data.len != 0;                          ret_msg->symmkey.data = data.data;                          ret_msg->symmkey.len  = data.len; @@ -1471,46 +1457,38 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)                  break;          case IRM_MSG_CODE__IRM_FLOW_JOIN:                  assert(msg->pk.len == 0 && msg->pk.data == NULL); -                flow.qs = qos_spec_msg_to_s(msg->qosspec); +                flow = flow_info_msg_to_s(msg->flow_info); +                abstime = abstime == NULL ? &max : abstime;                  res = flow_join(&flow, msg->dst, abstime); +                if (res == 0) +                        ret_msg->flow_info    = flow_info_s_to_msg(&flow);                  break;          case IRM_MSG_CODE__IRM_FLOW_DEALLOC: -                flow.n_pid = msg->pid; -                flow.id    = msg->flow_id; -                res = flow_dealloc(&flow, msg->timeo_sec); +                flow = flow_info_msg_to_s(msg->flow_info); +                res = flow_dealloc(&flow, &ts);                  break;          case IRM_MSG_CODE__IPCP_FLOW_DEALLOC: -                flow.n_1_pid = msg->pid; -                flow.id      = msg->flow_id; +                flow = flow_info_msg_to_s(msg->flow_info);                  res = flow_dealloc_resp(&flow);                  break;          case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:                  data.len  = msg->pk.len;                  data.data = msg->pk.data; -                msg->has_pk  = false; /* pass data */ -                msg->pk.data = NULL; +                msg->pk.data = NULL; /* pass data */                  msg->pk.len  = 0;                  assert(data.len > 0 ? data.data != NULL : data.data == NULL); -                flow.n_1_pid = msg->pid; -                flow.mpl = msg->mpl; -                flow.qs = qos_spec_msg_to_s(msg->qosspec); +                flow = flow_info_msg_to_s(msg->flow_info);                  res = flow_req_arr(&flow, msg->hash.data, &data); -                if (res == 0) { -                        ret_msg->has_flow_id = true; -                        ret_msg->flow_id     = flow.id; -                        ret_msg->has_pid     = true; -                        ret_msg->pid         = flow.n_pid; -                } +                if (res == 0) +                        ret_msg->flow_info = flow_info_s_to_msg(&flow);                  break;          case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY:                  data.len  = msg->pk.len;                  data.data = msg->pk.data; -                msg->has_pk  = false; /* pass data */ -                msg->pk.data = NULL; +                msg->pk.data = NULL; /* pass data */                  msg->pk.len  = 0;                  assert(data.len > 0 ? data.data != NULL : data.data == NULL); -                flow.id      = msg->flow_id; -                flow.mpl     = msg->mpl; +                flow = flow_info_msg_to_s(msg->flow_info);                  res = flow_alloc_reply(&flow, msg->response, &data);                  break;          default: @@ -1522,7 +1500,10 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)          pthread_cleanup_pop(false);          ret_msg->has_result = true; -        ret_msg->result     = res; +        if (abstime == &max && res == -ETIMEDOUT) +                ret_msg->result = -EPERM; /* No timeout requested */ +        else +                ret_msg->result = res;          return ret_msg;  } @@ -1664,8 +1645,6 @@ static void destroy_mount(char * mnt)  {          struct stat st; -        log_dbg("Destroying mountpoint %s.", mnt); -          if (stat(mnt, &st) == -1){                  switch(errno) {                  case ENOENT: @@ -1719,7 +1698,7 @@ static void cleanup_pid(pid_t pid)  void * irm_sanitize(void * o)  {          pid_t           pid; -        struct timespec ts = TIMESPEC_INIT_MS(IRMD_FLOW_TIMEOUT / 20); +        struct timespec ts = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT / 20);          (void) o; @@ -2003,7 +1982,7 @@ static void * kill_dash_nine(void * o)  {          time_t slept = 0;  #ifdef IRMD_KILL_ALL_PROCESSES -        struct timespec ts = TIMESPEC_INIT_MS(IRMD_FLOW_TIMEOUT / 19); +        struct timespec ts = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT / 19);  #endif          (void) o; diff --git a/src/irmd/reg/reg.c b/src/irmd/reg/reg.c index f486c1cc..731e44b6 100644 --- a/src/irmd/reg/reg.c +++ b/src/irmd/reg/reg.c @@ -1490,7 +1490,7 @@ int reg_get_exec(enum hash_algo  algo,          exec = __reg_get_exec(algo, hash);          if (exec == NULL) { -                ret = 0; +                ret = -EPERM;                  goto finish;          } | 
