summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2023-03-14 12:50:22 +0100
committerSander Vrijders <sander@ouroboros.rocks>2023-03-18 17:12:26 +0100
commitee196e9a00475a029018181f8d6a00106ee462ce (patch)
tree8cb703ebaed3f8b5d7b71bf263710f9ed5edb6a1 /src
parent975a3ad0c761f5603a5026a56d825ba0ccb591c9 (diff)
downloadouroboros-ee196e9a00475a029018181f8d6a00106ee462ce.tar.gz
ouroboros-ee196e9a00475a029018181f8d6a00106ee462ce.zip
lib: Split flow_alloc from flow_join
Better to keep these separate during IRMd revision. Moves the qosspec default out of the protobuf message parsing. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src')
-rw-r--r--src/irmd/main.c129
-rw-r--r--src/lib/dev.c88
-rw-r--r--src/lib/protobuf.c23
3 files changed, 183 insertions, 57 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index b2f83388..5ef9a82e 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1415,12 +1415,103 @@ static int flow_accept(pid_t pid,
return 0;
}
+static int flow_join(pid_t pid,
+ const char * dst,
+ qosspec_t qs,
+ struct timespec * timeo,
+ struct irm_flow * f_out)
+{
+ struct irm_flow * f;
+ struct ipcp_entry * ipcp;
+ int flow_id;
+ int state;
+ uint8_t * hash;
+
+ log_info("Allocating flow for %d to %s.", pid, dst);
+
+ ipcp = get_ipcp_entry_by_layer(dst);
+ if (ipcp == NULL) {
+ log_info("Layer %s unreachable.", dst);
+ return -1;
+ }
+
+ pthread_rwlock_wrlock(&irmd.flows_lock);
+
+ flow_id = bmp_allocate(irmd.flow_ids);
+ if (!bmp_is_id_valid(irmd.flow_ids, flow_id)) {
+ pthread_rwlock_unlock(&irmd.flows_lock);
+ log_err("Could not allocate flow_id.");
+ return -EBADF;
+ }
+
+ f = irm_flow_create(pid, ipcp->pid, flow_id, qs);
+ if (f == NULL) {
+ bmp_release(irmd.flow_ids, flow_id);
+ pthread_rwlock_unlock(&irmd.flows_lock);
+ log_err("Could not allocate flow_id.");
+ return -ENOMEM;
+ }
+
+ list_add(&f->next, &irmd.irm_flows);
+
+ pthread_rwlock_unlock(&irmd.flows_lock);
+
+ assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING);
+
+ hash = malloc(IPCP_HASH_LEN(ipcp));
+ if (hash == NULL)
+ /* sanitizer cleans this */
+ return -ENOMEM;
+
+ str_hash(ipcp->dir_hash_algo, hash, dst);
+
+ if (ipcp_flow_join(ipcp->pid, flow_id, pid, hash,
+ IPCP_HASH_LEN(ipcp), qs)) {
+ irm_flow_set_state(f, FLOW_NULL);
+ /* sanitizer cleans this */
+ log_info("Flow_join failed.");
+ free(hash);
+ return -EAGAIN;
+ }
+
+ free(hash);
+
+ state = irm_flow_wait_state(f, FLOW_ALLOCATED, timeo);
+ if (state != FLOW_ALLOCATED) {
+ if (state == -ETIMEDOUT) {
+ log_dbg("Flow allocation timed out");
+ return -ETIMEDOUT;
+ }
+
+ log_info("Pending flow to %s torn down.", dst);
+ return -EPIPE;
+ }
+
+ pthread_rwlock_wrlock(&irmd.flows_lock);
+
+ assert(irm_flow_get_state(f) == FLOW_ALLOCATED);
+
+ f_out->flow_id = f->flow_id;
+ f_out->n_pid = f->n_pid;
+ f_out->n_1_pid = f->n_1_pid;
+ f_out->data = f->data; /* pass owner */
+ f_out->len = f->len;
+ f_out->mpl = f->mpl;
+ f->data = NULL;
+ f->len = 0;
+
+ pthread_rwlock_unlock(&irmd.flows_lock);
+
+ log_info("Flow on flow_id %d allocated.", flow_id);
+
+ return 0;
+}
+
static int flow_alloc(pid_t pid,
const char * dst,
qosspec_t qs,
struct timespec * timeo,
struct irm_flow * f_out,
- bool join,
const void * data,
size_t len)
{
@@ -1432,8 +1523,7 @@ static int flow_alloc(pid_t pid,
log_info("Allocating flow for %d to %s.", pid, dst);
- ipcp = join ? get_ipcp_entry_by_layer(dst)
- : get_ipcp_by_dst_name(dst, pid);
+ ipcp = get_ipcp_by_dst_name(dst, pid);
if (ipcp == NULL) {
log_info("Destination %s unreachable.", dst);
return -1;
@@ -1469,24 +1559,13 @@ static int flow_alloc(pid_t pid,
str_hash(ipcp->dir_hash_algo, hash, dst);
- if (join) {
- if (ipcp_flow_join(ipcp->pid, flow_id, pid, hash,
- IPCP_HASH_LEN(ipcp), qs)) {
- irm_flow_set_state(f, FLOW_NULL);
- /* sanitizer cleans this */
- log_info("Flow_join failed.");
- free(hash);
- return -EAGAIN;
- }
- } else {
- if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash,
- IPCP_HASH_LEN(ipcp), qs, data, len)) {
- irm_flow_set_state(f, FLOW_NULL);
- /* sanitizer cleans this */
- log_info("Flow_allocation failed.");
- free(hash);
- return -EAGAIN;
- }
+ if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash,
+ IPCP_HASH_LEN(ipcp), qs, data, len)) {
+ irm_flow_set_state(f, FLOW_NULL);
+ /* sanitizer cleans this */
+ log_info("Flow_allocation failed.");
+ free(hash);
+ return -EAGAIN;
}
free(hash);
@@ -2136,7 +2215,7 @@ static void * mainloop(void * o)
: msg->pk.data == NULL);
result = flow_alloc(msg->pid, msg->dst,
qos_spec_msg_to_s(msg->qosspec),
- timeo, &e, false, msg->pk.data,
+ timeo, &e, msg->pk.data,
msg->pk.len);
if (result == 0) {
ret_msg->has_flow_id = true;
@@ -2152,9 +2231,9 @@ static void * mainloop(void * o)
break;
case IRM_MSG_CODE__IRM_FLOW_JOIN:
assert(msg->pk.len == 0 && msg->pk.data == NULL);
- result = flow_alloc(msg->pid, msg->dst,
- qos_spec_msg_to_s(msg->qosspec),
- timeo, &e, true, NULL, 0);
+ result = flow_join(msg->pid, msg->dst,
+ qos_spec_msg_to_s(msg->qosspec),
+ timeo, &e);
if (result == 0) {
ret_msg->has_flow_id = true;
ret_msg->flow_id = e.flow_id;
diff --git a/src/lib/dev.c b/src/lib/dev.c
index e84efe55..b0c0175c 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -476,7 +476,6 @@ static int flow_init(int flow_id,
flow->rcv_act = now;
if (qs.cypher_s > 0) {
- assert(s != NULL);
if (crypt_init(&flow->ctx) < 0)
goto fail_ctx;
@@ -822,10 +821,9 @@ int flow_accept(qosspec_t * qs,
return err;
}
-static int __flow_alloc(const char * dst,
- qosspec_t * qs,
- const struct timespec * timeo,
- bool join)
+int flow_alloc(const char * dst,
+ qosspec_t * qs,
+ const struct timespec * timeo)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg;
@@ -841,12 +839,11 @@ static int __flow_alloc(const char * dst,
if (qs != NULL)
qs->ber = 1;
#endif
- msg.code = join ? IRM_MSG_CODE__IRM_FLOW_JOIN
- : IRM_MSG_CODE__IRM_FLOW_ALLOC;
+ msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
msg.dst = (char *) dst;
msg.has_pid = true;
msg.pid = getpid();
- msg.qosspec = qos_spec_s_to_msg(qs);
+ msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs);
if (timeo != NULL) {
msg.has_timeo_sec = true;
@@ -855,7 +852,7 @@ static int __flow_alloc(const char * dst,
msg.timeo_nsec = timeo->tv_nsec;
}
- if (!join && qs != NULL && qs->cypher_s != 0) {
+ if (qs != NULL && qs->cypher_s != 0) {
ssize_t key_len;
key_len = crypt_dh_pkp_create(&pkp, buf);
@@ -887,7 +884,7 @@ static int __flow_alloc(const char * dst,
!recv_msg->has_mpl)
goto fail_result;
- if (!join && qs != NULL && qs->cypher_s != 0) {
+ if (qs != NULL && qs->cypher_s != 0) {
if (!recv_msg->has_pk || recv_msg->pk.len == 0) {
err = -ECRYPT;
goto fail_result;
@@ -902,6 +899,9 @@ static int __flow_alloc(const char * dst,
crypt_dh_pkp_destroy(pkp);
}
+ /* TODO: Make sure qosspec is set in msg */
+ if (qs != NULL && recv_msg->qosspec != NULL)
+ *qs = qos_spec_msg_to_s(recv_msg->qosspec);
fd = flow_init(recv_msg->flow_id, recv_msg->pid,
qs == NULL ? qos_raw : *qs, s,
@@ -909,6 +909,7 @@ static int __flow_alloc(const char * dst,
irm_msg__free_unpacked(recv_msg, NULL);
+
return fd;
fail_result:
@@ -919,21 +920,68 @@ static int __flow_alloc(const char * dst,
return err;
}
-int flow_alloc(const char * dst,
- qosspec_t * qs,
- const struct timespec * timeo)
-{
- return __flow_alloc(dst, qs, timeo, false);
-}
-
int flow_join(const char * dst,
qosspec_t * qs,
const struct timespec * timeo)
{
- if (qs != NULL && qs->cypher_s != 0)
- return -ECRYPT;
+ irm_msg_t msg = IRM_MSG__INIT;
+ irm_msg_t * recv_msg;
+ uint8_t s[SYMMKEYSZ];
+ int fd;
+ int err = -EIRMD;
+
+#ifdef QOS_DISABLE_CRC
+ if (qs != NULL)
+ qs->ber = 1;
+#endif
+ if (qs != NULL && qs->cypher_s > 0)
+ return -ENOTSUP; /* TODO: Encrypted broadcast */
+
+ memset(s, 0, SYMMKEYSZ);
+
+ msg.code = IRM_MSG_CODE__IRM_FLOW_JOIN;
+ msg.dst = (char *) dst;
+ msg.has_pid = true;
+ msg.pid = getpid();
+ msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs);
+
+ if (timeo != NULL) {
+ msg.has_timeo_sec = true;
+ msg.has_timeo_nsec = true;
+ msg.timeo_sec = timeo->tv_sec;
+ msg.timeo_nsec = timeo->tv_nsec;
+ }
+
+ recv_msg = send_recv_irm_msg(&msg);
+ qosspec_msg__free_unpacked(msg.qosspec, NULL);
+
+ if (recv_msg == NULL)
+ goto fail_send;
+
+ if (!recv_msg->has_result)
+ goto fail_result;
+
+ if (recv_msg->result != 0) {
+ err = recv_msg->result;
+ goto fail_result;
+ }
- return __flow_alloc(dst, qs, timeo, true);
+ if (!recv_msg->has_pid || !recv_msg->has_flow_id ||
+ !recv_msg->has_mpl)
+ goto fail_result;
+
+ fd = flow_init(recv_msg->flow_id, recv_msg->pid,
+ qs == NULL ? qos_raw : *qs, s,
+ recv_msg->mpl);
+
+ irm_msg__free_unpacked(recv_msg, NULL);
+
+ return fd;
+
+ fail_result:
+ irm_msg__free_unpacked(recv_msg, NULL);
+ fail_send:
+ return err;
}
int flow_dealloc(int fd)
diff --git a/src/lib/protobuf.c b/src/lib/protobuf.c
index 0855305f..830efd40 100644
--- a/src/lib/protobuf.c
+++ b/src/lib/protobuf.c
@@ -301,26 +301,25 @@ struct ipcp_config ipcp_config_msg_to_s(const ipcp_config_msg_t * msg)
qosspec_msg_t * qos_spec_s_to_msg(const struct qos_spec * s)
{
- struct qos_spec spec;
qosspec_msg_t * msg;
+ assert(s != NULL);
+
msg = malloc(sizeof(*msg));
if (msg == NULL)
return NULL;
qosspec_msg__init(msg);
- spec = (s == NULL ? qos_raw : *s);
-
- msg->delay = spec.delay;
- msg->bandwidth = spec.bandwidth;
- msg->availability = spec.availability;
- msg->loss = spec.loss;
- msg->ber = spec.ber;
- msg->in_order = spec.in_order;
- msg->max_gap = spec.max_gap;
- msg->cypher_s = spec.cypher_s;
- msg->timeout = spec.timeout;
+ msg->delay = s->delay;
+ msg->bandwidth = s->bandwidth;
+ msg->availability = s->availability;
+ msg->loss = s->loss;
+ msg->ber = s->ber;
+ msg->in_order = s->in_order;
+ msg->max_gap = s->max_gap;
+ msg->cypher_s = s->cypher_s;
+ msg->timeout = s->timeout;
return msg;
}