summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/man/fqueue.33
-rw-r--r--src/lib/dev.c71
-rw-r--r--src/lib/frct.c326
-rw-r--r--src/lib/rxmwheel.c50
4 files changed, 298 insertions, 152 deletions
diff --git a/doc/man/fqueue.3 b/doc/man/fqueue.3
index cbc8ee4e..85e12518 100644
--- a/doc/man/fqueue.3
+++ b/doc/man/fqueue.3
@@ -68,8 +68,7 @@ On success, \fBfqueue_create\fR() returns a pointer to an
\fBfqueue_destroy\fR() has no return value.
-On success, \fBfevent\fR() returns the number of events that occured
-in \fIset\fR.
+On success, \fBfevent\fR() returns 1.
On success, \fBfqueue_next\fR() returns the next file descriptor for
which an event occurred.
diff --git a/src/lib/dev.c b/src/lib/dev.c
index efd08146..df616ead 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -1017,18 +1017,22 @@ ssize_t flow_write(int fd,
memcpy(ptr, buf, count);
+ pthread_rwlock_wrlock(&ai.lock);
+
if (frcti_snd(flow->frcti, sdb) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOMEM;
}
- pthread_rwlock_wrlock(&ai.lock);
- if (flow->qs.cypher_s > 0)
+ if (flow->qs.cypher_s > 0) {
if (crypt_encrypt(flow, sdb) < 0) {
pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOMEM;
}
+ }
+
pthread_rwlock_unlock(&ai.lock);
if (flow->qs.ber == 0 && add_crc(sdb) != 0) {
@@ -1097,12 +1101,12 @@ ssize_t flow_read(int fd,
abstime = &abs;
}
- pthread_rwlock_unlock(&ai.lock);
-
idx = flow->part_idx;
+
if (idx < 0) {
- idx = frcti_queued_pdu(flow->frcti);
- while (idx < 0) {
+ while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+
idx = noblock ? shm_rbuff_read(rb) :
shm_rbuff_read_b(rb, abstime);
if (idx < 0)
@@ -1110,20 +1114,28 @@ ssize_t flow_read(int fd,
sdb = shm_rdrbuff_get(ai.rdrb, idx);
if (flow->qs.ber == 0 && chk_crc(sdb) != 0) {
+ pthread_rwlock_rdlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
continue;
}
+ pthread_rwlock_rdlock(&ai.lock);
+
if (flow->qs.cypher_s > 0
&& crypt_decrypt(flow, sdb) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOMEM;
}
- idx = frcti_rcv(flow->frcti, sdb);
+ frcti_rcv(flow->frcti, sdb);
}
}
+ frcti_tick(flow->frcti);
+
+ pthread_rwlock_unlock(&ai.lock);
+
n = shm_rdrbuff_read(&packet, ai.rdrb, idx);
assert(n >= 0);
@@ -1144,7 +1156,9 @@ ssize_t flow_read(int fd,
memcpy(buf, packet, count);
sdb = shm_rdrbuff_get(ai.rdrb, idx);
shm_du_buff_head_release(sdb, n);
+ pthread_rwlock_wrlock(&ai.lock);
flow->part_idx = idx;
+ pthread_rwlock_unlock(&ai.lock);
return count;
} else {
shm_rdrbuff_remove(ai.rdrb, idx);
@@ -1295,6 +1309,11 @@ int fqueue_next(struct fqueue * fq)
pthread_rwlock_rdlock(&ai.lock);
+ if (fq->next != 0 && frcti_filter(fq) == 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -EPERM;
+ }
+
fd = ai.ports[fq->fqueue[fq->next]].fd;
fq->next += 2;
@@ -1319,7 +1338,7 @@ ssize_t fevent(struct flow_set * set,
struct fqueue * fq,
const struct timespec * timeo)
{
- ssize_t ret;
+ ssize_t ret = 0;
struct timespec abstime;
struct timespec * t = NULL;
@@ -1335,18 +1354,22 @@ ssize_t fevent(struct flow_set * set,
t = &abstime;
}
- ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
- if (ret == -ETIMEDOUT) {
- fq->fqsize = 0;
- return -ETIMEDOUT;
- }
+ while (ret == 0) {
+ ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
+ if (ret == -ETIMEDOUT) {
+ fq->fqsize = 0;
+ return -ETIMEDOUT;
+ }
- fq->fqsize = ret << 1;
- fq->next = 0;
+ fq->fqsize = ret << 1;
+ fq->next = 0;
+
+ ret = frcti_filter(fq);
+ }
assert(ret);
- return ret;
+ return 1;
}
/* ipcp-dev functions. */
@@ -1509,7 +1532,7 @@ int ipcp_flow_read(int fd,
{
struct flow * flow;
struct shm_rbuff * rb;
- ssize_t idx;
+ ssize_t idx = -1;
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
assert(sdb);
@@ -1522,20 +1545,26 @@ int ipcp_flow_read(int fd,
rb = flow->rx_rb;
- pthread_rwlock_unlock(&ai.lock);
+ while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
- idx = frcti_queued_pdu(flow->frcti);
- while (idx < 0) {
idx = shm_rbuff_read(rb);
if (idx < 0)
return idx;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
*sdb = shm_rdrbuff_get(ai.rdrb, idx);
if (flow->qs.ber == 0 && chk_crc(*sdb) != 0)
continue;
- idx = frcti_rcv(flow->frcti, *sdb);
+ frcti_rcv(flow->frcti, *sdb);
}
+ frcti_tick(flow->frcti);
+
+ pthread_rwlock_unlock(&ai.lock);
+
*sdb = shm_rdrbuff_get(ai.rdrb, idx);
return 0;
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 2322a039..2bd126f4 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -21,23 +21,25 @@
*/
/* Default Delta-t parameters */
-#define DELT_MPL (60 * MILLION) /* us */
-#define DELT_A (1 * MILLION) /* us */
-#define DELT_R (20 * MILLION) /* us */
+#define DELT_MPL (60 * BILLION) /* ns */
+#define DELT_A (1 * BILLION) /* ns */
+#define DELT_R (20 * BILLION) /* ns */
-#define RQ_SIZE 1024
+#define DELT_ACK (10 * MILLION) /* ns */
+
+#define RQ_SIZE 256
#define FRCT_PCILEN (sizeof(struct frct_pci))
struct frct_cr {
- uint32_t lwe;
- uint32_t rwe;
+ uint32_t lwe; /* Left window edge */
+ uint32_t rwe; /* Right window edge */
- uint8_t cflags;
- uint32_t seqno;
+ uint8_t cflags;
+ uint32_t seqno; /* SEQ to send, or last SEQ Ack'd */
- time_t act; /* s */
- time_t inact; /* s */
+ struct timespec act; /* Last seen activity */
+ time_t inact; /* Inactivity (s) */
};
struct frcti {
@@ -47,12 +49,12 @@ struct frcti {
time_t a;
time_t r;
- time_t srtt_us; /* smoothed rtt */
- time_t mdev_us; /* mdev */
- time_t rto; /* retransmission timeout */
+ time_t srtt; /* Smoothed rtt */
+ time_t mdev; /* Deviation */
+ time_t rto; /* Retransmission timeout */
uint32_t rttseq;
- struct timespec t_probe; /* probe time */
- bool probe; /* probe active */
+ struct timespec t_probe; /* Probe time */
+ bool probe; /* Probe active */
struct frct_cr snd_cr;
struct frct_cr rcv_cr;
@@ -74,7 +76,9 @@ enum frct_flags {
};
struct frct_pci {
- uint16_t flags;
+ uint8_t flags;
+
+ uint8_t pad;
uint16_t window;
@@ -87,9 +91,11 @@ struct frct_pci {
static struct frcti * frcti_create(int fd)
{
struct frcti * frcti;
- time_t delta_t;
ssize_t idx;
struct timespec now;
+ time_t mpl;
+ time_t a;
+ time_t r;
frcti = malloc(sizeof(*frcti));
if (frcti == NULL)
@@ -103,25 +109,21 @@ static struct frcti * frcti_create(int fd)
for (idx = 0; idx < RQ_SIZE; ++idx)
frcti->rq[idx] = -1;
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
- frcti->mpl = DELT_MPL;
- frcti->a = DELT_A;
- frcti->r = DELT_R;
+ frcti->mpl = mpl = DELT_MPL;
+ frcti->a = a = DELT_A;
+ frcti->r = r = DELT_R;
frcti->fd = fd;
- delta_t = frcti->mpl + frcti->a + frcti->r;
- frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */
- frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1);
+ frcti->rttseq = 0;
+ frcti->probe = false;
- frcti->rttseq = 0;
- frcti->probe = false;
-
- frcti->srtt_us = 0; /* updated on first ACK */
- frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */
- frcti->rto = 20000; /* initial rxm will be after 20 ms */
- frcti->rw = NULL;
+ frcti->srtt = 0; /* Updated on first ACK */
+ frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */
+ frcti->rto = 20 * MILLION; /* Initial rxm will be after 20 ms */
+ frcti->rw = NULL;
if (ai.flows[fd].qs.loss == 0) {
frcti->snd_cr.cflags |= FRCTFRTX;
@@ -131,9 +133,11 @@ static struct frcti * frcti_create(int fd)
goto fail_rw;
}
- frcti->rcv_cr.inact = 2 * delta_t / MILLION; /* s */
- frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1);
+ frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */
+ frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1);
+ frcti->rcv_cr.inact = (2 * mpl + a + r) / BILLION + 1; /* s */
+ frcti->rcv_cr.act.tv_sec = now.tv_sec - (frcti->rcv_cr.inact + 1);
return frcti;
@@ -175,14 +179,18 @@ static uint16_t frcti_getconf(struct frcti * frcti)
return ret;
}
-#define frcti_queued_pdu(frcti) \
- (frcti == NULL ? -1 : __frcti_queued_pdu(frcti))
+#define frcti_queued_pdu(frcti) \
+ (frcti == NULL ? idx : __frcti_queued_pdu(frcti))
-#define frcti_snd(frcti, sdb) \
+#define frcti_snd(frcti, sdb) \
(frcti == NULL ? 0 : __frcti_snd(frcti, sdb))
#define frcti_rcv(frcti, sdb) \
- (frcti == NULL ? idx : __frcti_rcv(frcti, sdb))
+ (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))
+
+#define frcti_tick(frcti) \
+ (frcti == NULL ? 0 : __frcti_tick(frcti))
+
static ssize_t __frcti_queued_pdu(struct frcti * frcti)
{
@@ -207,15 +215,22 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)
return idx;
}
-static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb)
+static ssize_t __frcti_pdu_ready(struct frcti * frcti)
{
- struct frct_pci * pci;
+ ssize_t idx;
+ size_t pos;
- pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
- if (pci != NULL)
- memset(pci, 0, sizeof(*pci));
+ assert(frcti);
- return pci;
+ /* See if we already have the next PDU. */
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1);
+ idx = frcti->rq[pos];
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return idx;
}
static bool before(uint32_t seq1,
@@ -230,6 +245,68 @@ static bool after(uint32_t seq1,
return (int32_t)(seq2 - seq1) < 0;
}
+static void frct_send_ack(struct frcti * frcti)
+{
+ struct shm_du_buff * sdb;
+ struct frct_pci * pci;
+ ssize_t idx;
+ struct timespec now;
+ time_t diff;
+ uint32_t ackno;
+ struct flow * f;
+
+ assert(frcti);
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return;
+ }
+
+ ackno = frcti->rcv_cr.lwe;
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ diff = ts_diff_ns(&frcti->rcv_cr.act, &now);
+
+ if (diff > frcti->a)
+ return;
+
+ if (diff < DELT_ACK)
+ return;
+
+ /* Raw calls needed to bypass frcti. */
+ idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
+ if (idx < 0)
+ return;
+
+ pci = (struct frct_pci *) shm_du_buff_head(sdb);
+ memset(pci, 0, sizeof(*pci));
+
+ pci->flags = FRCT_ACK;
+ pci->ackno = hton32(ackno);
+
+ f = &ai.flows[frcti->fd];
+
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
+ pthread_rwlock_rdlock(&ai.lock);
+ ipcp_sdb_release(sdb);
+ return;
+ }
+
+ shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno))
+ frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
+
+ pthread_rwlock_unlock(&frcti->lock);
+}
+
static int __frcti_snd(struct frcti * frcti,
struct shm_du_buff * sdb)
{
@@ -247,11 +324,13 @@ static int __frcti_snd(struct frcti * frcti,
if (frcti->rw != NULL)
rxmwheel_move(frcti->rw);
- pci = frcti_alloc_head(sdb);
+ pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
if (pci == NULL)
return -1;
- clock_gettime(CLOCK_REALTIME, &now);
+ memset(pci, 0, sizeof(*pci));
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pthread_rwlock_wrlock(&frcti->lock);
@@ -262,7 +341,7 @@ static int __frcti_snd(struct frcti * frcti,
pci->flags |= FRCT_DRF;
/* Choose a new sequence number if sender inactivity expired. */
- if (now.tv_sec - snd_cr->act > snd_cr->inact) {
+ if (now.tv_sec - snd_cr->act.tv_sec > snd_cr->inact) {
/* There are no unacknowledged packets. */
assert(snd_cr->seqno == snd_cr->lwe);
random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno));
@@ -281,14 +360,15 @@ static int __frcti_snd(struct frcti * frcti,
frcti->probe = true;
}
- if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) {
+ if (now.tv_sec - rcv_cr->act.tv_sec <= rcv_cr->inact) {
pci->flags |= FRCT_ACK;
pci->ackno = hton32(rcv_cr->lwe);
+ rcv_cr->seqno = rcv_cr->lwe;
}
}
snd_cr->seqno++;
- snd_cr->act = now.tv_sec;
+ snd_cr->act = now;
pthread_rwlock_unlock(&frcti->lock);
@@ -299,104 +379,158 @@ static int __frcti_snd(struct frcti * frcti,
}
static void rtt_estimator(struct frcti * frcti,
- time_t mrtt_us)
+ time_t mrtt)
{
- time_t srtt = frcti->srtt_us;
- time_t rttvar = frcti->mdev_us;
+ time_t srtt = frcti->srtt;
+ time_t rttvar = frcti->mdev;
if (srtt == 0) { /* first measurement */
- srtt = mrtt_us;
- rttvar = mrtt_us >> 1;
+ srtt = mrtt;
+ rttvar = mrtt >> 1;
} else {
- time_t delta = mrtt_us - srtt;
+ time_t delta = mrtt - srtt;
srtt += (delta >> 3);
rttvar -= rttvar >> 2;
rttvar += ABS(delta) >> 2;
}
- frcti->srtt_us = MAX(1U, srtt);
- frcti->mdev_us = MAX(1U, rttvar);
- frcti->rto = MAX(RTO_MIN, srtt + (rttvar >> 2));
+ frcti->srtt = MAX(1000U, srtt);
+ frcti->mdev = MAX(100U, rttvar);
+ frcti->rto = MAX(RTO_MIN * 1000,
+ frcti->srtt + (frcti->mdev << 1));
+}
+
+static void __frcti_tick(struct frcti * frcti)
+{
+ if (frcti->rw != NULL) {
+ rxmwheel_move(frcti->rw);
+ frct_send_ack(frcti);
+ }
}
-/* Always queues the packet on the RQ for the application. */
-static ssize_t __frcti_rcv(struct frcti * frcti,
- struct shm_du_buff * sdb)
+/* Always queues the next application packet on the RQ. */
+static void __frcti_rcv(struct frcti * frcti,
+ struct shm_du_buff * sdb)
{
ssize_t idx;
+ size_t pos;
struct frct_pci * pci;
struct timespec now;
- struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
uint32_t seqno;
+ uint32_t ackno;
assert(frcti);
rcv_cr = &frcti->rcv_cr;
- snd_cr = &frcti->snd_cr;
- clock_gettime(CLOCK_REALTIME, &now);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN);
idx = shm_du_buff_get_idx(sdb);
seqno = ntoh32(pci->seqno);
+ pos = seqno & (RQ_SIZE - 1);
pthread_rwlock_wrlock(&frcti->lock);
- if (now.tv_sec - rcv_cr->act > rcv_cr->inact) {
+ if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) {
if (pci->flags & FRCT_DRF) /* New run. */
rcv_cr->lwe = seqno;
else
goto drop_packet;
}
- if (before(seqno, rcv_cr->lwe))
- goto drop_packet;
+ if (pci->flags & FRCT_ACK) {
+ ackno = ntoh32(pci->ackno);
+ if (after(ackno, frcti->snd_cr.lwe))
+ frcti->snd_cr.lwe = ackno;
- if (rcv_cr->cflags & FRCTFRTX) {
- if (pci->flags & FRCT_ACK) {
- uint32_t ackno = ntoh32(pci->ackno);
- /* Check for duplicate (old) acks. */
- if (after(ackno, snd_cr->lwe))
- snd_cr->lwe = ackno;
-
- if (frcti->probe && after(ackno, frcti->rttseq)) {
- rtt_estimator(frcti, ts_diff_us(&frcti->t_probe,
- &now));
- frcti->probe = false;
- }
+ if (frcti->probe && after(ackno, frcti->rttseq)) {
+ rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now));
+ frcti->probe = false;
}
+ }
- if (seqno == rcv_cr->lwe) {
- ++frcti->rcv_cr.lwe;
- } else {
- size_t pos = seqno & (RQ_SIZE - 1);
- if ((seqno - rcv_cr->lwe) >= RQ_SIZE)
- goto drop_packet; /* Out of rq. */
+ if (!(pci->flags & FRCT_DATA))
+ goto drop_packet;
- if (frcti->rq[pos] != -1)
- goto drop_packet; /* Duplicate in rq */
+ if (before(seqno, rcv_cr->lwe))
+ goto drop_packet;
- frcti->rq[pos] = idx;
- idx = -EAGAIN;
- }
+ if (rcv_cr->cflags & FRCTFRTX) {
+ if ((seqno - rcv_cr->lwe) >= RQ_SIZE)
+ goto drop_packet; /* Out of rq. */
+
+ if (frcti->rq[pos] != -1)
+ goto drop_packet; /* Duplicate in rq. */
} else {
- rcv_cr->lwe = seqno + 1;
+ rcv_cr->lwe = seqno;
}
- rcv_cr->act = now.tv_sec;
+ frcti->rq[pos] = idx;
- pthread_rwlock_unlock(&frcti->lock);
+ rcv_cr->act = now;
- if (frcti->rw != NULL)
- rxmwheel_move(frcti->rw);
+ pthread_rwlock_unlock(&frcti->lock);
- return idx;
+ return;
drop_packet:
pthread_rwlock_unlock(&frcti->lock);
shm_rdrbuff_remove(ai.rdrb, idx);
- return -EAGAIN;
+ return;
+}
+
+/* Filter fqueue events for non-data packets */
+int frcti_filter(struct fqueue * fq)
+{
+ struct shm_du_buff * sdb;
+ int fd;
+ ssize_t idx;
+ struct frcti * frcti;
+ struct shm_rbuff * rb;
+
+ while(fq->next < fq->fqsize) {
+ if (fq->fqueue[fq->next + 1] != FLOW_PKT)
+ return 1;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ fd = ai.ports[fq->fqueue[fq->next]].fd;
+ rb = ai.flows[fd].rx_rb;
+ frcti = ai.flows[fd].frcti;
+
+ if (frcti == NULL) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 1;
+ }
+
+ if (__frcti_pdu_ready(frcti) >= 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 1;
+ }
+
+ idx = shm_rbuff_read(rb);
+ if (idx < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 0;
+ }
+
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
+ __frcti_rcv(frcti, sdb);
+
+ if (__frcti_pdu_ready(frcti) >= 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 1;
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ fq->next += 2;
+ }
+
+ return fq->next < fq->fqsize;
}
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
index 9602c5f9..0572c7b7 100644
--- a/src/lib/rxmwheel.c
+++ b/src/lib/rxmwheel.c
@@ -22,15 +22,15 @@
#include <ouroboros/list.h>
-#define RXMQ_S 16 /* defines #slots */
-#define RXMQ_M 24 /* defines max delay (us) */
-#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (us) */
+#define RXMQ_S 14 /* defines #slots */
+#define RXMQ_M 34 /* defines max delay (ns) */
+#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (ns) */
#define RXMQ_SLOTS (1 << RXMQ_S)
#define RXMQ_MAX (1 << RXMQ_M) /* us */
-/* Small inacurracy to avoid slow division by MILLION. */
-#define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10))
-#define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
+/* Overflow limits range to about 6 hours. */
+#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
+#define ts_to_slot(ts) ((ts_to_ns(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
struct rxm {
struct list_head next;
@@ -95,22 +95,6 @@ static struct rxmwheel * rxmwheel_create(void)
return rw;
}
-static void check_probe(struct frcti * frcti,
- uint32_t seqno)
-{
- /* Disable rtt probe on retransmitted packet! */
-
- pthread_rwlock_wrlock(&frcti->lock);
-
- if (frcti->probe && ((frcti->rttseq + 1) == seqno)) {
- /* Backoff to avoid never updating rtt */
- frcti->srtt_us += frcti->mdev_us;
- frcti->probe = false;
- }
-
- pthread_rwlock_unlock(&frcti->lock);
-}
-
static void rxmwheel_move(struct rxmwheel * rw)
{
struct timespec now;
@@ -159,11 +143,15 @@ static void rxmwheel_move(struct rxmwheel * rw)
shm_du_buff_ack(r->sdb);
- pthread_rwlock_rdlock(&r->frcti->lock);
+ pthread_rwlock_wrlock(&r->frcti->lock);
snd_lwe = snd_cr->lwe;
rcv_lwe = rcv_cr->lwe;
rto = r->frcti->rto;
+ /* Assume last RTX is the one that's ACK'd. */
+ if (r->frcti->probe
+ && (r->frcti->rttseq + 1) == r->seqno)
+ r->frcti->t_probe = now;
pthread_rwlock_unlock(&r->frcti->lock);
@@ -175,13 +163,11 @@ static void rxmwheel_move(struct rxmwheel * rw)
}
/* Check for r-timer expiry. */
- if (ts_to_us(now) - r->t0 > r->frcti->r) {
+ if (ts_to_ns(now) - r->t0 > r->frcti->r) {
ipcp_sdb_release(r->sdb);
free(r);
- shm_rbuff_set_acl(ai.flows[fd].rx_rb,
- ACL_FLOWDOWN);
- shm_rbuff_set_acl(ai.flows[fd].tx_rb,
- ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
continue;
}
@@ -201,9 +187,7 @@ static void rxmwheel_move(struct rxmwheel * rw)
ipcp_sdb_release(r->sdb);
- check_probe(r->frcti, r->seqno);
-
- ((struct frct_pci *) head)->ackno = ntoh32(rcv_lwe);
+ ((struct frct_pci *) head)->ackno = hton32(rcv_lwe);
/* Retransmit the copy. */
if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
@@ -224,7 +208,7 @@ static void rxmwheel_move(struct rxmwheel * rw)
r->sdb = sdb;
/* Schedule at least in the next time slot */
- rslot = (slot + MAX(rto >> RXMQ_R, 1))
+ rslot = (slot + MAX((rto >> RXMQ_R), 1))
& (RXMQ_SLOTS - 1);
list_add_tail(&r->next, &rw->wheel[rslot]);
@@ -251,7 +235,7 @@ static int rxmwheel_add(struct rxmwheel * rw,
clock_gettime(PTHREAD_COND_CLOCK, &now);
- r->t0 = ts_to_us(now);
+ r->t0 = ts_to_ns(now);
r->mul = 0;
r->seqno = seqno;
r->sdb = sdb;