summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/lib/CMakeLists.txt32
-rw-r--r--src/lib/config.h.in26
-rw-r--r--src/lib/dev.c3
-rw-r--r--src/lib/frct.c32
-rw-r--r--src/lib/timerwheel.c153
5 files changed, 153 insertions, 93 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 4aad3a11..5e60e375 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -180,14 +180,42 @@ set(SHM_RDRB_NAME "/${SHM_PREFIX}.rdrb" CACHE INTERNAL
"Name for the main POSIX shared memory buffer")
set(SHM_RDRB_BLOCK_SIZE "sysconf(_SC_PAGESIZE)" CACHE STRING
"Packet buffer block size, multiple of pagesize for performance")
-set(SHM_RDRB_MULTI_BLOCK true CACHE BOOL
+set(SHM_RDRB_MULTI_BLOCK TRUE CACHE BOOL
"Packet buffer multiblock packet support")
-set(SHM_RBUFF_LOCKLESS 0 CACHE BOOL
+set(SHM_RBUFF_LOCKLESS TRUE CACHE BOOL
"Enable shared memory lockless rbuff support")
set(QOS_DISABLE_CRC TRUE CACHE BOOL
"Ignores ber setting on all QoS cubes")
+set(DELTA_T_MPL 60 CACHE STRING
+ "Maximum packet lifetime (s)")
+set(DELTA_T_ACK 10 CACHE STRING
+ "Maximum time to acknowledge a packet (s)")
+set(DELTA_T_RTX 120 CACHE STRING
+ "Maximum time to retransmit a packet (s)")
+set(DELTA_T_ACK_DELAY 10 CACHE STRING
+ "Maximum time to wait before acknowledging a packet (ms)")
+set(FRCT_REORDER_QUEUE_SIZE 256 CACHE STRING
+ "Size of the reordering queue, must be a power of 2")
set(FRCT_RTO_MIN 250 CACHE STRING
"Minimum Retransmission Timeout (RTO) for FRCT (us)")
+set(FRCT_TICK_TIME 500 CACHE STRING
+ "Tick time for FRCT activity (retransmission, acknowledgments) (us)")
+set(RXM_BUFFER_ON_HEAP FALSE CACHE BOOL
+ "Store packets for retransmission on the heap instead of in packet buffer")
+set(RXM_BLOCKING TRUE CACHE BOOL
+ "Use blocking writes for retransmission")
+set(RXM_MIN_RESOLUTION 20 CACHE STRING
+ "Minimum retransmission delay (ns), as a power to 2")
+set(RXM_WHEEL_MULTIPLIER 4 CACHE STRING
+ "Factor for retransmission wheel levels as a power to 2")
+set(RXM_WHEEL_LEVELS 3 CACHE STRING
+ "Number of levels in the retransmission wheel")
+set(RXM_WHEEL_SLOTS_PER_LEVEL 256 CACHE STRING
+ "Number of slots per level in the retransmission wheel, must be a power of 2")
+set(ACK_WHEEL_SLOTS 128 CACHE STRING
+ "Number of slots in the acknowledgment wheel, must be a power of 2")
+set(ACK_WHEEL_RESOLUTION 20 CACHE STRING
+ "Minimum acknowledgment delay (ns), as a power to 2")
set(SOURCE_FILES_DEV
# Add source files here
diff --git a/src/lib/config.h.in b/src/lib/config.h.in
index 7cac76a6..38c364c6 100644
--- a/src/lib/config.h.in
+++ b/src/lib/config.h.in
@@ -55,7 +55,7 @@
#cmakedefine HAVE_FUSE
#ifdef HAVE_FUSE
-#define FUSE_PREFIX "@FUSE_PREFIX@"
+#define FUSE_PREFIX "@FUSE_PREFIX@"
#endif
#define PTHREAD_COND_CLOCK @PTHREAD_COND_CLOCK@
@@ -67,4 +67,26 @@
#define DU_BUFF_HEADSPACE @DU_BUFF_HEADSPACE@
#define DU_BUFF_TAILSPACE @DU_BUFF_TAILSPACE@
-#define RTO_MIN @FRCT_RTO_MIN@
+/* Default Delta-t parameters */
+#define DELT_MPL (@DELTA_T_MPL@ * BILLION) /* ns */
+#define DELT_A (@DELTA_T_ACK@ * BILLION) /* ns */
+#define DELT_R (@DELTA_T_RTX@ * BILLION) /* ns */
+
+#define DELT_ACK (@DELTA_T_ACK_DELAY@ * MILLION) /* ns */
+
+#define RQ_SIZE (@FRCT_REORDER_QUEUE_SIZE@)
+#define RTO_MIN (@FRCT_RTO_MIN@ * 1000)
+
+#define TICTIME (@FRCT_TICK_TIME@ * 1000) /* ns */
+
+/* Retransmission tuning */
+#cmakedefine RXM_BUFFER_ON_HEAP
+#cmakedefine RXM_BLOCKING
+
+#define RXMQ_RES (@RXM_MIN_RESOLUTION@) /* 2^N ns */
+#define RXMQ_BUMP (@RXM_WHEEL_MULTIPLIER@)
+#define RXMQ_LVLS (@RXM_WHEEL_LEVELS@)
+#define RXMQ_SLOTS (@RXM_WHEEL_SLOTS_PER_LEVEL@)
+
+#define ACKQ_SLOTS (@ACK_WHEEL_SLOTS@)
+#define ACKQ_RES (@ACK_WHEEL_RESOLUTION@) /* 2^N ns */
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 5cf23639..ed377367 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -63,7 +63,6 @@
#define SECMEMSZ 16384
#define SYMMKEYSZ 32
#define MSGBUFSZ 2048
-#define TICTIME 1000000 /* ns */
struct flow_set {
size_t idx;
@@ -801,7 +800,7 @@ int flow_dealloc(int fd)
timeo = frcti_dealloc(f->frcti);
- if (ret == -ETIMEDOUT && timeo < 0)
+ if ((ret == -ETIMEDOUT || ret == -EFLOWDOWN) && timeo < 0)
timeo = -timeo;
}
diff --git a/src/lib/frct.c b/src/lib/frct.c
index c26910fa..6ead72af 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -20,15 +20,6 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
-/* Default Delta-t parameters */
-#define DELT_MPL (5 * BILLION) /* ns */
-#define DELT_A (1 * BILLION) /* ns */
-#define DELT_R (20 * BILLION) /* ns */
-
-#define DELT_ACK (10 * MILLION) /* ns */
-
-#define RQ_SIZE 256
-
#define FRCT_PCILEN (sizeof(struct frct_pci))
struct frct_cr {
@@ -96,7 +87,7 @@ static bool after(uint32_t seq1,
return (int32_t)(seq2 - seq1) < 0;
}
-static void __send_ack(int fd,
+static int __send_ack(int fd,
int ackno)
{
struct shm_du_buff * sdb;
@@ -105,9 +96,13 @@ static void __send_ack(int fd,
struct flow * f;
/* Raw calls needed to bypass frcti. */
+#ifdef RXM_BLOCKING
idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
+#else
+ idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*pci), NULL, &sdb);
+#endif
if (idx < 0)
- return;
+ return -ENOMEM;
pci = (struct frct_pci *) shm_du_buff_head(sdb);
memset(pci, 0, sizeof(*pci));
@@ -116,13 +111,18 @@ static void __send_ack(int fd,
pci->ackno = hton32(ackno);
f = &ai.flows[fd];
-
+#ifdef RXM_BLOCKING
if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
+#else
+ if (shm_rbuff_write(f->tx_rb, idx)) {
+#endif
ipcp_sdb_release(sdb);
- return;
+ return -ENOMEM;
}
shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+
+ return 0;
}
static void frct_send_ack(struct frcti * frcti)
@@ -153,7 +153,8 @@ static void frct_send_ack(struct frcti * frcti)
if (diff > frcti->a || diff < DELT_ACK)
return;
- __send_ack(fd, ackno);
+ if (__send_ack(fd, ackno) < 0)
+ return;
pthread_rwlock_wrlock(&frcti->lock);
@@ -439,8 +440,7 @@ static void rtt_estimator(struct frcti * frcti,
frcti->srtt = MAX(1000U, srtt);
frcti->mdev = MAX(100U, rttvar);
- frcti->rto = MAX(RTO_MIN * 1000,
- frcti->srtt + (frcti->mdev << 1));
+ frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << 1));
}
static void __frcti_tick(void)
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index 33fcbc1c..4443832d 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -22,14 +22,6 @@
#include <ouroboros/list.h>
-#define RXMQ_SLOTS (1 << 8) /* #slots / level. */
-#define RXMQ_LVLS 3 /* #levels, bump for DTN. */
-#define RXMQ_BUMP 4 /* factor to bump lvl. */
-#define RXMQ_RES 20 /* res (ns) of lowest lvl. */
-
-#define ACKQ_SLOTS (1 << 7) /* #slots for delayed ACK. */
-#define ACKQ_RES 20 /* resolution for dACK. */
-
/* Overflow limits range to about 6 hours. */
#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
#define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES)
@@ -38,9 +30,14 @@
struct rxm {
struct list_head next;
uint32_t seqno;
+#ifdef RXM_BUFFER_ON_HEAP
+ uint8_t * pkt;
+ size_t pkt_len;
+#else
struct shm_du_buff * sdb;
uint8_t * head;
uint8_t * tail;
+#endif
time_t t0; /* Time when original was sent (us). */
size_t mul; /* RTO multiplier. */
struct frcti * frcti;
@@ -85,8 +82,12 @@ static void timerwheel_fini(void)
struct rxm * rxm;
rxm = list_entry(p, struct rxm, next);
list_del(&rxm->next);
+#ifdef RXM_BUFFER_ON_HEAP
+ free(rxm->pkt);
+#else
shm_du_buff_ack(rxm->sdb);
ipcp_sdb_release(rxm->sdb);
+#endif
free(rxm);
}
}
@@ -156,8 +157,7 @@ static void timerwheel_move(void)
j_max_slot += RXMQ_SLOTS;
while (j++ < j_max_slot) {
- list_for_each_safe(p,
- h,
+ list_for_each_safe(p, h,
&rw.rxms[i][j & (RXMQ_SLOTS - 1)]) {
struct rxm * r;
struct frct_cr * snd_cr;
@@ -178,15 +178,12 @@ static void timerwheel_move(void)
snd_cr = &r->frcti->snd_cr;
rcv_cr = &r->frcti->rcv_cr;
f = &ai.flows[r->fd];
-
+#ifndef RXM_BUFFER_ON_HEAP
shm_du_buff_ack(r->sdb);
-
+#endif
if (f->frcti == NULL
- || f->flow_id != r->flow_id) {
- ipcp_sdb_release(r->sdb);
- free(r);
- continue;
- }
+ || f->flow_id != r->flow_id)
+ goto cleanup;
pthread_rwlock_wrlock(&r->frcti->lock);
@@ -197,69 +194,57 @@ static void timerwheel_move(void)
pthread_rwlock_unlock(&r->frcti->lock);
/* Has been ack'd, remove. */
- if ((int) (r->seqno - snd_lwe) < 0) {
- ipcp_sdb_release(r->sdb);
- free(r);
- continue;
- }
+ if ((int) (r->seqno - snd_lwe) < 0)
+ goto cleanup;
/* Check for r-timer expiry. */
- if (ts_to_ns(now) - r->t0 > r->frcti->r) {
- ipcp_sdb_release(r->sdb);
- free(r);
- shm_rbuff_set_acl(f->rx_rb,
- ACL_FLOWDOWN);
- shm_rbuff_set_acl(f->tx_rb,
- ACL_FLOWDOWN);
- continue;
- }
+ if (ts_to_ns(now) - r->t0 > r->frcti->r)
+ goto flow_down;
if (r->frcti->probe
&& (r->frcti->rttseq + 1) == r->seqno)
r->frcti->probe = false;
-
- /* Copy the data, safe rtx in other layers. */
- if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
- ipcp_sdb_release(r->sdb);
- free(r);
- shm_rbuff_set_acl(f->rx_rb,
- ACL_FLOWDOWN);
- shm_rbuff_set_acl(f->tx_rb,
- ACL_FLOWDOWN);
- continue;
- }
-
+#ifdef RXM_BLOCKING
+ #ifdef RXM_BUFFER_ON_HEAP
+ if (ipcp_sdb_reserve(&sdb, r->pkt_len))
+ #else
+ if (ipcp_sdb_reserve(&sdb, r->tail - r->head))
+ #endif
+#else
+ #ifdef RXM_BUFFER_ON_HEAP
+ if (shm_rdrbuff_alloc(ai.rdrb, r->pkt_len, NULL,
+ &sdb))
+ #else
+ if (shm_rdrbuff_alloc(ai.rdrb,
+ r->tail - r->head, NULL,
+ &sdb))
+ #endif
+#endif
+ goto reschedule; /* rbuff full */
idx = shm_du_buff_get_idx(sdb);
head = shm_du_buff_head(sdb);
+#ifdef RXM_BUFFER_ON_HEAP
+ memcpy(head, r->pkt, r->pkt_len);
+#else
memcpy(head, r->head, r->tail - r->head);
-
ipcp_sdb_release(r->sdb);
-
- ((struct frct_pci *) head)->ackno =
- hton32(rcv_lwe);
-
- /* Retransmit the copy. */
- if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
- ipcp_sdb_release(sdb);
- free(r);
- shm_rbuff_set_acl(f->rx_rb,
- ACL_FLOWDOWN);
- shm_rbuff_set_acl(f->tx_rb,
- ACL_FLOWDOWN);
- continue;
- }
-
- /* Reschedule. */
- shm_du_buff_wait_ack(sdb);
-
- shm_flow_set_notify(f->set,
- f->flow_id,
- FLOW_PKT);
-
+ r->sdb = sdb;
r->head = head;
r->tail = shm_du_buff_tail(sdb);
- r->sdb = sdb;
+ shm_du_buff_wait_ack(sdb);
+#endif
+ /* Retransmit the copy. */
+ ((struct frct_pci *) head)->ackno =
+ hton32(rcv_lwe);
+#ifdef RXM_BLOCKING
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL) == 0)
+#else
+ if (shm_rbuff_write(f->tx_rb, idx) == 0)
+#endif
+ shm_flow_set_notify(f->set, f->flow_id,
+ FLOW_PKT);
+ reschedule:
r->mul++;
/* Schedule at least in the next time slot. */
@@ -268,10 +253,24 @@ static void timerwheel_move(void)
& (RXMQ_SLOTS - 1);
list_add_tail(&r->next, &rw.rxms[i][rslot]);
+
+ continue;
+
+ flow_down:
+ shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
+ cleanup:
+#ifdef RXM_BUFFER_ON_HEAP
+ free(r->pkt);
+#else
+ ipcp_sdb_release(r->sdb);
+#endif
+ free(r);
}
}
/* Move up a level in the wheel. */
rxm_slot >>= RXMQ_BUMP;
+ j >>= RXMQ_BUMP;
}
ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ;
@@ -326,11 +325,20 @@ static int timerwheel_rxm(struct frcti * frcti,
r->t0 = ts_to_ns(now);
r->mul = 0;
r->seqno = seqno;
+ r->frcti = frcti;
+#ifdef RXM_BUFFER_ON_HEAP
+ r->pkt_len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+ r->pkt = malloc(r->pkt_len);
+ if (r->pkt == NULL) {
+ free(r);
+ return -ENOMEM;
+ }
+ memcpy(r->pkt, shm_du_buff_head(sdb), r->pkt_len);
+#else
r->sdb = sdb;
r->head = shm_du_buff_head(sdb);
r->tail = shm_du_buff_tail(sdb);
- r->frcti = frcti;
-
+#endif
pthread_rwlock_rdlock(&r->frcti->lock);
rto_slot = frcti->rto >> RXMQ_RES;
@@ -348,6 +356,9 @@ static int timerwheel_rxm(struct frcti * frcti,
}
if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */
+#ifdef RXM_BUFFER_ON_HEAP
+ free(r->pkt);
+#endif
free(r);
return -EPERM;
}
@@ -357,9 +368,9 @@ static int timerwheel_rxm(struct frcti * frcti,
pthread_mutex_lock(&rw.lock);
list_add_tail(&r->next, &rw.rxms[lvl][slot]);
-
+#ifndef RXM_BUFFER_ON_HEAP
shm_du_buff_wait_ack(sdb);
-
+#endif
pthread_mutex_unlock(&rw.lock);
return 0;