diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dev.c | 229 | ||||
| -rw-r--r-- | src/lib/frct.c | 6 | ||||
| -rw-r--r-- | src/lib/serdes-irm.c | 132 | ||||
| -rw-r--r-- | src/lib/ssm/rbuff.c | 95 | ||||
| -rw-r--r-- | src/lib/ssm/tests/rbuff_test.c | 30 |
5 files changed, 388 insertions, 104 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index cff1ebf2..3064b1e2 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -100,6 +100,9 @@ struct flow { struct crypt_ctx * crypt; int headsz; /* selector */ int tailsz; /* Tag + CRC */ + struct timespec rk_grace; /* TX-promote deadline (0 = none) */ + bool rk_wm_inflight; /* re-key trigger in flight */ + uint32_t rk_wm_ctr; /* throttles the consult */ struct timespec snd_act; struct timespec rcv_act; @@ -509,6 +512,66 @@ static void flow_drain_rx_nb(struct flow * flow) } } +/* TX-promotion grace when the peer's install latency is unknown (raw). */ +#define REKEY_GRACE_MS 1000 + +/* + * Pull a parked re-key seed from the IRMd and install it. Driven from the + * data path when RB_REKEY shows on rx_rb. crypt_rekey is concurrency-safe + * on its own; proc.lock (rd) only guards against teardown. + */ +static void flow_rekey(struct flow * flow) +{ + struct flow_info info; + struct crypt_sk sk; + struct timespec now; + struct timespec intv; + time_t ms; + uint8_t key[SYMMKEYSZ]; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; + bool has_key; + + pthread_rwlock_rdlock(&proc.lock); + if (flow->info.id < 0 || flow->crypt == NULL) { + pthread_rwlock_unlock(&proc.lock); + return; + } + info = flow->info; + pthread_rwlock_unlock(&proc.lock); + + if (flow_update__irm_req_ser(&msg, &info, false) < 0) + return; + + if (send_recv_msg(&msg) < 0) + return; + + sk.key = key; + if (flow_rekey__irm_result_des(&msg, &sk, &has_key) < 0) + return; + + if (!has_key) + return; + + pthread_rwlock_rdlock(&proc.lock); + if (flow->info.id == info.id && flow->crypt != NULL) { + if (crypt_rekey(flow->crypt, &sk) == 0) { + /* Hold TX on the old epoch until the peer installs. */ + ms = flow->info.mpl > 0 ? flow->info.mpl * 3 + : REKEY_GRACE_MS; + intv.tv_sec = ms / 1000; + intv.tv_nsec = (ms % 1000) * MILLION; + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, &intv, &flow->rk_grace); + } + /* Re-arm the watermark even if the install was a no-op. */ + STORE_RELAXED(&flow->rk_wm_inflight, false); + } + pthread_rwlock_unlock(&proc.lock); + + crypt_secure_clear(key, SYMMKEYSZ); +} + /* * Wait clamped by caller deadline, next tw expiry, and TICTIME; * a clamp-timeout means tw work is due, not caller-deadline. @@ -533,6 +596,14 @@ static int flow_rx_one(struct flow * flow, return -EFLOWDOWN; } + /* Pull a parked re-key before re-blocking (idle reader). */ + if (flow->crypt != NULL + && (ssm_rbuff_get_flags(rx_rb) & RB_REKEY)) { + pthread_rwlock_unlock(&proc.lock); + flow_rekey(flow); + continue; + } + idx = ssm_rbuff_read_b(rx_rb, &wait_abs); if (idx == -ETIMEDOUT) { pthread_rwlock_unlock(&proc.lock); @@ -593,7 +664,7 @@ static void flow_clear(int fd) } /* - * Set ACL_FLOWDOWN on rx/tx so any in-flight blocking reads or writes + * Set RB_FLOWDOWN on rx/tx so any in-flight blocking reads or writes * wake up and drop their proc.lock rdlock. Must run BEFORE flow_fini's * wrlock, else the wrlock blocks on those rdlock holders and the * in-flight calls never see the FLOWDOWN signal. @@ -604,9 +675,9 @@ static void flow_quiesce(int fd) struct ssm_rbuff * tx_rb = proc.flows[fd].tx_rb; if (rx_rb != NULL) - ssm_rbuff_set_acl(rx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(rx_rb, RB_FLOWDOWN); if (tx_rb != NULL) - ssm_rbuff_set_acl(tx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(tx_rb, RB_FLOWDOWN); } static void do_flow_fini(int fd) @@ -1256,8 +1327,6 @@ int fccntl(int fd, va_list l; struct timespec * timeo; qosspec_t * qs; - uint32_t rx_acl; - uint32_t tx_acl; size_t * qlen; struct flow * flow; uint16_t old_acc; @@ -1353,31 +1422,26 @@ int fccntl(int fd, && flow->frcti != NULL) emit_eos = true; - rx_acl = ssm_rbuff_get_acl(flow->rx_rb); - tx_acl = ssm_rbuff_get_acl(flow->tx_rb); - /* Our flow write-only -> peer's read-only. */ + /* Our flow write-only -> peer's read-only; restore on RDWR. */ if (flow->oflags & FLOWFWRONLY) - rx_acl |= ACL_RDONLY; - if (flow->oflags & FLOWFRDWR) - rx_acl |= ACL_RDWR; + ssm_rbuff_clr_bits(flow->rx_rb, RB_WR); + else + ssm_rbuff_set_bits(flow->rx_rb, RB_WR); if (flow->oflags & FLOWFDOWN) { - rx_acl |= ACL_FLOWDOWN; - tx_acl |= ACL_FLOWDOWN; + ssm_rbuff_set_bits(flow->rx_rb, RB_FLOWDOWN); + ssm_rbuff_set_bits(flow->tx_rb, RB_FLOWDOWN); ssm_flow_set_notify(flow->set, flow->info.id, FLOW_DOWN); } else { - rx_acl &= ~ACL_FLOWDOWN; - tx_acl &= ~ACL_FLOWDOWN; + ssm_rbuff_clr_bits(flow->rx_rb, RB_FLOWDOWN); + ssm_rbuff_clr_bits(flow->tx_rb, RB_FLOWDOWN); ssm_flow_set_notify(flow->set, flow->info.id, FLOW_UP); } - ssm_rbuff_set_acl(flow->rx_rb, rx_acl); - ssm_rbuff_set_acl(flow->tx_rb, tx_acl); - break; case FLOWGFLAGS: fflags = va_arg(l, uint32_t *); @@ -1667,6 +1731,92 @@ static ssize_t flow_write_frag(struct flow * flow, return (ssize_t) count; } +/* + * Watermark: re-key when the TX batch is within KEY_REKEY_WATERMARK node + * keys of exhaustion (0 disables), ahead of the timer; consult keyrot at + * most once per FLOW_WM_CHECK writes. + */ +#define FLOW_WM_CHECK (1u << 16) + +/* + * Switch TX to the freshly installed epoch once the peer is seen on it + * (peer_synced) or the install grace has elapsed (breaks the symmetric + * wait where neither side sends the new epoch first). + */ +static void flow_tx_promote(struct flow * flow, + const struct timespec * now) +{ + if (flow->crypt == NULL) + return; + + if (flow->rk_grace.tv_sec == 0 && flow->rk_grace.tv_nsec == 0) + return; + + if (!crypt_peer_synced(flow->crypt) + && ts_diff_ns(now, &flow->rk_grace) < 0) + return; + + crypt_tx_promote(flow->crypt); + flow->rk_grace.tv_sec = 0; + flow->rk_grace.tv_nsec = 0; +} + +/* + * Ask the IRMd to start an OAP re-key for this flow. The reply carries no + * key; the seed arrives later over RB_REKEY. Fired from the write path as + * the TX batch nears exhaustion, ahead of the timer. + */ +static int flow_rekey_trigger(struct flow * flow) +{ + struct flow_info info; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; + + pthread_rwlock_rdlock(&proc.lock); + if (flow->info.id < 0 || flow->crypt == NULL) { + pthread_rwlock_unlock(&proc.lock); + return -1; + } + info = flow->info; + pthread_rwlock_unlock(&proc.lock); + + if (flow_update__irm_req_ser(&msg, &info, true) < 0) + return -1; + + if (send_recv_msg(&msg) < 0) + return -1; + + return 0; +} + +/* + * True when the live TX batch has run low and no re-key is in flight. + * Advances a throttle so the (locking) keyrot consult runs at most once + * per FLOW_WM_CHECK writes. + */ +static bool flow_wm_due(struct flow * flow) +{ + uint32_t tick; + + if (KEY_REKEY_WATERMARK == 0) + return false; + + if (flow->crypt == NULL) + return false; + + if (LOAD_RELAXED(&flow->rk_wm_inflight)) + return false; + + tick = FETCH_ADD_RELAXED(&flow->rk_wm_ctr, 1); + if ((tick & (FLOW_WM_CHECK - 1)) != 0) + return false; + + if (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY) + return false; + + return crypt_nodes_left(flow->crypt) <= KEY_REKEY_WATERMARK; +} + ssize_t flow_write(int fd, const void * buf, size_t count) @@ -1710,6 +1860,19 @@ ssize_t flow_write(int fd, if ((flags & FLOWFACCMODE) == FLOWFRDONLY) return -EPERM; + if (flow->crypt != NULL + && (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY)) + flow_rekey(flow); + + flow_tx_promote(flow, &now); + + /* Pre-empt TX key exhaustion; the timer is the backstop. */ + if (flow_wm_due(flow)) { + STORE_RELAXED(&flow->rk_wm_inflight, true); + if (flow_rekey_trigger(flow) < 0) + STORE_RELAXED(&flow->rk_wm_inflight, false); + } + tw_move_safe(); if (flow->frcti != NULL) { @@ -1784,6 +1947,10 @@ static ssize_t raw_flow_read_pkt(struct flow * flow, ssize_t idx; while (true) { + if (flow->crypt != NULL + && (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY)) + flow_rekey(flow); + if (!block) { idx = ssm_rbuff_read(flow->rx_rb); if (idx < 0) @@ -1917,6 +2084,16 @@ ssize_t flow_read(int fd, pthread_rwlock_unlock(&proc.lock); + if (flow->crypt != NULL + && (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY)) + flow_rekey(flow); + + /* Advance TX off a stale epoch even on recv-mostly (ACK-only) flows. */ + if (flow->crypt != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &now); + flow_tx_promote(flow, &now); + } + tw_move_safe(); idx = flow->part_idx; @@ -2101,6 +2278,18 @@ static int fqueue_filter(struct fqueue * fq) pthread_rwlock_rdlock(&proc.lock); while (fq->next < fq->fqsize) { + if (fq->fqueue[fq->next].event == FLOW_UPD) { + /* Re-key doorbell: pull internally, never surface. */ + fd = proc.id_to_fd[fq->fqueue[fq->next].flow_id].fd; + ++fq->next; + if (fd >= 0) { + pthread_rwlock_unlock(&proc.lock); + flow_rekey(&proc.flows[fd]); + pthread_rwlock_rdlock(&proc.lock); + } + continue; + } + if (fq->fqueue[fq->next].event != FLOW_PKT) { ret = 1; goto out; @@ -2643,8 +2832,8 @@ int ipcp_flow_fini(int fd) return -1; } - ssm_rbuff_set_acl(proc.flows[fd].rx_rb, ACL_FLOWDOWN); - ssm_rbuff_set_acl(proc.flows[fd].tx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(proc.flows[fd].rx_rb, RB_FLOWDOWN); + ssm_rbuff_set_bits(proc.flows[fd].tx_rb, RB_FLOWDOWN); ssm_flow_set_notify(proc.flows[fd].set, proc.flows[fd].info.id, diff --git a/src/lib/frct.c b/src/lib/frct.c index 3077df65..c055433d 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -878,10 +878,10 @@ static void frct_mark_flow_down(struct frcti * frcti) struct flow * f = frcti_to_flow(frcti); if (f->rx_rb != NULL) - ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(f->rx_rb, RB_FLOWDOWN); if (f->tx_rb != NULL) - ssm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(f->tx_rb, RB_FLOWDOWN); } __attribute__((cold)) @@ -890,7 +890,7 @@ static void frct_mark_peer_dead(struct frcti * frcti) struct flow * f = frcti_to_flow(frcti); if (f->rx_rb != NULL) - ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWPEER); + ssm_rbuff_set_bits(f->rx_rb, RB_FLOWPEER); if (proc.fqset != NULL) ssm_flow_set_notify(proc.fqset, f->info.id, FLOW_PEER); diff --git a/src/lib/serdes-irm.c b/src/lib/serdes-irm.c index a896576d..24bb349f 100644 --- a/src/lib/serdes-irm.c +++ b/src/lib/serdes-irm.c @@ -174,6 +174,48 @@ int flow__irm_result_des(buffer_t * buf, else memset(sk->key, 0, SYMMKEYSZ); + sk->epoch = msg->has_generation ? (uint8_t) msg->generation : 0; + + irm_msg__free_unpacked(msg, NULL); + + return 0; + fail: + irm_msg__free_unpacked(msg, NULL); + fail_msg: + return err; +} + +int flow_rekey__irm_result_des(buffer_t * buf, + struct crypt_sk * sk, + bool * has_key) +{ + irm_msg_t * msg; + int err; + + msg = irm_msg__unpack(NULL, buf->len, buf->data); + if (msg == NULL) { + err = -EIRMD; + goto fail_msg; + } + + if (!msg->has_result) { + err = -EIRMD; + goto fail; + } + + if (msg->result < 0) { + err = msg->result; + goto fail; + } + + *has_key = msg->has_sym_key && msg->sym_key.len == SYMMKEYSZ; + if (*has_key) { + memcpy(sk->key, msg->sym_key.data, SYMMKEYSZ); + sk->nid = NID_undef; + sk->epoch = msg->has_generation ? + (uint8_t) msg->generation : 0; + } + irm_msg__free_unpacked(msg, NULL); return 0; @@ -222,6 +264,44 @@ int flow_dealloc__irm_req_ser(buffer_t * buf, return -ENOMEM; } +int flow_update__irm_req_ser(buffer_t * buf, + const struct flow_info * flow, + bool rekey) +{ + irm_msg_t * msg; + size_t len; + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + irm_msg__init(msg); + + msg->code = IRM_MSG_CODE__IRM_FLOW_UPDATE; + msg->flow_info = flow_info_s_to_msg(flow); + if (msg->flow_info == NULL) + goto fail_msg; + + msg->has_rekey = true; + msg->rekey = rekey; + + len = irm_msg__get_packed_size(msg); + if (len == 0 || len > buf->len) + goto fail_msg; + + buf->len = len; + + irm_msg__pack(msg, buf->data); + irm_msg__free_unpacked(msg, NULL); + + return 0; + + fail_msg: + irm_msg__free_unpacked(msg, NULL); + fail_malloc: + return -ENOMEM; +} + int ipcp_flow_dealloc__irm_req_ser(buffer_t * buf, const struct flow_info * flow) { @@ -398,15 +478,19 @@ int ipcp_flow_req_arr__irm_req_ser(buffer_t * buf, return 0; fail_msg: + /* hash/pk are borrowed from the caller; detach before free. */ + msg->hash.len = 0; + msg->hash.data = NULL; + msg->pk.len = 0; + msg->pk.data = NULL; irm_msg__free_unpacked(msg, NULL); fail_malloc: return -ENOMEM; } -int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf, - const struct flow_info * flow, - int response, - const buffer_t * data) +int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf, + const struct flow_info * flow, + const buffer_t * data) { irm_msg_t * msg; size_t len; @@ -417,16 +501,14 @@ int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf, irm_msg__init(msg); - msg->code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; - msg->flow_info = flow_info_s_to_msg(flow); + msg->code = IRM_MSG_CODE__IPCP_FLOW_UPDATE_ARR; + msg->flow_info = flow_info_s_to_msg(flow); if (msg->flow_info == NULL) goto fail_msg; msg->has_pk = true; msg->pk.len = data->len; msg->pk.data = data->data; - msg->has_response = true; - msg->response = response; len = irm_msg__get_packed_size(msg); if (len == 0 || len > buf->len) @@ -436,27 +518,25 @@ int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf, irm_msg__pack(msg, buf->data); - /* Don't free * data! */ - msg->pk.len = 0; + /* Don't free data! */ + msg->pk.len = 0; msg->pk.data = NULL; - irm_msg__free_unpacked(msg, NULL); return 0; fail_msg: - /* hash/pk are borrowed from the caller; detach before free. */ - msg->hash.len = 0; - msg->hash.data = NULL; - msg->pk.len = 0; - msg->pk.data = NULL; + /* pk.data is borrowed from the caller; detach before free. */ + msg->pk.len = 0; + msg->pk.data = NULL; irm_msg__free_unpacked(msg, NULL); fail_malloc: return -ENOMEM; } -int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf, - const struct flow_info * flow, - const buffer_t * data) +int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf, + const struct flow_info * flow, + int response, + const buffer_t * data) { irm_msg_t * msg; size_t len; @@ -467,14 +547,16 @@ int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf, irm_msg__init(msg); - msg->code = IRM_MSG_CODE__IPCP_FLOW_UPDATE_ARR; - msg->flow_info = flow_info_s_to_msg(flow); + msg->code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; + msg->flow_info = flow_info_s_to_msg(flow); if (msg->flow_info == NULL) goto fail_msg; msg->has_pk = true; msg->pk.len = data->len; msg->pk.data = data->data; + msg->has_response = true; + msg->response = response; len = irm_msg__get_packed_size(msg); if (len == 0 || len > buf->len) @@ -484,16 +566,14 @@ int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf, irm_msg__pack(msg, buf->data); - /* Don't free data! */ - msg->pk.len = 0; + /* Don't free * data! */ + msg->pk.len = 0; msg->pk.data = NULL; + irm_msg__free_unpacked(msg, NULL); return 0; fail_msg: - /* pk.data is borrowed from the caller; detach before free. */ - msg->pk.len = 0; - msg->pk.data = NULL; irm_msg__free_unpacked(msg, NULL); fail_malloc: return -ENOMEM; diff --git a/src/lib/ssm/rbuff.c b/src/lib/ssm/rbuff.c index c149c306..0121af89 100644 --- a/src/lib/ssm/rbuff.c +++ b/src/lib/ssm/rbuff.c @@ -74,7 +74,7 @@ struct ssm_rbuff { ssize_t * shm_base; /* start of shared memory */ size_t * head; /* start of ringbuffer */ size_t * tail; - size_t * acl; /* access control */ + size_t * flags; /* out-of-band flags (RB_*) */ pthread_mutex_t * mtx; /* lock for cond vars only */ pthread_cond_t * add; /* signal when new data */ pthread_cond_t * del; /* signal when data removed */ @@ -114,8 +114,8 @@ static struct ssm_rbuff * rbuff_create(pid_t pid, rb->shm_base = shm_base; rb->head = (size_t *) (rb->shm_base + (SSM_RBUFF_SIZE)); rb->tail = (size_t *) (rb->head + 1); - rb->acl = (size_t *) (rb->tail + 1); - rb->mtx = (pthread_mutex_t *) (rb->acl + 1); + rb->flags = (size_t *) (rb->tail + 1); + rb->mtx = (pthread_mutex_t *) (rb->flags + 1); rb->add = (pthread_cond_t *) (rb->mtx + 1); rb->del = rb->add + 1; rb->pid = pid; @@ -181,7 +181,7 @@ struct ssm_rbuff * ssm_rbuff_create(pid_t pid, if (pthread_cond_init(rb->del, &cattr)) goto fail_del; - *rb->acl = ACL_RDWR; + *rb->flags = RB_RDWR; *rb->head = 0; *rb->tail = 0; @@ -231,7 +231,7 @@ void ssm_rbuff_close(struct ssm_rbuff * rb) assert(rb); /* - * Caller must set ACL_FLOWDOWN first; if a user becomes + * Caller must set RB_FLOWDOWN first; if a user becomes * cancellable, push a cleanup that decrements n_users. */ while (__atomic_load_n(&rb->n_users, __ATOMIC_SEQ_CST) > 0) { @@ -245,7 +245,7 @@ void ssm_rbuff_close(struct ssm_rbuff * rb) int ssm_rbuff_write(struct ssm_rbuff * rb, size_t off) { - size_t acl; + size_t flags; bool was_empty; int ret = 0; @@ -253,15 +253,15 @@ int ssm_rbuff_write(struct ssm_rbuff * rb, __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); - acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); - if (acl != ACL_RDWR) { - if (acl & ACL_FLOWDOWN) { + flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST); + if (flags != RB_RDWR) { + if (flags & RB_FLOWDOWN) { ret = -EFLOWDOWN; - goto fail_acl; + goto fail_flags; } - if (acl & ACL_RDONLY) { + if (!(flags & RB_WR)) { ret = -ENOTALLOC; - goto fail_acl; + goto fail_flags; } } @@ -287,7 +287,7 @@ int ssm_rbuff_write(struct ssm_rbuff * rb, fail_mutex: pthread_mutex_unlock(rb->mtx); - fail_acl: + fail_flags: __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return ret; } @@ -296,7 +296,7 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb, size_t off, const struct timespec * abstime) { - size_t acl; + size_t flags; int ret = 0; bool was_empty; @@ -304,15 +304,15 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb, __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); - acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); - if (acl != ACL_RDWR) { - if (acl & ACL_FLOWDOWN) { + flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST); + if (flags != RB_RDWR) { + if (flags & RB_FLOWDOWN) { ret = -EFLOWDOWN; - goto fail_acl; + goto fail_flags; } - if (acl & ACL_RDONLY) { + if (!(flags & RB_WR)) { ret = -ENOTALLOC; - goto fail_acl; + goto fail_flags; } } @@ -321,8 +321,8 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb, pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); while (IS_FULL(rb) && ret != -ETIMEDOUT) { - acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); - if (acl & ACL_FLOWDOWN) { + flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST); + if (flags & RB_FLOWDOWN) { ret = -EFLOWDOWN; break; } @@ -341,25 +341,28 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb, pthread_mutex_unlock(rb->mtx); - fail_acl: + fail_flags: __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return ret; } -static int check_rb_acl(struct ssm_rbuff * rb) +static int check_rb_flags(struct ssm_rbuff * rb) { - size_t acl; + size_t flags; assert(rb != NULL); - acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST); - if (acl & ACL_FLOWDOWN) + if (flags & RB_FLOWDOWN) return -EFLOWDOWN; - if (acl & ACL_FLOWPEER) + if (flags & RB_FLOWPEER) return -EFLOWPEER; + if (!(flags & RB_RD)) + return -ENOTALLOC; + return -EAGAIN; } @@ -372,7 +375,7 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb) __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); if (IS_EMPTY(rb)) { - ret = check_rb_acl(rb); + ret = check_rb_flags(rb); goto out; } @@ -380,7 +383,7 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb) if (IS_EMPTY(rb)) { pthread_mutex_unlock(rb->mtx); - ret = check_rb_acl(rb); + ret = check_rb_flags(rb); goto out; } @@ -400,14 +403,14 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, const struct timespec * abstime) { ssize_t idx = -1; - size_t acl; + size_t flags; assert(rb != NULL); __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); - acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); - if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN)) { + flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST); + if (IS_EMPTY(rb) && (flags & RB_FLOWDOWN)) { idx = -EFLOWDOWN; goto out; } @@ -418,7 +421,7 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, while (IS_EMPTY(rb) && idx != -ETIMEDOUT && - check_rb_acl(rb) == -EAGAIN) { + check_rb_flags(rb) == -EAGAIN) { idx = -robust_wait(rb->add, rb->mtx, abstime); } @@ -429,7 +432,7 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, ADVANCE_TAIL(rb); pthread_cond_broadcast(rb->del); } else if (idx != -ETIMEDOUT) { - idx = check_rb_acl(rb); + idx = check_rb_flags(rb); } pthread_mutex_unlock(rb->mtx); @@ -441,23 +444,35 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, return idx; } -void ssm_rbuff_set_acl(struct ssm_rbuff * rb, - uint32_t flags) +void ssm_rbuff_set_bits(struct ssm_rbuff * rb, + uint32_t bits) +{ + assert(rb != NULL); + + robust_mutex_lock(rb->mtx); + __atomic_fetch_or(rb->flags, (size_t) bits, __ATOMIC_SEQ_CST); + pthread_cond_broadcast(rb->add); + pthread_cond_broadcast(rb->del); + pthread_mutex_unlock(rb->mtx); +} + +void ssm_rbuff_clr_bits(struct ssm_rbuff * rb, + uint32_t bits) { assert(rb != NULL); robust_mutex_lock(rb->mtx); - __atomic_store_n(rb->acl, (size_t) flags, __ATOMIC_SEQ_CST); + __atomic_fetch_and(rb->flags, ~(size_t) bits, __ATOMIC_SEQ_CST); pthread_cond_broadcast(rb->add); pthread_cond_broadcast(rb->del); pthread_mutex_unlock(rb->mtx); } -uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb) +uint32_t ssm_rbuff_get_flags(struct ssm_rbuff * rb) { assert(rb != NULL); - return (uint32_t) __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + return (uint32_t) __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST); } void ssm_rbuff_fini(struct ssm_rbuff * rb) diff --git a/src/lib/ssm/tests/rbuff_test.c b/src/lib/ssm/tests/rbuff_test.c index 58cb39c3..48e5a714 100644 --- a/src/lib/ssm/tests/rbuff_test.c +++ b/src/lib/ssm/tests/rbuff_test.c @@ -206,10 +206,10 @@ static int test_ssm_rbuff_fill_drain(void) return TEST_RC_FAIL; } -static int test_ssm_rbuff_acl(void) +static int test_ssm_rbuff_flags(void) { struct ssm_rbuff * rb; - uint32_t acl; + uint32_t flags; TEST_START(); @@ -219,16 +219,16 @@ static int test_ssm_rbuff_acl(void) goto fail; } - acl = ssm_rbuff_get_acl(rb); - if (acl != ACL_RDWR) { - printf("Expected ACL_RDWR, got %u.\n", acl); + flags = ssm_rbuff_get_flags(rb); + if (flags != RB_RDWR) { + printf("Expected RB_RDWR, got %u.\n", flags); goto fail_rb; } - ssm_rbuff_set_acl(rb, ACL_RDONLY); - acl = ssm_rbuff_get_acl(rb); - if (acl != ACL_RDONLY) { - printf("Expected ACL_RDONLY, got %u.\n", acl); + ssm_rbuff_clr_bits(rb, RB_WR); + flags = ssm_rbuff_get_flags(rb); + if (flags != RB_RD) { + printf("Expected RB_RD, got %u.\n", flags); goto fail_rb; } @@ -237,7 +237,7 @@ static int test_ssm_rbuff_acl(void) goto fail_rb; } - ssm_rbuff_set_acl(rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(rb, RB_FLOWDOWN); if (ssm_rbuff_write(rb, 1) != -EFLOWDOWN) { printf("Expected -EFLOWDOWN on FLOWDOWN.\n"); goto fail_rb; @@ -553,7 +553,7 @@ static int test_ssm_rbuff_blocking_flowdown(void) clock_gettime(PTHREAD_COND_CLOCK, &now); ts_add(&now, &interval, &abs_timeout); - ssm_rbuff_set_acl(rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(rb, RB_FLOWDOWN); ret = ssm_rbuff_read_b(rb, &abs_timeout); if (ret != -EFLOWDOWN) { @@ -561,7 +561,7 @@ static int test_ssm_rbuff_blocking_flowdown(void) goto fail_rb; } - ssm_rbuff_set_acl(rb, ACL_RDWR); + ssm_rbuff_clr_bits(rb, RB_FLOWDOWN); for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) { if (ssm_rbuff_write(rb, i) < 0) { @@ -573,7 +573,7 @@ static int test_ssm_rbuff_blocking_flowdown(void) clock_gettime(PTHREAD_COND_CLOCK, &now); ts_add(&now, &interval, &abs_timeout); - ssm_rbuff_set_acl(rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(rb, RB_FLOWDOWN); ret = ssm_rbuff_write_b(rb, 999, &abs_timeout); if (ret != -EFLOWDOWN) { @@ -581,7 +581,7 @@ static int test_ssm_rbuff_blocking_flowdown(void) goto fail_rb; } - ssm_rbuff_set_acl(rb, ACL_RDWR); + ssm_rbuff_clr_bits(rb, RB_FLOWDOWN); while (ssm_rbuff_read(rb) >= 0) ; @@ -664,7 +664,7 @@ int rbuff_test(int argc, ret |= test_ssm_rbuff_write_read(); ret |= test_ssm_rbuff_read_empty(); ret |= test_ssm_rbuff_fill_drain(); - ret |= test_ssm_rbuff_acl(); + ret |= test_ssm_rbuff_flags(); ret |= test_ssm_rbuff_open_close(); ret |= test_ssm_rbuff_threaded(); ret |= test_ssm_rbuff_blocking(); |
