diff options
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r-- | src/lib/dev.c | 719 |
1 files changed, 218 insertions, 501 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 28a99bc4..ff22cca6 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -38,22 +38,19 @@ #include <ouroboros/fqueue.h> #include <ouroboros/qoscube.h> #include <ouroboros/timerwheel.h> -#include <ouroboros/frct_pci.h> -#include <ouroboros/rq.h> + +#include "frct_pci.h" +#include "rq.h" #include <stdlib.h> #include <string.h> #include <stdio.h> #include <stdarg.h> +#include <stdbool.h> +#include <sys/types.h> #define BUF_SIZE 1500 -#define TW_ELEMENTS 6000 -#define TW_RESOLUTION 1 /* ms */ - -#define MPL 2000 /* ms */ -#define RQ_SIZE 20 - #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif @@ -76,26 +73,6 @@ enum port_state { PORT_DESTROY }; -struct frcti { - bool used; - - struct timespec last_snd; - bool snd_drf; - uint64_t snd_lwe; - uint64_t snd_rwe; - - struct timespec last_rcv; - bool rcv_drf; - uint64_t rcv_lwe; - uint64_t rcv_rwe; - - uint16_t conf_flags; - - struct rq * rq; - - pthread_rwlock_t lock; -}; - struct port { int fd; @@ -119,6 +96,8 @@ struct flow { bool rcv_timesout; struct timespec snd_timeo; struct timespec rcv_timeo; + + struct frcti * frcti; }; struct { @@ -132,13 +111,15 @@ struct { struct bmp * fds; struct bmp * fqueues; + struct flow * flows; struct port * ports; - struct frcti * frcti; pthread_rwlock_t lock; } ai; +#include "frct.c" + static void port_destroy(struct port * p) { pthread_mutex_lock(&p->state_lock); @@ -185,12 +166,8 @@ static enum port_state port_wait_assign(int port_id) enum port_state state; struct port * p; - pthread_rwlock_rdlock(&ai.lock); - p = &ai.ports[port_id]; - pthread_rwlock_unlock(&ai.lock); - pthread_mutex_lock(&p->state_lock); if (p->state == PORT_ID_ASSIGNED) { @@ -245,275 +222,8 @@ static int api_announce(char * ap_name) return ret; } -/* Call under flows lock. */ -static int finalize_write(int fd, - size_t idx) -{ - int ret; - - ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); - if (ret < 0) - return ret; - - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - - return ret; -} - -static int frcti_init(int fd) -{ - struct frcti * frcti; - - frcti = &(ai.frcti[fd]); - - frcti->used = true; - - frcti->snd_drf = true; - frcti->snd_lwe = 0; - frcti->snd_rwe = 0; - - frcti->rcv_drf = true; - frcti->rcv_lwe = 0; - frcti->rcv_rwe = 0; - - frcti->conf_flags = 0; - - frcti->rq = rq_create(RQ_SIZE); - if (frcti->rq == NULL) - return -1; - - return 0; -} - -static void frcti_clear(int fd) -{ - ai.frcti[fd].used = false; -} - -static void frcti_fini(int fd) -{ - /* - * FIXME: In case of reliable transmission we should - * make sure everything is acked. - */ - - frcti_clear(fd); - - rq_destroy(ai.frcti[fd].rq); -} - -static int frcti_send(int fd, - struct frct_pci * pci, - struct shm_du_buff * sdb) -{ - struct timespec now = {0, 0}; - struct frcti * frcti; - int ret; - - frcti = &(ai.frcti[fd]); - - clock_gettime(CLOCK_REALTIME_COARSE, &now); - - pthread_rwlock_wrlock(&frcti->lock); - - /* Check if sender inactivity is true. */ - if (!frcti->snd_drf && ts_diff_ms(&now, &frcti->last_snd) > 2 * MPL) - frcti->snd_drf = true; - - /* Set the DRF in the first packet of a new run of SDUs. */ - if (frcti->snd_drf) { - pci->flags |= FLAG_DATA_RUN; - frcti->snd_drf = false; - } - - frcti->last_snd = now; - - pci->seqno = frcti->snd_lwe++; - - if (frct_pci_ser(sdb, pci, frcti->conf_flags & FRCTFERRCHCK)) { - pthread_rwlock_unlock(&frcti->lock); - return -1; - } - - ret = finalize_write(fd, shm_du_buff_get_idx(sdb)); - if (ret < 0) { - pthread_rwlock_unlock(&frcti->lock); - return ret; - } - - pthread_rwlock_unlock(&frcti->lock); - - return 0; -} - - -static int frcti_configure(int fd, - uint16_t flags) -{ - struct frcti * frcti; - struct frct_pci pci; - struct shm_du_buff * sdb; - - frcti = &(ai.frcti[fd]); - - memset(&pci, 0, sizeof(pci)); - - if (ipcp_sdb_reserve(&sdb, 0)) - return -1; - - pci.conf_flags = flags; - - /* Always set the DRF on a configure message. */ - pci.flags |= FLAG_DATA_RUN; - pci.type |= PDU_TYPE_CONFIG; - - pthread_rwlock_wrlock(&frcti->lock); - - frcti->conf_flags = pci.conf_flags; - - pthread_rwlock_unlock(&frcti->lock); - - if (frcti_send(fd, &pci, sdb)) { - shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); - return -1; - } - - return 0; -} - -static int frcti_write(int fd, - struct shm_du_buff * sdb) -{ - struct frct_pci pci; - - memset(&pci, 0, sizeof(pci)); - - pci.type |= PDU_TYPE_DATA; - - return frcti_send(fd, &pci, sdb); -} - -static ssize_t frcti_read(int fd) -{ - ssize_t idx; - struct frcti * frcti; - struct frct_pci pci; - struct shm_du_buff * sdb; - uint64_t seqno; - bool nxt_pdu = true; - - frcti = &(ai.frcti[fd]); - - /* See if we already have the next PDU */ - pthread_rwlock_wrlock(&frcti->lock); - - if (!rq_is_empty(frcti->rq)) { - seqno = rq_peek(frcti->rq); - if (seqno == frcti->rcv_lwe) { - frcti->rcv_lwe++; - idx = rq_pop(frcti->rq); - pthread_rwlock_unlock(&frcti->lock); - return idx; - } - } - - pthread_rwlock_unlock(&frcti->lock); - - do { - struct timespec now; - struct timespec abs; - struct timespec * abstime = NULL; - struct shm_rbuff * rb; - bool noblock; - - clock_gettime(CLOCK_REALTIME_COARSE, &now); - - pthread_rwlock_rdlock(&ai.lock); - - noblock = ai.flows[fd].oflags & FLOWFRNOBLOCK; - rb = ai.flows[fd].rx_rb; - - if (ai.flows[fd].rcv_timesout) { - ts_add(&now, &ai.flows[fd].rcv_timeo, &abs); - abstime = &abs; - } - - pthread_rwlock_unlock(&ai.lock); - - if (noblock) { - idx = shm_rbuff_read(rb); - } else { - idx = shm_rbuff_read_b(rb, abstime); - clock_gettime(CLOCK_REALTIME_COARSE, &now); - } - - if (idx < 0) - return idx; - - sdb = shm_rdrbuff_get(ai.rdrb, idx); - - pthread_rwlock_wrlock(&frcti->lock); - - /* SDU may be corrupted. */ - if (frct_pci_des(sdb, &pci, frcti->conf_flags & FRCTFERRCHCK)) { - pthread_rwlock_unlock(&frcti->lock); - shm_rdrbuff_remove(ai.rdrb, idx); - return -EAGAIN; - } - - /* Check if receiver inactivity is true. */ - if (!frcti->rcv_drf && - ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL) - frcti->rcv_drf = true; - - /* When there is receiver inactivity queue the packet. */ - if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) { - if (rq_push(frcti->rq, pci.seqno, idx)) - shm_rdrbuff_remove(ai.rdrb, idx); - pthread_rwlock_unlock(&frcti->lock); - return -EAGAIN; - } - - /* If the DRF is set, reset the state of the connection. */ - if (pci.flags & FLAG_DATA_RUN) - frcti->rcv_lwe = pci.seqno; - - if (pci.type & PDU_TYPE_CONFIG) - frcti->conf_flags = pci.conf_flags; - - if (frcti->rcv_drf) - frcti->rcv_drf = false; - - frcti->last_rcv = now; - - nxt_pdu = true; - - if (!(pci.type & PDU_TYPE_DATA)) { - shm_rdrbuff_remove(ai.rdrb, idx); - nxt_pdu = false; - } - - if (frcti->conf_flags & FRCTFORDERING) { - if (pci.seqno != frcti->rcv_lwe) { - if (rq_push(frcti->rq, pci.seqno, idx)) - shm_rdrbuff_remove(ai.rdrb, idx); - nxt_pdu = false; - } else { - frcti->rcv_lwe++; - } - } - - pthread_rwlock_unlock(&frcti->lock); - - } while (!nxt_pdu); - - return idx; -} - static void flow_clear(int fd) { - assert(!(fd < 0)); - memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); ai.flows[fd].port_id = -1; @@ -525,8 +235,10 @@ static void flow_fini(int fd) { assert(!(fd < 0)); - if (ai.flows[fd].port_id != -1) + if (ai.flows[fd].port_id != -1) { port_destroy(&ai.ports[ai.flows[fd].port_id]); + bmp_release(ai.fds, fd); + } if (ai.flows[fd].rx_rb != NULL) shm_rbuff_close(ai.flows[fd].rx_rb); @@ -537,8 +249,8 @@ static void flow_fini(int fd) if (ai.flows[fd].set != NULL) shm_flow_set_close(ai.flows[fd].set); - if (ai.frcti[fd].used) - frcti_fini(fd); + if (ai.flows[fd].frcti != NULL) + frcti_destroy(ai.flows[fd].frcti); flow_clear(fd); } @@ -548,37 +260,27 @@ static int flow_init(int port_id, qoscube_t qc) { int fd; + int err = -ENOMEM; pthread_rwlock_wrlock(&ai.lock); fd = bmp_allocate(ai.fds); if (!bmp_is_id_valid(ai.fds, fd)) { - pthread_rwlock_unlock(&ai.lock); - return -EBADF; + err = -EBADF; + goto fail_fds; } ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); - if (ai.flows[fd].rx_rb == NULL) { - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.lock); - return -ENOMEM; - } + if (ai.flows[fd].rx_rb == NULL) + goto fail; ai.flows[fd].tx_rb = shm_rbuff_open(api, port_id); - if (ai.flows[fd].tx_rb == NULL) { - flow_fini(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.lock); - return -ENOMEM; - } + if (ai.flows[fd].tx_rb == NULL) + goto fail; ai.flows[fd].set = shm_flow_set_open(api); - if (ai.flows[fd].set == NULL) { - flow_fini(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.lock); - return -ENOMEM; - } + if (ai.flows[fd].set == NULL) + goto fail; ai.flows[fd].port_id = port_id; ai.flows[fd].oflags = FLOWFDEFAULT; @@ -593,6 +295,12 @@ static int flow_init(int port_id, pthread_rwlock_unlock(&ai.lock); return fd; + + fail: + flow_fini(fd); + fail_fds: + pthread_rwlock_unlock(&ai.lock); + return err; } static bool check_python(char * str) @@ -611,7 +319,6 @@ __attribute__((constructor)) static void init(int argc, { const char * ap_name = argv[0]; int i; - int j; (void) argc; (void) envp; @@ -643,20 +350,8 @@ __attribute__((constructor)) static void init(int argc, if (ai.flows == NULL) goto fail_flows; - ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS); - if (ai.frcti == NULL) - goto fail_frcti; - - for (i = 0; i < AP_MAX_FLOWS; ++i) { + for (i = 0; i < AP_MAX_FLOWS; ++i) flow_clear(i); - frcti_clear(i); - - if (pthread_rwlock_init(&ai.frcti[i].lock, NULL)) { - for (j = i - 1; j >= 0 ; j--) - pthread_rwlock_destroy(&ai.frcti[j].lock); - goto fail_frct_lock; - } - } ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS); if (ai.ports == NULL) @@ -690,13 +385,12 @@ __attribute__((constructor)) static void init(int argc, if (pthread_rwlock_init(&ai.lock, NULL)) goto fail_lock; - ai.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS); - if (ai.tw == NULL) - goto fail_timerwheel; + if (frct_init()) + goto fail_frct; return; - fail_timerwheel: + fail_frct: pthread_rwlock_destroy(&ai.lock); fail_lock: for (i = 0; i < SYS_MAX_FLOWS; ++i) @@ -709,11 +403,6 @@ __attribute__((constructor)) static void init(int argc, fail_ap_name: free(ai.ports); fail_ports: - for (i = 0; i < AP_MAX_FLOWS; ++i) - pthread_rwlock_destroy(&ai.frcti[i].lock); - fail_frct_lock: - free(ai.frcti); - fail_frcti: free(ai.flows); fail_flows: shm_rdrbuff_close(ai.rdrb); @@ -737,15 +426,14 @@ __attribute__((destructor)) static void fini(void) if (ai.fds == NULL) return; - bmp_destroy(ai.fds); - bmp_destroy(ai.fqueues); + frct_fini(); shm_flow_set_destroy(ai.fqset); if (ai.ap_name != NULL) free(ai.ap_name); - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_wrlock(&ai.lock); for (i = 0; i < AP_MAX_FLOWS; ++i) { if (ai.flows[i].port_id != -1) { @@ -754,8 +442,6 @@ __attribute__((destructor)) static void fini(void) shm_rdrbuff_remove(ai.rdrb, idx); flow_fini(i); } - - pthread_rwlock_destroy(&ai.frcti[i].lock); } for (i = 0; i < SYS_MAX_FLOWS; ++i) { @@ -770,7 +456,9 @@ __attribute__((destructor)) static void fini(void) free(ai.flows); free(ai.ports); - free(ai.frcti); + + bmp_destroy(ai.fds); + bmp_destroy(ai.fqueues); pthread_rwlock_unlock(&ai.lock); @@ -825,7 +513,16 @@ int flow_accept(qosspec_t * qs, pthread_rwlock_wrlock(&ai.lock); - frcti_init(fd); + /* FIXME: check if FRCT is needed based on qc? */ + + assert(ai.flows[fd].frcti == NULL); + + ai.flows[fd].frcti = frcti_create(fd); + if (ai.flows[fd].frcti == NULL) { + flow_fini(fd); + pthread_rwlock_unlock(&ai.lock); + return -ENOMEM; + } if (qs != NULL) *qs = ai.flows[fd].spec; @@ -891,7 +588,15 @@ int flow_alloc(const char * dst_name, pthread_rwlock_wrlock(&ai.lock); - frcti_init(fd); + /* FIXME: check if FRCT is needed based on qc? */ + assert(ai.flows[fd].frcti == NULL); + + ai.flows[fd].frcti = frcti_create(fd); + if (ai.flows[fd].frcti == NULL) { + flow_fini(fd); + pthread_rwlock_unlock(&ai.lock); + return -ENOMEM; + } pthread_rwlock_unlock(&ai.lock); @@ -913,7 +618,7 @@ int flow_dealloc(int fd) pthread_rwlock_rdlock(&ai.lock); - assert(!(ai.flows[fd].port_id < 0)); + assert(ai.flows[fd].port_id >= 0); msg.port_id = ai.flows[fd].port_id; @@ -933,7 +638,6 @@ int flow_dealloc(int fd) pthread_rwlock_wrlock(&ai.lock); flow_fini(fd); - bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.lock); @@ -944,6 +648,7 @@ int fccntl(int fd, int cmd, ...) { + uint16_t sflags; uint32_t * fflags; uint16_t * cflags; va_list l; @@ -951,15 +656,18 @@ int fccntl(int fd, qosspec_t * qs; uint32_t rx_acl; uint32_t tx_acl; + struct flow * flow; if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; + flow = &ai.flows[fd]; + va_start(l, cmd); pthread_rwlock_wrlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (flow->port_id < 0) { pthread_rwlock_unlock(&ai.lock); va_end(l); return -ENOTALLOC; @@ -969,57 +677,57 @@ int fccntl(int fd, case FLOWSSNDTIMEO: timeo = va_arg(l, struct timespec *); if (timeo == NULL) { - ai.flows[fd].snd_timesout = false; + flow->snd_timesout = false; } else { - ai.flows[fd].snd_timesout = true; - ai.flows[fd].snd_timeo = *timeo; + flow->snd_timesout = true; + flow->snd_timeo = *timeo; } break; case FLOWGSNDTIMEO: timeo = va_arg(l, struct timespec *); if (timeo == NULL) goto einval; - if (!ai.flows[fd].snd_timesout) + if (!flow->snd_timesout) goto eperm; - *timeo = ai.flows[fd].snd_timeo; + *timeo = flow->snd_timeo; break; case FLOWSRCVTIMEO: timeo = va_arg(l, struct timespec *); if (timeo == NULL) { - ai.flows[fd].rcv_timesout = false; + flow->rcv_timesout = false; } else { - ai.flows[fd].rcv_timesout = true; - ai.flows[fd].rcv_timeo = *timeo; + flow->rcv_timesout = true; + flow->rcv_timeo = *timeo; } break; case FLOWGRCVTIMEO: timeo = va_arg(l, struct timespec *); if (timeo == NULL) goto einval; - if (!ai.flows[fd].rcv_timesout) + if (!flow->rcv_timesout) goto eperm; - *timeo = ai.flows[fd].snd_timeo; + *timeo = flow->snd_timeo; break; case FLOWGQOSSPEC: qs = va_arg(l, qosspec_t *); if (qs == NULL) goto einval; - *qs = ai.flows[fd].spec; + *qs = flow->spec; break; case FLOWSFLAGS: - ai.flows[fd].oflags = va_arg(l, uint32_t); - rx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb); - tx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb); + flow->oflags = va_arg(l, uint32_t); + rx_acl = shm_rbuff_get_acl(flow->rx_rb); + tx_acl = shm_rbuff_get_acl(flow->rx_rb); /* * Making our own flow write only means making the * the other side of the flow read only. */ - if (ai.flows[fd].oflags & FLOWFWRONLY) + if (flow->oflags & FLOWFWRONLY) rx_acl |= ACL_RDONLY; - if (ai.flows[fd].oflags & FLOWFRDWR) + if (flow->oflags & FLOWFRDWR) rx_acl |= ACL_RDWR; - if (ai.flows[fd].oflags & FLOWFDOWN) { + if (flow->oflags & FLOWFDOWN) { rx_acl |= ACL_FLOWDOWN; tx_acl |= ACL_FLOWDOWN; } else { @@ -1027,26 +735,28 @@ int fccntl(int fd, tx_acl &= ~ACL_FLOWDOWN; } - shm_rbuff_set_acl(ai.flows[fd].rx_rb, rx_acl); - shm_rbuff_set_acl(ai.flows[fd].tx_rb, tx_acl); + shm_rbuff_set_acl(flow->rx_rb, rx_acl); + shm_rbuff_set_acl(flow->tx_rb, tx_acl); break; case FLOWGFLAGS: fflags = va_arg(l, uint32_t *); if (fflags == NULL) goto einval; - *fflags = ai.flows[fd].oflags; + *fflags = flow->oflags; break; case FRCTSFLAGS: - ai.frcti[fd].conf_flags = (uint16_t) va_arg(l, int); + sflags = (uint16_t) va_arg(l, int); + if (flow->frcti == NULL || frcti_setconf(flow->frcti, sflags)) + goto eperm; break; case FRCTGFLAGS: cflags = (uint16_t *) va_arg(l, int *); if (cflags == NULL) goto einval; - *cflags = ai.frcti[fd].conf_flags; - if (frcti_configure(fd, ai.frcti[fd].conf_flags)) + if (flow->frcti == NULL) goto eperm; + *cflags = frcti_getconf(flow->frcti); break; default: pthread_rwlock_unlock(&ai.lock); @@ -1075,8 +785,10 @@ ssize_t flow_write(int fd, const void * buf, size_t count) { - ssize_t idx; - int ret; + struct flow * flow; + ssize_t idx; + int ret; + int flags; if (buf == NULL) return 0; @@ -1084,104 +796,110 @@ ssize_t flow_write(int fd, if (fd < 0 || fd > AP_MAX_FLOWS) return -EBADF; + flow = &ai.flows[fd]; + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (flow->port_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) { - pthread_rwlock_unlock(&ai.lock); - return -EPERM; - } + flags = flow->oflags; - if (ai.flows[fd].oflags & FLOWFWNOBLOCK) { - idx = shm_rdrbuff_write(ai.rdrb, - DU_BUFF_HEADSPACE, - DU_BUFF_TAILSPACE, - buf, - count); - if (idx < 0) { - pthread_rwlock_unlock(&ai.lock); - return idx; - } + pthread_rwlock_unlock(&ai.lock); - } else { /* Blocking. */ - pthread_rwlock_unlock(&ai.lock); + if ((flags & FLOWFACCMODE) == FLOWFRDONLY) + return -EPERM; + if (flags & FLOWFWNOBLOCK) + idx = shm_rdrbuff_write(ai.rdrb, + DU_BUFF_HEADSPACE, + DU_BUFF_TAILSPACE, + buf, + count); + else /* Blocking. */ idx = shm_rdrbuff_write_b(ai.rdrb, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, buf, count); - if (idx < 0) - return idx; + if (idx < 0) + return idx; - pthread_rwlock_rdlock(&ai.lock); + if (frcti_snd(flow->frcti, shm_rdrbuff_get(ai.rdrb, idx)) < 0) { + shm_rdrbuff_remove(ai.rdrb, idx); + return -ENOMEM; } - if (!ai.frcti[fd].used) { - ret = finalize_write(fd, idx); - if (ret < 0) { - pthread_rwlock_unlock(&ai.lock); - shm_rdrbuff_remove(ai.rdrb, idx); - return ret; - } + pthread_rwlock_rdlock(&ai.lock); - pthread_rwlock_unlock(&ai.lock); - } else { - pthread_rwlock_unlock(&ai.lock); + ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + if (ret < 0) + shm_rdrbuff_remove(ai.rdrb, idx); + else + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - ret = frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx)); - if (ret < 0) { - shm_rdrbuff_remove(ai.rdrb, idx); - return ret; - } - } + pthread_rwlock_unlock(&ai.lock); - return 0; + assert(ret <= 0); + + return ret; } ssize_t flow_read(int fd, void * buf, size_t count) { - ssize_t idx; - ssize_t n; - uint8_t * sdu; - bool used; - struct shm_rbuff * rb; + ssize_t idx; + ssize_t n; + uint8_t * sdu; + struct shm_rbuff * rb; + struct shm_du_buff * sdb; + struct timespec now; + struct timespec abs; + struct timespec * abstime = NULL; + struct flow * flow; + bool noblock; if (fd < 0 || fd > AP_MAX_FLOWS) return -EBADF; + flow = &ai.flows[fd]; + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (flow->port_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - used = ai.frcti[fd].used; - rb = ai.flows[fd].rx_rb; + rb = flow->rx_rb; + noblock = flow->oflags & FLOWFRNOBLOCK; - pthread_rwlock_unlock(&ai.lock); + if (ai.flows[fd].rcv_timesout) { + ts_add(&now, &flow->rcv_timeo, &abs); + abstime = &abs; + } - if (!used) - idx = shm_rbuff_read(rb); - else - idx = frcti_read(fd); + pthread_rwlock_unlock(&ai.lock); + idx = frcti_queued_pdu(flow->frcti); if (idx < 0) { - assert(idx == -EAGAIN || idx == -ETIMEDOUT || - idx == -EFLOWDOWN); - return idx; + do { + idx = noblock ? shm_rbuff_read(rb) : + shm_rbuff_read_b(rb, abstime); + if (idx < 0) + return idx; + sdb = shm_rdrbuff_get(ai.rdrb, idx); + } while (frcti_rcv(flow->frcti, sdb) != 0); } n = shm_rdrbuff_read(&sdu, ai.rdrb, idx); - if (n < 0) - return -1; + + assert(n >= 0); memcpy(buf, sdu, MIN((size_t) n, count)); @@ -1432,7 +1150,7 @@ int ipcp_create_r(pid_t api, if (recv_msg == NULL) return -EIRMD; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -1509,7 +1227,7 @@ int ipcp_flow_alloc_reply(int fd, if (recv_msg == NULL) return -EIRMD; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -1524,30 +1242,37 @@ int ipcp_flow_alloc_reply(int fd, int ipcp_flow_read(int fd, struct shm_du_buff ** sdb) { - ssize_t idx = -1; - int port_id = -1; + struct flow * flow; + struct shm_rbuff * rb; + ssize_t idx; assert(fd >= 0); assert(sdb); + flow = &ai.flows[fd]; + pthread_rwlock_rdlock(&ai.lock); - if ((port_id = ai.flows[fd].port_id) < 0) { - pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; - } + assert(flow->port_id >= 0); - pthread_rwlock_unlock(&ai.lock); + rb = flow->rx_rb; - if (!ai.frcti[fd].used) - idx = shm_rbuff_read(ai.flows[fd].rx_rb); - else - idx = frcti_read(fd); + pthread_rwlock_unlock(&ai.lock); - if (idx < 0) - return idx; + if (flow->frcti != NULL) { + idx = frcti_queued_pdu(flow->frcti); + if (idx >= 0) { + *sdb = shm_rdrbuff_get(ai.rdrb, idx); + return 0; + } + } - *sdb = shm_rdrbuff_get(ai.rdrb, idx); + do { + idx = shm_rbuff_read(rb); + if (idx < 0) + return idx; + *sdb = shm_rdrbuff_get(ai.rdrb, idx); + } while (frcti_rcv(flow->frcti, *sdb) != 0); return 0; } @@ -1555,53 +1280,49 @@ int ipcp_flow_read(int fd, int ipcp_flow_write(int fd, struct shm_du_buff * sdb) { - int ret; + struct flow * flow; + int ret; + ssize_t idx; - if (sdb == NULL) - return -EINVAL; + assert(sdb); + + flow = &ai.flows[fd]; pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; - } + assert(flow->port_id >= 0); - if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) { + if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) { pthread_rwlock_unlock(&ai.lock); return -EPERM; } - assert(ai.flows[fd].tx_rb); + assert(flow->tx_rb); - if (!ai.frcti[fd].used) { - ret = finalize_write(fd, shm_du_buff_get_idx(sdb)); - if (ret < 0) { - pthread_rwlock_unlock(&ai.lock); - return ret; - } + idx = shm_du_buff_get_idx(sdb); + if (frcti_snd(flow->frcti, sdb) < 0) { pthread_rwlock_unlock(&ai.lock); - } else { - pthread_rwlock_unlock(&ai.lock); - - ret = frcti_write(fd, sdb); - if (ret < 0) - return ret; + return -ENOMEM; } - return 0; + ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + if (ret == 0) + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + + pthread_rwlock_unlock(&ai.lock); + + assert(ret <= 0); + + return ret; } int ipcp_sdb_reserve(struct shm_du_buff ** sdb, size_t len) { - struct shm_rdrbuff * rdrb; - ssize_t idx; - - rdrb = ai.rdrb; + ssize_t idx; - idx = shm_rdrbuff_write_b(rdrb, + idx = shm_rdrbuff_write_b(ai.rdrb, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, NULL, @@ -1610,15 +1331,22 @@ int ipcp_sdb_reserve(struct shm_du_buff ** sdb, if (idx < 0) return -1; - *sdb = shm_rdrbuff_get(rdrb, idx); + *sdb = shm_rdrbuff_get(ai.rdrb, idx); return 0; } +void ipcp_sdb_release(struct shm_du_buff * sdb) +{ + shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); +} + void ipcp_flow_fini(int fd) { struct shm_rbuff * rx_rb; + assert(fd >= 0); + fccntl(fd, FLOWSFLAGS, FLOWFWRONLY); pthread_rwlock_rdlock(&ai.lock); @@ -1633,15 +1361,12 @@ void ipcp_flow_fini(int fd) int ipcp_flow_get_qoscube(int fd, qoscube_t * cube) { - if (fd < 0 || fd > AP_MAX_FLOWS || cube == NULL) - return -EINVAL; + assert(fd >= 0); + assert(cube); - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; - } + assert(ai.flows[fd].port_id >= 0); *cube = ai.flows[fd].cube; @@ -1670,28 +1395,20 @@ int local_flow_write(int fd, { int ret; - if (fd < 0) - return -EINVAL; + assert(fd >= 0); pthread_rwlock_rdlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_rdlock(&ai.lock); return -ENOTALLOC; } - ret = finalize_write(fd, idx); - if (ret < 0) { - pthread_rwlock_unlock(&ai.lock); - return ret; - } + ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + if (ret == 0) + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); pthread_rwlock_unlock(&ai.lock); - return 0; -} - -void ipcp_sdb_release(struct shm_du_buff * sdb) -{ - shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); + return ret; } |