summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/lib/dev.c21
-rw-r--r--src/lib/frct.c86
-rw-r--r--src/lib/rxmwheel.c159
-rw-r--r--src/lib/shm_rbuff_pthr.c11
4 files changed, 149 insertions, 128 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 80d7e9ad..e8989a48 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -429,9 +429,6 @@ static void init(int argc,
if (pthread_rwlock_init(&ai.lock, NULL))
goto fail_lock;
- if (rxmwheel_init())
- goto fail_rxmwheel;
-
ai.fqset = shm_flow_set_open(getpid());
if (ai.fqset == NULL)
goto fail_fqset;
@@ -439,8 +436,6 @@ static void init(int argc,
return;
fail_fqset:
- rxmwheel_fini();
- fail_rxmwheel:
pthread_rwlock_destroy(&ai.lock);
fail_lock:
for (i = 0; i < SYS_MAX_FLOWS; ++i)
@@ -474,8 +469,6 @@ static void fini(void)
if (ai.fds == NULL)
return;
- rxmwheel_fini();
-
if (ai.prog != NULL)
free(ai.prog);
@@ -1080,15 +1073,16 @@ ssize_t flow_read(int fd,
flow = &ai.flows[fd];
+ clock_gettime(PTHREAD_COND_CLOCK, &abs);
+
+ pthread_rwlock_rdlock(&ai.lock);
+
if (flow->part_idx == DONE_PART) {
+ pthread_rwlock_unlock(&ai.lock);
flow->part_idx = NO_PART;
return 0;
}
- clock_gettime(PTHREAD_COND_CLOCK, &abs);
-
- pthread_rwlock_rdlock(&ai.lock);
-
if (flow->flow_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
@@ -1141,8 +1135,13 @@ ssize_t flow_read(int fd,
if (n <= (ssize_t) count) {
memcpy(buf, packet, n);
shm_rdrbuff_remove(ai.rdrb, idx);
+
+ pthread_rwlock_wrlock(&ai.lock);
+
flow->part_idx = (partrd && n == (ssize_t) count) ?
DONE_PART : NO_PART;
+
+ pthread_rwlock_unlock(&ai.lock);
return n;
} else {
if (partrd) {
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 0e9d64c7..3c180128 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -21,15 +21,12 @@
*/
/* Default Delta-t parameters */
-#define DELT_MPL 60000 /* ms */
-#define DELT_A 3000 /* ms */
-#define DELT_R 20000 /* ms */
+#define DELT_MPL (60 * MILLION) /* us */
+#define DELT_A (1 * MILLION) /* us */
+#define DELT_R (20 * MILLION) /* us */
#define RQ_SIZE 1024
-#define TW_ELEMENTS 6000
-#define TW_RESOLUTION 1 /* ms */
-
#define FRCT_PCILEN (sizeof(struct frct_pci))
struct frct_cr {
@@ -44,24 +41,26 @@ struct frct_cr {
};
struct frcti {
- int fd;
+ int fd;
+
+ time_t mpl;
+ time_t a;
+ time_t r;
- time_t mpl;
- time_t a;
- time_t r;
+ time_t srtt_us; /* smoothed rtt */
+ time_t mdev_us; /* mdev */
+ time_t rto; /* retransmission timeout */
+ uint32_t rttseq;
+ struct timespec t_probe; /* probe time */
+ bool probe; /* probe active */
- time_t srtt_us; /* smoothed rtt */
- time_t mdev_us; /* mdev */
- time_t rto; /* retransmission timeout */
- uint32_t rttseq;
- struct timespec t_probe; /* probe time */
- bool probe; /* probe active */
+ struct frct_cr snd_cr;
+ struct frct_cr rcv_cr;
- struct frct_cr snd_cr;
- struct frct_cr rcv_cr;
+ struct rxmwheel * rw;
- ssize_t rq[RQ_SIZE];
- pthread_rwlock_t lock;
+ ssize_t rq[RQ_SIZE];
+ pthread_rwlock_t lock;
};
enum frct_flags {
@@ -111,28 +110,35 @@ static struct frcti * frcti_create(int fd)
frcti->r = DELT_R;
frcti->fd = fd;
- delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000;
+ delta_t = frcti->mpl + frcti->a + frcti->r;
- frcti->snd_cr.inact = 3 * delta_t;
- frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1);
+ frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */
+ frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1);
- frcti->rttseq = 0;
- frcti->probe = false;
+ frcti->rttseq = 0;
+ frcti->probe = false;
- frcti->srtt_us = 0; /* updated on first ACK */
- frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */
- frcti->rto = 20000; /* initial rxm will be after 20 ms */
+ frcti->srtt_us = 0; /* updated on first ACK */
+ frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */
+ frcti->rto = 20000; /* initial rxm will be after 20 ms */
+ frcti->rw = NULL;
if (ai.flows[fd].qs.loss == 0) {
frcti->snd_cr.cflags |= FRCTFRTX;
frcti->rcv_cr.cflags |= FRCTFRTX;
+ frcti->rw = rxmwheel_create();
+ if (frcti->rw == NULL)
+ goto fail_rw;
}
- frcti->rcv_cr.inact = 2 * delta_t;
- frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1);
+ frcti->rcv_cr.inact = 2 * delta_t / MILLION; /* s */
+ frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1);
+
return frcti;
+ fail_rw:
+ pthread_rwlock_destroy(&frcti->lock);
fail_lock:
free(frcti);
fail_malloc:
@@ -146,7 +152,8 @@ static void frcti_destroy(struct frcti * frcti)
* make sure everything we sent is acked.
*/
- rxmwheel_clear(frcti->fd);
+ if (frcti->rw != NULL)
+ rxmwheel_destroy(frcti->rw);
pthread_rwlock_destroy(&frcti->lock);
@@ -229,13 +236,15 @@ static int __frcti_snd(struct frcti * frcti,
struct timespec now;
struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
+ uint32_t seqno;
assert(frcti);
snd_cr = &frcti->snd_cr;
rcv_cr = &frcti->rcv_cr;
- rxmwheel_move();
+ if (frcti->rw != NULL)
+ rxmwheel_move(frcti->rw);
pci = frcti_alloc_head(sdb);
if (pci == NULL)
@@ -259,7 +268,9 @@ static int __frcti_snd(struct frcti * frcti,
frcti->snd_cr.lwe = snd_cr->seqno - 1;
}
- pci->seqno = hton32(snd_cr->seqno);
+ seqno = snd_cr->seqno;
+ pci->seqno = hton32(seqno);
+
if (!(snd_cr->cflags & FRCTFRTX)) {
snd_cr->lwe++;
} else {
@@ -269,8 +280,6 @@ static int __frcti_snd(struct frcti * frcti,
frcti->probe = true;
}
- rxmwheel_add(frcti, snd_cr->seqno, sdb);
-
if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) {
pci->flags |= FRCT_ACK;
pci->ackno = hton32(rcv_cr->lwe);
@@ -282,6 +291,9 @@ static int __frcti_snd(struct frcti * frcti,
pthread_rwlock_unlock(&frcti->lock);
+ if (frcti->rw != NULL)
+ rxmwheel_add(frcti->rw, frcti, seqno, sdb);
+
return 0;
}
@@ -384,13 +396,13 @@ static int __frcti_rcv(struct frcti * frcti,
if (ret == 0 && !(pci->flags & FRCT_DATA))
shm_rdrbuff_remove(ai.rdrb, idx);
- rxmwheel_move();
+ if (frcti->rw != NULL)
+ rxmwheel_move(frcti->rw);
return ret;
drop_packet:
pthread_rwlock_unlock(&frcti->lock);
shm_rdrbuff_remove(ai.rdrb, idx);
-
return -EAGAIN;
}
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
index 28cd78de..dbdd9377 100644
--- a/src/lib/rxmwheel.c
+++ b/src/lib/rxmwheel.c
@@ -29,7 +29,6 @@
#define RXMQ_MAX (1 << RXMQ_M) /* us */
/* Small inacurracy to avoid slow division by MILLION. */
-#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20))
#define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10))
#define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
@@ -39,26 +38,28 @@ struct rxm {
struct shm_du_buff * sdb;
uint8_t * head;
uint8_t * tail;
- time_t t0; /* Time when original was sent (s). */
+ time_t t0; /* Time when original was sent (us). */
size_t mul; /* RTO multiplier. */
struct frcti * frcti;
};
-struct {
+struct rxmwheel {
struct list_head wheel[RXMQ_SLOTS];
size_t prv; /* Last processed slot. */
pthread_mutex_t lock;
-} rw;
+};
-static void rxmwheel_fini(void)
+static void rxmwheel_destroy(struct rxmwheel * rw)
{
size_t i;
struct list_head * p;
struct list_head * h;
+ pthread_mutex_destroy(&rw->lock);
+
for (i = 0; i < RXMQ_SLOTS; ++i) {
- list_for_each_safe(p, h, &rw.wheel[i]) {
+ list_for_each_safe(p, h, &rw->wheel[i]) {
struct rxm * rxm = list_entry(p, struct rxm, next);
list_del(&rxm->next);
shm_du_buff_ack(rxm->sdb);
@@ -68,66 +69,49 @@ static void rxmwheel_fini(void)
}
}
-static int rxmwheel_init(void)
+static struct rxmwheel * rxmwheel_create(void)
{
- struct timespec now;
- size_t i;
+ struct rxmwheel * rw;
+ struct timespec now;
+ size_t i;
- if (pthread_mutex_init(&rw.lock, NULL))
- return -1;
+ rw = malloc(sizeof(*rw));
+ if (rw == NULL)
+ return NULL;
+
+ if (pthread_mutex_init(&rw->lock, NULL)) {
+ free(rw);
+ return NULL;
+ }
clock_gettime(PTHREAD_COND_CLOCK, &now);
/* Mark the previous timeslot as the last one processed. */
- rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);
+ rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);
for (i = 0; i < RXMQ_SLOTS; ++i)
- list_head_init(&rw.wheel[i]);
-
- return 0;
-}
-
-static void rxmwheel_clear(int fd)
-{
- size_t i;
+ list_head_init(&rw->wheel[i]);
- /* FIXME: Add list element to avoid looping over full rxmwheel. */
- pthread_mutex_lock(&rw.lock);
-
- for (i = 0; i < RXMQ_SLOTS; ++i) {
- struct list_head * p;
- struct list_head * h;
-
- list_for_each_safe(p, h, &rw.wheel[i]) {
- struct rxm * r = list_entry(p, struct rxm, next);
- if (r->frcti->fd == fd) {
- list_del(&r->next);
- shm_du_buff_ack(r->sdb);
- ipcp_sdb_release(r->sdb);
- free(r);
- }
- }
- }
-
- pthread_mutex_unlock(&rw.lock);
+ return rw;
}
static void check_probe(struct frcti * frcti,
uint32_t seqno)
{
- /* disable rtt probe if this packet */
+ /* Disable rtt probe on retransmitted packet! */
- /* TODO: This should be locked, but lock reversal! */
+ pthread_rwlock_wrlock(&frcti->lock);
if (frcti->probe && ((frcti->rttseq + 1) == seqno)) {
/* Backoff to avoid never updating rtt */
frcti->srtt_us += frcti->mdev_us;
frcti->probe = false;
}
+
+ pthread_rwlock_unlock(&frcti->lock);
}
-/* Return fd on r-timer expiry. */
-static int rxmwheel_move(void)
+static void rxmwheel_move(struct rxmwheel * rw)
{
struct timespec now;
struct list_head * p;
@@ -135,19 +119,22 @@ static int rxmwheel_move(void)
size_t slot;
size_t i;
- pthread_mutex_lock(&rw.lock);
+ pthread_mutex_lock(&rw->lock);
+
+ pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock,
+ (void *) &rw->lock);
clock_gettime(PTHREAD_COND_CLOCK, &now);
slot = ts_to_slot(now);
- i = rw.prv;
+ i = rw->prv;
if (slot < i)
slot += RXMQ_SLOTS;
while (i++ < slot) {
- list_for_each_safe(p, h, &rw.wheel[i & (RXMQ_SLOTS - 1)]) {
+ list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) {
struct rxm * r;
struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
@@ -156,42 +143,55 @@ static int rxmwheel_move(void)
struct shm_du_buff * sdb;
uint8_t * head;
struct flow * f;
+ int fd;
+ uint32_t snd_lwe;
+ uint32_t rcv_lwe;
+ time_t rto;
r = list_entry(p, struct rxm, next);
+
list_del(&r->next);
snd_cr = &r->frcti->snd_cr;
rcv_cr = &r->frcti->rcv_cr;
+ fd = r->frcti->fd;
+ f = &ai.flows[fd];
shm_du_buff_ack(r->sdb);
+ pthread_rwlock_rdlock(&r->frcti->lock);
+
+ snd_lwe = snd_cr->lwe;
+ rcv_lwe = rcv_cr->lwe;
+ rto = r->frcti->rto;
+
+ pthread_rwlock_unlock(&r->frcti->lock);
+
/* Has been ack'd, remove. */
- if ((int) (r->seqno - snd_cr->lwe) < 0) {
+ if ((int) (r->seqno - snd_lwe) < 0) {
ipcp_sdb_release(r->sdb);
free(r);
continue;
}
- /* Disable using this seqno as rto probe. */
- check_probe(r->frcti, r->seqno);
-
/* Check for r-timer expiry. */
- if (ts_to_ms(now) - r->t0 > r->frcti->r) {
- int fd = r->frcti->fd;
- pthread_mutex_unlock(&rw.lock);
+ if (ts_to_us(now) - r->t0 > r->frcti->r) {
ipcp_sdb_release(r->sdb);
free(r);
- return fd;
+ shm_rbuff_set_acl(ai.flows[fd].rx_rb,
+ ACL_FLOWDOWN);
+ shm_rbuff_set_acl(ai.flows[fd].tx_rb,
+ ACL_FLOWDOWN);
+ continue;
}
/* Copy the payload, safe rtx in other layers. */
if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
- /* FIXME: reschedule send instead of failing? */
- int fd = r->frcti->fd;
- pthread_mutex_unlock(&rw.lock);
ipcp_sdb_release(r->sdb);
free(r);
- return fd;
+ shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
+ continue;
}
idx = shm_du_buff_get_idx(sdb);
@@ -202,20 +202,20 @@ static int rxmwheel_move(void)
/* Release the old copy. */
ipcp_sdb_release(r->sdb);
+ /* Disable using this seqno as rto probe. */
+ check_probe(r->frcti, r->seqno);
+
/* Update ackno and make sure DRF is not set. */
- ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe);
+ ((struct frct_pci *) head)->ackno = ntoh32(rcv_lwe);
((struct frct_pci *) head)->flags &= ~FRCT_DRF;
- f = &ai.flows[r->frcti->fd];
-
- /* Retransmit the copy. FIXME: cancel flow */
- if (shm_rbuff_write(f->tx_rb, idx)) {
- int fd = r->frcti->fd;
- pthread_mutex_unlock(&rw.lock);
+ /* Retransmit the copy. */
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
ipcp_sdb_release(sdb);
free(r);
- /* FIXME: reschedule send? */
- return fd;
+ shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
+ continue;
}
/* Reschedule. */
@@ -228,21 +228,20 @@ static int rxmwheel_move(void)
r->sdb = sdb;
/* Schedule at least in the next time slot */
- rslot = (slot + MAX((f->frcti->rto >> RXMQ_R), 1))
+ rslot = (slot + MAX(rto >> RXMQ_R, 1))
& (RXMQ_SLOTS - 1);
- list_add_tail(&r->next, &rw.wheel[rslot]);
+ list_add_tail(&r->next, &rw->wheel[rslot]);
}
}
- rw.prv = slot & (RXMQ_SLOTS - 1);
-
- pthread_mutex_unlock(&rw.lock);
+ rw->prv = slot & (RXMQ_SLOTS - 1);
- return 0;
+ pthread_cleanup_pop(true);
}
-static int rxmwheel_add(struct frcti * frcti,
+static int rxmwheel_add(struct rxmwheel * rw,
+ struct frcti * frcti,
uint32_t seqno,
struct shm_du_buff * sdb)
{
@@ -254,8 +253,6 @@ static int rxmwheel_add(struct frcti * frcti,
if (r == NULL)
return -ENOMEM;
- pthread_mutex_lock(&rw.lock);
-
clock_gettime(PTHREAD_COND_CLOCK, &now);
r->t0 = ts_to_us(now);
@@ -266,13 +263,19 @@ static int rxmwheel_add(struct frcti * frcti,
r->tail = shm_du_buff_tail(sdb);
r->frcti = frcti;
+ pthread_rwlock_rdlock(&r->frcti->lock);
+
slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1);
- list_add_tail(&r->next, &rw.wheel[slot]);
+ pthread_rwlock_unlock(&r->frcti->lock);
+
+ pthread_mutex_lock(&rw->lock);
+
+ list_add_tail(&r->next, &rw->wheel[slot]);
shm_du_buff_wait_ack(sdb);
- pthread_mutex_unlock(&rw.lock);
+ pthread_mutex_unlock(&rw->lock);
return 0;
}
diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c
index 00ffd583..91eb8b5f 100644
--- a/src/lib/shm_rbuff_pthr.c
+++ b/src/lib/shm_rbuff_pthr.c
@@ -109,7 +109,9 @@ int shm_rbuff_write_b(struct shm_rbuff * rb,
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->lock);
- while (!shm_rbuff_free(rb) && ret != -ETIMEDOUT) {
+ while (!shm_rbuff_free(rb)
+ && ret != -ETIMEDOUT
+ && !(*rb->acl & ACL_FLOWDOWN)) {
if (abstime != NULL)
ret = -pthread_cond_timedwait(rb->del,
rb->lock,
@@ -187,7 +189,9 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->lock);
- while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) {
+ while (shm_rbuff_empty(rb)
+ && (idx != -ETIMEDOUT)
+ && !(*rb->acl & ACL_FLOWDOWN)) {
if (abstime != NULL)
idx = -pthread_cond_timedwait(rb->add,
rb->lock,
@@ -224,6 +228,9 @@ void shm_rbuff_set_acl(struct shm_rbuff * rb,
#endif
*rb->acl = (size_t) flags;
+ pthread_cond_broadcast(rb->del);
+ pthread_cond_broadcast(rb->add);
+
pthread_mutex_unlock(rb->lock);
}