diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2026-02-19 22:03:16 +0100 |
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2026-02-22 16:02:16 +0100 |
| commit | c3636005831064e71b03a5f8796a21e89b2a714f (patch) | |
| tree | ca57f7d09e9de015107edb1bda6f30654bf7699b /src/irmd | |
| parent | 1bf1d33db3e7622c8b97c5518f0f0ff984b989a8 (diff) | |
| download | ouroboros-c3636005831064e71b03a5f8796a21e89b2a714f.tar.gz ouroboros-c3636005831064e71b03a5f8796a21e89b2a714f.zip | |
irmd: Allow direct rbuff between local processes
This allows bypassing the IPCP for local processes that share the same
packet pool, lowering latency between processes to comparable levels
as Unix sockets (RTT in the order of a microsecond).
For local processes, no IPCPs are needed:
$ irm b prog oping n oping
$ oping -l
Ouroboros ping server started.
New flow 64.
Received 64 bytes on fd 64.
The direct IPC can be disabled with the DISABLE_DIRECT_IPC build
flag. Note that this is needed for rumba 'local' experiments to
emulate network topologies. Without this flag all processes will just
communicate directly.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/irmd')
| -rw-r--r-- | src/irmd/config.h.in | 1 | ||||
| -rw-r--r-- | src/irmd/main.c | 302 | ||||
| -rw-r--r-- | src/irmd/oap/cli.c | 3 | ||||
| -rw-r--r-- | src/irmd/reg/flow.c | 35 | ||||
| -rw-r--r-- | src/irmd/reg/flow.h | 14 | ||||
| -rw-r--r-- | src/irmd/reg/reg.c | 209 | ||||
| -rw-r--r-- | src/irmd/reg/reg.h | 13 | ||||
| -rw-r--r-- | src/irmd/reg/tests/flow_test.c | 55 | ||||
| -rw-r--r-- | src/irmd/reg/tests/reg_test.c | 183 |
9 files changed, 605 insertions, 210 deletions
diff --git a/src/irmd/config.h.in b/src/irmd/config.h.in index 2888ce37..df0cd718 100644 --- a/src/irmd/config.h.in +++ b/src/irmd/config.h.in @@ -74,6 +74,7 @@ #define IRMD_PKILL_TIMEOUT @IRMD_PKILL_TIMEOUT@ +#cmakedefine DISABLE_DIRECT_IPC #cmakedefine IRMD_KILL_ALL_PROCESSES #cmakedefine HAVE_LIBGCRYPT #cmakedefine HAVE_OPENSSL diff --git a/src/irmd/main.c b/src/irmd/main.c index c7a5715b..e610a015 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -86,6 +86,7 @@ #define TIMESYNC_SLACK 100 /* ms */ #define OAP_SEEN_TIMER 20 /* s */ #define DEALLOC_TIME 300 /* s */ +#define DIRECT_MPL 1 /* s */ enum irm_state { IRMD_NULL = 0, @@ -914,21 +915,29 @@ static int flow_accept(struct flow_info * flow, goto fail_oap; } - if (ipcp_flow_alloc_resp(flow, 0, resp_hdr) < 0) { + if (reg_flow_is_direct(flow->id)) { + if (reg_respond_flow_direct(flow->id, &resp_hdr) < 0) { + log_err("Failed to respond to direct flow."); + goto fail_resp; + } + log_info("Flow %d accepted (direct) by %d for %s.", + flow->id, flow->n_pid, name); + } else if (ipcp_flow_alloc_resp(flow, 0, resp_hdr) < 0) { log_err("Failed to respond to flow allocation."); goto fail_resp; + } else { + log_info("Flow %d accepted by %d for %s (uid %d).", + flow->id, flow->n_pid, name, flow->uid); } - log_info("Flow %d accepted by %d for %s (uid %d).", - flow->id, flow->n_pid, name, flow->uid); - freebuf(req_hdr); freebuf(resp_hdr); return 0; fail_oap: - ipcp_flow_alloc_resp(flow, err, resp_hdr); + if (!reg_flow_is_direct(flow->id)) + ipcp_flow_alloc_resp(flow, err, resp_hdr); fail_wait: reg_destroy_flow(flow->id); fail_flow: @@ -1028,7 +1037,7 @@ static int get_ipcp_by_dst(const char * dst, pid_t * pid, buffer_t * hash) { - ipcp_list_msg_t ** ipcps; + ipcp_list_msg_t ** ipcps = NULL; int n; int i; int err = -EIPCP; @@ -1081,6 +1090,171 @@ static int get_ipcp_by_dst(const char * dst, return err; } +static int wait_for_accept(const char * name) +{ + struct timespec timeo = TIMESPEC_INIT_MS(IRMD_REQ_ARR_TIMEOUT); + struct timespec abstime; + char ** exec; + int ret; + + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, &timeo, &abstime); + + ret = reg_wait_flow_accepting(name, &abstime); + if (ret == -ETIMEDOUT) { + if (reg_get_exec(name, &exec) < 0) { + log_dbg("No program bound for %s.", name); + goto fail; + } + + if (spawn_program(exec) < 0) { + log_err("Failed to start %s for %s.", exec[0], name); + goto fail_spawn; + } + + log_info("Starting %s for %s.", exec[0], name); + + ts_add(&abstime, &timeo, &abstime); + + ret = reg_wait_flow_accepting(name, &abstime); + if (ret == -ETIMEDOUT) + goto fail_spawn; + + argvfree(exec); + } + + return ret; + + fail_spawn: + argvfree(exec); + fail: + return -1; +} + +static int flow_req_arr(struct flow_info * flow, + const uint8_t * hash, + buffer_t * data) +{ + struct ipcp_info info; + struct layer_info layer; + enum hash_algo algo; + int ret; + char name[NAME_SIZE + 1]; + + info.pid = flow->n_1_pid; + + log_dbg("Flow req arrived from IPCP %d for " HASH_FMT32 ".", + info.pid, HASH_VAL32(hash)); + + if (reg_get_ipcp(&info, &layer) < 0) { + log_err("No IPCP with pid %d.", info.pid); + ret = -EIPCP; + goto fail; + } + + algo = (enum hash_algo) layer.dir_hash_algo; + + if (reg_get_name_for_hash(name, algo, hash) < 0) { + log_warn("No name for " HASH_FMT32 ".", HASH_VAL32(hash)); + ret = -ENAME; + goto fail; + } + + log_info("Flow request arrived for %s.", name); + + ret = wait_for_accept(name); + if (ret < 0) { + log_err("No active process for %s.", name); + goto fail; + } + + flow->id = ret; + flow->state = FLOW_ALLOCATED; + + ret = reg_respond_accept(flow, data); + if (ret < 0) { + log_err("Failed to respond to flow %d.", flow->id); + goto fail; + } + + return 0; + fail: + return ret; +} + +#ifndef DISABLE_DIRECT_IPC +static int flow_alloc_direct(const char * dst, + struct flow_info * flow, + buffer_t * data, + struct timespec * abstime, + struct crypt_sk * sk, + struct name_info * info) +{ + struct flow_info acc; /* server side flow */ + buffer_t req_hdr = BUF_INIT; + buffer_t resp_hdr = BUF_INIT; + void * ctx; + int err; + + acc.id = wait_for_accept(dst); + if (acc.id < 0) { + log_dbg("No accepting process for %s.", dst); + return -EAGAIN; + } + + if (oap_cli_prepare(&ctx, info, &req_hdr, *data) < 0) { + log_err("Failed to prepare OAP for %s.", dst); + return -EBADF; + } + + acc.n_1_pid = flow->n_pid; + acc.mpl = DIRECT_MPL; + acc.qs = flow->qs; + acc.state = FLOW_ALLOCATED; + + err = reg_prepare_flow_direct(&acc, &req_hdr, flow->uid); + if (err == -EPERM) { + log_dbg("UID mismatch, falling back."); + oap_ctx_free(ctx); + freebuf(req_hdr); + return -EPERM; + } + + if (err < 0) { + log_err("Failed to prepare direct flow."); + oap_ctx_free(ctx); + freebuf(req_hdr); + return -EBADF; + } + + err = reg_wait_flow_direct(acc.id, &resp_hdr, abstime); + if (err < 0) { + log_err("Timeout waiting for OAP response."); + oap_ctx_free(ctx); + return -ETIMEDOUT; + } + + err = oap_cli_complete(ctx, info, resp_hdr, data, sk); + if (err < 0) { + log_err("OAP completion failed for %s.", dst); + freebuf(resp_hdr); + return err; + } + + flow->id = acc.id; + flow->n_1_pid = acc.n_pid; + flow->mpl = DIRECT_MPL; + flow->state = FLOW_ALLOCATED; + + log_info("Flow %d allocated (direct) for %d to %s.", + flow->id, flow->n_pid, dst); + + freebuf(resp_hdr); + + return 0; +} +#endif /* DISABLE_DIRECT_IPC */ + static int flow_alloc(const char * dst, struct flow_info * flow, buffer_t * data, @@ -1104,17 +1278,25 @@ static int flow_alloc(const char * dst, goto fail_flow; } + flow->uid = reg_get_proc_uid(flow->n_pid); + + log_info("Allocating flow for %d to %s (uid %d).", + flow->n_pid, dst, flow->uid); + +#ifndef DISABLE_DIRECT_IPC + err = flow_alloc_direct(dst, flow, data, abstime, sk, &info); + if (err == 0) + return 0; + + if (err != -EPERM && err != -EAGAIN) + goto fail_flow; +#endif if (reg_create_flow(flow) < 0) { log_err("Failed to create flow."); err = -EBADF; goto fail_flow; } - flow->uid = reg_get_proc_uid(flow->n_pid); - - log_info("Allocating flow for %d to %s (uid %d).", - flow->n_pid, dst, flow->uid); - if (get_ipcp_by_dst(dst, &flow->n_1_pid, &hash) < 0) { log_err("Failed to find IPCP for %s.", dst); err = -EIPCP; @@ -1188,98 +1370,6 @@ static int flow_alloc(const char * dst, return err; } -static int wait_for_accept(const char * name) -{ - struct timespec timeo = TIMESPEC_INIT_MS(IRMD_REQ_ARR_TIMEOUT); - struct timespec abstime; - char ** exec; - int ret; - - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, &timeo, &abstime); - - ret = reg_wait_flow_accepting(name, &abstime); - if (ret == -ETIMEDOUT) { - if (reg_get_exec(name, &exec) < 0) { - log_dbg("No program bound for %s.", name); - goto fail; - } - - if (spawn_program(exec) < 0) { - log_err("Failed to start %s for %s.", exec[0], name); - goto fail_spawn; - } - - log_info("Starting %s for %s.", exec[0], name); - - ts_add(&abstime, &timeo, &abstime); - - ret = reg_wait_flow_accepting(name, &abstime); - if (ret == -ETIMEDOUT) - goto fail_spawn; - - argvfree(exec); - } - - return ret; - - fail_spawn: - argvfree(exec); - fail: - return -1; -} - -static int flow_req_arr(struct flow_info * flow, - const uint8_t * hash, - buffer_t * data) -{ - struct ipcp_info info; - struct layer_info layer; - enum hash_algo algo; - int ret; - char name[NAME_SIZE + 1]; - - info.pid = flow->n_1_pid; - - log_dbg("Flow req arrived from IPCP %d for " HASH_FMT32 ".", - info.pid, HASH_VAL32(hash)); - - if (reg_get_ipcp(&info, &layer) < 0) { - log_err("No IPCP with pid %d.", info.pid); - ret = -EIPCP; - goto fail; - } - - algo = (enum hash_algo) layer.dir_hash_algo; - - if (reg_get_name_for_hash(name, algo, hash) < 0) { - log_warn("No name for " HASH_FMT32 ".", HASH_VAL32(hash)); - ret = -ENAME; - goto fail; - } - - log_info("Flow request arrived for %s.", name); - - ret = wait_for_accept(name); - if (ret < 0) { - log_err("No active process for %s.", name); - goto fail; - } - - flow->id = ret; - flow->state = FLOW_ALLOCATED; - - ret = reg_respond_accept(flow, data); - if (ret < 0) { - log_err("Failed to respond to flow %d.", flow->id); - goto fail; - } - - return 0; - fail: - return ret; -} - static int flow_alloc_reply(struct flow_info * flow, int response, buffer_t * data) @@ -1303,6 +1393,12 @@ static int flow_dealloc(struct flow_info * flow, reg_dealloc_flow(flow); + if (reg_flow_is_direct(flow->id)) { + if (flow->state == FLOW_DEALLOCATED) + reg_destroy_flow(flow->id); + return 0; + } + if (ipcp_flow_dealloc(flow->n_1_pid, flow->id, ts->tv_sec) < 0) { log_err("Failed to request dealloc from %d.", flow->n_1_pid); return -EIPCP; diff --git a/src/irmd/oap/cli.c b/src/irmd/oap/cli.c index 507f3f81..8ecd317d 100644 --- a/src/irmd/oap/cli.c +++ b/src/irmd/oap/cli.c @@ -311,6 +311,9 @@ int oap_cli_prepare(void ** ctx, *req_buf = s->local_hdr.hdr; clrbuf(s->local_hdr.hdr); + /* oap_hdr_encode repoints id into hdr; restore to __id */ + s->local_hdr.id = s->id; + crypt_free_crt(crt); crypt_free_key(pkp); diff --git a/src/irmd/reg/flow.c b/src/irmd/reg/flow.c index 15497d35..93c3e128 100644 --- a/src/irmd/reg/flow.c +++ b/src/irmd/reg/flow.c @@ -80,7 +80,7 @@ void reg_flow_destroy(struct reg_flow * flow) switch(flow->info.state) { case FLOW_ACCEPT_PENDING: - clrbuf(flow->data); + clrbuf(flow->req_data); /* FALLTHRU */ default: destroy_rbuffs(flow); @@ -89,8 +89,10 @@ void reg_flow_destroy(struct reg_flow * flow) assert(flow->n_rb == NULL); assert(flow->n_1_rb == NULL); - assert(flow->data.data == NULL); - assert(flow->data.len == 0); + assert(flow->req_data.data == NULL); + assert(flow->req_data.len == 0); + assert(flow->rsp_data.data == NULL); + assert(flow->rsp_data.len == 0); assert(list_is_empty(&flow->next)); @@ -186,30 +188,3 @@ int reg_flow_update(struct reg_flow * flow, fail: return -ENOMEM; } - -void reg_flow_set_data(struct reg_flow * flow, - const buffer_t * buf) -{ - assert(flow != NULL); - assert(buf != NULL); - assert(flow->data.data == NULL); - assert(flow->data.len == 0); - - flow->data = *buf; -} - -void reg_flow_get_data(struct reg_flow * flow, - buffer_t * buf) -{ - assert(flow != NULL); - assert(buf != NULL); - - *buf = flow->data; - - clrbuf(flow->data); -} - -void reg_flow_free_data(struct reg_flow * flow) -{ - freebuf(flow->data); -} diff --git a/src/irmd/reg/flow.h b/src/irmd/reg/flow.h index d0078e1b..9a4046d3 100644 --- a/src/irmd/reg/flow.h +++ b/src/irmd/reg/flow.h @@ -31,6 +31,7 @@ #include <ouroboros/ssm_rbuff.h> #include <ouroboros/utils.h> +#include <stdbool.h> #include <sys/types.h> #include <time.h> @@ -40,11 +41,14 @@ struct reg_flow { struct flow_info info; int response; - buffer_t data; + buffer_t req_data; + buffer_t rsp_data; struct timespec t0; char name[NAME_SIZE + 1]; + bool direct; + struct ssm_rbuff * n_rb; struct ssm_rbuff * n_1_rb; }; @@ -56,12 +60,4 @@ void reg_flow_destroy(struct reg_flow * flow); int reg_flow_update(struct reg_flow * flow, struct flow_info * info); -void reg_flow_set_data(struct reg_flow * flow, - const buffer_t * buf); - -void reg_flow_get_data(struct reg_flow * flow, - buffer_t * buf); - -void reg_flow_free_data(struct reg_flow * flow); - #endif /* OUROBOROS_IRMD_REG_FLOW_H */ diff --git a/src/irmd/reg/reg.c b/src/irmd/reg/reg.c index 64aa1513..0025f695 100644 --- a/src/irmd/reg/reg.c +++ b/src/irmd/reg/reg.c @@ -1785,7 +1785,8 @@ int reg_wait_flow_allocated(struct flow_info * info, } if (flow != NULL) { - reg_flow_get_data(flow, pbuf); + *pbuf = flow->rsp_data; + clrbuf(flow->rsp_data); *info = flow->info; } @@ -1820,8 +1821,8 @@ int reg_respond_alloc(struct flow_info * info, } assert(flow->info.state == FLOW_ALLOC_PENDING); - assert(flow->data.len == 0); - assert(flow->data.data == NULL); + assert(flow->rsp_data.len == 0); + assert(flow->rsp_data.data == NULL); info->n_pid = flow->info.n_pid; info->n_1_pid = flow->info.n_pid; @@ -1833,8 +1834,10 @@ int reg_respond_alloc(struct flow_info * info, flow->response = response; - if (info->state == FLOW_ALLOCATED) - reg_flow_set_data(flow, pbuf); + if (info->state == FLOW_ALLOCATED) { + flow->rsp_data = *pbuf; + clrbuf(*pbuf); + } pthread_cond_broadcast(®.cond); @@ -1944,7 +1947,8 @@ int reg_wait_flow_accepted(struct flow_info * info, pthread_cleanup_pop(true); /* __cleanup_wait_accept */ if (flow != NULL) { - reg_flow_get_data(flow, pbuf); + *pbuf = flow->req_data; + clrbuf(flow->req_data); *info = flow->info; } @@ -2004,8 +2008,63 @@ int reg_respond_accept(struct flow_info * info, info->n_pid = flow->info.n_pid; - reg_flow_set_data(flow, pbuf); - clrbuf(pbuf); + flow->req_data = *pbuf; + clrbuf(*pbuf); + + if (reg_flow_update(flow, info) < 0) { + log_err("Failed to create flow structs."); + goto fail_flow; + } + + pthread_cond_broadcast(®.cond); + + pthread_mutex_unlock(®.mtx); + + return 0; + + fail_flow: + pthread_mutex_unlock(®.mtx); + return -1; +} + +int reg_prepare_flow_direct(struct flow_info * info, + buffer_t * pbuf, + uid_t alloc_uid) +{ + struct reg_flow * flow; + struct reg_proc * proc; + uid_t accept_uid = 0; + + assert(info != NULL); + assert(info->state == FLOW_ALLOCATED); + assert(info->n_1_pid != 0); + assert(pbuf != NULL); + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(info->id); + if (flow == NULL) { + log_err("Flow not found: %d.", info->id); + goto fail_flow; + } + + assert(flow->info.state == FLOW_ACCEPT_PENDING); + + info->n_pid = flow->info.n_pid; + + proc = __reg_get_proc(info->n_pid); + if (proc != NULL && !is_ouroboros_member_uid(proc->info.uid)) + accept_uid = proc->info.uid; + + if (alloc_uid != accept_uid) { + pthread_mutex_unlock(®.mtx); + return -EPERM; + } + + flow->direct = true; + + flow->req_data = *pbuf; + clrbuf(*pbuf); if (reg_flow_update(flow, info) < 0) { log_err("Failed to create flow structs."); @@ -2023,6 +2082,109 @@ int reg_respond_accept(struct flow_info * info, return -1; } +bool reg_flow_is_direct(int flow_id) +{ + struct reg_flow * flow; + bool ret; + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + + ret = flow != NULL && flow->direct; + + pthread_mutex_unlock(®.mtx); + + return ret; +} + +int reg_respond_flow_direct(int flow_id, + buffer_t * pbuf) +{ + struct reg_flow * flow; + + assert(pbuf != NULL); + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + if (flow == NULL) { + log_err("Flow %d not found.", flow_id); + goto fail; + } + + assert(flow->direct); + assert(flow->rsp_data.data == NULL); + + flow->rsp_data = *pbuf; + clrbuf(*pbuf); + + pthread_cond_broadcast(®.cond); + + pthread_mutex_unlock(®.mtx); + + return 0; + fail: + pthread_mutex_unlock(®.mtx); + return -1; +} + +int reg_wait_flow_direct(int flow_id, + buffer_t * pbuf, + const struct timespec * abstime) +{ + struct reg_flow * flow; + int ret = -1; + + assert(pbuf != NULL); + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + if (flow == NULL) + goto fail; + + assert(flow->direct); + + pthread_cleanup_push(__cleanup_mutex_unlock, ®.mtx); + + while (flow != NULL && flow->rsp_data.data == NULL) { + ret = -__timedwait(®.cond, ®.mtx, abstime); + if (ret == -ETIMEDOUT) + break; + flow = __reg_get_flow(flow_id); + } + + if (flow != NULL && flow->rsp_data.data != NULL) { + *pbuf = flow->rsp_data; + clrbuf(flow->rsp_data); + ret = 0; + } + + pthread_cleanup_pop(true); + + return ret; + fail: + pthread_mutex_unlock(®.mtx); + return -1; +} + +static int direct_flow_dealloc(struct reg_flow * flow, + pid_t pid) +{ + if (!flow->direct) + return -1; + + if (pid == flow->info.n_pid && flow->info.n_pid != -1) + flow->info.n_pid = -1; + else if (pid == flow->info.n_1_pid && flow->info.n_1_pid != -1) + flow->info.n_1_pid = -1; + else + return -1; + + return 0; +} + void reg_dealloc_flow(struct flow_info * info) { struct reg_flow * flow; @@ -2036,13 +2198,32 @@ void reg_dealloc_flow(struct flow_info * info) flow = __reg_get_flow(info->id); assert(flow != NULL); - assert(flow->data.data == NULL); - assert(flow->data.len == 0); + assert(flow->req_data.data == NULL); + assert(flow->req_data.len == 0); + assert(flow->rsp_data.data == NULL); + assert(flow->rsp_data.len == 0); + + info->n_1_pid = flow->info.n_1_pid; + + if (flow->info.state == FLOW_DEALLOC_PENDING) { + if (direct_flow_dealloc(flow, info->n_pid) < 0) { + info->state = FLOW_DEALLOC_PENDING; + pthread_mutex_unlock(®.mtx); + return; + } + flow->info.state = FLOW_DEALLOCATED; + info->state = FLOW_DEALLOCATED; + reg_flow_update(flow, info); + pthread_mutex_unlock(®.mtx); + return; + } + assert(flow->info.state == FLOW_ALLOCATED); flow->info.state = FLOW_DEALLOC_PENDING; info->state = FLOW_DEALLOC_PENDING; - info->n_1_pid = flow->info.n_1_pid; + + direct_flow_dealloc(flow, info->n_pid); memset(flow->name, 0, sizeof(flow->name)); @@ -2064,8 +2245,10 @@ void reg_dealloc_flow_resp(struct flow_info * info) flow = __reg_get_flow(info->id); assert(flow != NULL); - assert(flow->data.data == NULL); - assert(flow->data.len == 0); + assert(flow->req_data.data == NULL); + assert(flow->req_data.len == 0); + assert(flow->rsp_data.data == NULL); + assert(flow->rsp_data.len == 0); assert(flow->info.state == FLOW_DEALLOC_PENDING); flow->info.state = FLOW_DEALLOCATED; diff --git a/src/irmd/reg/reg.h b/src/irmd/reg/reg.h index bda57711..6b576471 100644 --- a/src/irmd/reg/reg.h +++ b/src/irmd/reg/reg.h @@ -150,6 +150,19 @@ int reg_wait_flow_accepting(const char * name, int reg_respond_accept(struct flow_info * info, buffer_t * pbuf); +int reg_prepare_flow_direct(struct flow_info * info, + buffer_t * pbuf, + uid_t alloc_uid); + +int reg_respond_flow_direct(int flow_id, + buffer_t * pbuf); + +int reg_wait_flow_direct(int flow_id, + buffer_t * pbuf, + const struct timespec * abstime); + +bool reg_flow_is_direct(int flow_id); + void reg_dealloc_flow(struct flow_info * info); void reg_dealloc_flow_resp(struct flow_info * info); diff --git a/src/irmd/reg/tests/flow_test.c b/src/irmd/reg/tests/flow_test.c index bfdbceb5..7e1c1360 100644 --- a/src/irmd/reg/tests/flow_test.c +++ b/src/irmd/reg/tests/flow_test.c @@ -24,10 +24,6 @@ #include <test/test.h> -#include <string.h> - -#define TEST_DATA "testpiggybackdata" - static int test_reg_flow_create_destroy(void) { struct reg_flow * f; @@ -219,56 +215,6 @@ static int test_reg_flow_assert_fails(void) return ret; } -static int test_flow_data(void) -{ - struct reg_flow * f; - - struct flow_info info = { - .id = 1, - .n_pid = 1, - .qs = qos_raw, - .state = FLOW_INIT - }; - - char * data; - buffer_t buf; - buffer_t rcv = {0, NULL}; - - TEST_START(); - - data = strdup(TEST_DATA); - if (data == NULL) { - printf("Failed to strdup data.\n"); - goto fail; - } - - buf.data = (uint8_t *) data; - buf.len = strlen(data); - - f = reg_flow_create(&info); - if (f == NULL) { - printf("Failed to create flow.\n"); - goto fail; - } - - reg_flow_set_data(f, &buf); - - reg_flow_get_data(f, &rcv); - - freebuf(buf); - clrbuf(rcv); - - reg_flow_destroy(f); - - TEST_SUCCESS(); - - return TEST_RC_SUCCESS; - fail: - free(data); - TEST_FAIL(); - return TEST_RC_FAIL; -} - int flow_test(int argc, char ** argv) { @@ -280,7 +226,6 @@ int flow_test(int argc, ret |= test_reg_flow_create_destroy(); ret |= test_reg_flow_update(); ret |= test_reg_flow_assert_fails(); - ret |= test_flow_data(); return ret; } diff --git a/src/irmd/reg/tests/reg_test.c b/src/irmd/reg/tests/reg_test.c index 4d7e30ef..eb981349 100644 --- a/src/irmd/reg/tests/reg_test.c +++ b/src/irmd/reg/tests/reg_test.c @@ -485,6 +485,188 @@ static int test_reg_allocate_flow_fail(void) return TEST_RC_FAIL; } +struct direct_alloc_info { + struct flow_info info; + buffer_t rsp; + struct timespec abstime; +}; + +static void * test_flow_alloc_direct(void * o) +{ + struct direct_alloc_info * dai; + buffer_t req; + + dai = (struct direct_alloc_info *) o; + + req.data = (uint8_t *) strdup(TEST_DATA); + if (req.data == NULL) { + printf("Failed to strdup req data.\n"); + goto fail; + } + req.len = strlen(TEST_DATA) + 1; + + if (reg_prepare_flow_direct(&dai->info, &req, 0) < 0) { + printf("Failed to prepare direct flow.\n"); + freebuf(req); + goto fail; + } + + if (reg_wait_flow_direct(dai->info.id, &dai->rsp, &dai->abstime) < 0) { + printf("Failed to wait direct flow.\n"); + goto fail; + } + + return (void *) 0; + fail: + return (void *) -1; +} + +static int test_reg_direct_flow_success(void) +{ + pthread_t thr; + struct timespec abstime; + struct timespec timeo = TIMESPEC_INIT_S(1); + buffer_t rbuf = BUF_INIT; + buffer_t rsp; + struct direct_alloc_info dai; + void * thr_ret; + + struct flow_info info = { + .n_pid = TEST_PID, + .qs = qos_raw + }; + + TEST_START(); + + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + + ts_add(&abstime, &timeo, &abstime); + + if (reg_init() < 0) { + printf("Failed to init registry.\n"); + goto fail; + } + + if (reg_create_flow(&info) < 0) { + printf("Failed to add flow.\n"); + goto fail; + } + + if (reg_prepare_flow_accept(&info) < 0) { + printf("Failed to prepare for accept.\n"); + goto fail; + } + + dai.info.id = info.id; + dai.info.n_1_pid = TEST_N_1_PID; + dai.info.mpl = TEST_MPL; + dai.info.qs = qos_data; + dai.info.state = FLOW_ALLOCATED; + dai.rsp.len = 0; + dai.rsp.data = NULL; + dai.abstime = abstime; + + pthread_create(&thr, NULL, test_flow_alloc_direct, &dai); + + if (reg_wait_flow_accepted(&info, &rbuf, &abstime) < 0) { + printf("Flow accept failed.\n"); + pthread_join(thr, NULL); + reg_destroy_flow(info.id); + reg_fini(); + goto fail; + } + + if (info.state != FLOW_ALLOCATED) { + printf("Flow not in allocated state.\n"); + goto fail; + } + + if (rbuf.data == NULL) { + printf("req_data not received.\n"); + goto fail; + } + + if (strcmp((char *) rbuf.data, TEST_DATA) != 0) { + printf("req_data content mismatch.\n"); + goto fail; + } + + freebuf(rbuf); + + if (!reg_flow_is_direct(info.id)) { + printf("Flow not marked direct.\n"); + goto fail; + } + + rsp.data = (uint8_t *) strdup(TEST_DATA2); + if (rsp.data == NULL) { + printf("Failed to strdup rsp data.\n"); + goto fail; + } + rsp.len = strlen(TEST_DATA2) + 1; + + if (reg_respond_flow_direct(info.id, &rsp) < 0) { + printf("Failed to respond direct.\n"); + freebuf(rsp); + goto fail; + } + + pthread_join(thr, &thr_ret); + + if (thr_ret != (void *) 0) { + printf("Allocator thread failed.\n"); + goto fail; + } + + if (dai.rsp.data == NULL) { + printf("rsp_data not received.\n"); + goto fail; + } + + if (strcmp((char *) dai.rsp.data, TEST_DATA2) != 0) { + printf("rsp_data content mismatch.\n"); + goto fail; + } + + freebuf(dai.rsp); + + reg_dealloc_flow(&info); + + if (info.state != FLOW_DEALLOC_PENDING) { + printf("Flow not in dealloc pending.\n"); + goto fail; + } + + info.n_pid = TEST_PID; + + reg_dealloc_flow(&info); + + if (info.state != FLOW_DEALLOC_PENDING) { + printf("Same endpoint dealloc changed state.\n"); + goto fail; + } + + info.n_pid = TEST_N_1_PID; + + reg_dealloc_flow(&info); + + if (info.state != FLOW_DEALLOCATED) { + printf("Flow not deallocated.\n"); + goto fail; + } + + reg_destroy_flow(info.id); + + reg_fini(); + + TEST_SUCCESS(); + + return TEST_RC_SUCCESS; + fail: + REG_TEST_FAIL(); + return TEST_RC_FAIL; +} + static int test_reg_flow(void) { int rc = 0; @@ -493,6 +675,7 @@ static int test_reg_flow(void) { rc |= test_reg_accept_flow_success(); rc |= test_reg_accept_flow_success_no_crypt(); rc |= test_reg_allocate_flow_fail(); + rc |= test_reg_direct_flow_success(); return rc; } |
