From 1e3a9e464cbb2f02c057e9f63c1f270ff27530f4 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Sun, 20 Sep 2020 13:04:52 +0200 Subject: lib: Complete retransmission logic This completes the retransmission (automatic repeat-request, ARQ) logic, sending (delayed) ACK messages when needed. On deallocation, flows will send a final ACK and try to retransmit any remaining unacknowledged messages (unless the FRCTFLINGER flag is turned off; this is on by default). Applications can safely shut down as soon as everything is ACK'd (i.e. the current Delta-t run is done). The activity timeout is now passed to the IPCP for it to sleep before completing deallocation (and releasing the flow_id). That should be moved to the IRMd in due time. The timerwheel is revised to be multi-level to reduce memory consumption. The resolution bumps by a factor of 1 << RXMQ_BUMP (i.e. 16, with RXMQ_BUMP = 4) per level and each level has RXMQ_SLOTS (1 << 8) slots. The lowest level has a resolution of (1 << RXMQ_RES) ns with RXMQ_RES = 20, which is roughly a millisecond. Currently, 3 levels are defined, so the largest delay we can schedule at each level is: Level 0: 256 ms; Level 1: 4 s; Level 2: about a minute. 
Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- include/ouroboros/fccntl.h | 4 +- src/irmd/ipcp.c | 13 +- src/irmd/ipcp.h | 5 +- src/irmd/main.c | 20 ++- src/lib/dev.c | 133 +++++++++++--- src/lib/frct.c | 244 ++++++++++++++++---------- src/lib/ipcpd_messages.proto | 3 +- src/lib/rxmwheel.c | 261 --------------------------- src/lib/timerwheel.c | 409 +++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 693 insertions(+), 399 deletions(-) delete mode 100644 src/lib/rxmwheel.c create mode 100644 src/lib/timerwheel.c diff --git a/include/ouroboros/fccntl.h b/include/ouroboros/fccntl.h index 965e281d..ccd74b6c 100644 --- a/include/ouroboros/fccntl.h +++ b/include/ouroboros/fccntl.h @@ -48,6 +48,7 @@ /* FRCT flags */ #define FRCTFRESCNTRL 00000001 /* Feedback from receiver */ #define FRCTFRTX 00000002 /* Reliable flow */ +#define FRCTFLINGER 00000004 /* Sent unsent data */ /* Flow operations */ #define FLOWSRCVTIMEO 00000001 /* Set read timeout */ @@ -61,7 +62,8 @@ #define FLOWGTXQLEN 00000011 /* Get queue length on tx */ /* FRCT operations */ -#define FRCTGFLAGS 00001000 /* Get flags for FRCT */ +#define FRCTSFLAGS 00001000 /* Set flags for FRCT */ +#define FRCTGFLAGS 00002000 /* Get flags for FRCT */ __BEGIN_DECLS diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 78408185..cbd9ee15 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -543,16 +543,19 @@ int ipcp_flow_alloc_resp(pid_t pid, return ret; } -int ipcp_flow_dealloc(pid_t pid, - int flow_id) +int ipcp_flow_dealloc(pid_t pid, + int flow_id, + time_t timeo) { ipcp_msg_t msg = IPCP_MSG__INIT; ipcp_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC; - msg.has_flow_id = true; - msg.flow_id = flow_id; + msg.code = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC; + msg.has_flow_id = true; + msg.flow_id = flow_id; + msg.has_timeo_sec = true; + msg.timeo_sec = timeo; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) diff --git a/src/irmd/ipcp.h 
b/src/irmd/ipcp.h index ae00792b..652316ba 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -85,7 +85,8 @@ int ipcp_flow_alloc_resp(pid_t pid, const void * data, size_t len); -int ipcp_flow_dealloc(pid_t pid, - int flow_id); +int ipcp_flow_dealloc(pid_t pid, + int flow_id, + time_t timeo); #endif /* OUROBOROS_IRMD_IPCP_H */ diff --git a/src/irmd/main.c b/src/irmd/main.c index 3709a3e5..3a0ad544 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -68,10 +68,11 @@ #endif #define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */ -#define SHM_SAN_HOLDOFF 1000 /* ms */ -#define IPCP_HASH_LEN(e) hash_len(e->dir_hash_algo) -#define IB_LEN SOCK_BUF_SIZE -#define BIND_TIMEOUT 10 /* ms */ +#define SHM_SAN_HOLDOFF 1000 /* ms */ +#define IPCP_HASH_LEN(e) hash_len(e->dir_hash_algo) +#define IB_LEN SOCK_BUF_SIZE +#define BIND_TIMEOUT 10 /* ms */ +#define DEALLOC_TIME 300 /* s */ enum init_state { IPCP_NULL = 0, @@ -1475,7 +1476,8 @@ static int flow_alloc(pid_t pid, } static int flow_dealloc(pid_t pid, - int flow_id) + int flow_id, + time_t timeo) { pid_t n_1_pid = -1; int ret = 0; @@ -1521,7 +1523,7 @@ static int flow_dealloc(pid_t pid, pthread_rwlock_unlock(&irmd.flows_lock); if (n_1_pid != -1) - ret = ipcp_flow_dealloc(n_1_pid, flow_id); + ret = ipcp_flow_dealloc(n_1_pid, flow_id, timeo); return ret; } @@ -1927,7 +1929,7 @@ void * irm_sanitize(void * o) ipcpi = f->n_1_pid; flow_id = f->flow_id; pthread_rwlock_unlock(&irmd.flows_lock); - ipcp_flow_dealloc(ipcpi, flow_id); + ipcp_flow_dealloc(ipcpi, flow_id, DEALLOC_TIME); pthread_rwlock_wrlock(&irmd.flows_lock); continue; } @@ -2190,7 +2192,9 @@ static void * mainloop(void * o) } break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: - result = flow_dealloc(msg->pid, msg->flow_id); + result = flow_dealloc(msg->pid, + msg->flow_id, + msg->timeo_sec); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: assert(msg->pk.len > 0 ? 
msg->pk.data != NULL diff --git a/src/lib/dev.c b/src/lib/dev.c index df616ead..8d7d7934 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -63,6 +63,7 @@ #define SECMEMSZ 16384 #define SYMMKEYSZ 32 #define MSGBUFSZ 2048 +#define TICTIME 1000000 /* ns */ struct flow_set { size_t idx; @@ -255,6 +256,9 @@ static void flow_fini(int fd) bmp_release(ai.fds, fd); } + if (ai.flows[fd].frcti != NULL) + frcti_destroy(ai.flows[fd].frcti); + if (ai.flows[fd].rx_rb != NULL) { shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN); shm_rbuff_close(ai.flows[fd].rx_rb); @@ -272,9 +276,6 @@ static void flow_fini(int fd) shm_flow_set_close(ai.flows[fd].set); } - if (ai.flows[fd].frcti != NULL) - frcti_destroy(ai.flows[fd].frcti); - if (ai.flows[fd].ctx != NULL) crypt_fini(ai.flows[fd].ctx); @@ -433,8 +434,13 @@ static void init(int argc, if (ai.fqset == NULL) goto fail_fqset; + if (timerwheel_init() < 0) + goto fail_timerwheel; + return; + fail_timerwheel: + shm_flow_set_close(ai.fqset); fail_fqset: pthread_rwlock_destroy(&ai.lock); fail_lock: @@ -491,6 +497,8 @@ static void fini(void) pthread_cond_destroy(&ai.ports[i].state_cond); } + timerwheel_fini(); + shm_rdrbuff_close(ai.rdrb); free(ai.flows); @@ -747,25 +755,59 @@ int flow_join(const char * dst, int flow_dealloc(int fd) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg; + struct flow * f; + time_t timeo; if (fd < 0 || fd >= SYS_MAX_FLOWS ) return -EINVAL; - msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; - msg.has_flow_id = true; - msg.has_pid = true; - msg.pid = ai.pid; + msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; + msg.has_flow_id = true; + msg.has_pid = true; + msg.pid = ai.pid; + msg.has_timeo_sec = true; + msg.has_timeo_nsec = true; + msg.timeo_nsec = 0; + + f = &ai.flows[fd]; pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].flow_id < 0) { + if (f->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - msg.flow_id = ai.flows[fd].flow_id; + 
msg.flow_id = f->flow_id; + + timeo = frcti_dealloc(f->frcti); + while (timeo < 0) { /* keep the flow active for rtx */ + ssize_t ret; + uint8_t buf[128]; + + f->oflags = FLOWFDEFAULT | FLOWFRNOPART; + + f->rcv_timesout = true; + f->rcv_timeo.tv_sec = -timeo; + f->rcv_timeo.tv_nsec = 0; + + pthread_rwlock_unlock(&ai.lock); + + ret = flow_read(fd, buf, 128); + + pthread_rwlock_rdlock(&ai.lock); + + timeo = frcti_dealloc(f->frcti); + + if (ret == -ETIMEDOUT && timeo < 0) + timeo = -timeo; + } + + msg.timeo_sec = timeo; + + shm_rbuff_fini(ai.flows[fd].tx_rb); pthread_rwlock_unlock(&ai.lock); @@ -904,13 +946,21 @@ int fccntl(int fd, goto einval; *fflags = flow->oflags; break; + case FRCTSFLAGS: + cflags = va_arg(l, uint16_t *); + if (cflags == NULL) + goto einval; + if (flow->frcti == NULL) + goto eperm; + frcti_setflags(flow->frcti, *cflags); + break; case FRCTGFLAGS: cflags = (uint16_t *) va_arg(l, int *); if (cflags == NULL) goto einval; if (flow->frcti == NULL) goto eperm; - *cflags = frcti_getconf(flow->frcti); + *cflags = frcti_getflags(flow->frcti); break; default: pthread_rwlock_unlock(&ai.lock); @@ -1067,6 +1117,8 @@ ssize_t flow_read(int fd, struct shm_rbuff * rb; struct shm_du_buff * sdb; struct timespec abs; + struct timespec tic = {0, TICTIME}; + struct timespec tictime; struct timespec * abstime = NULL; struct flow * flow; bool noblock; @@ -1096,6 +1148,8 @@ ssize_t flow_read(int fd, noblock = flow->oflags & FLOWFRNOBLOCK; partrd = !(flow->oflags & FLOWFRNOPART); + ts_add(&tic, &abs, &tictime); + if (ai.flows[fd].rcv_timesout) { ts_add(&abs, &flow->rcv_timeo, &abs); abstime = &abs; @@ -1108,9 +1162,21 @@ ssize_t flow_read(int fd, pthread_rwlock_unlock(&ai.lock); idx = noblock ? 
shm_rbuff_read(rb) : - shm_rbuff_read_b(rb, abstime); - if (idx < 0) - return idx; + shm_rbuff_read_b(rb, &tictime); + if (idx < 0) { + frcti_tick(flow->frcti); + + if (idx != -ETIMEDOUT) + return idx; + + if (abstime != NULL + && ts_diff_ns(&tictime, &abs) < 0) + return -ETIMEDOUT; + + ts_add(&tictime, &tic, &tictime); + pthread_rwlock_rdlock(&ai.lock); + continue; + } sdb = shm_rdrbuff_get(ai.rdrb, idx); if (flow->qs.ber == 0 && chk_crc(sdb) != 0) { @@ -1339,7 +1405,9 @@ ssize_t fevent(struct flow_set * set, const struct timespec * timeo) { ssize_t ret = 0; - struct timespec abstime; + struct timespec tic = {0, TICTIME}; + struct timespec tictime; + struct timespec abs; struct timespec * t = NULL; if (set == NULL || fq == NULL) @@ -1348,17 +1416,25 @@ ssize_t fevent(struct flow_set * set, if (fq->fqsize > 0 && fq->next != fq->fqsize) return fq->fqsize; - if (timeo != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeo, &abstime); - t = &abstime; - } + clock_gettime(PTHREAD_COND_CLOCK, &abs); + + ts_add(&tic, &abs, &tictime); + t = &tictime; + + if (timeo != NULL) + ts_add(&abs, timeo, &abs); while (ret == 0) { ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t); if (ret == -ETIMEDOUT) { - fq->fqsize = 0; - return -ETIMEDOUT; + if (timeo != NULL && ts_diff_ns(t, &abs) < 0) { + fq->fqsize = 0; + return -ETIMEDOUT; + } + ret = 0; + ts_add(t, &tic, t); + timerwheel_move(); + continue; } fq->fqsize = ret << 1; @@ -1382,10 +1458,19 @@ int np1_flow_alloc(pid_t n_pid, return flow_init(flow_id, n_pid, qs, NULL); } -int np1_flow_dealloc(int flow_id) +int np1_flow_dealloc(int flow_id, + time_t timeo) { int fd; + /* + * TODO: Don't pass timeo to the IPCP but wait in IRMd. + * This will need async ops, waiting until we bootstrap + * the IRMd over ouroboros. 
+ */ + + sleep(timeo); + pthread_rwlock_rdlock(&ai.lock); fd = ai.ports[flow_id].fd; diff --git a/src/lib/frct.c b/src/lib/frct.c index 2bd126f4..c26910fa 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -21,7 +21,7 @@ */ /* Default Delta-t parameters */ -#define DELT_MPL (60 * BILLION) /* ns */ +#define DELT_MPL (5 * BILLION) /* ns */ #define DELT_A (1 * BILLION) /* ns */ #define DELT_R (20 * BILLION) /* ns */ @@ -59,8 +59,6 @@ struct frcti { struct frct_cr snd_cr; struct frct_cr rcv_cr; - struct rxmwheel * rw; - ssize_t rq[RQ_SIZE]; pthread_rwlock_t lock; }; @@ -86,7 +84,84 @@ struct frct_pci { uint32_t ackno; } __attribute__((packed)); -#include +static bool before(uint32_t seq1, + uint32_t seq2) +{ + return (int32_t)(seq1 - seq2) < 0; +} + +static bool after(uint32_t seq1, + uint32_t seq2) +{ + return (int32_t)(seq2 - seq1) < 0; +} + +static void __send_ack(int fd, + int ackno) +{ + struct shm_du_buff * sdb; + struct frct_pci * pci; + ssize_t idx; + struct flow * f; + + /* Raw calls needed to bypass frcti. 
*/ + idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); + if (idx < 0) + return; + + pci = (struct frct_pci *) shm_du_buff_head(sdb); + memset(pci, 0, sizeof(*pci)); + + pci->flags = FRCT_ACK; + pci->ackno = hton32(ackno); + + f = &ai.flows[fd]; + + if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { + ipcp_sdb_release(sdb); + return; + } + + shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); +} + +static void frct_send_ack(struct frcti * frcti) +{ + struct timespec now; + time_t diff; + uint32_t ackno; + int fd; + + assert(frcti); + + pthread_rwlock_rdlock(&frcti->lock); + + if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) { + pthread_rwlock_unlock(&frcti->lock); + return; + } + + ackno = frcti->rcv_cr.lwe; + fd = frcti->fd; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + diff = ts_diff_ns(&frcti->rcv_cr.act, &now); + + pthread_rwlock_unlock(&frcti->lock); + + if (diff > frcti->a || diff < DELT_ACK) + return; + + __send_ack(fd, ackno); + + pthread_rwlock_wrlock(&frcti->lock); + + if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) + frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; + + pthread_rwlock_unlock(&frcti->lock); +} static struct frcti * frcti_create(int fd) { @@ -123,14 +198,10 @@ static struct frcti * frcti_create(int fd) frcti->srtt = 0; /* Updated on first ACK */ frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */ frcti->rto = 20 * MILLION; /* Initial rxm will be after 20 ms */ - frcti->rw = NULL; if (ai.flows[fd].qs.loss == 0) { - frcti->snd_cr.cflags |= FRCTFRTX; + frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER; frcti->rcv_cr.cflags |= FRCTFRTX; - frcti->rw = rxmwheel_create(); - if (frcti->rw == NULL) - goto fail_rw; } frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */ @@ -141,8 +212,6 @@ static struct frcti * frcti_create(int fd) return frcti; - fail_rw: - pthread_rwlock_destroy(&frcti->lock); fail_lock: free(frcti); fail_malloc: @@ -151,24 +220,16 @@ static struct frcti * frcti_create(int fd) static void 
frcti_destroy(struct frcti * frcti) { - /* - * FIXME: In case of reliable transmission we should - * make sure everything we sent is acked. - */ - - if (frcti->rw != NULL) - rxmwheel_destroy(frcti->rw); - pthread_rwlock_destroy(&frcti->lock); free(frcti); } -static uint16_t frcti_getconf(struct frcti * frcti) +static uint16_t frcti_getflags(struct frcti * frcti) { uint16_t ret; - assert (frcti); + assert(frcti); pthread_rwlock_rdlock(&frcti->lock); @@ -179,6 +240,22 @@ static uint16_t frcti_getconf(struct frcti * frcti) return ret; } +static void frcti_setflags(struct frcti * frcti, + uint16_t flags) +{ + flags |= FRCTFRESCNTRL | FRCTFRTX; /* Should not be set by command */ + + assert(frcti); + + pthread_rwlock_wrlock(&frcti->lock); + + frcti->snd_cr.cflags &= FRCTFRESCNTRL | FRCTFRTX; /* Zero other flags */ + + frcti->snd_cr.cflags &= flags; + + pthread_rwlock_unlock(&frcti->lock); +} + #define frcti_queued_pdu(frcti) \ (frcti == NULL ? idx : __frcti_queued_pdu(frcti)) @@ -189,8 +266,10 @@ static uint16_t frcti_getconf(struct frcti * frcti) (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) #define frcti_tick(frcti) \ - (frcti == NULL ? 0 : __frcti_tick(frcti)) + (frcti == NULL ? 0 : __frcti_tick()) +#define frcti_dealloc(frcti) \ + (frcti == NULL ? 0 : __frcti_dealloc(frcti)) static ssize_t __frcti_queued_pdu(struct frcti * frcti) { @@ -233,78 +312,41 @@ static ssize_t __frcti_pdu_ready(struct frcti * frcti) return idx; } -static bool before(uint32_t seq1, - uint32_t seq2) -{ - return (int32_t)(seq1 - seq2) < 0; -} - -static bool after(uint32_t seq1, - uint32_t seq2) -{ - return (int32_t)(seq2 - seq1) < 0; -} +#include -static void frct_send_ack(struct frcti * frcti) +/* + * Send a final ACK for everything that has not been ACK'd. + * If the flow should be kept active for retransmission, + * the returned time will be negative. 
+ */ +static time_t __frcti_dealloc(struct frcti * frcti) { - struct shm_du_buff * sdb; - struct frct_pci * pci; - ssize_t idx; - struct timespec now; - time_t diff; - uint32_t ackno; - struct flow * f; + struct timespec now; + time_t wait; + int ackno; + int fd = -1; - assert(frcti); + clock_gettime(PTHREAD_COND_CLOCK, &now); pthread_rwlock_rdlock(&frcti->lock); - if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) { - pthread_rwlock_unlock(&frcti->lock); - return; - } - ackno = frcti->rcv_cr.lwe; + if (frcti->rcv_cr.lwe != frcti->rcv_cr.seqno) + fd = frcti->fd; - pthread_rwlock_unlock(&frcti->lock); + wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec, + frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec); - clock_gettime(PTHREAD_COND_CLOCK, &now); + if (frcti->snd_cr.cflags & FRCTFLINGER + && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno)) + wait = -wait; - diff = ts_diff_ns(&frcti->rcv_cr.act, &now); - - if (diff > frcti->a) - return; - - if (diff < DELT_ACK) - return; - - /* Raw calls needed to bypass frcti. 
*/ - idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL); - if (idx < 0) - return; - - pci = (struct frct_pci *) shm_du_buff_head(sdb); - memset(pci, 0, sizeof(*pci)); - - pci->flags = FRCT_ACK; - pci->ackno = hton32(ackno); - - f = &ai.flows[frcti->fd]; - - if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { - pthread_rwlock_rdlock(&ai.lock); - ipcp_sdb_release(sdb); - return; - } - - shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); - - pthread_rwlock_wrlock(&frcti->lock); + pthread_rwlock_unlock(&frcti->lock); - if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno)) - frcti->rcv_cr.seqno = frcti->rcv_cr.lwe; + if (fd != -1) + __send_ack(fd, ackno); - pthread_rwlock_unlock(&frcti->lock); + return wait; } static int __frcti_snd(struct frcti * frcti, @@ -315,14 +357,14 @@ static int __frcti_snd(struct frcti * frcti, struct frct_cr * snd_cr; struct frct_cr * rcv_cr; uint32_t seqno; + bool rtx; assert(frcti); snd_cr = &frcti->snd_cr; rcv_cr = &frcti->rcv_cr; - if (frcti->rw != NULL) - rxmwheel_move(frcti->rw); + timerwheel_move(); pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN); if (pci == NULL) @@ -334,6 +376,8 @@ static int __frcti_snd(struct frcti * frcti, pthread_rwlock_wrlock(&frcti->lock); + rtx = snd_cr->cflags & FRCTFRTX; + pci->flags |= FRCT_DATA; /* Set DRF if there are no unacknowledged packets. 
*/ @@ -351,7 +395,7 @@ static int __frcti_snd(struct frcti * frcti, seqno = snd_cr->seqno; pci->seqno = hton32(seqno); - if (!(snd_cr->cflags & FRCTFRTX)) { + if (!rtx) { snd_cr->lwe++; } else { if (!frcti->probe) { @@ -372,8 +416,8 @@ static int __frcti_snd(struct frcti * frcti, pthread_rwlock_unlock(&frcti->lock); - if (frcti->rw != NULL) - rxmwheel_add(frcti->rw, frcti, seqno, sdb); + if (rtx) + timerwheel_rxm(frcti, seqno, sdb); return 0; } @@ -387,12 +431,10 @@ static void rtt_estimator(struct frcti * frcti, if (srtt == 0) { /* first measurement */ srtt = mrtt; rttvar = mrtt >> 1; - } else { time_t delta = mrtt - srtt; srtt += (delta >> 3); - rttvar -= rttvar >> 2; - rttvar += ABS(delta) >> 2; + rttvar += (ABS(delta) - rttvar) >> 2; } frcti->srtt = MAX(1000U, srtt); @@ -401,12 +443,9 @@ static void rtt_estimator(struct frcti * frcti, frcti->srtt + (frcti->mdev << 1)); } -static void __frcti_tick(struct frcti * frcti) +static void __frcti_tick(void) { - if (frcti->rw != NULL) { - rxmwheel_move(frcti->rw); - frct_send_ack(frcti); - } + timerwheel_move(); } /* Always queues the next application packet on the RQ. */ @@ -420,6 +459,7 @@ static void __frcti_rcv(struct frcti * frcti, struct frct_cr * rcv_cr; uint32_t seqno; uint32_t ackno; + int fd = -1; assert(frcti); @@ -456,8 +496,10 @@ static void __frcti_rcv(struct frcti * frcti, if (!(pci->flags & FRCT_DATA)) goto drop_packet; - if (before(seqno, rcv_cr->lwe)) + if (before(seqno, rcv_cr->lwe)) { + rcv_cr->seqno = seqno; goto drop_packet; + } if (rcv_cr->cflags & FRCTFRTX) { if ((seqno - rcv_cr->lwe) >= RQ_SIZE) @@ -465,6 +507,8 @@ static void __frcti_rcv(struct frcti * frcti, if (frcti->rq[pos] != -1) goto drop_packet; /* Duplicate in rq. 
*/ + + fd = frcti->fd; } else { rcv_cr->lwe = seqno; } @@ -475,10 +519,16 @@ static void __frcti_rcv(struct frcti * frcti, pthread_rwlock_unlock(&frcti->lock); + if (fd != -1) + timerwheel_ack(fd, frcti); + return; drop_packet: pthread_rwlock_unlock(&frcti->lock); + + frct_send_ack(frcti); + shm_rdrbuff_remove(ai.rdrb, idx); return; } @@ -492,7 +542,7 @@ int frcti_filter(struct fqueue * fq) struct frcti * frcti; struct shm_rbuff * rb; - while(fq->next < fq->fqsize) { + while (fq->next < fq->fqsize) { if (fq->fqueue[fq->next + 1] != FLOW_PKT) return 1; diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index b0efe9ab..809117b8 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -52,5 +52,6 @@ message ipcp_msg { optional layer_info_msg layer_info = 9; optional int32 response = 10; optional string comp = 11; - optional int32 result = 12; + optional uint32 timeo_sec = 12; + optional int32 result = 13; }; diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c deleted file mode 100644 index 0572c7b7..00000000 --- a/src/lib/rxmwheel.c +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2020 - * - * Timerwheel - * - * Dimitri Staessens - * Sander Vrijders - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public License - * version 2.1 as published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. 
- */ - -#include - -#define RXMQ_S 14 /* defines #slots */ -#define RXMQ_M 34 /* defines max delay (ns) */ -#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (ns) */ -#define RXMQ_SLOTS (1 << RXMQ_S) -#define RXMQ_MAX (1 << RXMQ_M) /* us */ - -/* Overflow limits range to about 6 hours. */ -#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec) -#define ts_to_slot(ts) ((ts_to_ns(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) - -struct rxm { - struct list_head next; - uint32_t seqno; - struct shm_du_buff * sdb; - uint8_t * head; - uint8_t * tail; - time_t t0; /* Time when original was sent (us). */ - size_t mul; /* RTO multiplier. */ - struct frcti * frcti; -}; - -struct rxmwheel { - struct list_head wheel[RXMQ_SLOTS]; - - size_t prv; /* Last processed slot. */ - pthread_mutex_t lock; -}; - -static void rxmwheel_destroy(struct rxmwheel * rw) -{ - size_t i; - struct list_head * p; - struct list_head * h; - - pthread_mutex_destroy(&rw->lock); - - for (i = 0; i < RXMQ_SLOTS; ++i) { - list_for_each_safe(p, h, &rw->wheel[i]) { - struct rxm * rxm = list_entry(p, struct rxm, next); - list_del(&rxm->next); - shm_du_buff_ack(rxm->sdb); - ipcp_sdb_release(rxm->sdb); - free(rxm); - } - } -} - -static struct rxmwheel * rxmwheel_create(void) -{ - struct rxmwheel * rw; - struct timespec now; - size_t i; - - rw = malloc(sizeof(*rw)); - if (rw == NULL) - return NULL; - - if (pthread_mutex_init(&rw->lock, NULL)) { - free(rw); - return NULL; - } - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - /* Mark the previous timeslot as the last one processed. 
*/ - rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); - - for (i = 0; i < RXMQ_SLOTS; ++i) - list_head_init(&rw->wheel[i]); - - return rw; -} - -static void rxmwheel_move(struct rxmwheel * rw) -{ - struct timespec now; - struct list_head * p; - struct list_head * h; - size_t slot; - size_t i; - - pthread_mutex_lock(&rw->lock); - - pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock, - (void *) &rw->lock); - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - slot = ts_to_slot(now); - - i = rw->prv; - - if (slot < i) - slot += RXMQ_SLOTS; - - while (i++ < slot) { - list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) { - struct rxm * r; - struct frct_cr * snd_cr; - struct frct_cr * rcv_cr; - size_t rslot; - ssize_t idx; - struct shm_du_buff * sdb; - uint8_t * head; - struct flow * f; - int fd; - uint32_t snd_lwe; - uint32_t rcv_lwe; - time_t rto; - - r = list_entry(p, struct rxm, next); - - list_del(&r->next); - - snd_cr = &r->frcti->snd_cr; - rcv_cr = &r->frcti->rcv_cr; - fd = r->frcti->fd; - f = &ai.flows[fd]; - - shm_du_buff_ack(r->sdb); - - pthread_rwlock_wrlock(&r->frcti->lock); - - snd_lwe = snd_cr->lwe; - rcv_lwe = rcv_cr->lwe; - rto = r->frcti->rto; - /* Assume last RTX is the one that's ACK'd. */ - if (r->frcti->probe - && (r->frcti->rttseq + 1) == r->seqno) - r->frcti->t_probe = now; - - pthread_rwlock_unlock(&r->frcti->lock); - - /* Has been ack'd, remove. */ - if ((int) (r->seqno - snd_lwe) < 0) { - ipcp_sdb_release(r->sdb); - free(r); - continue; - } - - /* Check for r-timer expiry. */ - if (ts_to_ns(now) - r->t0 > r->frcti->r) { - ipcp_sdb_release(r->sdb); - free(r); - shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); - shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); - continue; - } - - /* Copy the payload, safe rtx in other layers. 
*/ - if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { - ipcp_sdb_release(r->sdb); - free(r); - shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); - shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); - continue; - } - - idx = shm_du_buff_get_idx(sdb); - - head = shm_du_buff_head(sdb); - memcpy(head, r->head, r->tail - r->head); - - ipcp_sdb_release(r->sdb); - - ((struct frct_pci *) head)->ackno = hton32(rcv_lwe); - - /* Retransmit the copy. */ - if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { - ipcp_sdb_release(sdb); - free(r); - shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); - shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); - continue; - } - - /* Reschedule. */ - shm_du_buff_wait_ack(sdb); - - shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); - - r->head = head; - r->tail = shm_du_buff_tail(sdb); - r->sdb = sdb; - - /* Schedule at least in the next time slot */ - rslot = (slot + MAX((rto >> RXMQ_R), 1)) - & (RXMQ_SLOTS - 1); - - list_add_tail(&r->next, &rw->wheel[rslot]); - } - } - - rw->prv = slot & (RXMQ_SLOTS - 1); - - pthread_cleanup_pop(true); -} - -static int rxmwheel_add(struct rxmwheel * rw, - struct frcti * frcti, - uint32_t seqno, - struct shm_du_buff * sdb) -{ - struct timespec now; - struct rxm * r; - size_t slot; - - r = malloc(sizeof(*r)); - if (r == NULL) - return -ENOMEM; - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - r->t0 = ts_to_ns(now); - r->mul = 0; - r->seqno = seqno; - r->sdb = sdb; - r->head = shm_du_buff_head(sdb); - r->tail = shm_du_buff_tail(sdb); - r->frcti = frcti; - - pthread_rwlock_rdlock(&r->frcti->lock); - - slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1); - - pthread_rwlock_unlock(&r->frcti->lock); - - pthread_mutex_lock(&rw->lock); - - list_add_tail(&r->next, &rw->wheel[slot]); - - shm_du_buff_wait_ack(sdb); - - pthread_mutex_unlock(&rw->lock); - - return 0; -} diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c new file mode 100644 index 00000000..33fcbc1c --- /dev/null +++ b/src/lib/timerwheel.c @@ -0,0 +1,409 @@ +/* + * Ouroboros 
- Copyright (C) 2016 - 2020 + * + * Timerwheel + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include + +#define RXMQ_SLOTS (1 << 8) /* #slots / level. */ +#define RXMQ_LVLS 3 /* #levels, bump for DTN. */ +#define RXMQ_BUMP 4 /* factor to bump lvl. */ +#define RXMQ_RES 20 /* res (ns) of lowest lvl. */ + +#define ACKQ_SLOTS (1 << 7) /* #slots for delayed ACK. */ +#define ACKQ_RES 20 /* resolution for dACK. */ + +/* Overflow limits range to about 6 hours. */ +#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec) +#define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES) +#define ts_to_ack_slot(ts) (ts_to_ns(ts) >> ACKQ_RES) + +struct rxm { + struct list_head next; + uint32_t seqno; + struct shm_du_buff * sdb; + uint8_t * head; + uint8_t * tail; + time_t t0; /* Time when original was sent (us). */ + size_t mul; /* RTO multiplier. */ + struct frcti * frcti; + int fd; + int flow_id; /* Prevent rtx when fd reused. */ +}; + +struct ack { + struct list_head next; + struct frcti * frcti; + int fd; + int flow_id; +}; + +struct { + /* + * At a 1 ms min resolution, every level bumps the + * resolution by a factor of 16. + */ + struct list_head rxms[RXMQ_LVLS][RXMQ_SLOTS]; + + struct list_head acks[ACKQ_SLOTS]; + bool map[ACKQ_SLOTS][PROG_MAX_FLOWS]; + + size_t prv_rxm; /* Last processed rxm slot at lvl 0. 
*/ + size_t prv_ack; /* Last processed ack slot. */ + pthread_mutex_t lock; +} rw; + +static void timerwheel_fini(void) +{ + size_t i; + size_t j; + struct list_head * p; + struct list_head * h; + + pthread_mutex_lock(&rw.lock); + + for (i = 0; i < RXMQ_LVLS; ++i) { + for (j = 0; j < RXMQ_SLOTS; j++) { + list_for_each_safe(p, h, &rw.rxms[i][j]) { + struct rxm * rxm; + rxm = list_entry(p, struct rxm, next); + list_del(&rxm->next); + shm_du_buff_ack(rxm->sdb); + ipcp_sdb_release(rxm->sdb); + free(rxm); + } + } + } + + for (i = 0; i < ACKQ_SLOTS; ++i) { + list_for_each_safe(p, h, &rw.acks[i]) { + struct ack * a = list_entry(p, struct ack, next); + list_del(&a->next); + free(a); + } + } + + pthread_mutex_unlock(&rw.lock); + + pthread_mutex_destroy(&rw.lock); +} + +static int timerwheel_init(void) +{ + struct timespec now; + size_t i; + size_t j; + + if (pthread_mutex_init(&rw.lock, NULL)) + return -1; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + rw.prv_rxm = (ts_to_rxm_slot(now) - 1) & (RXMQ_SLOTS - 1); + for (i = 0; i < RXMQ_LVLS; ++i) { + for (j = 0; j < RXMQ_SLOTS; ++j) + list_head_init(&rw.rxms[i][j]); + } + + rw.prv_ack = (ts_to_ack_slot(now) - 1) & (ACKQ_SLOTS - 1); + for (i = 0; i < ACKQ_SLOTS; ++i) + list_head_init(&rw.acks[i]); + + return 0; +} + +static void timerwheel_move(void) +{ + struct timespec now; + struct list_head * p; + struct list_head * h; + size_t rxm_slot; + size_t ack_slot; + size_t i; + size_t j; + + pthread_mutex_lock(&rw.lock); + + pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock, + (void *) &rw.lock); + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + rxm_slot = ts_to_ns(now) >> RXMQ_RES; + j = rw.prv_rxm; + rw.prv_rxm = rxm_slot & (RXMQ_SLOTS - 1); + + for (i = 0; i < RXMQ_LVLS; ++i) { + size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1); + if (j_max_slot < j) + j_max_slot += RXMQ_SLOTS; + + while (j++ < j_max_slot) { + list_for_each_safe(p, + h, + &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) { + struct rxm * r; + struct frct_cr * 
snd_cr; + struct frct_cr * rcv_cr; + size_t rslot; + ssize_t idx; + struct shm_du_buff * sdb; + uint8_t * head; + struct flow * f; + uint32_t snd_lwe; + uint32_t rcv_lwe; + time_t rto; + + r = list_entry(p, struct rxm, next); + + list_del(&r->next); + + snd_cr = &r->frcti->snd_cr; + rcv_cr = &r->frcti->rcv_cr; + f = &ai.flows[r->fd]; + + shm_du_buff_ack(r->sdb); + + if (f->frcti == NULL + || f->flow_id != r->flow_id) { + ipcp_sdb_release(r->sdb); + free(r); + continue; + } + + pthread_rwlock_wrlock(&r->frcti->lock); + + snd_lwe = snd_cr->lwe; + rcv_lwe = rcv_cr->lwe; + rto = r->frcti->rto; + + pthread_rwlock_unlock(&r->frcti->lock); + + /* Has been ack'd, remove. */ + if ((int) (r->seqno - snd_lwe) < 0) { + ipcp_sdb_release(r->sdb); + free(r); + continue; + } + + /* Check for r-timer expiry. */ + if (ts_to_ns(now) - r->t0 > r->frcti->r) { + ipcp_sdb_release(r->sdb); + free(r); + shm_rbuff_set_acl(f->rx_rb, + ACL_FLOWDOWN); + shm_rbuff_set_acl(f->tx_rb, + ACL_FLOWDOWN); + continue; + } + + if (r->frcti->probe + && (r->frcti->rttseq + 1) == r->seqno) + r->frcti->probe = false; + + /* Copy the data, safe rtx in other layers. */ + if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { + ipcp_sdb_release(r->sdb); + free(r); + shm_rbuff_set_acl(f->rx_rb, + ACL_FLOWDOWN); + shm_rbuff_set_acl(f->tx_rb, + ACL_FLOWDOWN); + continue; + } + + idx = shm_du_buff_get_idx(sdb); + + head = shm_du_buff_head(sdb); + memcpy(head, r->head, r->tail - r->head); + + ipcp_sdb_release(r->sdb); + + ((struct frct_pci *) head)->ackno = + hton32(rcv_lwe); + + /* Retransmit the copy. */ + if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { + ipcp_sdb_release(sdb); + free(r); + shm_rbuff_set_acl(f->rx_rb, + ACL_FLOWDOWN); + shm_rbuff_set_acl(f->tx_rb, + ACL_FLOWDOWN); + continue; + } + + /* Reschedule. 
*/ + shm_du_buff_wait_ack(sdb); + + shm_flow_set_notify(f->set, + f->flow_id, + FLOW_PKT); + + r->head = head; + r->tail = shm_du_buff_tail(sdb); + r->sdb = sdb; + r->mul++; + + /* Schedule at least in the next time slot. */ + rslot = (rxm_slot + + MAX(((rto * r->mul) >> RXMQ_RES), 1)) + & (RXMQ_SLOTS - 1); + + list_add_tail(&r->next, &rw.rxms[i][rslot]); + } + } + /* Move up a level in the wheel. */ + rxm_slot >>= RXMQ_BUMP; + } + + ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ; + + j = rw.prv_ack; + + if (ack_slot < j) + ack_slot += ACKQ_SLOTS; + + while (j++ < ack_slot) { + list_for_each_safe(p, h, &rw.acks[j & (ACKQ_SLOTS - 1)]) { + struct ack * a; + struct flow * f; + + a = list_entry(p, struct ack, next); + + list_del(&a->next); + + f = &ai.flows[a->fd]; + + rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false; + + if (f->flow_id == a->flow_id && f->frcti != NULL) + frct_send_ack(a->frcti); + + free(a); + + } + } + + rw.prv_ack = ack_slot & (ACKQ_SLOTS - 1); + + pthread_cleanup_pop(true); +} + +static int timerwheel_rxm(struct frcti * frcti, + uint32_t seqno, + struct shm_du_buff * sdb) +{ + struct timespec now; + struct rxm * r; + size_t slot; + size_t lvl = 0; + time_t rto_slot; + + r = malloc(sizeof(*r)); + if (r == NULL) + return -ENOMEM; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + r->t0 = ts_to_ns(now); + r->mul = 0; + r->seqno = seqno; + r->sdb = sdb; + r->head = shm_du_buff_head(sdb); + r->tail = shm_du_buff_tail(sdb); + r->frcti = frcti; + + pthread_rwlock_rdlock(&r->frcti->lock); + + rto_slot = frcti->rto >> RXMQ_RES; + slot = r->t0 >> RXMQ_RES; + + r->fd = frcti->fd; + r->flow_id = ai.flows[r->fd].flow_id; + + pthread_rwlock_unlock(&r->frcti->lock); + + while (rto_slot >= RXMQ_SLOTS) { + ++lvl; + rto_slot >>= RXMQ_BUMP; + slot >>= RXMQ_BUMP; + } + + if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. 
*/ + free(r); + return -EPERM; + } + + slot = (slot + rto_slot) & (RXMQ_SLOTS - 1); + + pthread_mutex_lock(&rw.lock); + + list_add_tail(&r->next, &rw.rxms[lvl][slot]); + + shm_du_buff_wait_ack(sdb); + + pthread_mutex_unlock(&rw.lock); + + return 0; +} + +static int timerwheel_ack(int fd, + struct frcti * frcti) +{ + struct timespec now; + struct ack * a; + size_t slot; + + a = malloc(sizeof(*a)); + if (a == NULL) + return -ENOMEM; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + slot = DELT_ACK >> ACKQ_RES; + if (slot >= ACKQ_SLOTS) { /* Out of timerwheel range. */ + free(a); + return -EPERM; + } + + slot = (((ts_to_ns(now) + DELT_ACK) >> ACKQ_RES) + 1) + & (ACKQ_SLOTS - 1); + + a->fd = fd; + a->frcti = frcti; + a->flow_id = ai.flows[fd].flow_id; + + pthread_mutex_lock(&rw.lock); + + if (rw.map[slot][fd]) { + pthread_mutex_unlock(&rw.lock); + free(a); + return 0; + } + + rw.map[slot][fd] = true; + + list_add_tail(&a->next, &rw.acks[slot]); + + pthread_mutex_unlock(&rw.lock); + + return 0; +} -- cgit v1.2.3