diff options
author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2023-03-14 12:50:22 +0100 |
---|---|---|
committer | Sander Vrijders <sander@ouroboros.rocks> | 2023-03-18 17:12:26 +0100 |
commit | ee196e9a00475a029018181f8d6a00106ee462ce (patch) | |
tree | 8cb703ebaed3f8b5d7b71bf263710f9ed5edb6a1 /src | |
parent | 975a3ad0c761f5603a5026a56d825ba0ccb591c9 (diff) | |
download | ouroboros-ee196e9a00475a029018181f8d6a00106ee462ce.tar.gz ouroboros-ee196e9a00475a029018181f8d6a00106ee462ce.zip |
lib: Split flow_alloc from flow_join
Better to keep these separate during IRMd revision. Moves the qosspec
default out of the protobuf message parsing.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src')
-rw-r--r-- | src/irmd/main.c | 129 | ||||
-rw-r--r-- | src/lib/dev.c | 88 | ||||
-rw-r--r-- | src/lib/protobuf.c | 23 |
3 files changed, 183 insertions, 57 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index b2f83388..5ef9a82e 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1415,12 +1415,103 @@ static int flow_accept(pid_t pid, return 0; } +static int flow_join(pid_t pid, + const char * dst, + qosspec_t qs, + struct timespec * timeo, + struct irm_flow * f_out) +{ + struct irm_flow * f; + struct ipcp_entry * ipcp; + int flow_id; + int state; + uint8_t * hash; + + log_info("Allocating flow for %d to %s.", pid, dst); + + ipcp = get_ipcp_entry_by_layer(dst); + if (ipcp == NULL) { + log_info("Layer %s unreachable.", dst); + return -1; + } + + pthread_rwlock_wrlock(&irmd.flows_lock); + + flow_id = bmp_allocate(irmd.flow_ids); + if (!bmp_is_id_valid(irmd.flow_ids, flow_id)) { + pthread_rwlock_unlock(&irmd.flows_lock); + log_err("Could not allocate flow_id."); + return -EBADF; + } + + f = irm_flow_create(pid, ipcp->pid, flow_id, qs); + if (f == NULL) { + bmp_release(irmd.flow_ids, flow_id); + pthread_rwlock_unlock(&irmd.flows_lock); + log_err("Could not allocate flow_id."); + return -ENOMEM; + } + + list_add(&f->next, &irmd.irm_flows); + + pthread_rwlock_unlock(&irmd.flows_lock); + + assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING); + + hash = malloc(IPCP_HASH_LEN(ipcp)); + if (hash == NULL) + /* sanitizer cleans this */ + return -ENOMEM; + + str_hash(ipcp->dir_hash_algo, hash, dst); + + if (ipcp_flow_join(ipcp->pid, flow_id, pid, hash, + IPCP_HASH_LEN(ipcp), qs)) { + irm_flow_set_state(f, FLOW_NULL); + /* sanitizer cleans this */ + log_info("Flow_join failed."); + free(hash); + return -EAGAIN; + } + + free(hash); + + state = irm_flow_wait_state(f, FLOW_ALLOCATED, timeo); + if (state != FLOW_ALLOCATED) { + if (state == -ETIMEDOUT) { + log_dbg("Flow allocation timed out"); + return -ETIMEDOUT; + } + + log_info("Pending flow to %s torn down.", dst); + return -EPIPE; + } + + pthread_rwlock_wrlock(&irmd.flows_lock); + + assert(irm_flow_get_state(f) == FLOW_ALLOCATED); + + f_out->flow_id = f->flow_id; + f_out->n_pid = f->n_pid; + f_out->n_1_pid = f->n_1_pid; + f_out->data = f->data; /* pass owner */ + f_out->len = f->len; + f_out->mpl = f->mpl; + f->data = NULL; + f->len = 0; + + pthread_rwlock_unlock(&irmd.flows_lock); + + log_info("Flow on flow_id %d allocated.", flow_id); + + return 0; +} + static int flow_alloc(pid_t pid, const char * dst, qosspec_t qs, struct timespec * timeo, struct irm_flow * f_out, - bool join, const void * data, size_t len) { @@ -1432,8 +1523,7 @@ static int flow_alloc(pid_t pid, log_info("Allocating flow for %d to %s.", pid, dst); - ipcp = join ? get_ipcp_entry_by_layer(dst) - : get_ipcp_by_dst_name(dst, pid); + ipcp = get_ipcp_by_dst_name(dst, pid); if (ipcp == NULL) { log_info("Destination %s unreachable.", dst); return -1; @@ -1469,24 +1559,13 @@ static int flow_alloc(pid_t pid, str_hash(ipcp->dir_hash_algo, hash, dst); - if (join) { - if (ipcp_flow_join(ipcp->pid, flow_id, pid, hash, - IPCP_HASH_LEN(ipcp), qs)) { - irm_flow_set_state(f, FLOW_NULL); - /* sanitizer cleans this */ - log_info("Flow_join failed."); - free(hash); - return -EAGAIN; - } - } else { - if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash, - IPCP_HASH_LEN(ipcp), qs, data, len)) { - irm_flow_set_state(f, FLOW_NULL); - /* sanitizer cleans this */ - log_info("Flow_allocation failed."); - free(hash); - return -EAGAIN; - } + if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash, + IPCP_HASH_LEN(ipcp), qs, data, len)) { + irm_flow_set_state(f, FLOW_NULL); + /* sanitizer cleans this */ + log_info("Flow_allocation failed."); + free(hash); + return -EAGAIN; } free(hash); @@ -2136,7 +2215,7 @@ static void * mainloop(void * o) : msg->pk.data == NULL); result = flow_alloc(msg->pid, msg->dst, qos_spec_msg_to_s(msg->qosspec), - timeo, &e, false, msg->pk.data, + timeo, &e, msg->pk.data, msg->pk.len); if (result == 0) { ret_msg->has_flow_id = true; @@ -2152,9 +2231,9 @@ static void * mainloop(void * o) break; case IRM_MSG_CODE__IRM_FLOW_JOIN: assert(msg->pk.len == 0 && msg->pk.data == NULL); - result = flow_alloc(msg->pid, msg->dst, - qos_spec_msg_to_s(msg->qosspec), - timeo, &e, true, NULL, 0); + result = flow_join(msg->pid, msg->dst, + qos_spec_msg_to_s(msg->qosspec), + timeo, &e); if (result == 0) { ret_msg->has_flow_id = true; ret_msg->flow_id = e.flow_id; diff --git a/src/lib/dev.c b/src/lib/dev.c index e84efe55..b0c0175c 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -476,7 +476,6 @@ static int flow_init(int flow_id, flow->rcv_act = now; if (qs.cypher_s > 0) { - assert(s != NULL); if (crypt_init(&flow->ctx) < 0) goto fail_ctx; @@ -822,10 +821,9 @@ int flow_accept(qosspec_t * qs, return err; } -static int __flow_alloc(const char * dst, - qosspec_t * qs, - const struct timespec * timeo, - bool join) +int flow_alloc(const char * dst, + qosspec_t * qs, + const struct timespec * timeo) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg; @@ -841,12 +839,11 @@ static int __flow_alloc(const char * dst, if (qs != NULL) qs->ber = 1; #endif - msg.code = join ? IRM_MSG_CODE__IRM_FLOW_JOIN - : IRM_MSG_CODE__IRM_FLOW_ALLOC; + msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst = (char *) dst; msg.has_pid = true; msg.pid = getpid(); - msg.qosspec = qos_spec_s_to_msg(qs); + msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs); if (timeo != NULL) { msg.has_timeo_sec = true; @@ -855,7 +852,7 @@ static int __flow_alloc(const char * dst, msg.timeo_nsec = timeo->tv_nsec; } - if (!join && qs != NULL && qs->cypher_s != 0) { + if (qs != NULL && qs->cypher_s != 0) { ssize_t key_len; key_len = crypt_dh_pkp_create(&pkp, buf); @@ -887,7 +884,7 @@ static int __flow_alloc(const char * dst, !recv_msg->has_mpl) goto fail_result; - if (!join && qs != NULL && qs->cypher_s != 0) { + if (qs != NULL && qs->cypher_s != 0) { if (!recv_msg->has_pk || recv_msg->pk.len == 0) { err = -ECRYPT; goto fail_result; @@ -902,6 +899,9 @@ static int __flow_alloc(const char * dst, crypt_dh_pkp_destroy(pkp); } + /* TODO: Make sure qosspec is set in msg */ + if (qs != NULL && recv_msg->qosspec != NULL) + *qs = qos_spec_msg_to_s(recv_msg->qosspec); fd = flow_init(recv_msg->flow_id, recv_msg->pid, qs == NULL ? qos_raw : *qs, s, @@ -909,6 +909,7 @@ static int __flow_alloc(const char * dst, irm_msg__free_unpacked(recv_msg, NULL); + return fd; fail_result: @@ -919,21 +920,68 @@ static int __flow_alloc(const char * dst, return err; } -int flow_alloc(const char * dst, - qosspec_t * qs, - const struct timespec * timeo) -{ - return __flow_alloc(dst, qs, timeo, false); -} - int flow_join(const char * dst, qosspec_t * qs, const struct timespec * timeo) { - if (qs != NULL && qs->cypher_s != 0) - return -ECRYPT; + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg; + uint8_t s[SYMMKEYSZ]; + int fd; + int err = -EIRMD; + +#ifdef QOS_DISABLE_CRC + if (qs != NULL) + qs->ber = 1; +#endif + if (qs != NULL && qs->cypher_s > 0) + return -ENOTSUP; /* TODO: Encrypted broadcast */ + + memset(s, 0, SYMMKEYSZ); + + msg.code = IRM_MSG_CODE__IRM_FLOW_JOIN; + msg.dst = (char *) dst; + msg.has_pid = true; + msg.pid = getpid(); + msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs); + + if (timeo != NULL) { + msg.has_timeo_sec = true; + msg.has_timeo_nsec = true; + msg.timeo_sec = timeo->tv_sec; + msg.timeo_nsec = timeo->tv_nsec; + } + + recv_msg = send_recv_irm_msg(&msg); + qosspec_msg__free_unpacked(msg.qosspec, NULL); + + if (recv_msg == NULL) + goto fail_send; + + if (!recv_msg->has_result) + goto fail_result; + + if (recv_msg->result != 0) { + err = recv_msg->result; + goto fail_result; + } - return __flow_alloc(dst, qs, timeo, true); + if (!recv_msg->has_pid || !recv_msg->has_flow_id || + !recv_msg->has_mpl) + goto fail_result; + + fd = flow_init(recv_msg->flow_id, recv_msg->pid, + qs == NULL ? qos_raw : *qs, s, + recv_msg->mpl); + + irm_msg__free_unpacked(recv_msg, NULL); + + return fd; + + fail_result: + irm_msg__free_unpacked(recv_msg, NULL); + fail_send: + return err; } int flow_dealloc(int fd) diff --git a/src/lib/protobuf.c b/src/lib/protobuf.c index 0855305f..830efd40 100644 --- a/src/lib/protobuf.c +++ b/src/lib/protobuf.c @@ -301,26 +301,25 @@ struct ipcp_config ipcp_config_msg_to_s(const ipcp_config_msg_t * msg) qosspec_msg_t * qos_spec_s_to_msg(const struct qos_spec * s) { - struct qos_spec spec; qosspec_msg_t * msg; + assert(s != NULL); + msg = malloc(sizeof(*msg)); if (msg == NULL) return NULL; qosspec_msg__init(msg); - spec = (s == NULL ? qos_raw : *s); - - msg->delay = spec.delay; - msg->bandwidth = spec.bandwidth; - msg->availability = spec.availability; - msg->loss = spec.loss; - msg->ber = spec.ber; - msg->in_order = spec.in_order; - msg->max_gap = spec.max_gap; - msg->cypher_s = spec.cypher_s; - msg->timeout = spec.timeout; + msg->delay = s->delay; + msg->bandwidth = s->bandwidth; + msg->availability = s->availability; + msg->loss = s->loss; + msg->ber = s->ber; + msg->in_order = s->in_order; + msg->max_gap = s->max_gap; + msg->cypher_s = s->cypher_s; + msg->timeout = s->timeout; return msg; } |