| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2026-01-20 22:25:41 +0100 |
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2026-01-26 07:50:33 +0100 |
| commit | 0ca48453a067c7862f0bb6b85f152da826f59af7 (patch) | |
| tree | 5daf26d84777ec6ad1c266601b66e59f9dcc88ca /src | |
| parent | 1775201647a10923b9f73addf2304c3124350836 (diff) | |
| download | ouroboros-0ca48453a067c7862f0bb6b85f152da826f59af7.tar.gz ouroboros-0ca48453a067c7862f0bb6b85f152da826f59af7.zip | |
lib: Replace rdrbuff with a proper slab allocator
This is a first step towards the Secure Shared Memory (SSM)
infrastructure for Ouroboros, which will allow proper resource
separation for non-privileged processes.
This replaces the rdrbuff (random-deletion ring buffer) PoC allocator
with a sharded slab allocator for the packet buffer pool, avoiding the
head-of-line blocking behaviour of the rdrbuff and reducing lock
contention in multi-process scenarios. Each size class contains
multiple independent shards, allowing parallel allocations without
blocking.
- Configurable shard count per size class (default: 4, set via
  SSM_POOL_SHARDS in CMake). The configured block count is spread
  across the shards, not multiplied by it. For example,
  SSM_POOL_512_BLOCKS = 768 means 768 blocks in total, shared
  among the 4 shards (not 768 × 4 = 3072 blocks).
- Lazy block distribution: all blocks initially reside in shard 0
and naturally migrate to process-local shards upon first
allocation and subsequent free operations
- Fallback with work stealing: processes attempt allocation from
  their local shard (pid % SSM_POOL_SHARDS) first, then steal from
  other shards if the local one is exhausted, eliminating
  fragmentation while keeping contention low (see the sketch after
  this list)
- Round-robin condvar signaling: blocking allocations cycle
through all shard condition variables to ensure fairness
- Blocks freed to allocator's shard: uses allocator_pid to
determine target shard, enabling natural load balancing as
process allocation patterns stabilize over time
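As a rough illustration of the alloc and free paths above, a minimal
sketch in C with hypothetical names (struct ssm_shard, class_alloc,
class_free); the real structures live in ssm.h, use robust mutexes,
and handle EOWNERDEAD recovery, all elided here:

    #include <pthread.h>
    #include <stddef.h>
    #include <sys/types.h>

    #define SSM_POOL_SHARDS 4

    struct ssm_block {
        struct ssm_block * next;
    };

    struct ssm_shard {
        pthread_mutex_t    mtx;  /* robust mutex in the real pool */
        pthread_cond_t     cond; /* signalled on free */
        struct ssm_block * free; /* per-shard free list */
    };

    struct ssm_class {
        struct ssm_shard shards[SSM_POOL_SHARDS];
    };

    /* Try the process-local shard first, then steal from the others. */
    static struct ssm_block * class_alloc(struct ssm_class * c,
                                          pid_t              pid)
    {
        size_t local = (size_t) pid % SSM_POOL_SHARDS;
        size_t i;

        for (i = 0; i < SSM_POOL_SHARDS; ++i) {
            struct ssm_shard * s = &c->shards[(local + i) % SSM_POOL_SHARDS];
            struct ssm_block * b;

            pthread_mutex_lock(&s->mtx);
            b = s->free;
            if (b != NULL)
                s->free = b->next;
            pthread_mutex_unlock(&s->mtx);

            if (b != NULL)
                return b;   /* i > 0 means stolen from a remote shard */
        }

        return NULL; /* blocking callers cycle the shard condvars round-robin */
    }

    /* Free to the allocator's shard, so blocks migrate towards the
       shards of the processes that actually allocate them. */
    static void class_free(struct ssm_class * c,
                           struct ssm_block * b,
                           pid_t              allocator_pid)
    {
        struct ssm_shard * s;

        s = &c->shards[(size_t) allocator_pid % SSM_POOL_SHARDS];

        pthread_mutex_lock(&s->mtx);
        b->next = s->free;
        s->free = b;
        pthread_cond_signal(&s->cond);
        pthread_mutex_unlock(&s->mtx);
    }

Under lazy distribution, shard 0 initially owns every block: a
process's first allocations steal from shard 0, and the matching
frees then seed its local shard.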
Maintains the existing robust mutex semantics, including EOWNERDEAD
handling for dead-process recovery. Internal structures are exposed in
ssm.h for testing purposes. Adds tests (pool_test,
pool_sharding_test, etc.) verifying lazy distribution, migration,
fallback stealing, and multiprocess behaviour.
Updates the ring buffer (rbuff) to use relaxed/acquire/release
ordering on its atomic indices. The ring buffer still requires the
(robust) mutex for cross-structure synchronization: pool buffer
writes must be visible before the corresponding ring buffer index is
published.
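For the index ordering, a minimal single-producer/single-consumer
sketch using C11 atomics; names and the size are illustrative, and
the real rbuff is additionally protected by the robust mutex so that
pool buffer writes happen-before index publication:

    #include <stdatomic.h>
    #include <stddef.h>
    #include <sys/types.h>

    #define RBUFF_SIZE 1024        /* illustrative; a power of two */

    struct rbuff {
        _Atomic size_t head;       /* producer index, monotonic */
        _Atomic size_t tail;       /* consumer index, monotonic */
        ssize_t slots[RBUFF_SIZE]; /* pool block indices */
    };

    static int rb_push(struct rbuff * rb, ssize_t idx)
    {
        size_t h = atomic_load_explicit(&rb->head, memory_order_relaxed);
        size_t t = atomic_load_explicit(&rb->tail, memory_order_acquire);

        if (h - t == RBUFF_SIZE)
            return -1;             /* full */

        rb->slots[h & (RBUFF_SIZE - 1)] = idx;
        /* Release: the slot write above is visible before the new head. */
        atomic_store_explicit(&rb->head, h + 1, memory_order_release);
        return 0;
    }

    static ssize_t rb_pop(struct rbuff * rb)
    {
        size_t t = atomic_load_explicit(&rb->tail, memory_order_relaxed);
        /* Acquire: pairs with the producer's release store to head. */
        size_t h = atomic_load_explicit(&rb->head, memory_order_acquire);
        ssize_t idx;

        if (t == h)
            return -1;             /* empty */

        idx = rb->slots[t & (RBUFF_SIZE - 1)];
        atomic_store_explicit(&rb->tail, t + 1, memory_order_release);
        return idx;
    }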
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src')
38 files changed, 4092 insertions, 1319 deletions
diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index e0ca6d89..806de2b6 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -29,11 +29,9 @@ #define SOCKET_TIMEOUT @SOCKET_TIMEOUT@ #define CONNECT_TIMEOUT @CONNECT_TIMEOUT@ -#define SHM_BUFFER_SIZE @SHM_BUFFER_SIZE@ -#define SHM_RDRB_BLOCK_SIZE @SHM_RDRB_BLOCK_SIZE@ +#define SSM_POOL_BLOCK_SIZE @SSM_POOL_BLOCK_SIZE@ #define DU_BUFF_HEADSPACE @DU_BUFF_HEADSPACE@ #define DU_BUFF_TAILSPACE @DU_BUFF_TAILSPACE@ -#cmakedefine SHM_RDRB_MULTI_BLOCK #define IPCP_MIN_THREADS @IPCP_MIN_THREADS@ #define IPCP_ADD_THREADS @IPCP_ADD_THREADS@ diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 29c5ff4f..f36e0b13 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -839,7 +839,7 @@ static void * eth_ipcp_packet_reader(void * o) #if defined(HAVE_NETMAP) struct nm_pkthdr hdr; #else - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; fd_set fds; int frame_len; #endif @@ -871,21 +871,21 @@ static void * eth_ipcp_packet_reader(void * o) if (select(eth_data.bpf + 1, &fds, NULL, NULL, NULL)) continue; assert(FD_ISSET(eth_data.bpf, &fds)); - if (ipcp_sdb_reserve(&sdb, BPF_LEN)) + if (ipcp_spb_reserve(&spb, BPF_LEN)) continue; - buf = shm_du_buff_head(sdb); + buf = ssm_pk_buff_head(spb); frame_len = read(eth_data.bpf, buf, BPF_BLEN); #elif defined(HAVE_RAW_SOCKETS) FD_SET(eth_data.s_fd, &fds); if (select(eth_data.s_fd + 1, &fds, NULL, NULL, NULL) < 0) continue; assert(FD_ISSET(eth_data.s_fd, &fds)); - if (ipcp_sdb_reserve(&sdb, ETH_MTU)) + if (ipcp_spb_reserve(&spb, ETH_MTU)) continue; - buf = shm_du_buff_head_alloc(sdb, ETH_HEADER_TOT_SIZE); + buf = ssm_pk_buff_head_alloc(spb, ETH_HEADER_TOT_SIZE); if (buf == NULL) { log_dbg("Failed to allocate header."); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); continue; } frame_len = recv(eth_data.s_fd, buf, @@ -893,7 +893,7 @@ static void * eth_ipcp_packet_reader(void * o) #endif if (frame_len <= 0) { log_dbg("Failed to receive frame."); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); continue; } #endif @@ -935,7 +935,7 @@ static void * eth_ipcp_packet_reader(void * o) if (ssap == MGMT_SAP && dsap == MGMT_SAP) { #endif - ipcp_sdb_release(sdb); /* No need for the N+1 buffer. */ + ipcp_spb_release(spb); /* No need for the N+1 buffer. 
*/ if (length > MGMT_FRAME_SIZE) { log_warn("Management frame size %u exceeds %u.", @@ -981,22 +981,22 @@ static void * eth_ipcp_packet_reader(void * o) pthread_rwlock_unlock(ð_data.flows_lock); #ifndef HAVE_NETMAP - shm_du_buff_head_release(sdb, ETH_HEADER_TOT_SIZE); - shm_du_buff_truncate(sdb, length); + ssm_pk_buff_head_release(spb, ETH_HEADER_TOT_SIZE); + ssm_pk_buff_truncate(spb, length); #else - if (ipcp_sdb_reserve(&sdb, length)) + if (ipcp_spb_reserve(&spb, length)) continue; - buf = shm_du_buff_head(sdb); + buf = ssm_pk_buff_head(spb); memcpy(buf, &e_frame->payload, length); #endif - if (np1_flow_write(fd, sdb) < 0) - ipcp_sdb_release(sdb); + if (np1_flow_write(fd, spb) < 0) + ipcp_spb_release(spb); continue; fail_frame: #ifndef HAVE_NETMAP - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); #endif } } @@ -1012,7 +1012,7 @@ static void cleanup_writer(void * o) static void * eth_ipcp_packet_writer(void * o) { int fd; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; size_t len; #if defined(BUILD_ETH_DIX) uint16_t deid; @@ -1040,17 +1040,17 @@ static void * eth_ipcp_packet_writer(void * o) if (fqueue_type(fq) != FLOW_PKT) continue; - if (np1_flow_read(fd, &sdb)) { + if (np1_flow_read(fd, &spb)) { log_dbg("Bad read from fd %d.", fd); continue; } - len = shm_du_buff_len(sdb); + len = ssm_pk_buff_len(spb); - if (shm_du_buff_head_alloc(sdb, ETH_HEADER_TOT_SIZE) + if (ssm_pk_buff_head_alloc(spb, ETH_HEADER_TOT_SIZE) == NULL) { log_dbg("Failed to allocate header."); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); continue; } @@ -1073,10 +1073,10 @@ static void * eth_ipcp_packet_writer(void * o) #elif defined(BUILD_ETH_LLC) dsap, ssap, #endif - shm_du_buff_head(sdb), + ssm_pk_buff_head(spb), len)) log_dbg("Failed to send frame."); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); } } @@ -1342,14 +1342,7 @@ static int eth_set_mtu(struct ifreq * ifr) IPCP_ETH_LO_MTU); eth_data.mtu = IPCP_ETH_LO_MTU; } -#ifndef SHM_RDRB_MULTI_BLOCK - maxsz = SHM_RDRB_BLOCK_SIZE - 5 * sizeof(size_t) - - (DU_BUFF_HEADSPACE + DU_BUFF_TAILSPACE); - if ((size_t) eth_data.mtu > maxsz ) { - log_dbg("Layer MTU truncated to shm block size."); - eth_data.mtu = maxsz; - } -#endif + log_dbg("Layer MTU is %d.", eth_data.mtu); return 0; @@ -1503,9 +1496,6 @@ static int eth_ipcp_bootstrap(struct ipcp_config * conf) char ifn[IFNAMSIZ]; #endif /* HAVE_NETMAP */ -#ifndef SHM_RDRB_MULTI_BLOCK - size_t maxsz; -#endif assert(conf); assert(conf->type == THIS_TYPE); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index ffa6dc5a..e0f3cc5a 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -121,8 +121,6 @@ static void * local_ipcp_packet_loop(void * o) if (idx < 0) continue; - assert(idx < (SHM_BUFFER_SIZE)); - pthread_rwlock_rdlock(&local_data.lock); fd = local_data.in_out[fd]; diff --git a/src/ipcpd/udp/udp.c b/src/ipcpd/udp/udp.c index d8b5b4cd..5e29cb52 100644 --- a/src/ipcpd/udp/udp.c +++ b/src/ipcpd/udp/udp.c @@ -443,7 +443,7 @@ static void * udp_ipcp_packet_reader(void * o) struct mgmt_frame * frame; struct __SOCKADDR r_saddr; socklen_t len; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; uint8_t * head; len = sizeof(r_saddr); @@ -487,13 +487,13 @@ static void * udp_ipcp_packet_reader(void * o) n-= sizeof(eid); - if (ipcp_sdb_reserve(&sdb, n)) + if (ipcp_spb_reserve(&spb, n)) continue; - head = shm_du_buff_head(sdb); + head = ssm_pk_buff_head(spb); memcpy(head, data, n); - if (np1_flow_write(eid, sdb) < 0) - ipcp_sdb_release(sdb); + if (np1_flow_write(eid, spb) < 0) + 
ipcp_spb_release(spb); } return (void *) 0; @@ -504,9 +504,9 @@ static void cleanup_fqueue(void * fq) fqueue_destroy((fqueue_t *) fq); } -static void cleanup_sdb(void * sdb) +static void cleanup_spb(void * spb) { - ipcp_sdb_release((struct shm_du_buff *) sdb); + ipcp_spb_release((struct ssm_pk_buff *) spb); } static void * udp_ipcp_packet_writer(void * o) @@ -529,29 +529,29 @@ static void * udp_ipcp_packet_writer(void * o) int fd; fevent(udp_data.np1_flows, fq, NULL); while ((fd = fqueue_next(fq)) >= 0) { - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; uint8_t * buf; uint16_t len; if (fqueue_type(fq) != FLOW_PKT) continue; - if (np1_flow_read(fd, &sdb)) { + if (np1_flow_read(fd, &spb)) { log_dbg("Bad read from fd %d.", fd); continue; } - len = shm_du_buff_len(sdb); + len = ssm_pk_buff_len(spb); if (len > IPCP_UDP_MAX_PACKET_SIZE) { log_dbg("Packet length exceeds MTU."); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); continue; } - buf = shm_du_buff_head_alloc(sdb, OUR_HEADER_LEN); + buf = ssm_pk_buff_head_alloc(spb, OUR_HEADER_LEN); if (buf == NULL) { log_dbg("Failed to allocate header."); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); continue; } @@ -564,7 +564,7 @@ static void * udp_ipcp_packet_writer(void * o) memcpy(buf, &eid, sizeof(eid)); - pthread_cleanup_push(cleanup_sdb, sdb); + pthread_cleanup_push(cleanup_spb, spb); if (sendto(udp_data.s_fd, buf, len + OUR_HEADER_LEN, SENDTO_FLAGS, diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c index 1388c2de..69309091 100644 --- a/src/ipcpd/unicast/dir/dht.c +++ b/src/ipcpd/unicast/dir/dht.c @@ -2266,7 +2266,7 @@ static int dht_send_msg(dht_msg_t * msg, uint64_t addr) { size_t len; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; if (msg == NULL) return 0; @@ -2279,21 +2279,21 @@ static int dht_send_msg(dht_msg_t * msg, goto fail_msg; } - if (ipcp_sdb_reserve(&sdb, len)) { - log_warn("%s failed to get sdb.", DHT_CODE(msg)); + if (ipcp_spb_reserve(&spb, len)) { + log_warn("%s failed to get spb.", DHT_CODE(msg)); goto fail_msg; } - dht_msg__pack(msg, shm_du_buff_head(sdb)); + dht_msg__pack(msg, ssm_pk_buff_head(spb)); - if (dt_write_packet(addr, QOS_CUBE_BE, dht.eid, sdb) < 0) { + if (dt_write_packet(addr, QOS_CUBE_BE, dht.eid, spb) < 0) { log_warn("%s write failed", DHT_CODE(msg)); goto fail_send; } return 0; fail_send: - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); fail_msg: return -1; } @@ -3191,7 +3191,7 @@ static void * dht_handle_packet(void * o) } #ifndef __DHT_TEST__ static void dht_post_packet(void * comp, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct cmd * cmd; @@ -3203,17 +3203,17 @@ static void dht_post_packet(void * comp, goto fail_cmd; } - cmd->cbuf.data = malloc(shm_du_buff_len(sdb)); + cmd->cbuf.data = malloc(ssm_pk_buff_len(spb)); if (cmd->cbuf.data == NULL) { log_err("Command buffer malloc failed."); goto fail_buf; } - cmd->cbuf.len = shm_du_buff_len(sdb); + cmd->cbuf.len = ssm_pk_buff_len(spb); - memcpy(cmd->cbuf.data, shm_du_buff_head(sdb), cmd->cbuf.len); + memcpy(cmd->cbuf.data, ssm_pk_buff_head(spb), cmd->cbuf.len); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); pthread_mutex_lock(&dht.cmds.mtx); @@ -3228,7 +3228,7 @@ static void dht_post_packet(void * comp, fail_buf: free(cmd); fail_cmd: - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); return; } #endif diff --git a/src/ipcpd/unicast/dir/tests/CMakeLists.txt b/src/ipcpd/unicast/dir/tests/CMakeLists.txt index 3dda8104..dd15d4d8 100644 --- a/src/ipcpd/unicast/dir/tests/CMakeLists.txt +++ 
b/src/ipcpd/unicast/dir/tests/CMakeLists.txt @@ -28,7 +28,7 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c dht_test.c ) -protobuf_generate_c(DHT_PROTO_SRCS KAD_PROTO_HDRS ../dht.proto) +protobuf_generate_c(DHT_PROTO_SRCS KAD_PROTO_HDRS ${CURRENT_SOURCE_PARENT_DIR}/dht.proto) add_executable(${PARENT_DIR}_test ${${PARENT_DIR}_tests} ${DHT_PROTO_SRCS}) diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c index e2679ffe..9435350f 100644 --- a/src/ipcpd/unicast/dt.c +++ b/src/ipcpd/unicast/dt.c @@ -68,7 +68,7 @@ #endif struct comp_info { - void (* post_packet)(void * comp, struct shm_du_buff * sdb); + void (* post_packet)(void * comp, struct ssm_pk_buff * spb); void * comp; char * name; }; @@ -135,11 +135,11 @@ static void dt_pci_des(uint8_t * head, memcpy(&dt_pci->eid, head + dt_pci_info.eid_o, dt_pci_info.eid_size); } -static void dt_pci_shrink(struct shm_du_buff * sdb) +static void dt_pci_shrink(struct ssm_pk_buff * spb) { - assert(sdb); + assert(spb); - shm_du_buff_head_release(sdb, dt_pci_info.head_size); + ssm_pk_buff_head_release(spb, dt_pci_info.head_size); } struct { @@ -429,7 +429,7 @@ static void handle_event(void * self, static void packet_handler(int fd, qoscube_t qc, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct dt_pci dt_pci; int ret; @@ -437,7 +437,7 @@ static void packet_handler(int fd, uint8_t * head; size_t len; - len = shm_du_buff_len(sdb); + len = ssm_pk_buff_len(spb); #ifndef IPCP_FLOW_STATS (void) fd; @@ -451,13 +451,13 @@ static void packet_handler(int fd, #endif memset(&dt_pci, 0, sizeof(dt_pci)); - head = shm_du_buff_head(sdb); + head = ssm_pk_buff_head(spb); dt_pci_des(head, &dt_pci); if (dt_pci.dst_addr != dt.addr) { if (dt_pci.ttl == 0) { log_dbg("TTL was zero."); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); #ifdef IPCP_FLOW_STATS pthread_mutex_lock(&dt.stat[fd].lock); @@ -474,7 +474,7 @@ static void packet_handler(int fd, if (ofd < 0) { log_dbg("No next hop for %" PRIu64 ".", dt_pci.dst_addr); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); #ifdef IPCP_FLOW_STATS pthread_mutex_lock(&dt.stat[fd].lock); @@ -488,12 +488,12 @@ static void packet_handler(int fd, (void) ca_calc_ecn(ofd, head + dt_pci_info.ecn_o, qc, len); - ret = ipcp_flow_write(ofd, sdb); + ret = ipcp_flow_write(ofd, spb); if (ret < 0) { log_dbg("Failed to write packet to fd %d.", ofd); if (ret == -EFLOWDOWN) notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); #ifdef IPCP_FLOW_STATS pthread_mutex_lock(&dt.stat[ofd].lock); @@ -513,17 +513,17 @@ static void packet_handler(int fd, pthread_mutex_unlock(&dt.stat[ofd].lock); #endif } else { - dt_pci_shrink(sdb); + dt_pci_shrink(spb); if (dt_pci.eid >= PROG_RES_FDS) { uint8_t ecn = *(head + dt_pci_info.ecn_o); - fa_np1_rcv(dt_pci.eid, ecn, sdb); + fa_np1_rcv(dt_pci.eid, ecn, spb); return; } if (dt.comps[dt_pci.eid].post_packet == NULL) { log_err("No registered component on eid %" PRIu64 ".", dt_pci.eid); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); return; } #ifdef IPCP_FLOW_STATS @@ -541,7 +541,7 @@ static void packet_handler(int fd, pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); #endif dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp, - sdb); + spb); } } @@ -758,7 +758,7 @@ void dt_stop(void) } int dt_reg_comp(void * comp, - void (* func)(void * func, struct shm_du_buff *), + void (* func)(void * func, struct ssm_pk_buff *), char * name) { int eid; @@ -809,7 +809,7 @@ void dt_unreg_comp(int eid) int dt_write_packet(uint64_t dst_addr, qoscube_t qc, uint64_t 
eid, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct dt_pci dt_pci; int fd; @@ -817,10 +817,10 @@ int dt_write_packet(uint64_t dst_addr, uint8_t * head; size_t len; - assert(sdb); + assert(spb); assert(dst_addr != dt.addr); - len = shm_du_buff_len(sdb); + len = ssm_pk_buff_len(spb); #ifdef IPCP_FLOW_STATS if (eid < PROG_RES_FDS) { @@ -849,13 +849,13 @@ int dt_write_packet(uint64_t dst_addr, return -EPERM; } - head = shm_du_buff_head_alloc(sdb, dt_pci_info.head_size); + head = ssm_pk_buff_head_alloc(spb, dt_pci_info.head_size); if (head == NULL) { log_dbg("Failed to allocate DT header."); goto fail_write; } - len = shm_du_buff_len(sdb); + len = ssm_pk_buff_len(spb); dt_pci.dst_addr = dst_addr; dt_pci.qc = qc; @@ -866,7 +866,7 @@ int dt_write_packet(uint64_t dst_addr, dt_pci_ser(head, &dt_pci); - ret = ipcp_flow_write(fd, sdb); + ret = ipcp_flow_write(fd, spb); if (ret < 0) { log_dbg("Failed to write packet to fd %d.", fd); if (ret == -EFLOWDOWN) diff --git a/src/ipcpd/unicast/dt.h b/src/ipcpd/unicast/dt.h index 2c5b7978..2e713700 100644 --- a/src/ipcpd/unicast/dt.h +++ b/src/ipcpd/unicast/dt.h @@ -25,7 +25,7 @@ #include <ouroboros/ipcp.h> #include <ouroboros/qoscube.h> -#include <ouroboros/shm_rdrbuff.h> +#include <ouroboros/ssm_pool.h> #define DT_COMP "Data Transfer" #define DT_PROTO "dtp" @@ -40,7 +40,7 @@ int dt_start(void); void dt_stop(void); int dt_reg_comp(void * comp, - void (* func)(void * comp, struct shm_du_buff * sdb), + void (* func)(void * comp, struct ssm_pk_buff * spb), char * name); void dt_unreg_comp(int eid); @@ -48,6 +48,6 @@ void dt_unreg_comp(int eid); int dt_write_packet(uint64_t dst_addr, qoscube_t qc, uint64_t eid, - struct shm_du_buff * sdb); + struct ssm_pk_buff * spb); #endif /* OUROBOROS_IPCPD_UNICAST_DT_H */ diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index dc23340c..06e4b043 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -85,7 +85,7 @@ struct fa_msg { struct cmd { struct list_head next; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; }; struct fa_flow { @@ -330,7 +330,7 @@ static uint64_t gen_eid(int fd) static void packet_handler(int fd, qoscube_t qc, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct fa_flow * flow; uint64_t r_addr; @@ -342,7 +342,7 @@ static void packet_handler(int fd, pthread_rwlock_wrlock(&fa.flows_lock); - len = shm_du_buff_len(sdb); + len = ssm_pk_buff_len(spb); #ifdef IPCP_FLOW_STATS ++flow->p_snd; @@ -357,8 +357,8 @@ static void packet_handler(int fd, ca_wnd_wait(wnd); - if (dt_write_packet(r_addr, qc, r_eid, sdb)) { - ipcp_sdb_release(sdb); + if (dt_write_packet(r_addr, qc, r_eid, spb)) { + ipcp_spb_release(spb); log_dbg("Failed to forward packet."); #ifdef IPCP_FLOW_STATS pthread_rwlock_wrlock(&fa.flows_lock); @@ -411,7 +411,7 @@ static void fa_flow_fini(struct fa_flow * flow) } static void fa_post_packet(void * comp, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct cmd * cmd; @@ -422,11 +422,11 @@ static void fa_post_packet(void * comp, cmd = malloc(sizeof(*cmd)); if (cmd == NULL) { log_err("Command failed. 
Out of memory."); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); return; } - cmd->sdb = sdb; + cmd->spb = spb; pthread_mutex_lock(&fa.mtx); @@ -454,16 +454,16 @@ static size_t fa_wait_for_fa_msg(struct fa_msg * msg) pthread_cleanup_pop(true); - len = shm_du_buff_len(cmd->sdb); + len = ssm_pk_buff_len(cmd->spb); if (len > MSGBUFSZ || len < sizeof(*msg)) { log_warn("Invalid flow allocation message (len: %zd).", len); free(cmd); return 0; /* No valid message */ } - memcpy(msg, shm_du_buff_head(cmd->sdb), len); + memcpy(msg, ssm_pk_buff_head(cmd->spb), len); - ipcp_sdb_release(cmd->sdb); + ipcp_spb_release(cmd->spb); free(cmd); @@ -753,7 +753,7 @@ int fa_alloc(int fd, const buffer_t * data) { struct fa_msg * msg; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; struct fa_flow * flow; uint64_t addr; qoscube_t qc = QOS_CUBE_BE; @@ -766,10 +766,10 @@ int fa_alloc(int fd, len = sizeof(*msg) + ipcp_dir_hash_len(); - if (ipcp_sdb_reserve(&sdb, len + data->len)) + if (ipcp_spb_reserve(&spb, len + data->len)) return -1; - msg = (struct fa_msg *) shm_du_buff_head(sdb); + msg = (struct fa_msg *) ssm_pk_buff_head(spb); memset(msg, 0, sizeof(*msg)); eid = gen_eid(fd); @@ -788,11 +788,11 @@ int fa_alloc(int fd, memcpy(msg + 1, dst, ipcp_dir_hash_len()); if (data->len > 0) - memcpy(shm_du_buff_head(sdb) + len, data->data, data->len); + memcpy(ssm_pk_buff_head(spb) + len, data->data, data->len); - if (dt_write_packet(addr, qc, fa.eid, sdb)) { + if (dt_write_packet(addr, qc, fa.eid, spb)) { log_err("Failed to send flow allocation request packet."); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); return -1; } @@ -814,7 +814,7 @@ int fa_alloc_resp(int fd, const buffer_t * data) { struct fa_msg * msg; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; struct fa_flow * flow; qoscube_t qc = QOS_CUBE_BE; @@ -825,13 +825,13 @@ int fa_alloc_resp(int fd, goto fail_alloc_resp; } - if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + data->len)) { - log_err("Failed to reserve sdb (%zu bytes).", + if (ipcp_spb_reserve(&spb, sizeof(*msg) + data->len)) { + log_err("Failed to reserve spb (%zu bytes).", sizeof(*msg) + data->len); goto fail_reserve; } - msg = (struct fa_msg *) shm_du_buff_head(sdb); + msg = (struct fa_msg *) ssm_pk_buff_head(spb); memset(msg, 0, sizeof(*msg)); msg->code = FLOW_REPLY; @@ -846,7 +846,7 @@ int fa_alloc_resp(int fd, pthread_rwlock_unlock(&fa.flows_lock); - if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) { + if (dt_write_packet(flow->r_addr, qc, fa.eid, spb)) { log_err("Failed to send flow allocation response packet."); goto fail_packet; } @@ -862,7 +862,7 @@ int fa_alloc_resp(int fd, return 0; fail_packet: - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); fail_reserve: pthread_rwlock_wrlock(&fa.flows_lock); fa_flow_fini(flow); @@ -893,17 +893,17 @@ static int fa_update_remote(int fd, uint16_t ece) { struct fa_msg * msg; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; qoscube_t qc = QOS_CUBE_BE; struct fa_flow * flow; uint64_t r_addr; - if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) { - log_err("Failed to reserve sdb (%zu bytes).", sizeof(*msg)); + if (ipcp_spb_reserve(&spb, sizeof(*msg))) { + log_err("Failed to reserve spb (%zu bytes).", sizeof(*msg)); return -1; } - msg = (struct fa_msg *) shm_du_buff_head(sdb); + msg = (struct fa_msg *) ssm_pk_buff_head(spb); memset(msg, 0, sizeof(*msg)); @@ -922,9 +922,9 @@ static int fa_update_remote(int fd, pthread_rwlock_unlock(&fa.flows_lock); - if (dt_write_packet(r_addr, qc, fa.eid, sdb)) { + if (dt_write_packet(r_addr, qc, fa.eid, spb)) { 
log_err("Failed to send flow update packet."); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); return -1; } @@ -933,7 +933,7 @@ static int fa_update_remote(int fd, void fa_np1_rcv(uint64_t eid, uint8_t ecn, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct fa_flow * flow; bool update; @@ -941,7 +941,7 @@ void fa_np1_rcv(uint64_t eid, int fd; size_t len; - len = shm_du_buff_len(sdb); + len = ssm_pk_buff_len(spb); pthread_rwlock_wrlock(&fa.flows_lock); @@ -949,7 +949,7 @@ void fa_np1_rcv(uint64_t eid, if (fd < 0) { pthread_rwlock_unlock(&fa.flows_lock); log_dbg("Received packet for unknown EID %" PRIu64 ".", eid); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); return; } @@ -963,9 +963,9 @@ void fa_np1_rcv(uint64_t eid, pthread_rwlock_unlock(&fa.flows_lock); - if (ipcp_flow_write(fd, sdb) < 0) { + if (ipcp_flow_write(fd, spb) < 0) { log_dbg("Failed to write to flow %d.", fd); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); #ifdef IPCP_FLOW_STATS pthread_rwlock_wrlock(&fa.flows_lock); ++flow->p_rcv_f; diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h index 1e716966..6cd30995 100644 --- a/src/ipcpd/unicast/fa.h +++ b/src/ipcpd/unicast/fa.h @@ -47,6 +47,6 @@ int fa_dealloc(int fd); void fa_np1_rcv(uint64_t eid, uint8_t ecn, - struct shm_du_buff * sdb); + struct ssm_pk_buff * spb); #endif /* OUROBOROS_IPCPD_UNICAST_FA_H */ diff --git a/src/ipcpd/unicast/psched.c b/src/ipcpd/unicast/psched.c index f74b4065..8c8caf19 100644 --- a/src/ipcpd/unicast/psched.c +++ b/src/ipcpd/unicast/psched.c @@ -69,7 +69,7 @@ static void cleanup_reader(void * o) static void * packet_reader(void * o) { struct psched * sched; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; int fd; fqueue_t * fq; qoscube_t qc; @@ -104,10 +104,10 @@ static void * packet_reader(void * o) notifier_event(NOTIFY_DT_FLOW_UP, &fd); break; case FLOW_PKT: - if (sched->read(fd, &sdb) < 0) + if (sched->read(fd, &spb) < 0) continue; - sched->callback(fd, qc, sdb); + sched->callback(fd, qc, spb); break; default: break; diff --git a/src/ipcpd/unicast/psched.h b/src/ipcpd/unicast/psched.h index 831f8084..be9c09e3 100644 --- a/src/ipcpd/unicast/psched.h +++ b/src/ipcpd/unicast/psched.h @@ -28,10 +28,10 @@ typedef void (* next_packet_fn_t)(int fd, qoscube_t qc, - struct shm_du_buff * sdb); + struct ssm_pk_buff * spb); typedef int (* read_fn_t)(int fd, - struct shm_du_buff ** sdb); + struct ssm_pk_buff ** spb); struct psched * psched_create(next_packet_fn_t callback, read_fn_t read); diff --git a/src/ipcpd/unicast/routing/link-state.c b/src/ipcpd/unicast/routing/link-state.c index e5edf539..95a104bb 100644 --- a/src/ipcpd/unicast/routing/link-state.c +++ b/src/ipcpd/unicast/routing/link-state.c @@ -56,7 +56,7 @@ #include <string.h> #define LS_ENTRY_SIZE 104 -#define LSDB "lsdb" +#define Lspb "lspb" #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME @@ -199,7 +199,7 @@ static struct adjacency * get_adj(const char * path) return NULL; } -static int lsdb_rib_getattr(const char * path, +static int lspb_rib_getattr(const char * path, struct rib_attr * attr) { struct adjacency * adj; @@ -230,7 +230,7 @@ static int lsdb_rib_getattr(const char * path, return 0; } -static int lsdb_rib_read(const char * path, +static int lspb_rib_read(const char * path, char * buf, size_t len) { @@ -264,7 +264,7 @@ static int lsdb_rib_read(const char * path, return -1; } -static int lsdb_rib_readdir(char *** buf) +static int lspb_rib_readdir(char *** buf) { struct list_head * p; char entry[RIB_PATH_LEN + 1]; @@ 
-319,12 +319,12 @@ static int lsdb_rib_readdir(char *** buf) } static struct rib_ops r_ops = { - .read = lsdb_rib_read, - .readdir = lsdb_rib_readdir, - .getattr = lsdb_rib_getattr + .read = lspb_rib_read, + .readdir = lspb_rib_readdir, + .getattr = lspb_rib_getattr }; -static int lsdb_add_nb(uint64_t addr, +static int lspb_add_nb(uint64_t addr, int fd, enum nb_type type) { @@ -372,7 +372,7 @@ static int lsdb_add_nb(uint64_t addr, return 0; } -static int lsdb_del_nb(uint64_t addr, +static int lspb_del_nb(uint64_t addr, int fd) { struct list_head * p; @@ -478,7 +478,7 @@ static void set_pff_modified(bool calc) pthread_mutex_unlock(&ls.instances.mtx); } -static int lsdb_add_link(uint64_t src, +static int lspb_add_link(uint64_t src, uint64_t dst, uint64_t seqno, qosspec_t * qs) @@ -535,7 +535,7 @@ static int lsdb_add_link(uint64_t src, return 0; } -static int lsdb_del_link(uint64_t src, +static int lspb_del_link(uint64_t src, uint64_t dst) { struct list_head * p; @@ -616,8 +616,8 @@ static void send_lsm(uint64_t src, } } -/* replicate the lsdb to a mgmt neighbor */ -static void lsdb_replicate(int fd) +/* replicate the lspb to a mgmt neighbor */ +static void lspb_replicate(int fd) { struct list_head * p; struct list_head * h; @@ -625,7 +625,7 @@ static void lsdb_replicate(int fd) list_head_init(©); - /* Lock the lsdb, copy the lsms and send outside of lock. */ + /* Lock the lspb, copy the lsms and send outside of lock. */ pthread_rwlock_rdlock(&ls.lock); list_for_each(p, &ls.db.list) { @@ -634,7 +634,7 @@ static void lsdb_replicate(int fd) adj = list_entry(p, struct adjacency, next); cpy = malloc(sizeof(*cpy)); if (cpy == NULL) { - log_warn("Failed to replicate full lsdb."); + log_warn("Failed to replicate full lspb."); break; } @@ -814,7 +814,7 @@ static void * lsreader(void * o) LSU_VAL(msg.s_addr, msg.d_addr, msg.seqno), ADDR_VAL32(&ls.addr)); #endif - if (lsdb_add_link(msg.s_addr, + if (lspb_add_link(msg.s_addr, msg.d_addr, msg.seqno, &qs)) @@ -873,20 +873,20 @@ static void handle_event(void * self, send_lsm(ls.addr, c->conn_info.addr, 0); pthread_cleanup_pop(true); - if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_DT)) - log_dbg("Failed to add neighbor to LSDB."); + if (lspb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_DT)) + log_dbg("Failed to add neighbor to Lspb."); - if (lsdb_add_link(ls.addr, c->conn_info.addr, 0, &qs)) - log_dbg("Failed to add new adjacency to LSDB."); + if (lspb_add_link(ls.addr, c->conn_info.addr, 0, &qs)) + log_dbg("Failed to add new adjacency to Lspb."); break; case NOTIFY_DT_CONN_DEL: flow_event(c->flow_info.fd, false); - if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd)) - log_dbg("Failed to delete neighbor from LSDB."); + if (lspb_del_nb(c->conn_info.addr, c->flow_info.fd)) + log_dbg("Failed to delete neighbor from Lspb."); - if (lsdb_del_link(ls.addr, c->conn_info.addr)) - log_dbg("Local link was not in LSDB."); + if (lspb_del_link(ls.addr, c->conn_info.addr)) + log_dbg("Local link was not in Lspb."); break; case NOTIFY_DT_CONN_QOS: log_dbg("QoS changes currently unsupported."); @@ -901,15 +901,15 @@ static void handle_event(void * self, fccntl(c->flow_info.fd, FLOWGFLAGS, &flags); fccntl(c->flow_info.fd, FLOWSFLAGS, flags | FLOWFRNOPART); fset_add(ls.mgmt_set, c->flow_info.fd); - if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_MGMT)) - log_warn("Failed to add mgmt neighbor to LSDB."); - /* replicate the entire lsdb */ - lsdb_replicate(c->flow_info.fd); + if (lspb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_MGMT)) + log_warn("Failed to add 
mgmt neighbor to Lspb."); + /* replicate the entire lspb */ + lspb_replicate(c->flow_info.fd); break; case NOTIFY_MGMT_CONN_DEL: fset_del(ls.mgmt_set, c->flow_info.fd); - if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd)) - log_warn("Failed to delete mgmt neighbor from LSDB."); + if (lspb_del_nb(c->conn_info.addr, c->flow_info.fd)) + log_warn("Failed to delete mgmt neighbor from Lspb."); break; default: break; @@ -1094,7 +1094,7 @@ int link_state_init(struct ls_config * conf, list_head_init(&ls.nbs.list); list_head_init(&ls.instances.list); - if (rib_reg(LSDB, &r_ops)) + if (rib_reg(Lspb, &r_ops)) goto fail_rib_reg; ls.db.len = 0; @@ -1121,7 +1121,7 @@ void link_state_fini(void) struct list_head * p; struct list_head * h; - rib_unreg(LSDB); + rib_unreg(Lspb); fset_destroy(ls.mgmt_set); diff --git a/src/irmd/config.h.in b/src/irmd/config.h.in index 06d51ccd..43d7f4ee 100644 --- a/src/irmd/config.h.in +++ b/src/irmd/config.h.in @@ -76,9 +76,9 @@ #cmakedefine HAVE_LIBGCRYPT #cmakedefine HAVE_OPENSSL #ifdef HAVE_OPENSSL -#define IRMD_SECMEM_MAX @IRMD_SECMEM_MAX@ #cmakedefine HAVE_OPENSSL_PQC #endif +#define IRMD_SECMEM_MAX @IRMD_SECMEM_MAX@ #ifdef CONFIG_OUROBOROS_DEBUG #cmakedefine DEBUG_PROTO_OAP #endif diff --git a/src/irmd/main.c b/src/irmd/main.c index 8a2c143d..e67fdd23 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -42,7 +42,7 @@ #include <ouroboros/pthread.h> #include <ouroboros/random.h> #include <ouroboros/rib.h> -#include <ouroboros/shm_rdrbuff.h> +#include <ouroboros/ssm_pool.h> #include <ouroboros/sockets.h> #include <ouroboros/time.h> #include <ouroboros/tpm.h> @@ -99,7 +99,7 @@ struct { char * cfg_file; /* configuration file path */ #endif struct lockfile * lf; /* single irmd per system */ - struct shm_rdrbuff * rdrb; /* rdrbuff for packets */ + struct ssm_pool * gspp; /* pool for packets */ int sockfd; /* UNIX socket */ @@ -1691,7 +1691,7 @@ static void destroy_mount(char * mnt) static int ouroboros_reset(void) { - shm_rdrbuff_purge(); + ssm_pool_purge(); lockfile_destroy(irmd.lf); return 0; @@ -1712,10 +1712,8 @@ static void cleanup_pid(pid_t pid) } destroy_mount(mnt); - -#else - (void) pid; #endif + ssm_pool_reclaim_orphans(irmd.gspp, pid); } void * irm_sanitize(void * o) @@ -1900,13 +1898,13 @@ static int irm_init(void) goto fail_sock_path; } - if ((irmd.rdrb = shm_rdrbuff_create()) == NULL) { - log_err("Failed to create rdrbuff."); - goto fail_rdrbuff; + if ((irmd.gspp = ssm_pool_create()) == NULL) { + log_err("Failed to create pool."); + goto fail_pool; } - if (shm_rdrbuff_mlock(irmd.rdrb) < 0) - log_warn("Failed to mlock rdrbuff."); + if (ssm_pool_mlock(irmd.gspp) < 0) + log_warn("Failed to mlock pool."); irmd.tpm = tpm_create(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop, NULL); @@ -1970,8 +1968,8 @@ static int irm_init(void) fail_oap: tpm_destroy(irmd.tpm); fail_tpm_create: - shm_rdrbuff_destroy(irmd.rdrb); - fail_rdrbuff: + ssm_pool_destroy(irmd.gspp); + fail_pool: close(irmd.sockfd); fail_sock_path: unlink(IRM_SOCK_PATH); @@ -2008,8 +2006,8 @@ static void irm_fini(void) if (unlink(IRM_SOCK_PATH)) log_dbg("Failed to unlink %s.", IRM_SOCK_PATH); - if (irmd.rdrb != NULL) - shm_rdrbuff_destroy(irmd.rdrb); + if (irmd.gspp != NULL) + ssm_pool_destroy(irmd.gspp); if (irmd.lf != NULL) lockfile_destroy(irmd.lf); diff --git a/src/irmd/oap/tests/CMakeLists.txt b/src/irmd/oap/tests/CMakeLists.txt index 09a40765..2e8f1319 100644 --- a/src/irmd/oap/tests/CMakeLists.txt +++ b/src/irmd/oap/tests/CMakeLists.txt @@ -1,6 +1,11 @@ get_filename_component(tmp ".." 
ABSOLUTE) get_filename_component(src_folder "${tmp}" NAME) +get_filename_component(OAP_SOURCE_DIR "${CMAKE_CURRENT_SOURCE_DIR}" DIRECTORY) +get_filename_component(OAP_BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}" DIRECTORY) +get_filename_component(IRMD_SOURCE_DIR "${OAP_SOURCE_DIR}" DIRECTORY) +get_filename_component(IRMD_BINARY_DIR "${OAP_BINARY_DIR}" DIRECTORY) + compute_test_prefix() create_test_sourcelist(${src_folder}_tests test_suite.c @@ -15,11 +20,11 @@ create_test_sourcelist(${src_folder}_pqc_tests test_suite_pqc.c # OAP test needs io.c compiled with OAP_TEST_MODE set(OAP_TEST_SOURCES - ${CMAKE_CURRENT_SOURCE_DIR}/../io.c - ${CMAKE_CURRENT_SOURCE_DIR}/../hdr.c - ${CMAKE_CURRENT_SOURCE_DIR}/../auth.c - ${CMAKE_CURRENT_SOURCE_DIR}/../srv.c - ${CMAKE_CURRENT_SOURCE_DIR}/../cli.c + ${OAP_SOURCE_DIR}/io.c + ${OAP_SOURCE_DIR}/hdr.c + ${OAP_SOURCE_DIR}/auth.c + ${OAP_SOURCE_DIR}/srv.c + ${OAP_SOURCE_DIR}/cli.c ${CMAKE_CURRENT_SOURCE_DIR}/common.c ) @@ -32,8 +37,8 @@ set_source_files_properties(${OAP_TEST_SOURCES} disable_test_logging_for_target(${src_folder}_test) target_link_libraries(${src_folder}_test ouroboros-irm) target_include_directories(${src_folder}_test PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR}/../.. - ${CMAKE_CURRENT_BINARY_DIR}/../.. + ${IRMD_SOURCE_DIR} + ${IRMD_BINARY_DIR} ) # PQC test executable (ML-DSA) @@ -46,8 +51,8 @@ set_source_files_properties(${OAP_TEST_SOURCES} disable_test_logging_for_target(${src_folder}_pqc_test) target_link_libraries(${src_folder}_pqc_test ouroboros-irm) target_include_directories(${src_folder}_pqc_test PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR}/../.. - ${CMAKE_CURRENT_BINARY_DIR}/../.. + ${IRMD_SOURCE_DIR} + ${IRMD_BINARY_DIR} ) add_dependencies(build_tests ${src_folder}_test ${src_folder}_pqc_test) diff --git a/src/irmd/reg/flow.c b/src/irmd/reg/flow.c index d6f6437f..02dc9c99 100644 --- a/src/irmd/reg/flow.c +++ b/src/irmd/reg/flow.c @@ -66,11 +66,11 @@ struct reg_flow * reg_flow_create(const struct flow_info * info) static void destroy_rbuffs(struct reg_flow * flow) { if (flow->n_rb != NULL) - shm_rbuff_destroy(flow->n_rb); + ssm_rbuff_destroy(flow->n_rb); flow->n_rb = NULL; if (flow->n_1_rb != NULL) - shm_rbuff_destroy(flow->n_1_rb); + ssm_rbuff_destroy(flow->n_1_rb); flow->n_1_rb = NULL; } @@ -103,28 +103,28 @@ static int create_rbuffs(struct reg_flow * flow, assert(flow != NULL); assert(info != NULL); - flow->n_rb = shm_rbuff_create(info->n_pid, info->id); + flow->n_rb = ssm_rbuff_create(info->n_pid, info->id); if (flow->n_rb == NULL) goto fail_n_rb; - if (shm_rbuff_mlock(flow->n_rb) < 0) + if (ssm_rbuff_mlock(flow->n_rb) < 0) log_warn("Failed to mlock n_rb for flow %d.", info->id); assert(flow->info.n_1_pid == 0); assert(flow->n_1_rb == NULL); flow->info.n_1_pid = info->n_1_pid; - flow->n_1_rb = shm_rbuff_create(info->n_1_pid, info->id); + flow->n_1_rb = ssm_rbuff_create(info->n_1_pid, info->id); if (flow->n_1_rb == NULL) goto fail_n_1_rb; - if (shm_rbuff_mlock(flow->n_1_rb) < 0) + if (ssm_rbuff_mlock(flow->n_1_rb) < 0) log_warn("Failed to mlock n_1_rb for flow %d.", info->id); return 0; fail_n_1_rb: - shm_rbuff_destroy(flow->n_rb); + ssm_rbuff_destroy(flow->n_rb); fail_n_rb: return -ENOMEM; } diff --git a/src/irmd/reg/flow.h b/src/irmd/reg/flow.h index d1e4811c..b671d486 100644 --- a/src/irmd/reg/flow.h +++ b/src/irmd/reg/flow.h @@ -28,7 +28,7 @@ #include <ouroboros/name.h> #include <ouroboros/pthread.h> #include <ouroboros/qos.h> -#include <ouroboros/shm_rbuff.h> +#include <ouroboros/ssm_rbuff.h> #include <ouroboros/utils.h> #include <sys/types.h> 
@@ -45,8 +45,8 @@ struct reg_flow { char name[NAME_SIZE + 1]; - struct shm_rbuff * n_rb; - struct shm_rbuff * n_1_rb; + struct ssm_rbuff * n_rb; + struct ssm_rbuff * n_1_rb; }; struct reg_flow * reg_flow_create(const struct flow_info * info); diff --git a/src/irmd/reg/proc.c b/src/irmd/reg/proc.c index 9bbdf0eb..541731b2 100644 --- a/src/irmd/reg/proc.c +++ b/src/irmd/reg/proc.c @@ -75,7 +75,7 @@ struct reg_proc * reg_proc_create(const struct proc_info * info) goto fail_malloc; } - proc->set = shm_flow_set_create(info->pid); + proc->set = ssm_flow_set_create(info->pid); if (proc->set == NULL) { log_err("Failed to create flow set for %d.", info->pid); goto fail_set; @@ -99,7 +99,7 @@ void reg_proc_destroy(struct reg_proc * proc) { assert(proc != NULL); - shm_flow_set_destroy(proc->set); + ssm_flow_set_destroy(proc->set); __reg_proc_clear_names(proc); diff --git a/src/irmd/reg/proc.h b/src/irmd/reg/proc.h index 499ecc72..a790e5c9 100644 --- a/src/irmd/reg/proc.h +++ b/src/irmd/reg/proc.h @@ -25,7 +25,7 @@ #include <ouroboros/list.h> #include <ouroboros/proc.h> -#include <ouroboros/shm_flow_set.h> +#include <ouroboros/ssm_flow_set.h> struct reg_proc { struct list_head next; @@ -35,7 +35,7 @@ struct reg_proc { struct list_head names; /* process accepts flows for names */ size_t n_names; /* number of names */ - struct shm_flow_set * set; + struct ssm_flow_set * set; }; struct reg_proc * reg_proc_create(const struct proc_info * info); diff --git a/src/lib/config.h.in b/src/lib/config.h.in index 465068cb..b2b17669 100644 --- a/src/lib/config.h.in +++ b/src/lib/config.h.in @@ -26,23 +26,16 @@ #ifdef HAVE_OPENSSL #cmakedefine HAVE_OPENSSL_PQC #define HAVE_ENCRYPTION -#define PROC_SECMEM_MAX @PROC_SECMEM_MAX@ #define SECMEM_GUARD @SECMEM_GUARD@ #endif +#define PROC_SECMEM_MAX @PROC_SECMEM_MAX@ #define SYS_MAX_FLOWS @SYS_MAX_FLOWS@ -#cmakedefine SHM_RDRB_MULTI_BLOCK #cmakedefine QOS_DISABLE_CRC #cmakedefine HAVE_OPENSSL_RNG -#define SHM_RBUFF_PREFIX "@SHM_RBUFF_PREFIX@" #define SHM_LOCKFILE_NAME "@SHM_LOCKFILE_NAME@" -#define SHM_FLOW_SET_PREFIX "@SHM_FLOW_SET_PREFIX@" -#define SHM_RDRB_NAME "@SHM_RDRB_NAME@" -#define SHM_RDRB_BLOCK_SIZE @SHM_RDRB_BLOCK_SIZE@ -#define SHM_BUFFER_SIZE @SHM_BUFFER_SIZE@ -#define SHM_RBUFF_SIZE @SHM_RBUFF_SIZE@ #define FLOW_ALLOC_TIMEOUT @FLOW_ALLOC_TIMEOUT@ #define TPM_DEBUG_REPORT_INTERVAL @TPM_DEBUG_REPORT_INTERVAL@ @@ -70,9 +63,6 @@ #define PROG_RES_FDS @PROG_RES_FDS@ #define PROG_MAX_FQUEUES @PROG_MAX_FQUEUES@ -#define DU_BUFF_HEADSPACE @DU_BUFF_HEADSPACE@ -#define DU_BUFF_TAILSPACE @DU_BUFF_TAILSPACE@ - /* Default Delta-t parameters */ #cmakedefine FRCT_LINUX_RTT_ESTIMATOR #define DELT_A (@DELTA_T_ACK@) /* ns */ diff --git a/src/lib/crypt.c b/src/lib/crypt.c index 600f8336..fdbae776 100644 --- a/src/lib/crypt.c +++ b/src/lib/crypt.c @@ -1030,6 +1030,7 @@ int crypt_secure_malloc_init(size_t max) #ifdef HAVE_OPENSSL return openssl_secure_malloc_init(max, SECMEM_GUARD); #else + (void) max; return 0; #endif } diff --git a/src/lib/dev.c b/src/lib/dev.c index 106a4256..35ea701b 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -27,6 +27,7 @@ #endif #include "config.h" +#include "ssm.h" #include <ouroboros/bitmap.h> #include <ouroboros/cep.h> @@ -45,9 +46,9 @@ #include <ouroboros/pthread.h> #include <ouroboros/random.h> #include <ouroboros/serdes-irm.h> -#include <ouroboros/shm_flow_set.h> -#include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_rbuff.h> +#include <ouroboros/ssm_flow_set.h> +#include <ouroboros/ssm_pool.h> +#include <ouroboros/ssm_rbuff.h> #include 
<ouroboros/sockets.h> #include <ouroboros/utils.h> #ifdef PROC_FLOW_STATS @@ -92,9 +93,9 @@ struct flow { struct flow_info info; - struct shm_rbuff * rx_rb; - struct shm_rbuff * tx_rb; - struct shm_flow_set * set; + struct ssm_rbuff * rx_rb; + struct ssm_rbuff * tx_rb; + struct ssm_flow_set * set; uint16_t oflags; ssize_t part_idx; @@ -120,14 +121,14 @@ struct flow_set { }; struct fqueue { - struct flowevent fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */ + struct flowevent fqueue[SSM_RBUFF_SIZE]; /* Safe copy from shm. */ size_t fqsize; size_t next; }; struct { - struct shm_rdrbuff * rdrb; - struct shm_flow_set * fqset; + struct ssm_pool * gspp; + struct ssm_flow_set * fqset; struct bmp * fds; struct bmp * fqueues; @@ -254,8 +255,8 @@ static void proc_exit(void) send_recv_msg(&msg); } -static int sdb_encrypt(struct flow * flow, - struct shm_du_buff * sdb) +static int spb_encrypt(struct flow * flow, + struct ssm_pk_buff * spb) { buffer_t in; buffer_t out; @@ -265,17 +266,17 @@ static int sdb_encrypt(struct flow * flow, if (flow->crypt == NULL) return 0; /* No encryption */ - in.data = shm_du_buff_head(sdb); - in.len = shm_du_buff_len(sdb); + in.data = ssm_pk_buff_head(spb); + in.len = ssm_pk_buff_len(spb); if (crypt_encrypt(flow->crypt, in, &out) < 0) goto fail_encrypt; - head = shm_du_buff_head_alloc(sdb, flow->headsz); + head = ssm_pk_buff_head_alloc(spb, flow->headsz); if (head == NULL) goto fail_alloc; - tail = shm_du_buff_tail_alloc(sdb, flow->tailsz); + tail = ssm_pk_buff_tail_alloc(spb, flow->tailsz); if (tail == NULL) goto fail_alloc; @@ -290,8 +291,8 @@ static int sdb_encrypt(struct flow * flow, return -ECRYPT; } -static int sdb_decrypt(struct flow * flow, - struct shm_du_buff * sdb) +static int spb_decrypt(struct flow * flow, + struct ssm_pk_buff * spb) { buffer_t in; buffer_t out; @@ -300,15 +301,15 @@ static int sdb_decrypt(struct flow * flow, if (flow->crypt == NULL) return 0; /* No decryption */ - in.data = shm_du_buff_head(sdb); - in.len = shm_du_buff_len(sdb); + in.data = ssm_pk_buff_head(spb); + in.len = ssm_pk_buff_len(spb); if (crypt_decrypt(flow->crypt, in, &out) < 0) return -ENOMEM; - head = shm_du_buff_head_release(sdb, flow->headsz) + flow->headsz; - shm_du_buff_tail_release(sdb, flow->tailsz); + head = ssm_pk_buff_head_release(spb, flow->headsz) + flow->headsz; + ssm_pk_buff_tail_release(spb, flow->tailsz); memcpy(head, out.data, out.len); @@ -337,11 +338,11 @@ void * flow_tx(void * o) static void flow_send_keepalive(struct flow * flow, struct timespec now) { - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; ssize_t idx; uint8_t * ptr; - idx = shm_rdrbuff_alloc(ai.rdrb, 0, &ptr, &sdb); + idx = ssm_pool_alloc(ai.gspp, 0, &ptr, &spb); if (idx < 0) return; @@ -349,10 +350,10 @@ static void flow_send_keepalive(struct flow * flow, flow->snd_act = now; - if (shm_rbuff_write(flow->tx_rb, idx)) - shm_rdrbuff_remove(ai.rdrb, idx); + if (ssm_rbuff_write(flow->tx_rb, idx)) + ssm_pool_remove(ai.gspp, idx); else - shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); } @@ -373,15 +374,15 @@ static void _flow_keepalive(struct flow * flow) flow_id = flow->info.id; timeo = flow->info.qs.timeout; - acl = shm_rbuff_get_acl(flow->rx_rb); + acl = ssm_rbuff_get_acl(flow->rx_rb); if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN)) return; clock_gettime(PTHREAD_COND_CLOCK, &now); if (ts_diff_ns(&now, &r_act) > (int64_t) timeo * MILLION) { - shm_rbuff_set_acl(flow->rx_rb, 
ACL_FLOWPEER); - shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER); + ssm_rbuff_set_acl(flow->rx_rb, ACL_FLOWPEER); + ssm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER); return; } @@ -461,7 +462,7 @@ static void __flow_fini(int fd) pthread_join(ai.tx, NULL); } - shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id); + ssm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id); frcti_destroy(ai.flows[fd].frcti); } @@ -472,20 +473,20 @@ static void __flow_fini(int fd) } if (ai.flows[fd].rx_rb != NULL) { - shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN); - shm_rbuff_close(ai.flows[fd].rx_rb); + ssm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN); + ssm_rbuff_close(ai.flows[fd].rx_rb); } if (ai.flows[fd].tx_rb != NULL) { - shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN); - shm_rbuff_close(ai.flows[fd].tx_rb); + ssm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN); + ssm_rbuff_close(ai.flows[fd].tx_rb); } if (ai.flows[fd].set != NULL) { - shm_flow_set_notify(ai.flows[fd].set, + ssm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].info.id, FLOW_DEALLOC); - shm_flow_set_close(ai.flows[fd].set); + ssm_flow_set_close(ai.flows[fd].set); } crypt_destroy_ctx(ai.flows[fd].crypt); @@ -528,15 +529,15 @@ static int flow_init(struct flow_info * info, flow->info = *info; - flow->rx_rb = shm_rbuff_open(info->n_pid, info->id); + flow->rx_rb = ssm_rbuff_open(info->n_pid, info->id); if (flow->rx_rb == NULL) goto fail_rx_rb; - flow->tx_rb = shm_rbuff_open(info->n_1_pid, info->id); + flow->tx_rb = ssm_rbuff_open(info->n_1_pid, info->id); if (flow->tx_rb == NULL) goto fail_tx_rb; - flow->set = shm_flow_set_open(info->n_1_pid); + flow->set = ssm_flow_set_open(info->n_1_pid); if (flow->set == NULL) goto fail_set; @@ -565,7 +566,7 @@ static int flow_init(struct flow_info * info, if (flow->frcti == NULL) goto fail_frcti; - if (shm_flow_set_add(ai.fqset, 0, info->id)) + if (ssm_flow_set_add(ai.fqset, 0, info->id)) goto fail_flow_set_add; ++ai.n_frcti; @@ -585,17 +586,17 @@ static int flow_init(struct flow_info * info, return fd; fail_tx_thread: - shm_flow_set_del(ai.fqset, 0, info->id); + ssm_flow_set_del(ai.fqset, 0, info->id); fail_flow_set_add: frcti_destroy(flow->frcti); fail_frcti: crypt_destroy_ctx(flow->crypt); fail_crypt: - shm_flow_set_close(flow->set); + ssm_flow_set_close(flow->set); fail_set: - shm_rbuff_close(flow->tx_rb); + ssm_rbuff_close(flow->tx_rb); fail_tx_rb: - shm_rbuff_close(flow->rx_rb); + ssm_rbuff_close(flow->rx_rb); fail_rx_rb: bmp_release(ai.fds, fd); fail_fds: @@ -661,8 +662,8 @@ static void init(int argc, goto fail_fqueues; } - ai.rdrb = shm_rdrbuff_open(); - if (ai.rdrb == NULL) { + ai.gspp = ssm_pool_open(); + if (ai.gspp == NULL) { fprintf(stderr, "FATAL: Could not open packet buffer.\n"); goto fail_rdrb; } @@ -700,7 +701,7 @@ static void init(int argc, goto fail_flow_lock; } - ai.fqset = shm_flow_set_open(getpid()); + ai.fqset = ssm_flow_set_open(getpid()); if (ai.fqset == NULL) { fprintf(stderr, "FATAL: Could not open flow set.\n"); goto fail_fqset; @@ -749,7 +750,7 @@ static void init(int argc, fail_timerwheel: fset_destroy(ai.frct_set); fail_frct_set: - shm_flow_set_close(ai.fqset); + ssm_flow_set_close(ai.fqset); fail_fqset: pthread_rwlock_destroy(&ai.lock); fail_flow_lock: @@ -761,7 +762,7 @@ static void init(int argc, fail_id_to_fd: free(ai.flows); fail_flows: - shm_rdrbuff_close(ai.rdrb); + ssm_pool_close(ai.gspp); fail_rdrb: bmp_destroy(ai.fqueues); fail_fqueues: @@ -787,9 +788,9 @@ static void fini(void) for (i = 0; i < PROG_MAX_FLOWS; ++i) { if (ai.flows[i].info.id != -1) { ssize_t 
idx; - shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN); - while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) - shm_rdrbuff_remove(ai.rdrb, idx); + ssm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN); + while ((idx = ssm_rbuff_read(ai.flows[i].rx_rb)) >= 0) + ssm_pool_remove(ai.gspp, idx); __flow_fini(i); } } @@ -806,14 +807,14 @@ static void fini(void) fset_destroy(ai.frct_set); - shm_flow_set_close(ai.fqset); + ssm_flow_set_close(ai.fqset); pthread_rwlock_destroy(&ai.lock); free(ai.flows); free(ai.id_to_fd); - shm_rdrbuff_close(ai.rdrb); + ssm_pool_close(ai.gspp); bmp_destroy(ai.fds); bmp_destroy(ai.fqueues); @@ -1015,7 +1016,7 @@ int flow_dealloc(int fd) pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock); - shm_rbuff_fini(flow->tx_rb); + ssm_rbuff_fini(flow->tx_rb); pthread_cleanup_pop(true); @@ -1150,16 +1151,16 @@ int fccntl(int fd, break; case FLOWGRXQLEN: qlen = va_arg(l, size_t *); - *qlen = shm_rbuff_queued(flow->rx_rb); + *qlen = ssm_rbuff_queued(flow->rx_rb); break; case FLOWGTXQLEN: qlen = va_arg(l, size_t *); - *qlen = shm_rbuff_queued(flow->tx_rb); + *qlen = ssm_rbuff_queued(flow->tx_rb); break; case FLOWSFLAGS: flow->oflags = va_arg(l, uint32_t); - rx_acl = shm_rbuff_get_acl(flow->rx_rb); - tx_acl = shm_rbuff_get_acl(flow->rx_rb); + rx_acl = ssm_rbuff_get_acl(flow->rx_rb); + tx_acl = ssm_rbuff_get_acl(flow->rx_rb); /* * Making our own flow write only means making the * the other side of the flow read only. @@ -1172,19 +1173,19 @@ int fccntl(int fd, if (flow->oflags & FLOWFDOWN) { rx_acl |= ACL_FLOWDOWN; tx_acl |= ACL_FLOWDOWN; - shm_flow_set_notify(flow->set, + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_DOWN); } else { rx_acl &= ~ACL_FLOWDOWN; tx_acl &= ~ACL_FLOWDOWN; - shm_flow_set_notify(flow->set, + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_UP); } - shm_rbuff_set_acl(flow->rx_rb, rx_acl); - shm_rbuff_set_acl(flow->tx_rb, tx_acl); + ssm_rbuff_set_acl(flow->rx_rb, rx_acl); + ssm_rbuff_set_acl(flow->tx_rb, tx_acl); break; case FLOWGFLAGS: @@ -1230,35 +1231,34 @@ int fccntl(int fd, return -EPERM; } -static int chk_crc(struct shm_du_buff * sdb) +static int chk_crc(struct ssm_pk_buff * spb) { uint32_t crc; - uint8_t * head = shm_du_buff_head(sdb); - uint8_t * tail = shm_du_buff_tail_release(sdb, CRCLEN); + uint8_t * head = ssm_pk_buff_head(spb); + uint8_t * tail = ssm_pk_buff_tail_release(spb, CRCLEN); mem_hash(HASH_CRC32, &crc, head, tail - head); return !(crc == *((uint32_t *) tail)); } -static int add_crc(struct shm_du_buff * sdb) +static int add_crc(struct ssm_pk_buff * spb) { uint8_t * head; uint8_t * tail; - tail = shm_du_buff_tail_alloc(sdb, CRCLEN); + tail = ssm_pk_buff_tail_alloc(spb, CRCLEN); if (tail == NULL) return -ENOMEM; - head = shm_du_buff_head(sdb); - + head = ssm_pk_buff_head(spb); mem_hash(HASH_CRC32, tail, head, tail - head); return 0; } -static int flow_tx_sdb(struct flow * flow, - struct shm_du_buff * sdb, +static int flow_tx_spb(struct flow * flow, + struct ssm_pk_buff * spb, bool block, struct timespec * abstime) { @@ -1274,32 +1274,32 @@ static int flow_tx_sdb(struct flow * flow, pthread_rwlock_unlock(&ai.lock); - idx = shm_du_buff_get_idx(sdb); + idx = ssm_pk_buff_get_idx(spb); pthread_rwlock_rdlock(&ai.lock); - if (shm_du_buff_len(sdb) > 0) { - if (frcti_snd(flow->frcti, sdb) < 0) + if (ssm_pk_buff_len(spb) > 0) { + if (frcti_snd(flow->frcti, spb) < 0) goto enomem; - if (sdb_encrypt(flow, sdb) < 0) + if (spb_encrypt(flow, spb) < 0) goto enomem; - if (flow->info.qs.ber == 0 && add_crc(sdb) != 0) + if (flow->info.qs.ber == 0 
&& add_crc(spb) != 0) goto enomem; } pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock); if (!block) - ret = shm_rbuff_write(flow->tx_rb, idx); + ret = ssm_rbuff_write(flow->tx_rb, idx); else - ret = shm_rbuff_write_b(flow->tx_rb, idx, abstime); + ret = ssm_rbuff_write_b(flow->tx_rb, idx, abstime); if (ret < 0) - shm_rdrbuff_remove(ai.rdrb, idx); + ssm_pool_remove(ai.gspp, idx); else - shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); pthread_cleanup_pop(true); @@ -1307,7 +1307,7 @@ static int flow_tx_sdb(struct flow * flow, enomem: pthread_rwlock_unlock(&ai.lock); - shm_rdrbuff_remove(ai.rdrb, idx); + ssm_pool_remove(ai.gspp, idx); return -ENOMEM; } @@ -1321,7 +1321,7 @@ ssize_t flow_write(int fd, int flags; struct timespec abs; struct timespec * abstime = NULL; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; uint8_t * ptr; if (buf == NULL && count != 0) @@ -1356,12 +1356,12 @@ ssize_t flow_write(int fd, if (flags & FLOWFWNOBLOCK) { if (!frcti_is_window_open(flow->frcti)) return -EAGAIN; - idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb); + idx = ssm_pool_alloc(ai.gspp, count, &ptr, &spb); } else { ret = frcti_window_wait(flow->frcti, abstime); if (ret < 0) return ret; - idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime); + idx = ssm_pool_alloc_b(ai.gspp, count, &ptr, &spb, abstime); } if (idx < 0) @@ -1370,36 +1370,36 @@ ssize_t flow_write(int fd, if (count > 0) memcpy(ptr, buf, count); - ret = flow_tx_sdb(flow, sdb, !(flags & FLOWFWNOBLOCK), abstime); + ret = flow_tx_spb(flow, spb, !(flags & FLOWFWNOBLOCK), abstime); return ret < 0 ? (ssize_t) ret : (ssize_t) count; } static bool invalid_pkt(struct flow * flow, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { - if (shm_du_buff_len(sdb) == 0) + if (spb == NULL || ssm_pk_buff_len(spb) == 0) return true; - if (flow->info.qs.ber == 0 && chk_crc(sdb) != 0) + if (flow->info.qs.ber == 0 && chk_crc(spb) != 0) return true; - if (sdb_decrypt(flow, sdb) < 0) + if (spb_decrypt(flow, spb) < 0) return true; return false; } -static ssize_t flow_rx_sdb(struct flow * flow, - struct shm_du_buff ** sdb, +static ssize_t flow_rx_spb(struct flow * flow, + struct ssm_pk_buff ** spb, bool block, struct timespec * abstime) { ssize_t idx; struct timespec now; - idx = block ? shm_rbuff_read_b(flow->rx_rb, abstime) : - shm_rbuff_read(flow->rx_rb); + idx = block ? 
ssm_rbuff_read_b(flow->rx_rb, abstime) : + ssm_rbuff_read(flow->rx_rb); if (idx < 0) return idx; @@ -1411,10 +1411,10 @@ static ssize_t flow_rx_sdb(struct flow * flow, pthread_rwlock_unlock(&ai.lock); - *sdb = shm_rdrbuff_get(ai.rdrb, idx); + *spb = ssm_pool_get(ai.gspp, idx); - if (invalid_pkt(flow, *sdb)) { - shm_rdrbuff_remove(ai.rdrb, idx); + if (invalid_pkt(flow, *spb)) { + ssm_pool_remove(ai.gspp, idx); return -EAGAIN; } @@ -1428,7 +1428,7 @@ ssize_t flow_read(int fd, ssize_t idx; ssize_t n; uint8_t * packet; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; struct timespec abs; struct timespec now; struct timespec * abstime = NULL; @@ -1469,7 +1469,7 @@ ssize_t flow_read(int fd, while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { pthread_rwlock_unlock(&ai.lock); - idx = flow_rx_sdb(flow, &sdb, block, abstime); + idx = flow_rx_spb(flow, &spb, block, abstime); if (idx < 0) { if (block && idx != -EAGAIN) return idx; @@ -1482,23 +1482,23 @@ ssize_t flow_read(int fd, pthread_rwlock_rdlock(&ai.lock); - frcti_rcv(flow->frcti, sdb); + frcti_rcv(flow->frcti, spb); } } - sdb = shm_rdrbuff_get(ai.rdrb, idx); + spb = ssm_pool_get(ai.gspp, idx); pthread_rwlock_unlock(&ai.lock); - packet = shm_du_buff_head(sdb); + packet = ssm_pk_buff_head(spb); - n = shm_du_buff_len(sdb); + n = ssm_pk_buff_len(spb); assert(n >= 0); if (n <= (ssize_t) count) { memcpy(buf, packet, n); - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); pthread_rwlock_wrlock(&ai.lock); @@ -1512,7 +1512,7 @@ ssize_t flow_read(int fd, } else { if (partrd) { memcpy(buf, packet, count); - shm_du_buff_head_release(sdb, n); + ssm_pk_buff_head_release(spb, n); pthread_rwlock_wrlock(&ai.lock); flow->part_idx = idx; @@ -1521,7 +1521,7 @@ ssize_t flow_read(int fd, pthread_rwlock_unlock(&ai.lock); return count; } else { - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); return -EMSGSIZE; } } @@ -1578,7 +1578,7 @@ struct fqueue * fqueue_create(void) if (fq == NULL) return NULL; - memset(fq->fqueue, -1, SHM_BUFFER_SIZE * sizeof(*fq->fqueue)); + memset(fq->fqueue, -1, SSM_RBUFF_SIZE * sizeof(*fq->fqueue)); fq->fqsize = 0; fq->next = 0; @@ -1595,7 +1595,7 @@ void fset_zero(struct flow_set * set) if (set == NULL) return; - shm_flow_set_zero(ai.fqset, set->idx); + ssm_flow_set_zero(ai.fqset, set->idx); } int fset_add(struct flow_set * set, @@ -1617,14 +1617,14 @@ int fset_add(struct flow_set * set, } if (flow->frcti != NULL) - shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id); + ssm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id); - ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].info.id); + ret = ssm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].info.id); if (ret < 0) goto fail; - if (shm_rbuff_queued(ai.flows[fd].rx_rb)) - shm_flow_set_notify(ai.fqset, ai.flows[fd].info.id, FLOW_PKT); + if (ssm_rbuff_queued(ai.flows[fd].rx_rb)) + ssm_flow_set_notify(ai.fqset, ai.flows[fd].info.id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1648,10 +1648,10 @@ void fset_del(struct flow_set * set, pthread_rwlock_rdlock(&ai.lock); if (flow->info.id >= 0) - shm_flow_set_del(ai.fqset, set->idx, flow->info.id); + ssm_flow_set_del(ai.fqset, set->idx, flow->info.id); if (flow->frcti != NULL) - shm_flow_set_add(ai.fqset, 0, ai.flows[fd].info.id); + ssm_flow_set_add(ai.fqset, 0, ai.flows[fd].info.id); pthread_rwlock_unlock(&ai.lock); } @@ -1671,7 +1671,7 @@ bool fset_has(const struct flow_set * set, return false; } - ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].info.id) == 1); + ret = (ssm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].info.id) 
== 1); pthread_rwlock_unlock(&ai.lock); @@ -1681,7 +1681,7 @@ bool fset_has(const struct flow_set * set, /* Filter fqueue events for non-data packets */ static int fqueue_filter(struct fqueue * fq) { - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; int fd; ssize_t idx; struct frcti * frcti; @@ -1712,15 +1712,15 @@ static int fqueue_filter(struct fqueue * fq) pthread_rwlock_unlock(&ai.lock); - idx = flow_rx_sdb(&ai.flows[fd], &sdb, false, NULL); + idx = flow_rx_spb(&ai.flows[fd], &spb, false, NULL); if (idx < 0) return 0; pthread_rwlock_rdlock(&ai.lock); - sdb = shm_rdrbuff_get(ai.rdrb, idx); + spb = ssm_pool_get(ai.gspp, idx); - __frcti_rcv(frcti, sdb); + __frcti_rcv(frcti, spb); if (__frcti_pdu_ready(frcti) >= 0) { pthread_rwlock_unlock(&ai.lock); @@ -1795,7 +1795,7 @@ ssize_t fevent(struct flow_set * set, } while (ret == 0) { - ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t); + ret = ssm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t); if (ret == -ETIMEDOUT) return -ETIMEDOUT; @@ -1961,13 +1961,13 @@ int ipcp_flow_alloc_reply(int fd, } int ipcp_flow_read(int fd, - struct shm_du_buff ** sdb) + struct ssm_pk_buff ** spb) { struct flow * flow; ssize_t idx = -1; assert(fd >= 0 && fd < SYS_MAX_FLOWS); - assert(sdb); + assert(spb); flow = &ai.flows[fd]; @@ -1978,13 +1978,13 @@ int ipcp_flow_read(int fd, while (frcti_queued_pdu(flow->frcti) < 0) { pthread_rwlock_unlock(&ai.lock); - idx = flow_rx_sdb(flow, sdb, false, NULL); + idx = flow_rx_spb(flow, spb, false, NULL); if (idx < 0) return idx; pthread_rwlock_rdlock(&ai.lock); - frcti_rcv(flow->frcti, *sdb); + frcti_rcv(flow->frcti, *spb); } pthread_rwlock_unlock(&ai.lock); @@ -1993,13 +1993,13 @@ int ipcp_flow_read(int fd, } int ipcp_flow_write(int fd, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct flow * flow; int ret; assert(fd >= 0 && fd < SYS_MAX_FLOWS); - assert(sdb); + assert(spb); flow = &ai.flows[fd]; @@ -2017,19 +2017,19 @@ int ipcp_flow_write(int fd, pthread_rwlock_unlock(&ai.lock); - ret = flow_tx_sdb(flow, sdb, true, NULL); + ret = flow_tx_spb(flow, spb, true, NULL); return ret; } int np1_flow_read(int fd, - struct shm_du_buff ** sdb) + struct ssm_pk_buff ** spb) { struct flow * flow; ssize_t idx = -1; assert(fd >= 0 && fd < SYS_MAX_FLOWS); - assert(sdb); + assert(spb); flow = &ai.flows[fd]; @@ -2037,7 +2037,7 @@ int np1_flow_read(int fd, pthread_rwlock_rdlock(&ai.lock); - idx = shm_rbuff_read(flow->rx_rb); + idx = ssm_rbuff_read(flow->rx_rb); if (idx < 0) { pthread_rwlock_unlock(&ai.lock); return idx; @@ -2045,20 +2045,20 @@ int np1_flow_read(int fd, pthread_rwlock_unlock(&ai.lock); - *sdb = shm_rdrbuff_get(ai.rdrb, idx); + *spb = ssm_pool_get(ai.gspp, idx); return 0; } int np1_flow_write(int fd, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct flow * flow; int ret; ssize_t idx; assert(fd >= 0 && fd < SYS_MAX_FLOWS); - assert(sdb); + assert(spb); flow = &ai.flows[fd]; @@ -2076,31 +2076,31 @@ int np1_flow_write(int fd, pthread_rwlock_unlock(&ai.lock); - idx = shm_du_buff_get_idx(sdb); + idx = ssm_pk_buff_get_idx(spb); - ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL); + ret = ssm_rbuff_write_b(flow->tx_rb, idx, NULL); if (ret < 0) - shm_rdrbuff_remove(ai.rdrb, idx); + ssm_pool_remove(ai.gspp, idx); else - shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); return ret; } -int ipcp_sdb_reserve(struct shm_du_buff ** sdb, +int ipcp_spb_reserve(struct ssm_pk_buff ** spb, size_t len) { - return shm_rdrbuff_alloc_b(ai.rdrb, 
len, NULL, sdb, NULL) < 0 ? -1 : 0; + return ssm_pool_alloc_b(ai.gspp, len, NULL, spb, NULL) < 0 ? -1 : 0; } -void ipcp_sdb_release(struct shm_du_buff * sdb) +void ipcp_spb_release(struct ssm_pk_buff * spb) { - shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); + ssm_pool_remove(ai.gspp, ssm_pk_buff_get_idx(spb)); } int ipcp_flow_fini(int fd) { - struct shm_rbuff * rx_rb; + struct ssm_rbuff * rx_rb; assert(fd >= 0 && fd < SYS_MAX_FLOWS); @@ -2111,10 +2111,10 @@ int ipcp_flow_fini(int fd) return -1; } - shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN); - shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN); - shm_flow_set_notify(ai.flows[fd].set, + ssm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].info.id, FLOW_DEALLOC); @@ -2123,7 +2123,7 @@ int ipcp_flow_fini(int fd) pthread_rwlock_unlock(&ai.lock); if (rx_rb != NULL) - shm_rbuff_fini(rx_rb); + ssm_rbuff_fini(rx_rb); return 0; } @@ -2153,7 +2153,7 @@ size_t ipcp_flow_queued(int fd) assert(ai.flows[fd].info.id >= 0); - q = shm_rbuff_queued(ai.flows[fd].tx_rb); + q = ssm_rbuff_queued(ai.flows[fd].tx_rb); pthread_rwlock_unlock(&ai.lock); @@ -2168,7 +2168,7 @@ ssize_t local_flow_read(int fd) pthread_rwlock_rdlock(&ai.lock); - ret = shm_rbuff_read(ai.flows[fd].rx_rb); + ret = ssm_rbuff_read(ai.flows[fd].rx_rb); pthread_rwlock_unlock(&ai.lock); @@ -2192,11 +2192,11 @@ int local_flow_write(int fd, return -ENOTALLOC; } - ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL); + ret = ssm_rbuff_write_b(flow->tx_rb, idx, NULL); if (ret == 0) - shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); + ssm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); else - shm_rdrbuff_remove(ai.rdrb, idx); + ssm_pool_remove(ai.gspp, idx); pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/frct.c b/src/lib/frct.c index 08c5ea80..76736931 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -237,21 +237,21 @@ static void __send_frct_pkt(int fd, uint32_t ackno, uint32_t rwe) { - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; struct frct_pci * pci; ssize_t idx; struct flow * f; /* Raw calls needed to bypass frcti. */ #ifdef RXM_BLOCKING - idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); + idx = ssm_pool_alloc_b(ai.gspp, sizeof(*pci), NULL, &spb, NULL); #else - idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*pci), NULL, &sdb); + idx = ssm_pool_alloc(ai.gspp, sizeof(*pci), NULL, &spb); #endif if (idx < 0) return; - pci = (struct frct_pci *) shm_du_buff_head(sdb); + pci = (struct frct_pci *) ssm_pk_buff_head(spb); memset(pci, 0, sizeof(*pci)); *((uint32_t *) pci) = hton32(rwe); @@ -261,22 +261,22 @@ static void __send_frct_pkt(int fd, f = &ai.flows[fd]; - if (sdb_encrypt(f, sdb) < 0) + if (spb_encrypt(f, spb) < 0) goto fail; #ifdef RXM_BLOCKING - if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) + if (ssm_rbuff_write_b(f->tx_rb, idx, NULL)) #else - if (shm_rbuff_write(f->tx_rb, idx)) + if (ssm_rbuff_write(f->tx_rb, idx)) #endif goto fail; - shm_flow_set_notify(f->set, f->info.id, FLOW_PKT); + ssm_flow_set_notify(f->set, f->info.id, FLOW_PKT); return; fail: - ipcp_sdb_release(sdb); + ipcp_spb_release(spb); return; } @@ -479,11 +479,11 @@ static void frcti_setflags(struct frcti * frcti, #define frcti_queued_pdu(frcti) \ (frcti == NULL ? idx : __frcti_queued_pdu(frcti)) -#define frcti_snd(frcti, sdb) \ - (frcti == NULL ? 0 : __frcti_snd(frcti, sdb)) +#define frcti_snd(frcti, spb) \ + (frcti == NULL ? 
0 : __frcti_snd(frcti, spb)) -#define frcti_rcv(frcti, sdb) \ - (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) +#define frcti_rcv(frcti, spb) \ + (frcti == NULL ? 0 : __frcti_rcv(frcti, spb)) #define frcti_dealloc(frcti) \ (frcti == NULL ? 0 : __frcti_dealloc(frcti)) @@ -683,7 +683,7 @@ static time_t __frcti_dealloc(struct frcti * frcti) } static int __frcti_snd(struct frcti * frcti, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct frct_pci * pci; struct timespec now; @@ -693,14 +693,14 @@ static int __frcti_snd(struct frcti * frcti, bool rtx; assert(frcti); - assert(shm_du_buff_len(sdb) != 0); + assert(ssm_pk_buff_len(spb) != 0); snd_cr = &frcti->snd_cr; rcv_cr = &frcti->rcv_cr; timerwheel_move(); - pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); + pci = (struct frct_pci *) ssm_pk_buff_head_alloc(spb, FRCT_PCILEN); if (pci == NULL) return -ENOMEM; @@ -759,7 +759,7 @@ static int __frcti_snd(struct frcti * frcti, pthread_rwlock_unlock(&frcti->lock); if (rtx) - timerwheel_rxm(frcti, seqno, sdb); + timerwheel_rxm(frcti, seqno, spb); return 0; } @@ -793,7 +793,7 @@ static void rtt_estimator(struct frcti * frcti, /* Always queues the next application packet on the RQ. */ static void __frcti_rcv(struct frcti * frcti, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { ssize_t idx; size_t pos; @@ -813,9 +813,9 @@ static void __frcti_rcv(struct frcti * frcti, clock_gettime(PTHREAD_COND_CLOCK, &now); - pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); + pci = (struct frct_pci *) ssm_pk_buff_head_release(spb, FRCT_PCILEN); - idx = shm_du_buff_get_idx(sdb); + idx = ssm_pk_buff_get_idx(spb); seqno = ntoh32(pci->seqno); pos = seqno & (RQ_SIZE - 1); @@ -841,7 +841,7 @@ static void __frcti_rcv(struct frcti * frcti, __send_frct_pkt(fd, FRCT_FC, 0, rwe); - shm_rdrbuff_remove(ai.rdrb, idx); + ssm_pool_remove(ai.gspp, idx); return; } @@ -928,7 +928,7 @@ static void __frcti_rcv(struct frcti * frcti, drop_packet: pthread_rwlock_unlock(&frcti->lock); - shm_rdrbuff_remove(ai.rdrb, idx); + ssm_pool_remove(ai.gspp, idx); send_frct_pkt(frcti); return; } diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c deleted file mode 100644 index 5cdeba9e..00000000 --- a/src/lib/shm_rdrbuff.c +++ /dev/null @@ -1,617 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2024 - * - * Random Deletion Ring Buffer for Data Units - * - * Dimitri Staessens <dimitri@ouroboros.rocks> - * Sander Vrijders <sander@ouroboros.rocks> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. 
- */ - -#define _POSIX_C_SOURCE 200809L - -#include "config.h" - -#include <ouroboros/errno.h> -#include <ouroboros/pthread.h> -#include <ouroboros/shm_rdrbuff.h> - -#include <assert.h> -#include <fcntl.h> -#include <signal.h> -#include <stdbool.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> -#include <sys/mman.h> -#include <sys/stat.h> - -#define SHM_BLOCKS_SIZE ((SHM_BUFFER_SIZE) * SHM_RDRB_BLOCK_SIZE) -#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof(size_t) \ - + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ - + sizeof(pid_t)) -#define DU_BUFF_OVERHEAD (DU_BUFF_HEADSPACE + DU_BUFF_TAILSPACE) - -#define get_head_ptr(rdrb) \ - idx_to_du_buff_ptr(rdrb, *rdrb->head) - -#define get_tail_ptr(rdrb) \ - idx_to_du_buff_ptr(rdrb, *rdrb->tail) - -#define idx_to_du_buff_ptr(rdrb, idx) \ - ((struct shm_du_buff *) (rdrb->shm_base + idx * SHM_RDRB_BLOCK_SIZE)) - -#define shm_rdrb_used(rdrb) \ - (((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) + 1) \ - & ((SHM_BUFFER_SIZE) - 1)) - -#define shm_rdrb_free(rdrb, i) \ - (shm_rdrb_used(rdrb) + i < (SHM_BUFFER_SIZE)) - -#define shm_rdrb_empty(rdrb) \ - (*rdrb->tail == *rdrb->head) - -struct shm_du_buff { - size_t size; -#ifdef SHM_RDRB_MULTI_BLOCK - size_t blocks; -#endif - size_t du_head; - size_t du_tail; - size_t refs; - size_t idx; -}; - -struct shm_rdrbuff { - uint8_t * shm_base; /* start of blocks */ - size_t * head; /* start of ringbuffer head */ - size_t * tail; /* start of ringbuffer tail */ - pthread_mutex_t * lock; /* lock all free space in shm */ - pthread_cond_t * healthy; /* flag when packet is read */ - pid_t * pid; /* pid of the irmd owner */ -}; - -static void garbage_collect(struct shm_rdrbuff * rdrb) -{ -#ifdef SHM_RDRB_MULTI_BLOCK - struct shm_du_buff * sdb; - while (!shm_rdrb_empty(rdrb) && - (sdb = get_tail_ptr(rdrb))->refs == 0) - *rdrb->tail = (*rdrb->tail + sdb->blocks) - & ((SHM_BUFFER_SIZE) - 1); -#else - while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->refs == 0) - *rdrb->tail = (*rdrb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); -#endif - pthread_cond_broadcast(rdrb->healthy); -} - -#ifdef HAVE_ROBUST_MUTEX -static void sanitize(struct shm_rdrbuff * rdrb) -{ - --get_head_ptr(rdrb)->refs; - garbage_collect(rdrb); - pthread_mutex_consistent(rdrb->lock); -} -#endif - -static char * rdrb_filename(void) -{ - char * str; - - str = malloc(strlen(SHM_RDRB_NAME) + 1); - if (str == NULL) - return NULL; - - sprintf(str, "%s", SHM_RDRB_NAME); - - return str; -} - -void shm_rdrbuff_close(struct shm_rdrbuff * rdrb) -{ - assert(rdrb); - - munmap(rdrb->shm_base, SHM_FILE_SIZE); - free(rdrb); -} - -void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb) -{ - char * shm_rdrb_fn; - - assert(rdrb); - - if (getpid() != *rdrb->pid && kill(*rdrb->pid, 0) == 0) { - free(rdrb); - return; - } - - shm_rdrbuff_close(rdrb); - - shm_rdrb_fn = rdrb_filename(); - if (shm_rdrb_fn == NULL) - return; - - shm_unlink(shm_rdrb_fn); - free(shm_rdrb_fn); -} - -#define MM_FLAGS (PROT_READ | PROT_WRITE) - -static struct shm_rdrbuff * rdrb_create(int flags) -{ - struct shm_rdrbuff * rdrb; - int fd; - uint8_t * shm_base; - char * shm_rdrb_fn; - - shm_rdrb_fn = rdrb_filename(); - if (shm_rdrb_fn == NULL) - goto fail_fn; - - rdrb = malloc(sizeof *rdrb); - if (rdrb == NULL) - goto fail_rdrb; - - fd = shm_open(shm_rdrb_fn, flags, 0666); - if (fd == -1) - goto fail_open; - - if ((flags & O_CREAT) && ftruncate(fd, SHM_FILE_SIZE) < 0) - goto fail_truncate; - - shm_base = mmap(NULL, SHM_FILE_SIZE, MM_FLAGS, MAP_SHARED, fd, 0); - if 
(shm_base == MAP_FAILED) - goto fail_truncate; - - close(fd); - - rdrb->shm_base = shm_base; - rdrb->head = (size_t *) ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); - rdrb->tail = rdrb->head + 1; - rdrb->lock = (pthread_mutex_t *) (rdrb->tail + 1); - rdrb->healthy = (pthread_cond_t *) (rdrb->lock + 1); - rdrb->pid = (pid_t *) (rdrb->healthy + 1); - - free(shm_rdrb_fn); - - return rdrb; - - fail_truncate: - close(fd); - if (flags & O_CREAT) - shm_unlink(shm_rdrb_fn); - fail_open: - free(rdrb); - fail_rdrb: - free(shm_rdrb_fn); - fail_fn: - return NULL; -} - -struct shm_rdrbuff * shm_rdrbuff_create(void) -{ - struct shm_rdrbuff * rdrb; - mode_t mask; - pthread_mutexattr_t mattr; - pthread_condattr_t cattr; - - mask = umask(0); - - rdrb = rdrb_create(O_CREAT | O_EXCL | O_RDWR); - - umask(mask); - - if (rdrb == NULL) - goto fail_rdrb; - - if (pthread_mutexattr_init(&mattr)) - goto fail_mattr; - - pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); -#ifdef HAVE_ROBUST_MUTEX - pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); -#endif - if (pthread_mutex_init(rdrb->lock, &mattr)) - goto fail_mutex; - - if (pthread_condattr_init(&cattr)) - goto fail_cattr; - - pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif - if (pthread_cond_init(rdrb->healthy, &cattr)) - goto fail_healthy; - - *rdrb->head = 0; - *rdrb->tail = 0; - - *rdrb->pid = getpid(); - - pthread_mutexattr_destroy(&mattr); - pthread_condattr_destroy(&cattr); - - return rdrb; - - fail_healthy: - pthread_condattr_destroy(&cattr); - fail_cattr: - pthread_mutex_destroy(rdrb->lock); - fail_mutex: - pthread_mutexattr_destroy(&mattr); - fail_mattr: - shm_rdrbuff_destroy(rdrb); - fail_rdrb: - return NULL; -} - -struct shm_rdrbuff * shm_rdrbuff_open(void) -{ - return rdrb_create(O_RDWR); -} - -void shm_rdrbuff_purge(void) -{ - char * shm_rdrb_fn; - - shm_rdrb_fn = rdrb_filename(); - if (shm_rdrb_fn == NULL) - return; - - shm_unlink(shm_rdrb_fn); - free(shm_rdrb_fn); -} - -int shm_rdrbuff_mlock(struct shm_rdrbuff * rdrb) -{ - assert(rdrb != NULL); - - return mlock(rdrb->shm_base, SHM_FILE_SIZE); -} - -ssize_t shm_rdrbuff_alloc(struct shm_rdrbuff * rdrb, - size_t len, - uint8_t ** ptr, - struct shm_du_buff ** psdb) -{ - struct shm_du_buff * sdb; - size_t size = DU_BUFF_OVERHEAD + len; -#ifdef SHM_RDRB_MULTI_BLOCK - size_t blocks = 0; - size_t padblocks = 0; -#endif - ssize_t sz = size + sizeof(*sdb); - - assert(rdrb); - assert(psdb); - -#ifndef SHM_RDRB_MULTI_BLOCK - if (sz > SHM_RDRB_BLOCK_SIZE) - return -EMSGSIZE; -#else - while (sz > 0) { - sz -= SHM_RDRB_BLOCK_SIZE; - ++blocks; - } -#endif -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rdrb->lock); -#else - if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) - sanitize(rdrb); -#endif -#ifdef SHM_RDRB_MULTI_BLOCK - if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) - padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; - - if (!shm_rdrb_free(rdrb, blocks + padblocks)) { -#else - if (!shm_rdrb_free(rdrb, 1)) { -#endif - pthread_mutex_unlock(rdrb->lock); - return -EAGAIN; - } - -#ifdef SHM_RDRB_MULTI_BLOCK - if (padblocks) { - sdb = get_head_ptr(rdrb); - sdb->size = 0; - sdb->blocks = padblocks; - sdb->refs = 0; - sdb->du_head = 0; - sdb->du_tail = 0; - sdb->idx = *rdrb->head; - - *rdrb->head = 0; - } -#endif - sdb = get_head_ptr(rdrb); - sdb->refs = 1; - sdb->idx = *rdrb->head; -#ifdef SHM_RDRB_MULTI_BLOCK - sdb->blocks = blocks; - - *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1); 
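[Editor's sketch, not part of the patch.] The deleted allocator above can only reclaim space at the tail: garbage_collect() stops at the first buffer whose refs is still non-zero, so one long-held packet pins every slot behind it, a classic head-of-line blocking pattern. A minimal standalone model of that behaviour, simplified from the ring macros above (the real shm_rdrb_used counts one extra slot):

#include <stdio.h>

#define SIZE 8 /* power-of-two ring, like SHM_BUFFER_SIZE */

static size_t head, tail;
static size_t refs[SIZE];

static size_t used(void)
{
        return (head + SIZE - tail) & (SIZE - 1);
}

static void gc(void) /* same idea as garbage_collect() above */
{
        while (tail != head && refs[tail] == 0)
                tail = (tail + 1) & (SIZE - 1);
}

int main(void)
{
        size_t i;

        for (i = 0; i < 4; i++) { /* allocate four buffers */
                refs[head] = 1;
                head = (head + 1) & (SIZE - 1);
        }

        refs[1] = refs[2] = refs[3] = 0; /* release all but the oldest */
        gc();

        printf("used: %zu\n", used()); /* still 4: refs[0] pins the tail */

        return 0;
}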
-#else - *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1); -#endif - pthread_mutex_unlock(rdrb->lock); - - sdb->size = size; - sdb->du_head = DU_BUFF_HEADSPACE; - sdb->du_tail = sdb->du_head + len; - - *psdb = sdb; - if (ptr != NULL) - *ptr = (uint8_t *) (sdb + 1) + sdb->du_head; - - return sdb->idx; -} - -ssize_t shm_rdrbuff_alloc_b(struct shm_rdrbuff * rdrb, - size_t len, - uint8_t ** ptr, - struct shm_du_buff ** psdb, - const struct timespec * abstime) -{ - struct shm_du_buff * sdb; - size_t size = DU_BUFF_OVERHEAD + len; -#ifdef SHM_RDRB_MULTI_BLOCK - size_t blocks = 0; - size_t padblocks = 0; -#endif - ssize_t sz = size + sizeof(*sdb); - int ret = 0; - - assert(rdrb); - assert(psdb); - -#ifndef SHM_RDRB_MULTI_BLOCK - if (sz > SHM_RDRB_BLOCK_SIZE) - return -EMSGSIZE; -#else - while (sz > 0) { - sz -= SHM_RDRB_BLOCK_SIZE; - ++blocks; - } -#endif -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rdrb->lock); -#else - if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) - sanitize(rdrb); -#endif - pthread_cleanup_push(__cleanup_mutex_unlock, rdrb->lock); - -#ifdef SHM_RDRB_MULTI_BLOCK - if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) - padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; - - while (!shm_rdrb_free(rdrb, blocks + padblocks) && ret != ETIMEDOUT) { -#else - while (!shm_rdrb_free(rdrb, 1) && ret != ETIMEDOUT) { -#endif - ret = __timedwait(rdrb->healthy, rdrb->lock, abstime); -#ifdef SHM_RDRB_MULTI_BLOCK - if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) - padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; -#endif - } - - if (ret != ETIMEDOUT) { -#ifdef SHM_RDRB_MULTI_BLOCK - if (padblocks) { - sdb = get_head_ptr(rdrb); - sdb->size = 0; - sdb->blocks = padblocks; - sdb->refs = 0; - sdb->du_head = 0; - sdb->du_tail = 0; - sdb->idx = *rdrb->head; - - *rdrb->head = 0; - } -#endif - sdb = get_head_ptr(rdrb); - sdb->refs = 1; - sdb->idx = *rdrb->head; -#ifdef SHM_RDRB_MULTI_BLOCK - sdb->blocks = blocks; - - *rdrb->head = (*rdrb->head + blocks) & ((SHM_BUFFER_SIZE) - 1); -#else - *rdrb->head = (*rdrb->head + 1) & ((SHM_BUFFER_SIZE) - 1); -#endif - } - - pthread_cleanup_pop(true); - - if (ret == ETIMEDOUT) - return -ETIMEDOUT; - - sdb->size = size; - sdb->du_head = DU_BUFF_HEADSPACE; - sdb->du_tail = sdb->du_head + len; - - *psdb = sdb; - if (ptr != NULL) - *ptr = (uint8_t *) (sdb + 1) + sdb->du_head; - - return sdb->idx; -} - -ssize_t shm_rdrbuff_read(uint8_t ** dst, - struct shm_rdrbuff * rdrb, - size_t idx) -{ - struct shm_du_buff * sdb; - - assert(dst); - assert(rdrb); - assert(idx < (SHM_BUFFER_SIZE)); - - sdb = idx_to_du_buff_ptr(rdrb, idx); - *dst = ((uint8_t *) (sdb + 1)) + sdb->du_head; - - return (ssize_t) (sdb->du_tail - sdb->du_head); -} - -struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb, - size_t idx) -{ - assert(rdrb); - assert(idx < (SHM_BUFFER_SIZE)); - - return idx_to_du_buff_ptr(rdrb, idx); -} - -int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, - size_t idx) -{ - struct shm_du_buff * sdb; - - assert(rdrb); - assert(idx < (SHM_BUFFER_SIZE)); - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rdrb->lock); -#else - if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) - sanitize(rdrb); -#endif - /* assert(!shm_rdrb_empty(rdrb)); */ - - sdb = idx_to_du_buff_ptr(rdrb, idx); - - if (sdb->refs == 1) { /* only stack needs it, can be removed */ - sdb->refs = 0; - if (idx == *rdrb->tail) - garbage_collect(rdrb); - } - - pthread_mutex_unlock(rdrb->lock); - - return 0; -} - -size_t shm_du_buff_get_idx(struct shm_du_buff * sdb) -{ - assert(sdb); - - return sdb->idx; -} - -uint8_t * 
shm_du_buff_head(struct shm_du_buff * sdb) -{ - assert(sdb); - - return (uint8_t *) (sdb + 1) + sdb->du_head; -} - -uint8_t * shm_du_buff_tail(struct shm_du_buff * sdb) -{ - assert(sdb); - - return (uint8_t *) (sdb + 1) + sdb->du_tail; -} - -size_t shm_du_buff_len(struct shm_du_buff * sdb) -{ - assert(sdb); - - return sdb->du_tail - sdb->du_head; -} - -uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb, - size_t size) -{ - assert(sdb); - - if (sdb->du_head < size) - return NULL; - - sdb->du_head -= size; - - return (uint8_t *) (sdb + 1) + sdb->du_head; -} - -uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, - size_t size) -{ - uint8_t * buf; - - assert(sdb); - - if (sdb->du_tail + size >= sdb->size) - return NULL; - - buf = (uint8_t *) (sdb + 1) + sdb->du_tail; - - sdb->du_tail += size; - - return buf; -} - -uint8_t * shm_du_buff_head_release(struct shm_du_buff * sdb, - size_t size) -{ - uint8_t * buf; - - assert(sdb); - assert(!(size > sdb->du_tail - sdb->du_head)); - - buf = (uint8_t *) (sdb + 1) + sdb->du_head; - - sdb->du_head += size; - - return buf; -} - -uint8_t * shm_du_buff_tail_release(struct shm_du_buff * sdb, - size_t size) -{ - assert(sdb); - assert(!(size > sdb->du_tail - sdb->du_head)); - - sdb->du_tail -= size; - - return (uint8_t *) (sdb + 1) + sdb->du_tail; -} - -void shm_du_buff_truncate(struct shm_du_buff * sdb, - size_t len) -{ - assert(sdb); - assert(len <= sdb->size); - - sdb->du_tail = sdb->du_head + len; -} - -int shm_du_buff_wait_ack(struct shm_du_buff * sdb) -{ - __sync_add_and_fetch(&sdb->refs, 1); - - return 0; -} - -int shm_du_buff_ack(struct shm_du_buff * sdb) -{ - __sync_sub_and_fetch(&sdb->refs, 1); - return 0; -} diff --git a/src/lib/shm_flow_set.c b/src/lib/ssm/flow_set.c index 39913fd1..ab24d357 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/ssm/flow_set.c @@ -23,11 +23,12 @@ #define _POSIX_C_SOURCE 200809L #include "config.h" +#include "ssm.h" #include <ouroboros/errno.h> #include <ouroboros/lockfile.h> #include <ouroboros/pthread.h> -#include <ouroboros/shm_flow_set.h> +#include <ouroboros/ssm_flow_set.h> #include <ouroboros/time.h> #include <assert.h> @@ -54,17 +55,17 @@ #define FN_MAX_CHARS 255 #define FS_PROT (PROT_READ | PROT_WRITE) -#define QUEUESIZE ((SHM_BUFFER_SIZE) * sizeof(struct flowevent)) +#define QUEUESIZE ((SSM_RBUFF_SIZE) * sizeof(struct flowevent)) -#define SHM_FSET_FILE_SIZE (SYS_MAX_FLOWS * sizeof(ssize_t) \ +#define SSM_FSET_FILE_SIZE (SYS_MAX_FLOWS * sizeof(ssize_t) \ + PROG_MAX_FQUEUES * sizeof(size_t) \ + PROG_MAX_FQUEUES * sizeof(pthread_cond_t) \ + PROG_MAX_FQUEUES * QUEUESIZE \ + sizeof(pthread_mutex_t)) -#define fqueue_ptr(fs, idx) (fs->fqueues + (SHM_BUFFER_SIZE) * idx) +#define fqueue_ptr(fs, idx) (fs->fqueues + (SSM_RBUFF_SIZE) * idx) -struct shm_flow_set { +struct ssm_flow_set { ssize_t * mtable; size_t * heads; pthread_cond_t * conds; @@ -74,15 +75,15 @@ struct shm_flow_set { pid_t pid; }; -static struct shm_flow_set * flow_set_create(pid_t pid, +static struct ssm_flow_set * flow_set_create(pid_t pid, int oflags) { - struct shm_flow_set * set; + struct ssm_flow_set * set; ssize_t * shm_base; char fn[FN_MAX_CHARS]; int fd; - sprintf(fn, SHM_FLOW_SET_PREFIX "%d", pid); + sprintf(fn, SSM_FLOW_SET_PREFIX "%d", pid); set = malloc(sizeof(*set)); if (set == NULL) @@ -92,10 +93,10 @@ static struct shm_flow_set * flow_set_create(pid_t pid, if (fd == -1) goto fail_shm_open; - if ((oflags & O_CREAT) && ftruncate(fd, SHM_FSET_FILE_SIZE) < 0) + if ((oflags & O_CREAT) && ftruncate(fd, SSM_FSET_FILE_SIZE) < 0) goto 
fail_truncate; - shm_base = mmap(NULL, SHM_FSET_FILE_SIZE, FS_PROT, MAP_SHARED, fd, 0); + shm_base = mmap(NULL, SSM_FSET_FILE_SIZE, FS_PROT, MAP_SHARED, fd, 0); if (shm_base == MAP_FAILED) goto fail_mmap; @@ -106,7 +107,7 @@ static struct shm_flow_set * flow_set_create(pid_t pid, set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES); set->fqueues = (struct flowevent *) (set->conds + PROG_MAX_FQUEUES); set->lock = (pthread_mutex_t *) - (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE)); + (set->fqueues + PROG_MAX_FQUEUES * (SSM_RBUFF_SIZE)); return set; @@ -121,9 +122,9 @@ static struct shm_flow_set * flow_set_create(pid_t pid, return NULL; } -struct shm_flow_set * shm_flow_set_create(pid_t pid) +struct ssm_flow_set * ssm_flow_set_create(pid_t pid) { - struct shm_flow_set * set; + struct ssm_flow_set * set; pthread_mutexattr_t mattr; pthread_condattr_t cattr; mode_t mask; @@ -184,38 +185,38 @@ struct shm_flow_set * shm_flow_set_create(pid_t pid) fail_mattr_set: pthread_mutexattr_destroy(&mattr); fail_mutexattr_init: - shm_flow_set_destroy(set); + ssm_flow_set_destroy(set); fail_set: return NULL; } -struct shm_flow_set * shm_flow_set_open(pid_t pid) +struct ssm_flow_set * ssm_flow_set_open(pid_t pid) { return flow_set_create(pid, O_RDWR); } -void shm_flow_set_destroy(struct shm_flow_set * set) +void ssm_flow_set_destroy(struct ssm_flow_set * set) { char fn[FN_MAX_CHARS]; assert(set); - sprintf(fn, SHM_FLOW_SET_PREFIX "%d", set->pid); + sprintf(fn, SSM_FLOW_SET_PREFIX "%d", set->pid); - shm_flow_set_close(set); + ssm_flow_set_close(set); shm_unlink(fn); } -void shm_flow_set_close(struct shm_flow_set * set) +void ssm_flow_set_close(struct ssm_flow_set * set) { assert(set); - munmap(set->mtable, SHM_FSET_FILE_SIZE); + munmap(set->mtable, SSM_FSET_FILE_SIZE); free(set); } -void shm_flow_set_zero(struct shm_flow_set * set, +void ssm_flow_set_zero(struct ssm_flow_set * set, size_t idx) { ssize_t i = 0; @@ -235,7 +236,7 @@ void shm_flow_set_zero(struct shm_flow_set * set, } -int shm_flow_set_add(struct shm_flow_set * set, +int ssm_flow_set_add(struct ssm_flow_set * set, size_t idx, int flow_id) { @@ -257,7 +258,7 @@ int shm_flow_set_add(struct shm_flow_set * set, return 0; } -void shm_flow_set_del(struct shm_flow_set * set, +void ssm_flow_set_del(struct ssm_flow_set * set, size_t idx, int flow_id) { @@ -273,7 +274,7 @@ void shm_flow_set_del(struct shm_flow_set * set, pthread_mutex_unlock(set->lock); } -int shm_flow_set_has(struct shm_flow_set * set, +int ssm_flow_set_has(struct ssm_flow_set * set, size_t idx, int flow_id) { @@ -293,7 +294,7 @@ int shm_flow_set_has(struct shm_flow_set * set, return ret; } -void shm_flow_set_notify(struct shm_flow_set * set, +void ssm_flow_set_notify(struct ssm_flow_set * set, int flow_id, int event) { @@ -323,7 +324,7 @@ void shm_flow_set_notify(struct shm_flow_set * set, } -ssize_t shm_flow_set_wait(const struct shm_flow_set * set, +ssize_t ssm_flow_set_wait(const struct ssm_flow_set * set, size_t idx, struct flowevent * fqueue, const struct timespec * abstime) diff --git a/src/lib/ssm/pool.c b/src/lib/ssm/pool.c new file mode 100644 index 00000000..b8cfe3a1 --- /dev/null +++ b/src/lib/ssm/pool.c @@ -0,0 +1,882 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Secure Shared Memory Infrastructure (SSMI) Packet Buffer + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public 
License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200809L + +#include "config.h" + +#include <ouroboros/errno.h> +#include <ouroboros/pthread.h> +#include <ouroboros/ssm_pool.h> + +#include "ssm.h" + +#include <assert.h> +#include <fcntl.h> +#include <signal.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <sys/mman.h> +#include <sys/stat.h> + +/* Size class configuration from CMake */ +static const struct ssm_size_class_cfg ssm_sc_cfg[SSM_POOL_MAX_CLASSES] = { + { (1 << 8), SSM_POOL_256_BLOCKS }, + { (1 << 9), SSM_POOL_512_BLOCKS }, + { (1 << 10), SSM_POOL_1K_BLOCKS }, + { (1 << 11), SSM_POOL_2K_BLOCKS }, + { (1 << 12), SSM_POOL_4K_BLOCKS }, + { (1 << 14), SSM_POOL_16K_BLOCKS }, + { (1 << 16), SSM_POOL_64K_BLOCKS }, + { (1 << 18), SSM_POOL_256K_BLOCKS }, + { (1 << 20), SSM_POOL_1M_BLOCKS }, +}; + +#define PTR_TO_OFFSET(pool_base, ptr) \ + ((uintptr_t)(ptr) - (uintptr_t)(pool_base)) + +#define OFFSET_TO_PTR(pool_base, offset) \ + ((offset == 0) ? NULL : (void *)((uintptr_t)(pool_base) + offset)) + +#define GET_SHARD_FOR_PID(pid) ((int)((pid) % SSM_POOL_SHARDS)) + +#define LOAD_RELAXED(ptr) \ + (__atomic_load_n(ptr, __ATOMIC_RELAXED)) + +#define LOAD_ACQUIRE(ptr) \ + (__atomic_load_n(ptr, __ATOMIC_ACQUIRE)) + +#define STORE_RELEASE(ptr, val) \ + (__atomic_store_n(ptr, val, __ATOMIC_RELEASE)) + +#define LOAD(ptr) \ + (__atomic_load_n(ptr, __ATOMIC_SEQ_CST)) + +#define STORE(ptr, val) \ + (__atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)) + +#define FETCH_ADD(ptr, val) \ + (__atomic_fetch_add(ptr, val, __ATOMIC_SEQ_CST)) + +#define FETCH_SUB(ptr, val) \ + (__atomic_fetch_sub(ptr, val, __ATOMIC_SEQ_CST)) + +#define CAS(ptr, expected, desired) \ + (__atomic_compare_exchange_n(ptr, expected, desired, false, \ + __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) + +#define SSM_FILE_SIZE (SSM_POOL_TOTAL_SIZE + sizeof(struct _ssm_pool_hdr)) + +struct ssm_pool { + uint8_t * shm_base; /* start of blocks */ + struct _ssm_pool_hdr * hdr; /* shared memory header */ + void * pool_base; /* base of the memory pool */ +}; + +static __inline__ +struct ssm_pk_buff * list_remove_head(struct _ssm_list_head * head, + void * base) +{ + uint32_t off; + uint32_t next_off; + struct ssm_pk_buff * blk; + + assert(head != NULL); + assert(base != NULL); + + off = LOAD(&head->head_offset); + if (off == 0) + return NULL; + + /* Validate offset is within pool bounds */ + if (off >= SSM_POOL_TOTAL_SIZE) + return NULL; + + blk = OFFSET_TO_PTR(base, off); + next_off = LOAD(&blk->next_offset); + + + + STORE(&head->head_offset, next_off); + STORE(&head->count, LOAD(&head->count) - 1); + + return blk; +} +static __inline__ void list_add_head(struct _ssm_list_head * head, + struct ssm_pk_buff * blk, + void * base) +{ + uint32_t off; + uint32_t old; + + assert(head != NULL); + assert(blk != NULL); + assert(base != NULL); + + off = (uint32_t) PTR_TO_OFFSET(base, blk); + old = LOAD(&head->head_offset); + + STORE(&blk->next_offset, old); + STORE(&head->head_offset, off); + 
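[Editor's sketch, not part of the patch.] The list primitives here are intrusive and offset-based rather than pointer-based: each process maps the pool at its own virtual address, so only offsets relative to pool_base are meaningful across processes, and offset 0 doubles as the NULL/end-of-list marker (see OFFSET_TO_PTR above). A self-contained sketch of the same push/pop idiom, with arbitrary sizes and offsets:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

struct node {
        uint32_t next_offset; /* 0 doubles as end-of-list */
};

/* Stand-in for the mmap()ed pool; the union forces alignment. */
static union {
        uint8_t     bytes[4096];
        struct node align;
} pool;

static uint32_t head_off; /* list head as an offset; 0 == empty */

static void push(uint32_t off) /* like list_add_head() */
{
        struct node * n = (struct node *) (pool.bytes + off);

        n->next_offset = head_off;
        head_off       = off;
}

static uint32_t pop(void) /* like list_remove_head() */
{
        struct node * n;
        uint32_t      off = head_off;

        if (off == 0) /* OFFSET_TO_PTR() maps 0 to NULL */
                return 0;

        n        = (struct node *) (pool.bytes + off);
        head_off = n->next_offset;

        return off;
}

int main(void)
{
        push(64);
        push(128);

        assert(pop() == 128); /* LIFO */
        assert(pop() == 64);
        assert(pop() == 0);   /* empty again */

        printf("offset list ok\n");

        return 0;
}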
STORE(&head->count, LOAD(&head->count) + 1); +} + +static __inline__ int select_size_class(size_t len) +{ + size_t sz; + int i; + + /* Total space needed: header + headspace + data + tailspace */ + sz = sizeof(struct ssm_pk_buff) + SSM_PK_BUFF_HEADSPACE + len + + SSM_PK_BUFF_TAILSPACE; + + for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) { + if (ssm_sc_cfg[i].blocks > 0 && sz <= ssm_sc_cfg[i].size) + return i; + } + + return -1; +} + +static __inline__ int find_size_class_for_offset(struct ssm_pool * pool, + size_t offset) +{ + int c; + + assert(pool != NULL); + + for (c = 0; c < SSM_POOL_MAX_CLASSES; c++) { + struct _ssm_size_class * sc = &pool->hdr->size_classes[c]; + + if (sc->object_size == 0) + continue; + + if (offset >= sc->pool_start && + offset < sc->pool_start + sc->pool_size) + return c; + } + + return -1; +} + +static void init_size_classes(struct ssm_pool * pool) +{ + struct _ssm_size_class * sc; + struct _ssm_shard * shard; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + uint8_t * region; + size_t offset; + int c; + int s; + size_t i; + + assert(pool != NULL); + + /* Check if already initialized */ + if (LOAD(&pool->hdr->initialized) != 0) + return; + + pthread_mutexattr_init(&mattr); + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); +#ifdef HAVE_ROBUST_MUTEX + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif + pthread_mutexattr_setprotocol(&mattr, PTHREAD_PRIO_INHERIT); + + pthread_condattr_init(&cattr); + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + offset = 0; + + for (c = 0; c < SSM_POOL_MAX_CLASSES; c++) { + if (ssm_sc_cfg[c].blocks == 0) + continue; + + sc = &pool->hdr->size_classes[c]; + + sc->object_size = ssm_sc_cfg[c].size; + sc->pool_start = offset; + sc->pool_size = ssm_sc_cfg[c].size * ssm_sc_cfg[c].blocks; + sc->object_count = ssm_sc_cfg[c].blocks; + + /* Initialize all shards */ + for (s = 0; s < SSM_POOL_SHARDS; s++) { + shard = &sc->shards[s]; + + STORE(&shard->free_list.head_offset, 0); + STORE(&shard->free_list.count, 0); + STORE(&shard->free_count, 0); + + pthread_mutex_init(&shard->mtx, &mattr); + pthread_cond_init(&shard->cond, &cattr); + } + + /* Lazy distribution: put all blocks in shard 0 initially */ + region = pool->shm_base + offset; + + for (i = 0; i < sc->object_count; ++i) { + struct ssm_pk_buff * blk; + + blk = (struct ssm_pk_buff *) + (region + i * sc->object_size); + + STORE(&blk->refcount, 0); + blk->allocator_pid = 0; + STORE(&blk->next_offset, 0); + + list_add_head(&sc->shards[0].free_list, blk, + pool->pool_base); + FETCH_ADD(&sc->shards[0].free_count, 1); + } + + offset += sc->pool_size; + } + + /* Mark as initialized - acts as memory barrier */ + STORE(&pool->hdr->initialized, 1); + + pthread_mutexattr_destroy(&mattr); + pthread_condattr_destroy(&cattr); +} + +/* + * Reclaim all blocks allocated by a specific pid in a size class. + * Called with shard mutex held. 
+ */ +static size_t reclaim_pid_from_sc(struct _ssm_size_class * sc, + struct _ssm_shard * shard, + void * pool_base, + pid_t pid) +{ + uint8_t * region; + size_t i; + size_t recovered = 0; + struct ssm_pk_buff * blk; + + region = (uint8_t *) pool_base + sc->pool_start; + + for (i = 0; i < sc->object_count; ++i) { + blk = (struct ssm_pk_buff *)(region + i * sc->object_size); + + if (blk->allocator_pid == pid && LOAD(&blk->refcount) > 0) { + STORE(&blk->refcount, 0); + blk->allocator_pid = 0; + list_add_head(&shard->free_list, blk, pool_base); + FETCH_ADD(&shard->free_count, 1); + recovered++; + } + } + + return recovered; +} + +void ssm_pool_reclaim_orphans(struct ssm_pool * pool, + pid_t pid) +{ + size_t sc_idx; + + if (pool == NULL || pid <= 0) + return; + + for (sc_idx = 0; sc_idx < SSM_POOL_MAX_CLASSES; sc_idx++) { + struct _ssm_size_class * sc; + struct _ssm_shard * shard; + + sc = &pool->hdr->size_classes[sc_idx]; + if (sc->object_count == 0) + continue; + + /* Reclaim to shard 0 for simplicity */ + shard = &sc->shards[0]; + robust_mutex_lock(&shard->mtx); + reclaim_pid_from_sc(sc, shard, pool->pool_base, pid); + pthread_mutex_unlock(&shard->mtx); + } +} + +static __inline__ +struct ssm_pk_buff * try_alloc_from_shard(struct _ssm_shard * shard, + void * base) +{ + struct ssm_pk_buff * blk; + + robust_mutex_lock(&shard->mtx); + + if (LOAD(&shard->free_count) > 0) { + blk = list_remove_head(&shard->free_list, base); + if (blk != NULL) { + FETCH_SUB(&shard->free_count, 1); + return blk; /* Caller must unlock */ + } + FETCH_SUB(&shard->free_count, 1); + } + + pthread_mutex_unlock(&shard->mtx); + return NULL; +} + +static __inline__ ssize_t init_block(struct ssm_pool * pool, + struct _ssm_size_class * sc, + struct _ssm_shard * shard, + struct ssm_pk_buff * blk, + size_t len, + uint8_t ** ptr, + struct ssm_pk_buff ** spb) +{ + STORE(&blk->refcount, 1); + blk->allocator_pid = getpid(); + blk->size = (uint32_t) (sc->object_size - + sizeof(struct ssm_pk_buff)); + blk->pk_head = SSM_PK_BUFF_HEADSPACE; + blk->pk_tail = blk->pk_head + (uint32_t) len; + blk->off = (uint32_t) PTR_TO_OFFSET(pool->pool_base, blk); + + pthread_mutex_unlock(&shard->mtx); + + *spb = blk; + if (ptr != NULL) + *ptr = blk->data + blk->pk_head; + + return blk->off; +} + +/* Non-blocking allocation from size class */ +static ssize_t alloc_from_sc(struct ssm_pool * pool, + int idx, + size_t len, + uint8_t ** ptr, + struct ssm_pk_buff ** spb) +{ + struct _ssm_size_class * sc; + struct ssm_pk_buff * blk; + int local; + int s; + + assert(pool != NULL); + assert(idx >= 0 && idx < SSM_POOL_MAX_CLASSES); + assert(spb != NULL); + + sc = &pool->hdr->size_classes[idx]; + local = GET_SHARD_FOR_PID(getpid()); + + for (s = 0; s < SSM_POOL_SHARDS; s++) { + struct _ssm_shard * shard; + int idx; + + idx = (local + s) % SSM_POOL_SHARDS; + shard = &sc->shards[idx]; + + blk = try_alloc_from_shard(shard, pool->pool_base); + if (blk != NULL) + return init_block(pool, sc, shard, blk, len, ptr, spb); + } + + return -EAGAIN; +} + +/* Blocking allocation from size class */ +static ssize_t alloc_from_sc_b(struct ssm_pool * pool, + int idx, + size_t len, + uint8_t ** ptr, + struct ssm_pk_buff ** spb, + const struct timespec * abstime) +{ + struct _ssm_size_class * sc; + struct _ssm_shard * shard; + struct ssm_pk_buff * blk = NULL; + int local; + int s; + int ret = 0; + + assert(pool != NULL); + assert(idx >= 0 && idx < SSM_POOL_MAX_CLASSES); + assert(spb != NULL); + + sc = &pool->hdr->size_classes[idx]; + local = GET_SHARD_FOR_PID(getpid()); + + while 
(blk == NULL && ret != ETIMEDOUT) { + /* Try non-blocking allocation from any shard */ + for (s = 0; s < SSM_POOL_SHARDS && blk == NULL; s++) { + shard = &sc->shards[(local + s) % SSM_POOL_SHARDS]; + blk = try_alloc_from_shard(shard, pool->pool_base); + } + + if (blk != NULL) + break; + + /* Nothing available, wait for signal */ + shard = &sc->shards[local]; + robust_mutex_lock(&shard->mtx); + ret = robust_wait(&shard->cond, &shard->mtx, abstime); + pthread_mutex_unlock(&shard->mtx); + } + + if (ret == ETIMEDOUT) + return -ETIMEDOUT; + + return init_block(pool, sc, shard, blk, len, ptr, spb); +} + +/* Global Shared Packet Pool */ +static char * gspp_filename(void) +{ + char * str; + char * test_suffix; + + test_suffix = getenv("OUROBOROS_TEST_POOL_SUFFIX"); + if (test_suffix != NULL) { + str = malloc(strlen(SSM_POOL_NAME) + strlen(test_suffix) + 1); + if (str == NULL) + return NULL; + sprintf(str, "%s%s", SSM_POOL_NAME, test_suffix); + } else { + str = malloc(strlen(SSM_POOL_NAME) + 1); + if (str == NULL) + return NULL; + sprintf(str, "%s", SSM_POOL_NAME); + } + + return str; +} + +void ssm_pool_close(struct ssm_pool * pool) +{ + assert(pool != NULL); + + munmap(pool->shm_base, SSM_FILE_SIZE); + free(pool); +} + +void ssm_pool_destroy(struct ssm_pool * pool) +{ + char * fn; + + assert(pool != NULL); + + if (getpid() != pool->hdr->pid && kill(pool->hdr->pid, 0) == 0) { + free(pool); + return; + } + + ssm_pool_close(pool); + + fn = gspp_filename(); + if (fn == NULL) + return; + + shm_unlink(fn); + free(fn); +} + +#define MM_FLAGS (PROT_READ | PROT_WRITE) + +static struct ssm_pool * pool_create(int flags) +{ + struct ssm_pool * pool; + int fd; + uint8_t * shm_base; + char * fn; + + fn = gspp_filename(); + if (fn == NULL) + goto fail_fn; + + pool = malloc(sizeof *pool); + if (pool == NULL) + goto fail_rdrb; + + fd = shm_open(fn, flags, 0666); + if (fd == -1) + goto fail_open; + + if ((flags & O_CREAT) && ftruncate(fd, SSM_FILE_SIZE) < 0) + goto fail_truncate; + + shm_base = mmap(NULL, SSM_FILE_SIZE, MM_FLAGS, MAP_SHARED, fd, 0); + if (shm_base == MAP_FAILED) + goto fail_truncate; + + pool->shm_base = shm_base; + pool->pool_base = shm_base; + pool->hdr = (struct _ssm_pool_hdr *) (shm_base + SSM_POOL_TOTAL_SIZE); + + if (flags & O_CREAT) + pool->hdr->mapped_addr = shm_base; + + close(fd); + + free(fn); + + return pool; + + fail_truncate: + close(fd); + if (flags & O_CREAT) + shm_unlink(fn); + fail_open: + free(pool); + fail_rdrb: + free(fn); + fail_fn: + return NULL; +} + +struct ssm_pool * ssm_pool_create(void) +{ + struct ssm_pool * pool; + mode_t mask; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + + mask = umask(0); + + pool = pool_create(O_CREAT | O_EXCL | O_RDWR); + + umask(mask); + + if (pool == NULL) + goto fail_rdrb; + + if (pthread_mutexattr_init(&mattr)) + goto fail_mattr; + + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); +#ifdef HAVE_ROBUST_MUTEX + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif + if (pthread_mutex_init(&pool->hdr->mtx, &mattr)) + goto fail_mutex; + + if (pthread_condattr_init(&cattr)) + goto fail_cattr; + + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&pool->hdr->healthy, &cattr)) + goto fail_healthy; + + pool->hdr->pid = getpid(); + /* Will be set by init_size_classes */ + STORE(&pool->hdr->initialized, 0); + + init_size_classes(pool); + + pthread_mutexattr_destroy(&mattr); + 
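[Editor's sketch, not part of the patch.] Two decisions drive the allocation paths above: select_size_class() picks the first class whose block fits header + headspace + payload + tailspace, and alloc_from_sc() probes the caller's pid-local shard first, then steals from the others in ring order. A standalone illustration; HDR, HEADSPACE, TAILSPACE and SHARDS are made-up stand-ins for sizeof(struct ssm_pk_buff) and the CMake-generated SSM_PK_BUFF_* / SSM_POOL_SHARDS values:

#include <stdio.h>
#include <stddef.h>

#define HDR       32  /* assumed sizeof(struct ssm_pk_buff) */
#define HEADSPACE 128 /* assumed SSM_PK_BUFF_HEADSPACE */
#define TAILSPACE 32  /* assumed SSM_PK_BUFF_TAILSPACE */
#define SHARDS    4   /* assumed SSM_POOL_SHARDS */

static const size_t class_size[] = { 256, 512, 1024, 2048, 4096 };

static int select_class(size_t len)
{
        size_t sz = HDR + HEADSPACE + len + TAILSPACE;
        size_t i;

        for (i = 0; i < sizeof(class_size) / sizeof(class_size[0]); i++)
                if (sz <= class_size[i])
                        return (int) i;

        return -1; /* -EMSGSIZE in the real allocator */
}

int main(void)
{
        int local = 1234 % SHARDS; /* GET_SHARD_FOR_PID() for pid 1234 */
        int s;

        /* 32 + 128 + 100 + 32 = 292 bytes: a 100-byte packet needs
         * the 512-byte class, not the 256-byte one. */
        printf("class for 100B payload: %d\n", select_class(100));

        /* Probe order: local shard first, then work stealing. */
        for (s = 0; s < SHARDS; s++)
                printf("attempt %d: shard %d\n", s, (local + s) % SHARDS);

        return 0;
}

Since ssm_pool_remove() below frees each block to its allocator's shard rather than the shard it came from, blocks that start out in shard 0 drift toward the shards of the processes that actually use them.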
pthread_condattr_destroy(&cattr); + + return pool; + + fail_healthy: + pthread_condattr_destroy(&cattr); + fail_cattr: + pthread_mutex_destroy(&pool->hdr->mtx); + fail_mutex: + pthread_mutexattr_destroy(&mattr); + fail_mattr: + ssm_pool_destroy(pool); + fail_rdrb: + return NULL; +} + +struct ssm_pool * ssm_pool_open(void) +{ + struct ssm_pool * pool; + + pool = pool_create(O_RDWR); + if (pool != NULL) + init_size_classes(pool); + + return pool; +} + +void ssm_pool_purge(void) +{ + char * fn; + + fn = gspp_filename(); + if (fn == NULL) + return; + + shm_unlink(fn); + free(fn); +} + +int ssm_pool_mlock(struct ssm_pool * pool) +{ + assert(pool != NULL); + + return mlock(pool->shm_base, SSM_FILE_SIZE); +} + +ssize_t ssm_pool_alloc(struct ssm_pool * pool, + size_t count, + uint8_t ** ptr, + struct ssm_pk_buff ** spb) +{ + int idx; + + assert(pool != NULL); + assert(spb != NULL); + + idx = select_size_class(count); + if (idx >= 0) + return alloc_from_sc(pool, idx, count, ptr, spb); + + return -EMSGSIZE; +} + +ssize_t ssm_pool_alloc_b(struct ssm_pool * pool, + size_t count, + uint8_t ** ptr, + struct ssm_pk_buff ** spb, + const struct timespec * abstime) +{ + int idx; + + assert(pool != NULL); + assert(spb != NULL); + + idx = select_size_class(count); + if (idx >= 0) + return alloc_from_sc_b(pool, idx, count, ptr, spb, abstime); + + return -EMSGSIZE; +} + +ssize_t ssm_pool_read(uint8_t ** dst, + struct ssm_pool * pool, + size_t off) +{ + struct ssm_pk_buff * blk; + + assert(dst != NULL); + assert(pool != NULL); + + blk = OFFSET_TO_PTR(pool->pool_base, off); + if (blk == NULL) + return -EINVAL; + + *dst = blk->data + blk->pk_head; + + return (ssize_t) (blk->pk_tail - blk->pk_head); +} + +struct ssm_pk_buff * ssm_pool_get(struct ssm_pool * pool, + size_t off) +{ + struct ssm_pk_buff * blk; + + assert(pool != NULL); + + if (off == 0 || off >= (size_t) SSM_POOL_TOTAL_SIZE) + return NULL; + + blk = OFFSET_TO_PTR(pool->pool_base, off); + if (blk == NULL) + return NULL; + + if (LOAD(&blk->refcount) == 0) + return NULL; + + return blk; +} + +int ssm_pool_remove(struct ssm_pool * pool, + size_t off) +{ + struct ssm_pk_buff * blk; + struct _ssm_size_class * sc; + struct _ssm_shard * shard; + int sc_idx; + int shard_idx; + uint16_t old_ref; + + assert(pool != NULL); + + if (off == 0 || off >= SSM_POOL_TOTAL_SIZE) + return -EINVAL; + + blk = OFFSET_TO_PTR(pool->pool_base, off); + if (blk == NULL) + return -EINVAL; + + sc_idx = find_size_class_for_offset(pool, off); + if (sc_idx < 0) + return -EINVAL; + + sc = &pool->hdr->size_classes[sc_idx]; + + /* Free to allocator's shard (lazy distribution in action) */ + shard_idx = GET_SHARD_FOR_PID(blk->allocator_pid); + shard = &sc->shards[shard_idx]; + + robust_mutex_lock(&shard->mtx); + + old_ref = FETCH_SUB(&blk->refcount, 1); + if (old_ref > 1) { + /* Still referenced */ + pthread_mutex_unlock(&shard->mtx); + return 0; + } + + blk->allocator_pid = 0; +#ifdef CONFIG_OUROBOROS_DEBUG + if (old_ref == 0) { + /* Underflow - double free attempt */ + pthread_mutex_unlock(&shard->mtx); + abort(); + } + + /* Poison fields to detect use-after-free */ + blk->pk_head = 0xDEAD; + blk->pk_tail = 0xBEEF; +#endif + list_add_head(&shard->free_list, blk, pool->pool_base); + FETCH_ADD(&shard->free_count, 1); + + pthread_cond_signal(&shard->cond); + + pthread_mutex_unlock(&shard->mtx); + + return 0; +} + +size_t ssm_pk_buff_get_idx(struct ssm_pk_buff * spb) +{ + assert(spb != NULL); + + return spb->off; +} + +uint8_t * ssm_pk_buff_head(struct ssm_pk_buff * spb) +{ + assert(spb != 
NULL); + + return spb->data + spb->pk_head; +} + +uint8_t * ssm_pk_buff_tail(struct ssm_pk_buff * spb) +{ + assert(spb != NULL); + + return spb->data + spb->pk_tail; +} + +size_t ssm_pk_buff_len(struct ssm_pk_buff * spb) +{ + assert(spb != NULL); + + return spb->pk_tail - spb->pk_head; +} + +uint8_t * ssm_pk_buff_head_alloc(struct ssm_pk_buff * spb, + size_t size) +{ + assert(spb != NULL); + + if (spb->pk_head < size) + return NULL; + + spb->pk_head -= size; + + return spb->data + spb->pk_head; +} + +uint8_t * ssm_pk_buff_tail_alloc(struct ssm_pk_buff * spb, + size_t size) +{ + uint8_t * buf; + + assert(spb != NULL); + + if (spb->pk_tail + size >= spb->size) + return NULL; + + buf = spb->data + spb->pk_tail; + + spb->pk_tail += size; + + return buf; +} + +uint8_t * ssm_pk_buff_head_release(struct ssm_pk_buff * spb, + size_t size) +{ + uint8_t * buf; + + assert(spb != NULL); + assert(!(size > spb->pk_tail - spb->pk_head)); + + buf = spb->data + spb->pk_head; + + spb->pk_head += size; + + return buf; +} + +uint8_t * ssm_pk_buff_tail_release(struct ssm_pk_buff * spb, + size_t size) +{ + assert(spb != NULL); + assert(!(size > spb->pk_tail - spb->pk_head)); + + spb->pk_tail -= size; + + return spb->data + spb->pk_tail; +} + +void ssm_pk_buff_truncate(struct ssm_pk_buff * spb, + size_t len) +{ + assert(spb != NULL); + assert(len <= spb->size); + + spb->pk_tail = spb->pk_head + len; +} + +int ssm_pk_buff_wait_ack(struct ssm_pk_buff * spb) +{ + assert(spb != NULL); + + FETCH_ADD(&spb->refcount, 1); + + return 0; +} + +int ssm_pk_buff_ack(struct ssm_pk_buff * spb) +{ + assert(spb != NULL); + + FETCH_SUB(&spb->refcount, 1); + + return 0; +} diff --git a/src/lib/shm_rbuff.c b/src/lib/ssm/rbuff.c index ce432efb..e18f8ba7 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/ssm/rbuff.c @@ -23,8 +23,9 @@ #define _POSIX_C_SOURCE 200809L #include "config.h" +#include "ssm.h" -#include <ouroboros/shm_rbuff.h> +#include <ouroboros/ssm_rbuff.h> #include <ouroboros/lockfile.h> #include <ouroboros/errno.h> #include <ouroboros/fccntl.h> @@ -45,71 +46,54 @@ #define FN_MAX_CHARS 255 -#define SHM_RBUFF_FILESIZE ((SHM_RBUFF_SIZE) * sizeof(ssize_t) \ - + 3 * sizeof(size_t) \ - + sizeof(pthread_mutex_t) \ +#define SSM_RBUFF_FILESIZE ((SSM_RBUFF_SIZE) * sizeof(ssize_t) \ + + 3 * sizeof(size_t) \ + + sizeof(pthread_mutex_t) \ + 2 * sizeof(pthread_cond_t)) -#define HEAD(rb) \ - *(rb->shm_base + *rb->head) -#define TAIL(rb) \ - *(rb->shm_base + *rb->tail) -#define ADVANCE(el) \ - (*(el) = (*(el) + 1) & ((SHM_RBUFF_SIZE) - 1)) -#define QUEUED(rb) \ - ((*rb->head - *rb->tail + (SHM_RBUFF_SIZE)) & (SHM_RBUFF_SIZE - 1)) -#define IS_FULL(rb) \ - (QUEUED(rb) == (SHM_RBUFF_SIZE) - 1) -#define IS_EMPTY(rb) \ - (*rb->head == *rb->tail) - -struct shm_rbuff { - ssize_t * shm_base; /* start of entry */ - size_t * head; /* start of ringbuffer head */ - size_t * tail; /* start of ringbuffer tail */ - size_t * acl; /* access control */ - pthread_mutex_t * mtx; /* lock all space in shm */ - pthread_cond_t * add; /* packet arrived */ - pthread_cond_t * del; /* packet removed */ - pid_t pid; /* pid of the owner */ - int flow_id; /* flow_id of the flow */ +#define MODB(x) ((x) & (SSM_RBUFF_SIZE - 1)) + +#define LOAD_RELAXED(ptr) (__atomic_load_n(ptr, __ATOMIC_RELAXED)) +#define LOAD_ACQUIRE(ptr) (__atomic_load_n(ptr, __ATOMIC_ACQUIRE)) +#define STORE_RELEASE(ptr, val) \ + (__atomic_store_n(ptr, val, __ATOMIC_RELEASE)) + +#define HEAD(rb) (rb->shm_base[LOAD_RELAXED(rb->head)]) +#define TAIL(rb) (rb->shm_base[LOAD_RELAXED(rb->tail)]) +#define 
HEAD_IDX(rb) (LOAD_ACQUIRE(rb->head)) +#define TAIL_IDX(rb) (LOAD_ACQUIRE(rb->tail)) +#define ADVANCE_HEAD(rb) \ + (STORE_RELEASE(rb->head, MODB(LOAD_RELAXED(rb->head) + 1))) +#define ADVANCE_TAIL(rb) \ + (STORE_RELEASE(rb->tail, MODB(LOAD_RELAXED(rb->tail) + 1))) +#define QUEUED(rb) (MODB(HEAD_IDX(rb) - TAIL_IDX(rb))) +#define IS_FULL(rb) (QUEUED(rb) == (SSM_RBUFF_SIZE - 1)) +#define IS_EMPTY(rb) (HEAD_IDX(rb) == TAIL_IDX(rb)) + +struct ssm_rbuff { + ssize_t * shm_base; /* start of shared memory */ + size_t * head; /* start of ringbuffer */ + size_t * tail; + size_t * acl; /* access control */ + pthread_mutex_t * mtx; /* lock for cond vars only */ + pthread_cond_t * add; /* signal when new data */ + pthread_cond_t * del; /* signal when data removed */ + pid_t pid; /* pid of the owner */ + int flow_id; /* flow_id of the flow */ }; -static void robust_mutex_lock(pthread_mutex_t * mtx) -{ -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(mtx); -#else - if (pthread_mutex_lock(mtx) == EOWNERDEAD) - pthread_mutex_consistent(mtx); -#endif -} - -static int robust_wait(pthread_cond_t * cond, - pthread_mutex_t * mtx, - const struct timespec * abstime) -{ - int ret = __timedwait(cond, mtx, abstime); -#ifdef HAVE_ROBUST_MUTEX - if (ret == EOWNERDEAD) - pthread_mutex_consistent(mtx); -#endif - return ret; -} - - #define MM_FLAGS (PROT_READ | PROT_WRITE) -static struct shm_rbuff * rbuff_create(pid_t pid, +static struct ssm_rbuff * rbuff_create(pid_t pid, int flow_id, int flags) { - struct shm_rbuff * rb; + struct ssm_rbuff * rb; int fd; ssize_t * shm_base; char fn[FN_MAX_CHARS]; - sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", pid, flow_id); + sprintf(fn, SSM_RBUFF_PREFIX "%d.%d", pid, flow_id); rb = malloc(sizeof(*rb)); if (rb == NULL) @@ -119,19 +103,17 @@ static struct shm_rbuff * rbuff_create(pid_t pid, if (fd == -1) goto fail_open; - if ((flags & O_CREAT) && ftruncate(fd, SHM_RBUFF_FILESIZE) < 0) + if ((flags & O_CREAT) && ftruncate(fd, SSM_RBUFF_FILESIZE) < 0) goto fail_truncate; - shm_base = mmap(NULL, SHM_RBUFF_FILESIZE, MM_FLAGS, MAP_SHARED, fd, 0); - if (shm_base == MAP_FAILED) - goto fail_truncate; + shm_base = mmap(NULL, SSM_RBUFF_FILESIZE, MM_FLAGS, MAP_SHARED, fd, 0); close(fd); rb->shm_base = shm_base; - rb->head = (size_t *) (rb->shm_base + (SHM_RBUFF_SIZE)); - rb->tail = rb->head + 1; - rb->acl = rb->tail + 1; + rb->head = (size_t *) (rb->shm_base + (SSM_RBUFF_SIZE)); + rb->tail = (size_t *) (rb->head + 1); + rb->acl = (size_t *) (rb->tail + 1); rb->mtx = (pthread_mutex_t *) (rb->acl + 1); rb->add = (pthread_cond_t *) (rb->mtx + 1); rb->del = rb->add + 1; @@ -150,17 +132,17 @@ static struct shm_rbuff * rbuff_create(pid_t pid, return NULL; } -static void rbuff_destroy(struct shm_rbuff * rb) +static void rbuff_destroy(struct ssm_rbuff * rb) { - munmap(rb->shm_base, SHM_RBUFF_FILESIZE); + munmap(rb->shm_base, SSM_RBUFF_FILESIZE); free(rb); } -struct shm_rbuff * shm_rbuff_create(pid_t pid, +struct ssm_rbuff * ssm_rbuff_create(pid_t pid, int flow_id) { - struct shm_rbuff * rb; + struct ssm_rbuff * rb; pthread_mutexattr_t mattr; pthread_condattr_t cattr; mode_t mask; @@ -218,145 +200,165 @@ struct shm_rbuff * shm_rbuff_create(pid_t pid, fail_mutex: pthread_mutexattr_destroy(&mattr); fail_mattr: - shm_rbuff_destroy(rb); + ssm_rbuff_destroy(rb); fail_rb: return NULL; } -void shm_rbuff_destroy(struct shm_rbuff * rb) +void ssm_rbuff_destroy(struct ssm_rbuff * rb) { char fn[FN_MAX_CHARS]; assert(rb != NULL); - sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id); + sprintf(fn, SSM_RBUFF_PREFIX 
"%d.%d", rb->pid, rb->flow_id); - shm_rbuff_close(rb); + ssm_rbuff_close(rb); shm_unlink(fn); } -struct shm_rbuff * shm_rbuff_open(pid_t pid, +struct ssm_rbuff * ssm_rbuff_open(pid_t pid, int flow_id) { return rbuff_create(pid, flow_id, O_RDWR); } -void shm_rbuff_close(struct shm_rbuff * rb) +void ssm_rbuff_close(struct ssm_rbuff * rb) { assert(rb); rbuff_destroy(rb); } -int shm_rbuff_write(struct shm_rbuff * rb, +int ssm_rbuff_write(struct ssm_rbuff * rb, size_t idx) { - int ret = 0; + size_t acl; + bool was_empty; + int ret = 0; assert(rb != NULL); - assert(idx < SHM_BUFFER_SIZE); - - robust_mutex_lock(rb->mtx); - if (*rb->acl != ACL_RDWR) { - if (*rb->acl & ACL_FLOWDOWN) + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + if (acl != ACL_RDWR) { + if (acl & ACL_FLOWDOWN) { ret = -EFLOWDOWN; - else if (*rb->acl & ACL_RDONLY) + goto fail_acl; + } + if (acl & ACL_RDONLY) { ret = -ENOTALLOC; - goto err; + goto fail_acl; + } } + robust_mutex_lock(rb->mtx); + if (IS_FULL(rb)) { ret = -EAGAIN; - goto err; + goto fail_mutex; } - if (IS_EMPTY(rb)) - pthread_cond_broadcast(rb->add); + was_empty = IS_EMPTY(rb); HEAD(rb) = (ssize_t) idx; - ADVANCE(rb->head); + ADVANCE_HEAD(rb); + + if (was_empty) + pthread_cond_broadcast(rb->add); pthread_mutex_unlock(rb->mtx); return 0; - err: + + fail_mutex: pthread_mutex_unlock(rb->mtx); + fail_acl: return ret; } -int shm_rbuff_write_b(struct shm_rbuff * rb, +int ssm_rbuff_write_b(struct ssm_rbuff * rb, size_t idx, const struct timespec * abstime) { - int ret = 0; + size_t acl; + int ret = 0; + bool was_empty; assert(rb != NULL); - assert(idx < SHM_BUFFER_SIZE); - - robust_mutex_lock(rb->mtx); - if (*rb->acl != ACL_RDWR) { - if (*rb->acl & ACL_FLOWDOWN) + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + if (acl != ACL_RDWR) { + if (acl & ACL_FLOWDOWN) { ret = -EFLOWDOWN; - else if (*rb->acl & ACL_RDONLY) + goto fail_acl; + } + if (acl & ACL_RDONLY) { ret = -ENOTALLOC; - goto err; + goto fail_acl; + } } + robust_mutex_lock(rb->mtx); + pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); - while (IS_FULL(rb) - && ret != -ETIMEDOUT - && !(*rb->acl & ACL_FLOWDOWN)) { + while (IS_FULL(rb) && ret != -ETIMEDOUT) { + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + if (acl & ACL_FLOWDOWN) { + ret = -EFLOWDOWN; + break; + } ret = -robust_wait(rb->del, rb->mtx, abstime); } - if (ret != -ETIMEDOUT) { - if (IS_EMPTY(rb)) - pthread_cond_broadcast(rb->add); + pthread_cleanup_pop(false); + + if (ret != -ETIMEDOUT && ret != -EFLOWDOWN) { + was_empty = IS_EMPTY(rb); HEAD(rb) = (ssize_t) idx; - ADVANCE(rb->head); + ADVANCE_HEAD(rb); + if (was_empty) + pthread_cond_broadcast(rb->add); } - pthread_cleanup_pop(true); - - return ret; - err: pthread_mutex_unlock(rb->mtx); + + fail_acl: return ret; } -static int check_rb_acl(struct shm_rbuff * rb) +static int check_rb_acl(struct ssm_rbuff * rb) { + size_t acl; + assert(rb != NULL); - if (*rb->acl & ACL_FLOWDOWN) + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + + if (acl & ACL_FLOWDOWN) return -EFLOWDOWN; - if (*rb->acl & ACL_FLOWPEER) + if (acl & ACL_FLOWPEER) return -EFLOWPEER; return -EAGAIN; } -ssize_t shm_rbuff_read(struct shm_rbuff * rb) +ssize_t ssm_rbuff_read(struct ssm_rbuff * rb) { - ssize_t ret = 0; + ssize_t ret; assert(rb != NULL); - robust_mutex_lock(rb->mtx); + if (IS_EMPTY(rb)) + return check_rb_acl(rb); - if (IS_EMPTY(rb)) { - ret = check_rb_acl(rb); - pthread_mutex_unlock(rb->mtx); - return ret; - } + robust_mutex_lock(rb->mtx); ret = TAIL(rb); - ADVANCE(rb->tail); + ADVANCE_TAIL(rb); + 
pthread_cond_broadcast(rb->del); pthread_mutex_unlock(rb->mtx); @@ -364,19 +366,19 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) return ret; } -ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, +ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, const struct timespec * abstime) { ssize_t idx = -1; + size_t acl; assert(rb != NULL); - robust_mutex_lock(rb->mtx); - - if (IS_EMPTY(rb) && (*rb->acl & ACL_FLOWDOWN)) { - pthread_mutex_unlock(rb->mtx); + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN)) return -EFLOWDOWN; - } + + robust_mutex_lock(rb->mtx); pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); @@ -386,48 +388,39 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, idx = -robust_wait(rb->add, rb->mtx, abstime); } + pthread_cleanup_pop(false); + if (!IS_EMPTY(rb)) { idx = TAIL(rb); - ADVANCE(rb->tail); + ADVANCE_TAIL(rb); pthread_cond_broadcast(rb->del); } else if (idx != -ETIMEDOUT) { idx = check_rb_acl(rb); } - pthread_cleanup_pop(true); + pthread_mutex_unlock(rb->mtx); assert(idx != -EAGAIN); return idx; } -void shm_rbuff_set_acl(struct shm_rbuff * rb, +void ssm_rbuff_set_acl(struct ssm_rbuff * rb, uint32_t flags) { assert(rb != NULL); - robust_mutex_lock(rb->mtx); - *rb->acl = (size_t) flags; - - pthread_mutex_unlock(rb->mtx); + __atomic_store_n(rb->acl, (size_t) flags, __ATOMIC_SEQ_CST); } -uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb) +uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb) { - uint32_t flags; - assert(rb != NULL); - robust_mutex_lock(rb->mtx); - - flags = (uint32_t) *rb->acl; - - pthread_mutex_unlock(rb->mtx); - - return flags; + return (uint32_t) __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); } -void shm_rbuff_fini(struct shm_rbuff * rb) +void ssm_rbuff_fini(struct ssm_rbuff * rb) { assert(rb != NULL); @@ -441,24 +434,16 @@ void shm_rbuff_fini(struct shm_rbuff * rb) pthread_cleanup_pop(true); } -size_t shm_rbuff_queued(struct shm_rbuff * rb) +size_t ssm_rbuff_queued(struct ssm_rbuff * rb) { - size_t ret; - assert(rb != NULL); - robust_mutex_lock(rb->mtx); - - ret = QUEUED(rb); - - pthread_mutex_unlock(rb->mtx); - - return ret; + return QUEUED(rb); } -int shm_rbuff_mlock(struct shm_rbuff * rb) +int ssm_rbuff_mlock(struct ssm_rbuff * rb) { assert(rb != NULL); - return mlock(rb->shm_base, SHM_RBUFF_FILESIZE); + return mlock(rb->shm_base, SSM_RBUFF_FILESIZE); } diff --git a/src/lib/ssm/ssm.h.in b/src/lib/ssm/ssm.h.in new file mode 100644 index 00000000..d14cd49c --- /dev/null +++ b/src/lib/ssm/ssm.h.in @@ -0,0 +1,146 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2026 + * + * Secure Shared Memory configuration + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
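[Editor's sketch, not part of the patch.] The robust_mutex_lock()/robust_wait() helpers used throughout the pool and rbuff code above (and centralized in ssm.h below) implement the standard EOWNERDEAD recovery pattern: when a lock holder dies, the next locker is told so and must mark the mutex consistent before continuing. A standalone demonstration, assuming a platform with robust mutexes (the HAVE_ROBUST_MUTEX case); compile with -pthread:

#define _DEFAULT_SOURCE

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

int main(void)
{
        pthread_mutex_t *   mtx;
        pthread_mutexattr_t attr;
        pid_t               pid;

        /* Process-shared robust mutex in anonymous shared memory. */
        mtx = mmap(NULL, sizeof(*mtx), PROT_READ | PROT_WRITE,
                   MAP_SHARED | MAP_ANONYMOUS, -1, 0);
        if (mtx == MAP_FAILED)
                return 1;

        pthread_mutexattr_init(&attr);
        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
        pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
        pthread_mutex_init(mtx, &attr);

        pid = fork();
        if (pid == 0) { /* child dies while holding the lock */
                pthread_mutex_lock(mtx);
                _exit(0);
        }

        waitpid(pid, NULL, 0);

        /* Same pattern as robust_mutex_lock() in ssm.h. */
        if (pthread_mutex_lock(mtx) == EOWNERDEAD) {
                printf("owner died, making mutex consistent\n");
                pthread_mutex_consistent(mtx);
        }

        pthread_mutex_unlock(mtx);

        return 0;
}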
+ */ + +#ifndef OUROBOROS_LIB_SSM_H +#define OUROBOROS_LIB_SSM_H + +#include <stddef.h> +#include <stdint.h> +#include <stdatomic.h> +#include <sys/types.h> + +/* Pool naming configuration */ +#define SSM_PREFIX "@SSM_PREFIX@" +#define SSM_GSMP_SUFFIX "@SSM_GSMP_SUFFIX@" +#define SSM_PPP_SUFFIX "@SSM_PPP_SUFFIX@" + +/* Legacy SSM constants */ +#define SSM_RBUFF_PREFIX "@SSM_RBUFF_PREFIX@" +#define SSM_FLOW_SET_PREFIX "@SSM_FLOW_SET_PREFIX@" +#define SSM_POOL_NAME "@SSM_POOL_NAME@" +#define SSM_POOL_BLOCKS @SSM_POOL_BLOCKS@ +#define SSM_RBUFF_SIZE @SSM_RBUFF_SIZE@ + +/* Packet buffer space reservation */ +#define SSM_PK_BUFF_HEADSPACE @SSM_PK_BUFF_HEADSPACE@ +#define SSM_PK_BUFF_TAILSPACE @SSM_PK_BUFF_TAILSPACE@ + +/* Pool blocks per size class */ +#define SSM_POOL_256_BLOCKS @SSM_POOL_256_BLOCKS@ +#define SSM_POOL_512_BLOCKS @SSM_POOL_512_BLOCKS@ +#define SSM_POOL_1K_BLOCKS @SSM_POOL_1K_BLOCKS@ +#define SSM_POOL_2K_BLOCKS @SSM_POOL_2K_BLOCKS@ +#define SSM_POOL_4K_BLOCKS @SSM_POOL_4K_BLOCKS@ +#define SSM_POOL_16K_BLOCKS @SSM_POOL_16K_BLOCKS@ +#define SSM_POOL_64K_BLOCKS @SSM_POOL_64K_BLOCKS@ +#define SSM_POOL_256K_BLOCKS @SSM_POOL_256K_BLOCKS@ +#define SSM_POOL_1M_BLOCKS @SSM_POOL_1M_BLOCKS@ +#define SSM_POOL_TOTAL_SIZE @SSM_POOL_TOTAL_SIZE@ + +/* Size class configuration */ +#define SSM_POOL_MAX_CLASSES 9 +#define SSM_POOL_SHARDS @SSM_POOL_SHARDS@ + +/* Internal structures - exposed for testing */ +#ifdef __cplusplus +extern "C" { +#endif + +#include <errno.h> +#include <pthread.h> + +#include <ouroboros/pthread.h> + +static __inline__ void robust_mutex_lock(pthread_mutex_t * mtx) +{ +#ifndef HAVE_ROBUST_MUTEX + pthread_mutex_lock(mtx); +#else + if (pthread_mutex_lock(mtx) == EOWNERDEAD) + pthread_mutex_consistent(mtx); +#endif +} + +static __inline__ int robust_wait(pthread_cond_t * cond, + pthread_mutex_t * mtx, + const struct timespec * abstime) +{ + int ret = __timedwait(cond, mtx, abstime); +#ifdef HAVE_ROBUST_MUTEX + if (ret == EOWNERDEAD) + pthread_mutex_consistent(mtx); +#endif + return ret; +} + +/* Packet buffer structure used by pool, rbuff, and tests */ +struct ssm_pk_buff { + uint32_t next_offset; /* List linkage (pool < 4GB) */ + uint16_t refcount; /* Reference count (app + rtx) */ + pid_t allocator_pid; /* For orphan detection */ + uint32_t size; /* Block size (max 1MB) */ + uint32_t pk_head; /* Head offset into data */ + uint32_t pk_tail; /* Tail offset into data */ + uint32_t off; /* Block offset in pool */ + uint8_t data[]; /* Packet data */ +}; + +/* Size class configuration table */ +struct ssm_size_class_cfg { + size_t size; + size_t blocks; +}; + +struct _ssm_list_head { + uint32_t head_offset; + uint32_t count; +}; + +struct _ssm_shard { + pthread_mutex_t mtx; + pthread_cond_t cond; + struct _ssm_list_head free_list; + size_t free_count; +}; + +struct _ssm_size_class { + struct _ssm_shard shards[SSM_POOL_SHARDS]; + size_t object_size; + size_t pool_start; + size_t pool_size; + size_t object_count; +}; + +struct _ssm_pool_hdr { + pthread_mutex_t mtx; + pthread_cond_t healthy; + pid_t pid; + uint32_t initialized; + void * mapped_addr; + struct _ssm_size_class size_classes[SSM_POOL_MAX_CLASSES]; +}; + +#ifdef __cplusplus +} +#endif + +#endif /* OUROBOROS_LIB_SSM_H */ diff --git a/src/lib/ssm/tests/CMakeLists.txt b/src/lib/ssm/tests/CMakeLists.txt new file mode 100644 index 00000000..827f8bf8 --- /dev/null +++ b/src/lib/ssm/tests/CMakeLists.txt @@ -0,0 +1,33 @@ +get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(PARENT_DIR 
${PARENT_PATH} NAME) + +compute_test_prefix() + +create_test_sourcelist(${PARENT_DIR}_tests test_suite.c + # Add new tests here + pool_test.c + pool_sharding_test.c + rbuff_test.c + flow_set_test.c + ) + +add_executable(${PARENT_DIR}_test ${${PARENT_DIR}_tests}) + +disable_test_logging_for_target(${PARENT_DIR}_test) +target_link_libraries(${PARENT_DIR}_test ouroboros-common) + +add_dependencies(build_tests ${PARENT_DIR}_test) + +set(tests_to_run ${${PARENT_DIR}_tests}) +if(CMAKE_VERSION VERSION_LESS "3.29.0") + remove(tests_to_run test_suite.c) +else () + list(POP_FRONT tests_to_run) +endif() + +foreach (test ${tests_to_run}) + get_filename_component(test_name ${test} NAME_WE) + add_test(${TEST_PREFIX}/${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name}) + set_property(TEST ${TEST_PREFIX}/${test_name} + PROPERTY ENVIRONMENT "OUROBOROS_TEST_POOL_SUFFIX=.test") +endforeach (test) diff --git a/src/lib/ssm/tests/flow_set_test.c b/src/lib/ssm/tests/flow_set_test.c new file mode 100644 index 00000000..f9084d3c --- /dev/null +++ b/src/lib/ssm/tests/flow_set_test.c @@ -0,0 +1,255 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2026 + * + * Test of the SSM flow set + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" +#include "ssm.h" + +#include <test/test.h> +#include <ouroboros/ssm_flow_set.h> +#include <ouroboros/errno.h> +#include <ouroboros/time.h> + +#include <stdio.h> +#include <unistd.h> +#include <pthread.h> + +static int test_ssm_flow_set_create_destroy(void) +{ + struct ssm_flow_set * set; + pid_t pid; + + TEST_START(); + + pid = getpid(); + + set = ssm_flow_set_create(pid); + if (set == NULL) { + printf("Failed to create flow set.\n"); + goto fail; + } + + ssm_flow_set_destroy(set); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; +fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_flow_set_add_del_has(void) +{ + struct ssm_flow_set * set; + pid_t pid; + size_t idx = 0; + int flow_id = 42; + + TEST_START(); + + pid = getpid(); + + set = ssm_flow_set_create(pid); + if (set == NULL) { + printf("Failed to create flow set.\n"); + goto fail; + } + + if (ssm_flow_set_has(set, idx, flow_id)) { + printf("Flow should not be in set initially.\n"); + goto fail_destroy; + } + + if (ssm_flow_set_add(set, idx, flow_id) < 0) { + printf("Failed to add flow to set.\n"); + goto fail_destroy; + } + + if (!ssm_flow_set_has(set, idx, flow_id)) { + printf("Flow should be in set after add.\n"); + goto fail_destroy; + } + + /* Adding same flow again should fail */ + if (ssm_flow_set_add(set, idx, flow_id) != -EPERM) { + printf("Should not be able to add flow twice.\n"); + goto fail_destroy; + } + + ssm_flow_set_del(set, idx, flow_id); + + if (ssm_flow_set_has(set, idx, flow_id)) { + printf("Flow should not be in set after delete.\n"); + goto fail_destroy; + } + + ssm_flow_set_destroy(set); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; +fail_destroy: + ssm_flow_set_destroy(set); +fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_flow_set_zero(void) +{ + struct ssm_flow_set * set; + pid_t pid; + size_t idx = 0; + int flow_id1 = 10; + int flow_id2 = 20; + + TEST_START(); + + pid = getpid(); + + set = ssm_flow_set_create(pid); + if (set == NULL) { + printf("Failed to create flow set.\n"); + goto fail; + } + + if (ssm_flow_set_add(set, idx, flow_id1) < 0) { + printf("Failed to add flow1 to set.\n"); + goto fail_destroy; + } + + if (ssm_flow_set_add(set, idx, flow_id2) < 0) { + printf("Failed to add flow2 to set.\n"); + goto fail_destroy; + } + + ssm_flow_set_zero(set, idx); + + if (ssm_flow_set_has(set, idx, flow_id1)) { + printf("Flow1 should not be in set after zero.\n"); + goto fail_destroy; + } + + if (ssm_flow_set_has(set, idx, flow_id2)) { + printf("Flow2 should not be in set after zero.\n"); + goto fail_destroy; + } + + ssm_flow_set_destroy(set); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; +fail_destroy: + ssm_flow_set_destroy(set); +fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_flow_set_notify_wait(void) +{ + struct ssm_flow_set * set; + pid_t pid; + size_t idx = 0; + int flow_id = 100; + struct flowevent events[SSM_RBUFF_SIZE]; + struct timespec timeout; + ssize_t ret; + + TEST_START(); + + pid = getpid(); + + set = ssm_flow_set_create(pid); + if (set == NULL) { + printf("Failed to create flow set.\n"); + goto fail; + } + + if (ssm_flow_set_add(set, idx, flow_id) < 0) { + printf("Failed to add flow to set.\n"); + goto fail_destroy; + } + + /* Test immediate timeout when no events */ + clock_gettime(PTHREAD_COND_CLOCK, &timeout); + ret = ssm_flow_set_wait(set, idx, events, &timeout); + if (ret != 
-ETIMEDOUT) { + printf("Wait should timeout immediately when no events.\n"); + goto fail_destroy; + } + + /* Notify an event */ + ssm_flow_set_notify(set, flow_id, FLOW_PKT); + + /* Should be able to read the event immediately */ + clock_gettime(PTHREAD_COND_CLOCK, &timeout); + ts_add(&timeout, &timeout, &((struct timespec) {1, 0})); + + ret = ssm_flow_set_wait(set, idx, events, &timeout); + if (ret != 1) { + printf("Wait should return 1 event, got %zd.\n", ret); + goto fail_destroy; + } + + if (events[0].flow_id != flow_id) { + printf("Event flow_id mismatch: expected %d, got %d.\n", + flow_id, events[0].flow_id); + goto fail_destroy; + } + + if (events[0].event != FLOW_PKT) { + printf("Event type mismatch: expected %d, got %d.\n", + FLOW_PKT, events[0].event); + goto fail_destroy; + } + + ssm_flow_set_destroy(set); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; +fail_destroy: + ssm_flow_set_destroy(set); +fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +int flow_set_test(int argc, + char ** argv) +{ + int ret = 0; + + (void) argc; + (void) argv; + + ret |= test_ssm_flow_set_create_destroy(); + ret |= test_ssm_flow_set_add_del_has(); + ret |= test_ssm_flow_set_zero(); + ret |= test_ssm_flow_set_notify_wait(); + + return ret; +} diff --git a/src/lib/ssm/tests/pool_sharding_test.c b/src/lib/ssm/tests/pool_sharding_test.c new file mode 100644 index 00000000..72ae1cb7 --- /dev/null +++ b/src/lib/ssm/tests/pool_sharding_test.c @@ -0,0 +1,505 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2026 + * + * Test of the SSM pool sharding with fallback + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. 
+ */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" +#include "ssm.h" + +#include <test/test.h> +#include <ouroboros/ssm_pool.h> + +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <stdbool.h> +#include <sys/wait.h> +#include <sys/types.h> +#include <signal.h> + +#define TEST_SIZE 256 + +/* Helper to get pool header for inspection */ +static struct _ssm_pool_hdr * get_pool_hdr(struct ssm_pool * pool) +{ + /* ssm_pool is opaque, but we know its layout: + * uint8_t * shm_base + * struct _ssm_pool_hdr * hdr + * void * pool_base + */ + struct _ssm_pool_hdr ** hdr_ptr = + (struct _ssm_pool_hdr **)((uint8_t *)pool + sizeof(void *)); + return *hdr_ptr; +} + +static int test_lazy_distribution(void) +{ + struct ssm_pool * pool; + struct _ssm_pool_hdr * hdr; + struct _ssm_size_class * sc; + int i; + int sc_idx; + + TEST_START(); + + ssm_pool_purge(); + + pool = ssm_pool_create(); + if (pool == NULL) { + printf("Failed to create pool.\n"); + goto fail; + } + + hdr = get_pool_hdr(pool); + if (hdr == NULL) { + printf("Failed to get pool header.\n"); + goto fail_pool; + } + + /* Find the first size class with blocks */ + sc_idx = -1; + for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) { + if (hdr->size_classes[i].object_count > 0) { + sc_idx = i; + break; + } + } + + if (sc_idx < 0) { + printf("No size classes configured.\n"); + for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) { + printf(" Class %d: count=%zu\n", i, + hdr->size_classes[i].object_count); + } + goto fail_pool; + } + + sc = &hdr->size_classes[sc_idx]; + + /* Verify all blocks start in shard 0 */ + if (sc->shards[0].free_count == 0) { + printf("Shard 0 should have all blocks initially.\n"); + goto fail_pool; + } + + /* Verify other shards are empty */ + for (i = 1; i < SSM_POOL_SHARDS; i++) { + if (sc->shards[i].free_count != 0) { + printf("Shard %d should be empty, has %zu.\n", + i, sc->shards[i].free_count); + goto fail_pool; + } + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_pool: + ssm_pool_destroy(pool); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_shard_migration(void) +{ + struct ssm_pool * pool; + struct _ssm_pool_hdr * hdr; + struct _ssm_size_class * sc; + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t off; + int shard_idx; + int sc_idx; + int i; + + TEST_START(); + + ssm_pool_purge(); + + pool = ssm_pool_create(); + if (pool == NULL) { + printf("Failed to create pool.\n"); + goto fail; + } + + hdr = get_pool_hdr(pool); + + /* Find the first size class with blocks */ + sc_idx = -1; + for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) { + if (hdr->size_classes[i].object_count > 0) { + sc_idx = i; + break; + } + } + + if (sc_idx < 0) { + printf("No size classes configured.\n"); + goto fail; + } + + sc = &hdr->size_classes[sc_idx]; + + /* Allocate from this process */ + off = ssm_pool_alloc(pool, TEST_SIZE, &ptr, &spb); + if (off < 0) { + printf("Allocation failed: %zd.\n", off); + goto fail_pool; + } + + /* Free it - should go to this process's shard */ + shard_idx = getpid() % SSM_POOL_SHARDS; + if (ssm_pool_remove(pool, off) != 0) { + printf("Remove failed.\n"); + goto fail_pool; + } + + /* Verify block migrated away from shard 0 or in allocator's shard */ + if (sc->shards[shard_idx].free_count == 0 && + sc->shards[0].free_count == 0) { + printf("Block should have been freed to a shard.\n"); + goto fail_pool; + } + + 
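+ /* Note on the check above: a freed block is returned to the + * allocator's shard, allocator_pid % SSM_POOL_SHARDS (e.g. shard 3 + * for a hypothetical pid 4711 with 4 shards), while never-allocated + * blocks still sit in shard 0 under lazy distribution, so at least + * one of the two counters must be non-zero here. */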
ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_pool: + ssm_pool_destroy(pool); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_fallback_stealing(void) +{ + struct ssm_pool * pool; + struct _ssm_pool_hdr * hdr; + struct _ssm_size_class * sc; + struct ssm_pk_buff ** spbs; + uint8_t ** ptrs; + size_t total_blocks; + size_t total_free; + size_t i; + int sc_idx; + int c; + + TEST_START(); + + ssm_pool_purge(); + + pool = ssm_pool_create(); + if (pool == NULL) { + printf("Failed to create pool.\n"); + goto fail; + } + + hdr = get_pool_hdr(pool); + + /* Find the first size class with blocks */ + sc_idx = -1; + for (c = 0; c < SSM_POOL_MAX_CLASSES; c++) { + if (hdr->size_classes[c].object_count > 0) { + sc_idx = c; + break; + } + } + + if (sc_idx < 0) { + printf("No size classes configured.\n"); + goto fail; + } + + sc = &hdr->size_classes[sc_idx]; + total_blocks = sc->object_count; + + spbs = malloc(total_blocks * sizeof(struct ssm_pk_buff *)); + ptrs = malloc(total_blocks * sizeof(uint8_t *)); + if (spbs == NULL || ptrs == NULL) { + printf("Failed to allocate test arrays.\n"); + goto fail_pool; + } + + /* Allocate half the blocks from single process */ + for (i = 0; i < total_blocks / 2; i++) { + ssize_t off = ssm_pool_alloc(pool, TEST_SIZE, + &ptrs[i], &spbs[i]); + if (off < 0) { + printf("Allocation %zu failed: %zd.\n", i, off); + free(spbs); + free(ptrs); + goto fail_pool; + } + } + + /* Free them all - they go to local_shard */ + for (i = 0; i < total_blocks / 2; i++) { + size_t off = ssm_pk_buff_get_idx(spbs[i]); + if (ssm_pool_remove(pool, off) != 0) { + printf("Remove %zu failed.\n", i); + free(spbs); + free(ptrs); + goto fail_pool; + } + } + + /* Freed blocks should be in shards (all blocks free again) */ + total_free = 0; + for (i = 0; i < SSM_POOL_SHARDS; i++) { + total_free += sc->shards[i].free_count; + } + + if (total_free != total_blocks) { + printf("Expected %zu free blocks total, got %zu.\n", + total_blocks, total_free); + free(spbs); + free(ptrs); + goto fail_pool; + } + + /* Allocate again - should succeed by taking from shards */ + for (i = 0; i < total_blocks / 2; i++) { + ssize_t off = ssm_pool_alloc(pool, TEST_SIZE, + &ptrs[i], &spbs[i]); + if (off < 0) { + printf("Fallback alloc %zu failed: %zd.\n", i, off); + free(spbs); + free(ptrs); + goto fail_pool; + } + } + + /* Now all allocated blocks are in use again */ + /* Cleanup - free all allocated blocks */ + for (i = 0; i < total_blocks / 2; i++) { + size_t off = ssm_pk_buff_get_idx(spbs[i]); + ssm_pool_remove(pool, off); + } + + free(spbs); + free(ptrs); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_pool: + ssm_pool_destroy(pool); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_multiprocess_sharding(void) +{ + struct ssm_pool * pool; + struct _ssm_pool_hdr * hdr; + struct _ssm_size_class * sc; + pid_t children[SSM_POOL_SHARDS]; + int i; + int status; + + TEST_START(); + + ssm_pool_purge(); + + pool = ssm_pool_create(); + if (pool == NULL) { + printf("Failed to create pool.\n"); + goto fail; + } + + /* Fork processes to test different shards */ + for (i = 0; i < SSM_POOL_SHARDS; i++) { + children[i] = fork(); + if (children[i] == -1) { + printf("Fork %d failed.\n", i); + goto fail_children; + } + + if (children[i] == 0) { + /* Child process */ + struct ssm_pool * child_pool; + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t off; + int my_shard; + + child_pool = ssm_pool_open(); + if (child_pool == NULL) + 
exit(EXIT_FAILURE); + + my_shard = getpid() % SSM_POOL_SHARDS; + (void) my_shard; /* Reserved for future use */ + + /* Each child allocates and frees a block */ + off = ssm_pool_alloc(child_pool, TEST_SIZE, + &ptr, &spb); + if (off < 0) { + ssm_pool_close(child_pool); + exit(EXIT_FAILURE); + } + + /* Small delay to ensure allocation visible */ + usleep(10000); + + if (ssm_pool_remove(child_pool, off) != 0) { + ssm_pool_close(child_pool); + exit(EXIT_FAILURE); + } + + ssm_pool_close(child_pool); + exit(EXIT_SUCCESS); + } + } + + /* Wait for all children */ + for (i = 0; i < SSM_POOL_SHARDS; i++) { + if (waitpid(children[i], &status, 0) == -1) { + printf("Waitpid %d failed.\n", i); + goto fail_children; + } + if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { + printf("Child %d failed.\n", i); + goto fail_pool; + } + } + + /* Verify blocks distributed across shards */ + hdr = get_pool_hdr(pool); + + /* Find the first size class with blocks */ + sc = NULL; + for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) { + if (hdr->size_classes[i].object_count > 0) { + sc = &hdr->size_classes[i]; + break; + } + } + + if (sc == NULL) { + printf("No size classes configured.\n"); + goto fail_pool; + } + + /* After children allocate and free, blocks should be in shards + * (though exact distribution depends on PID values) + */ + for (i = 0; i < SSM_POOL_SHARDS; i++) { + /* At least some shards should have blocks */ + if (sc->shards[i].free_count > 0) { + break; + } + } + + if (i == SSM_POOL_SHARDS) { + printf("No shard has any free blocks.\n"); + goto fail_pool; + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_children: + /* Kill any remaining children */ + for (i = 0; i < SSM_POOL_SHARDS; i++) { + if (children[i] > 0) + kill(children[i], SIGKILL); + } + fail_pool: + ssm_pool_destroy(pool); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_exhaustion_with_fallback(void) +{ + struct ssm_pool * pool; + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t off; + + TEST_START(); + + ssm_pool_purge(); + + pool = ssm_pool_create(); + if (pool == NULL) { + printf("Failed to create pool.\n"); + goto fail; + } + + /* Allocate until exhausted across all shards */ + while (true) { + off = ssm_pool_alloc(pool, TEST_SIZE, &ptr, &spb); + if (off < 0) { + if (off == -EAGAIN) + break; + printf("Unexpected error: %zd.\n", off); + goto fail_pool; + } + } + + /* Should fail with -EAGAIN when truly exhausted */ + off = ssm_pool_alloc(pool, TEST_SIZE, &ptr, &spb); + if (off != -EAGAIN) { + printf("Expected -EAGAIN, got %zd.\n", off); + goto fail_pool; + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_pool: + ssm_pool_destroy(pool); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +int pool_sharding_test(int argc, + char ** argv) +{ + int ret = 0; + + (void) argc; + (void) argv; + + ret |= test_lazy_distribution(); + ret |= test_shard_migration(); + ret |= test_fallback_stealing(); + ret |= test_multiprocess_sharding(); + ret |= test_exhaustion_with_fallback(); + + return ret; +} diff --git a/src/lib/ssm/tests/pool_test.c b/src/lib/ssm/tests/pool_test.c new file mode 100644 index 00000000..e298d9ab --- /dev/null +++ b/src/lib/ssm/tests/pool_test.c @@ -0,0 +1,1038 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2026 + * + * Test of the Secure Shared Memory (SSM) system + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software
Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200809L + +#include "config.h" +#include "ssm.h" + +#include <test/test.h> +#include <ouroboros/ssm_pool.h> +#include <ouroboros/ssm_rbuff.h> + +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <stdbool.h> +#include <stdatomic.h> +#include <sys/wait.h> +#include <sys/types.h> +#include <signal.h> +#include <time.h> + +#define POOL_256 256 +#define POOL_512 512 +#define POOL_1K 1024 +#define POOL_2K 2048 +#define POOL_4K 4096 +#define POOL_16K 16384 +#define POOL_64K 65536 +#define POOL_256K 262144 +#define POOL_1M 1048576 +#define POOL_2M (2 * 1024 * 1024) + +static int test_ssm_pool_basic_allocation(void) +{ + struct ssm_pool * pool; + uint8_t * ptr; + struct ssm_pk_buff * spb; + ssize_t ret; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (ret < 0) { + printf("Alloc failed: %zd.\n", ret); + goto fail_alloc; + } + + if (spb == NULL) { + printf("Spb is NULL.\n"); + goto fail_alloc; + } + + if (ptr == NULL) { + printf("Ptr is NULL.\n"); + goto fail_alloc; + } + + if (ssm_pk_buff_len(spb) != POOL_256) { + printf("Bad length: %zu.\n", ssm_pk_buff_len(spb)); + goto fail_alloc; + } + + ret = ssm_pool_remove(pool, ret); + if (ret != 0) { + printf("Remove failed: %zd.\n", ret); + goto fail_alloc; + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_multiple_allocations(void) +{ + struct ssm_pool * pool; + uint8_t * ptr1; + uint8_t * ptr2; + uint8_t * ptr3; + struct ssm_pk_buff * spb1; + struct ssm_pk_buff * spb2; + struct ssm_pk_buff * spb3; + ssize_t ret1; + ssize_t ret2; + ssize_t ret3; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + ret1 = ssm_pool_alloc(pool, POOL_256, &ptr1, &spb1); + ret2 = ssm_pool_alloc(pool, POOL_256, &ptr2, &spb2); + ret3 = ssm_pool_alloc(pool, POOL_256, &ptr3, &spb3); + if (ret1 < 0 || ret2 < 0 || ret3 < 0) { + printf("Allocs failed: %zd, %zd, %zd.\n", ret1, ret2, ret3); + goto fail_alloc; + } + + if (spb1 == NULL) { + printf("Spb1 is NULL.\n"); + goto fail_alloc; + } + + if (ptr1 == NULL) { + printf("Ptr1 is NULL.\n"); + goto fail_alloc; + } + + if (spb2 == NULL) { + printf("Spb2 is NULL.\n"); + goto fail_alloc; + } + + if (ptr2 == NULL) { + printf("Ptr2 is NULL.\n"); + goto fail_alloc; + } + + if (spb3 == NULL) { + printf("Spb3 is NULL.\n"); + goto fail_alloc; + } + + if (ptr3 == NULL) { + printf("Ptr3 is NULL.\n"); + goto fail_alloc; + } + + if (ssm_pk_buff_len(spb1) != POOL_256) { + printf("Bad length spb1: %zu.\n", ssm_pk_buff_len(spb1)); + goto fail_alloc; + } + + if (ssm_pk_buff_len(spb2) != POOL_256) { + printf("Bad length spb2: %zu.\n", ssm_pk_buff_len(spb2)); + goto fail_alloc; + } + + if (ssm_pk_buff_len(spb3) != POOL_256) { + printf("Bad length spb3: %zu.\n", ssm_pk_buff_len(spb3)); + goto fail_alloc; + } + + if 
(ssm_pool_remove(pool, ret2) != 0) { + printf("Remove ret2 failed.\n"); + goto fail_alloc; + } + + if (ssm_pool_remove(pool, ret1) != 0) { + printf("Remove ret1 failed.\n"); + goto fail_alloc; + } + + if (ssm_pool_remove(pool, ret3) != 0) { + printf("Remove ret3 failed.\n"); + goto fail_alloc; + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_no_fallback_for_large(void) +{ + struct ssm_pool * pool; + uint8_t * ptr; + struct ssm_pk_buff * spb; + ssize_t ret; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + ret = ssm_pool_alloc(pool, POOL_2M, &ptr, &spb); + if (ret >= 0) { + printf("Oversized alloc succeeded: %zd.\n", ret); + goto fail_alloc; + } + + if (ret != -EMSGSIZE) { + printf("Wrong error: %zd.\n", ret); + goto fail_alloc; + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_blocking_vs_nonblocking(void) +{ + struct ssm_pool * pool; + uint8_t * ptr; + struct ssm_pk_buff * spb; + ssize_t ret; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + ret = ssm_pool_alloc(pool, POOL_2M, &ptr, &spb); + if (ret != -EMSGSIZE) { + printf("Nonblocking oversized: %zd.\n", ret); + goto fail_alloc; + } + + ret = ssm_pool_alloc_b(pool, POOL_2M, &ptr, &spb, NULL); + if (ret != -EMSGSIZE) { + printf("Blocking oversized: %zd.\n", ret); + goto fail_alloc; + } + + ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (ret < 0) { + printf("Valid alloc failed: %zd.\n", ret); + goto fail_alloc; + } + + ssm_pool_remove(pool, ret); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_stress_test(void) +{ + struct ssm_pool * pool; + uint8_t * ptr; + struct ssm_pk_buff * spb; + ssize_t * indices = NULL; + ssize_t ret; + size_t count = 0; + size_t i; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + indices = malloc(100 * sizeof(*indices)); + if (indices == NULL) { + printf("Malloc failed.\n"); + goto fail_alloc; + } + + for (i = 0; i < 100; i++) { + size_t j; + size_t num; + size_t size; + + num = (i % 100) + 1; + + for (j = 0; j < num && count < 100; j++) { + switch (i % 4) { + case 0: + /* FALLTHRU */ + case 1: + size = POOL_256; + break; + case 2: + /* FALLTHRU */ + case 3: + size = POOL_1K; + break; + default: + size = POOL_256; + break; + } + + ret = ssm_pool_alloc(pool, size, &ptr, &spb); + if (ret < 0) { + printf("Alloc at iter %zu: %zd.\n", i, ret); + goto fail_test; + } + indices[count++] = ret; + } + + for (j = 0; j < count / 2; j++) { + size_t idx = j * 2; + if (idx < count) { + ret = ssm_pool_remove(pool, indices[idx]); + if (ret != 0) { + printf("Remove at iter %zu: %zd.\n", + i, ret); + goto fail_test; + } + memmove(&indices[idx], &indices[idx + 1], + (count - idx - 1) * sizeof(*indices)); + count--; + } + } + + if (i % 10 == 0) { + ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (ret < 0) { + printf("Periodic alloc at %zu: %zd.\n", i, ret); + goto fail_test; + } + ssm_pool_remove(pool, ret); + } + } + + for (i = 0; i < count; i++) + ssm_pool_remove(pool, indices[i]); + + free(indices); + ssm_pool_destroy(pool); + + 
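+ /* The loop above interleaved 256 B and 1 KiB requests and freed + * every other outstanding block, so at least two size classes saw + * alloc/free churn before everything was released. */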
TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_test: + for (i = 0; i < count; i++) + ssm_pool_remove(pool, indices[i]); + free(indices); + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_open_initializes_ssm(void) +{ + struct ssm_pool * creator; + struct ssm_pool * opener; + uint8_t * ptr; + struct ssm_pk_buff * spb; + ssize_t ret; + + TEST_START(); + + creator = ssm_pool_create(); + if (creator == NULL) + goto fail_create; + + ret = ssm_pool_alloc(creator, POOL_256, &ptr, &spb); + if (ret < 0) { + printf("Creator alloc failed: %zd.\n", ret); + goto fail_creator; + } + ssm_pool_remove(creator, ret); + + opener = ssm_pool_open(); + if (opener == NULL) { + printf("Open failed.\n"); + goto fail_creator; + } + + ret = ssm_pool_alloc(opener, POOL_256, &ptr, &spb); + if (ret < 0) { + printf("Opener alloc failed: %zd.\n", ret); + goto fail_opener; + } + + ssm_pool_remove(opener, ret); + ssm_pool_close(opener); + ssm_pool_destroy(creator); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_opener: + ssm_pool_close(opener); + fail_creator: + ssm_pool_destroy(creator); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_bounds_checking(void) +{ + struct ssm_pool * pool; + struct ssm_pk_buff * spb; + ssize_t ret; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + ret = ssm_pool_alloc(pool, POOL_256, NULL, &spb); + if (ret < 0) { + printf("alloc failed: %zd.\n", ret); + goto fail_alloc; + } + + spb = ssm_pool_get(pool, 0); + if (spb != NULL) { + printf("Get at offset 0.\n"); + goto fail_alloc; + } + + spb = ssm_pool_get(pool, 100000000UL); + if (spb != NULL) { + printf("Get beyond pool.\n"); + goto fail_alloc; + } + + ret = ssm_pool_remove(pool, 0); + if (ret != -EINVAL) { + printf("Remove at offset 0: %zd.\n", ret); + goto fail_alloc; + } + + ret = ssm_pool_remove(pool, 100000000UL); + if (ret != -EINVAL) { + printf("Remove beyond pool: %zd.\n", ret); + goto fail_alloc; + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_inter_process_communication(void) +{ + struct ssm_pool * pool; + struct ssm_rbuff * rb; + struct ssm_pk_buff * spb; + uint8_t * ptr; + uint8_t * data; + const char * msg = "inter-process test"; + size_t len; + ssize_t idx; + pid_t pid; + int status; + + TEST_START(); + + len = strlen(msg) + 1; + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + rb = ssm_rbuff_create(getpid(), 1); + if (rb == NULL) { + printf("Rbuff create failed.\n"); + goto fail_pool; + } + + pid = fork(); + if (pid < 0) { + printf("Fork failed.\n"); + goto fail_rbuff; + } + + if (pid == 0) { + idx = ssm_rbuff_read_b(rb, NULL); + if (idx < 0) { + printf("Child: rbuff read: %zd.\n", idx); + exit(1); + } + + spb = ssm_pool_get(pool, idx); + if (spb == NULL) { + printf("Child: pool get failed.\n"); + exit(1); + } + + data = ssm_pk_buff_head(spb); + if (data == NULL) { + printf("Child: data is NULL.\n"); + ssm_pool_remove(pool, idx); + exit(1); + } + + if (strcmp((char *)data, msg) != 0) { + printf("Child: data mismatch.\n"); + ssm_pool_remove(pool, idx); + exit(1); + } + + ssm_pool_remove(pool, idx); + exit(0); + } + + idx = ssm_pool_alloc(pool, len, &ptr, &spb); + if (idx < 0) { + printf("Parent: pool alloc: %zd.\n", idx); + goto fail_child; + } + + memcpy(ptr, msg, len); + + if 
(ssm_rbuff_write(rb, idx) < 0) { + printf("Parent: rbuff write failed.\n"); + ssm_pool_remove(pool, idx); + goto fail_child; + } + + if (waitpid(pid, &status, 0) < 0) { + printf("Parent: waitpid failed.\n"); + ssm_pool_remove(pool, idx); + goto fail_rbuff; + } + + if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { + printf("Child failed.\n"); + ssm_pool_remove(pool, idx); + goto fail_rbuff; + } + + ssm_rbuff_destroy(rb); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_child: + waitpid(pid, &status, 0); + fail_rbuff: + ssm_rbuff_destroy(rb); + fail_pool: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_read_operation(void) +{ + struct ssm_pool * pool; + struct ssm_pk_buff * spb; + uint8_t * wptr; + uint8_t * rptr; + const char * data = "ssm_pool_read test"; + size_t len; + ssize_t idx; + ssize_t ret; + + TEST_START(); + + len = strlen(data) + 1; + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + idx = ssm_pool_alloc(pool, len, &wptr, &spb); + if (idx < 0) { + printf("alloc failed: %zd.\n", idx); + goto fail_alloc; + } + + memcpy(wptr, data, len); + + ret = ssm_pool_read(&rptr, pool, idx); + if (ret < 0) { + printf("Read failed: %zd.\n", ret); + goto fail_read; + } + + if (rptr == NULL) { + printf("NULL pointer.\n"); + goto fail_read; + } + + if (strcmp((char *)rptr, data) != 0) { + printf("Data mismatch.\n"); + goto fail_read; + } + + ssm_pool_remove(pool, idx); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_read: + ssm_pool_remove(pool, idx); + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_mlock_operation(void) +{ + struct ssm_pool * pool; + int ret; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + ret = ssm_pool_mlock(pool); + if (ret < 0) + printf("Mlock failed: %d (may need privileges).\n", ret); + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pk_buff_operations(void) +{ + struct ssm_pool * pool; + struct ssm_pk_buff * spb; + uint8_t * ptr; + uint8_t * head; + uint8_t * tail; + const char * data = "packet buffer test"; + size_t dlen; + size_t len; + ssize_t idx; + + TEST_START(); + + dlen = strlen(data); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + idx = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (idx < 0) { + printf("alloc failed: %zd.\n", idx); + goto fail_alloc; + } + + head = ssm_pk_buff_head(spb); + if (head != ptr) { + printf("Head mismatch.\n"); + goto fail_ops; + } + + len = ssm_pk_buff_len(spb); + if (len != POOL_256) { + printf("Bad length: %zu.\n", len); + goto fail_ops; + } + + tail = ssm_pk_buff_tail(spb); + if (tail != ptr + len) { + printf("Tail mismatch.\n"); + goto fail_ops; + } + + memcpy(head, data, dlen); + + tail = ssm_pk_buff_tail_alloc(spb, 32); + if (tail == NULL) { + printf("Tail_alloc failed.\n"); + goto fail_ops; + } + + if (ssm_pk_buff_len(spb) != POOL_256 + 32) { + printf("Length after tail_alloc: %zu.\n", + ssm_pk_buff_len(spb)); + goto fail_ops; + } + + if (memcmp(head, data, dlen) != 0) { + printf("Data corrupted.\n"); + goto fail_ops; + } + + tail = ssm_pk_buff_tail_release(spb, 32); + if (tail == NULL) { + printf("Tail_release failed.\n"); + goto fail_ops; + } + + if (ssm_pk_buff_len(spb) != POOL_256) { + printf("Length after tail_release: 
%zu.\n", + ssm_pk_buff_len(spb)); + goto fail_ops; + } + + ssm_pool_remove(pool, idx); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_ops: + ssm_pool_remove(pool, idx); + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +#define OVERHEAD (offsetof(struct ssm_pk_buff, data) + \ + SSM_PK_BUFF_HEADSPACE + SSM_PK_BUFF_TAILSPACE) +static int test_ssm_pool_size_class_boundaries(void) +{ + struct ssm_pool * pool; + struct ssm_pk_buff * spb; + uint8_t * ptr; + size_t sizes[] = { + 1, + POOL_512 - OVERHEAD, + POOL_512 - OVERHEAD + 1, + POOL_1K - OVERHEAD, + POOL_1K - OVERHEAD + 1, + POOL_2K - OVERHEAD, + POOL_2K - OVERHEAD + 1, + POOL_4K - OVERHEAD, + POOL_4K - OVERHEAD + 1, + POOL_16K - OVERHEAD, + POOL_16K - OVERHEAD + 1, + POOL_64K - OVERHEAD, + POOL_64K - OVERHEAD + 1, + POOL_256K - OVERHEAD, + POOL_256K - OVERHEAD + 1, + POOL_1M - OVERHEAD, + }; + size_t expected_classes[] = { + 512, 512, 1024, 1024, 2048, 2048, 4096, 4096, 16384, + 16384, 65536, 65536, 262144, 262144, 1048576, 1048576 + }; + size_t i; + ssize_t idx; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + for (i = 0; i < sizeof(sizes) / sizeof(sizes[0]); i++) { + struct ssm_pk_buff * hdr; + size_t actual_class; + + idx = ssm_pool_alloc(pool, sizes[i], &ptr, &spb); + if (idx < 0) { + printf("Alloc at %zu failed: %zd.\n", sizes[i], idx); + goto fail_alloc; + } + + if (ssm_pk_buff_len(spb) != sizes[i]) { + printf("Length mismatch at %zu: %zu.\n", + sizes[i], ssm_pk_buff_len(spb)); + ssm_pool_remove(pool, idx); + goto fail_alloc; + } + + /* Verify correct size class was used + * hdr->size is the data array size (object_size - header) */ + hdr = spb; + actual_class = hdr->size + offsetof(struct ssm_pk_buff, data); + if (actual_class != expected_classes[i]) { + printf("Wrong class for len=%zu: want %zu, got %zu.\n", + sizes[i], expected_classes[i], actual_class); + ssm_pool_remove(pool, idx); + goto fail_alloc; + } + + memset(ptr, i & 0xFF, sizes[i]); + + ssm_pool_remove(pool, idx); + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_exhaustion(void) +{ + struct ssm_pool * pool; + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t * indices; + size_t count = 0; + size_t i; + ssize_t ret; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + indices = malloc(2048 * sizeof(*indices)); + if (indices == NULL) { + printf("Malloc failed.\n"); + goto fail_alloc; + } + + for (i = 0; i < 2048; i++) { + ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (ret < 0) { + if (ret == -EAGAIN) + break; + printf("Alloc error: %zd.\n", ret); + goto fail_test; + } + indices[count++] = ret; + } + + if (count == 0) { + printf("No allocs succeeded.\n"); + goto fail_test; + } + + ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (ret >= 0) { + ssm_pool_remove(pool, ret); + } else if (ret != -EAGAIN) { + printf("Unexpected error: %zd.\n", ret); + goto fail_test; + } + + for (i = 0; i < count; i++) + ssm_pool_remove(pool, indices[i]); + + ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (ret < 0) { + printf("Alloc after free failed: %zd.\n", ret); + goto fail_test; + } + ssm_pool_remove(pool, ret); + + free(indices); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_test: + for (i = 0; i < count; i++) + 
ssm_pool_remove(pool, indices[i]); + free(indices); + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_reclaim_orphans(void) +{ + struct ssm_pool * pool; + uint8_t * ptr1; + uint8_t * ptr2; + uint8_t * ptr3; + struct ssm_pk_buff * spb1; + struct ssm_pk_buff * spb2; + struct ssm_pk_buff * spb3; + ssize_t ret1; + ssize_t ret2; + ssize_t ret3; + pid_t my_pid; + pid_t fake_pid = 99999; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + my_pid = getpid(); + + /* Allocate some blocks */ + ret1 = ssm_pool_alloc(pool, POOL_256, &ptr1, &spb1); + ret2 = ssm_pool_alloc(pool, POOL_512, &ptr2, &spb2); + ret3 = ssm_pool_alloc(pool, POOL_1K, &ptr3, &spb3); + if (ret1 < 0 || ret2 < 0 || ret3 < 0) { + printf("Allocs failed: %zd, %zd, %zd.\n", ret1, ret2, ret3); + goto fail_alloc; + } + + /* Simulate blocks from another process by changing allocator_pid */ + spb1->allocator_pid = fake_pid; + spb2->allocator_pid = fake_pid; + /* Keep spb3 with our pid */ + + /* Reclaim orphans from fake_pid */ + ssm_pool_reclaim_orphans(pool, fake_pid); + + /* Verify spb1 and spb2 have refcount 0 (reclaimed) */ + if (spb1->refcount != 0) { + printf("spb1 refcount should be 0, got %u.\n", spb1->refcount); + goto fail_test; + } + + if (spb2->refcount != 0) { + printf("spb2 refcount should be 0, got %u.\n", spb2->refcount); + goto fail_test; + } + + /* Verify spb3 still has refcount 1 (not reclaimed) */ + if (spb3->refcount != 1) { + printf("spb3 refcount should be 1, got %u.\n", spb3->refcount); + goto fail_test; + } + + /* Clean up */ + ssm_pool_remove(pool, ret3); + + /* Try allocating again - should get blocks from reclaimed pool */ + ret1 = ssm_pool_alloc(pool, POOL_256, &ptr1, &spb1); + if (ret1 < 0) { + printf("Alloc after reclaim failed: %zd.\n", ret1); + goto fail_alloc; /* ret3 is already removed */ + } + + /* Verify new allocation has our pid */ + if (spb1->allocator_pid != my_pid) { + printf("New block has wrong pid: %d vs %d.\n", + spb1->allocator_pid, my_pid); + ssm_pool_remove(pool, ret1); + goto fail_alloc; + } + + ssm_pool_remove(pool, ret1); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_test: + ssm_pool_remove(pool, ret3); + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +int pool_test(int argc, + char ** argv) +{ + int ret = 0; + + (void) argc; + (void) argv; + + ssm_pool_purge(); + + ret |= test_ssm_pool_basic_allocation(); + ret |= test_ssm_pool_multiple_allocations(); + ret |= test_ssm_pool_no_fallback_for_large(); + ret |= test_ssm_pool_blocking_vs_nonblocking(); + ret |= test_ssm_pool_stress_test(); + ret |= test_ssm_pool_open_initializes_ssm(); + ret |= test_ssm_pool_bounds_checking(); + ret |= test_ssm_pool_inter_process_communication(); + ret |= test_ssm_pool_read_operation(); + ret |= test_ssm_pool_mlock_operation(); + ret |= test_ssm_pk_buff_operations(); + ret |= test_ssm_pool_size_class_boundaries(); + ret |= test_ssm_pool_exhaustion(); + ret |= test_ssm_pool_reclaim_orphans(); + + return ret; +} diff --git a/src/lib/ssm/tests/rbuff_test.c b/src/lib/ssm/tests/rbuff_test.c new file mode 100644 index 00000000..6e1cb5ec --- /dev/null +++ b/src/lib/ssm/tests/rbuff_test.c @@ -0,0 +1,675 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2026 + * + * Test of the SSM notification ring buffer + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the
terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" +#include "ssm.h" + +#include <test/test.h> +#include <ouroboros/ssm_rbuff.h> +#include <ouroboros/errno.h> +#include <ouroboros/time.h> + +#include <errno.h> +#include <stdio.h> +#include <unistd.h> +#include <pthread.h> + +static int test_ssm_rbuff_create_destroy(void) +{ + struct ssm_rbuff * rb; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 1); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_write_read(void) +{ + struct ssm_rbuff * rb; + ssize_t idx; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 2); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + if (ssm_rbuff_write(rb, 42) < 0) { + printf("Failed to write value.\n"); + goto fail_rb; + } + + if (ssm_rbuff_queued(rb) != 1) { + printf("Queue length should be 1, got %zu.\n", + ssm_rbuff_queued(rb)); + goto fail_rb; + } + + idx = ssm_rbuff_read(rb); + if (idx != 42) { + printf("Expected 42, got %zd.\n", idx); + goto fail_rb; + } + + if (ssm_rbuff_queued(rb) != 0) { + printf("Queue should be empty, got %zu.\n", + ssm_rbuff_queued(rb)); + goto fail_rb; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_read_empty(void) +{ + struct ssm_rbuff * rb; + ssize_t ret; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 3); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + ret = ssm_rbuff_read(rb); + if (ret != -EAGAIN) { + printf("Expected -EAGAIN, got %zd.\n", ret); + goto fail_rb; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_fill_drain(void) +{ + struct ssm_rbuff * rb; + size_t i; + ssize_t ret; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 4); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) { + if (ssm_rbuff_queued(rb) != i) { + printf("Expected %zu queued, got %zu.\n", + i, ssm_rbuff_queued(rb)); + goto fail_rb; + } + if (ssm_rbuff_write(rb, i) < 0) { + printf("Failed to write at index %zu.\n", i); + goto fail_rb; + } + } + + if (ssm_rbuff_queued(rb) != SSM_RBUFF_SIZE - 1) { + printf("Expected %d queued, got %zu.\n", + SSM_RBUFF_SIZE - 1, ssm_rbuff_queued(rb)); + goto fail_rb; + } + + ret = ssm_rbuff_write(rb, 999); + if (ret != -EAGAIN) { + printf("Expected -EAGAIN on full buffer, got %zd.\n", ret); + goto fail_rb; + } + + for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) { + ret = ssm_rbuff_read(rb); + if (ret != (ssize_t) i) { + printf("Expected 
%zu, got %zd.\n", i, ret); + goto fail_rb; + } + } + + if (ssm_rbuff_queued(rb) != 0) { + printf("Expected empty queue, got %zu.\n", + ssm_rbuff_queued(rb)); + goto fail_rb; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + while (ssm_rbuff_read(rb) >= 0) + ; + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_acl(void) +{ + struct ssm_rbuff * rb; + uint32_t acl; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 5); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + acl = ssm_rbuff_get_acl(rb); + if (acl != ACL_RDWR) { + printf("Expected ACL_RDWR, got %u.\n", acl); + goto fail_rb; + } + + ssm_rbuff_set_acl(rb, ACL_RDONLY); + acl = ssm_rbuff_get_acl(rb); + if (acl != ACL_RDONLY) { + printf("Expected ACL_RDONLY, got %u.\n", acl); + goto fail_rb; + } + + if (ssm_rbuff_write(rb, 1) != -ENOTALLOC) { + printf("Expected -ENOTALLOC on RDONLY.\n"); + goto fail_rb; + } + + ssm_rbuff_set_acl(rb, ACL_FLOWDOWN); + if (ssm_rbuff_write(rb, 1) != -EFLOWDOWN) { + printf("Expected -EFLOWDOWN on FLOWDOWN.\n"); + goto fail_rb; + } + + if (ssm_rbuff_read(rb) != -EFLOWDOWN) { + printf("Expected -EFLOWDOWN on read with FLOWDOWN.\n"); + goto fail_rb; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_open_close(void) +{ + struct ssm_rbuff * rb1; + struct ssm_rbuff * rb2; + pid_t pid; + + TEST_START(); + + pid = getpid(); + + rb1 = ssm_rbuff_create(pid, 6); + if (rb1 == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + if (ssm_rbuff_write(rb1, 123) < 0) { + printf("Failed to write value.\n"); + goto fail_rb1; + } + + rb2 = ssm_rbuff_open(pid, 6); + if (rb2 == NULL) { + printf("Failed to open existing rbuff.\n"); + goto fail_rb1; + } + + if (ssm_rbuff_queued(rb2) != 1) { + printf("Expected 1 queued in opened rbuff, got %zu.\n", + ssm_rbuff_queued(rb2)); + goto fail_rb2; + } + + if (ssm_rbuff_read(rb2) != 123) { + printf("Failed to read from opened rbuff.\n"); + goto fail_rb2; + } + + ssm_rbuff_close(rb2); + ssm_rbuff_destroy(rb1); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb2: + ssm_rbuff_close(rb2); + fail_rb1: + ssm_rbuff_destroy(rb1); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +struct thread_args { + struct ssm_rbuff * rb; + int iterations; + int delay_us; +}; + +static void * writer_thread(void * arg) +{ + struct thread_args * args = (struct thread_args *) arg; + struct timespec delay = {0, 0}; + int i; + + delay.tv_nsec = args->delay_us * 1000L; + + for (i = 0; i < args->iterations; ++i) { + while (ssm_rbuff_write(args->rb, i) < 0) + nanosleep(&delay, NULL); + } + + return NULL; +} + +static void * reader_thread(void * arg) +{ + struct thread_args * args = (struct thread_args *) arg; + struct timespec delay = {0, 0}; + int i; + ssize_t val; + + delay.tv_nsec = args->delay_us * 1000L; + + for (i = 0; i < args->iterations; ++i) { + val = ssm_rbuff_read(args->rb); + while (val < 0) { + nanosleep(&delay, NULL); + val = ssm_rbuff_read(args->rb); + } + if (val != i) { + printf("Expected %d, got %zd.\n", i, val); + return (void *) -1; + } + } + + return NULL; +} + +static void * blocking_writer_thread(void * arg) +{ + struct thread_args * args = (struct thread_args *) arg; + int i; + + for (i = 0; i < args->iterations; ++i) { + if (ssm_rbuff_write_b(args->rb, i, NULL) < 0) + return (void *) -1; + } + + return NULL; 
+} + +static void * blocking_reader_thread(void * arg) +{ + struct thread_args * args = (struct thread_args *) arg; + int i; + ssize_t val; + + for (i = 0; i < args->iterations; ++i) { + val = ssm_rbuff_read_b(args->rb, NULL); + if (val < 0 || val != i) { + printf("Expected %d, got %zd.\n", i, val); + return (void *) -1; + } + } + + return NULL; +} + +static int test_ssm_rbuff_blocking(void) +{ + struct ssm_rbuff * rb; + pthread_t wthread; + pthread_t rthread; + struct thread_args args; + struct timespec delay = {0, 10 * MILLION}; + void * ret_w; + void * ret_r; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 8); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + args.rb = rb; + args.iterations = 50; + args.delay_us = 0; + + if (pthread_create(&rthread, NULL, blocking_reader_thread, &args)) { + printf("Failed to create reader thread.\n"); + goto fail_rthread; + } + + nanosleep(&delay, NULL); + + if (pthread_create(&wthread, NULL, blocking_writer_thread, &args)) { + printf("Failed to create writer thread.\n"); + pthread_cancel(rthread); + goto fail_wthread; + } + + pthread_join(wthread, &ret_w); + pthread_join(rthread, &ret_r); + + if (ret_w != NULL || ret_r != NULL) { + printf("Thread returned error.\n"); + goto fail_ret; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_ret: + fail_wthread: + pthread_join(rthread, NULL); + fail_rthread: + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_blocking_timeout(void) +{ + struct ssm_rbuff * rb; + struct timespec abs_timeout; + struct timespec interval = {0, 100 * MILLION}; + struct timespec start; + struct timespec end; + ssize_t ret; + long elapsed_ms; + size_t i; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 9); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + clock_gettime(PTHREAD_COND_CLOCK, &start); + ts_add(&start, &interval, &abs_timeout); + + ret = ssm_rbuff_read_b(rb, &abs_timeout); + + clock_gettime(PTHREAD_COND_CLOCK, &end); + + if (ret != -ETIMEDOUT) { + printf("Expected -ETIMEDOUT, got %zd.\n", ret); + goto fail_rb; + } + + elapsed_ms = (end.tv_sec - start.tv_sec) * 1000L + + (end.tv_nsec - start.tv_nsec) / 1000000L; + + if (elapsed_ms < 90 || elapsed_ms > 200) { + printf("Timeout took %ld ms, expected ~100 ms.\n", + elapsed_ms); + goto fail_rb; + } + + for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) { + if (ssm_rbuff_write(rb, i) < 0) { + printf("Failed to fill buffer.\n"); + goto fail_rb; + } + } + + clock_gettime(PTHREAD_COND_CLOCK, &start); + ts_add(&start, &interval, &abs_timeout); + + ret = ssm_rbuff_write_b(rb, 999, &abs_timeout); + + clock_gettime(PTHREAD_COND_CLOCK, &end); + + if (ret != -ETIMEDOUT) { + printf("Expected -ETIMEDOUT on full buffer, got %zd.\n", + ret); + goto fail_rb; + } + + elapsed_ms = (end.tv_sec - start.tv_sec) * 1000L + + (end.tv_nsec - start.tv_nsec) / 1000000L; + + if (elapsed_ms < 90 || elapsed_ms > 200) { + printf("Write timeout took %ld ms, expected ~100 ms.\n", + elapsed_ms); + goto fail_rb; + } + + while (ssm_rbuff_read(rb) >= 0) + ; + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + while (ssm_rbuff_read(rb) >= 0) + ; + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_blocking_flowdown(void) +{ + struct ssm_rbuff * rb; + struct timespec abs_timeout; + struct timespec now; + struct timespec interval = {5, 0}; + ssize_t ret; + size_t i; + + TEST_START(); + + 
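+ /* ACL_FLOWDOWN marks the flow as down: a blocked + * ssm_rbuff_read_b()/ssm_rbuff_write_b() must return -EFLOWDOWN + * rather than wait out the 5 s timeout set below. */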
rb = ssm_rbuff_create(getpid(), 10); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, &interval, &abs_timeout); + + ssm_rbuff_set_acl(rb, ACL_FLOWDOWN); + + ret = ssm_rbuff_read_b(rb, &abs_timeout); + if (ret != -EFLOWDOWN) { + printf("Expected -EFLOWDOWN, got %zd.\n", ret); + goto fail_rb; + } + + ssm_rbuff_set_acl(rb, ACL_RDWR); + + for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) { + if (ssm_rbuff_write(rb, i) < 0) { + printf("Failed to fill buffer.\n"); + goto fail_rb; + } + } + + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, &interval, &abs_timeout); + + ssm_rbuff_set_acl(rb, ACL_FLOWDOWN); + + ret = ssm_rbuff_write_b(rb, 999, &abs_timeout); + if (ret != -EFLOWDOWN) { + printf("Expected -EFLOWDOWN on write, got %zd.\n", ret); + goto fail_rb; + } + + ssm_rbuff_set_acl(rb, ACL_RDWR); + while (ssm_rbuff_read(rb) >= 0) + ; + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + while (ssm_rbuff_read(rb) >= 0) + ; + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_threaded(void) +{ + struct ssm_rbuff * rb; + pthread_t wthread; + pthread_t rthread; + struct thread_args args; + void * ret_w; + void * ret_r; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 7); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + args.rb = rb; + args.iterations = 100; + args.delay_us = 100; + + if (pthread_create(&wthread, NULL, writer_thread, &args)) { + printf("Failed to create writer thread.\n"); + goto fail_rb; + } + + if (pthread_create(&rthread, NULL, reader_thread, &args)) { + printf("Failed to create reader thread.\n"); + pthread_cancel(wthread); + pthread_join(wthread, NULL); + goto fail_rb; + } + + pthread_join(wthread, &ret_w); + pthread_join(rthread, &ret_r); + + if (ret_w != NULL || ret_r != NULL) { + printf("Thread returned error.\n"); + goto fail_rb; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +int rbuff_test(int argc, + char ** argv) +{ + int ret = 0; + + (void) argc; + (void) argv; + + ret |= test_ssm_rbuff_create_destroy(); + ret |= test_ssm_rbuff_write_read(); + ret |= test_ssm_rbuff_read_empty(); + ret |= test_ssm_rbuff_fill_drain(); + ret |= test_ssm_rbuff_acl(); + ret |= test_ssm_rbuff_open_close(); + ret |= test_ssm_rbuff_threaded(); + ret |= test_ssm_rbuff_blocking(); + ret |= test_ssm_rbuff_blocking_timeout(); + ret |= test_ssm_rbuff_blocking_flowdown(); + + return ret; +} diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt index 6ab69bd1..d2535bc2 100644 --- a/src/lib/tests/CMakeLists.txt +++ b/src/lib/tests/CMakeLists.txt @@ -16,7 +16,6 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c kex_test_pqc.c md5_test.c sha3_test.c - shm_rbuff_test.c sockets_test.c time_test.c tpm_test.c diff --git a/src/lib/tests/shm_rbuff_test.c b/src/lib/tests/shm_rbuff_test.c deleted file mode 100644 index e36c3229..00000000 --- a/src/lib/tests/shm_rbuff_test.c +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2024 - * - * Test of the shm_rbuff - * - * Dimitri Staessens <dimitri@ouroboros.rocks> - * Sander Vrijders <sander@ouroboros.rocks> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. 
- * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#define _POSIX_C_SOURCE 200112L - -#include "config.h" - -#include <ouroboros/shm_rbuff.h> - -#include <errno.h> -#include <stdio.h> -#include <unistd.h> - -int shm_rbuff_test(int argc, - char ** argv) -{ - struct shm_rbuff * rb; - size_t i; - - (void) argc; - (void) argv; - - printf("Test: create rbuff..."); - - rb = shm_rbuff_create(getpid(), 1); - if (rb == NULL) - goto err; - - printf("success.\n\n"); - printf("Test: write a value..."); - - if (shm_rbuff_write(rb, 1) < 0) - goto error; - - printf("success.\n\n"); - printf("Test: check queue length is 1..."); - - if (shm_rbuff_queued(rb) != 1) - goto error; - - printf("success.\n\n"); - printf("Test: read a value..."); - - if (shm_rbuff_read(rb) != 1) - goto error; - - printf("success.\n\n"); - printf("Test: check queue is empty..."); - - if (shm_rbuff_read(rb) != -EAGAIN) - goto error; - - printf("success.\n\n"); - printf("Test: fill the queue..."); - - for (i = 0; i < SHM_RBUFF_SIZE - 1; ++i) { - if (shm_rbuff_queued(rb) != i) - goto error; - if (shm_rbuff_write(rb, 1) < 0) - goto error; - } - - printf("success.\n\n"); - printf("Test: check queue is full..."); - - if (shm_rbuff_queued(rb) != SHM_RBUFF_SIZE - 1) - goto error; - - printf("success [%zd entries].\n\n", shm_rbuff_queued(rb)); - - printf("Test: check queue is full by writing value..."); - if (!(shm_rbuff_write(rb, 1) < 0)) - goto error; - - printf("success [%zd entries].\n\n", shm_rbuff_queued(rb)); - - /* empty the rbuff */ - while (shm_rbuff_read(rb) >= 0) - ; - - shm_rbuff_destroy(rb); - - return 0; - - error: - /* empty the rbuff */ - while (shm_rbuff_read(rb) >= 0) - ; - - shm_rbuff_destroy(rb); - err: - printf("failed.\n\n"); - return -1; -} diff --git a/src/lib/tests/sockets_test.c b/src/lib/tests/sockets_test.c index 952f9529..ce9051b6 100644 --- a/src/lib/tests/sockets_test.c +++ b/src/lib/tests/sockets_test.c @@ -20,7 +20,11 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. 
*/ +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else #define _POSIX_C_SOURCE 200112L +#endif #include <ouroboros/sockets.h> #include <test/test.h> diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index 96f4ac47..eaa684e5 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -31,7 +31,7 @@ struct rxm { struct list_head next; uint32_t seqno; #ifndef RXM_BUFFER_ON_HEAP - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; #endif struct frct_pci * pkt; size_t len; @@ -81,8 +81,8 @@ static void timerwheel_fini(void) #ifdef RXM_BUFFER_ON_HEAP free(rxm->pkt); #else - shm_du_buff_ack(rxm->sdb); - ipcp_sdb_release(rxm->sdb); + ssm_pk_buff_ack(rxm->spb); + ipcp_spb_release(rxm->spb); #endif free(rxm); } @@ -160,7 +160,7 @@ static void timerwheel_move(void) size_t slot; size_t rslot; ssize_t idx; - struct shm_du_buff * sdb; + struct ssm_pk_buff * spb; struct frct_pci * pci; struct flow * f; uint32_t snd_lwe; @@ -175,7 +175,7 @@ static void timerwheel_move(void) rcv_cr = &r->frcti->rcv_cr; f = &ai.flows[r->fd]; #ifndef RXM_BUFFER_ON_HEAP - shm_du_buff_ack(r->sdb); + ssm_pk_buff_ack(r->spb); #endif if (f->frcti == NULL || f->info.id != r->flow_id) @@ -224,45 +224,45 @@ static void timerwheel_move(void) rslot = (rslot + slot + 1) & (RXMQ_SLOTS - 1); #ifdef RXM_BLOCKING - if (ipcp_sdb_reserve(&sdb, r->len) < 0) + if (ipcp_spb_reserve(&spb, r->len) < 0) #else - if (shm_rdrbuff_alloc(ai.rdrb, r->len, NULL, - &sdb) < 0) + if (ssm_pool_alloc(ai.gspp, r->len, NULL, + &spb) < 0) #endif - goto reschedule; /* rdrbuff full */ + goto reschedule; /* pool full */ - pci = (struct frct_pci *) shm_du_buff_head(sdb); + pci = (struct frct_pci *) ssm_pk_buff_head(spb); memcpy(pci, r->pkt, r->len); #ifndef RXM_BUFFER_ON_HEAP - ipcp_sdb_release(r->sdb); - r->sdb = sdb; + ipcp_spb_release(r->spb); + r->spb = spb; r->pkt = pci; - shm_du_buff_wait_ack(sdb); + ssm_pk_buff_wait_ack(spb); #endif - idx = shm_du_buff_get_idx(sdb); + idx = ssm_pk_buff_get_idx(spb); /* Retransmit the copy. 
*/ pci->ackno = hton32(rcv_lwe); #ifdef RXM_BLOCKING - if (shm_rbuff_write_b(f->tx_rb, idx, NULL) < 0) + if (ssm_rbuff_write_b(f->tx_rb, idx, NULL) < 0) #else - if (shm_rbuff_write(f->tx_rb, idx) < 0) + if (ssm_rbuff_write(f->tx_rb, idx) < 0) #endif goto flow_down; - shm_flow_set_notify(f->set, f->info.id, + ssm_flow_set_notify(f->set, f->info.id, FLOW_PKT); reschedule: list_add(&r->next, &rw.rxms[lvl][rslot]); continue; flow_down: - shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); - shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); cleanup: #ifdef RXM_BUFFER_ON_HEAP free(r->pkt); #else - ipcp_sdb_release(r->sdb); + ipcp_spb_release(r->spb); #endif free(r); } @@ -306,7 +306,7 @@ static void timerwheel_move(void) static int timerwheel_rxm(struct frcti * frcti, uint32_t seqno, - struct shm_du_buff * sdb) + struct ssm_pk_buff * spb) { struct timespec now; struct rxm * r; @@ -323,17 +323,17 @@ static int timerwheel_rxm(struct frcti * frcti, r->t0 = ts_to_ns(now); r->seqno = seqno; r->frcti = frcti; - r->len = shm_du_buff_len(sdb); + r->len = ssm_pk_buff_len(spb); #ifdef RXM_BUFFER_ON_HEAP r->pkt = malloc(r->len); if (r->pkt == NULL) { free(r); return -ENOMEM; } - memcpy(r->pkt, shm_du_buff_head(sdb), r->len); + memcpy(r->pkt, ssm_pk_buff_head(spb), r->len); #else - r->sdb = sdb; - r->pkt = (struct frct_pci *) shm_du_buff_head(sdb); + r->spb = spb; + r->pkt = (struct frct_pci *) ssm_pk_buff_head(spb); #endif pthread_rwlock_rdlock(&r->frcti->lock); @@ -365,7 +365,7 @@ static int timerwheel_rxm(struct frcti * frcti, list_add_tail(&r->next, &rw.rxms[lvl][slot]); #ifndef RXM_BUFFER_ON_HEAP - shm_du_buff_wait_ack(sdb); + ssm_pk_buff_wait_ack(spb); #endif pthread_mutex_unlock(&rw.lock);
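The tests above exercise allocation, publication and consumption one piece at a time; the sketch below strings them together into the cross-process data path that test_ssm_pool_inter_process_communication verifies. It is illustrative only: it uses the ssm_pool/ssm_rbuff calls exactly as they appear in this patch, with error handling trimmed, and the produce/consume split is an assumption made for the example.

/*
 * Sketch: one process fills a pool block and publishes its index on a
 * ring buffer; another maps the index back to the block and drops the
 * reference. Calls as used in pool_test.c above.
 */
#include <ouroboros/ssm_pool.h>
#include <ouroboros/ssm_rbuff.h>

#include <stdint.h>
#include <string.h>
#include <sys/types.h>

static void produce(struct ssm_pool *  pool,
                    struct ssm_rbuff * rb,
                    const void *       pkt,
                    size_t             len)
{
        struct ssm_pk_buff * spb;
        uint8_t *            ptr;
        ssize_t              idx;

        idx = ssm_pool_alloc(pool, len, &ptr, &spb);
        if (idx < 0)
                return;                     /* exhausted or oversized */

        memcpy(ptr, pkt, len);              /* write into shared memory */

        if (ssm_rbuff_write(rb, idx) < 0)   /* publish the block index */
                ssm_pool_remove(pool, idx); /* no reader will see it */
}

static void consume(struct ssm_pool *  pool,
                    struct ssm_rbuff * rb)
{
        struct ssm_pk_buff * spb;
        ssize_t              idx;

        idx = ssm_rbuff_read_b(rb, NULL);   /* block until an index */
        if (idx < 0)
                return;

        spb = ssm_pool_get(pool, idx);      /* index -> packet buffer */
        if (spb == NULL)
                return;

        /* ... process ssm_pk_buff_head(spb) ... */

        ssm_pool_remove(pool, idx);         /* drop the reference */
}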

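The size-class boundary test above implies a single selection rule: a request of len bytes lands in the smallest class whose block also holds the buffer header and the configured head/tail space. A standalone sketch of that rule follows, using hypothetical stand-ins for the configured values (the real numbers come from the SSM_PK_BUFF_* and size-class CMake settings, and the 256 class is omitted just as in the test's expected_classes[]):

#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-ins; HDR plays the role of
 * offsetof(struct ssm_pk_buff, data). */
#define HDR       28
#define HEADSPACE 128   /* SSM_PK_BUFF_HEADSPACE */
#define TAILSPACE 32    /* SSM_PK_BUFF_TAILSPACE */
#define OVERHEAD  (HDR + HEADSPACE + TAILSPACE)

/* Classes as listed in expected_classes[] of the boundary test. */
static const size_t classes[] = {
        512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576
};

/* Smallest class fitting len + OVERHEAD; 0 stands in for the real
 * allocator's -EMSGSIZE. */
static size_t size_class(size_t len)
{
        size_t i;

        for (i = 0; i < sizeof(classes) / sizeof(classes[0]); ++i)
                if (len + OVERHEAD <= classes[i])
                        return classes[i];

        return 0;
}

int main(void)
{
        printf("%zu\n", size_class(512 - OVERHEAD));     /* 512: exact fit */
        printf("%zu\n", size_class(512 - OVERHEAD + 1)); /* 1024: one over */

        return 0;
}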