summaryrefslogtreecommitdiff
path: root/src/lib/frct.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/frct.c')
-rw-r--r--src/lib/frct.c326
1 files changed, 230 insertions, 96 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 2322a039..2bd126f4 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -21,23 +21,25 @@
*/
/* Default Delta-t parameters */
-#define DELT_MPL (60 * MILLION) /* us */
-#define DELT_A (1 * MILLION) /* us */
-#define DELT_R (20 * MILLION) /* us */
+#define DELT_MPL (60 * BILLION) /* ns */
+#define DELT_A (1 * BILLION) /* ns */
+#define DELT_R (20 * BILLION) /* ns */
-#define RQ_SIZE 1024
+#define DELT_ACK (10 * MILLION) /* ns */
+
+#define RQ_SIZE 256
#define FRCT_PCILEN (sizeof(struct frct_pci))
struct frct_cr {
- uint32_t lwe;
- uint32_t rwe;
+ uint32_t lwe; /* Left window edge */
+ uint32_t rwe; /* Right window edge */
- uint8_t cflags;
- uint32_t seqno;
+ uint8_t cflags;
+ uint32_t seqno; /* SEQ to send, or last SEQ Ack'd */
- time_t act; /* s */
- time_t inact; /* s */
+ struct timespec act; /* Last seen activity */
+ time_t inact; /* Inactivity (s) */
};
struct frcti {
@@ -47,12 +49,12 @@ struct frcti {
time_t a;
time_t r;
- time_t srtt_us; /* smoothed rtt */
- time_t mdev_us; /* mdev */
- time_t rto; /* retransmission timeout */
+ time_t srtt; /* Smoothed rtt */
+ time_t mdev; /* Deviation */
+ time_t rto; /* Retransmission timeout */
uint32_t rttseq;
- struct timespec t_probe; /* probe time */
- bool probe; /* probe active */
+ struct timespec t_probe; /* Probe time */
+ bool probe; /* Probe active */
struct frct_cr snd_cr;
struct frct_cr rcv_cr;
@@ -74,7 +76,9 @@ enum frct_flags {
};
struct frct_pci {
- uint16_t flags;
+ uint8_t flags;
+
+ uint8_t pad;
uint16_t window;
@@ -87,9 +91,11 @@ struct frct_pci {
static struct frcti * frcti_create(int fd)
{
struct frcti * frcti;
- time_t delta_t;
ssize_t idx;
struct timespec now;
+ time_t mpl;
+ time_t a;
+ time_t r;
frcti = malloc(sizeof(*frcti));
if (frcti == NULL)
@@ -103,25 +109,21 @@ static struct frcti * frcti_create(int fd)
for (idx = 0; idx < RQ_SIZE; ++idx)
frcti->rq[idx] = -1;
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
- frcti->mpl = DELT_MPL;
- frcti->a = DELT_A;
- frcti->r = DELT_R;
+ frcti->mpl = mpl = DELT_MPL;
+ frcti->a = a = DELT_A;
+ frcti->r = r = DELT_R;
frcti->fd = fd;
- delta_t = frcti->mpl + frcti->a + frcti->r;
- frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */
- frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1);
+ frcti->rttseq = 0;
+ frcti->probe = false;
- frcti->rttseq = 0;
- frcti->probe = false;
-
- frcti->srtt_us = 0; /* updated on first ACK */
- frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */
- frcti->rto = 20000; /* initial rxm will be after 20 ms */
- frcti->rw = NULL;
+ frcti->srtt = 0; /* Updated on first ACK */
+ frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */
+ frcti->rto = 20 * MILLION; /* Initial rxm will be after 20 ms */
+ frcti->rw = NULL;
if (ai.flows[fd].qs.loss == 0) {
frcti->snd_cr.cflags |= FRCTFRTX;
@@ -131,9 +133,11 @@ static struct frcti * frcti_create(int fd)
goto fail_rw;
}
- frcti->rcv_cr.inact = 2 * delta_t / MILLION; /* s */
- frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1);
+ frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */
+ frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1);
+ frcti->rcv_cr.inact = (2 * mpl + a + r) / BILLION + 1; /* s */
+ frcti->rcv_cr.act.tv_sec = now.tv_sec - (frcti->rcv_cr.inact + 1);
return frcti;
@@ -175,14 +179,18 @@ static uint16_t frcti_getconf(struct frcti * frcti)
return ret;
}
-#define frcti_queued_pdu(frcti) \
- (frcti == NULL ? -1 : __frcti_queued_pdu(frcti))
+#define frcti_queued_pdu(frcti) \
+ (frcti == NULL ? idx : __frcti_queued_pdu(frcti))
-#define frcti_snd(frcti, sdb) \
+#define frcti_snd(frcti, sdb) \
(frcti == NULL ? 0 : __frcti_snd(frcti, sdb))
#define frcti_rcv(frcti, sdb) \
- (frcti == NULL ? idx : __frcti_rcv(frcti, sdb))
+ (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))
+
+#define frcti_tick(frcti) \
+ (frcti == NULL ? 0 : __frcti_tick(frcti))
+
static ssize_t __frcti_queued_pdu(struct frcti * frcti)
{
@@ -207,15 +215,22 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)
return idx;
}
-static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb)
+static ssize_t __frcti_pdu_ready(struct frcti * frcti)
{
- struct frct_pci * pci;
+ ssize_t idx;
+ size_t pos;
- pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
- if (pci != NULL)
- memset(pci, 0, sizeof(*pci));
+ assert(frcti);
- return pci;
+ /* See if we already have the next PDU. */
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1);
+ idx = frcti->rq[pos];
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return idx;
}
static bool before(uint32_t seq1,
@@ -230,6 +245,68 @@ static bool after(uint32_t seq1,
return (int32_t)(seq2 - seq1) < 0;
}
+static void frct_send_ack(struct frcti * frcti)
+{
+ struct shm_du_buff * sdb;
+ struct frct_pci * pci;
+ ssize_t idx;
+ struct timespec now;
+ time_t diff;
+ uint32_t ackno;
+ struct flow * f;
+
+ assert(frcti);
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return;
+ }
+
+ ackno = frcti->rcv_cr.lwe;
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ diff = ts_diff_ns(&frcti->rcv_cr.act, &now);
+
+ if (diff > frcti->a)
+ return;
+
+ if (diff < DELT_ACK)
+ return;
+
+ /* Raw calls needed to bypass frcti. */
+ idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
+ if (idx < 0)
+ return;
+
+ pci = (struct frct_pci *) shm_du_buff_head(sdb);
+ memset(pci, 0, sizeof(*pci));
+
+ pci->flags = FRCT_ACK;
+ pci->ackno = hton32(ackno);
+
+ f = &ai.flows[frcti->fd];
+
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
+ pthread_rwlock_rdlock(&ai.lock);
+ ipcp_sdb_release(sdb);
+ return;
+ }
+
+ shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno))
+ frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
+
+ pthread_rwlock_unlock(&frcti->lock);
+}
+
static int __frcti_snd(struct frcti * frcti,
struct shm_du_buff * sdb)
{
@@ -247,11 +324,13 @@ static int __frcti_snd(struct frcti * frcti,
if (frcti->rw != NULL)
rxmwheel_move(frcti->rw);
- pci = frcti_alloc_head(sdb);
+ pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
if (pci == NULL)
return -1;
- clock_gettime(CLOCK_REALTIME, &now);
+ memset(pci, 0, sizeof(*pci));
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pthread_rwlock_wrlock(&frcti->lock);
@@ -262,7 +341,7 @@ static int __frcti_snd(struct frcti * frcti,
pci->flags |= FRCT_DRF;
/* Choose a new sequence number if sender inactivity expired. */
- if (now.tv_sec - snd_cr->act > snd_cr->inact) {
+ if (now.tv_sec - snd_cr->act.tv_sec > snd_cr->inact) {
/* There are no unacknowledged packets. */
assert(snd_cr->seqno == snd_cr->lwe);
random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno));
@@ -281,14 +360,15 @@ static int __frcti_snd(struct frcti * frcti,
frcti->probe = true;
}
- if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) {
+ if (now.tv_sec - rcv_cr->act.tv_sec <= rcv_cr->inact) {
pci->flags |= FRCT_ACK;
pci->ackno = hton32(rcv_cr->lwe);
+ rcv_cr->seqno = rcv_cr->lwe;
}
}
snd_cr->seqno++;
- snd_cr->act = now.tv_sec;
+ snd_cr->act = now;
pthread_rwlock_unlock(&frcti->lock);
@@ -299,104 +379,158 @@ static int __frcti_snd(struct frcti * frcti,
}
static void rtt_estimator(struct frcti * frcti,
- time_t mrtt_us)
+ time_t mrtt)
{
- time_t srtt = frcti->srtt_us;
- time_t rttvar = frcti->mdev_us;
+ time_t srtt = frcti->srtt;
+ time_t rttvar = frcti->mdev;
if (srtt == 0) { /* first measurement */
- srtt = mrtt_us;
- rttvar = mrtt_us >> 1;
+ srtt = mrtt;
+ rttvar = mrtt >> 1;
} else {
- time_t delta = mrtt_us - srtt;
+ time_t delta = mrtt - srtt;
srtt += (delta >> 3);
rttvar -= rttvar >> 2;
rttvar += ABS(delta) >> 2;
}
- frcti->srtt_us = MAX(1U, srtt);
- frcti->mdev_us = MAX(1U, rttvar);
- frcti->rto = MAX(RTO_MIN, srtt + (rttvar >> 2));
+ frcti->srtt = MAX(1000U, srtt);
+ frcti->mdev = MAX(100U, rttvar);
+ frcti->rto = MAX(RTO_MIN * 1000,
+ frcti->srtt + (frcti->mdev << 1));
+}
+
+static void __frcti_tick(struct frcti * frcti)
+{
+ if (frcti->rw != NULL) {
+ rxmwheel_move(frcti->rw);
+ frct_send_ack(frcti);
+ }
}
-/* Always queues the packet on the RQ for the application. */
-static ssize_t __frcti_rcv(struct frcti * frcti,
- struct shm_du_buff * sdb)
+/* Always queues the next application packet on the RQ. */
+static void __frcti_rcv(struct frcti * frcti,
+ struct shm_du_buff * sdb)
{
ssize_t idx;
+ size_t pos;
struct frct_pci * pci;
struct timespec now;
- struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
uint32_t seqno;
+ uint32_t ackno;
assert(frcti);
rcv_cr = &frcti->rcv_cr;
- snd_cr = &frcti->snd_cr;
- clock_gettime(CLOCK_REALTIME, &now);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN);
idx = shm_du_buff_get_idx(sdb);
seqno = ntoh32(pci->seqno);
+ pos = seqno & (RQ_SIZE - 1);
pthread_rwlock_wrlock(&frcti->lock);
- if (now.tv_sec - rcv_cr->act > rcv_cr->inact) {
+ if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) {
if (pci->flags & FRCT_DRF) /* New run. */
rcv_cr->lwe = seqno;
else
goto drop_packet;
}
- if (before(seqno, rcv_cr->lwe))
- goto drop_packet;
+ if (pci->flags & FRCT_ACK) {
+ ackno = ntoh32(pci->ackno);
+ if (after(ackno, frcti->snd_cr.lwe))
+ frcti->snd_cr.lwe = ackno;
- if (rcv_cr->cflags & FRCTFRTX) {
- if (pci->flags & FRCT_ACK) {
- uint32_t ackno = ntoh32(pci->ackno);
- /* Check for duplicate (old) acks. */
- if (after(ackno, snd_cr->lwe))
- snd_cr->lwe = ackno;
-
- if (frcti->probe && after(ackno, frcti->rttseq)) {
- rtt_estimator(frcti, ts_diff_us(&frcti->t_probe,
- &now));
- frcti->probe = false;
- }
+ if (frcti->probe && after(ackno, frcti->rttseq)) {
+ rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now));
+ frcti->probe = false;
}
+ }
- if (seqno == rcv_cr->lwe) {
- ++frcti->rcv_cr.lwe;
- } else {
- size_t pos = seqno & (RQ_SIZE - 1);
- if ((seqno - rcv_cr->lwe) >= RQ_SIZE)
- goto drop_packet; /* Out of rq. */
+ if (!(pci->flags & FRCT_DATA))
+ goto drop_packet;
- if (frcti->rq[pos] != -1)
- goto drop_packet; /* Duplicate in rq */
+ if (before(seqno, rcv_cr->lwe))
+ goto drop_packet;
- frcti->rq[pos] = idx;
- idx = -EAGAIN;
- }
+ if (rcv_cr->cflags & FRCTFRTX) {
+ if ((seqno - rcv_cr->lwe) >= RQ_SIZE)
+ goto drop_packet; /* Out of rq. */
+
+ if (frcti->rq[pos] != -1)
+ goto drop_packet; /* Duplicate in rq. */
} else {
- rcv_cr->lwe = seqno + 1;
+ rcv_cr->lwe = seqno;
}
- rcv_cr->act = now.tv_sec;
+ frcti->rq[pos] = idx;
- pthread_rwlock_unlock(&frcti->lock);
+ rcv_cr->act = now;
- if (frcti->rw != NULL)
- rxmwheel_move(frcti->rw);
+ pthread_rwlock_unlock(&frcti->lock);
- return idx;
+ return;
drop_packet:
pthread_rwlock_unlock(&frcti->lock);
shm_rdrbuff_remove(ai.rdrb, idx);
- return -EAGAIN;
+ return;
+}
+
+/* Filter fqueue events for non-data packets */
+int frcti_filter(struct fqueue * fq)
+{
+ struct shm_du_buff * sdb;
+ int fd;
+ ssize_t idx;
+ struct frcti * frcti;
+ struct shm_rbuff * rb;
+
+ while(fq->next < fq->fqsize) {
+ if (fq->fqueue[fq->next + 1] != FLOW_PKT)
+ return 1;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ fd = ai.ports[fq->fqueue[fq->next]].fd;
+ rb = ai.flows[fd].rx_rb;
+ frcti = ai.flows[fd].frcti;
+
+ if (frcti == NULL) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 1;
+ }
+
+ if (__frcti_pdu_ready(frcti) >= 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 1;
+ }
+
+ idx = shm_rbuff_read(rb);
+ if (idx < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 0;
+ }
+
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
+ __frcti_rcv(frcti, sdb);
+
+ if (__frcti_pdu_ready(frcti) >= 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return 1;
+ }
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ fq->next += 2;
+ }
+
+ return fq->next < fq->fqsize;
}