diff options
-rw-r--r-- | doc/man/fqueue.3 | 3 | ||||
-rw-r--r-- | src/lib/dev.c | 71 | ||||
-rw-r--r-- | src/lib/frct.c | 326 | ||||
-rw-r--r-- | src/lib/rxmwheel.c | 50 |
4 files changed, 298 insertions, 152 deletions
diff --git a/doc/man/fqueue.3 b/doc/man/fqueue.3 index cbc8ee4e..85e12518 100644 --- a/doc/man/fqueue.3 +++ b/doc/man/fqueue.3 @@ -68,8 +68,7 @@ On success, \fBfqueue_create\fR() returns a pointer to an \fBfqueue_destroy\fR() has no return value. -On success, \fBfevent\fR() returns the number of events that occured -in \fIset\fR. +On success, \fBfevent\fR() returns 1. On success, \fBfqueue_next\fR() returns the next file descriptor for which an event occurred. diff --git a/src/lib/dev.c b/src/lib/dev.c index efd08146..df616ead 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -1017,18 +1017,22 @@ ssize_t flow_write(int fd, memcpy(ptr, buf, count); + pthread_rwlock_wrlock(&ai.lock); + if (frcti_snd(flow->frcti, sdb) < 0) { + pthread_rwlock_unlock(&ai.lock); shm_rdrbuff_remove(ai.rdrb, idx); return -ENOMEM; } - pthread_rwlock_wrlock(&ai.lock); - if (flow->qs.cypher_s > 0) + if (flow->qs.cypher_s > 0) { if (crypt_encrypt(flow, sdb) < 0) { pthread_rwlock_unlock(&ai.lock); shm_rdrbuff_remove(ai.rdrb, idx); return -ENOMEM; } + } + pthread_rwlock_unlock(&ai.lock); if (flow->qs.ber == 0 && add_crc(sdb) != 0) { @@ -1097,12 +1101,12 @@ ssize_t flow_read(int fd, abstime = &abs; } - pthread_rwlock_unlock(&ai.lock); - idx = flow->part_idx; + if (idx < 0) { - idx = frcti_queued_pdu(flow->frcti); - while (idx < 0) { + while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { + pthread_rwlock_unlock(&ai.lock); + idx = noblock ? shm_rbuff_read(rb) : shm_rbuff_read_b(rb, abstime); if (idx < 0) @@ -1110,20 +1114,28 @@ ssize_t flow_read(int fd, sdb = shm_rdrbuff_get(ai.rdrb, idx); if (flow->qs.ber == 0 && chk_crc(sdb) != 0) { + pthread_rwlock_rdlock(&ai.lock); shm_rdrbuff_remove(ai.rdrb, idx); continue; } + pthread_rwlock_rdlock(&ai.lock); + if (flow->qs.cypher_s > 0 && crypt_decrypt(flow, sdb) < 0) { + pthread_rwlock_unlock(&ai.lock); shm_rdrbuff_remove(ai.rdrb, idx); return -ENOMEM; } - idx = frcti_rcv(flow->frcti, sdb); + frcti_rcv(flow->frcti, sdb); } } + frcti_tick(flow->frcti); + + pthread_rwlock_unlock(&ai.lock); + n = shm_rdrbuff_read(&packet, ai.rdrb, idx); assert(n >= 0); @@ -1144,7 +1156,9 @@ ssize_t flow_read(int fd, memcpy(buf, packet, count); sdb = shm_rdrbuff_get(ai.rdrb, idx); shm_du_buff_head_release(sdb, n); + pthread_rwlock_wrlock(&ai.lock); flow->part_idx = idx; + pthread_rwlock_unlock(&ai.lock); return count; } else { shm_rdrbuff_remove(ai.rdrb, idx); @@ -1295,6 +1309,11 @@ int fqueue_next(struct fqueue * fq) pthread_rwlock_rdlock(&ai.lock); + if (fq->next != 0 && frcti_filter(fq) == 0) { + pthread_rwlock_unlock(&ai.lock); + return -EPERM; + } + fd = ai.ports[fq->fqueue[fq->next]].fd; fq->next += 2; @@ -1319,7 +1338,7 @@ ssize_t fevent(struct flow_set * set, struct fqueue * fq, const struct timespec * timeo) { - ssize_t ret; + ssize_t ret = 0; struct timespec abstime; struct timespec * t = NULL; @@ -1335,18 +1354,22 @@ ssize_t fevent(struct flow_set * set, t = &abstime; } - ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t); - if (ret == -ETIMEDOUT) { - fq->fqsize = 0; - return -ETIMEDOUT; - } + while (ret == 0) { + ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t); + if (ret == -ETIMEDOUT) { + fq->fqsize = 0; + return -ETIMEDOUT; + } - fq->fqsize = ret << 1; - fq->next = 0; + fq->fqsize = ret << 1; + fq->next = 0; + + ret = frcti_filter(fq); + } assert(ret); - return ret; + return 1; } /* ipcp-dev functions. */ @@ -1509,7 +1532,7 @@ int ipcp_flow_read(int fd, { struct flow * flow; struct shm_rbuff * rb; - ssize_t idx; + ssize_t idx = -1; assert(fd >= 0 && fd < SYS_MAX_FLOWS); assert(sdb); @@ -1522,20 +1545,26 @@ int ipcp_flow_read(int fd, rb = flow->rx_rb; - pthread_rwlock_unlock(&ai.lock); + while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { + pthread_rwlock_unlock(&ai.lock); - idx = frcti_queued_pdu(flow->frcti); - while (idx < 0) { idx = shm_rbuff_read(rb); if (idx < 0) return idx; + + pthread_rwlock_rdlock(&ai.lock); + *sdb = shm_rdrbuff_get(ai.rdrb, idx); if (flow->qs.ber == 0 && chk_crc(*sdb) != 0) continue; - idx = frcti_rcv(flow->frcti, *sdb); + frcti_rcv(flow->frcti, *sdb); } + frcti_tick(flow->frcti); + + pthread_rwlock_unlock(&ai.lock); + *sdb = shm_rdrbuff_get(ai.rdrb, idx); return 0; diff --git a/src/lib/frct.c b/src/lib/frct.c index 2322a039..2bd126f4 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -21,23 +21,25 @@ */ /* Default Delta-t parameters */ -#define DELT_MPL (60 * MILLION) /* us */ -#define DELT_A (1 * MILLION) /* us */ -#define DELT_R (20 * MILLION) /* us */ +#define DELT_MPL (60 * BILLION) /* ns */ +#define DELT_A (1 * BILLION) /* ns */ +#define DELT_R (20 * BILLION) /* ns */ -#define RQ_SIZE 1024 +#define DELT_ACK (10 * MILLION) /* ns */ + +#define RQ_SIZE 256 #define FRCT_PCILEN (sizeof(struct frct_pci)) struct frct_cr { - uint32_t lwe; - uint32_t rwe; + uint32_t lwe; /* Left window edge */ + uint32_t rwe; /* Right window edge */ - uint8_t cflags; - uint32_t seqno; + uint8_t cflags; + uint32_t seqno; /* SEQ to send, or last SEQ Ack'd */ - time_t act; /* s */ - time_t inact; /* s */ + struct timespec act; /* Last seen activity */ + time_t inact; /* Inactivity (s) */ }; struct frcti { @@ -47,12 +49,12 @@ struct frcti { time_t a; time_t r; - time_t srtt_us; /* smoothed rtt */ - time_t mdev_us; /* mdev */ - time_t rto; /* retransmission timeout */ + time_t srtt; /* Smoothed rtt */ + time_t mdev; /* Deviation */ + time_t rto; /* Retransmission timeout */ uint32_t rttseq; - struct timespec t_probe; /* probe time */ - bool probe; /* probe active */ + struct timespec t_probe; /* Probe time */ + bool probe; /* Probe active */ struct frct_cr snd_cr; struct frct_cr rcv_cr; @@ -74,7 +76,9 @@ enum frct_flags { }; struct frct_pci { - uint16_t flags; + uint8_t flags; + + uint8_t pad; uint16_t window; @@ -87,9 +91,11 @@ struct frct_pci { static struct frcti * frcti_create(int fd) { struct frcti * frcti; - time_t delta_t; ssize_t idx; struct timespec now; + time_t mpl; + time_t a; + time_t r; frcti = malloc(sizeof(*frcti)); if (frcti == NULL) @@ -103,25 +109,21 @@ static struct frcti * frcti_create(int fd) for (idx = 0; idx < RQ_SIZE; ++idx) frcti->rq[idx] = -1; - clock_gettime(CLOCK_REALTIME_COARSE, &now); + clock_gettime(PTHREAD_COND_CLOCK, &now); - frcti->mpl = DELT_MPL; - frcti->a = DELT_A; - frcti->r = DELT_R; + frcti->mpl = mpl = DELT_MPL; + frcti->a = a = DELT_A; + frcti->r = r = DELT_R; frcti->fd = fd; - delta_t = frcti->mpl + frcti->a + frcti->r; - frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */ - frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); + frcti->rttseq = 0; + frcti->probe = false; - frcti->rttseq = 0; - frcti->probe = false; - - frcti->srtt_us = 0; /* updated on first ACK */ - frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */ - frcti->rto = 20000; /* initial rxm will be after 20 ms */ - frcti->rw = NULL; + frcti->srtt = 0; /* Updated on first ACK */ + frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */ + frcti->rto = 20 * MILLION; /* Initial rxm will be after 20 ms */ + frcti->rw = NULL; if (ai.flows[fd].qs.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX; @@ -131,9 +133,11 @@ static struct frcti * frcti_create(int fd) goto fail_rw; } - frcti->rcv_cr.inact = 2 * delta_t / MILLION; /* s */ - frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); + frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */ + frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1); + frcti->rcv_cr.inact = (2 * mpl + a + r) / BILLION + 1; /* s */ + frcti->rcv_cr.act.tv_sec = now.tv_sec - (frcti->rcv_cr.inact + 1); return frcti; @@ -175,14 +179,18 @@ static uint16_t frcti_getconf(struct frcti * frcti) return ret; } -#define frcti_queued_pdu(frcti) \ - (frcti == NULL ? -1 : __frcti_queued_pdu(frcti)) +#define frcti_queued_pdu(frcti) \ + (frcti == NULL ? idx : __frcti_queued_pdu(frcti)) -#define frcti_snd(frcti, sdb) \ +#define frcti_snd(frcti, sdb) \ (frcti == NULL ? 0 : __frcti_snd(frcti, sdb)) #define frcti_rcv(frcti, sdb) \ - (frcti == NULL ? idx : __frcti_rcv(frcti, sdb)) + (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) + +#define frcti_tick(frcti) \ + (frcti == NULL ? 0 : __frcti_tick(frcti)) + static ssize_t __frcti_queued_pdu(struct frcti * frcti) { @@ -207,15 +215,22 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti) return idx; } -static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb) +static ssize_t __frcti_pdu_ready(struct frcti * frcti) { - struct frct_pci * pci; + ssize_t idx; + size_t pos; - pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); - if (pci != NULL) - memset(pci, 0, sizeof(*pci)); + assert(frcti); - return pci; + /* See if we already have the next PDU. */ + pthread_rwlock_rdlock(&frcti->lock); + + pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1); + idx = frcti->rq[pos]; + + pthread_rwlock_unlock(&frcti->lock); + + return idx; } static bool before(uint32_t seq1, @@ -230,6 +245,68 @@ static bool after(uint32_t seq1, return (int32_t)(seq2 - seq1) < 0; } +static void frct_send_ack(struct frcti * frcti) +{ + struct shm_du_buff * sdb; + struct frct_pci * pci; + ssize_t idx; + struct timespec now; + time_t diff; + uint32_t ackno; + struct flow * f; + + assert(frcti); + + pthread_rwlock_rdlock(&frcti->lock); + + if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + ackno = frcti->rcv_cr.lwe; + + pthread_rwlock_unlock(&frcti->lock); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + diff = ts_diff_ns(&frcti->rcv_cr.act, &now); + + if (diff > frcti->a) + return; + + if (diff < DELT_ACK) + return; + + /* Raw calls needed to bypass frcti. */ + idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); + if (idx < 0) + return; + + pci = (struct frct_pci *) shm_du_buff_head(sdb); + memset(pci, 0, sizeof(*pci)); + + pci->flags = FRCT_ACK; + pci->ackno = hton32(ackno); + + f = &ai.flows[frcti->fd]; + + if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { + pthread_rwlock_rdlock(&ai.lock); + ipcp_sdb_release(sdb); + return; + } + + shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + + pthread_rwlock_wrlock(&frcti->lock); + + if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) + frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; + + pthread_rwlock_unlock(&frcti->lock); +} + static int __frcti_snd(struct frcti * frcti, struct shm_du_buff * sdb) { @@ -247,11 +324,13 @@ static int __frcti_snd(struct frcti * frcti, if (frcti->rw != NULL) rxmwheel_move(frcti->rw); - pci = frcti_alloc_head(sdb); + pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); if (pci == NULL) return -1; - clock_gettime(CLOCK_REALTIME, &now); + memset(pci, 0, sizeof(*pci)); + + clock_gettime(PTHREAD_COND_CLOCK, &now); pthread_rwlock_wrlock(&frcti->lock); @@ -262,7 +341,7 @@ static int __frcti_snd(struct frcti * frcti, pci->flags |= FRCT_DRF; /* Choose a new sequence number if sender inactivity expired. */ - if (now.tv_sec - snd_cr->act > snd_cr->inact) { + if (now.tv_sec - snd_cr->act.tv_sec > snd_cr->inact) { /* There are no unacknowledged packets. */ assert(snd_cr->seqno == snd_cr->lwe); random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno)); @@ -281,14 +360,15 @@ static int __frcti_snd(struct frcti * frcti, frcti->probe = true; } - if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { + if (now.tv_sec - rcv_cr->act.tv_sec <= rcv_cr->inact) { pci->flags |= FRCT_ACK; pci->ackno = hton32(rcv_cr->lwe); + rcv_cr->seqno = rcv_cr->lwe; } } snd_cr->seqno++; - snd_cr->act = now.tv_sec; + snd_cr->act = now; pthread_rwlock_unlock(&frcti->lock); @@ -299,104 +379,158 @@ static int __frcti_snd(struct frcti * frcti, } static void rtt_estimator(struct frcti * frcti, - time_t mrtt_us) + time_t mrtt) { - time_t srtt = frcti->srtt_us; - time_t rttvar = frcti->mdev_us; + time_t srtt = frcti->srtt; + time_t rttvar = frcti->mdev; if (srtt == 0) { /* first measurement */ - srtt = mrtt_us; - rttvar = mrtt_us >> 1; + srtt = mrtt; + rttvar = mrtt >> 1; } else { - time_t delta = mrtt_us - srtt; + time_t delta = mrtt - srtt; srtt += (delta >> 3); rttvar -= rttvar >> 2; rttvar += ABS(delta) >> 2; } - frcti->srtt_us = MAX(1U, srtt); - frcti->mdev_us = MAX(1U, rttvar); - frcti->rto = MAX(RTO_MIN, srtt + (rttvar >> 2)); + frcti->srtt = MAX(1000U, srtt); + frcti->mdev = MAX(100U, rttvar); + frcti->rto = MAX(RTO_MIN * 1000, + frcti->srtt + (frcti->mdev << 1)); +} + +static void __frcti_tick(struct frcti * frcti) +{ + if (frcti->rw != NULL) { + rxmwheel_move(frcti->rw); + frct_send_ack(frcti); + } } -/* Always queues the packet on the RQ for the application. */ -static ssize_t __frcti_rcv(struct frcti * frcti, - struct shm_du_buff * sdb) +/* Always queues the next application packet on the RQ. */ +static void __frcti_rcv(struct frcti * frcti, + struct shm_du_buff * sdb) { ssize_t idx; + size_t pos; struct frct_pci * pci; struct timespec now; - struct frct_cr * snd_cr; struct frct_cr * rcv_cr; uint32_t seqno; + uint32_t ackno; assert(frcti); rcv_cr = &frcti->rcv_cr; - snd_cr = &frcti->snd_cr; - clock_gettime(CLOCK_REALTIME, &now); + clock_gettime(PTHREAD_COND_CLOCK, &now); pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN); idx = shm_du_buff_get_idx(sdb); seqno = ntoh32(pci->seqno); + pos = seqno & (RQ_SIZE - 1); pthread_rwlock_wrlock(&frcti->lock); - if (now.tv_sec - rcv_cr->act > rcv_cr->inact) { + if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) { if (pci->flags & FRCT_DRF) /* New run. */ rcv_cr->lwe = seqno; else goto drop_packet; } - if (before(seqno, rcv_cr->lwe)) - goto drop_packet; + if (pci->flags & FRCT_ACK) { + ackno = ntoh32(pci->ackno); + if (after(ackno, frcti->snd_cr.lwe)) + frcti->snd_cr.lwe = ackno; - if (rcv_cr->cflags & FRCTFRTX) { - if (pci->flags & FRCT_ACK) { - uint32_t ackno = ntoh32(pci->ackno); - /* Check for duplicate (old) acks. */ - if (after(ackno, snd_cr->lwe)) - snd_cr->lwe = ackno; - - if (frcti->probe && after(ackno, frcti->rttseq)) { - rtt_estimator(frcti, ts_diff_us(&frcti->t_probe, - &now)); - frcti->probe = false; - } + if (frcti->probe && after(ackno, frcti->rttseq)) { + rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now)); + frcti->probe = false; } + } - if (seqno == rcv_cr->lwe) { - ++frcti->rcv_cr.lwe; - } else { - size_t pos = seqno & (RQ_SIZE - 1); - if ((seqno - rcv_cr->lwe) >= RQ_SIZE) - goto drop_packet; /* Out of rq. */ + if (!(pci->flags & FRCT_DATA)) + goto drop_packet; - if (frcti->rq[pos] != -1) - goto drop_packet; /* Duplicate in rq */ + if (before(seqno, rcv_cr->lwe)) + goto drop_packet; - frcti->rq[pos] = idx; - idx = -EAGAIN; - } + if (rcv_cr->cflags & FRCTFRTX) { + if ((seqno - rcv_cr->lwe) >= RQ_SIZE) + goto drop_packet; /* Out of rq. */ + + if (frcti->rq[pos] != -1) + goto drop_packet; /* Duplicate in rq. */ } else { - rcv_cr->lwe = seqno + 1; + rcv_cr->lwe = seqno; } - rcv_cr->act = now.tv_sec; + frcti->rq[pos] = idx; - pthread_rwlock_unlock(&frcti->lock); + rcv_cr->act = now; - if (frcti->rw != NULL) - rxmwheel_move(frcti->rw); + pthread_rwlock_unlock(&frcti->lock); - return idx; + return; drop_packet: pthread_rwlock_unlock(&frcti->lock); shm_rdrbuff_remove(ai.rdrb, idx); - return -EAGAIN; + return; +} + +/* Filter fqueue events for non-data packets */ +int frcti_filter(struct fqueue * fq) +{ + struct shm_du_buff * sdb; + int fd; + ssize_t idx; + struct frcti * frcti; + struct shm_rbuff * rb; + + while(fq->next < fq->fqsize) { + if (fq->fqueue[fq->next + 1] != FLOW_PKT) + return 1; + + pthread_rwlock_rdlock(&ai.lock); + + fd = ai.ports[fq->fqueue[fq->next]].fd; + rb = ai.flows[fd].rx_rb; + frcti = ai.flows[fd].frcti; + + if (frcti == NULL) { + pthread_rwlock_unlock(&ai.lock); + return 1; + } + + if (__frcti_pdu_ready(frcti) >= 0) { + pthread_rwlock_unlock(&ai.lock); + return 1; + } + + idx = shm_rbuff_read(rb); + if (idx < 0) { + pthread_rwlock_unlock(&ai.lock); + return 0; + } + + sdb = shm_rdrbuff_get(ai.rdrb, idx); + + __frcti_rcv(frcti, sdb); + + if (__frcti_pdu_ready(frcti) >= 0) { + pthread_rwlock_unlock(&ai.lock); + return 1; + } + + pthread_rwlock_unlock(&ai.lock); + + fq->next += 2; + } + + return fq->next < fq->fqsize; } diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index 9602c5f9..0572c7b7 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -22,15 +22,15 @@ #include <ouroboros/list.h> -#define RXMQ_S 16 /* defines #slots */ -#define RXMQ_M 24 /* defines max delay (us) */ -#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (us) */ +#define RXMQ_S 14 /* defines #slots */ +#define RXMQ_M 34 /* defines max delay (ns) */ +#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (ns) */ #define RXMQ_SLOTS (1 << RXMQ_S) #define RXMQ_MAX (1 << RXMQ_M) /* us */ -/* Small inacurracy to avoid slow division by MILLION. */ -#define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10)) -#define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) +/* Overflow limits range to about 6 hours. */ +#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec) +#define ts_to_slot(ts) ((ts_to_ns(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) struct rxm { struct list_head next; @@ -95,22 +95,6 @@ static struct rxmwheel * rxmwheel_create(void) return rw; } -static void check_probe(struct frcti * frcti, - uint32_t seqno) -{ - /* Disable rtt probe on retransmitted packet! */ - - pthread_rwlock_wrlock(&frcti->lock); - - if (frcti->probe && ((frcti->rttseq + 1) == seqno)) { - /* Backoff to avoid never updating rtt */ - frcti->srtt_us += frcti->mdev_us; - frcti->probe = false; - } - - pthread_rwlock_unlock(&frcti->lock); -} - static void rxmwheel_move(struct rxmwheel * rw) { struct timespec now; @@ -159,11 +143,15 @@ static void rxmwheel_move(struct rxmwheel * rw) shm_du_buff_ack(r->sdb); - pthread_rwlock_rdlock(&r->frcti->lock); + pthread_rwlock_wrlock(&r->frcti->lock); snd_lwe = snd_cr->lwe; rcv_lwe = rcv_cr->lwe; rto = r->frcti->rto; + /* Assume last RTX is the one that's ACK'd. */ + if (r->frcti->probe + && (r->frcti->rttseq + 1) == r->seqno) + r->frcti->t_probe = now; pthread_rwlock_unlock(&r->frcti->lock); @@ -175,13 +163,11 @@ static void rxmwheel_move(struct rxmwheel * rw) } /* Check for r-timer expiry. */ - if (ts_to_us(now) - r->t0 > r->frcti->r) { + if (ts_to_ns(now) - r->t0 > r->frcti->r) { ipcp_sdb_release(r->sdb); free(r); - shm_rbuff_set_acl(ai.flows[fd].rx_rb, - ACL_FLOWDOWN); - shm_rbuff_set_acl(ai.flows[fd].tx_rb, - ACL_FLOWDOWN); + shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); continue; } @@ -201,9 +187,7 @@ static void rxmwheel_move(struct rxmwheel * rw) ipcp_sdb_release(r->sdb); - check_probe(r->frcti, r->seqno); - - ((struct frct_pci *) head)->ackno = ntoh32(rcv_lwe); + ((struct frct_pci *) head)->ackno = hton32(rcv_lwe); /* Retransmit the copy. */ if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { @@ -224,7 +208,7 @@ static void rxmwheel_move(struct rxmwheel * rw) r->sdb = sdb; /* Schedule at least in the next time slot */ - rslot = (slot + MAX(rto >> RXMQ_R, 1)) + rslot = (slot + MAX((rto >> RXMQ_R), 1)) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw->wheel[rslot]); @@ -251,7 +235,7 @@ static int rxmwheel_add(struct rxmwheel * rw, clock_gettime(PTHREAD_COND_CLOCK, &now); - r->t0 = ts_to_us(now); + r->t0 = ts_to_ns(now); r->mul = 0; r->seqno = seqno; r->sdb = sdb; |