diff options
-rw-r--r-- | src/lib/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/lib/config.h.in | 2 | ||||
-rw-r--r-- | src/lib/frct.c | 12 | ||||
-rw-r--r-- | src/lib/rxmwheel.c | 35 |
4 files changed, 28 insertions, 23 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index c5be9946..4aad3a11 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -186,6 +186,8 @@ set(SHM_RBUFF_LOCKLESS 0 CACHE BOOL "Enable shared memory lockless rbuff support") set(QOS_DISABLE_CRC TRUE CACHE BOOL "Ignores ber setting on all QoS cubes") +set(FRCT_RTO_MIN 250 CACHE STRING + "Minimum Retransmission Timeout (RTO) for FRCT (us)") set(SOURCE_FILES_DEV # Add source files here diff --git a/src/lib/config.h.in b/src/lib/config.h.in index b6e49100..7cac76a6 100644 --- a/src/lib/config.h.in +++ b/src/lib/config.h.in @@ -66,3 +66,5 @@ #define DU_BUFF_HEADSPACE @DU_BUFF_HEADSPACE@ #define DU_BUFF_TAILSPACE @DU_BUFF_TAILSPACE@ + +#define RTO_MIN @FRCT_RTO_MIN@ diff --git a/src/lib/frct.c b/src/lib/frct.c index bc07be5a..0e9d64c7 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -25,7 +25,7 @@ #define DELT_A 3000 /* ms */ #define DELT_R 20000 /* ms */ -#define RQ_SIZE 64 +#define RQ_SIZE 1024 #define TW_ELEMENTS 6000 #define TW_RESOLUTION 1 /* ms */ @@ -119,9 +119,9 @@ static struct frcti * frcti_create(int fd) frcti->rttseq = 0; frcti->probe = false; - frcti->srtt_us = 0; /* updated on first ACK */ - frcti->mdev_us = 100000; /* initial rxm will be after 200 ms */ - frcti->rto = 200000; /* initial rxm will be after 200 ms */ + frcti->srtt_us = 0; /* updated on first ACK */ + frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */ + frcti->rto = 20000; /* initial rxm will be after 20 ms */ if (ai.flows[fd].qs.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX; @@ -304,7 +304,7 @@ static void rtt_estimator(struct frcti * frcti, frcti->srtt_us = MAX(1U, srtt); frcti->mdev_us = MAX(1U, rttvar); - frcti->rto = srtt + (rttvar >> 2); + frcti->rto = MAX(RTO_MIN, srtt + (rttvar >> 2)); } /* Returns 0 when idx contains a packet for the application. */ @@ -391,6 +391,6 @@ static int __frcti_rcv(struct frcti * frcti, drop_packet: pthread_rwlock_unlock(&frcti->lock); shm_rdrbuff_remove(ai.rdrb, idx); - rxmwheel_move(); + return -EAGAIN; } diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index 3f01a0d3..28cd78de 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -61,6 +61,8 @@ static void rxmwheel_fini(void) list_for_each_safe(p, h, &rw.wheel[i]) { struct rxm * rxm = list_entry(p, struct rxm, next); list_del(&rxm->next); + shm_du_buff_ack(rxm->sdb); + ipcp_sdb_release(rxm->sdb); free(rxm); } } @@ -133,10 +135,10 @@ static int rxmwheel_move(void) size_t slot; size_t i; - clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_mutex_lock(&rw.lock); + clock_gettime(PTHREAD_COND_CLOCK, &now); + slot = ts_to_slot(now); i = rw.prv; @@ -150,7 +152,6 @@ static int rxmwheel_move(void) struct frct_cr * snd_cr; struct frct_cr * rcv_cr; size_t rslot; - time_t newtime; ssize_t idx; struct shm_du_buff * sdb; uint8_t * head; @@ -161,9 +162,11 @@ static int rxmwheel_move(void) snd_cr = &r->frcti->snd_cr; rcv_cr = &r->frcti->rcv_cr; + + shm_du_buff_ack(r->sdb); + /* Has been ack'd, remove. */ if ((int) (r->seqno - snd_cr->lwe) < 0) { - shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); free(r); continue; @@ -176,7 +179,6 @@ static int rxmwheel_move(void) if (ts_to_ms(now) - r->t0 > r->frcti->r) { int fd = r->frcti->fd; pthread_mutex_unlock(&rw.lock); - shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); free(r); return fd; @@ -187,7 +189,6 @@ static int rxmwheel_move(void) /* FIXME: reschedule send instead of failing? */ int fd = r->frcti->fd; pthread_mutex_unlock(&rw.lock); - shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); free(r); return fd; @@ -199,7 +200,6 @@ static int rxmwheel_move(void) memcpy(head, r->head, r->tail - r->head); /* Release the old copy. */ - shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); /* Update ackno and make sure DRF is not set. */ @@ -208,7 +208,7 @@ static int rxmwheel_move(void) f = &ai.flows[r->frcti->fd]; - /* Retransmit the copy. */ + /* Retransmit the copy. FIXME: cancel flow */ if (shm_rbuff_write(f->tx_rb, idx)) { int fd = r->frcti->fd; pthread_mutex_unlock(&rw.lock); @@ -218,17 +218,18 @@ static int rxmwheel_move(void) return fd; } - shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); - /* Reschedule. */ shm_du_buff_wait_ack(sdb); + shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + r->head = head; r->tail = shm_du_buff_tail(sdb); r->sdb = sdb; - newtime = ts_to_us(now) + f->frcti->rto; - rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1); + /* Schedule at least in the next time slot */ + rslot = (slot + MAX((f->frcti->rto >> RXMQ_R), 1)) + & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[rslot]); } @@ -253,10 +254,10 @@ static int rxmwheel_add(struct frcti * frcti, if (r == NULL) return -ENOMEM; - clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_mutex_lock(&rw.lock); + clock_gettime(PTHREAD_COND_CLOCK, &now); + r->t0 = ts_to_us(now); r->mul = 0; r->seqno = seqno; @@ -265,13 +266,13 @@ static int rxmwheel_add(struct frcti * frcti, r->tail = shm_du_buff_tail(sdb); r->frcti = frcti; - slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1); + slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[slot]); - pthread_mutex_unlock(&rw.lock); - shm_du_buff_wait_ack(sdb); + pthread_mutex_unlock(&rw.lock); + return 0; } |