diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2024-02-23 09:29:47 +0100 | 
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2024-02-23 16:41:37 +0100 | 
| commit | e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f (patch) | |
| tree | ad959d95f8fb1f6d4744c57c9027bf182bc3190b /src/irmd/main.c | |
| parent | dcefa07624926da23a559eedc3f7361ac36e8312 (diff) | |
| download | ouroboros-e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f.tar.gz ouroboros-e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f.zip | |
lib: Revise app flow allocation
This revises the application flow allocator to use the flow_info
struct/message between the components. Revises the messaging to move
the use protocol buffers to its own source (serdes-irm).
Adds a timeout to the IRMd flow allocator to make sure flow
allocations don't hang forever (this was previously taken care of by
the sanitize thread).
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/irmd/main.c')
| -rw-r--r-- | src/irmd/main.c | 105 | 
1 files changed, 42 insertions, 63 deletions
| diff --git a/src/irmd/main.c b/src/irmd/main.c index 2cbe8ed4..32f41ab2 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1234,14 +1234,14 @@ static int flow_alloc_reply(struct flow_info * flow,  }  static int flow_dealloc(struct flow_info * flow, -                        time_t             timeo) +                        struct timespec *  ts)  {          log_info("Deallocating flow %d for process %d.",                   flow->id, flow->n_pid);          reg_dealloc_flow(flow); -         if (ipcp_flow_dealloc(flow->n_1_pid, flow->id, timeo) < 0) { +         if (ipcp_flow_dealloc(flow->n_1_pid, flow->id, ts->tv_sec) < 0) {                  log_err("Failed to request dealloc from %d.", flow->n_1_pid);                  return -EIPCP;           } @@ -1324,14 +1324,27 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)          struct flow_info   flow;          struct proc_info   proc;          struct name_info   name; -        struct timespec *  abstime  = NULL; -        struct timespec    ts; +        struct timespec *  abstime; +        struct timespec    max = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT); +        struct timespec    now; +        struct timespec    ts = TIMESPEC_INIT_S(0); /* static analysis */          int                res;          irm_msg_t *        ret_msg;          buffer_t           data;          memset(&flow, 0, sizeof(flow)); +        clock_gettime(PTHREAD_COND_CLOCK, &now); + +        if (msg->timeo != NULL) { +                ts = timespec_msg_to_s(msg->timeo); +                ts_add(&ts, &now, &ts); +                abstime = &ts; +        } else { +                ts_add(&max, &now, &max); +                abstime = NULL; +        } +          ret_msg = malloc(sizeof(*ret_msg));          if (ret_msg == NULL) {                  log_err("Failed to malloc return msg."); @@ -1342,20 +1355,6 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)          ret_msg->code = IRM_MSG_CODE__IRM_REPLY; -        if (msg->has_timeo_sec) { -                struct timespec now; - -                clock_gettime(PTHREAD_COND_CLOCK, &now); -                assert(msg->has_timeo_nsec); - -                ts.tv_sec  = msg->timeo_sec; -                ts.tv_nsec = msg->timeo_nsec; - -                ts_add(&ts, &now, &ts); - -                abstime = &ts; -        } -          pthread_cleanup_push(free_msg, ret_msg);          switch (msg->code) { @@ -1430,20 +1429,12 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)          case IRM_MSG_CODE__IRM_FLOW_ACCEPT:                  data.len  = msg->pk.len;                  data.data = msg->pk.data; +                msg->has_pk = false;                  assert(data.len > 0 ? data.data != NULL : data.data == NULL); -                flow.n_pid = msg->pid; -                flow.qs  = qos_raw; +                flow = flow_info_msg_to_s(msg->flow_info);                  res = flow_accept(&flow, &data, abstime);                  if (res == 0) { -                        qosspec_msg_t * qs_msg; -                        qs_msg = qos_spec_s_to_msg(&flow.qs); -                        ret_msg->has_flow_id  = true; -                        ret_msg->flow_id      = flow.id; -                        ret_msg->has_pid      = true; -                        ret_msg->pid          = flow.n_1_pid; -                        ret_msg->has_mpl      = true; -                        ret_msg->qosspec      = qs_msg; -                        ret_msg->mpl          = flow.mpl; +                        ret_msg->flow_info    = flow_info_s_to_msg(&flow);                          ret_msg->has_symmkey  = data.len != 0;                          ret_msg->symmkey.data = data.data;                          ret_msg->symmkey.len  = data.len; @@ -1453,17 +1444,12 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)                  data.len  = msg->pk.len;                  data.data = msg->pk.data;                  msg->has_pk = false; -                flow.n_pid = msg->pid; -                flow.qs = qos_spec_msg_to_s(msg->qosspec);                  assert(data.len > 0 ? data.data != NULL : data.data == NULL); +                flow = flow_info_msg_to_s(msg->flow_info); +                abstime = abstime == NULL ? &max : abstime;                  res = flow_alloc(&flow, msg->dst, &data, abstime);                  if (res == 0) { -                        ret_msg->has_flow_id  = true; -                        ret_msg->flow_id      = flow.id; -                        ret_msg->has_pid      = true; -                        ret_msg->pid          = flow.n_1_pid; -                        ret_msg->has_mpl      = true; -                        ret_msg->mpl          = flow.mpl; +                        ret_msg->flow_info    = flow_info_s_to_msg(&flow);                          ret_msg->has_symmkey  = data.len != 0;                          ret_msg->symmkey.data = data.data;                          ret_msg->symmkey.len  = data.len; @@ -1471,46 +1457,38 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)                  break;          case IRM_MSG_CODE__IRM_FLOW_JOIN:                  assert(msg->pk.len == 0 && msg->pk.data == NULL); -                flow.qs = qos_spec_msg_to_s(msg->qosspec); +                flow = flow_info_msg_to_s(msg->flow_info); +                abstime = abstime == NULL ? &max : abstime;                  res = flow_join(&flow, msg->dst, abstime); +                if (res == 0) +                        ret_msg->flow_info    = flow_info_s_to_msg(&flow);                  break;          case IRM_MSG_CODE__IRM_FLOW_DEALLOC: -                flow.n_pid = msg->pid; -                flow.id    = msg->flow_id; -                res = flow_dealloc(&flow, msg->timeo_sec); +                flow = flow_info_msg_to_s(msg->flow_info); +                res = flow_dealloc(&flow, &ts);                  break;          case IRM_MSG_CODE__IPCP_FLOW_DEALLOC: -                flow.n_1_pid = msg->pid; -                flow.id      = msg->flow_id; +                flow = flow_info_msg_to_s(msg->flow_info);                  res = flow_dealloc_resp(&flow);                  break;          case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:                  data.len  = msg->pk.len;                  data.data = msg->pk.data; -                msg->has_pk  = false; /* pass data */ -                msg->pk.data = NULL; +                msg->pk.data = NULL; /* pass data */                  msg->pk.len  = 0;                  assert(data.len > 0 ? data.data != NULL : data.data == NULL); -                flow.n_1_pid = msg->pid; -                flow.mpl = msg->mpl; -                flow.qs = qos_spec_msg_to_s(msg->qosspec); +                flow = flow_info_msg_to_s(msg->flow_info);                  res = flow_req_arr(&flow, msg->hash.data, &data); -                if (res == 0) { -                        ret_msg->has_flow_id = true; -                        ret_msg->flow_id     = flow.id; -                        ret_msg->has_pid     = true; -                        ret_msg->pid         = flow.n_pid; -                } +                if (res == 0) +                        ret_msg->flow_info = flow_info_s_to_msg(&flow);                  break;          case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY:                  data.len  = msg->pk.len;                  data.data = msg->pk.data; -                msg->has_pk  = false; /* pass data */ -                msg->pk.data = NULL; +                msg->pk.data = NULL; /* pass data */                  msg->pk.len  = 0;                  assert(data.len > 0 ? data.data != NULL : data.data == NULL); -                flow.id      = msg->flow_id; -                flow.mpl     = msg->mpl; +                flow = flow_info_msg_to_s(msg->flow_info);                  res = flow_alloc_reply(&flow, msg->response, &data);                  break;          default: @@ -1522,7 +1500,10 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)          pthread_cleanup_pop(false);          ret_msg->has_result = true; -        ret_msg->result     = res; +        if (abstime == &max && res == -ETIMEDOUT) +                ret_msg->result = -EPERM; /* No timeout requested */ +        else +                ret_msg->result = res;          return ret_msg;  } @@ -1664,8 +1645,6 @@ static void destroy_mount(char * mnt)  {          struct stat st; -        log_dbg("Destroying mountpoint %s.", mnt); -          if (stat(mnt, &st) == -1){                  switch(errno) {                  case ENOENT: @@ -1719,7 +1698,7 @@ static void cleanup_pid(pid_t pid)  void * irm_sanitize(void * o)  {          pid_t           pid; -        struct timespec ts = TIMESPEC_INIT_MS(IRMD_FLOW_TIMEOUT / 20); +        struct timespec ts = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT / 20);          (void) o; @@ -2003,7 +1982,7 @@ static void * kill_dash_nine(void * o)  {          time_t slept = 0;  #ifdef IRMD_KILL_ALL_PROCESSES -        struct timespec ts = TIMESPEC_INIT_MS(IRMD_FLOW_TIMEOUT / 19); +        struct timespec ts = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT / 19);  #endif          (void) o; | 
