summaryrefslogtreecommitdiff
path: root/src/lib/rxmwheel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/rxmwheel.c')
-rw-r--r--src/lib/rxmwheel.c159
1 files changed, 81 insertions, 78 deletions
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
index 28cd78de..dbdd9377 100644
--- a/src/lib/rxmwheel.c
+++ b/src/lib/rxmwheel.c
@@ -29,7 +29,6 @@
#define RXMQ_MAX (1 << RXMQ_M) /* us */
/* Small inacurracy to avoid slow division by MILLION. */
-#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20))
#define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10))
#define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
@@ -39,26 +38,28 @@ struct rxm {
struct shm_du_buff * sdb;
uint8_t * head;
uint8_t * tail;
- time_t t0; /* Time when original was sent (s). */
+ time_t t0; /* Time when original was sent (us). */
size_t mul; /* RTO multiplier. */
struct frcti * frcti;
};
-struct {
+struct rxmwheel {
struct list_head wheel[RXMQ_SLOTS];
size_t prv; /* Last processed slot. */
pthread_mutex_t lock;
-} rw;
+};
-static void rxmwheel_fini(void)
+static void rxmwheel_destroy(struct rxmwheel * rw)
{
size_t i;
struct list_head * p;
struct list_head * h;
+ pthread_mutex_destroy(&rw->lock);
+
for (i = 0; i < RXMQ_SLOTS; ++i) {
- list_for_each_safe(p, h, &rw.wheel[i]) {
+ list_for_each_safe(p, h, &rw->wheel[i]) {
struct rxm * rxm = list_entry(p, struct rxm, next);
list_del(&rxm->next);
shm_du_buff_ack(rxm->sdb);
@@ -68,66 +69,49 @@ static void rxmwheel_fini(void)
}
}
-static int rxmwheel_init(void)
+static struct rxmwheel * rxmwheel_create(void)
{
- struct timespec now;
- size_t i;
+ struct rxmwheel * rw;
+ struct timespec now;
+ size_t i;
- if (pthread_mutex_init(&rw.lock, NULL))
- return -1;
+ rw = malloc(sizeof(*rw));
+ if (rw == NULL)
+ return NULL;
+
+ if (pthread_mutex_init(&rw->lock, NULL)) {
+ free(rw);
+ return NULL;
+ }
clock_gettime(PTHREAD_COND_CLOCK, &now);
/* Mark the previous timeslot as the last one processed. */
- rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);
+ rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);
for (i = 0; i < RXMQ_SLOTS; ++i)
- list_head_init(&rw.wheel[i]);
-
- return 0;
-}
-
-static void rxmwheel_clear(int fd)
-{
- size_t i;
+ list_head_init(&rw->wheel[i]);
- /* FIXME: Add list element to avoid looping over full rxmwheel. */
- pthread_mutex_lock(&rw.lock);
-
- for (i = 0; i < RXMQ_SLOTS; ++i) {
- struct list_head * p;
- struct list_head * h;
-
- list_for_each_safe(p, h, &rw.wheel[i]) {
- struct rxm * r = list_entry(p, struct rxm, next);
- if (r->frcti->fd == fd) {
- list_del(&r->next);
- shm_du_buff_ack(r->sdb);
- ipcp_sdb_release(r->sdb);
- free(r);
- }
- }
- }
-
- pthread_mutex_unlock(&rw.lock);
+ return rw;
}
static void check_probe(struct frcti * frcti,
uint32_t seqno)
{
- /* disable rtt probe if this packet */
+ /* Disable rtt probe on retransmitted packet! */
- /* TODO: This should be locked, but lock reversal! */
+ pthread_rwlock_wrlock(&frcti->lock);
if (frcti->probe && ((frcti->rttseq + 1) == seqno)) {
/* Backoff to avoid never updating rtt */
frcti->srtt_us += frcti->mdev_us;
frcti->probe = false;
}
+
+ pthread_rwlock_unlock(&frcti->lock);
}
-/* Return fd on r-timer expiry. */
-static int rxmwheel_move(void)
+static void rxmwheel_move(struct rxmwheel * rw)
{
struct timespec now;
struct list_head * p;
@@ -135,19 +119,22 @@ static int rxmwheel_move(void)
size_t slot;
size_t i;
- pthread_mutex_lock(&rw.lock);
+ pthread_mutex_lock(&rw->lock);
+
+ pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock,
+ (void *) &rw->lock);
clock_gettime(PTHREAD_COND_CLOCK, &now);
slot = ts_to_slot(now);
- i = rw.prv;
+ i = rw->prv;
if (slot < i)
slot += RXMQ_SLOTS;
while (i++ < slot) {
- list_for_each_safe(p, h, &rw.wheel[i & (RXMQ_SLOTS - 1)]) {
+ list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) {
struct rxm * r;
struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
@@ -156,42 +143,55 @@ static int rxmwheel_move(void)
struct shm_du_buff * sdb;
uint8_t * head;
struct flow * f;
+ int fd;
+ uint32_t snd_lwe;
+ uint32_t rcv_lwe;
+ time_t rto;
r = list_entry(p, struct rxm, next);
+
list_del(&r->next);
snd_cr = &r->frcti->snd_cr;
rcv_cr = &r->frcti->rcv_cr;
+ fd = r->frcti->fd;
+ f = &ai.flows[fd];
shm_du_buff_ack(r->sdb);
+ pthread_rwlock_rdlock(&r->frcti->lock);
+
+ snd_lwe = snd_cr->lwe;
+ rcv_lwe = rcv_cr->lwe;
+ rto = r->frcti->rto;
+
+ pthread_rwlock_unlock(&r->frcti->lock);
+
/* Has been ack'd, remove. */
- if ((int) (r->seqno - snd_cr->lwe) < 0) {
+ if ((int) (r->seqno - snd_lwe) < 0) {
ipcp_sdb_release(r->sdb);
free(r);
continue;
}
- /* Disable using this seqno as rto probe. */
- check_probe(r->frcti, r->seqno);
-
/* Check for r-timer expiry. */
- if (ts_to_ms(now) - r->t0 > r->frcti->r) {
- int fd = r->frcti->fd;
- pthread_mutex_unlock(&rw.lock);
+ if (ts_to_us(now) - r->t0 > r->frcti->r) {
ipcp_sdb_release(r->sdb);
free(r);
- return fd;
+ shm_rbuff_set_acl(ai.flows[fd].rx_rb,
+ ACL_FLOWDOWN);
+ shm_rbuff_set_acl(ai.flows[fd].tx_rb,
+ ACL_FLOWDOWN);
+ continue;
}
/* Copy the payload, safe rtx in other layers. */
if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
- /* FIXME: reschedule send instead of failing? */
- int fd = r->frcti->fd;
- pthread_mutex_unlock(&rw.lock);
ipcp_sdb_release(r->sdb);
free(r);
- return fd;
+ shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
+ continue;
}
idx = shm_du_buff_get_idx(sdb);
@@ -202,20 +202,20 @@ static int rxmwheel_move(void)
/* Release the old copy. */
ipcp_sdb_release(r->sdb);
+ /* Disable using this seqno as rto probe. */
+ check_probe(r->frcti, r->seqno);
+
/* Update ackno and make sure DRF is not set. */
- ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe);
+ ((struct frct_pci *) head)->ackno = ntoh32(rcv_lwe);
((struct frct_pci *) head)->flags &= ~FRCT_DRF;
- f = &ai.flows[r->frcti->fd];
-
- /* Retransmit the copy. FIXME: cancel flow */
- if (shm_rbuff_write(f->tx_rb, idx)) {
- int fd = r->frcti->fd;
- pthread_mutex_unlock(&rw.lock);
+ /* Retransmit the copy. */
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
ipcp_sdb_release(sdb);
free(r);
- /* FIXME: reschedule send? */
- return fd;
+ shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
+ continue;
}
/* Reschedule. */
@@ -228,21 +228,20 @@ static int rxmwheel_move(void)
r->sdb = sdb;
/* Schedule at least in the next time slot */
- rslot = (slot + MAX((f->frcti->rto >> RXMQ_R), 1))
+ rslot = (slot + MAX(rto >> RXMQ_R, 1))
& (RXMQ_SLOTS - 1);
- list_add_tail(&r->next, &rw.wheel[rslot]);
+ list_add_tail(&r->next, &rw->wheel[rslot]);
}
}
- rw.prv = slot & (RXMQ_SLOTS - 1);
-
- pthread_mutex_unlock(&rw.lock);
+ rw->prv = slot & (RXMQ_SLOTS - 1);
- return 0;
+ pthread_cleanup_pop(true);
}
-static int rxmwheel_add(struct frcti * frcti,
+static int rxmwheel_add(struct rxmwheel * rw,
+ struct frcti * frcti,
uint32_t seqno,
struct shm_du_buff * sdb)
{
@@ -254,8 +253,6 @@ static int rxmwheel_add(struct frcti * frcti,
if (r == NULL)
return -ENOMEM;
- pthread_mutex_lock(&rw.lock);
-
clock_gettime(PTHREAD_COND_CLOCK, &now);
r->t0 = ts_to_us(now);
@@ -266,13 +263,19 @@ static int rxmwheel_add(struct frcti * frcti,
r->tail = shm_du_buff_tail(sdb);
r->frcti = frcti;
+ pthread_rwlock_rdlock(&r->frcti->lock);
+
slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1);
- list_add_tail(&r->next, &rw.wheel[slot]);
+ pthread_rwlock_unlock(&r->frcti->lock);
+
+ pthread_mutex_lock(&rw->lock);
+
+ list_add_tail(&r->next, &rw->wheel[slot]);
shm_du_buff_wait_ack(sdb);
- pthread_mutex_unlock(&rw.lock);
+ pthread_mutex_unlock(&rw->lock);
return 0;
}