diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/lib/dev.c | 21 | ||||
-rw-r--r-- | src/lib/frct.c | 86 | ||||
-rw-r--r-- | src/lib/rxmwheel.c | 159 | ||||
-rw-r--r-- | src/lib/shm_rbuff_pthr.c | 11 |
4 files changed, 149 insertions, 128 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 80d7e9ad..e8989a48 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -429,9 +429,6 @@ static void init(int argc, if (pthread_rwlock_init(&ai.lock, NULL)) goto fail_lock; - if (rxmwheel_init()) - goto fail_rxmwheel; - ai.fqset = shm_flow_set_open(getpid()); if (ai.fqset == NULL) goto fail_fqset; @@ -439,8 +436,6 @@ static void init(int argc, return; fail_fqset: - rxmwheel_fini(); - fail_rxmwheel: pthread_rwlock_destroy(&ai.lock); fail_lock: for (i = 0; i < SYS_MAX_FLOWS; ++i) @@ -474,8 +469,6 @@ static void fini(void) if (ai.fds == NULL) return; - rxmwheel_fini(); - if (ai.prog != NULL) free(ai.prog); @@ -1080,15 +1073,16 @@ ssize_t flow_read(int fd, flow = &ai.flows[fd]; + clock_gettime(PTHREAD_COND_CLOCK, &abs); + + pthread_rwlock_rdlock(&ai.lock); + if (flow->part_idx == DONE_PART) { + pthread_rwlock_unlock(&ai.lock); flow->part_idx = NO_PART; return 0; } - clock_gettime(PTHREAD_COND_CLOCK, &abs); - - pthread_rwlock_rdlock(&ai.lock); - if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; @@ -1141,8 +1135,13 @@ ssize_t flow_read(int fd, if (n <= (ssize_t) count) { memcpy(buf, packet, n); shm_rdrbuff_remove(ai.rdrb, idx); + + pthread_rwlock_wrlock(&ai.lock); + flow->part_idx = (partrd && n == (ssize_t) count) ? DONE_PART : NO_PART; + + pthread_rwlock_unlock(&ai.lock); return n; } else { if (partrd) { diff --git a/src/lib/frct.c b/src/lib/frct.c index 0e9d64c7..3c180128 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -21,15 +21,12 @@ */ /* Default Delta-t parameters */ -#define DELT_MPL 60000 /* ms */ -#define DELT_A 3000 /* ms */ -#define DELT_R 20000 /* ms */ +#define DELT_MPL (60 * MILLION) /* us */ +#define DELT_A (1 * MILLION) /* us */ +#define DELT_R (20 * MILLION) /* us */ #define RQ_SIZE 1024 -#define TW_ELEMENTS 6000 -#define TW_RESOLUTION 1 /* ms */ - #define FRCT_PCILEN (sizeof(struct frct_pci)) struct frct_cr { @@ -44,24 +41,26 @@ struct frct_cr { }; struct frcti { - int fd; + int fd; + + time_t mpl; + time_t a; + time_t r; - time_t mpl; - time_t a; - time_t r; + time_t srtt_us; /* smoothed rtt */ + time_t mdev_us; /* mdev */ + time_t rto; /* retransmission timeout */ + uint32_t rttseq; + struct timespec t_probe; /* probe time */ + bool probe; /* probe active */ - time_t srtt_us; /* smoothed rtt */ - time_t mdev_us; /* mdev */ - time_t rto; /* retransmission timeout */ - uint32_t rttseq; - struct timespec t_probe; /* probe time */ - bool probe; /* probe active */ + struct frct_cr snd_cr; + struct frct_cr rcv_cr; - struct frct_cr snd_cr; - struct frct_cr rcv_cr; + struct rxmwheel * rw; - ssize_t rq[RQ_SIZE]; - pthread_rwlock_t lock; + ssize_t rq[RQ_SIZE]; + pthread_rwlock_t lock; }; enum frct_flags { @@ -111,28 +110,35 @@ static struct frcti * frcti_create(int fd) frcti->r = DELT_R; frcti->fd = fd; - delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000; + delta_t = frcti->mpl + frcti->a + frcti->r; - frcti->snd_cr.inact = 3 * delta_t; - frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); + frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */ + frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); - frcti->rttseq = 0; - frcti->probe = false; + frcti->rttseq = 0; + frcti->probe = false; - frcti->srtt_us = 0; /* updated on first ACK */ - frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */ - frcti->rto = 20000; /* initial rxm will be after 20 ms */ + frcti->srtt_us = 0; /* updated on first ACK */ + frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */ + frcti->rto = 20000; /* initial rxm will be after 20 ms */ + frcti->rw = NULL; if (ai.flows[fd].qs.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX; frcti->rcv_cr.cflags |= FRCTFRTX; + frcti->rw = rxmwheel_create(); + if (frcti->rw == NULL) + goto fail_rw; } - frcti->rcv_cr.inact = 2 * delta_t; - frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); + frcti->rcv_cr.inact = 2 * delta_t / MILLION; /* s */ + frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); + return frcti; + fail_rw: + pthread_rwlock_destroy(&frcti->lock); fail_lock: free(frcti); fail_malloc: @@ -146,7 +152,8 @@ static void frcti_destroy(struct frcti * frcti) * make sure everything we sent is acked. */ - rxmwheel_clear(frcti->fd); + if (frcti->rw != NULL) + rxmwheel_destroy(frcti->rw); pthread_rwlock_destroy(&frcti->lock); @@ -229,13 +236,15 @@ static int __frcti_snd(struct frcti * frcti, struct timespec now; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; + uint32_t seqno; assert(frcti); snd_cr = &frcti->snd_cr; rcv_cr = &frcti->rcv_cr; - rxmwheel_move(); + if (frcti->rw != NULL) + rxmwheel_move(frcti->rw); pci = frcti_alloc_head(sdb); if (pci == NULL) @@ -259,7 +268,9 @@ static int __frcti_snd(struct frcti * frcti, frcti->snd_cr.lwe = snd_cr->seqno - 1; } - pci->seqno = hton32(snd_cr->seqno); + seqno = snd_cr->seqno; + pci->seqno = hton32(seqno); + if (!(snd_cr->cflags & FRCTFRTX)) { snd_cr->lwe++; } else { @@ -269,8 +280,6 @@ static int __frcti_snd(struct frcti * frcti, frcti->probe = true; } - rxmwheel_add(frcti, snd_cr->seqno, sdb); - if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { pci->flags |= FRCT_ACK; pci->ackno = hton32(rcv_cr->lwe); @@ -282,6 +291,9 @@ static int __frcti_snd(struct frcti * frcti, pthread_rwlock_unlock(&frcti->lock); + if (frcti->rw != NULL) + rxmwheel_add(frcti->rw, frcti, seqno, sdb); + return 0; } @@ -384,13 +396,13 @@ static int __frcti_rcv(struct frcti * frcti, if (ret == 0 && !(pci->flags & FRCT_DATA)) shm_rdrbuff_remove(ai.rdrb, idx); - rxmwheel_move(); + if (frcti->rw != NULL) + rxmwheel_move(frcti->rw); return ret; drop_packet: pthread_rwlock_unlock(&frcti->lock); shm_rdrbuff_remove(ai.rdrb, idx); - return -EAGAIN; } diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index 28cd78de..dbdd9377 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -29,7 +29,6 @@ #define RXMQ_MAX (1 << RXMQ_M) /* us */ /* Small inacurracy to avoid slow division by MILLION. */ -#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) #define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10)) #define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) @@ -39,26 +38,28 @@ struct rxm { struct shm_du_buff * sdb; uint8_t * head; uint8_t * tail; - time_t t0; /* Time when original was sent (s). */ + time_t t0; /* Time when original was sent (us). */ size_t mul; /* RTO multiplier. */ struct frcti * frcti; }; -struct { +struct rxmwheel { struct list_head wheel[RXMQ_SLOTS]; size_t prv; /* Last processed slot. */ pthread_mutex_t lock; -} rw; +}; -static void rxmwheel_fini(void) +static void rxmwheel_destroy(struct rxmwheel * rw) { size_t i; struct list_head * p; struct list_head * h; + pthread_mutex_destroy(&rw->lock); + for (i = 0; i < RXMQ_SLOTS; ++i) { - list_for_each_safe(p, h, &rw.wheel[i]) { + list_for_each_safe(p, h, &rw->wheel[i]) { struct rxm * rxm = list_entry(p, struct rxm, next); list_del(&rxm->next); shm_du_buff_ack(rxm->sdb); @@ -68,66 +69,49 @@ static void rxmwheel_fini(void) } } -static int rxmwheel_init(void) +static struct rxmwheel * rxmwheel_create(void) { - struct timespec now; - size_t i; + struct rxmwheel * rw; + struct timespec now; + size_t i; - if (pthread_mutex_init(&rw.lock, NULL)) - return -1; + rw = malloc(sizeof(*rw)); + if (rw == NULL) + return NULL; + + if (pthread_mutex_init(&rw->lock, NULL)) { + free(rw); + return NULL; + } clock_gettime(PTHREAD_COND_CLOCK, &now); /* Mark the previous timeslot as the last one processed. */ - rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); + rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); for (i = 0; i < RXMQ_SLOTS; ++i) - list_head_init(&rw.wheel[i]); - - return 0; -} - -static void rxmwheel_clear(int fd) -{ - size_t i; + list_head_init(&rw->wheel[i]); - /* FIXME: Add list element to avoid looping over full rxmwheel. */ - pthread_mutex_lock(&rw.lock); - - for (i = 0; i < RXMQ_SLOTS; ++i) { - struct list_head * p; - struct list_head * h; - - list_for_each_safe(p, h, &rw.wheel[i]) { - struct rxm * r = list_entry(p, struct rxm, next); - if (r->frcti->fd == fd) { - list_del(&r->next); - shm_du_buff_ack(r->sdb); - ipcp_sdb_release(r->sdb); - free(r); - } - } - } - - pthread_mutex_unlock(&rw.lock); + return rw; } static void check_probe(struct frcti * frcti, uint32_t seqno) { - /* disable rtt probe if this packet */ + /* Disable rtt probe on retransmitted packet! */ - /* TODO: This should be locked, but lock reversal! */ + pthread_rwlock_wrlock(&frcti->lock); if (frcti->probe && ((frcti->rttseq + 1) == seqno)) { /* Backoff to avoid never updating rtt */ frcti->srtt_us += frcti->mdev_us; frcti->probe = false; } + + pthread_rwlock_unlock(&frcti->lock); } -/* Return fd on r-timer expiry. */ -static int rxmwheel_move(void) +static void rxmwheel_move(struct rxmwheel * rw) { struct timespec now; struct list_head * p; @@ -135,19 +119,22 @@ static int rxmwheel_move(void) size_t slot; size_t i; - pthread_mutex_lock(&rw.lock); + pthread_mutex_lock(&rw->lock); + + pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock, + (void *) &rw->lock); clock_gettime(PTHREAD_COND_CLOCK, &now); slot = ts_to_slot(now); - i = rw.prv; + i = rw->prv; if (slot < i) slot += RXMQ_SLOTS; while (i++ < slot) { - list_for_each_safe(p, h, &rw.wheel[i & (RXMQ_SLOTS - 1)]) { + list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) { struct rxm * r; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; @@ -156,42 +143,55 @@ static int rxmwheel_move(void) struct shm_du_buff * sdb; uint8_t * head; struct flow * f; + int fd; + uint32_t snd_lwe; + uint32_t rcv_lwe; + time_t rto; r = list_entry(p, struct rxm, next); + list_del(&r->next); snd_cr = &r->frcti->snd_cr; rcv_cr = &r->frcti->rcv_cr; + fd = r->frcti->fd; + f = &ai.flows[fd]; shm_du_buff_ack(r->sdb); + pthread_rwlock_rdlock(&r->frcti->lock); + + snd_lwe = snd_cr->lwe; + rcv_lwe = rcv_cr->lwe; + rto = r->frcti->rto; + + pthread_rwlock_unlock(&r->frcti->lock); + /* Has been ack'd, remove. */ - if ((int) (r->seqno - snd_cr->lwe) < 0) { + if ((int) (r->seqno - snd_lwe) < 0) { ipcp_sdb_release(r->sdb); free(r); continue; } - /* Disable using this seqno as rto probe. */ - check_probe(r->frcti, r->seqno); - /* Check for r-timer expiry. */ - if (ts_to_ms(now) - r->t0 > r->frcti->r) { - int fd = r->frcti->fd; - pthread_mutex_unlock(&rw.lock); + if (ts_to_us(now) - r->t0 > r->frcti->r) { ipcp_sdb_release(r->sdb); free(r); - return fd; + shm_rbuff_set_acl(ai.flows[fd].rx_rb, + ACL_FLOWDOWN); + shm_rbuff_set_acl(ai.flows[fd].tx_rb, + ACL_FLOWDOWN); + continue; } /* Copy the payload, safe rtx in other layers. */ if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { - /* FIXME: reschedule send instead of failing? */ - int fd = r->frcti->fd; - pthread_mutex_unlock(&rw.lock); ipcp_sdb_release(r->sdb); free(r); - return fd; + shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); + continue; } idx = shm_du_buff_get_idx(sdb); @@ -202,20 +202,20 @@ static int rxmwheel_move(void) /* Release the old copy. */ ipcp_sdb_release(r->sdb); + /* Disable using this seqno as rto probe. */ + check_probe(r->frcti, r->seqno); + /* Update ackno and make sure DRF is not set. */ - ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe); + ((struct frct_pci *) head)->ackno = ntoh32(rcv_lwe); ((struct frct_pci *) head)->flags &= ~FRCT_DRF; - f = &ai.flows[r->frcti->fd]; - - /* Retransmit the copy. FIXME: cancel flow */ - if (shm_rbuff_write(f->tx_rb, idx)) { - int fd = r->frcti->fd; - pthread_mutex_unlock(&rw.lock); + /* Retransmit the copy. */ + if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { ipcp_sdb_release(sdb); free(r); - /* FIXME: reschedule send? */ - return fd; + shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); + continue; } /* Reschedule. */ @@ -228,21 +228,20 @@ static int rxmwheel_move(void) r->sdb = sdb; /* Schedule at least in the next time slot */ - rslot = (slot + MAX((f->frcti->rto >> RXMQ_R), 1)) + rslot = (slot + MAX(rto >> RXMQ_R, 1)) & (RXMQ_SLOTS - 1); - list_add_tail(&r->next, &rw.wheel[rslot]); + list_add_tail(&r->next, &rw->wheel[rslot]); } } - rw.prv = slot & (RXMQ_SLOTS - 1); - - pthread_mutex_unlock(&rw.lock); + rw->prv = slot & (RXMQ_SLOTS - 1); - return 0; + pthread_cleanup_pop(true); } -static int rxmwheel_add(struct frcti * frcti, +static int rxmwheel_add(struct rxmwheel * rw, + struct frcti * frcti, uint32_t seqno, struct shm_du_buff * sdb) { @@ -254,8 +253,6 @@ static int rxmwheel_add(struct frcti * frcti, if (r == NULL) return -ENOMEM; - pthread_mutex_lock(&rw.lock); - clock_gettime(PTHREAD_COND_CLOCK, &now); r->t0 = ts_to_us(now); @@ -266,13 +263,19 @@ static int rxmwheel_add(struct frcti * frcti, r->tail = shm_du_buff_tail(sdb); r->frcti = frcti; + pthread_rwlock_rdlock(&r->frcti->lock); + slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1); - list_add_tail(&r->next, &rw.wheel[slot]); + pthread_rwlock_unlock(&r->frcti->lock); + + pthread_mutex_lock(&rw->lock); + + list_add_tail(&r->next, &rw->wheel[slot]); shm_du_buff_wait_ack(sdb); - pthread_mutex_unlock(&rw.lock); + pthread_mutex_unlock(&rw->lock); return 0; } diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index 00ffd583..91eb8b5f 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -109,7 +109,9 @@ int shm_rbuff_write_b(struct shm_rbuff * rb, pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) rb->lock); - while (!shm_rbuff_free(rb) && ret != -ETIMEDOUT) { + while (!shm_rbuff_free(rb) + && ret != -ETIMEDOUT + && !(*rb->acl & ACL_FLOWDOWN)) { if (abstime != NULL) ret = -pthread_cond_timedwait(rb->del, rb->lock, @@ -187,7 +189,9 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) rb->lock); - while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) { + while (shm_rbuff_empty(rb) + && (idx != -ETIMEDOUT) + && !(*rb->acl & ACL_FLOWDOWN)) { if (abstime != NULL) idx = -pthread_cond_timedwait(rb->add, rb->lock, @@ -224,6 +228,9 @@ void shm_rbuff_set_acl(struct shm_rbuff * rb, #endif *rb->acl = (size_t) flags; + pthread_cond_broadcast(rb->del); + pthread_cond_broadcast(rb->add); + pthread_mutex_unlock(rb->lock); } |