diff options
-rw-r--r-- | src/lib/CMakeLists.txt | 32 | ||||
-rw-r--r-- | src/lib/config.h.in | 26 | ||||
-rw-r--r-- | src/lib/dev.c | 3 | ||||
-rw-r--r-- | src/lib/frct.c | 32 | ||||
-rw-r--r-- | src/lib/timerwheel.c | 153 |
5 files changed, 153 insertions, 93 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 4aad3a11..5e60e375 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -180,14 +180,42 @@ set(SHM_RDRB_NAME "/${SHM_PREFIX}.rdrb" CACHE INTERNAL "Name for the main POSIX shared memory buffer") set(SHM_RDRB_BLOCK_SIZE "sysconf(_SC_PAGESIZE)" CACHE STRING "Packet buffer block size, multiple of pagesize for performance") -set(SHM_RDRB_MULTI_BLOCK true CACHE BOOL +set(SHM_RDRB_MULTI_BLOCK TRUE CACHE BOOL "Packet buffer multiblock packet support") -set(SHM_RBUFF_LOCKLESS 0 CACHE BOOL +set(SHM_RBUFF_LOCKLESS TRUE CACHE BOOL "Enable shared memory lockless rbuff support") set(QOS_DISABLE_CRC TRUE CACHE BOOL "Ignores ber setting on all QoS cubes") +set(DELTA_T_MPL 60 CACHE STRING + "Maximum packet lifetime (s)") +set(DELTA_T_ACK 10 CACHE STRING + "Maximum time to acknowledge a packet (s)") +set(DELTA_T_RTX 120 CACHE STRING + "Maximum time to retransmit a packet (s)") +set(DELTA_T_ACK_DELAY 10 CACHE STRING + "Maximum time to wait before acknowledging a packet (ms)") +set(FRCT_REORDER_QUEUE_SIZE 256 CACHE STRING + "Size of the reordering queue, must be a power of 2") set(FRCT_RTO_MIN 250 CACHE STRING "Minimum Retransmission Timeout (RTO) for FRCT (us)") +set(FRCT_TICK_TIME 500 CACHE STRING + "Tick time for FRCT activity (retransmission, acknowledgments) (us)") +set(RXM_BUFFER_ON_HEAP FALSE CACHE BOOL + "Store packets for retransmission on the heap instead of in packet buffer") +set(RXM_BLOCKING TRUE CACHE BOOL + "Use blocking writes for retransmission") +set(RXM_MIN_RESOLUTION 20 CACHE STRING + "Minimum retransmission delay (ns), as a power to 2") +set(RXM_WHEEL_MULTIPLIER 4 CACHE STRING + "Factor for retransmission wheel levels as a power to 2") +set(RXM_WHEEL_LEVELS 3 CACHE STRING + "Number of levels in the retransmission wheel") +set(RXM_WHEEL_SLOTS_PER_LEVEL 256 CACHE STRING + "Number of slots per level in the retransmission wheel, must be a power of 2") +set(ACK_WHEEL_SLOTS 128 CACHE STRING + "Number of slots in the acknowledgment wheel, must be a power of 2") +set(ACK_WHEEL_RESOLUTION 20 CACHE STRING + "Minimum acknowledgment delay (ns), as a power to 2") set(SOURCE_FILES_DEV # Add source files here diff --git a/src/lib/config.h.in b/src/lib/config.h.in index 7cac76a6..38c364c6 100644 --- a/src/lib/config.h.in +++ b/src/lib/config.h.in @@ -55,7 +55,7 @@ #cmakedefine HAVE_FUSE #ifdef HAVE_FUSE -#define FUSE_PREFIX "@FUSE_PREFIX@" +#define FUSE_PREFIX "@FUSE_PREFIX@" #endif #define PTHREAD_COND_CLOCK @PTHREAD_COND_CLOCK@ @@ -67,4 +67,26 @@ #define DU_BUFF_HEADSPACE @DU_BUFF_HEADSPACE@ #define DU_BUFF_TAILSPACE @DU_BUFF_TAILSPACE@ -#define RTO_MIN @FRCT_RTO_MIN@ +/* Default Delta-t parameters */ +#define DELT_MPL (@DELTA_T_MPL@ * BILLION) /* ns */ +#define DELT_A (@DELTA_T_ACK@ * BILLION) /* ns */ +#define DELT_R (@DELTA_T_RTX@ * BILLION) /* ns */ + +#define DELT_ACK (@DELTA_T_ACK_DELAY@ * MILLION) /* ns */ + +#define RQ_SIZE (@FRCT_REORDER_QUEUE_SIZE@) +#define RTO_MIN (@FRCT_RTO_MIN@ * 1000) + +#define TICTIME (@FRCT_TICK_TIME@ * 1000) /* ns */ + +/* Retransmission tuning */ +#cmakedefine RXM_BUFFER_ON_HEAP +#cmakedefine RXM_BLOCKING + +#define RXMQ_RES (@RXM_MIN_RESOLUTION@) /* 2^N ns */ +#define RXMQ_BUMP (@RXM_WHEEL_MULTIPLIER@) +#define RXMQ_LVLS (@RXM_WHEEL_LEVELS@) +#define RXMQ_SLOTS (@RXM_WHEEL_SLOTS_PER_LEVEL@) + +#define ACKQ_SLOTS (@ACK_WHEEL_SLOTS@) +#define ACKQ_RES (@ACK_WHEEL_RESOLUTION@) /* 2^N ns */ diff --git a/src/lib/dev.c b/src/lib/dev.c index 5cf23639..ed377367 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -63,7 +63,6 @@ #define SECMEMSZ 16384 #define SYMMKEYSZ 32 #define MSGBUFSZ 2048 -#define TICTIME 1000000 /* ns */ struct flow_set { size_t idx; @@ -801,7 +800,7 @@ int flow_dealloc(int fd) timeo = frcti_dealloc(f->frcti); - if (ret == -ETIMEDOUT && timeo < 0) + if ((ret == -ETIMEDOUT || ret == -EFLOWDOWN) && timeo < 0) timeo = -timeo; } diff --git a/src/lib/frct.c b/src/lib/frct.c index c26910fa..6ead72af 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -20,15 +20,6 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -/* Default Delta-t parameters */ -#define DELT_MPL (5 * BILLION) /* ns */ -#define DELT_A (1 * BILLION) /* ns */ -#define DELT_R (20 * BILLION) /* ns */ - -#define DELT_ACK (10 * MILLION) /* ns */ - -#define RQ_SIZE 256 - #define FRCT_PCILEN (sizeof(struct frct_pci)) struct frct_cr { @@ -96,7 +87,7 @@ static bool after(uint32_t seq1, return (int32_t)(seq2 - seq1) < 0; } -static void __send_ack(int fd, +static int __send_ack(int fd, int ackno) { struct shm_du_buff * sdb; @@ -105,9 +96,13 @@ static void __send_ack(int fd, struct flow * f; /* Raw calls needed to bypass frcti. */ +#ifdef RXM_BLOCKING idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); +#else + idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*pci), NULL, &sdb); +#endif if (idx < 0) - return; + return -ENOMEM; pci = (struct frct_pci *) shm_du_buff_head(sdb); memset(pci, 0, sizeof(*pci)); @@ -116,13 +111,18 @@ static void __send_ack(int fd, pci->ackno = hton32(ackno); f = &ai.flows[fd]; - +#ifdef RXM_BLOCKING if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { +#else + if (shm_rbuff_write(f->tx_rb, idx)) { +#endif ipcp_sdb_release(sdb); - return; + return -ENOMEM; } shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + + return 0; } static void frct_send_ack(struct frcti * frcti) @@ -153,7 +153,8 @@ static void frct_send_ack(struct frcti * frcti) if (diff > frcti->a || diff < DELT_ACK) return; - __send_ack(fd, ackno); + if (__send_ack(fd, ackno) < 0) + return; pthread_rwlock_wrlock(&frcti->lock); @@ -439,8 +440,7 @@ static void rtt_estimator(struct frcti * frcti, frcti->srtt = MAX(1000U, srtt); frcti->mdev = MAX(100U, rttvar); - frcti->rto = MAX(RTO_MIN * 1000, - frcti->srtt + (frcti->mdev << 1)); + frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << 1)); } static void __frcti_tick(void) diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index 33fcbc1c..4443832d 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -22,14 +22,6 @@ #include <ouroboros/list.h> -#define RXMQ_SLOTS (1 << 8) /* #slots / level. */ -#define RXMQ_LVLS 3 /* #levels, bump for DTN. */ -#define RXMQ_BUMP 4 /* factor to bump lvl. */ -#define RXMQ_RES 20 /* res (ns) of lowest lvl. */ - -#define ACKQ_SLOTS (1 << 7) /* #slots for delayed ACK. */ -#define ACKQ_RES 20 /* resolution for dACK. */ - /* Overflow limits range to about 6 hours. */ #define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec) #define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES) @@ -38,9 +30,14 @@ struct rxm { struct list_head next; uint32_t seqno; +#ifdef RXM_BUFFER_ON_HEAP + uint8_t * pkt; + size_t pkt_len; +#else struct shm_du_buff * sdb; uint8_t * head; uint8_t * tail; +#endif time_t t0; /* Time when original was sent (us). */ size_t mul; /* RTO multiplier. */ struct frcti * frcti; @@ -85,8 +82,12 @@ static void timerwheel_fini(void) struct rxm * rxm; rxm = list_entry(p, struct rxm, next); list_del(&rxm->next); +#ifdef RXM_BUFFER_ON_HEAP + free(rxm->pkt); +#else shm_du_buff_ack(rxm->sdb); ipcp_sdb_release(rxm->sdb); +#endif free(rxm); } } @@ -156,8 +157,7 @@ static void timerwheel_move(void) j_max_slot += RXMQ_SLOTS; while (j++ < j_max_slot) { - list_for_each_safe(p, - h, + list_for_each_safe(p, h, &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) { struct rxm * r; struct frct_cr * snd_cr; @@ -178,15 +178,12 @@ static void timerwheel_move(void) snd_cr = &r->frcti->snd_cr; rcv_cr = &r->frcti->rcv_cr; f = &ai.flows[r->fd]; - +#ifndef RXM_BUFFER_ON_HEAP shm_du_buff_ack(r->sdb); - +#endif if (f->frcti == NULL - || f->flow_id != r->flow_id) { - ipcp_sdb_release(r->sdb); - free(r); - continue; - } + || f->flow_id != r->flow_id) + goto cleanup; pthread_rwlock_wrlock(&r->frcti->lock); @@ -197,69 +194,57 @@ static void timerwheel_move(void) pthread_rwlock_unlock(&r->frcti->lock); /* Has been ack'd, remove. */ - if ((int) (r->seqno - snd_lwe) < 0) { - ipcp_sdb_release(r->sdb); - free(r); - continue; - } + if ((int) (r->seqno - snd_lwe) < 0) + goto cleanup; /* Check for r-timer expiry. */ - if (ts_to_ns(now) - r->t0 > r->frcti->r) { - ipcp_sdb_release(r->sdb); - free(r); - shm_rbuff_set_acl(f->rx_rb, - ACL_FLOWDOWN); - shm_rbuff_set_acl(f->tx_rb, - ACL_FLOWDOWN); - continue; - } + if (ts_to_ns(now) - r->t0 > r->frcti->r) + goto flow_down; if (r->frcti->probe && (r->frcti->rttseq + 1) == r->seqno) r->frcti->probe = false; - - /* Copy the data, safe rtx in other layers. */ - if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { - ipcp_sdb_release(r->sdb); - free(r); - shm_rbuff_set_acl(f->rx_rb, - ACL_FLOWDOWN); - shm_rbuff_set_acl(f->tx_rb, - ACL_FLOWDOWN); - continue; - } - +#ifdef RXM_BLOCKING + #ifdef RXM_BUFFER_ON_HEAP + if (ipcp_sdb_reserve(&sdb, r->pkt_len)) + #else + if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) + #endif +#else + #ifdef RXM_BUFFER_ON_HEAP + if (shm_rdrbuff_alloc(ai.rdrb, r->pkt_len, NULL, + &sdb)) + #else + if (shm_rdrbuff_alloc(ai.rdrb, + r->tail - r->head, NULL, + &sdb)) + #endif +#endif + goto reschedule; /* rbuff full */ idx = shm_du_buff_get_idx(sdb); head = shm_du_buff_head(sdb); +#ifdef RXM_BUFFER_ON_HEAP + memcpy(head, r->pkt, r->pkt_len); +#else memcpy(head, r->head, r->tail - r->head); - ipcp_sdb_release(r->sdb); - - ((struct frct_pci *) head)->ackno = - hton32(rcv_lwe); - - /* Retransmit the copy. */ - if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { - ipcp_sdb_release(sdb); - free(r); - shm_rbuff_set_acl(f->rx_rb, - ACL_FLOWDOWN); - shm_rbuff_set_acl(f->tx_rb, - ACL_FLOWDOWN); - continue; - } - - /* Reschedule. */ - shm_du_buff_wait_ack(sdb); - - shm_flow_set_notify(f->set, - f->flow_id, - FLOW_PKT); - + r->sdb = sdb; r->head = head; r->tail = shm_du_buff_tail(sdb); - r->sdb = sdb; + shm_du_buff_wait_ack(sdb); +#endif + /* Retransmit the copy. */ + ((struct frct_pci *) head)->ackno = + hton32(rcv_lwe); +#ifdef RXM_BLOCKING + if (shm_rbuff_write_b(f->tx_rb, idx, NULL) == 0) +#else + if (shm_rbuff_write(f->tx_rb, idx) == 0) +#endif + shm_flow_set_notify(f->set, f->flow_id, + FLOW_PKT); + reschedule: r->mul++; /* Schedule at least in the next time slot. */ @@ -268,10 +253,24 @@ static void timerwheel_move(void) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.rxms[i][rslot]); + + continue; + + flow_down: + shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); + shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + cleanup: +#ifdef RXM_BUFFER_ON_HEAP + free(r->pkt); +#else + ipcp_sdb_release(r->sdb); +#endif + free(r); } } /* Move up a level in the wheel. */ rxm_slot >>= RXMQ_BUMP; + j >>= RXMQ_BUMP; } ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ; @@ -326,11 +325,20 @@ static int timerwheel_rxm(struct frcti * frcti, r->t0 = ts_to_ns(now); r->mul = 0; r->seqno = seqno; + r->frcti = frcti; +#ifdef RXM_BUFFER_ON_HEAP + r->pkt_len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + r->pkt = malloc(r->pkt_len); + if (r->pkt == NULL) { + free(r); + return -ENOMEM; + } + memcpy(r->pkt, shm_du_buff_head(sdb), r->pkt_len); +#else r->sdb = sdb; r->head = shm_du_buff_head(sdb); r->tail = shm_du_buff_tail(sdb); - r->frcti = frcti; - +#endif pthread_rwlock_rdlock(&r->frcti->lock); rto_slot = frcti->rto >> RXMQ_RES; @@ -348,6 +356,9 @@ static int timerwheel_rxm(struct frcti * frcti, } if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */ +#ifdef RXM_BUFFER_ON_HEAP + free(r->pkt); +#endif free(r); return -EPERM; } @@ -357,9 +368,9 @@ static int timerwheel_rxm(struct frcti * frcti, pthread_mutex_lock(&rw.lock); list_add_tail(&r->next, &rw.rxms[lvl][slot]); - +#ifndef RXM_BUFFER_ON_HEAP shm_du_buff_wait_ack(sdb); - +#endif pthread_mutex_unlock(&rw.lock); return 0; |