diff options
Diffstat (limited to 'src/lib/rxmwheel.c')
-rw-r--r-- | src/lib/rxmwheel.c | 36 |
1 files changed, 21 insertions, 15 deletions
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index ce7ef8e4..3f01a0d3 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -22,11 +22,11 @@ #include <ouroboros/list.h> -#define RXMQ_S 16 /* defines #slots */ -#define RXMQ_M 24 /* defines max delay */ -#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution */ +#define RXMQ_S 16 /* defines #slots */ +#define RXMQ_M 24 /* defines max delay (us) */ +#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (us) */ #define RXMQ_SLOTS (1 << RXMQ_S) -#define RXMQ_MAX (1 << RXMQ_M) /* ms */ +#define RXMQ_MAX (1 << RXMQ_M) /* us */ /* Small inacurracy to avoid slow division by MILLION. */ #define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) @@ -119,12 +119,11 @@ static void check_probe(struct frcti * frcti, if (frcti->probe && ((frcti->rttseq + 1) == seqno)) { /* Backoff to avoid never updating rtt */ - frcti->srtt_us <<= 1; + frcti->srtt_us += frcti->mdev_us; frcti->probe = false; } } -#define rto(frcti) (frcti->srtt_us + (frcti->mdev_us << 1)) /* Return fd on r-timer expiry. */ static int rxmwheel_move(void) { @@ -136,12 +135,17 @@ static int rxmwheel_move(void) clock_gettime(PTHREAD_COND_CLOCK, &now); + pthread_mutex_lock(&rw.lock); + slot = ts_to_slot(now); - pthread_mutex_lock(&rw.lock); + i = rw.prv; - for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) { - list_for_each_safe(p, h, &rw.wheel[i]) { + if (slot < i) + slot += RXMQ_SLOTS; + + while (i++ < slot) { + list_for_each_safe(p, h, &rw.wheel[i & (RXMQ_SLOTS - 1)]) { struct rxm * r; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; @@ -158,7 +162,7 @@ static int rxmwheel_move(void) snd_cr = &r->frcti->snd_cr; rcv_cr = &r->frcti->rcv_cr; /* Has been ack'd, remove. */ - if ((int) (r->seqno - snd_cr->lwe) <= 0) { + if ((int) (r->seqno - snd_cr->lwe) < 0) { shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); free(r); @@ -180,7 +184,7 @@ static int rxmwheel_move(void) /* Copy the payload, safe rtx in other layers. */ if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { - /* FIXME: reschedule send? */ + /* FIXME: reschedule send instead of failing? */ int fd = r->frcti->fd; pthread_mutex_unlock(&rw.lock); shm_du_buff_ack(r->sdb); @@ -206,10 +210,12 @@ static int rxmwheel_move(void) /* Retransmit the copy. */ if (shm_rbuff_write(f->tx_rb, idx)) { + int fd = r->frcti->fd; + pthread_mutex_unlock(&rw.lock); ipcp_sdb_release(sdb); free(r); /* FIXME: reschedule send? */ - continue; + return fd; } shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); @@ -221,14 +227,14 @@ static int rxmwheel_move(void) r->tail = shm_du_buff_tail(sdb); r->sdb = sdb; - newtime = ts_to_us(now) + rto(f->frcti); + newtime = ts_to_us(now) + f->frcti->rto; rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[rslot]); } } - rw.prv = slot; + rw.prv = slot & (RXMQ_SLOTS - 1); pthread_mutex_unlock(&rw.lock); @@ -259,7 +265,7 @@ static int rxmwheel_add(struct frcti * frcti, r->tail = shm_du_buff_tail(sdb); r->frcti = frcti; - slot = ((r->t0 + rto(frcti)) >> RXMQ_R) & (RXMQ_SLOTS - 1); + slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[slot]); |