summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib/frct.c91
-rw-r--r--src/lib/rxmwheel.c36
2 files changed, 68 insertions, 59 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c
index e4b858d0..bc07be5a 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -21,9 +21,9 @@
*/
/* Default Delta-t parameters */
-#define DELT_MPL 60000 /* ms */
-#define DELT_A 0 /* ms */
-#define DELT_R 2000 /* ms */
+#define DELT_MPL 60000 /* ms */
+#define DELT_A 3000 /* ms */
+#define DELT_R 20000 /* ms */
#define RQ_SIZE 64
@@ -50,11 +50,12 @@ struct frcti {
time_t a;
time_t r;
- time_t srtt_us; /* smoothed rtt */
- time_t mdev_us; /* mdev */
+ time_t srtt_us; /* smoothed rtt */
+ time_t mdev_us; /* mdev */
+ time_t rto; /* retransmission timeout */
uint32_t rttseq;
- struct timespec t_probe; /* probe time */
- bool probe; /* probe active */
+ struct timespec t_probe; /* probe time */
+ bool probe; /* probe active */
struct frct_cr snd_cr;
struct frct_cr rcv_cr;
@@ -114,12 +115,14 @@ static struct frcti * frcti_create(int fd)
frcti->snd_cr.inact = 3 * delta_t;
frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1);
- /* rtt estimator. rto is currently srtt + 2 * mdev */
- frcti->srtt_us = 0; /* updated on first ACK */
- frcti->mdev_us = 100000; /* initial rxm will be after 200 ms */
+
frcti->rttseq = 0;
frcti->probe = false;
+ frcti->srtt_us = 0; /* updated on first ACK */
+ frcti->mdev_us = 100000; /* initial rxm will be after 200 ms */
+ frcti->rto = 200000; /* initial rxm will be after 200 ms */
+
if (ai.flows[fd].qs.loss == 0) {
frcti->snd_cr.cflags |= FRCTFRTX;
frcti->rcv_cr.cflags |= FRCTFRTX;
@@ -184,10 +187,10 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)
/* See if we already have the next PDU. */
pthread_rwlock_wrlock(&frcti->lock);
- pos = frcti->rcv_cr.seqno & (RQ_SIZE - 1);
+ pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1);
idx = frcti->rq[pos];
if (idx != -1) {
- ++frcti->rcv_cr.seqno;
+ ++frcti->rcv_cr.lwe;
frcti->rq[pos] = -1;
}
@@ -252,11 +255,7 @@ static int __frcti_snd(struct frcti * frcti,
if (now.tv_sec - snd_cr->act > snd_cr->inact) {
/* There are no unacknowledged packets. */
assert(snd_cr->seqno == snd_cr->lwe);
-#ifdef CONFIG_OUROBOROS_DEBUG
- snd_cr->seqno = 0;
-#else
random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno));
-#endif
frcti->snd_cr.lwe = snd_cr->seqno - 1;
}
@@ -270,13 +269,11 @@ static int __frcti_snd(struct frcti * frcti,
frcti->probe = true;
}
+ rxmwheel_add(frcti, snd_cr->seqno, sdb);
+
if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) {
- rxmwheel_add(frcti, snd_cr->seqno, sdb);
- if (rcv_cr->lwe <= rcv_cr->seqno) {
- pci->flags |= FRCT_ACK;
- pci->ackno = hton32(rcv_cr->seqno);
- rcv_cr->lwe = rcv_cr->seqno;
- }
+ pci->flags |= FRCT_ACK;
+ pci->ackno = hton32(rcv_cr->lwe);
}
}
@@ -291,21 +288,23 @@ static int __frcti_snd(struct frcti * frcti,
static void rtt_estimator(struct frcti * frcti,
time_t mrtt_us)
{
- time_t srtt = frcti->srtt_us;
- time_t mdev = frcti->mdev_us;
-
- if (srtt != 0) {
- srtt -= (srtt >> 3);
- srtt += mrtt_us >> 3; /* rtt = 7/8 rtt + 1/8 new */
- mdev -= (mdev >> 2);
- mdev += ABS(srtt - mrtt_us) >> 2;
+ time_t srtt = frcti->srtt_us;
+ time_t rttvar = frcti->mdev_us;
+
+ if (srtt == 0) { /* first measurement */
+ srtt = mrtt_us;
+ rttvar = mrtt_us >> 1;
+
} else {
- srtt = mrtt_us << 3; /* take the measured time to be rtt */
- mdev = mrtt_us >> 1; /* take half mrtt_us as deviation */
+ time_t delta = mrtt_us - srtt;
+ srtt += (delta >> 3);
+ rttvar -= rttvar >> 2;
+ rttvar += ABS(delta) >> 2;
}
- frcti->srtt_us = MAX(1U, srtt);
- frcti->mdev_us = MAX(1U, mdev);
+ frcti->srtt_us = MAX(1U, srtt);
+ frcti->mdev_us = MAX(1U, rttvar);
+ frcti->rto = srtt + (rttvar >> 2);
}
/* Returns 0 when idx contains a packet for the application. */
@@ -339,35 +338,39 @@ static int __frcti_rcv(struct frcti * frcti,
if (now.tv_sec - rcv_cr->act > rcv_cr->inact) {
/* Inactive receiver, check for DRF. */
if (pci->flags & FRCT_DRF) /* New run. */
- rcv_cr->seqno = seqno;
+ rcv_cr->lwe = seqno;
else
goto drop_packet;
}
- if (seqno == rcv_cr->seqno) {
- ++rcv_cr->seqno;
+ if (seqno == rcv_cr->lwe) {
+ ++rcv_cr->lwe;
} else { /* Out of order. */
- if (before(seqno, rcv_cr->seqno))
+ if (before(seqno, rcv_cr->lwe) )
goto drop_packet;
if (rcv_cr->cflags & FRCTFRTX) {
size_t pos = seqno & (RQ_SIZE - 1);
- if ((seqno - rcv_cr->lwe) > RQ_SIZE /* Out of rq. */
- || frcti->rq[pos] != -1) /* Duplicate in rq. */
- goto drop_packet;
+ if ((seqno - rcv_cr->lwe) >= RQ_SIZE)
+ goto drop_packet; /* Out of rq. */
+
+ if (frcti->rq[pos] != -1)
+ goto drop_packet; /* Duplicate in rq */
+
/* Queue. */
frcti->rq[pos] = idx;
ret = -EAGAIN;
} else {
- rcv_cr->seqno = seqno + 1;
+ rcv_cr->lwe = seqno + 1;
}
}
if (rcv_cr->cflags & FRCTFRTX && pci->flags & FRCT_ACK) {
uint32_t ackno = ntoh32(pci->ackno);
/* Check for duplicate (old) acks. */
- if ((int32_t)(ackno - snd_cr->lwe) >= 0)
+ if ((int32_t)(ackno - snd_cr->lwe) > 0)
snd_cr->lwe = ackno;
+
if (frcti->probe && after(ackno, frcti->rttseq)) {
rtt_estimator(frcti, ts_diff_us(&frcti->t_probe, &now));
frcti->probe = false;
@@ -378,7 +381,7 @@ static int __frcti_rcv(struct frcti * frcti,
pthread_rwlock_unlock(&frcti->lock);
- if (!(pci->flags & FRCT_DATA))
+ if (ret == 0 && !(pci->flags & FRCT_DATA))
shm_rdrbuff_remove(ai.rdrb, idx);
rxmwheel_move();
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
index ce7ef8e4..3f01a0d3 100644
--- a/src/lib/rxmwheel.c
+++ b/src/lib/rxmwheel.c
@@ -22,11 +22,11 @@
#include <ouroboros/list.h>
-#define RXMQ_S 16 /* defines #slots */
-#define RXMQ_M 24 /* defines max delay */
-#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution */
+#define RXMQ_S 16 /* defines #slots */
+#define RXMQ_M 24 /* defines max delay (us) */
+#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (us) */
#define RXMQ_SLOTS (1 << RXMQ_S)
-#define RXMQ_MAX (1 << RXMQ_M) /* ms */
+#define RXMQ_MAX (1 << RXMQ_M) /* us */
/* Small inacurracy to avoid slow division by MILLION. */
#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20))
@@ -119,12 +119,11 @@ static void check_probe(struct frcti * frcti,
if (frcti->probe && ((frcti->rttseq + 1) == seqno)) {
/* Backoff to avoid never updating rtt */
- frcti->srtt_us <<= 1;
+ frcti->srtt_us += frcti->mdev_us;
frcti->probe = false;
}
}
-#define rto(frcti) (frcti->srtt_us + (frcti->mdev_us << 1))
/* Return fd on r-timer expiry. */
static int rxmwheel_move(void)
{
@@ -136,12 +135,17 @@ static int rxmwheel_move(void)
clock_gettime(PTHREAD_COND_CLOCK, &now);
+ pthread_mutex_lock(&rw.lock);
+
slot = ts_to_slot(now);
- pthread_mutex_lock(&rw.lock);
+ i = rw.prv;
- for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) {
- list_for_each_safe(p, h, &rw.wheel[i]) {
+ if (slot < i)
+ slot += RXMQ_SLOTS;
+
+ while (i++ < slot) {
+ list_for_each_safe(p, h, &rw.wheel[i & (RXMQ_SLOTS - 1)]) {
struct rxm * r;
struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
@@ -158,7 +162,7 @@ static int rxmwheel_move(void)
snd_cr = &r->frcti->snd_cr;
rcv_cr = &r->frcti->rcv_cr;
/* Has been ack'd, remove. */
- if ((int) (r->seqno - snd_cr->lwe) <= 0) {
+ if ((int) (r->seqno - snd_cr->lwe) < 0) {
shm_du_buff_ack(r->sdb);
ipcp_sdb_release(r->sdb);
free(r);
@@ -180,7 +184,7 @@ static int rxmwheel_move(void)
/* Copy the payload, safe rtx in other layers. */
if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
- /* FIXME: reschedule send? */
+ /* FIXME: reschedule send instead of failing? */
int fd = r->frcti->fd;
pthread_mutex_unlock(&rw.lock);
shm_du_buff_ack(r->sdb);
@@ -206,10 +210,12 @@ static int rxmwheel_move(void)
/* Retransmit the copy. */
if (shm_rbuff_write(f->tx_rb, idx)) {
+ int fd = r->frcti->fd;
+ pthread_mutex_unlock(&rw.lock);
ipcp_sdb_release(sdb);
free(r);
/* FIXME: reschedule send? */
- continue;
+ return fd;
}
shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
@@ -221,14 +227,14 @@ static int rxmwheel_move(void)
r->tail = shm_du_buff_tail(sdb);
r->sdb = sdb;
- newtime = ts_to_us(now) + rto(f->frcti);
+ newtime = ts_to_us(now) + f->frcti->rto;
rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1);
list_add_tail(&r->next, &rw.wheel[rslot]);
}
}
- rw.prv = slot;
+ rw.prv = slot & (RXMQ_SLOTS - 1);
pthread_mutex_unlock(&rw.lock);
@@ -259,7 +265,7 @@ static int rxmwheel_add(struct frcti * frcti,
r->tail = shm_du_buff_tail(sdb);
r->frcti = frcti;
- slot = ((r->t0 + rto(frcti)) >> RXMQ_R) & (RXMQ_SLOTS - 1);
+ slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1);
list_add_tail(&r->next, &rw.wheel[slot]);