summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/lib/dev.c3
-rw-r--r--src/lib/frct.c156
-rw-r--r--src/lib/timerwheel.c2
3 files changed, 130 insertions, 31 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index ca004aa4..4b78c1db 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -1058,8 +1058,7 @@ ssize_t flow_write(int fd,
return -EAGAIN;
idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb);
} else {
- while((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) {
-
+ while ((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) {
if (ret != -ETIMEDOUT)
return ret;
diff --git a/src/lib/frct.c b/src/lib/frct.c
index c0ae6bbb..919e2617 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -20,6 +20,9 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
+#define DELT_RDV (100 * MILLION) /* ns */
+#define MAX_RDV (1 * BILLION) /* ns */
+
#define FRCT_PCILEN (sizeof(struct frct_pci))
struct frct_cr {
@@ -39,6 +42,7 @@ struct frcti {
time_t mpl;
time_t a;
time_t r;
+ time_t rdv;
time_t srtt; /* Smoothed rtt */
time_t mdev; /* Deviation */
@@ -54,8 +58,8 @@ struct frcti {
pthread_rwlock_t lock;
bool open; /* Window open/closed */
- size_t wnd; /* Window size */
struct timespec t_wnd; /* Window closed time */
+ struct timespec t_rdvs; /* Last rendez-vous sent */
pthread_cond_t cond;
pthread_mutex_t mtx;
};
@@ -92,8 +96,10 @@ static bool after(uint32_t seq1,
return (int32_t)(seq2 - seq1) < 0;
}
-static int __send_ack(int fd,
- int ackno)
+static void __send_frct_pkt(int fd,
+ uint8_t flags,
+ uint32_t ackno,
+ uint32_t rwe)
{
struct shm_du_buff * sdb;
struct frct_pci * pci;
@@ -107,12 +113,14 @@ static int __send_ack(int fd,
idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*pci), NULL, &sdb);
#endif
if (idx < 0)
- return -ENOMEM;
+ return;
pci = (struct frct_pci *) shm_du_buff_head(sdb);
memset(pci, 0, sizeof(*pci));
- pci->flags = FRCT_ACK;
+ *((uint32_t *) pci) = hton32(rwe);
+
+ pci->flags = flags;
pci->ackno = hton32(ackno);
f = &ai.flows[fd];
@@ -122,19 +130,18 @@ static int __send_ack(int fd,
if (shm_rbuff_write(f->tx_rb, idx)) {
#endif
ipcp_sdb_release(sdb);
- return -ENOMEM;
+ return;
}
shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
-
- return 0;
}
-static void frct_send_ack(struct frcti * frcti)
+static void send_frct_pkt(struct frcti * frcti)
{
struct timespec now;
time_t diff;
uint32_t ackno;
+ uint32_t rwe;
int fd;
assert(frcti);
@@ -146,8 +153,9 @@ static void frct_send_ack(struct frcti * frcti)
return;
}
- ackno = frcti->rcv_cr.lwe;
fd = frcti->fd;
+ ackno = frcti->rcv_cr.lwe;
+ rwe = frcti->rcv_cr.rwe;
clock_gettime(PTHREAD_COND_CLOCK, &now);
@@ -158,8 +166,7 @@ static void frct_send_ack(struct frcti * frcti)
if (diff > frcti->a || diff < DELT_ACK)
return;
- if (__send_ack(fd, ackno) < 0)
- return;
+ __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ackno, rwe);
pthread_rwlock_wrlock(&frcti->lock);
@@ -169,14 +176,42 @@ static void frct_send_ack(struct frcti * frcti)
pthread_rwlock_unlock(&frcti->lock);
}
+static void __send_rdv(int fd)
+{
+ struct shm_du_buff * sdb;
+ struct frct_pci * pci;
+ ssize_t idx;
+ struct flow * f;
+
+ /* Raw calls needed to bypass frcti. */
+ idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
+ if (idx < 0)
+ return;
+
+ pci = (struct frct_pci *) shm_du_buff_head(sdb);
+ memset(pci, 0, sizeof(*pci));
+
+ pci->flags = FRCT_RDVS;
+
+ f = &ai.flows[fd];
+
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
+ ipcp_sdb_release(sdb);
+ return;
+ }
+
+ shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+}
+
static struct frcti * frcti_create(int fd)
{
- struct frcti * frcti;
- ssize_t idx;
- struct timespec now;
- time_t mpl;
- time_t a;
- time_t r;
+ struct frcti * frcti;
+ ssize_t idx;
+ struct timespec now;
+ time_t mpl;
+ time_t a;
+ time_t r;
+ pthread_condattr_t cattr;
frcti = malloc(sizeof(*frcti));
if (frcti == NULL)
@@ -190,7 +225,12 @@ static struct frcti * frcti_create(int fd)
if (pthread_mutex_init(&frcti->mtx, NULL))
goto fail_mutex;
- if (pthread_cond_init(&frcti->cond, NULL))
+ if (pthread_condattr_init(&cattr))
+ goto fail_cattr;
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(&frcti->cond, &cattr))
goto fail_cond;
for (idx = 0; idx < RQ_SIZE; ++idx)
@@ -201,6 +241,7 @@ static struct frcti * frcti_create(int fd)
frcti->mpl = mpl = DELT_MPL;
frcti->a = a = DELT_A;
frcti->r = r = DELT_R;
+ frcti->rdv = DELT_RDV;
frcti->fd = fd;
@@ -229,6 +270,8 @@ static struct frcti * frcti_create(int fd)
return frcti;
fail_cond:
+ pthread_condattr_destroy(&cattr);
+fail_cattr:
pthread_mutex_destroy(&frcti->mtx);
fail_mutex:
pthread_rwlock_destroy(&frcti->lock);
@@ -311,11 +354,30 @@ static bool __frcti_is_window_open(struct frcti * frcti)
ret = before(snd_cr->seqno, snd_cr->rwe);
if (!ret) {
+ struct timespec now;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
pthread_mutex_lock(&frcti->mtx);
if (frcti->open) {
- clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd);
- frcti->open = false;
+ frcti->open = false;
+ frcti->t_wnd = now;
+ frcti->t_rdvs = now;
+ } else {
+ time_t diff;
+ diff = ts_diff_ns(&frcti->t_wnd, &now);
+ if (diff > MAX_RDV) {
+ pthread_mutex_unlock(&frcti->mtx);
+ return false;
+ }
+
+ diff = ts_diff_ns(&frcti->t_rdvs, &now);
+ if (diff > frcti->rdv) {
+ frcti->t_rdvs = now;
+ __send_rdv(frcti->fd);
+ }
}
+
pthread_mutex_unlock(&frcti->mtx);
}
@@ -338,12 +400,17 @@ static int __frcti_window_wait(struct frcti * frcti,
}
while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) {
+ struct timespec now;
+
pthread_rwlock_unlock(&frcti->lock);
pthread_mutex_lock(&frcti->mtx);
if (frcti->open) {
- clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd);
- frcti->open = false;
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ frcti->t_wnd = now;
+ frcti->t_rdvs = now;
+ frcti->open = false;
}
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
@@ -353,7 +420,27 @@ static int __frcti_window_wait(struct frcti * frcti,
&frcti->mtx,
abstime);
- pthread_cleanup_pop(true);
+ pthread_cleanup_pop(false);
+
+ if (ret == -ETIMEDOUT) {
+ time_t diff;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ diff = ts_diff_ns(&frcti->t_wnd, &now);
+ if (diff > MAX_RDV) {
+ pthread_mutex_unlock(&frcti->mtx);
+ return -ECONNRESET; /* write fails! */
+ }
+
+ diff = ts_diff_ns(&frcti->t_rdvs, &now);
+ if (diff > frcti->rdv) {
+ frcti->t_rdvs = now;
+ __send_rdv(frcti->fd);
+ }
+ }
+
+ pthread_mutex_unlock(&frcti->mtx);
pthread_rwlock_rdlock(&frcti->lock);
}
@@ -436,7 +523,7 @@ static time_t __frcti_dealloc(struct frcti * frcti)
pthread_rwlock_unlock(&frcti->lock);
if (fd != -1)
- __send_ack(fd, ackno);
+ __send_frct_pkt(fd, FRCT_ACK, ackno, 0);
return wait;
}
@@ -557,6 +644,7 @@ static void __frcti_rcv(struct frcti * frcti,
struct frct_cr * snd_cr;
uint32_t seqno;
uint32_t ackno;
+ uint32_t rwe;
int fd = -1;
assert(frcti);
@@ -583,6 +671,18 @@ static void __frcti_rcv(struct frcti * frcti,
}
}
+ /* For now, just send an immediate window update. */
+ if (pci->flags & FRCT_RDVS) {
+ fd = frcti->fd;
+ rwe = rcv_cr->rwe;
+ pthread_rwlock_unlock(&frcti->lock);
+
+ __send_frct_pkt(fd, FRCT_FC, 0, rwe);
+
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return;
+ }
+
if (pci->flags & FRCT_ACK) {
ackno = ntoh32(pci->ackno);
if (after(ackno, frcti->snd_cr.lwe))
@@ -618,13 +718,13 @@ static void __frcti_rcv(struct frcti * frcti,
goto drop_packet;
if (before(seqno, rcv_cr->lwe)) {
- rcv_cr->seqno = seqno;
+ rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */
goto drop_packet;
}
if (rcv_cr->cflags & FRCTFRTX) {
- if (!before(seqno, rcv_cr->rwe)) /* Out of window */
+ if (!before(seqno, rcv_cr->rwe)) /* Out of window. */
goto drop_packet;
if (!before(seqno, rcv_cr->lwe + RQ_SIZE))
@@ -652,7 +752,7 @@ static void __frcti_rcv(struct frcti * frcti,
drop_packet:
pthread_rwlock_unlock(&frcti->lock);
- frct_send_ack(frcti);
+ send_frct_pkt(frcti);
shm_rdrbuff_remove(ai.rdrb, idx);
return;
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index 4443832d..cd3074cb 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -294,7 +294,7 @@ static void timerwheel_move(void)
rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false;
if (f->flow_id == a->flow_id && f->frcti != NULL)
- frct_send_ack(a->frcti);
+ send_frct_pkt(a->frcti);
free(a);