Diffstat (limited to 'src/lib/timerwheel.c')
-rw-r--r--  src/lib/timerwheel.c | 153
1 file changed, 82 insertions(+), 71 deletions(-)
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index 33fcbc1c..4443832d 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -22,14 +22,6 @@
#include <ouroboros/list.h>
-#define RXMQ_SLOTS (1 << 8) /* #slots / level. */
-#define RXMQ_LVLS 3 /* #levels, bump for DTN. */
-#define RXMQ_BUMP 4 /* factor to bump lvl. */
-#define RXMQ_RES 20 /* res (ns) of lowest lvl. */
-
-#define ACKQ_SLOTS (1 << 7) /* #slots for delayed ACK. */
-#define ACKQ_RES 20 /* resolution for dACK. */
-
/* Overflow limits range to about 6 hours. */
#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
#define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES)
@@ -38,9 +30,14 @@
struct rxm {
struct list_head next;
uint32_t seqno;
+#ifdef RXM_BUFFER_ON_HEAP
+ uint8_t * pkt;
+ size_t pkt_len;
+#else
struct shm_du_buff * sdb;
uint8_t * head;
uint8_t * tail;
+#endif
time_t t0; /* Time when original was sent (us). */
size_t mul; /* RTO multiplier. */
struct frcti * frcti;
@@ -85,8 +82,12 @@ static void timerwheel_fini(void)
struct rxm * rxm;
rxm = list_entry(p, struct rxm, next);
list_del(&rxm->next);
+#ifdef RXM_BUFFER_ON_HEAP
+ free(rxm->pkt);
+#else
shm_du_buff_ack(rxm->sdb);
ipcp_sdb_release(rxm->sdb);
+#endif
free(rxm);
}
}
@@ -156,8 +157,7 @@ static void timerwheel_move(void)
j_max_slot += RXMQ_SLOTS;
while (j++ < j_max_slot) {
- list_for_each_safe(p,
- h,
+ list_for_each_safe(p, h,
&rw.rxms[i][j & (RXMQ_SLOTS - 1)]) {
struct rxm * r;
struct frct_cr * snd_cr;
@@ -178,15 +178,12 @@ static void timerwheel_move(void)
snd_cr = &r->frcti->snd_cr;
rcv_cr = &r->frcti->rcv_cr;
f = &ai.flows[r->fd];
-
+#ifndef RXM_BUFFER_ON_HEAP
shm_du_buff_ack(r->sdb);
-
+#endif
if (f->frcti == NULL
- || f->flow_id != r->flow_id) {
- ipcp_sdb_release(r->sdb);
- free(r);
- continue;
- }
+ || f->flow_id != r->flow_id)
+ goto cleanup;
pthread_rwlock_wrlock(&r->frcti->lock);
@@ -197,69 +194,57 @@ static void timerwheel_move(void)
pthread_rwlock_unlock(&r->frcti->lock);
/* Has been ack'd, remove. */
- if ((int) (r->seqno - snd_lwe) < 0) {
- ipcp_sdb_release(r->sdb);
- free(r);
- continue;
- }
+ if ((int) (r->seqno - snd_lwe) < 0)
+ goto cleanup;
/* Check for r-timer expiry. */
- if (ts_to_ns(now) - r->t0 > r->frcti->r) {
- ipcp_sdb_release(r->sdb);
- free(r);
- shm_rbuff_set_acl(f->rx_rb,
- ACL_FLOWDOWN);
- shm_rbuff_set_acl(f->tx_rb,
- ACL_FLOWDOWN);
- continue;
- }
+ if (ts_to_ns(now) - r->t0 > r->frcti->r)
+ goto flow_down;
if (r->frcti->probe
&& (r->frcti->rttseq + 1) == r->seqno)
r->frcti->probe = false;
-
- /* Copy the data, safe rtx in other layers. */
- if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
- ipcp_sdb_release(r->sdb);
- free(r);
- shm_rbuff_set_acl(f->rx_rb,
- ACL_FLOWDOWN);
- shm_rbuff_set_acl(f->tx_rb,
- ACL_FLOWDOWN);
- continue;
- }
-
+#ifdef RXM_BLOCKING
+ #ifdef RXM_BUFFER_ON_HEAP
+ if (ipcp_sdb_reserve(&sdb, r->pkt_len))
+ #else
+ if (ipcp_sdb_reserve(&sdb, r->tail - r->head))
+ #endif
+#else
+ #ifdef RXM_BUFFER_ON_HEAP
+ if (shm_rdrbuff_alloc(ai.rdrb, r->pkt_len, NULL,
+ &sdb))
+ #else
+ if (shm_rdrbuff_alloc(ai.rdrb,
+ r->tail - r->head, NULL,
+ &sdb))
+ #endif
+#endif
+ goto reschedule; /* rbuff full */
idx = shm_du_buff_get_idx(sdb);
head = shm_du_buff_head(sdb);
+#ifdef RXM_BUFFER_ON_HEAP
+ memcpy(head, r->pkt, r->pkt_len);
+#else
memcpy(head, r->head, r->tail - r->head);
-
ipcp_sdb_release(r->sdb);
-
- ((struct frct_pci *) head)->ackno =
- hton32(rcv_lwe);
-
- /* Retransmit the copy. */
- if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
- ipcp_sdb_release(sdb);
- free(r);
- shm_rbuff_set_acl(f->rx_rb,
- ACL_FLOWDOWN);
- shm_rbuff_set_acl(f->tx_rb,
- ACL_FLOWDOWN);
- continue;
- }
-
- /* Reschedule. */
- shm_du_buff_wait_ack(sdb);
-
- shm_flow_set_notify(f->set,
- f->flow_id,
- FLOW_PKT);
-
+ r->sdb = sdb;
r->head = head;
r->tail = shm_du_buff_tail(sdb);
- r->sdb = sdb;
+ shm_du_buff_wait_ack(sdb);
+#endif
+ /* Retransmit the copy. */
+ ((struct frct_pci *) head)->ackno =
+ hton32(rcv_lwe);
+#ifdef RXM_BLOCKING
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL) == 0)
+#else
+ if (shm_rbuff_write(f->tx_rb, idx) == 0)
+#endif
+ shm_flow_set_notify(f->set, f->flow_id,
+ FLOW_PKT);
+ reschedule:
r->mul++;
/* Schedule at least in the next time slot. */
@@ -268,10 +253,24 @@ static void timerwheel_move(void)
& (RXMQ_SLOTS - 1);
list_add_tail(&r->next, &rw.rxms[i][rslot]);
+
+ continue;
+
+ flow_down:
+ shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
+ cleanup:
+#ifdef RXM_BUFFER_ON_HEAP
+ free(r->pkt);
+#else
+ ipcp_sdb_release(r->sdb);
+#endif
+ free(r);
}
}
/* Move up a level in the wheel. */
rxm_slot >>= RXMQ_BUMP;
+ j >>= RXMQ_BUMP;
}
ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ;
@@ -326,11 +325,20 @@ static int timerwheel_rxm(struct frcti * frcti,
r->t0 = ts_to_ns(now);
r->mul = 0;
r->seqno = seqno;
+ r->frcti = frcti;
+#ifdef RXM_BUFFER_ON_HEAP
+ r->pkt_len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+ r->pkt = malloc(r->pkt_len);
+ if (r->pkt == NULL) {
+ free(r);
+ return -ENOMEM;
+ }
+ memcpy(r->pkt, shm_du_buff_head(sdb), r->pkt_len);
+#else
r->sdb = sdb;
r->head = shm_du_buff_head(sdb);
r->tail = shm_du_buff_tail(sdb);
- r->frcti = frcti;
-
+#endif
pthread_rwlock_rdlock(&r->frcti->lock);
rto_slot = frcti->rto >> RXMQ_RES;
@@ -348,6 +356,9 @@ static int timerwheel_rxm(struct frcti * frcti,
}
if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */
+#ifdef RXM_BUFFER_ON_HEAP
+ free(r->pkt);
+#endif
free(r);
return -EPERM;
}
@@ -357,9 +368,9 @@ static int timerwheel_rxm(struct frcti * frcti,
pthread_mutex_lock(&rw.lock);
list_add_tail(&r->next, &rw.rxms[lvl][slot]);
-
+#ifndef RXM_BUFFER_ON_HEAP
shm_du_buff_wait_ack(sdb);
-
+#endif
pthread_mutex_unlock(&rw.lock);
return 0;
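
For reference, a stand-alone sketch (not part of the commit) of the idea behind the new RXM_BUFFER_ON_HEAP path: instead of keeping the shared-memory buffer alive with shm_du_buff_wait_ack(), timerwheel_rxm() copies the packet bytes into a private heap buffer owned by the retransmission record, and timerwheel_move() later memcpy()s that copy into a freshly reserved buffer (ipcp_sdb_reserve() or shm_rdrbuff_alloc(), depending on RXM_BLOCKING). The struct and the rxm_arm_heap() helper below are illustrative only, not Ouroboros API.

#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Illustrative stand-in for the heap-buffered part of struct rxm. */
struct rxm_heap {
        uint8_t * pkt;     /* private copy of the packet */
        size_t    pkt_len; /* length of that copy        */
        uint32_t  seqno;   /* tracked sequence number    */
};

/* Copy the packet bytes in [head, tail) into a fresh record. */
static int rxm_arm_heap(struct rxm_heap ** out,
                        const uint8_t *    head,
                        const uint8_t *    tail,
                        uint32_t           seqno)
{
        struct rxm_heap * r;

        r = malloc(sizeof(*r));
        if (r == NULL)
                return -ENOMEM;

        r->pkt_len = (size_t) (tail - head);
        r->pkt     = malloc(r->pkt_len);
        if (r->pkt == NULL) {
                free(r);
                return -ENOMEM;
        }

        memcpy(r->pkt, head, r->pkt_len);
        r->seqno = seqno;

        *out = r;

        return 0;
}

int main(void)
{
        uint8_t           buf[128] = { 0 };
        struct rxm_heap * r;

        if (rxm_arm_heap(&r, buf, buf + sizeof(buf), 1) == 0) {
                /* ... later, on ACK or teardown: */
                free(r->pkt);
                free(r);
        }

        return 0;
}

Once the entry is acked, expired or torn down, the copy is simply free()d, matching the free(rxm->pkt) / free(r->pkt) calls the patch adds where it previously released the sdb.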
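
Also for reference, a stand-alone sketch of the slot/level arithmetic the wheel uses when arming a retransmission. The RXMQ_* constants are the ones this commit drops from timerwheel.c, so after the patch they are assumed to come from the build configuration; the real timerwheel_rxm() derives the timeout from frcti->rto under frcti's lock, this only shows the mapping itself.

#define _POSIX_C_SOURCE 200809L

#include <stdint.h>
#include <stdio.h>
#include <time.h>

#define BILLION    1000000000ULL
#define RXMQ_SLOTS (1 << 8) /* slots per level                      */
#define RXMQ_LVLS  3        /* number of levels                     */
#define RXMQ_BUMP  4        /* shift to go up a level               */
#define RXMQ_RES   20       /* level-0 resolution: 2^20 ns (~1 ms)  */

#define ts_to_ns(ts)       ((uint64_t) (ts).tv_sec * BILLION + (ts).tv_nsec)
#define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES)

int main(void)
{
        struct timespec now;
        uint64_t        rto_ns = 200 * 1000000ULL; /* example RTO: 200 ms */
        uint64_t        rto_slots;
        uint64_t        slot;
        size_t          lvl = 0;

        clock_gettime(CLOCK_MONOTONIC, &now);

        rto_slots = rto_ns >> RXMQ_RES;            /* RTO in level-0 slots */
        slot      = ts_to_rxm_slot(now) + rto_slots;

        /* Climb a level while the RTO does not fit in this level's window. */
        while (rto_slots >= RXMQ_SLOTS) {
                rto_slots >>= RXMQ_BUMP;
                slot      >>= RXMQ_BUMP;
                ++lvl;
        }

        if (lvl >= RXMQ_LVLS) {                    /* out of timerwheel range */
                printf("RTO too large for the wheel\n");
                return 1;
        }

        printf("schedule at lvl %zu, slot %llu\n",
               lvl, (unsigned long long) (slot & (RXMQ_SLOTS - 1)));

        return 0;
}

With RXMQ_RES 20 a level-0 slot is about 1 ms and each level widens the slots by a factor of 2^RXMQ_BUMP, which is why an RTO that still does not fit after RXMQ_LVLS bumps is rejected with -EPERM in the patch.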