/*
 * Ouroboros - Copyright (C) 2016 - 2020
 *
 * Flow and Retransmission Control
 *
 *    Dimitri Staessens <dimitri.staessens@ugent.be>
 *    Sander Vrijders   <sander.vrijders@ugent.be>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * version 2.1 as published by the Free Software Foundation.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., http://www.fsf.org/about/contact/.
 */

/* Minimum time between two rendez-vous packets while the window is closed. */
#define DELT_RDV      (100 * MILLION) /* ns */
/* Maximum time the window may stay closed before senders give up. */
#define MAX_RDV         (1 * BILLION) /* ns */

/* Size in bytes of the FRCT header (struct frct_pci) on every packet. */
#define FRCT_PCILEN    (sizeof(struct frct_pci))

/*
 * Connection record: per-direction window and activity state.
 * A struct frcti holds one for sending (snd_cr) and one for
 * receiving (rcv_cr).
 */
struct frct_cr {
        uint32_t        lwe;     /* Left window edge               */
        uint32_t        rwe;     /* Right window edge              */

        uint8_t         cflags;  /* FRCTF* control flags           */
        uint32_t        seqno;   /* SEQ to send, or last SEQ Ack'd */

        struct timespec act;     /* Last seen activity             */
        time_t          inact;   /* Inactivity (s)                 */
};

/*
 * FRCT instance attached to a single flow.
 *
 * 'lock' guards the connection records, the rtt state and rq.
 * 'mtx' together with 'cond' guards the window-closed state
 * (open, t_wnd, t_rdvs) and is used to wake blocked writers.
 */
struct frcti {
        int               fd;          /* Flow this instance serves */

        /* Timer values, initialised from DELT_MPL/DELT_A/DELT_R/DELT_RDV. */
        time_t            mpl;
        time_t            a;           /* Compared against ns intervals */
        time_t            r;
        time_t            rdv;         /* Rendez-vous interval (ns)     */

        time_t            srtt;        /* Smoothed rtt           */
        time_t            mdev;        /* Deviation              */
        time_t            rto;         /* Retransmission timeout */
        uint32_t          rttseq;      /* SEQ used for the probe */
        struct timespec   t_probe;     /* Probe time             */
        bool              probe;       /* Probe active           */

        struct frct_cr    snd_cr;
        struct frct_cr    rcv_cr;

        ssize_t           rq[RQ_SIZE]; /* Received buffer indices, -1 = empty */
        pthread_rwlock_t  lock;

        bool              open;        /* Window open/closed     */
        struct timespec   t_wnd;       /* Window closed time     */
        struct timespec   t_rdvs;      /* Last rendez-vous sent  */
        pthread_cond_t    cond;
        pthread_mutex_t   mtx;
};

/* Bit flags carried in the 'flags' byte of struct frct_pci. */
enum frct_flags {
        FRCT_DATA = 0x01, /* PDU carries data */
        FRCT_DRF  = 0x02, /* Data run flag    */
        FRCT_ACK  = 0x04, /* ACK field valid  */
        FRCT_FC   = 0x08, /* FC window valid  */
        FRCT_RDVS = 0x10, /* Rendez-vous      */
        FRCT_FFGM = 0x20, /* First Fragment   */
        FRCT_MFGM = 0x40, /* More fragments   */
};

/*
 * FRCT header as it appears on the wire (packed, multi-byte fields in
 * network byte order).
 *
 * The first four bytes are also accessed as one 32-bit word by the
 * send and receive code: the top byte is 'flags' and the remaining
 * 24 bits ('pad' + 'window') carry the low 24 bits of the right
 * window edge.
 */
struct frct_pci {
        uint8_t  flags;  /* enum frct_flags bits */

        uint8_t  pad;    /* 24 bit window! */
        uint16_t window;

        uint32_t seqno;  /* Sequence number of this packet */
        uint32_t ackno;  /* Valid when FRCT_ACK is set     */
} __attribute__((packed));

/*
 * Serial-number comparison: true when seq1 precedes seq2, taking
 * 32-bit wrap-around into account.
 */
static bool before(uint32_t seq1,
                   uint32_t seq2)
{
        int32_t dist = (int32_t)(seq1 - seq2);

        return dist < 0;
}

/*
 * Serial-number comparison: true when seq1 follows seq2, taking
 * 32-bit wrap-around into account.
 */
static bool after(uint32_t seq1,
                  uint32_t seq2)
{
        int32_t dist = (int32_t)(seq2 - seq1);

        return dist < 0;
}

/*
 * Send a header-only FRCT packet on flow fd.
 *
 * flags: frct_flags to put in the header.
 * ackno: value for the ackno field.
 * rwe:   right window edge; its low 24 bits end up in the window field.
 *
 * Allocation or queueing failures are silent: the packet is dropped.
 */
static void __send_frct_pkt(int      fd,
                            uint8_t  flags,
                            uint32_t ackno,
                            uint32_t rwe)
{
        struct shm_du_buff * sdb;
        struct frct_pci *    hdr;
        struct flow *        flow;
        ssize_t              idx;
        int                  err;

        /* Raw calls needed to bypass frcti. */
#ifdef RXM_BLOCKING
        idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*hdr), NULL, &sdb, NULL);
#else
        idx = shm_rdrbuff_alloc(ai.rdrb, sizeof(*hdr), NULL, &sdb);
#endif
        if (idx < 0)
                return;

        hdr = (struct frct_pci *) shm_du_buff_head(sdb);
        memset(hdr, 0, sizeof(*hdr));

        /* Write the window over the first word; flags overwrite byte 0. */
        *((uint32_t *) hdr) = hton32(rwe);

        hdr->flags = flags;
        hdr->ackno = hton32(ackno);

        flow = &ai.flows[fd];
#ifdef RXM_BLOCKING
        err = shm_rbuff_write_b(flow->tx_rb, idx, NULL);
#else
        err = shm_rbuff_write(flow->tx_rb, idx);
#endif
        if (err) {
                /* Could not queue it: give the buffer back. */
                ipcp_sdb_release(sdb);
                return;
        }

        shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
}

/*
 * Send a stand-alone ACK + flow control update if one is due.
 *
 * Nothing is sent when the receiver's lwe equals the last value that
 * was acknowledged, or when the time since the last receiver activity
 * is larger than frcti->a or smaller than DELT_ACK.
 */
static void send_frct_pkt(struct frcti * frcti)
{
        struct timespec now;
        time_t          elapsed = 0;
        uint32_t        ack     = 0;
        uint32_t        edge    = 0;
        int             fd      = -1;
        int             pending;

        assert(frcti);

        pthread_rwlock_rdlock(&frcti->lock);

        pending = frcti->rcv_cr.lwe != frcti->rcv_cr.seqno;
        if (pending) {
                /* Snapshot what we need while holding the lock. */
                fd   = frcti->fd;
                ack  = frcti->rcv_cr.lwe;
                edge = frcti->rcv_cr.rwe;

                clock_gettime(PTHREAD_COND_CLOCK, &now);

                elapsed = ts_diff_ns(&frcti->rcv_cr.act, &now);
        }

        pthread_rwlock_unlock(&frcti->lock);

        if (!pending)
                return;

        if (elapsed > frcti->a || elapsed < DELT_ACK)
                return;

        __send_frct_pkt(fd, FRCT_ACK | FRCT_FC, ack, edge);

        /* Remember how far we have acknowledged. */
        pthread_rwlock_wrlock(&frcti->lock);

        if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno))
                frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;

        pthread_rwlock_unlock(&frcti->lock);
}

/*
 * Send a rendez-vous packet (header only, FRCT_RDVS set) on flow fd.
 * Uses the blocking allocation and write calls; failures are silent.
 */
static void __send_rdv(int fd)
{
        struct shm_du_buff * sdb;
        struct frct_pci *    hdr;
        struct flow *        flow = &ai.flows[fd];
        ssize_t              idx;

        /* Raw calls needed to bypass frcti. */
        idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*hdr), NULL, &sdb, NULL);
        if (idx < 0)
                return;

        hdr = (struct frct_pci *) shm_du_buff_head(sdb);
        memset(hdr, 0, sizeof(*hdr));

        hdr->flags = FRCT_RDVS;

        if (shm_rbuff_write_b(flow->tx_rb, idx, NULL) != 0) {
                /* Could not queue it: give the buffer back. */
                ipcp_sdb_release(sdb);
                return;
        }

        shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
}

/*
 * Allocate and initialise an FRCT instance for flow fd.
 *
 * Timers are set from the DELT_* defaults, the reordering queue is
 * emptied (-1 in every slot) and both connection records are marked
 * inactive by back-dating their activity timestamps.
 *
 * Returns the new instance, or NULL if allocation or initialisation
 * of a synchronisation object fails.
 */
static struct frcti * frcti_create(int fd)
{
        struct frcti *     frcti;
        ssize_t            idx;
        struct timespec    now;
        time_t             mpl;
        time_t             a;
        time_t             r;
        pthread_condattr_t cattr;

        frcti = malloc(sizeof(*frcti));
        if (frcti == NULL)
                goto fail_malloc;

        /*
         * NOTE(review): this leaves 'open' false until the first flow
         * control update is received; confirm that is intended.
         */
        memset(frcti, 0, sizeof(*frcti));

        if (pthread_rwlock_init(&frcti->lock, NULL))
                goto fail_lock;

        if (pthread_mutex_init(&frcti->mtx, NULL))
                goto fail_mutex;

        if (pthread_condattr_init(&cattr))
                goto fail_cattr;
#ifndef __APPLE__
        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
#endif
        if (pthread_cond_init(&frcti->cond, &cattr))
                goto fail_cond;

        /* The attributes are not needed once the condvar exists. */
        pthread_condattr_destroy(&cattr);

        for (idx = 0; idx < RQ_SIZE; ++idx)
                frcti->rq[idx] = -1;

        clock_gettime(PTHREAD_COND_CLOCK, &now);

        frcti->mpl = mpl = DELT_MPL;
        frcti->a   = a   = DELT_A;
        frcti->r   = r   = DELT_R;
        frcti->rdv = DELT_RDV;
        frcti->fd  = fd;

        frcti->rttseq = 0;
        frcti->probe  = false;

        frcti->srtt = 0;            /* Updated on first ACK */
        frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */
        frcti->rto  = 20 * MILLION; /* Initial rxm will be after 20 ms */

        /* Flows that tolerate no loss get retransmission. */
        if (ai.flows[fd].qs.loss == 0) {
                frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER;
                frcti->rcv_cr.cflags |= FRCTFRTX;
        }

        frcti->snd_cr.cflags |= FRCTFRESCNTL;

        frcti->snd_cr.rwe = START_WINDOW;

        /* Back-date activity so both directions start out inactive. */
        frcti->snd_cr.inact  = (3 * mpl + a + r) / BILLION + 1; /* s */
        frcti->snd_cr.act.tv_sec = now.tv_sec - (frcti->snd_cr.inact + 1);

        frcti->rcv_cr.inact = (2 * mpl + a + r)  / BILLION + 1; /* s */
        frcti->rcv_cr.act.tv_sec = now.tv_sec - (frcti->rcv_cr.inact + 1);

        return frcti;

 fail_cond:
        pthread_condattr_destroy(&cattr);
 fail_cattr:
        pthread_mutex_destroy(&frcti->mtx);
 fail_mutex:
        pthread_rwlock_destroy(&frcti->lock);
 fail_lock:
        free(frcti);
 fail_malloc:
        return NULL;
}

/*
 * Release an instance obtained from frcti_create(): destroys the
 * condition variable, mutex and rwlock, then frees the memory.
 * Return values of the destroy calls are deliberately ignored.
 */
static void frcti_destroy(struct frcti * frcti)
{
        (void) pthread_cond_destroy(&frcti->cond);
        (void) pthread_mutex_destroy(&frcti->mtx);
        (void) pthread_rwlock_destroy(&frcti->lock);

        free(frcti);
}

/* Return a snapshot of the sender's control flags, read under the lock. */
static uint16_t frcti_getflags(struct frcti * frcti)
{
        uint16_t cflags;

        assert(frcti);

        pthread_rwlock_rdlock(&frcti->lock);
        cflags = frcti->snd_cr.cflags;
        pthread_rwlock_unlock(&frcti->lock);

        return cflags;
}

/*
 * Replace the sender's control flags with 'flags'.
 *
 * The retransmission flag (FRCTFRTX) cannot be changed by command: its
 * current value is kept and the FRCTFRTX bit in 'flags' is ignored.
 * All other flags are set exactly as requested.
 *
 * The previous code and-ed the stored flags with 'flags' after already
 * clearing them, so it could only ever clear the non-RTX flags.
 */
static void frcti_setflags(struct frcti * frcti,
                           uint16_t       flags)
{
        assert(frcti);

        flags &= ~FRCTFRTX; /* Should not be set by command */

        pthread_rwlock_wrlock(&frcti->lock);

        /* Keep RTX as it is, take everything else from the caller. */
        frcti->snd_cr.cflags =
                (uint8_t) ((frcti->snd_cr.cflags & FRCTFRTX) | flags);

        pthread_rwlock_unlock(&frcti->lock);
}

/*
 * NULL-tolerant wrappers around the __frcti_* functions, for flows
 * that have no FRCT instance.
 *
 * frcti_queued_pdu() yields the caller's local variable 'idx' when
 * frcti is NULL, so it can only be used where 'idx' is in scope.
 *
 * frcti_rcv() and frcti_tick() wrap void functions; their NULL branch
 * is (void) 0 so both operands of the conditional have void type.
 */
#define frcti_queued_pdu(frcti)                         \
        ((frcti) == NULL ? idx : __frcti_queued_pdu(frcti))

#define frcti_snd(frcti, sdb)                           \
        ((frcti) == NULL ? 0 : __frcti_snd(frcti, sdb))

#define frcti_rcv(frcti, sdb)                           \
        ((frcti) == NULL ? (void) 0 : __frcti_rcv(frcti, sdb))

#define frcti_tick(frcti)                               \
        ((frcti) == NULL ? (void) 0 : __frcti_tick())

#define frcti_dealloc(frcti)                            \
        ((frcti) == NULL ? 0 : __frcti_dealloc(frcti))

#define frcti_is_window_open(frcti)                     \
        ((frcti) == NULL ? true : __frcti_is_window_open(frcti))

#define frcti_window_wait(frcti, abstime)               \
        ((frcti) == NULL ? 0 : __frcti_window_wait(frcti, abstime))


/*
 * Non-blocking check whether the sender may transmit.
 *
 * Returns true when resource control is disabled or the next sequence
 * number lies before the right window edge.
 *
 * When the window is closed, the first call records the closing time.
 * Later calls send a rendez-vous packet at most once per frcti->rdv ns,
 * until the window has been closed for more than MAX_RDV.
 *
 * Every path releases both the mutex and the rwlock; the previous code
 * returned with the rwlock still held once MAX_RDV had been exceeded.
 */
static bool __frcti_is_window_open(struct frcti * frcti)
{
        struct frct_cr * snd_cr = &frcti->snd_cr;
        bool             ret    = true;

        pthread_rwlock_rdlock(&frcti->lock);

        if (snd_cr->cflags & FRCTFRESCNTL)
                ret = before(snd_cr->seqno, snd_cr->rwe);

        if (!ret) {
                struct timespec now;

                clock_gettime(PTHREAD_COND_CLOCK, &now);

                pthread_mutex_lock(&frcti->mtx);
                if (frcti->open) {
                        /* Window just closed: start the timers. */
                        frcti->open   = false;
                        frcti->t_wnd  = now;
                        frcti->t_rdvs = now;
                } else {
                        time_t diff;
                        diff = ts_diff_ns(&frcti->t_wnd, &now);
                        if (diff > MAX_RDV) {
                                /* Closed for too long: stop probing. */
                                pthread_mutex_unlock(&frcti->mtx);
                                pthread_rwlock_unlock(&frcti->lock);
                                return false;
                        }

                        diff = ts_diff_ns(&frcti->t_rdvs, &now);
                        if (diff > frcti->rdv) {
                                frcti->t_rdvs = now;
                                __send_rdv(frcti->fd);
                        }
                }

                pthread_mutex_unlock(&frcti->mtx);
        }

        pthread_rwlock_unlock(&frcti->lock);

        return ret;
}

/*
 * Block until the send window is no longer exhausted or abstime passes.
 *
 * Returns 0 immediately when resource control is disabled.  While the
 * next sequence number equals the right window edge, waits on
 * frcti->cond.  On each timeout a rendez-vous packet is sent if more
 * than frcti->rdv ns passed since the last one.
 *
 * Returns -ECONNRESET when the window has been closed for more than
 * MAX_RDV, otherwise the negated result of the last
 * pthread_cond_timedwait() (0 or e.g. -ETIMEDOUT), or 0 if no wait
 * was needed.
 */
static int __frcti_window_wait(struct frcti *    frcti,
                               struct timespec * abstime)
{
        struct frct_cr * snd_cr = &frcti->snd_cr;
        int ret                 = 0;

        pthread_rwlock_rdlock(&frcti->lock);

        if (!(snd_cr->cflags & FRCTFRESCNTL)) {
                pthread_rwlock_unlock(&frcti->lock);
                return 0;
        }

        while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) {
                struct timespec now;

                /* Swap the rwlock for the mutex that pairs with cond. */
                pthread_rwlock_unlock(&frcti->lock);
                pthread_mutex_lock(&frcti->mtx);

                if (frcti->open) {
                        /* Window just closed: start the timers. */
                        clock_gettime(PTHREAD_COND_CLOCK, &now);

                        frcti->t_wnd  = now;
                        frcti->t_rdvs = now;
                        frcti->open   = false;
                }

                /* Release the mutex if the thread is cancelled in the wait. */
                pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
                                     (void *) &frcti->mtx);

                ret = -pthread_cond_timedwait(&frcti->cond,
                                              &frcti->mtx,
                                              abstime);

                pthread_cleanup_pop(false);

                if (ret == -ETIMEDOUT) {
                        time_t diff;

                        clock_gettime(PTHREAD_COND_CLOCK, &now);

                        diff = ts_diff_ns(&frcti->t_wnd, &now);
                        if (diff > MAX_RDV) {
                                /* Only the mutex is held at this point. */
                                pthread_mutex_unlock(&frcti->mtx);
                                return -ECONNRESET; /* write fails! */
                        }

                        diff = ts_diff_ns(&frcti->t_rdvs, &now);
                        if  (diff > frcti->rdv) {
                                frcti->t_rdvs = now;
                                __send_rdv(frcti->fd);
                        }
                }

                /* Back to the rwlock to re-evaluate the loop condition. */
                pthread_mutex_unlock(&frcti->mtx);
                pthread_rwlock_rdlock(&frcti->lock);
        }

        pthread_rwlock_unlock(&frcti->lock);

        return ret;
}

static ssize_t __frcti_queued_pdu(struct frcti * frcti)
{
        ssize_t idx;
        size_t  pos;

        assert(frcti);

        /* See if we already have the next PDU. */
        pthread_rwlock_wrlock(&frcti->lock);

        pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1);

        idx = frcti->rq[pos];
        if (idx != -1) {
                ++frcti->rcv_cr.lwe;
                ++frcti->rcv_cr.rwe;
                frcti->rq[pos] = -1;
        }

        pthread_rwlock_unlock(&frcti->lock);

        return idx;
}

/*
 * Peek at the queue slot for the receiver's left window edge.
 *
 * Returns the buffer index stored there (-1 when empty) without
 * removing it or moving the window.
 */
static ssize_t __frcti_pdu_ready(struct frcti * frcti)
{
        ssize_t head;

        assert(frcti);

        /* See if we already have the next PDU. */
        pthread_rwlock_rdlock(&frcti->lock);

        head = frcti->rq[frcti->rcv_cr.lwe & (RQ_SIZE - 1)];

        pthread_rwlock_unlock(&frcti->lock);

        return head;
}

#include <timerwheel.c>

/*
 * Send a final ACK for everything that has not been ACK'd.
 *
 * Returns the larger of the remaining receiver and sender inactivity
 * times, in seconds.  If the flow should be kept active for
 * retransmission (FRCTFLINGER is set and there are unacknowledged
 * packets), the returned time is negated.
 */
static time_t __frcti_dealloc(struct frcti * frcti)
{
        struct timespec now;
        time_t          wait;
        uint32_t        ackno; /* Sequence numbers are unsigned 32 bit. */
        int             fd = -1;

        clock_gettime(PTHREAD_COND_CLOCK, &now);

        pthread_rwlock_rdlock(&frcti->lock);

        ackno = frcti->rcv_cr.lwe;
        /* Only send an ACK if there is something new to acknowledge. */
        if (frcti->rcv_cr.lwe != frcti->rcv_cr.seqno)
                fd = frcti->fd;

        wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec,
                   frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec);

        if (frcti->snd_cr.cflags & FRCTFLINGER
            && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno))
                wait = -wait;

        pthread_rwlock_unlock(&frcti->lock);

        if (fd != -1)
                __send_frct_pkt(fd, FRCT_ACK, ackno, 0);

        return wait;
}

/*
 * Prepend and fill the FRCT header for an outgoing data packet.
 *
 * Sets FRCT_DATA, sets FRCT_DRF when nothing is unacknowledged, picks
 * a fresh random sequence number after sender inactivity, piggybacks
 * the receive window (FRCT_FC) while the receiver is active and, with
 * retransmission enabled, piggybacks an ACK and schedules the packet
 * on the timerwheel.
 *
 * Returns 0 on success or -ENOMEM if there is no headroom for the
 * header.
 */
static int __frcti_snd(struct frcti *       frcti,
                       struct shm_du_buff * sdb)
{
        struct frct_pci * pci;
        struct timespec   now;
        struct frct_cr *  snd_cr;
        struct frct_cr *  rcv_cr;
        uint32_t          seqno;
        bool              rtx;
        long long         idle; /* ns since last receiver activity */

        assert(frcti);

        snd_cr = &frcti->snd_cr;
        rcv_cr = &frcti->rcv_cr;

        timerwheel_move();

        pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
        if (pci == NULL)
                return -ENOMEM;

        memset(pci, 0, sizeof(*pci));

        clock_gettime(PTHREAD_COND_CLOCK, &now);

        pthread_rwlock_wrlock(&frcti->lock);

        rtx = snd_cr->cflags & FRCTFRTX;

        pci->flags |= FRCT_DATA;

        /* Set DRF if there are no unacknowledged packets. */
        if (snd_cr->seqno == snd_cr->lwe)
                pci->flags |= FRCT_DRF;

        /* Choose a new sequence number if sender inactivity expired. */
        if (now.tv_sec - snd_cr->act.tv_sec > snd_cr->inact) {
                /* There are no unacknowledged packets. */
                assert(snd_cr->seqno == snd_cr->lwe);
                random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno));
                /*
                 * Keep lwe equal to seqno: "seqno == lwe" is what marks
                 * "nothing unacknowledged" for DRF and the assert above.
                 * Starting one behind breaks that once lwe is advanced
                 * per packet in the non-retransmission case below.
                 */
                snd_cr->lwe = snd_cr->seqno;
                snd_cr->rwe = snd_cr->lwe + START_WINDOW;
        }

        seqno = snd_cr->seqno;
        pci->seqno = hton32(seqno);

        if (now.tv_sec - rcv_cr->act.tv_sec < rcv_cr->inact) {
                pci->flags |= FRCT_FC;
                /* Low 24 bits of rwe; the flags byte is left untouched. */
                *((uint32_t *) pci) |= hton32(rcv_cr->rwe & 0x00FFFFFF);
        }

        if (!rtx) {
                /* Without retransmission a sent packet counts as done. */
                snd_cr->lwe++;
        } else {
                if (!frcti->probe) {
                        frcti->rttseq  = snd_cr->seqno;
                        frcti->t_probe = now;
                        frcti->probe   = true;
                }

                /*
                 * frcti->a is in ns (send_frct_pkt compares it with a
                 * ts_diff_ns result), so compare in ns here as well
                 * instead of comparing it with a number of seconds.
                 */
                idle  = (long long) (now.tv_sec - rcv_cr->act.tv_sec)
                        * 1000000000LL;
                idle += now.tv_nsec - rcv_cr->act.tv_nsec;

                if (idle <= frcti->a) {
                        pci->flags |= FRCT_ACK;
                        pci->ackno = hton32(rcv_cr->lwe);
                        rcv_cr->seqno = rcv_cr->lwe;
                }
        }

        snd_cr->seqno++;
        snd_cr->act = now;

        pthread_rwlock_unlock(&frcti->lock);

        if (rtx)
                timerwheel_rxm(frcti, seqno, sdb);

        return 0;
}

/*
 * Fold a new round-trip measurement (mrtt) into the estimator.
 *
 * The first sample initialises srtt to mrtt and mdev to mrtt / 2.
 * Later samples move srtt by 1/8 of the error and mdev by 1/4 of the
 * difference between the absolute error and mdev.  The results are
 * floored (srtt >= 1000, mdev >= 100) and rto becomes
 * srtt + 2 * mdev, but never less than RTO_MIN.
 */
static void rtt_estimator(struct frcti * frcti,
                          time_t         mrtt)
{
        time_t avg = frcti->srtt;
        time_t dev = frcti->mdev;

        if (avg != 0) {
                time_t err = mrtt - avg;

                avg += (err >> 3);
                dev += (ABS(err) - dev) >> 2;
        } else { /* first measurement */
                avg = mrtt;
                dev = mrtt >> 1;
        }

        frcti->srtt = MAX(1000U, avg);
        frcti->mdev = MAX(100U, dev);
        frcti->rto  = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << 1));
}

/* Periodic tick: lets the timerwheel process whatever is due. */
static void __frcti_tick(void)
{
        timerwheel_move();
}

/*
 * Process one received packet.
 *
 * Strips the FRCT header, handles rendez-vous, ACK and flow control
 * information, and stores accepted data packets in frcti->rq at
 * position seqno & (RQ_SIZE - 1).  Always queues the next application
 * packet on the RQ.
 *
 * Packets that are not queued are removed from the rdrbuff here; on
 * the drop path send_frct_pkt() is given the chance to send an ACK.
 */
static void __frcti_rcv(struct frcti *       frcti,
                        struct shm_du_buff * sdb)
{
        ssize_t           idx;
        size_t            pos;
        struct frct_pci * pci;
        struct timespec   now;
        struct frct_cr *  rcv_cr;
        struct frct_cr *  snd_cr;
        uint32_t          seqno;
        uint32_t          ackno;
        uint32_t          rwe;
        int               fd = -1;

        assert(frcti);

        rcv_cr = &frcti->rcv_cr;
        snd_cr = &frcti->snd_cr;

        clock_gettime(PTHREAD_COND_CLOCK, &now);

        pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN);

        idx = shm_du_buff_get_idx(sdb);
        seqno = ntoh32(pci->seqno);
        pos = seqno & (RQ_SIZE - 1);

        pthread_rwlock_wrlock(&frcti->lock);

        /* After receiver inactivity only a DRF packet may start a run. */
        if (now.tv_sec - rcv_cr->act.tv_sec > rcv_cr->inact) {
                if (pci->flags & FRCT_DRF)  { /* New run. */
                        rcv_cr->lwe = seqno;
                        rcv_cr->rwe = seqno + RQ_SIZE;
                } else {
                        goto drop_packet;
                }
        }

        /* For now, just send an immediate window update. */
        if (pci->flags & FRCT_RDVS) {
                fd = frcti->fd;
                rwe = rcv_cr->rwe;
                pthread_rwlock_unlock(&frcti->lock);

                __send_frct_pkt(fd, FRCT_FC, 0, rwe);

                shm_rdrbuff_remove(ai.rdrb, idx);
                return;
        }

        if (pci->flags & FRCT_ACK) {
                ackno = ntoh32(pci->ackno);
                /* Only ever move the sender's left edge forward. */
                if (after(ackno, frcti->snd_cr.lwe))
                        frcti->snd_cr.lwe = ackno;

                /* The probed packet was acknowledged: take the sample. */
                if (frcti->probe && after(ackno, frcti->rttseq)) {
                        rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now));
                        frcti->probe = false;
                }
        }

        if (pci->flags & FRCT_FC) {
                uint32_t rwe; /* Shadows the function-scope rwe. */

                /* Low 24 bits from the packet, top 8 from the current edge. */
                rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF));
                rwe |= snd_cr->rwe & 0xFF000000;

                /* Rollover for 24 bit */
                if (before(rwe, snd_cr->rwe) && snd_cr->rwe - rwe > 0x007FFFFF)
                        rwe += 0x01000000;

                snd_cr->rwe = rwe;

                /* Wake writers blocked in __frcti_window_wait(). */
                pthread_mutex_lock(&frcti->mtx);
                if (!frcti->open) {
                        frcti->open = true;
                        pthread_cond_broadcast(&frcti->cond);
                }
                pthread_mutex_unlock(&frcti->mtx);
        }

        /* Control-only packet: nothing to queue. */
        if (!(pci->flags & FRCT_DATA))
                goto drop_packet;

        /* Already delivered: drop it, but acknowledge again. */
        if (before(seqno, rcv_cr->lwe)) {
                rcv_cr->seqno = seqno; /* Ensures we send a new ACK. */
                goto drop_packet;
        }

        if (rcv_cr->cflags & FRCTFRTX) {

                if (!before(seqno, rcv_cr->rwe)) /* Out of window. */
                        goto drop_packet;

                if (!before(seqno, rcv_cr->lwe + RQ_SIZE))
                        goto drop_packet; /* Out of rq. */

                if (frcti->rq[pos] != -1)
                        goto drop_packet; /* Duplicate in rq. */

                /* Setting fd requests a timerwheel ACK below. */
                fd = frcti->fd;
        } else {
                /* No retransmission: the window follows the packet. */
                rcv_cr->lwe = seqno;
        }

        frcti->rq[pos] = idx;

        rcv_cr->act = now;

        pthread_rwlock_unlock(&frcti->lock);

        if (fd != -1)
                timerwheel_ack(fd, frcti);

        return;

 drop_packet:
        pthread_rwlock_unlock(&frcti->lock);

        send_frct_pkt(frcti);

        shm_rdrbuff_remove(ai.rdrb, idx);
        return;
}

/*
 * Filter fqueue events for non-data packets.
 *
 * Walks the pending events in fq.  For FLOW_PKT events on flows with
 * an FRCT instance, the packet is read and handed to __frcti_rcv();
 * events that leave no packet ready for the application are skipped.
 *
 * Returns 1 when the event at fq->next should be seen by the caller
 * (not a FLOW_PKT event, a flow without FRCT, or a packet is ready),
 * 0 when the rx ring had nothing to read, and otherwise whether any
 * events are left in fq.
 */
int frcti_filter(struct fqueue * fq)
{
        while (fq->next < fq->fqsize) {
                struct shm_du_buff * sdb;
                struct shm_rbuff *   rb;
                struct frcti *       frcti;
                ssize_t              idx;
                int                  fd;
                int                  ready;

                if (fq->fqueue[fq->next + 1] != FLOW_PKT)
                        return 1;

                pthread_rwlock_rdlock(&ai.lock);

                fd    = ai.ports[fq->fqueue[fq->next]].fd;
                rb    = ai.flows[fd].rx_rb;
                frcti = ai.flows[fd].frcti;

                /* No FRCT, or a packet is already waiting: pass it on. */
                if (frcti == NULL || __frcti_pdu_ready(frcti) >= 0) {
                        pthread_rwlock_unlock(&ai.lock);
                        return 1;
                }

                idx = shm_rbuff_read(rb);
                if (idx < 0) {
                        pthread_rwlock_unlock(&ai.lock);
                        return 0;
                }

                sdb = shm_rdrbuff_get(ai.rdrb, idx);

                __frcti_rcv(frcti, sdb);

                ready = __frcti_pdu_ready(frcti) >= 0;

                pthread_rwlock_unlock(&ai.lock);

                if (ready)
                        return 1;

                /* Nothing for the application: skip this event. */
                fq->next += 2;
        }

        return fq->next < fq->fqsize;
}