diff options
-rw-r--r-- | src/lib/timerwheel.c | 132 |
1 files changed, 60 insertions, 72 deletions
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index 0a6e48e1..661cc456 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -30,14 +30,11 @@ struct rxm { struct list_head next; uint32_t seqno; -#ifdef RXM_BUFFER_ON_HEAP - uint8_t * pkt; - size_t pkt_len; -#else +#ifndef RXM_BUFFER_ON_HEAP struct shm_du_buff * sdb; - uint8_t * head; - uint8_t * tail; #endif + struct frct_pci * pkt; + size_t len; time_t t0; /* Time when original was sent (us). */ size_t mul; /* RTO multiplier. */ struct frcti * frcti; @@ -62,8 +59,8 @@ struct { struct list_head acks[ACKQ_SLOTS]; bool map[ACKQ_SLOTS][PROG_MAX_FLOWS]; - size_t prv_rxm; /* Last processed rxm slot at lvl 0. */ - size_t prv_ack; /* Last processed ack slot. */ + size_t prv_rxm[RXMQ_LVLS]; /* Last processed rxm slots. */ + size_t prv_ack; /* Last processed ack slot. */ pthread_mutex_t lock; bool in_use; @@ -119,8 +116,10 @@ static int timerwheel_init(void) clock_gettime(PTHREAD_COND_CLOCK, &now); - rw.prv_rxm = (ts_to_rxm_slot(now) - 1) & (RXMQ_SLOTS - 1); for (i = 0; i < RXMQ_LVLS; ++i) { + rw.prv_rxm[i] = (ts_to_rxm_slot(now) - 1); + rw.prv_rxm[i] >>= (RXMQ_BUMP * i); + rw.prv_rxm[i] &= (RXMQ_SLOTS - 1); for (j = 0; j < RXMQ_SLOTS; ++j) list_head_init(&rw.rxms[i][j]); } @@ -151,30 +150,30 @@ static void timerwheel_move(void) clock_gettime(PTHREAD_COND_CLOCK, &now); - rxm_slot = ts_to_ns(now) >> RXMQ_RES; - j = rw.prv_rxm; - rw.prv_rxm = rxm_slot & (RXMQ_SLOTS - 1); + rxm_slot = ts_to_rxm_slot(now); for (i = 0; i < RXMQ_LVLS; ++i) { size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1); + j = rw.prv_rxm[i]; if (j_max_slot < j) j_max_slot += RXMQ_SLOTS; - while (j++ < j_max_slot) { list_for_each_safe(p, h, &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) { struct rxm * r; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; + size_t slot; size_t rslot; ssize_t idx; struct shm_du_buff * sdb; - uint8_t * head; + struct frct_pci * pci; struct flow * f; uint32_t snd_lwe; uint32_t rcv_lwe; time_t rto; - size_t new_i; + size_t lvl = 0; + time_t act; r = list_entry(p, struct rxm, next); @@ -195,6 +194,7 @@ static void timerwheel_move(void) snd_lwe = snd_cr->lwe; rcv_lwe = rcv_cr->lwe; rto = r->frcti->rto; + act = ts_to_ns(r->frcti->rcv_cr.act); pthread_rwlock_unlock(&r->frcti->lock); @@ -209,75 +209,64 @@ static void timerwheel_move(void) pthread_rwlock_wrlock(&r->frcti->lock); if (r->frcti->probe - && (r->frcti->rttseq == r->seqno)) { + && (r->frcti->rttseq == r->seqno)) r->frcti->probe = false; - r->frcti->rto += (rto >> 3); - } r->frcti->n_rtx++; pthread_rwlock_unlock(&r->frcti->lock); + + if (ts_to_ns(now) - act > (rto << 2)) + rto <<= r->mul++; + else + r->mul = 0; + + /* Schedule at least in the next time slot. */ + slot = ts_to_ns(now) >> RXMQ_RES; + rslot = rto >> RXMQ_RES; + + while (rslot >= RXMQ_SLOTS) { + ++lvl; + rslot >>= RXMQ_BUMP; + slot >>= RXMQ_BUMP; + } + + if (lvl >= RXMQ_LVLS) /* Can't reschedule */ + goto flow_down; + + rslot = (rslot + slot) & (RXMQ_SLOTS - 1); + #ifdef RXM_BLOCKING - #ifdef RXM_BUFFER_ON_HEAP - if (ipcp_sdb_reserve(&sdb, r->pkt_len) < 0) - #else - if (ipcp_sdb_reserve(&sdb, - r->tail - r->head) < 0) - #endif + if (ipcp_sdb_reserve(&sdb, r->len) < 0) #else - #ifdef RXM_BUFFER_ON_HEAP - if (shm_rdrbuff_alloc(ai.rdrb, r->pkt_len, NULL, - &sdb) < 0) - #else - if (shm_rdrbuff_alloc(ai.rdrb, - r->tail - r->head, NULL, + if (shm_rdrbuff_alloc(ai.rdrb, r->len, NULL, &sdb) < 0) - #endif #endif - goto reschedule; /* rbuff full */ + goto reschedule; /* rdrbuff full */ - idx = shm_du_buff_get_idx(sdb); - - head = shm_du_buff_head(sdb); -#ifdef RXM_BUFFER_ON_HEAP - memcpy(head, r->pkt, r->pkt_len); -#else - memcpy(head, r->head, r->tail - r->head); + pci = (struct frct_pci *) shm_du_buff_head(sdb); + memcpy(pci, r->pkt, r->len); +#ifndef RXM_BUFFER_ON_HEAP ipcp_sdb_release(r->sdb); - r->sdb = sdb; - r->head = head; - r->tail = shm_du_buff_tail(sdb); + r->sdb = sdb; + r->pkt = pci; shm_du_buff_wait_ack(sdb); #endif + idx = shm_du_buff_get_idx(sdb); + /* Retransmit the copy. */ - ((struct frct_pci *) head)->ackno = - hton32(rcv_lwe); + pci->ackno = hton32(rcv_lwe); #ifdef RXM_BLOCKING - if (shm_rbuff_write_b(f->tx_rb, idx, NULL) == 0) + if (shm_rbuff_write_b(f->tx_rb, idx, NULL) < 0) #else - if (shm_rbuff_write(f->tx_rb, idx) == 0) + if (shm_rbuff_write(f->tx_rb, idx) < 0) #endif - shm_flow_set_notify(f->set, f->flow_id, - FLOW_PKT); + goto flow_down; + shm_flow_set_notify(f->set, f->flow_id, + FLOW_PKT); reschedule: - rslot = (rto << r->mul++) >> (RXMQ_RES * i); - - new_i = i; - while (rslot >= RXMQ_SLOTS) { - ++ new_i; - rslot >>= RXMQ_BUMP; - } - - if (new_i >= RXMQ_LVLS) /* Can't reschedule */ - continue; - - /* Schedule at least in the next time slot. */ - rslot = ((rxm_slot >> (RXMQ_BUMP * (new_i - 1))) + MAX(rslot, 1)) - & (RXMQ_SLOTS - 1); - - list_add_tail(&r->next, &rw.rxms[new_i][rslot]); - + list_add(&r->next, &rw.rxms[lvl][rslot]); continue; flow_down: @@ -292,9 +281,9 @@ static void timerwheel_move(void) free(r); } } + rw.prv_rxm[i] = rxm_slot & (RXMQ_SLOTS - 1); /* Move up a level in the wheel. */ rxm_slot >>= RXMQ_BUMP; - j >>= RXMQ_BUMP; } ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ; @@ -350,18 +339,17 @@ static int timerwheel_rxm(struct frcti * frcti, r->mul = 0; r->seqno = seqno; r->frcti = frcti; + r->len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); #ifdef RXM_BUFFER_ON_HEAP - r->pkt_len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); - r->pkt = malloc(r->pkt_len); + r->pkt = malloc(r->len); if (r->pkt == NULL) { free(r); return -ENOMEM; } - memcpy(r->pkt, shm_du_buff_head(sdb), r->pkt_len); + memcpy(r->pkt, shm_du_buff_head(sdb), r->len); #else - r->sdb = sdb; - r->head = shm_du_buff_head(sdb); - r->tail = shm_du_buff_tail(sdb); + r->sdb = sdb; + r->pkt = (struct frct_pci *) shm_du_buff_head(sdb); #endif pthread_rwlock_rdlock(&r->frcti->lock); |