diff options
Diffstat (limited to 'src/lib/frct.c')
-rw-r--r-- | src/lib/frct.c | 869 |
1 files changed, 705 insertions, 164 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c index e4b858d0..c6fef35c 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -1,10 +1,10 @@ /* - * Ouroboros - Copyright (C) 2016 - 2020 + * Ouroboros - Copyright (C) 2016 - 2024 * * Flow and Retransmission Control * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -20,47 +20,62 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -/* Default Delta-t parameters */ -#define DELT_MPL 60000 /* ms */ -#define DELT_A 0 /* ms */ -#define DELT_R 2000 /* ms */ +#include <ouroboros/endian.h> -#define RQ_SIZE 64 +#define DELT_RDV (100 * MILLION) /* ns */ +#define MAX_RDV (1 * BILLION) /* ns */ -#define TW_ELEMENTS 6000 -#define TW_RESOLUTION 1 /* ms */ - -#define FRCT_PCILEN (sizeof(struct frct_pci)) +#define FRCT "frct" +#define FRCT_PCILEN (sizeof(struct frct_pci)) +#define FRCT_NAME_STRLEN 32 struct frct_cr { - uint32_t lwe; - uint32_t rwe; + uint32_t lwe; /* Left window edge */ + uint32_t rwe; /* Right window edge */ - uint8_t cflags; - uint32_t seqno; + uint8_t cflags; + uint32_t seqno; /* SEQ to send, or last SEQ Ack'd */ - time_t act; /* s */ - time_t inact; /* s */ + struct timespec act; /* Last seen activity */ + time_t inact; /* Inactivity (s) */ }; struct frcti { - int fd; - - time_t mpl; - time_t a; - time_t r; + int fd; + + time_t mpl; + time_t a; + time_t r; + time_t rdv; + + time_t srtt; /* Smoothed rtt */ + time_t mdev; /* Deviation */ + time_t rto; /* Retransmission timeout */ + uint32_t rttseq; + struct timespec t_probe; /* Probe time */ + bool probe; /* Probe active */ +#ifdef PROC_FLOW_STATS + size_t n_rtx; /* Number of rxm packets */ + size_t n_prb; /* Number of rtt probes */ + size_t n_rtt; /* Number of estimates */ + size_t n_dup; /* Duplicates received */ + size_t n_dak; /* Delayed ACKs received */ + size_t n_rdv; /* Number of rdv packets */ + size_t n_out; /* Packets out of window */ + size_t n_rqo; /* Packets out of rqueue */ +#endif + struct frct_cr snd_cr; + struct frct_cr rcv_cr; - time_t srtt_us; /* smoothed rtt */ - time_t mdev_us; /* mdev */ - uint32_t rttseq; - struct timespec t_probe; /* probe time */ - bool probe; /* probe active */ - struct frct_cr snd_cr; - struct frct_cr rcv_cr; + ssize_t rq[RQ_SIZE]; + pthread_rwlock_t lock; - ssize_t rq[RQ_SIZE]; - pthread_rwlock_t lock; + bool open; /* Window open/closed */ + struct timespec t_wnd; /* Window closed time */ + struct timespec t_rdvs; /* Last rendez-vous sent */ + pthread_cond_t cond; + pthread_mutex_t mtx; }; enum frct_flags { @@ -68,28 +83,265 @@ enum frct_flags { FRCT_DRF = 0x02, /* Data run flag */ FRCT_ACK = 0x04, /* ACK field valid */ FRCT_FC = 0x08, /* FC window valid */ - FRCT_RDVZ = 0x10, /* Rendez-vous */ + FRCT_RDVS = 0x10, /* Rendez-vous */ FRCT_FFGM = 0x20, /* First Fragment */ FRCT_MFGM = 0x40, /* More fragments */ }; struct frct_pci { - uint16_t flags; + uint8_t flags; + uint8_t pad; /* 24 bit window! */ uint16_t window; uint32_t seqno; uint32_t ackno; } __attribute__((packed)); -#include <rxmwheel.c> +#ifdef PROC_FLOW_STATS -static struct frcti * frcti_create(int fd) +static int frct_rib_read(const char * path, + char * buf, + size_t len) { - struct frcti * frcti; - time_t delta_t; - ssize_t idx; struct timespec now; + char * entry; + struct flow * flow; + struct frcti * frcti; + int fd; + + (void) len; + + entry = strstr(path, RIB_SEPARATOR); + assert(entry); + *entry = '\0'; + + fd = atoi(path); + + flow = &ai.flows[fd]; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_rdlock(&ai.lock); + + frcti = flow->frcti; + + pthread_rwlock_rdlock(&frcti->lock); + + sprintf(buf, + "Maximum packet lifetime (ns): %20ld\n" + "Max time to Ack (ns): %20ld\n" + "Max time to Retransmit (ns): %20ld\n" + "Smoothed rtt (ns): %20ld\n" + "RTT standard deviation (ns): %20ld\n" + "Retransmit timeout RTO (ns): %20ld\n" + "Sender left window edge: %20u\n" + "Sender right window edge: %20u\n" + "Sender inactive (ns): %20ld\n" + "Sender current sequence number: %20u\n" + "Receiver left window edge: %20u\n" + "Receiver right window edge: %20u\n" + "Receiver inactive (ns): %20ld\n" + "Receiver last ack: %20u\n" + "Number of pkt retransmissions: %20zu\n" + "Number of rtt probes: %20zu\n" + "Number of rtt estimates: %20zu\n" + "Number of duplicates received: %20zu\n" + "Number of delayed acks received: %20zu\n" + "Number of rendez-vous sent: %20zu\n" + "Number of packets out of window: %20zu\n" + "Number of packets out of rqueue: %20zu\n", + frcti->mpl, + frcti->a, + frcti->r, + frcti->srtt, + frcti->mdev, + frcti->rto, + frcti->snd_cr.lwe, + frcti->snd_cr.rwe, + ts_diff_ns(&frcti->snd_cr.act, &now), + frcti->snd_cr.seqno, + frcti->rcv_cr.lwe, + frcti->rcv_cr.rwe, + ts_diff_ns(&frcti->rcv_cr.act, &now), + frcti->rcv_cr.seqno, + frcti->n_rtx, + frcti->n_prb, + frcti->n_rtt, + frcti->n_dup, + frcti->n_dak, + frcti->n_rdv, + frcti->n_out, + frcti->n_rqo); + + pthread_rwlock_unlock(&flow->frcti->lock); + + pthread_rwlock_unlock(&ai.lock); + + return strlen(buf); +} + +static int frct_rib_readdir(char *** buf) +{ + *buf = malloc(sizeof(**buf)); + if (*buf == NULL) + goto fail_malloc; + + (*buf)[0] = strdup("frct"); + if ((*buf)[0] == NULL) + goto fail_strdup; + + return 1; + + fail_strdup: + free(*buf); + fail_malloc: + return -ENOMEM; +} + +static int frct_rib_getattr(const char * path, + struct rib_attr * attr) +{ + (void) path; + (void) attr; + + attr->size = 1189; + attr->mtime = 0; + + return 0; +} + + +static struct rib_ops r_ops = { + .read = frct_rib_read, + .readdir = frct_rib_readdir, + .getattr = frct_rib_getattr +}; + +#endif /* PROC_FLOW_STATS */ + +static bool before(uint32_t seq1, + uint32_t seq2) +{ + return (int32_t)(seq1 - seq2) < 0; +} + +static bool after(uint32_t seq1, + uint32_t seq2) +{ + return (int32_t)(seq2 - seq1) < 0; +} + +static void __send_frct_pkt(int fd, + uint8_t flags, + uint32_t ackno, + uint32_t rwe) +{ + struct shm_du_buff * sdb; + struct frct_pci * pci; + ssize_t idx; + struct flow * f; + + /* Raw calls needed to bypass frcti. */ +#ifdef RXM_BLOCKING + idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); +#else + idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*pci), NULL, &sdb); +#endif + if (idx < 0) + return; + + pci = (struct frct_pci *) shm_du_buff_head(sdb); + memset(pci, 0, sizeof(*pci)); + + *((uint32_t *) pci) = hton32(rwe); + + pci->flags = flags; + pci->ackno = hton32(ackno); + + f = &ai.flows[fd]; + + if (crypt_encrypt(&f->crypt, sdb) < 0) + goto fail; + +#ifdef RXM_BLOCKING + if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) +#else + if (shm_rbuff_write(f->tx_rb, idx)) +#endif + goto fail; + + shm_flow_set_notify(f->set, f->info.id, FLOW_PKT); + + return; + + fail: + ipcp_sdb_release(sdb); + return; +} + +static void send_frct_pkt(struct frcti * frcti) +{ + struct timespec now; + time_t diff; + uint32_t ackno; + uint32_t rwe; + int fd; + + assert(frcti); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_wrlock(&frcti->lock); + + if (!after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + fd = frcti->fd; + ackno = frcti->rcv_cr.lwe; + rwe = frcti->rcv_cr.rwe; + + diff = ts_diff_ns(&frcti->rcv_cr.act, &now); + if (diff > frcti->a) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + diff = ts_diff_ns(&frcti->snd_cr.act, &now); + if (diff < TICTIME) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; + + pthread_rwlock_unlock(&frcti->lock); + + __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe); +} + +static void __send_rdv(int fd) +{ + __send_frct_pkt(fd, FRCT_RDVS, 0, 0); +} + +static struct frcti * frcti_create(int fd, + time_t a, + time_t r, + time_t mpl) +{ + struct frcti * frcti; + ssize_t idx; + struct timespec now; + pthread_condattr_t cattr; +#ifdef PROC_FLOW_STATS + char frctstr[FRCT_NAME_STRLEN + 1]; +#endif + mpl *= BILLION; + a *= BILLION; + r *= BILLION; frcti = malloc(sizeof(*frcti)); if (frcti == NULL) @@ -100,36 +352,79 @@ static struct frcti * frcti_create(int fd) if (pthread_rwlock_init(&frcti->lock, NULL)) goto fail_lock; + if (pthread_mutex_init(&frcti->mtx, NULL)) + goto fail_mutex; + + if (pthread_condattr_init(&cattr)) + goto fail_cattr; +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&frcti->cond, &cattr)) + goto fail_cond; + +#ifdef PROC_FLOW_STATS + sprintf(frctstr, "%d", fd); + if (rib_reg(frctstr, &r_ops)) + goto fail_rib_reg; +#endif + pthread_condattr_destroy(&cattr); + for (idx = 0; idx < RQ_SIZE; ++idx) frcti->rq[idx] = -1; - clock_gettime(CLOCK_REALTIME_COARSE, &now); + clock_gettime(PTHREAD_COND_CLOCK, &now); - frcti->mpl = DELT_MPL; - frcti->a = DELT_A; - frcti->r = DELT_R; + frcti->mpl = mpl; + frcti->a = a; + frcti->r = r; + frcti->rdv = DELT_RDV; frcti->fd = fd; - delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000; - - frcti->snd_cr.inact = 3 * delta_t; - frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); - /* rtt estimator. rto is currently srtt + 2 * mdev */ - frcti->srtt_us = 0; /* updated on first ACK */ - frcti->mdev_us = 100000; /* initial rxm will be after 200 ms */ - frcti->rttseq = 0; - frcti->probe = false; - if (ai.flows[fd].qs.loss == 0) { - frcti->snd_cr.cflags |= FRCTFRTX; + frcti->rttseq = 0; + frcti->probe = false; + + frcti->srtt = 0; /* Updated on first ACK */ + frcti->mdev = 10 * MILLION; /* Updated on first ACK */ + frcti->rto = BILLION; /* Initial rxm will be after 1 s */ +#ifdef PROC_FLOW_STATS + frcti->n_rtx = 0; + frcti->n_prb = 0; + frcti->n_rtt = 0; + frcti->n_dup = 0; + frcti->n_dak = 0; + frcti->n_rdv = 0; + frcti->n_out = 0; + frcti->n_rqo = 0; +#endif + if (ai.flows[fd].info.qs.loss == 0) { + frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER; frcti->rcv_cr.cflags |= FRCTFRTX; } - frcti->rcv_cr.inact = 2 * delta_t; - frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); + frcti->snd_cr.cflags |= FRCTFRESCNTL; + + frcti->snd_cr.rwe = START_WINDOW; + + frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */ + frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1); + + frcti->rcv_cr.inact = (2 * mpl + a + r) / BILLION + 1; /* s */ + frcti->rcv_cr.act.tv_sec = now.tv_sec - (frcti->rcv_cr.inact + 1); return frcti; +#ifdef PROC_FLOW_STATS + fail_rib_reg: + pthread_cond_destroy(&frcti->cond); +#endif + fail_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + pthread_mutex_destroy(&frcti->mtx); + fail_mutex: + pthread_rwlock_destroy(&frcti->lock); fail_lock: free(frcti); fail_malloc: @@ -138,23 +433,23 @@ static struct frcti * frcti_create(int fd) static void frcti_destroy(struct frcti * frcti) { - /* - * FIXME: In case of reliable transmission we should - * make sure everything we sent is acked. - */ - - rxmwheel_clear(frcti->fd); - +#ifdef PROC_FLOW_STATS + char frctstr[FRCT_NAME_STRLEN + 1]; + sprintf(frctstr, "%d", frcti->fd); + rib_unreg(frctstr); +#endif + pthread_cond_destroy(&frcti->cond); + pthread_mutex_destroy(&frcti->mtx); pthread_rwlock_destroy(&frcti->lock); free(frcti); } -static uint16_t frcti_getconf(struct frcti * frcti) +static uint16_t frcti_getflags(struct frcti * frcti) { uint16_t ret; - assert (frcti); + assert(frcti); pthread_rwlock_rdlock(&frcti->lock); @@ -165,15 +460,148 @@ static uint16_t frcti_getconf(struct frcti * frcti) return ret; } -#define frcti_queued_pdu(frcti) \ - (frcti == NULL ? -1 : __frcti_queued_pdu(frcti)) +static void frcti_setflags(struct frcti * frcti, + uint16_t flags) +{ + flags |= FRCTFRTX; /* Should not be set by command */ + + assert(frcti); + + pthread_rwlock_wrlock(&frcti->lock); -#define frcti_snd(frcti, sdb) \ + frcti->snd_cr.cflags &= FRCTFRTX; /* Zero other flags */ + + frcti->snd_cr.cflags &= flags; + + pthread_rwlock_unlock(&frcti->lock); +} + +#define frcti_queued_pdu(frcti) \ + (frcti == NULL ? idx : __frcti_queued_pdu(frcti)) + +#define frcti_snd(frcti, sdb) \ (frcti == NULL ? 0 : __frcti_snd(frcti, sdb)) -#define frcti_rcv(frcti, sdb) \ +#define frcti_rcv(frcti, sdb) \ (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) +#define frcti_dealloc(frcti) \ + (frcti == NULL ? 0 : __frcti_dealloc(frcti)) + +#define frcti_is_window_open(frcti) \ + (frcti == NULL ? true : __frcti_is_window_open(frcti)) + +#define frcti_window_wait(frcti, abstime) \ + (frcti == NULL ? 0 : __frcti_window_wait(frcti, abstime)) + + +static bool __frcti_is_window_open(struct frcti * frcti) +{ + struct frct_cr * snd_cr = &frcti->snd_cr; + bool ret = true; + + pthread_rwlock_rdlock(&frcti->lock); + + if (snd_cr->cflags & FRCTFRESCNTL) + ret = before(snd_cr->seqno, snd_cr->rwe); + + if (!ret) { + struct timespec now; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_mutex_lock(&frcti->mtx); + if (frcti->open) { + frcti->open = false; + frcti->t_wnd = now; + frcti->t_rdvs = now; + } else { + time_t diff; + diff = ts_diff_ns(&frcti->t_wnd, &now); + if (diff > MAX_RDV) { + pthread_mutex_unlock(&frcti->mtx); + pthread_rwlock_unlock(&frcti->lock); + return false; + } + + diff = ts_diff_ns(&frcti->t_rdvs, &now); + if (diff > frcti->rdv) { + frcti->t_rdvs = now; + __send_rdv(frcti->fd); +#ifdef PROC_FLOW_STATS + frcti->n_rdv++; +#endif + + } + } + + pthread_mutex_unlock(&frcti->mtx); + } + + pthread_rwlock_unlock(&frcti->lock); + + return ret; +} + +static int __frcti_window_wait(struct frcti * frcti, + struct timespec * abstime) +{ + struct frct_cr * snd_cr = &frcti->snd_cr; + int ret = 0; + + pthread_rwlock_rdlock(&frcti->lock); + + if (!(snd_cr->cflags & FRCTFRESCNTL)) { + pthread_rwlock_unlock(&frcti->lock); + return 0; + } + + while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) { + struct timespec now; + pthread_rwlock_unlock(&frcti->lock); + pthread_mutex_lock(&frcti->mtx); + + if (frcti->open) { + clock_gettime(PTHREAD_COND_CLOCK, &now); + + frcti->t_wnd = now; + frcti->t_rdvs = now; + frcti->open = false; + } + + pthread_cleanup_push(__cleanup_mutex_unlock, &frcti->mtx); + + ret = -__timedwait(&frcti->cond, &frcti->mtx, abstime); + + pthread_cleanup_pop(false); + + if (ret == -ETIMEDOUT) { + time_t diff; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + diff = ts_diff_ns(&frcti->t_wnd, &now); + if (diff > MAX_RDV) { + pthread_mutex_unlock(&frcti->mtx); + return -ECONNRESET; /* write fails! */ + } + + diff = ts_diff_ns(&frcti->t_rdvs, &now); + if (diff > frcti->rdv) { + frcti->t_rdvs = now; + __send_rdv(frcti->fd); + } + } + + pthread_mutex_unlock(&frcti->mtx); + pthread_rwlock_rdlock(&frcti->lock); + } + + pthread_rwlock_unlock(&frcti->lock); + + return ret; +} + static ssize_t __frcti_queued_pdu(struct frcti * frcti) { ssize_t idx; @@ -184,10 +612,12 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) /* See if we already have the next PDU. */ pthread_rwlock_wrlock(&frcti->lock); - pos = frcti->rcv_cr.seqno & (RQ_SIZE - 1); + pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); + idx = frcti->rq[pos]; if (idx != -1) { - ++frcti->rcv_cr.seqno; + ++frcti->rcv_cr.lwe; + ++frcti->rcv_cr.rwe; frcti->rq[pos] = -1; } @@ -196,27 +626,60 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) return idx; } -static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb) +static ssize_t __frcti_pdu_ready(struct frcti * frcti) { - struct frct_pci * pci; + ssize_t idx; + size_t pos; - pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); - if (pci != NULL) - memset(pci, 0, sizeof(*pci)); + assert(frcti); - return pci; -} + /* See if we already have the next PDU. */ + pthread_rwlock_rdlock(&frcti->lock); -static bool before(uint32_t seq1, - uint32_t seq2) -{ - return (int32_t)(seq1 - seq2) < 0; + pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); + idx = frcti->rq[pos]; + + pthread_rwlock_unlock(&frcti->lock); + + return idx; } -static bool after(uint32_t seq1, - uint32_t seq2) +#include <timerwheel.c> + +/* + * Send a final ACK for everything that has not been ACK'd. + * If the flow should be kept active for retransmission, + * the returned time will be negative. + */ +static time_t __frcti_dealloc(struct frcti * frcti) { - return (int32_t)(seq2 - seq1) < 0; + struct timespec now; + time_t wait; + int ackno; + int fd = -1; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_rdlock(&frcti->lock); + + ackno = frcti->rcv_cr.lwe; + if (frcti->rcv_cr.lwe != frcti->rcv_cr.seqno) + fd = frcti->fd; + + wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec, + frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec); + wait = MAX(wait, 0); + + if (frcti->snd_cr.cflags & FRCTFLINGER + && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno)) + wait = -wait; + + pthread_rwlock_unlock(&frcti->lock); + + if (fd != -1) + __send_frct_pkt(fd, FRCT_ACK, ackno, 0); + + return wait; } static int __frcti_snd(struct frcti * frcti, @@ -226,22 +689,29 @@ static int __frcti_snd(struct frcti * frcti, struct timespec now; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; + uint32_t seqno; + bool rtx; assert(frcti); + assert(shm_du_buff_len(sdb) != 0); snd_cr = &frcti->snd_cr; rcv_cr = &frcti->rcv_cr; - rxmwheel_move(); + timerwheel_move(); - pci = frcti_alloc_head(sdb); + pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); if (pci == NULL) - return -1; + return -ENOMEM; + + memset(pci, 0, sizeof(*pci)); - clock_gettime(CLOCK_REALTIME, &now); + clock_gettime(PTHREAD_COND_CLOCK, &now); pthread_rwlock_wrlock(&frcti->lock); + rtx = snd_cr->cflags & FRCTFRTX; + pci->flags |= FRCT_DATA; /* Set DRF if there are no unacknowledged packets. */ @@ -249,145 +719,216 @@ static int __frcti_snd(struct frcti * frcti, pci->flags |= FRCT_DRF; /* Choose a new sequence number if sender inactivity expired. */ - if (now.tv_sec - snd_cr->act > snd_cr->inact) { + if (now.tv_sec - snd_cr->act.tv_sec > snd_cr->inact) { /* There are no unacknowledged packets. */ assert(snd_cr->seqno == snd_cr->lwe); -#ifdef CONFIG_OUROBOROS_DEBUG - snd_cr->seqno = 0; -#else random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); -#endif - frcti->snd_cr.lwe = snd_cr->seqno - 1; + snd_cr->lwe = snd_cr->seqno; + snd_cr->rwe = snd_cr->lwe + START_WINDOW; + } + + seqno = snd_cr->seqno; + pci->seqno = hton32(seqno); + + if (now.tv_sec - rcv_cr->act.tv_sec < rcv_cr->inact) { + pci->flags |= FRCT_FC; + *((uint32_t *) pci) |= hton32(rcv_cr->rwe & 0x00FFFFFF); } - pci->seqno = hton32(snd_cr->seqno); - if (!(snd_cr->cflags & FRCTFRTX)) { + if (!rtx) { snd_cr->lwe++; } else { if (!frcti->probe) { frcti->rttseq = snd_cr->seqno; frcti->t_probe = now; frcti->probe = true; +#ifdef PROC_FLOW_STATS + frcti->n_prb++; +#endif } - - if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { - rxmwheel_add(frcti, snd_cr->seqno, sdb); - if (rcv_cr->lwe <= rcv_cr->seqno) { - pci->flags |= FRCT_ACK; - pci->ackno = hton32(rcv_cr->seqno); - rcv_cr->lwe = rcv_cr->seqno; - } + if ((now.tv_sec - rcv_cr->act.tv_sec) * BILLION <= frcti->a) { + pci->flags |= FRCT_ACK; + pci->ackno = hton32(rcv_cr->lwe); + rcv_cr->seqno = rcv_cr->lwe; } } snd_cr->seqno++; - snd_cr->act = now.tv_sec; + snd_cr->act = now; pthread_rwlock_unlock(&frcti->lock); + if (rtx) + timerwheel_rxm(frcti, seqno, sdb); + return 0; } static void rtt_estimator(struct frcti * frcti, - time_t mrtt_us) + time_t mrtt) { - time_t srtt = frcti->srtt_us; - time_t mdev = frcti->mdev_us; - - if (srtt != 0) { - srtt -= (srtt >> 3); - srtt += mrtt_us >> 3; /* rtt = 7/8 rtt + 1/8 new */ - mdev -= (mdev >> 2); - mdev += ABS(srtt - mrtt_us) >> 2; + time_t srtt = frcti->srtt; + time_t rttvar = frcti->mdev; + + if (srtt == 0) { /* first measurement */ + srtt = mrtt; + rttvar = mrtt >> 1; } else { - srtt = mrtt_us << 3; /* take the measured time to be rtt */ - mdev = mrtt_us >> 1; /* take half mrtt_us as deviation */ + time_t delta = mrtt - srtt; + srtt += (delta >> 3); + delta = (ABS(delta) - rttvar) >> 2; +#ifdef FRCT_LINUX_RTT_ESTIMATOR + if (delta < 0) + delta >>= 3; +#endif + rttvar += delta; } - - frcti->srtt_us = MAX(1U, srtt); - frcti->mdev_us = MAX(1U, mdev); +#ifdef PROC_FLOW_STATS + frcti->n_rtt++; +#endif + frcti->srtt = MAX(1000L, srtt); + frcti->mdev = MAX(100L, rttvar); + frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << MDEV_MUL)); } -/* Returns 0 when idx contains a packet for the application. */ -static int __frcti_rcv(struct frcti * frcti, - struct shm_du_buff * sdb) +/* Always queues the next application packet on the RQ. */ +static void __frcti_rcv(struct frcti * frcti, + struct shm_du_buff * sdb) { ssize_t idx; + size_t pos; struct frct_pci * pci; struct timespec now; - struct frct_cr * snd_cr; struct frct_cr * rcv_cr; + struct frct_cr * snd_cr; uint32_t seqno; - int ret = 0; + uint32_t ackno; + uint32_t rwe; + int fd = -1; assert(frcti); rcv_cr = &frcti->rcv_cr; snd_cr = &frcti->snd_cr; - pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); - - clock_gettime(CLOCK_REALTIME, &now); + clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_rwlock_wrlock(&frcti->lock); + pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); idx = shm_du_buff_get_idx(sdb); - seqno = ntoh32(pci->seqno); + pos = seqno & (RQ_SIZE - 1); - /* Check if receiver inactivity is true. */ - if (now.tv_sec - rcv_cr->act > rcv_cr->inact) { - /* Inactive receiver, check for DRF. */ - if (pci->flags & FRCT_DRF) /* New run. */ + pthread_rwlock_wrlock(&frcti->lock); + + if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) { + if (pci->flags & FRCT_DRF) { /* New run. */ + rcv_cr->lwe = seqno; + rcv_cr->rwe = seqno + RQ_SIZE; rcv_cr->seqno = seqno; - else + } else if (pci->flags & FRCT_DATA) { goto drop_packet; + } } - if (seqno == rcv_cr->seqno) { - ++rcv_cr->seqno; - } else { /* Out of order. */ - if (before(seqno, rcv_cr->seqno)) - goto drop_packet; + rcv_cr->act = now; - if (rcv_cr->cflags & FRCTFRTX) { - size_t pos = seqno & (RQ_SIZE - 1); - if ((seqno - rcv_cr->lwe) > RQ_SIZE /* Out of rq. */ - || frcti->rq[pos] != -1) /* Duplicate in rq. */ - goto drop_packet; - /* Queue. */ - frcti->rq[pos] = idx; - ret = -EAGAIN; - } else { - rcv_cr->seqno = seqno + 1; - } + /* For now, just send an immediate window update. */ + if (pci->flags & FRCT_RDVS) { + fd = frcti->fd; + rwe = rcv_cr->rwe; + pthread_rwlock_unlock(&frcti->lock); + + __send_frct_pkt(fd, FRCT_FC, 0, rwe); + + shm_rdrbuff_remove(ai.rdrb, idx); + return; } - if (rcv_cr->cflags & FRCTFRTX && pci->flags & FRCT_ACK) { - uint32_t ackno = ntoh32(pci->ackno); - /* Check for duplicate (old) acks. */ - if ((int32_t)(ackno - snd_cr->lwe) >= 0) - snd_cr->lwe = ackno; + if (pci->flags & FRCT_ACK) { + ackno = ntoh32(pci->ackno); + if (after(ackno, frcti->snd_cr.lwe)) + frcti->snd_cr.lwe = ackno; + if (frcti->probe && after(ackno, frcti->rttseq)) { - rtt_estimator(frcti, ts_diff_us(&frcti->t_probe, &now)); +#ifdef PROC_FLOW_STATS + if (!(pci->flags & FRCT_DATA)) + frcti->n_dak++; +#endif + rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now)); frcti->probe = false; } } - rcv_cr->act = now.tv_sec; + if (pci->flags & FRCT_FC) { + uint32_t rwe; - pthread_rwlock_unlock(&frcti->lock); + rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF)); + rwe |= snd_cr->rwe & 0xFF000000; + + /* Rollover for 24 bit */ + if (before(rwe, snd_cr->rwe) && snd_cr->rwe - rwe > 0x007FFFFF) + rwe += 0x01000000; + + snd_cr->rwe = rwe; + + pthread_mutex_lock(&frcti->mtx); + if (!frcti->open) { + frcti->open = true; + pthread_cond_broadcast(&frcti->cond); + } + pthread_mutex_unlock(&frcti->mtx); + } if (!(pci->flags & FRCT_DATA)) - shm_rdrbuff_remove(ai.rdrb, idx); + goto drop_packet; - rxmwheel_move(); + if (before(seqno, rcv_cr->lwe)) { + rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */ +#ifdef PROC_FLOW_STATS + frcti->n_dup++; +#endif + goto drop_packet; + } - return ret; + if (rcv_cr->cflags & FRCTFRTX) { + + if (!before(seqno, rcv_cr->rwe)) { /* Out of window. */ +#ifdef PROC_FLOW_STATS + frcti->n_out++; +#endif + goto drop_packet; + } + + if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) { +#ifdef PROC_FLOW_STATS + frcti->n_rqo++; +#endif + goto drop_packet; /* Out of rq. */ + } + if (frcti->rq[pos] != -1) { +#ifdef PROC_FLOW_STATS + frcti->n_dup++; +#endif + goto drop_packet; /* Duplicate in rq. */ + } + fd = frcti->fd; + } else { + rcv_cr->lwe = seqno; + } + + frcti->rq[pos] = idx; + + pthread_rwlock_unlock(&frcti->lock); + + if (fd != -1) + timerwheel_delayed_ack(fd, frcti); + + return; drop_packet: pthread_rwlock_unlock(&frcti->lock); shm_rdrbuff_remove(ai.rdrb, idx); - rxmwheel_move(); - return -EAGAIN; + send_frct_pkt(frcti); + return; } |