/* * Ouroboros - Copyright (C) 2016 - 2020 * * Flow and Retransmission Control * * Dimitri Staessens * Sander Vrijders * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * version 2.1 as published by the Free Software Foundation. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., http://www.fsf.org/about/contact/. */ /* Default Delta-t parameters */ #define DELT_MPL (60 * MILLION) /* us */ #define DELT_A (1 * MILLION) /* us */ #define DELT_R (20 * MILLION) /* us */ #define RQ_SIZE 1024 #define FRCT_PCILEN (sizeof(struct frct_pci)) struct frct_cr { uint32_t lwe; uint32_t rwe; uint8_t cflags; uint32_t seqno; time_t act; /* s */ time_t inact; /* s */ }; struct frcti { int fd; time_t mpl; time_t a; time_t r; time_t srtt_us; /* smoothed rtt */ time_t mdev_us; /* mdev */ time_t rto; /* retransmission timeout */ uint32_t rttseq; struct timespec t_probe; /* probe time */ bool probe; /* probe active */ struct frct_cr snd_cr; struct frct_cr rcv_cr; struct rxmwheel * rw; ssize_t rq[RQ_SIZE]; pthread_rwlock_t lock; }; enum frct_flags { FRCT_DATA = 0x01, /* PDU carries data */ FRCT_DRF = 0x02, /* Data run flag */ FRCT_ACK = 0x04, /* ACK field valid */ FRCT_FC = 0x08, /* FC window valid */ FRCT_RDVS = 0x10, /* Rendez-vous */ FRCT_FFGM = 0x20, /* First Fragment */ FRCT_MFGM = 0x40, /* More fragments */ }; struct frct_pci { uint16_t flags; uint16_t window; uint32_t seqno; uint32_t ackno; } __attribute__((packed)); #include static struct frcti * frcti_create(int fd) { struct frcti * frcti; time_t delta_t; ssize_t idx; struct timespec now; frcti = malloc(sizeof(*frcti)); if (frcti == NULL) goto fail_malloc; memset(frcti, 0, sizeof(*frcti)); if (pthread_rwlock_init(&frcti->lock, NULL)) goto fail_lock; for (idx = 0; idx < RQ_SIZE; ++idx) frcti->rq[idx] = -1; clock_gettime(CLOCK_REALTIME_COARSE, &now); frcti->mpl = DELT_MPL; frcti->a = DELT_A; frcti->r = DELT_R; frcti->fd = fd; delta_t = frcti->mpl + frcti->a + frcti->r; frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */ frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); frcti->rttseq = 0; frcti->probe = false; frcti->srtt_us = 0; /* updated on first ACK */ frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */ frcti->rto = 20000; /* initial rxm will be after 20 ms */ frcti->rw = NULL; if (ai.flows[fd].qs.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX; frcti->rcv_cr.cflags |= FRCTFRTX; frcti->rw = rxmwheel_create(); if (frcti->rw == NULL) goto fail_rw; } frcti->rcv_cr.inact = 2 * delta_t / MILLION; /* s */ frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); return frcti; fail_rw: pthread_rwlock_destroy(&frcti->lock); fail_lock: free(frcti); fail_malloc: return NULL; } static void frcti_destroy(struct frcti * frcti) { /* * FIXME: In case of reliable transmission we should * make sure everything we sent is acked. */ if (frcti->rw != NULL) rxmwheel_destroy(frcti->rw); pthread_rwlock_destroy(&frcti->lock); free(frcti); } static uint16_t frcti_getconf(struct frcti * frcti) { uint16_t ret; assert (frcti); pthread_rwlock_rdlock(&frcti->lock); ret = frcti->snd_cr.cflags; pthread_rwlock_unlock(&frcti->lock); return ret; } #define frcti_queued_pdu(frcti) \ (frcti == NULL ? -1 : __frcti_queued_pdu(frcti)) #define frcti_snd(frcti, sdb) \ (frcti == NULL ? 0 : __frcti_snd(frcti, sdb)) #define frcti_rcv(frcti, sdb) \ (frcti == NULL ? idx : __frcti_rcv(frcti, sdb)) static ssize_t __frcti_queued_pdu(struct frcti * frcti) { ssize_t idx; size_t pos; assert(frcti); /* See if we already have the next PDU. */ pthread_rwlock_wrlock(&frcti->lock); pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); idx = frcti->rq[pos]; if (idx != -1) { ++frcti->rcv_cr.lwe; frcti->rq[pos] = -1; } pthread_rwlock_unlock(&frcti->lock); return idx; } static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb) { struct frct_pci * pci; pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); if (pci != NULL) memset(pci, 0, sizeof(*pci)); return pci; } static bool before(uint32_t seq1, uint32_t seq2) { return (int32_t)(seq1 - seq2) < 0; } static bool after(uint32_t seq1, uint32_t seq2) { return (int32_t)(seq2 - seq1) < 0; } static int __frcti_snd(struct frcti * frcti, struct shm_du_buff * sdb) { struct frct_pci * pci; struct timespec now; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; uint32_t seqno; assert(frcti); snd_cr = &frcti->snd_cr; rcv_cr = &frcti->rcv_cr; if (frcti->rw != NULL) rxmwheel_move(frcti->rw); pci = frcti_alloc_head(sdb); if (pci == NULL) return -1; clock_gettime(CLOCK_REALTIME, &now); pthread_rwlock_wrlock(&frcti->lock); pci->flags |= FRCT_DATA; /* Set DRF if there are no unacknowledged packets. */ if (snd_cr->seqno == snd_cr->lwe) pci->flags |= FRCT_DRF; /* Choose a new sequence number if sender inactivity expired. */ if (now.tv_sec - snd_cr->act > snd_cr->inact) { /* There are no unacknowledged packets. */ assert(snd_cr->seqno == snd_cr->lwe); random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); frcti->snd_cr.lwe = snd_cr->seqno - 1; } seqno = snd_cr->seqno; pci->seqno = hton32(seqno); if (!(snd_cr->cflags & FRCTFRTX)) { snd_cr->lwe++; } else { if (!frcti->probe) { frcti->rttseq = snd_cr->seqno; frcti->t_probe = now; frcti->probe = true; } if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { pci->flags |= FRCT_ACK; pci->ackno = hton32(rcv_cr->lwe); } } snd_cr->seqno++; snd_cr->act = now.tv_sec; pthread_rwlock_unlock(&frcti->lock); if (frcti->rw != NULL) rxmwheel_add(frcti->rw, frcti, seqno, sdb); return 0; } static void rtt_estimator(struct frcti * frcti, time_t mrtt_us) { time_t srtt = frcti->srtt_us; time_t rttvar = frcti->mdev_us; if (srtt == 0) { /* first measurement */ srtt = mrtt_us; rttvar = mrtt_us >> 1; } else { time_t delta = mrtt_us - srtt; srtt += (delta >> 3); rttvar -= rttvar >> 2; rttvar += ABS(delta) >> 2; } frcti->srtt_us = MAX(1U, srtt); frcti->mdev_us = MAX(1U, rttvar); frcti->rto = MAX(RTO_MIN, srtt + (rttvar >> 2)); } /* Always queues the packet on the RQ for the application. */ static ssize_t __frcti_rcv(struct frcti * frcti, struct shm_du_buff * sdb) { ssize_t idx; struct frct_pci * pci; struct timespec now; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; uint32_t seqno; assert(frcti); rcv_cr = &frcti->rcv_cr; snd_cr = &frcti->snd_cr; clock_gettime(CLOCK_REALTIME, &now); pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); idx = shm_du_buff_get_idx(sdb); seqno = ntoh32(pci->seqno); pthread_rwlock_wrlock(&frcti->lock); if (now.tv_sec - rcv_cr->act > rcv_cr->inact) { if (pci->flags & FRCT_DRF) /* New run. */ rcv_cr->lwe = seqno; else goto drop_packet; } if (before(seqno, rcv_cr->lwe)) goto drop_packet; if (rcv_cr->cflags & FRCTFRTX) { if (pci->flags & FRCT_ACK) { uint32_t ackno = ntoh32(pci->ackno); /* Check for duplicate (old) acks. */ if (after(ackno, snd_cr->lwe)) snd_cr->lwe = ackno; if (frcti->probe && after(ackno, frcti->rttseq)) { rtt_estimator(frcti, ts_diff_us(&frcti->t_probe, &now)); frcti->probe = false; } } if (seqno == rcv_cr->lwe) { ++frcti->rcv_cr.lwe; } else { size_t pos = seqno & (RQ_SIZE - 1); if ((seqno - rcv_cr->lwe) >= RQ_SIZE) goto drop_packet; /* Out of rq. */ if (frcti->rq[pos] != -1) goto drop_packet; /* Duplicate in rq */ frcti->rq[pos] = idx; idx = -EAGAIN; } } else { rcv_cr->lwe = seqno + 1; } rcv_cr->act = now.tv_sec; pthread_rwlock_unlock(&frcti->lock); if (frcti->rw != NULL) rxmwheel_move(frcti->rw); return idx; drop_packet: pthread_rwlock_unlock(&frcti->lock); shm_rdrbuff_remove(ai.rdrb, idx); return -EAGAIN; }