summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2024-02-23 09:29:47 +0100
committerSander Vrijders <sander@ouroboros.rocks>2024-02-23 16:41:37 +0100
commite6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f (patch)
treead959d95f8fb1f6d4744c57c9027bf182bc3190b /src/lib
parentdcefa07624926da23a559eedc3f7361ac36e8312 (diff)
downloadouroboros-e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f.tar.gz
ouroboros-e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f.zip
lib: Revise app flow allocation
This revises the application flow allocator to use the flow_info struct/message between the components. Revises the messaging to move the use protocol buffers to its own source (serdes-irm). Adds a timeout to the IRMd flow allocator to make sure flow allocations don't hang forever (this was previously taken care of by the sanitize thread). Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/CMakeLists.txt7
-rw-r--r--src/lib/config.h.in1
-rw-r--r--src/lib/dev.c665
-rw-r--r--src/lib/frct.c4
-rw-r--r--src/lib/pb/ipcp.proto3
-rw-r--r--src/lib/pb/ipcp_config.proto5
-rw-r--r--src/lib/pb/irm.proto55
-rw-r--r--src/lib/pb/model.proto (renamed from src/lib/pb/qos.proto)28
-rw-r--r--src/lib/protobuf.c78
-rw-r--r--src/lib/serdes-irm.c478
-rw-r--r--src/lib/serdes-oep.c1
-rw-r--r--src/lib/sockets.c27
-rw-r--r--src/lib/timerwheel.c10
13 files changed, 917 insertions, 445 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 4a4684b0..b0bdb5ce 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -4,8 +4,8 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(${CMAKE_SOURCE_DIR}/include)
include_directories(${CMAKE_BINARY_DIR}/include)
-protobuf_generate_c(QOSSPEC_PROTO_SRCS QOSSPEC_PROTO_HDRS
- pb/qos.proto)
+protobuf_generate_c(MODEL_PROTO_SRCS MODEL_PROTO_HDRS
+ pb/model.proto)
protobuf_generate_c(IPCP_CONFIG_PROTO_SRCS IPCP_CONFIG_PROTO_HDRS
pb/ipcp_config.proto)
protobuf_generate_c(ENROLL_PROTO_SRCS ENROLL_PROTO_HDRS
@@ -264,6 +264,7 @@ set(SOURCE_FILES_COMMON
qoscube.c
random.c
rib.c
+ serdes-irm.c
serdes-oep.c
sha3.c
shm_flow_set.c
@@ -278,7 +279,7 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/config.h.in"
"${CMAKE_CURRENT_BINARY_DIR}/config.h" @ONLY)
add_library(ouroboros-common SHARED ${SOURCE_FILES_COMMON} ${IRM_PROTO_SRCS}
- ${IPCP_PROTO_SRCS} ${IPCP_CONFIG_PROTO_SRCS} ${QOSSPEC_PROTO_SRCS}
+ ${IPCP_PROTO_SRCS} ${IPCP_CONFIG_PROTO_SRCS} ${MODEL_PROTO_SRCS}
${ENROLL_PROTO_SRCS})
add_library(ouroboros-dev SHARED ${SOURCE_FILES_DEV} ${CACEP_PROTO_SRCS})
diff --git a/src/lib/config.h.in b/src/lib/config.h.in
index d1eb9a54..604038b4 100644
--- a/src/lib/config.h.in
+++ b/src/lib/config.h.in
@@ -42,6 +42,7 @@
#define SHM_RDRB_BLOCK_SIZE @SHM_RDRB_BLOCK_SIZE@
#define SHM_BUFFER_SIZE @SHM_BUFFER_SIZE@
#define SHM_RBUFF_SIZE @SHM_RBUFF_SIZE@
+#define FLOW_ALLOC_TIMEOUT @FLOW_ALLOC_TIMEOUT@
#if defined(__linux__) || (defined(__MACH__) && !defined(__APPLE__))
/* Avoid a bug in robust mutex implementation of glibc 2.25 */
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 9e37978c..a7f20e88 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -42,9 +42,9 @@
#include <ouroboros/fccntl.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/np1_flow.h>
-#include <ouroboros/protobuf.h>
#include <ouroboros/pthread.h>
#include <ouroboros/random.h>
+#include <ouroboros/serdes-irm.h>
#include <ouroboros/shm_flow_set.h>
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_rbuff.h>
@@ -57,6 +57,7 @@
#ifdef HAVE_LIBGCRYPT
#include <gcrypt.h>
#endif
+#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
@@ -79,6 +80,7 @@
/* map flow_ids to flow descriptors; track state of the flow */
struct fmap {
int fd;
+ /* TODO: use actual flow state */
enum flow_state state;
};
@@ -88,12 +90,13 @@ struct fmap {
struct flow {
struct list_head next;
+ struct flow_info info;
+
struct shm_rbuff * rx_rb;
struct shm_rbuff * tx_rb;
struct shm_flow_set * set;
- int flow_id;
+
uint16_t oflags;
- qosspec_t qs;
ssize_t part_idx;
struct crypt_info crypt;
@@ -221,53 +224,32 @@ static enum flow_state flow_wait_assign(int flow_id)
return state;
}
-static int proc_announce(char * prog)
+static int proc_announce(const char * prog)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IRM_PROC_ANNOUNCE;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.prog = prog;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result || (ret = recv_msg->result)) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return ret;
- }
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (proc_announce__irm_req_ser(&msg, prog) < 0)
+ return -ENOMEM;
- return ret;
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
+
+ return irm__irm_result_des(&msg);
}
+/* IRMd will clean up the mess if this fails */
static void proc_exit(void)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IRM_PROC_EXIT;
- msg.has_pid = true;
- msg.pid = getpid();
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
+ if (proc_exit__irm_req_ser(&msg) < 0)
return;
- if (!recv_msg->has_result || (ret = recv_msg->result)) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return;
- }
-
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return;
+ send_recv_msg(&msg);
}
#include "frct.c"
@@ -305,7 +287,7 @@ static void flow_send_keepalive(struct flow * flow,
if (shm_rbuff_write(flow->tx_rb, idx))
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
}
@@ -323,8 +305,8 @@ static void _flow_keepalive(struct flow * flow)
s_act = flow->snd_act;
r_act = flow->rcv_act;
- flow_id = flow->flow_id;
- timeo = flow->qs.timeout;
+ flow_id = flow->info.id;
+ timeo = flow->info.qs.timeout;
acl = shm_rbuff_get_acl(flow->rx_rb);
if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN))
@@ -400,10 +382,10 @@ static void flow_clear(int fd)
{
memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
- ai.flows[fd].flow_id = -1;
+ ai.flows[fd].info.id = -1;
}
-static void flow_fini(int fd)
+static void __flow_fini(int fd)
{
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
@@ -414,13 +396,13 @@ static void flow_fini(int fd)
pthread_join(ai.tx, NULL);
}
- shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id);
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
frcti_destroy(ai.flows[fd].frcti);
}
- if (ai.flows[fd].flow_id != -1) {
- flow_destroy(&ai.id_to_fd[ai.flows[fd].flow_id]);
+ if (ai.flows[fd].info.id != -1) {
+ flow_destroy(&ai.id_to_fd[ai.flows[fd].info.id]);
bmp_release(ai.fds, fd);
}
@@ -436,7 +418,7 @@ static void flow_fini(int fd)
if (ai.flows[fd].set != NULL) {
shm_flow_set_notify(ai.flows[fd].set,
- ai.flows[fd].flow_id,
+ ai.flows[fd].info.id,
FLOW_DEALLOC);
shm_flow_set_close(ai.flows[fd].set);
}
@@ -448,11 +430,17 @@ static void flow_fini(int fd)
flow_clear(fd);
}
-static int flow_init(int flow_id,
- pid_t pid,
- qosspec_t qs,
- uint8_t * s,
- time_t mpl)
+static void flow_fini(int fd)
+{
+ pthread_rwlock_wrlock(&ai.lock);
+
+ __flow_fini(fd);
+
+ pthread_rwlock_unlock(&ai.lock);
+}
+
+static int flow_init(struct flow_info * info,
+ buffer_t * sk)
{
struct timespec now;
struct flow * flow;
@@ -471,43 +459,43 @@ static int flow_init(int flow_id,
flow = &ai.flows[fd];
- flow->rx_rb = shm_rbuff_open(getpid(), flow_id);
+ flow->info = *info;
+
+ flow->rx_rb = shm_rbuff_open(info->n_pid, info->id);
if (flow->rx_rb == NULL)
goto fail_rx_rb;
- flow->tx_rb = shm_rbuff_open(pid, flow_id);
+ flow->tx_rb = shm_rbuff_open(info->n_1_pid, info->id);
if (flow->tx_rb == NULL)
goto fail_tx_rb;
- flow->set = shm_flow_set_open(pid);
+ flow->set = shm_flow_set_open(info->n_1_pid);
if (flow->set == NULL)
goto fail_set;
- flow->flow_id = flow_id;
flow->oflags = FLOWFDEFAULT;
flow->part_idx = NO_PART;
- flow->qs = qs;
flow->snd_act = now;
flow->rcv_act = now;
- flow->crypt.flags = qs.cypher_s; /* TODO: remove cypher_s from qos */
+ flow->crypt.flags = info->qs.cypher_s; /* TODO: move cypher_s */
- if (flow->crypt.flags > 0 && s != NULL) /* static analyzer s != NULL */
- memcpy(flow->crypt.key, s ,SYMMKEYSZ);
- else
- memset(flow->crypt.key, 0, SYMMKEYSZ);
+ memset(flow->crypt.key, 0, SYMMKEYSZ);
+
+ if (flow->crypt.flags > 0 && sk!= NULL && sk->data != NULL)
+ memcpy(flow->crypt.key, sk->data , sk->len);
if (crypt_init(&flow->crypt) < 0)
goto fail_crypt;
assert(flow->frcti == NULL);
- if (flow->qs.in_order != 0) {
- flow->frcti = frcti_create(fd, DELT_A, DELT_R, mpl);
+ if (info->qs.in_order != 0) {
+ flow->frcti = frcti_create(fd, DELT_A, DELT_R, info->mpl);
if (flow->frcti == NULL)
goto fail_frcti;
- if (shm_flow_set_add(ai.fqset, 0, flow_id))
+ if (shm_flow_set_add(ai.fqset, 0, info->id))
goto fail_flow_set_add;
++ai.n_frcti;
@@ -518,16 +506,16 @@ static int flow_init(int flow_id,
list_add_tail(&flow->next, &ai.flow_list);
- ai.id_to_fd[flow_id].fd = fd;
+ ai.id_to_fd[info->id].fd = fd;
- flow_set_state(&ai.id_to_fd[flow_id], FLOW_ALLOCATED);
+ flow_set_state(&ai.id_to_fd[info->id], FLOW_ALLOCATED);
pthread_rwlock_unlock(&ai.lock);
return fd;
fail_tx_thread:
- shm_flow_set_del(ai.fqset, 0, flow_id);
+ shm_flow_set_del(ai.fqset, 0, info->id);
fail_flow_set_add:
frcti_destroy(flow->frcti);
fail_frcti:
@@ -722,12 +710,12 @@ static void fini(void)
pthread_rwlock_wrlock(&ai.lock);
for (i = 0; i < PROG_MAX_FLOWS; ++i) {
- if (ai.flows[i].flow_id != -1) {
+ if (ai.flows[i].info.id != -1) {
ssize_t idx;
shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN);
while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
shm_rdrbuff_remove(ai.rdrb, idx);
- flow_fini(i);
+ __flow_fini(i);
}
}
@@ -774,142 +762,94 @@ __attribute__((section(FINI_SECTION))) __typeof__(fini) * __fini = fini;
int flow_accept(qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int fd;
- int err = -EIRMD;
- uint8_t * symmkey;
-
- msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
- msg.has_pid = true;
- msg.pid = getpid();
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ buffer_t sk;
+ int fd;
+ int err;
- if (timeo != NULL) {
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_sec = timeo->tv_sec;
- msg.timeo_nsec = timeo->tv_nsec;
- }
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- goto fail_recv;
-
- if (!recv_msg->has_result)
- goto fail_msg;
-
- if (recv_msg->result != 0) {
- err = recv_msg->result;
- goto fail_msg;
- }
+#ifdef QOS_DISABLE_CRC
+ if (qs != NULL)
+ qs->ber = 1;
+#endif
+ memset(&flow, 0, sizeof(flow));
- if (!recv_msg->has_pid || !recv_msg->has_flow_id ||
- !recv_msg->has_mpl || recv_msg->qosspec == NULL)
- goto fail_msg;
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- symmkey = recv_msg->has_symmkey ? recv_msg->symmkey.data : NULL;
+ if (flow_accept__irm_req_ser(&msg, &flow, timeo))
+ return -ENOMEM;
- fd = flow_init(recv_msg->flow_id, recv_msg->pid,
- qos_spec_msg_to_s(recv_msg->qosspec),
- symmkey,
- recv_msg->mpl);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = flow__irm_result_des(&msg, &flow, &sk);
+ if (err < 0)
+ return err;
- if (fd < 0)
- return fd;
+ fd = flow_init(&flow, &sk);
- pthread_rwlock_rdlock(&ai.lock);
+ freebuf(sk);
if (qs != NULL)
- *qs = ai.flows[fd].qs;
-
- pthread_rwlock_unlock(&ai.lock);
+ *qs = flow.qs;
return fd;
-
- fail_msg:
- irm_msg__free_unpacked(recv_msg, NULL);
- fail_recv:
- return err;
}
int flow_alloc(const char * dst,
qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int fd;
- int err = -EIRMD;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ buffer_t sk; /* symmetric key */
+ int fd;
+ int err;
#ifdef QOS_DISABLE_CRC
if (qs != NULL)
qs->ber = 1;
#endif
- msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
- msg.dst = (char *) dst;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs);
-
- if (timeo != NULL) {
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_sec = timeo->tv_sec;
- msg.timeo_nsec = timeo->tv_nsec;
- }
- recv_msg = send_recv_irm_msg(&msg);
- qosspec_msg__free_unpacked(msg.qosspec, NULL);
- if (recv_msg == NULL)
- goto fail_send_recv;
+ memset(&flow, 0, sizeof(flow));
- if (!recv_msg->has_result)
- goto fail_result;
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- if (recv_msg->result != 0) {
- err = recv_msg->result;
- goto fail_result;
- }
+ if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo))
+ return -ENOMEM;
- if (!recv_msg->has_pid || !recv_msg->has_flow_id ||
- !recv_msg->has_mpl)
- goto fail_result;
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- if ((qs != NULL && qs->cypher_s != 0) &&
- (!recv_msg->has_symmkey || recv_msg->symmkey.len != SYMMKEYSZ)) {
- err = -ECRYPT;
- goto fail_result;
- }
+ err = flow__irm_result_des(&msg, &flow, &sk);
+ if (err < 0)
+ return err;
- /* TODO: Make sure qosspec is set in msg */
- if (qs != NULL && recv_msg->qosspec != NULL)
- *qs = qos_spec_msg_to_s(recv_msg->qosspec);
+ fd = flow_init(&flow, &sk);
- fd = flow_init(recv_msg->flow_id, recv_msg->pid,
- qs == NULL ? qos_raw : *qs, recv_msg->symmkey.data,
- recv_msg->mpl);
+ freebuf(sk);
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (qs != NULL)
+ *qs = flow.qs;
return fd;
-
- fail_result:
- irm_msg__free_unpacked(recv_msg, NULL);
- fail_send_recv:
- return err;
}
int flow_join(const char * dst,
qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- uint8_t s[SYMMKEYSZ];
- int fd;
- int err = -EIRMD;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int fd;
+ int err;
#ifdef QOS_DISABLE_CRC
if (qs != NULL)
@@ -918,184 +858,145 @@ int flow_join(const char * dst,
if (qs != NULL && qs->cypher_s > 0)
return -ENOTSUP; /* TODO: Encrypted broadcast */
- memset(s, 0, SYMMKEYSZ);
+ memset(&flow, 0, sizeof(flow));
- msg.code = IRM_MSG_CODE__IRM_FLOW_JOIN;
- msg.dst = (char *) dst;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs);
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- if (timeo != NULL) {
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_sec = timeo->tv_sec;
- msg.timeo_nsec = timeo->tv_nsec;
- }
+ if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo))
+ return -ENOMEM;
- recv_msg = send_recv_irm_msg(&msg);
- qosspec_msg__free_unpacked(msg.qosspec, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- if (recv_msg == NULL)
- goto fail_send;
+ err = flow__irm_result_des(&msg, &flow, NULL);
+ if (err < 0)
+ return err;
- if (!recv_msg->has_result)
- goto fail_result;
+ fd = flow_init(&flow, NULL);
- if (recv_msg->result != 0) {
- err = recv_msg->result;
- goto fail_result;
- }
-
- if (!recv_msg->has_pid || !recv_msg->has_flow_id ||
- !recv_msg->has_mpl)
- goto fail_result;
-
- fd = flow_init(recv_msg->flow_id, recv_msg->pid,
- qs == NULL ? qos_raw : *qs, s,
- recv_msg->mpl);
-
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (qs != NULL)
+ *qs = flow.qs;
return fd;
-
- fail_result:
- irm_msg__free_unpacked(recv_msg, NULL);
- fail_send:
- return err;
}
+#define PKT_BUF_LEN 2048
int flow_dealloc(int fd)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- uint8_t buf[128];
- struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
- struct flow * f;
- time_t timeo;
+ struct flow_info info;
+ uint8_t pkt[PKT_BUF_LEN];
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
+ struct timespec timeo = TIMESPEC_INIT_S(0);
+ struct flow * flow;
+ int err;
if (fd < 0 || fd >= SYS_MAX_FLOWS )
return -EINVAL;
- msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
- msg.has_flow_id = true;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_nsec = 0;
+ memset(&info, 0, sizeof(flow));
- f = &ai.flows[fd];
+ flow = &ai.flows[fd];
pthread_rwlock_rdlock(&ai.lock);
- if (f->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- msg.flow_id = f->flow_id;
-
- f->oflags = FLOWFDEFAULT | FLOWFRNOPART;
+ flow->oflags = FLOWFDEFAULT | FLOWFRNOPART;
- f->rcv_timesout = true;
- f->rcv_timeo = tic;
+ flow->rcv_timesout = true;
+ flow->rcv_timeo = tic;
pthread_rwlock_unlock(&ai.lock);
- flow_read(fd, buf, 128);
+ flow_read(fd, buf, SOCK_BUF_SIZE);
pthread_rwlock_rdlock(&ai.lock);
- timeo = frcti_dealloc(f->frcti);
- while (timeo < 0) { /* keep the flow active for rtx */
+ timeo.tv_sec = frcti_dealloc(flow->frcti);
+ while (timeo.tv_sec < 0) { /* keep the flow active for rtx */
ssize_t ret;
pthread_rwlock_unlock(&ai.lock);
- ret = flow_read(fd, buf, 128);
+ ret = flow_read(fd, pkt, PKT_BUF_LEN);
pthread_rwlock_rdlock(&ai.lock);
- timeo = frcti_dealloc(f->frcti);
+ timeo.tv_sec = frcti_dealloc(flow->frcti);
- if (ret == -EFLOWDOWN && timeo < 0)
- timeo = -timeo;
+ if (ret == -EFLOWDOWN && timeo.tv_sec < 0)
+ timeo.tv_sec = -timeo.tv_sec;
}
- msg.timeo_sec = timeo;
-
pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
- shm_rbuff_fini(ai.flows[fd].tx_rb);
+ shm_rbuff_fini(flow->tx_rb);
pthread_cleanup_pop(true);
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
+ info.id = flow->info.id;
+ info.n_pid = getpid();
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EIRMD;
- }
+ if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- pthread_rwlock_wrlock(&ai.lock);
+ err = irm__irm_result_des(&msg);
flow_fini(fd);
- pthread_rwlock_unlock(&ai.lock);
-
- return 0;
+ return err;
}
int ipcp_flow_dealloc(int fd)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- struct flow * f;
+ struct flow_info info;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ struct flow * flow;
+ int err;
if (fd < 0 || fd >= SYS_MAX_FLOWS )
return -EINVAL;
- msg.code = IRM_MSG_CODE__IPCP_FLOW_DEALLOC;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_flow_id = true;
+ flow = &ai.flows[fd];
- f = &ai.flows[fd];
+ memset(&info, 0, sizeof(flow));
pthread_rwlock_rdlock(&ai.lock);
- if (f->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- msg.flow_id = f->flow_id;
+ info.id = flow->info.id;
+ info.n_1_pid = flow->info.n_1_pid;
pthread_rwlock_unlock(&ai.lock);
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EIRMD;
- }
+ if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- pthread_rwlock_wrlock(&ai.lock);
+ err = irm__irm_result_des(&msg);
flow_fini(fd);
- pthread_rwlock_unlock(&ai.lock);
-
- return 0;
+ return err;
}
int fccntl(int fd,
@@ -1122,7 +1023,7 @@ int fccntl(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
va_end(l);
return -ENOTALLOC;
@@ -1167,7 +1068,7 @@ int fccntl(int fd,
qs = va_arg(l, qosspec_t *);
if (qs == NULL)
goto einval;
- *qs = flow->qs;
+ *qs = flow->info.qs;
break;
case FLOWGRXQLEN:
qlen = va_arg(l, size_t *);
@@ -1194,13 +1095,13 @@ int fccntl(int fd,
rx_acl |= ACL_FLOWDOWN;
tx_acl |= ACL_FLOWDOWN;
shm_flow_set_notify(flow->set,
- flow->flow_id,
+ flow->info.id,
FLOW_DOWN);
} else {
rx_acl &= ~ACL_FLOWDOWN;
tx_acl &= ~ACL_FLOWDOWN;
shm_flow_set_notify(flow->set,
- flow->flow_id,
+ flow->info.id,
FLOW_UP);
}
@@ -1302,7 +1203,7 @@ static int flow_tx_sdb(struct flow * flow,
if (crypt_encrypt(&flow->crypt, sdb) < 0)
goto enomem;
- if (flow->qs.ber == 0 && add_crc(sdb) != 0)
+ if (flow->info.qs.ber == 0 && add_crc(sdb) != 0)
goto enomem;
}
@@ -1316,7 +1217,7 @@ static int flow_tx_sdb(struct flow * flow,
if (ret < 0)
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
pthread_cleanup_pop(true);
@@ -1353,7 +1254,7 @@ ssize_t flow_write(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -1398,7 +1299,7 @@ static bool invalid_pkt(struct flow * flow,
if (shm_du_buff_len(sdb) == 0)
return true;
- if (flow->qs.ber == 0 && chk_crc(sdb) != 0)
+ if (flow->info.qs.ber == 0 && chk_crc(sdb) != 0)
return true;
if (crypt_decrypt(&flow->crypt, sdb) < 0)
@@ -1461,7 +1362,7 @@ ssize_t flow_read(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -1627,20 +1528,20 @@ int fset_add(struct flow_set * set,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
ret = -EINVAL;
goto fail;
}
if (flow->frcti != NULL)
- shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id);
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
- ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id);
+ ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].info.id);
if (ret < 0)
goto fail;
if (shm_rbuff_queued(ai.flows[fd].rx_rb))
- shm_flow_set_notify(ai.fqset, ai.flows[fd].flow_id, FLOW_PKT);
+ shm_flow_set_notify(ai.fqset, ai.flows[fd].info.id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
@@ -1663,11 +1564,11 @@ void fset_del(struct flow_set * set,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id >= 0)
- shm_flow_set_del(ai.fqset, set->idx, flow->flow_id);
+ if (flow->info.id >= 0)
+ shm_flow_set_del(ai.fqset, set->idx, flow->info.id);
if (flow->frcti != NULL)
- shm_flow_set_add(ai.fqset, 0, ai.flows[fd].flow_id);
+ shm_flow_set_add(ai.fqset, 0, ai.flows[fd].info.id);
pthread_rwlock_unlock(&ai.lock);
}
@@ -1682,12 +1583,12 @@ bool fset_has(const struct flow_set * set,
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].flow_id < 0) {
+ if (ai.flows[fd].info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return false;
}
- ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].flow_id) == 1);
+ ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].info.id) == 1);
pthread_rwlock_unlock(&ai.lock);
@@ -1828,10 +1729,20 @@ ssize_t fevent(struct flow_set * set,
/* ipcp-dev functions. */
-int np1_flow_alloc(pid_t n_pid,
- int flow_id)
+int np1_flow_alloc(pid_t n_pid,
+ int flow_id)
{
- return flow_init(flow_id, n_pid, qos_np1, NULL, 0);
+ struct flow_info flow;
+
+ memset(&flow, 0, sizeof(flow));
+
+ flow.id = flow_id;
+ flow.n_pid = getpid();
+ flow.qs = qos_np1;
+ flow.mpl = 0;
+ flow.n_1_pid = n_pid; /* This "flow" is upside-down! */
+
+ return flow_init(&flow, NULL);
}
int np1_flow_dealloc(int flow_id,
@@ -1874,123 +1785,85 @@ int np1_flow_resp(int flow_id)
int ipcp_create_r(const struct ipcp_info * info)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- msg.code = IRM_MSG_CODE__IPCP_CREATE_R;
- msg.ipcp_info = ipcp_info_s_to_msg(info);
+ if (ipcp_create_r__irm_req_ser(&msg,info) < 0)
+ return -ENOMEM;
- recv_msg = send_recv_irm_msg(&msg);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- ipcp_info_msg__free_unpacked(msg.ipcp_info, NULL);
+ return irm__irm_result_des(&msg);
+}
- if (recv_msg == NULL)
- return -EIRMD;
+int ipcp_flow_req_arr(const buffer_t * dst,
+ qosspec_t qs,
+ time_t mpl,
+ const buffer_t * data)
+{
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ memset(&flow, 0, sizeof(flow));
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
+ assert(dst != NULL && dst->len != 0 && dst->data != NULL);
- return ret;
-}
+ flow.n_1_pid = getpid();
+ flow.qs = qs;
+ flow.mpl = mpl;
-int ipcp_flow_req_arr(const uint8_t * dst,
- size_t len,
- qosspec_t qs,
- time_t mpl,
- const void * data,
- size_t dlen)
-{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int fd;
-
- assert(dst != NULL);
-
- msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_hash = true;
- msg.hash.len = len;
- msg.hash.data = (uint8_t *) dst;
- msg.qosspec = qos_spec_s_to_msg(&qs);
- msg.has_mpl = true;
- msg.mpl = mpl;
- msg.has_pk = true;
- msg.pk.data = (uint8_t *) data;
- msg.pk.len = dlen;
-
- recv_msg = send_recv_irm_msg(&msg);
- qosspec_msg__free_unpacked(msg.qosspec, NULL);
-
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_flow_id || !recv_msg->has_pid) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ if (ipcp_flow_req_arr__irm_req_ser(&msg, dst, &flow, data) < 0)
+ return -ENOMEM;
- if (recv_msg->has_result && recv_msg->result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- fd = flow_init(recv_msg->flow_id, recv_msg->pid, qos_np1, NULL, 0);
+ err = flow__irm_result_des(&msg, &flow, NULL);
+ if (err < 0)
+ return err;
- irm_msg__free_unpacked(recv_msg, NULL);
+ /* inverted for np1_flow */
+ flow.n_1_pid = flow.n_pid;
+ flow.n_pid = getpid();
+ flow.mpl = 0;
- return fd;
+ return flow_init(&flow, NULL);
}
-int ipcp_flow_alloc_reply(int fd,
- int response,
- time_t mpl,
- const void * data,
- size_t len)
+int ipcp_flow_alloc_reply(int fd,
+ int response,
+ time_t mpl,
+ const buffer_t * data)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
- msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
- msg.has_flow_id = true;
- msg.has_pk = true;
- msg.pk.data = (uint8_t *) data;
- msg.pk.len = (uint32_t) len;
- msg.has_mpl = true;
- msg.mpl = mpl;
-
pthread_rwlock_rdlock(&ai.lock);
- msg.flow_id = ai.flows[fd].flow_id;
+ flow.id = ai.flows[fd].info.id;
pthread_rwlock_unlock(&ai.lock);
- msg.has_response = true;
- msg.response = response;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ flow.mpl = mpl;
- ret = recv_msg->result;
+ if (ipcp_flow_alloc_reply__irm_msg_ser(&msg, &flow, response, data) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- return ret;
+ return irm__irm_result_des(&msg);
}
int ipcp_flow_read(int fd,
@@ -2006,7 +1879,7 @@ int ipcp_flow_read(int fd,
pthread_rwlock_rdlock(&ai.lock);
- assert(flow->flow_id >= 0);
+ assert(flow->info.id >= 0);
while (frcti_queued_pdu(flow->frcti) < 0) {
pthread_rwlock_unlock(&ai.lock);
@@ -2038,7 +1911,7 @@ int ipcp_flow_write(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -2066,7 +1939,7 @@ int np1_flow_read(int fd,
flow = &ai.flows[fd];
- assert(flow->flow_id >= 0);
+ assert(flow->info.id >= 0);
pthread_rwlock_rdlock(&ai.lock);
@@ -2097,7 +1970,7 @@ int np1_flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -2115,7 +1988,7 @@ int np1_flow_write(int fd,
if (ret < 0)
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
return ret;
}
@@ -2139,7 +2012,7 @@ int ipcp_flow_fini(int fd)
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].flow_id < 0) {
+ if (ai.flows[fd].info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -1;
}
@@ -2148,7 +2021,7 @@ int ipcp_flow_fini(int fd)
shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN);
shm_flow_set_notify(ai.flows[fd].set,
- ai.flows[fd].flow_id,
+ ai.flows[fd].info.id,
FLOW_DEALLOC);
rx_rb = ai.flows[fd].rx_rb;
@@ -2169,9 +2042,9 @@ int ipcp_flow_get_qoscube(int fd,
pthread_rwlock_rdlock(&ai.lock);
- assert(ai.flows[fd].flow_id >= 0);
+ assert(ai.flows[fd].info.id >= 0);
- *cube = qos_spec_to_cube(ai.flows[fd].qs);
+ *cube = qos_spec_to_cube(ai.flows[fd].info.qs);
pthread_rwlock_unlock(&ai.lock);
@@ -2184,7 +2057,7 @@ size_t ipcp_flow_queued(int fd)
pthread_rwlock_rdlock(&ai.lock);
- assert(ai.flows[fd].flow_id >= 0);
+ assert(ai.flows[fd].info.id >= 0);
q = shm_rbuff_queued(ai.flows[fd].tx_rb);
@@ -2220,14 +2093,14 @@ int local_flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL);
if (ret == 0)
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
else
shm_rdrbuff_remove(ai.rdrb, idx);
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 604e8a62..c6fef35c 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -271,7 +271,7 @@ static void __send_frct_pkt(int fd,
#endif
goto fail;
- shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+ shm_flow_set_notify(f->set, f->info.id, FLOW_PKT);
return;
@@ -398,7 +398,7 @@ static struct frcti * frcti_create(int fd,
frcti->n_out = 0;
frcti->n_rqo = 0;
#endif
- if (ai.flows[fd].qs.loss == 0) {
+ if (ai.flows[fd].info.qs.loss == 0) {
frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER;
frcti->rcv_cr.cflags |= FRCTFRTX;
}
diff --git a/src/lib/pb/ipcp.proto b/src/lib/pb/ipcp.proto
index 71bf90b8..c2c7f48b 100644
--- a/src/lib/pb/ipcp.proto
+++ b/src/lib/pb/ipcp.proto
@@ -23,7 +23,8 @@
syntax = "proto2";
import "ipcp_config.proto";
-import "qos.proto";
+import "model.proto";
+
enum ipcp_msg_code {
IPCP_BOOTSTRAP = 1;
diff --git a/src/lib/pb/ipcp_config.proto b/src/lib/pb/ipcp_config.proto
index ca4d55aa..28528b0c 100644
--- a/src/lib/pb/ipcp_config.proto
+++ b/src/lib/pb/ipcp_config.proto
@@ -22,10 +22,7 @@
syntax = "proto2";
-message layer_info_msg {
- required string name = 1;
- required uint32 dir_hash_algo = 2;
-}
+import "model.proto";
message dt_config_msg {
required uint32 addr_size = 1;
diff --git a/src/lib/pb/irm.proto b/src/lib/pb/irm.proto
index c962e5e5..da3bd982 100644
--- a/src/lib/pb/irm.proto
+++ b/src/lib/pb/irm.proto
@@ -23,7 +23,7 @@
syntax = "proto2";
import "ipcp_config.proto";
-import "qos.proto";
+import "model.proto";
enum irm_msg_code {
IRM_CREATE_IPCP = 1;
@@ -55,11 +55,9 @@ enum irm_msg_code {
IRM_REPLY = 27;
}
-message ipcp_info_msg {
- required uint32 type = 1;
- required string name = 2;
- required uint32 pid = 3;
- required uint32 state = 4;
+message timespec_msg {
+ required uint64 tv_sec = 1;
+ required uint32 tv_nsec = 2;
}
message ipcp_list_msg {
@@ -70,33 +68,30 @@ message ipcp_list_msg {
required uint32 hash_algo = 5;
}
-message name_info_msg {
- required string name = 1;
- required uint32 pol_lb = 2;
-}
-
message irm_msg {
required irm_msg_code code = 1;
optional string prog = 2;
optional sint32 pid = 3;
optional string name = 4;
- optional ipcp_info_msg ipcp_info = 5;
- optional string layer = 6;
- repeated string exec = 7;
- optional sint32 response = 8;
- optional string dst = 9;
- optional bytes hash = 10;
- optional sint32 flow_id = 11;
- optional qosspec_msg qosspec = 12;
- optional ipcp_config_msg conf = 13;
- optional uint32 opts = 14;
- repeated ipcp_list_msg ipcps = 15;
- repeated name_info_msg names = 16;
- optional uint32 timeo_sec = 17;
- optional uint32 timeo_nsec = 18;
- optional sint32 mpl = 19;
- optional string comp = 20;
- optional bytes pk = 21; /* piggyback */
- optional bytes symmkey = 22;
- optional sint32 result = 23;
+ optional flow_info_msg flow_info = 5;
+ optional ipcp_info_msg ipcp_info = 6;
+ optional string layer = 7;
+ repeated string exec = 8;
+ optional sint32 response = 9;
+ optional string dst = 10;
+ optional bytes hash = 11;
+ optional sint32 flow_id = 12;
+ optional qosspec_msg qosspec = 13;
+ optional ipcp_config_msg conf = 14;
+ optional uint32 opts = 15;
+ repeated ipcp_list_msg ipcps = 16;
+ repeated name_info_msg names = 17;
+ optional timespec_msg timeo = 18;
+ optional sint32 mpl = 20;
+ optional string comp = 21;
+ optional bytes pk = 22; /* piggyback */
+ optional bytes symmkey = 23;
+ optional uint32 timeo_sec = 24;
+ optional uint32 timeo_nsec = 25;
+ optional sint32 result = 26;
}
diff --git a/src/lib/pb/qos.proto b/src/lib/pb/model.proto
index 64f5a285..f1e401f9 100644
--- a/src/lib/pb/qos.proto
+++ b/src/lib/pb/model.proto
@@ -1,7 +1,7 @@
/*
* Ouroboros - Copyright (C) 2016 - 2024
*
- * QoS specification message
+ * Model description messages
*
* Dimitri Staessens <dimitri@ouroboros.rocks>
* Sander Vrijders <sander@ouroboros.rocks>
@@ -33,3 +33,29 @@ message qosspec_msg {
required uint32 cypher_s = 8; /* Crypto strength in bits. */
required uint32 timeout = 9; /* Timeout in ms. */
}
+
+message flow_info_msg {
+ required uint32 id = 1;
+ required uint32 n_pid = 2;
+ required uint32 n_1_pid = 3;
+ required uint32 mpl = 4;
+ required uint32 state = 5;
+ required qosspec_msg qos = 6;
+}
+
+message name_info_msg {
+ required string name = 1;
+ required uint32 pol_lb = 2;
+}
+
+message layer_info_msg {
+ required string name = 1;
+ required uint32 dir_hash_algo = 2;
+}
+
+message ipcp_info_msg {
+ required uint32 type = 1;
+ required string name = 2;
+ required uint32 pid = 3;
+ required uint32 state = 4;
+}
diff --git a/src/lib/protobuf.c b/src/lib/protobuf.c
index 2135d57e..b586168c 100644
--- a/src/lib/protobuf.c
+++ b/src/lib/protobuf.c
@@ -28,6 +28,84 @@
#include <string.h>
#include <time.h>
+timespec_msg_t * timespec_s_to_msg(const struct timespec * s)
+{
+ timespec_msg_t * msg;
+
+ assert(s != NULL);
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ timespec_msg__init(msg);
+
+ msg->tv_sec = s->tv_sec;
+ msg->tv_nsec = s->tv_nsec;
+
+ return msg;
+
+ fail_malloc:
+ return NULL;
+}
+
+struct timespec timespec_msg_to_s(timespec_msg_t * msg)
+{
+ struct timespec s;
+
+ assert(msg != NULL);
+
+ s.tv_sec = msg->tv_sec;
+ s.tv_nsec = msg->tv_nsec;
+
+ return s;
+}
+
+flow_info_msg_t * flow_info_s_to_msg(const struct flow_info * s)
+{
+ flow_info_msg_t * msg;
+
+ assert(s != NULL);
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ flow_info_msg__init(msg);
+
+ msg->id = s->id;
+ msg->n_pid = s->n_pid;
+ msg->n_1_pid = s->n_1_pid;
+ msg->mpl = s->mpl;
+ msg->state = s->state;
+ msg->qos = qos_spec_s_to_msg(&s->qs);
+ if (msg->qos == NULL)
+ goto fail_msg;
+
+ return msg;
+
+ fail_msg:
+ flow_info_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return NULL;
+}
+
+struct flow_info flow_info_msg_to_s(const flow_info_msg_t * msg)
+{
+ struct flow_info s;
+
+ assert(msg != NULL);
+
+ s.id = msg->id;
+ s.n_pid = msg->n_pid;
+ s.n_1_pid = msg->n_1_pid;
+ s.mpl = msg->mpl;
+ s.state = msg->state;
+ s.qs = qos_spec_msg_to_s(msg->qos);
+
+ return s;
+}
+
layer_info_msg_t * layer_info_s_to_msg(const struct layer_info * s)
{
layer_info_msg_t * msg;
diff --git a/src/lib/serdes-irm.c b/src/lib/serdes-irm.c
new file mode 100644
index 00000000..c4ba3053
--- /dev/null
+++ b/src/lib/serdes-irm.c
@@ -0,0 +1,478 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Ouroboros IRM Protocol - serialization/deserialization
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define _POSIX_C_SOURCE 200809L
+
+#include "config.h"
+
+#include <ouroboros/errno.h>
+#include <ouroboros/serdes-irm.h>
+#include <ouroboros/protobuf.h>
+
+#include <stdlib.h>
+#include <string.h>
+
+int flow_accept__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const struct timespec * timeo)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ msg->timeo = timeo == NULL ? NULL : timespec_s_to_msg(timeo);
+ if (timeo != NULL && msg->timeo == NULL)
+ goto fail_msg;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+static int __flow_alloc_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const char * dst,
+ const struct timespec * timeo,
+ int msg_code)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = msg_code;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ msg->dst = strdup(dst);
+ if (msg->dst == NULL)
+ goto fail_msg;
+
+ msg->timeo = timeo == NULL ? NULL : timespec_s_to_msg(timeo);
+ if (timeo != NULL && msg->timeo == NULL)
+ goto fail_msg;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int flow_alloc__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const char * dst,
+ const struct timespec * timeo)
+{
+ return __flow_alloc_ser(buf, flow, dst, timeo,
+ IRM_MSG_CODE__IRM_FLOW_ALLOC);
+}
+
+int flow_join__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const char * dst,
+ const struct timespec * timeo)
+{
+ return __flow_alloc_ser(buf, flow, dst, timeo,
+ IRM_MSG_CODE__IRM_FLOW_JOIN);
+}
+
+int flow__irm_result_des(buffer_t * buf,
+ struct flow_info * flow,
+ buffer_t * sk)
+{
+ irm_msg_t * msg;
+ int err;
+
+ if (sk != NULL)
+ sk->data = NULL;
+
+ msg = irm_msg__unpack(NULL, buf->len, buf->data);
+ if (msg == NULL) {
+ err = -EIRMD;
+ goto fail_msg;
+ }
+
+ if (!msg->has_result) {
+ err = -EIRMD;
+ goto fail;
+ }
+
+ if (msg->result < 0) {
+ err = msg->result;
+ goto fail;
+ }
+
+ if (msg->flow_info == NULL) {
+ err = -EBADF;
+ goto fail;
+ }
+
+ *flow = flow_info_msg_to_s(msg->flow_info);
+
+ if (flow->qs.cypher_s > 0 && sk != NULL) {
+ if (msg->symmkey.data == NULL || msg->symmkey.len == 0) {
+ err = -ECRYPT;
+ goto fail;
+ }
+
+ sk->len = msg->symmkey.len;
+ sk->data = msg->symmkey.data;
+
+ msg->symmkey.data = NULL;
+ msg->symmkey.len = 0;
+ }
+
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ return err;
+}
+
+int flow_dealloc__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const struct timespec * timeo)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ msg->timeo = timespec_s_to_msg(timeo);
+ if (msg->timeo == NULL)
+ goto fail_msg;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int ipcp_flow_dealloc__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IPCP_FLOW_DEALLOC;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+
+int ipcp_create_r__irm_req_ser(buffer_t * buf,
+ const struct ipcp_info * ipcp)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IPCP_CREATE_R;
+ msg->ipcp_info = ipcp_info_s_to_msg(ipcp);
+ if (msg->ipcp_info == NULL)
+ goto fail_msg;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int proc_announce__irm_req_ser(buffer_t * buf,
+ const char * prog)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IRM_PROC_ANNOUNCE;
+ msg->has_pid = true;
+ msg->pid = getpid();
+ msg->prog = strdup(prog);
+ if (msg->prog == NULL)
+ goto fail_msg;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int proc_exit__irm_req_ser(buffer_t * buf)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IRM_PROC_EXIT;
+ msg->has_pid = true;
+ msg->pid = getpid();
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int ipcp_flow_req_arr__irm_req_ser(buffer_t * buf,
+ const buffer_t * dst,
+ const struct flow_info * flow,
+ const buffer_t * data)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ msg->has_hash = true;
+ msg->hash.len = dst->len;
+ msg->hash.data = dst->data;
+ msg->has_pk = true;
+ msg->pk.len = data->len;
+ msg->pk.data = data->data;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+
+ /* Don't free * dst or data! */
+ msg->hash.len = 0;
+ msg->hash.data = NULL;
+ msg->pk.len = 0;
+ msg->pk.data = NULL;
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ int response,
+ const buffer_t * data)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ msg->has_pk = true;
+ msg->pk.len = data->len;
+ msg->pk.data = data->data;
+ msg->has_response = true;
+ msg->response = response;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+
+ /* Don't free * data! */
+ msg->pk.len = 0;
+ msg->pk.data = NULL;
+
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int irm__irm_result_des(buffer_t * buf)
+{
+ irm_msg_t * msg;
+ int err;
+
+ msg = irm_msg__unpack(NULL, buf->len, buf->data);
+ if (msg == NULL) {
+ err = -EIRMD;
+ goto fail_msg;
+ }
+
+ if (!msg->has_result) {
+ err = -EIRMD;
+ goto fail;
+ }
+
+ err = msg->result;
+ fail:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ return err;
+}
diff --git a/src/lib/serdes-oep.c b/src/lib/serdes-oep.c
index f92011c5..8a836b3b 100644
--- a/src/lib/serdes-oep.c
+++ b/src/lib/serdes-oep.c
@@ -25,7 +25,6 @@
#include <ouroboros/protobuf.h>
#include <ouroboros/serdes-oep.h>
-
ssize_t enroll_req_ser(const struct enroll_req * req,
buffer_t buf)
{
diff --git a/src/lib/sockets.c b/src/lib/sockets.c
index 9c5b7a51..13219db0 100644
--- a/src/lib/sockets.c
+++ b/src/lib/sockets.c
@@ -21,6 +21,7 @@
*/
#include <ouroboros/errno.h>
+#include <ouroboros/pthread.h>
#include <ouroboros/sockets.h>
#include <ouroboros/utils.h>
@@ -29,7 +30,6 @@
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
-#include <pthread.h>
#include <stdbool.h>
/* Apple doesn't support SEQPACKET. */
@@ -118,7 +118,8 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)
pthread_cleanup_push(__cleanup_close_ptr, &sockfd);
- if (write(sockfd, buf, len) != -1)
+ len = write(sockfd, buf, len);
+ if (len >= 0)
len = read(sockfd, buf, SOCK_BUF_SIZE);
pthread_cleanup_pop(true);
@@ -131,6 +132,28 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)
return NULL;
}
+int send_recv_msg(buffer_t * msg)
+{
+ int sockfd;
+ ssize_t len = 0;
+
+ sockfd = client_socket_open(IRM_SOCK_PATH);
+ if (sockfd < 0)
+ return -1;
+
+ pthread_cleanup_push(__cleanup_close_ptr, &sockfd);
+
+ len = write(sockfd, msg->data, msg->len);
+ if (len >= 0)
+ len = read(sockfd, msg->data, SOCK_BUF_SIZE);
+
+ pthread_cleanup_pop(true);
+
+ msg->len = (size_t) len;
+
+ return len < 0 ? -1 : 0;
+}
+
char * ipcp_sock_path(pid_t pid)
{
char * full_name = NULL;
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index 40dfbb19..96f4ac47 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -178,7 +178,7 @@ static void timerwheel_move(void)
shm_du_buff_ack(r->sdb);
#endif
if (f->frcti == NULL
- || f->flow_id != r->flow_id)
+ || f->info.id != r->flow_id)
goto cleanup;
pthread_rwlock_rdlock(&r->frcti->lock);
@@ -249,7 +249,7 @@ static void timerwheel_move(void)
if (shm_rbuff_write(f->tx_rb, idx) < 0)
#endif
goto flow_down;
- shm_flow_set_notify(f->set, f->flow_id,
+ shm_flow_set_notify(f->set, f->info.id,
FLOW_PKT);
reschedule:
list_add(&r->next, &rw.rxms[lvl][rslot]);
@@ -292,7 +292,7 @@ static void timerwheel_move(void)
rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false;
- if (f->flow_id == a->flow_id && f->frcti != NULL)
+ if (f->info.id == a->flow_id && f->frcti != NULL)
send_frct_pkt(a->frcti);
free(a);
@@ -341,7 +341,7 @@ static int timerwheel_rxm(struct frcti * frcti,
slot = r->t0 >> RXMQ_RES;
r->fd = frcti->fd;
- r->flow_id = ai.flows[r->fd].flow_id;
+ r->flow_id = ai.flows[r->fd].info.id;
pthread_rwlock_unlock(&r->frcti->lock);
@@ -394,7 +394,7 @@ static int timerwheel_delayed_ack(int fd,
a->fd = fd;
a->frcti = frcti;
- a->flow_id = ai.flows[fd].flow_id;
+ a->flow_id = ai.flows[fd].info.id;
pthread_mutex_lock(&rw.lock);