summaryrefslogtreecommitdiff
path: root/src/lib/rxmwheel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/rxmwheel.c')
-rw-r--r--src/lib/rxmwheel.c35
1 files changed, 18 insertions, 17 deletions
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
index 3f01a0d3..28cd78de 100644
--- a/src/lib/rxmwheel.c
+++ b/src/lib/rxmwheel.c
@@ -61,6 +61,8 @@ static void rxmwheel_fini(void)
list_for_each_safe(p, h, &rw.wheel[i]) {
struct rxm * rxm = list_entry(p, struct rxm, next);
list_del(&rxm->next);
+ shm_du_buff_ack(rxm->sdb);
+ ipcp_sdb_release(rxm->sdb);
free(rxm);
}
}
@@ -133,10 +135,10 @@ static int rxmwheel_move(void)
size_t slot;
size_t i;
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
pthread_mutex_lock(&rw.lock);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
slot = ts_to_slot(now);
i = rw.prv;
@@ -150,7 +152,6 @@ static int rxmwheel_move(void)
struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
size_t rslot;
- time_t newtime;
ssize_t idx;
struct shm_du_buff * sdb;
uint8_t * head;
@@ -161,9 +162,11 @@ static int rxmwheel_move(void)
snd_cr = &r->frcti->snd_cr;
rcv_cr = &r->frcti->rcv_cr;
+
+ shm_du_buff_ack(r->sdb);
+
/* Has been ack'd, remove. */
if ((int) (r->seqno - snd_cr->lwe) < 0) {
- shm_du_buff_ack(r->sdb);
ipcp_sdb_release(r->sdb);
free(r);
continue;
@@ -176,7 +179,6 @@ static int rxmwheel_move(void)
if (ts_to_ms(now) - r->t0 > r->frcti->r) {
int fd = r->frcti->fd;
pthread_mutex_unlock(&rw.lock);
- shm_du_buff_ack(r->sdb);
ipcp_sdb_release(r->sdb);
free(r);
return fd;
@@ -187,7 +189,6 @@ static int rxmwheel_move(void)
/* FIXME: reschedule send instead of failing? */
int fd = r->frcti->fd;
pthread_mutex_unlock(&rw.lock);
- shm_du_buff_ack(r->sdb);
ipcp_sdb_release(r->sdb);
free(r);
return fd;
@@ -199,7 +200,6 @@ static int rxmwheel_move(void)
memcpy(head, r->head, r->tail - r->head);
/* Release the old copy. */
- shm_du_buff_ack(r->sdb);
ipcp_sdb_release(r->sdb);
/* Update ackno and make sure DRF is not set. */
@@ -208,7 +208,7 @@ static int rxmwheel_move(void)
f = &ai.flows[r->frcti->fd];
- /* Retransmit the copy. */
+ /* Retransmit the copy. FIXME: cancel flow */
if (shm_rbuff_write(f->tx_rb, idx)) {
int fd = r->frcti->fd;
pthread_mutex_unlock(&rw.lock);
@@ -218,17 +218,18 @@ static int rxmwheel_move(void)
return fd;
}
- shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
-
/* Reschedule. */
shm_du_buff_wait_ack(sdb);
+ shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+
r->head = head;
r->tail = shm_du_buff_tail(sdb);
r->sdb = sdb;
- newtime = ts_to_us(now) + f->frcti->rto;
- rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1);
+ /* Schedule at least in the next time slot */
+ rslot = (slot + MAX((f->frcti->rto >> RXMQ_R), 1))
+ & (RXMQ_SLOTS - 1);
list_add_tail(&r->next, &rw.wheel[rslot]);
}
@@ -253,10 +254,10 @@ static int rxmwheel_add(struct frcti * frcti,
if (r == NULL)
return -ENOMEM;
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
pthread_mutex_lock(&rw.lock);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
r->t0 = ts_to_us(now);
r->mul = 0;
r->seqno = seqno;
@@ -265,13 +266,13 @@ static int rxmwheel_add(struct frcti * frcti,
r->tail = shm_du_buff_tail(sdb);
r->frcti = frcti;
- slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1);
+ slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1);
list_add_tail(&r->next, &rw.wheel[slot]);
- pthread_mutex_unlock(&rw.lock);
-
shm_du_buff_wait_ack(sdb);
+ pthread_mutex_unlock(&rw.lock);
+
return 0;
}