summaryrefslogtreecommitdiff
path: root/src/lib/timerwheel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/timerwheel.c')
-rw-r--r--src/lib/timerwheel.c174
1 files changed, 80 insertions, 94 deletions
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index 3c1a44b4..96f4ac47 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016 - 2021
+ * Ouroboros - Copyright (C) 2016 - 2024
*
* Timerwheel
*
@@ -30,16 +30,12 @@
struct rxm {
struct list_head next;
uint32_t seqno;
-#ifdef RXM_BUFFER_ON_HEAP
- uint8_t * pkt;
- size_t pkt_len;
-#else
+#ifndef RXM_BUFFER_ON_HEAP
struct shm_du_buff * sdb;
- uint8_t * head;
- uint8_t * tail;
#endif
+ struct frct_pci * pkt;
+ size_t len;
time_t t0; /* Time when original was sent (us). */
- size_t mul; /* RTO multiplier. */
struct frcti * frcti;
int fd;
int flow_id; /* Prevent rtx when fd reused. */
@@ -62,11 +58,9 @@ struct {
struct list_head acks[ACKQ_SLOTS];
bool map[ACKQ_SLOTS][PROG_MAX_FLOWS];
- size_t prv_rxm; /* Last processed rxm slot at lvl 0. */
- size_t prv_ack; /* Last processed ack slot. */
+ size_t prv_rxm[RXMQ_LVLS]; /* Last processed rxm slots. */
+ size_t prv_ack; /* Last processed ack slot. */
pthread_mutex_t lock;
-
- bool in_use;
} rw;
static void timerwheel_fini(void)
@@ -119,8 +113,10 @@ static int timerwheel_init(void)
clock_gettime(PTHREAD_COND_CLOCK, &now);
- rw.prv_rxm = (ts_to_rxm_slot(now) - 1) & (RXMQ_SLOTS - 1);
for (i = 0; i < RXMQ_LVLS; ++i) {
+ rw.prv_rxm[i] = (ts_to_rxm_slot(now) - 1);
+ rw.prv_rxm[i] >>= (RXMQ_BUMP * i);
+ rw.prv_rxm[i] &= (RXMQ_SLOTS - 1);
for (j = 0; j < RXMQ_SLOTS; ++j)
list_head_init(&rw.rxms[i][j]);
}
@@ -142,38 +138,34 @@ static void timerwheel_move(void)
size_t i;
size_t j;
- if (!__sync_bool_compare_and_swap(&rw.in_use, true, true))
- return;
-
pthread_mutex_lock(&rw.lock);
pthread_cleanup_push(__cleanup_mutex_unlock, &rw.lock);
clock_gettime(PTHREAD_COND_CLOCK, &now);
- rxm_slot = ts_to_ns(now) >> RXMQ_RES;
- j = rw.prv_rxm;
- rw.prv_rxm = rxm_slot & (RXMQ_SLOTS - 1);
+ rxm_slot = ts_to_rxm_slot(now);
for (i = 0; i < RXMQ_LVLS; ++i) {
size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1);
+ j = rw.prv_rxm[i];
if (j_max_slot < j)
j_max_slot += RXMQ_SLOTS;
-
while (j++ < j_max_slot) {
list_for_each_safe(p, h,
&rw.rxms[i][j & (RXMQ_SLOTS - 1)]) {
struct rxm * r;
struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
+ size_t slot;
size_t rslot;
ssize_t idx;
struct shm_du_buff * sdb;
- uint8_t * head;
+ struct frct_pci * pci;
struct flow * f;
uint32_t snd_lwe;
uint32_t rcv_lwe;
- time_t rto;
+ size_t lvl = 0;
r = list_entry(p, struct rxm, next);
@@ -186,84 +178,87 @@ static void timerwheel_move(void)
shm_du_buff_ack(r->sdb);
#endif
if (f->frcti == NULL
- || f->flow_id != r->flow_id)
+ || f->info.id != r->flow_id)
goto cleanup;
- pthread_rwlock_wrlock(&r->frcti->lock);
+ pthread_rwlock_rdlock(&r->frcti->lock);
snd_lwe = snd_cr->lwe;
rcv_lwe = rcv_cr->lwe;
- rto = r->frcti->rto;
pthread_rwlock_unlock(&r->frcti->lock);
/* Has been ack'd, remove. */
- if ((int) (r->seqno - snd_lwe) < 0)
+ if (before(r->seqno, snd_lwe))
goto cleanup;
/* Check for r-timer expiry. */
if (ts_to_ns(now) - r->t0 > r->frcti->r)
goto flow_down;
- if (r->frcti->probe
- && (r->frcti->rttseq + 1) == r->seqno)
+ pthread_rwlock_wrlock(&r->frcti->lock);
+
+ if (r->seqno == r->frcti->rttseq) {
+ r->frcti->rto +=
+ r->frcti->rto >> RTO_DIV;
r->frcti->probe = false;
+ }
+#ifdef PROC_FLOW_STATS
+ r->frcti->n_rtx++;
+#endif
+ rslot = r->frcti->rto >> RXMQ_RES;
+
+ pthread_rwlock_unlock(&r->frcti->lock);
+
+ /* Schedule at least in the next time slot. */
+ slot = ts_to_ns(now) >> RXMQ_RES;
+
+ while (rslot >= RXMQ_SLOTS) {
+ ++lvl;
+ rslot >>= RXMQ_BUMP;
+ slot >>= RXMQ_BUMP;
+ }
+
+ if (lvl >= RXMQ_LVLS) /* Can't reschedule */
+ goto flow_down;
+
+ rslot = (rslot + slot + 1) & (RXMQ_SLOTS - 1);
#ifdef RXM_BLOCKING
- #ifdef RXM_BUFFER_ON_HEAP
- if (ipcp_sdb_reserve(&sdb, r->pkt_len))
- #else
- if (ipcp_sdb_reserve(&sdb, r->tail - r->head))
- #endif
+ if (ipcp_sdb_reserve(&sdb, r->len) < 0)
#else
- #ifdef RXM_BUFFER_ON_HEAP
- if (shm_rdrbuff_alloc(ai.rdrb, r->pkt_len, NULL,
- &sdb))
- #else
- if (shm_rdrbuff_alloc(ai.rdrb,
- r->tail - r->head, NULL,
- &sdb))
- #endif
+ if (shm_rdrbuff_alloc(ai.rdrb, r->len, NULL,
+ &sdb) < 0)
#endif
- goto reschedule; /* rbuff full */
- idx = shm_du_buff_get_idx(sdb);
+ goto reschedule; /* rdrbuff full */
- head = shm_du_buff_head(sdb);
-#ifdef RXM_BUFFER_ON_HEAP
- memcpy(head, r->pkt, r->pkt_len);
-#else
- memcpy(head, r->head, r->tail - r->head);
+ pci = (struct frct_pci *) shm_du_buff_head(sdb);
+ memcpy(pci, r->pkt, r->len);
+#ifndef RXM_BUFFER_ON_HEAP
ipcp_sdb_release(r->sdb);
- r->sdb = sdb;
- r->head = head;
- r->tail = shm_du_buff_tail(sdb);
+ r->sdb = sdb;
+ r->pkt = pci;
shm_du_buff_wait_ack(sdb);
#endif
+ idx = shm_du_buff_get_idx(sdb);
+
/* Retransmit the copy. */
- ((struct frct_pci *) head)->ackno =
- hton32(rcv_lwe);
+ pci->ackno = hton32(rcv_lwe);
#ifdef RXM_BLOCKING
- if (shm_rbuff_write_b(f->tx_rb, idx, NULL) == 0)
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL) < 0)
#else
- if (shm_rbuff_write(f->tx_rb, idx) == 0)
+ if (shm_rbuff_write(f->tx_rb, idx) < 0)
#endif
- shm_flow_set_notify(f->set, f->flow_id,
- FLOW_PKT);
- reschedule:
- r->mul++;
-
- /* Schedule at least in the next time slot. */
- rslot = (rxm_slot
- + MAX(((rto * r->mul) >> RXMQ_RES), 1))
- & (RXMQ_SLOTS - 1);
-
- list_add_tail(&r->next, &rw.rxms[i][rslot]);
-
+ goto flow_down;
+ shm_flow_set_notify(f->set, f->info.id,
+ FLOW_PKT);
+ reschedule:
+ list_add(&r->next, &rw.rxms[lvl][rslot]);
continue;
- flow_down:
+ flow_down:
shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
- cleanup:
+ cleanup:
#ifdef RXM_BUFFER_ON_HEAP
free(r->pkt);
#else
@@ -272,9 +267,9 @@ static void timerwheel_move(void)
free(r);
}
}
+ rw.prv_rxm[i] = rxm_slot & (RXMQ_SLOTS - 1);
/* Move up a level in the wheel. */
rxm_slot >>= RXMQ_BUMP;
- j >>= RXMQ_BUMP;
}
ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ;
@@ -297,11 +292,10 @@ static void timerwheel_move(void)
rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false;
- if (f->flow_id == a->flow_id && f->frcti != NULL)
+ if (f->info.id == a->flow_id && f->frcti != NULL)
send_frct_pkt(a->frcti);
free(a);
-
}
}
@@ -327,21 +321,19 @@ static int timerwheel_rxm(struct frcti * frcti,
clock_gettime(PTHREAD_COND_CLOCK, &now);
r->t0 = ts_to_ns(now);
- r->mul = 0;
r->seqno = seqno;
r->frcti = frcti;
+ r->len = shm_du_buff_len(sdb);
#ifdef RXM_BUFFER_ON_HEAP
- r->pkt_len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
- r->pkt = malloc(r->pkt_len);
+ r->pkt = malloc(r->len);
if (r->pkt == NULL) {
free(r);
return -ENOMEM;
}
- memcpy(r->pkt, shm_du_buff_head(sdb), r->pkt_len);
+ memcpy(r->pkt, shm_du_buff_head(sdb), r->len);
#else
- r->sdb = sdb;
- r->head = shm_du_buff_head(sdb);
- r->tail = shm_du_buff_tail(sdb);
+ r->sdb = sdb;
+ r->pkt = (struct frct_pci *) shm_du_buff_head(sdb);
#endif
pthread_rwlock_rdlock(&r->frcti->lock);
@@ -349,7 +341,7 @@ static int timerwheel_rxm(struct frcti * frcti,
slot = r->t0 >> RXMQ_RES;
r->fd = frcti->fd;
- r->flow_id = ai.flows[r->fd].flow_id;
+ r->flow_id = ai.flows[r->fd].info.id;
pthread_rwlock_unlock(&r->frcti->lock);
@@ -367,7 +359,7 @@ static int timerwheel_rxm(struct frcti * frcti,
return -EPERM;
}
- slot = (slot + rto_slot) & (RXMQ_SLOTS - 1);
+ slot = (slot + rto_slot + 1) & (RXMQ_SLOTS - 1);
pthread_mutex_lock(&rw.lock);
@@ -377,13 +369,11 @@ static int timerwheel_rxm(struct frcti * frcti,
#endif
pthread_mutex_unlock(&rw.lock);
- __sync_bool_compare_and_swap(&rw.in_use, false, true);
-
return 0;
}
-static int timerwheel_ack(int fd,
- struct frcti * frcti)
+static int timerwheel_delayed_ack(int fd,
+ struct frcti * frcti)
{
struct timespec now;
struct ack * a;
@@ -395,18 +385,16 @@ static int timerwheel_ack(int fd,
clock_gettime(PTHREAD_COND_CLOCK, &now);
- slot = DELT_ACK >> ACKQ_RES;
- if (slot >= ACKQ_SLOTS) { /* Out of timerwheel range. */
- free(a);
- return -EPERM;
- }
+ pthread_rwlock_rdlock(&frcti->lock);
- slot = (((ts_to_ns(now) + DELT_ACK) >> ACKQ_RES) + 1)
+ slot = (((ts_to_ns(now) + (TICTIME << 1)) >> ACKQ_RES) + 1)
& (ACKQ_SLOTS - 1);
+ pthread_rwlock_unlock(&frcti->lock);
+
a->fd = fd;
a->frcti = frcti;
- a->flow_id = ai.flows[fd].flow_id;
+ a->flow_id = ai.flows[fd].info.id;
pthread_mutex_lock(&rw.lock);
@@ -422,7 +410,5 @@ static int timerwheel_ack(int fd,
pthread_mutex_unlock(&rw.lock);
- __sync_bool_compare_and_swap(&rw.in_use, false, true);
-
return 0;
}