diff options
Diffstat (limited to 'src/lib/frct.c')
-rw-r--r-- | src/lib/frct.c | 326 |
1 files changed, 230 insertions, 96 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c index 2322a039..2bd126f4 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -21,23 +21,25 @@ */ /* Default Delta-t parameters */ -#define DELT_MPL (60 * MILLION) /* us */ -#define DELT_A (1 * MILLION) /* us */ -#define DELT_R (20 * MILLION) /* us */ +#define DELT_MPL (60 * BILLION) /* ns */ +#define DELT_A (1 * BILLION) /* ns */ +#define DELT_R (20 * BILLION) /* ns */ -#define RQ_SIZE 1024 +#define DELT_ACK (10 * MILLION) /* ns */ + +#define RQ_SIZE 256 #define FRCT_PCILEN (sizeof(struct frct_pci)) struct frct_cr { - uint32_t lwe; - uint32_t rwe; + uint32_t lwe; /* Left window edge */ + uint32_t rwe; /* Right window edge */ - uint8_t cflags; - uint32_t seqno; + uint8_t cflags; + uint32_t seqno; /* SEQ to send, or last SEQ Ack'd */ - time_t act; /* s */ - time_t inact; /* s */ + struct timespec act; /* Last seen activity */ + time_t inact; /* Inactivity (s) */ }; struct frcti { @@ -47,12 +49,12 @@ struct frcti { time_t a; time_t r; - time_t srtt_us; /* smoothed rtt */ - time_t mdev_us; /* mdev */ - time_t rto; /* retransmission timeout */ + time_t srtt; /* Smoothed rtt */ + time_t mdev; /* Deviation */ + time_t rto; /* Retransmission timeout */ uint32_t rttseq; - struct timespec t_probe; /* probe time */ - bool probe; /* probe active */ + struct timespec t_probe; /* Probe time */ + bool probe; /* Probe active */ struct frct_cr snd_cr; struct frct_cr rcv_cr; @@ -74,7 +76,9 @@ enum frct_flags { }; struct frct_pci { - uint16_t flags; + uint8_t flags; + + uint8_t pad; uint16_t window; @@ -87,9 +91,11 @@ struct frct_pci { static struct frcti * frcti_create(int fd) { struct frcti * frcti; - time_t delta_t; ssize_t idx; struct timespec now; + time_t mpl; + time_t a; + time_t r; frcti = malloc(sizeof(*frcti)); if (frcti == NULL) @@ -103,25 +109,21 @@ static struct frcti * frcti_create(int fd) for (idx = 0; idx < RQ_SIZE; ++idx) frcti->rq[idx] = -1; - clock_gettime(CLOCK_REALTIME_COARSE, &now); + clock_gettime(PTHREAD_COND_CLOCK, &now); - frcti->mpl = DELT_MPL; - frcti->a = DELT_A; - frcti->r = DELT_R; + frcti->mpl = mpl = DELT_MPL; + frcti->a = a = DELT_A; + frcti->r = r = DELT_R; frcti->fd = fd; - delta_t = frcti->mpl + frcti->a + frcti->r; - frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */ - frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); + frcti->rttseq = 0; + frcti->probe = false; - frcti->rttseq = 0; - frcti->probe = false; - - frcti->srtt_us = 0; /* updated on first ACK */ - frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */ - frcti->rto = 20000; /* initial rxm will be after 20 ms */ - frcti->rw = NULL; + frcti->srtt = 0; /* Updated on first ACK */ + frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */ + frcti->rto = 20 * MILLION; /* Initial rxm will be after 20 ms */ + frcti->rw = NULL; if (ai.flows[fd].qs.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX; @@ -131,9 +133,11 @@ static struct frcti * frcti_create(int fd) goto fail_rw; } - frcti->rcv_cr.inact = 2 * delta_t / MILLION; /* s */ - frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); + frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */ + frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1); + frcti->rcv_cr.inact = (2 * mpl + a + r) / BILLION + 1; /* s */ + frcti->rcv_cr.act.tv_sec = now.tv_sec - (frcti->rcv_cr.inact + 1); return frcti; @@ -175,14 +179,18 @@ static uint16_t frcti_getconf(struct frcti * frcti) return ret; } -#define frcti_queued_pdu(frcti) \ - (frcti == NULL ? -1 : __frcti_queued_pdu(frcti)) +#define frcti_queued_pdu(frcti) \ + (frcti == NULL ? idx : __frcti_queued_pdu(frcti)) -#define frcti_snd(frcti, sdb) \ +#define frcti_snd(frcti, sdb) \ (frcti == NULL ? 0 : __frcti_snd(frcti, sdb)) #define frcti_rcv(frcti, sdb) \ - (frcti == NULL ? idx : __frcti_rcv(frcti, sdb)) + (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) + +#define frcti_tick(frcti) \ + (frcti == NULL ? 0 : __frcti_tick(frcti)) + static ssize_t __frcti_queued_pdu(struct frcti * frcti) { @@ -207,15 +215,22 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) return idx; } -static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb) +static ssize_t __frcti_pdu_ready(struct frcti * frcti) { - struct frct_pci * pci; + ssize_t idx; + size_t pos; - pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); - if (pci != NULL) - memset(pci, 0, sizeof(*pci)); + assert(frcti); - return pci; + /* See if we already have the next PDU. */ + pthread_rwlock_rdlock(&frcti->lock); + + pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); + idx = frcti->rq[pos]; + + pthread_rwlock_unlock(&frcti->lock); + + return idx; } static bool before(uint32_t seq1, @@ -230,6 +245,68 @@ static bool after(uint32_t seq1, return (int32_t)(seq2 - seq1) < 0; } +static void frct_send_ack(struct frcti * frcti) +{ + struct shm_du_buff * sdb; + struct frct_pci * pci; + ssize_t idx; + struct timespec now; + time_t diff; + uint32_t ackno; + struct flow * f; + + assert(frcti); + + pthread_rwlock_rdlock(&frcti->lock); + + if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + ackno = frcti->rcv_cr.lwe; + + pthread_rwlock_unlock(&frcti->lock); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + diff = ts_diff_ns(&frcti->rcv_cr.act, &now); + + if (diff > frcti->a) + return; + + if (diff < DELT_ACK) + return; + + /* Raw calls needed to bypass frcti. */ + idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); + if (idx < 0) + return; + + pci = (struct frct_pci *) shm_du_buff_head(sdb); + memset(pci, 0, sizeof(*pci)); + + pci->flags = FRCT_ACK; + pci->ackno = hton32(ackno); + + f = &ai.flows[frcti->fd]; + + if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { + pthread_rwlock_rdlock(&ai.lock); + ipcp_sdb_release(sdb); + return; + } + + shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + + pthread_rwlock_wrlock(&frcti->lock); + + if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) + frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; + + pthread_rwlock_unlock(&frcti->lock); +} + static int __frcti_snd(struct frcti * frcti, struct shm_du_buff * sdb) { @@ -247,11 +324,13 @@ static int __frcti_snd(struct frcti * frcti, if (frcti->rw != NULL) rxmwheel_move(frcti->rw); - pci = frcti_alloc_head(sdb); + pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); if (pci == NULL) return -1; - clock_gettime(CLOCK_REALTIME, &now); + memset(pci, 0, sizeof(*pci)); + + clock_gettime(PTHREAD_COND_CLOCK, &now); pthread_rwlock_wrlock(&frcti->lock); @@ -262,7 +341,7 @@ static int __frcti_snd(struct frcti * frcti, pci->flags |= FRCT_DRF; /* Choose a new sequence number if sender inactivity expired. */ - if (now.tv_sec - snd_cr->act > snd_cr->inact) { + if (now.tv_sec - snd_cr->act.tv_sec > snd_cr->inact) { /* There are no unacknowledged packets. */ assert(snd_cr->seqno == snd_cr->lwe); random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); @@ -281,14 +360,15 @@ static int __frcti_snd(struct frcti * frcti, frcti->probe = true; } - if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { + if (now.tv_sec - rcv_cr->act.tv_sec <= rcv_cr->inact) { pci->flags |= FRCT_ACK; pci->ackno = hton32(rcv_cr->lwe); + rcv_cr->seqno = rcv_cr->lwe; } } snd_cr->seqno++; - snd_cr->act = now.tv_sec; + snd_cr->act = now; pthread_rwlock_unlock(&frcti->lock); @@ -299,104 +379,158 @@ static int __frcti_snd(struct frcti * frcti, } static void rtt_estimator(struct frcti * frcti, - time_t mrtt_us) + time_t mrtt) { - time_t srtt = frcti->srtt_us; - time_t rttvar = frcti->mdev_us; + time_t srtt = frcti->srtt; + time_t rttvar = frcti->mdev; if (srtt == 0) { /* first measurement */ - srtt = mrtt_us; - rttvar = mrtt_us >> 1; + srtt = mrtt; + rttvar = mrtt >> 1; } else { - time_t delta = mrtt_us - srtt; + time_t delta = mrtt - srtt; srtt += (delta >> 3); rttvar -= rttvar >> 2; rttvar += ABS(delta) >> 2; } - frcti->srtt_us = MAX(1U, srtt); - frcti->mdev_us = MAX(1U, rttvar); - frcti->rto = MAX(RTO_MIN, srtt + (rttvar >> 2)); + frcti->srtt = MAX(1000U, srtt); + frcti->mdev = MAX(100U, rttvar); + frcti->rto = MAX(RTO_MIN * 1000, + frcti->srtt + (frcti->mdev << 1)); +} + +static void __frcti_tick(struct frcti * frcti) +{ + if (frcti->rw != NULL) { + rxmwheel_move(frcti->rw); + frct_send_ack(frcti); + } } -/* Always queues the packet on the RQ for the application. */ -static ssize_t __frcti_rcv(struct frcti * frcti, - struct shm_du_buff * sdb) +/* Always queues the next application packet on the RQ. */ +static void __frcti_rcv(struct frcti * frcti, + struct shm_du_buff * sdb) { ssize_t idx; + size_t pos; struct frct_pci * pci; struct timespec now; - struct frct_cr * snd_cr; struct frct_cr * rcv_cr; uint32_t seqno; + uint32_t ackno; assert(frcti); rcv_cr = &frcti->rcv_cr; - snd_cr = &frcti->snd_cr; - clock_gettime(CLOCK_REALTIME, &now); + clock_gettime(PTHREAD_COND_CLOCK, &now); pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); idx = shm_du_buff_get_idx(sdb); seqno = ntoh32(pci->seqno); + pos = seqno & (RQ_SIZE - 1); pthread_rwlock_wrlock(&frcti->lock); - if (now.tv_sec - rcv_cr->act > rcv_cr->inact) { + if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) { if (pci->flags & FRCT_DRF) /* New run. */ rcv_cr->lwe = seqno; else goto drop_packet; } - if (before(seqno, rcv_cr->lwe)) - goto drop_packet; + if (pci->flags & FRCT_ACK) { + ackno = ntoh32(pci->ackno); + if (after(ackno, frcti->snd_cr.lwe)) + frcti->snd_cr.lwe = ackno; - if (rcv_cr->cflags & FRCTFRTX) { - if (pci->flags & FRCT_ACK) { - uint32_t ackno = ntoh32(pci->ackno); - /* Check for duplicate (old) acks. */ - if (after(ackno, snd_cr->lwe)) - snd_cr->lwe = ackno; - - if (frcti->probe && after(ackno, frcti->rttseq)) { - rtt_estimator(frcti, ts_diff_us(&frcti->t_probe, - &now)); - frcti->probe = false; - } + if (frcti->probe && after(ackno, frcti->rttseq)) { + rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now)); + frcti->probe = false; } + } - if (seqno == rcv_cr->lwe) { - ++frcti->rcv_cr.lwe; - } else { - size_t pos = seqno & (RQ_SIZE - 1); - if ((seqno - rcv_cr->lwe) >= RQ_SIZE) - goto drop_packet; /* Out of rq. */ + if (!(pci->flags & FRCT_DATA)) + goto drop_packet; - if (frcti->rq[pos] != -1) - goto drop_packet; /* Duplicate in rq */ + if (before(seqno, rcv_cr->lwe)) + goto drop_packet; - frcti->rq[pos] = idx; - idx = -EAGAIN; - } + if (rcv_cr->cflags & FRCTFRTX) { + if ((seqno - rcv_cr->lwe) >= RQ_SIZE) + goto drop_packet; /* Out of rq. */ + + if (frcti->rq[pos] != -1) + goto drop_packet; /* Duplicate in rq. */ } else { - rcv_cr->lwe = seqno + 1; + rcv_cr->lwe = seqno; } - rcv_cr->act = now.tv_sec; + frcti->rq[pos] = idx; - pthread_rwlock_unlock(&frcti->lock); + rcv_cr->act = now; - if (frcti->rw != NULL) - rxmwheel_move(frcti->rw); + pthread_rwlock_unlock(&frcti->lock); - return idx; + return; drop_packet: pthread_rwlock_unlock(&frcti->lock); shm_rdrbuff_remove(ai.rdrb, idx); - return -EAGAIN; + return; +} + +/* Filter fqueue events for non-data packets */ +int frcti_filter(struct fqueue * fq) +{ + struct shm_du_buff * sdb; + int fd; + ssize_t idx; + struct frcti * frcti; + struct shm_rbuff * rb; + + while(fq->next < fq->fqsize) { + if (fq->fqueue[fq->next + 1] != FLOW_PKT) + return 1; + + pthread_rwlock_rdlock(&ai.lock); + + fd = ai.ports[fq->fqueue[fq->next]].fd; + rb = ai.flows[fd].rx_rb; + frcti = ai.flows[fd].frcti; + + if (frcti == NULL) { + pthread_rwlock_unlock(&ai.lock); + return 1; + } + + if (__frcti_pdu_ready(frcti) >= 0) { + pthread_rwlock_unlock(&ai.lock); + return 1; + } + + idx = shm_rbuff_read(rb); + if (idx < 0) { + pthread_rwlock_unlock(&ai.lock); + return 0; + } + + sdb = shm_rdrbuff_get(ai.rdrb, idx); + + __frcti_rcv(frcti, sdb); + + if (__frcti_pdu_ready(frcti) >= 0) { + pthread_rwlock_unlock(&ai.lock); + return 1; + } + + pthread_rwlock_unlock(&ai.lock); + + fq->next += 2; + } + + return fq->next < fq->fqsize; } |