diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2026-02-19 22:03:16 +0100 |
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2026-02-22 16:02:16 +0100 |
| commit | c3636005831064e71b03a5f8796a21e89b2a714f (patch) | |
| tree | ca57f7d09e9de015107edb1bda6f30654bf7699b /src/irmd/reg/reg.c | |
| parent | 1bf1d33db3e7622c8b97c5518f0f0ff984b989a8 (diff) | |
| download | ouroboros-c3636005831064e71b03a5f8796a21e89b2a714f.tar.gz ouroboros-c3636005831064e71b03a5f8796a21e89b2a714f.zip | |
irmd: Allow direct rbuff between local processes
This allows bypassing the IPCP for local processes that share the same
packet pool, lowering latency between processes to comparable levels
as Unix sockets (RTT in the order of a microsecond).
For local processes, no IPCPs are needed:
$ irm b prog oping n oping
$ oping -l
Ouroboros ping server started.
New flow 64.
Received 64 bytes on fd 64.
The direct IPC can be disabled with the DISABLE_DIRECT_IPC build
flag. Note that this is needed for rumba 'local' experiments to
emulate network topologies. Without this flag all processes will just
communicate directly.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/irmd/reg/reg.c')
| -rw-r--r-- | src/irmd/reg/reg.c | 209 |
1 files changed, 196 insertions, 13 deletions
diff --git a/src/irmd/reg/reg.c b/src/irmd/reg/reg.c index 64aa1513..0025f695 100644 --- a/src/irmd/reg/reg.c +++ b/src/irmd/reg/reg.c @@ -1785,7 +1785,8 @@ int reg_wait_flow_allocated(struct flow_info * info, } if (flow != NULL) { - reg_flow_get_data(flow, pbuf); + *pbuf = flow->rsp_data; + clrbuf(flow->rsp_data); *info = flow->info; } @@ -1820,8 +1821,8 @@ int reg_respond_alloc(struct flow_info * info, } assert(flow->info.state == FLOW_ALLOC_PENDING); - assert(flow->data.len == 0); - assert(flow->data.data == NULL); + assert(flow->rsp_data.len == 0); + assert(flow->rsp_data.data == NULL); info->n_pid = flow->info.n_pid; info->n_1_pid = flow->info.n_pid; @@ -1833,8 +1834,10 @@ int reg_respond_alloc(struct flow_info * info, flow->response = response; - if (info->state == FLOW_ALLOCATED) - reg_flow_set_data(flow, pbuf); + if (info->state == FLOW_ALLOCATED) { + flow->rsp_data = *pbuf; + clrbuf(*pbuf); + } pthread_cond_broadcast(®.cond); @@ -1944,7 +1947,8 @@ int reg_wait_flow_accepted(struct flow_info * info, pthread_cleanup_pop(true); /* __cleanup_wait_accept */ if (flow != NULL) { - reg_flow_get_data(flow, pbuf); + *pbuf = flow->req_data; + clrbuf(flow->req_data); *info = flow->info; } @@ -2004,8 +2008,63 @@ int reg_respond_accept(struct flow_info * info, info->n_pid = flow->info.n_pid; - reg_flow_set_data(flow, pbuf); - clrbuf(pbuf); + flow->req_data = *pbuf; + clrbuf(*pbuf); + + if (reg_flow_update(flow, info) < 0) { + log_err("Failed to create flow structs."); + goto fail_flow; + } + + pthread_cond_broadcast(®.cond); + + pthread_mutex_unlock(®.mtx); + + return 0; + + fail_flow: + pthread_mutex_unlock(®.mtx); + return -1; +} + +int reg_prepare_flow_direct(struct flow_info * info, + buffer_t * pbuf, + uid_t alloc_uid) +{ + struct reg_flow * flow; + struct reg_proc * proc; + uid_t accept_uid = 0; + + assert(info != NULL); + assert(info->state == FLOW_ALLOCATED); + assert(info->n_1_pid != 0); + assert(pbuf != NULL); + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(info->id); + if (flow == NULL) { + log_err("Flow not found: %d.", info->id); + goto fail_flow; + } + + assert(flow->info.state == FLOW_ACCEPT_PENDING); + + info->n_pid = flow->info.n_pid; + + proc = __reg_get_proc(info->n_pid); + if (proc != NULL && !is_ouroboros_member_uid(proc->info.uid)) + accept_uid = proc->info.uid; + + if (alloc_uid != accept_uid) { + pthread_mutex_unlock(®.mtx); + return -EPERM; + } + + flow->direct = true; + + flow->req_data = *pbuf; + clrbuf(*pbuf); if (reg_flow_update(flow, info) < 0) { log_err("Failed to create flow structs."); @@ -2023,6 +2082,109 @@ int reg_respond_accept(struct flow_info * info, return -1; } +bool reg_flow_is_direct(int flow_id) +{ + struct reg_flow * flow; + bool ret; + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + + ret = flow != NULL && flow->direct; + + pthread_mutex_unlock(®.mtx); + + return ret; +} + +int reg_respond_flow_direct(int flow_id, + buffer_t * pbuf) +{ + struct reg_flow * flow; + + assert(pbuf != NULL); + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + if (flow == NULL) { + log_err("Flow %d not found.", flow_id); + goto fail; + } + + assert(flow->direct); + assert(flow->rsp_data.data == NULL); + + flow->rsp_data = *pbuf; + clrbuf(*pbuf); + + pthread_cond_broadcast(®.cond); + + pthread_mutex_unlock(®.mtx); + + return 0; + fail: + pthread_mutex_unlock(®.mtx); + return -1; +} + +int reg_wait_flow_direct(int flow_id, + buffer_t * pbuf, + const struct timespec * abstime) +{ + struct reg_flow * flow; + int ret = -1; + + assert(pbuf != NULL); + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + if (flow == NULL) + goto fail; + + assert(flow->direct); + + pthread_cleanup_push(__cleanup_mutex_unlock, ®.mtx); + + while (flow != NULL && flow->rsp_data.data == NULL) { + ret = -__timedwait(®.cond, ®.mtx, abstime); + if (ret == -ETIMEDOUT) + break; + flow = __reg_get_flow(flow_id); + } + + if (flow != NULL && flow->rsp_data.data != NULL) { + *pbuf = flow->rsp_data; + clrbuf(flow->rsp_data); + ret = 0; + } + + pthread_cleanup_pop(true); + + return ret; + fail: + pthread_mutex_unlock(®.mtx); + return -1; +} + +static int direct_flow_dealloc(struct reg_flow * flow, + pid_t pid) +{ + if (!flow->direct) + return -1; + + if (pid == flow->info.n_pid && flow->info.n_pid != -1) + flow->info.n_pid = -1; + else if (pid == flow->info.n_1_pid && flow->info.n_1_pid != -1) + flow->info.n_1_pid = -1; + else + return -1; + + return 0; +} + void reg_dealloc_flow(struct flow_info * info) { struct reg_flow * flow; @@ -2036,13 +2198,32 @@ void reg_dealloc_flow(struct flow_info * info) flow = __reg_get_flow(info->id); assert(flow != NULL); - assert(flow->data.data == NULL); - assert(flow->data.len == 0); + assert(flow->req_data.data == NULL); + assert(flow->req_data.len == 0); + assert(flow->rsp_data.data == NULL); + assert(flow->rsp_data.len == 0); + + info->n_1_pid = flow->info.n_1_pid; + + if (flow->info.state == FLOW_DEALLOC_PENDING) { + if (direct_flow_dealloc(flow, info->n_pid) < 0) { + info->state = FLOW_DEALLOC_PENDING; + pthread_mutex_unlock(®.mtx); + return; + } + flow->info.state = FLOW_DEALLOCATED; + info->state = FLOW_DEALLOCATED; + reg_flow_update(flow, info); + pthread_mutex_unlock(®.mtx); + return; + } + assert(flow->info.state == FLOW_ALLOCATED); flow->info.state = FLOW_DEALLOC_PENDING; info->state = FLOW_DEALLOC_PENDING; - info->n_1_pid = flow->info.n_1_pid; + + direct_flow_dealloc(flow, info->n_pid); memset(flow->name, 0, sizeof(flow->name)); @@ -2064,8 +2245,10 @@ void reg_dealloc_flow_resp(struct flow_info * info) flow = __reg_get_flow(info->id); assert(flow != NULL); - assert(flow->data.data == NULL); - assert(flow->data.len == 0); + assert(flow->req_data.data == NULL); + assert(flow->req_data.len == 0); + assert(flow->rsp_data.data == NULL); + assert(flow->rsp_data.len == 0); assert(flow->info.state == FLOW_DEALLOC_PENDING); flow->info.state = FLOW_DEALLOCATED; |
