| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2026-05-10 19:06:21 +0200 |
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2026-05-20 08:17:07 +0200 |
| commit | 63d3aa9ab8d8b0b6d8a10362e112a431dcb5b4e9 (patch) | |
| tree | 88f0827466b40d0e83da7954123d00cbb5f6c676 /src/lib/timerwheel.c | |
| parent | f33769c818cb1f01079405f543b36aa294764112 (diff) | |
lib: Update FRCP implementation
The Flow and Retransmission Control Protocol (FRCP) runs end-to-end
between two peers over a flow. It provides reliability, in-order
delivery, flow control, and liveness. Note that congestion avoidance
is orthogonal to FRCP and handled in the IPCP.
A fixed 16-octet header, network byte order, is prefixed to every FRCP
packet:
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|             flags             |              hcs              |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                            window                             |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                             seqno                             |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                             ackno                             |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                     payload (variable) ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
hcs is a CRC-16-CCITT-FALSE checksum over the PCI (and the stream
extension when present), verified before any flag-driven dispatch. A
single packet can simultaneously carry DATA + ACK + FC + RXM by OR-ing
flag bits. An optional CRC trailer covers the body on DATA when qs.ber
== 0, and on every SACK packet; an optional AEAD wrap (per-flow keys)
sits outermost.
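The header layout and checksum described above can be sketched as follows. This is a minimal illustration, not the library's code: the names struct pci, pci_pack and pci_verify are hypothetical, and it assumes the hcs field is treated as zero while the CRC is computed, which the text does not state. The stream extension is left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define PCI_LEN 16 /* Fixed header size in octets. */

/* Host-side view of the base PCI; field names follow the diagram. */
struct pci {
        uint16_t flags;
        uint16_t hcs;
        uint32_t window;
        uint32_t seqno;
        uint32_t ackno;
};

static void put16(uint8_t * p, uint16_t v)
{
        p[0] = (uint8_t) (v >> 8);
        p[1] = (uint8_t) (v & 0xFF);
}

static void put32(uint8_t * p, uint32_t v)
{
        put16(p, (uint16_t) (v >> 16));
        put16(p + 2, (uint16_t) (v & 0xFFFF));
}

static uint16_t get16(const uint8_t * p)
{
        return (uint16_t) ((p[0] << 8) | p[1]);
}

/* CRC-16/CCITT-FALSE: poly 0x1021, init 0xFFFF, no reflection. */
static uint16_t crc16_ccitt_false(const uint8_t * buf, size_t len)
{
        uint16_t crc = 0xFFFF;
        size_t   i;
        int      b;

        assert(buf != NULL);

        for (i = 0; i < len; ++i) {
                crc ^= (uint16_t) (buf[i] << 8);
                for (b = 0; b < 8; ++b)
                        crc = (crc & 0x8000)
                                ? (uint16_t) ((crc << 1) ^ 0x1021)
                                : (uint16_t) (crc << 1);
        }

        return crc;
}

/* Serialise in network byte order and fill in hcs. */
static void pci_pack(const struct pci * pci, uint8_t out[PCI_LEN])
{
        put16(out, pci->flags);
        put16(out + 2, 0); /* ASSUMPTION: hcs is zero during the CRC. */
        put32(out + 4, pci->window);
        put32(out + 8, pci->seqno);
        put32(out + 12, pci->ackno);
        put16(out + 2, crc16_ccitt_false(out, PCI_LEN));
}

/* Returns 1 if the header checksum matches, 0 otherwise. */
static int pci_verify(const uint8_t in[PCI_LEN])
{
        uint8_t tmp[PCI_LEN];

        memcpy(tmp, in, PCI_LEN);
        put16(tmp + 2, 0);

        return get16(in + 2) == crc16_ccitt_false(tmp, PCI_LEN);
}
```

A receiver following the text would call the verify step first and only then look at the flag bits, so a corrupted flags field can never select the wrong handler.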
Flag bits (MSB-first; bits 13..15 reserved, MUST be zero):
+------+--------+--------+----------------------------------------+
| Bit | Mask | Name | Meaning |
+------+--------+--------+----------------------------------------+
| 0 | 0x8000 | DATA | Carries caller payload |
| 1 | 0x4000 | DRF | Start of a fresh data run |
| 2 | 0x2000 | ACK | ackno field valid |
| 3 | 0x1000 | NACK | Pre-DRF nudge (seqno informational) |
| 4 | 0x0800 | FC | window field valid (rwe advertisement) |
| 5 | 0x0400 | RDVS | Rendezvous probe (window-closed) |
| 6 | 0x0200 | FFGM | First Fragment of a multi-fragment SDU |
| 7 | 0x0100 | LFGM | Last Fragment of a multi-fragment SDU |
| 8 | 0x0080 | RXM | Retransmission |
| 9 | 0x0040 | SACK | Block list follows in payload |
| 10 | 0x0020 | RTTP | RTT probe / echo (payload follows) |
| 11 | 0x0010 | KA | Keepalive |
| 12 | 0x0008 | FIN | End of stream marker |
| 13-15| -- | -- | Reserved (MUST be zero) |
+------+--------+--------+----------------------------------------+
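The flag table maps directly onto a set of 16-bit masks. The sketch below uses the FRCT_ prefix that the commit message itself uses for FRCT_ACK, FRCT_FC, FRCT_SACK and FRCT_RTTP; the remaining names are extrapolated from that pattern and are an assumption, as are the helper names flags_valid and flags_has.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Masks copied from the table (bit 0 is the most significant bit). */
#define FRCT_DATA 0x8000
#define FRCT_DRF  0x4000
#define FRCT_ACK  0x2000
#define FRCT_NACK 0x1000
#define FRCT_FC   0x0800
#define FRCT_RDVS 0x0400
#define FRCT_FFGM 0x0200
#define FRCT_LFGM 0x0100
#define FRCT_RXM  0x0080
#define FRCT_SACK 0x0040
#define FRCT_RTTP 0x0020
#define FRCT_KA   0x0010
#define FRCT_FIN  0x0008
#define FRCT_RSVD 0x0007 /* Bits 13..15, MUST be zero. */

#define FRCT_ALL  0xFFF8 /* Every defined flag OR-ed together. */

static_assert((FRCT_ALL & FRCT_RSVD) == 0,
              "defined flags must not overlap the reserved bits");

/* A packet with any reserved bit set is malformed. */
static bool flags_valid(uint16_t flags)
{
        return (flags & FRCT_RSVD) == 0;
}

/* True if every bit in mask is set in flags. */
static bool flags_has(uint16_t flags, uint16_t mask)
{
        return (flags & mask) == mask;
}
```

Because the flags are independent bits, a combined DATA + ACK + FC + RXM packet is simply the OR of the four masks, and each handler tests only the bit it cares about.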
(FFGM, LFGM) encodes the fragment role of a DATA packet (SCTP-style
B/E): 11=SOLE, 10=FIRST, 00=MID, 01=LAST. Each fragment carries its
own seqno; retransmission recovers fragments individually, and
reassembly runs at consume time. In stream mode FFGM/LFGM are unused;
per-byte position is carried by the stream extension below and
end-of-stream is signalled by FIN on a 0-byte DATA packet.
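Decoding the fragment role amounts to reading the two bits as a 2-bit number. A minimal sketch, with hypothetical names (enum frag_role, frag_role_of) and the masks taken from the flag table:

```c
#include <assert.h>
#include <stdint.h>

#define FRCT_FFGM 0x0200 /* First fragment bit. */
#define FRCT_LFGM 0x0100 /* Last fragment bit.  */

/* Values mirror the (FFGM, LFGM) bit pair: 11, 10, 00, 01. */
enum frag_role {
        FRAG_MID   = 0, /* 00 */
        FRAG_LAST  = 1, /* 01 */
        FRAG_FIRST = 2, /* 10 */
        FRAG_SOLE  = 3  /* 11 */
};

static_assert(FRAG_SOLE == (FRAG_FIRST | FRAG_LAST),
              "a sole fragment is both first and last");

/* Only meaningful for DATA packets outside stream mode. */
static enum frag_role frag_role_of(uint16_t flags)
{
        unsigned ffgm = (flags & FRCT_FFGM) != 0;
        unsigned lfgm = (flags & FRCT_LFGM) != 0;

        return (enum frag_role) ((ffgm << 1) | lfgm);
}
```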
SACK payload (FRCT_ACK | FRCT_FC | FRCT_SACK):
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|           n_blocks            |      padding (2 octets)       |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           start[0]                            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                            end[0]                             |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
                  ... n_blocks pairs total ...
Each block describes a *present* (received) range strictly above the
cumulative ACK in the PCI ackno. D-SACK (RFC 2883) is signalled
in-band as block[0] - no flag bit, no extra framing - and consumed by
the RACK reo_wnd_mult scaler (RFC 8985 sec. 7.2).
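Parsing the block list is a bounds-checked walk over the payload. The sketch below is illustrative only: struct sack_block and sack_parse are hypothetical names, and it deliberately does not interpret block[0] as a D-SACK or decide whether end is inclusive, since the text does not say.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SACK_HDR_LEN 4 /* n_blocks (2 octets) + padding (2 octets). */
#define SACK_BLK_LEN 8 /* start (4 octets) + end (4 octets).        */

struct sack_block {
        uint32_t start;
        uint32_t end;
};

static uint32_t get32(const uint8_t * p)
{
        return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
               ((uint32_t) p[2] << 8)  |  (uint32_t) p[3];
}

/*
 * Returns the number of blocks parsed, or -1 if the payload is too
 * short for the advertised count or the count exceeds max. Trailing
 * octets (such as a CRC trailer) are tolerated and ignored.
 */
static int sack_parse(const uint8_t *     buf,
                      size_t              len,
                      struct sack_block * out,
                      size_t              max)
{
        size_t n;
        size_t i;

        assert(buf != NULL && out != NULL);

        if (len < SACK_HDR_LEN)
                return -1;

        n = ((size_t) buf[0] << 8) | buf[1];
        if (n > max || (len - SACK_HDR_LEN) / SACK_BLK_LEN < n)
                return -1;

        for (i = 0; i < n; ++i) {
                const uint8_t * p = buf + SACK_HDR_LEN + i * SACK_BLK_LEN;
                out[i].start = get32(p);
                out[i].end   = get32(p + 4);
        }

        return (int) n;
}
```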
RTTP payload (FRCT_RTTP only; 24 octets):
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           probe_id                            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                            echo_id                            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                                                               |
+              nonce (16 octets, echoed verbatim)               +
|                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
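The 24-octet RTTP payload can be packed and matched as sketched below. The exact roles of probe_id and echo_id are not spelled out in the text, so this sketch ASSUMES that an echo carries the probe's probe_id in its echo_id field; only the verbatim nonce echo is taken from the diagram. All names (struct rttp, rttp_pack, rttp_unpack, rttp_make_echo, rttp_echo_matches) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define RTTP_LEN   24
#define RTTP_NONCE 16

struct rttp {
        uint32_t probe_id;
        uint32_t echo_id;
        uint8_t  nonce[RTTP_NONCE];
};

static void put32(uint8_t * p, uint32_t v)
{
        p[0] = (uint8_t) (v >> 24);
        p[1] = (uint8_t) (v >> 16);
        p[2] = (uint8_t) (v >> 8);
        p[3] = (uint8_t) v;
}

static uint32_t get32(const uint8_t * p)
{
        return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
               ((uint32_t) p[2] << 8)  |  (uint32_t) p[3];
}

static void rttp_pack(const struct rttp * r, uint8_t out[RTTP_LEN])
{
        assert(r != NULL);
        put32(out, r->probe_id);
        put32(out + 4, r->echo_id);
        memcpy(out + 8, r->nonce, RTTP_NONCE);
}

static void rttp_unpack(const uint8_t in[RTTP_LEN], struct rttp * r)
{
        assert(r != NULL);
        r->probe_id = get32(in);
        r->echo_id  = get32(in + 4);
        memcpy(r->nonce, in + 8, RTTP_NONCE);
}

/* ASSUMPTION: the echo names the probe it answers in echo_id. */
static void rttp_make_echo(const struct rttp * probe,
                           uint32_t            own_id,
                           struct rttp *       echo)
{
        assert(probe != NULL && echo != NULL);
        echo->probe_id = own_id;
        echo->echo_id  = probe->probe_id;
        memcpy(echo->nonce, probe->nonce, RTTP_NONCE); /* Verbatim. */
}

/* Accept an echo only if both the id and the nonce match. */
static bool rttp_echo_matches(const struct rttp * sent,
                              const struct rttp * echo)
{
        assert(sent != NULL && echo != NULL);
        return echo->echo_id == sent->probe_id &&
               memcmp(echo->nonce, sent->nonce, RTTP_NONCE) == 0;
}
```

Checking the nonce as well as the id is what makes the probe hard to spoof: an off-path sender would have to guess 16 octets.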
Stream PCI extension (in_order == STREAM only; 8 octets after the base
PCI on every DATA packet):
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                             start                             |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                              end                              |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
start, end are monotonic 32-bit byte offsets; end - start equals the
on-wire payload length. Stream mode is negotiated at flow allocation;
the extension is present iff stream mode is in use, never on a
per-packet basis.
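The invariant end - start == payload length gives a cheap consistency check on every stream DATA packet. A minimal sketch, with hypothetical names (struct stream_ext, stream_ext_parse, stream_ext_ok); it assumes the 32-bit offsets wrap modulo 2^32, so the subtraction is done in unsigned arithmetic.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define STREAM_EXT_LEN 8

struct stream_ext {
        uint32_t start;
        uint32_t end;
};

static uint32_t get32(const uint8_t * p)
{
        return ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
               ((uint32_t) p[2] << 8)  |  (uint32_t) p[3];
}

/* Read the 8 octets that follow the base PCI in stream mode. */
static void stream_ext_parse(const uint8_t       in[STREAM_EXT_LEN],
                             struct stream_ext * ext)
{
        assert(ext != NULL);
        ext->start = get32(in);
        ext->end   = get32(in + 4);
}

/*
 * True if the offsets agree with the on-wire payload length.
 * ASSUMPTION: offsets wrap modulo 2^32; unsigned subtraction then
 * yields the right length across the wrap.
 */
static bool stream_ext_ok(const struct stream_ext * ext, size_t len)
{
        assert(ext != NULL);
        return (uint32_t) (ext->end - ext->start) == len;
}
```

A 0-byte DATA packet carrying FIN passes the same check with start equal to end.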
Service modes are an orthogonal (in_order, loss, ber) vector selected
at flow_alloc; the predefined cubes map to the axes as follows:
+----------------+---------+------+-----+-----------------------+
| Cube | in_order| loss | ber | Engaged |
+----------------+---------+------+-----+-----------------------+
| qos_raw | 0 | 1 | 1 | Raw passthrough |
| qos_raw_safe | 0 | 1 | 0 | Raw + CRC trailer |
| qos_rt | 1 | 1 | 1 | FRCP, no FRTX, no CRC |
| qos_rt_safe | 1 | 1 | 0 | FRCP, no FRTX, CRC |
| qos_msg | 1 | 0 | 0 | FRCP + FRTX |
| qos_stream | 2 | 0 | 0 | FRCP + FRTX, stream |
+----------------+---------+------+-----+-----------------------+
in_order=0 sends raw datagrams with no PCI (UDP-equivalent);
in_order=1 engages FRCP with SDU framing; in_order=2 (stream) requires
loss=0 and is rejected otherwise. loss=0 engages the FRTX retransmit
machinery. ber=0 appends the CRC-32 trailer; QOS_DISABLE_CRC at build
time forces ber=1 for development. Encryption is a separate per-flow
attribute layered as an AEAD wrap outside the FRCP packet.
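The mapping from the (in_order, loss, ber) vector to the engaged machinery can be written as a small decision function. This is a sketch under stated assumptions: struct qos_vec and struct engaged are hypothetical stand-ins, not the library's QoS spec type, and it assumes retransmission is only meaningful when FRCP is engaged (no cube in the table combines in_order=0 with loss=0).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum { IN_ORDER_NONE = 0, IN_ORDER_SDU = 1, IN_ORDER_STREAM = 2 };

/* Hypothetical stand-in for the three axes of the table. */
struct qos_vec {
        uint8_t in_order;
        uint8_t loss;
        uint8_t ber;
};

struct engaged {
        bool frcp;   /* PCI present, SDU framing or stream. */
        bool frtx;   /* Retransmission machinery.           */
        bool crc;    /* CRC-32 trailer appended.            */
        bool stream; /* Stream PCI extension in use.        */
};

/* Returns 0 and fills e, or -1 if the vector must be rejected. */
static int qos_engage(const struct qos_vec * q, struct engaged * e)
{
        assert(q != NULL && e != NULL);

        if (q->in_order > IN_ORDER_STREAM)
                return -1;

        /* Stream mode requires loss=0 and is rejected otherwise. */
        if (q->in_order == IN_ORDER_STREAM && q->loss != 0)
                return -1;

        e->frcp   = q->in_order != IN_ORDER_NONE;
        e->frtx   = e->frcp && q->loss == 0; /* ASSUMPTION: needs FRCP. */
        e->crc    = q->ber == 0;
        e->stream = q->in_order == IN_ORDER_STREAM;

        return 0;
}
```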
Heritage: delta-t (Watson 1981) supplies timer-based connection
management - no SYN/FIN handshake, the DRF marker, the t_mpl / t_a /
t_r timers. RINA (Day 2008) supplies the unified flow_alloc(name, qos,
...) primitive and the orthogonal QoS-cube axes. Loss detection
follows TCP/QUIC practice (RFCs 2018, 2883, 6582, 6298, 8985); RTT
probing is nonce-authenticated like QUIC PATH_CHALLENGE.
Adds oftp, a minimal file-transfer tool over an FRCP stream flow. The
client reads from stdin or --in FILE and writes through a
flow_alloc(qos_stream); the server (--listen) calls flow_accept and
writes to stdout or --out FILE. Both sides compute a CRC-64/NVMe over
the bytes they handle and print the result. The server rejects flows
whose negotiated qs.in_order != STREAM.
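Because both sides checksum the bytes as they pass through, the CRC has to be computed incrementally over chunks of arbitrary size. The sketch below shows one way to do that with the published CRC-64/NVMe parameters (reflected polynomial 0x9A6C9329AC4BC9B5, initial value and final XOR all ones). It is a slow bitwise illustration with a hypothetical function name, not the code oftp uses.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Bit-reversed form of the CRC-64/NVMe polynomial 0xAD93D23594C93659. */
#define CRC64_NVME_POLY_REFL 0x9A6C9329AC4BC9B5ULL

/*
 * Fold len more octets into a running checksum. Start with crc = 0
 * and feed each chunk in order; the return value is always a
 * finalised checksum, so it can be printed at any point.
 */
static uint64_t crc64_nvme_update(uint64_t        crc,
                                  const uint8_t * buf,
                                  size_t          len)
{
        size_t i;
        int    b;

        assert(buf != NULL || len == 0);

        crc = ~crc; /* Undo the final XOR (and apply init on first call). */

        for (i = 0; i < len; ++i) {
                crc ^= buf[i];
                for (b = 0; b < 8; ++b)
                        crc = (crc & 1)
                                ? (crc >> 1) ^ CRC64_NVME_POLY_REFL
                                : crc >> 1;
        }

        return ~crc;
}
```

Feeding the data in one call or in several read-sized chunks gives the same result, which is what lets client and server compare their printed values regardless of how the stream was segmented.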
Two FRCP knobs are exposed via env vars on either side:
  OFTP_FRCT_RTO_MIN         fccntl FRCTSRTOMIN  (ns)
  OFTP_FRCT_STREAM_RING_SZ  fccntl FRCTSRRINGSZ (octets)
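Reading such a knob safely means rejecting anything that is not a plain decimal number before it reaches the flow. The sketch below covers only the parsing step; parse_u64 and env_u64 are hypothetical helpers, and the subsequent fccntl call is intentionally left out because its exact argument form is not given here.

```c
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>

/* Parse a non-negative decimal integer; returns 0 on success. */
static int parse_u64(const char * s, uint64_t * out)
{
        char *             end;
        unsigned long long v;

        assert(out != NULL);

        /* strtoull accepts signs and whitespace; refuse both here. */
        if (s == NULL || !isdigit((unsigned char) s[0]))
                return -1;

        errno = 0;
        v = strtoull(s, &end, 10);
        if (errno != 0 || *end != '\0')
                return -1;

        *out = (uint64_t) v;

        return 0;
}

/* Returns 0 and sets *out if the variable is set and well-formed. */
static int env_u64(const char * name, uint64_t * out)
{
        return parse_u64(getenv(name), out);
}
```

A caller would apply the setting only when env_u64("OFTP_FRCT_RTO_MIN", &v) returns 0 and otherwise leave the flow at its defaults.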
The ocbr_client gains an OCBR_QOS env var to pick the cube the client
uses for flow_alloc; recognised values are raw, safe, rt, rt_safe,
msg, stream. Unknown values fall back to raw with a warning on
stderr. Without the env var set, behaviour is unchanged.
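The selection logic is a table lookup with a fallback. A minimal sketch with hypothetical names (enum cube, cube_from_str); it assumes that "safe" selects qos_raw_safe and that an unset variable corresponds to the raw cube, neither of which is stated outright.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

enum cube {
        CUBE_RAW,
        CUBE_RAW_SAFE,
        CUBE_RT,
        CUBE_RT_SAFE,
        CUBE_MSG,
        CUBE_STREAM
};

/* Map an OCBR_QOS value to a cube; NULL means the variable is unset. */
static enum cube cube_from_str(const char * s)
{
        static const struct {
                const char * name;
                enum cube    cube;
        } tbl[] = {
                { "raw",     CUBE_RAW      },
                { "safe",    CUBE_RAW_SAFE }, /* ASSUMPTION: raw + CRC. */
                { "rt",      CUBE_RT       },
                { "rt_safe", CUBE_RT_SAFE  },
                { "msg",     CUBE_MSG      },
                { "stream",  CUBE_STREAM   }
        };
        size_t i;

        static_assert(sizeof(tbl) / sizeof(tbl[0]) == 6,
                      "one entry per recognised value");

        if (s == NULL)
                return CUBE_RAW; /* ASSUMPTION: default cube is raw. */

        for (i = 0; i < sizeof(tbl) / sizeof(tbl[0]); ++i)
                if (strcmp(s, tbl[i].name) == 0)
                        return tbl[i].cube;

        fprintf(stderr, "Unknown OCBR_QOS value '%s', using raw.\n", s);

        return CUBE_RAW;
}
```

The client would call it as cube_from_str(getenv("OCBR_QOS")), with getenv coming from stdlib.h.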
Removes the deprecated lib/timerwheel.c.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib/timerwheel.c')
| -rw-r--r-- | src/lib/timerwheel.c | 414 |
1 files changed, 0 insertions, 414 deletions
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
deleted file mode 100644
index d0f5c05c..00000000
--- a/src/lib/timerwheel.c
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2026
- *
- * Timerwheel
- *
- *    Dimitri Staessens <dimitri@ouroboros.rocks>
- *    Sander Vrijders   <sander@ouroboros.rocks>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#include <ouroboros/list.h>
-
-/* Overflow limits range to about 6 hours. */
-#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
-#define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES)
-#define ts_to_ack_slot(ts) (ts_to_ns(ts) >> ACKQ_RES)
-
-struct rxm {
-        struct list_head     next;
-        uint32_t             seqno;
-#ifndef RXM_BUFFER_ON_HEAP
-        struct ssm_pk_buff * spb;
-#endif
-        struct frct_pci *    pkt;
-        size_t               len;
-        time_t               t0;      /* Time when original was sent (us). */
-        struct frcti *       frcti;
-        int                  fd;
-        int                  flow_id; /* Prevent rtx when fd reused. */
-};
-
-struct ack {
-        struct list_head next;
-        struct frcti *   frcti;
-        int              fd;
-        int              flow_id;
-};
-
-struct {
-        /*
-         * At a 1 ms min resolution, every level bumps the
-         * resolution by a factor of 16.
-         */
-        struct list_head rxms[RXMQ_LVLS][RXMQ_SLOTS];
-
-        struct list_head acks[ACKQ_SLOTS];
-        bool             map[ACKQ_SLOTS][PROC_MAX_FLOWS];
-
-        size_t           prv_rxm[RXMQ_LVLS]; /* Last processed rxm slots. */
-        size_t           prv_ack;            /* Last processed ack slot. */
-        pthread_mutex_t  lock;
-} rw;
-
-static void timerwheel_fini(void)
-{
-        size_t             i;
-        size_t             j;
-        struct list_head * p;
-        struct list_head * h;
-
-        pthread_mutex_lock(&rw.lock);
-
-        for (i = 0; i < RXMQ_LVLS; ++i) {
-                for (j = 0; j < RXMQ_SLOTS; j++) {
-                        list_for_each_safe(p, h, &rw.rxms[i][j]) {
-                                struct rxm * rxm;
-                                rxm = list_entry(p, struct rxm, next);
-                                list_del(&rxm->next);
-#ifdef RXM_BUFFER_ON_HEAP
-                                free(rxm->pkt);
-#else
-                                ssm_pk_buff_ack(rxm->spb);
-                                ipcp_spb_release(rxm->spb);
-#endif
-                                free(rxm);
-                        }
-                }
-        }
-
-        for (i = 0; i < ACKQ_SLOTS; ++i) {
-                list_for_each_safe(p, h, &rw.acks[i]) {
-                        struct ack * a = list_entry(p, struct ack, next);
-                        list_del(&a->next);
-                        free(a);
-                }
-        }
-
-        pthread_mutex_unlock(&rw.lock);
-
-        pthread_mutex_destroy(&rw.lock);
-}
-
-static int timerwheel_init(void)
-{
-        struct timespec now;
-        size_t          i;
-        size_t          j;
-
-        if (pthread_mutex_init(&rw.lock, NULL))
-                return -1;
-
-        clock_gettime(PTHREAD_COND_CLOCK, &now);
-
-        for (i = 0; i < RXMQ_LVLS; ++i) {
-                rw.prv_rxm[i] = (ts_to_rxm_slot(now) - 1);
-                rw.prv_rxm[i] >>= (RXMQ_BUMP * i);
-                rw.prv_rxm[i] &= (RXMQ_SLOTS - 1);
-                for (j = 0; j < RXMQ_SLOTS; ++j)
-                        list_head_init(&rw.rxms[i][j]);
-        }
-
-        rw.prv_ack = (ts_to_ack_slot(now) - 1) & (ACKQ_SLOTS - 1);
-        for (i = 0; i < ACKQ_SLOTS; ++i)
-                list_head_init(&rw.acks[i]);
-
-        return 0;
-}
-
-static void timerwheel_move(void)
-{
-        struct timespec    now;
-        struct list_head * p;
-        struct list_head * h;
-        size_t             rxm_slot;
-        size_t             ack_slot;
-        size_t             i;
-        size_t             j;
-
-        pthread_mutex_lock(&rw.lock);
-
-        pthread_cleanup_push(__cleanup_mutex_unlock, &rw.lock);
-
-        clock_gettime(PTHREAD_COND_CLOCK, &now);
-
-        rxm_slot = ts_to_rxm_slot(now);
-
-        for (i = 0; i < RXMQ_LVLS; ++i) {
-                size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1);
-                j = rw.prv_rxm[i];
-                if (j_max_slot < j)
-                        j_max_slot += RXMQ_SLOTS;
-                while (j++ < j_max_slot) {
-                        list_for_each_safe(p, h,
-                                           &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) {
-                                struct rxm *         r;
-                                struct frct_cr *     snd_cr;
-                                struct frct_cr *     rcv_cr;
-                                size_t               slot;
-                                size_t               rslot;
-                                ssize_t              idx;
-                                struct ssm_pk_buff * spb;
-                                struct frct_pci *    pci;
-                                struct flow *        f;
-                                uint32_t             snd_lwe;
-                                uint32_t             rcv_lwe;
-                                size_t               lvl = 0;
-
-                                r = list_entry(p, struct rxm, next);
-
-                                list_del(&r->next);
-
-                                snd_cr = &r->frcti->snd_cr;
-                                rcv_cr = &r->frcti->rcv_cr;
-                                f      = &proc.flows[r->fd];
-#ifndef RXM_BUFFER_ON_HEAP
-                                ssm_pk_buff_ack(r->spb);
-#endif
-                                if (f->frcti == NULL
-                                    || f->info.id != r->flow_id)
-                                        goto cleanup;
-
-                                pthread_rwlock_rdlock(&r->frcti->lock);
-
-                                snd_lwe = snd_cr->lwe;
-                                rcv_lwe = rcv_cr->lwe;
-
-                                pthread_rwlock_unlock(&r->frcti->lock);
-
-                                /* Has been ack'd, remove. */
-                                if (before(r->seqno, snd_lwe))
-                                        goto cleanup;
-
-                                /* Check for r-timer expiry. */
-                                if (ts_to_ns(now) - r->t0 > r->frcti->r)
-                                        goto flow_down;
-
-                                pthread_rwlock_wrlock(&r->frcti->lock);
-
-                                if (r->seqno == r->frcti->rttseq) {
-                                        r->frcti->rto +=
-                                                r->frcti->rto >> RTO_DIV;
-                                        r->frcti->probe = false;
-                                }
-#ifdef PROC_FLOW_STATS
-                                r->frcti->n_rtx++;
-#endif
-                                rslot = r->frcti->rto >> RXMQ_RES;
-
-                                pthread_rwlock_unlock(&r->frcti->lock);
-
-                                /* Schedule at least in the next time slot. */
-                                slot = ts_to_ns(now) >> RXMQ_RES;
-
-                                while (rslot >= RXMQ_SLOTS) {
-                                        ++lvl;
-                                        rslot >>= RXMQ_BUMP;
-                                        slot >>= RXMQ_BUMP;
-                                }
-
-                                if (lvl >= RXMQ_LVLS) /* Can't reschedule */
-                                        goto flow_down;
-
-                                rslot = (rslot + slot + 1) & (RXMQ_SLOTS - 1);
-#ifdef RXM_BLOCKING
-                                if (ipcp_spb_reserve(&spb, r->len) < 0)
-#else
-                                if (ssm_pool_alloc(proc.pool, r->len, NULL,
-                                                   &spb) < 0)
-#endif
-                                        goto reschedule; /* rdrbuff full */
-
-                                pci = (struct frct_pci *) ssm_pk_buff_head(spb);
-                                memcpy(pci, r->pkt, r->len);
-#ifndef RXM_BUFFER_ON_HEAP
-                                ipcp_spb_release(r->spb);
-                                r->spb = spb;
-                                r->pkt = pci;
-                                ssm_pk_buff_wait_ack(spb);
-#endif
-                                idx = ssm_pk_buff_get_off(spb);
-
-                                /* Retransmit the copy. */
-                                pci->ackno = hton32(rcv_lwe);
-#ifdef RXM_BLOCKING
-                                if (ssm_rbuff_write_b(f->tx_rb, idx, NULL) < 0)
-#else
-                                if (ssm_rbuff_write(f->tx_rb, idx) < 0)
-#endif
-                                        goto flow_down;
-                                ssm_flow_set_notify(f->set, f->info.id,
-                                                    FLOW_PKT);
-                        reschedule:
-                                list_add(&r->next, &rw.rxms[lvl][rslot]);
-                                continue;
-
-                        flow_down:
-                                ssm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
-                                ssm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
-                        cleanup:
-#ifdef RXM_BUFFER_ON_HEAP
-                                free(r->pkt);
-#else
-                                ipcp_spb_release(r->spb);
-#endif
-                                free(r);
-                        }
-                }
-                rw.prv_rxm[i] = rxm_slot & (RXMQ_SLOTS - 1);
-                /* Move up a level in the wheel. */
-                rxm_slot >>= RXMQ_BUMP;
-        }
-
-        ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ;
-
-        j = rw.prv_ack;
-
-        if (ack_slot < j)
-                ack_slot += ACKQ_SLOTS;
-
-        while (j++ < ack_slot) {
-                list_for_each_safe(p, h, &rw.acks[j & (ACKQ_SLOTS - 1)]) {
-                        struct ack *  a;
-                        struct flow * f;
-
-                        a = list_entry(p, struct ack, next);
-
-                        list_del(&a->next);
-
-                        f = &proc.flows[a->fd];
-
-                        rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false;
-
-                        if (f->info.id == a->flow_id && f->frcti != NULL)
-                                send_frct_pkt(a->frcti);
-
-                        free(a);
-                }
-        }
-
-        rw.prv_ack = ack_slot & (ACKQ_SLOTS - 1);
-
-        pthread_cleanup_pop(true);
-}
-
-static int timerwheel_rxm(struct frcti *       frcti,
-                          uint32_t             seqno,
-                          struct ssm_pk_buff * spb)
-{
-        struct timespec now;
-        struct rxm *    r;
-        size_t          slot;
-        size_t          lvl = 0;
-        time_t          rto_slot;
-
-        r = malloc(sizeof(*r));
-        if (r == NULL)
-                return -ENOMEM;
-
-        clock_gettime(PTHREAD_COND_CLOCK, &now);
-
-        r->t0    = ts_to_ns(now);
-        r->seqno = seqno;
-        r->frcti = frcti;
-        r->len   = ssm_pk_buff_len(spb);
-#ifdef RXM_BUFFER_ON_HEAP
-        r->pkt = malloc(r->len);
-        if (r->pkt == NULL) {
-                free(r);
-                return -ENOMEM;
-        }
-        memcpy(r->pkt, ssm_pk_buff_head(spb), r->len);
-#else
-        r->spb = spb;
-        r->pkt = (struct frct_pci *) ssm_pk_buff_head(spb);
-#endif
-        pthread_rwlock_rdlock(&r->frcti->lock);
-
-        rto_slot = frcti->rto >> RXMQ_RES;
-        slot     = r->t0 >> RXMQ_RES;
-
-        r->fd      = frcti->fd;
-        r->flow_id = proc.flows[r->fd].info.id;
-
-        pthread_rwlock_unlock(&r->frcti->lock);
-
-        while (rto_slot >= RXMQ_SLOTS) {
-                ++lvl;
-                rto_slot >>= RXMQ_BUMP;
-                slot >>= RXMQ_BUMP;
-        }
-
-        if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */
-#ifdef RXM_BUFFER_ON_HEAP
-                free(r->pkt);
-#endif
-                free(r);
-                return -EPERM;
-        }
-
-        slot = (slot + rto_slot + 1) & (RXMQ_SLOTS - 1);
-
-        pthread_mutex_lock(&rw.lock);
-
-        list_add_tail(&r->next, &rw.rxms[lvl][slot]);
-#ifndef RXM_BUFFER_ON_HEAP
-        ssm_pk_buff_wait_ack(spb);
-#endif
-        pthread_mutex_unlock(&rw.lock);
-
-        return 0;
-}
-
-static int timerwheel_delayed_ack(int            fd,
-                                  struct frcti * frcti)
-{
-        struct timespec now;
-        struct ack *    a;
-        size_t          slot;
-
-        a = malloc(sizeof(*a));
-        if (a == NULL)
-                return -ENOMEM;
-
-        clock_gettime(PTHREAD_COND_CLOCK, &now);
-
-        pthread_rwlock_rdlock(&frcti->lock);
-
-        slot = (((ts_to_ns(now) + (TICTIME << 1)) >> ACKQ_RES) + 1)
-                & (ACKQ_SLOTS - 1);
-
-        pthread_rwlock_unlock(&frcti->lock);
-
-        a->fd      = fd;
-        a->frcti   = frcti;
-        a->flow_id = proc.flows[fd].info.id;
-
-        pthread_mutex_lock(&rw.lock);
-
-        if (rw.map[slot][fd]) {
-                pthread_mutex_unlock(&rw.lock);
-                free(a);
-                return 0;
-        }
-
-        rw.map[slot][fd] = true;
-
-        list_add_tail(&a->next, &rw.acks[slot]);
-
-        pthread_mutex_unlock(&rw.lock);
-
-        return 0;
-}
