summaryrefslogtreecommitdiff
path: root/src/lib/rxmwheel.c
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2020-05-01 18:23:58 +0200
committerSander Vrijders <sander@ouroboros.rocks>2020-05-02 11:34:28 +0200
commit25d1721e7dc9fa15c8a7c5513f30e636e9bda397 (patch)
tree9a012ab53513ffc78bf122e448045cc6084c13a4 /src/lib/rxmwheel.c
parent6415d0f683dbe5f20d4d00c74bf75a795753f444 (diff)
downloadouroboros-25d1721e7dc9fa15c8a7c5513f30e636e9bda397.tar.gz
ouroboros-25d1721e7dc9fa15c8a7c5513f30e636e9bda397.zip
lib: Create an rxmwheel per flow
The single retransmission wheel caused locking headaches as the calls for different flows could block on the same rxmwheel. This stabilizes the stack, but if the rdrbuff gets full there can now be big delays. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib/rxmwheel.c')
-rw-r--r--src/lib/rxmwheel.c159
1 files changed, 81 insertions, 78 deletions
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
index 28cd78de..dbdd9377 100644
--- a/src/lib/rxmwheel.c
+++ b/src/lib/rxmwheel.c
@@ -29,7 +29,6 @@
#define RXMQ_MAX (1 << RXMQ_M) /* us */
/* Small inacurracy to avoid slow division by MILLION. */
-#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20))
#define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10))
#define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
@@ -39,26 +38,28 @@ struct rxm {
struct shm_du_buff * sdb;
uint8_t * head;
uint8_t * tail;
- time_t t0; /* Time when original was sent (s). */
+ time_t t0; /* Time when original was sent (us). */
size_t mul; /* RTO multiplier. */
struct frcti * frcti;
};
-struct {
+struct rxmwheel {
struct list_head wheel[RXMQ_SLOTS];
size_t prv; /* Last processed slot. */
pthread_mutex_t lock;
-} rw;
+};
-static void rxmwheel_fini(void)
+static void rxmwheel_destroy(struct rxmwheel * rw)
{
size_t i;
struct list_head * p;
struct list_head * h;
+ pthread_mutex_destroy(&rw->lock);
+
for (i = 0; i < RXMQ_SLOTS; ++i) {
- list_for_each_safe(p, h, &rw.wheel[i]) {
+ list_for_each_safe(p, h, &rw->wheel[i]) {
struct rxm * rxm = list_entry(p, struct rxm, next);
list_del(&rxm->next);
shm_du_buff_ack(rxm->sdb);
@@ -68,66 +69,49 @@ static void rxmwheel_fini(void)
}
}
-static int rxmwheel_init(void)
+static struct rxmwheel * rxmwheel_create(void)
{
- struct timespec now;
- size_t i;
+ struct rxmwheel * rw;
+ struct timespec now;
+ size_t i;
- if (pthread_mutex_init(&rw.lock, NULL))
- return -1;
+ rw = malloc(sizeof(*rw));
+ if (rw == NULL)
+ return NULL;
+
+ if (pthread_mutex_init(&rw->lock, NULL)) {
+ free(rw);
+ return NULL;
+ }
clock_gettime(PTHREAD_COND_CLOCK, &now);
/* Mark the previous timeslot as the last one processed. */
- rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);
+ rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);
for (i = 0; i < RXMQ_SLOTS; ++i)
- list_head_init(&rw.wheel[i]);
-
- return 0;
-}
-
-static void rxmwheel_clear(int fd)
-{
- size_t i;
+ list_head_init(&rw->wheel[i]);
- /* FIXME: Add list element to avoid looping over full rxmwheel. */
- pthread_mutex_lock(&rw.lock);
-
- for (i = 0; i < RXMQ_SLOTS; ++i) {
- struct list_head * p;
- struct list_head * h;
-
- list_for_each_safe(p, h, &rw.wheel[i]) {
- struct rxm * r = list_entry(p, struct rxm, next);
- if (r->frcti->fd == fd) {
- list_del(&r->next);
- shm_du_buff_ack(r->sdb);
- ipcp_sdb_release(r->sdb);
- free(r);
- }
- }
- }
-
- pthread_mutex_unlock(&rw.lock);
+ return rw;
}
static void check_probe(struct frcti * frcti,
uint32_t seqno)
{
- /* disable rtt probe if this packet */
+ /* Disable rtt probe on retransmitted packet! */
- /* TODO: This should be locked, but lock reversal! */
+ pthread_rwlock_wrlock(&frcti->lock);
if (frcti->probe && ((frcti->rttseq + 1) == seqno)) {
/* Backoff to avoid never updating rtt */
frcti->srtt_us += frcti->mdev_us;
frcti->probe = false;
}
+
+ pthread_rwlock_unlock(&frcti->lock);
}
-/* Return fd on r-timer expiry. */
-static int rxmwheel_move(void)
+static void rxmwheel_move(struct rxmwheel * rw)
{
struct timespec now;
struct list_head * p;
@@ -135,19 +119,22 @@ static int rxmwheel_move(void)
size_t slot;
size_t i;
- pthread_mutex_lock(&rw.lock);
+ pthread_mutex_lock(&rw->lock);
+
+ pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock,
+ (void *) &rw->lock);
clock_gettime(PTHREAD_COND_CLOCK, &now);
slot = ts_to_slot(now);
- i = rw.prv;
+ i = rw->prv;
if (slot < i)
slot += RXMQ_SLOTS;
while (i++ < slot) {
- list_for_each_safe(p, h, &rw.wheel[i & (RXMQ_SLOTS - 1)]) {
+ list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) {
struct rxm * r;
struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
@@ -156,42 +143,55 @@ static int rxmwheel_move(void)
struct shm_du_buff * sdb;
uint8_t * head;
struct flow * f;
+ int fd;
+ uint32_t snd_lwe;
+ uint32_t rcv_lwe;
+ time_t rto;
r = list_entry(p, struct rxm, next);
+
list_del(&r->next);
snd_cr = &r->frcti->snd_cr;
rcv_cr = &r->frcti->rcv_cr;
+ fd = r->frcti->fd;
+ f = &ai.flows[fd];
shm_du_buff_ack(r->sdb);
+ pthread_rwlock_rdlock(&r->frcti->lock);
+
+ snd_lwe = snd_cr->lwe;
+ rcv_lwe = rcv_cr->lwe;
+ rto = r->frcti->rto;
+
+ pthread_rwlock_unlock(&r->frcti->lock);
+
/* Has been ack'd, remove. */
- if ((int) (r->seqno - snd_cr->lwe) < 0) {
+ if ((int) (r->seqno - snd_lwe) < 0) {
ipcp_sdb_release(r->sdb);
free(r);
continue;
}
- /* Disable using this seqno as rto probe. */
- check_probe(r->frcti, r->seqno);
-
/* Check for r-timer expiry. */
- if (ts_to_ms(now) - r->t0 > r->frcti->r) {
- int fd = r->frcti->fd;
- pthread_mutex_unlock(&rw.lock);
+ if (ts_to_us(now) - r->t0 > r->frcti->r) {
ipcp_sdb_release(r->sdb);
free(r);
- return fd;
+ shm_rbuff_set_acl(ai.flows[fd].rx_rb,
+ ACL_FLOWDOWN);
+ shm_rbuff_set_acl(ai.flows[fd].tx_rb,
+ ACL_FLOWDOWN);
+ continue;
}
/* Copy the payload, safe rtx in other layers. */
if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
- /* FIXME: reschedule send instead of failing? */
- int fd = r->frcti->fd;
- pthread_mutex_unlock(&rw.lock);
ipcp_sdb_release(r->sdb);
free(r);
- return fd;
+ shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
+ continue;
}
idx = shm_du_buff_get_idx(sdb);
@@ -202,20 +202,20 @@ static int rxmwheel_move(void)
/* Release the old copy. */
ipcp_sdb_release(r->sdb);
+ /* Disable using this seqno as rto probe. */
+ check_probe(r->frcti, r->seqno);
+
/* Update ackno and make sure DRF is not set. */
- ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe);
+ ((struct frct_pci *) head)->ackno = ntoh32(rcv_lwe);
((struct frct_pci *) head)->flags &= ~FRCT_DRF;
- f = &ai.flows[r->frcti->fd];
-
- /* Retransmit the copy. FIXME: cancel flow */
- if (shm_rbuff_write(f->tx_rb, idx)) {
- int fd = r->frcti->fd;
- pthread_mutex_unlock(&rw.lock);
+ /* Retransmit the copy. */
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
ipcp_sdb_release(sdb);
free(r);
- /* FIXME: reschedule send? */
- return fd;
+ shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
+ continue;
}
/* Reschedule. */
@@ -228,21 +228,20 @@ static int rxmwheel_move(void)
r->sdb = sdb;
/* Schedule at least in the next time slot */
- rslot = (slot + MAX((f->frcti->rto >> RXMQ_R), 1))
+ rslot = (slot + MAX(rto >> RXMQ_R, 1))
& (RXMQ_SLOTS - 1);
- list_add_tail(&r->next, &rw.wheel[rslot]);
+ list_add_tail(&r->next, &rw->wheel[rslot]);
}
}
- rw.prv = slot & (RXMQ_SLOTS - 1);
-
- pthread_mutex_unlock(&rw.lock);
+ rw->prv = slot & (RXMQ_SLOTS - 1);
- return 0;
+ pthread_cleanup_pop(true);
}
-static int rxmwheel_add(struct frcti * frcti,
+static int rxmwheel_add(struct rxmwheel * rw,
+ struct frcti * frcti,
uint32_t seqno,
struct shm_du_buff * sdb)
{
@@ -254,8 +253,6 @@ static int rxmwheel_add(struct frcti * frcti,
if (r == NULL)
return -ENOMEM;
- pthread_mutex_lock(&rw.lock);
-
clock_gettime(PTHREAD_COND_CLOCK, &now);
r->t0 = ts_to_us(now);
@@ -266,13 +263,19 @@ static int rxmwheel_add(struct frcti * frcti,
r->tail = shm_du_buff_tail(sdb);
r->frcti = frcti;
+ pthread_rwlock_rdlock(&r->frcti->lock);
+
slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1);
- list_add_tail(&r->next, &rw.wheel[slot]);
+ pthread_rwlock_unlock(&r->frcti->lock);
+
+ pthread_mutex_lock(&rw->lock);
+
+ list_add_tail(&r->next, &rw->wheel[slot]);
shm_du_buff_wait_ack(sdb);
- pthread_mutex_unlock(&rw.lock);
+ pthread_mutex_unlock(&rw->lock);
return 0;
}