summaryrefslogtreecommitdiff
path: root/src/lib/frct.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/frct.c')
-rw-r--r--src/lib/frct.c918
1 files changed, 728 insertions, 190 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 54f822f4..c6fef35c 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -1,10 +1,10 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2018
+ * Ouroboros - Copyright (C) 2016 - 2024
*
* Flow and Retransmission Control
*
- * Dimitri Staessens <dimitri.staessens@ugent.be>
- * Sander Vrijders <sander.vrijders@ugent.be>
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
@@ -20,135 +20,410 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
-/* Default Delta-t parameters */
-#define DELT_MPL 60000 /* ms */
-#define DELT_A 0 /* ms */
-#define DELT_R 2000 /* ms */
+#include <ouroboros/endian.h>
-#define RQ_SIZE 20
+#define DELT_RDV (100 * MILLION) /* ns */
+#define MAX_RDV (1 * BILLION) /* ns */
-#define TW_ELEMENTS 6000
-#define TW_RESOLUTION 1 /* ms */
-
-#define FRCT_PCILEN (sizeof(struct frct_pci))
-#define FRCT_CRCLEN (sizeof(uint32_t))
+#define FRCT "frct"
+#define FRCT_PCILEN (sizeof(struct frct_pci))
+#define FRCT_NAME_STRLEN 32
struct frct_cr {
- bool drf;
- uint32_t lwe;
- uint32_t rwe;
+ uint32_t lwe; /* Left window edge */
+ uint32_t rwe; /* Right window edge */
- uint32_t seqno;
- bool conf;
- uint8_t cflags;
+ uint8_t cflags;
+ uint32_t seqno; /* SEQ to send, or last SEQ Ack'd */
- time_t act;
- time_t inact;
+ struct timespec act; /* Last seen activity */
+ time_t inact; /* Inactivity (s) */
};
struct frcti {
- int fd;
-
- time_t mpl;
- time_t a;
- time_t r;
-
- struct frct_cr snd_cr;
- struct frct_cr rcv_cr;
+ int fd;
+
+ time_t mpl;
+ time_t a;
+ time_t r;
+ time_t rdv;
+
+ time_t srtt; /* Smoothed rtt */
+ time_t mdev; /* Deviation */
+ time_t rto; /* Retransmission timeout */
+ uint32_t rttseq;
+ struct timespec t_probe; /* Probe time */
+ bool probe; /* Probe active */
+#ifdef PROC_FLOW_STATS
+ size_t n_rtx; /* Number of rxm packets */
+ size_t n_prb; /* Number of rtt probes */
+ size_t n_rtt; /* Number of estimates */
+ size_t n_dup; /* Duplicates received */
+ size_t n_dak; /* Delayed ACKs received */
+ size_t n_rdv; /* Number of rdv packets */
+ size_t n_out; /* Packets out of window */
+ size_t n_rqo; /* Packets out of rqueue */
+#endif
+ struct frct_cr snd_cr;
+ struct frct_cr rcv_cr;
- struct rq * rq;
- struct timespec rtt;
+ ssize_t rq[RQ_SIZE];
+ pthread_rwlock_t lock;
- pthread_rwlock_t lock;
+ bool open; /* Window open/closed */
+ struct timespec t_wnd; /* Window closed time */
+ struct timespec t_rdvs; /* Last rendez-vous sent */
+ pthread_cond_t cond;
+ pthread_mutex_t mtx;
};
-struct {
- struct timerwheel * tw;
-} frct;
-
enum frct_flags {
FRCT_DATA = 0x01, /* PDU carries data */
FRCT_DRF = 0x02, /* Data run flag */
- FRCT_ACK = 0x03, /* ACK field valid */
+ FRCT_ACK = 0x04, /* ACK field valid */
FRCT_FC = 0x08, /* FC window valid */
- FRCT_RDVZ = 0x10, /* Rendez-vous */
- FRCT_CFG = 0x20, /* Configuration */
+ FRCT_RDVS = 0x10, /* Rendez-vous */
+ FRCT_FFGM = 0x20, /* First Fragment */
FRCT_MFGM = 0x40, /* More fragments */
- FRCT_CRC = 0x80, /* CRC present */
};
struct frct_pci {
uint8_t flags;
- uint8_t cflags;
-
+ uint8_t pad; /* 24 bit window! */
uint16_t window;
uint32_t seqno;
uint32_t ackno;
} __attribute__((packed));
-static int frct_init(void)
+#ifdef PROC_FLOW_STATS
+
+static int frct_rib_read(const char * path,
+ char * buf,
+ size_t len)
+{
+ struct timespec now;
+ char * entry;
+ struct flow * flow;
+ struct frcti * frcti;
+ int fd;
+
+ (void) len;
+
+ entry = strstr(path, RIB_SEPARATOR);
+ assert(entry);
+ *entry = '\0';
+
+ fd = atoi(path);
+
+ flow = &ai.flows[fd];
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ frcti = flow->frcti;
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ sprintf(buf,
+ "Maximum packet lifetime (ns): %20ld\n"
+ "Max time to Ack (ns): %20ld\n"
+ "Max time to Retransmit (ns): %20ld\n"
+ "Smoothed rtt (ns): %20ld\n"
+ "RTT standard deviation (ns): %20ld\n"
+ "Retransmit timeout RTO (ns): %20ld\n"
+ "Sender left window edge: %20u\n"
+ "Sender right window edge: %20u\n"
+ "Sender inactive (ns): %20ld\n"
+ "Sender current sequence number: %20u\n"
+ "Receiver left window edge: %20u\n"
+ "Receiver right window edge: %20u\n"
+ "Receiver inactive (ns): %20ld\n"
+ "Receiver last ack: %20u\n"
+ "Number of pkt retransmissions: %20zu\n"
+ "Number of rtt probes: %20zu\n"
+ "Number of rtt estimates: %20zu\n"
+ "Number of duplicates received: %20zu\n"
+ "Number of delayed acks received: %20zu\n"
+ "Number of rendez-vous sent: %20zu\n"
+ "Number of packets out of window: %20zu\n"
+ "Number of packets out of rqueue: %20zu\n",
+ frcti->mpl,
+ frcti->a,
+ frcti->r,
+ frcti->srtt,
+ frcti->mdev,
+ frcti->rto,
+ frcti->snd_cr.lwe,
+ frcti->snd_cr.rwe,
+ ts_diff_ns(&frcti->snd_cr.act, &now),
+ frcti->snd_cr.seqno,
+ frcti->rcv_cr.lwe,
+ frcti->rcv_cr.rwe,
+ ts_diff_ns(&frcti->rcv_cr.act, &now),
+ frcti->rcv_cr.seqno,
+ frcti->n_rtx,
+ frcti->n_prb,
+ frcti->n_rtt,
+ frcti->n_dup,
+ frcti->n_dak,
+ frcti->n_rdv,
+ frcti->n_out,
+ frcti->n_rqo);
+
+ pthread_rwlock_unlock(&flow->frcti->lock);
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ return strlen(buf);
+}
+
+static int frct_rib_readdir(char *** buf)
{
- frct.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS);
- if (frct.tw == NULL)
- return -1;
+ *buf = malloc(sizeof(**buf));
+ if (*buf == NULL)
+ goto fail_malloc;
+
+ (*buf)[0] = strdup("frct");
+ if ((*buf)[0] == NULL)
+ goto fail_strdup;
+
+ return 1;
+
+ fail_strdup:
+ free(*buf);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+static int frct_rib_getattr(const char * path,
+ struct rib_attr * attr)
+{
+ (void) path;
+ (void) attr;
+
+ attr->size = 1189;
+ attr->mtime = 0;
return 0;
}
-static void frct_fini(void)
+
+static struct rib_ops r_ops = {
+ .read = frct_rib_read,
+ .readdir = frct_rib_readdir,
+ .getattr = frct_rib_getattr
+};
+
+#endif /* PROC_FLOW_STATS */
+
+static bool before(uint32_t seq1,
+ uint32_t seq2)
+{
+ return (int32_t)(seq1 - seq2) < 0;
+}
+
+static bool after(uint32_t seq1,
+ uint32_t seq2)
+{
+ return (int32_t)(seq2 - seq1) < 0;
+}
+
+static void __send_frct_pkt(int fd,
+ uint8_t flags,
+ uint32_t ackno,
+ uint32_t rwe)
+{
+ struct shm_du_buff * sdb;
+ struct frct_pci * pci;
+ ssize_t idx;
+ struct flow * f;
+
+ /* Raw calls needed to bypass frcti. */
+#ifdef RXM_BLOCKING
+ idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
+#else
+ idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*pci), NULL, &sdb);
+#endif
+ if (idx < 0)
+ return;
+
+ pci = (struct frct_pci *) shm_du_buff_head(sdb);
+ memset(pci, 0, sizeof(*pci));
+
+ *((uint32_t *) pci) = hton32(rwe);
+
+ pci->flags = flags;
+ pci->ackno = hton32(ackno);
+
+ f = &ai.flows[fd];
+
+ if (crypt_encrypt(&f->crypt, sdb) < 0)
+ goto fail;
+
+#ifdef RXM_BLOCKING
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL))
+#else
+ if (shm_rbuff_write(f->tx_rb, idx))
+#endif
+ goto fail;
+
+ shm_flow_set_notify(f->set, f->info.id, FLOW_PKT);
+
+ return;
+
+ fail:
+ ipcp_sdb_release(sdb);
+ return;
+}
+
+static void send_frct_pkt(struct frcti * frcti)
{
- assert(frct.tw);
+ struct timespec now;
+ time_t diff;
+ uint32_t ackno;
+ uint32_t rwe;
+ int fd;
+
+ assert(frcti);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
- timerwheel_destroy(frct.tw);
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ if (!after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return;
+ }
+
+ fd = frcti->fd;
+ ackno = frcti->rcv_cr.lwe;
+ rwe = frcti->rcv_cr.rwe;
+
+ diff = ts_diff_ns(&frcti->rcv_cr.act, &now);
+ if (diff > frcti->a) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return;
+ }
+
+ diff = ts_diff_ns(&frcti->snd_cr.act, &now);
+ if (diff < TICTIME) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return;
+ }
+
+ frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe);
+}
+
+static void __send_rdv(int fd)
+{
+ __send_frct_pkt(fd, FRCT_RDVS, 0, 0);
}
-static struct frcti * frcti_create(int fd)
+static struct frcti * frcti_create(int fd,
+ time_t a,
+ time_t r,
+ time_t mpl)
{
- struct frcti * frcti;
- time_t delta_t;
+ struct frcti * frcti;
+ ssize_t idx;
+ struct timespec now;
+ pthread_condattr_t cattr;
+#ifdef PROC_FLOW_STATS
+ char frctstr[FRCT_NAME_STRLEN + 1];
+#endif
+ mpl *= BILLION;
+ a *= BILLION;
+ r *= BILLION;
frcti = malloc(sizeof(*frcti));
if (frcti == NULL)
goto fail_malloc;
+ memset(frcti, 0, sizeof(*frcti));
+
if (pthread_rwlock_init(&frcti->lock, NULL))
goto fail_lock;
- frcti->rq = rq_create(RQ_SIZE);
- if (frcti->rq == NULL)
- goto fail_rq;
+ if (pthread_mutex_init(&frcti->mtx, NULL))
+ goto fail_mutex;
+
+ if (pthread_condattr_init(&cattr))
+ goto fail_cattr;
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(&frcti->cond, &cattr))
+ goto fail_cond;
+
+#ifdef PROC_FLOW_STATS
+ sprintf(frctstr, "%d", fd);
+ if (rib_reg(frctstr, &r_ops))
+ goto fail_rib_reg;
+#endif
+ pthread_condattr_destroy(&cattr);
+
+ for (idx = 0; idx < RQ_SIZE; ++idx)
+ frcti->rq[idx] = -1;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
- frcti->mpl = DELT_MPL;
- frcti->a = DELT_A;
- frcti->r = DELT_R;
+ frcti->mpl = mpl;
+ frcti->a = a;
+ frcti->r = r;
+ frcti->rdv = DELT_RDV;
frcti->fd = fd;
- delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000;
- frcti->snd_cr.drf = true;
- frcti->snd_cr.conf = true;
-#ifdef OUROBOROS_CONFIG_DEBUG
- frcti->snd_cr.seqno = 0;
-#else
- random_buffer(&frcti->snd_cr.seqno, sizeof(frcti->snd_cr.seqno));
+ frcti->rttseq = 0;
+ frcti->probe = false;
+
+ frcti->srtt = 0; /* Updated on first ACK */
+ frcti->mdev = 10 * MILLION; /* Updated on first ACK */
+ frcti->rto = BILLION; /* Initial rxm will be after 1 s */
+#ifdef PROC_FLOW_STATS
+ frcti->n_rtx = 0;
+ frcti->n_prb = 0;
+ frcti->n_rtt = 0;
+ frcti->n_dup = 0;
+ frcti->n_dak = 0;
+ frcti->n_rdv = 0;
+ frcti->n_out = 0;
+ frcti->n_rqo = 0;
#endif
- frcti->snd_cr.lwe = 0;
- frcti->snd_cr.rwe = 0;
- frcti->snd_cr.cflags = 0;
- frcti->snd_cr.inact = 2 * delta_t + 1;
+ if (ai.flows[fd].info.qs.loss == 0) {
+ frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER;
+ frcti->rcv_cr.cflags |= FRCTFRTX;
+ }
+
+ frcti->snd_cr.cflags |= FRCTFRESCNTL;
+
+ frcti->snd_cr.rwe = START_WINDOW;
- frcti->rcv_cr.drf = true;
- frcti->rcv_cr.lwe = 0;
- frcti->rcv_cr.rwe = 0;
- frcti->rcv_cr.cflags = 0;
- frcti->rcv_cr.inact = 3 * delta_t + 1;
+ frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */
+ frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1);
+
+ frcti->rcv_cr.inact = (2 * mpl + a + r) / BILLION + 1; /* s */
+ frcti->rcv_cr.act.tv_sec = now.tv_sec - (frcti->rcv_cr.inact + 1);
return frcti;
- fail_rq:
+#ifdef PROC_FLOW_STATS
+ fail_rib_reg:
+ pthread_cond_destroy(&frcti->cond);
+#endif
+ fail_cond:
+ pthread_condattr_destroy(&cattr);
+ fail_cattr:
+ pthread_mutex_destroy(&frcti->mtx);
+ fail_mutex:
pthread_rwlock_destroy(&frcti->lock);
fail_lock:
free(frcti);
@@ -158,73 +433,192 @@ static struct frcti * frcti_create(int fd)
static void frcti_destroy(struct frcti * frcti)
{
- /*
- * FIXME: In case of reliable transmission we should
- * make sure everything is acked.
- */
-
+#ifdef PROC_FLOW_STATS
+ char frctstr[FRCT_NAME_STRLEN + 1];
+ sprintf(frctstr, "%d", frcti->fd);
+ rib_unreg(frctstr);
+#endif
+ pthread_cond_destroy(&frcti->cond);
+ pthread_mutex_destroy(&frcti->mtx);
pthread_rwlock_destroy(&frcti->lock);
- rq_destroy(frcti->rq);
free(frcti);
}
-static int frcti_setconf(struct frcti * frcti,
- uint16_t flags)
+static uint16_t frcti_getflags(struct frcti * frcti)
{
+ uint16_t ret;
+
assert(frcti);
- pthread_rwlock_wrlock(&frcti->lock);
+ pthread_rwlock_rdlock(&frcti->lock);
- if (frcti->snd_cr.cflags != flags) {
- frcti->snd_cr.cflags = flags;
- frcti->snd_cr.conf = true;
- frcti->snd_cr.drf = true;
- }
+ ret = frcti->snd_cr.cflags;
pthread_rwlock_unlock(&frcti->lock);
- return 0;
+ return ret;
}
-static uint16_t frcti_getconf(struct frcti * frcti)
+static void frcti_setflags(struct frcti * frcti,
+ uint16_t flags)
{
- uint16_t ret;
+ flags |= FRCTFRTX; /* Should not be set by command */
+
+ assert(frcti);
+
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ frcti->snd_cr.cflags &= FRCTFRTX; /* Zero other flags */
- assert (frcti);
+ frcti->snd_cr.cflags &= flags;
+
+ pthread_rwlock_unlock(&frcti->lock);
+}
+
+#define frcti_queued_pdu(frcti) \
+ (frcti == NULL ? idx : __frcti_queued_pdu(frcti))
+
+#define frcti_snd(frcti, sdb) \
+ (frcti == NULL ? 0 : __frcti_snd(frcti, sdb))
+
+#define frcti_rcv(frcti, sdb) \
+ (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))
+
+#define frcti_dealloc(frcti) \
+ (frcti == NULL ? 0 : __frcti_dealloc(frcti))
+
+#define frcti_is_window_open(frcti) \
+ (frcti == NULL ? true : __frcti_is_window_open(frcti))
+
+#define frcti_window_wait(frcti, abstime) \
+ (frcti == NULL ? 0 : __frcti_window_wait(frcti, abstime))
+
+
+static bool __frcti_is_window_open(struct frcti * frcti)
+{
+ struct frct_cr * snd_cr = &frcti->snd_cr;
+ bool ret = true;
pthread_rwlock_rdlock(&frcti->lock);
- ret = frcti->snd_cr.cflags;
+ if (snd_cr->cflags & FRCTFRESCNTL)
+ ret = before(snd_cr->seqno, snd_cr->rwe);
+
+ if (!ret) {
+ struct timespec now;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_mutex_lock(&frcti->mtx);
+ if (frcti->open) {
+ frcti->open = false;
+ frcti->t_wnd = now;
+ frcti->t_rdvs = now;
+ } else {
+ time_t diff;
+ diff = ts_diff_ns(&frcti->t_wnd, &now);
+ if (diff > MAX_RDV) {
+ pthread_mutex_unlock(&frcti->mtx);
+ pthread_rwlock_unlock(&frcti->lock);
+ return false;
+ }
+
+ diff = ts_diff_ns(&frcti->t_rdvs, &now);
+ if (diff > frcti->rdv) {
+ frcti->t_rdvs = now;
+ __send_rdv(frcti->fd);
+#ifdef PROC_FLOW_STATS
+ frcti->n_rdv++;
+#endif
+
+ }
+ }
+
+ pthread_mutex_unlock(&frcti->mtx);
+ }
pthread_rwlock_unlock(&frcti->lock);
return ret;
}
-#define frcti_queued_pdu(frcti) \
- (frcti == NULL ? -1 : __frcti_queued_pdu(frcti))
+static int __frcti_window_wait(struct frcti * frcti,
+ struct timespec * abstime)
+{
+ struct frct_cr * snd_cr = &frcti->snd_cr;
+ int ret = 0;
-#define frcti_snd(frcti, sdb) \
- (frcti == NULL ? 0 : __frcti_snd(frcti, sdb))
+ pthread_rwlock_rdlock(&frcti->lock);
-#define frcti_rcv(frcti, sdb) \
- (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))
+ if (!(snd_cr->cflags & FRCTFRESCNTL)) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return 0;
+ }
+
+ while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) {
+ struct timespec now;
+ pthread_rwlock_unlock(&frcti->lock);
+ pthread_mutex_lock(&frcti->mtx);
+
+ if (frcti->open) {
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ frcti->t_wnd = now;
+ frcti->t_rdvs = now;
+ frcti->open = false;
+ }
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, &frcti->mtx);
+
+ ret = -__timedwait(&frcti->cond, &frcti->mtx, abstime);
+
+ pthread_cleanup_pop(false);
+
+ if (ret == -ETIMEDOUT) {
+ time_t diff;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ diff = ts_diff_ns(&frcti->t_wnd, &now);
+ if (diff > MAX_RDV) {
+ pthread_mutex_unlock(&frcti->mtx);
+ return -ECONNRESET; /* write fails! */
+ }
+
+ diff = ts_diff_ns(&frcti->t_rdvs, &now);
+ if (diff > frcti->rdv) {
+ frcti->t_rdvs = now;
+ __send_rdv(frcti->fd);
+ }
+ }
+
+ pthread_mutex_unlock(&frcti->mtx);
+ pthread_rwlock_rdlock(&frcti->lock);
+ }
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return ret;
+}
static ssize_t __frcti_queued_pdu(struct frcti * frcti)
{
- ssize_t idx = -1;
+ ssize_t idx;
+ size_t pos;
assert(frcti);
/* See if we already have the next PDU. */
pthread_rwlock_wrlock(&frcti->lock);
- if (!rq_is_empty(frcti->rq)) {
- if (rq_peek(frcti->rq) == frcti->rcv_cr.lwe) {
- ++frcti->rcv_cr.lwe;
- idx = rq_pop(frcti->rq);
- }
+ pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1);
+
+ idx = frcti->rq[pos];
+ if (idx != -1) {
+ ++frcti->rcv_cr.lwe;
+ ++frcti->rcv_cr.rwe;
+ frcti->rq[pos] = -1;
}
pthread_rwlock_unlock(&frcti->lock);
@@ -232,31 +626,60 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)
return idx;
}
-static int frct_chk_crc(uint8_t * head,
- uint8_t * tail)
+static ssize_t __frcti_pdu_ready(struct frcti * frcti)
{
- uint32_t crc;
+ ssize_t idx;
+ size_t pos;
- mem_hash(HASH_CRC32, &crc, head, tail - head);
+ assert(frcti);
- return crc == *((uint32_t *) tail);
-}
+ /* See if we already have the next PDU. */
+ pthread_rwlock_rdlock(&frcti->lock);
-static void frct_add_crc(uint8_t * head,
- uint8_t * tail)
-{
- mem_hash(HASH_CRC32, tail, head, tail - head);
+ pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1);
+ idx = frcti->rq[pos];
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return idx;
}
-static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb)
+#include <timerwheel.c>
+
+/*
+ * Send a final ACK for everything that has not been ACK'd.
+ * If the flow should be kept active for retransmission,
+ * the returned time will be negative.
+ */
+static time_t __frcti_dealloc(struct frcti * frcti)
{
- struct frct_pci * pci = NULL;
+ struct timespec now;
+ time_t wait;
+ int ackno;
+ int fd = -1;
- pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
- if (pci != NULL)
- memset(pci, 0, sizeof(*pci));
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ ackno = frcti->rcv_cr.lwe;
+ if (frcti->rcv_cr.lwe != frcti->rcv_cr.seqno)
+ fd = frcti->fd;
+
+ wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec,
+ frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec);
+ wait = MAX(wait, 0);
+
+ if (frcti->snd_cr.cflags & FRCTFLINGER
+ && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno))
+ wait = -wait;
- return pci;
+ pthread_rwlock_unlock(&frcti->lock);
+
+ if (fd != -1)
+ __send_frct_pkt(fd, FRCT_ACK, ackno, 0);
+
+ return wait;
}
static int __frcti_snd(struct frcti * frcti,
@@ -265,132 +688,247 @@ static int __frcti_snd(struct frcti * frcti,
struct frct_pci * pci;
struct timespec now;
struct frct_cr * snd_cr;
+ struct frct_cr * rcv_cr;
+ uint32_t seqno;
+ bool rtx;
assert(frcti);
+ assert(shm_du_buff_len(sdb) != 0);
snd_cr = &frcti->snd_cr;
+ rcv_cr = &frcti->rcv_cr;
+
+ timerwheel_move();
- pci = frcti_alloc_head(sdb);
+ pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
if (pci == NULL)
- return -1;
+ return -ENOMEM;
+
+ memset(pci, 0, sizeof(*pci));
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pthread_rwlock_wrlock(&frcti->lock);
- /* Check if sender is inactive. */
- if (!snd_cr->drf && now.tv_sec - snd_cr->act > snd_cr->inact)
- snd_cr->drf = true;
+ rtx = snd_cr->cflags & FRCTFRTX;
pci->flags |= FRCT_DATA;
- /* Set the DRF in the first packet of a new run of SDUs. */
- if (snd_cr->drf) {
+ /* Set DRF if there are no unacknowledged packets. */
+ if (snd_cr->seqno == snd_cr->lwe)
pci->flags |= FRCT_DRF;
- if (snd_cr->conf) {
- pci->flags |= FRCT_CFG;
- pci->cflags = snd_cr->cflags;
- }
- }
- pci->seqno = hton32(snd_cr->seqno++);
-
- if (snd_cr->cflags & FRCTFERRCHCK) {
- uint8_t * tail = shm_du_buff_tail_alloc(sdb, FRCT_CRCLEN);
- if (tail == NULL) {
- pthread_rwlock_unlock(&frcti->lock);
- return -1;
- }
+ /* Choose a new sequence number if sender inactivity expired. */
+ if (now.tv_sec - snd_cr->act.tv_sec > snd_cr->inact) {
+ /* There are no unacknowledged packets. */
+ assert(snd_cr->seqno == snd_cr->lwe);
+ random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno));
+ snd_cr->lwe = snd_cr->seqno;
+ snd_cr->rwe = snd_cr->lwe + START_WINDOW;
+ }
- frct_add_crc((uint8_t *) pci, tail);
+ seqno = snd_cr->seqno;
+ pci->seqno = hton32(seqno);
- pci->flags |= FRCT_CRC;
+ if (now.tv_sec - rcv_cr->act.tv_sec < rcv_cr->inact) {
+ pci->flags |= FRCT_FC;
+ *((uint32_t *) pci) |= hton32(rcv_cr->rwe & 0x00FFFFFF);
}
- snd_cr->act = now.tv_sec;
+ if (!rtx) {
+ snd_cr->lwe++;
+ } else {
+ if (!frcti->probe) {
+ frcti->rttseq = snd_cr->seqno;
+ frcti->t_probe = now;
+ frcti->probe = true;
+#ifdef PROC_FLOW_STATS
+ frcti->n_prb++;
+#endif
+ }
+ if ((now.tv_sec - rcv_cr->act.tv_sec) * BILLION <= frcti->a) {
+ pci->flags |= FRCT_ACK;
+ pci->ackno = hton32(rcv_cr->lwe);
+ rcv_cr->seqno = rcv_cr->lwe;
+ }
+ }
- snd_cr->drf = false;
- snd_cr->conf = false;
+ snd_cr->seqno++;
+ snd_cr->act = now;
pthread_rwlock_unlock(&frcti->lock);
+ if (rtx)
+ timerwheel_rxm(frcti, seqno, sdb);
+
return 0;
}
-/* Returns 0 when idx contains an SDU for the application. */
-static int __frcti_rcv(struct frcti * frcti,
- struct shm_du_buff * sdb)
+static void rtt_estimator(struct frcti * frcti,
+ time_t mrtt)
+{
+ time_t srtt = frcti->srtt;
+ time_t rttvar = frcti->mdev;
+
+ if (srtt == 0) { /* first measurement */
+ srtt = mrtt;
+ rttvar = mrtt >> 1;
+ } else {
+ time_t delta = mrtt - srtt;
+ srtt += (delta >> 3);
+ delta = (ABS(delta) - rttvar) >> 2;
+#ifdef FRCT_LINUX_RTT_ESTIMATOR
+ if (delta < 0)
+ delta >>= 3;
+#endif
+ rttvar += delta;
+ }
+#ifdef PROC_FLOW_STATS
+ frcti->n_rtt++;
+#endif
+ frcti->srtt = MAX(1000L, srtt);
+ frcti->mdev = MAX(100L, rttvar);
+ frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << MDEV_MUL));
+}
+
+/* Always queues the next application packet on the RQ. */
+static void __frcti_rcv(struct frcti * frcti,
+ struct shm_du_buff * sdb)
{
ssize_t idx;
+ size_t pos;
struct frct_pci * pci;
struct timespec now;
struct frct_cr * rcv_cr;
+ struct frct_cr * snd_cr;
uint32_t seqno;
+ uint32_t ackno;
+ uint32_t rwe;
+ int fd = -1;
assert(frcti);
rcv_cr = &frcti->rcv_cr;
+ snd_cr = &frcti->snd_cr;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN);
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
+ idx = shm_du_buff_get_idx(sdb);
+ seqno = ntoh32(pci->seqno);
+ pos = seqno & (RQ_SIZE - 1);
pthread_rwlock_wrlock(&frcti->lock);
- idx = shm_du_buff_get_idx(sdb);
+ if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) {
+ if (pci->flags & FRCT_DRF) { /* New run. */
+ rcv_cr->lwe = seqno;
+ rcv_cr->rwe = seqno + RQ_SIZE;
+ rcv_cr->seqno = seqno;
+ } else if (pci->flags & FRCT_DATA) {
+ goto drop_packet;
+ }
+ }
+
+ rcv_cr->act = now;
+
+ /* For now, just send an immediate window update. */
+ if (pci->flags & FRCT_RDVS) {
+ fd = frcti->fd;
+ rwe = rcv_cr->rwe;
+ pthread_rwlock_unlock(&frcti->lock);
+
+ __send_frct_pkt(fd, FRCT_FC, 0, rwe);
- /* PDU may be corrupted. */
- if (pci->flags & FRCT_CRC) {
- uint8_t * tail = shm_du_buff_tail_release(sdb, FRCT_CRCLEN);
- if (frct_chk_crc((uint8_t *) pci, tail))
- goto fail_clean;
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return;
+ }
+
+ if (pci->flags & FRCT_ACK) {
+ ackno = ntoh32(pci->ackno);
+ if (after(ackno, frcti->snd_cr.lwe))
+ frcti->snd_cr.lwe = ackno;
+
+ if (frcti->probe && after(ackno, frcti->rttseq)) {
+#ifdef PROC_FLOW_STATS
+ if (!(pci->flags & FRCT_DATA))
+ frcti->n_dak++;
+#endif
+ rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now));
+ frcti->probe = false;
+ }
}
- /* Check if receiver inactivity is true. */
- if (!rcv_cr->drf && now.tv_sec - rcv_cr->act > rcv_cr->inact)
- rcv_cr->drf = true;
+ if (pci->flags & FRCT_FC) {
+ uint32_t rwe;
- /* When there is receiver inactivity and no DRF, drop the PDU. */
- if (rcv_cr->drf && !(pci->flags & FRCT_DRF))
- goto fail_clean;
+ rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF));
+ rwe |= snd_cr->rwe & 0xFF000000;
- seqno = ntoh32(pci->seqno);
+ /* Rollover for 24 bit */
+ if (before(rwe, snd_cr->rwe) && snd_cr->rwe - rwe > 0x007FFFFF)
+ rwe += 0x01000000;
- /* Queue the PDU if needed. */
- if (rcv_cr->cflags & FRCTFORDERING) {
- if (seqno != frcti->rcv_cr.lwe) {
- /* NOTE: queued PDUs head/tail without PCI. */
- if (rq_push(frcti->rq, seqno, idx))
- shm_rdrbuff_remove(ai.rdrb, idx);
- goto fail;
- } else {
- ++rcv_cr->lwe;
+ snd_cr->rwe = rwe;
+
+ pthread_mutex_lock(&frcti->mtx);
+ if (!frcti->open) {
+ frcti->open = true;
+ pthread_cond_broadcast(&frcti->cond);
}
+ pthread_mutex_unlock(&frcti->mtx);
}
- /* If the DRF is set, reset the state of the connection. */
- if (pci->flags & FRCT_DRF) {
- rcv_cr->lwe = seqno;
- if (pci->flags & FRCT_CFG)
- rcv_cr->cflags = pci->cflags;
+ if (!(pci->flags & FRCT_DATA))
+ goto drop_packet;
+
+ if (before(seqno, rcv_cr->lwe)) {
+ rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */
+#ifdef PROC_FLOW_STATS
+ frcti->n_dup++;
+#endif
+ goto drop_packet;
}
- if (rcv_cr->drf)
- rcv_cr->drf = false;
+ if (rcv_cr->cflags & FRCTFRTX) {
- rcv_cr->act = now.tv_sec;
+ if (!before(seqno, rcv_cr->rwe)) { /* Out of window. */
+#ifdef PROC_FLOW_STATS
+ frcti->n_out++;
+#endif
+ goto drop_packet;
+ }
- if (!(pci->flags & FRCT_DATA))
- shm_rdrbuff_remove(ai.rdrb, idx);
+ if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) {
+#ifdef PROC_FLOW_STATS
+ frcti->n_rqo++;
+#endif
+ goto drop_packet; /* Out of rq. */
+ }
+ if (frcti->rq[pos] != -1) {
+#ifdef PROC_FLOW_STATS
+ frcti->n_dup++;
+#endif
+ goto drop_packet; /* Duplicate in rq. */
+ }
+ fd = frcti->fd;
+ } else {
+ rcv_cr->lwe = seqno;
+ }
+
+ frcti->rq[pos] = idx;
pthread_rwlock_unlock(&frcti->lock);
- return 0;
+ if (fd != -1)
+ timerwheel_delayed_ack(fd, frcti);
- fail_clean:
- if (!(pci->flags & FRCT_DATA))
- shm_rdrbuff_remove(ai.rdrb, idx);
- fail:
+ return;
+
+ drop_packet:
pthread_rwlock_unlock(&frcti->lock);
- return -EAGAIN;
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ send_frct_pkt(frcti);
+ return;
}