diff options
Diffstat (limited to 'src/irmd')
| -rw-r--r-- | src/irmd/main.c | 129 | 
1 files changed, 104 insertions, 25 deletions
| diff --git a/src/irmd/main.c b/src/irmd/main.c index b2f83388..5ef9a82e 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1415,12 +1415,103 @@ static int flow_accept(pid_t             pid,          return 0;  } +static int flow_join(pid_t              pid, +                     const char *       dst, +                     qosspec_t          qs, +                     struct timespec *  timeo, +                     struct irm_flow *  f_out) +{ +        struct irm_flow *   f; +        struct ipcp_entry * ipcp; +        int                 flow_id; +        int                 state; +        uint8_t *           hash; + +        log_info("Allocating flow for %d to %s.", pid, dst); + +        ipcp = get_ipcp_entry_by_layer(dst); +        if (ipcp == NULL) { +                log_info("Layer %s unreachable.", dst); +                return -1; +        } + +        pthread_rwlock_wrlock(&irmd.flows_lock); + +        flow_id = bmp_allocate(irmd.flow_ids); +        if (!bmp_is_id_valid(irmd.flow_ids, flow_id)) { +                pthread_rwlock_unlock(&irmd.flows_lock); +                log_err("Could not allocate flow_id."); +                return -EBADF; +        } + +        f = irm_flow_create(pid, ipcp->pid, flow_id, qs); +        if (f == NULL) { +                bmp_release(irmd.flow_ids, flow_id); +                pthread_rwlock_unlock(&irmd.flows_lock); +                log_err("Could not allocate flow_id."); +                return -ENOMEM; +        } + +        list_add(&f->next, &irmd.irm_flows); + +        pthread_rwlock_unlock(&irmd.flows_lock); + +        assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING); + +        hash = malloc(IPCP_HASH_LEN(ipcp)); +        if  (hash == NULL) +                /* sanitizer cleans this */ +                return -ENOMEM; + +        str_hash(ipcp->dir_hash_algo, hash, dst); + +        if (ipcp_flow_join(ipcp->pid, flow_id, pid, hash, +                                IPCP_HASH_LEN(ipcp), qs)) { +                irm_flow_set_state(f, FLOW_NULL); +                /* sanitizer cleans this */ +                log_info("Flow_join failed."); +                free(hash); +                return -EAGAIN; +        } + +        free(hash); + +        state = irm_flow_wait_state(f, FLOW_ALLOCATED, timeo); +        if (state != FLOW_ALLOCATED) { +                if (state == -ETIMEDOUT) { +                        log_dbg("Flow allocation timed out"); +                        return -ETIMEDOUT; +                } + +                log_info("Pending flow to %s torn down.", dst); +                return -EPIPE; +        } + +        pthread_rwlock_wrlock(&irmd.flows_lock); + +        assert(irm_flow_get_state(f) == FLOW_ALLOCATED); + +        f_out->flow_id = f->flow_id; +        f_out->n_pid   = f->n_pid; +        f_out->n_1_pid = f->n_1_pid; +        f_out->data    = f->data; /* pass owner */ +        f_out->len     = f->len; +        f_out->mpl     = f->mpl; +        f->data        = NULL; +        f->len         = 0; + +        pthread_rwlock_unlock(&irmd.flows_lock); + +        log_info("Flow on flow_id %d allocated.", flow_id); + +        return 0; +} +  static int flow_alloc(pid_t              pid,                        const char *       dst,                        qosspec_t          qs,                        struct timespec *  timeo,                        struct irm_flow *  f_out, -                      bool               join,                        const void *       data,                        size_t             len)  { @@ -1432,8 +1523,7 @@ static int flow_alloc(pid_t              pid,          log_info("Allocating flow for %d to %s.", pid, dst); -        ipcp = join ? get_ipcp_entry_by_layer(dst) -                    : get_ipcp_by_dst_name(dst, pid); +        ipcp = get_ipcp_by_dst_name(dst, pid);          if (ipcp == NULL) {                  log_info("Destination %s unreachable.", dst);                  return -1; @@ -1469,24 +1559,13 @@ static int flow_alloc(pid_t              pid,          str_hash(ipcp->dir_hash_algo, hash, dst); -        if (join) { -                if (ipcp_flow_join(ipcp->pid, flow_id, pid, hash, -                                   IPCP_HASH_LEN(ipcp), qs)) { -                        irm_flow_set_state(f, FLOW_NULL); -                        /* sanitizer cleans this */ -                        log_info("Flow_join failed."); -                        free(hash); -                        return -EAGAIN; -                } -        } else { -                if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash, -                                    IPCP_HASH_LEN(ipcp), qs, data, len)) { -                        irm_flow_set_state(f, FLOW_NULL); -                        /* sanitizer cleans this */ -                        log_info("Flow_allocation failed."); -                        free(hash); -                        return -EAGAIN; -                } +        if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash, +                            IPCP_HASH_LEN(ipcp), qs, data, len)) { +                irm_flow_set_state(f, FLOW_NULL); +                /* sanitizer cleans this */ +                log_info("Flow_allocation failed."); +                free(hash); +                return -EAGAIN;          }          free(hash); @@ -2136,7 +2215,7 @@ static void * mainloop(void * o)                                                 : msg->pk.data == NULL);                          result = flow_alloc(msg->pid, msg->dst,                                              qos_spec_msg_to_s(msg->qosspec), -                                            timeo, &e, false, msg->pk.data, +                                            timeo, &e, msg->pk.data,                                              msg->pk.len);                          if (result == 0) {                                  ret_msg->has_flow_id = true; @@ -2152,9 +2231,9 @@ static void * mainloop(void * o)                          break;                  case IRM_MSG_CODE__IRM_FLOW_JOIN:                          assert(msg->pk.len == 0 && msg->pk.data == NULL); -                        result = flow_alloc(msg->pid, msg->dst, -                                            qos_spec_msg_to_s(msg->qosspec), -                                            timeo, &e, true, NULL, 0); +                        result = flow_join(msg->pid, msg->dst, +                                           qos_spec_msg_to_s(msg->qosspec), +                                           timeo, &e);                          if (result == 0) {                                  ret_msg->has_flow_id = true;                                  ret_msg->flow_id     = e.flow_id; | 
