diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/dev.c | 719 | ||||
| -rw-r--r-- | src/lib/frct.c | 320 | ||||
| -rw-r--r-- | src/lib/frct_pci.c | 63 | ||||
| -rw-r--r-- | src/lib/frct_pci.h | 67 | ||||
| -rw-r--r-- | src/lib/rq.c | 8 | ||||
| -rw-r--r-- | src/lib/rq.h | 47 | ||||
| -rw-r--r-- | src/lib/tests/rq_test.c | 2 | 
7 files changed, 683 insertions, 543 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 28a99bc4..ff22cca6 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -38,22 +38,19 @@  #include <ouroboros/fqueue.h>  #include <ouroboros/qoscube.h>  #include <ouroboros/timerwheel.h> -#include <ouroboros/frct_pci.h> -#include <ouroboros/rq.h> + +#include "frct_pci.h" +#include "rq.h"  #include <stdlib.h>  #include <string.h>  #include <stdio.h>  #include <stdarg.h> +#include <stdbool.h> +#include <sys/types.h>  #define BUF_SIZE       1500 -#define TW_ELEMENTS    6000 -#define TW_RESOLUTION  1   /* ms */ - -#define MPL            2000 /* ms */ -#define RQ_SIZE        20 -  #ifndef CLOCK_REALTIME_COARSE  #define CLOCK_REALTIME_COARSE CLOCK_REALTIME  #endif @@ -76,26 +73,6 @@ enum port_state {          PORT_DESTROY  }; -struct frcti { -        bool             used; - -        struct timespec  last_snd; -        bool             snd_drf; -        uint64_t         snd_lwe; -        uint64_t         snd_rwe; - -        struct timespec  last_rcv; -        bool             rcv_drf; -        uint64_t         rcv_lwe; -        uint64_t         rcv_rwe; - -        uint16_t         conf_flags; - -        struct rq *      rq; - -        pthread_rwlock_t lock; -}; -  struct port {          int             fd; @@ -119,6 +96,8 @@ struct flow {          bool                  rcv_timesout;          struct timespec       snd_timeo;          struct timespec       rcv_timeo; + +        struct frcti *        frcti;  };  struct { @@ -132,13 +111,15 @@ struct {          struct bmp *          fds;          struct bmp *          fqueues; +          struct flow *         flows;          struct port *         ports; -        struct frcti *        frcti;          pthread_rwlock_t      lock;  } ai; +#include "frct.c" +  static void port_destroy(struct port * p)  {          pthread_mutex_lock(&p->state_lock); @@ -185,12 +166,8 @@ static enum port_state port_wait_assign(int port_id)          enum port_state state;          struct port *   p; -        pthread_rwlock_rdlock(&ai.lock); -          p = &ai.ports[port_id]; -        pthread_rwlock_unlock(&ai.lock); -          pthread_mutex_lock(&p->state_lock);          if (p->state == PORT_ID_ASSIGNED) { @@ -245,275 +222,8 @@ static int api_announce(char * ap_name)          return ret;  } -/* Call under flows lock. */ -static int finalize_write(int    fd, -                          size_t idx) -{ -        int ret; - -        ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); -        if (ret < 0) -                return ret; - -        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - -        return ret; -} - -static int frcti_init(int fd) -{ -        struct frcti * frcti; - -        frcti = &(ai.frcti[fd]); - -        frcti->used = true; - -        frcti->snd_drf = true; -        frcti->snd_lwe = 0; -        frcti->snd_rwe = 0; - -        frcti->rcv_drf = true; -        frcti->rcv_lwe = 0; -        frcti->rcv_rwe = 0; - -        frcti->conf_flags = 0; - -        frcti->rq = rq_create(RQ_SIZE); -        if (frcti->rq == NULL) -                return -1; - -        return 0; -} - -static void frcti_clear(int fd) -{ -        ai.frcti[fd].used = false; -} - -static void frcti_fini(int fd) -{ -        /* -         * FIXME: In case of reliable transmission we should -         * make sure everything is acked. -         */ - -        frcti_clear(fd); - -        rq_destroy(ai.frcti[fd].rq); -} - -static int frcti_send(int                  fd, -                      struct frct_pci *    pci, -                      struct shm_du_buff * sdb) -{ -        struct timespec now = {0, 0}; -        struct frcti *  frcti; -        int             ret; - -        frcti = &(ai.frcti[fd]); - -        clock_gettime(CLOCK_REALTIME_COARSE, &now); - -        pthread_rwlock_wrlock(&frcti->lock); - -        /* Check if sender inactivity is true. */ -        if (!frcti->snd_drf && ts_diff_ms(&now, &frcti->last_snd) > 2 * MPL) -                frcti->snd_drf = true; - -        /* Set the DRF in the first packet of a new run of SDUs. */ -        if (frcti->snd_drf) { -                pci->flags |= FLAG_DATA_RUN; -                frcti->snd_drf = false; -        } - -        frcti->last_snd = now; - -        pci->seqno = frcti->snd_lwe++; - -        if (frct_pci_ser(sdb, pci, frcti->conf_flags & FRCTFERRCHCK)) { -                pthread_rwlock_unlock(&frcti->lock); -                return -1; -        } - -        ret = finalize_write(fd, shm_du_buff_get_idx(sdb)); -        if (ret < 0) { -                pthread_rwlock_unlock(&frcti->lock); -                return ret; -        } - -        pthread_rwlock_unlock(&frcti->lock); - -        return 0; -} - - -static int frcti_configure(int      fd, -                           uint16_t flags) -{ -        struct frcti *       frcti; -        struct frct_pci      pci; -        struct shm_du_buff * sdb; - -        frcti = &(ai.frcti[fd]); - -        memset(&pci, 0, sizeof(pci)); - -        if (ipcp_sdb_reserve(&sdb, 0)) -                return -1; - -        pci.conf_flags = flags; - -        /* Always set the DRF on a configure message. */ -        pci.flags |= FLAG_DATA_RUN; -        pci.type |= PDU_TYPE_CONFIG; - -        pthread_rwlock_wrlock(&frcti->lock); - -        frcti->conf_flags = pci.conf_flags; - -        pthread_rwlock_unlock(&frcti->lock); - -        if (frcti_send(fd, &pci, sdb)) { -                shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); -                return -1; -        } - -        return 0; -} - -static int frcti_write(int                  fd, -                       struct shm_du_buff * sdb) -{ -        struct frct_pci pci; - -        memset(&pci, 0, sizeof(pci)); - -        pci.type |= PDU_TYPE_DATA; - -        return frcti_send(fd, &pci, sdb); -} - -static ssize_t frcti_read(int fd) -{ -        ssize_t              idx; -        struct frcti *       frcti; -        struct frct_pci      pci; -        struct shm_du_buff * sdb; -        uint64_t             seqno; -        bool                 nxt_pdu = true; - -        frcti = &(ai.frcti[fd]); - -        /* See if we already have the next PDU */ -        pthread_rwlock_wrlock(&frcti->lock); - -        if (!rq_is_empty(frcti->rq)) { -                seqno = rq_peek(frcti->rq); -                if (seqno == frcti->rcv_lwe) { -                        frcti->rcv_lwe++; -                        idx = rq_pop(frcti->rq); -                        pthread_rwlock_unlock(&frcti->lock); -                        return idx; -                } -        } - -        pthread_rwlock_unlock(&frcti->lock); - -        do { -                struct timespec    now; -                struct timespec    abs; -                struct timespec *  abstime = NULL; -                struct shm_rbuff * rb; -                bool               noblock; - -                clock_gettime(CLOCK_REALTIME_COARSE, &now); - -                pthread_rwlock_rdlock(&ai.lock); - -                noblock = ai.flows[fd].oflags & FLOWFRNOBLOCK; -                rb      = ai.flows[fd].rx_rb; - -                if (ai.flows[fd].rcv_timesout) { -                        ts_add(&now, &ai.flows[fd].rcv_timeo, &abs); -                        abstime = &abs; -                } - -                pthread_rwlock_unlock(&ai.lock); - -                if (noblock) { -                        idx = shm_rbuff_read(rb); -                } else { -                        idx = shm_rbuff_read_b(rb, abstime); -                        clock_gettime(CLOCK_REALTIME_COARSE, &now); -                } - -                if (idx < 0) -                        return idx; - -                sdb = shm_rdrbuff_get(ai.rdrb, idx); - -                pthread_rwlock_wrlock(&frcti->lock); - -                /* SDU may be corrupted. */ -                if (frct_pci_des(sdb, &pci, frcti->conf_flags & FRCTFERRCHCK)) { -                        pthread_rwlock_unlock(&frcti->lock); -                        shm_rdrbuff_remove(ai.rdrb, idx); -                        return -EAGAIN; -                } - -                /* Check if receiver inactivity is true. */ -                if (!frcti->rcv_drf && -                    ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL) -                        frcti->rcv_drf = true; - -                /* When there is receiver inactivity queue the packet. */ -                if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) { -                        if (rq_push(frcti->rq, pci.seqno, idx)) -                                shm_rdrbuff_remove(ai.rdrb, idx); -                        pthread_rwlock_unlock(&frcti->lock); -                        return -EAGAIN; -                } - -                /* If the DRF is set, reset the state of the connection. */ -                if (pci.flags & FLAG_DATA_RUN) -                        frcti->rcv_lwe = pci.seqno; - -                if (pci.type & PDU_TYPE_CONFIG) -                        frcti->conf_flags = pci.conf_flags; - -                if (frcti->rcv_drf) -                        frcti->rcv_drf = false; - -                frcti->last_rcv = now; - -                nxt_pdu = true; - -                if (!(pci.type & PDU_TYPE_DATA)) { -                        shm_rdrbuff_remove(ai.rdrb, idx); -                        nxt_pdu = false; -                } - -                if (frcti->conf_flags & FRCTFORDERING) { -                        if (pci.seqno != frcti->rcv_lwe) { -                                if (rq_push(frcti->rq, pci.seqno, idx)) -                                        shm_rdrbuff_remove(ai.rdrb, idx); -                                nxt_pdu = false; -                        } else { -                                frcti->rcv_lwe++; -                        } -                } - -                pthread_rwlock_unlock(&frcti->lock); - -        } while (!nxt_pdu); - -        return idx; -} -  static void flow_clear(int fd)  { -        assert(!(fd < 0)); -          memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));          ai.flows[fd].port_id  = -1; @@ -525,8 +235,10 @@ static void flow_fini(int fd)  {          assert(!(fd < 0)); -        if (ai.flows[fd].port_id != -1) +        if (ai.flows[fd].port_id != -1) {                  port_destroy(&ai.ports[ai.flows[fd].port_id]); +                bmp_release(ai.fds, fd); +        }          if (ai.flows[fd].rx_rb != NULL)                  shm_rbuff_close(ai.flows[fd].rx_rb); @@ -537,8 +249,8 @@ static void flow_fini(int fd)          if (ai.flows[fd].set != NULL)                  shm_flow_set_close(ai.flows[fd].set); -        if (ai.frcti[fd].used) -                frcti_fini(fd); +        if (ai.flows[fd].frcti != NULL) +                frcti_destroy(ai.flows[fd].frcti);          flow_clear(fd);  } @@ -548,37 +260,27 @@ static int flow_init(int       port_id,                       qoscube_t qc)  {          int fd; +        int err = -ENOMEM;          pthread_rwlock_wrlock(&ai.lock);          fd = bmp_allocate(ai.fds);          if (!bmp_is_id_valid(ai.fds, fd)) { -                pthread_rwlock_unlock(&ai.lock); -                return -EBADF; +                err = -EBADF; +                goto fail_fds;          }          ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); -        if (ai.flows[fd].rx_rb == NULL) { -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.lock); -                return -ENOMEM; -        } +        if (ai.flows[fd].rx_rb == NULL) +                goto fail;          ai.flows[fd].tx_rb = shm_rbuff_open(api, port_id); -        if (ai.flows[fd].tx_rb == NULL) { -                flow_fini(fd); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.lock); -                return -ENOMEM; -        } +        if (ai.flows[fd].tx_rb == NULL) +                goto fail;          ai.flows[fd].set = shm_flow_set_open(api); -        if (ai.flows[fd].set == NULL) { -                flow_fini(fd); -                bmp_release(ai.fds, fd); -                pthread_rwlock_unlock(&ai.lock); -                return -ENOMEM; -        } +        if (ai.flows[fd].set == NULL) +                goto fail;          ai.flows[fd].port_id = port_id;          ai.flows[fd].oflags  = FLOWFDEFAULT; @@ -593,6 +295,12 @@ static int flow_init(int       port_id,          pthread_rwlock_unlock(&ai.lock);          return fd; + + fail: +        flow_fini(fd); + fail_fds: +        pthread_rwlock_unlock(&ai.lock); +        return err;  }  static bool check_python(char * str) @@ -611,7 +319,6 @@ __attribute__((constructor)) static void init(int     argc,  {          const char * ap_name = argv[0];          int          i; -        int          j;          (void) argc;          (void) envp; @@ -643,20 +350,8 @@ __attribute__((constructor)) static void init(int     argc,          if (ai.flows == NULL)                  goto fail_flows; -        ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS); -        if (ai.frcti == NULL) -                goto fail_frcti; - -        for (i = 0; i < AP_MAX_FLOWS; ++i) { +        for (i = 0; i < AP_MAX_FLOWS; ++i)                  flow_clear(i); -                frcti_clear(i); - -                if (pthread_rwlock_init(&ai.frcti[i].lock, NULL)) { -                        for (j = i - 1; j >= 0 ; j--) -                                pthread_rwlock_destroy(&ai.frcti[j].lock); -                        goto fail_frct_lock; -                } -        }          ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS);          if (ai.ports == NULL) @@ -690,13 +385,12 @@ __attribute__((constructor)) static void init(int     argc,          if (pthread_rwlock_init(&ai.lock, NULL))                  goto fail_lock; -        ai.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS); -        if (ai.tw == NULL) -                goto fail_timerwheel; +        if (frct_init()) +                goto fail_frct;          return; - fail_timerwheel: + fail_frct:          pthread_rwlock_destroy(&ai.lock);   fail_lock:          for (i = 0; i < SYS_MAX_FLOWS; ++i) @@ -709,11 +403,6 @@ __attribute__((constructor)) static void init(int     argc,   fail_ap_name:          free(ai.ports);   fail_ports: -        for (i = 0; i < AP_MAX_FLOWS; ++i) -                pthread_rwlock_destroy(&ai.frcti[i].lock); - fail_frct_lock: -        free(ai.frcti); - fail_frcti:          free(ai.flows);   fail_flows:          shm_rdrbuff_close(ai.rdrb); @@ -737,15 +426,14 @@ __attribute__((destructor)) static void fini(void)          if (ai.fds == NULL)                  return; -        bmp_destroy(ai.fds); -        bmp_destroy(ai.fqueues); +        frct_fini();          shm_flow_set_destroy(ai.fqset);          if (ai.ap_name != NULL)                  free(ai.ap_name); -        pthread_rwlock_rdlock(&ai.lock); +        pthread_rwlock_wrlock(&ai.lock);          for (i = 0; i < AP_MAX_FLOWS; ++i) {                  if (ai.flows[i].port_id != -1) { @@ -754,8 +442,6 @@ __attribute__((destructor)) static void fini(void)                                  shm_rdrbuff_remove(ai.rdrb, idx);                          flow_fini(i);                  } - -                pthread_rwlock_destroy(&ai.frcti[i].lock);          }          for (i = 0; i < SYS_MAX_FLOWS; ++i) { @@ -770,7 +456,9 @@ __attribute__((destructor)) static void fini(void)          free(ai.flows);          free(ai.ports); -        free(ai.frcti); + +        bmp_destroy(ai.fds); +        bmp_destroy(ai.fqueues);          pthread_rwlock_unlock(&ai.lock); @@ -825,7 +513,16 @@ int flow_accept(qosspec_t *             qs,          pthread_rwlock_wrlock(&ai.lock); -        frcti_init(fd); +        /* FIXME: check if FRCT is needed based on qc? */ + +        assert(ai.flows[fd].frcti == NULL); + +        ai.flows[fd].frcti = frcti_create(fd); +        if (ai.flows[fd].frcti == NULL) { +                flow_fini(fd); +                pthread_rwlock_unlock(&ai.lock); +                return -ENOMEM; +        }          if (qs != NULL)                  *qs = ai.flows[fd].spec; @@ -891,7 +588,15 @@ int flow_alloc(const char *            dst_name,          pthread_rwlock_wrlock(&ai.lock); -        frcti_init(fd); +        /* FIXME: check if FRCT is needed based on qc? */ +        assert(ai.flows[fd].frcti == NULL); + +        ai.flows[fd].frcti = frcti_create(fd); +        if (ai.flows[fd].frcti == NULL) { +                flow_fini(fd); +                pthread_rwlock_unlock(&ai.lock); +                return -ENOMEM; +        }          pthread_rwlock_unlock(&ai.lock); @@ -913,7 +618,7 @@ int flow_dealloc(int fd)          pthread_rwlock_rdlock(&ai.lock); -        assert(!(ai.flows[fd].port_id < 0)); +        assert(ai.flows[fd].port_id >= 0);          msg.port_id = ai.flows[fd].port_id; @@ -933,7 +638,6 @@ int flow_dealloc(int fd)          pthread_rwlock_wrlock(&ai.lock);          flow_fini(fd); -        bmp_release(ai.fds, fd);          pthread_rwlock_unlock(&ai.lock); @@ -944,6 +648,7 @@ int fccntl(int fd,             int cmd,             ...)  { +        uint16_t          sflags;          uint32_t *        fflags;          uint16_t *        cflags;          va_list           l; @@ -951,15 +656,18 @@ int fccntl(int fd,          qosspec_t *       qs;          uint32_t          rx_acl;          uint32_t          tx_acl; +        struct flow *     flow;          if (fd < 0 || fd >= AP_MAX_FLOWS)                  return -EBADF; +        flow = &ai.flows[fd]; +          va_start(l, cmd);          pthread_rwlock_wrlock(&ai.lock); -        if (ai.flows[fd].port_id < 0) { +        if (flow->port_id < 0) {                  pthread_rwlock_unlock(&ai.lock);                  va_end(l);                  return -ENOTALLOC; @@ -969,57 +677,57 @@ int fccntl(int fd,          case FLOWSSNDTIMEO:                  timeo = va_arg(l, struct timespec *);                  if (timeo == NULL) { -                        ai.flows[fd].snd_timesout = false; +                        flow->snd_timesout = false;                  } else { -                        ai.flows[fd].snd_timesout = true; -                        ai.flows[fd].snd_timeo    = *timeo; +                        flow->snd_timesout = true; +                        flow->snd_timeo    = *timeo;                  }                  break;          case FLOWGSNDTIMEO:                  timeo = va_arg(l, struct timespec *);                  if (timeo == NULL)                          goto einval; -                if (!ai.flows[fd].snd_timesout) +                if (!flow->snd_timesout)                          goto eperm; -                *timeo = ai.flows[fd].snd_timeo; +                *timeo = flow->snd_timeo;                  break;          case FLOWSRCVTIMEO:                  timeo = va_arg(l, struct timespec *);                  if (timeo == NULL) { -                        ai.flows[fd].rcv_timesout = false; +                        flow->rcv_timesout = false;                  } else { -                        ai.flows[fd].rcv_timesout = true; -                        ai.flows[fd].rcv_timeo    = *timeo; +                        flow->rcv_timesout = true; +                        flow->rcv_timeo    = *timeo;                  }                  break;          case FLOWGRCVTIMEO:                  timeo = va_arg(l, struct timespec *);                  if (timeo == NULL)                          goto einval; -                if (!ai.flows[fd].rcv_timesout) +                if (!flow->rcv_timesout)                          goto eperm; -                *timeo = ai.flows[fd].snd_timeo; +                *timeo = flow->snd_timeo;                  break;          case FLOWGQOSSPEC:                  qs = va_arg(l, qosspec_t *);                  if (qs == NULL)                          goto einval; -                *qs = ai.flows[fd].spec; +                *qs = flow->spec;                  break;          case FLOWSFLAGS: -                ai.flows[fd].oflags = va_arg(l, uint32_t); -                rx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb); -                tx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb); +                flow->oflags = va_arg(l, uint32_t); +                rx_acl = shm_rbuff_get_acl(flow->rx_rb); +                tx_acl = shm_rbuff_get_acl(flow->rx_rb);                  /*                   * Making our own flow write only means making the                   * the other side of the flow read only.                   */ -                if (ai.flows[fd].oflags & FLOWFWRONLY) +                if (flow->oflags & FLOWFWRONLY)                          rx_acl |= ACL_RDONLY; -                if (ai.flows[fd].oflags & FLOWFRDWR) +                if (flow->oflags & FLOWFRDWR)                          rx_acl |= ACL_RDWR; -                if (ai.flows[fd].oflags & FLOWFDOWN) { +                if (flow->oflags & FLOWFDOWN) {                          rx_acl |= ACL_FLOWDOWN;                          tx_acl |= ACL_FLOWDOWN;                  } else { @@ -1027,26 +735,28 @@ int fccntl(int fd,                          tx_acl &= ~ACL_FLOWDOWN;                  } -                shm_rbuff_set_acl(ai.flows[fd].rx_rb, rx_acl); -                shm_rbuff_set_acl(ai.flows[fd].tx_rb, tx_acl); +                shm_rbuff_set_acl(flow->rx_rb, rx_acl); +                shm_rbuff_set_acl(flow->tx_rb, tx_acl);                  break;          case FLOWGFLAGS:                  fflags = va_arg(l, uint32_t *);                  if (fflags == NULL)                          goto einval; -                *fflags = ai.flows[fd].oflags; +                *fflags = flow->oflags;                  break;          case FRCTSFLAGS: -                ai.frcti[fd].conf_flags = (uint16_t) va_arg(l, int); +                sflags = (uint16_t) va_arg(l, int); +                if (flow->frcti == NULL || frcti_setconf(flow->frcti, sflags)) +                        goto eperm;                  break;          case FRCTGFLAGS:                  cflags = (uint16_t *) va_arg(l, int *);                  if (cflags == NULL)                          goto einval; -                *cflags = ai.frcti[fd].conf_flags; -                if (frcti_configure(fd, ai.frcti[fd].conf_flags)) +                if (flow->frcti == NULL)                          goto eperm; +                *cflags = frcti_getconf(flow->frcti);                  break;          default:                  pthread_rwlock_unlock(&ai.lock); @@ -1075,8 +785,10 @@ ssize_t flow_write(int          fd,                     const void * buf,                     size_t       count)  { -        ssize_t idx; -        int     ret; +        struct flow * flow; +        ssize_t       idx; +        int           ret; +        int           flags;          if (buf == NULL)                  return 0; @@ -1084,104 +796,110 @@ ssize_t flow_write(int          fd,          if (fd < 0 || fd > AP_MAX_FLOWS)                  return -EBADF; +        flow = &ai.flows[fd]; +          pthread_rwlock_rdlock(&ai.lock); -        if (ai.flows[fd].port_id < 0) { +        if (flow->port_id < 0) {                  pthread_rwlock_unlock(&ai.lock);                  return -ENOTALLOC;          } -        if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) { -                pthread_rwlock_unlock(&ai.lock); -                return -EPERM; -        } +        flags = flow->oflags; -        if (ai.flows[fd].oflags & FLOWFWNOBLOCK) { -                idx = shm_rdrbuff_write(ai.rdrb, -                                       DU_BUFF_HEADSPACE, -                                       DU_BUFF_TAILSPACE, -                                       buf, -                                       count); -                if (idx < 0) { -                        pthread_rwlock_unlock(&ai.lock); -                        return idx; -                } +        pthread_rwlock_unlock(&ai.lock); -        } else { /* Blocking. */ -                pthread_rwlock_unlock(&ai.lock); +        if ((flags & FLOWFACCMODE) == FLOWFRDONLY) +                return -EPERM; +        if (flags & FLOWFWNOBLOCK) +                idx = shm_rdrbuff_write(ai.rdrb, +                                        DU_BUFF_HEADSPACE, +                                        DU_BUFF_TAILSPACE, +                                        buf, +                                        count); +        else  /* Blocking. */                  idx = shm_rdrbuff_write_b(ai.rdrb,                                            DU_BUFF_HEADSPACE,                                            DU_BUFF_TAILSPACE,                                            buf,                                            count); -                if (idx < 0) -                        return idx; +        if (idx < 0) +                return idx; -                pthread_rwlock_rdlock(&ai.lock); +        if (frcti_snd(flow->frcti, shm_rdrbuff_get(ai.rdrb, idx)) < 0) { +                shm_rdrbuff_remove(ai.rdrb, idx); +                return -ENOMEM;          } -        if (!ai.frcti[fd].used) { -                ret = finalize_write(fd, idx); -                if (ret < 0) { -                        pthread_rwlock_unlock(&ai.lock); -                        shm_rdrbuff_remove(ai.rdrb, idx); -                        return ret; -                } +        pthread_rwlock_rdlock(&ai.lock); -                pthread_rwlock_unlock(&ai.lock); -        } else { -                pthread_rwlock_unlock(&ai.lock); +        ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); +        if (ret < 0) +                shm_rdrbuff_remove(ai.rdrb, idx); +        else +                shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); -                ret = frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx)); -                if (ret < 0) { -                        shm_rdrbuff_remove(ai.rdrb, idx); -                        return ret; -                } -        } +        pthread_rwlock_unlock(&ai.lock); -        return 0; +        assert(ret <= 0); + +        return ret;  }  ssize_t flow_read(int    fd,                    void * buf,                    size_t count)  { -        ssize_t            idx; -        ssize_t            n; -        uint8_t *          sdu; -        bool               used; -        struct shm_rbuff * rb; +        ssize_t              idx; +        ssize_t              n; +        uint8_t *            sdu; +        struct shm_rbuff *   rb; +        struct shm_du_buff * sdb; +        struct timespec      now; +        struct timespec      abs; +        struct timespec *    abstime = NULL; +        struct flow *        flow; +        bool                 noblock;          if (fd < 0 || fd > AP_MAX_FLOWS)                  return -EBADF; +        flow = &ai.flows[fd]; + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); +          pthread_rwlock_rdlock(&ai.lock); -        if (ai.flows[fd].port_id < 0) { +        if (flow->port_id < 0) {                  pthread_rwlock_unlock(&ai.lock);                  return -ENOTALLOC;          } -        used = ai.frcti[fd].used; -        rb   = ai.flows[fd].rx_rb; +        rb   = flow->rx_rb; +        noblock = flow->oflags & FLOWFRNOBLOCK; -        pthread_rwlock_unlock(&ai.lock); +        if (ai.flows[fd].rcv_timesout) { +                ts_add(&now, &flow->rcv_timeo, &abs); +                abstime = &abs; +        } -        if (!used) -                idx = shm_rbuff_read(rb); -        else -                idx = frcti_read(fd); +        pthread_rwlock_unlock(&ai.lock); +        idx = frcti_queued_pdu(flow->frcti);          if (idx < 0) { -                assert(idx == -EAGAIN || idx == -ETIMEDOUT || -                       idx == -EFLOWDOWN); -                return idx; +                do { +                        idx = noblock ? shm_rbuff_read(rb) : +                                shm_rbuff_read_b(rb, abstime); +                        if (idx < 0) +                                return idx; +                        sdb = shm_rdrbuff_get(ai.rdrb, idx); +                } while (frcti_rcv(flow->frcti, sdb) != 0);          }          n = shm_rdrbuff_read(&sdu, ai.rdrb, idx); -        if (n < 0) -                return -1; + +        assert(n >= 0);          memcpy(buf, sdu, MIN((size_t) n, count)); @@ -1432,7 +1150,7 @@ int ipcp_create_r(pid_t api,          if (recv_msg == NULL)                  return -EIRMD; -        if (recv_msg->has_result == false) { +        if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -1509,7 +1227,7 @@ int ipcp_flow_alloc_reply(int fd,          if (recv_msg == NULL)                  return -EIRMD; -        if (recv_msg->has_result == false) { +        if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL);                  return -1;          } @@ -1524,30 +1242,37 @@ int ipcp_flow_alloc_reply(int fd,  int ipcp_flow_read(int                   fd,                     struct shm_du_buff ** sdb)  { -        ssize_t idx = -1; -        int port_id = -1; +        struct flow *      flow; +        struct shm_rbuff * rb; +        ssize_t            idx;          assert(fd >= 0);          assert(sdb); +        flow = &ai.flows[fd]; +          pthread_rwlock_rdlock(&ai.lock); -        if ((port_id = ai.flows[fd].port_id) < 0) { -                pthread_rwlock_unlock(&ai.lock); -                return -ENOTALLOC; -        } +        assert(flow->port_id >= 0); -        pthread_rwlock_unlock(&ai.lock); +        rb = flow->rx_rb; -        if (!ai.frcti[fd].used) -                idx = shm_rbuff_read(ai.flows[fd].rx_rb); -        else -                idx = frcti_read(fd); +        pthread_rwlock_unlock(&ai.lock); -        if (idx < 0) -                return idx; +        if (flow->frcti != NULL) { +                idx = frcti_queued_pdu(flow->frcti); +                if (idx >= 0) { +                        *sdb = shm_rdrbuff_get(ai.rdrb, idx); +                        return 0; +                } +        } -        *sdb = shm_rdrbuff_get(ai.rdrb, idx); +        do { +                idx = shm_rbuff_read(rb); +                if (idx < 0) +                        return idx; +                *sdb = shm_rdrbuff_get(ai.rdrb, idx); +        } while (frcti_rcv(flow->frcti, *sdb) != 0);          return 0;  } @@ -1555,53 +1280,49 @@ int ipcp_flow_read(int                   fd,  int ipcp_flow_write(int                  fd,                      struct shm_du_buff * sdb)  { -        int ret; +        struct flow * flow; +        int           ret; +        ssize_t       idx; -        if (sdb == NULL) -                return -EINVAL; +        assert(sdb); + +        flow = &ai.flows[fd];          pthread_rwlock_rdlock(&ai.lock); -        if (ai.flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&ai.lock); -                return -ENOTALLOC; -        } +        assert(flow->port_id >= 0); -        if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) { +        if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) {                  pthread_rwlock_unlock(&ai.lock);                  return -EPERM;          } -        assert(ai.flows[fd].tx_rb); +        assert(flow->tx_rb); -        if (!ai.frcti[fd].used) { -                ret = finalize_write(fd, shm_du_buff_get_idx(sdb)); -                if (ret < 0) { -                        pthread_rwlock_unlock(&ai.lock); -                        return ret; -                } +        idx = shm_du_buff_get_idx(sdb); +        if (frcti_snd(flow->frcti, sdb) < 0) {                  pthread_rwlock_unlock(&ai.lock); -        } else { -                pthread_rwlock_unlock(&ai.lock); - -                ret = frcti_write(fd, sdb); -                if (ret < 0) -                        return ret; +                return -ENOMEM;          } -        return 0; +        ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); +        if (ret == 0) +                shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + +        pthread_rwlock_unlock(&ai.lock); + +        assert(ret <= 0); + +        return ret;  }  int ipcp_sdb_reserve(struct shm_du_buff ** sdb,                       size_t                len)  { -        struct shm_rdrbuff * rdrb; -        ssize_t              idx; - -        rdrb = ai.rdrb; +        ssize_t idx; -        idx = shm_rdrbuff_write_b(rdrb, +        idx = shm_rdrbuff_write_b(ai.rdrb,                                    DU_BUFF_HEADSPACE,                                    DU_BUFF_TAILSPACE,                                    NULL, @@ -1610,15 +1331,22 @@ int ipcp_sdb_reserve(struct shm_du_buff ** sdb,          if (idx < 0)                  return -1; -        *sdb = shm_rdrbuff_get(rdrb, idx); +        *sdb = shm_rdrbuff_get(ai.rdrb, idx);          return 0;  } +void ipcp_sdb_release(struct shm_du_buff * sdb) +{ +        shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); +} +  void ipcp_flow_fini(int fd)  {          struct shm_rbuff * rx_rb; +        assert(fd >= 0); +          fccntl(fd, FLOWSFLAGS, FLOWFWRONLY);          pthread_rwlock_rdlock(&ai.lock); @@ -1633,15 +1361,12 @@ void ipcp_flow_fini(int fd)  int ipcp_flow_get_qoscube(int         fd,                            qoscube_t * cube)  { -        if (fd < 0 || fd > AP_MAX_FLOWS || cube == NULL) -                return -EINVAL; +        assert(fd >= 0); +        assert(cube); -        pthread_rwlock_wrlock(&ai.lock); +        pthread_rwlock_rdlock(&ai.lock); -        if (ai.flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&ai.lock); -                return -ENOTALLOC; -        } +        assert(ai.flows[fd].port_id >= 0);          *cube = ai.flows[fd].cube; @@ -1670,28 +1395,20 @@ int local_flow_write(int    fd,  {          int ret; -        if (fd < 0) -                return -EINVAL; +        assert(fd >= 0);          pthread_rwlock_rdlock(&ai.lock);          if (ai.flows[fd].port_id < 0) { -                pthread_rwlock_unlock(&ai.lock); +                pthread_rwlock_rdlock(&ai.lock);                  return -ENOTALLOC;          } -        ret = finalize_write(fd, idx); -        if (ret < 0) { -                pthread_rwlock_unlock(&ai.lock); -                return ret; -        } +        ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); +        if (ret == 0) +                shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);          pthread_rwlock_unlock(&ai.lock); -        return 0; -} - -void ipcp_sdb_release(struct shm_du_buff * sdb) -{ -        shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); +        return ret;  } diff --git a/src/lib/frct.c b/src/lib/frct.c new file mode 100644 index 00000000..abebb2ff --- /dev/null +++ b/src/lib/frct.c @@ -0,0 +1,320 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Flow and Retransmission Control + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +/* Default Delta-t parameters */ +#define DELT_MPL       60000 /* ms */ +#define DELT_A         0     /* ms */ +#define DELT_R         2000  /* ms */ + +#define RQ_SIZE        20 + +#define TW_ELEMENTS    6000 +#define TW_RESOLUTION  1     /* ms */ + +struct frct_cr { +        bool            drf; +        uint64_t        lwe; +        uint64_t        rwe; + +        bool            conf; +        uint16_t        cflags; + +        time_t          act; +        time_t          inact; +}; + +struct frcti { +        int              fd; + +        time_t           mpl; +        time_t           a; +        time_t           r; + +        struct frct_cr   snd_cr; +        struct frct_cr   rcv_cr; + +        struct rq *      rq; + +        struct timespec  rtt; + +        pthread_rwlock_t lock; +}; + +struct { +        struct timerwheel * tw; +} frct; + +static int frct_init(void) +{ +        frct.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS); +        if (frct.tw == NULL) +                return -1; + +        return 0; +} + +static void frct_fini(void) +{ +        assert(frct.tw); + +        timerwheel_destroy(frct.tw); +} + +static struct frcti * frcti_create(int fd) +{ +        struct frcti * frcti; +        time_t delta_t; + +        frcti = malloc(sizeof(*frcti)); +        if (frcti == NULL) +                goto fail_malloc; + +        if (pthread_rwlock_init(&frcti->lock, NULL)) +                goto fail_lock; + +        frcti->rq = rq_create(RQ_SIZE); +        if (frcti->rq == NULL) +                goto fail_rq; + +        frcti->mpl = DELT_MPL; +        frcti->a   = DELT_A; +        frcti->r   = DELT_R; +        frcti->fd  = fd; + +        delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000; + +        frcti->snd_cr.drf    = true; +        frcti->snd_cr.conf   = true; +        frcti->snd_cr.lwe    = 0; +        frcti->snd_cr.rwe    = 0; +        frcti->snd_cr.cflags = 0; +        frcti->snd_cr.inact  = 2 * delta_t + 1; + +        frcti->rcv_cr.drf    = true; +        frcti->rcv_cr.lwe    = 0; +        frcti->rcv_cr.rwe    = 0; +        frcti->rcv_cr.cflags = 0; +        frcti->rcv_cr.inact  = 3 * delta_t + 1; + +        return frcti; + + fail_rq: +        pthread_rwlock_destroy(&frcti->lock); + fail_lock: +        free(frcti); + fail_malloc: +        return NULL; +} + +static void frcti_destroy(struct frcti * frcti) +{ +        /* +         * FIXME: In case of reliable transmission we should +         * make sure everything is acked. +         */ + +        pthread_rwlock_destroy(&frcti->lock); + +        rq_destroy(frcti->rq); +        free(frcti); +} + +static int frcti_setconf(struct frcti * frcti, +                         uint16_t       flags) +{ +        assert(frcti); + +        pthread_rwlock_wrlock(&frcti->lock); + +        if (frcti->snd_cr.cflags != flags) { +                frcti->snd_cr.cflags = flags; +                frcti->snd_cr.conf   = true; +                frcti->snd_cr.drf    = true; +        } + +        pthread_rwlock_unlock(&frcti->lock); + +        return 0; +} + +static uint16_t frcti_getconf(struct frcti * frcti) +{ +        uint16_t ret; + +        assert (frcti); + +        pthread_rwlock_rdlock(&frcti->lock); + +        ret = frcti->snd_cr.cflags; + +        pthread_rwlock_unlock(&frcti->lock); + +        return ret; +} + +#define frcti_queued_pdu(frcti) \ +        (frcti == NULL ? -1 : __frcti_queued_pdu(frcti)) + +#define frcti_snd(frcti, sdb) \ +        (frcti == NULL ? 0 : __frcti_snd(frcti, sdb)) + +#define frcti_rcv(frcti, sdb) \ +        (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) + +static ssize_t __frcti_queued_pdu(struct frcti * frcti) +{ +        ssize_t idx = -1; + +        assert(frcti); + +        /* See if we already have the next PDU. */ +        pthread_rwlock_wrlock(&frcti->lock); + +        if (!rq_is_empty(frcti->rq)) { +                if (rq_peek(frcti->rq) == frcti->rcv_cr.lwe) { +                        ++frcti->rcv_cr.lwe; +                        idx = rq_pop(frcti->rq); +                } +        } + +        pthread_rwlock_unlock(&frcti->lock); + +        return idx; +} + +static int __frcti_snd(struct frcti *       frcti, +                       struct shm_du_buff * sdb) +{ +        struct frct_pci  pci; +        struct timespec  now; +        struct frct_cr * snd_cr; + +        if (frcti == NULL) +                return 0; + +        snd_cr = &frcti->snd_cr; + +        memset(&pci, 0, sizeof(pci)); + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        pci.type |= PDU_TYPE_DATA; + +        pthread_rwlock_wrlock(&frcti->lock); + +        /* Check if sender is inactive. */ +        if (!snd_cr->drf && now.tv_sec - snd_cr->act > snd_cr->inact) +                snd_cr->drf = true; + +        /* Set the DRF in the first packet of a new run of SDUs. */ +        if (snd_cr->drf) { +                pci.flags |= FLAG_DATA_RUN; +                if (snd_cr->conf) { +                        pci.type |= PDU_TYPE_CONFIG; +                        pci.cflags = snd_cr->cflags; +                } +        } + +        pci.seqno = snd_cr->lwe++; + +        if (frct_pci_ser(sdb, &pci, snd_cr->cflags & FRCTFERRCHCK)) { +                pthread_rwlock_unlock(&frcti->lock); +                return -1; +        } + +        snd_cr->act = now.tv_sec; + +        snd_cr->drf  = false; +        snd_cr->conf = false; + +        pthread_rwlock_unlock(&frcti->lock); + +        return 0; +} + +/* Returns 0 when idx contains an SDU for the application. */ +static int __frcti_rcv(struct frcti *       frcti, +                       struct shm_du_buff * sdb) +{ +        ssize_t          idx; +        struct frct_pci  pci; +        struct timespec  now; +        struct frct_cr * rcv_cr; + +        assert(frcti); + +        rcv_cr = &frcti->rcv_cr; + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        pthread_rwlock_wrlock(&frcti->lock); + +        idx = shm_du_buff_get_idx(sdb); + +        /* SDU may be corrupted. */ +        if (frct_pci_des(sdb, &pci, rcv_cr->cflags & FRCTFERRCHCK)) { +                pthread_rwlock_unlock(&frcti->lock); +                shm_rdrbuff_remove(ai.rdrb, idx); +                return -EAGAIN; +        } + +        /* Check if receiver inactivity is true. */ +        if (!rcv_cr->drf && now.tv_sec - rcv_cr->act > rcv_cr->inact) +                rcv_cr->drf = true; + +        /* When there is receiver inactivity and no DRF, drop the SDU. */ +        if (rcv_cr->drf && !(pci.flags & FLAG_DATA_RUN)) { +                pthread_rwlock_unlock(&frcti->lock); +                shm_rdrbuff_remove(ai.rdrb, idx); +                return -EAGAIN; +        } + +        /* If the DRF is set, reset the state of the connection. */ +        if (pci.flags & FLAG_DATA_RUN) { +                rcv_cr->lwe = pci.seqno; +                if (pci.type & PDU_TYPE_CONFIG) +                        rcv_cr->cflags = pci.cflags; +        } + +        if (rcv_cr->drf) +                rcv_cr->drf = false; + +        rcv_cr->act = now.tv_sec; + +        if (!(pci.type & PDU_TYPE_DATA)) +                shm_rdrbuff_remove(ai.rdrb, idx); + +        if (rcv_cr->cflags & FRCTFORDERING) { +                if (pci.seqno != frcti->rcv_cr.lwe) { +                        if (rq_push(frcti->rq, pci.seqno, idx)) +                                shm_rdrbuff_remove(ai.rdrb, idx); +                        pthread_rwlock_unlock(&frcti->lock); +                        return -EAGAIN; +                } else { +                      ++rcv_cr->lwe; +                } +        } + +        pthread_rwlock_unlock(&frcti->lock); + +        return 0; +} diff --git a/src/lib/frct_pci.c b/src/lib/frct_pci.c index e44554f2..509cc8e2 100644 --- a/src/lib/frct_pci.c +++ b/src/lib/frct_pci.c @@ -20,29 +20,23 @@   * Foundation, Inc., http://www.fsf.org/about/contact/.   */ -#include <ouroboros/frct_pci.h>  #include <ouroboros/hash.h>  #include <ouroboros/errno.h> +#include "frct_pci.h" +  #include <assert.h>  #include <string.h> -#define TYPE_SIZE       1 -#define SEQNO_SIZE      8 -#define FLAGS_SIZE      1 -#define CONF_FLAGS_SIZE sizeof(((struct frct_pci *) NULL)->conf_flags) -#define BASE_SIZE       TYPE_SIZE + FLAGS_SIZE + SEQNO_SIZE -#define CONFIG_SIZE     CONF_FLAGS_SIZE - -static size_t get_head_len(struct frct_pci * pci) -{ -        size_t len = BASE_SIZE; +#define TYPE_SIZE        1 +#define FLAGS_SIZE       1 +#define SEQNO_SIZE       8 +#define CONF_FLAGS_SIZE  2 -        if (pci->type & PDU_TYPE_CONFIG) -                len += CONFIG_SIZE; +#define BASE_SIZE        TYPE_SIZE + FLAGS_SIZE + SEQNO_SIZE -        return len; -} +#define head_len(pci)    (pci->type & PDU_TYPE_CONFIG ?                 \ +                          BASE_SIZE + CONF_FLAGS_SIZE : BASE_SIZE)  int frct_pci_ser(struct shm_du_buff * sdb,                   struct frct_pci *    pci, @@ -50,15 +44,12 @@ int frct_pci_ser(struct shm_du_buff * sdb,  {          uint8_t * head;          uint8_t * tail; -        size_t    len;          size_t    offset = 0;          assert(sdb);          assert(pci); -        len = get_head_len(pci); - -        head = shm_du_buff_head_alloc(sdb, len); +        head = shm_du_buff_head_alloc(sdb, head_len(pci));          if (head == NULL)                  return -EPERM; @@ -70,14 +61,14 @@ int frct_pci_ser(struct shm_du_buff * sdb,          offset += SEQNO_SIZE;          if (pci->type & PDU_TYPE_CONFIG) { -                memcpy(head + offset, &pci->conf_flags, CONF_FLAGS_SIZE); +                memcpy(head + offset, &pci->cflags, CONF_FLAGS_SIZE);                  /* offset += CONF_FLAGS_SIZE; */          }          if (error_check) {                  tail = shm_du_buff_tail_alloc(sdb, hash_len(HASH_CRC32));                  if (tail == NULL) { -                        shm_du_buff_head_release(sdb, len); +                        shm_du_buff_head_release(sdb, head_len(pci));                          return -EPERM;                  } @@ -103,23 +94,8 @@ int frct_pci_des(struct shm_du_buff * sdb,          head = shm_du_buff_head(sdb); -         /* Depending on the type a different deserialization. */ -        memcpy(&pci->type, head, TYPE_SIZE); -        offset += TYPE_SIZE; -        memcpy(&pci->flags, head + offset, FLAGS_SIZE); -        offset += FLAGS_SIZE; -        memcpy(&pci->seqno, head + offset, SEQNO_SIZE); -        offset += SEQNO_SIZE; - -        if (pci->type & PDU_TYPE_CONFIG) { -                memcpy(&pci->conf_flags, head + offset, CONF_FLAGS_SIZE); -                /* offset += CONF_FLAGS_SIZE; */ -        } -          if (error_check) {                  tail = shm_du_buff_tail(sdb); -                if (tail == NULL) -                        return -EPERM;                  mem_hash(HASH_CRC32, &crc, head,                           tail - head - hash_len(HASH_CRC32)); @@ -134,7 +110,20 @@ int frct_pci_des(struct shm_du_buff * sdb,                  shm_du_buff_tail_release(sdb, hash_len(HASH_CRC32));          } -        shm_du_buff_head_release(sdb, get_head_len(pci)); +        /* Depending on the type a different deserialization. */ +        memcpy(&pci->type, head, TYPE_SIZE); +        offset += TYPE_SIZE; +        memcpy(&pci->flags, head + offset, FLAGS_SIZE); +        offset += FLAGS_SIZE; +        memcpy(&pci->seqno, head + offset, SEQNO_SIZE); +        offset += SEQNO_SIZE; + +        if (pci->type & PDU_TYPE_CONFIG) { +                memcpy(&pci->cflags, head + offset, CONF_FLAGS_SIZE); +                /* offset += CONF_FLAGS_SIZE; */ +        } + +        shm_du_buff_head_release(sdb, head_len(pci));          return 0;  } diff --git a/src/lib/frct_pci.h b/src/lib/frct_pci.h new file mode 100644 index 00000000..fbbfd354 --- /dev/null +++ b/src/lib/frct_pci.h @@ -0,0 +1,67 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Protocol Control Information of FRCT + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_LIB_FRCT_PCI_H +#define OUROBOROS_LIB_FRCT_PCI_H + +#include <ouroboros/shm_du_buff.h> + +#include <stdint.h> +#include <stdbool.h> + +struct frct_pci { +        /* Present in every PDU. */ +        uint8_t  type; +        uint8_t  flags; +        uint64_t seqno; + +        /* Present in config PDU. */ +        uint16_t cflags; + +        /* Present in flow control PDU. */ +        uint64_t lwe; +        uint64_t rwe; +}; + +enum pdu_types { +        PDU_TYPE_DATA        = 0x01, +        PDU_TYPE_ACK         = 0x02, +        PDU_TYPE_FC          = 0x04, +        PDU_TYPE_ACK_AND_FC  = (PDU_TYPE_ACK | PDU_TYPE_FC), +        PDU_TYPE_RENDEZ_VOUS = 0x08, +        PDU_TYPE_CONFIG      = 0x10 +}; + +enum data_flags { +        FLAG_DATA_RUN       = 0x01, +        FLAG_MORE_FRAGMENTS = 0x02 +}; + +int frct_pci_ser(struct shm_du_buff * sdb, +                 struct frct_pci *    pci, +                 bool                 error_check); + +int frct_pci_des(struct shm_du_buff * sdb, +                 struct frct_pci *    pci, +                 bool                 error_check); + +#endif /* OUROBOROS_LIB_FRCT_PCI_H */ diff --git a/src/lib/rq.c b/src/lib/rq.c index bd0594b5..ba425236 100644 --- a/src/lib/rq.c +++ b/src/lib/rq.c @@ -20,7 +20,7 @@   * Foundation, Inc., http://www.fsf.org/about/contact/.   */ -#include <ouroboros/rq.h> +#include "rq.h"  #include <assert.h> @@ -77,11 +77,11 @@ int rq_push(struct rq * rq,                  return -1;          i = ++rq->n_items; -        j = i / 2; +        j = i >> 1;          while (i > 1 && rq->items[j].seqno > seqno) {                  rq->items[i] = rq->items[j];                  i = j; -                j = j / 2; +                j >>= 1;          }          rq->items[i].seqno = seqno; @@ -121,7 +121,7 @@ size_t rq_pop(struct rq * rq)          i = 1;          while (true) {                  k = i; -                j = 2 * i; +                j = i << 1;                  if (j <= rq->n_items && rq->items[j].seqno < rq->items[k].seqno)                          k = j; diff --git a/src/lib/rq.h b/src/lib/rq.h new file mode 100644 index 00000000..7c024c11 --- /dev/null +++ b/src/lib/rq.h @@ -0,0 +1,47 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Reordering queue + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_LIB_RQ_H +#define OUROBOROS_LIB_RQ_H + +#include <stdint.h> +#include <stdlib.h> +#include <stdbool.h> + +struct rq * rq_create(int size); + +void        rq_destroy(struct rq * rq); + +int         rq_push(struct rq * rq, +                    uint64_t    seqno, +                    size_t      idx); + +uint64_t    rq_peek(struct rq * rq); + +bool        rq_is_empty(struct rq * rq); + +size_t      rq_pop(struct rq * rq); + +bool        rq_has(struct rq * rq, +                   uint64_t    seqno); + +#endif /* OUROBOROS_LIB_RQ_H */ diff --git a/src/lib/tests/rq_test.c b/src/lib/tests/rq_test.c index e2d0f435..7b57cf30 100644 --- a/src/lib/tests/rq_test.c +++ b/src/lib/tests/rq_test.c @@ -20,7 +20,7 @@   * Foundation, Inc., http://www.fsf.org/about/contact/.   */ -#include <ouroboros/rq.h> +#include "rq.h"  #include <stdio.h>  | 
