diff options
-rw-r--r-- | src/lib/dev.c | 3 | ||||
-rw-r--r-- | src/lib/frct.c | 156 | ||||
-rw-r--r-- | src/lib/timerwheel.c | 2 |
3 files changed, 130 insertions, 31 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index ca004aa4..4b78c1db 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -1058,8 +1058,7 @@ ssize_t flow_write(int fd, return -EAGAIN; idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb); } else { - while((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) { - + while ((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) { if (ret != -ETIMEDOUT) return ret; diff --git a/src/lib/frct.c b/src/lib/frct.c index c0ae6bbb..919e2617 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -20,6 +20,9 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ +#define DELT_RDV (100 * MILLION) /* ns */ +#define MAX_RDV (1 * BILLION) /* ns */ + #define FRCT_PCILEN (sizeof(struct frct_pci)) struct frct_cr { @@ -39,6 +42,7 @@ struct frcti { time_t mpl; time_t a; time_t r; + time_t rdv; time_t srtt; /* Smoothed rtt */ time_t mdev; /* Deviation */ @@ -54,8 +58,8 @@ struct frcti { pthread_rwlock_t lock; bool open; /* Window open/closed */ - size_t wnd; /* Window size */ struct timespec t_wnd; /* Window closed time */ + struct timespec t_rdvs; /* Last rendez-vous sent */ pthread_cond_t cond; pthread_mutex_t mtx; }; @@ -92,8 +96,10 @@ static bool after(uint32_t seq1, return (int32_t)(seq2 - seq1) < 0; } -static int __send_ack(int fd, - int ackno) +static void __send_frct_pkt(int fd, + uint8_t flags, + uint32_t ackno, + uint32_t rwe) { struct shm_du_buff * sdb; struct frct_pci * pci; @@ -107,12 +113,14 @@ static int __send_ack(int fd, idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*pci), NULL, &sdb); #endif if (idx < 0) - return -ENOMEM; + return; pci = (struct frct_pci *) shm_du_buff_head(sdb); memset(pci, 0, sizeof(*pci)); - pci->flags = FRCT_ACK; + *((uint32_t *) pci) = hton32(rwe); + + pci->flags = flags; pci->ackno = hton32(ackno); f = &ai.flows[fd]; @@ -122,19 +130,18 @@ static int __send_ack(int fd, if (shm_rbuff_write(f->tx_rb, idx)) { #endif ipcp_sdb_release(sdb); - return -ENOMEM; + return; } shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); - - return 0; } -static void frct_send_ack(struct frcti * frcti) +static void send_frct_pkt(struct frcti * frcti) { struct timespec now; time_t diff; uint32_t ackno; + uint32_t rwe; int fd; assert(frcti); @@ -146,8 +153,9 @@ static void frct_send_ack(struct frcti * frcti) return; } - ackno = frcti->rcv_cr.lwe; fd = frcti->fd; + ackno = frcti->rcv_cr.lwe; + rwe = frcti->rcv_cr.rwe; clock_gettime(PTHREAD_COND_CLOCK, &now); @@ -158,8 +166,7 @@ static void frct_send_ack(struct frcti * frcti) if (diff > frcti->a || diff < DELT_ACK) return; - if (__send_ack(fd, ackno) < 0) - return; + __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe); pthread_rwlock_wrlock(&frcti->lock); @@ -169,14 +176,42 @@ static void frct_send_ack(struct frcti * frcti) pthread_rwlock_unlock(&frcti->lock); } +static void __send_rdv(int fd) +{ + struct shm_du_buff * sdb; + struct frct_pci * pci; + ssize_t idx; + struct flow * f; + + /* Raw calls needed to bypass frcti. */ + idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); + if (idx < 0) + return; + + pci = (struct frct_pci *) shm_du_buff_head(sdb); + memset(pci, 0, sizeof(*pci)); + + pci->flags = FRCT_RDVS; + + f = &ai.flows[fd]; + + if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { + ipcp_sdb_release(sdb); + return; + } + + shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); +} + static struct frcti * frcti_create(int fd) { - struct frcti * frcti; - ssize_t idx; - struct timespec now; - time_t mpl; - time_t a; - time_t r; + struct frcti * frcti; + ssize_t idx; + struct timespec now; + time_t mpl; + time_t a; + time_t r; + pthread_condattr_t cattr; frcti = malloc(sizeof(*frcti)); if (frcti == NULL) @@ -190,7 +225,12 @@ static struct frcti * frcti_create(int fd) if (pthread_mutex_init(&frcti->mtx, NULL)) goto fail_mutex; - if (pthread_cond_init(&frcti->cond, NULL)) + if (pthread_condattr_init(&cattr)) + goto fail_cattr; +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&frcti->cond, &cattr)) goto fail_cond; for (idx = 0; idx < RQ_SIZE; ++idx) @@ -201,6 +241,7 @@ static struct frcti * frcti_create(int fd) frcti->mpl = mpl = DELT_MPL; frcti->a = a = DELT_A; frcti->r = r = DELT_R; + frcti->rdv = DELT_RDV; frcti->fd = fd; @@ -229,6 +270,8 @@ static struct frcti * frcti_create(int fd) return frcti; fail_cond: + pthread_condattr_destroy(&cattr); +fail_cattr: pthread_mutex_destroy(&frcti->mtx); fail_mutex: pthread_rwlock_destroy(&frcti->lock); @@ -311,11 +354,30 @@ static bool __frcti_is_window_open(struct frcti * frcti) ret = before(snd_cr->seqno, snd_cr->rwe); if (!ret) { + struct timespec now; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + pthread_mutex_lock(&frcti->mtx); if (frcti->open) { - clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd); - frcti->open = false; + frcti->open = false; + frcti->t_wnd = now; + frcti->t_rdvs = now; + } else { + time_t diff; + diff = ts_diff_ns(&frcti->t_wnd, &now); + if (diff > MAX_RDV) { + pthread_mutex_unlock(&frcti->mtx); + return false; + } + + diff = ts_diff_ns(&frcti->t_rdvs, &now); + if (diff > frcti->rdv) { + frcti->t_rdvs = now; + __send_rdv(frcti->fd); + } } + pthread_mutex_unlock(&frcti->mtx); } @@ -338,12 +400,17 @@ static int __frcti_window_wait(struct frcti * frcti, } while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) { + struct timespec now; + pthread_rwlock_unlock(&frcti->lock); pthread_mutex_lock(&frcti->mtx); if (frcti->open) { - clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd); - frcti->open = false; + clock_gettime(PTHREAD_COND_CLOCK, &now); + + frcti->t_wnd = now; + frcti->t_rdvs = now; + frcti->open = false; } pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, @@ -353,7 +420,27 @@ static int __frcti_window_wait(struct frcti * frcti, &frcti->mtx, abstime); - pthread_cleanup_pop(true); + pthread_cleanup_pop(false); + + if (ret == -ETIMEDOUT) { + time_t diff; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + diff = ts_diff_ns(&frcti->t_wnd, &now); + if (diff > MAX_RDV) { + pthread_mutex_unlock(&frcti->mtx); + return -ECONNRESET; /* write fails! */ + } + + diff = ts_diff_ns(&frcti->t_rdvs, &now); + if (diff > frcti->rdv) { + frcti->t_rdvs = now; + __send_rdv(frcti->fd); + } + } + + pthread_mutex_unlock(&frcti->mtx); pthread_rwlock_rdlock(&frcti->lock); } @@ -436,7 +523,7 @@ static time_t __frcti_dealloc(struct frcti * frcti) pthread_rwlock_unlock(&frcti->lock); if (fd != -1) - __send_ack(fd, ackno); + __send_frct_pkt(fd, FRCT_ACK, ackno, 0); return wait; } @@ -557,6 +644,7 @@ static void __frcti_rcv(struct frcti * frcti, struct frct_cr * snd_cr; uint32_t seqno; uint32_t ackno; + uint32_t rwe; int fd = -1; assert(frcti); @@ -583,6 +671,18 @@ static void __frcti_rcv(struct frcti * frcti, } } + /* For now, just send an immediate window update. */ + if (pci->flags & FRCT_RDVS) { + fd = frcti->fd; + rwe = rcv_cr->rwe; + pthread_rwlock_unlock(&frcti->lock); + + __send_frct_pkt(fd, FRCT_FC, 0, rwe); + + shm_rdrbuff_remove(ai.rdrb, idx); + return; + } + if (pci->flags & FRCT_ACK) { ackno = ntoh32(pci->ackno); if (after(ackno, frcti->snd_cr.lwe)) @@ -618,13 +718,13 @@ static void __frcti_rcv(struct frcti * frcti, goto drop_packet; if (before(seqno, rcv_cr->lwe)) { - rcv_cr->seqno = seqno; + rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */ goto drop_packet; } if (rcv_cr->cflags & FRCTFRTX) { - if (!before(seqno, rcv_cr->rwe)) /* Out of window */ + if (!before(seqno, rcv_cr->rwe)) /* Out of window. */ goto drop_packet; if (!before(seqno, rcv_cr->lwe + RQ_SIZE)) @@ -652,7 +752,7 @@ static void __frcti_rcv(struct frcti * frcti, drop_packet: pthread_rwlock_unlock(&frcti->lock); - frct_send_ack(frcti); + send_frct_pkt(frcti); shm_rdrbuff_remove(ai.rdrb, idx); return; diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index 4443832d..cd3074cb 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -294,7 +294,7 @@ static void timerwheel_move(void) rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false; if (f->flow_id == a->flow_id && f->frcti != NULL) - frct_send_ack(a->frcti); + send_frct_pkt(a->frcti); free(a); |