diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/lib/frct.c | 91 | ||||
-rw-r--r-- | src/lib/rxmwheel.c | 36 |
2 files changed, 68 insertions, 59 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c index e4b858d0..bc07be5a 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -21,9 +21,9 @@ */ /* Default Delta-t parameters */ -#define DELT_MPL 60000 /* ms */ -#define DELT_A 0 /* ms */ -#define DELT_R 2000 /* ms */ +#define DELT_MPL 60000 /* ms */ +#define DELT_A 3000 /* ms */ +#define DELT_R 20000 /* ms */ #define RQ_SIZE 64 @@ -50,11 +50,12 @@ struct frcti { time_t a; time_t r; - time_t srtt_us; /* smoothed rtt */ - time_t mdev_us; /* mdev */ + time_t srtt_us; /* smoothed rtt */ + time_t mdev_us; /* mdev */ + time_t rto; /* retransmission timeout */ uint32_t rttseq; - struct timespec t_probe; /* probe time */ - bool probe; /* probe active */ + struct timespec t_probe; /* probe time */ + bool probe; /* probe active */ struct frct_cr snd_cr; struct frct_cr rcv_cr; @@ -114,12 +115,14 @@ static struct frcti * frcti_create(int fd) frcti->snd_cr.inact = 3 * delta_t; frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); - /* rtt estimator. rto is currently srtt + 2 * mdev */ - frcti->srtt_us = 0; /* updated on first ACK */ - frcti->mdev_us = 100000; /* initial rxm will be after 200 ms */ + frcti->rttseq = 0; frcti->probe = false; + frcti->srtt_us = 0; /* updated on first ACK */ + frcti->mdev_us = 100000; /* initial rxm will be after 200 ms */ + frcti->rto = 200000; /* initial rxm will be after 200 ms */ + if (ai.flows[fd].qs.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX; frcti->rcv_cr.cflags |= FRCTFRTX; @@ -184,10 +187,10 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) /* See if we already have the next PDU. */ pthread_rwlock_wrlock(&frcti->lock); - pos = frcti->rcv_cr.seqno & (RQ_SIZE - 1); + pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); idx = frcti->rq[pos]; if (idx != -1) { - ++frcti->rcv_cr.seqno; + ++frcti->rcv_cr.lwe; frcti->rq[pos] = -1; } @@ -252,11 +255,7 @@ static int __frcti_snd(struct frcti * frcti, if (now.tv_sec - snd_cr->act > snd_cr->inact) { /* There are no unacknowledged packets. */ assert(snd_cr->seqno == snd_cr->lwe); -#ifdef CONFIG_OUROBOROS_DEBUG - snd_cr->seqno = 0; -#else random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); -#endif frcti->snd_cr.lwe = snd_cr->seqno - 1; } @@ -270,13 +269,11 @@ static int __frcti_snd(struct frcti * frcti, frcti->probe = true; } + rxmwheel_add(frcti, snd_cr->seqno, sdb); + if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { - rxmwheel_add(frcti, snd_cr->seqno, sdb); - if (rcv_cr->lwe <= rcv_cr->seqno) { - pci->flags |= FRCT_ACK; - pci->ackno = hton32(rcv_cr->seqno); - rcv_cr->lwe = rcv_cr->seqno; - } + pci->flags |= FRCT_ACK; + pci->ackno = hton32(rcv_cr->lwe); } } @@ -291,21 +288,23 @@ static int __frcti_snd(struct frcti * frcti, static void rtt_estimator(struct frcti * frcti, time_t mrtt_us) { - time_t srtt = frcti->srtt_us; - time_t mdev = frcti->mdev_us; - - if (srtt != 0) { - srtt -= (srtt >> 3); - srtt += mrtt_us >> 3; /* rtt = 7/8 rtt + 1/8 new */ - mdev -= (mdev >> 2); - mdev += ABS(srtt - mrtt_us) >> 2; + time_t srtt = frcti->srtt_us; + time_t rttvar = frcti->mdev_us; + + if (srtt == 0) { /* first measurement */ + srtt = mrtt_us; + rttvar = mrtt_us >> 1; + } else { - srtt = mrtt_us << 3; /* take the measured time to be rtt */ - mdev = mrtt_us >> 1; /* take half mrtt_us as deviation */ + time_t delta = mrtt_us - srtt; + srtt += (delta >> 3); + rttvar -= rttvar >> 2; + rttvar += ABS(delta) >> 2; } - frcti->srtt_us = MAX(1U, srtt); - frcti->mdev_us = MAX(1U, mdev); + frcti->srtt_us = MAX(1U, srtt); + frcti->mdev_us = MAX(1U, rttvar); + frcti->rto = srtt + (rttvar >> 2); } /* Returns 0 when idx contains a packet for the application. */ @@ -339,35 +338,39 @@ static int __frcti_rcv(struct frcti * frcti, if (now.tv_sec - rcv_cr->act > rcv_cr->inact) { /* Inactive receiver, check for DRF. */ if (pci->flags & FRCT_DRF) /* New run. */ - rcv_cr->seqno = seqno; + rcv_cr->lwe = seqno; else goto drop_packet; } - if (seqno == rcv_cr->seqno) { - ++rcv_cr->seqno; + if (seqno == rcv_cr->lwe) { + ++rcv_cr->lwe; } else { /* Out of order. */ - if (before(seqno, rcv_cr->seqno)) + if (before(seqno, rcv_cr->lwe) ) goto drop_packet; if (rcv_cr->cflags & FRCTFRTX) { size_t pos = seqno & (RQ_SIZE - 1); - if ((seqno - rcv_cr->lwe) > RQ_SIZE /* Out of rq. */ - || frcti->rq[pos] != -1) /* Duplicate in rq. */ - goto drop_packet; + if ((seqno - rcv_cr->lwe) >= RQ_SIZE) + goto drop_packet; /* Out of rq. */ + + if (frcti->rq[pos] != -1) + goto drop_packet; /* Duplicate in rq */ + /* Queue. */ frcti->rq[pos] = idx; ret = -EAGAIN; } else { - rcv_cr->seqno = seqno + 1; + rcv_cr->lwe = seqno + 1; } } if (rcv_cr->cflags & FRCTFRTX && pci->flags & FRCT_ACK) { uint32_t ackno = ntoh32(pci->ackno); /* Check for duplicate (old) acks. */ - if ((int32_t)(ackno - snd_cr->lwe) >= 0) + if ((int32_t)(ackno - snd_cr->lwe) > 0) snd_cr->lwe = ackno; + if (frcti->probe && after(ackno, frcti->rttseq)) { rtt_estimator(frcti, ts_diff_us(&frcti->t_probe, &now)); frcti->probe = false; @@ -378,7 +381,7 @@ static int __frcti_rcv(struct frcti * frcti, pthread_rwlock_unlock(&frcti->lock); - if (!(pci->flags & FRCT_DATA)) + if (ret == 0 && !(pci->flags & FRCT_DATA)) shm_rdrbuff_remove(ai.rdrb, idx); rxmwheel_move(); diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index ce7ef8e4..3f01a0d3 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -22,11 +22,11 @@ #include <ouroboros/list.h> -#define RXMQ_S 16 /* defines #slots */ -#define RXMQ_M 24 /* defines max delay */ -#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution */ +#define RXMQ_S 16 /* defines #slots */ +#define RXMQ_M 24 /* defines max delay (us) */ +#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (us) */ #define RXMQ_SLOTS (1 << RXMQ_S) -#define RXMQ_MAX (1 << RXMQ_M) /* ms */ +#define RXMQ_MAX (1 << RXMQ_M) /* us */ /* Small inacurracy to avoid slow division by MILLION. */ #define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) @@ -119,12 +119,11 @@ static void check_probe(struct frcti * frcti, if (frcti->probe && ((frcti->rttseq + 1) == seqno)) { /* Backoff to avoid never updating rtt */ - frcti->srtt_us <<= 1; + frcti->srtt_us += frcti->mdev_us; frcti->probe = false; } } -#define rto(frcti) (frcti->srtt_us + (frcti->mdev_us << 1)) /* Return fd on r-timer expiry. */ static int rxmwheel_move(void) { @@ -136,12 +135,17 @@ static int rxmwheel_move(void) clock_gettime(PTHREAD_COND_CLOCK, &now); + pthread_mutex_lock(&rw.lock); + slot = ts_to_slot(now); - pthread_mutex_lock(&rw.lock); + i = rw.prv; - for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) { - list_for_each_safe(p, h, &rw.wheel[i]) { + if (slot < i) + slot += RXMQ_SLOTS; + + while (i++ < slot) { + list_for_each_safe(p, h, &rw.wheel[i & (RXMQ_SLOTS - 1)]) { struct rxm * r; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; @@ -158,7 +162,7 @@ static int rxmwheel_move(void) snd_cr = &r->frcti->snd_cr; rcv_cr = &r->frcti->rcv_cr; /* Has been ack'd, remove. */ - if ((int) (r->seqno - snd_cr->lwe) <= 0) { + if ((int) (r->seqno - snd_cr->lwe) < 0) { shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); free(r); @@ -180,7 +184,7 @@ static int rxmwheel_move(void) /* Copy the payload, safe rtx in other layers. */ if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { - /* FIXME: reschedule send? */ + /* FIXME: reschedule send instead of failing? */ int fd = r->frcti->fd; pthread_mutex_unlock(&rw.lock); shm_du_buff_ack(r->sdb); @@ -206,10 +210,12 @@ static int rxmwheel_move(void) /* Retransmit the copy. */ if (shm_rbuff_write(f->tx_rb, idx)) { + int fd = r->frcti->fd; + pthread_mutex_unlock(&rw.lock); ipcp_sdb_release(sdb); free(r); /* FIXME: reschedule send? */ - continue; + return fd; } shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); @@ -221,14 +227,14 @@ static int rxmwheel_move(void) r->tail = shm_du_buff_tail(sdb); r->sdb = sdb; - newtime = ts_to_us(now) + rto(f->frcti); + newtime = ts_to_us(now) + f->frcti->rto; rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[rslot]); } } - rw.prv = slot; + rw.prv = slot & (RXMQ_SLOTS - 1); pthread_mutex_unlock(&rw.lock); @@ -259,7 +265,7 @@ static int rxmwheel_add(struct frcti * frcti, r->tail = shm_du_buff_tail(sdb); r->frcti = frcti; - slot = ((r->t0 + rto(frcti)) >> RXMQ_R) & (RXMQ_SLOTS - 1); + slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[slot]); |