summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/lib/timerwheel.c132
1 files changed, 60 insertions, 72 deletions
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index 0a6e48e1..661cc456 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -30,14 +30,11 @@
struct rxm {
struct list_head next;
uint32_t seqno;
-#ifdef RXM_BUFFER_ON_HEAP
- uint8_t * pkt;
- size_t pkt_len;
-#else
+#ifndef RXM_BUFFER_ON_HEAP
struct shm_du_buff * sdb;
- uint8_t * head;
- uint8_t * tail;
#endif
+ struct frct_pci * pkt;
+ size_t len;
time_t t0; /* Time when original was sent (us). */
size_t mul; /* RTO multiplier. */
struct frcti * frcti;
@@ -62,8 +59,8 @@ struct {
struct list_head acks[ACKQ_SLOTS];
bool map[ACKQ_SLOTS][PROG_MAX_FLOWS];
- size_t prv_rxm; /* Last processed rxm slot at lvl 0. */
- size_t prv_ack; /* Last processed ack slot. */
+ size_t prv_rxm[RXMQ_LVLS]; /* Last processed rxm slots. */
+ size_t prv_ack; /* Last processed ack slot. */
pthread_mutex_t lock;
bool in_use;
@@ -119,8 +116,10 @@ static int timerwheel_init(void)
clock_gettime(PTHREAD_COND_CLOCK, &now);
- rw.prv_rxm = (ts_to_rxm_slot(now) - 1) & (RXMQ_SLOTS - 1);
for (i = 0; i < RXMQ_LVLS; ++i) {
+ rw.prv_rxm[i] = (ts_to_rxm_slot(now) - 1);
+ rw.prv_rxm[i] >>= (RXMQ_BUMP * i);
+ rw.prv_rxm[i] &= (RXMQ_SLOTS - 1);
for (j = 0; j < RXMQ_SLOTS; ++j)
list_head_init(&rw.rxms[i][j]);
}
@@ -151,30 +150,30 @@ static void timerwheel_move(void)
clock_gettime(PTHREAD_COND_CLOCK, &now);
- rxm_slot = ts_to_ns(now) >> RXMQ_RES;
- j = rw.prv_rxm;
- rw.prv_rxm = rxm_slot & (RXMQ_SLOTS - 1);
+ rxm_slot = ts_to_rxm_slot(now);
for (i = 0; i < RXMQ_LVLS; ++i) {
size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1);
+ j = rw.prv_rxm[i];
if (j_max_slot < j)
j_max_slot += RXMQ_SLOTS;
-
while (j++ < j_max_slot) {
list_for_each_safe(p, h,
&rw.rxms[i][j & (RXMQ_SLOTS - 1)]) {
struct rxm * r;
struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
+ size_t slot;
size_t rslot;
ssize_t idx;
struct shm_du_buff * sdb;
- uint8_t * head;
+ struct frct_pci * pci;
struct flow * f;
uint32_t snd_lwe;
uint32_t rcv_lwe;
time_t rto;
- size_t new_i;
+ size_t lvl = 0;
+ time_t act;
r = list_entry(p, struct rxm, next);
@@ -195,6 +194,7 @@ static void timerwheel_move(void)
snd_lwe = snd_cr->lwe;
rcv_lwe = rcv_cr->lwe;
rto = r->frcti->rto;
+ act = ts_to_ns(r->frcti->rcv_cr.act);
pthread_rwlock_unlock(&r->frcti->lock);
@@ -209,75 +209,64 @@ static void timerwheel_move(void)
pthread_rwlock_wrlock(&r->frcti->lock);
if (r->frcti->probe
- && (r->frcti->rttseq == r->seqno)) {
+ && (r->frcti->rttseq == r->seqno))
r->frcti->probe = false;
- r->frcti->rto += (rto >> 3);
- }
r->frcti->n_rtx++;
pthread_rwlock_unlock(&r->frcti->lock);
+
+ if (ts_to_ns(now) - act > (rto << 2))
+ rto <<= r->mul++;
+ else
+ r->mul = 0;
+
+ /* Schedule at least in the next time slot. */
+ slot = ts_to_ns(now) >> RXMQ_RES;
+ rslot = rto >> RXMQ_RES;
+
+ while (rslot >= RXMQ_SLOTS) {
+ ++lvl;
+ rslot >>= RXMQ_BUMP;
+ slot >>= RXMQ_BUMP;
+ }
+
+ if (lvl >= RXMQ_LVLS) /* Can't reschedule */
+ goto flow_down;
+
+ rslot = (rslot + slot) & (RXMQ_SLOTS - 1);
+
#ifdef RXM_BLOCKING
- #ifdef RXM_BUFFER_ON_HEAP
- if (ipcp_sdb_reserve(&sdb, r->pkt_len) < 0)
- #else
- if (ipcp_sdb_reserve(&sdb,
- r->tail - r->head) < 0)
- #endif
+ if (ipcp_sdb_reserve(&sdb, r->len) < 0)
#else
- #ifdef RXM_BUFFER_ON_HEAP
- if (shm_rdrbuff_alloc(ai.rdrb, r->pkt_len, NULL,
- &sdb) < 0)
- #else
- if (shm_rdrbuff_alloc(ai.rdrb,
- r->tail - r->head, NULL,
+ if (shm_rdrbuff_alloc(ai.rdrb, r->len, NULL,
&sdb) < 0)
- #endif
#endif
- goto reschedule; /* rbuff full */
+ goto reschedule; /* rdrbuff full */
- idx = shm_du_buff_get_idx(sdb);
-
- head = shm_du_buff_head(sdb);
-#ifdef RXM_BUFFER_ON_HEAP
- memcpy(head, r->pkt, r->pkt_len);
-#else
- memcpy(head, r->head, r->tail - r->head);
+ pci = (struct frct_pci *) shm_du_buff_head(sdb);
+ memcpy(pci, r->pkt, r->len);
+#ifndef RXM_BUFFER_ON_HEAP
ipcp_sdb_release(r->sdb);
- r->sdb = sdb;
- r->head = head;
- r->tail = shm_du_buff_tail(sdb);
+ r->sdb = sdb;
+ r->pkt = pci;
shm_du_buff_wait_ack(sdb);
#endif
+ idx = shm_du_buff_get_idx(sdb);
+
/* Retransmit the copy. */
- ((struct frct_pci *) head)->ackno =
- hton32(rcv_lwe);
+ pci->ackno = hton32(rcv_lwe);
#ifdef RXM_BLOCKING
- if (shm_rbuff_write_b(f->tx_rb, idx, NULL) == 0)
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL) < 0)
#else
- if (shm_rbuff_write(f->tx_rb, idx) == 0)
+ if (shm_rbuff_write(f->tx_rb, idx) < 0)
#endif
- shm_flow_set_notify(f->set, f->flow_id,
- FLOW_PKT);
+ goto flow_down;
+ shm_flow_set_notify(f->set, f->flow_id,
+ FLOW_PKT);
reschedule:
- rslot = (rto << r->mul++) >> (RXMQ_RES * i);
-
- new_i = i;
- while (rslot >= RXMQ_SLOTS) {
- ++ new_i;
- rslot >>= RXMQ_BUMP;
- }
-
- if (new_i >= RXMQ_LVLS) /* Can't reschedule */
- continue;
-
- /* Schedule at least in the next time slot. */
- rslot = ((rxm_slot >> (RXMQ_BUMP * (new_i - 1))) + MAX(rslot, 1))
- & (RXMQ_SLOTS - 1);
-
- list_add_tail(&r->next, &rw.rxms[new_i][rslot]);
-
+ list_add(&r->next, &rw.rxms[lvl][rslot]);
continue;
flow_down:
@@ -292,9 +281,9 @@ static void timerwheel_move(void)
free(r);
}
}
+ rw.prv_rxm[i] = rxm_slot & (RXMQ_SLOTS - 1);
/* Move up a level in the wheel. */
rxm_slot >>= RXMQ_BUMP;
- j >>= RXMQ_BUMP;
}
ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ;
@@ -350,18 +339,17 @@ static int timerwheel_rxm(struct frcti * frcti,
r->mul = 0;
r->seqno = seqno;
r->frcti = frcti;
+ r->len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
#ifdef RXM_BUFFER_ON_HEAP
- r->pkt_len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
- r->pkt = malloc(r->pkt_len);
+ r->pkt = malloc(r->len);
if (r->pkt == NULL) {
free(r);
return -ENOMEM;
}
- memcpy(r->pkt, shm_du_buff_head(sdb), r->pkt_len);
+ memcpy(r->pkt, shm_du_buff_head(sdb), r->len);
#else
- r->sdb = sdb;
- r->head = shm_du_buff_head(sdb);
- r->tail = shm_du_buff_tail(sdb);
+ r->sdb = sdb;
+ r->pkt = (struct frct_pci *) shm_du_buff_head(sdb);
#endif
pthread_rwlock_rdlock(&r->frcti->lock);