diff options
| -rw-r--r-- | cmake/config/irmd.cmake | 2 | ||||
| -rw-r--r-- | include/ouroboros/fqueue.h | 3 | ||||
| -rw-r--r-- | include/ouroboros/ssm_rbuff.h | 19 | ||||
| -rw-r--r-- | src/irmd/config.h.in | 1 | ||||
| -rw-r--r-- | src/irmd/main.c | 644 | ||||
| -rw-r--r-- | src/irmd/reg/flow.c | 4 | ||||
| -rw-r--r-- | src/irmd/reg/flow.h | 11 | ||||
| -rw-r--r-- | src/irmd/reg/reg.c | 189 | ||||
| -rw-r--r-- | src/irmd/reg/reg.h | 34 | ||||
| -rw-r--r-- | src/lib/dev.c | 229 | ||||
| -rw-r--r-- | src/lib/frct.c | 6 | ||||
| -rw-r--r-- | src/lib/serdes-irm.c | 132 | ||||
| -rw-r--r-- | src/lib/ssm/rbuff.c | 95 | ||||
| -rw-r--r-- | src/lib/ssm/tests/rbuff_test.c | 30 |
14 files changed, 1285 insertions, 114 deletions
diff --git a/cmake/config/irmd.cmake b/cmake/config/irmd.cmake index b6b2dc40..79e24bae 100644 --- a/cmake/config/irmd.cmake +++ b/cmake/config/irmd.cmake @@ -22,6 +22,8 @@ set(OAP_REPLAY_TIMER 20 CACHE STRING "OAP replay protection window (s)") set(OAP_REPLAY_MAX 4096 CACHE STRING "Maximum entries in the OAP replay cache (bounds memory/CPU under flood)") +set(OAP_REKEY_TIMER 120 CACHE STRING + "Tier-2 re-key interval (s); bounds key age / PCS healing, 0 disables") set(OAP_CLIENT_AUTH_DEFAULT TRUE CACHE BOOL "Client requires the server to authenticate by default") set(DEBUG_PROTO_OAP FALSE CACHE BOOL diff --git a/include/ouroboros/fqueue.h b/include/ouroboros/fqueue.h index 2546c79d..322da3ea 100644 --- a/include/ouroboros/fqueue.h +++ b/include/ouroboros/fqueue.h @@ -34,7 +34,8 @@ enum fqtype { FLOW_UP = (1 << 2), FLOW_ALLOC = (1 << 3), FLOW_DEALLOC = (1 << 4), - FLOW_PEER = (1 << 5) + FLOW_PEER = (1 << 5), + FLOW_UPD = (1 << 6) }; struct flow_set; diff --git a/include/ouroboros/ssm_rbuff.h b/include/ouroboros/ssm_rbuff.h index 2443b63d..e77eec09 100644 --- a/include/ouroboros/ssm_rbuff.h +++ b/include/ouroboros/ssm_rbuff.h @@ -28,10 +28,12 @@ #include <stdint.h> -#define ACL_RDWR 0000 -#define ACL_RDONLY 0001 -#define ACL_FLOWDOWN 0002 -#define ACL_FLOWPEER 0004 +#define RB_RD 0001 /* read permitted (0 = no access) */ +#define RB_WR 0002 /* write permitted (0 = no access) */ +#define RB_RDWR (RB_RD | RB_WR) +#define RB_FLOWDOWN 0004 +#define RB_FLOWPEER 0010 +#define RB_REKEY 0020 /* re-key seed parked (out-of-band signal) */ struct ssm_rbuff; @@ -45,10 +47,13 @@ struct ssm_rbuff * ssm_rbuff_open(pid_t pid, void ssm_rbuff_close(struct ssm_rbuff * rb); -void ssm_rbuff_set_acl(struct ssm_rbuff * rb, - uint32_t flags); +void ssm_rbuff_set_bits(struct ssm_rbuff * rb, + uint32_t bits); -uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb); +void ssm_rbuff_clr_bits(struct ssm_rbuff * rb, + uint32_t bits); + +uint32_t ssm_rbuff_get_flags(struct ssm_rbuff * rb); void ssm_rbuff_fini(struct ssm_rbuff * rb); diff --git a/src/irmd/config.h.in b/src/irmd/config.h.in index 84d58130..e14cff75 100644 --- a/src/irmd/config.h.in +++ b/src/irmd/config.h.in @@ -43,6 +43,7 @@ #define OAP_REPLAY_TIMER @OAP_REPLAY_TIMER@ #define OAP_REPLAY_MAX @OAP_REPLAY_MAX@ +#define OAP_REKEY_TIMER @OAP_REKEY_TIMER@ #cmakedefine01 OAP_CLIENT_AUTH_DEFAULT #define BOOTSTRAP_TIMEOUT @BOOTSTRAP_TIMEOUT@ diff --git a/src/irmd/main.c b/src/irmd/main.c index 484a265a..3519e079 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -36,6 +36,7 @@ #include <ouroboros/crypt.h> #include <ouroboros/errno.h> #include <ouroboros/flow.h> +#include <ouroboros/fqueue.h> #include <ouroboros/hash.h> #include <ouroboros/irm.h> #include <ouroboros/list.h> @@ -86,6 +87,8 @@ #define TIMESYNC_SLACK 100 /* ms */ #define OAP_SEEN_TIMER 20 /* s */ #define DEALLOC_TIME 300 /* s */ +#define REKEY_BATCH 64 /* flows re-keyed per timer pass */ +#define REKEY_RESP_TIMEO 20 /* s; give-up on a re-key RESPONSE */ #define DIRECT_MPL 20 /* ms */ /* bytes; in-process, bounded only by PUP/GSPP. */ #define DIRECT_MTU 65000 @@ -105,6 +108,29 @@ struct cmd { int fd; }; +/* In-flight Tier-2 re-key, owned solely by the re-key worker thread. */ +struct rekey_ctx { + struct list_head next; + + int flow_id; + void * ctx; /* OAP client ctx (opaque) */ + struct timespec deadline; /* reap if no RESPONSE by then */ +}; + +enum rekey_evt_type { + REKEY_INIT = 0, /* start an exchange for flow_id */ + REKEY_RESP /* a RESPONSE arrived for flow_id */ +}; + +struct rekey_evt { + struct list_head next; + + enum rekey_evt_type type; + int flow_id; + pid_t n_1_pid; /* INIT: flow's lower IPCP */ + buffer_t buf; /* RESP: owned RESPONSE payload */ +}; + struct { bool log_stdout; /* log to stdout */ #ifdef HAVE_TOML @@ -126,6 +152,14 @@ struct { pthread_t irm_sanitize; /* clean up irmd resources */ pthread_t acceptor; /* accept new commands */ + + struct { + pthread_t worker; /* Tier-2 re-key orchestrator */ + struct list_head inbox; /* re-key events for worker */ + pthread_cond_t cond; /* inbox signal condvar */ + pthread_mutex_t lock; /* inbox lock */ + bool stop; /* worker shutdown flag */ + } rk; } irmd; static enum irm_state irmd_get_state(void) @@ -932,6 +966,8 @@ static int flow_accept(struct flow_info * flow, log_err("Failed to respond to flow allocation."); goto fail_resp; } else { + if (sk->nid != NID_undef) + reg_flow_set_rekey(flow->id, false); log_info("Flow %d accepted by %d for %s (uid %d).", flow->id, flow->n_pid, name, flow->uid); } @@ -1358,6 +1394,9 @@ static int flow_alloc(const char * dst, goto fail_complete; } + if (sk->nid != NID_undef) + reg_flow_set_rekey(flow->id, true); + freebuf(req_hdr); freebuf(resp_hdr); freebuf(hash); @@ -1432,6 +1471,512 @@ static int flow_dealloc_resp(struct flow_info * flow) return 0; } +/* + * Inbox producers. Any thread may post; the worker drains. INIT carries + * the flow's lower IPCP pid; RESP transfers ownership of buf. + */ +static void rekey_post(enum rekey_evt_type type, + int flow_id, + pid_t n_1_pid, + buffer_t * buf) +{ + struct rekey_evt * evt; + + evt = malloc(sizeof(*evt)); + if (evt == NULL) { + log_err("Failed to post re-key event for flow %d.", flow_id); + if (buf != NULL) + freebuf(*buf); + return; + } + + list_head_init(&evt->next); + evt->type = type; + evt->flow_id = flow_id; + evt->n_1_pid = n_1_pid; + clrbuf(evt->buf); + if (buf != NULL) { + evt->buf = *buf; + clrbuf(*buf); + } + + pthread_mutex_lock(&irmd.rk.lock); + + list_add_tail(&evt->next, &irmd.rk.inbox); + pthread_cond_signal(&irmd.rk.cond); + + pthread_mutex_unlock(&irmd.rk.lock); +} + +static void rekey_post_init(int flow_id, + pid_t n_1_pid) +{ + rekey_post(REKEY_INIT, flow_id, n_1_pid, NULL); +} + +static void rekey_post_resp(int flow_id, + buffer_t * buf) +{ + rekey_post(REKEY_RESP, flow_id, 0, buf); +} + +/* Worker-only: find an in-flight entry by flow_id. */ +static struct rekey_ctx * rekey_find(struct list_head * tbl, + int flow_id) +{ + struct list_head * p; + + list_for_each(p, tbl) { + struct rekey_ctx * e = list_entry(p, struct rekey_ctx, next); + if (e->flow_id == flow_id) + return e; + } + + return NULL; +} + +/* Worker-only: drop an entry, freeing its OAP ctx. */ +static void rekey_drop(struct rekey_ctx * e) +{ + if (e->ctx != NULL) + oap_ctx_free(e->ctx); + + list_del(&e->next); + free(e); +} + +/* Flow-update relay payload: a 1-byte type prefix on an opaque body. */ +enum flow_upd_type { + FLOW_UPD_REKEY_REQ = 0, + FLOW_UPD_REKEY_RESP = 1, +}; + +/* Prepend the update type to body; caller frees out on success. */ +static int flow_upd_wrap(buffer_t * out, + uint8_t type, + const buffer_t * body) +{ + out->len = body->len + 1; + out->data = malloc(out->len); + if (out->data == NULL) + return -ENOMEM; + + out->data[0] = type; + memcpy(out->data + 1, body->data, body->len); + + return 0; +} + +/* + * Worker-only: start a fresh OAP exchange over the live flow. Replaces + * any prior in-flight entry for flow_id (handles flow_id recycling). + */ +static void rekey_do_initiate(struct list_head * tbl, + int flow_id, + pid_t n_1_pid) +{ + struct rekey_ctx * e; + struct flow_info info; + struct name_info name; + buffer_t req = BUF_INIT; + buffer_t upd = BUF_INIT; + buffer_t data = BUF_INIT; + char nbuf[NAME_SIZE + 1]; + void * ctx = NULL; + int ret; + + e = rekey_find(tbl, flow_id); + if (e != NULL) + rekey_drop(e); + + if (reg_get_name_for_flow_id(nbuf, flow_id) < 0 || + reg_get_name_info(nbuf, &name) < 0) { + log_warn("No name info to re-key flow %d.", flow_id); + reg_flow_clear_in_flight(flow_id); + return; + } + + if (oap_cli_prepare(&ctx, &name, &req, data) < 0) { + log_err("Failed to prepare re-key for flow %d.", flow_id); + reg_flow_clear_in_flight(flow_id); + return; + } + + memset(&info, 0, sizeof(info)); + info.id = flow_id; + info.n_1_pid = n_1_pid; + + if (flow_upd_wrap(&upd, FLOW_UPD_REKEY_REQ, &req) < 0) { + log_err("Failed to wrap re-key request for flow %d.", flow_id); + goto fail_ctx; + } + + ret = ipcp_flow_update(&info, upd); + freebuf(upd); + if (ret < 0) { + log_err("Failed to send re-key request for flow %d.", flow_id); + goto fail_ctx; + } + + e = malloc(sizeof(*e)); + if (e == NULL) { + log_err("Failed to track re-key for flow %d.", flow_id); + goto fail_ctx; + } + + list_head_init(&e->next); + e->flow_id = flow_id; + e->ctx = ctx; + clock_gettime(PTHREAD_COND_CLOCK, &e->deadline); + e->deadline.tv_sec += REKEY_RESP_TIMEO; + + list_add(&e->next, tbl); + + log_dbg("Re-key request sent for flow %d.", flow_id); + + freebuf(req); + + return; + + fail_ctx: + oap_ctx_free(ctx); + reg_flow_clear_in_flight(flow_id); + freebuf(req); +} + +/* Worker-only: complete the exchange, install the pending seed. */ +static void rekey_do_complete(struct list_head * tbl, + int flow_id, + buffer_t buf) +{ + struct rekey_ctx * e; + struct name_info info; + struct crypt_sk sk; + uint8_t kbuf[SYMMKEYSZ]; + buffer_t data = BUF_INIT; + char name[NAME_SIZE + 1]; + uint8_t newgen; + + e = rekey_find(tbl, flow_id); + if (e == NULL) { + log_dbg("Stale re-key RESPONSE for flow %d.", flow_id); + return; + } + + if (reg_get_name_for_flow_id(name, flow_id) < 0 || + reg_get_name_info(name, &info) < 0) { + log_err("No name info to re-key flow %d.", flow_id); + goto finish; + } + + sk.key = kbuf; + + /* oap_cli_complete frees the ctx on every path. */ + if (oap_cli_complete(e->ctx, &info, buf, &data, &sk) < 0) { + log_err("Re-key completion failed for flow %d.", flow_id); + e->ctx = NULL; + goto finish; + } + + e->ctx = NULL; + + newgen = data.len == 1 ? *(uint8_t *) data.data : 0; + + if (newgen >= 16) { + log_warn("Re-key gen %u out of range for flow %d.", + newgen, flow_id); + goto finish_clear; + } + + if (reg_flow_store_pending(flow_id, kbuf, newgen) < 0) + log_warn("Flow %d gone during re-key.", flow_id); + else + reg_notify_flow(flow_id, FLOW_UPD); + + log_dbg("Re-key completed for flow %d (gen %u).", flow_id, newgen); + + finish_clear: + crypt_secure_clear(kbuf, SYMMKEYSZ); + freebuf(data); + finish: + rekey_drop(e); + reg_flow_clear_in_flight(flow_id); +} + +/* Worker-only: reap entries whose RESPONSE never arrived. */ +static void rekey_reap_expired(struct list_head * tbl) +{ + struct list_head * p; + struct list_head * h; + struct timespec now; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + list_for_each_safe(p, h, tbl) { + struct rekey_ctx * e = list_entry(p, struct rekey_ctx, next); + if (ts_diff_ns(&e->deadline, &now) > 0) + continue; + + log_warn("Re-key timed out for flow %d.", e->flow_id); + reg_flow_clear_in_flight(e->flow_id); + rekey_drop(e); + } +} + +/* Responder side: process request, install pending seed, send response. */ +static int rekey_respond(struct flow_info * flow, + buffer_t * pk) +{ + struct name_info info; + struct crypt_sk sk; + uint8_t kbuf[SYMMKEYSZ]; + buffer_t rsp = BUF_INIT; + buffer_t upd = BUF_INIT; + buffer_t data = BUF_INIT; + char name[NAME_SIZE + 1]; + uint8_t newgen; + int epoch; + int err; + + epoch = reg_flow_get_epoch(flow->id); + if (epoch < 0) { + log_warn("Re-key for unknown flow %d.", flow->id); + return -EBADF; + } + + if (reg_get_name_for_flow_id(name, flow->id) < 0 || + reg_get_name_info(name, &info) < 0) { + log_err("No name info to re-key flow %d.", flow->id); + return -ENAME; + } + + if (reg_flow_rekey_pending(flow->id)) { + log_dbg("Duplicate re-key request for flow %d.", flow->id); + return 0; + } + + newgen = (uint8_t) ((epoch + 1) & 0x0F); + data.data = &newgen; + data.len = 1; + + sk.key = kbuf; + + err = oap_srv_process(&info, *pk, &rsp, &data, &sk); + if (err < 0) { + /* data still points to stack newgen; don't free it. */ + log_err("Re-key OAP failed for flow %d.", flow->id); + goto finish; + } + + /* On success oap_srv_process repointed data to client output. */ + freebuf(data); + + if (reg_flow_store_pending(flow->id, kbuf, newgen) < 0) { + log_warn("Flow %d gone during re-key.", flow->id); + err = -EBADF; + goto finish; + } + + reg_notify_flow(flow->id, FLOW_UPD); + + if (flow_upd_wrap(&upd, FLOW_UPD_REKEY_RESP, &rsp) == 0) { + if (ipcp_flow_update(flow, upd) < 0) + log_err("Failed to send re-key response for flow %d.", + flow->id); + freebuf(upd); + } + + err = 0; + finish: + crypt_secure_clear(kbuf, SYMMKEYSZ); + freebuf(rsp); + + return err; +} + +static int flow_update_arr(struct flow_info * flow, + buffer_t * pk) +{ + uint8_t type; + + if (pk->len < 1) + return -EINVAL; + + type = pk->data[0]; + + /* Strip the type byte, keeping the malloc base for hand-off. */ + memmove(pk->data, pk->data + 1, pk->len - 1); + pk->len -= 1; + + switch (type) { + case FLOW_UPD_REKEY_REQ: + return rekey_respond(flow, pk); + case FLOW_UPD_REKEY_RESP: + /* Hand the payload to the worker and take ownership. */ + rekey_post_resp(flow->id, pk); + return 0; + default: + log_warn("Unknown flow update type %u.", type); + return -EINVAL; + } +} + +static int flow_update(struct flow_info * flow, + bool rekey, + struct crypt_sk * sk, + bool * has_key) +{ + uint8_t seed[SYMMKEYSZ]; + uint8_t epoch; + + *has_key = false; + + if (rekey) { + /* Watermark re-key: the app can't know its lower IPCP. */ + pid_t n_1_pid = reg_flow_get_n_1_pid(flow->id); + if (n_1_pid > 0) + rekey_post_init(flow->id, n_1_pid); + return 0; + } + + if (!reg_flow_take_pending(flow->id, seed, &epoch)) + return 0; + + memcpy(sk->key, seed, SYMMKEYSZ); + sk->epoch = epoch; + *has_key = true; + + crypt_secure_clear(seed, SYMMKEYSZ); + + log_dbg("Delivered re-key seed for flow %d (gen %u).", + flow->id, epoch); + + return 0; +} + +/* Free every parked OAP ctx at worker exit or cancellation. */ +static void rekey_table_cleanup(void * o) +{ + struct list_head * tbl = o; + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, tbl) { + struct rekey_ctx * e = list_entry(p, struct rekey_ctx, next); + rekey_drop(e); + } +} + +/* Pop one event, or NULL if none, draining the inbox under its lock. */ +static struct rekey_evt * rekey_inbox_wait(const struct timespec * deadline) +{ + struct rekey_evt * evt = NULL; + struct timespec now; + + pthread_mutex_lock(&irmd.rk.lock); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + while (list_is_empty(&irmd.rk.inbox) && !irmd.rk.stop && + ts_diff_ns(deadline, &now) > 0) { + pthread_cond_timedwait(&irmd.rk.cond, &irmd.rk.lock, deadline); + clock_gettime(PTHREAD_COND_CLOCK, &now); + } + + if (!list_is_empty(&irmd.rk.inbox)) { + evt = list_first_entry(&irmd.rk.inbox, struct rekey_evt, next); + list_del(&evt->next); + } + + pthread_mutex_unlock(&irmd.rk.lock); + + return evt; +} + +/* + * Single worker owning all in-flight Tier-2 re-keys. It drains the + * inbox, runs the periodic snapshot, and reaps timed-out exchanges. + * The table is touched only here, so it needs no lock. + */ +static void * rekey_worker(void * o) +{ + struct list_head table; + struct timespec next; + + (void) o; + + list_head_init(&table); + + clock_gettime(PTHREAD_COND_CLOCK, &next); + next.tv_sec += OAP_REKEY_TIMER; + + pthread_cleanup_push(rekey_table_cleanup, &table); + + while (!irmd.rk.stop) { + struct rekey_evt * evt; + struct timespec now; + struct timespec deadline = next; + struct list_head * p; + + /* Wake no later than the soonest in-flight deadline. */ + list_for_each(p, &table) { + struct rekey_ctx * e; + e = list_entry(p, struct rekey_ctx, next); + if (ts_diff_ns(&e->deadline, &deadline) < 0) + deadline = e->deadline; + } + + evt = rekey_inbox_wait(&deadline); + + if (irmd.rk.stop) { + if (evt != NULL) { + freebuf(evt->buf); + free(evt); + } + break; + } + + if (evt != NULL) { + switch (evt->type) { + case REKEY_INIT: + rekey_do_initiate(&table, evt->flow_id, + evt->n_1_pid); + break; + case REKEY_RESP: + rekey_do_complete(&table, evt->flow_id, + evt->buf); + freebuf(evt->buf); + break; + default: + break; + } + + free(evt); + } + + clock_gettime(PTHREAD_COND_CLOCK, &now); + if (ts_diff_ns(&next, &now) <= 0) { + struct rekey_info snap[REKEY_BATCH]; + int n; + int i; + + n = reg_flow_snapshot_rekey_due(snap, REKEY_BATCH); + for (i = 0; i < n; ++i) + rekey_do_initiate(&table, snap[i].flow_id, + snap[i].n_1_pid); + + clock_gettime(PTHREAD_COND_CLOCK, &next); + next.tv_sec += OAP_REKEY_TIMER; + } + + rekey_reap_expired(&table); + } + + pthread_cleanup_pop(1); + + return (void *) 0; +} + static void * acceptloop(void * o) { int csockfd; @@ -1502,6 +2047,7 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg, struct timespec now; struct timespec ts = TIMESPEC_INIT_S(0); /* static analysis */ int res; + bool has_key = false; irm_msg_t * ret_msg; buffer_t data; @@ -1622,7 +2168,8 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg, hbuf = malloc(SYMMKEYSZ); if (hbuf == NULL) { log_err("Failed to malloc key buf"); - return NULL; + res = -ENOMEM; + break; } memcpy(hbuf, kbuf, SYMMKEYSZ); @@ -1652,7 +2199,8 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg, hbuf = malloc(SYMMKEYSZ); if (hbuf == NULL) { log_err("Failed to malloc key buf"); - return NULL; + res = -ENOMEM; + break; } memcpy(hbuf, kbuf, SYMMKEYSZ); ret_msg->sym_key.data = hbuf; @@ -1698,6 +2246,38 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg, flow = flow_info_msg_to_s(msg->flow_info); res = flow_alloc_reply(&flow, msg->response, &data); break; + case IRM_MSG_CODE__IPCP_FLOW_UPDATE_ARR: + data.len = msg->pk.len; + data.data = msg->pk.data; + msg->pk.data = NULL; /* pass data */ + msg->pk.len = 0; + flow = flow_info_msg_to_s(msg->flow_info); + res = flow_update_arr(&flow, &data); + freebuf(data); + break; + case IRM_MSG_CODE__IRM_FLOW_UPDATE: + flow = flow_info_msg_to_s(msg->flow_info); + sk.key = kbuf; + res = flow_update(&flow, msg->rekey, &sk, &has_key); + if (res == 0) { + ret_msg->flow_info = flow_info_s_to_msg(&flow); + if (has_key) { + hbuf = malloc(SYMMKEYSZ); + if (hbuf == NULL) { + log_err("Failed to malloc key buf"); + res = -ENOMEM; + break; + } + + memcpy(hbuf, kbuf, SYMMKEYSZ); + ret_msg->sym_key.data = hbuf; + ret_msg->sym_key.len = SYMMKEYSZ; + ret_msg->has_sym_key = true; + ret_msg->has_generation = true; + ret_msg->generation = sk.epoch; + } + } + break; default: log_err("Don't know that message code."); res = -1; @@ -2060,6 +2640,29 @@ static int irm_init(void) list_head_init(&irmd.cmds); + if (pthread_mutex_init(&irmd.rk.lock, NULL)) { + log_err("Failed to initialize mutex."); + goto fail_rk_lock; + } + + if (pthread_condattr_init(&cattr)) { + log_err("Failed to initialize condattr."); + goto fail_rk_lock; + } + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&irmd.rk.cond, &cattr)) { + log_err("Failed to initialize condvar."); + pthread_condattr_destroy(&cattr); + goto fail_rk_cond; + } + + pthread_condattr_destroy(&cattr); + + list_head_init(&irmd.rk.inbox); + if (stat(SOCK_PATH, &st) == -1) { if (mkdir(SOCK_PATH, 0777)) { log_err("Failed to create sockets directory."); @@ -2163,6 +2766,10 @@ static int irm_init(void) fail_sock_path: unlink(IRM_SOCK_PATH); fail_stat: + pthread_cond_destroy(&irmd.rk.cond); + fail_rk_cond: + pthread_mutex_destroy(&irmd.rk.lock); + fail_rk_lock: pthread_cond_destroy(&irmd.cmd_cond); fail_cmd_cond: pthread_mutex_destroy(&irmd.cmd_lock); @@ -2211,8 +2818,22 @@ static void irm_fini(void) pthread_mutex_unlock(&irmd.cmd_lock); + pthread_mutex_lock(&irmd.rk.lock); + + list_for_each_safe(p, h, &irmd.rk.inbox) { + struct rekey_evt * evt; + evt = list_entry(p, struct rekey_evt, next); + list_del(&evt->next); + freebuf(evt->buf); + free(evt); + } + + pthread_mutex_unlock(&irmd.rk.lock); + pthread_mutex_destroy(&irmd.cmd_lock); pthread_cond_destroy(&irmd.cmd_cond); + pthread_mutex_destroy(&irmd.rk.lock); + pthread_cond_destroy(&irmd.rk.cond); pthread_rwlock_destroy(&irmd.state_lock); #ifdef HAVE_FUSE @@ -2250,10 +2871,18 @@ static int irm_start(void) if (pthread_create(&irmd.acceptor, NULL, acceptloop, NULL)) goto fail_acceptor; + irmd.rk.stop = false; + if (OAP_REKEY_TIMER > 0 && + pthread_create(&irmd.rk.worker, NULL, rekey_worker, NULL)) + goto fail_rekey_worker; + log_info("Ouroboros IPC Resource Manager daemon started..."); return 0; + fail_rekey_worker: + pthread_cancel(irmd.acceptor); + pthread_join(irmd.acceptor, NULL); fail_acceptor: pthread_cancel(irmd.irm_sanitize); pthread_join(irmd.irm_sanitize, NULL); @@ -2293,6 +2922,17 @@ static void irm_sigwait(sigset_t sigset) static void irm_stop(void) { + if (OAP_REKEY_TIMER > 0) { + pthread_mutex_lock(&irmd.rk.lock); + + irmd.rk.stop = true; + pthread_cond_signal(&irmd.rk.cond); + + pthread_mutex_unlock(&irmd.rk.lock); + + pthread_join(irmd.rk.worker, NULL); + } + pthread_cancel(irmd.acceptor); pthread_cancel(irmd.irm_sanitize); diff --git a/src/irmd/reg/flow.c b/src/irmd/reg/flow.c index 5c709dea..ccb2562d 100644 --- a/src/irmd/reg/flow.c +++ b/src/irmd/reg/flow.c @@ -24,6 +24,7 @@ #define OUROBOROS_PREFIX "reg/flow" +#include <ouroboros/crypt.h> #include <ouroboros/logs.h> #include "flow.h" @@ -32,6 +33,7 @@ #include <errno.h> #include <stdbool.h> #include <stdlib.h> +#include <string.h> struct reg_flow * reg_flow_create(const struct flow_info * info) { @@ -79,6 +81,8 @@ void reg_flow_destroy(struct reg_flow * flow) { assert(flow != NULL); + crypt_secure_clear(flow->rk.pending_seed, SYMMKEYSZ); + switch(flow->info.state) { case FLOW_ACCEPT_PENDING: clrbuf(flow->req_data); diff --git a/src/irmd/reg/flow.h b/src/irmd/reg/flow.h index 9a4046d3..15fc7b8f 100644 --- a/src/irmd/reg/flow.h +++ b/src/irmd/reg/flow.h @@ -49,6 +49,17 @@ struct reg_flow { bool direct; + /* Tier-2 re-key state (encrypted flows only) */ + struct { + bool encrypted; /* flow carries a cipher */ + uint8_t epoch; /* last epoch installed by app */ + bool initiator; /* OAP initiator (role 0) */ + bool in_flight; /* a re-key is in progress */ + uint8_t pending_seed[SYMMKEYSZ]; + uint8_t pending_epoch; + bool has_pending; /* new seed awaits app pull */ + } rk; + struct ssm_rbuff * n_rb; struct ssm_rbuff * n_1_rb; }; diff --git a/src/irmd/reg/reg.c b/src/irmd/reg/reg.c index 365064e5..70baf64e 100644 --- a/src/irmd/reg/reg.c +++ b/src/irmd/reg/reg.c @@ -25,6 +25,7 @@ The IPC Resource Manager - Registry #define OUROBOROS_PREFIX "reg" #include <ouroboros/bitmap.h> +#include <ouroboros/crypt.h> #include <ouroboros/errno.h> #include <ouroboros/list.h> #include <ouroboros/logs.h> @@ -2102,6 +2103,194 @@ bool reg_flow_is_direct(int flow_id) return ret; } +void reg_flow_set_rekey(int flow_id, + bool initiator) +{ + struct reg_flow * flow; + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + if (flow != NULL) { + flow->rk.encrypted = true; + flow->rk.initiator = initiator; + flow->rk.epoch = 0; + } + + pthread_mutex_unlock(®.mtx); +} + +int reg_flow_get_epoch(int flow_id) +{ + struct reg_flow * flow; + int epoch = -1; + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + if (flow != NULL && flow->rk.encrypted) + epoch = flow->rk.epoch; + + pthread_mutex_unlock(®.mtx); + + return epoch; +} + +bool reg_flow_rekey_pending(int flow_id) +{ + struct reg_flow * flow; + bool ret = false; + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + if (flow != NULL) + ret = flow->rk.has_pending; + + pthread_mutex_unlock(®.mtx); + + return ret; +} + +pid_t reg_flow_get_n_1_pid(int flow_id) +{ + struct reg_flow * flow; + pid_t pid = -1; + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + if (flow != NULL) + pid = flow->info.n_1_pid; + + pthread_mutex_unlock(®.mtx); + + return pid; +} + +int reg_flow_snapshot_rekey_due(struct rekey_info * snap, + int max) +{ + struct list_head * p; + int n = 0; + + pthread_mutex_lock(®.mtx); + + llist_for_each(p, ®.flows) { + struct reg_flow * f; + + if (n == max) + break; + + f = list_entry(p, struct reg_flow, next); + + if (f->info.state != FLOW_ALLOCATED || f->direct) + continue; + + if (!f->rk.encrypted || !f->rk.initiator) + continue; + + if (f->rk.in_flight || f->rk.has_pending) + continue; + + f->rk.in_flight = true; + + snap[n].flow_id = f->info.id; + snap[n].n_pid = f->info.n_pid; + snap[n].n_1_pid = f->info.n_1_pid; + snap[n].epoch = f->rk.epoch; + strcpy(snap[n].name, f->name); + ++n; + } + + pthread_mutex_unlock(®.mtx); + + return n; +} + +void reg_flow_clear_in_flight(int flow_id) +{ + struct reg_flow * flow; + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + if (flow != NULL) + flow->rk.in_flight = false; + + pthread_mutex_unlock(®.mtx); +} + +int reg_flow_store_pending(int flow_id, + const uint8_t * seed, + uint8_t epoch) +{ + struct reg_flow * flow; + int ret = -ENOENT; + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + if (flow != NULL) { + memcpy(flow->rk.pending_seed, seed, SYMMKEYSZ); + flow->rk.pending_epoch = epoch; + flow->rk.has_pending = true; + flow->rk.in_flight = false; + /* Doorbell raised only after the seed is parked. */ + if (flow->n_rb != NULL) + ssm_rbuff_set_bits(flow->n_rb, RB_REKEY); + ret = 0; + } + + pthread_mutex_unlock(®.mtx); + + return ret; +} + +bool reg_flow_take_pending(int flow_id, + uint8_t * seed, + uint8_t * epoch) +{ + struct reg_flow * flow; + bool ret = false; + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + if (flow != NULL && flow->rk.has_pending) { + memcpy(seed, flow->rk.pending_seed, SYMMKEYSZ); + *epoch = flow->rk.pending_epoch; + flow->rk.epoch = flow->rk.pending_epoch; /* app installed it */ + flow->rk.has_pending = false; + crypt_secure_clear(flow->rk.pending_seed, SYMMKEYSZ); + if (flow->n_rb != NULL) + ssm_rbuff_clr_bits(flow->n_rb, RB_REKEY); + ret = true; + } + + pthread_mutex_unlock(®.mtx); + + return ret; +} + +void reg_notify_flow(int flow_id, + int event) +{ + struct reg_flow * flow; + struct reg_proc * proc; + + pthread_mutex_lock(®.mtx); + + flow = __reg_get_flow(flow_id); + if (flow != NULL) { + proc = __reg_get_proc(flow->info.n_pid); + if (proc != NULL) + ssm_flow_set_notify(proc->set, flow_id, event); + } + + pthread_mutex_unlock(®.mtx); +} + int reg_respond_flow_direct(int flow_id, buffer_t * pbuf) { diff --git a/src/irmd/reg/reg.h b/src/irmd/reg/reg.h index 6b576471..e0c64fed 100644 --- a/src/irmd/reg/reg.h +++ b/src/irmd/reg/reg.h @@ -163,6 +163,40 @@ int reg_wait_flow_direct(int flow_id, bool reg_flow_is_direct(int flow_id); +/* Per-flow snapshot for the re-key timer */ +struct rekey_info { + int flow_id; + pid_t n_pid; + pid_t n_1_pid; + char name[NAME_SIZE + 1]; + uint8_t epoch; +}; + +void reg_flow_set_rekey(int flow_id, + bool initiator); + +int reg_flow_get_epoch(int flow_id); + +bool reg_flow_rekey_pending(int flow_id); + +pid_t reg_flow_get_n_1_pid(int flow_id); + +int reg_flow_snapshot_rekey_due(struct rekey_info * snap, + int max); + +void reg_flow_clear_in_flight(int flow_id); + +int reg_flow_store_pending(int flow_id, + const uint8_t * seed, + uint8_t epoch); + +bool reg_flow_take_pending(int flow_id, + uint8_t * seed, + uint8_t * epoch); + +void reg_notify_flow(int flow_id, + int event); + void reg_dealloc_flow(struct flow_info * info); void reg_dealloc_flow_resp(struct flow_info * info); diff --git a/src/lib/dev.c b/src/lib/dev.c index cff1ebf2..3064b1e2 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -100,6 +100,9 @@ struct flow { struct crypt_ctx * crypt; int headsz; /* selector */ int tailsz; /* Tag + CRC */ + struct timespec rk_grace; /* TX-promote deadline (0 = none) */ + bool rk_wm_inflight; /* re-key trigger in flight */ + uint32_t rk_wm_ctr; /* throttles the consult */ struct timespec snd_act; struct timespec rcv_act; @@ -509,6 +512,66 @@ static void flow_drain_rx_nb(struct flow * flow) } } +/* TX-promotion grace when the peer's install latency is unknown (raw). */ +#define REKEY_GRACE_MS 1000 + +/* + * Pull a parked re-key seed from the IRMd and install it. Driven from the + * data path when RB_REKEY shows on rx_rb. crypt_rekey is concurrency-safe + * on its own; proc.lock (rd) only guards against teardown. + */ +static void flow_rekey(struct flow * flow) +{ + struct flow_info info; + struct crypt_sk sk; + struct timespec now; + struct timespec intv; + time_t ms; + uint8_t key[SYMMKEYSZ]; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; + bool has_key; + + pthread_rwlock_rdlock(&proc.lock); + if (flow->info.id < 0 || flow->crypt == NULL) { + pthread_rwlock_unlock(&proc.lock); + return; + } + info = flow->info; + pthread_rwlock_unlock(&proc.lock); + + if (flow_update__irm_req_ser(&msg, &info, false) < 0) + return; + + if (send_recv_msg(&msg) < 0) + return; + + sk.key = key; + if (flow_rekey__irm_result_des(&msg, &sk, &has_key) < 0) + return; + + if (!has_key) + return; + + pthread_rwlock_rdlock(&proc.lock); + if (flow->info.id == info.id && flow->crypt != NULL) { + if (crypt_rekey(flow->crypt, &sk) == 0) { + /* Hold TX on the old epoch until the peer installs. */ + ms = flow->info.mpl > 0 ? flow->info.mpl * 3 + : REKEY_GRACE_MS; + intv.tv_sec = ms / 1000; + intv.tv_nsec = (ms % 1000) * MILLION; + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, &intv, &flow->rk_grace); + } + /* Re-arm the watermark even if the install was a no-op. */ + STORE_RELAXED(&flow->rk_wm_inflight, false); + } + pthread_rwlock_unlock(&proc.lock); + + crypt_secure_clear(key, SYMMKEYSZ); +} + /* * Wait clamped by caller deadline, next tw expiry, and TICTIME; * a clamp-timeout means tw work is due, not caller-deadline. @@ -533,6 +596,14 @@ static int flow_rx_one(struct flow * flow, return -EFLOWDOWN; } + /* Pull a parked re-key before re-blocking (idle reader). */ + if (flow->crypt != NULL + && (ssm_rbuff_get_flags(rx_rb) & RB_REKEY)) { + pthread_rwlock_unlock(&proc.lock); + flow_rekey(flow); + continue; + } + idx = ssm_rbuff_read_b(rx_rb, &wait_abs); if (idx == -ETIMEDOUT) { pthread_rwlock_unlock(&proc.lock); @@ -593,7 +664,7 @@ static void flow_clear(int fd) } /* - * Set ACL_FLOWDOWN on rx/tx so any in-flight blocking reads or writes + * Set RB_FLOWDOWN on rx/tx so any in-flight blocking reads or writes * wake up and drop their proc.lock rdlock. Must run BEFORE flow_fini's * wrlock, else the wrlock blocks on those rdlock holders and the * in-flight calls never see the FLOWDOWN signal. @@ -604,9 +675,9 @@ static void flow_quiesce(int fd) struct ssm_rbuff * tx_rb = proc.flows[fd].tx_rb; if (rx_rb != NULL) - ssm_rbuff_set_acl(rx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(rx_rb, RB_FLOWDOWN); if (tx_rb != NULL) - ssm_rbuff_set_acl(tx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(tx_rb, RB_FLOWDOWN); } static void do_flow_fini(int fd) @@ -1256,8 +1327,6 @@ int fccntl(int fd, va_list l; struct timespec * timeo; qosspec_t * qs; - uint32_t rx_acl; - uint32_t tx_acl; size_t * qlen; struct flow * flow; uint16_t old_acc; @@ -1353,31 +1422,26 @@ int fccntl(int fd, && flow->frcti != NULL) emit_eos = true; - rx_acl = ssm_rbuff_get_acl(flow->rx_rb); - tx_acl = ssm_rbuff_get_acl(flow->tx_rb); - /* Our flow write-only -> peer's read-only. */ + /* Our flow write-only -> peer's read-only; restore on RDWR. */ if (flow->oflags & FLOWFWRONLY) - rx_acl |= ACL_RDONLY; - if (flow->oflags & FLOWFRDWR) - rx_acl |= ACL_RDWR; + ssm_rbuff_clr_bits(flow->rx_rb, RB_WR); + else + ssm_rbuff_set_bits(flow->rx_rb, RB_WR); if (flow->oflags & FLOWFDOWN) { - rx_acl |= ACL_FLOWDOWN; - tx_acl |= ACL_FLOWDOWN; + ssm_rbuff_set_bits(flow->rx_rb, RB_FLOWDOWN); + ssm_rbuff_set_bits(flow->tx_rb, RB_FLOWDOWN); ssm_flow_set_notify(flow->set, flow->info.id, FLOW_DOWN); } else { - rx_acl &= ~ACL_FLOWDOWN; - tx_acl &= ~ACL_FLOWDOWN; + ssm_rbuff_clr_bits(flow->rx_rb, RB_FLOWDOWN); + ssm_rbuff_clr_bits(flow->tx_rb, RB_FLOWDOWN); ssm_flow_set_notify(flow->set, flow->info.id, FLOW_UP); } - ssm_rbuff_set_acl(flow->rx_rb, rx_acl); - ssm_rbuff_set_acl(flow->tx_rb, tx_acl); - break; case FLOWGFLAGS: fflags = va_arg(l, uint32_t *); @@ -1667,6 +1731,92 @@ static ssize_t flow_write_frag(struct flow * flow, return (ssize_t) count; } +/* + * Watermark: re-key when the TX batch is within KEY_REKEY_WATERMARK node + * keys of exhaustion (0 disables), ahead of the timer; consult keyrot at + * most once per FLOW_WM_CHECK writes. + */ +#define FLOW_WM_CHECK (1u << 16) + +/* + * Switch TX to the freshly installed epoch once the peer is seen on it + * (peer_synced) or the install grace has elapsed (breaks the symmetric + * wait where neither side sends the new epoch first). + */ +static void flow_tx_promote(struct flow * flow, + const struct timespec * now) +{ + if (flow->crypt == NULL) + return; + + if (flow->rk_grace.tv_sec == 0 && flow->rk_grace.tv_nsec == 0) + return; + + if (!crypt_peer_synced(flow->crypt) + && ts_diff_ns(now, &flow->rk_grace) < 0) + return; + + crypt_tx_promote(flow->crypt); + flow->rk_grace.tv_sec = 0; + flow->rk_grace.tv_nsec = 0; +} + +/* + * Ask the IRMd to start an OAP re-key for this flow. The reply carries no + * key; the seed arrives later over RB_REKEY. Fired from the write path as + * the TX batch nears exhaustion, ahead of the timer. + */ +static int flow_rekey_trigger(struct flow * flow) +{ + struct flow_info info; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {SOCK_BUF_SIZE, buf}; + + pthread_rwlock_rdlock(&proc.lock); + if (flow->info.id < 0 || flow->crypt == NULL) { + pthread_rwlock_unlock(&proc.lock); + return -1; + } + info = flow->info; + pthread_rwlock_unlock(&proc.lock); + + if (flow_update__irm_req_ser(&msg, &info, true) < 0) + return -1; + + if (send_recv_msg(&msg) < 0) + return -1; + + return 0; +} + +/* + * True when the live TX batch has run low and no re-key is in flight. + * Advances a throttle so the (locking) keyrot consult runs at most once + * per FLOW_WM_CHECK writes. + */ +static bool flow_wm_due(struct flow * flow) +{ + uint32_t tick; + + if (KEY_REKEY_WATERMARK == 0) + return false; + + if (flow->crypt == NULL) + return false; + + if (LOAD_RELAXED(&flow->rk_wm_inflight)) + return false; + + tick = FETCH_ADD_RELAXED(&flow->rk_wm_ctr, 1); + if ((tick & (FLOW_WM_CHECK - 1)) != 0) + return false; + + if (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY) + return false; + + return crypt_nodes_left(flow->crypt) <= KEY_REKEY_WATERMARK; +} + ssize_t flow_write(int fd, const void * buf, size_t count) @@ -1710,6 +1860,19 @@ ssize_t flow_write(int fd, if ((flags & FLOWFACCMODE) == FLOWFRDONLY) return -EPERM; + if (flow->crypt != NULL + && (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY)) + flow_rekey(flow); + + flow_tx_promote(flow, &now); + + /* Pre-empt TX key exhaustion; the timer is the backstop. */ + if (flow_wm_due(flow)) { + STORE_RELAXED(&flow->rk_wm_inflight, true); + if (flow_rekey_trigger(flow) < 0) + STORE_RELAXED(&flow->rk_wm_inflight, false); + } + tw_move_safe(); if (flow->frcti != NULL) { @@ -1784,6 +1947,10 @@ static ssize_t raw_flow_read_pkt(struct flow * flow, ssize_t idx; while (true) { + if (flow->crypt != NULL + && (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY)) + flow_rekey(flow); + if (!block) { idx = ssm_rbuff_read(flow->rx_rb); if (idx < 0) @@ -1917,6 +2084,16 @@ ssize_t flow_read(int fd, pthread_rwlock_unlock(&proc.lock); + if (flow->crypt != NULL + && (ssm_rbuff_get_flags(flow->rx_rb) & RB_REKEY)) + flow_rekey(flow); + + /* Advance TX off a stale epoch even on recv-mostly (ACK-only) flows. */ + if (flow->crypt != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &now); + flow_tx_promote(flow, &now); + } + tw_move_safe(); idx = flow->part_idx; @@ -2101,6 +2278,18 @@ static int fqueue_filter(struct fqueue * fq) pthread_rwlock_rdlock(&proc.lock); while (fq->next < fq->fqsize) { + if (fq->fqueue[fq->next].event == FLOW_UPD) { + /* Re-key doorbell: pull internally, never surface. */ + fd = proc.id_to_fd[fq->fqueue[fq->next].flow_id].fd; + ++fq->next; + if (fd >= 0) { + pthread_rwlock_unlock(&proc.lock); + flow_rekey(&proc.flows[fd]); + pthread_rwlock_rdlock(&proc.lock); + } + continue; + } + if (fq->fqueue[fq->next].event != FLOW_PKT) { ret = 1; goto out; @@ -2643,8 +2832,8 @@ int ipcp_flow_fini(int fd) return -1; } - ssm_rbuff_set_acl(proc.flows[fd].rx_rb, ACL_FLOWDOWN); - ssm_rbuff_set_acl(proc.flows[fd].tx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(proc.flows[fd].rx_rb, RB_FLOWDOWN); + ssm_rbuff_set_bits(proc.flows[fd].tx_rb, RB_FLOWDOWN); ssm_flow_set_notify(proc.flows[fd].set, proc.flows[fd].info.id, diff --git a/src/lib/frct.c b/src/lib/frct.c index 3077df65..c055433d 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -878,10 +878,10 @@ static void frct_mark_flow_down(struct frcti * frcti) struct flow * f = frcti_to_flow(frcti); if (f->rx_rb != NULL) - ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(f->rx_rb, RB_FLOWDOWN); if (f->tx_rb != NULL) - ssm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(f->tx_rb, RB_FLOWDOWN); } __attribute__((cold)) @@ -890,7 +890,7 @@ static void frct_mark_peer_dead(struct frcti * frcti) struct flow * f = frcti_to_flow(frcti); if (f->rx_rb != NULL) - ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWPEER); + ssm_rbuff_set_bits(f->rx_rb, RB_FLOWPEER); if (proc.fqset != NULL) ssm_flow_set_notify(proc.fqset, f->info.id, FLOW_PEER); diff --git a/src/lib/serdes-irm.c b/src/lib/serdes-irm.c index a896576d..24bb349f 100644 --- a/src/lib/serdes-irm.c +++ b/src/lib/serdes-irm.c @@ -174,6 +174,48 @@ int flow__irm_result_des(buffer_t * buf, else memset(sk->key, 0, SYMMKEYSZ); + sk->epoch = msg->has_generation ? (uint8_t) msg->generation : 0; + + irm_msg__free_unpacked(msg, NULL); + + return 0; + fail: + irm_msg__free_unpacked(msg, NULL); + fail_msg: + return err; +} + +int flow_rekey__irm_result_des(buffer_t * buf, + struct crypt_sk * sk, + bool * has_key) +{ + irm_msg_t * msg; + int err; + + msg = irm_msg__unpack(NULL, buf->len, buf->data); + if (msg == NULL) { + err = -EIRMD; + goto fail_msg; + } + + if (!msg->has_result) { + err = -EIRMD; + goto fail; + } + + if (msg->result < 0) { + err = msg->result; + goto fail; + } + + *has_key = msg->has_sym_key && msg->sym_key.len == SYMMKEYSZ; + if (*has_key) { + memcpy(sk->key, msg->sym_key.data, SYMMKEYSZ); + sk->nid = NID_undef; + sk->epoch = msg->has_generation ? + (uint8_t) msg->generation : 0; + } + irm_msg__free_unpacked(msg, NULL); return 0; @@ -222,6 +264,44 @@ int flow_dealloc__irm_req_ser(buffer_t * buf, return -ENOMEM; } +int flow_update__irm_req_ser(buffer_t * buf, + const struct flow_info * flow, + bool rekey) +{ + irm_msg_t * msg; + size_t len; + + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto fail_malloc; + + irm_msg__init(msg); + + msg->code = IRM_MSG_CODE__IRM_FLOW_UPDATE; + msg->flow_info = flow_info_s_to_msg(flow); + if (msg->flow_info == NULL) + goto fail_msg; + + msg->has_rekey = true; + msg->rekey = rekey; + + len = irm_msg__get_packed_size(msg); + if (len == 0 || len > buf->len) + goto fail_msg; + + buf->len = len; + + irm_msg__pack(msg, buf->data); + irm_msg__free_unpacked(msg, NULL); + + return 0; + + fail_msg: + irm_msg__free_unpacked(msg, NULL); + fail_malloc: + return -ENOMEM; +} + int ipcp_flow_dealloc__irm_req_ser(buffer_t * buf, const struct flow_info * flow) { @@ -398,15 +478,19 @@ int ipcp_flow_req_arr__irm_req_ser(buffer_t * buf, return 0; fail_msg: + /* hash/pk are borrowed from the caller; detach before free. */ + msg->hash.len = 0; + msg->hash.data = NULL; + msg->pk.len = 0; + msg->pk.data = NULL; irm_msg__free_unpacked(msg, NULL); fail_malloc: return -ENOMEM; } -int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf, - const struct flow_info * flow, - int response, - const buffer_t * data) +int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf, + const struct flow_info * flow, + const buffer_t * data) { irm_msg_t * msg; size_t len; @@ -417,16 +501,14 @@ int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf, irm_msg__init(msg); - msg->code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; - msg->flow_info = flow_info_s_to_msg(flow); + msg->code = IRM_MSG_CODE__IPCP_FLOW_UPDATE_ARR; + msg->flow_info = flow_info_s_to_msg(flow); if (msg->flow_info == NULL) goto fail_msg; msg->has_pk = true; msg->pk.len = data->len; msg->pk.data = data->data; - msg->has_response = true; - msg->response = response; len = irm_msg__get_packed_size(msg); if (len == 0 || len > buf->len) @@ -436,27 +518,25 @@ int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf, irm_msg__pack(msg, buf->data); - /* Don't free * data! */ - msg->pk.len = 0; + /* Don't free data! */ + msg->pk.len = 0; msg->pk.data = NULL; - irm_msg__free_unpacked(msg, NULL); return 0; fail_msg: - /* hash/pk are borrowed from the caller; detach before free. */ - msg->hash.len = 0; - msg->hash.data = NULL; - msg->pk.len = 0; - msg->pk.data = NULL; + /* pk.data is borrowed from the caller; detach before free. */ + msg->pk.len = 0; + msg->pk.data = NULL; irm_msg__free_unpacked(msg, NULL); fail_malloc: return -ENOMEM; } -int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf, - const struct flow_info * flow, - const buffer_t * data) +int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf, + const struct flow_info * flow, + int response, + const buffer_t * data) { irm_msg_t * msg; size_t len; @@ -467,14 +547,16 @@ int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf, irm_msg__init(msg); - msg->code = IRM_MSG_CODE__IPCP_FLOW_UPDATE_ARR; - msg->flow_info = flow_info_s_to_msg(flow); + msg->code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; + msg->flow_info = flow_info_s_to_msg(flow); if (msg->flow_info == NULL) goto fail_msg; msg->has_pk = true; msg->pk.len = data->len; msg->pk.data = data->data; + msg->has_response = true; + msg->response = response; len = irm_msg__get_packed_size(msg); if (len == 0 || len > buf->len) @@ -484,16 +566,14 @@ int ipcp_flow_update_arr__irm_req_ser(buffer_t * buf, irm_msg__pack(msg, buf->data); - /* Don't free data! */ - msg->pk.len = 0; + /* Don't free * data! */ + msg->pk.len = 0; msg->pk.data = NULL; + irm_msg__free_unpacked(msg, NULL); return 0; fail_msg: - /* pk.data is borrowed from the caller; detach before free. */ - msg->pk.len = 0; - msg->pk.data = NULL; irm_msg__free_unpacked(msg, NULL); fail_malloc: return -ENOMEM; diff --git a/src/lib/ssm/rbuff.c b/src/lib/ssm/rbuff.c index c149c306..0121af89 100644 --- a/src/lib/ssm/rbuff.c +++ b/src/lib/ssm/rbuff.c @@ -74,7 +74,7 @@ struct ssm_rbuff { ssize_t * shm_base; /* start of shared memory */ size_t * head; /* start of ringbuffer */ size_t * tail; - size_t * acl; /* access control */ + size_t * flags; /* out-of-band flags (RB_*) */ pthread_mutex_t * mtx; /* lock for cond vars only */ pthread_cond_t * add; /* signal when new data */ pthread_cond_t * del; /* signal when data removed */ @@ -114,8 +114,8 @@ static struct ssm_rbuff * rbuff_create(pid_t pid, rb->shm_base = shm_base; rb->head = (size_t *) (rb->shm_base + (SSM_RBUFF_SIZE)); rb->tail = (size_t *) (rb->head + 1); - rb->acl = (size_t *) (rb->tail + 1); - rb->mtx = (pthread_mutex_t *) (rb->acl + 1); + rb->flags = (size_t *) (rb->tail + 1); + rb->mtx = (pthread_mutex_t *) (rb->flags + 1); rb->add = (pthread_cond_t *) (rb->mtx + 1); rb->del = rb->add + 1; rb->pid = pid; @@ -181,7 +181,7 @@ struct ssm_rbuff * ssm_rbuff_create(pid_t pid, if (pthread_cond_init(rb->del, &cattr)) goto fail_del; - *rb->acl = ACL_RDWR; + *rb->flags = RB_RDWR; *rb->head = 0; *rb->tail = 0; @@ -231,7 +231,7 @@ void ssm_rbuff_close(struct ssm_rbuff * rb) assert(rb); /* - * Caller must set ACL_FLOWDOWN first; if a user becomes + * Caller must set RB_FLOWDOWN first; if a user becomes * cancellable, push a cleanup that decrements n_users. */ while (__atomic_load_n(&rb->n_users, __ATOMIC_SEQ_CST) > 0) { @@ -245,7 +245,7 @@ void ssm_rbuff_close(struct ssm_rbuff * rb) int ssm_rbuff_write(struct ssm_rbuff * rb, size_t off) { - size_t acl; + size_t flags; bool was_empty; int ret = 0; @@ -253,15 +253,15 @@ int ssm_rbuff_write(struct ssm_rbuff * rb, __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); - acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); - if (acl != ACL_RDWR) { - if (acl & ACL_FLOWDOWN) { + flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST); + if (flags != RB_RDWR) { + if (flags & RB_FLOWDOWN) { ret = -EFLOWDOWN; - goto fail_acl; + goto fail_flags; } - if (acl & ACL_RDONLY) { + if (!(flags & RB_WR)) { ret = -ENOTALLOC; - goto fail_acl; + goto fail_flags; } } @@ -287,7 +287,7 @@ int ssm_rbuff_write(struct ssm_rbuff * rb, fail_mutex: pthread_mutex_unlock(rb->mtx); - fail_acl: + fail_flags: __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return ret; } @@ -296,7 +296,7 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb, size_t off, const struct timespec * abstime) { - size_t acl; + size_t flags; int ret = 0; bool was_empty; @@ -304,15 +304,15 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb, __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); - acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); - if (acl != ACL_RDWR) { - if (acl & ACL_FLOWDOWN) { + flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST); + if (flags != RB_RDWR) { + if (flags & RB_FLOWDOWN) { ret = -EFLOWDOWN; - goto fail_acl; + goto fail_flags; } - if (acl & ACL_RDONLY) { + if (!(flags & RB_WR)) { ret = -ENOTALLOC; - goto fail_acl; + goto fail_flags; } } @@ -321,8 +321,8 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb, pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); while (IS_FULL(rb) && ret != -ETIMEDOUT) { - acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); - if (acl & ACL_FLOWDOWN) { + flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST); + if (flags & RB_FLOWDOWN) { ret = -EFLOWDOWN; break; } @@ -341,25 +341,28 @@ int ssm_rbuff_write_b(struct ssm_rbuff * rb, pthread_mutex_unlock(rb->mtx); - fail_acl: + fail_flags: __atomic_fetch_sub(&rb->n_users, 1, __ATOMIC_SEQ_CST); return ret; } -static int check_rb_acl(struct ssm_rbuff * rb) +static int check_rb_flags(struct ssm_rbuff * rb) { - size_t acl; + size_t flags; assert(rb != NULL); - acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST); - if (acl & ACL_FLOWDOWN) + if (flags & RB_FLOWDOWN) return -EFLOWDOWN; - if (acl & ACL_FLOWPEER) + if (flags & RB_FLOWPEER) return -EFLOWPEER; + if (!(flags & RB_RD)) + return -ENOTALLOC; + return -EAGAIN; } @@ -372,7 +375,7 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb) __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); if (IS_EMPTY(rb)) { - ret = check_rb_acl(rb); + ret = check_rb_flags(rb); goto out; } @@ -380,7 +383,7 @@ ssize_t ssm_rbuff_read(struct ssm_rbuff * rb) if (IS_EMPTY(rb)) { pthread_mutex_unlock(rb->mtx); - ret = check_rb_acl(rb); + ret = check_rb_flags(rb); goto out; } @@ -400,14 +403,14 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, const struct timespec * abstime) { ssize_t idx = -1; - size_t acl; + size_t flags; assert(rb != NULL); __atomic_fetch_add(&rb->n_users, 1, __ATOMIC_SEQ_CST); - acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); - if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN)) { + flags = __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST); + if (IS_EMPTY(rb) && (flags & RB_FLOWDOWN)) { idx = -EFLOWDOWN; goto out; } @@ -418,7 +421,7 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, while (IS_EMPTY(rb) && idx != -ETIMEDOUT && - check_rb_acl(rb) == -EAGAIN) { + check_rb_flags(rb) == -EAGAIN) { idx = -robust_wait(rb->add, rb->mtx, abstime); } @@ -429,7 +432,7 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, ADVANCE_TAIL(rb); pthread_cond_broadcast(rb->del); } else if (idx != -ETIMEDOUT) { - idx = check_rb_acl(rb); + idx = check_rb_flags(rb); } pthread_mutex_unlock(rb->mtx); @@ -441,23 +444,35 @@ ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, return idx; } -void ssm_rbuff_set_acl(struct ssm_rbuff * rb, - uint32_t flags) +void ssm_rbuff_set_bits(struct ssm_rbuff * rb, + uint32_t bits) +{ + assert(rb != NULL); + + robust_mutex_lock(rb->mtx); + __atomic_fetch_or(rb->flags, (size_t) bits, __ATOMIC_SEQ_CST); + pthread_cond_broadcast(rb->add); + pthread_cond_broadcast(rb->del); + pthread_mutex_unlock(rb->mtx); +} + +void ssm_rbuff_clr_bits(struct ssm_rbuff * rb, + uint32_t bits) { assert(rb != NULL); robust_mutex_lock(rb->mtx); - __atomic_store_n(rb->acl, (size_t) flags, __ATOMIC_SEQ_CST); + __atomic_fetch_and(rb->flags, ~(size_t) bits, __ATOMIC_SEQ_CST); pthread_cond_broadcast(rb->add); pthread_cond_broadcast(rb->del); pthread_mutex_unlock(rb->mtx); } -uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb) +uint32_t ssm_rbuff_get_flags(struct ssm_rbuff * rb) { assert(rb != NULL); - return (uint32_t) __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + return (uint32_t) __atomic_load_n(rb->flags, __ATOMIC_SEQ_CST); } void ssm_rbuff_fini(struct ssm_rbuff * rb) diff --git a/src/lib/ssm/tests/rbuff_test.c b/src/lib/ssm/tests/rbuff_test.c index 58cb39c3..48e5a714 100644 --- a/src/lib/ssm/tests/rbuff_test.c +++ b/src/lib/ssm/tests/rbuff_test.c @@ -206,10 +206,10 @@ static int test_ssm_rbuff_fill_drain(void) return TEST_RC_FAIL; } -static int test_ssm_rbuff_acl(void) +static int test_ssm_rbuff_flags(void) { struct ssm_rbuff * rb; - uint32_t acl; + uint32_t flags; TEST_START(); @@ -219,16 +219,16 @@ static int test_ssm_rbuff_acl(void) goto fail; } - acl = ssm_rbuff_get_acl(rb); - if (acl != ACL_RDWR) { - printf("Expected ACL_RDWR, got %u.\n", acl); + flags = ssm_rbuff_get_flags(rb); + if (flags != RB_RDWR) { + printf("Expected RB_RDWR, got %u.\n", flags); goto fail_rb; } - ssm_rbuff_set_acl(rb, ACL_RDONLY); - acl = ssm_rbuff_get_acl(rb); - if (acl != ACL_RDONLY) { - printf("Expected ACL_RDONLY, got %u.\n", acl); + ssm_rbuff_clr_bits(rb, RB_WR); + flags = ssm_rbuff_get_flags(rb); + if (flags != RB_RD) { + printf("Expected RB_RD, got %u.\n", flags); goto fail_rb; } @@ -237,7 +237,7 @@ static int test_ssm_rbuff_acl(void) goto fail_rb; } - ssm_rbuff_set_acl(rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(rb, RB_FLOWDOWN); if (ssm_rbuff_write(rb, 1) != -EFLOWDOWN) { printf("Expected -EFLOWDOWN on FLOWDOWN.\n"); goto fail_rb; @@ -553,7 +553,7 @@ static int test_ssm_rbuff_blocking_flowdown(void) clock_gettime(PTHREAD_COND_CLOCK, &now); ts_add(&now, &interval, &abs_timeout); - ssm_rbuff_set_acl(rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(rb, RB_FLOWDOWN); ret = ssm_rbuff_read_b(rb, &abs_timeout); if (ret != -EFLOWDOWN) { @@ -561,7 +561,7 @@ static int test_ssm_rbuff_blocking_flowdown(void) goto fail_rb; } - ssm_rbuff_set_acl(rb, ACL_RDWR); + ssm_rbuff_clr_bits(rb, RB_FLOWDOWN); for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) { if (ssm_rbuff_write(rb, i) < 0) { @@ -573,7 +573,7 @@ static int test_ssm_rbuff_blocking_flowdown(void) clock_gettime(PTHREAD_COND_CLOCK, &now); ts_add(&now, &interval, &abs_timeout); - ssm_rbuff_set_acl(rb, ACL_FLOWDOWN); + ssm_rbuff_set_bits(rb, RB_FLOWDOWN); ret = ssm_rbuff_write_b(rb, 999, &abs_timeout); if (ret != -EFLOWDOWN) { @@ -581,7 +581,7 @@ static int test_ssm_rbuff_blocking_flowdown(void) goto fail_rb; } - ssm_rbuff_set_acl(rb, ACL_RDWR); + ssm_rbuff_clr_bits(rb, RB_FLOWDOWN); while (ssm_rbuff_read(rb) >= 0) ; @@ -664,7 +664,7 @@ int rbuff_test(int argc, ret |= test_ssm_rbuff_write_read(); ret |= test_ssm_rbuff_read_empty(); ret |= test_ssm_rbuff_fill_drain(); - ret |= test_ssm_rbuff_acl(); + ret |= test_ssm_rbuff_flags(); ret |= test_ssm_rbuff_open_close(); ret |= test_ssm_rbuff_threaded(); ret |= test_ssm_rbuff_blocking(); |
