summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@ugent.be>2017-08-23 15:26:18 +0000
committerdimitri staessens <dimitri.staessens@ugent.be>2017-08-23 15:26:18 +0000
commit1ca26ab86712575a5e89dcd68295b57fd27c5703 (patch)
treef7d3378a9698e7b92cabe631f44108d42b4668bc
parent4be31447a73a739e4fb44a1629d2adcb6c2b0f21 (diff)
parentb6a04b551d64531452089b869f9fa56f7e545e4d (diff)
downloadouroboros-1ca26ab86712575a5e89dcd68295b57fd27c5703.tar.gz
ouroboros-1ca26ab86712575a5e89dcd68295b57fd27c5703.zip
Merged in sandervrijders/ouroboros/be-inactivity (pull request #567)
lib: Make sender and receiver inactivity simple checks
-rw-r--r--src/lib/dev.c221
1 files changed, 68 insertions, 153 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index b6c6087f..43543af3 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -51,9 +51,12 @@
#define MPL 2000 /* ms */
+#ifndef CLOCK_REALTIME_COARSE
+#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
+#endif
+
struct flow_set {
- size_t idx;
- bool np1_set;
+ size_t idx;
};
struct fqueue {
@@ -71,19 +74,19 @@ enum port_state {
};
struct frcti {
- bool used;
+ bool used;
- struct tw_f * snd_inact;
- bool snd_drf;
- uint64_t snd_lwe;
- uint64_t snd_rwe;
+ struct timespec last_snd;
+ bool snd_drf;
+ uint64_t snd_lwe;
+ uint64_t snd_rwe;
- struct tw_f * rcv_inact;
- bool rcv_drf;
- uint64_t rcv_lwe;
- uint64_t rcv_rwe;
+ struct timespec last_rcv;
+ bool rcv_drf;
+ uint64_t rcv_lwe;
+ uint64_t rcv_rwe;
- uint8_t conf_flags;
+ uint8_t conf_flags;
};
struct port {
@@ -268,27 +271,15 @@ static int frcti_init(int fd)
static void frcti_clear(int fd)
{
- struct frcti * frcti;
-
- frcti = &(ai.frcti[fd]);
-
- frcti->used = false;
- frcti->snd_inact = NULL;
- frcti->rcv_inact = NULL;
+ ai.frcti[fd].used = false;
}
static void frcti_fini(int fd)
{
- struct frcti * frcti;
-
- frcti = &(ai.frcti[fd]);
-
- /* FIXME: We actually need to wait until these timers become NULL. */
- if (frcti->snd_inact != NULL)
- timerwheel_stop(ai.tw, frcti->snd_inact);
-
- if (frcti->rcv_inact != NULL)
- timerwheel_stop(ai.tw, frcti->rcv_inact);
+ /*
+ * FIXME: In case of reliable transmission we should
+ * make sure everything is acked.
+ */
frcti_clear(fd);
}
@@ -304,78 +295,49 @@ static int frcti_configure(int fd,
return 0;
}
-static void frcti_snd_inactivity(void * arg)
-{
- struct frcti * frcti;
-
- pthread_rwlock_wrlock(&ai.lock);
-
- frcti = (struct frcti * ) arg;
-
- frcti->snd_drf = true;
- frcti->snd_inact = NULL;
-
- pthread_rwlock_unlock(&ai.lock);
-}
-
-/* Called under flows lock */
static int frcti_write(int fd,
struct shm_du_buff * sdb)
{
struct frcti * frcti;
struct frct_pci pci;
+ struct timespec now = {0, 0};
memset(&pci, 0, sizeof(pci));
frcti = &(ai.frcti[fd]);
- pthread_rwlock_unlock(&ai.lock);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
- timerwheel_move(ai.tw);
+ pthread_rwlock_wrlock(&ai.lock);
- pthread_rwlock_rdlock(&ai.lock);
+ /* Check if sender inactivity is true. */
+ if (!frcti->snd_drf && ts_diff_ms(&now, &frcti->last_snd) > 2 * MPL)
+ frcti->snd_drf = true;
- /*
- * Set the DRF in the first packet of a new run of SDUs,
- * otherwise simply recharge the timer.
- */
+ /* Set the DRF in the first packet of a new run of SDUs. */
if (frcti->snd_drf) {
- frcti->snd_inact = timerwheel_start(ai.tw, frcti_snd_inactivity,
- frcti, 2 * MPL);
- if (frcti->snd_inact == NULL)
- return -1;
-
pci.flags |= FLAG_DATA_RUN;
frcti->snd_drf = false;
- } else {
- if (timerwheel_restart(ai.tw, frcti->snd_inact, 2 * MPL))
- return -1;
}
+ frcti->last_snd = now;
+
pci.seqno = frcti->snd_lwe++;
pci.type |= PDU_TYPE_DATA;
- if (frct_pci_ser(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK))
+ if (frct_pci_ser(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK)) {
+ pthread_rwlock_unlock(&ai.lock);
return -1;
+ }
- if (finalize_write(fd, shm_du_buff_get_idx(sdb)))
+ if (finalize_write(fd, shm_du_buff_get_idx(sdb))) {
+ pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
-
- return 0;
-}
-
-static void frcti_rcv_inactivity(void * arg)
-{
- struct frcti * frcti;
-
- pthread_rwlock_wrlock(&ai.lock);
-
- frcti = (struct frcti * ) arg;
-
- frcti->rcv_drf = true;
- frcti->rcv_inact = NULL;
+ }
pthread_rwlock_unlock(&ai.lock);
+
+ return 0;
}
static ssize_t frcti_read(int fd)
@@ -385,8 +347,7 @@ static ssize_t frcti_read(int fd)
struct frcti * frcti;
struct frct_pci pci;
struct shm_du_buff * sdb;
-
- timerwheel_move(ai.tw);
+ struct timespec now = {0, 0};
pthread_rwlock_rdlock(&ai.lock);
@@ -412,6 +373,8 @@ static ssize_t frcti_read(int fd)
if (idx < 0)
return idx;
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
pthread_rwlock_rdlock(&ai.lock);
frcti = &(ai.frcti[fd]);
@@ -425,7 +388,11 @@ static ssize_t frcti_read(int fd)
return -EAGAIN;
}
- /* We don't accept packets when there is no inactivity timer. */
+ /* Check if receiver inactivity is true. */
+ if (!frcti->rcv_drf && ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL)
+ frcti->rcv_drf = true;
+
+ /* We don't accept packets when there is receiver inactivity. */
if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) {
pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
@@ -433,68 +400,22 @@ static ssize_t frcti_read(int fd)
}
/*
- * If there is an inactivity timer and the DRF is set,
+ * If there is no receiver inactivity and the DRF is set,
* reset the state of the connection.
*/
- if (pci.flags & FLAG_DATA_RUN) {
- frcti->rcv_drf = true;
- if (frcti->rcv_inact != NULL)
- timerwheel_stop(ai.tw, frcti->rcv_inact);
+ if (pci.flags & FLAG_DATA_RUN)
frcti->rcv_lwe = pci.seqno;
- }
-
- /*
- * Start receiver inactivity if this packet has the DRF,
- * otherwise simply restart it.
- */
- if (frcti->rcv_drf) {
- frcti->rcv_inact = timerwheel_start(ai.tw, frcti_rcv_inactivity,
- frcti, 3 * MPL);
- if (frcti->rcv_inact == NULL) {
- pthread_rwlock_unlock(&ai.lock);
- shm_rdrbuff_remove(ai.rdrb, idx);
- return -EAGAIN;
- }
+ if (frcti->rcv_drf)
frcti->rcv_drf = false;
- } else {
- if (timerwheel_restart(ai.tw, frcti->rcv_inact, 3 * MPL)) {
- pthread_rwlock_unlock(&ai.lock);
- shm_rdrbuff_remove(ai.rdrb, idx);
- return -EAGAIN;
- }
- }
+
+ frcti->last_rcv = now;
pthread_rwlock_unlock(&ai.lock);
return idx;
}
-static int frcti_event_wait(struct flow_set * set,
- struct fqueue * fq,
- const struct timespec * timeout)
-{
- int ret;
-
- assert(set);
- assert(fq);
-
- timerwheel_move(ai.tw);
-
- /*
- * FIXME: Return the fq only if a data SDU
- * for the application is available.
- */
-
- ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout);
- if (ret == -ETIMEDOUT) {
- fq->fqsize = 0;
- return -ETIMEDOUT;
- }
-
- return ret;
-}
-
static void flow_clear(int fd)
{
assert(!(fd < 0));
@@ -1073,16 +994,17 @@ ssize_t flow_write(int fd,
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOTALLOC;
}
+
+ pthread_rwlock_unlock(&ai.lock);
} else {
+ pthread_rwlock_unlock(&ai.lock);
+
if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) {
- pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -1;
}
}
- pthread_rwlock_unlock(&ai.lock);
-
return 0;
}
@@ -1090,9 +1012,10 @@ ssize_t flow_read(int fd,
void * buf,
size_t count)
{
- ssize_t idx = -1;
- ssize_t n;
+ ssize_t idx = -1;
+ ssize_t n;
uint8_t * sdu;
+ bool used;
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
@@ -1104,9 +1027,11 @@ ssize_t flow_read(int fd,
return -ENOTALLOC;
}
+ used = ai.frcti[fd].used;
+
pthread_rwlock_unlock(&ai.lock);
- if (!ai.frcti[fd].used)
+ if (!used)
idx = shm_rbuff_read(ai.flows[fd].rx_rb);
else
idx = frcti_read(fd);
@@ -1146,8 +1071,6 @@ struct flow_set * flow_set_create()
return NULL;
}
- set->np1_set = false;
-
pthread_rwlock_unlock(&ai.lock);
return set;
@@ -1208,7 +1131,7 @@ int flow_set_add(struct flow_set * set,
if (set == NULL)
return -EINVAL;
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_wrlock(&ai.lock);
ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id);
@@ -1216,9 +1139,6 @@ int flow_set_add(struct flow_set * set,
for (i = 0; i < sdus; i++)
shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id);
- if (ai.frcti[fd].used)
- set->np1_set = true;
-
pthread_rwlock_unlock(&ai.lock);
return ret;
@@ -1230,7 +1150,7 @@ void flow_set_del(struct flow_set * set,
if (set == NULL)
return;
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_wrlock(&ai.lock);
if (ai.flows[fd].port_id >= 0)
shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id);
@@ -1306,12 +1226,7 @@ int flow_event_wait(struct flow_set * set,
t = &abstime;
}
- if (set->np1_set)
- ret = frcti_event_wait(set, fq, t);
- else
- ret = shm_flow_set_wait(ai.fqset, set->idx,
- fq->fqueue, t);
-
+ ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
if (ret == -ETIMEDOUT) {
fq->fqsize = 0;
return -ETIMEDOUT;
@@ -1524,15 +1439,15 @@ int ipcp_flow_write(int fd,
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
+
+ pthread_rwlock_unlock(&ai.lock);
} else {
- if (frcti_write(fd, sdb)) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_unlock(&ai.lock);
+
+ if (frcti_write(fd, sdb))
return -1;
- }
}
- pthread_rwlock_unlock(&ai.lock);
-
return 0;
}