summaryrefslogtreecommitdiff
path: root/src/lib/frct.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/frct.c')
-rw-r--r--src/lib/frct.c286
1 files changed, 145 insertions, 141 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c
index a1e792af..c6fef35c 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2021
+ * Ouroboros - Copyright (C) 2016 - 2024
*
* Flow and Retransmission Control
*
@@ -20,6 +20,8 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
+#include <ouroboros/endian.h>
+
#define DELT_RDV (100 * MILLION) /* ns */
#define MAX_RDV (1 * BILLION) /* ns */
@@ -52,10 +54,20 @@ struct frcti {
uint32_t rttseq;
struct timespec t_probe; /* Probe time */
bool probe; /* Probe active */
-
+#ifdef PROC_FLOW_STATS
+ size_t n_rtx; /* Number of rxm packets */
+ size_t n_prb; /* Number of rtt probes */
+ size_t n_rtt; /* Number of estimates */
+ size_t n_dup; /* Duplicates received */
+ size_t n_dak; /* Delayed ACKs received */
+ size_t n_rdv; /* Number of rdv packets */
+ size_t n_out; /* Packets out of window */
+ size_t n_rqo; /* Packets out of rqueue */
+#endif
struct frct_cr snd_cr;
struct frct_cr rcv_cr;
+
ssize_t rq[RQ_SIZE];
pthread_rwlock_t lock;
@@ -130,7 +142,15 @@ static int frct_rib_read(const char * path,
"Receiver left window edge: %20u\n"
"Receiver right window edge: %20u\n"
"Receiver inactive (ns): %20ld\n"
- "Receiver last ack: %20u\n",
+ "Receiver last ack: %20u\n"
+ "Number of pkt retransmissions: %20zu\n"
+ "Number of rtt probes: %20zu\n"
+ "Number of rtt estimates: %20zu\n"
+ "Number of duplicates received: %20zu\n"
+ "Number of delayed acks received: %20zu\n"
+ "Number of rendez-vous sent: %20zu\n"
+ "Number of packets out of window: %20zu\n"
+ "Number of packets out of rqueue: %20zu\n",
frcti->mpl,
frcti->a,
frcti->r,
@@ -144,7 +164,15 @@ static int frct_rib_read(const char * path,
frcti->rcv_cr.lwe,
frcti->rcv_cr.rwe,
ts_diff_ns(&frcti->rcv_cr.act, &now),
- frcti->rcv_cr.seqno);
+ frcti->rcv_cr.seqno,
+ frcti->n_rtx,
+ frcti->n_prb,
+ frcti->n_rtt,
+ frcti->n_dup,
+ frcti->n_dak,
+ frcti->n_rdv,
+ frcti->n_out,
+ frcti->n_rqo);
pthread_rwlock_unlock(&flow->frcti->lock);
@@ -156,10 +184,19 @@ static int frct_rib_read(const char * path,
static int frct_rib_readdir(char *** buf)
{
*buf = malloc(sizeof(**buf));
+ if (*buf == NULL)
+ goto fail_malloc;
(*buf)[0] = strdup("frct");
+ if ((*buf)[0] == NULL)
+ goto fail_strdup;
return 1;
+
+ fail_strdup:
+ free(*buf);
+ fail_malloc:
+ return -ENOMEM;
}
static int frct_rib_getattr(const char * path,
@@ -168,7 +205,7 @@ static int frct_rib_getattr(const char * path,
(void) path;
(void) attr;
- attr->size = 1024;
+ attr->size = 1189;
attr->mtime = 0;
return 0;
@@ -223,16 +260,24 @@ static void __send_frct_pkt(int fd,
pci->ackno = hton32(ackno);
f = &ai.flows[fd];
+
+ if (crypt_encrypt(&f->crypt, sdb) < 0)
+ goto fail;
+
#ifdef RXM_BLOCKING
- if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL))
#else
- if (shm_rbuff_write(f->tx_rb, idx)) {
+ if (shm_rbuff_write(f->tx_rb, idx))
#endif
- ipcp_sdb_release(sdb);
- return;
- }
+ goto fail;
+
+ shm_flow_set_notify(f->set, f->info.id, FLOW_PKT);
+
+ return;
- shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+ fail:
+ ipcp_sdb_release(sdb);
+ return;
}
static void send_frct_pkt(struct frcti * frcti)
@@ -245,9 +290,11 @@ static void send_frct_pkt(struct frcti * frcti)
assert(frcti);
- pthread_rwlock_rdlock(&frcti->lock);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_rwlock_wrlock(&frcti->lock);
- if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) {
+ if (!after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) {
pthread_rwlock_unlock(&frcti->lock);
return;
}
@@ -256,65 +303,46 @@ static void send_frct_pkt(struct frcti * frcti)
ackno = frcti->rcv_cr.lwe;
rwe = frcti->rcv_cr.rwe;
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
diff = ts_diff_ns(&frcti->rcv_cr.act, &now);
-
- pthread_rwlock_unlock(&frcti->lock);
-
- if (diff > frcti->a || diff < DELT_ACK)
+ if (diff > frcti->a) {
+ pthread_rwlock_unlock(&frcti->lock);
return;
+ }
- __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe);
-
- pthread_rwlock_wrlock(&frcti->lock);
+ diff = ts_diff_ns(&frcti->snd_cr.act, &now);
+ if (diff < TICTIME) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return;
+ }
- if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno))
- frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
+ frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
pthread_rwlock_unlock(&frcti->lock);
+ __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe);
}
static void __send_rdv(int fd)
{
- struct shm_du_buff * sdb;
- struct frct_pci * pci;
- ssize_t idx;
- struct flow * f;
-
- /* Raw calls needed to bypass frcti. */
- idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
- if (idx < 0)
- return;
-
- pci = (struct frct_pci *) shm_du_buff_head(sdb);
- memset(pci, 0, sizeof(*pci));
-
- pci->flags = FRCT_RDVS;
-
- f = &ai.flows[fd];
-
- if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
- ipcp_sdb_release(sdb);
- return;
- }
-
- shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+ __send_frct_pkt(fd, FRCT_RDVS, 0, 0);
}
-static struct frcti * frcti_create(int fd)
+static struct frcti * frcti_create(int fd,
+ time_t a,
+ time_t r,
+ time_t mpl)
{
struct frcti * frcti;
ssize_t idx;
struct timespec now;
- time_t mpl;
- time_t a;
- time_t r;
pthread_condattr_t cattr;
#ifdef PROC_FLOW_STATS
char frctstr[FRCT_NAME_STRLEN + 1];
#endif
+ mpl *= BILLION;
+ a *= BILLION;
+ r *= BILLION;
+
frcti = malloc(sizeof(*frcti));
if (frcti == NULL)
goto fail_malloc;
@@ -347,9 +375,9 @@ static struct frcti * frcti_create(int fd)
clock_gettime(PTHREAD_COND_CLOCK, &now);
- frcti->mpl = mpl = DELT_MPL;
- frcti->a = a = DELT_A;
- frcti->r = r = DELT_R;
+ frcti->mpl = mpl;
+ frcti->a = a;
+ frcti->r = r;
frcti->rdv = DELT_RDV;
frcti->fd = fd;
@@ -358,10 +386,19 @@ static struct frcti * frcti_create(int fd)
frcti->probe = false;
frcti->srtt = 0; /* Updated on first ACK */
- frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */
- frcti->rto = 20 * MILLION; /* Initial rxm will be after 20 ms */
-
- if (ai.flows[fd].qs.loss == 0) {
+ frcti->mdev = 10 * MILLION; /* Updated on first ACK */
+ frcti->rto = BILLION; /* Initial rxm will be after 1 s */
+#ifdef PROC_FLOW_STATS
+ frcti->n_rtx = 0;
+ frcti->n_prb = 0;
+ frcti->n_rtt = 0;
+ frcti->n_dup = 0;
+ frcti->n_dak = 0;
+ frcti->n_rdv = 0;
+ frcti->n_out = 0;
+ frcti->n_rqo = 0;
+#endif
+ if (ai.flows[fd].info.qs.loss == 0) {
frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER;
frcti->rcv_cr.cflags |= FRCTFRTX;
}
@@ -448,9 +485,6 @@ static void frcti_setflags(struct frcti * frcti,
#define frcti_rcv(frcti, sdb) \
(frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))
-#define frcti_tick(frcti) \
- (frcti == NULL ? 0 : __frcti_tick())
-
#define frcti_dealloc(frcti) \
(frcti == NULL ? 0 : __frcti_dealloc(frcti))
@@ -494,6 +528,10 @@ static bool __frcti_is_window_open(struct frcti * frcti)
if (diff > frcti->rdv) {
frcti->t_rdvs = now;
__send_rdv(frcti->fd);
+#ifdef PROC_FLOW_STATS
+ frcti->n_rdv++;
+#endif
+
}
}
@@ -520,7 +558,6 @@ static int __frcti_window_wait(struct frcti * frcti,
while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) {
struct timespec now;
-
pthread_rwlock_unlock(&frcti->lock);
pthread_mutex_lock(&frcti->mtx);
@@ -534,9 +571,7 @@ static int __frcti_window_wait(struct frcti * frcti,
pthread_cleanup_push(__cleanup_mutex_unlock, &frcti->mtx);
- ret = -pthread_cond_timedwait(&frcti->cond,
- &frcti->mtx,
- abstime);
+ ret = -__timedwait(&frcti->cond, &frcti->mtx, abstime);
pthread_cleanup_pop(false);
@@ -658,6 +693,7 @@ static int __frcti_snd(struct frcti * frcti,
bool rtx;
assert(frcti);
+ assert(shm_du_buff_len(sdb) != 0);
snd_cr = &frcti->snd_cr;
rcv_cr = &frcti->rcv_cr;
@@ -706,9 +742,11 @@ static int __frcti_snd(struct frcti * frcti,
frcti->rttseq = snd_cr->seqno;
frcti->t_probe = now;
frcti->probe = true;
+#ifdef PROC_FLOW_STATS
+ frcti->n_prb++;
+#endif
}
-
- if (now.tv_sec - rcv_cr->act.tv_sec <= frcti->a) {
+ if ((now.tv_sec - rcv_cr->act.tv_sec) * BILLION <= frcti->a) {
pci->flags |= FRCT_ACK;
pci->ackno = hton32(rcv_cr->lwe);
rcv_cr->seqno = rcv_cr->lwe;
@@ -738,17 +776,19 @@ static void rtt_estimator(struct frcti * frcti,
} else {
time_t delta = mrtt - srtt;
srtt += (delta >> 3);
- rttvar += (ABS(delta) - rttvar) >> 2;
+ delta = (ABS(delta) - rttvar) >> 2;
+#ifdef FRCT_LINUX_RTT_ESTIMATOR
+ if (delta < 0)
+ delta >>= 3;
+#endif
+ rttvar += delta;
}
-
- frcti->srtt = MAX(1000U, srtt);
- frcti->mdev = MAX(100U, rttvar);
- frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << 1));
-}
-
-static void __frcti_tick(void)
-{
- timerwheel_move();
+#ifdef PROC_FLOW_STATS
+ frcti->n_rtt++;
+#endif
+ frcti->srtt = MAX(1000L, srtt);
+ frcti->mdev = MAX(100L, rttvar);
+ frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << MDEV_MUL));
}
/* Always queues the next application packet on the RQ. */
@@ -785,11 +825,14 @@ static void __frcti_rcv(struct frcti * frcti,
if (pci->flags & FRCT_DRF) { /* New run. */
rcv_cr->lwe = seqno;
rcv_cr->rwe = seqno + RQ_SIZE;
- } else {
+ rcv_cr->seqno = seqno;
+ } else if (pci->flags & FRCT_DATA) {
goto drop_packet;
}
}
+ rcv_cr->act = now;
+
/* For now, just send an immediate window update. */
if (pci->flags & FRCT_RDVS) {
fd = frcti->fd;
@@ -808,6 +851,10 @@ static void __frcti_rcv(struct frcti * frcti,
frcti->snd_cr.lwe = ackno;
if (frcti->probe && after(ackno, frcti->rttseq)) {
+#ifdef PROC_FLOW_STATS
+ if (!(pci->flags & FRCT_DATA))
+ frcti->n_dak++;
+#endif
rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now));
frcti->probe = false;
}
@@ -838,20 +885,33 @@ static void __frcti_rcv(struct frcti * frcti,
if (before(seqno, rcv_cr->lwe)) {
rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */
+#ifdef PROC_FLOW_STATS
+ frcti->n_dup++;
+#endif
goto drop_packet;
}
if (rcv_cr->cflags & FRCTFRTX) {
- if (!before(seqno, rcv_cr->rwe)) /* Out of window. */
+ if (!before(seqno, rcv_cr->rwe)) { /* Out of window. */
+#ifdef PROC_FLOW_STATS
+ frcti->n_out++;
+#endif
goto drop_packet;
+ }
- if (!before(seqno, rcv_cr->lwe + RQ_SIZE))
+ if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) {
+#ifdef PROC_FLOW_STATS
+ frcti->n_rqo++;
+#endif
goto drop_packet; /* Out of rq. */
-
- if (frcti->rq[pos] != -1)
+ }
+ if (frcti->rq[pos] != -1) {
+#ifdef PROC_FLOW_STATS
+ frcti->n_dup++;
+#endif
goto drop_packet; /* Duplicate in rq. */
-
+ }
fd = frcti->fd;
} else {
rcv_cr->lwe = seqno;
@@ -859,72 +919,16 @@ static void __frcti_rcv(struct frcti * frcti,
frcti->rq[pos] = idx;
- rcv_cr->act = now;
-
pthread_rwlock_unlock(&frcti->lock);
if (fd != -1)
- timerwheel_ack(fd, frcti);
+ timerwheel_delayed_ack(fd, frcti);
return;
drop_packet:
pthread_rwlock_unlock(&frcti->lock);
-
- send_frct_pkt(frcti);
-
shm_rdrbuff_remove(ai.rdrb, idx);
+ send_frct_pkt(frcti);
return;
}
-
-/* Filter fqueue events for non-data packets */
-int frcti_filter(struct fqueue * fq)
-{
- struct shm_du_buff * sdb;
- int fd;
- ssize_t idx;
- struct frcti * frcti;
- struct shm_rbuff * rb;
-
- while (fq->next < fq->fqsize) {
- if (fq->fqueue[fq->next + 1] != FLOW_PKT)
- return 1;
-
- pthread_rwlock_rdlock(&ai.lock);
-
- fd = ai.ports[fq->fqueue[fq->next]].fd;
- rb = ai.flows[fd].rx_rb;
- frcti = ai.flows[fd].frcti;
-
- if (frcti == NULL) {
- pthread_rwlock_unlock(&ai.lock);
- return 1;
- }
-
- if (__frcti_pdu_ready(frcti) >= 0) {
- pthread_rwlock_unlock(&ai.lock);
- return 1;
- }
-
- idx = shm_rbuff_read(rb);
- if (idx < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return 0;
- }
-
- sdb = shm_rdrbuff_get(ai.rdrb, idx);
-
- __frcti_rcv(frcti, sdb);
-
- if (__frcti_pdu_ready(frcti) >= 0) {
- pthread_rwlock_unlock(&ai.lock);
- return 1;
- }
-
- pthread_rwlock_unlock(&ai.lock);
-
- fq->next += 2;
- }
-
- return fq->next < fq->fqsize;
-}