diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2026-02-19 22:03:16 +0100 |
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2026-02-22 16:02:16 +0100 |
| commit | c3636005831064e71b03a5f8796a21e89b2a714f (patch) | |
| tree | ca57f7d09e9de015107edb1bda6f30654bf7699b /src/irmd/main.c | |
| parent | 1bf1d33db3e7622c8b97c5518f0f0ff984b989a8 (diff) | |
| download | ouroboros-c3636005831064e71b03a5f8796a21e89b2a714f.tar.gz ouroboros-c3636005831064e71b03a5f8796a21e89b2a714f.zip | |
irmd: Allow direct rbuff between local processes
This allows bypassing the IPCP for local processes that share the same
packet pool, lowering latency between processes to comparable levels
as Unix sockets (RTT in the order of a microsecond).
For local processes, no IPCPs are needed:
$ irm b prog oping n oping
$ oping -l
Ouroboros ping server started.
New flow 64.
Received 64 bytes on fd 64.
The direct IPC can be disabled with the DISABLE_DIRECT_IPC build
flag. Note that this is needed for rumba 'local' experiments to
emulate network topologies. Without this flag all processes will just
communicate directly.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/irmd/main.c')
| -rw-r--r-- | src/irmd/main.c | 302 |
1 files changed, 199 insertions, 103 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index c7a5715b..e610a015 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -86,6 +86,7 @@ #define TIMESYNC_SLACK 100 /* ms */ #define OAP_SEEN_TIMER 20 /* s */ #define DEALLOC_TIME 300 /* s */ +#define DIRECT_MPL 1 /* s */ enum irm_state { IRMD_NULL = 0, @@ -914,21 +915,29 @@ static int flow_accept(struct flow_info * flow, goto fail_oap; } - if (ipcp_flow_alloc_resp(flow, 0, resp_hdr) < 0) { + if (reg_flow_is_direct(flow->id)) { + if (reg_respond_flow_direct(flow->id, &resp_hdr) < 0) { + log_err("Failed to respond to direct flow."); + goto fail_resp; + } + log_info("Flow %d accepted (direct) by %d for %s.", + flow->id, flow->n_pid, name); + } else if (ipcp_flow_alloc_resp(flow, 0, resp_hdr) < 0) { log_err("Failed to respond to flow allocation."); goto fail_resp; + } else { + log_info("Flow %d accepted by %d for %s (uid %d).", + flow->id, flow->n_pid, name, flow->uid); } - log_info("Flow %d accepted by %d for %s (uid %d).", - flow->id, flow->n_pid, name, flow->uid); - freebuf(req_hdr); freebuf(resp_hdr); return 0; fail_oap: - ipcp_flow_alloc_resp(flow, err, resp_hdr); + if (!reg_flow_is_direct(flow->id)) + ipcp_flow_alloc_resp(flow, err, resp_hdr); fail_wait: reg_destroy_flow(flow->id); fail_flow: @@ -1028,7 +1037,7 @@ static int get_ipcp_by_dst(const char * dst, pid_t * pid, buffer_t * hash) { - ipcp_list_msg_t ** ipcps; + ipcp_list_msg_t ** ipcps = NULL; int n; int i; int err = -EIPCP; @@ -1081,6 +1090,171 @@ static int get_ipcp_by_dst(const char * dst, return err; } +static int wait_for_accept(const char * name) +{ + struct timespec timeo = TIMESPEC_INIT_MS(IRMD_REQ_ARR_TIMEOUT); + struct timespec abstime; + char ** exec; + int ret; + + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, &timeo, &abstime); + + ret = reg_wait_flow_accepting(name, &abstime); + if (ret == -ETIMEDOUT) { + if (reg_get_exec(name, &exec) < 0) { + log_dbg("No program bound for %s.", name); + goto fail; + } + + if (spawn_program(exec) < 0) { + log_err("Failed to start %s for %s.", exec[0], name); + goto fail_spawn; + } + + log_info("Starting %s for %s.", exec[0], name); + + ts_add(&abstime, &timeo, &abstime); + + ret = reg_wait_flow_accepting(name, &abstime); + if (ret == -ETIMEDOUT) + goto fail_spawn; + + argvfree(exec); + } + + return ret; + + fail_spawn: + argvfree(exec); + fail: + return -1; +} + +static int flow_req_arr(struct flow_info * flow, + const uint8_t * hash, + buffer_t * data) +{ + struct ipcp_info info; + struct layer_info layer; + enum hash_algo algo; + int ret; + char name[NAME_SIZE + 1]; + + info.pid = flow->n_1_pid; + + log_dbg("Flow req arrived from IPCP %d for " HASH_FMT32 ".", + info.pid, HASH_VAL32(hash)); + + if (reg_get_ipcp(&info, &layer) < 0) { + log_err("No IPCP with pid %d.", info.pid); + ret = -EIPCP; + goto fail; + } + + algo = (enum hash_algo) layer.dir_hash_algo; + + if (reg_get_name_for_hash(name, algo, hash) < 0) { + log_warn("No name for " HASH_FMT32 ".", HASH_VAL32(hash)); + ret = -ENAME; + goto fail; + } + + log_info("Flow request arrived for %s.", name); + + ret = wait_for_accept(name); + if (ret < 0) { + log_err("No active process for %s.", name); + goto fail; + } + + flow->id = ret; + flow->state = FLOW_ALLOCATED; + + ret = reg_respond_accept(flow, data); + if (ret < 0) { + log_err("Failed to respond to flow %d.", flow->id); + goto fail; + } + + return 0; + fail: + return ret; +} + +#ifndef DISABLE_DIRECT_IPC +static int flow_alloc_direct(const char * dst, + struct flow_info * flow, + buffer_t * data, + struct timespec * abstime, + struct crypt_sk * sk, + struct name_info * info) +{ + struct flow_info acc; /* server side flow */ + buffer_t req_hdr = BUF_INIT; + buffer_t resp_hdr = BUF_INIT; + void * ctx; + int err; + + acc.id = wait_for_accept(dst); + if (acc.id < 0) { + log_dbg("No accepting process for %s.", dst); + return -EAGAIN; + } + + if (oap_cli_prepare(&ctx, info, &req_hdr, *data) < 0) { + log_err("Failed to prepare OAP for %s.", dst); + return -EBADF; + } + + acc.n_1_pid = flow->n_pid; + acc.mpl = DIRECT_MPL; + acc.qs = flow->qs; + acc.state = FLOW_ALLOCATED; + + err = reg_prepare_flow_direct(&acc, &req_hdr, flow->uid); + if (err == -EPERM) { + log_dbg("UID mismatch, falling back."); + oap_ctx_free(ctx); + freebuf(req_hdr); + return -EPERM; + } + + if (err < 0) { + log_err("Failed to prepare direct flow."); + oap_ctx_free(ctx); + freebuf(req_hdr); + return -EBADF; + } + + err = reg_wait_flow_direct(acc.id, &resp_hdr, abstime); + if (err < 0) { + log_err("Timeout waiting for OAP response."); + oap_ctx_free(ctx); + return -ETIMEDOUT; + } + + err = oap_cli_complete(ctx, info, resp_hdr, data, sk); + if (err < 0) { + log_err("OAP completion failed for %s.", dst); + freebuf(resp_hdr); + return err; + } + + flow->id = acc.id; + flow->n_1_pid = acc.n_pid; + flow->mpl = DIRECT_MPL; + flow->state = FLOW_ALLOCATED; + + log_info("Flow %d allocated (direct) for %d to %s.", + flow->id, flow->n_pid, dst); + + freebuf(resp_hdr); + + return 0; +} +#endif /* DISABLE_DIRECT_IPC */ + static int flow_alloc(const char * dst, struct flow_info * flow, buffer_t * data, @@ -1104,17 +1278,25 @@ static int flow_alloc(const char * dst, goto fail_flow; } + flow->uid = reg_get_proc_uid(flow->n_pid); + + log_info("Allocating flow for %d to %s (uid %d).", + flow->n_pid, dst, flow->uid); + +#ifndef DISABLE_DIRECT_IPC + err = flow_alloc_direct(dst, flow, data, abstime, sk, &info); + if (err == 0) + return 0; + + if (err != -EPERM && err != -EAGAIN) + goto fail_flow; +#endif if (reg_create_flow(flow) < 0) { log_err("Failed to create flow."); err = -EBADF; goto fail_flow; } - flow->uid = reg_get_proc_uid(flow->n_pid); - - log_info("Allocating flow for %d to %s (uid %d).", - flow->n_pid, dst, flow->uid); - if (get_ipcp_by_dst(dst, &flow->n_1_pid, &hash) < 0) { log_err("Failed to find IPCP for %s.", dst); err = -EIPCP; @@ -1188,98 +1370,6 @@ static int flow_alloc(const char * dst, return err; } -static int wait_for_accept(const char * name) -{ - struct timespec timeo = TIMESPEC_INIT_MS(IRMD_REQ_ARR_TIMEOUT); - struct timespec abstime; - char ** exec; - int ret; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, &timeo, &abstime); - - ret = reg_wait_flow_accepting(name, &abstime); - if (ret == -ETIMEDOUT) { - if (reg_get_exec(name, &exec) < 0) { - log_dbg("No program bound for %s.", name); - goto fail; - } - - if (spawn_program(exec) < 0) { - log_err("Failed to start %s for %s.", exec[0], name); - goto fail_spawn; - } - - log_info("Starting %s for %s.", exec[0], name); - - ts_add(&abstime, &timeo, &abstime); - - ret = reg_wait_flow_accepting(name, &abstime); - if (ret == -ETIMEDOUT) - goto fail_spawn; - - argvfree(exec); - } - - return ret; - - fail_spawn: - argvfree(exec); - fail: - return -1; -} - -static int flow_req_arr(struct flow_info * flow, - const uint8_t * hash, - buffer_t * data) -{ - struct ipcp_info info; - struct layer_info layer; - enum hash_algo algo; - int ret; - char name[NAME_SIZE + 1]; - - info.pid = flow->n_1_pid; - - log_dbg("Flow req arrived from IPCP %d for " HASH_FMT32 ".", - info.pid, HASH_VAL32(hash)); - - if (reg_get_ipcp(&info, &layer) < 0) { - log_err("No IPCP with pid %d.", info.pid); - ret = -EIPCP; - goto fail; - } - - algo = (enum hash_algo) layer.dir_hash_algo; - - if (reg_get_name_for_hash(name, algo, hash) < 0) { - log_warn("No name for " HASH_FMT32 ".", HASH_VAL32(hash)); - ret = -ENAME; - goto fail; - } - - log_info("Flow request arrived for %s.", name); - - ret = wait_for_accept(name); - if (ret < 0) { - log_err("No active process for %s.", name); - goto fail; - } - - flow->id = ret; - flow->state = FLOW_ALLOCATED; - - ret = reg_respond_accept(flow, data); - if (ret < 0) { - log_err("Failed to respond to flow %d.", flow->id); - goto fail; - } - - return 0; - fail: - return ret; -} - static int flow_alloc_reply(struct flow_info * flow, int response, buffer_t * data) @@ -1303,6 +1393,12 @@ static int flow_dealloc(struct flow_info * flow, reg_dealloc_flow(flow); + if (reg_flow_is_direct(flow->id)) { + if (flow->state == FLOW_DEALLOCATED) + reg_destroy_flow(flow->id); + return 0; + } + if (ipcp_flow_dealloc(flow->n_1_pid, flow->id, ts->tv_sec) < 0) { log_err("Failed to request dealloc from %d.", flow->n_1_pid); return -EIPCP; |
