diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/CMakeLists.txt | 32 | ||||
| -rw-r--r-- | src/lib/config.h.in | 26 | ||||
| -rw-r--r-- | src/lib/dev.c | 3 | ||||
| -rw-r--r-- | src/lib/frct.c | 32 | ||||
| -rw-r--r-- | src/lib/timerwheel.c | 153 | 
5 files changed, 153 insertions, 93 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 4aad3a11..5e60e375 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -180,14 +180,42 @@ set(SHM_RDRB_NAME "/${SHM_PREFIX}.rdrb" CACHE INTERNAL    "Name for the main POSIX shared memory buffer")  set(SHM_RDRB_BLOCK_SIZE "sysconf(_SC_PAGESIZE)" CACHE STRING    "Packet buffer block size, multiple of pagesize for performance") -set(SHM_RDRB_MULTI_BLOCK true CACHE BOOL +set(SHM_RDRB_MULTI_BLOCK TRUE CACHE BOOL    "Packet buffer multiblock packet support") -set(SHM_RBUFF_LOCKLESS 0 CACHE BOOL +set(SHM_RBUFF_LOCKLESS TRUE CACHE BOOL    "Enable shared memory lockless rbuff support")  set(QOS_DISABLE_CRC TRUE CACHE BOOL    "Ignores ber setting on all QoS cubes") +set(DELTA_T_MPL 60 CACHE STRING +  "Maximum packet lifetime (s)") +set(DELTA_T_ACK 10 CACHE STRING +  "Maximum time to acknowledge a packet (s)") +set(DELTA_T_RTX 120 CACHE STRING +  "Maximum time to retransmit a packet (s)") +set(DELTA_T_ACK_DELAY 10 CACHE STRING +  "Maximum time to wait before acknowledging a packet (ms)") +set(FRCT_REORDER_QUEUE_SIZE 256 CACHE STRING +  "Size of the reordering queue, must be a power of 2")  set(FRCT_RTO_MIN 250 CACHE STRING    "Minimum Retransmission Timeout (RTO) for FRCT (us)") +set(FRCT_TICK_TIME 500 CACHE STRING +  "Tick time for FRCT activity (retransmission, acknowledgments) (us)") +set(RXM_BUFFER_ON_HEAP FALSE CACHE BOOL +  "Store packets for retransmission on the heap instead of in packet buffer") +set(RXM_BLOCKING TRUE CACHE BOOL +  "Use blocking writes for retransmission") +set(RXM_MIN_RESOLUTION 20 CACHE STRING +  "Minimum retransmission delay (ns), as a power to 2") +set(RXM_WHEEL_MULTIPLIER 4 CACHE STRING +  "Factor for retransmission wheel levels as a power to 2") +set(RXM_WHEEL_LEVELS 3 CACHE STRING +  "Number of levels in the retransmission wheel") +set(RXM_WHEEL_SLOTS_PER_LEVEL 256 CACHE STRING +  "Number of slots per level in the retransmission wheel, must be a power of 2") +set(ACK_WHEEL_SLOTS 128 CACHE STRING +  "Number of slots in the acknowledgment wheel, must be a power of 2") +set(ACK_WHEEL_RESOLUTION 20 CACHE STRING +  "Minimum acknowledgment delay (ns), as a power to 2")  set(SOURCE_FILES_DEV    # Add source files here diff --git a/src/lib/config.h.in b/src/lib/config.h.in index 7cac76a6..38c364c6 100644 --- a/src/lib/config.h.in +++ b/src/lib/config.h.in @@ -55,7 +55,7 @@  #cmakedefine HAVE_FUSE  #ifdef HAVE_FUSE -#define FUSE_PREFIX "@FUSE_PREFIX@" +#define FUSE_PREFIX         "@FUSE_PREFIX@"  #endif  #define PTHREAD_COND_CLOCK  @PTHREAD_COND_CLOCK@ @@ -67,4 +67,26 @@  #define DU_BUFF_HEADSPACE   @DU_BUFF_HEADSPACE@  #define DU_BUFF_TAILSPACE   @DU_BUFF_TAILSPACE@ -#define RTO_MIN             @FRCT_RTO_MIN@ +/* Default Delta-t parameters */ +#define DELT_MPL            (@DELTA_T_MPL@ * BILLION)        /* ns */ +#define DELT_A              (@DELTA_T_ACK@ * BILLION)        /* ns */ +#define DELT_R              (@DELTA_T_RTX@ * BILLION)        /* ns */ + +#define DELT_ACK            (@DELTA_T_ACK_DELAY@ * MILLION)  /* ns */ + +#define RQ_SIZE             (@FRCT_REORDER_QUEUE_SIZE@) +#define RTO_MIN             (@FRCT_RTO_MIN@ * 1000) + +#define TICTIME             (@FRCT_TICK_TIME@ * 1000)        /* ns */ + +/* Retransmission tuning */ +#cmakedefine                RXM_BUFFER_ON_HEAP +#cmakedefine                RXM_BLOCKING + +#define RXMQ_RES            (@RXM_MIN_RESOLUTION@)           /* 2^N ns */ +#define RXMQ_BUMP           (@RXM_WHEEL_MULTIPLIER@) +#define RXMQ_LVLS           (@RXM_WHEEL_LEVELS@) +#define RXMQ_SLOTS          (@RXM_WHEEL_SLOTS_PER_LEVEL@) + +#define ACKQ_SLOTS          (@ACK_WHEEL_SLOTS@) +#define ACKQ_RES            (@ACK_WHEEL_RESOLUTION@)         /* 2^N ns */ diff --git a/src/lib/dev.c b/src/lib/dev.c index 5cf23639..ed377367 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -63,7 +63,6 @@  #define SECMEMSZ  16384  #define SYMMKEYSZ 32  #define MSGBUFSZ  2048 -#define TICTIME   1000000  /* ns */  struct flow_set {          size_t idx; @@ -801,7 +800,7 @@ int flow_dealloc(int fd)                  timeo = frcti_dealloc(f->frcti); -                if (ret == -ETIMEDOUT && timeo < 0) +                if ((ret == -ETIMEDOUT || ret == -EFLOWDOWN) && timeo < 0)                          timeo = -timeo;          } diff --git a/src/lib/frct.c b/src/lib/frct.c index c26910fa..6ead72af 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -20,15 +20,6 @@   * Foundation, Inc., http://www.fsf.org/about/contact/.   */ -/* Default Delta-t parameters */ -#define DELT_MPL        (5 * BILLION) /* ns */ -#define DELT_A          (1 * BILLION) /* ns */ -#define DELT_R         (20 * BILLION) /* ns */ - -#define DELT_ACK       (10 * MILLION) /* ns */ - -#define RQ_SIZE        256 -  #define FRCT_PCILEN    (sizeof(struct frct_pci))  struct frct_cr { @@ -96,7 +87,7 @@ static bool after(uint32_t seq1,          return (int32_t)(seq2 - seq1) < 0;  } -static void __send_ack(int fd, +static int __send_ack(int fd,                         int ackno)  {          struct shm_du_buff * sdb; @@ -105,9 +96,13 @@ static void __send_ack(int fd,          struct flow *        f;          /* Raw calls needed to bypass frcti. */ +#ifdef RXM_BLOCKING          idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); +#else +        idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*pci), NULL, &sdb); +#endif          if (idx < 0) -                return; +                return -ENOMEM;          pci = (struct frct_pci *) shm_du_buff_head(sdb);          memset(pci, 0, sizeof(*pci)); @@ -116,13 +111,18 @@ static void __send_ack(int fd,          pci->ackno = hton32(ackno);          f = &ai.flows[fd]; - +#ifdef RXM_BLOCKING          if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { +#else +        if (shm_rbuff_write(f->tx_rb, idx)) { +#endif                  ipcp_sdb_release(sdb); -                return; +                return -ENOMEM;          }          shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + +        return 0;  }  static void frct_send_ack(struct frcti * frcti) @@ -153,7 +153,8 @@ static void frct_send_ack(struct frcti * frcti)          if (diff > frcti->a || diff < DELT_ACK)                  return; -        __send_ack(fd, ackno); +        if (__send_ack(fd, ackno) < 0) +                return;          pthread_rwlock_wrlock(&frcti->lock); @@ -439,8 +440,7 @@ static void rtt_estimator(struct frcti * frcti,          frcti->srtt     = MAX(1000U, srtt);          frcti->mdev     = MAX(100U, rttvar); -        frcti->rto      = MAX(RTO_MIN * 1000, -                              frcti->srtt + (frcti->mdev << 1)); +        frcti->rto      = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << 1));  }  static void __frcti_tick(void) diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index 33fcbc1c..4443832d 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -22,14 +22,6 @@  #include <ouroboros/list.h> -#define RXMQ_SLOTS (1 << 8)           /* #slots / level.           */ -#define RXMQ_LVLS  3                  /* #levels, bump for DTN.    */ -#define RXMQ_BUMP  4                  /* factor to bump lvl.       */ -#define RXMQ_RES   20                 /* res (ns) of lowest lvl.   */ - -#define ACKQ_SLOTS (1 << 7)           /* #slots for delayed ACK.   */ -#define ACKQ_RES   20                 /* resolution for dACK.      */ -  /* Overflow limits range to about 6 hours. */  #define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)  #define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES) @@ -38,9 +30,14 @@  struct rxm {          struct list_head     next;          uint32_t             seqno; +#ifdef RXM_BUFFER_ON_HEAP +        uint8_t *            pkt; +        size_t               pkt_len; +#else          struct shm_du_buff * sdb;          uint8_t *            head;          uint8_t *            tail; +#endif          time_t               t0;      /* Time when original was sent (us). */          size_t               mul;     /* RTO multiplier.                   */          struct frcti *       frcti; @@ -85,8 +82,12 @@ static void timerwheel_fini(void)                                  struct rxm * rxm;                                  rxm = list_entry(p, struct rxm, next);                                  list_del(&rxm->next); +#ifdef RXM_BUFFER_ON_HEAP +                                free(rxm->pkt); +#else                                  shm_du_buff_ack(rxm->sdb);                                  ipcp_sdb_release(rxm->sdb); +#endif                                  free(rxm);                          }                  } @@ -156,8 +157,7 @@ static void timerwheel_move(void)                          j_max_slot += RXMQ_SLOTS;                  while (j++ < j_max_slot) { -                        list_for_each_safe(p, -                                           h, +                        list_for_each_safe(p, h,                                             &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) {                                  struct rxm *         r;                                  struct frct_cr *     snd_cr; @@ -178,15 +178,12 @@ static void timerwheel_move(void)                                  snd_cr = &r->frcti->snd_cr;                                  rcv_cr = &r->frcti->rcv_cr;                                  f      = &ai.flows[r->fd]; - +#ifndef RXM_BUFFER_ON_HEAP                                  shm_du_buff_ack(r->sdb); - +#endif                                  if (f->frcti == NULL -                                    || f->flow_id != r->flow_id) { -                                        ipcp_sdb_release(r->sdb); -                                        free(r); -                                        continue; -                                } +                                    || f->flow_id != r->flow_id) +                                        goto cleanup;                                  pthread_rwlock_wrlock(&r->frcti->lock); @@ -197,69 +194,57 @@ static void timerwheel_move(void)                                  pthread_rwlock_unlock(&r->frcti->lock);                                  /* Has been ack'd, remove. */ -                                if ((int) (r->seqno - snd_lwe) < 0) { -                                        ipcp_sdb_release(r->sdb); -                                        free(r); -                                        continue; -                                } +                                if ((int) (r->seqno - snd_lwe) < 0) +                                        goto cleanup;                                  /* Check for r-timer expiry. */ -                                if (ts_to_ns(now) - r->t0 > r->frcti->r) { -                                        ipcp_sdb_release(r->sdb); -                                        free(r); -                                        shm_rbuff_set_acl(f->rx_rb, -                                                          ACL_FLOWDOWN); -                                        shm_rbuff_set_acl(f->tx_rb, -                                                          ACL_FLOWDOWN); -                                        continue; -                                } +                                if (ts_to_ns(now) - r->t0 > r->frcti->r) +                                        goto flow_down;                                  if (r->frcti->probe                                      && (r->frcti->rttseq + 1) == r->seqno)                                          r->frcti->probe = false; - -                                /* Copy the data, safe rtx in other layers. */ -                                if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { -                                        ipcp_sdb_release(r->sdb); -                                        free(r); -                                        shm_rbuff_set_acl(f->rx_rb, -                                                          ACL_FLOWDOWN); -                                        shm_rbuff_set_acl(f->tx_rb, -                                                          ACL_FLOWDOWN); -                                        continue; -                                } - +#ifdef RXM_BLOCKING +  #ifdef RXM_BUFFER_ON_HEAP +                                if (ipcp_sdb_reserve(&sdb, r->pkt_len)) +  #else +                                if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) +  #endif +#else +  #ifdef RXM_BUFFER_ON_HEAP +                                if (shm_rdrbuff_alloc(ai.rdrb, r->pkt_len, NULL, +                                                      &sdb)) +  #else +                                if (shm_rdrbuff_alloc(ai.rdrb, +                                                      r->tail - r->head, NULL, +                                                      &sdb)) +  #endif +#endif +                                        goto reschedule; /* rbuff full */                                  idx = shm_du_buff_get_idx(sdb);                                  head = shm_du_buff_head(sdb); +#ifdef RXM_BUFFER_ON_HEAP +                                memcpy(head, r->pkt, r->pkt_len); +#else                                  memcpy(head, r->head, r->tail - r->head); -                                  ipcp_sdb_release(r->sdb); - -                                ((struct frct_pci *) head)->ackno = -                                        hton32(rcv_lwe); - -                                /* Retransmit the copy. */ -                                if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { -                                        ipcp_sdb_release(sdb); -                                        free(r); -                                        shm_rbuff_set_acl(f->rx_rb, -                                                          ACL_FLOWDOWN); -                                        shm_rbuff_set_acl(f->tx_rb, -                                                          ACL_FLOWDOWN); -                                        continue; -                                } - -                                /* Reschedule. */ -                                shm_du_buff_wait_ack(sdb); - -                                shm_flow_set_notify(f->set, -                                                    f->flow_id, -                                                    FLOW_PKT); - +                                r->sdb  = sdb;                                  r->head = head;                                  r->tail = shm_du_buff_tail(sdb); -                                r->sdb  = sdb; +                                shm_du_buff_wait_ack(sdb); +#endif +                                /* Retransmit the copy. */ +                                ((struct frct_pci *) head)->ackno = +                                        hton32(rcv_lwe); +#ifdef RXM_BLOCKING +                                if (shm_rbuff_write_b(f->tx_rb, idx, NULL) == 0) +#else +                                if (shm_rbuff_write(f->tx_rb, idx) == 0) +#endif +                                        shm_flow_set_notify(f->set, f->flow_id, +                                                            FLOW_PKT); +                        reschedule:                                  r->mul++;                                  /* Schedule at least in the next time slot. */ @@ -268,10 +253,24 @@ static void timerwheel_move(void)                                          & (RXMQ_SLOTS - 1);                                  list_add_tail(&r->next, &rw.rxms[i][rslot]); + +                                continue; + +                        flow_down: +                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); +                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); +                        cleanup: +#ifdef RXM_BUFFER_ON_HEAP +                                free(r->pkt); +#else +                                ipcp_sdb_release(r->sdb); +#endif +                                free(r);                          }                  }                  /* Move up a level in the wheel. */                  rxm_slot >>= RXMQ_BUMP; +                j >>= RXMQ_BUMP;          }          ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ; @@ -326,11 +325,20 @@ static int timerwheel_rxm(struct frcti *       frcti,          r->t0    = ts_to_ns(now);          r->mul   = 0;          r->seqno = seqno; +        r->frcti = frcti; +#ifdef RXM_BUFFER_ON_HEAP +        r->pkt_len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +        r->pkt = malloc(r->pkt_len); +        if (r->pkt == NULL) { +                free(r); +                return -ENOMEM; +        } +        memcpy(r->pkt, shm_du_buff_head(sdb), r->pkt_len); +#else          r->sdb   = sdb;          r->head  = shm_du_buff_head(sdb);          r->tail  = shm_du_buff_tail(sdb); -        r->frcti = frcti; - +#endif          pthread_rwlock_rdlock(&r->frcti->lock);          rto_slot = frcti->rto >> RXMQ_RES; @@ -348,6 +356,9 @@ static int timerwheel_rxm(struct frcti *       frcti,          }          if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */ +#ifdef RXM_BUFFER_ON_HEAP +                free(r->pkt); +#endif                  free(r);                  return -EPERM;          } @@ -357,9 +368,9 @@ static int timerwheel_rxm(struct frcti *       frcti,          pthread_mutex_lock(&rw.lock);          list_add_tail(&r->next, &rw.rxms[lvl][slot]); - +#ifndef RXM_BUFFER_ON_HEAP          shm_du_buff_wait_ack(sdb); - +#endif          pthread_mutex_unlock(&rw.lock);          return 0;  | 
