summaryrefslogtreecommitdiff
path: root/src/lib/frct.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/frct.c')
-rw-r--r--src/lib/frct.c72
1 files changed, 33 insertions, 39 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 3c180128..2322a039 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -68,7 +68,7 @@ enum frct_flags {
FRCT_DRF = 0x02, /* Data run flag */
FRCT_ACK = 0x04, /* ACK field valid */
FRCT_FC = 0x08, /* FC window valid */
- FRCT_RDVZ = 0x10, /* Rendez-vous */
+ FRCT_RDVS = 0x10, /* Rendez-vous */
FRCT_FFGM = 0x20, /* First Fragment */
FRCT_MFGM = 0x40, /* More fragments */
};
@@ -181,8 +181,8 @@ static uint16_t frcti_getconf(struct frcti * frcti)
#define frcti_snd(frcti, sdb) \
(frcti == NULL ? 0 : __frcti_snd(frcti, sdb))
-#define frcti_rcv(frcti, sdb) \
- (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))
+#define frcti_rcv(frcti, sdb) \
+ (frcti == NULL ? idx : __frcti_rcv(frcti, sdb))
static ssize_t __frcti_queued_pdu(struct frcti * frcti)
{
@@ -195,6 +195,7 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)
pthread_rwlock_wrlock(&frcti->lock);
pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1);
+
idx = frcti->rq[pos];
if (idx != -1) {
++frcti->rcv_cr.lwe;
@@ -319,9 +320,9 @@ static void rtt_estimator(struct frcti * frcti,
frcti->rto = MAX(RTO_MIN, srtt + (rttvar >> 2));
}
-/* Returns 0 when idx contains a packet for the application. */
-static int __frcti_rcv(struct frcti * frcti,
- struct shm_du_buff * sdb)
+/* Always queues the packet on the RQ for the application. */
+static ssize_t __frcti_rcv(struct frcti * frcti,
+ struct shm_du_buff * sdb)
{
ssize_t idx;
struct frct_pci * pci;
@@ -329,39 +330,48 @@ static int __frcti_rcv(struct frcti * frcti,
struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
uint32_t seqno;
- int ret = 0;
assert(frcti);
rcv_cr = &frcti->rcv_cr;
snd_cr = &frcti->snd_cr;
- pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN);
-
clock_gettime(CLOCK_REALTIME, &now);
- pthread_rwlock_wrlock(&frcti->lock);
+ pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN);
idx = shm_du_buff_get_idx(sdb);
-
seqno = ntoh32(pci->seqno);
- /* Check if receiver inactivity is true. */
+ pthread_rwlock_wrlock(&frcti->lock);
+
if (now.tv_sec - rcv_cr->act > rcv_cr->inact) {
- /* Inactive receiver, check for DRF. */
if (pci->flags & FRCT_DRF) /* New run. */
rcv_cr->lwe = seqno;
else
goto drop_packet;
}
- if (seqno == rcv_cr->lwe) {
- ++rcv_cr->lwe;
- } else { /* Out of order. */
- if (before(seqno, rcv_cr->lwe) )
- goto drop_packet;
+ if (before(seqno, rcv_cr->lwe))
+ goto drop_packet;
+
+ if (rcv_cr->cflags & FRCTFRTX) {
+ if (pci->flags & FRCT_ACK) {
+ uint32_t ackno = ntoh32(pci->ackno);
+ /* Check for duplicate (old) acks. */
+ if (after(ackno, snd_cr->lwe))
+ snd_cr->lwe = ackno;
+
+ if (frcti->probe && after(ackno, frcti->rttseq)) {
+ rtt_estimator(frcti, ts_diff_us(&frcti->t_probe,
+ &now));
+ frcti->probe = false;
+ }
+ }
- if (rcv_cr->cflags & FRCTFRTX) {
+ if (seqno == rcv_cr->lwe) {
+ ++frcti->rcv_cr.lwe;
+ } else {
size_t pos = seqno & (RQ_SIZE - 1);
if ((seqno - rcv_cr->lwe) >= RQ_SIZE)
goto drop_packet; /* Out of rq. */
@@ -369,37 +379,21 @@ static int __frcti_rcv(struct frcti * frcti,
if (frcti->rq[pos] != -1)
goto drop_packet; /* Duplicate in rq */
- /* Queue. */
frcti->rq[pos] = idx;
- ret = -EAGAIN;
- } else {
- rcv_cr->lwe = seqno + 1;
- }
- }
-
- if (rcv_cr->cflags & FRCTFRTX && pci->flags & FRCT_ACK) {
- uint32_t ackno = ntoh32(pci->ackno);
- /* Check for duplicate (old) acks. */
- if ((int32_t)(ackno - snd_cr->lwe) > 0)
- snd_cr->lwe = ackno;
-
- if (frcti->probe && after(ackno, frcti->rttseq)) {
- rtt_estimator(frcti, ts_diff_us(&frcti->t_probe, &now));
- frcti->probe = false;
+ idx = -EAGAIN;
}
+ } else {
+ rcv_cr->lwe = seqno + 1;
}
rcv_cr->act = now.tv_sec;
pthread_rwlock_unlock(&frcti->lock);
- if (ret == 0 && !(pci->flags & FRCT_DATA))
- shm_rdrbuff_remove(ai.rdrb, idx);
-
if (frcti->rw != NULL)
rxmwheel_move(frcti->rw);
- return ret;
+ return idx;
drop_packet:
pthread_rwlock_unlock(&frcti->lock);