summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2026-06-14 16:16:03 +0200
committerSander Vrijders <sander@ouroboros.rocks>2026-06-29 08:32:58 +0200
commitfdb50b8256f1038d5bc4f906b41605cacc769bf4 (patch)
tree8962c4188a208f81e3cdba39cc54a01da933d787 /src/lib
parentc386d9b7caa56f472fdce20ff5b2841ed41dd539 (diff)
downloadouroboros-fdb50b8256f1038d5bc4f906b41605cacc769bf4.tar.gz
ouroboros-fdb50b8256f1038d5bc4f906b41605cacc769bf4.zip
irmd: Deliver flow re-keying
Re-key each encrypted flow's batch root periodically so a long-lived flow never exhausts or over-uses a single root. The IRMd re-runs the OAP exchange with the peer IRMd over the flow-update relay. The per-flow re-keying state is tracked in the registry (reg_flow). A re-key delivers one root seed from the OAP exchange. keyrot immediately HKDF-expands it into 128 node keys (KR_NODES_SZ = 128 × 32 B) and wipes the root. Then each of the 128 node keys is itself a root → HKDF-expanded into 64 (2^KEY_NODE_BITS) leaf keys, forked per direction; each leaf key is the actual AEAD key, good for 2^20 packets (the low counter bits are its nonce/seq). If the number of keys runs low, a re-key will be triggered (KEY_REKEY_WATERMARK). The rekey is signalled out of band to the application. The rbuff ACL is generalized into a flags word, so an RB_REKEY bit rides alongside the access RB_RD/RB_WR and FLOWDOWN/FLOWPEER bits. The RD and WR bits are revised ditching the fcntl historical weirdness. The seed is pulled via flow_read/flow_write, installed with crypt_rekey(). TX holds the old epoch until the peer is observed on the new one (or a grace deadline elapses), promoted from both the read and write paths so a recv-mostly flow still advances. Also fix the FLOW_ACCEPT and FLOW_ALLOC handlers, which on a key-buffer allocation failure returned from inside the cleanup-push region: that leaked the reply message and skipped both the stack-key scrub and the cleanup pop. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/dev.c229
-rw-r--r--src/lib/frct.c6
-rw-r--r--src/lib/serdes-irm.c132
-rw-r--r--src/lib/ssm/rbuff.c95
-rw-r--r--src/lib/ssm/tests/rbuff_test.c30
5 files changed, 388 insertions, 104 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index cff1ebf2..3064b1e2 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -100,6 +100,9 @@ struct flow {
struct crypt_ctx * crypt;
int headsz; /* selector */
int tailsz; /* Tag + CRC */
+ struct timespec rk_grace; /* TX-promote deadline (0 = none) */
+ bool rk_wm_inflight; /* re-key trigger in flight */
+ uint32_t rk_wm_ctr; /* throttles the consult */
struct timespec snd_act;
struct timespec rcv_act;
@@ -509,6 +512,66 @@ static void flow_drain_rx_nb(struct flow * flow)
}
}
+/* TX-promotion grace when the peer's install latency is unknown (raw). */
+#define REKEY_GRACE_MS 1000
+
+/*
+ * Pull a parked re-key seed from the IRMd and install it. Driven from the
+ * data path when RB_REKEY shows on rx_rb. crypt_rekey is concurrency-safe
+ * on its own; proc.lock (rd) only guards against teardown.
+ */
+static void flow_rekey(struct flow * flow)
+{
+ struct flow_info info;
+ struct crypt_sk sk;
+ struct timespec now;
+ struct timespec intv;
+ time_t ms;
+ uint8_t key[SYMMKEYSZ];
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {SOCK_BUF_SIZE, buf};
+ bool has_key;
+
+ pthread_rwlock_rdlock(&proc.lock);
+ if (flow->info.id < 0 || flow->crypt == NULL) {
+ pthread_rwlock_unlock(&proc.lock);
+ return;
+ }
+ info = flow->info;
+ pthread_rwlock_unlock(&proc.lock);
+
+ if (flow_update__irm_req_ser(&msg, &info, false) < 0)
+ return;
+
+ if (send_recv_msg(&msg) < 0)
+ return;
+
+ sk.key = key;
+ if (flow_rekey__irm_result_des(&msg, &sk, &has_key) < 0)
+ return;
+
+ if (!has_key)
+ return;
+
+ pthread_rwlock_rdlock(&proc.lock);
+ if (flow->info.id == info.id && flow->crypt != NULL) {
+ if (crypt_rekey(flow->crypt, &sk) == 0) {
+ /* Hold TX on the old epoch until the peer installs. */
+ ms = flow->info.mpl > 0 ? flow->info.mpl * 3
+ : REKEY_GRACE_MS;
+ intv.tv_sec = ms / 1000;
+ intv.tv_nsec = (ms % 1000) * MILLION;
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ ts_add(&now, &intv, &flow->rk_grace);
+ }
+ /* Re-arm the watermark even if the install was a no-op. */
+ STORE_RELAXED(&flow->rk_wm_inflight, false);
+ }
+ pthread_rwlock_unlock(&proc.lock);
+
+ crypt_secure_clear(key, SYMMKEYSZ);
+}
+
/*
* Wait clamped by caller deadline, next tw expiry, and TICTIME;
* a clamp-timeout means tw work is due, not caller-deadline.
@@ -533,6 +596,14 @@ static int flow_rx_one(struct flow * flow,
return -EFLOWDOWN;
}
+ /* Pull a parked re-key before re-blocking (idle reader). */
+ if (flow->crypt != NULL
+ && (ssm_rbuff_get_flags(rx_rb) & RB_REKEY)) {
+ pthread_rwlock_unlock(&proc.lock);
+ flow_rekey(flow);
+ continue;
+ }
+
idx = ssm_rbuff_read_b(rx_rb, &wait_abs);
if (idx == -ETIMEDOUT) {
pthread_rwlock_unlock(&proc.lock);
@@ -593,7 +664,7 @@ static void flow_clear(int fd)
}
/*
- * Set ACL_FLOWDOWN on rx/tx so any in-flight blocking reads or writes
+ * Set RB_FLOWDOWN on rx/tx so any in-flight blocking reads or writes
* wake up and drop their proc.lock rdlock. Must run BEFORE flow_fini's
* wrlock, else the wrlock blocks on those rdlock holders and the
* in-flight calls never see the FLOWDOWN signal.
@@ -604,9 +675,9 @@ static void flow_quiesce(int fd)
struct ssm_rbuff * tx_rb = proc.flows[fd].tx_rb;
if (rx_rb != NULL)
- ssm_rbuff_set_acl(rx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(rx_rb, RB_FLOWDOWN);
if (tx_rb != NULL)
- ssm_rbuff_set_acl(tx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(tx_rb, RB_FLOWDOWN);
}
static void do_flow_fini(int fd)
@@ -1256,8 +1327,6 @@ int fccntl(int fd,
va_list l;
struct timespec * timeo;
qosspec_t * qs;
- uint32_t rx_acl;
- uint32_t tx_acl;
size_t * qlen;
struct flow * flow;
uint16_t old_acc;
@@ -1353,31 +1422,26 @@ int fccntl(int fd,
&& flow->frcti != NULL)
emit_eos = true;
- rx_acl = ssm_rbuff_get_acl(flow->rx_rb);
- tx_acl = ssm_rbuff_get_acl(flow->tx_rb);
- /* Our flow write-only -> peer's read-only. */
+ /* Our flow write-only -> peer's read-only; restore on RDWR. */
if (flow->oflags & FLOWFWRONLY)
- rx_acl |= ACL_RDONLY;
- if (flow->oflags & FLOWFRDWR)
- rx_acl |= ACL_RDWR;
+ ssm_rbuff_clr_bits(flow->rx_rb, RB_WR);
+ else
+ ssm_rbuff_set_bits(flow->rx_rb, RB_WR);
if (flow->oflags & FLOWFDOWN) {
- rx_acl |= ACL_FLOWDOWN;
- tx_acl |= ACL_FLOWDOWN;
+ ssm_rbuff_set_bits(flow->rx_rb, RB_FLOWDOWN);
+ ssm_rbuff_set_bits(flow->tx_rb, RB_FLOWDOWN);
ssm_flow_set_notify(flow->set,
flow->info.id,
FLOW_DOWN);
} else {
- rx_acl &= ~ACL_FLOWDOWN;
- tx_acl &= ~ACL_FLOWDOWN;
+ ssm_rbuff_clr_bits(flow->rx_rb, RB_FLOWDOWN);
+ ssm_rbuff_clr_bits(flow->tx_rb, RB_FLOWDOWN);
ssm_flow_set_notify(flow->set,
flow->info.id,
FLOW_UP);
}
- ssm_rbuff_set_acl(flow->rx_rb, rx_acl);
- ssm_rbuff_set_acl(flow->tx_rb, tx_acl);
-
break;
case FLOWGFLAGS:
fflags = va_arg(l, uint32_t *);
@@ -1667,6 +1731,92 @@ static ssize_t flow_write_frag(struct flow * flow,
return (ssize_t) count;
}
+/*
+ * Watermark: re-key when the TX batch is within KEY_REKEY_WATERMARK node
+ * keys of exhaustion (0 disables), ahead of the timer; consult keyrot at
+ * most once per FLOW_WM_CHECK writes.
+ */
+#define FLOW_WM_CHECK (1u << 16)
+
+/*
+ * Switch TX to the freshly installed epoch once the peer is seen on it
+ * (peer_synced) or the install grace has elapsed (breaks the symmetric
+ * wait where neither side sends the new epoch first).
+ */
+static void flow_tx_promote(struct flow * flow,
+ const struct timespec * now)
+{
+ if (flow->crypt == NULL)
+ return;
+
+ if (flow->rk_grace.tv_sec == 0 && flow->rk_grace.tv_nsec == 0)
+ return;
+
+ if (!crypt_peer_synced(flow->crypt)
+ && ts_diff_ns(now, &flow->rk_grace) < 0)
+ return;
+
+ crypt_tx_promote(flow->crypt);
+ flow->rk_grace.tv_sec = 0;
+ flow->rk_grace.tv_nsec = 0;
+}
+
+/*
+ * Ask the IRMd to start an OAP re-key for this flow. The reply carries no
+ * key; the seed arrives later over RB_REKEY. Fired from the write path as
+ * the TX batch nears exhaustion, ahead of the timer.
+ */
+static int flow_rekey_trigger(struct flow * flow)
+{
+ struct flow_info info;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {SOCK_BUF_SIZE, buf};
+
+ pthread_rwlock_rdlock(&proc.lock);
+ if (flow->info.id < 0 || flow->crypt == NULL) {
+ pthread_rwlock_unlock(&proc.lock);
+ return -1;
+ }
+ info = flow->info;
+ pthread_rwlock_unlock(&proc.lock);
+
+ if (flow_update__irm_req_ser(&msg, &info, true) < 0)
+ return -1;
+
+ if (send_recv_msg(&msg) < 0)
+ return -1;
+
+ return 0;
+}
+
+/*
+ * True when the live TX batch has run low and no re-key is in flight.
+ * Advances a throttle so the (locking) keyrot consult runs at most once
+ * per FLOW_WM_CHECK writes.
+ */
+static bool flow_wm_due(struct flow * flow)
+{
+ uint32_t tick;
+
+ if (KEY_REKEY_WATERMARK == 0)
+ return false;
+
+ if (flow->crypt == NULL)
+ return false;
+
+ if (LOAD_RELAXED(&flow->rk_wm_inflight))
+ return false;
+
+ tick = FETCH_ADD_RELAXED(&flow->rk_wm_ctr, 1);
+ if ((tick & (FLOW_WM_CHECK - 1)) != 0)
+ return false;
+
+ if (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY)
+ return false;
+
+ return crypt_nodes_left(flow->crypt) <= KEY_REKEY_WATERMARK;
+}
+
ssize_t flow_write(int fd,
const void * buf,
size_t count)
@@ -1710,6 +1860,19 @@ ssize_t flow_write(int fd,
if ((flags & FLOWFACCMODE) == FLOWFRDONLY)
return -EPERM;
+ if (flow->crypt != NULL
+ && (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY))
+ flow_rekey(flow);
+
+ flow_tx_promote(flow, &now);
+
+ /* Pre-empt TX key exhaustion; the timer is the backstop. */
+ if (flow_wm_due(flow)) {
+ STORE_RELAXED(&flow->rk_wm_inflight, true);
+ if (flow_rekey_trigger(flow) < 0)
+ STORE_RELAXED(&flow->rk_wm_inflight, false);
+ }
+
tw_move_safe();
if (flow->frcti != NULL) {
@@ -1784,6 +1947,10 @@ static ssize_t raw_flow_read_pkt(struct flow * flow,
ssize_t idx;
while (true) {
+ if (flow->crypt != NULL
+ && (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY))
+ flow_rekey(flow);
+
if (!block) {
idx = ssm_rbuff_read(flow->rx_rb);
if (idx < 0)
@@ -1917,6 +2084,16 @@ ssize_t flow_read(int fd,
pthread_rwlock_unlock(&proc.lock);
+ if (flow->crypt != NULL
+ && (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY))
+ flow_rekey(flow);
+
+ /* Advance TX off a stale epoch even on recv-mostly (ACK-only) flows. */
+ if (flow->crypt != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ flow_tx_promote(flow, &now);
+ }
+
tw_move_safe();
idx = flow->part_idx;
@@ -2101,6 +2278,18 @@ static int fqueue_filter(struct fqueue * fq)
pthread_rwlock_rdlock(&proc.lock);
while (fq->next < fq->fqsize) {
+ if (fq->fqueue[fq->next].event == FLOW_UPD) {
+ /* Re-key doorbell: pull internally, never surface. */
+ fd = proc.id_to_fd[fq->fqueue[fq->next].flow_id].fd;
+ ++fq->next;
+ if (fd >= 0) {
+ pthread_rwlock_unlock(&proc.lock);
+ flow_rekey(&proc.flows[fd]);
+ pthread_rwlock_rdlock(&proc.lock);
+ }
+ continue;
+ }
+
if (fq->fqueue[fq->next].event != FLOW_PKT) {
ret = 1;
goto out;
@@ -2643,8 +2832,8 @@ int ipcp_flow_fini(int fd)
return -1;
}
- ssm_rbuff_set_acl(proc.flows[fd].rx_rb, ACL_FLOWDOWN);
- ssm_rbuff_set_acl(proc.flows[fd].tx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(proc.flows[fd].rx_rb, RB_FLOWDOWN);
+ ssm_rbuff_set_bits(proc.flows[fd].tx_rb, RB_FLOWDOWN);
ssm_flow_set_notify(proc.flows[fd].set,
proc.flows[fd].info.id,
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 3077df65..c055433d 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -878,10 +878,10 @@ static void frct_mark_flow_down(struct frcti * frcti)
struct flow * f = frcti_to_flow(frcti);
if (f->rx_rb != NULL)
- ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(f->rx_rb, RB_FLOWDOWN);
if (f->tx_rb != NULL)
- ssm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(f->tx_rb, RB_FLOWDOWN);
}
__attribute__((cold))
@@ -890,7 +890,7 @@ static void frct_mark_peer_dead(struct frcti * frcti)
struct flow * f = frcti_to_flow(frcti);
if (f->rx_rb != NULL)
- ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWPEER);
+ ssm_rbuff_set_bits(f->rx_rb, RB_FLOWPEER);
if (proc.fqset != NULL)
ssm_flow_set_notify(proc.fqset, f->info.id, FLOW_PEER);
diff --git a/src/lib/serdes-irm.c b/src/lib/serdes-irm.c
index a896576d..24bb349f 100644
--- a/src/lib/serdes-irm.c
+++ b/src/lib/serdes-irm.c
@@ -174,6 +174,48 @@ int flow__irm_result_des(buffer_t * buf,
else
memset(sk->key, 0, SYMMKEYSZ);
+ sk->epoch = msg->has_generation ? (uint8_t) msg->generation : 0;
+
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ return err;
+}
+
+int flow_rekey__irm_result_des(buffer_t * buf,
+ struct crypt_sk * sk,
+ bool * has_key)
+{
+ irm_msg_t * msg;
+ int err;
+
+ msg = irm_msg__unpack(NULL, buf->len, buf->data);
+ if (msg == NULL) {
+ err = -EIRMD;
+ goto fail_msg;
+ }
+
+ if (!msg->has_result) {
+ err = -EIRMD;
+ goto fail;
+ }
+
+ if (msg->result < 0) {
+ err = msg->result;
+ goto fail;
+ }
+
+ *has_key = msg->has_sym_key && msg->sym_key.len == SYMMKEYSZ;
+ if (*has_key) {
+ memcpy(sk->key, msg->sym_key.data, SYMMKEYSZ);
+ sk->nid = NID_undef;
+ sk->epoch = msg->has_generation ?
+ (uint8_t) msg->generation : 0;
+ }
+
irm_msg__free_unpacked(msg, NULL);
return 0;
@@ -222,6 +264,44 @@ int flow_dealloc__irm_req_ser(buffer_t * buf,
return -ENOMEM;
}
+int flow_update__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ bool rekey)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IRM_FLOW_UPDATE;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ msg->has_rekey = true;
+ msg->rekey = rekey;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
int ipcp_flow_dealloc__irm_req_ser(buffer_t * buf,
const struct flow_info * flow)
{
@@ -398,15 +478,19 @@ int ipcp_flow_req_arr__irm_req_ser(buffer_t * buf,
return 0;
fail_msg:
+ /* hash/pk are borrowed from the caller; detach before free. */
+ msg->hash.len = 0;
+ msg->hash.data = NULL;
+ msg->pk.len = 0;
+ msg->pk.data = NULL;
irm_msg__free_unpacked(msg, NULL);
fail_malloc:
return -ENOMEM;
}
-int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf,
- const struct flow_info * flow,
- int response,
- const buffer_t * data)
+int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const buffer_t * data)
{
irm_msg_t * msg;
size_t len;
@@ -417,16 +501,14 @@ int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf,
irm_msg__init(msg);
- msg->code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
- msg->flow_info = flow_info_s_to_msg(flow);
+ msg->code = IRM_MSG_CODE__IPCP_FLOW_UPDATE_ARR;
+ msg->flow_info = flow_info_s_to_msg(flow);
if (msg->flow_info == NULL)
goto fail_msg;
msg->has_pk = true;
msg->pk.len = data->len;
msg->pk.data = data->data;
- msg->has_response = true;
- msg->response = response;
len = irm_msg__get_packed_size(msg);
if (len == 0 || len > buf->len)
@@ -436,27 +518,25 @@ int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf,
irm_msg__pack(msg, buf->data);
- /* Don't free * data! */
- msg->pk.len = 0;
+ /* Don't free data! */
+ msg->pk.len = 0;
msg->pk.data = NULL;
-
irm_msg__free_unpacked(msg, NULL);
return 0;
fail_msg:
- /* hash/pk are borrowed from the caller; detach before free. */
- msg->hash.len = 0;
- msg->hash.data = NULL;
- msg->pk.len = 0;
- msg->pk.data = NULL;
+ /* pk.data is borrowed from the caller; detach before free. */
+ msg->pk.len = 0;
+ msg->pk.data = NULL;
irm_msg__free_unpacked(msg, NULL);
fail_malloc:
return -ENOMEM;
}
-int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf,
- const struct flow_info * flow,
- const buffer_t * data)
+int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ int response,
+ const buffer_t * data)
{
irm_msg_t * msg;
size_t len;
@@ -467,14 +547,16 @@ int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf,
irm_msg__init(msg);
- msg->code = IRM_MSG_CODE__IPCP_FLOW_UPDATE_ARR;
- msg->flow_info = flow_info_s_to_msg(flow);
+ msg->code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
+ msg->flow_info = flow_info_s_to_msg(flow);
if (msg->flow_info == NULL)
goto fail_msg;
msg->has_pk = true;
msg->pk.len = data->len;
msg->pk.data = data->data;
+ msg->has_response = true;
+ msg->response = response;
len = irm_msg__get_packed_size(msg);
if (len == 0 || len > buf->len)
@@ -484,16 +566,14 @@ int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf,
irm_msg__pack(msg, buf->data);
- /* Don't free data! */
- msg->pk.len = 0;
+ /* Don't free * data! */
+ msg->pk.len = 0;
msg->pk.data = NULL;
+
irm_msg__free_unpacked(msg, NULL);
return 0;
fail_msg:
- /* pk.data is borrowed from the caller; detach before free. */
- msg->pk.len = 0;
- msg->pk.data = NULL;
irm_msg__free_unpacked(msg, NULL);
fail_malloc:
return -ENOMEM;
diff --git a/src/lib/ssm/rbuff.c b/src/lib/ssm/rbuff.c
index c149c306..0121af89 100644
--- a/src/lib/ssm/rbuff.c
+++ b/src/lib/ssm/rbuff.c
@@ -74,7 +74,7 @@ struct ssm_rbuff {
ssize_t * shm_base; /* start of shared memory */
size_t * head; /* start of ringbuffer */
size_t * tail;
- size_t * acl; /* access control */
+ size_t * flags; /* out-of-band flags (RB_*) */
pthread_mutex_t * mtx; /* lock for cond vars only */
pthread_cond_t * add; /* signal when new data */
pthread_cond_t * del; /* signal when data removed */
@@ -114,8 +114,8 @@ static struct ssm_rbuff * rbuff_create(pid_t pid,
rb->shm_base = shm_base;
rb->head = (size_t *) (rb->shm_base + (SSM_RBUFF_SIZE));
rb->tail = (size_t *) (rb->head + 1);
- rb->acl = (size_t *) (rb->tail + 1);
- rb->mtx = (pthread_mutex_t *) (rb->acl + 1);
+ rb->flags = (size_t *) (rb->tail + 1);
+ rb->mtx = (pthread_mutex_t *) (rb->flags + 1);
rb->add = (pthread_cond_t *) (rb->mtx + 1);
rb->del = rb->add + 1;
rb->pid = pid;
@@ -181,7 +181,7 @@ struct ssm_rbuff * ssm_rbuff_create(pid_t pid,
if (pthread_cond_init(rb->del, &cattr))
goto fail_del;
- *rb->acl = ACL_RDWR;
+ *rb->flags = RB_RDWR;
*rb->head = 0;
*rb->tail = 0;
@@ -231,7 +231,7 @@ void ssm_rbuff_close(struct ssm_rbuff * rb)
assert(rb);
/*
- * Caller must set ACL_FLOWDOWN first; if a user becomes
+ * Caller must set RB_FLOWDOWN first; if a user becomes
* cancellable, push a cleanup that decrements n_users.
*/
while (__atomic_load_n(&rb->n_users, __ATOMIC_SEQ_CST) > 0) {
@@ -245,7 +245,7 @@ void ssm_rbuff_close(struct ssm_rbuff * rb)
int ssm_rbuff_write(struct ssm_rbuff * rb,
size_t off)
{
- size_t acl;
+ size_t flags;
bool was_empty;
int ret = 0;
@@ -253,15 +253,15 @@ int ssm_rbuff_write(struct ssm_rbuff * rb,
__atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
- if (acl != ACL_RDWR) {
- if (acl & ACL_FLOWDOWN) {
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
+ if (flags != RB_RDWR) {
+ if (flags & RB_FLOWDOWN) {
ret = -EFLOWDOWN;
- goto fail_acl;
+ goto fail_flags;
}
- if (acl & ACL_RDONLY) {
+ if (!(flags & RB_WR)) {
ret = -ENOTALLOC;
- goto fail_acl;
+ goto fail_flags;
}
}
@@ -287,7 +287,7 @@ int ssm_rbuff_write(struct ssm_rbuff * rb,
fail_mutex:
pthread_mutex_unlock(rb->mtx);
- fail_acl:
+ fail_flags:
__atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST);
return ret;
}
@@ -296,7 +296,7 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
size_t off,
const struct timespec * abstime)
{
- size_t acl;
+ size_t flags;
int ret = 0;
bool was_empty;
@@ -304,15 +304,15 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
__atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
- if (acl != ACL_RDWR) {
- if (acl & ACL_FLOWDOWN) {
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
+ if (flags != RB_RDWR) {
+ if (flags & RB_FLOWDOWN) {
ret = -EFLOWDOWN;
- goto fail_acl;
+ goto fail_flags;
}
- if (acl & ACL_RDONLY) {
+ if (!(flags & RB_WR)) {
ret = -ENOTALLOC;
- goto fail_acl;
+ goto fail_flags;
}
}
@@ -321,8 +321,8 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx);
while (IS_FULL(rb) && ret != -ETIMEDOUT) {
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
- if (acl & ACL_FLOWDOWN) {
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
+ if (flags & RB_FLOWDOWN) {
ret = -EFLOWDOWN;
break;
}
@@ -341,25 +341,28 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb,
pthread_mutex_unlock(rb->mtx);
- fail_acl:
+ fail_flags:
__atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST);
return ret;
}
-static int check_rb_acl(struct ssm_rbuff * rb)
+static int check_rb_flags(struct ssm_rbuff * rb)
{
- size_t acl;
+ size_t flags;
assert(rb != NULL);
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
- if (acl & ACL_FLOWDOWN)
+ if (flags & RB_FLOWDOWN)
return -EFLOWDOWN;
- if (acl & ACL_FLOWPEER)
+ if (flags & RB_FLOWPEER)
return -EFLOWPEER;
+ if (!(flags & RB_RD))
+ return -ENOTALLOC;
+
return -EAGAIN;
}
@@ -372,7 +375,7 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb)
__atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
if (IS_EMPTY(rb)) {
- ret = check_rb_acl(rb);
+ ret = check_rb_flags(rb);
goto out;
}
@@ -380,7 +383,7 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb)
if (IS_EMPTY(rb)) {
pthread_mutex_unlock(rb->mtx);
- ret = check_rb_acl(rb);
+ ret = check_rb_flags(rb);
goto out;
}
@@ -400,14 +403,14 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
const struct timespec * abstime)
{
ssize_t idx = -1;
- size_t acl;
+ size_t flags;
assert(rb != NULL);
__atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST);
- acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
- if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN)) {
+ flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
+ if (IS_EMPTY(rb) && (flags & RB_FLOWDOWN)) {
idx = -EFLOWDOWN;
goto out;
}
@@ -418,7 +421,7 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
while (IS_EMPTY(rb) &&
idx != -ETIMEDOUT &&
- check_rb_acl(rb) == -EAGAIN) {
+ check_rb_flags(rb) == -EAGAIN) {
idx = -robust_wait(rb->add, rb->mtx, abstime);
}
@@ -429,7 +432,7 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
ADVANCE_TAIL(rb);
pthread_cond_broadcast(rb->del);
} else if (idx != -ETIMEDOUT) {
- idx = check_rb_acl(rb);
+ idx = check_rb_flags(rb);
}
pthread_mutex_unlock(rb->mtx);
@@ -441,23 +444,35 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
return idx;
}
-void ssm_rbuff_set_acl(struct ssm_rbuff * rb,
- uint32_t flags)
+void ssm_rbuff_set_bits(struct ssm_rbuff * rb,
+ uint32_t bits)
+{
+ assert(rb != NULL);
+
+ robust_mutex_lock(rb->mtx);
+ __atomic_fetch_or(rb->flags, (size_t) bits, __ATOMIC_SEQ_CST);
+ pthread_cond_broadcast(rb->add);
+ pthread_cond_broadcast(rb->del);
+ pthread_mutex_unlock(rb->mtx);
+}
+
+void ssm_rbuff_clr_bits(struct ssm_rbuff * rb,
+ uint32_t bits)
{
assert(rb != NULL);
robust_mutex_lock(rb->mtx);
- __atomic_store_n(rb->acl, (size_t) flags, __ATOMIC_SEQ_CST);
+ __atomic_fetch_and(rb->flags, ~(size_t) bits, __ATOMIC_SEQ_CST);
pthread_cond_broadcast(rb->add);
pthread_cond_broadcast(rb->del);
pthread_mutex_unlock(rb->mtx);
}
-uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb)
+uint32_t ssm_rbuff_get_flags(struct ssm_rbuff * rb)
{
assert(rb != NULL);
- return (uint32_t) __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ return (uint32_t) __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST);
}
void ssm_rbuff_fini(struct ssm_rbuff * rb)
diff --git a/src/lib/ssm/tests/rbuff_test.c b/src/lib/ssm/tests/rbuff_test.c
index 58cb39c3..48e5a714 100644
--- a/src/lib/ssm/tests/rbuff_test.c
+++ b/src/lib/ssm/tests/rbuff_test.c
@@ -206,10 +206,10 @@ static int test_ssm_rbuff_fill_drain(void)
return TEST_RC_FAIL;
}
-static int test_ssm_rbuff_acl(void)
+static int test_ssm_rbuff_flags(void)
{
struct ssm_rbuff * rb;
- uint32_t acl;
+ uint32_t flags;
TEST_START();
@@ -219,16 +219,16 @@ static int test_ssm_rbuff_acl(void)
goto fail;
}
- acl = ssm_rbuff_get_acl(rb);
- if (acl != ACL_RDWR) {
- printf("Expected ACL_RDWR, got %u.\n", acl);
+ flags = ssm_rbuff_get_flags(rb);
+ if (flags != RB_RDWR) {
+ printf("Expected RB_RDWR, got %u.\n", flags);
goto fail_rb;
}
- ssm_rbuff_set_acl(rb, ACL_RDONLY);
- acl = ssm_rbuff_get_acl(rb);
- if (acl != ACL_RDONLY) {
- printf("Expected ACL_RDONLY, got %u.\n", acl);
+ ssm_rbuff_clr_bits(rb, RB_WR);
+ flags = ssm_rbuff_get_flags(rb);
+ if (flags != RB_RD) {
+ printf("Expected RB_RD, got %u.\n", flags);
goto fail_rb;
}
@@ -237,7 +237,7 @@ static int test_ssm_rbuff_acl(void)
goto fail_rb;
}
- ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(rb, RB_FLOWDOWN);
if (ssm_rbuff_write(rb, 1) != -EFLOWDOWN) {
printf("Expected -EFLOWDOWN on FLOWDOWN.\n");
goto fail_rb;
@@ -553,7 +553,7 @@ static int test_ssm_rbuff_blocking_flowdown(void)
clock_gettime(PTHREAD_COND_CLOCK, &now);
ts_add(&now, &interval, &abs_timeout);
- ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(rb, RB_FLOWDOWN);
ret = ssm_rbuff_read_b(rb, &abs_timeout);
if (ret != -EFLOWDOWN) {
@@ -561,7 +561,7 @@ static int test_ssm_rbuff_blocking_flowdown(void)
goto fail_rb;
}
- ssm_rbuff_set_acl(rb, ACL_RDWR);
+ ssm_rbuff_clr_bits(rb, RB_FLOWDOWN);
for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) {
if (ssm_rbuff_write(rb, i) < 0) {
@@ -573,7 +573,7 @@ static int test_ssm_rbuff_blocking_flowdown(void)
clock_gettime(PTHREAD_COND_CLOCK, &now);
ts_add(&now, &interval, &abs_timeout);
- ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+ ssm_rbuff_set_bits(rb, RB_FLOWDOWN);
ret = ssm_rbuff_write_b(rb, 999, &abs_timeout);
if (ret != -EFLOWDOWN) {
@@ -581,7 +581,7 @@ static int test_ssm_rbuff_blocking_flowdown(void)
goto fail_rb;
}
- ssm_rbuff_set_acl(rb, ACL_RDWR);
+ ssm_rbuff_clr_bits(rb, RB_FLOWDOWN);
while (ssm_rbuff_read(rb) >= 0)
;
@@ -664,7 +664,7 @@ int rbuff_test(int argc,
ret |= test_ssm_rbuff_write_read();
ret |= test_ssm_rbuff_read_empty();
ret |= test_ssm_rbuff_fill_drain();
- ret |= test_ssm_rbuff_acl();
+ ret |= test_ssm_rbuff_flags();
ret |= test_ssm_rbuff_open_close();
ret |= test_ssm_rbuff_threaded();
ret |= test_ssm_rbuff_blocking();