diff options
Diffstat (limited to 'src/lib/frct.c')
| -rw-r--r-- | src/lib/frct.c | 918 |
1 files changed, 728 insertions, 190 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c index 54f822f4..c6fef35c 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -1,10 +1,10 @@ /* - * Ouroboros - Copyright (C) 2016 - 2018 + * Ouroboros - Copyright (C) 2016 - 2024 * * Flow and Retransmission Control * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -20,135 +20,410 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -/* Default Delta-t parameters */ -#define DELT_MPL 60000 /* ms */ -#define DELT_A 0 /* ms */ -#define DELT_R 2000 /* ms */ +#include <ouroboros/endian.h> -#define RQ_SIZE 20 +#define DELT_RDV (100 * MILLION) /* ns */ +#define MAX_RDV (1 * BILLION) /* ns */ -#define TW_ELEMENTS 6000 -#define TW_RESOLUTION 1 /* ms */ - -#define FRCT_PCILEN (sizeof(struct frct_pci)) -#define FRCT_CRCLEN (sizeof(uint32_t)) +#define FRCT "frct" +#define FRCT_PCILEN (sizeof(struct frct_pci)) +#define FRCT_NAME_STRLEN 32 struct frct_cr { - bool drf; - uint32_t lwe; - uint32_t rwe; + uint32_t lwe; /* Left window edge */ + uint32_t rwe; /* Right window edge */ - uint32_t seqno; - bool conf; - uint8_t cflags; + uint8_t cflags; + uint32_t seqno; /* SEQ to send, or last SEQ Ack'd */ - time_t act; - time_t inact; + struct timespec act; /* Last seen activity */ + time_t inact; /* Inactivity (s) */ }; struct frcti { - int fd; - - time_t mpl; - time_t a; - time_t r; - - struct frct_cr snd_cr; - struct frct_cr rcv_cr; + int fd; + + time_t mpl; + time_t a; + time_t r; + time_t rdv; + + time_t srtt; /* Smoothed rtt */ + time_t mdev; /* Deviation */ + time_t rto; /* Retransmission timeout */ + uint32_t rttseq; + struct timespec t_probe; /* Probe time */ + bool probe; /* Probe active */ +#ifdef PROC_FLOW_STATS + size_t n_rtx; /* Number of rxm packets */ + size_t n_prb; /* Number of rtt probes */ + size_t n_rtt; /* Number of estimates */ + size_t n_dup; /* Duplicates received */ + size_t n_dak; /* Delayed ACKs received */ + size_t n_rdv; /* Number of rdv packets */ + size_t n_out; /* Packets out of window */ + size_t n_rqo; /* Packets out of rqueue */ +#endif + struct frct_cr snd_cr; + struct frct_cr rcv_cr; - struct rq * rq; - struct timespec rtt; + ssize_t rq[RQ_SIZE]; + pthread_rwlock_t lock; - pthread_rwlock_t lock; + bool open; /* Window open/closed */ + struct timespec t_wnd; /* Window closed time */ + struct timespec t_rdvs; /* Last rendez-vous sent */ + pthread_cond_t cond; + pthread_mutex_t mtx; }; -struct { - struct timerwheel * tw; -} frct; - enum frct_flags { FRCT_DATA = 0x01, /* PDU carries data */ FRCT_DRF = 0x02, /* Data run flag */ - FRCT_ACK = 0x03, /* ACK field valid */ + FRCT_ACK = 0x04, /* ACK field valid */ FRCT_FC = 0x08, /* FC window valid */ - FRCT_RDVZ = 0x10, /* Rendez-vous */ - FRCT_CFG = 0x20, /* Configuration */ + FRCT_RDVS = 0x10, /* Rendez-vous */ + FRCT_FFGM = 0x20, /* First Fragment */ FRCT_MFGM = 0x40, /* More fragments */ - FRCT_CRC = 0x80, /* CRC present */ }; struct frct_pci { uint8_t flags; - uint8_t cflags; - + uint8_t pad; /* 24 bit window! */ uint16_t window; uint32_t seqno; uint32_t ackno; } __attribute__((packed)); -static int frct_init(void) +#ifdef PROC_FLOW_STATS + +static int frct_rib_read(const char * path, + char * buf, + size_t len) +{ + struct timespec now; + char * entry; + struct flow * flow; + struct frcti * frcti; + int fd; + + (void) len; + + entry = strstr(path, RIB_SEPARATOR); + assert(entry); + *entry = '\0'; + + fd = atoi(path); + + flow = &ai.flows[fd]; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_rdlock(&ai.lock); + + frcti = flow->frcti; + + pthread_rwlock_rdlock(&frcti->lock); + + sprintf(buf, + "Maximum packet lifetime (ns): %20ld\n" + "Max time to Ack (ns): %20ld\n" + "Max time to Retransmit (ns): %20ld\n" + "Smoothed rtt (ns): %20ld\n" + "RTT standard deviation (ns): %20ld\n" + "Retransmit timeout RTO (ns): %20ld\n" + "Sender left window edge: %20u\n" + "Sender right window edge: %20u\n" + "Sender inactive (ns): %20ld\n" + "Sender current sequence number: %20u\n" + "Receiver left window edge: %20u\n" + "Receiver right window edge: %20u\n" + "Receiver inactive (ns): %20ld\n" + "Receiver last ack: %20u\n" + "Number of pkt retransmissions: %20zu\n" + "Number of rtt probes: %20zu\n" + "Number of rtt estimates: %20zu\n" + "Number of duplicates received: %20zu\n" + "Number of delayed acks received: %20zu\n" + "Number of rendez-vous sent: %20zu\n" + "Number of packets out of window: %20zu\n" + "Number of packets out of rqueue: %20zu\n", + frcti->mpl, + frcti->a, + frcti->r, + frcti->srtt, + frcti->mdev, + frcti->rto, + frcti->snd_cr.lwe, + frcti->snd_cr.rwe, + ts_diff_ns(&frcti->snd_cr.act, &now), + frcti->snd_cr.seqno, + frcti->rcv_cr.lwe, + frcti->rcv_cr.rwe, + ts_diff_ns(&frcti->rcv_cr.act, &now), + frcti->rcv_cr.seqno, + frcti->n_rtx, + frcti->n_prb, + frcti->n_rtt, + frcti->n_dup, + frcti->n_dak, + frcti->n_rdv, + frcti->n_out, + frcti->n_rqo); + + pthread_rwlock_unlock(&flow->frcti->lock); + + pthread_rwlock_unlock(&ai.lock); + + return strlen(buf); +} + +static int frct_rib_readdir(char *** buf) { - frct.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS); - if (frct.tw == NULL) - return -1; + *buf = malloc(sizeof(**buf)); + if (*buf == NULL) + goto fail_malloc; + + (*buf)[0] = strdup("frct"); + if ((*buf)[0] == NULL) + goto fail_strdup; + + return 1; + + fail_strdup: + free(*buf); + fail_malloc: + return -ENOMEM; +} + +static int frct_rib_getattr(const char * path, + struct rib_attr * attr) +{ + (void) path; + (void) attr; + + attr->size = 1189; + attr->mtime = 0; return 0; } -static void frct_fini(void) + +static struct rib_ops r_ops = { + .read = frct_rib_read, + .readdir = frct_rib_readdir, + .getattr = frct_rib_getattr +}; + +#endif /* PROC_FLOW_STATS */ + +static bool before(uint32_t seq1, + uint32_t seq2) +{ + return (int32_t)(seq1 - seq2) < 0; +} + +static bool after(uint32_t seq1, + uint32_t seq2) +{ + return (int32_t)(seq2 - seq1) < 0; +} + +static void __send_frct_pkt(int fd, + uint8_t flags, + uint32_t ackno, + uint32_t rwe) +{ + struct shm_du_buff * sdb; + struct frct_pci * pci; + ssize_t idx; + struct flow * f; + + /* Raw calls needed to bypass frcti. */ +#ifdef RXM_BLOCKING + idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); +#else + idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*pci), NULL, &sdb); +#endif + if (idx < 0) + return; + + pci = (struct frct_pci *) shm_du_buff_head(sdb); + memset(pci, 0, sizeof(*pci)); + + *((uint32_t *) pci) = hton32(rwe); + + pci->flags = flags; + pci->ackno = hton32(ackno); + + f = &ai.flows[fd]; + + if (crypt_encrypt(&f->crypt, sdb) < 0) + goto fail; + +#ifdef RXM_BLOCKING + if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) +#else + if (shm_rbuff_write(f->tx_rb, idx)) +#endif + goto fail; + + shm_flow_set_notify(f->set, f->info.id, FLOW_PKT); + + return; + + fail: + ipcp_sdb_release(sdb); + return; +} + +static void send_frct_pkt(struct frcti * frcti) { - assert(frct.tw); + struct timespec now; + time_t diff; + uint32_t ackno; + uint32_t rwe; + int fd; + + assert(frcti); + + clock_gettime(PTHREAD_COND_CLOCK, &now); - timerwheel_destroy(frct.tw); + pthread_rwlock_wrlock(&frcti->lock); + + if (!after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + fd = frcti->fd; + ackno = frcti->rcv_cr.lwe; + rwe = frcti->rcv_cr.rwe; + + diff = ts_diff_ns(&frcti->rcv_cr.act, &now); + if (diff > frcti->a) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + diff = ts_diff_ns(&frcti->snd_cr.act, &now); + if (diff < TICTIME) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; + + pthread_rwlock_unlock(&frcti->lock); + + __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe); +} + +static void __send_rdv(int fd) +{ + __send_frct_pkt(fd, FRCT_RDVS, 0, 0); } -static struct frcti * frcti_create(int fd) +static struct frcti * frcti_create(int fd, + time_t a, + time_t r, + time_t mpl) { - struct frcti * frcti; - time_t delta_t; + struct frcti * frcti; + ssize_t idx; + struct timespec now; + pthread_condattr_t cattr; +#ifdef PROC_FLOW_STATS + char frctstr[FRCT_NAME_STRLEN + 1]; +#endif + mpl *= BILLION; + a *= BILLION; + r *= BILLION; frcti = malloc(sizeof(*frcti)); if (frcti == NULL) goto fail_malloc; + memset(frcti, 0, sizeof(*frcti)); + if (pthread_rwlock_init(&frcti->lock, NULL)) goto fail_lock; - frcti->rq = rq_create(RQ_SIZE); - if (frcti->rq == NULL) - goto fail_rq; + if (pthread_mutex_init(&frcti->mtx, NULL)) + goto fail_mutex; + + if (pthread_condattr_init(&cattr)) + goto fail_cattr; +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&frcti->cond, &cattr)) + goto fail_cond; + +#ifdef PROC_FLOW_STATS + sprintf(frctstr, "%d", fd); + if (rib_reg(frctstr, &r_ops)) + goto fail_rib_reg; +#endif + pthread_condattr_destroy(&cattr); + + for (idx = 0; idx < RQ_SIZE; ++idx) + frcti->rq[idx] = -1; + + clock_gettime(PTHREAD_COND_CLOCK, &now); - frcti->mpl = DELT_MPL; - frcti->a = DELT_A; - frcti->r = DELT_R; + frcti->mpl = mpl; + frcti->a = a; + frcti->r = r; + frcti->rdv = DELT_RDV; frcti->fd = fd; - delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000; - frcti->snd_cr.drf = true; - frcti->snd_cr.conf = true; -#ifdef OUROBOROS_CONFIG_DEBUG - frcti->snd_cr.seqno = 0; -#else - random_buffer(&frcti->snd_cr.seqno, sizeof(frcti->snd_cr.seqno)); + frcti->rttseq = 0; + frcti->probe = false; + + frcti->srtt = 0; /* Updated on first ACK */ + frcti->mdev = 10 * MILLION; /* Updated on first ACK */ + frcti->rto = BILLION; /* Initial rxm will be after 1 s */ +#ifdef PROC_FLOW_STATS + frcti->n_rtx = 0; + frcti->n_prb = 0; + frcti->n_rtt = 0; + frcti->n_dup = 0; + frcti->n_dak = 0; + frcti->n_rdv = 0; + frcti->n_out = 0; + frcti->n_rqo = 0; #endif - frcti->snd_cr.lwe = 0; - frcti->snd_cr.rwe = 0; - frcti->snd_cr.cflags = 0; - frcti->snd_cr.inact = 2 * delta_t + 1; + if (ai.flows[fd].info.qs.loss == 0) { + frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER; + frcti->rcv_cr.cflags |= FRCTFRTX; + } + + frcti->snd_cr.cflags |= FRCTFRESCNTL; + + frcti->snd_cr.rwe = START_WINDOW; - frcti->rcv_cr.drf = true; - frcti->rcv_cr.lwe = 0; - frcti->rcv_cr.rwe = 0; - frcti->rcv_cr.cflags = 0; - frcti->rcv_cr.inact = 3 * delta_t + 1; + frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */ + frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1); + + frcti->rcv_cr.inact = (2 * mpl + a + r) / BILLION + 1; /* s */ + frcti->rcv_cr.act.tv_sec = now.tv_sec - (frcti->rcv_cr.inact + 1); return frcti; - fail_rq: +#ifdef PROC_FLOW_STATS + fail_rib_reg: + pthread_cond_destroy(&frcti->cond); +#endif + fail_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + pthread_mutex_destroy(&frcti->mtx); + fail_mutex: pthread_rwlock_destroy(&frcti->lock); fail_lock: free(frcti); @@ -158,73 +433,192 @@ static struct frcti * frcti_create(int fd) static void frcti_destroy(struct frcti * frcti) { - /* - * FIXME: In case of reliable transmission we should - * make sure everything is acked. - */ - +#ifdef PROC_FLOW_STATS + char frctstr[FRCT_NAME_STRLEN + 1]; + sprintf(frctstr, "%d", frcti->fd); + rib_unreg(frctstr); +#endif + pthread_cond_destroy(&frcti->cond); + pthread_mutex_destroy(&frcti->mtx); pthread_rwlock_destroy(&frcti->lock); - rq_destroy(frcti->rq); free(frcti); } -static int frcti_setconf(struct frcti * frcti, - uint16_t flags) +static uint16_t frcti_getflags(struct frcti * frcti) { + uint16_t ret; + assert(frcti); - pthread_rwlock_wrlock(&frcti->lock); + pthread_rwlock_rdlock(&frcti->lock); - if (frcti->snd_cr.cflags != flags) { - frcti->snd_cr.cflags = flags; - frcti->snd_cr.conf = true; - frcti->snd_cr.drf = true; - } + ret = frcti->snd_cr.cflags; pthread_rwlock_unlock(&frcti->lock); - return 0; + return ret; } -static uint16_t frcti_getconf(struct frcti * frcti) +static void frcti_setflags(struct frcti * frcti, + uint16_t flags) { - uint16_t ret; + flags |= FRCTFRTX; /* Should not be set by command */ + + assert(frcti); + + pthread_rwlock_wrlock(&frcti->lock); + + frcti->snd_cr.cflags &= FRCTFRTX; /* Zero other flags */ - assert (frcti); + frcti->snd_cr.cflags &= flags; + + pthread_rwlock_unlock(&frcti->lock); +} + +#define frcti_queued_pdu(frcti) \ + (frcti == NULL ? idx : __frcti_queued_pdu(frcti)) + +#define frcti_snd(frcti, sdb) \ + (frcti == NULL ? 0 : __frcti_snd(frcti, sdb)) + +#define frcti_rcv(frcti, sdb) \ + (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) + +#define frcti_dealloc(frcti) \ + (frcti == NULL ? 0 : __frcti_dealloc(frcti)) + +#define frcti_is_window_open(frcti) \ + (frcti == NULL ? true : __frcti_is_window_open(frcti)) + +#define frcti_window_wait(frcti, abstime) \ + (frcti == NULL ? 0 : __frcti_window_wait(frcti, abstime)) + + +static bool __frcti_is_window_open(struct frcti * frcti) +{ + struct frct_cr * snd_cr = &frcti->snd_cr; + bool ret = true; pthread_rwlock_rdlock(&frcti->lock); - ret = frcti->snd_cr.cflags; + if (snd_cr->cflags & FRCTFRESCNTL) + ret = before(snd_cr->seqno, snd_cr->rwe); + + if (!ret) { + struct timespec now; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_mutex_lock(&frcti->mtx); + if (frcti->open) { + frcti->open = false; + frcti->t_wnd = now; + frcti->t_rdvs = now; + } else { + time_t diff; + diff = ts_diff_ns(&frcti->t_wnd, &now); + if (diff > MAX_RDV) { + pthread_mutex_unlock(&frcti->mtx); + pthread_rwlock_unlock(&frcti->lock); + return false; + } + + diff = ts_diff_ns(&frcti->t_rdvs, &now); + if (diff > frcti->rdv) { + frcti->t_rdvs = now; + __send_rdv(frcti->fd); +#ifdef PROC_FLOW_STATS + frcti->n_rdv++; +#endif + + } + } + + pthread_mutex_unlock(&frcti->mtx); + } pthread_rwlock_unlock(&frcti->lock); return ret; } -#define frcti_queued_pdu(frcti) \ - (frcti == NULL ? -1 : __frcti_queued_pdu(frcti)) +static int __frcti_window_wait(struct frcti * frcti, + struct timespec * abstime) +{ + struct frct_cr * snd_cr = &frcti->snd_cr; + int ret = 0; -#define frcti_snd(frcti, sdb) \ - (frcti == NULL ? 0 : __frcti_snd(frcti, sdb)) + pthread_rwlock_rdlock(&frcti->lock); -#define frcti_rcv(frcti, sdb) \ - (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) + if (!(snd_cr->cflags & FRCTFRESCNTL)) { + pthread_rwlock_unlock(&frcti->lock); + return 0; + } + + while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) { + struct timespec now; + pthread_rwlock_unlock(&frcti->lock); + pthread_mutex_lock(&frcti->mtx); + + if (frcti->open) { + clock_gettime(PTHREAD_COND_CLOCK, &now); + + frcti->t_wnd = now; + frcti->t_rdvs = now; + frcti->open = false; + } + + pthread_cleanup_push(__cleanup_mutex_unlock, &frcti->mtx); + + ret = -__timedwait(&frcti->cond, &frcti->mtx, abstime); + + pthread_cleanup_pop(false); + + if (ret == -ETIMEDOUT) { + time_t diff; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + diff = ts_diff_ns(&frcti->t_wnd, &now); + if (diff > MAX_RDV) { + pthread_mutex_unlock(&frcti->mtx); + return -ECONNRESET; /* write fails! */ + } + + diff = ts_diff_ns(&frcti->t_rdvs, &now); + if (diff > frcti->rdv) { + frcti->t_rdvs = now; + __send_rdv(frcti->fd); + } + } + + pthread_mutex_unlock(&frcti->mtx); + pthread_rwlock_rdlock(&frcti->lock); + } + + pthread_rwlock_unlock(&frcti->lock); + + return ret; +} static ssize_t __frcti_queued_pdu(struct frcti * frcti) { - ssize_t idx = -1; + ssize_t idx; + size_t pos; assert(frcti); /* See if we already have the next PDU. */ pthread_rwlock_wrlock(&frcti->lock); - if (!rq_is_empty(frcti->rq)) { - if (rq_peek(frcti->rq) == frcti->rcv_cr.lwe) { - ++frcti->rcv_cr.lwe; - idx = rq_pop(frcti->rq); - } + pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); + + idx = frcti->rq[pos]; + if (idx != -1) { + ++frcti->rcv_cr.lwe; + ++frcti->rcv_cr.rwe; + frcti->rq[pos] = -1; } pthread_rwlock_unlock(&frcti->lock); @@ -232,31 +626,60 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) return idx; } -static int frct_chk_crc(uint8_t * head, - uint8_t * tail) +static ssize_t __frcti_pdu_ready(struct frcti * frcti) { - uint32_t crc; + ssize_t idx; + size_t pos; - mem_hash(HASH_CRC32, &crc, head, tail - head); + assert(frcti); - return crc == *((uint32_t *) tail); -} + /* See if we already have the next PDU. */ + pthread_rwlock_rdlock(&frcti->lock); -static void frct_add_crc(uint8_t * head, - uint8_t * tail) -{ - mem_hash(HASH_CRC32, tail, head, tail - head); + pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); + idx = frcti->rq[pos]; + + pthread_rwlock_unlock(&frcti->lock); + + return idx; } -static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb) +#include <timerwheel.c> + +/* + * Send a final ACK for everything that has not been ACK'd. + * If the flow should be kept active for retransmission, + * the returned time will be negative. + */ +static time_t __frcti_dealloc(struct frcti * frcti) { - struct frct_pci * pci = NULL; + struct timespec now; + time_t wait; + int ackno; + int fd = -1; - pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); - if (pci != NULL) - memset(pci, 0, sizeof(*pci)); + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_rdlock(&frcti->lock); + + ackno = frcti->rcv_cr.lwe; + if (frcti->rcv_cr.lwe != frcti->rcv_cr.seqno) + fd = frcti->fd; + + wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec, + frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec); + wait = MAX(wait, 0); + + if (frcti->snd_cr.cflags & FRCTFLINGER + && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno)) + wait = -wait; - return pci; + pthread_rwlock_unlock(&frcti->lock); + + if (fd != -1) + __send_frct_pkt(fd, FRCT_ACK, ackno, 0); + + return wait; } static int __frcti_snd(struct frcti * frcti, @@ -265,132 +688,247 @@ static int __frcti_snd(struct frcti * frcti, struct frct_pci * pci; struct timespec now; struct frct_cr * snd_cr; + struct frct_cr * rcv_cr; + uint32_t seqno; + bool rtx; assert(frcti); + assert(shm_du_buff_len(sdb) != 0); snd_cr = &frcti->snd_cr; + rcv_cr = &frcti->rcv_cr; + + timerwheel_move(); - pci = frcti_alloc_head(sdb); + pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); if (pci == NULL) - return -1; + return -ENOMEM; + + memset(pci, 0, sizeof(*pci)); - clock_gettime(CLOCK_REALTIME_COARSE, &now); + clock_gettime(PTHREAD_COND_CLOCK, &now); pthread_rwlock_wrlock(&frcti->lock); - /* Check if sender is inactive. */ - if (!snd_cr->drf && now.tv_sec - snd_cr->act > snd_cr->inact) - snd_cr->drf = true; + rtx = snd_cr->cflags & FRCTFRTX; pci->flags |= FRCT_DATA; - /* Set the DRF in the first packet of a new run of SDUs. */ - if (snd_cr->drf) { + /* Set DRF if there are no unacknowledged packets. */ + if (snd_cr->seqno == snd_cr->lwe) pci->flags |= FRCT_DRF; - if (snd_cr->conf) { - pci->flags |= FRCT_CFG; - pci->cflags = snd_cr->cflags; - } - } - pci->seqno = hton32(snd_cr->seqno++); - - if (snd_cr->cflags & FRCTFERRCHCK) { - uint8_t * tail = shm_du_buff_tail_alloc(sdb, FRCT_CRCLEN); - if (tail == NULL) { - pthread_rwlock_unlock(&frcti->lock); - return -1; - } + /* Choose a new sequence number if sender inactivity expired. */ + if (now.tv_sec - snd_cr->act.tv_sec > snd_cr->inact) { + /* There are no unacknowledged packets. */ + assert(snd_cr->seqno == snd_cr->lwe); + random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); + snd_cr->lwe = snd_cr->seqno; + snd_cr->rwe = snd_cr->lwe + START_WINDOW; + } - frct_add_crc((uint8_t *) pci, tail); + seqno = snd_cr->seqno; + pci->seqno = hton32(seqno); - pci->flags |= FRCT_CRC; + if (now.tv_sec - rcv_cr->act.tv_sec < rcv_cr->inact) { + pci->flags |= FRCT_FC; + *((uint32_t *) pci) |= hton32(rcv_cr->rwe & 0x00FFFFFF); } - snd_cr->act = now.tv_sec; + if (!rtx) { + snd_cr->lwe++; + } else { + if (!frcti->probe) { + frcti->rttseq = snd_cr->seqno; + frcti->t_probe = now; + frcti->probe = true; +#ifdef PROC_FLOW_STATS + frcti->n_prb++; +#endif + } + if ((now.tv_sec - rcv_cr->act.tv_sec) * BILLION <= frcti->a) { + pci->flags |= FRCT_ACK; + pci->ackno = hton32(rcv_cr->lwe); + rcv_cr->seqno = rcv_cr->lwe; + } + } - snd_cr->drf = false; - snd_cr->conf = false; + snd_cr->seqno++; + snd_cr->act = now; pthread_rwlock_unlock(&frcti->lock); + if (rtx) + timerwheel_rxm(frcti, seqno, sdb); + return 0; } -/* Returns 0 when idx contains an SDU for the application. */ -static int __frcti_rcv(struct frcti * frcti, - struct shm_du_buff * sdb) +static void rtt_estimator(struct frcti * frcti, + time_t mrtt) +{ + time_t srtt = frcti->srtt; + time_t rttvar = frcti->mdev; + + if (srtt == 0) { /* first measurement */ + srtt = mrtt; + rttvar = mrtt >> 1; + } else { + time_t delta = mrtt - srtt; + srtt += (delta >> 3); + delta = (ABS(delta) - rttvar) >> 2; +#ifdef FRCT_LINUX_RTT_ESTIMATOR + if (delta < 0) + delta >>= 3; +#endif + rttvar += delta; + } +#ifdef PROC_FLOW_STATS + frcti->n_rtt++; +#endif + frcti->srtt = MAX(1000L, srtt); + frcti->mdev = MAX(100L, rttvar); + frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << MDEV_MUL)); +} + +/* Always queues the next application packet on the RQ. */ +static void __frcti_rcv(struct frcti * frcti, + struct shm_du_buff * sdb) { ssize_t idx; + size_t pos; struct frct_pci * pci; struct timespec now; struct frct_cr * rcv_cr; + struct frct_cr * snd_cr; uint32_t seqno; + uint32_t ackno; + uint32_t rwe; + int fd = -1; assert(frcti); rcv_cr = &frcti->rcv_cr; + snd_cr = &frcti->snd_cr; + + clock_gettime(PTHREAD_COND_CLOCK, &now); pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); - clock_gettime(CLOCK_REALTIME_COARSE, &now); + idx = shm_du_buff_get_idx(sdb); + seqno = ntoh32(pci->seqno); + pos = seqno & (RQ_SIZE - 1); pthread_rwlock_wrlock(&frcti->lock); - idx = shm_du_buff_get_idx(sdb); + if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) { + if (pci->flags & FRCT_DRF) { /* New run. */ + rcv_cr->lwe = seqno; + rcv_cr->rwe = seqno + RQ_SIZE; + rcv_cr->seqno = seqno; + } else if (pci->flags & FRCT_DATA) { + goto drop_packet; + } + } + + rcv_cr->act = now; + + /* For now, just send an immediate window update. */ + if (pci->flags & FRCT_RDVS) { + fd = frcti->fd; + rwe = rcv_cr->rwe; + pthread_rwlock_unlock(&frcti->lock); + + __send_frct_pkt(fd, FRCT_FC, 0, rwe); - /* PDU may be corrupted. */ - if (pci->flags & FRCT_CRC) { - uint8_t * tail = shm_du_buff_tail_release(sdb, FRCT_CRCLEN); - if (frct_chk_crc((uint8_t *) pci, tail)) - goto fail_clean; + shm_rdrbuff_remove(ai.rdrb, idx); + return; + } + + if (pci->flags & FRCT_ACK) { + ackno = ntoh32(pci->ackno); + if (after(ackno, frcti->snd_cr.lwe)) + frcti->snd_cr.lwe = ackno; + + if (frcti->probe && after(ackno, frcti->rttseq)) { +#ifdef PROC_FLOW_STATS + if (!(pci->flags & FRCT_DATA)) + frcti->n_dak++; +#endif + rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now)); + frcti->probe = false; + } } - /* Check if receiver inactivity is true. */ - if (!rcv_cr->drf && now.tv_sec - rcv_cr->act > rcv_cr->inact) - rcv_cr->drf = true; + if (pci->flags & FRCT_FC) { + uint32_t rwe; - /* When there is receiver inactivity and no DRF, drop the PDU. */ - if (rcv_cr->drf && !(pci->flags & FRCT_DRF)) - goto fail_clean; + rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF)); + rwe |= snd_cr->rwe & 0xFF000000; - seqno = ntoh32(pci->seqno); + /* Rollover for 24 bit */ + if (before(rwe, snd_cr->rwe) && snd_cr->rwe - rwe > 0x007FFFFF) + rwe += 0x01000000; - /* Queue the PDU if needed. */ - if (rcv_cr->cflags & FRCTFORDERING) { - if (seqno != frcti->rcv_cr.lwe) { - /* NOTE: queued PDUs head/tail without PCI. */ - if (rq_push(frcti->rq, seqno, idx)) - shm_rdrbuff_remove(ai.rdrb, idx); - goto fail; - } else { - ++rcv_cr->lwe; + snd_cr->rwe = rwe; + + pthread_mutex_lock(&frcti->mtx); + if (!frcti->open) { + frcti->open = true; + pthread_cond_broadcast(&frcti->cond); } + pthread_mutex_unlock(&frcti->mtx); } - /* If the DRF is set, reset the state of the connection. */ - if (pci->flags & FRCT_DRF) { - rcv_cr->lwe = seqno; - if (pci->flags & FRCT_CFG) - rcv_cr->cflags = pci->cflags; + if (!(pci->flags & FRCT_DATA)) + goto drop_packet; + + if (before(seqno, rcv_cr->lwe)) { + rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */ +#ifdef PROC_FLOW_STATS + frcti->n_dup++; +#endif + goto drop_packet; } - if (rcv_cr->drf) - rcv_cr->drf = false; + if (rcv_cr->cflags & FRCTFRTX) { - rcv_cr->act = now.tv_sec; + if (!before(seqno, rcv_cr->rwe)) { /* Out of window. */ +#ifdef PROC_FLOW_STATS + frcti->n_out++; +#endif + goto drop_packet; + } - if (!(pci->flags & FRCT_DATA)) - shm_rdrbuff_remove(ai.rdrb, idx); + if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) { +#ifdef PROC_FLOW_STATS + frcti->n_rqo++; +#endif + goto drop_packet; /* Out of rq. */ + } + if (frcti->rq[pos] != -1) { +#ifdef PROC_FLOW_STATS + frcti->n_dup++; +#endif + goto drop_packet; /* Duplicate in rq. */ + } + fd = frcti->fd; + } else { + rcv_cr->lwe = seqno; + } + + frcti->rq[pos] = idx; pthread_rwlock_unlock(&frcti->lock); - return 0; + if (fd != -1) + timerwheel_delayed_ack(fd, frcti); - fail_clean: - if (!(pci->flags & FRCT_DATA)) - shm_rdrbuff_remove(ai.rdrb, idx); - fail: + return; + + drop_packet: pthread_rwlock_unlock(&frcti->lock); - return -EAGAIN; + shm_rdrbuff_remove(ai.rdrb, idx); + send_frct_pkt(frcti); + return; } |
