diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2023-03-14 12:50:22 +0100 | 
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2023-03-18 17:12:26 +0100 | 
| commit | ee196e9a00475a029018181f8d6a00106ee462ce (patch) | |
| tree | 8cb703ebaed3f8b5d7b71bf263710f9ed5edb6a1 | |
| parent | 975a3ad0c761f5603a5026a56d825ba0ccb591c9 (diff) | |
| download | ouroboros-ee196e9a00475a029018181f8d6a00106ee462ce.tar.gz ouroboros-ee196e9a00475a029018181f8d6a00106ee462ce.zip | |
lib: Split flow_alloc from flow_join
Better to keep these separate during IRMd revision. Moves the qosspec
default out of the protobuf message parsing.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
| -rw-r--r-- | src/irmd/main.c | 129 | ||||
| -rw-r--r-- | src/lib/dev.c | 88 | ||||
| -rw-r--r-- | src/lib/protobuf.c | 23 | 
3 files changed, 183 insertions, 57 deletions
| diff --git a/src/irmd/main.c b/src/irmd/main.c index b2f83388..5ef9a82e 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1415,12 +1415,103 @@ static int flow_accept(pid_t             pid,          return 0;  } +static int flow_join(pid_t              pid, +                     const char *       dst, +                     qosspec_t          qs, +                     struct timespec *  timeo, +                     struct irm_flow *  f_out) +{ +        struct irm_flow *   f; +        struct ipcp_entry * ipcp; +        int                 flow_id; +        int                 state; +        uint8_t *           hash; + +        log_info("Allocating flow for %d to %s.", pid, dst); + +        ipcp = get_ipcp_entry_by_layer(dst); +        if (ipcp == NULL) { +                log_info("Layer %s unreachable.", dst); +                return -1; +        } + +        pthread_rwlock_wrlock(&irmd.flows_lock); + +        flow_id = bmp_allocate(irmd.flow_ids); +        if (!bmp_is_id_valid(irmd.flow_ids, flow_id)) { +                pthread_rwlock_unlock(&irmd.flows_lock); +                log_err("Could not allocate flow_id."); +                return -EBADF; +        } + +        f = irm_flow_create(pid, ipcp->pid, flow_id, qs); +        if (f == NULL) { +                bmp_release(irmd.flow_ids, flow_id); +                pthread_rwlock_unlock(&irmd.flows_lock); +                log_err("Could not allocate flow_id."); +                return -ENOMEM; +        } + +        list_add(&f->next, &irmd.irm_flows); + +        pthread_rwlock_unlock(&irmd.flows_lock); + +        assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING); + +        hash = malloc(IPCP_HASH_LEN(ipcp)); +        if  (hash == NULL) +                /* sanitizer cleans this */ +                return -ENOMEM; + +        str_hash(ipcp->dir_hash_algo, hash, dst); + +        if (ipcp_flow_join(ipcp->pid, flow_id, pid, hash, +                                IPCP_HASH_LEN(ipcp), qs)) { +                irm_flow_set_state(f, FLOW_NULL); +                /* sanitizer cleans this */ +                log_info("Flow_join failed."); +                free(hash); +                return -EAGAIN; +        } + +        free(hash); + +        state = irm_flow_wait_state(f, FLOW_ALLOCATED, timeo); +        if (state != FLOW_ALLOCATED) { +                if (state == -ETIMEDOUT) { +                        log_dbg("Flow allocation timed out"); +                        return -ETIMEDOUT; +                } + +                log_info("Pending flow to %s torn down.", dst); +                return -EPIPE; +        } + +        pthread_rwlock_wrlock(&irmd.flows_lock); + +        assert(irm_flow_get_state(f) == FLOW_ALLOCATED); + +        f_out->flow_id = f->flow_id; +        f_out->n_pid   = f->n_pid; +        f_out->n_1_pid = f->n_1_pid; +        f_out->data    = f->data; /* pass owner */ +        f_out->len     = f->len; +        f_out->mpl     = f->mpl; +        f->data        = NULL; +        f->len         = 0; + +        pthread_rwlock_unlock(&irmd.flows_lock); + +        log_info("Flow on flow_id %d allocated.", flow_id); + +        return 0; +} +  static int flow_alloc(pid_t              pid,                        const char *       dst,                        qosspec_t          qs,                        struct timespec *  timeo,                        struct irm_flow *  f_out, -                      bool               join,                        const void *       data,                        size_t             len)  { @@ -1432,8 +1523,7 @@ static int flow_alloc(pid_t              pid,          log_info("Allocating flow for %d to %s.", pid, dst); -        ipcp = join ? get_ipcp_entry_by_layer(dst) -                    : get_ipcp_by_dst_name(dst, pid); +        ipcp = get_ipcp_by_dst_name(dst, pid);          if (ipcp == NULL) {                  log_info("Destination %s unreachable.", dst);                  return -1; @@ -1469,24 +1559,13 @@ static int flow_alloc(pid_t              pid,          str_hash(ipcp->dir_hash_algo, hash, dst); -        if (join) { -                if (ipcp_flow_join(ipcp->pid, flow_id, pid, hash, -                                   IPCP_HASH_LEN(ipcp), qs)) { -                        irm_flow_set_state(f, FLOW_NULL); -                        /* sanitizer cleans this */ -                        log_info("Flow_join failed."); -                        free(hash); -                        return -EAGAIN; -                } -        } else { -                if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash, -                                    IPCP_HASH_LEN(ipcp), qs, data, len)) { -                        irm_flow_set_state(f, FLOW_NULL); -                        /* sanitizer cleans this */ -                        log_info("Flow_allocation failed."); -                        free(hash); -                        return -EAGAIN; -                } +        if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash, +                            IPCP_HASH_LEN(ipcp), qs, data, len)) { +                irm_flow_set_state(f, FLOW_NULL); +                /* sanitizer cleans this */ +                log_info("Flow_allocation failed."); +                free(hash); +                return -EAGAIN;          }          free(hash); @@ -2136,7 +2215,7 @@ static void * mainloop(void * o)                                                 : msg->pk.data == NULL);                          result = flow_alloc(msg->pid, msg->dst,                                              qos_spec_msg_to_s(msg->qosspec), -                                            timeo, &e, false, msg->pk.data, +                                            timeo, &e, msg->pk.data,                                              msg->pk.len);                          if (result == 0) {                                  ret_msg->has_flow_id = true; @@ -2152,9 +2231,9 @@ static void * mainloop(void * o)                          break;                  case IRM_MSG_CODE__IRM_FLOW_JOIN:                          assert(msg->pk.len == 0 && msg->pk.data == NULL); -                        result = flow_alloc(msg->pid, msg->dst, -                                            qos_spec_msg_to_s(msg->qosspec), -                                            timeo, &e, true, NULL, 0); +                        result = flow_join(msg->pid, msg->dst, +                                           qos_spec_msg_to_s(msg->qosspec), +                                           timeo, &e);                          if (result == 0) {                                  ret_msg->has_flow_id = true;                                  ret_msg->flow_id     = e.flow_id; diff --git a/src/lib/dev.c b/src/lib/dev.c index e84efe55..b0c0175c 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -476,7 +476,6 @@ static int flow_init(int       flow_id,          flow->rcv_act  = now;          if (qs.cypher_s > 0) { -                assert(s != NULL);                  if (crypt_init(&flow->ctx) < 0)                          goto fail_ctx; @@ -822,10 +821,9 @@ int flow_accept(qosspec_t *             qs,          return err;  } -static int __flow_alloc(const char *            dst, -                        qosspec_t *             qs, -                        const struct timespec * timeo, -                        bool join) +int flow_alloc(const char *            dst, +               qosspec_t *             qs, +               const struct timespec * timeo)  {          irm_msg_t     msg    = IRM_MSG__INIT;          irm_msg_t *   recv_msg; @@ -841,12 +839,11 @@ static int __flow_alloc(const char *            dst,          if (qs != NULL)                  qs->ber = 1;  #endif -        msg.code    = join ? IRM_MSG_CODE__IRM_FLOW_JOIN -                           : IRM_MSG_CODE__IRM_FLOW_ALLOC; +        msg.code    = IRM_MSG_CODE__IRM_FLOW_ALLOC;          msg.dst     = (char *) dst;          msg.has_pid = true;          msg.pid     = getpid(); -        msg.qosspec = qos_spec_s_to_msg(qs); +        msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs);          if (timeo != NULL) {                  msg.has_timeo_sec = true; @@ -855,7 +852,7 @@ static int __flow_alloc(const char *            dst,                  msg.timeo_nsec = timeo->tv_nsec;          } -        if (!join && qs != NULL && qs->cypher_s != 0) { +        if (qs != NULL && qs->cypher_s != 0) {                  ssize_t key_len;                  key_len = crypt_dh_pkp_create(&pkp, buf); @@ -887,7 +884,7 @@ static int __flow_alloc(const char *            dst,              !recv_msg->has_mpl)                  goto fail_result; -        if (!join && qs != NULL && qs->cypher_s != 0) { +        if (qs != NULL && qs->cypher_s != 0) {                  if (!recv_msg->has_pk || recv_msg->pk.len == 0) {                          err = -ECRYPT;                          goto fail_result; @@ -902,6 +899,9 @@ static int __flow_alloc(const char *            dst,                  crypt_dh_pkp_destroy(pkp);          } +        /* TODO: Make sure qosspec is set in msg */ +        if (qs != NULL && recv_msg->qosspec != NULL) +                *qs = qos_spec_msg_to_s(recv_msg->qosspec);          fd = flow_init(recv_msg->flow_id, recv_msg->pid,                         qs == NULL ? qos_raw : *qs, s, @@ -909,6 +909,7 @@ static int __flow_alloc(const char *            dst,          irm_msg__free_unpacked(recv_msg, NULL); +          return fd;   fail_result: @@ -919,21 +920,68 @@ static int __flow_alloc(const char *            dst,          return err;  } -int flow_alloc(const char *            dst, -               qosspec_t *             qs, -               const struct timespec * timeo) -{ -        return __flow_alloc(dst, qs, timeo, false); -} -  int flow_join(const char *            dst,                qosspec_t *             qs,                const struct timespec * timeo)  { -        if (qs != NULL && qs->cypher_s != 0) -                return -ECRYPT; +        irm_msg_t     msg    = IRM_MSG__INIT; +        irm_msg_t *   recv_msg; +        uint8_t       s[SYMMKEYSZ]; +        int           fd; +        int           err = -EIRMD; + +#ifdef QOS_DISABLE_CRC +        if (qs != NULL) +                qs->ber = 1; +#endif +        if (qs != NULL && qs->cypher_s > 0) +                return -ENOTSUP; /* TODO: Encrypted broadcast */ + +        memset(s, 0, SYMMKEYSZ); + +        msg.code    = IRM_MSG_CODE__IRM_FLOW_JOIN; +        msg.dst     = (char *) dst; +        msg.has_pid = true; +        msg.pid     = getpid(); +        msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs); + +        if (timeo != NULL) { +                msg.has_timeo_sec = true; +                msg.has_timeo_nsec = true; +                msg.timeo_sec  = timeo->tv_sec; +                msg.timeo_nsec = timeo->tv_nsec; +        } + +        recv_msg = send_recv_irm_msg(&msg); +        qosspec_msg__free_unpacked(msg.qosspec, NULL); + +        if (recv_msg == NULL) +                goto fail_send; + +        if (!recv_msg->has_result) +                goto fail_result; + +        if (recv_msg->result != 0) { +                err = recv_msg->result; +                goto fail_result; +        } -        return __flow_alloc(dst, qs, timeo, true); +        if (!recv_msg->has_pid || !recv_msg->has_flow_id || +            !recv_msg->has_mpl) +                goto fail_result; + +        fd = flow_init(recv_msg->flow_id, recv_msg->pid, +                       qs == NULL ? qos_raw : *qs, s, +                       recv_msg->mpl); + +        irm_msg__free_unpacked(recv_msg, NULL); + +        return fd; + + fail_result: +        irm_msg__free_unpacked(recv_msg, NULL); + fail_send: +        return err;  }  int flow_dealloc(int fd) diff --git a/src/lib/protobuf.c b/src/lib/protobuf.c index 0855305f..830efd40 100644 --- a/src/lib/protobuf.c +++ b/src/lib/protobuf.c @@ -301,26 +301,25 @@ struct ipcp_config ipcp_config_msg_to_s(const ipcp_config_msg_t * msg)  qosspec_msg_t * qos_spec_s_to_msg(const struct qos_spec * s)  { -        struct qos_spec spec;          qosspec_msg_t  * msg; +        assert(s != NULL); +          msg = malloc(sizeof(*msg));          if (msg == NULL)                  return NULL;          qosspec_msg__init(msg); -        spec = (s == NULL ? qos_raw : *s); - -        msg->delay        = spec.delay; -        msg->bandwidth    = spec.bandwidth; -        msg->availability = spec.availability; -        msg->loss         = spec.loss; -        msg->ber          = spec.ber; -        msg->in_order     = spec.in_order; -        msg->max_gap      = spec.max_gap; -        msg->cypher_s     = spec.cypher_s; -        msg->timeout      = spec.timeout; +        msg->delay        = s->delay; +        msg->bandwidth    = s->bandwidth; +        msg->availability = s->availability; +        msg->loss         = s->loss; +        msg->ber          = s->ber; +        msg->in_order     = s->in_order; +        msg->max_gap      = s->max_gap; +        msg->cypher_s     = s->cypher_s; +        msg->timeout      = s->timeout;          return msg;  } | 
