diff options
Diffstat (limited to 'src/lib/timerwheel.c')
-rw-r--r-- | src/lib/timerwheel.c | 153 |
1 files changed, 82 insertions, 71 deletions
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index 33fcbc1c..4443832d 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -22,14 +22,6 @@ #include <ouroboros/list.h> -#define RXMQ_SLOTS (1 << 8) /* #slots / level. */ -#define RXMQ_LVLS 3 /* #levels, bump for DTN. */ -#define RXMQ_BUMP 4 /* factor to bump lvl. */ -#define RXMQ_RES 20 /* res (ns) of lowest lvl. */ - -#define ACKQ_SLOTS (1 << 7) /* #slots for delayed ACK. */ -#define ACKQ_RES 20 /* resolution for dACK. */ - /* Overflow limits range to about 6 hours. */ #define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec) #define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES) @@ -38,9 +30,14 @@ struct rxm { struct list_head next; uint32_t seqno; +#ifdef RXM_BUFFER_ON_HEAP + uint8_t * pkt; + size_t pkt_len; +#else struct shm_du_buff * sdb; uint8_t * head; uint8_t * tail; +#endif time_t t0; /* Time when original was sent (us). */ size_t mul; /* RTO multiplier. */ struct frcti * frcti; @@ -85,8 +82,12 @@ static void timerwheel_fini(void) struct rxm * rxm; rxm = list_entry(p, struct rxm, next); list_del(&rxm->next); +#ifdef RXM_BUFFER_ON_HEAP + free(rxm->pkt); +#else shm_du_buff_ack(rxm->sdb); ipcp_sdb_release(rxm->sdb); +#endif free(rxm); } } @@ -156,8 +157,7 @@ static void timerwheel_move(void) j_max_slot += RXMQ_SLOTS; while (j++ < j_max_slot) { - list_for_each_safe(p, - h, + list_for_each_safe(p, h, &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) { struct rxm * r; struct frct_cr * snd_cr; @@ -178,15 +178,12 @@ static void timerwheel_move(void) snd_cr = &r->frcti->snd_cr; rcv_cr = &r->frcti->rcv_cr; f = &ai.flows[r->fd]; - +#ifndef RXM_BUFFER_ON_HEAP shm_du_buff_ack(r->sdb); - +#endif if (f->frcti == NULL - || f->flow_id != r->flow_id) { - ipcp_sdb_release(r->sdb); - free(r); - continue; - } + || f->flow_id != r->flow_id) + goto cleanup; pthread_rwlock_wrlock(&r->frcti->lock); @@ -197,69 +194,57 @@ static void timerwheel_move(void) pthread_rwlock_unlock(&r->frcti->lock); /* Has been ack'd, remove. */ - if ((int) (r->seqno - snd_lwe) < 0) { - ipcp_sdb_release(r->sdb); - free(r); - continue; - } + if ((int) (r->seqno - snd_lwe) < 0) + goto cleanup; /* Check for r-timer expiry. */ - if (ts_to_ns(now) - r->t0 > r->frcti->r) { - ipcp_sdb_release(r->sdb); - free(r); - shm_rbuff_set_acl(f->rx_rb, - ACL_FLOWDOWN); - shm_rbuff_set_acl(f->tx_rb, - ACL_FLOWDOWN); - continue; - } + if (ts_to_ns(now) - r->t0 > r->frcti->r) + goto flow_down; if (r->frcti->probe && (r->frcti->rttseq + 1) == r->seqno) r->frcti->probe = false; - - /* Copy the data, safe rtx in other layers. */ - if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { - ipcp_sdb_release(r->sdb); - free(r); - shm_rbuff_set_acl(f->rx_rb, - ACL_FLOWDOWN); - shm_rbuff_set_acl(f->tx_rb, - ACL_FLOWDOWN); - continue; - } - +#ifdef RXM_BLOCKING + #ifdef RXM_BUFFER_ON_HEAP + if (ipcp_sdb_reserve(&sdb, r->pkt_len)) + #else + if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) + #endif +#else + #ifdef RXM_BUFFER_ON_HEAP + if (shm_rdrbuff_alloc(ai.rdrb, r->pkt_len, NULL, + &sdb)) + #else + if (shm_rdrbuff_alloc(ai.rdrb, + r->tail - r->head, NULL, + &sdb)) + #endif +#endif + goto reschedule; /* rbuff full */ idx = shm_du_buff_get_idx(sdb); head = shm_du_buff_head(sdb); +#ifdef RXM_BUFFER_ON_HEAP + memcpy(head, r->pkt, r->pkt_len); +#else memcpy(head, r->head, r->tail - r->head); - ipcp_sdb_release(r->sdb); - - ((struct frct_pci *) head)->ackno = - hton32(rcv_lwe); - - /* Retransmit the copy. */ - if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { - ipcp_sdb_release(sdb); - free(r); - shm_rbuff_set_acl(f->rx_rb, - ACL_FLOWDOWN); - shm_rbuff_set_acl(f->tx_rb, - ACL_FLOWDOWN); - continue; - } - - /* Reschedule. */ - shm_du_buff_wait_ack(sdb); - - shm_flow_set_notify(f->set, - f->flow_id, - FLOW_PKT); - + r->sdb = sdb; r->head = head; r->tail = shm_du_buff_tail(sdb); - r->sdb = sdb; + shm_du_buff_wait_ack(sdb); +#endif + /* Retransmit the copy. */ + ((struct frct_pci *) head)->ackno = + hton32(rcv_lwe); +#ifdef RXM_BLOCKING + if (shm_rbuff_write_b(f->tx_rb, idx, NULL) == 0) +#else + if (shm_rbuff_write(f->tx_rb, idx) == 0) +#endif + shm_flow_set_notify(f->set, f->flow_id, + FLOW_PKT); + reschedule: r->mul++; /* Schedule at least in the next time slot. */ @@ -268,10 +253,24 @@ static void timerwheel_move(void) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.rxms[i][rslot]); + + continue; + + flow_down: + shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); + shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + cleanup: +#ifdef RXM_BUFFER_ON_HEAP + free(r->pkt); +#else + ipcp_sdb_release(r->sdb); +#endif + free(r); } } /* Move up a level in the wheel. */ rxm_slot >>= RXMQ_BUMP; + j >>= RXMQ_BUMP; } ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ; @@ -326,11 +325,20 @@ static int timerwheel_rxm(struct frcti * frcti, r->t0 = ts_to_ns(now); r->mul = 0; r->seqno = seqno; + r->frcti = frcti; +#ifdef RXM_BUFFER_ON_HEAP + r->pkt_len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + r->pkt = malloc(r->pkt_len); + if (r->pkt == NULL) { + free(r); + return -ENOMEM; + } + memcpy(r->pkt, shm_du_buff_head(sdb), r->pkt_len); +#else r->sdb = sdb; r->head = shm_du_buff_head(sdb); r->tail = shm_du_buff_tail(sdb); - r->frcti = frcti; - +#endif pthread_rwlock_rdlock(&r->frcti->lock); rto_slot = frcti->rto >> RXMQ_RES; @@ -348,6 +356,9 @@ static int timerwheel_rxm(struct frcti * frcti, } if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */ +#ifdef RXM_BUFFER_ON_HEAP + free(r->pkt); +#endif free(r); return -EPERM; } @@ -357,9 +368,9 @@ static int timerwheel_rxm(struct frcti * frcti, pthread_mutex_lock(&rw.lock); list_add_tail(&r->next, &rw.rxms[lvl][slot]); - +#ifndef RXM_BUFFER_ON_HEAP shm_du_buff_wait_ack(sdb); - +#endif pthread_mutex_unlock(&rw.lock); return 0; |