diff options
Diffstat (limited to 'src/irmd/main.c')
| -rw-r--r-- | src/irmd/main.c | 644 |
1 files changed, 642 insertions, 2 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index 484a265a..3519e079 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -36,6 +36,7 @@ #include <ouroboros/crypt.h> #include <ouroboros/errno.h> #include <ouroboros/flow.h> +#include <ouroboros/fqueue.h> #include <ouroboros/hash.h> #include <ouroboros/irm.h> #include <ouroboros/list.h> @@ -86,6 +87,8 @@ #define TIMESYNC_SLACK 100 /* ms */ #define OAP_SEEN_TIMER 20 /* s */ #define DEALLOC_TIME 300 /* s */ +#define REKEY_BATCH 64 /* flows re-keyed per timer pass */ +#define REKEY_RESP_TIMEO 20 /* s; give-up on a re-key RESPONSE */ #define DIRECT_MPL 20 /* ms */ /* bytes; in-process, bounded only by PUP/GSPP. */ #define DIRECT_MTU 65000 @@ -105,6 +108,29 @@ struct cmd { int fd; }; +/* In-flight Tier-2 re-key, owned solely by the re-key worker thread. */ +struct rekey_ctx { + struct list_head next; + + int flow_id; + void * ctx; /* OAP client ctx (opaque) */ + struct timespec deadline; /* reap if no RESPONSE by then */ +}; + +enum rekey_evt_type { + REKEY_INIT = 0, /* start an exchange for flow_id */ + REKEY_RESP /* a RESPONSE arrived for flow_id */ +}; + +struct rekey_evt { + struct list_head next; + + enum rekey_evt_type type; + int flow_id; + pid_t n_1_pid; /* INIT: flow's lower IPCP */ + buffer_t buf; /* RESP: owned RESPONSE payload */ +}; + struct { bool log_stdout; /* log to stdout */ #ifdef HAVE_TOML @@ -126,6 +152,14 @@ struct { pthread_t irm_sanitize; /* clean up irmd resources */ pthread_t acceptor; /* accept new commands */ + + struct { + pthread_t worker; /* Tier-2 re-key orchestrator */ + struct list_head inbox; /* re-key events for worker */ + pthread_cond_t cond; /* inbox signal condvar */ + pthread_mutex_t lock; /* inbox lock */ + bool stop; /* worker shutdown flag */ + } rk; } irmd; static enum irm_state irmd_get_state(void) @@ -932,6 +966,8 @@ static int flow_accept(struct flow_info * flow, log_err("Failed to respond to flow allocation."); goto fail_resp; } else { + if (sk->nid != NID_undef) + 
reg_flow_set_rekey(flow->id, false); log_info("Flow %d accepted by %d for %s (uid %d).", flow->id, flow->n_pid, name, flow->uid); } @@ -1358,6 +1394,9 @@ static int flow_alloc(const char * dst, goto fail_complete; } + if (sk->nid != NID_undef) + reg_flow_set_rekey(flow->id, true); + freebuf(req_hdr); freebuf(resp_hdr); freebuf(hash); @@ -1432,6 +1471,512 @@ static int flow_dealloc_resp(struct flow_info * flow) return 0; } +/* + * Inbox producers. Any thread may post; the worker drains. INIT carries + * the flow's lower IPCP pid; RESP transfers ownership of buf. + */ +static void rekey_post(enum rekey_evt_type type, + int flow_id, + pid_t n_1_pid, + buffer_t * buf) +{ + struct rekey_evt * evt; + + evt = malloc(sizeof(*evt)); + if (evt == NULL) { + log_err("Failed to post re-key event for flow %d.", flow_id); + if (buf != NULL) + freebuf(*buf); + return; + } + + list_head_init(&evt->next); + evt->type = type; + evt->flow_id = flow_id; + evt->n_1_pid = n_1_pid; + clrbuf(evt->buf); + if (buf != NULL) { + evt->buf = *buf; + clrbuf(*buf); + } + + pthread_mutex_lock(&irmd.rk.lock); + + list_add_tail(&evt->next, &irmd.rk.inbox); + pthread_cond_signal(&irmd.rk.cond); + + pthread_mutex_unlock(&irmd.rk.lock); +} + +static void rekey_post_init(int flow_id, + pid_t n_1_pid) +{ + rekey_post(REKEY_INIT, flow_id, n_1_pid, NULL); +} + +static void rekey_post_resp(int flow_id, + buffer_t * buf) +{ + rekey_post(REKEY_RESP, flow_id, 0, buf); +} + +/* Worker-only: find an in-flight entry by flow_id. */ +static struct rekey_ctx * rekey_find(struct list_head * tbl, + int flow_id) +{ + struct list_head * p; + + list_for_each(p, tbl) { + struct rekey_ctx * e = list_entry(p, struct rekey_ctx, next); + if (e->flow_id == flow_id) + return e; + } + + return NULL; +} + +/* Worker-only: drop an entry, freeing its OAP ctx. 
*/ +static void rekey_drop(struct rekey_ctx * e) +{ + if (e->ctx != NULL) + oap_ctx_free(e->ctx); + + list_del(&e->next); + free(e); +} + +/* Flow-update relay payload: a 1-byte type prefix on an opaque body. */ +enum flow_upd_type { + FLOW_UPD_REKEY_REQ = 0, + FLOW_UPD_REKEY_RESP = 1, +}; + +/* Prepend the update type to body; caller frees out on success. */ +static int flow_upd_wrap(buffer_t * out, + uint8_t type, + const buffer_t * body) +{ + out->len = body->len + 1; + out->data = malloc(out->len); + if (out->data == NULL) + return -ENOMEM; + + out->data[0] = type; + memcpy(out->data + 1, body->data, body->len); + + return 0; +} + +/* + * Worker-only: start a fresh OAP exchange over the live flow. Replaces + * any prior in-flight entry for flow_id (handles flow_id recycling). + */ +static void rekey_do_initiate(struct list_head * tbl, + int flow_id, + pid_t n_1_pid) +{ + struct rekey_ctx * e; + struct flow_info info; + struct name_info name; + buffer_t req = BUF_INIT; + buffer_t upd = BUF_INIT; + buffer_t data = BUF_INIT; + char nbuf[NAME_SIZE + 1]; + void * ctx = NULL; + int ret; + + e = rekey_find(tbl, flow_id); + if (e != NULL) + rekey_drop(e); + + if (reg_get_name_for_flow_id(nbuf, flow_id) < 0 || + reg_get_name_info(nbuf, &name) < 0) { + log_warn("No name info to re-key flow %d.", flow_id); + reg_flow_clear_in_flight(flow_id); + return; + } + + if (oap_cli_prepare(&ctx, &name, &req, data) < 0) { + log_err("Failed to prepare re-key for flow %d.", flow_id); + reg_flow_clear_in_flight(flow_id); + return; + } + + memset(&info, 0, sizeof(info)); + info.id = flow_id; + info.n_1_pid = n_1_pid; + + if (flow_upd_wrap(&upd, FLOW_UPD_REKEY_REQ, &req) < 0) { + log_err("Failed to wrap re-key request for flow %d.", flow_id); + goto fail_ctx; + } + + ret = ipcp_flow_update(&info, upd); + freebuf(upd); + if (ret < 0) { + log_err("Failed to send re-key request for flow %d.", flow_id); + goto fail_ctx; + } + + e = malloc(sizeof(*e)); + if (e == NULL) { + log_err("Failed to 
track re-key for flow %d.", flow_id); + goto fail_ctx; + } + + list_head_init(&e->next); + e->flow_id = flow_id; + e->ctx = ctx; + clock_gettime(PTHREAD_COND_CLOCK, &e->deadline); + e->deadline.tv_sec += REKEY_RESP_TIMEO; + + list_add(&e->next, tbl); + + log_dbg("Re-key request sent for flow %d.", flow_id); + + freebuf(req); + + return; + + fail_ctx: + oap_ctx_free(ctx); + reg_flow_clear_in_flight(flow_id); + freebuf(req); +} + +/* Worker-only: complete the exchange, install the pending seed. */ +static void rekey_do_complete(struct list_head * tbl, + int flow_id, + buffer_t buf) +{ + struct rekey_ctx * e; + struct name_info info; + struct crypt_sk sk; + uint8_t kbuf[SYMMKEYSZ]; + buffer_t data = BUF_INIT; + char name[NAME_SIZE + 1]; + uint8_t newgen; + + e = rekey_find(tbl, flow_id); + if (e == NULL) { + log_dbg("Stale re-key RESPONSE for flow %d.", flow_id); + return; + } + + if (reg_get_name_for_flow_id(name, flow_id) < 0 || + reg_get_name_info(name, &info) < 0) { + log_err("No name info to re-key flow %d.", flow_id); + goto finish; + } + + sk.key = kbuf; + + /* oap_cli_complete frees the ctx on every path. */ + if (oap_cli_complete(e->ctx, &info, buf, &data, &sk) < 0) { + log_err("Re-key completion failed for flow %d.", flow_id); + e->ctx = NULL; + goto finish; + } + + e->ctx = NULL; + + newgen = data.len == 1 ? *(uint8_t *) data.data : 0; + + if (newgen >= 16) { + log_warn("Re-key gen %u out of range for flow %d.", + newgen, flow_id); + goto finish_clear; + } + + if (reg_flow_store_pending(flow_id, kbuf, newgen) < 0) + log_warn("Flow %d gone during re-key.", flow_id); + else + reg_notify_flow(flow_id, FLOW_UPD); + + log_dbg("Re-key completed for flow %d (gen %u).", flow_id, newgen); + + finish_clear: + crypt_secure_clear(kbuf, SYMMKEYSZ); + freebuf(data); + finish: + rekey_drop(e); + reg_flow_clear_in_flight(flow_id); +} + +/* Worker-only: reap entries whose RESPONSE never arrived. 
*/ +static void rekey_reap_expired(struct list_head * tbl) +{ + struct list_head * p; + struct list_head * h; + struct timespec now; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + list_for_each_safe(p, h, tbl) { + struct rekey_ctx * e = list_entry(p, struct rekey_ctx, next); + if (ts_diff_ns(&e->deadline, &now) > 0) + continue; + + log_warn("Re-key timed out for flow %d.", e->flow_id); + reg_flow_clear_in_flight(e->flow_id); + rekey_drop(e); + } +} + +/* Responder side: process request, install pending seed, send response. */ +static int rekey_respond(struct flow_info * flow, + buffer_t * pk) +{ + struct name_info info; + struct crypt_sk sk; + uint8_t kbuf[SYMMKEYSZ]; + buffer_t rsp = BUF_INIT; + buffer_t upd = BUF_INIT; + buffer_t data = BUF_INIT; + char name[NAME_SIZE + 1]; + uint8_t newgen; + int epoch; + int err; + + epoch = reg_flow_get_epoch(flow->id); + if (epoch < 0) { + log_warn("Re-key for unknown flow %d.", flow->id); + return -EBADF; + } + + if (reg_get_name_for_flow_id(name, flow->id) < 0 || + reg_get_name_info(name, &info) < 0) { + log_err("No name info to re-key flow %d.", flow->id); + return -ENAME; + } + + if (reg_flow_rekey_pending(flow->id)) { + log_dbg("Duplicate re-key request for flow %d.", flow->id); + return 0; + } + + newgen = (uint8_t) ((epoch + 1) & 0x0F); + data.data = &newgen; + data.len = 1; + + sk.key = kbuf; + + err = oap_srv_process(&info, *pk, &rsp, &data, &sk); + if (err < 0) { + /* data still points to stack newgen; don't free it. */ + log_err("Re-key OAP failed for flow %d.", flow->id); + goto finish; + } + + /* On success oap_srv_process repointed data to client output. 
*/ + freebuf(data); + + if (reg_flow_store_pending(flow->id, kbuf, newgen) < 0) { + log_warn("Flow %d gone during re-key.", flow->id); + err = -EBADF; + goto finish; + } + + reg_notify_flow(flow->id, FLOW_UPD); + + if (flow_upd_wrap(&upd, FLOW_UPD_REKEY_RESP, &rsp) == 0) { + if (ipcp_flow_update(flow, upd) < 0) + log_err("Failed to send re-key response for flow %d.", + flow->id); + freebuf(upd); + } + + err = 0; + finish: + crypt_secure_clear(kbuf, SYMMKEYSZ); + freebuf(rsp); + + return err; +} + +static int flow_update_arr(struct flow_info * flow, + buffer_t * pk) +{ + uint8_t type; + + if (pk->len < 1) + return -EINVAL; + + type = pk->data[0]; + + /* Strip the type byte, keeping the malloc base for hand-off. */ + memmove(pk->data, pk->data + 1, pk->len - 1); + pk->len -= 1; + + switch (type) { + case FLOW_UPD_REKEY_REQ: + return rekey_respond(flow, pk); + case FLOW_UPD_REKEY_RESP: + /* Hand the payload to the worker and take ownership. */ + rekey_post_resp(flow->id, pk); + return 0; + default: + log_warn("Unknown flow update type %u.", type); + return -EINVAL; + } +} + +static int flow_update(struct flow_info * flow, + bool rekey, + struct crypt_sk * sk, + bool * has_key) +{ + uint8_t seed[SYMMKEYSZ]; + uint8_t epoch; + + *has_key = false; + + if (rekey) { + /* Watermark re-key: the app can't know its lower IPCP. */ + pid_t n_1_pid = reg_flow_get_n_1_pid(flow->id); + if (n_1_pid > 0) + rekey_post_init(flow->id, n_1_pid); + return 0; + } + + if (!reg_flow_take_pending(flow->id, seed, &epoch)) + return 0; + + memcpy(sk->key, seed, SYMMKEYSZ); + sk->epoch = epoch; + *has_key = true; + + crypt_secure_clear(seed, SYMMKEYSZ); + + log_dbg("Delivered re-key seed for flow %d (gen %u).", + flow->id, epoch); + + return 0; +} + +/* Free every parked OAP ctx at worker exit or cancellation. 
*/ +static void rekey_table_cleanup(void * o) +{ + struct list_head * tbl = o; + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, tbl) { + struct rekey_ctx * e = list_entry(p, struct rekey_ctx, next); + rekey_drop(e); + } +} + +/* Pop one event, or NULL if none, draining the inbox under its lock. */ +static struct rekey_evt * rekey_inbox_wait(const struct timespec * deadline) +{ + struct rekey_evt * evt = NULL; + struct timespec now; + + pthread_mutex_lock(&irmd.rk.lock); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + while (list_is_empty(&irmd.rk.inbox) && !irmd.rk.stop && + ts_diff_ns(deadline, &now) > 0) { + pthread_cond_timedwait(&irmd.rk.cond, &irmd.rk.lock, deadline); + clock_gettime(PTHREAD_COND_CLOCK, &now); + } + + if (!list_is_empty(&irmd.rk.inbox)) { + evt = list_first_entry(&irmd.rk.inbox, struct rekey_evt, next); + list_del(&evt->next); + } + + pthread_mutex_unlock(&irmd.rk.lock); + + return evt; +} + +/* + * Single worker owning all in-flight Tier-2 re-keys. It drains the + * inbox, runs the periodic snapshot, and reaps timed-out exchanges. + * The table is touched only here, so it needs no lock. + */ +static void * rekey_worker(void * o) +{ + struct list_head table; + struct timespec next; + + (void) o; + + list_head_init(&table); + + clock_gettime(PTHREAD_COND_CLOCK, &next); + next.tv_sec += OAP_REKEY_TIMER; + + pthread_cleanup_push(rekey_table_cleanup, &table); + + while (!irmd.rk.stop) { + struct rekey_evt * evt; + struct timespec now; + struct timespec deadline = next; + struct list_head * p; + + /* Wake no later than the soonest in-flight deadline. 
*/ + list_for_each(p, &table) { + struct rekey_ctx * e; + e = list_entry(p, struct rekey_ctx, next); + if (ts_diff_ns(&e->deadline, &deadline) < 0) + deadline = e->deadline; + } + + evt = rekey_inbox_wait(&deadline); + + if (irmd.rk.stop) { + if (evt != NULL) { + freebuf(evt->buf); + free(evt); + } + break; + } + + if (evt != NULL) { + switch (evt->type) { + case REKEY_INIT: + rekey_do_initiate(&table, evt->flow_id, + evt->n_1_pid); + break; + case REKEY_RESP: + rekey_do_complete(&table, evt->flow_id, + evt->buf); + freebuf(evt->buf); + break; + default: + break; + } + + free(evt); + } + + clock_gettime(PTHREAD_COND_CLOCK, &now); + if (ts_diff_ns(&next, &now) <= 0) { + struct rekey_info snap[REKEY_BATCH]; + int n; + int i; + + n = reg_flow_snapshot_rekey_due(snap, REKEY_BATCH); + for (i = 0; i < n; ++i) + rekey_do_initiate(&table, snap[i].flow_id, + snap[i].n_1_pid); + + clock_gettime(PTHREAD_COND_CLOCK, &next); + next.tv_sec += OAP_REKEY_TIMER; + } + + rekey_reap_expired(&table); + } + + pthread_cleanup_pop(1); + + return (void *) 0; +} + static void * acceptloop(void * o) { int csockfd; @@ -1502,6 +2047,7 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg, struct timespec now; struct timespec ts = TIMESPEC_INIT_S(0); /* static analysis */ int res; + bool has_key = false; irm_msg_t * ret_msg; buffer_t data; @@ -1622,7 +2168,8 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg, hbuf = malloc(SYMMKEYSZ); if (hbuf == NULL) { log_err("Failed to malloc key buf"); - return NULL; + res = -ENOMEM; + break; } memcpy(hbuf, kbuf, SYMMKEYSZ); @@ -1652,7 +2199,8 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg, hbuf = malloc(SYMMKEYSZ); if (hbuf == NULL) { log_err("Failed to malloc key buf"); - return NULL; + res = -ENOMEM; + break; } memcpy(hbuf, kbuf, SYMMKEYSZ); ret_msg->sym_key.data = hbuf; @@ -1698,6 +2246,38 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg, flow = flow_info_msg_to_s(msg->flow_info); res = flow_alloc_reply(&flow, msg->response, &data); break; 
+        case IRM_MSG_CODE__IPCP_FLOW_UPDATE_ARR:
+                data.len = msg->pk.len;
+                data.data = msg->pk.data;
+                msg->pk.data = NULL; /* pass data */
+                msg->pk.len = 0;
+                flow = flow_info_msg_to_s(msg->flow_info);
+                res = flow_update_arr(&flow, &data);
+                freebuf(data);
+                break;
+        case IRM_MSG_CODE__IRM_FLOW_UPDATE:
+                flow = flow_info_msg_to_s(msg->flow_info);
+                sk.key = kbuf;
+                res = flow_update(&flow, msg->rekey, &sk, &has_key);
+                if (res == 0) {
+                        ret_msg->flow_info = flow_info_s_to_msg(&flow);
+                        if (has_key) {
+                                hbuf = malloc(SYMMKEYSZ);
+                                if (hbuf == NULL) {
+                                        log_err("Failed to malloc key buf");
+                                        res = -ENOMEM;
+                                        break;
+                                }
+
+                                memcpy(hbuf, kbuf, SYMMKEYSZ);
+                                ret_msg->sym_key.data = hbuf;
+                                ret_msg->sym_key.len = SYMMKEYSZ;
+                                ret_msg->has_sym_key = true;
+                                ret_msg->has_generation = true;
+                                ret_msg->generation = sk.epoch;
+                        }
+                }
+                break;
         default:
                 log_err("Don't know that message code.");
                 res = -1;
@@ -2060,6 +2640,29 @@ static int irm_init(void)
 
         list_head_init(&irmd.cmds);
 
+        if (pthread_mutex_init(&irmd.rk.lock, NULL)) {
+                log_err("Failed to initialize mutex.");
+                goto fail_rk_lock;
+        }
+
+        if (pthread_condattr_init(&cattr)) {
+                log_err("Failed to initialize condattr.");
+                goto fail_rk_cond; /* rk.lock is live: destroy it */
+        }
+
+#ifndef __APPLE__
+        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+        if (pthread_cond_init(&irmd.rk.cond, &cattr)) {
+                log_err("Failed to initialize condvar.");
+                pthread_condattr_destroy(&cattr);
+                goto fail_rk_cond;
+        }
+
+        pthread_condattr_destroy(&cattr);
+
+        list_head_init(&irmd.rk.inbox);
+
         if (stat(SOCK_PATH, &st) == -1) {
                 if (mkdir(SOCK_PATH, 0777)) {
                         log_err("Failed to create sockets directory.");
@@ -2163,6 +2766,10 @@ static int irm_init(void)
  fail_sock_path:
         unlink(IRM_SOCK_PATH);
  fail_stat:
+        pthread_cond_destroy(&irmd.rk.cond);
+ fail_rk_cond:
+        pthread_mutex_destroy(&irmd.rk.lock);
+ fail_rk_lock:
         pthread_cond_destroy(&irmd.cmd_cond);
  fail_cmd_cond:
         pthread_mutex_destroy(&irmd.cmd_lock);
@@ -2211,8 +2818,22 @@ static void irm_fini(void)
pthread_mutex_unlock(&irmd.cmd_lock); + pthread_mutex_lock(&irmd.rk.lock); + + list_for_each_safe(p, h, &irmd.rk.inbox) { + struct rekey_evt * evt; + evt = list_entry(p, struct rekey_evt, next); + list_del(&evt->next); + freebuf(evt->buf); + free(evt); + } + + pthread_mutex_unlock(&irmd.rk.lock); + pthread_mutex_destroy(&irmd.cmd_lock); pthread_cond_destroy(&irmd.cmd_cond); + pthread_mutex_destroy(&irmd.rk.lock); + pthread_cond_destroy(&irmd.rk.cond); pthread_rwlock_destroy(&irmd.state_lock); #ifdef HAVE_FUSE @@ -2250,10 +2871,18 @@ static int irm_start(void) if (pthread_create(&irmd.acceptor, NULL, acceptloop, NULL)) goto fail_acceptor; + irmd.rk.stop = false; + if (OAP_REKEY_TIMER > 0 && + pthread_create(&irmd.rk.worker, NULL, rekey_worker, NULL)) + goto fail_rekey_worker; + log_info("Ouroboros IPC Resource Manager daemon started..."); return 0; + fail_rekey_worker: + pthread_cancel(irmd.acceptor); + pthread_join(irmd.acceptor, NULL); fail_acceptor: pthread_cancel(irmd.irm_sanitize); pthread_join(irmd.irm_sanitize, NULL); @@ -2293,6 +2922,17 @@ static void irm_sigwait(sigset_t sigset) static void irm_stop(void) { + if (OAP_REKEY_TIMER > 0) { + pthread_mutex_lock(&irmd.rk.lock); + + irmd.rk.stop = true; + pthread_cond_signal(&irmd.rk.cond); + + pthread_mutex_unlock(&irmd.rk.lock); + + pthread_join(irmd.rk.worker, NULL); + } + pthread_cancel(irmd.acceptor); pthread_cancel(irmd.irm_sanitize); |
