From eef84a2afd2aa0d21072f6e7ef038fe10dcc245d Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Mon, 6 Nov 2017 21:38:55 +0100 Subject: lib: Refactor FRCT implementation The frct_pci and rq headers are moved from include/ouroboros to src/lib since they are only needed in the library. FRCT is moved to its own source file. FRCT takes the application PDUs, encapsulates and processes them and hands them back. This makes it easier to disable FRCT should the application want to write to a "raw" flow. An FRCT instance is now allocated upon alloc and released upon dealloc. The FRCT data structure is split into a sender and receiver connection record. Setting a new configuration will now be done upon sending the next data PDU, which will flag the DRF for a new run and use that configuration. This avoids some issues should packets arrive out-of-order, and simplifies setting a configuration. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/lib/dev.c | 719 +++++++++++++++--------------------------------- src/lib/frct.c | 320 +++++++++++++++++++++ src/lib/frct_pci.c | 63 ++--- src/lib/frct_pci.h | 67 +++++ src/lib/rq.c | 8 +- src/lib/rq.h | 47 ++++ src/lib/tests/rq_test.c | 2 +- 7 files changed, 683 insertions(+), 543 deletions(-) create mode 100644 src/lib/frct.c create mode 100644 src/lib/frct_pci.h create mode 100644 src/lib/rq.h (limited to 'src') diff --git a/src/lib/dev.c b/src/lib/dev.c index 28a99bc4..ff22cca6 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -38,22 +38,19 @@ #include #include #include -#include -#include + +#include "frct_pci.h" +#include "rq.h" #include #include #include #include +#include +#include #define BUF_SIZE 1500 -#define TW_ELEMENTS 6000 -#define TW_RESOLUTION 1 /* ms */ - -#define MPL 2000 /* ms */ -#define RQ_SIZE 20 - #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif @@ -76,26 +73,6 @@ enum port_state { PORT_DESTROY }; -struct frcti { - bool used; - - struct timespec last_snd; - bool snd_drf; - uint64_t snd_lwe; - uint64_t snd_rwe; - - struct timespec last_rcv; - bool rcv_drf; - uint64_t rcv_lwe; - uint64_t rcv_rwe; - - uint16_t conf_flags; - - struct rq * rq; - - pthread_rwlock_t lock; -}; - struct port { int fd; @@ -119,6 +96,8 @@ struct flow { bool rcv_timesout; struct timespec snd_timeo; struct timespec rcv_timeo; + + struct frcti * frcti; }; struct { @@ -132,13 +111,15 @@ struct { struct bmp * fds; struct bmp * fqueues; + struct flow * flows; struct port * ports; - struct frcti * frcti; pthread_rwlock_t lock; } ai; +#include "frct.c" + static void port_destroy(struct port * p) { pthread_mutex_lock(&p->state_lock); @@ -185,12 +166,8 @@ static enum port_state port_wait_assign(int port_id) enum port_state state; struct port * p; - pthread_rwlock_rdlock(&ai.lock); - p = &ai.ports[port_id]; - pthread_rwlock_unlock(&ai.lock); - pthread_mutex_lock(&p->state_lock); if (p->state == PORT_ID_ASSIGNED) { @@ -245,275 +222,8 @@ static int api_announce(char * ap_name) return ret; } -/* Call under flows lock. */ -static int finalize_write(int fd, - size_t idx) -{ - int ret; - - ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); - if (ret < 0) - return ret; - - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - - return ret; -} - -static int frcti_init(int fd) -{ - struct frcti * frcti; - - frcti = &(ai.frcti[fd]); - - frcti->used = true; - - frcti->snd_drf = true; - frcti->snd_lwe = 0; - frcti->snd_rwe = 0; - - frcti->rcv_drf = true; - frcti->rcv_lwe = 0; - frcti->rcv_rwe = 0; - - frcti->conf_flags = 0; - - frcti->rq = rq_create(RQ_SIZE); - if (frcti->rq == NULL) - return -1; - - return 0; -} - -static void frcti_clear(int fd) -{ - ai.frcti[fd].used = false; -} - -static void frcti_fini(int fd) -{ - /* - * FIXME: In case of reliable transmission we should - * make sure everything is acked. - */ - - frcti_clear(fd); - - rq_destroy(ai.frcti[fd].rq); -} - -static int frcti_send(int fd, - struct frct_pci * pci, - struct shm_du_buff * sdb) -{ - struct timespec now = {0, 0}; - struct frcti * frcti; - int ret; - - frcti = &(ai.frcti[fd]); - - clock_gettime(CLOCK_REALTIME_COARSE, &now); - - pthread_rwlock_wrlock(&frcti->lock); - - /* Check if sender inactivity is true. */ - if (!frcti->snd_drf && ts_diff_ms(&now, &frcti->last_snd) > 2 * MPL) - frcti->snd_drf = true; - - /* Set the DRF in the first packet of a new run of SDUs. */ - if (frcti->snd_drf) { - pci->flags |= FLAG_DATA_RUN; - frcti->snd_drf = false; - } - - frcti->last_snd = now; - - pci->seqno = frcti->snd_lwe++; - - if (frct_pci_ser(sdb, pci, frcti->conf_flags & FRCTFERRCHCK)) { - pthread_rwlock_unlock(&frcti->lock); - return -1; - } - - ret = finalize_write(fd, shm_du_buff_get_idx(sdb)); - if (ret < 0) { - pthread_rwlock_unlock(&frcti->lock); - return ret; - } - - pthread_rwlock_unlock(&frcti->lock); - - return 0; -} - - -static int frcti_configure(int fd, - uint16_t flags) -{ - struct frcti * frcti; - struct frct_pci pci; - struct shm_du_buff * sdb; - - frcti = &(ai.frcti[fd]); - - memset(&pci, 0, sizeof(pci)); - - if (ipcp_sdb_reserve(&sdb, 0)) - return -1; - - pci.conf_flags = flags; - - /* Always set the DRF on a configure message. */ - pci.flags |= FLAG_DATA_RUN; - pci.type |= PDU_TYPE_CONFIG; - - pthread_rwlock_wrlock(&frcti->lock); - - frcti->conf_flags = pci.conf_flags; - - pthread_rwlock_unlock(&frcti->lock); - - if (frcti_send(fd, &pci, sdb)) { - shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); - return -1; - } - - return 0; -} - -static int frcti_write(int fd, - struct shm_du_buff * sdb) -{ - struct frct_pci pci; - - memset(&pci, 0, sizeof(pci)); - - pci.type |= PDU_TYPE_DATA; - - return frcti_send(fd, &pci, sdb); -} - -static ssize_t frcti_read(int fd) -{ - ssize_t idx; - struct frcti * frcti; - struct frct_pci pci; - struct shm_du_buff * sdb; - uint64_t seqno; - bool nxt_pdu = true; - - frcti = &(ai.frcti[fd]); - - /* See if we already have the next PDU */ - pthread_rwlock_wrlock(&frcti->lock); - - if (!rq_is_empty(frcti->rq)) { - seqno = rq_peek(frcti->rq); - if (seqno == frcti->rcv_lwe) { - frcti->rcv_lwe++; - idx = rq_pop(frcti->rq); - pthread_rwlock_unlock(&frcti->lock); - return idx; - } - } - - pthread_rwlock_unlock(&frcti->lock); - - do { - struct timespec now; - struct timespec abs; - struct timespec * abstime = NULL; - struct shm_rbuff * rb; - bool noblock; - - clock_gettime(CLOCK_REALTIME_COARSE, &now); - - pthread_rwlock_rdlock(&ai.lock); - - noblock = ai.flows[fd].oflags & FLOWFRNOBLOCK; - rb = ai.flows[fd].rx_rb; - - if (ai.flows[fd].rcv_timesout) { - ts_add(&now, &ai.flows[fd].rcv_timeo, &abs); - abstime = &abs; - } - - pthread_rwlock_unlock(&ai.lock); - - if (noblock) { - idx = shm_rbuff_read(rb); - } else { - idx = shm_rbuff_read_b(rb, abstime); - clock_gettime(CLOCK_REALTIME_COARSE, &now); - } - - if (idx < 0) - return idx; - - sdb = shm_rdrbuff_get(ai.rdrb, idx); - - pthread_rwlock_wrlock(&frcti->lock); - - /* SDU may be corrupted. */ - if (frct_pci_des(sdb, &pci, frcti->conf_flags & FRCTFERRCHCK)) { - pthread_rwlock_unlock(&frcti->lock); - shm_rdrbuff_remove(ai.rdrb, idx); - return -EAGAIN; - } - - /* Check if receiver inactivity is true. */ - if (!frcti->rcv_drf && - ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL) - frcti->rcv_drf = true; - - /* When there is receiver inactivity queue the packet. */ - if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) { - if (rq_push(frcti->rq, pci.seqno, idx)) - shm_rdrbuff_remove(ai.rdrb, idx); - pthread_rwlock_unlock(&frcti->lock); - return -EAGAIN; - } - - /* If the DRF is set, reset the state of the connection. */ - if (pci.flags & FLAG_DATA_RUN) - frcti->rcv_lwe = pci.seqno; - - if (pci.type & PDU_TYPE_CONFIG) - frcti->conf_flags = pci.conf_flags; - - if (frcti->rcv_drf) - frcti->rcv_drf = false; - - frcti->last_rcv = now; - - nxt_pdu = true; - - if (!(pci.type & PDU_TYPE_DATA)) { - shm_rdrbuff_remove(ai.rdrb, idx); - nxt_pdu = false; - } - - if (frcti->conf_flags & FRCTFORDERING) { - if (pci.seqno != frcti->rcv_lwe) { - if (rq_push(frcti->rq, pci.seqno, idx)) - shm_rdrbuff_remove(ai.rdrb, idx); - nxt_pdu = false; - } else { - frcti->rcv_lwe++; - } - } - - pthread_rwlock_unlock(&frcti->lock); - - } while (!nxt_pdu); - - return idx; -} - static void flow_clear(int fd) { - assert(!(fd < 0)); - memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); ai.flows[fd].port_id = -1; @@ -525,8 +235,10 @@ static void flow_fini(int fd) { assert(!(fd < 0)); - if (ai.flows[fd].port_id != -1) + if (ai.flows[fd].port_id != -1) { port_destroy(&ai.ports[ai.flows[fd].port_id]); + bmp_release(ai.fds, fd); + } if (ai.flows[fd].rx_rb != NULL) shm_rbuff_close(ai.flows[fd].rx_rb); @@ -537,8 +249,8 @@ static void flow_fini(int fd) if (ai.flows[fd].set != NULL) shm_flow_set_close(ai.flows[fd].set); - if (ai.frcti[fd].used) - frcti_fini(fd); + if (ai.flows[fd].frcti != NULL) + frcti_destroy(ai.flows[fd].frcti); flow_clear(fd); } @@ -548,37 +260,27 @@ static int flow_init(int port_id, qoscube_t qc) { int fd; + int err = -ENOMEM; pthread_rwlock_wrlock(&ai.lock); fd = bmp_allocate(ai.fds); if (!bmp_is_id_valid(ai.fds, fd)) { - pthread_rwlock_unlock(&ai.lock); - return -EBADF; + err = -EBADF; + goto fail_fds; } ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); - if (ai.flows[fd].rx_rb == NULL) { - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.lock); - return -ENOMEM; - } + if (ai.flows[fd].rx_rb == NULL) + goto fail; ai.flows[fd].tx_rb = shm_rbuff_open(api, port_id); - if (ai.flows[fd].tx_rb == NULL) { - flow_fini(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.lock); - return -ENOMEM; - } + if (ai.flows[fd].tx_rb == NULL) + goto fail; ai.flows[fd].set = shm_flow_set_open(api); - if (ai.flows[fd].set == NULL) { - flow_fini(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.lock); - return -ENOMEM; - } + if (ai.flows[fd].set == NULL) + goto fail; ai.flows[fd].port_id = port_id; ai.flows[fd].oflags = FLOWFDEFAULT; @@ -593,6 +295,12 @@ static int flow_init(int port_id, pthread_rwlock_unlock(&ai.lock); return fd; + + fail: + flow_fini(fd); + fail_fds: + pthread_rwlock_unlock(&ai.lock); + return err; } static bool check_python(char * str) @@ -611,7 +319,6 @@ __attribute__((constructor)) static void init(int argc, { const char * ap_name = argv[0]; int i; - int j; (void) argc; (void) envp; @@ -643,20 +350,8 @@ __attribute__((constructor)) static void init(int argc, if (ai.flows == NULL) goto fail_flows; - ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS); - if (ai.frcti == NULL) - goto fail_frcti; - - for (i = 0; i < AP_MAX_FLOWS; ++i) { + for (i = 0; i < AP_MAX_FLOWS; ++i) flow_clear(i); - frcti_clear(i); - - if (pthread_rwlock_init(&ai.frcti[i].lock, NULL)) { - for (j = i - 1; j >= 0 ; j--) - pthread_rwlock_destroy(&ai.frcti[j].lock); - goto fail_frct_lock; - } - } ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS); if (ai.ports == NULL) @@ -690,13 +385,12 @@ __attribute__((constructor)) static void init(int argc, if (pthread_rwlock_init(&ai.lock, NULL)) goto fail_lock; - ai.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS); - if (ai.tw == NULL) - goto fail_timerwheel; + if (frct_init()) + goto fail_frct; return; - fail_timerwheel: + fail_frct: pthread_rwlock_destroy(&ai.lock); fail_lock: for (i = 0; i < SYS_MAX_FLOWS; ++i) @@ -709,11 +403,6 @@ __attribute__((constructor)) static void init(int argc, fail_ap_name: free(ai.ports); fail_ports: - for (i = 0; i < AP_MAX_FLOWS; ++i) - pthread_rwlock_destroy(&ai.frcti[i].lock); - fail_frct_lock: - free(ai.frcti); - fail_frcti: free(ai.flows); fail_flows: shm_rdrbuff_close(ai.rdrb); @@ -737,15 +426,14 @@ __attribute__((destructor)) static void fini(void) if (ai.fds == NULL) return; - bmp_destroy(ai.fds); - bmp_destroy(ai.fqueues); + frct_fini(); shm_flow_set_destroy(ai.fqset); if (ai.ap_name != NULL) free(ai.ap_name); - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_wrlock(&ai.lock); for (i = 0; i < AP_MAX_FLOWS; ++i) { if (ai.flows[i].port_id != -1) { @@ -754,8 +442,6 @@ __attribute__((destructor)) static void fini(void) shm_rdrbuff_remove(ai.rdrb, idx); flow_fini(i); } - - pthread_rwlock_destroy(&ai.frcti[i].lock); } for (i = 0; i < SYS_MAX_FLOWS; ++i) { @@ -770,7 +456,9 @@ __attribute__((destructor)) static void fini(void) free(ai.flows); free(ai.ports); - free(ai.frcti); + + bmp_destroy(ai.fds); + bmp_destroy(ai.fqueues); pthread_rwlock_unlock(&ai.lock); @@ -825,7 +513,16 @@ int flow_accept(qosspec_t * qs, pthread_rwlock_wrlock(&ai.lock); - frcti_init(fd); + /* FIXME: check if FRCT is needed based on qc? */ + + assert(ai.flows[fd].frcti == NULL); + + ai.flows[fd].frcti = frcti_create(fd); + if (ai.flows[fd].frcti == NULL) { + flow_fini(fd); + pthread_rwlock_unlock(&ai.lock); + return -ENOMEM; + } if (qs != NULL) *qs = ai.flows[fd].spec; @@ -891,7 +588,15 @@ int flow_alloc(const char * dst_name, pthread_rwlock_wrlock(&ai.lock); - frcti_init(fd); + /* FIXME: check if FRCT is needed based on qc? */ + assert(ai.flows[fd].frcti == NULL); + + ai.flows[fd].frcti = frcti_create(fd); + if (ai.flows[fd].frcti == NULL) { + flow_fini(fd); + pthread_rwlock_unlock(&ai.lock); + return -ENOMEM; + } pthread_rwlock_unlock(&ai.lock); @@ -913,7 +618,7 @@ int flow_dealloc(int fd) pthread_rwlock_rdlock(&ai.lock); - assert(!(ai.flows[fd].port_id < 0)); + assert(ai.flows[fd].port_id >= 0); msg.port_id = ai.flows[fd].port_id; @@ -933,7 +638,6 @@ int flow_dealloc(int fd) pthread_rwlock_wrlock(&ai.lock); flow_fini(fd); - bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.lock); @@ -944,6 +648,7 @@ int fccntl(int fd, int cmd, ...) { + uint16_t sflags; uint32_t * fflags; uint16_t * cflags; va_list l; @@ -951,15 +656,18 @@ int fccntl(int fd, qosspec_t * qs; uint32_t rx_acl; uint32_t tx_acl; + struct flow * flow; if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; + flow = &ai.flows[fd]; + va_start(l, cmd); pthread_rwlock_wrlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (flow->port_id < 0) { pthread_rwlock_unlock(&ai.lock); va_end(l); return -ENOTALLOC; @@ -969,57 +677,57 @@ int fccntl(int fd, case FLOWSSNDTIMEO: timeo = va_arg(l, struct timespec *); if (timeo == NULL) { - ai.flows[fd].snd_timesout = false; + flow->snd_timesout = false; } else { - ai.flows[fd].snd_timesout = true; - ai.flows[fd].snd_timeo = *timeo; + flow->snd_timesout = true; + flow->snd_timeo = *timeo; } break; case FLOWGSNDTIMEO: timeo = va_arg(l, struct timespec *); if (timeo == NULL) goto einval; - if (!ai.flows[fd].snd_timesout) + if (!flow->snd_timesout) goto eperm; - *timeo = ai.flows[fd].snd_timeo; + *timeo = flow->snd_timeo; break; case FLOWSRCVTIMEO: timeo = va_arg(l, struct timespec *); if (timeo == NULL) { - ai.flows[fd].rcv_timesout = false; + flow->rcv_timesout = false; } else { - ai.flows[fd].rcv_timesout = true; - ai.flows[fd].rcv_timeo = *timeo; + flow->rcv_timesout = true; + flow->rcv_timeo = *timeo; } break; case FLOWGRCVTIMEO: timeo = va_arg(l, struct timespec *); if (timeo == NULL) goto einval; - if (!ai.flows[fd].rcv_timesout) + if (!flow->rcv_timesout) goto eperm; - *timeo = ai.flows[fd].snd_timeo; + *timeo = flow->snd_timeo; break; case FLOWGQOSSPEC: qs = va_arg(l, qosspec_t *); if (qs == NULL) goto einval; - *qs = ai.flows[fd].spec; + *qs = flow->spec; break; case FLOWSFLAGS: - ai.flows[fd].oflags = va_arg(l, uint32_t); - rx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb); - tx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb); + flow->oflags = va_arg(l, uint32_t); + rx_acl = shm_rbuff_get_acl(flow->rx_rb); + tx_acl = shm_rbuff_get_acl(flow->rx_rb); /* * Making our own flow write only means making the * the other side of the flow read only. */ - if (ai.flows[fd].oflags & FLOWFWRONLY) + if (flow->oflags & FLOWFWRONLY) rx_acl |= ACL_RDONLY; - if (ai.flows[fd].oflags & FLOWFRDWR) + if (flow->oflags & FLOWFRDWR) rx_acl |= ACL_RDWR; - if (ai.flows[fd].oflags & FLOWFDOWN) { + if (flow->oflags & FLOWFDOWN) { rx_acl |= ACL_FLOWDOWN; tx_acl |= ACL_FLOWDOWN; } else { @@ -1027,26 +735,28 @@ int fccntl(int fd, tx_acl &= ~ACL_FLOWDOWN; } - shm_rbuff_set_acl(ai.flows[fd].rx_rb, rx_acl); - shm_rbuff_set_acl(ai.flows[fd].tx_rb, tx_acl); + shm_rbuff_set_acl(flow->rx_rb, rx_acl); + shm_rbuff_set_acl(flow->tx_rb, tx_acl); break; case FLOWGFLAGS: fflags = va_arg(l, uint32_t *); if (fflags == NULL) goto einval; - *fflags = ai.flows[fd].oflags; + *fflags = flow->oflags; break; case FRCTSFLAGS: - ai.frcti[fd].conf_flags = (uint16_t) va_arg(l, int); + sflags = (uint16_t) va_arg(l, int); + if (flow->frcti == NULL || frcti_setconf(flow->frcti, sflags)) + goto eperm; break; case FRCTGFLAGS: cflags = (uint16_t *) va_arg(l, int *); if (cflags == NULL) goto einval; - *cflags = ai.frcti[fd].conf_flags; - if (frcti_configure(fd, ai.frcti[fd].conf_flags)) + if (flow->frcti == NULL) goto eperm; + *cflags = frcti_getconf(flow->frcti); break; default: pthread_rwlock_unlock(&ai.lock); @@ -1075,8 +785,10 @@ ssize_t flow_write(int fd, const void * buf, size_t count) { - ssize_t idx; - int ret; + struct flow * flow; + ssize_t idx; + int ret; + int flags; if (buf == NULL) return 0; @@ -1084,104 +796,110 @@ ssize_t flow_write(int fd, if (fd < 0 || fd > AP_MAX_FLOWS) return -EBADF; + flow = &ai.flows[fd]; + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (flow->port_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) { - pthread_rwlock_unlock(&ai.lock); - return -EPERM; - } + flags = flow->oflags; - if (ai.flows[fd].oflags & FLOWFWNOBLOCK) { - idx = shm_rdrbuff_write(ai.rdrb, - DU_BUFF_HEADSPACE, - DU_BUFF_TAILSPACE, - buf, - count); - if (idx < 0) { - pthread_rwlock_unlock(&ai.lock); - return idx; - } + pthread_rwlock_unlock(&ai.lock); - } else { /* Blocking. */ - pthread_rwlock_unlock(&ai.lock); + if ((flags & FLOWFACCMODE) == FLOWFRDONLY) + return -EPERM; + if (flags & FLOWFWNOBLOCK) + idx = shm_rdrbuff_write(ai.rdrb, + DU_BUFF_HEADSPACE, + DU_BUFF_TAILSPACE, + buf, + count); + else /* Blocking. */ idx = shm_rdrbuff_write_b(ai.rdrb, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, buf, count); - if (idx < 0) - return idx; + if (idx < 0) + return idx; - pthread_rwlock_rdlock(&ai.lock); + if (frcti_snd(flow->frcti, shm_rdrbuff_get(ai.rdrb, idx)) < 0) { + shm_rdrbuff_remove(ai.rdrb, idx); + return -ENOMEM; } - if (!ai.frcti[fd].used) { - ret = finalize_write(fd, idx); - if (ret < 0) { - pthread_rwlock_unlock(&ai.lock); - shm_rdrbuff_remove(ai.rdrb, idx); - return ret; - } + pthread_rwlock_rdlock(&ai.lock); - pthread_rwlock_unlock(&ai.lock); - } else { - pthread_rwlock_unlock(&ai.lock); + ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + if (ret < 0) + shm_rdrbuff_remove(ai.rdrb, idx); + else + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - ret = frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx)); - if (ret < 0) { - shm_rdrbuff_remove(ai.rdrb, idx); - return ret; - } - } + pthread_rwlock_unlock(&ai.lock); - return 0; + assert(ret <= 0); + + return ret; } ssize_t flow_read(int fd, void * buf, size_t count) { - ssize_t idx; - ssize_t n; - uint8_t * sdu; - bool used; - struct shm_rbuff * rb; + ssize_t idx; + ssize_t n; + uint8_t * sdu; + struct shm_rbuff * rb; + struct shm_du_buff * sdb; + struct timespec now; + struct timespec abs; + struct timespec * abstime = NULL; + struct flow * flow; + bool noblock; if (fd < 0 || fd > AP_MAX_FLOWS) return -EBADF; + flow = &ai.flows[fd]; + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (flow->port_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - used = ai.frcti[fd].used; - rb = ai.flows[fd].rx_rb; + rb = flow->rx_rb; + noblock = flow->oflags & FLOWFRNOBLOCK; - pthread_rwlock_unlock(&ai.lock); + if (ai.flows[fd].rcv_timesout) { + ts_add(&now, &flow->rcv_timeo, &abs); + abstime = &abs; + } - if (!used) - idx = shm_rbuff_read(rb); - else - idx = frcti_read(fd); + pthread_rwlock_unlock(&ai.lock); + idx = frcti_queued_pdu(flow->frcti); if (idx < 0) { - assert(idx == -EAGAIN || idx == -ETIMEDOUT || - idx == -EFLOWDOWN); - return idx; + do { + idx = noblock ? shm_rbuff_read(rb) : + shm_rbuff_read_b(rb, abstime); + if (idx < 0) + return idx; + sdb = shm_rdrbuff_get(ai.rdrb, idx); + } while (frcti_rcv(flow->frcti, sdb) != 0); } n = shm_rdrbuff_read(&sdu, ai.rdrb, idx); - if (n < 0) - return -1; + + assert(n >= 0); memcpy(buf, sdu, MIN((size_t) n, count)); @@ -1432,7 +1150,7 @@ int ipcp_create_r(pid_t api, if (recv_msg == NULL) return -EIRMD; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -1509,7 +1227,7 @@ int ipcp_flow_alloc_reply(int fd, if (recv_msg == NULL) return -EIRMD; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -1524,30 +1242,37 @@ int ipcp_flow_alloc_reply(int fd, int ipcp_flow_read(int fd, struct shm_du_buff ** sdb) { - ssize_t idx = -1; - int port_id = -1; + struct flow * flow; + struct shm_rbuff * rb; + ssize_t idx; assert(fd >= 0); assert(sdb); + flow = &ai.flows[fd]; + pthread_rwlock_rdlock(&ai.lock); - if ((port_id = ai.flows[fd].port_id) < 0) { - pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; - } + assert(flow->port_id >= 0); - pthread_rwlock_unlock(&ai.lock); + rb = flow->rx_rb; - if (!ai.frcti[fd].used) - idx = shm_rbuff_read(ai.flows[fd].rx_rb); - else - idx = frcti_read(fd); + pthread_rwlock_unlock(&ai.lock); - if (idx < 0) - return idx; + if (flow->frcti != NULL) { + idx = frcti_queued_pdu(flow->frcti); + if (idx >= 0) { + *sdb = shm_rdrbuff_get(ai.rdrb, idx); + return 0; + } + } - *sdb = shm_rdrbuff_get(ai.rdrb, idx); + do { + idx = shm_rbuff_read(rb); + if (idx < 0) + return idx; + *sdb = shm_rdrbuff_get(ai.rdrb, idx); + } while (frcti_rcv(flow->frcti, *sdb) != 0); return 0; } @@ -1555,53 +1280,49 @@ int ipcp_flow_read(int fd, int ipcp_flow_write(int fd, struct shm_du_buff * sdb) { - int ret; + struct flow * flow; + int ret; + ssize_t idx; - if (sdb == NULL) - return -EINVAL; + assert(sdb); + + flow = &ai.flows[fd]; pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; - } + assert(flow->port_id >= 0); - if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) { + if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) { pthread_rwlock_unlock(&ai.lock); return -EPERM; } - assert(ai.flows[fd].tx_rb); + assert(flow->tx_rb); - if (!ai.frcti[fd].used) { - ret = finalize_write(fd, shm_du_buff_get_idx(sdb)); - if (ret < 0) { - pthread_rwlock_unlock(&ai.lock); - return ret; - } + idx = shm_du_buff_get_idx(sdb); + if (frcti_snd(flow->frcti, sdb) < 0) { pthread_rwlock_unlock(&ai.lock); - } else { - pthread_rwlock_unlock(&ai.lock); - - ret = frcti_write(fd, sdb); - if (ret < 0) - return ret; + return -ENOMEM; } - return 0; + ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + if (ret == 0) + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + + pthread_rwlock_unlock(&ai.lock); + + assert(ret <= 0); + + return ret; } int ipcp_sdb_reserve(struct shm_du_buff ** sdb, size_t len) { - struct shm_rdrbuff * rdrb; - ssize_t idx; - - rdrb = ai.rdrb; + ssize_t idx; - idx = shm_rdrbuff_write_b(rdrb, + idx = shm_rdrbuff_write_b(ai.rdrb, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, NULL, @@ -1610,15 +1331,22 @@ int ipcp_sdb_reserve(struct shm_du_buff ** sdb, if (idx < 0) return -1; - *sdb = shm_rdrbuff_get(rdrb, idx); + *sdb = shm_rdrbuff_get(ai.rdrb, idx); return 0; } +void ipcp_sdb_release(struct shm_du_buff * sdb) +{ + shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); +} + void ipcp_flow_fini(int fd) { struct shm_rbuff * rx_rb; + assert(fd >= 0); + fccntl(fd, FLOWSFLAGS, FLOWFWRONLY); pthread_rwlock_rdlock(&ai.lock); @@ -1633,15 +1361,12 @@ void ipcp_flow_fini(int fd) int ipcp_flow_get_qoscube(int fd, qoscube_t * cube) { - if (fd < 0 || fd > AP_MAX_FLOWS || cube == NULL) - return -EINVAL; + assert(fd >= 0); + assert(cube); - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; - } + assert(ai.flows[fd].port_id >= 0); *cube = ai.flows[fd].cube; @@ -1670,28 +1395,20 @@ int local_flow_write(int fd, { int ret; - if (fd < 0) - return -EINVAL; + assert(fd >= 0); pthread_rwlock_rdlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_rdlock(&ai.lock); return -ENOTALLOC; } - ret = finalize_write(fd, idx); - if (ret < 0) { - pthread_rwlock_unlock(&ai.lock); - return ret; - } + ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + if (ret == 0) + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); pthread_rwlock_unlock(&ai.lock); - return 0; -} - -void ipcp_sdb_release(struct shm_du_buff * sdb) -{ - shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); + return ret; } diff --git a/src/lib/frct.c b/src/lib/frct.c new file mode 100644 index 00000000..abebb2ff --- /dev/null +++ b/src/lib/frct.c @@ -0,0 +1,320 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Flow and Retransmission Control + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +/* Default Delta-t parameters */ +#define DELT_MPL 60000 /* ms */ +#define DELT_A 0 /* ms */ +#define DELT_R 2000 /* ms */ + +#define RQ_SIZE 20 + +#define TW_ELEMENTS 6000 +#define TW_RESOLUTION 1 /* ms */ + +struct frct_cr { + bool drf; + uint64_t lwe; + uint64_t rwe; + + bool conf; + uint16_t cflags; + + time_t act; + time_t inact; +}; + +struct frcti { + int fd; + + time_t mpl; + time_t a; + time_t r; + + struct frct_cr snd_cr; + struct frct_cr rcv_cr; + + struct rq * rq; + + struct timespec rtt; + + pthread_rwlock_t lock; +}; + +struct { + struct timerwheel * tw; +} frct; + +static int frct_init(void) +{ + frct.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS); + if (frct.tw == NULL) + return -1; + + return 0; +} + +static void frct_fini(void) +{ + assert(frct.tw); + + timerwheel_destroy(frct.tw); +} + +static struct frcti * frcti_create(int fd) +{ + struct frcti * frcti; + time_t delta_t; + + frcti = malloc(sizeof(*frcti)); + if (frcti == NULL) + goto fail_malloc; + + if (pthread_rwlock_init(&frcti->lock, NULL)) + goto fail_lock; + + frcti->rq = rq_create(RQ_SIZE); + if (frcti->rq == NULL) + goto fail_rq; + + frcti->mpl = DELT_MPL; + frcti->a = DELT_A; + frcti->r = DELT_R; + frcti->fd = fd; + + delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000; + + frcti->snd_cr.drf = true; + frcti->snd_cr.conf = true; + frcti->snd_cr.lwe = 0; + frcti->snd_cr.rwe = 0; + frcti->snd_cr.cflags = 0; + frcti->snd_cr.inact = 2 * delta_t + 1; + + frcti->rcv_cr.drf = true; + frcti->rcv_cr.lwe = 0; + frcti->rcv_cr.rwe = 0; + frcti->rcv_cr.cflags = 0; + frcti->rcv_cr.inact = 3 * delta_t + 1; + + return frcti; + + fail_rq: + pthread_rwlock_destroy(&frcti->lock); + fail_lock: + free(frcti); + fail_malloc: + return NULL; +} + +static void frcti_destroy(struct frcti * frcti) +{ + /* + * FIXME: In case of reliable transmission we should + * make sure everything is acked. + */ + + pthread_rwlock_destroy(&frcti->lock); + + rq_destroy(frcti->rq); + free(frcti); +} + +static int frcti_setconf(struct frcti * frcti, + uint16_t flags) +{ + assert(frcti); + + pthread_rwlock_wrlock(&frcti->lock); + + if (frcti->snd_cr.cflags != flags) { + frcti->snd_cr.cflags = flags; + frcti->snd_cr.conf = true; + frcti->snd_cr.drf = true; + } + + pthread_rwlock_unlock(&frcti->lock); + + return 0; +} + +static uint16_t frcti_getconf(struct frcti * frcti) +{ + uint16_t ret; + + assert (frcti); + + pthread_rwlock_rdlock(&frcti->lock); + + ret = frcti->snd_cr.cflags; + + pthread_rwlock_unlock(&frcti->lock); + + return ret; +} + +#define frcti_queued_pdu(frcti) \ + (frcti == NULL ? -1 : __frcti_queued_pdu(frcti)) + +#define frcti_snd(frcti, sdb) \ + (frcti == NULL ? 0 : __frcti_snd(frcti, sdb)) + +#define frcti_rcv(frcti, sdb) \ + (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) + +static ssize_t __frcti_queued_pdu(struct frcti * frcti) +{ + ssize_t idx = -1; + + assert(frcti); + + /* See if we already have the next PDU. */ + pthread_rwlock_wrlock(&frcti->lock); + + if (!rq_is_empty(frcti->rq)) { + if (rq_peek(frcti->rq) == frcti->rcv_cr.lwe) { + ++frcti->rcv_cr.lwe; + idx = rq_pop(frcti->rq); + } + } + + pthread_rwlock_unlock(&frcti->lock); + + return idx; +} + +static int __frcti_snd(struct frcti * frcti, + struct shm_du_buff * sdb) +{ + struct frct_pci pci; + struct timespec now; + struct frct_cr * snd_cr; + + if (frcti == NULL) + return 0; + + snd_cr = &frcti->snd_cr; + + memset(&pci, 0, sizeof(pci)); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pci.type |= PDU_TYPE_DATA; + + pthread_rwlock_wrlock(&frcti->lock); + + /* Check if sender is inactive. */ + if (!snd_cr->drf && now.tv_sec - snd_cr->act > snd_cr->inact) + snd_cr->drf = true; + + /* Set the DRF in the first packet of a new run of SDUs. */ + if (snd_cr->drf) { + pci.flags |= FLAG_DATA_RUN; + if (snd_cr->conf) { + pci.type |= PDU_TYPE_CONFIG; + pci.cflags = snd_cr->cflags; + } + } + + pci.seqno = snd_cr->lwe++; + + if (frct_pci_ser(sdb, &pci, snd_cr->cflags & FRCTFERRCHCK)) { + pthread_rwlock_unlock(&frcti->lock); + return -1; + } + + snd_cr->act = now.tv_sec; + + snd_cr->drf = false; + snd_cr->conf = false; + + pthread_rwlock_unlock(&frcti->lock); + + return 0; +} + +/* Returns 0 when idx contains an SDU for the application. */ +static int __frcti_rcv(struct frcti * frcti, + struct shm_du_buff * sdb) +{ + ssize_t idx; + struct frct_pci pci; + struct timespec now; + struct frct_cr * rcv_cr; + + assert(frcti); + + rcv_cr = &frcti->rcv_cr; + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_wrlock(&frcti->lock); + + idx = shm_du_buff_get_idx(sdb); + + /* SDU may be corrupted. */ + if (frct_pci_des(sdb, &pci, rcv_cr->cflags & FRCTFERRCHCK)) { + pthread_rwlock_unlock(&frcti->lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -EAGAIN; + } + + /* Check if receiver inactivity is true. */ + if (!rcv_cr->drf && now.tv_sec - rcv_cr->act > rcv_cr->inact) + rcv_cr->drf = true; + + /* When there is receiver inactivity and no DRF, drop the SDU. */ + if (rcv_cr->drf && !(pci.flags & FLAG_DATA_RUN)) { + pthread_rwlock_unlock(&frcti->lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -EAGAIN; + } + + /* If the DRF is set, reset the state of the connection. */ + if (pci.flags & FLAG_DATA_RUN) { + rcv_cr->lwe = pci.seqno; + if (pci.type & PDU_TYPE_CONFIG) + rcv_cr->cflags = pci.cflags; + } + + if (rcv_cr->drf) + rcv_cr->drf = false; + + rcv_cr->act = now.tv_sec; + + if (!(pci.type & PDU_TYPE_DATA)) + shm_rdrbuff_remove(ai.rdrb, idx); + + if (rcv_cr->cflags & FRCTFORDERING) { + if (pci.seqno != frcti->rcv_cr.lwe) { + if (rq_push(frcti->rq, pci.seqno, idx)) + shm_rdrbuff_remove(ai.rdrb, idx); + pthread_rwlock_unlock(&frcti->lock); + return -EAGAIN; + } else { + ++rcv_cr->lwe; + } + } + + pthread_rwlock_unlock(&frcti->lock); + + return 0; +} diff --git a/src/lib/frct_pci.c b/src/lib/frct_pci.c index e44554f2..509cc8e2 100644 --- a/src/lib/frct_pci.c +++ b/src/lib/frct_pci.c @@ -20,29 +20,23 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#include #include #include +#include "frct_pci.h" + #include #include -#define TYPE_SIZE 1 -#define SEQNO_SIZE 8 -#define FLAGS_SIZE 1 -#define CONF_FLAGS_SIZE sizeof(((struct frct_pci *) NULL)->conf_flags) -#define BASE_SIZE TYPE_SIZE + FLAGS_SIZE + SEQNO_SIZE -#define CONFIG_SIZE CONF_FLAGS_SIZE - -static size_t get_head_len(struct frct_pci * pci) -{ - size_t len = BASE_SIZE; +#define TYPE_SIZE 1 +#define FLAGS_SIZE 1 +#define SEQNO_SIZE 8 +#define CONF_FLAGS_SIZE 2 - if (pci->type & PDU_TYPE_CONFIG) - len += CONFIG_SIZE; +#define BASE_SIZE TYPE_SIZE + FLAGS_SIZE + SEQNO_SIZE - return len; -} +#define head_len(pci) (pci->type & PDU_TYPE_CONFIG ? \ + BASE_SIZE + CONF_FLAGS_SIZE : BASE_SIZE) int frct_pci_ser(struct shm_du_buff * sdb, struct frct_pci * pci, @@ -50,15 +44,12 @@ int frct_pci_ser(struct shm_du_buff * sdb, { uint8_t * head; uint8_t * tail; - size_t len; size_t offset = 0; assert(sdb); assert(pci); - len = get_head_len(pci); - - head = shm_du_buff_head_alloc(sdb, len); + head = shm_du_buff_head_alloc(sdb, head_len(pci)); if (head == NULL) return -EPERM; @@ -70,14 +61,14 @@ int frct_pci_ser(struct shm_du_buff * sdb, offset += SEQNO_SIZE; if (pci->type & PDU_TYPE_CONFIG) { - memcpy(head + offset, &pci->conf_flags, CONF_FLAGS_SIZE); + memcpy(head + offset, &pci->cflags, CONF_FLAGS_SIZE); /* offset += CONF_FLAGS_SIZE; */ } if (error_check) { tail = shm_du_buff_tail_alloc(sdb, hash_len(HASH_CRC32)); if (tail == NULL) { - shm_du_buff_head_release(sdb, len); + shm_du_buff_head_release(sdb, head_len(pci)); return -EPERM; } @@ -103,23 +94,8 @@ int frct_pci_des(struct shm_du_buff * sdb, head = shm_du_buff_head(sdb); - /* Depending on the type a different deserialization. */ - memcpy(&pci->type, head, TYPE_SIZE); - offset += TYPE_SIZE; - memcpy(&pci->flags, head + offset, FLAGS_SIZE); - offset += FLAGS_SIZE; - memcpy(&pci->seqno, head + offset, SEQNO_SIZE); - offset += SEQNO_SIZE; - - if (pci->type & PDU_TYPE_CONFIG) { - memcpy(&pci->conf_flags, head + offset, CONF_FLAGS_SIZE); - /* offset += CONF_FLAGS_SIZE; */ - } - if (error_check) { tail = shm_du_buff_tail(sdb); - if (tail == NULL) - return -EPERM; mem_hash(HASH_CRC32, &crc, head, tail - head - hash_len(HASH_CRC32)); @@ -134,7 +110,20 @@ int frct_pci_des(struct shm_du_buff * sdb, shm_du_buff_tail_release(sdb, hash_len(HASH_CRC32)); } - shm_du_buff_head_release(sdb, get_head_len(pci)); + /* Depending on the type a different deserialization. */ + memcpy(&pci->type, head, TYPE_SIZE); + offset += TYPE_SIZE; + memcpy(&pci->flags, head + offset, FLAGS_SIZE); + offset += FLAGS_SIZE; + memcpy(&pci->seqno, head + offset, SEQNO_SIZE); + offset += SEQNO_SIZE; + + if (pci->type & PDU_TYPE_CONFIG) { + memcpy(&pci->cflags, head + offset, CONF_FLAGS_SIZE); + /* offset += CONF_FLAGS_SIZE; */ + } + + shm_du_buff_head_release(sdb, head_len(pci)); return 0; } diff --git a/src/lib/frct_pci.h b/src/lib/frct_pci.h new file mode 100644 index 00000000..fbbfd354 --- /dev/null +++ b/src/lib/frct_pci.h @@ -0,0 +1,67 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Protocol Control Information of FRCT + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_LIB_FRCT_PCI_H +#define OUROBOROS_LIB_FRCT_PCI_H + +#include + +#include +#include + +struct frct_pci { + /* Present in every PDU. */ + uint8_t type; + uint8_t flags; + uint64_t seqno; + + /* Present in config PDU. */ + uint16_t cflags; + + /* Present in flow control PDU. */ + uint64_t lwe; + uint64_t rwe; +}; + +enum pdu_types { + PDU_TYPE_DATA = 0x01, + PDU_TYPE_ACK = 0x02, + PDU_TYPE_FC = 0x04, + PDU_TYPE_ACK_AND_FC = (PDU_TYPE_ACK | PDU_TYPE_FC), + PDU_TYPE_RENDEZ_VOUS = 0x08, + PDU_TYPE_CONFIG = 0x10 +}; + +enum data_flags { + FLAG_DATA_RUN = 0x01, + FLAG_MORE_FRAGMENTS = 0x02 +}; + +int frct_pci_ser(struct shm_du_buff * sdb, + struct frct_pci * pci, + bool error_check); + +int frct_pci_des(struct shm_du_buff * sdb, + struct frct_pci * pci, + bool error_check); + +#endif /* OUROBOROS_LIB_FRCT_PCI_H */ diff --git a/src/lib/rq.c b/src/lib/rq.c index bd0594b5..ba425236 100644 --- a/src/lib/rq.c +++ b/src/lib/rq.c @@ -20,7 +20,7 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#include +#include "rq.h" #include @@ -77,11 +77,11 @@ int rq_push(struct rq * rq, return -1; i = ++rq->n_items; - j = i / 2; + j = i >> 1; while (i > 1 && rq->items[j].seqno > seqno) { rq->items[i] = rq->items[j]; i = j; - j = j / 2; + j >>= 1; } rq->items[i].seqno = seqno; @@ -121,7 +121,7 @@ size_t rq_pop(struct rq * rq) i = 1; while (true) { k = i; - j = 2 * i; + j = i << 1; if (j <= rq->n_items && rq->items[j].seqno < rq->items[k].seqno) k = j; diff --git a/src/lib/rq.h b/src/lib/rq.h new file mode 100644 index 00000000..7c024c11 --- /dev/null +++ b/src/lib/rq.h @@ -0,0 +1,47 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Reordering queue + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_LIB_RQ_H +#define OUROBOROS_LIB_RQ_H + +#include +#include +#include + +struct rq * rq_create(int size); + +void rq_destroy(struct rq * rq); + +int rq_push(struct rq * rq, + uint64_t seqno, + size_t idx); + +uint64_t rq_peek(struct rq * rq); + +bool rq_is_empty(struct rq * rq); + +size_t rq_pop(struct rq * rq); + +bool rq_has(struct rq * rq, + uint64_t seqno); + +#endif /* OUROBOROS_LIB_RQ_H */ diff --git a/src/lib/tests/rq_test.c b/src/lib/tests/rq_test.c index e2d0f435..7b57cf30 100644 --- a/src/lib/tests/rq_test.c +++ b/src/lib/tests/rq_test.c @@ -20,7 +20,7 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#include +#include "rq.h" #include -- cgit v1.2.3