diff options
Diffstat (limited to 'src/lib/rxmwheel.c')
-rw-r--r-- | src/lib/rxmwheel.c | 159 |
1 files changed, 81 insertions, 78 deletions
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index 28cd78de..dbdd9377 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -29,7 +29,6 @@ #define RXMQ_MAX (1 << RXMQ_M) /* us */ /* Small inacurracy to avoid slow division by MILLION. */ -#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) #define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10)) #define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) @@ -39,26 +38,28 @@ struct rxm { struct shm_du_buff * sdb; uint8_t * head; uint8_t * tail; - time_t t0; /* Time when original was sent (s). */ + time_t t0; /* Time when original was sent (us). */ size_t mul; /* RTO multiplier. */ struct frcti * frcti; }; -struct { +struct rxmwheel { struct list_head wheel[RXMQ_SLOTS]; size_t prv; /* Last processed slot. */ pthread_mutex_t lock; -} rw; +}; -static void rxmwheel_fini(void) +static void rxmwheel_destroy(struct rxmwheel * rw) { size_t i; struct list_head * p; struct list_head * h; + pthread_mutex_destroy(&rw->lock); + for (i = 0; i < RXMQ_SLOTS; ++i) { - list_for_each_safe(p, h, &rw.wheel[i]) { + list_for_each_safe(p, h, &rw->wheel[i]) { struct rxm * rxm = list_entry(p, struct rxm, next); list_del(&rxm->next); shm_du_buff_ack(rxm->sdb); @@ -68,66 +69,49 @@ static void rxmwheel_fini(void) } } -static int rxmwheel_init(void) +static struct rxmwheel * rxmwheel_create(void) { - struct timespec now; - size_t i; + struct rxmwheel * rw; + struct timespec now; + size_t i; - if (pthread_mutex_init(&rw.lock, NULL)) - return -1; + rw = malloc(sizeof(*rw)); + if (rw == NULL) + return NULL; + + if (pthread_mutex_init(&rw->lock, NULL)) { + free(rw); + return NULL; + } clock_gettime(PTHREAD_COND_CLOCK, &now); /* Mark the previous timeslot as the last one processed. */ - rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); + rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); for (i = 0; i < RXMQ_SLOTS; ++i) - list_head_init(&rw.wheel[i]); - - return 0; -} - -static void rxmwheel_clear(int fd) -{ - size_t i; + list_head_init(&rw->wheel[i]); - /* FIXME: Add list element to avoid looping over full rxmwheel. */ - pthread_mutex_lock(&rw.lock); - - for (i = 0; i < RXMQ_SLOTS; ++i) { - struct list_head * p; - struct list_head * h; - - list_for_each_safe(p, h, &rw.wheel[i]) { - struct rxm * r = list_entry(p, struct rxm, next); - if (r->frcti->fd == fd) { - list_del(&r->next); - shm_du_buff_ack(r->sdb); - ipcp_sdb_release(r->sdb); - free(r); - } - } - } - - pthread_mutex_unlock(&rw.lock); + return rw; } static void check_probe(struct frcti * frcti, uint32_t seqno) { - /* disable rtt probe if this packet */ + /* Disable rtt probe on retransmitted packet! */ - /* TODO: This should be locked, but lock reversal! */ + pthread_rwlock_wrlock(&frcti->lock); if (frcti->probe && ((frcti->rttseq + 1) == seqno)) { /* Backoff to avoid never updating rtt */ frcti->srtt_us += frcti->mdev_us; frcti->probe = false; } + + pthread_rwlock_unlock(&frcti->lock); } -/* Return fd on r-timer expiry. */ -static int rxmwheel_move(void) +static void rxmwheel_move(struct rxmwheel * rw) { struct timespec now; struct list_head * p; @@ -135,19 +119,22 @@ static int rxmwheel_move(void) size_t slot; size_t i; - pthread_mutex_lock(&rw.lock); + pthread_mutex_lock(&rw->lock); + + pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock, + (void *) &rw->lock); clock_gettime(PTHREAD_COND_CLOCK, &now); slot = ts_to_slot(now); - i = rw.prv; + i = rw->prv; if (slot < i) slot += RXMQ_SLOTS; while (i++ < slot) { - list_for_each_safe(p, h, &rw.wheel[i & (RXMQ_SLOTS - 1)]) { + list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) { struct rxm * r; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; @@ -156,42 +143,55 @@ static int rxmwheel_move(void) struct shm_du_buff * sdb; uint8_t * head; struct flow * f; + int fd; + uint32_t snd_lwe; + uint32_t rcv_lwe; + time_t rto; r = list_entry(p, struct rxm, next); + list_del(&r->next); snd_cr = &r->frcti->snd_cr; rcv_cr = &r->frcti->rcv_cr; + fd = r->frcti->fd; + f = &ai.flows[fd]; shm_du_buff_ack(r->sdb); + pthread_rwlock_rdlock(&r->frcti->lock); + + snd_lwe = snd_cr->lwe; + rcv_lwe = rcv_cr->lwe; + rto = r->frcti->rto; + + pthread_rwlock_unlock(&r->frcti->lock); + /* Has been ack'd, remove. */ - if ((int) (r->seqno - snd_cr->lwe) < 0) { + if ((int) (r->seqno - snd_lwe) < 0) { ipcp_sdb_release(r->sdb); free(r); continue; } - /* Disable using this seqno as rto probe. */ - check_probe(r->frcti, r->seqno); - /* Check for r-timer expiry. */ - if (ts_to_ms(now) - r->t0 > r->frcti->r) { - int fd = r->frcti->fd; - pthread_mutex_unlock(&rw.lock); + if (ts_to_us(now) - r->t0 > r->frcti->r) { ipcp_sdb_release(r->sdb); free(r); - return fd; + shm_rbuff_set_acl(ai.flows[fd].rx_rb, + ACL_FLOWDOWN); + shm_rbuff_set_acl(ai.flows[fd].tx_rb, + ACL_FLOWDOWN); + continue; } /* Copy the payload, safe rtx in other layers. */ if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { - /* FIXME: reschedule send instead of failing? */ - int fd = r->frcti->fd; - pthread_mutex_unlock(&rw.lock); ipcp_sdb_release(r->sdb); free(r); - return fd; + shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); + continue; } idx = shm_du_buff_get_idx(sdb); @@ -202,20 +202,20 @@ static int rxmwheel_move(void) /* Release the old copy. */ ipcp_sdb_release(r->sdb); + /* Disable using this seqno as rto probe. */ + check_probe(r->frcti, r->seqno); + /* Update ackno and make sure DRF is not set. */ - ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe); + ((struct frct_pci *) head)->ackno = ntoh32(rcv_lwe); ((struct frct_pci *) head)->flags &= ~FRCT_DRF; - f = &ai.flows[r->frcti->fd]; - - /* Retransmit the copy. FIXME: cancel flow */ - if (shm_rbuff_write(f->tx_rb, idx)) { - int fd = r->frcti->fd; - pthread_mutex_unlock(&rw.lock); + /* Retransmit the copy. */ + if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { ipcp_sdb_release(sdb); free(r); - /* FIXME: reschedule send? */ - return fd; + shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); + continue; } /* Reschedule. */ @@ -228,21 +228,20 @@ static int rxmwheel_move(void) r->sdb = sdb; /* Schedule at least in the next time slot */ - rslot = (slot + MAX((f->frcti->rto >> RXMQ_R), 1)) + rslot = (slot + MAX(rto >> RXMQ_R, 1)) & (RXMQ_SLOTS - 1); - list_add_tail(&r->next, &rw.wheel[rslot]); + list_add_tail(&r->next, &rw->wheel[rslot]); } } - rw.prv = slot & (RXMQ_SLOTS - 1); - - pthread_mutex_unlock(&rw.lock); + rw->prv = slot & (RXMQ_SLOTS - 1); - return 0; + pthread_cleanup_pop(true); } -static int rxmwheel_add(struct frcti * frcti, +static int rxmwheel_add(struct rxmwheel * rw, + struct frcti * frcti, uint32_t seqno, struct shm_du_buff * sdb) { @@ -254,8 +253,6 @@ static int rxmwheel_add(struct frcti * frcti, if (r == NULL) return -ENOMEM; - pthread_mutex_lock(&rw.lock); - clock_gettime(PTHREAD_COND_CLOCK, &now); r->t0 = ts_to_us(now); @@ -266,13 +263,19 @@ static int rxmwheel_add(struct frcti * frcti, r->tail = shm_du_buff_tail(sdb); r->frcti = frcti; + pthread_rwlock_rdlock(&r->frcti->lock); + slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1); - list_add_tail(&r->next, &rw.wheel[slot]); + pthread_rwlock_unlock(&r->frcti->lock); + + pthread_mutex_lock(&rw->lock); + + list_add_tail(&r->next, &rw->wheel[slot]); shm_du_buff_wait_ack(sdb); - pthread_mutex_unlock(&rw.lock); + pthread_mutex_unlock(&rw->lock); return 0; } |