| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2026-01-20 22:25:41 +0100 |
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2026-01-26 07:50:33 +0100 |
| commit | 0ca48453a067c7862f0bb6b85f152da826f59af7 (patch) | |
| tree | 5daf26d84777ec6ad1c266601b66e59f9dcc88ca /src/ipcpd/unicast/dt.c | |
| parent | 1775201647a10923b9f73addf2304c3124350836 (diff) | |
| download | ouroboros-0ca48453a067c7862f0bb6b85f152da826f59af7.tar.gz ouroboros-0ca48453a067c7862f0bb6b85f152da826f59af7.zip | |
lib: Replace rdrbuff with a proper slab allocator
This is a first step towards the Secure Shared Memory (SSM)
infrastructure for Ouroboros, which will allow proper resource
separation for non-privileged processes.
This replaces the rdrbuff (random-deletion ring buffer) PoC allocator
with a sharded slab allocator for the packet buffer pool, avoiding the
head-of-line blocking behaviour of the rdrbuff and reducing lock
contention in multi-process scenarios. Each size class contains
multiple independent shards, allowing parallel allocations without
blocking.
- Configurable shard count per size class (default: 4, set via
SSM_POOL_SHARDS in CMake). The configured number of blocks is
spread across the shards. As an example:
SSM_POOL_512_BLOCKS = 768 blocks total
These 768 blocks are shared among the 4 shards
(not 768 × 4 = 3072 blocks)
- Lazy block distribution: all blocks initially reside in shard 0
and naturally migrate to process-local shards upon first
allocation and subsequent free operations
- Fallback with work stealing: processes attempt allocation from
their local shard (pid % SSM_POOL_SHARDS) first, then steal
from other shards if the local shard is exhausted, eliminating
fragmentation while maintaining low contention (see the sketch
after this list)
- Round-robin condvar signaling: blocking allocations cycle
through all shard condition variables to ensure fairness
- Blocks freed to allocator's shard: uses allocator_pid to
determine target shard, enabling natural load balancing as
process allocation patterns stabilize over time
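Roughly, the allocation and free paths look like the sketch below. This
is a minimal illustration only: the names (struct size_class, struct
shard, class_alloc, POOL_SHARDS, ...) are hypothetical stand-ins, not
the actual ssm.h definitions, and the blocking/condvar path,
robust-mutex recovery and shared-memory placement are omitted.

```c
#include <pthread.h>
#include <stddef.h>
#include <sys/types.h>

#define POOL_SHARDS 4 /* stand-in for the CMake SSM_POOL_SHARDS setting */

/* One free block; in the real allocator blocks live in shared memory. */
struct blk {
        struct blk * next;
        pid_t        allocator_pid; /* pid of the process that allocated it */
};

/* One shard: its own lock, condvar and free list. */
struct shard {
        pthread_mutex_t mtx;  /* robust, process-shared in the real code */
        pthread_cond_t  cond;
        struct blk *    free;
};

struct size_class {
        struct shard shards[POOL_SHARDS];
};

/* Local shard first (pid % POOL_SHARDS), then steal from the others. */
static struct blk * class_alloc(struct size_class * c, pid_t pid)
{
        size_t local = (size_t) pid % POOL_SHARDS;
        size_t i;

        for (i = 0; i < POOL_SHARDS; ++i) {
                struct shard * s = &c->shards[(local + i) % POOL_SHARDS];
                struct blk *   b;

                pthread_mutex_lock(&s->mtx);
                b = s->free;
                if (b != NULL) {
                        s->free          = b->next;
                        b->allocator_pid = pid;
                }
                pthread_mutex_unlock(&s->mtx);
                if (b != NULL)
                        return b; /* local hit or successful steal */
        }

        return NULL; /* caller would block, cycling the shard condvars */
}

/* Free to the allocator's shard so blocks drift towards busy processes. */
static void class_free(struct size_class * c, struct blk * b)
{
        struct shard * s = &c->shards[(size_t) b->allocator_pid % POOL_SHARDS];

        pthread_mutex_lock(&s->mtx);
        b->next = s->free;
        s->free = b;
        pthread_cond_signal(&s->cond);
        pthread_mutex_unlock(&s->mtx);
}
```

Freeing to the allocator's shard rather than the freer's is what lets
blocks migrate towards the processes that actually allocate them.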
Maintains the existing robust mutex semantics, including EOWNERDEAD
handling for dead-process recovery. Internal structures are exposed in
ssm.h for testing purposes. Adds tests (pool_test,
pool_sharding_test, etc.) verifying lazy distribution, migration,
fallback stealing, and multiprocess behaviour.
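The EOWNERDEAD path amounts to something like the following sketch
(hypothetical helper, not the ssm.h API; it assumes the mutex was
initialised with PTHREAD_MUTEX_ROBUST and PTHREAD_PROCESS_SHARED for
use in shared memory):

```c
#include <errno.h>
#include <pthread.h>

/*
 * If the previous owner died while holding the lock, pthread_mutex_lock()
 * returns EOWNERDEAD; the survivor repairs the protected state and marks
 * the mutex consistent so it can keep being used.
 */
static int shard_lock(pthread_mutex_t * mtx)
{
        int ret = pthread_mutex_lock(mtx);

        if (ret == EOWNERDEAD) {
                /* Previous holder died: repair shard state here, then... */
                return pthread_mutex_consistent(mtx);
        }

        return ret;
}
```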
Updates the ring buffer (rbuff) to use relaxed/acquire/release
ordering on its atomic indices. The ring buffer still requires the
(robust) mutex to ensure cross-structure synchronization between pool
buffer writes and ring buffer index publication.
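For illustration only, a generic single-producer ring buffer using that
ordering on its indices might look like the sketch below. This is not
the actual rbuff code; in the real structure it is the robust mutex,
not the atomics, that orders pool buffer writes against index
publication.

```c
#include <stdatomic.h>
#include <stddef.h>

#define RB_SIZE 256 /* illustrative; must be a power of two */

/* Hypothetical layout; the real rbuff lives in shared memory and is
 * additionally guarded by a robust, process-shared mutex. */
struct rb {
        _Atomic size_t head;            /* written by the producer */
        _Atomic size_t tail;            /* written by the consumer */
        size_t         slots[RB_SIZE];
};

static int rb_push(struct rb * rb, size_t idx)
{
        /* Own index: relaxed is enough, nobody else writes it. */
        size_t head = atomic_load_explicit(&rb->head, memory_order_relaxed);
        /* Acquire: see the consumer's reads before reusing a slot. */
        size_t tail = atomic_load_explicit(&rb->tail, memory_order_acquire);

        if (head - tail == RB_SIZE)
                return -1; /* full */

        rb->slots[head & (RB_SIZE - 1)] = idx;
        /* Release: publish the slot write together with the new index. */
        atomic_store_explicit(&rb->head, head + 1, memory_order_release);

        return 0;
}
```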
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/ipcpd/unicast/dt.c')
| -rw-r--r-- | src/ipcpd/unicast/dt.c | 44 |
1 file changed, 22 insertions(+), 22 deletions(-)
```diff
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index e2679ffe..9435350f 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -68,7 +68,7 @@
 #endif
 
 struct comp_info {
-        void (* post_packet)(void * comp, struct shm_du_buff * sdb);
+        void (* post_packet)(void * comp, struct ssm_pk_buff * spb);
         void * comp;
         char * name;
 };
@@ -135,11 +135,11 @@ static void dt_pci_des(uint8_t * head,
         memcpy(&dt_pci->eid, head + dt_pci_info.eid_o, dt_pci_info.eid_size);
 }
 
-static void dt_pci_shrink(struct shm_du_buff * sdb)
+static void dt_pci_shrink(struct ssm_pk_buff * spb)
 {
-        assert(sdb);
+        assert(spb);
 
-        shm_du_buff_head_release(sdb, dt_pci_info.head_size);
+        ssm_pk_buff_head_release(spb, dt_pci_info.head_size);
 }
 
 struct {
@@ -429,7 +429,7 @@ static void handle_event(void * self,
 
 static void packet_handler(int                  fd,
                            qoscube_t            qc,
-                           struct shm_du_buff * sdb)
+                           struct ssm_pk_buff * spb)
 {
         struct dt_pci dt_pci;
         int           ret;
@@ -437,7 +437,7 @@ static void packet_handler(int fd,
         uint8_t *     head;
         size_t        len;
 
-        len = shm_du_buff_len(sdb);
+        len = ssm_pk_buff_len(spb);
 
 #ifndef IPCP_FLOW_STATS
         (void) fd;
@@ -451,13 +451,13 @@ static void packet_handler(int fd,
 #endif
         memset(&dt_pci, 0, sizeof(dt_pci));
 
-        head = shm_du_buff_head(sdb);
+        head = ssm_pk_buff_head(spb);
         dt_pci_des(head, &dt_pci);
 
         if (dt_pci.dst_addr != dt.addr) {
                 if (dt_pci.ttl == 0) {
                         log_dbg("TTL was zero.");
-                        ipcp_sdb_release(sdb);
+                        ipcp_spb_release(spb);
 #ifdef IPCP_FLOW_STATS
                         pthread_mutex_lock(&dt.stat[fd].lock);
@@ -474,7 +474,7 @@ static void packet_handler(int fd,
                 if (ofd < 0) {
                         log_dbg("No next hop for %" PRIu64 ".",
                                 dt_pci.dst_addr);
-                        ipcp_sdb_release(sdb);
+                        ipcp_spb_release(spb);
 #ifdef IPCP_FLOW_STATS
                         pthread_mutex_lock(&dt.stat[fd].lock);
@@ -488,12 +488,12 @@ static void packet_handler(int fd,
         (void) ca_calc_ecn(ofd, head + dt_pci_info.ecn_o, qc, len);
 
-        ret = ipcp_flow_write(ofd, sdb);
+        ret = ipcp_flow_write(ofd, spb);
         if (ret < 0) {
                 log_dbg("Failed to write packet to fd %d.", ofd);
                 if (ret == -EFLOWDOWN)
                         notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd);
-                ipcp_sdb_release(sdb);
+                ipcp_spb_release(spb);
 #ifdef IPCP_FLOW_STATS
                 pthread_mutex_lock(&dt.stat[ofd].lock);
@@ -513,17 +513,17 @@ static void packet_handler(int fd,
                 pthread_mutex_unlock(&dt.stat[ofd].lock);
 #endif
         } else {
-                dt_pci_shrink(sdb);
+                dt_pci_shrink(spb);
                 if (dt_pci.eid >= PROG_RES_FDS) {
                         uint8_t ecn = *(head + dt_pci_info.ecn_o);
-                        fa_np1_rcv(dt_pci.eid, ecn, sdb);
+                        fa_np1_rcv(dt_pci.eid, ecn, spb);
                         return;
                 }
 
                 if (dt.comps[dt_pci.eid].post_packet == NULL) {
                         log_err("No registered component on eid %" PRIu64 ".",
                                 dt_pci.eid);
-                        ipcp_sdb_release(sdb);
+                        ipcp_spb_release(spb);
                         return;
                 }
 #ifdef IPCP_FLOW_STATS
@@ -541,7 +541,7 @@ static void packet_handler(int fd,
                 pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
 #endif
                 dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp,
-                                                 sdb);
+                                                 spb);
         }
 }
@@ -758,7 +758,7 @@ void dt_stop(void)
 }
 
 int dt_reg_comp(void * comp,
-                void (* func)(void * func, struct shm_du_buff *),
+                void (* func)(void * func, struct ssm_pk_buff *),
                 char * name)
 {
         int eid;
@@ -809,7 +809,7 @@ void dt_unreg_comp(int eid)
 int dt_write_packet(uint64_t             dst_addr,
                     qoscube_t            qc,
                     uint64_t             eid,
-                    struct shm_du_buff * sdb)
+                    struct ssm_pk_buff * spb)
 {
         struct dt_pci dt_pci;
         int           fd;
@@ -817,10 +817,10 @@ int dt_write_packet(uint64_t dst_addr,
         uint8_t *     head;
         size_t        len;
 
-        assert(sdb);
+        assert(spb);
         assert(dst_addr != dt.addr);
 
-        len = shm_du_buff_len(sdb);
+        len = ssm_pk_buff_len(spb);
 
 #ifdef IPCP_FLOW_STATS
         if (eid < PROG_RES_FDS) {
@@ -849,13 +849,13 @@ int dt_write_packet(uint64_t dst_addr,
                 return -EPERM;
         }
 
-        head = shm_du_buff_head_alloc(sdb, dt_pci_info.head_size);
+        head = ssm_pk_buff_head_alloc(spb, dt_pci_info.head_size);
         if (head == NULL) {
                 log_dbg("Failed to allocate DT header.");
                 goto fail_write;
         }
 
-        len = shm_du_buff_len(sdb);
+        len = ssm_pk_buff_len(spb);
 
         dt_pci.dst_addr = dst_addr;
         dt_pci.qc       = qc;
@@ -866,7 +866,7 @@ int dt_write_packet(uint64_t dst_addr,
         dt_pci_ser(head, &dt_pci);
 
-        ret = ipcp_flow_write(fd, sdb);
+        ret = ipcp_flow_write(fd, spb);
         if (ret < 0) {
                 log_dbg("Failed to write packet to fd %d.", fd);
                 if (ret == -EFLOWDOWN)
```
