/*
 * Ouroboros - Copyright (C) 2016 - 2020
 *
 * Timerwheel
 *
 *    Dimitri Staessens <dimitri.staessens@ugent.be>
 *    Sander Vrijders   <sander.vrijders@ugent.be>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * version 2.1 as published by the Free Software Foundation.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., http://www.fsf.org/about/contact/.
 */

#include <ouroboros/list.h>

/*
 * Wheel geometry. The wheel has 2^RXMQ_S slots and spans 2^RXMQ_M us,
 * so every slot covers 2^RXMQ_R us.
 */
#define RXMQ_S     16                 /* log2 of #slots           */
#define RXMQ_M     24                 /* log2 of max delay  (us)  */
#define RXMQ_R     (RXMQ_M - RXMQ_S)  /* log2 of resolution (us)  */
#define RXMQ_SLOTS (1 << RXMQ_S)
#define RXMQ_MAX   (1 << RXMQ_M)      /* us                       */

/*
 * ts_to_us:   convert a struct timespec to (approximate) microseconds.
 * ts_to_slot: map a struct timespec onto its wheel slot index.
 *
 * Small inaccuracy: tv_nsec is shifted right by 10 (divided by 1024)
 * instead of being divided by 1000, to avoid a slow division.
 *
 * The argument is parenthesized so that any expression of type
 * struct timespec (e.g. *ptr) can be passed. It is evaluated more than
 * once, so it must not have side effects. MILLION must be defined by
 * the including file.
 */
#define ts_to_us(ts) ((ts).tv_sec * MILLION + ((ts).tv_nsec >> 10))
#define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))

/*
 * One packet queued for retransmission. Entries are created by
 * rxmwheel_add() and processed (dropped or rescheduled) by
 * rxmwheel_move().
 */
struct rxm {
        struct list_head     next;  /* Linkage in a wheel slot list.     */
        uint32_t             seqno; /* Sequence number of the packet.    */
        struct shm_du_buff * sdb;   /* Buffer currently held for rtx.    */
        uint8_t *            head;  /* shm_du_buff_head() of sdb.        */
        uint8_t *            tail;  /* shm_du_buff_tail() of sdb.        */
        time_t               t0;    /* Time when original was sent (us). */
        size_t               mul;   /* RTO multiplier.                   */
                                    /* (set to 0 on add; not read in the */
                                    /* code visible in this file)        */
        struct frcti *       frcti; /* Instance given to rxmwheel_add(). */
};

/*
 * Retransmission timer wheel: an array of RXMQ_SLOTS lists of
 * struct rxm, indexed by time slot (see ts_to_slot).
 */
struct rxmwheel {
        struct list_head wheel[RXMQ_SLOTS]; /* One list per time slot. */

        size_t           prv; /* Last processed slot. */
        pthread_mutex_t  lock; /* Held while lists or prv are modified. */
};

/*
 * Tear down a retransmission wheel: destroy its mutex and drop every
 * entry that is still queued in any slot. For each entry the buffer it
 * holds is acked and released before the entry itself is freed.
 *
 * NOTE(review): rw itself is not freed here, although
 * rxmwheel_create() obtains it from malloc(); presumably the caller
 * frees it -- confirm.
 */
static void rxmwheel_destroy(struct rxmwheel * rw)
{
        struct list_head * pos;
        struct list_head * tmp;
        size_t             slot = 0;

        pthread_mutex_destroy(&rw->lock);

        while (slot < RXMQ_SLOTS) {
                struct list_head * bucket = &rw->wheel[slot++];

                list_for_each_safe(pos, tmp, bucket) {
                        struct rxm *         entry;
                        struct shm_du_buff * sdb;

                        entry = list_entry(pos, struct rxm, next);
                        sdb   = entry->sdb;

                        /* Unlink first, then give up the buffer. */
                        list_del(&entry->next);
                        shm_du_buff_ack(sdb);
                        ipcp_sdb_release(sdb);
                        free(entry);
                }
        }
}

/*
 * Allocate and initialise a retransmission wheel with all slots empty.
 *
 * Returns the new wheel, or NULL when the allocation or the mutex
 * initialisation fails.
 */
static struct rxmwheel * rxmwheel_create(void)
{
        struct timespec   ts;
        struct rxmwheel * wheel;
        size_t            n;

        wheel = malloc(sizeof(*wheel));
        if (wheel == NULL)
                return NULL;

        if (pthread_mutex_init(&wheel->lock, NULL) != 0) {
                free(wheel);
                return NULL;
        }

        clock_gettime(PTHREAD_COND_CLOCK, &ts);

        /*
         * Pretend the slot just before the current one was the last
         * one handled, so processing starts at the current slot.
         */
        wheel->prv = (ts_to_slot(ts) - 1) & (RXMQ_SLOTS - 1);

        n = 0;
        while (n < RXMQ_SLOTS) {
                list_head_init(&wheel->wheel[n]);
                ++n;
        }

        return wheel;
}

/*
 * Disable the RTT probe when the probed packet is retransmitted.
 *
 * Under the instance's write lock: if a probe is active and
 * rttseq + 1 equals seqno, mdev_us is added to srtt_us (a backoff, so
 * the rtt does not end up never being updated) and the probe flag is
 * cleared. Otherwise nothing is changed.
 */
static void check_probe(struct frcti * frcti,
                        uint32_t       seqno)
{
        pthread_rwlock_wrlock(&frcti->lock);

        if (!frcti->probe)
                goto unlock;

        if ((frcti->rttseq + 1) != seqno)
                goto unlock;

        frcti->srtt_us += frcti->mdev_us;
        frcti->probe    = false;
 unlock:
        pthread_rwlock_unlock(&frcti->lock);
}

/*
 * Advance the wheel to the current time and handle every entry in the
 * slots that elapsed since the last call (rw->prv + 1 up to and
 * including the current slot).
 *
 * For each entry, in order:
 *  - if its seqno is below the sender's lwe, it was acked: drop it;
 *  - if more than frcti->r has passed since t0, drop it and mark both
 *    rbuffs of the flow ACL_FLOWDOWN;
 *  - otherwise copy the packet into a fresh buffer, refresh its ackno,
 *    write it to the flow's tx_rb and reschedule the entry. Failure to
 *    reserve the buffer or to write it also drops the entry and marks
 *    the flow ACL_FLOWDOWN.
 *
 * The whole pass runs with rw->lock held.
 */
static void rxmwheel_move(struct rxmwheel * rw)
{
        struct timespec    now;
        struct list_head * p;
        struct list_head * h;
        size_t             slot;
        size_t             i;

        pthread_mutex_lock(&rw->lock);

        /* Registered as cleanup handler; run by the pop(true) below. */
        pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock,
                             (void *) &rw->lock);

        clock_gettime(PTHREAD_COND_CLOCK, &now);

        slot = ts_to_slot(now);

        i = rw->prv;

        /*
         * The slot index wrapped since the last pass: unwrap it so the
         * loop below can count upwards. Indices are masked on use.
         */
        if (slot < i)
                slot += RXMQ_SLOTS;

        /* i is incremented before use: visits prv + 1 .. slot. */
        while (i++ < slot) {
                list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) {
                        struct rxm *         r;
                        struct frct_cr *     snd_cr;
                        struct frct_cr *     rcv_cr;
                        size_t               rslot;
                        ssize_t              idx;
                        struct shm_du_buff * sdb;
                        uint8_t *            head;
                        struct flow *        f;
                        int                  fd;
                        uint32_t             snd_lwe;
                        uint32_t             rcv_lwe;
                        time_t               rto;

                        r = list_entry(p, struct rxm, next);

                        /* Unlinked now; re-added below if rescheduled. */
                        list_del(&r->next);

                        snd_cr = &r->frcti->snd_cr;
                        rcv_cr = &r->frcti->rcv_cr;
                        fd     = r->frcti->fd;
                        f      = &ai.flows[fd];

                        /* Counterpart of the shm_du_buff_wait_ack() */
                        /* done when the entry was (re)scheduled.    */
                        shm_du_buff_ack(r->sdb);

                        /* Snapshot the shared state under the lock. */
                        pthread_rwlock_rdlock(&r->frcti->lock);

                        snd_lwe = snd_cr->lwe;
                        rcv_lwe = rcv_cr->lwe;
                        rto     = r->frcti->rto;

                        pthread_rwlock_unlock(&r->frcti->lock);

                        /* Has been ack'd, remove. */
                        /* (signed difference: wrap-around aware) */
                        if ((int) (r->seqno - snd_lwe) < 0) {
                                ipcp_sdb_release(r->sdb);
                                free(r);
                                continue;
                        }

                        /* Check for r-timer expiry. */
                        /* NOTE(review): frcti->r is read without the */
                        /* frcti lock, and is presumably in us like   */
                        /* t0 -- confirm.                             */
                        if (ts_to_us(now) - r->t0 > r->frcti->r) {
                                ipcp_sdb_release(r->sdb);
                                free(r);
                                shm_rbuff_set_acl(ai.flows[fd].rx_rb,
                                                  ACL_FLOWDOWN);
                                shm_rbuff_set_acl(ai.flows[fd].tx_rb,
                                                  ACL_FLOWDOWN);
                                continue;
                        }

                        /* Copy the payload, safe rtx in other layers. */
                        if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
                                ipcp_sdb_release(r->sdb);
                                free(r);
                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
                                continue;
                        }

                        idx = shm_du_buff_get_idx(sdb);

                        head = shm_du_buff_head(sdb);
                        memcpy(head, r->head, r->tail - r->head);

                        /* The old buffer is no longer needed; from   */
                        /* here on only the copy (sdb) is referenced. */
                        ipcp_sdb_release(r->sdb);

                        check_probe(r->frcti, r->seqno);

                        /* Refresh the ackno in the copied header.    */
                        /* NOTE(review): ntoh32 is used on a value    */
                        /* written into the packet; hton32 would be   */
                        /* the expected direction -- confirm.         */
                        ((struct frct_pci *) head)->ackno = ntoh32(rcv_lwe);

                        /* Retransmit the copy. */
                        if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
                                ipcp_sdb_release(sdb);
                                free(r);
                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
                                continue;
                        }

                        /* Reschedule. */
                        shm_du_buff_wait_ack(sdb);

                        shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);

                        /* The entry now tracks the copy. */
                        r->head = head;
                        r->tail = shm_du_buff_tail(sdb);
                        r->sdb  = sdb;

                        /* Schedule at least in the next time slot */
                        /* (relative to the current time slot, not */
                        /* to the slot being processed; r->mul is  */
                        /* not applied to rto here).               */
                        rslot = (slot + MAX(rto >> RXMQ_R, 1))
                                & (RXMQ_SLOTS - 1);

                        list_add_tail(&r->next, &rw->wheel[rslot]);
                }
        }

        /* Store the slot masked back into wheel range. */
        rw->prv = slot & (RXMQ_SLOTS - 1);

        /* Pops and executes the handler: unlocks rw->lock. */
        pthread_cleanup_pop(true);
}

/*
 * Queue a packet for retransmission.
 *
 * A new entry is allocated that records the current time as t0, the
 * sequence number, the buffer with its head and tail, and the frcti
 * instance. It is appended to the wheel slot following the one in
 * which t0 + frcti->rto falls, and the buffer is marked with
 * shm_du_buff_wait_ack().
 *
 * Returns 0 on success, or -ENOMEM when the entry cannot be allocated.
 */
static int rxmwheel_add(struct rxmwheel *    rw,
                        struct frcti *       frcti,
                        uint32_t             seqno,
                        struct shm_du_buff * sdb)
{
        struct rxm *    entry;
        struct timespec ts;
        size_t          target;

        entry = malloc(sizeof(*entry));
        if (entry == NULL)
                return -ENOMEM;

        clock_gettime(PTHREAD_COND_CLOCK, &ts);

        entry->frcti = frcti;
        entry->seqno = seqno;
        entry->sdb   = sdb;
        entry->head  = shm_du_buff_head(sdb);
        entry->tail  = shm_du_buff_tail(sdb);
        entry->mul   = 0;
        entry->t0    = ts_to_us(ts);

        /* rto is read under the instance's read lock. */
        pthread_rwlock_rdlock(&frcti->lock);

        target = (((entry->t0 + frcti->rto) >> RXMQ_R) + 1)
                & (RXMQ_SLOTS - 1);

        pthread_rwlock_unlock(&frcti->lock);

        /* Insert and mark the buffer while holding the wheel lock. */
        pthread_mutex_lock(&rw->lock);

        list_add_tail(&entry->next, &rw->wheel[target]);

        shm_du_buff_wait_ack(sdb);

        pthread_mutex_unlock(&rw->lock);

        return 0;
}