summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2026-05-03 18:25:33 +0200
committerSander Vrijders <sander@ouroboros.rocks>2026-05-20 08:17:06 +0200
commit1e75103b1546f1fc732c37c93b85510b4b4f8c81 (patch)
tree1ed73603919803122094ad838aa72f6d93e1bd56
parentea864c3d3e8ff75ffbbc1e3f01db09daa9b7a5c8 (diff)
downloadouroboros-1e75103b1546f1fc732c37c93b85510b4b4f8c81.tar.gz
ouroboros-1e75103b1546f1fc732c37c93b85510b4b4f8c81.zip
lib: Add generic hierarchical timing wheel
Introduce a generic 3-level / 256-slot deadline-ordered callback queue (1 ms / 16 ms / 256 ms per-slot resolution at levels 0/1/2). Will replace the existing timerwheel.c. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--include/ouroboros/time.h6
-rw-r--r--include/ouroboros/tw.h77
-rw-r--r--src/lib/CMakeLists.txt1
-rw-r--r--src/lib/tests/CMakeLists.txt1
-rw-r--r--src/lib/tests/tw_test.c663
-rw-r--r--src/lib/tw.c307
6 files changed, 1055 insertions, 0 deletions
diff --git a/include/ouroboros/time.h b/include/ouroboros/time.h
index 3d037a3c..a4136e8e 100644
--- a/include/ouroboros/time.h
+++ b/include/ouroboros/time.h
@@ -46,6 +46,12 @@
#define TS_TO_UINT64(ts) \
((uint64_t)(ts).tv_sec * BILLION + (uint64_t)(ts).tv_nsec)
+#define UINT64_TO_TS(ns, ts) \
+ do { \
+ (ts)->tv_sec = (time_t)((ns) / BILLION); \
+ (ts)->tv_nsec = (long)((ns) % BILLION); \
+ } while (0)
+
#define TIMEVAL_INIT_S(s) {(s), 0}
#define TIMEVAL_INIT_MS(ms) {(ms) / 1000, ((ms) % 1000) * 1000}
#define TIMEVAL_INIT_US(us) {(us) / MILLION, ((us) % MILLION)}
diff --git a/include/ouroboros/tw.h b/include/ouroboros/tw.h
new file mode 100644
index 00000000..156f99db
--- /dev/null
+++ b/include/ouroboros/tw.h
@@ -0,0 +1,77 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2026
+ *
+ * Generic deadline-ordered callback queue (timing wheel)
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_TW_H
+#define OUROBOROS_TW_H
+
+#include <ouroboros/cdefs.h>
+#include <ouroboros/list.h>
+
+#include <stddef.h>
+#include <stdint.h>
+#include <time.h>
+
+typedef void (*tw_fire_fn_t)(void * arg);
+
+struct tw_entry {
+ struct list_head next;
+ uint64_t deadline_ns;
+ tw_fire_fn_t fire;
+ void * arg;
+ size_t lvl;
+};
+
+__BEGIN_DECLS
+
+int tw_init(void);
+
+void tw_fini(void);
+
+void tw_init_entry(struct tw_entry * e);
+
+/*
+ * Schedule e to fire at deadline_ns. If e is already posted,
+ * the previous schedule is cancelled and replaced.
+ */
+void tw_post(struct tw_entry * e,
+ uint64_t deadline_ns,
+ tw_fire_fn_t fire,
+ void * arg);
+
+void tw_cancel(struct tw_entry * e);
+
+/*
+ * Advance the wheel and fire due callbacks. Callbacks run with the wheel
+ * unlocked and may call tw_post / tw_cancel on any entry, including the one
+ * currently firing. Concurrent tw_move from a second thread is a no-op.
+ */
+void tw_move(void);
+
+/*
+ * Write the absolute deadline of the earliest pending entry to *out.
+ * Empty wheel is signalled by out->tv_nsec == -1.
+ */
+void tw_next_expiry(struct timespec * out);
+
+__END_DECLS
+
+#endif /* OUROBOROS_TW_H */
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index f68d3601..6cd3a8a4 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -39,6 +39,7 @@ set(SOURCE_FILES_COMMON
ssm/pool.c
sockets.c
tpm.c
+ tw.c
utils.c
)
diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt
index 337d85a6..32836589 100644
--- a/src/lib/tests/CMakeLists.txt
+++ b/src/lib/tests/CMakeLists.txt
@@ -19,6 +19,7 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
sockets_test.c
time_test.c
tpm_test.c
+ tw_test.c
)
add_executable(${PARENT_DIR}_test ${${PARENT_DIR}_tests})
diff --git a/src/lib/tests/tw_test.c b/src/lib/tests/tw_test.c
new file mode 100644
index 00000000..32c302c4
--- /dev/null
+++ b/src/lib/tests/tw_test.c
@@ -0,0 +1,663 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2026
+ *
+ * Generic timing-wheel tests
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200809L
+#endif
+
+#include "config.h"
+
+#include <test/test.h>
+
+#include <ouroboros/time.h>
+#include <ouroboros/tw.h>
+
+#include <stdint.h>
+#include <stdio.h>
+#include <time.h>
+
+struct payload {
+ struct tw_entry tw;
+ int fired;
+};
+
+struct cancel_payload {
+ struct tw_entry tw;
+ int fired;
+ struct tw_entry * sibling;
+};
+
+struct repost_payload {
+ struct tw_entry tw;
+ int fired;
+ struct payload * sibling;
+ uint64_t repost_at;
+};
+
+static void cb_count(void * arg)
+{
+ struct payload * p = arg;
+ p->fired++;
+}
+
+static void cb_cancel_sibling(void * arg)
+{
+ struct cancel_payload * p = arg;
+ p->fired++;
+ tw_cancel(p->sibling);
+}
+
+static void cb_repost_sibling(void * arg)
+{
+ struct repost_payload * p = arg;
+ p->fired++;
+ tw_post(&p->sibling->tw, p->repost_at, cb_count, p->sibling);
+}
+
+static uint64_t now_ns(void)
+{
+ struct timespec ts;
+ clock_gettime(PTHREAD_COND_CLOCK, &ts);
+ return TS_TO_UINT64(ts);
+}
+
+static void sleep_ns(uint64_t ns)
+{
+ struct timespec ts;
+ UINT64_TO_TS(ns, &ts);
+ nanosleep(&ts, NULL);
+}
+
+static int test_tw_init_fini(void)
+{
+ TEST_START();
+
+ if (tw_init() < 0) {
+ printf("tw_init failed.\n");
+ goto fail;
+ }
+
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_tw_post_fires_after_deadline(void)
+{
+ struct payload p;
+
+ TEST_START();
+
+ if (tw_init() < 0)
+ goto fail;
+
+ tw_init_entry(&p.tw);
+ p.fired = 0;
+
+ tw_post(&p.tw, now_ns() + 5 * MILLION, cb_count, &p);
+
+ sleep_ns(20 * MILLION);
+ tw_move();
+
+ if (p.fired != 1) {
+ printf("expected 1 fire, got %d\n", p.fired);
+ goto fail_post;
+ }
+
+ tw_cancel(&p.tw);
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_post:
+ tw_cancel(&p.tw);
+ tw_fini();
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_tw_no_fire_before_deadline(void)
+{
+ struct payload p;
+
+ TEST_START();
+
+ if (tw_init() < 0)
+ goto fail;
+
+ tw_init_entry(&p.tw);
+ p.fired = 0;
+
+ tw_post(&p.tw, now_ns() + 100 * MILLION, cb_count, &p);
+
+ sleep_ns(2 * MILLION);
+ tw_move();
+
+ if (p.fired != 0) {
+ printf("expected 0 fires, got %d\n", p.fired);
+ goto fail_post;
+ }
+
+ tw_cancel(&p.tw);
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_post:
+ tw_cancel(&p.tw);
+ tw_fini();
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_tw_cancel_prevents_fire(void)
+{
+ struct payload p;
+
+ TEST_START();
+
+ if (tw_init() < 0)
+ goto fail;
+
+ tw_init_entry(&p.tw);
+ p.fired = 0;
+
+ tw_post(&p.tw, now_ns() + 5 * MILLION, cb_count, &p);
+ tw_cancel(&p.tw);
+
+ sleep_ns(20 * MILLION);
+ tw_move();
+
+ if (p.fired != 0) {
+ printf("cancelled entry fired %d times\n", p.fired);
+ goto fail_init;
+ }
+
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_init:
+ tw_fini();
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_tw_cancel_unposted_is_noop(void)
+{
+ struct tw_entry e;
+
+ TEST_START();
+
+ if (tw_init() < 0)
+ goto fail;
+
+ tw_init_entry(&e);
+ tw_cancel(&e);
+ tw_cancel(&e);
+
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_tw_fire_only_once(void)
+{
+ struct payload p;
+
+ TEST_START();
+
+ if (tw_init() < 0)
+ goto fail;
+
+ tw_init_entry(&p.tw);
+ p.fired = 0;
+
+ tw_post(&p.tw, now_ns() + 3 * MILLION, cb_count, &p);
+
+ sleep_ns(20 * MILLION);
+ tw_move();
+ tw_move();
+ tw_move();
+
+ if (p.fired != 1) {
+ printf("expected 1 fire, got %d after 3 moves\n", p.fired);
+ goto fail_post;
+ }
+
+ tw_cancel(&p.tw);
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_post:
+ tw_cancel(&p.tw);
+ tw_fini();
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/* Multi-level: post a level-1 (>= 256ms) deadline; should still fire. */
+static int test_tw_post_level1_fires(void)
+{
+ struct payload p;
+
+ TEST_START();
+
+ if (tw_init() < 0)
+ goto fail;
+
+ tw_init_entry(&p.tw);
+ p.fired = 0;
+
+ tw_post(&p.tw, now_ns() + 300 * MILLION, cb_count, &p);
+
+ if (p.tw.lvl != 1) {
+ printf("expected level 1 placement, got %zu\n", p.tw.lvl);
+ goto fail_post;
+ }
+
+ sleep_ns(320 * MILLION);
+ tw_move();
+
+ if (p.fired != 1) {
+ printf("level-1 entry didn't fire (got %d)\n", p.fired);
+ goto fail_post;
+ }
+
+ tw_cancel(&p.tw);
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_post:
+ tw_cancel(&p.tw);
+ tw_fini();
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_tw_many_entries_all_fire(void)
+{
+ struct payload pl[16];
+ size_t i;
+ size_t total = 0;
+
+ TEST_START();
+
+ if (tw_init() < 0)
+ goto fail;
+
+ for (i = 0; i < 16; ++i) {
+ tw_init_entry(&pl[i].tw);
+ pl[i].fired = 0;
+ tw_post(&pl[i].tw, now_ns() + (1 + i) * MILLION,
+ cb_count, &pl[i]);
+ }
+
+ sleep_ns(40 * MILLION);
+ tw_move();
+
+ for (i = 0; i < 16; ++i)
+ total += pl[i].fired;
+
+ if (total != 16) {
+ printf("expected 16 fires, got %zu\n", total);
+ goto fail_post;
+ }
+
+ for (i = 0; i < 16; ++i)
+ tw_cancel(&pl[i].tw);
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_post:
+ for (i = 0; i < 16; ++i)
+ tw_cancel(&pl[i].tw);
+ tw_fini();
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/* tw_next_expiry signals empty wheel via tv_nsec == -1. */
+static int test_tw_next_expiry_empty(void)
+{
+ struct timespec out;
+
+ TEST_START();
+
+ if (tw_init() < 0)
+ goto fail;
+
+ tw_next_expiry(&out);
+ if (out.tv_nsec != -1) {
+ printf("expected tv_nsec=-1, got %ld\n", (long) out.tv_nsec);
+ goto fail_init;
+ }
+
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_init:
+ tw_fini();
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/* tw_next_expiry returns a deadline within the right ballpark. */
+static int test_tw_next_expiry_returns_deadline(void)
+{
+ struct payload p;
+ struct timespec out;
+ uint64_t target;
+ uint64_t out_ns;
+ int64_t skew;
+
+ TEST_START();
+
+ if (tw_init() < 0)
+ goto fail;
+
+ tw_init_entry(&p.tw);
+ p.fired = 0;
+
+ target = now_ns() + 50 * MILLION;
+ tw_post(&p.tw, target, cb_count, &p);
+
+ tw_next_expiry(&out);
+ out_ns = TS_TO_UINT64(out);
+
+ /* Level-0 quantization gives ±1 slot of skew. */
+ skew = (int64_t)(out_ns) - (int64_t)(target);
+ if (skew < -2 * MILLION || skew > 4 * MILLION) {
+ printf("deadline not in -2..+4 ms, skew=%ld ns\n", (long) skew);
+ goto fail_post;
+ }
+
+ tw_cancel(&p.tw);
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_post:
+ tw_cancel(&p.tw);
+ tw_fini();
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/* Repost: fire, then post again. */
+static int test_tw_repost_after_fire(void)
+{
+ struct payload p;
+
+ TEST_START();
+
+ if (tw_init() < 0)
+ goto fail;
+
+ tw_init_entry(&p.tw);
+ p.fired = 0;
+
+ tw_post(&p.tw, now_ns() + 3 * MILLION, cb_count, &p);
+ sleep_ns(20 * MILLION);
+ tw_move();
+ if (p.fired != 1) {
+ printf("first fire missed\n");
+ goto fail_post;
+ }
+
+ tw_post(&p.tw, now_ns() + 3 * MILLION, cb_count, &p);
+ sleep_ns(20 * MILLION);
+ tw_move();
+ if (p.fired != 2) {
+ printf("second fire missed (fired=%d)\n", p.fired);
+ goto fail_post;
+ }
+
+ tw_cancel(&p.tw);
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_post:
+ tw_cancel(&p.tw);
+ tw_fini();
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/* Double-post replaces the schedule; only the second fires. */
+static int test_tw_double_post_replaces(void)
+{
+ struct payload p;
+
+ TEST_START();
+
+ if (tw_init() < 0)
+ goto fail;
+
+ tw_init_entry(&p.tw);
+ p.fired = 0;
+
+ tw_post(&p.tw, now_ns() + 30 * MILLION, cb_count, &p);
+ tw_post(&p.tw, now_ns() + 3 * MILLION, cb_count, &p);
+
+ sleep_ns(20 * MILLION);
+ tw_move();
+
+ if (p.fired != 1) {
+ printf("expected 1 fire after replace, got %d\n", p.fired);
+ goto fail_post;
+ }
+
+ sleep_ns(40 * MILLION);
+ tw_move();
+
+ if (p.fired != 1) {
+ printf("first schedule fired after replace (got %d)\n",
+ p.fired);
+ goto fail_post;
+ }
+
+ tw_cancel(&p.tw);
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_post:
+ tw_cancel(&p.tw);
+ tw_fini();
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/* Fire callback may safely cancel a sibling in the same slot. */
+static int test_tw_fire_cancels_sibling(void)
+{
+ struct cancel_payload a;
+ struct payload b;
+ uint64_t deadline;
+
+ TEST_START();
+
+ if (tw_init() < 0)
+ goto fail;
+
+ tw_init_entry(&a.tw);
+ tw_init_entry(&b.tw);
+ a.fired = 0;
+ a.sibling = &b.tw;
+ b.fired = 0;
+
+ deadline = now_ns() + 3 * MILLION;
+ tw_post(&a.tw, deadline, cb_cancel_sibling, &a);
+ tw_post(&b.tw, deadline, cb_count, &b);
+
+ sleep_ns(20 * MILLION);
+ tw_move();
+
+ if (a.fired != 1) {
+ printf("a expected 1 fire, got %d\n", a.fired);
+ goto fail_post;
+ }
+ if (b.fired != 0) {
+ printf("b should not have fired (got %d)\n", b.fired);
+ goto fail_post;
+ }
+
+ tw_cancel(&a.tw);
+ tw_cancel(&b.tw);
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_post:
+ tw_cancel(&a.tw);
+ tw_cancel(&b.tw);
+ tw_fini();
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+/* Fire callback may safely repost a sibling to a future slot. */
+static int test_tw_fire_posts_sibling(void)
+{
+ struct repost_payload a;
+ struct payload b;
+ uint64_t deadline;
+
+ TEST_START();
+
+ if (tw_init() < 0)
+ goto fail;
+
+ tw_init_entry(&a.tw);
+ tw_init_entry(&b.tw);
+ a.fired = 0;
+ a.sibling = &b;
+ a.repost_at = now_ns() + 30 * MILLION;
+ b.fired = 0;
+
+ deadline = now_ns() + 3 * MILLION;
+ tw_post(&a.tw, deadline, cb_repost_sibling, &a);
+ tw_post(&b.tw, deadline, cb_count, &b);
+
+ sleep_ns(20 * MILLION);
+ tw_move();
+
+ if (a.fired != 1) {
+ printf("a expected 1 fire, got %d\n", a.fired);
+ goto fail_post;
+ }
+ if (b.fired != 0) {
+ printf("b fired before reposted deadline (got %d)\n",
+ b.fired);
+ goto fail_post;
+ }
+
+ sleep_ns(25 * MILLION);
+ tw_move();
+
+ if (b.fired != 1) {
+ printf("b expected 1 fire after repost, got %d\n",
+ b.fired);
+ goto fail_post;
+ }
+
+ tw_cancel(&a.tw);
+ tw_cancel(&b.tw);
+ tw_fini();
+
+ TEST_SUCCESS();
+
+ return TEST_RC_SUCCESS;
+ fail_post:
+ tw_cancel(&a.tw);
+ tw_cancel(&b.tw);
+ tw_fini();
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+int tw_test(int argc,
+ char ** argv)
+{
+ int ret = 0;
+
+ (void) argc;
+ (void) argv;
+
+ ret |= test_tw_init_fini();
+ ret |= test_tw_post_fires_after_deadline();
+ ret |= test_tw_no_fire_before_deadline();
+ ret |= test_tw_cancel_prevents_fire();
+ ret |= test_tw_cancel_unposted_is_noop();
+ ret |= test_tw_fire_only_once();
+ ret |= test_tw_post_level1_fires();
+ ret |= test_tw_many_entries_all_fire();
+ ret |= test_tw_next_expiry_empty();
+ ret |= test_tw_next_expiry_returns_deadline();
+ ret |= test_tw_repost_after_fire();
+ ret |= test_tw_double_post_replaces();
+ ret |= test_tw_fire_cancels_sibling();
+ ret |= test_tw_fire_posts_sibling();
+
+ return ret;
+}
diff --git a/src/lib/tw.c b/src/lib/tw.c
new file mode 100644
index 00000000..ccde7dd1
--- /dev/null
+++ b/src/lib/tw.c
@@ -0,0 +1,307 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2026
+ *
+ * Generic deadline-ordered callback queue (timing wheel)
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200809L
+#endif
+
+#include "config.h"
+
+#include <ouroboros/list.h>
+#include <ouroboros/pthread.h>
+#include <ouroboros/time.h>
+#include <ouroboros/tw.h>
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+/* 3 levels × 256 slots, 1 ms / 16 ms / 256 ms per-slot resolution. */
+#define TW_LVLS 3
+#define TW_SLOTS 256
+#define TW_BUMP 4
+#define TW_RES 20 /* 2^20 ns ≈ 1 ms per slot at level 0. */
+
+#define TW_SLOT(x) ((x) & (TW_SLOTS - 1))
+
+static struct {
+ struct list_head levels[TW_LVLS][TW_SLOTS];
+ size_t prv[TW_LVLS];
+ pthread_mutex_t mtx;
+ pthread_mutex_t move_mtx;
+ bool initialised;
+} tw;
+
+static size_t tw_lvl_res(size_t lvl)
+{
+ return TW_RES + TW_BUMP * lvl;
+}
+
+/* Smallest level whose slot range covers the deadline. */
+static size_t tw_pick_lvl(uint64_t now_ns,
+ uint64_t deadline_ns)
+{
+ uint64_t delta;
+ size_t lvl;
+
+ delta = deadline_ns > now_ns ? deadline_ns - now_ns : 0;
+ lvl = 0;
+
+ while (lvl < TW_LVLS - 1 && (delta >> tw_lvl_res(lvl)) >= TW_SLOTS)
+ ++lvl;
+
+ return lvl;
+}
+
+static size_t tw_slot(uint64_t ns,
+ size_t lvl)
+{
+ return TW_SLOT(ns >> tw_lvl_res(lvl));
+}
+
+int tw_init(void)
+{
+ struct timespec now;
+ size_t i;
+ size_t j;
+
+ assert(!tw.initialised);
+
+ if (pthread_mutex_init(&tw.mtx, NULL))
+ goto fail_mtx;
+
+ if (pthread_mutex_init(&tw.move_mtx, NULL))
+ goto fail_move_mtx;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ for (i = 0; i < TW_LVLS; ++i) {
+ tw.prv[i] = TW_SLOT(tw_slot(TS_TO_UINT64(now), i) - 1);
+ for (j = 0; j < TW_SLOTS; ++j)
+ list_head_init(&tw.levels[i][j]);
+ }
+
+ tw.initialised = true;
+
+ return 0;
+
+ fail_move_mtx:
+ pthread_mutex_destroy(&tw.mtx);
+ fail_mtx:
+ return -1;
+}
+
+void tw_fini(void)
+{
+ size_t i;
+ size_t j;
+
+ assert(tw.initialised);
+
+ for (i = 0; i < TW_LVLS; ++i) {
+ for (j = 0; j < TW_SLOTS; ++j)
+ assert(list_is_empty(&tw.levels[i][j]));
+ }
+
+ pthread_mutex_destroy(&tw.move_mtx);
+ pthread_mutex_destroy(&tw.mtx);
+
+ tw.initialised = false;
+}
+
+void tw_init_entry(struct tw_entry * e)
+{
+ list_head_init(&e->next);
+
+ e->deadline_ns = 0;
+ e->fire = NULL;
+ e->arg = NULL;
+ e->lvl = 0;
+}
+
+void tw_post(struct tw_entry * e,
+ uint64_t deadline_ns,
+ tw_fire_fn_t fire,
+ void * arg)
+{
+ struct timespec now;
+ size_t lvl;
+ size_t slot;
+
+ assert(tw.initialised);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ lvl = tw_pick_lvl(TS_TO_UINT64(now), deadline_ns);
+ /* +1 so deadline <= slot_start; lands later in slot. */
+ slot = TW_SLOT(tw_slot(deadline_ns, lvl) + 1);
+
+ e->deadline_ns = deadline_ns;
+ e->fire = fire;
+ e->arg = arg;
+ e->lvl = lvl;
+
+ pthread_mutex_lock(&tw.mtx);
+
+ if (!list_is_empty(&e->next))
+ list_del(&e->next);
+
+ list_add_tail(&e->next, &tw.levels[lvl][slot]);
+
+ pthread_mutex_unlock(&tw.mtx);
+}
+
+void tw_cancel(struct tw_entry * e)
+{
+ if (e == NULL)
+ return;
+
+ assert(tw.initialised);
+
+ pthread_mutex_lock(&tw.mtx);
+
+ if (!list_is_empty(&e->next)) {
+ list_del(&e->next);
+ list_head_init(&e->next);
+ }
+
+ pthread_mutex_unlock(&tw.mtx);
+}
+
+void tw_move(void)
+{
+ struct timespec now;
+ struct list_head deferred;
+ struct list_head * p;
+ uint64_t now_ns;
+ size_t i;
+ size_t j;
+ size_t cur;
+
+ assert(tw.initialised);
+
+ if (pthread_mutex_trylock(&tw.move_mtx) != 0)
+ return;
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, &tw.move_mtx);
+
+ pthread_mutex_lock(&tw.mtx);
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, &tw.mtx);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ now_ns = TS_TO_UINT64(now);
+
+ for (i = 0; i < TW_LVLS; ++i) {
+ cur = tw_slot(now_ns, i);
+
+ j = tw.prv[i];
+ if (cur < j)
+ cur += TW_SLOTS;
+
+ while (j++ < cur) {
+ size_t s = TW_SLOT(j);
+
+ /* Pop-front so fire may mutate any entry. */
+ list_head_init(&deferred);
+
+ while (!list_is_empty(&tw.levels[i][s])) {
+ struct tw_entry * e;
+ p = tw.levels[i][s].nxt;
+ e = list_entry(p, struct tw_entry, next);
+ list_del(&e->next);
+
+ if (e->deadline_ns > now_ns) {
+ list_add_tail(&e->next, &deferred);
+ continue;
+ }
+
+ pthread_mutex_unlock(&tw.mtx);
+ e->fire(e->arg);
+ pthread_mutex_lock(&tw.mtx);
+ }
+
+ while (!list_is_empty(&deferred)) {
+ p = deferred.nxt;
+ list_del(p);
+ list_add_tail(p, &tw.levels[i][s]);
+ }
+ }
+
+ tw.prv[i] = TW_SLOT(cur);
+ }
+
+ pthread_cleanup_pop(true); /* tw.mtx */
+ pthread_cleanup_pop(true); /* tw.move_mtx */
+}
+
+/* Earliest pending deadline at level lvl, INT64_MAX if level is empty. */
+static int64_t tw_lvl_earliest(size_t lvl,
+ uint64_t now_ns)
+{
+ size_t cur = tw_slot(now_ns, lvl);
+ size_t j;
+
+ for (j = 1; j <= TW_SLOTS; ++j) {
+ size_t s = TW_SLOT(cur + j);
+
+ if (list_is_empty(&tw.levels[lvl][s]))
+ continue;
+
+ return (int64_t)(now_ns + ((uint64_t) j << tw_lvl_res(lvl)));
+ }
+
+ return INT64_MAX;
+}
+
+void tw_next_expiry(struct timespec * out)
+{
+ struct timespec now;
+ uint64_t now_ns;
+ int64_t earliest = INT64_MAX;
+ size_t i;
+
+ assert(tw.initialised);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ now_ns = TS_TO_UINT64(now);
+
+ pthread_mutex_lock(&tw.mtx);
+
+ for (i = 0; i < TW_LVLS; ++i) {
+ int64_t dl = tw_lvl_earliest(i, now_ns);
+ if (dl < earliest)
+ earliest = dl;
+ }
+
+ pthread_mutex_unlock(&tw.mtx);
+
+ if (earliest == INT64_MAX) {
+ /* Empty wheel: tv_nsec=-1 is an invalid normalised value. */
+ out->tv_sec = 0;
+ out->tv_nsec = -1;
+ } else {
+ UINT64_TO_TS((uint64_t) earliest, out);
+ }
+}