summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2026-02-19 22:03:16 +0100
committerSander Vrijders <sander@ouroboros.rocks>2026-02-22 16:02:16 +0100
commitc3636005831064e71b03a5f8796a21e89b2a714f (patch)
treeca57f7d09e9de015107edb1bda6f30654bf7699b /src
parent1bf1d33db3e7622c8b97c5518f0f0ff984b989a8 (diff)
downloadouroboros-c3636005831064e71b03a5f8796a21e89b2a714f.tar.gz
ouroboros-c3636005831064e71b03a5f8796a21e89b2a714f.zip
irmd: Allow direct rbuff between local processes
This allows bypassing the IPCP for local processes that share the same packet pool, lowering latency between processes to comparable levels as Unix sockets (RTT in the order of a microsecond). For local processes, no IPCPs are needed: $ irm b prog oping n oping $ oping -l Ouroboros ping server started. New flow 64. Received 64 bytes on fd 64. The direct IPC can be disabled with the DISABLE_DIRECT_IPC build flag. Note that this is needed for rumba 'local' experiments to emulate network topologies. Without this flag all processes will just communicate directly. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src')
-rw-r--r--src/irmd/config.h.in1
-rw-r--r--src/irmd/main.c302
-rw-r--r--src/irmd/oap/cli.c3
-rw-r--r--src/irmd/reg/flow.c35
-rw-r--r--src/irmd/reg/flow.h14
-rw-r--r--src/irmd/reg/reg.c209
-rw-r--r--src/irmd/reg/reg.h13
-rw-r--r--src/irmd/reg/tests/flow_test.c55
-rw-r--r--src/irmd/reg/tests/reg_test.c183
-rw-r--r--src/tools/oping/oping.c15
-rw-r--r--src/tools/oping/oping_client.c122
-rw-r--r--src/tools/oping/oping_server.c68
12 files changed, 786 insertions, 234 deletions
diff --git a/src/irmd/config.h.in b/src/irmd/config.h.in
index 2888ce37..df0cd718 100644
--- a/src/irmd/config.h.in
+++ b/src/irmd/config.h.in
@@ -74,6 +74,7 @@
#define IRMD_PKILL_TIMEOUT @IRMD_PKILL_TIMEOUT@
+#cmakedefine DISABLE_DIRECT_IPC
#cmakedefine IRMD_KILL_ALL_PROCESSES
#cmakedefine HAVE_LIBGCRYPT
#cmakedefine HAVE_OPENSSL
diff --git a/src/irmd/main.c b/src/irmd/main.c
index c7a5715b..e610a015 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -86,6 +86,7 @@
#define TIMESYNC_SLACK 100 /* ms */
#define OAP_SEEN_TIMER 20 /* s */
#define DEALLOC_TIME 300 /* s */
+#define DIRECT_MPL 1 /* s */
enum irm_state {
IRMD_NULL = 0,
@@ -914,21 +915,29 @@ static int flow_accept(struct flow_info * flow,
goto fail_oap;
}
- if (ipcp_flow_alloc_resp(flow, 0, resp_hdr) < 0) {
+ if (reg_flow_is_direct(flow->id)) {
+ if (reg_respond_flow_direct(flow->id, &resp_hdr) < 0) {
+ log_err("Failed to respond to direct flow.");
+ goto fail_resp;
+ }
+ log_info("Flow %d accepted (direct) by %d for %s.",
+ flow->id, flow->n_pid, name);
+ } else if (ipcp_flow_alloc_resp(flow, 0, resp_hdr) < 0) {
log_err("Failed to respond to flow allocation.");
goto fail_resp;
+ } else {
+ log_info("Flow %d accepted by %d for %s (uid %d).",
+ flow->id, flow->n_pid, name, flow->uid);
}
- log_info("Flow %d accepted by %d for %s (uid %d).",
- flow->id, flow->n_pid, name, flow->uid);
-
freebuf(req_hdr);
freebuf(resp_hdr);
return 0;
fail_oap:
- ipcp_flow_alloc_resp(flow, err, resp_hdr);
+ if (!reg_flow_is_direct(flow->id))
+ ipcp_flow_alloc_resp(flow, err, resp_hdr);
fail_wait:
reg_destroy_flow(flow->id);
fail_flow:
@@ -1028,7 +1037,7 @@ static int get_ipcp_by_dst(const char * dst,
pid_t * pid,
buffer_t * hash)
{
- ipcp_list_msg_t ** ipcps;
+ ipcp_list_msg_t ** ipcps = NULL;
int n;
int i;
int err = -EIPCP;
@@ -1081,6 +1090,171 @@ static int get_ipcp_by_dst(const char * dst,
return err;
}
+static int wait_for_accept(const char * name)
+{
+ struct timespec timeo = TIMESPEC_INIT_MS(IRMD_REQ_ARR_TIMEOUT);
+ struct timespec abstime;
+ char ** exec;
+ int ret;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, &timeo, &abstime);
+
+ ret = reg_wait_flow_accepting(name, &abstime);
+ if (ret == -ETIMEDOUT) {
+ if (reg_get_exec(name, &exec) < 0) {
+ log_dbg("No program bound for %s.", name);
+ goto fail;
+ }
+
+ if (spawn_program(exec) < 0) {
+ log_err("Failed to start %s for %s.", exec[0], name);
+ goto fail_spawn;
+ }
+
+ log_info("Starting %s for %s.", exec[0], name);
+
+ ts_add(&abstime, &timeo, &abstime);
+
+ ret = reg_wait_flow_accepting(name, &abstime);
+ if (ret == -ETIMEDOUT)
+ goto fail_spawn;
+
+ argvfree(exec);
+ }
+
+ return ret;
+
+ fail_spawn:
+ argvfree(exec);
+ fail:
+ return -1;
+}
+
+static int flow_req_arr(struct flow_info * flow,
+ const uint8_t * hash,
+ buffer_t * data)
+{
+ struct ipcp_info info;
+ struct layer_info layer;
+ enum hash_algo algo;
+ int ret;
+ char name[NAME_SIZE + 1];
+
+ info.pid = flow->n_1_pid;
+
+ log_dbg("Flow req arrived from IPCP %d for " HASH_FMT32 ".",
+ info.pid, HASH_VAL32(hash));
+
+ if (reg_get_ipcp(&info, &layer) < 0) {
+ log_err("No IPCP with pid %d.", info.pid);
+ ret = -EIPCP;
+ goto fail;
+ }
+
+ algo = (enum hash_algo) layer.dir_hash_algo;
+
+ if (reg_get_name_for_hash(name, algo, hash) < 0) {
+ log_warn("No name for " HASH_FMT32 ".", HASH_VAL32(hash));
+ ret = -ENAME;
+ goto fail;
+ }
+
+ log_info("Flow request arrived for %s.", name);
+
+ ret = wait_for_accept(name);
+ if (ret < 0) {
+ log_err("No active process for %s.", name);
+ goto fail;
+ }
+
+ flow->id = ret;
+ flow->state = FLOW_ALLOCATED;
+
+ ret = reg_respond_accept(flow, data);
+ if (ret < 0) {
+ log_err("Failed to respond to flow %d.", flow->id);
+ goto fail;
+ }
+
+ return 0;
+ fail:
+ return ret;
+}
+
+#ifndef DISABLE_DIRECT_IPC
+static int flow_alloc_direct(const char * dst,
+ struct flow_info * flow,
+ buffer_t * data,
+ struct timespec * abstime,
+ struct crypt_sk * sk,
+ struct name_info * info)
+{
+ struct flow_info acc; /* server side flow */
+ buffer_t req_hdr = BUF_INIT;
+ buffer_t resp_hdr = BUF_INIT;
+ void * ctx;
+ int err;
+
+ acc.id = wait_for_accept(dst);
+ if (acc.id < 0) {
+ log_dbg("No accepting process for %s.", dst);
+ return -EAGAIN;
+ }
+
+ if (oap_cli_prepare(&ctx, info, &req_hdr, *data) < 0) {
+ log_err("Failed to prepare OAP for %s.", dst);
+ return -EBADF;
+ }
+
+ acc.n_1_pid = flow->n_pid;
+ acc.mpl = DIRECT_MPL;
+ acc.qs = flow->qs;
+ acc.state = FLOW_ALLOCATED;
+
+ err = reg_prepare_flow_direct(&acc, &req_hdr, flow->uid);
+ if (err == -EPERM) {
+ log_dbg("UID mismatch, falling back.");
+ oap_ctx_free(ctx);
+ freebuf(req_hdr);
+ return -EPERM;
+ }
+
+ if (err < 0) {
+ log_err("Failed to prepare direct flow.");
+ oap_ctx_free(ctx);
+ freebuf(req_hdr);
+ return -EBADF;
+ }
+
+ err = reg_wait_flow_direct(acc.id, &resp_hdr, abstime);
+ if (err < 0) {
+ log_err("Timeout waiting for OAP response.");
+ oap_ctx_free(ctx);
+ return -ETIMEDOUT;
+ }
+
+ err = oap_cli_complete(ctx, info, resp_hdr, data, sk);
+ if (err < 0) {
+ log_err("OAP completion failed for %s.", dst);
+ freebuf(resp_hdr);
+ return err;
+ }
+
+ flow->id = acc.id;
+ flow->n_1_pid = acc.n_pid;
+ flow->mpl = DIRECT_MPL;
+ flow->state = FLOW_ALLOCATED;
+
+ log_info("Flow %d allocated (direct) for %d to %s.",
+ flow->id, flow->n_pid, dst);
+
+ freebuf(resp_hdr);
+
+ return 0;
+}
+#endif /* DISABLE_DIRECT_IPC */
+
static int flow_alloc(const char * dst,
struct flow_info * flow,
buffer_t * data,
@@ -1104,17 +1278,25 @@ static int flow_alloc(const char * dst,
goto fail_flow;
}
+ flow->uid = reg_get_proc_uid(flow->n_pid);
+
+ log_info("Allocating flow for %d to %s (uid %d).",
+ flow->n_pid, dst, flow->uid);
+
+#ifndef DISABLE_DIRECT_IPC
+ err = flow_alloc_direct(dst, flow, data, abstime, sk, &info);
+ if (err == 0)
+ return 0;
+
+ if (err != -EPERM && err != -EAGAIN)
+ goto fail_flow;
+#endif
if (reg_create_flow(flow) < 0) {
log_err("Failed to create flow.");
err = -EBADF;
goto fail_flow;
}
- flow->uid = reg_get_proc_uid(flow->n_pid);
-
- log_info("Allocating flow for %d to %s (uid %d).",
- flow->n_pid, dst, flow->uid);
-
if (get_ipcp_by_dst(dst, &flow->n_1_pid, &hash) < 0) {
log_err("Failed to find IPCP for %s.", dst);
err = -EIPCP;
@@ -1188,98 +1370,6 @@ static int flow_alloc(const char * dst,
return err;
}
-static int wait_for_accept(const char * name)
-{
- struct timespec timeo = TIMESPEC_INIT_MS(IRMD_REQ_ARR_TIMEOUT);
- struct timespec abstime;
- char ** exec;
- int ret;
-
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, &timeo, &abstime);
-
- ret = reg_wait_flow_accepting(name, &abstime);
- if (ret == -ETIMEDOUT) {
- if (reg_get_exec(name, &exec) < 0) {
- log_dbg("No program bound for %s.", name);
- goto fail;
- }
-
- if (spawn_program(exec) < 0) {
- log_err("Failed to start %s for %s.", exec[0], name);
- goto fail_spawn;
- }
-
- log_info("Starting %s for %s.", exec[0], name);
-
- ts_add(&abstime, &timeo, &abstime);
-
- ret = reg_wait_flow_accepting(name, &abstime);
- if (ret == -ETIMEDOUT)
- goto fail_spawn;
-
- argvfree(exec);
- }
-
- return ret;
-
- fail_spawn:
- argvfree(exec);
- fail:
- return -1;
-}
-
-static int flow_req_arr(struct flow_info * flow,
- const uint8_t * hash,
- buffer_t * data)
-{
- struct ipcp_info info;
- struct layer_info layer;
- enum hash_algo algo;
- int ret;
- char name[NAME_SIZE + 1];
-
- info.pid = flow->n_1_pid;
-
- log_dbg("Flow req arrived from IPCP %d for " HASH_FMT32 ".",
- info.pid, HASH_VAL32(hash));
-
- if (reg_get_ipcp(&info, &layer) < 0) {
- log_err("No IPCP with pid %d.", info.pid);
- ret = -EIPCP;
- goto fail;
- }
-
- algo = (enum hash_algo) layer.dir_hash_algo;
-
- if (reg_get_name_for_hash(name, algo, hash) < 0) {
- log_warn("No name for " HASH_FMT32 ".", HASH_VAL32(hash));
- ret = -ENAME;
- goto fail;
- }
-
- log_info("Flow request arrived for %s.", name);
-
- ret = wait_for_accept(name);
- if (ret < 0) {
- log_err("No active process for %s.", name);
- goto fail;
- }
-
- flow->id = ret;
- flow->state = FLOW_ALLOCATED;
-
- ret = reg_respond_accept(flow, data);
- if (ret < 0) {
- log_err("Failed to respond to flow %d.", flow->id);
- goto fail;
- }
-
- return 0;
- fail:
- return ret;
-}
-
static int flow_alloc_reply(struct flow_info * flow,
int response,
buffer_t * data)
@@ -1303,6 +1393,12 @@ static int flow_dealloc(struct flow_info * flow,
reg_dealloc_flow(flow);
+ if (reg_flow_is_direct(flow->id)) {
+ if (flow->state == FLOW_DEALLOCATED)
+ reg_destroy_flow(flow->id);
+ return 0;
+ }
+
if (ipcp_flow_dealloc(flow->n_1_pid, flow->id, ts->tv_sec) < 0) {
log_err("Failed to request dealloc from %d.", flow->n_1_pid);
return -EIPCP;
diff --git a/src/irmd/oap/cli.c b/src/irmd/oap/cli.c
index 507f3f81..8ecd317d 100644
--- a/src/irmd/oap/cli.c
+++ b/src/irmd/oap/cli.c
@@ -311,6 +311,9 @@ int oap_cli_prepare(void ** ctx,
*req_buf = s->local_hdr.hdr;
clrbuf(s->local_hdr.hdr);
+ /* oap_hdr_encode repoints id into hdr; restore to __id */
+ s->local_hdr.id = s->id;
+
crypt_free_crt(crt);
crypt_free_key(pkp);
diff --git a/src/irmd/reg/flow.c b/src/irmd/reg/flow.c
index 15497d35..93c3e128 100644
--- a/src/irmd/reg/flow.c
+++ b/src/irmd/reg/flow.c
@@ -80,7 +80,7 @@ void reg_flow_destroy(struct reg_flow * flow)
switch(flow->info.state) {
case FLOW_ACCEPT_PENDING:
- clrbuf(flow->data);
+ clrbuf(flow->req_data);
/* FALLTHRU */
default:
destroy_rbuffs(flow);
@@ -89,8 +89,10 @@ void reg_flow_destroy(struct reg_flow * flow)
assert(flow->n_rb == NULL);
assert(flow->n_1_rb == NULL);
- assert(flow->data.data == NULL);
- assert(flow->data.len == 0);
+ assert(flow->req_data.data == NULL);
+ assert(flow->req_data.len == 0);
+ assert(flow->rsp_data.data == NULL);
+ assert(flow->rsp_data.len == 0);
assert(list_is_empty(&flow->next));
@@ -186,30 +188,3 @@ int reg_flow_update(struct reg_flow * flow,
fail:
return -ENOMEM;
}
-
-void reg_flow_set_data(struct reg_flow * flow,
- const buffer_t * buf)
-{
- assert(flow != NULL);
- assert(buf != NULL);
- assert(flow->data.data == NULL);
- assert(flow->data.len == 0);
-
- flow->data = *buf;
-}
-
-void reg_flow_get_data(struct reg_flow * flow,
- buffer_t * buf)
-{
- assert(flow != NULL);
- assert(buf != NULL);
-
- *buf = flow->data;
-
- clrbuf(flow->data);
-}
-
-void reg_flow_free_data(struct reg_flow * flow)
-{
- freebuf(flow->data);
-}
diff --git a/src/irmd/reg/flow.h b/src/irmd/reg/flow.h
index d0078e1b..9a4046d3 100644
--- a/src/irmd/reg/flow.h
+++ b/src/irmd/reg/flow.h
@@ -31,6 +31,7 @@
#include <ouroboros/ssm_rbuff.h>
#include <ouroboros/utils.h>
+#include <stdbool.h>
#include <sys/types.h>
#include <time.h>
@@ -40,11 +41,14 @@ struct reg_flow {
struct flow_info info;
int response;
- buffer_t data;
+ buffer_t req_data;
+ buffer_t rsp_data;
struct timespec t0;
char name[NAME_SIZE + 1];
+ bool direct;
+
struct ssm_rbuff * n_rb;
struct ssm_rbuff * n_1_rb;
};
@@ -56,12 +60,4 @@ void reg_flow_destroy(struct reg_flow * flow);
int reg_flow_update(struct reg_flow * flow,
struct flow_info * info);
-void reg_flow_set_data(struct reg_flow * flow,
- const buffer_t * buf);
-
-void reg_flow_get_data(struct reg_flow * flow,
- buffer_t * buf);
-
-void reg_flow_free_data(struct reg_flow * flow);
-
#endif /* OUROBOROS_IRMD_REG_FLOW_H */
diff --git a/src/irmd/reg/reg.c b/src/irmd/reg/reg.c
index 64aa1513..0025f695 100644
--- a/src/irmd/reg/reg.c
+++ b/src/irmd/reg/reg.c
@@ -1785,7 +1785,8 @@ int reg_wait_flow_allocated(struct flow_info * info,
}
if (flow != NULL) {
- reg_flow_get_data(flow, pbuf);
+ *pbuf = flow->rsp_data;
+ clrbuf(flow->rsp_data);
*info = flow->info;
}
@@ -1820,8 +1821,8 @@ int reg_respond_alloc(struct flow_info * info,
}
assert(flow->info.state == FLOW_ALLOC_PENDING);
- assert(flow->data.len == 0);
- assert(flow->data.data == NULL);
+ assert(flow->rsp_data.len == 0);
+ assert(flow->rsp_data.data == NULL);
info->n_pid = flow->info.n_pid;
info->n_1_pid = flow->info.n_pid;
@@ -1833,8 +1834,10 @@ int reg_respond_alloc(struct flow_info * info,
flow->response = response;
- if (info->state == FLOW_ALLOCATED)
- reg_flow_set_data(flow, pbuf);
+ if (info->state == FLOW_ALLOCATED) {
+ flow->rsp_data = *pbuf;
+ clrbuf(*pbuf);
+ }
pthread_cond_broadcast(&reg.cond);
@@ -1944,7 +1947,8 @@ int reg_wait_flow_accepted(struct flow_info * info,
pthread_cleanup_pop(true); /* __cleanup_wait_accept */
if (flow != NULL) {
- reg_flow_get_data(flow, pbuf);
+ *pbuf = flow->req_data;
+ clrbuf(flow->req_data);
*info = flow->info;
}
@@ -2004,8 +2008,63 @@ int reg_respond_accept(struct flow_info * info,
info->n_pid = flow->info.n_pid;
- reg_flow_set_data(flow, pbuf);
- clrbuf(pbuf);
+ flow->req_data = *pbuf;
+ clrbuf(*pbuf);
+
+ if (reg_flow_update(flow, info) < 0) {
+ log_err("Failed to create flow structs.");
+ goto fail_flow;
+ }
+
+ pthread_cond_broadcast(&reg.cond);
+
+ pthread_mutex_unlock(&reg.mtx);
+
+ return 0;
+
+ fail_flow:
+ pthread_mutex_unlock(&reg.mtx);
+ return -1;
+}
+
+int reg_prepare_flow_direct(struct flow_info * info,
+ buffer_t * pbuf,
+ uid_t alloc_uid)
+{
+ struct reg_flow * flow;
+ struct reg_proc * proc;
+ uid_t accept_uid = 0;
+
+ assert(info != NULL);
+ assert(info->state == FLOW_ALLOCATED);
+ assert(info->n_1_pid != 0);
+ assert(pbuf != NULL);
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(info->id);
+ if (flow == NULL) {
+ log_err("Flow not found: %d.", info->id);
+ goto fail_flow;
+ }
+
+ assert(flow->info.state == FLOW_ACCEPT_PENDING);
+
+ info->n_pid = flow->info.n_pid;
+
+ proc = __reg_get_proc(info->n_pid);
+ if (proc != NULL && !is_ouroboros_member_uid(proc->info.uid))
+ accept_uid = proc->info.uid;
+
+ if (alloc_uid != accept_uid) {
+ pthread_mutex_unlock(&reg.mtx);
+ return -EPERM;
+ }
+
+ flow->direct = true;
+
+ flow->req_data = *pbuf;
+ clrbuf(*pbuf);
if (reg_flow_update(flow, info) < 0) {
log_err("Failed to create flow structs.");
@@ -2023,6 +2082,109 @@ int reg_respond_accept(struct flow_info * info,
return -1;
}
+bool reg_flow_is_direct(int flow_id)
+{
+ struct reg_flow * flow;
+ bool ret;
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(flow_id);
+
+ ret = flow != NULL && flow->direct;
+
+ pthread_mutex_unlock(&reg.mtx);
+
+ return ret;
+}
+
+int reg_respond_flow_direct(int flow_id,
+ buffer_t * pbuf)
+{
+ struct reg_flow * flow;
+
+ assert(pbuf != NULL);
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(flow_id);
+ if (flow == NULL) {
+ log_err("Flow %d not found.", flow_id);
+ goto fail;
+ }
+
+ assert(flow->direct);
+ assert(flow->rsp_data.data == NULL);
+
+ flow->rsp_data = *pbuf;
+ clrbuf(*pbuf);
+
+ pthread_cond_broadcast(&reg.cond);
+
+ pthread_mutex_unlock(&reg.mtx);
+
+ return 0;
+ fail:
+ pthread_mutex_unlock(&reg.mtx);
+ return -1;
+}
+
+int reg_wait_flow_direct(int flow_id,
+ buffer_t * pbuf,
+ const struct timespec * abstime)
+{
+ struct reg_flow * flow;
+ int ret = -1;
+
+ assert(pbuf != NULL);
+
+ pthread_mutex_lock(&reg.mtx);
+
+ flow = __reg_get_flow(flow_id);
+ if (flow == NULL)
+ goto fail;
+
+ assert(flow->direct);
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, &reg.mtx);
+
+ while (flow != NULL && flow->rsp_data.data == NULL) {
+ ret = -__timedwait(&reg.cond, &reg.mtx, abstime);
+ if (ret == -ETIMEDOUT)
+ break;
+ flow = __reg_get_flow(flow_id);
+ }
+
+ if (flow != NULL && flow->rsp_data.data != NULL) {
+ *pbuf = flow->rsp_data;
+ clrbuf(flow->rsp_data);
+ ret = 0;
+ }
+
+ pthread_cleanup_pop(true);
+
+ return ret;
+ fail:
+ pthread_mutex_unlock(&reg.mtx);
+ return -1;
+}
+
+static int direct_flow_dealloc(struct reg_flow * flow,
+ pid_t pid)
+{
+ if (!flow->direct)
+ return -1;
+
+ if (pid == flow->info.n_pid && flow->info.n_pid != -1)
+ flow->info.n_pid = -1;
+ else if (pid == flow->info.n_1_pid && flow->info.n_1_pid != -1)
+ flow->info.n_1_pid = -1;
+ else
+ return -1;
+
+ return 0;
+}
+
void reg_dealloc_flow(struct flow_info * info)
{
struct reg_flow * flow;
@@ -2036,13 +2198,32 @@ void reg_dealloc_flow(struct flow_info * info)
flow = __reg_get_flow(info->id);
assert(flow != NULL);
- assert(flow->data.data == NULL);
- assert(flow->data.len == 0);
+ assert(flow->req_data.data == NULL);
+ assert(flow->req_data.len == 0);
+ assert(flow->rsp_data.data == NULL);
+ assert(flow->rsp_data.len == 0);
+
+ info->n_1_pid = flow->info.n_1_pid;
+
+ if (flow->info.state == FLOW_DEALLOC_PENDING) {
+ if (direct_flow_dealloc(flow, info->n_pid) < 0) {
+ info->state = FLOW_DEALLOC_PENDING;
+ pthread_mutex_unlock(&reg.mtx);
+ return;
+ }
+ flow->info.state = FLOW_DEALLOCATED;
+ info->state = FLOW_DEALLOCATED;
+ reg_flow_update(flow, info);
+ pthread_mutex_unlock(&reg.mtx);
+ return;
+ }
+
assert(flow->info.state == FLOW_ALLOCATED);
flow->info.state = FLOW_DEALLOC_PENDING;
info->state = FLOW_DEALLOC_PENDING;
- info->n_1_pid = flow->info.n_1_pid;
+
+ direct_flow_dealloc(flow, info->n_pid);
memset(flow->name, 0, sizeof(flow->name));
@@ -2064,8 +2245,10 @@ void reg_dealloc_flow_resp(struct flow_info * info)
flow = __reg_get_flow(info->id);
assert(flow != NULL);
- assert(flow->data.data == NULL);
- assert(flow->data.len == 0);
+ assert(flow->req_data.data == NULL);
+ assert(flow->req_data.len == 0);
+ assert(flow->rsp_data.data == NULL);
+ assert(flow->rsp_data.len == 0);
assert(flow->info.state == FLOW_DEALLOC_PENDING);
flow->info.state = FLOW_DEALLOCATED;
diff --git a/src/irmd/reg/reg.h b/src/irmd/reg/reg.h
index bda57711..6b576471 100644
--- a/src/irmd/reg/reg.h
+++ b/src/irmd/reg/reg.h
@@ -150,6 +150,19 @@ int reg_wait_flow_accepting(const char * name,
int reg_respond_accept(struct flow_info * info,
buffer_t * pbuf);
+int reg_prepare_flow_direct(struct flow_info * info,
+ buffer_t * pbuf,
+ uid_t alloc_uid);
+
+int reg_respond_flow_direct(int flow_id,
+ buffer_t * pbuf);
+
+int reg_wait_flow_direct(int flow_id,
+ buffer_t * pbuf,
+ const struct timespec * abstime);
+
+bool reg_flow_is_direct(int flow_id);
+
void reg_dealloc_flow(struct flow_info * info);
void reg_dealloc_flow_resp(struct flow_info * info);
diff --git a/src/irmd/reg/tests/flow_test.c b/src/irmd/reg/tests/flow_test.c
index bfdbceb5..7e1c1360 100644
--- a/src/irmd/reg/tests/flow_test.c
+++ b/src/irmd/reg/tests/flow_test.c
@@ -24,10 +24,6 @@
#include <test/test.h>
-#include <string.h>
-
-#define TEST_DATA "testpiggybackdata"
-
static int test_reg_flow_create_destroy(void)
{
struct reg_flow * f;
@@ -219,56 +215,6 @@ static int test_reg_flow_assert_fails(void)
return ret;
}
-static int test_flow_data(void)
-{
- struct reg_flow * f;
-
- struct flow_info info = {
- .id = 1,
- .n_pid = 1,
- .qs = qos_raw,
- .state = FLOW_INIT
- };
-
- char * data;
- buffer_t buf;
- buffer_t rcv = {0, NULL};
-
- TEST_START();
-
- data = strdup(TEST_DATA);
- if (data == NULL) {
- printf("Failed to strdup data.\n");
- goto fail;
- }
-
- buf.data = (uint8_t *) data;
- buf.len = strlen(data);
-
- f = reg_flow_create(&info);
- if (f == NULL) {
- printf("Failed to create flow.\n");
- goto fail;
- }
-
- reg_flow_set_data(f, &buf);
-
- reg_flow_get_data(f, &rcv);
-
- freebuf(buf);
- clrbuf(rcv);
-
- reg_flow_destroy(f);
-
- TEST_SUCCESS();
-
- return TEST_RC_SUCCESS;
- fail:
- free(data);
- TEST_FAIL();
- return TEST_RC_FAIL;
-}
-
int flow_test(int argc,
char ** argv)
{
@@ -280,7 +226,6 @@ int flow_test(int argc,
ret |= test_reg_flow_create_destroy();
ret |= test_reg_flow_update();
ret |= test_reg_flow_assert_fails();
- ret |= test_flow_data();
return ret;
}
diff --git a/src/irmd/reg/tests/reg_test.c b/src/irmd/reg/tests/reg_test.c
index 4d7e30ef..eb981349 100644
--- a/src/irmd/reg/tests/reg_test.c
+++ b/src/irmd/reg/tests/reg_test.c
@@ -485,6 +485,188 @@ static int test_reg_allocate_flow_fail(void)
return TEST_RC_FAIL;
}
+struct direct_alloc_info {
+ struct flow_info info;
+ buffer_t rsp;
+ struct timespec abstime;
+};
+
+static void * test_flow_alloc_direct(void * o)
+{
+ struct direct_alloc_info * dai;
+ buffer_t req;
+
+ dai = (struct direct_alloc_info *) o;
+
+ req.data = (uint8_t *) strdup(TEST_DATA);
+ if (req.data == NULL) {
+ printf("Failed to strdup req data.\n");
+ goto fail;
+ }
+ req.len = strlen(TEST_DATA) + 1;
+
+ if (reg_prepare_flow_direct(&dai->info, &req, 0) < 0) {
+ printf("Failed to prepare direct flow.\n");
+ freebuf(req);
+ goto fail;
+ }
+
+ if (reg_wait_flow_direct(dai->info.id, &dai->rsp, &dai->abstime) < 0) {
+ printf("Failed to wait direct flow.\n");
+ goto fail;
+ }
+
+ return (void *) 0;
+ fail:
+ return (void *) -1;
+}
+
+static int test_reg_direct_flow_success(void)
+{
+ pthread_t thr;
+ struct timespec abstime;
+ struct timespec timeo = TIMESPEC_INIT_S(1);
+ buffer_t rbuf = BUF_INIT;
+ buffer_t rsp;
+ struct direct_alloc_info dai;
+ void * thr_ret;
+
+ struct flow_info info = {
+ .n_pid = TEST_PID,
+ .qs = qos_raw
+ };
+
+ TEST_START();
+
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+
+ ts_add(&abstime, &timeo, &abstime);
+
+ if (reg_init() < 0) {
+ printf("Failed to init registry.\n");
+ goto fail;
+ }
+
+ if (reg_create_flow(&info) < 0) {
+ printf("Failed to add flow.\n");
+ goto fail;
+ }
+
+ if (reg_prepare_flow_accept(&info) < 0) {
+ printf("Failed to prepare for accept.\n");
+ goto fail;
+ }
+
+ dai.info.id = info.id;
+ dai.info.n_1_pid = TEST_N_1_PID;
+ dai.info.mpl = TEST_MPL;
+ dai.info.qs = qos_data;
+ dai.info.state = FLOW_ALLOCATED;
+ dai.rsp.len = 0;
+ dai.rsp.data = NULL;
+ dai.abstime = abstime;
+
+ pthread_create(&thr, NULL, test_flow_alloc_direct, &dai);
+
+ if (reg_wait_flow_accepted(&info, &rbuf, &abstime) < 0) {
+ printf("Flow accept failed.\n");
+ pthread_join(thr, NULL);
+ reg_destroy_flow(info.id);
+ reg_fini();
+ goto fail;
+ }
+
+ if (info.state != FLOW_ALLOCATED) {
+ printf("Flow not in allocated state.\n");
+ goto fail;
+ }
+
+ if (rbuf.data == NULL) {
+ printf("req_data not received.\n");
+ goto fail;
+ }
+
+ if (strcmp((char *) rbuf.data, TEST_DATA) != 0) {
+ printf("req_data content mismatch.\n");
+ goto fail;
+ }
+
+ freebuf(rbuf);
+
+ if (!reg_flow_is_direct(info.id)) {
+ printf("Flow not marked direct.\n");
+ goto fail;
+ }
+
+ rsp.data = (uint8_t *) strdup(TEST_DATA2);
+ if (rsp.data == NULL) {
+ printf("Failed to strdup rsp data.\n");
+ goto fail;
+ }
+ rsp.len = strlen(TEST_DATA2) + 1;
+
+ if (reg_respond_flow_direct(info.id, &rsp) < 0) {
+ printf("Failed to respond direct.\n");
+ freebuf(rsp);
+ goto fail;
+ }
+
+ pthread_join(thr, &thr_ret);
+
+ if (thr_ret != (void *) 0) {
+ printf("Allocator thread failed.\n");
+ goto fail;
+ }
+
+ if (dai.rsp.data == NULL) {
+ printf("rsp_data not received.\n");
+ goto fail;
+ }
+
+ if (strcmp((char *) dai.rsp.data, TEST_DATA2) != 0) {
+ printf("rsp_data content mismatch.\n");
+ goto fail;
+ }
+
+ freebuf(dai.rsp);
+
+ reg_dealloc_flow(&info);
+
+ if (info.state != FLOW_DEALLOC_PENDING) {
+ printf("Flow not in dealloc pending.\n");
+ goto fail;
+ }
+
+ info.n_pid = TEST_PID;
+
+ reg_dealloc_flow(&info);
+
+ if (info.state != FLOW_DEALLOC_PENDING) {
+ printf("Same endpoint dealloc changed state.\n");
+ goto fail;
+ }
+
+ info.n_pid = TEST_N_1_PID;
+
+ reg_dealloc_flow(&info);
+
+ if (info.state != FLOW_DEALLOCATED) {
+ printf("Flow not deallocated.\n");
+ goto fail;
+ }
+
+ reg_destroy_flow(info.id);
+
+ reg_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail:
+ REG_TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
static int test_reg_flow(void) {
int rc = 0;
@@ -493,6 +675,7 @@ static int test_reg_flow(void) {
rc |= test_reg_accept_flow_success();
rc |= test_reg_accept_flow_success_no_crypt();
rc |= test_reg_allocate_flow_fail();
+ rc |= test_reg_direct_flow_success();
return rc;
}
diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c
index 86796552..763c0d62 100644
--- a/src/tools/oping/oping.c
+++ b/src/tools/oping/oping.c
@@ -72,17 +72,19 @@
"and reports the Round Trip Time (RTT)\n" \
"\n" \
" -l, --listen Run in server mode\n" \
+" --poll Server uses polling (lower latency)\n" \
+" --busy Server uses busy-poll (single flow)\n" \
"\n" \
" -c, --count Number of packets\n" \
" -d, --duration Duration of the test (default 1s)\n" \
" -f, --flood Send back-to-back without waiting\n" \
+" -F, --flood-busy Flood with busy-polling (lower latency)\n" \
" -i, --interval Interval (default 1000ms)\n" \
" -n, --server-name Name of the oping server\n" \
-" -q, --qos QoS (raw, best, video, voice, data)\n" \
+" -q, --qos QoS (raw, best, video, voice, data)\n" \
" -s, --size Payload size (B, default 64)\n" \
" -Q, --quiet Only print final statistics\n" \
" -D, --timeofday Print time of day before each line\n" \
-" --poll Server uses polling (lower latency)\n" \
"\n" \
" --help Display this help text and exit\n" \
@@ -93,6 +95,7 @@ struct {
int size;
bool timestamp;
bool flood;
+ bool flood_busy;
qosspec_t qs;
/* stats */
@@ -118,6 +121,7 @@ struct {
bool quiet;
bool poll;
+ bool busy;
pthread_t cleaner_pt;
pthread_t accept_pt;
@@ -177,10 +181,12 @@ int main(int argc,
client.count = INT_MAX;
client.timestamp = false;
client.flood = false;
+ client.flood_busy = false;
client.qs = qos_raw;
client.quiet = false;
server.quiet = false;
server.poll = false;
+ server.busy = false;
while (argc > 0) {
if ((strcmp(*argv, "-i") == 0 ||
@@ -221,6 +227,9 @@ int main(int argc,
} else if (strcmp(*argv, "-f") == 0 ||
strcmp(*argv, "--flood") == 0) {
client.flood = true;
+ } else if (strcmp(*argv, "-F") == 0 ||
+ strcmp(*argv, "--flood-busy") == 0) {
+ client.flood_busy = true;
} else if (strcmp(*argv, "-D") == 0 ||
strcmp(*argv, "--timeofday") == 0) {
client.timestamp = true;
@@ -230,6 +239,8 @@ int main(int argc,
server.quiet = true;
} else if (strcmp(*argv, "--poll") == 0) {
server.poll = true;
+ } else if (strcmp(*argv, "--busy") == 0) {
+ server.busy = true;
} else {
goto fail;
}
diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c
index 18dd3078..23807f65 100644
--- a/src/tools/oping/oping_client.c
+++ b/src/tools/oping/oping_client.c
@@ -67,6 +67,26 @@ static void update_rtt_stats(double ms)
client.rtt_m2 += d * (ms - client.rtt_avg);
}
+static double rtt_val(double ms)
+{
+ return ms < 0.1 ? ms * 1000 : ms;
+}
+
+static const char * rtt_unit(double ms)
+{
+ return ms < 0.1 ? "µs" : "ms";
+}
+
+static void print_rtt(int len, int seq,
+ double ms, const char * suf)
+{
+ printf("%d bytes from %s: seq=%d "
+ "time=%.3f %s%s\n",
+ len, client.s_apn, seq,
+ rtt_val(ms), rtt_unit(ms),
+ suf != NULL ? suf : "");
+}
+
void * reader(void * o)
{
struct timespec timeout = {client.interval / 1000 + 2, 0};
@@ -127,12 +147,9 @@ void * reader(void * o)
(size_t) rtc.tv_nsec / 1000);
}
- printf("%d bytes from %s: seq=%d time=%.3f ms%s\n",
- msg_len,
- client.s_apn,
- ntohl(msg->id),
- ms,
- id < exp_id ? " [out-of-order]" : "");
+ print_rtt(msg_len, ntohl(msg->id), ms,
+ id < exp_id ?
+ " [out-of-order]" : NULL);
}
update_rtt_stats(ms);
@@ -223,16 +240,87 @@ static void print_stats(struct timespec * tic,
printf("time: %.3f ms\n", ts_diff_us(toc, tic) / 1000.0);
if (client.rcvd > 0) {
+ double a = client.rtt_avg;
+ double f = a < 0.1 ? 1000 : 1;
printf("rtt min/avg/max/mdev = %.3f/%.3f/%.3f/",
- client.rtt_min,
- client.rtt_avg,
- client.rtt_max);
+ client.rtt_min * f, client.rtt_avg * f,
+ client.rtt_max * f);
if (client.rcvd > 1)
- printf("%.3f ms\n",
- sqrt(client.rtt_m2 / (client.rcvd - 1)));
+ printf("%.3f %s\n",
+ sqrt(client.rtt_m2 /
+ (client.rcvd - 1)) * f,
+ rtt_unit(a));
else
- printf("NaN ms\n");
+ printf("NaN %s\n", rtt_unit(a));
+ }
+}
+
+static int flood_busy_ping(int fd)
+{
+ char buf[OPING_BUF_SIZE];
+ struct oping_msg * msg = (struct oping_msg *) buf;
+ struct timespec sent;
+ struct timespec rcvd;
+ double ms;
+ int n;
+
+ memset(buf, 0, client.size);
+
+ fccntl(fd, FLOWSFLAGS,
+ FLOWFRDWR | FLOWFRNOPART | FLOWFRNOBLOCK);
+
+ if (!client.quiet)
+ printf("Pinging %s with %d bytes"
+ " of data (%u packets,"
+ " busy-poll):\n\n",
+ client.s_apn, client.size,
+ client.count);
+
+ while (!stop && client.sent < client.count) {
+ clock_gettime(CLOCK_MONOTONIC, &sent);
+
+ msg->type = htonl(ECHO_REQUEST);
+ msg->id = htonl(client.sent);
+ msg->tv_sec = sent.tv_sec;
+ msg->tv_nsec = sent.tv_nsec;
+
+ if (flow_write(fd, buf,
+ client.size) < 0) {
+ printf("Failed to send "
+ "packet.\n");
+ break;
+ }
+
+ ++client.sent;
+
+ do {
+ n = flow_read(fd, buf,
+ OPING_BUF_SIZE);
+ } while (n == -EAGAIN && !stop);
+
+ if (n < 0)
+ break;
+
+ clock_gettime(CLOCK_MONOTONIC, &rcvd);
+
+ if (ntohl(msg->type) != ECHO_REPLY)
+ continue;
+
+ ++client.rcvd;
+
+ sent.tv_sec = msg->tv_sec;
+ sent.tv_nsec = msg->tv_nsec;
+ ms = ts_diff_us(&rcvd, &sent) / 1000.0;
+
+ update_rtt_stats(ms);
+
+ if (!client.quiet)
+ print_rtt(client.size,
+ ntohl(msg->id), ms,
+ NULL);
}
+
+ return 0;
}
static int flood_ping(int fd)
@@ -283,9 +371,9 @@ static int flood_ping(int fd)
update_rtt_stats(ms);
if (!client.quiet)
- printf("%d bytes from %s: seq=%d time=%.3f ms\n",
- client.size, client.s_apn,
- ntohl(msg->id), ms);
+ print_rtt(client.size,
+ ntohl(msg->id), ms,
+ NULL);
}
return 0;
@@ -337,7 +425,9 @@ static int client_main(void)
clock_gettime(CLOCK_REALTIME, &tic);
- if (client.flood)
+ if (client.flood_busy)
+ flood_busy_ping(fd);
+ else if (client.flood)
flood_ping(fd);
else
threaded_ping(fd);
diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c
index 1670ebf3..33af28c4 100644
--- a/src/tools/oping/oping_server.c
+++ b/src/tools/oping/oping_server.c
@@ -138,7 +138,10 @@ void * accept_thread(void * o)
(void) o;
- printf("Ouroboros ping server started.\n");
+ printf("Ouroboros ping server started.");
+ if (server.busy)
+ printf(" [busy-poll]");
+ printf("\n");
while (true) {
fd = flow_accept(&qs, NULL);
@@ -158,12 +161,56 @@ void * accept_thread(void * o)
pthread_mutex_unlock(&server.lock);
fccntl(fd, FLOWSFLAGS,
- FLOWFRNOBLOCK | FLOWFRDWR | FLOWFRNOPART);
+ FLOWFRNOBLOCK | FLOWFRDWR
+ | FLOWFRNOPART);
}
return (void *) 0;
}
+void * busy_thread(void * o)
+{
+ char buf[OPING_BUF_SIZE];
+ struct oping_msg * msg = (struct oping_msg *) buf;
+ int fd;
+ int msg_len;
+
+ (void) o;
+
+ /* Accept a single flow. */
+ fd = flow_accept(NULL, NULL);
+ if (fd < 0) {
+ printf("Failed to accept flow.\n");
+ return (void *) -1;
+ }
+
+ printf("New flow %d (busy-poll).\n", fd);
+
+ fccntl(fd, FLOWSFLAGS,
+ FLOWFRNOBLOCK | FLOWFRDWR
+ | FLOWFRNOPART);
+
+ while (true) {
+ msg_len = flow_read(fd, buf,
+ OPING_BUF_SIZE);
+ if (msg_len == -EAGAIN)
+ continue;
+ if (msg_len < 0)
+ break;
+
+ if (ntohl(msg->type) != ECHO_REQUEST)
+ continue;
+
+ msg->type = htonl(ECHO_REPLY);
+
+ flow_write(fd, buf, msg_len);
+ }
+
+ flow_dealloc(fd);
+
+ return (void *) 0;
+}
+
int server_main(void)
{
struct sigaction sig_act;
@@ -191,12 +238,21 @@ int server_main(void)
}
pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL);
- pthread_create(&server.accept_pt, NULL, accept_thread, NULL);
- pthread_create(&server.server_pt, NULL, server_thread, NULL);
- pthread_join(server.accept_pt, NULL);
+ if (server.busy) {
+ pthread_create(&server.server_pt, NULL,
+ busy_thread, NULL);
+ pthread_join(server.server_pt, NULL);
+ pthread_cancel(server.cleaner_pt);
+ } else {
+ pthread_create(&server.accept_pt, NULL,
+ accept_thread, NULL);
+ pthread_create(&server.server_pt, NULL,
+ server_thread, NULL);
+ pthread_join(server.accept_pt, NULL);
+ pthread_cancel(server.server_pt);
+ }
- pthread_cancel(server.server_pt);
pthread_cancel(server.cleaner_pt);
fset_destroy(server.flows);