summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c70
1 files changed, 57 insertions, 13 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 47fec48d..14ee31f4 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -39,6 +39,7 @@
#include <ouroboros/qoscube.h>
#include <ouroboros/timerwheel.h>
#include <ouroboros/frct_pci.h>
+#include <ouroboros/rq.h>
#include <stdlib.h>
#include <string.h>
@@ -50,7 +51,8 @@
#define TW_ELEMENTS 6000
#define TW_RESOLUTION 1 /* ms */
-#define MPL 2000 /* ms */
+#define MPL 2000 /* ms */
+#define RQ_SIZE 20
#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
@@ -89,6 +91,8 @@ struct frcti {
uint16_t conf_flags;
+ struct rq * rq;
+
pthread_rwlock_t lock;
};
@@ -269,6 +273,12 @@ static int frcti_init(int fd)
frcti->rcv_lwe = 0;
frcti->rcv_rwe = 0;
+ frcti->conf_flags = 0;
+
+ frcti->rq = rq_create(RQ_SIZE);
+ if (frcti->rq == NULL)
+ return -1;
+
return 0;
}
@@ -285,6 +295,8 @@ static void frcti_fini(int fd)
*/
frcti_clear(fd);
+
+ rq_destroy(ai.frcti[fd].rq);
}
static int frcti_send(int fd,
@@ -382,6 +394,25 @@ static ssize_t frcti_read(int fd)
struct frcti * frcti;
struct frct_pci pci;
struct shm_du_buff * sdb;
+ uint64_t seqno;
+ bool nxt_pdu = true;
+
+ frcti = &(ai.frcti[fd]);
+
+ /* See if we already have the next PDU */
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ if (!rq_is_empty(frcti->rq)) {
+ seqno = rq_peek(frcti->rq);
+ if (seqno == frcti->rcv_lwe) {
+ frcti->rcv_lwe++;
+ idx = rq_pop(frcti->rq);
+ pthread_rwlock_unlock(&frcti->lock);
+ return idx;
+ }
+ }
+
+ pthread_rwlock_unlock(&frcti->lock);
do {
struct timespec now;
@@ -390,7 +421,7 @@ static ssize_t frcti_read(int fd)
struct shm_rbuff * rb;
bool noblock;
- clock_gettime(PTHREAD_COND_CLOCK, &now);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
pthread_rwlock_rdlock(&ai.lock);
@@ -404,18 +435,16 @@ static ssize_t frcti_read(int fd)
pthread_rwlock_unlock(&ai.lock);
- if (noblock)
+ if (noblock) {
idx = shm_rbuff_read(rb);
- else
+ } else {
idx = shm_rbuff_read_b(rb, abstime);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+ }
if (idx < 0)
return idx;
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
-
- frcti = &(ai.frcti[fd]);
-
sdb = shm_rdrbuff_get(ai.rdrb, idx);
pthread_rwlock_wrlock(&frcti->lock);
@@ -432,10 +461,11 @@ static ssize_t frcti_read(int fd)
ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL)
frcti->rcv_drf = true;
- /* We don't accept packets when there is receiver inactivity. */
+ /* When there is receiver inactivity queue the packet. */
if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) {
+ if (rq_push(frcti->rq, pci.seqno, idx))
+ shm_rdrbuff_remove(ai.rdrb, idx);
pthread_rwlock_unlock(&frcti->lock);
- shm_rdrbuff_remove(ai.rdrb, idx);
return -EAGAIN;
}
@@ -451,12 +481,26 @@ static ssize_t frcti_read(int fd)
frcti->last_rcv = now;
- pthread_rwlock_unlock(&frcti->lock);
+ nxt_pdu = true;
- if (!(pci.type & PDU_TYPE_DATA))
+ if (!(pci.type & PDU_TYPE_DATA)) {
shm_rdrbuff_remove(ai.rdrb, idx);
+ nxt_pdu = false;
+ }
+
+ if (frcti->conf_flags & FRCTFORDERING) {
+ if (pci.seqno != frcti->rcv_lwe) {
+ if (rq_push(frcti->rq, pci.seqno, idx))
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ nxt_pdu = false;
+ } else {
+ frcti->rcv_lwe++;
+ }
+ }
+
+ pthread_rwlock_unlock(&frcti->lock);
- } while (!(pci.type & PDU_TYPE_DATA));
+ } while (!nxt_pdu);
return idx;
}