summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri.staessens@ugent.be>2017-11-06 21:38:55 +0100
committerSander Vrijders <sander.vrijders@ugent.be>2017-11-07 11:22:45 +0100
commiteef84a2afd2aa0d21072f6e7ef038fe10dcc245d (patch)
tree201297fccdaaf865283039b3bf39b13e5e50bfda /src/lib/dev.c
parentf5c60ee47c097d7408470e4be6182bf9ee684e84 (diff)
downloadouroboros-eef84a2afd2aa0d21072f6e7ef038fe10dcc245d.tar.gz
ouroboros-eef84a2afd2aa0d21072f6e7ef038fe10dcc245d.zip
lib: Refactor FRCT implementation
The frct_pci and rq headers are moved from include/ouroboros to src/lib since they are only needed in the library. FRCT is moved to its own source file. FRCT takes the application PDUs, encapsulates and processes them and hands them back. This makes it easier to disable FRCT should the application want to write to a "raw" flow. An FRCT instance is now allocated upon alloc and released upon dealloc. The FRCT data structure is split into a sender and receiver connection record. Setting a new configuration will now be done upon sending the next data PDU, which will flag the DRF for a new run and use that configuration. This avoids some issues should packets arrive out-of-order, and simplifies setting a configuration. Signed-off-by: Dimitri Staessens <dimitri.staessens@ugent.be> Signed-off-by: Sander Vrijders <sander.vrijders@ugent.be>
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c719
1 files changed, 218 insertions, 501 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 28a99bc4..ff22cca6 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -38,22 +38,19 @@
#include <ouroboros/fqueue.h>
#include <ouroboros/qoscube.h>
#include <ouroboros/timerwheel.h>
-#include <ouroboros/frct_pci.h>
-#include <ouroboros/rq.h>
+
+#include "frct_pci.h"
+#include "rq.h"
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdarg.h>
+#include <stdbool.h>
+#include <sys/types.h>
#define BUF_SIZE 1500
-#define TW_ELEMENTS 6000
-#define TW_RESOLUTION 1 /* ms */
-
-#define MPL 2000 /* ms */
-#define RQ_SIZE 20
-
#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
#endif
@@ -76,26 +73,6 @@ enum port_state {
PORT_DESTROY
};
-struct frcti {
- bool used;
-
- struct timespec last_snd;
- bool snd_drf;
- uint64_t snd_lwe;
- uint64_t snd_rwe;
-
- struct timespec last_rcv;
- bool rcv_drf;
- uint64_t rcv_lwe;
- uint64_t rcv_rwe;
-
- uint16_t conf_flags;
-
- struct rq * rq;
-
- pthread_rwlock_t lock;
-};
-
struct port {
int fd;
@@ -119,6 +96,8 @@ struct flow {
bool rcv_timesout;
struct timespec snd_timeo;
struct timespec rcv_timeo;
+
+ struct frcti * frcti;
};
struct {
@@ -132,13 +111,15 @@ struct {
struct bmp * fds;
struct bmp * fqueues;
+
struct flow * flows;
struct port * ports;
- struct frcti * frcti;
pthread_rwlock_t lock;
} ai;
+#include "frct.c"
+
static void port_destroy(struct port * p)
{
pthread_mutex_lock(&p->state_lock);
@@ -185,12 +166,8 @@ static enum port_state port_wait_assign(int port_id)
enum port_state state;
struct port * p;
- pthread_rwlock_rdlock(&ai.lock);
-
p = &ai.ports[port_id];
- pthread_rwlock_unlock(&ai.lock);
-
pthread_mutex_lock(&p->state_lock);
if (p->state == PORT_ID_ASSIGNED) {
@@ -245,275 +222,8 @@ static int api_announce(char * ap_name)
return ret;
}
-/* Call under flows lock. */
-static int finalize_write(int fd,
- size_t idx)
-{
- int ret;
-
- ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
- if (ret < 0)
- return ret;
-
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
-
- return ret;
-}
-
-static int frcti_init(int fd)
-{
- struct frcti * frcti;
-
- frcti = &(ai.frcti[fd]);
-
- frcti->used = true;
-
- frcti->snd_drf = true;
- frcti->snd_lwe = 0;
- frcti->snd_rwe = 0;
-
- frcti->rcv_drf = true;
- frcti->rcv_lwe = 0;
- frcti->rcv_rwe = 0;
-
- frcti->conf_flags = 0;
-
- frcti->rq = rq_create(RQ_SIZE);
- if (frcti->rq == NULL)
- return -1;
-
- return 0;
-}
-
-static void frcti_clear(int fd)
-{
- ai.frcti[fd].used = false;
-}
-
-static void frcti_fini(int fd)
-{
- /*
- * FIXME: In case of reliable transmission we should
- * make sure everything is acked.
- */
-
- frcti_clear(fd);
-
- rq_destroy(ai.frcti[fd].rq);
-}
-
-static int frcti_send(int fd,
- struct frct_pci * pci,
- struct shm_du_buff * sdb)
-{
- struct timespec now = {0, 0};
- struct frcti * frcti;
- int ret;
-
- frcti = &(ai.frcti[fd]);
-
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
-
- pthread_rwlock_wrlock(&frcti->lock);
-
- /* Check if sender inactivity is true. */
- if (!frcti->snd_drf && ts_diff_ms(&now, &frcti->last_snd) > 2 * MPL)
- frcti->snd_drf = true;
-
- /* Set the DRF in the first packet of a new run of SDUs. */
- if (frcti->snd_drf) {
- pci->flags |= FLAG_DATA_RUN;
- frcti->snd_drf = false;
- }
-
- frcti->last_snd = now;
-
- pci->seqno = frcti->snd_lwe++;
-
- if (frct_pci_ser(sdb, pci, frcti->conf_flags & FRCTFERRCHCK)) {
- pthread_rwlock_unlock(&frcti->lock);
- return -1;
- }
-
- ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
- if (ret < 0) {
- pthread_rwlock_unlock(&frcti->lock);
- return ret;
- }
-
- pthread_rwlock_unlock(&frcti->lock);
-
- return 0;
-}
-
-
-static int frcti_configure(int fd,
- uint16_t flags)
-{
- struct frcti * frcti;
- struct frct_pci pci;
- struct shm_du_buff * sdb;
-
- frcti = &(ai.frcti[fd]);
-
- memset(&pci, 0, sizeof(pci));
-
- if (ipcp_sdb_reserve(&sdb, 0))
- return -1;
-
- pci.conf_flags = flags;
-
- /* Always set the DRF on a configure message. */
- pci.flags |= FLAG_DATA_RUN;
- pci.type |= PDU_TYPE_CONFIG;
-
- pthread_rwlock_wrlock(&frcti->lock);
-
- frcti->conf_flags = pci.conf_flags;
-
- pthread_rwlock_unlock(&frcti->lock);
-
- if (frcti_send(fd, &pci, sdb)) {
- shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
- return -1;
- }
-
- return 0;
-}
-
-static int frcti_write(int fd,
- struct shm_du_buff * sdb)
-{
- struct frct_pci pci;
-
- memset(&pci, 0, sizeof(pci));
-
- pci.type |= PDU_TYPE_DATA;
-
- return frcti_send(fd, &pci, sdb);
-}
-
-static ssize_t frcti_read(int fd)
-{
- ssize_t idx;
- struct frcti * frcti;
- struct frct_pci pci;
- struct shm_du_buff * sdb;
- uint64_t seqno;
- bool nxt_pdu = true;
-
- frcti = &(ai.frcti[fd]);
-
- /* See if we already have the next PDU */
- pthread_rwlock_wrlock(&frcti->lock);
-
- if (!rq_is_empty(frcti->rq)) {
- seqno = rq_peek(frcti->rq);
- if (seqno == frcti->rcv_lwe) {
- frcti->rcv_lwe++;
- idx = rq_pop(frcti->rq);
- pthread_rwlock_unlock(&frcti->lock);
- return idx;
- }
- }
-
- pthread_rwlock_unlock(&frcti->lock);
-
- do {
- struct timespec now;
- struct timespec abs;
- struct timespec * abstime = NULL;
- struct shm_rbuff * rb;
- bool noblock;
-
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
-
- pthread_rwlock_rdlock(&ai.lock);
-
- noblock = ai.flows[fd].oflags & FLOWFRNOBLOCK;
- rb = ai.flows[fd].rx_rb;
-
- if (ai.flows[fd].rcv_timesout) {
- ts_add(&now, &ai.flows[fd].rcv_timeo, &abs);
- abstime = &abs;
- }
-
- pthread_rwlock_unlock(&ai.lock);
-
- if (noblock) {
- idx = shm_rbuff_read(rb);
- } else {
- idx = shm_rbuff_read_b(rb, abstime);
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
- }
-
- if (idx < 0)
- return idx;
-
- sdb = shm_rdrbuff_get(ai.rdrb, idx);
-
- pthread_rwlock_wrlock(&frcti->lock);
-
- /* SDU may be corrupted. */
- if (frct_pci_des(sdb, &pci, frcti->conf_flags & FRCTFERRCHCK)) {
- pthread_rwlock_unlock(&frcti->lock);
- shm_rdrbuff_remove(ai.rdrb, idx);
- return -EAGAIN;
- }
-
- /* Check if receiver inactivity is true. */
- if (!frcti->rcv_drf &&
- ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL)
- frcti->rcv_drf = true;
-
- /* When there is receiver inactivity queue the packet. */
- if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) {
- if (rq_push(frcti->rq, pci.seqno, idx))
- shm_rdrbuff_remove(ai.rdrb, idx);
- pthread_rwlock_unlock(&frcti->lock);
- return -EAGAIN;
- }
-
- /* If the DRF is set, reset the state of the connection. */
- if (pci.flags & FLAG_DATA_RUN)
- frcti->rcv_lwe = pci.seqno;
-
- if (pci.type & PDU_TYPE_CONFIG)
- frcti->conf_flags = pci.conf_flags;
-
- if (frcti->rcv_drf)
- frcti->rcv_drf = false;
-
- frcti->last_rcv = now;
-
- nxt_pdu = true;
-
- if (!(pci.type & PDU_TYPE_DATA)) {
- shm_rdrbuff_remove(ai.rdrb, idx);
- nxt_pdu = false;
- }
-
- if (frcti->conf_flags & FRCTFORDERING) {
- if (pci.seqno != frcti->rcv_lwe) {
- if (rq_push(frcti->rq, pci.seqno, idx))
- shm_rdrbuff_remove(ai.rdrb, idx);
- nxt_pdu = false;
- } else {
- frcti->rcv_lwe++;
- }
- }
-
- pthread_rwlock_unlock(&frcti->lock);
-
- } while (!nxt_pdu);
-
- return idx;
-}
-
static void flow_clear(int fd)
{
- assert(!(fd < 0));
-
memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
ai.flows[fd].port_id = -1;
@@ -525,8 +235,10 @@ static void flow_fini(int fd)
{
assert(!(fd < 0));
- if (ai.flows[fd].port_id != -1)
+ if (ai.flows[fd].port_id != -1) {
port_destroy(&ai.ports[ai.flows[fd].port_id]);
+ bmp_release(ai.fds, fd);
+ }
if (ai.flows[fd].rx_rb != NULL)
shm_rbuff_close(ai.flows[fd].rx_rb);
@@ -537,8 +249,8 @@ static void flow_fini(int fd)
if (ai.flows[fd].set != NULL)
shm_flow_set_close(ai.flows[fd].set);
- if (ai.frcti[fd].used)
- frcti_fini(fd);
+ if (ai.flows[fd].frcti != NULL)
+ frcti_destroy(ai.flows[fd].frcti);
flow_clear(fd);
}
@@ -548,37 +260,27 @@ static int flow_init(int port_id,
qoscube_t qc)
{
int fd;
+ int err = -ENOMEM;
pthread_rwlock_wrlock(&ai.lock);
fd = bmp_allocate(ai.fds);
if (!bmp_is_id_valid(ai.fds, fd)) {
- pthread_rwlock_unlock(&ai.lock);
- return -EBADF;
+ err = -EBADF;
+ goto fail_fds;
}
ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
- if (ai.flows[fd].rx_rb == NULL) {
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.lock);
- return -ENOMEM;
- }
+ if (ai.flows[fd].rx_rb == NULL)
+ goto fail;
ai.flows[fd].tx_rb = shm_rbuff_open(api, port_id);
- if (ai.flows[fd].tx_rb == NULL) {
- flow_fini(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.lock);
- return -ENOMEM;
- }
+ if (ai.flows[fd].tx_rb == NULL)
+ goto fail;
ai.flows[fd].set = shm_flow_set_open(api);
- if (ai.flows[fd].set == NULL) {
- flow_fini(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.lock);
- return -ENOMEM;
- }
+ if (ai.flows[fd].set == NULL)
+ goto fail;
ai.flows[fd].port_id = port_id;
ai.flows[fd].oflags = FLOWFDEFAULT;
@@ -593,6 +295,12 @@ static int flow_init(int port_id,
pthread_rwlock_unlock(&ai.lock);
return fd;
+
+ fail:
+ flow_fini(fd);
+ fail_fds:
+ pthread_rwlock_unlock(&ai.lock);
+ return err;
}
static bool check_python(char * str)
@@ -611,7 +319,6 @@ __attribute__((constructor)) static void init(int argc,
{
const char * ap_name = argv[0];
int i;
- int j;
(void) argc;
(void) envp;
@@ -643,20 +350,8 @@ __attribute__((constructor)) static void init(int argc,
if (ai.flows == NULL)
goto fail_flows;
- ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS);
- if (ai.frcti == NULL)
- goto fail_frcti;
-
- for (i = 0; i < AP_MAX_FLOWS; ++i) {
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
flow_clear(i);
- frcti_clear(i);
-
- if (pthread_rwlock_init(&ai.frcti[i].lock, NULL)) {
- for (j = i - 1; j >= 0 ; j--)
- pthread_rwlock_destroy(&ai.frcti[j].lock);
- goto fail_frct_lock;
- }
- }
ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS);
if (ai.ports == NULL)
@@ -690,13 +385,12 @@ __attribute__((constructor)) static void init(int argc,
if (pthread_rwlock_init(&ai.lock, NULL))
goto fail_lock;
- ai.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS);
- if (ai.tw == NULL)
- goto fail_timerwheel;
+ if (frct_init())
+ goto fail_frct;
return;
- fail_timerwheel:
+ fail_frct:
pthread_rwlock_destroy(&ai.lock);
fail_lock:
for (i = 0; i < SYS_MAX_FLOWS; ++i)
@@ -709,11 +403,6 @@ __attribute__((constructor)) static void init(int argc,
fail_ap_name:
free(ai.ports);
fail_ports:
- for (i = 0; i < AP_MAX_FLOWS; ++i)
- pthread_rwlock_destroy(&ai.frcti[i].lock);
- fail_frct_lock:
- free(ai.frcti);
- fail_frcti:
free(ai.flows);
fail_flows:
shm_rdrbuff_close(ai.rdrb);
@@ -737,15 +426,14 @@ __attribute__((destructor)) static void fini(void)
if (ai.fds == NULL)
return;
- bmp_destroy(ai.fds);
- bmp_destroy(ai.fqueues);
+ frct_fini();
shm_flow_set_destroy(ai.fqset);
if (ai.ap_name != NULL)
free(ai.ap_name);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_wrlock(&ai.lock);
for (i = 0; i < AP_MAX_FLOWS; ++i) {
if (ai.flows[i].port_id != -1) {
@@ -754,8 +442,6 @@ __attribute__((destructor)) static void fini(void)
shm_rdrbuff_remove(ai.rdrb, idx);
flow_fini(i);
}
-
- pthread_rwlock_destroy(&ai.frcti[i].lock);
}
for (i = 0; i < SYS_MAX_FLOWS; ++i) {
@@ -770,7 +456,9 @@ __attribute__((destructor)) static void fini(void)
free(ai.flows);
free(ai.ports);
- free(ai.frcti);
+
+ bmp_destroy(ai.fds);
+ bmp_destroy(ai.fqueues);
pthread_rwlock_unlock(&ai.lock);
@@ -825,7 +513,16 @@ int flow_accept(qosspec_t * qs,
pthread_rwlock_wrlock(&ai.lock);
- frcti_init(fd);
+ /* FIXME: check if FRCT is needed based on qc? */
+
+ assert(ai.flows[fd].frcti == NULL);
+
+ ai.flows[fd].frcti = frcti_create(fd);
+ if (ai.flows[fd].frcti == NULL) {
+ flow_fini(fd);
+ pthread_rwlock_unlock(&ai.lock);
+ return -ENOMEM;
+ }
if (qs != NULL)
*qs = ai.flows[fd].spec;
@@ -891,7 +588,15 @@ int flow_alloc(const char * dst_name,
pthread_rwlock_wrlock(&ai.lock);
- frcti_init(fd);
+ /* FIXME: check if FRCT is needed based on qc? */
+ assert(ai.flows[fd].frcti == NULL);
+
+ ai.flows[fd].frcti = frcti_create(fd);
+ if (ai.flows[fd].frcti == NULL) {
+ flow_fini(fd);
+ pthread_rwlock_unlock(&ai.lock);
+ return -ENOMEM;
+ }
pthread_rwlock_unlock(&ai.lock);
@@ -913,7 +618,7 @@ int flow_dealloc(int fd)
pthread_rwlock_rdlock(&ai.lock);
- assert(!(ai.flows[fd].port_id < 0));
+ assert(ai.flows[fd].port_id >= 0);
msg.port_id = ai.flows[fd].port_id;
@@ -933,7 +638,6 @@ int flow_dealloc(int fd)
pthread_rwlock_wrlock(&ai.lock);
flow_fini(fd);
- bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.lock);
@@ -944,6 +648,7 @@ int fccntl(int fd,
int cmd,
...)
{
+ uint16_t sflags;
uint32_t * fflags;
uint16_t * cflags;
va_list l;
@@ -951,15 +656,18 @@ int fccntl(int fd,
qosspec_t * qs;
uint32_t rx_acl;
uint32_t tx_acl;
+ struct flow * flow;
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
+ flow = &ai.flows[fd];
+
va_start(l, cmd);
pthread_rwlock_wrlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (flow->port_id < 0) {
pthread_rwlock_unlock(&ai.lock);
va_end(l);
return -ENOTALLOC;
@@ -969,57 +677,57 @@ int fccntl(int fd,
case FLOWSSNDTIMEO:
timeo = va_arg(l, struct timespec *);
if (timeo == NULL) {
- ai.flows[fd].snd_timesout = false;
+ flow->snd_timesout = false;
} else {
- ai.flows[fd].snd_timesout = true;
- ai.flows[fd].snd_timeo = *timeo;
+ flow->snd_timesout = true;
+ flow->snd_timeo = *timeo;
}
break;
case FLOWGSNDTIMEO:
timeo = va_arg(l, struct timespec *);
if (timeo == NULL)
goto einval;
- if (!ai.flows[fd].snd_timesout)
+ if (!flow->snd_timesout)
goto eperm;
- *timeo = ai.flows[fd].snd_timeo;
+ *timeo = flow->snd_timeo;
break;
case FLOWSRCVTIMEO:
timeo = va_arg(l, struct timespec *);
if (timeo == NULL) {
- ai.flows[fd].rcv_timesout = false;
+ flow->rcv_timesout = false;
} else {
- ai.flows[fd].rcv_timesout = true;
- ai.flows[fd].rcv_timeo = *timeo;
+ flow->rcv_timesout = true;
+ flow->rcv_timeo = *timeo;
}
break;
case FLOWGRCVTIMEO:
timeo = va_arg(l, struct timespec *);
if (timeo == NULL)
goto einval;
- if (!ai.flows[fd].rcv_timesout)
+ if (!flow->rcv_timesout)
goto eperm;
- *timeo = ai.flows[fd].snd_timeo;
+ *timeo = flow->snd_timeo;
break;
case FLOWGQOSSPEC:
qs = va_arg(l, qosspec_t *);
if (qs == NULL)
goto einval;
- *qs = ai.flows[fd].spec;
+ *qs = flow->spec;
break;
case FLOWSFLAGS:
- ai.flows[fd].oflags = va_arg(l, uint32_t);
- rx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb);
- tx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb);
+ flow->oflags = va_arg(l, uint32_t);
+ rx_acl = shm_rbuff_get_acl(flow->rx_rb);
+ tx_acl = shm_rbuff_get_acl(flow->rx_rb);
/*
* Making our own flow write only means making the
* the other side of the flow read only.
*/
- if (ai.flows[fd].oflags & FLOWFWRONLY)
+ if (flow->oflags & FLOWFWRONLY)
rx_acl |= ACL_RDONLY;
- if (ai.flows[fd].oflags & FLOWFRDWR)
+ if (flow->oflags & FLOWFRDWR)
rx_acl |= ACL_RDWR;
- if (ai.flows[fd].oflags & FLOWFDOWN) {
+ if (flow->oflags & FLOWFDOWN) {
rx_acl |= ACL_FLOWDOWN;
tx_acl |= ACL_FLOWDOWN;
} else {
@@ -1027,26 +735,28 @@ int fccntl(int fd,
tx_acl &= ~ACL_FLOWDOWN;
}
- shm_rbuff_set_acl(ai.flows[fd].rx_rb, rx_acl);
- shm_rbuff_set_acl(ai.flows[fd].tx_rb, tx_acl);
+ shm_rbuff_set_acl(flow->rx_rb, rx_acl);
+ shm_rbuff_set_acl(flow->tx_rb, tx_acl);
break;
case FLOWGFLAGS:
fflags = va_arg(l, uint32_t *);
if (fflags == NULL)
goto einval;
- *fflags = ai.flows[fd].oflags;
+ *fflags = flow->oflags;
break;
case FRCTSFLAGS:
- ai.frcti[fd].conf_flags = (uint16_t) va_arg(l, int);
+ sflags = (uint16_t) va_arg(l, int);
+ if (flow->frcti == NULL || frcti_setconf(flow->frcti, sflags))
+ goto eperm;
break;
case FRCTGFLAGS:
cflags = (uint16_t *) va_arg(l, int *);
if (cflags == NULL)
goto einval;
- *cflags = ai.frcti[fd].conf_flags;
- if (frcti_configure(fd, ai.frcti[fd].conf_flags))
+ if (flow->frcti == NULL)
goto eperm;
+ *cflags = frcti_getconf(flow->frcti);
break;
default:
pthread_rwlock_unlock(&ai.lock);
@@ -1075,8 +785,10 @@ ssize_t flow_write(int fd,
const void * buf,
size_t count)
{
- ssize_t idx;
- int ret;
+ struct flow * flow;
+ ssize_t idx;
+ int ret;
+ int flags;
if (buf == NULL)
return 0;
@@ -1084,104 +796,110 @@ ssize_t flow_write(int fd,
if (fd < 0 || fd > AP_MAX_FLOWS)
return -EBADF;
+ flow = &ai.flows[fd];
+
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (flow->port_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) {
- pthread_rwlock_unlock(&ai.lock);
- return -EPERM;
- }
+ flags = flow->oflags;
- if (ai.flows[fd].oflags & FLOWFWNOBLOCK) {
- idx = shm_rdrbuff_write(ai.rdrb,
- DU_BUFF_HEADSPACE,
- DU_BUFF_TAILSPACE,
- buf,
- count);
- if (idx < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return idx;
- }
+ pthread_rwlock_unlock(&ai.lock);
- } else { /* Blocking. */
- pthread_rwlock_unlock(&ai.lock);
+ if ((flags & FLOWFACCMODE) == FLOWFRDONLY)
+ return -EPERM;
+ if (flags & FLOWFWNOBLOCK)
+ idx = shm_rdrbuff_write(ai.rdrb,
+ DU_BUFF_HEADSPACE,
+ DU_BUFF_TAILSPACE,
+ buf,
+ count);
+ else /* Blocking. */
idx = shm_rdrbuff_write_b(ai.rdrb,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
buf,
count);
- if (idx < 0)
- return idx;
+ if (idx < 0)
+ return idx;
- pthread_rwlock_rdlock(&ai.lock);
+ if (frcti_snd(flow->frcti, shm_rdrbuff_get(ai.rdrb, idx)) < 0) {
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -ENOMEM;
}
- if (!ai.frcti[fd].used) {
- ret = finalize_write(fd, idx);
- if (ret < 0) {
- pthread_rwlock_unlock(&ai.lock);
- shm_rdrbuff_remove(ai.rdrb, idx);
- return ret;
- }
+ pthread_rwlock_rdlock(&ai.lock);
- pthread_rwlock_unlock(&ai.lock);
- } else {
- pthread_rwlock_unlock(&ai.lock);
+ ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ if (ret < 0)
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ else
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
- ret = frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx));
- if (ret < 0) {
- shm_rdrbuff_remove(ai.rdrb, idx);
- return ret;
- }
- }
+ pthread_rwlock_unlock(&ai.lock);
- return 0;
+ assert(ret <= 0);
+
+ return ret;
}
ssize_t flow_read(int fd,
void * buf,
size_t count)
{
- ssize_t idx;
- ssize_t n;
- uint8_t * sdu;
- bool used;
- struct shm_rbuff * rb;
+ ssize_t idx;
+ ssize_t n;
+ uint8_t * sdu;
+ struct shm_rbuff * rb;
+ struct shm_du_buff * sdb;
+ struct timespec now;
+ struct timespec abs;
+ struct timespec * abstime = NULL;
+ struct flow * flow;
+ bool noblock;
if (fd < 0 || fd > AP_MAX_FLOWS)
return -EBADF;
+ flow = &ai.flows[fd];
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (flow->port_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- used = ai.frcti[fd].used;
- rb = ai.flows[fd].rx_rb;
+ rb = flow->rx_rb;
+ noblock = flow->oflags & FLOWFRNOBLOCK;
- pthread_rwlock_unlock(&ai.lock);
+ if (ai.flows[fd].rcv_timesout) {
+ ts_add(&now, &flow->rcv_timeo, &abs);
+ abstime = &abs;
+ }
- if (!used)
- idx = shm_rbuff_read(rb);
- else
- idx = frcti_read(fd);
+ pthread_rwlock_unlock(&ai.lock);
+ idx = frcti_queued_pdu(flow->frcti);
if (idx < 0) {
- assert(idx == -EAGAIN || idx == -ETIMEDOUT ||
- idx == -EFLOWDOWN);
- return idx;
+ do {
+ idx = noblock ? shm_rbuff_read(rb) :
+ shm_rbuff_read_b(rb, abstime);
+ if (idx < 0)
+ return idx;
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ } while (frcti_rcv(flow->frcti, sdb) != 0);
}
n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
- if (n < 0)
- return -1;
+
+ assert(n >= 0);
memcpy(buf, sdu, MIN((size_t) n, count));
@@ -1432,7 +1150,7 @@ int ipcp_create_r(pid_t api,
if (recv_msg == NULL)
return -EIRMD;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -1509,7 +1227,7 @@ int ipcp_flow_alloc_reply(int fd,
if (recv_msg == NULL)
return -EIRMD;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -1524,30 +1242,37 @@ int ipcp_flow_alloc_reply(int fd,
int ipcp_flow_read(int fd,
struct shm_du_buff ** sdb)
{
- ssize_t idx = -1;
- int port_id = -1;
+ struct flow * flow;
+ struct shm_rbuff * rb;
+ ssize_t idx;
assert(fd >= 0);
assert(sdb);
+ flow = &ai.flows[fd];
+
pthread_rwlock_rdlock(&ai.lock);
- if ((port_id = ai.flows[fd].port_id) < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
- }
+ assert(flow->port_id >= 0);
- pthread_rwlock_unlock(&ai.lock);
+ rb = flow->rx_rb;
- if (!ai.frcti[fd].used)
- idx = shm_rbuff_read(ai.flows[fd].rx_rb);
- else
- idx = frcti_read(fd);
+ pthread_rwlock_unlock(&ai.lock);
- if (idx < 0)
- return idx;
+ if (flow->frcti != NULL) {
+ idx = frcti_queued_pdu(flow->frcti);
+ if (idx >= 0) {
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ return 0;
+ }
+ }
- *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ do {
+ idx = shm_rbuff_read(rb);
+ if (idx < 0)
+ return idx;
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ } while (frcti_rcv(flow->frcti, *sdb) != 0);
return 0;
}
@@ -1555,53 +1280,49 @@ int ipcp_flow_read(int fd,
int ipcp_flow_write(int fd,
struct shm_du_buff * sdb)
{
- int ret;
+ struct flow * flow;
+ int ret;
+ ssize_t idx;
- if (sdb == NULL)
- return -EINVAL;
+ assert(sdb);
+
+ flow = &ai.flows[fd];
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
- }
+ assert(flow->port_id >= 0);
- if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) {
+ if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) {
pthread_rwlock_unlock(&ai.lock);
return -EPERM;
}
- assert(ai.flows[fd].tx_rb);
+ assert(flow->tx_rb);
- if (!ai.frcti[fd].used) {
- ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
- if (ret < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return ret;
- }
+ idx = shm_du_buff_get_idx(sdb);
+ if (frcti_snd(flow->frcti, sdb) < 0) {
pthread_rwlock_unlock(&ai.lock);
- } else {
- pthread_rwlock_unlock(&ai.lock);
-
- ret = frcti_write(fd, sdb);
- if (ret < 0)
- return ret;
+ return -ENOMEM;
}
- return 0;
+ ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ if (ret == 0)
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ assert(ret <= 0);
+
+ return ret;
}
int ipcp_sdb_reserve(struct shm_du_buff ** sdb,
size_t len)
{
- struct shm_rdrbuff * rdrb;
- ssize_t idx;
-
- rdrb = ai.rdrb;
+ ssize_t idx;
- idx = shm_rdrbuff_write_b(rdrb,
+ idx = shm_rdrbuff_write_b(ai.rdrb,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
NULL,
@@ -1610,15 +1331,22 @@ int ipcp_sdb_reserve(struct shm_du_buff ** sdb,
if (idx < 0)
return -1;
- *sdb = shm_rdrbuff_get(rdrb, idx);
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
return 0;
}
+void ipcp_sdb_release(struct shm_du_buff * sdb)
+{
+ shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
+}
+
void ipcp_flow_fini(int fd)
{
struct shm_rbuff * rx_rb;
+ assert(fd >= 0);
+
fccntl(fd, FLOWSFLAGS, FLOWFWRONLY);
pthread_rwlock_rdlock(&ai.lock);
@@ -1633,15 +1361,12 @@ void ipcp_flow_fini(int fd)
int ipcp_flow_get_qoscube(int fd,
qoscube_t * cube)
{
- if (fd < 0 || fd > AP_MAX_FLOWS || cube == NULL)
- return -EINVAL;
+ assert(fd >= 0);
+ assert(cube);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
- }
+ assert(ai.flows[fd].port_id >= 0);
*cube = ai.flows[fd].cube;
@@ -1670,28 +1395,20 @@ int local_flow_write(int fd,
{
int ret;
- if (fd < 0)
- return -EINVAL;
+ assert(fd >= 0);
pthread_rwlock_rdlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_rdlock(&ai.lock);
return -ENOTALLOC;
}
- ret = finalize_write(fd, idx);
- if (ret < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return ret;
- }
+ ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ if (ret == 0)
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
pthread_rwlock_unlock(&ai.lock);
- return 0;
-}
-
-void ipcp_sdb_release(struct shm_du_buff * sdb)
-{
- shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
+ return ret;
}