diff options
Diffstat (limited to 'src/lib/rxmwheel.c')
-rw-r--r-- | src/lib/rxmwheel.c | 35 |
1 files changed, 18 insertions, 17 deletions
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index 3f01a0d3..28cd78de 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -61,6 +61,8 @@ static void rxmwheel_fini(void) list_for_each_safe(p, h, &rw.wheel[i]) { struct rxm * rxm = list_entry(p, struct rxm, next); list_del(&rxm->next); + shm_du_buff_ack(rxm->sdb); + ipcp_sdb_release(rxm->sdb); free(rxm); } } @@ -133,10 +135,10 @@ static int rxmwheel_move(void) size_t slot; size_t i; - clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_mutex_lock(&rw.lock); + clock_gettime(PTHREAD_COND_CLOCK, &now); + slot = ts_to_slot(now); i = rw.prv; @@ -150,7 +152,6 @@ static int rxmwheel_move(void) struct frct_cr * snd_cr; struct frct_cr * rcv_cr; size_t rslot; - time_t newtime; ssize_t idx; struct shm_du_buff * sdb; uint8_t * head; @@ -161,9 +162,11 @@ static int rxmwheel_move(void) snd_cr = &r->frcti->snd_cr; rcv_cr = &r->frcti->rcv_cr; + + shm_du_buff_ack(r->sdb); + /* Has been ack'd, remove. */ if ((int) (r->seqno - snd_cr->lwe) < 0) { - shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); free(r); continue; @@ -176,7 +179,6 @@ static int rxmwheel_move(void) if (ts_to_ms(now) - r->t0 > r->frcti->r) { int fd = r->frcti->fd; pthread_mutex_unlock(&rw.lock); - shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); free(r); return fd; @@ -187,7 +189,6 @@ static int rxmwheel_move(void) /* FIXME: reschedule send instead of failing? */ int fd = r->frcti->fd; pthread_mutex_unlock(&rw.lock); - shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); free(r); return fd; @@ -199,7 +200,6 @@ static int rxmwheel_move(void) memcpy(head, r->head, r->tail - r->head); /* Release the old copy. */ - shm_du_buff_ack(r->sdb); ipcp_sdb_release(r->sdb); /* Update ackno and make sure DRF is not set. */ @@ -208,7 +208,7 @@ static int rxmwheel_move(void) f = &ai.flows[r->frcti->fd]; - /* Retransmit the copy. */ + /* Retransmit the copy. FIXME: cancel flow */ if (shm_rbuff_write(f->tx_rb, idx)) { int fd = r->frcti->fd; pthread_mutex_unlock(&rw.lock); @@ -218,17 +218,18 @@ static int rxmwheel_move(void) return fd; } - shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); - /* Reschedule. */ shm_du_buff_wait_ack(sdb); + shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + r->head = head; r->tail = shm_du_buff_tail(sdb); r->sdb = sdb; - newtime = ts_to_us(now) + f->frcti->rto; - rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1); + /* Schedule at least in the next time slot */ + rslot = (slot + MAX((f->frcti->rto >> RXMQ_R), 1)) + & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[rslot]); } @@ -253,10 +254,10 @@ static int rxmwheel_add(struct frcti * frcti, if (r == NULL) return -ENOMEM; - clock_gettime(PTHREAD_COND_CLOCK, &now); - pthread_mutex_lock(&rw.lock); + clock_gettime(PTHREAD_COND_CLOCK, &now); + r->t0 = ts_to_us(now); r->mul = 0; r->seqno = seqno; @@ -265,13 +266,13 @@ static int rxmwheel_add(struct frcti * frcti, r->tail = shm_du_buff_tail(sdb); r->frcti = frcti; - slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1); + slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1); list_add_tail(&r->next, &rw.wheel[slot]); - pthread_mutex_unlock(&rw.lock); - shm_du_buff_wait_ack(sdb); + pthread_mutex_unlock(&rw.lock); + return 0; } |