summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2020-06-06 09:48:56 +0200
committerSander Vrijders <sander@ouroboros.rocks>2020-06-06 18:29:14 +0200
commit5f468ee5e02a0d63ed8ad7420ee1beda87e524d6 (patch)
tree0f99ee5bc51c8599d4771b2a48701dd2f26d0ac5
parent7a6bc98a1ea07991d8ff00a9b77be196bd9cef45 (diff)
downloadouroboros-5f468ee5e02a0d63ed8ad7420ee1beda87e524d6.tar.gz
ouroboros-5f468ee5e02a0d63ed8ad7420ee1beda87e524d6.zip
lib: Allow pure acknowledgment packets in FRCT
This adds the logic to send a pure acknowledgment packet without any data to send. This needed the event filter for the fqueue, as these non-data packets should not trigger application PKT events. The default timeout is now 10ms, until we have FRCP tuning as part of fccntl. Karn's algorithm seems to be very unstable with low (sub-ms) RTT estimates. Doubling RTO (every RTO) seems still too slow to prevent rtx storms when the measured rtt suddenly spikes several orders of magnitude. Just assuming the ACK'd packet is the last one transmitted seems to be a lot more stable. It can lead to temporary underestimation, but this is not a throughput-killer in FRCP. Changes most time units to nanoseconds for faster computation. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--doc/man/fqueue.33
-rw-r--r--src/lib/dev.c71
-rw-r--r--src/lib/frct.c326
-rw-r--r--src/lib/rxmwheel.c50
4 files changed, 298 insertions, 152 deletions
diff --git a/doc/man/fqueue.3 b/doc/man/fqueue.3
index cbc8ee4e..85e12518 100644
--- a/doc/man/fqueue.3
+++ b/doc/man/fqueue.3
@@ -68,8 +68,7 @@ On success, \fBfqueue_create\fR() returns a pointer to an
\fBfqueue_destroy\fR() has no return value.
-On success, \fBfevent\fR() returns the number of events that occured
-in \fIset\fR.
+On success, \fBfevent\fR() returns 1.
On success, \fBfqueue_next\fR() returns the next file descriptor for
which an event occurred.
diff --git a/src/lib/dev.c b/src/lib/dev.c
index efd08146..df616ead 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -1017,18 +1017,22 @@ ssize_t flow_write(int fd,
memcpy(ptr, buf, count);
+ pthread_rwlock_wrlock(&ai.lock);
+
if (frcti_snd(flow->frcti, sdb) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOMEM;
}
- pthread_rwlock_wrlock(&ai.lock);
- if (flow->qs.cypher_s > 0)
+ if (flow->qs.cypher_s > 0) {
if (crypt_encrypt(flow, sdb) < 0) {
pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOMEM;
}
+ }
+
pthread_rwlock_unlock(&ai.lock);
if (flow->qs.ber == 0 && add_crc(sdb) != 0) {
@@ -1097,12 +1101,12 @@ ssize_t flow_read(int fd,
abstime = &abs;
}
- pthread_rwlock_unlock(&ai.lock);
-
idx = flow->part_idx;
+
if (idx < 0) {
- idx = frcti_queued_pdu(flow->frcti);
- while (idx < 0) {
+ while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+
idx = noblock ? shm_rbuff_read(rb) :
shm_rbuff_read_b(rb, abstime);
if (idx < 0)
@@ -1110,20 +1114,28 @@ ssize_t flow_read(int fd,
sdb = shm_rdrbuff_get(ai.rdrb, idx);
if (flow->qs.ber == 0 && chk_crc(sdb) != 0) {
+ pthread_rwlock_rdlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
continue;
}
+ pthread_rwlock_rdlock(&ai.lock);
+
if (flow->qs.cypher_s > 0
&& crypt_decrypt(flow, sdb) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOMEM;
}
- idx = frcti_rcv(flow->frcti, sdb);
+ frcti_rcv(flow->frcti, sdb);
}
}
+ frcti_tick(flow->frcti);
+
+ pthread_rwlock_unlock(&ai.lock);
+
n = shm_rdrbuff_read(&packet, ai.rdrb, idx);
assert(n >= 0);
@@ -1144,7 +1156,9 @@ ssize_t flow_read(int fd,
memcpy(buf, packet, count);
sdb = shm_rdrbuff_get(ai.rdrb, idx);
shm_du_buff_head_release(sdb, n);
+ pthread_rwlock_wrlock(&ai.lock);
flow->part_idx = idx;
+ pthread_rwlock_unlock(&ai.lock);
return count;
} else {
shm_rdrbuff_remove(ai.rdrb, idx);
@@ -1295,6 +1309,11 @@ int fqueue_next(struct fqueue * fq)
pthread_rwlock_rdlock(&ai.lock);
+ if (fq->next != 0 && frcti_filter(fq) == 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -EPERM;
+ }
+
fd = ai.ports[fq->fqueue[fq->next]].fd;
fq->next += 2;
@@ -1319,7 +1338,7 @@ ssize_t fevent(struct flow_set * set,
struct fqueue * fq,
const struct timespec * timeo)
{
- ssize_t ret;
+ ssize_t ret = 0;
struct timespec abstime;
struct timespec * t = NULL;
@@ -1335,18 +1354,22 @@ ssize_t fevent(struct flow_set * set,
t = &abstime;
}
- ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
- if (ret == -ETIMEDOUT) {
- fq->fqsize = 0;
- return -ETIMEDOUT;
- }
+ while (ret == 0) {
+ ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
+ if (ret == -ETIMEDOUT) {
+ fq->fqsize = 0;
+ return -ETIMEDOUT;
+ }
- fq->fqsize = ret << 1;
- fq->next = 0;
+ fq->fqsize = ret << 1;
+ fq->next = 0;
+
+ ret = frcti_filter(fq);
+ }
assert(ret);
- return ret;
+ return 1;
}
/* ipcp-dev functions. */
@@ -1509,7 +1532,7 @@ int ipcp_flow_read(int fd,
{
struct flow * flow;
struct shm_rbuff * rb;
- ssize_t idx;
+ ssize_t idx = -1;
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
assert(sdb);
@@ -1522,20 +1545,26 @@ int ipcp_flow_read(int fd,
rb = flow->rx_rb;
- pthread_rwlock_unlock(&ai.lock);
+ while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
- idx = frcti_queued_pdu(flow->frcti);
- while (idx < 0) {
idx = shm_rbuff_read(rb);
if (idx < 0)
return idx;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
*sdb = shm_rdrbuff_get(ai.rdrb, idx);
if (flow->qs.ber == 0 && chk_crc(*sdb) != 0)
continue;
- idx = frcti_rcv(flow->frcti, *sdb);
+ frcti_rcv(flow->frcti, *sdb);
}
+ frcti_tick(flow->frcti);
+
+ pthread_rwlock_unlock(&ai.lock);
+
*sdb = shm_rdrbuff_get(ai.rdrb, idx);
return 0;
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 2322a039..2bd126f4 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -21,23 +21,25 @@
*/
/* Default Delta-t parameters */
-#define DELT_MPL (60 * MILLION) /* us */
-#define DELT_A (1 * MILLION) /* us */
-#define DELT_R (20 * MILLION) /* us */
+#define DELT_MPL (60 * BILLION) /* ns */
+#define DELT_A (1 * BILLION) /* ns */
+#define DELT_R (20 * BILLION) /* ns */
-#define RQ_SIZE 1024
+#define DELT_ACK (10 * MILLION) /* ns */
+
+#define RQ_SIZE 256
#define FRCT_PCILEN (sizeof(struct frct_pci))
struct frct_cr {
- uint32_t lwe;
- uint32_t rwe;
+ uint32_t lwe; /* Left window edge */
+ uint32_t rwe; /* Right window edge */
- uint8_t cflags;
- uint32_t seqno;
+ uint8_t cflags;
+ uint32_t seqno; /* SEQ to send, or last SEQ Ack'd */
- time_t act; /* s */
- time_t inact; /* s */
+ struct timespec act; /* Last seen activity */
+ time_t inact; /* Inactivity (s) */
};
struct frcti {
@@ -47,12 +49,12 @@ struct frcti {
time_t a;
time_t r;
- time_t srtt_us; /* smoothed rtt */
- time_t mdev_us; /* mdev */
- time_t rto; /* retransmission timeout */
+ time_t srtt; /* Smoothed rtt */
+ time_t mdev; /* Deviation */
+ time_t rto; /* Retransmission timeout */
uint32_t rttseq;
- struct timespec t_probe; /* probe time */
- bool probe; /* probe active */
+ struct timespec t_probe; /* Probe time */
+ bool probe; /* Probe active */
struct frct_cr snd_cr;
struct frct_cr rcv_cr;
@@ -74,7 +76,9 @@ enum frct_flags {
};
struct frct_pci {
- uint16_t flags;
+ uint8_t flags;
+
+ uint8_t pad;
uint16_t window;
@@ -87,9 +91,11 @@ struct frct_pci {
static struct frcti * frcti_create(int fd)
{
struct frcti * frcti;
- time_t delta_t;
ssize_t idx;
struct timespec now;
+ time_t mpl;
+ time_t a;
+ time_t r;
frcti = malloc(sizeof(*frcti));
if (frcti == NULL)
@@ -103,25 +109,21 @@ static struct frcti * frcti_create(int fd)
for (idx = 0; idx < RQ_SIZE; ++idx)
frcti->rq[idx] = -1;
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
- frcti->mpl = DELT_MPL;
- frcti->a = DELT_A;
- frcti->r = DELT_R;
+ frcti->mpl = mpl = DELT_MPL;
+ frcti->a = a = DELT_A;
+ frcti->r = r = DELT_R;
frcti->fd = fd;
- delta_t = frcti->mpl + frcti->a + frcti->r;
- frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */
- frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1);
+ frcti->rttseq = 0;
+ frcti->probe = false;
- frcti->rttseq = 0;
- frcti->probe = false;
-
- frcti->srtt_us = 0; /* updated on first ACK */
- frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */
- frcti->rto = 20000; /* initial rxm will be after 20 ms */
- frcti->rw = NULL;
+ frcti->srtt = 0; /* Updated on first ACK */
+ frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */
+ frcti->rto = 20 * MILLION; /* Initial rxm will be after 20 ms */
+ frcti->rw = NULL;
if (ai.flows[fd].qs.loss == 0) {
frcti->snd_cr.cflags |= FRCTFRTX;
@@ -131,9 +133,11 @@ static struct frcti * frcti_create(int fd)
goto fail_rw;
}
- frcti->rcv_cr.inact = 2 * delta_t / MILLION; /* s */
- frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1);
+ frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */
+ frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1);
+ frcti->rcv_cr.inact = (2 * mpl + a + r) / BILLION + 1; /* s */
+ frcti->rcv_cr.act.tv_sec = now.tv_sec - (frcti->rcv_cr.inact + 1);
return frcti;
@@ -175,14 +179,18 @@ static uint16_t frcti_getconf(struct frcti * frcti)
return ret;
}
-#define frcti_queued_pdu(frcti) \
- (frcti == NULL ? -1 : __frcti_queued_pdu(frcti))
+#define frcti_queued_pdu(frcti) \
+ (frcti == NULL ? idx : __frcti_queued_pdu(frcti))
-#define frcti_snd(frcti, sdb) \
+#define frcti_snd(frcti, sdb) \
(frcti == NULL ? 0 : __frcti_snd(frcti, sdb))
#define frcti_rcv(frcti, sdb) \
- (frcti == NULL ? idx : __frcti_rcv(frcti, sdb))
+ (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))
+
+#define frcti_tick(frcti) \
+ (frcti == NULL ? 0 : __frcti_tick(frcti))
+
static ssize_t __frcti_queued_pdu(struct frcti * frcti)
{
@@ -207,15 +215,22 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)
return idx;
}
-static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb)
+static ssize_t __frcti_pdu_ready(struct frcti * frcti)
{
- struct frct_pci * pci;
+ ssize_t idx;
+ size_t pos;
- pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
- if (pci != NULL)
- memset(pci, 0, sizeof(*pci));
+ assert(frcti);
- return pci;
+ /* See if we already have the next PDU. */
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1);
+ idx = frcti->rq[pos];
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return idx;
}
static bool before(uint32_t seq1,
@@ -230,6 +245,68 @@ static bool after(uint32_t seq1,
return (int32_t)(seq2 - seq1) < 0;
}
+static void frct_send_ack(struct frcti * frcti)
+{
+ struct shm_du_buff * sdb;
+ struct frct_pci * pci;
+ ssize_t idx;
+ struct timespec now;
+ time_t diff;
+ uint32_t ackno;
+ struct flow * f;
+
+ assert(frcti);
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return;
+ }
+
+ ackno = frcti->rcv_cr.lwe;
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ diff = ts_diff_ns(&frcti->rcv_cr.act, &now);
+
+ if (diff > frcti->a)
+ return;
+
+ if (diff < DELT_ACK)
+ return;
+
+ /* Raw calls needed to bypass frcti. */
+ idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
+ if (idx < 0)
+ return;
+
+ pci = (struct frct_pci *) shm_du_buff_head(sdb);
+ memset(pci, 0, sizeof(*pci));
+
+ pci->flags = FRCT_ACK;
+ pci->ackno = hton32(ackno);
+
+ f = &ai.flows[frcti->fd];
+
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
+ pthread_rwlock_rdlock(&ai.lock);
+ ipcp_sdb_release(sdb);
+ return;
+ }
+
+ shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno))
+ frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
+
+ pthread_rwlock_unlock(&frcti->lock);
+}
+
static int __frcti_snd(struct frcti * frcti,
struct shm_du_buff * sdb)
{
@@ -247,11 +324,13 @@ static int __frcti_snd(struct frcti * frcti,
if (frcti->rw != NULL)
rxmwheel_move(frcti->rw);
- pci = frcti_alloc_head(sdb);
+ pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
if (pci == NULL)
return -1;
- clock_gettime(CLOCK_REALTIME, &now);
+ memset(pci, 0, sizeof(*pci));
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pthread_rwlock_wrlock(&frcti->lock);
@@ -262,7 +341,7 @@ static int __frcti_snd(struct frcti * frcti,
pci->flags |= FRCT_DRF;
/* Choose a new sequence number if sender inactivity expired. */
- if (now.tv_sec - snd_cr->act > snd_cr->inact) {
+ if (now.tv_sec - snd_cr->act.tv_sec > snd_cr->inact) {
/* There are no unacknowledged packets. */
assert(snd_cr->seqno == snd_cr->lwe);
random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno));
@@ -281,14 +360,15 @@ static int __frcti_snd(struct frcti * frcti,
frcti->probe = true;
}
- if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) {
+ if (now.tv_sec - rcv_cr->act.tv_sec <= rcv_cr->inact) {
pci->flags |= FRCT_ACK;
pci->ackno = hton32(rcv_cr->lwe);
+ rcv_cr->seqno = rcv_cr->lwe;
}
}
snd_cr->seqno++;
- snd_cr->act = now.tv_sec;
+ snd_cr->act = now;
pthread_rwlock_unlock(&frcti->lock);
@@ -299,104 +379,158 @@ static int __frcti_snd(struct frcti * frcti,
}
static void rtt_estimator(struct frcti * frcti,
- time_t mrtt_us)
+ time_t mrtt)
{
- time_t srtt = frcti->srtt_us;
- time_t rttvar = frcti->mdev_us;
+ time_t srtt = frcti->srtt;
+ time_t rttvar = frcti->mdev;
if (srtt == 0) { /* first measurement */
- srtt = mrtt_us;
- rttvar = mrtt_us >> 1;
+ srtt = mrtt;
+ rttvar = mrtt >> 1;
} else {
- time_t delta = mrtt_us - srtt;
+ time_t delta = mrtt - srtt;
srtt += (delta >> 3);
rttvar -= rttvar >> 2;
rttvar += ABS(delta) >> 2;
}
- frcti->srtt_us = MAX(1U, srtt);
- frcti->mdev_us = MAX(1U, rttvar);
- frcti->rto = MAX(RTO_MIN, srtt + (rttvar >> 2));
+ frcti->srtt = MAX(1000U, srtt);
+ frcti->mdev = MAX(100U, rttvar);
+ frcti->rto = MAX(RTO_MIN * 1000,
+ frcti->srtt + (frcti->mdev << 1));
+}
+
+static void __frcti_tick(struct frcti * frcti)
+{
+ if (frcti->rw != NULL) {
+ rxmwheel_move(frcti->rw);
+ frct_send_ack(frcti);
+ }
}
-/* Always queues the packet on the RQ for the application. */
-static ssize_t __frcti_rcv(struct frcti * frcti,
- struct shm_du_buff * sdb)
+/* Always queues the next application packet on the RQ. */
+static void __frcti_rcv(struct frcti * frcti,
+ struct shm_du_buff * sdb)
{
ssize_t idx;
+ size_t pos;
struct frct_pci * pci;
struct timespec now;
- struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
uint32_t seqno;
+ uint32_t ackno;
assert(frcti);
rcv_cr = &frcti->rcv_cr;
- snd_cr = &frcti->snd_cr;
- clock_gettime(CLOCK_REALTIME, &now);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN);
idx = shm_du_buff_get_idx(sdb);
seqno = ntoh32(pci->seqno);
+ pos = seqno & (RQ_SIZE - 1);
pthread_rwlock_wrlock(&frcti->lock);
- if (now.tv_sec - rcv_cr->act > rcv_cr->inact) {
+ if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) {
if (pci->flags & FRCT_DRF) /* New run. */
rcv_cr->lwe = seqno;
else
goto drop_packet;
}
- if (before(seqno, rcv_cr->lwe))
- goto drop_packet;
+ if (pci->flags & FRCT_ACK) {
+ ackno = ntoh32(pci->ackno);
+ if (after(ackno, frcti->snd_cr.lwe))
+ frcti->snd_cr.lwe = ackno;
- if (rcv_cr->cflags & FRCTFRTX) {
- if (pci->flags & FRCT_ACK) {
- uint32_t ackno = ntoh32(pci->ackno);
- /* Check for duplicate (old) acks. */
- if (after(ackno, snd_cr->lwe))
- snd_cr->lwe = ackno;
-
- if (frcti->probe && after(ackno, frcti->rttseq)) {
- rtt_estimator(frcti, ts_diff_us(&frcti->t_probe,
- &now));
- frcti->probe = false;
- }
+ if (frcti->probe && after(ackno, frcti->rttseq)) {
+ rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now));
+ frcti->probe = false;
}
+ }
- if (seqno == rcv_cr->lwe) {
- ++frcti->rcv_cr.lwe;
- } else {
- size_t pos = seqno & (RQ_SIZE - 1);
- if ((seqno - rcv_cr->lwe) >= RQ_SIZE)
- goto drop_packet; /* Out of rq. */
+ if (!(pci->flags & FRCT_DATA))
+ goto drop_packet;
- if (frcti->rq[pos] != -1)
- goto drop_packet; /* Duplicate in rq */
+ if (before(seqno, rcv_cr->lwe))
+ goto drop_packet;
- frcti->rq[pos] = idx;
- idx = -EAGAIN;
- }
+ if (rcv_cr->cflags & FRCTFRTX) {
+ if ((seqno - rcv_cr->lwe) >= RQ_SIZE)
+ goto drop_packet; /* Out of rq. */
+
+ if (frcti->rq[pos] != -1)
+ goto drop_packet; /* Duplicate in rq. */
} else {
- rcv_cr->lwe = seqno + 1;
+ rcv_cr->lwe = seqno;
}
- rcv_cr->act = now.tv_sec;
+ frcti->rq[pos] = idx;
- pthread_rwlock_unlock(&frcti->lock);
+ rcv_cr->act = now;
- if (frcti->rw != NULL)
- rxmwheel_move(frcti->rw);
+ pthread_rwlock_unlock(&frcti->lock);
- return idx;
+ return;
drop_packet:
pthread_rwlock_unlock(&frcti->lock);
shm_rdrbuff_remove(ai.rdrb, idx);
- return -EAGAIN;
+ return;
+}
+
+/* Filter fqueue events for non-data packets */
+int frcti_filter(struct fqueue * fq)
+{
+ struct shm_du_buff * sdb;
+ int fd;
+ ssize_t idx;
+ struct frcti * frcti;
+ struct shm_rbuff * rb;
+
+ while(fq->next < fq->fqsize) {
+ if (fq->fqueue[fq->next + 1] != FLOW_PKT)
+ return 1;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ fd = ai.ports[fq->fqueue[fq->next]].fd;
+ rb = ai.flows[fd].rx_rb;
+ frcti = ai.flows[fd].frcti;
+
+ if (frcti == NULL) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 1;
+ }
+
+ if (__frcti_pdu_ready(frcti) >= 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 1;
+ }
+
+ idx = shm_rbuff_read(rb);
+ if (idx < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 0;
+ }
+
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
+ __frcti_rcv(frcti, sdb);
+
+ if (__frcti_pdu_ready(frcti) >= 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 1;
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ fq->next += 2;
+ }
+
+ return fq->next < fq->fqsize;
}
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
index 9602c5f9..0572c7b7 100644
--- a/src/lib/rxmwheel.c
+++ b/src/lib/rxmwheel.c
@@ -22,15 +22,15 @@
#include <ouroboros/list.h>
-#define RXMQ_S 16 /* defines #slots */
-#define RXMQ_M 24 /* defines max delay (us) */
-#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (us) */
+#define RXMQ_S 14 /* defines #slots */
+#define RXMQ_M 34 /* defines max delay (ns) */
+#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (ns) */
#define RXMQ_SLOTS (1 << RXMQ_S)
#define RXMQ_MAX (1 << RXMQ_M) /* us */
-/* Small inacurracy to avoid slow division by MILLION. */
-#define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10))
-#define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
+/* Overflow limits range to about 6 hours. */
+#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
+#define ts_to_slot(ts) ((ts_to_ns(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
struct rxm {
struct list_head next;
@@ -95,22 +95,6 @@ static struct rxmwheel * rxmwheel_create(void)
return rw;
}
-static void check_probe(struct frcti * frcti,
- uint32_t seqno)
-{
- /* Disable rtt probe on retransmitted packet! */
-
- pthread_rwlock_wrlock(&frcti->lock);
-
- if (frcti->probe && ((frcti->rttseq + 1) == seqno)) {
- /* Backoff to avoid never updating rtt */
- frcti->srtt_us += frcti->mdev_us;
- frcti->probe = false;
- }
-
- pthread_rwlock_unlock(&frcti->lock);
-}
-
static void rxmwheel_move(struct rxmwheel * rw)
{
struct timespec now;
@@ -159,11 +143,15 @@ static void rxmwheel_move(struct rxmwheel * rw)
shm_du_buff_ack(r->sdb);
- pthread_rwlock_rdlock(&r->frcti->lock);
+ pthread_rwlock_wrlock(&r->frcti->lock);
snd_lwe = snd_cr->lwe;
rcv_lwe = rcv_cr->lwe;
rto = r->frcti->rto;
+ /* Assume last RTX is the one that's ACK'd. */
+ if (r->frcti->probe
+ && (r->frcti->rttseq + 1) == r->seqno)
+ r->frcti->t_probe = now;
pthread_rwlock_unlock(&r->frcti->lock);
@@ -175,13 +163,11 @@ static void rxmwheel_move(struct rxmwheel * rw)
}
/* Check for r-timer expiry. */
- if (ts_to_us(now) - r->t0 > r->frcti->r) {
+ if (ts_to_ns(now) - r->t0 > r->frcti->r) {
ipcp_sdb_release(r->sdb);
free(r);
- shm_rbuff_set_acl(ai.flows[fd].rx_rb,
- ACL_FLOWDOWN);
- shm_rbuff_set_acl(ai.flows[fd].tx_rb,
- ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
continue;
}
@@ -201,9 +187,7 @@ static void rxmwheel_move(struct rxmwheel * rw)
ipcp_sdb_release(r->sdb);
- check_probe(r->frcti, r->seqno);
-
- ((struct frct_pci *) head)->ackno = ntoh32(rcv_lwe);
+ ((struct frct_pci *) head)->ackno = hton32(rcv_lwe);
/* Retransmit the copy. */
if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
@@ -224,7 +208,7 @@ static void rxmwheel_move(struct rxmwheel * rw)
r->sdb = sdb;
/* Schedule at least in the next time slot */
- rslot = (slot + MAX(rto >> RXMQ_R, 1))
+ rslot = (slot + MAX((rto >> RXMQ_R), 1))
& (RXMQ_SLOTS - 1);
list_add_tail(&r->next, &rw->wheel[rslot]);
@@ -251,7 +235,7 @@ static int rxmwheel_add(struct rxmwheel * rw,
clock_gettime(PTHREAD_COND_CLOCK, &now);
- r->t0 = ts_to_us(now);
+ r->t0 = ts_to_ns(now);
r->mul = 0;
r->seqno = seqno;
r->sdb = sdb;