diff options
author | Sander Vrijders <sander.vrijders@ugent.be> | 2017-08-23 15:26:18 +0000 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@ugent.be> | 2017-08-23 15:26:18 +0000 |
commit | 1ca26ab86712575a5e89dcd68295b57fd27c5703 (patch) | |
tree | f7d3378a9698e7b92cabe631f44108d42b4668bc | |
parent | 4be31447a73a739e4fb44a1629d2adcb6c2b0f21 (diff) | |
parent | b6a04b551d64531452089b869f9fa56f7e545e4d (diff) | |
download | ouroboros-1ca26ab86712575a5e89dcd68295b57fd27c5703.tar.gz ouroboros-1ca26ab86712575a5e89dcd68295b57fd27c5703.zip |
Merged in sandervrijders/ouroboros/be-inactivity (pull request #567)
lib: Make sender and receiver inactivity simple checks
-rw-r--r-- | src/lib/dev.c | 221 |
1 files changed, 68 insertions, 153 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index b6c6087f..43543af3 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -51,9 +51,12 @@ #define MPL 2000 /* ms */ +#ifndef CLOCK_REALTIME_COARSE +#define CLOCK_REALTIME_COARSE CLOCK_REALTIME +#endif + struct flow_set { - size_t idx; - bool np1_set; + size_t idx; }; struct fqueue { @@ -71,19 +74,19 @@ enum port_state { }; struct frcti { - bool used; + bool used; - struct tw_f * snd_inact; - bool snd_drf; - uint64_t snd_lwe; - uint64_t snd_rwe; + struct timespec last_snd; + bool snd_drf; + uint64_t snd_lwe; + uint64_t snd_rwe; - struct tw_f * rcv_inact; - bool rcv_drf; - uint64_t rcv_lwe; - uint64_t rcv_rwe; + struct timespec last_rcv; + bool rcv_drf; + uint64_t rcv_lwe; + uint64_t rcv_rwe; - uint8_t conf_flags; + uint8_t conf_flags; }; struct port { @@ -268,27 +271,15 @@ static int frcti_init(int fd) static void frcti_clear(int fd) { - struct frcti * frcti; - - frcti = &(ai.frcti[fd]); - - frcti->used = false; - frcti->snd_inact = NULL; - frcti->rcv_inact = NULL; + ai.frcti[fd].used = false; } static void frcti_fini(int fd) { - struct frcti * frcti; - - frcti = &(ai.frcti[fd]); - - /* FIXME: We actually need to wait until these timers become NULL. */ - if (frcti->snd_inact != NULL) - timerwheel_stop(ai.tw, frcti->snd_inact); - - if (frcti->rcv_inact != NULL) - timerwheel_stop(ai.tw, frcti->rcv_inact); + /* + * FIXME: In case of reliable transmission we should + * make sure everything is acked. + */ frcti_clear(fd); } @@ -304,78 +295,49 @@ static int frcti_configure(int fd, return 0; } -static void frcti_snd_inactivity(void * arg) -{ - struct frcti * frcti; - - pthread_rwlock_wrlock(&ai.lock); - - frcti = (struct frcti * ) arg; - - frcti->snd_drf = true; - frcti->snd_inact = NULL; - - pthread_rwlock_unlock(&ai.lock); -} - -/* Called under flows lock */ static int frcti_write(int fd, struct shm_du_buff * sdb) { struct frcti * frcti; struct frct_pci pci; + struct timespec now = {0, 0}; memset(&pci, 0, sizeof(pci)); frcti = &(ai.frcti[fd]); - pthread_rwlock_unlock(&ai.lock); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - timerwheel_move(ai.tw); + pthread_rwlock_wrlock(&ai.lock); - pthread_rwlock_rdlock(&ai.lock); + /* Check if sender inactivity is true. */ + if (!frcti->snd_drf && ts_diff_ms(&now, &frcti->last_snd) > 2 * MPL) + frcti->snd_drf = true; - /* - * Set the DRF in the first packet of a new run of SDUs, - * otherwise simply recharge the timer. - */ + /* Set the DRF in the first packet of a new run of SDUs. */ if (frcti->snd_drf) { - frcti->snd_inact = timerwheel_start(ai.tw, frcti_snd_inactivity, - frcti, 2 * MPL); - if (frcti->snd_inact == NULL) - return -1; - pci.flags |= FLAG_DATA_RUN; frcti->snd_drf = false; - } else { - if (timerwheel_restart(ai.tw, frcti->snd_inact, 2 * MPL)) - return -1; } + frcti->last_snd = now; + pci.seqno = frcti->snd_lwe++; pci.type |= PDU_TYPE_DATA; - if (frct_pci_ser(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK)) + if (frct_pci_ser(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK)) { + pthread_rwlock_unlock(&ai.lock); return -1; + } - if (finalize_write(fd, shm_du_buff_get_idx(sdb))) + if (finalize_write(fd, shm_du_buff_get_idx(sdb))) { + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; - - return 0; -} - -static void frcti_rcv_inactivity(void * arg) -{ - struct frcti * frcti; - - pthread_rwlock_wrlock(&ai.lock); - - frcti = (struct frcti * ) arg; - - frcti->rcv_drf = true; - frcti->rcv_inact = NULL; + } pthread_rwlock_unlock(&ai.lock); + + return 0; } static ssize_t frcti_read(int fd) @@ -385,8 +347,7 @@ static ssize_t frcti_read(int fd) struct frcti * frcti; struct frct_pci pci; struct shm_du_buff * sdb; - - timerwheel_move(ai.tw); + struct timespec now = {0, 0}; pthread_rwlock_rdlock(&ai.lock); @@ -412,6 +373,8 @@ static ssize_t frcti_read(int fd) if (idx < 0) return idx; + clock_gettime(CLOCK_REALTIME_COARSE, &now); + pthread_rwlock_rdlock(&ai.lock); frcti = &(ai.frcti[fd]); @@ -425,7 +388,11 @@ static ssize_t frcti_read(int fd) return -EAGAIN; } - /* We don't accept packets when there is no inactivity timer. */ + /* Check if receiver inactivity is true. */ + if (!frcti->rcv_drf && ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL) + frcti->rcv_drf = true; + + /* We don't accept packets when there is receiver inactivity. */ if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) { pthread_rwlock_unlock(&ai.lock); shm_rdrbuff_remove(ai.rdrb, idx); @@ -433,68 +400,22 @@ static ssize_t frcti_read(int fd) } /* - * If there is an inactivity timer and the DRF is set, + * If there is no receiver inactivity and the DRF is set, * reset the state of the connection. */ - if (pci.flags & FLAG_DATA_RUN) { - frcti->rcv_drf = true; - if (frcti->rcv_inact != NULL) - timerwheel_stop(ai.tw, frcti->rcv_inact); + if (pci.flags & FLAG_DATA_RUN) frcti->rcv_lwe = pci.seqno; - } - - /* - * Start receiver inactivity if this packet has the DRF, - * otherwise simply restart it. - */ - if (frcti->rcv_drf) { - frcti->rcv_inact = timerwheel_start(ai.tw, frcti_rcv_inactivity, - frcti, 3 * MPL); - if (frcti->rcv_inact == NULL) { - pthread_rwlock_unlock(&ai.lock); - shm_rdrbuff_remove(ai.rdrb, idx); - return -EAGAIN; - } + if (frcti->rcv_drf) frcti->rcv_drf = false; - } else { - if (timerwheel_restart(ai.tw, frcti->rcv_inact, 3 * MPL)) { - pthread_rwlock_unlock(&ai.lock); - shm_rdrbuff_remove(ai.rdrb, idx); - return -EAGAIN; - } - } + + frcti->last_rcv = now; pthread_rwlock_unlock(&ai.lock); return idx; } -static int frcti_event_wait(struct flow_set * set, - struct fqueue * fq, - const struct timespec * timeout) -{ - int ret; - - assert(set); - assert(fq); - - timerwheel_move(ai.tw); - - /* - * FIXME: Return the fq only if a data SDU - * for the application is available. - */ - - ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); - if (ret == -ETIMEDOUT) { - fq->fqsize = 0; - return -ETIMEDOUT; - } - - return ret; -} - static void flow_clear(int fd) { assert(!(fd < 0)); @@ -1073,16 +994,17 @@ ssize_t flow_write(int fd, shm_rdrbuff_remove(ai.rdrb, idx); return -ENOTALLOC; } + + pthread_rwlock_unlock(&ai.lock); } else { + pthread_rwlock_unlock(&ai.lock); + if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) { - pthread_rwlock_unlock(&ai.lock); shm_rdrbuff_remove(ai.rdrb, idx); return -1; } } - pthread_rwlock_unlock(&ai.lock); - return 0; } @@ -1090,9 +1012,10 @@ ssize_t flow_read(int fd, void * buf, size_t count) { - ssize_t idx = -1; - ssize_t n; + ssize_t idx = -1; + ssize_t n; uint8_t * sdu; + bool used; if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; @@ -1104,9 +1027,11 @@ ssize_t flow_read(int fd, return -ENOTALLOC; } + used = ai.frcti[fd].used; + pthread_rwlock_unlock(&ai.lock); - if (!ai.frcti[fd].used) + if (!used) idx = shm_rbuff_read(ai.flows[fd].rx_rb); else idx = frcti_read(fd); @@ -1146,8 +1071,6 @@ struct flow_set * flow_set_create() return NULL; } - set->np1_set = false; - pthread_rwlock_unlock(&ai.lock); return set; @@ -1208,7 +1131,7 @@ int flow_set_add(struct flow_set * set, if (set == NULL) return -EINVAL; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_wrlock(&ai.lock); ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); @@ -1216,9 +1139,6 @@ int flow_set_add(struct flow_set * set, for (i = 0; i < sdus; i++) shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id); - if (ai.frcti[fd].used) - set->np1_set = true; - pthread_rwlock_unlock(&ai.lock); return ret; @@ -1230,7 +1150,7 @@ void flow_set_del(struct flow_set * set, if (set == NULL) return; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_wrlock(&ai.lock); if (ai.flows[fd].port_id >= 0) shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id); @@ -1306,12 +1226,7 @@ int flow_event_wait(struct flow_set * set, t = &abstime; } - if (set->np1_set) - ret = frcti_event_wait(set, fq, t); - else - ret = shm_flow_set_wait(ai.fqset, set->idx, - fq->fqueue, t); - + ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t); if (ret == -ETIMEDOUT) { fq->fqsize = 0; return -ETIMEDOUT; @@ -1524,15 +1439,15 @@ int ipcp_flow_write(int fd, pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } + + pthread_rwlock_unlock(&ai.lock); } else { - if (frcti_write(fd, sdb)) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&ai.lock); + + if (frcti_write(fd, sdb)) return -1; - } } - pthread_rwlock_unlock(&ai.lock); - return 0; } |