Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--  src/lib/dev.c  719
1 file changed, 218 insertions(+), 501 deletions(-)
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 28a99bc4..ff22cca6 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -38,22 +38,19 @@
#include <ouroboros/fqueue.h>
#include <ouroboros/qoscube.h>
#include <ouroboros/timerwheel.h>
-#include <ouroboros/frct_pci.h>
-#include <ouroboros/rq.h>
+
+#include "frct_pci.h"
+#include "rq.h"
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdarg.h>
+#include <stdbool.h>
+#include <sys/types.h>
#define BUF_SIZE 1500
-#define TW_ELEMENTS 6000
-#define TW_RESOLUTION 1 /* ms */
-
-#define MPL 2000 /* ms */
-#define RQ_SIZE 20
-
#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
#endif
@@ -76,26 +73,6 @@ enum port_state {
PORT_DESTROY
};
-struct frcti {
- bool used;
-
- struct timespec last_snd;
- bool snd_drf;
- uint64_t snd_lwe;
- uint64_t snd_rwe;
-
- struct timespec last_rcv;
- bool rcv_drf;
- uint64_t rcv_lwe;
- uint64_t rcv_rwe;
-
- uint16_t conf_flags;
-
- struct rq * rq;
-
- pthread_rwlock_t lock;
-};
-
struct port {
int fd;
@@ -119,6 +96,8 @@ struct flow {
bool rcv_timesout;
struct timespec snd_timeo;
struct timespec rcv_timeo;
+
+ struct frcti * frcti;
};
struct {
@@ -132,13 +111,15 @@ struct {
struct bmp * fds;
struct bmp * fqueues;
+
struct flow * flows;
struct port * ports;
- struct frcti * frcti;
pthread_rwlock_t lock;
} ai;
+#include "frct.c"
+
static void port_destroy(struct port * p)
{
pthread_mutex_lock(&p->state_lock);
@@ -185,12 +166,8 @@ static enum port_state port_wait_assign(int port_id)
enum port_state state;
struct port * p;
- pthread_rwlock_rdlock(&ai.lock);
-
p = &ai.ports[port_id];
- pthread_rwlock_unlock(&ai.lock);
-
pthread_mutex_lock(&p->state_lock);
if (p->state == PORT_ID_ASSIGNED) {
@@ -245,275 +222,8 @@ static int api_announce(char * ap_name)
return ret;
}
-/* Call under flows lock. */
-static int finalize_write(int fd,
- size_t idx)
-{
- int ret;
-
- ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
- if (ret < 0)
- return ret;
-
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
-
- return ret;
-}
-
-static int frcti_init(int fd)
-{
- struct frcti * frcti;
-
- frcti = &(ai.frcti[fd]);
-
- frcti->used = true;
-
- frcti->snd_drf = true;
- frcti->snd_lwe = 0;
- frcti->snd_rwe = 0;
-
- frcti->rcv_drf = true;
- frcti->rcv_lwe = 0;
- frcti->rcv_rwe = 0;
-
- frcti->conf_flags = 0;
-
- frcti->rq = rq_create(RQ_SIZE);
- if (frcti->rq == NULL)
- return -1;
-
- return 0;
-}
-
-static void frcti_clear(int fd)
-{
- ai.frcti[fd].used = false;
-}
-
-static void frcti_fini(int fd)
-{
- /*
- * FIXME: In case of reliable transmission we should
- * make sure everything is acked.
- */
-
- frcti_clear(fd);
-
- rq_destroy(ai.frcti[fd].rq);
-}
-
-static int frcti_send(int fd,
- struct frct_pci * pci,
- struct shm_du_buff * sdb)
-{
- struct timespec now = {0, 0};
- struct frcti * frcti;
- int ret;
-
- frcti = &(ai.frcti[fd]);
-
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
-
- pthread_rwlock_wrlock(&frcti->lock);
-
- /* Check if sender inactivity is true. */
- if (!frcti->snd_drf && ts_diff_ms(&now, &frcti->last_snd) > 2 * MPL)
- frcti->snd_drf = true;
-
- /* Set the DRF in the first packet of a new run of SDUs. */
- if (frcti->snd_drf) {
- pci->flags |= FLAG_DATA_RUN;
- frcti->snd_drf = false;
- }
-
- frcti->last_snd = now;
-
- pci->seqno = frcti->snd_lwe++;
-
- if (frct_pci_ser(sdb, pci, frcti->conf_flags & FRCTFERRCHCK)) {
- pthread_rwlock_unlock(&frcti->lock);
- return -1;
- }
-
- ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
- if (ret < 0) {
- pthread_rwlock_unlock(&frcti->lock);
- return ret;
- }
-
- pthread_rwlock_unlock(&frcti->lock);
-
- return 0;
-}
-
-
-static int frcti_configure(int fd,
- uint16_t flags)
-{
- struct frcti * frcti;
- struct frct_pci pci;
- struct shm_du_buff * sdb;
-
- frcti = &(ai.frcti[fd]);
-
- memset(&pci, 0, sizeof(pci));
-
- if (ipcp_sdb_reserve(&sdb, 0))
- return -1;
-
- pci.conf_flags = flags;
-
- /* Always set the DRF on a configure message. */
- pci.flags |= FLAG_DATA_RUN;
- pci.type |= PDU_TYPE_CONFIG;
-
- pthread_rwlock_wrlock(&frcti->lock);
-
- frcti->conf_flags = pci.conf_flags;
-
- pthread_rwlock_unlock(&frcti->lock);
-
- if (frcti_send(fd, &pci, sdb)) {
- shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
- return -1;
- }
-
- return 0;
-}
-
-static int frcti_write(int fd,
- struct shm_du_buff * sdb)
-{
- struct frct_pci pci;
-
- memset(&pci, 0, sizeof(pci));
-
- pci.type |= PDU_TYPE_DATA;
-
- return frcti_send(fd, &pci, sdb);
-}
-
-static ssize_t frcti_read(int fd)
-{
- ssize_t idx;
- struct frcti * frcti;
- struct frct_pci pci;
- struct shm_du_buff * sdb;
- uint64_t seqno;
- bool nxt_pdu = true;
-
- frcti = &(ai.frcti[fd]);
-
- /* See if we already have the next PDU */
- pthread_rwlock_wrlock(&frcti->lock);
-
- if (!rq_is_empty(frcti->rq)) {
- seqno = rq_peek(frcti->rq);
- if (seqno == frcti->rcv_lwe) {
- frcti->rcv_lwe++;
- idx = rq_pop(frcti->rq);
- pthread_rwlock_unlock(&frcti->lock);
- return idx;
- }
- }
-
- pthread_rwlock_unlock(&frcti->lock);
-
- do {
- struct timespec now;
- struct timespec abs;
- struct timespec * abstime = NULL;
- struct shm_rbuff * rb;
- bool noblock;
-
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
-
- pthread_rwlock_rdlock(&ai.lock);
-
- noblock = ai.flows[fd].oflags & FLOWFRNOBLOCK;
- rb = ai.flows[fd].rx_rb;
-
- if (ai.flows[fd].rcv_timesout) {
- ts_add(&now, &ai.flows[fd].rcv_timeo, &abs);
- abstime = &abs;
- }
-
- pthread_rwlock_unlock(&ai.lock);
-
- if (noblock) {
- idx = shm_rbuff_read(rb);
- } else {
- idx = shm_rbuff_read_b(rb, abstime);
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
- }
-
- if (idx < 0)
- return idx;
-
- sdb = shm_rdrbuff_get(ai.rdrb, idx);
-
- pthread_rwlock_wrlock(&frcti->lock);
-
- /* SDU may be corrupted. */
- if (frct_pci_des(sdb, &pci, frcti->conf_flags & FRCTFERRCHCK)) {
- pthread_rwlock_unlock(&frcti->lock);
- shm_rdrbuff_remove(ai.rdrb, idx);
- return -EAGAIN;
- }
-
- /* Check if receiver inactivity is true. */
- if (!frcti->rcv_drf &&
- ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL)
- frcti->rcv_drf = true;
-
- /* When there is receiver inactivity queue the packet. */
- if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) {
- if (rq_push(frcti->rq, pci.seqno, idx))
- shm_rdrbuff_remove(ai.rdrb, idx);
- pthread_rwlock_unlock(&frcti->lock);
- return -EAGAIN;
- }
-
- /* If the DRF is set, reset the state of the connection. */
- if (pci.flags & FLAG_DATA_RUN)
- frcti->rcv_lwe = pci.seqno;
-
- if (pci.type & PDU_TYPE_CONFIG)
- frcti->conf_flags = pci.conf_flags;
-
- if (frcti->rcv_drf)
- frcti->rcv_drf = false;
-
- frcti->last_rcv = now;
-
- nxt_pdu = true;
-
- if (!(pci.type & PDU_TYPE_DATA)) {
- shm_rdrbuff_remove(ai.rdrb, idx);
- nxt_pdu = false;
- }
-
- if (frcti->conf_flags & FRCTFORDERING) {
- if (pci.seqno != frcti->rcv_lwe) {
- if (rq_push(frcti->rq, pci.seqno, idx))
- shm_rdrbuff_remove(ai.rdrb, idx);
- nxt_pdu = false;
- } else {
- frcti->rcv_lwe++;
- }
- }
-
- pthread_rwlock_unlock(&frcti->lock);
-
- } while (!nxt_pdu);
-
- return idx;
-}
-
static void flow_clear(int fd)
{
- assert(!(fd < 0));
-
memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
ai.flows[fd].port_id = -1;
@@ -525,8 +235,10 @@ static void flow_fini(int fd)
{
assert(!(fd < 0));
- if (ai.flows[fd].port_id != -1)
+ if (ai.flows[fd].port_id != -1) {
port_destroy(&ai.ports[ai.flows[fd].port_id]);
+ bmp_release(ai.fds, fd);
+ }
if (ai.flows[fd].rx_rb != NULL)
shm_rbuff_close(ai.flows[fd].rx_rb);
@@ -537,8 +249,8 @@ static void flow_fini(int fd)
if (ai.flows[fd].set != NULL)
shm_flow_set_close(ai.flows[fd].set);
- if (ai.frcti[fd].used)
- frcti_fini(fd);
+ if (ai.flows[fd].frcti != NULL)
+ frcti_destroy(ai.flows[fd].frcti);
flow_clear(fd);
}
@@ -548,37 +260,27 @@ static int flow_init(int port_id,
qoscube_t qc)
{
int fd;
+ int err = -ENOMEM;
pthread_rwlock_wrlock(&ai.lock);
fd = bmp_allocate(ai.fds);
if (!bmp_is_id_valid(ai.fds, fd)) {
- pthread_rwlock_unlock(&ai.lock);
- return -EBADF;
+ err = -EBADF;
+ goto fail_fds;
}
ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
- if (ai.flows[fd].rx_rb == NULL) {
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.lock);
- return -ENOMEM;
- }
+ if (ai.flows[fd].rx_rb == NULL)
+ goto fail;
ai.flows[fd].tx_rb = shm_rbuff_open(api, port_id);
- if (ai.flows[fd].tx_rb == NULL) {
- flow_fini(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.lock);
- return -ENOMEM;
- }
+ if (ai.flows[fd].tx_rb == NULL)
+ goto fail;
ai.flows[fd].set = shm_flow_set_open(api);
- if (ai.flows[fd].set == NULL) {
- flow_fini(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.lock);
- return -ENOMEM;
- }
+ if (ai.flows[fd].set == NULL)
+ goto fail;
ai.flows[fd].port_id = port_id;
ai.flows[fd].oflags = FLOWFDEFAULT;
@@ -593,6 +295,12 @@ static int flow_init(int port_id,
pthread_rwlock_unlock(&ai.lock);
return fd;
+
+ fail:
+ flow_fini(fd);
+ fail_fds:
+ pthread_rwlock_unlock(&ai.lock);
+ return err;
}
static bool check_python(char * str)
@@ -611,7 +319,6 @@ __attribute__((constructor)) static void init(int argc,
{
const char * ap_name = argv[0];
int i;
- int j;
(void) argc;
(void) envp;
@@ -643,20 +350,8 @@ __attribute__((constructor)) static void init(int argc,
if (ai.flows == NULL)
goto fail_flows;
- ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS);
- if (ai.frcti == NULL)
- goto fail_frcti;
-
- for (i = 0; i < AP_MAX_FLOWS; ++i) {
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
flow_clear(i);
- frcti_clear(i);
-
- if (pthread_rwlock_init(&ai.frcti[i].lock, NULL)) {
- for (j = i - 1; j >= 0 ; j--)
- pthread_rwlock_destroy(&ai.frcti[j].lock);
- goto fail_frct_lock;
- }
- }
ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS);
if (ai.ports == NULL)
@@ -690,13 +385,12 @@ __attribute__((constructor)) static void init(int argc,
if (pthread_rwlock_init(&ai.lock, NULL))
goto fail_lock;
- ai.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS);
- if (ai.tw == NULL)
- goto fail_timerwheel;
+ if (frct_init())
+ goto fail_frct;
return;
- fail_timerwheel:
+ fail_frct:
pthread_rwlock_destroy(&ai.lock);
fail_lock:
for (i = 0; i < SYS_MAX_FLOWS; ++i)
@@ -709,11 +403,6 @@ __attribute__((constructor)) static void init(int argc,
fail_ap_name:
free(ai.ports);
fail_ports:
- for (i = 0; i < AP_MAX_FLOWS; ++i)
- pthread_rwlock_destroy(&ai.frcti[i].lock);
- fail_frct_lock:
- free(ai.frcti);
- fail_frcti:
free(ai.flows);
fail_flows:
shm_rdrbuff_close(ai.rdrb);
@@ -737,15 +426,14 @@ __attribute__((destructor)) static void fini(void)
if (ai.fds == NULL)
return;
- bmp_destroy(ai.fds);
- bmp_destroy(ai.fqueues);
+ frct_fini();
shm_flow_set_destroy(ai.fqset);
if (ai.ap_name != NULL)
free(ai.ap_name);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_wrlock(&ai.lock);
for (i = 0; i < AP_MAX_FLOWS; ++i) {
if (ai.flows[i].port_id != -1) {
@@ -754,8 +442,6 @@ __attribute__((destructor)) static void fini(void)
shm_rdrbuff_remove(ai.rdrb, idx);
flow_fini(i);
}
-
- pthread_rwlock_destroy(&ai.frcti[i].lock);
}
for (i = 0; i < SYS_MAX_FLOWS; ++i) {
@@ -770,7 +456,9 @@ __attribute__((destructor)) static void fini(void)
free(ai.flows);
free(ai.ports);
- free(ai.frcti);
+
+ bmp_destroy(ai.fds);
+ bmp_destroy(ai.fqueues);
pthread_rwlock_unlock(&ai.lock);
@@ -825,7 +513,16 @@ int flow_accept(qosspec_t * qs,
pthread_rwlock_wrlock(&ai.lock);
- frcti_init(fd);
+ /* FIXME: check if FRCT is needed based on qc? */
+
+ assert(ai.flows[fd].frcti == NULL);
+
+ ai.flows[fd].frcti = frcti_create(fd);
+ if (ai.flows[fd].frcti == NULL) {
+ flow_fini(fd);
+ pthread_rwlock_unlock(&ai.lock);
+ return -ENOMEM;
+ }
if (qs != NULL)
*qs = ai.flows[fd].spec;
@@ -891,7 +588,15 @@ int flow_alloc(const char * dst_name,
pthread_rwlock_wrlock(&ai.lock);
- frcti_init(fd);
+ /* FIXME: check if FRCT is needed based on qc? */
+ assert(ai.flows[fd].frcti == NULL);
+
+ ai.flows[fd].frcti = frcti_create(fd);
+ if (ai.flows[fd].frcti == NULL) {
+ flow_fini(fd);
+ pthread_rwlock_unlock(&ai.lock);
+ return -ENOMEM;
+ }
pthread_rwlock_unlock(&ai.lock);
@@ -913,7 +618,7 @@ int flow_dealloc(int fd)
pthread_rwlock_rdlock(&ai.lock);
- assert(!(ai.flows[fd].port_id < 0));
+ assert(ai.flows[fd].port_id >= 0);
msg.port_id = ai.flows[fd].port_id;
@@ -933,7 +638,6 @@ int flow_dealloc(int fd)
pthread_rwlock_wrlock(&ai.lock);
flow_fini(fd);
- bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.lock);
@@ -944,6 +648,7 @@ int fccntl(int fd,
int cmd,
...)
{
+ uint16_t sflags;
uint32_t * fflags;
uint16_t * cflags;
va_list l;
@@ -951,15 +656,18 @@ int fccntl(int fd,
qosspec_t * qs;
uint32_t rx_acl;
uint32_t tx_acl;
+ struct flow * flow;
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
+ flow = &ai.flows[fd];
+
va_start(l, cmd);
pthread_rwlock_wrlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (flow->port_id < 0) {
pthread_rwlock_unlock(&ai.lock);
va_end(l);
return -ENOTALLOC;
@@ -969,57 +677,57 @@ int fccntl(int fd,
case FLOWSSNDTIMEO:
timeo = va_arg(l, struct timespec *);
if (timeo == NULL) {
- ai.flows[fd].snd_timesout = false;
+ flow->snd_timesout = false;
} else {
- ai.flows[fd].snd_timesout = true;
- ai.flows[fd].snd_timeo = *timeo;
+ flow->snd_timesout = true;
+ flow->snd_timeo = *timeo;
}
break;
case FLOWGSNDTIMEO:
timeo = va_arg(l, struct timespec *);
if (timeo == NULL)
goto einval;
- if (!ai.flows[fd].snd_timesout)
+ if (!flow->snd_timesout)
goto eperm;
- *timeo = ai.flows[fd].snd_timeo;
+ *timeo = flow->snd_timeo;
break;
case FLOWSRCVTIMEO:
timeo = va_arg(l, struct timespec *);
if (timeo == NULL) {
- ai.flows[fd].rcv_timesout = false;
+ flow->rcv_timesout = false;
} else {
- ai.flows[fd].rcv_timesout = true;
- ai.flows[fd].rcv_timeo = *timeo;
+ flow->rcv_timesout = true;
+ flow->rcv_timeo = *timeo;
}
break;
case FLOWGRCVTIMEO:
timeo = va_arg(l, struct timespec *);
if (timeo == NULL)
goto einval;
- if (!ai.flows[fd].rcv_timesout)
+ if (!flow->rcv_timesout)
goto eperm;
- *timeo = ai.flows[fd].snd_timeo;
+ *timeo = flow->rcv_timeo;
break;
case FLOWGQOSSPEC:
qs = va_arg(l, qosspec_t *);
if (qs == NULL)
goto einval;
- *qs = ai.flows[fd].spec;
+ *qs = flow->spec;
break;
case FLOWSFLAGS:
- ai.flows[fd].oflags = va_arg(l, uint32_t);
- rx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb);
- tx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb);
+ flow->oflags = va_arg(l, uint32_t);
+ rx_acl = shm_rbuff_get_acl(flow->rx_rb);
+ tx_acl = shm_rbuff_get_acl(flow->tx_rb);
/*
* Making our own flow write only means making the
* other side of the flow read only.
*/
- if (ai.flows[fd].oflags & FLOWFWRONLY)
+ if (flow->oflags & FLOWFWRONLY)
rx_acl |= ACL_RDONLY;
- if (ai.flows[fd].oflags & FLOWFRDWR)
+ if (flow->oflags & FLOWFRDWR)
rx_acl |= ACL_RDWR;
- if (ai.flows[fd].oflags & FLOWFDOWN) {
+ if (flow->oflags & FLOWFDOWN) {
rx_acl |= ACL_FLOWDOWN;
tx_acl |= ACL_FLOWDOWN;
} else {
@@ -1027,26 +735,28 @@ int fccntl(int fd,
tx_acl &= ~ACL_FLOWDOWN;
}
- shm_rbuff_set_acl(ai.flows[fd].rx_rb, rx_acl);
- shm_rbuff_set_acl(ai.flows[fd].tx_rb, tx_acl);
+ shm_rbuff_set_acl(flow->rx_rb, rx_acl);
+ shm_rbuff_set_acl(flow->tx_rb, tx_acl);
break;
case FLOWGFLAGS:
fflags = va_arg(l, uint32_t *);
if (fflags == NULL)
goto einval;
- *fflags = ai.flows[fd].oflags;
+ *fflags = flow->oflags;
break;
case FRCTSFLAGS:
- ai.frcti[fd].conf_flags = (uint16_t) va_arg(l, int);
+ sflags = (uint16_t) va_arg(l, int);
+ if (flow->frcti == NULL || frcti_setconf(flow->frcti, sflags))
+ goto eperm;
break;
case FRCTGFLAGS:
cflags = (uint16_t *) va_arg(l, int *);
if (cflags == NULL)
goto einval;
- *cflags = ai.frcti[fd].conf_flags;
- if (frcti_configure(fd, ai.frcti[fd].conf_flags))
+ if (flow->frcti == NULL)
goto eperm;
+ *cflags = frcti_getconf(flow->frcti);
break;
default:
pthread_rwlock_unlock(&ai.lock);
@@ -1075,8 +785,10 @@ ssize_t flow_write(int fd,
const void * buf,
size_t count)
{
- ssize_t idx;
- int ret;
+ struct flow * flow;
+ ssize_t idx;
+ int ret;
+ int flags;
if (buf == NULL)
return 0;
@@ -1084,104 +796,110 @@ ssize_t flow_write(int fd,
if (fd < 0 || fd > AP_MAX_FLOWS)
return -EBADF;
+ flow = &ai.flows[fd];
+
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (flow->port_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) {
- pthread_rwlock_unlock(&ai.lock);
- return -EPERM;
- }
+ flags = flow->oflags;
- if (ai.flows[fd].oflags & FLOWFWNOBLOCK) {
- idx = shm_rdrbuff_write(ai.rdrb,
- DU_BUFF_HEADSPACE,
- DU_BUFF_TAILSPACE,
- buf,
- count);
- if (idx < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return idx;
- }
+ pthread_rwlock_unlock(&ai.lock);
- } else { /* Blocking. */
- pthread_rwlock_unlock(&ai.lock);
+ if ((flags & FLOWFACCMODE) == FLOWFRDONLY)
+ return -EPERM;
+ if (flags & FLOWFWNOBLOCK)
+ idx = shm_rdrbuff_write(ai.rdrb,
+ DU_BUFF_HEADSPACE,
+ DU_BUFF_TAILSPACE,
+ buf,
+ count);
+ else /* Blocking. */
idx = shm_rdrbuff_write_b(ai.rdrb,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
buf,
count);
- if (idx < 0)
- return idx;
+ if (idx < 0)
+ return idx;
- pthread_rwlock_rdlock(&ai.lock);
+ if (frcti_snd(flow->frcti, shm_rdrbuff_get(ai.rdrb, idx)) < 0) {
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -ENOMEM;
}
- if (!ai.frcti[fd].used) {
- ret = finalize_write(fd, idx);
- if (ret < 0) {
- pthread_rwlock_unlock(&ai.lock);
- shm_rdrbuff_remove(ai.rdrb, idx);
- return ret;
- }
+ pthread_rwlock_rdlock(&ai.lock);
- pthread_rwlock_unlock(&ai.lock);
- } else {
- pthread_rwlock_unlock(&ai.lock);
+ ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ if (ret < 0)
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ else
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
- ret = frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx));
- if (ret < 0) {
- shm_rdrbuff_remove(ai.rdrb, idx);
- return ret;
- }
- }
+ pthread_rwlock_unlock(&ai.lock);
- return 0;
+ assert(ret <= 0);
+
+ return ret;
}
ssize_t flow_read(int fd,
void * buf,
size_t count)
{
- ssize_t idx;
- ssize_t n;
- uint8_t * sdu;
- bool used;
- struct shm_rbuff * rb;
+ ssize_t idx;
+ ssize_t n;
+ uint8_t * sdu;
+ struct shm_rbuff * rb;
+ struct shm_du_buff * sdb;
+ struct timespec now;
+ struct timespec abs;
+ struct timespec * abstime = NULL;
+ struct flow * flow;
+ bool noblock;
if (fd < 0 || fd > AP_MAX_FLOWS)
return -EBADF;
+ flow = &ai.flows[fd];
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (flow->port_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- used = ai.frcti[fd].used;
- rb = ai.flows[fd].rx_rb;
+ rb = flow->rx_rb;
+ noblock = flow->oflags & FLOWFRNOBLOCK;
- pthread_rwlock_unlock(&ai.lock);
+ if (ai.flows[fd].rcv_timesout) {
+ ts_add(&now, &flow->rcv_timeo, &abs);
+ abstime = &abs;
+ }
- if (!used)
- idx = shm_rbuff_read(rb);
- else
- idx = frcti_read(fd);
+ pthread_rwlock_unlock(&ai.lock);
+ idx = frcti_queued_pdu(flow->frcti);
if (idx < 0) {
- assert(idx == -EAGAIN || idx == -ETIMEDOUT ||
- idx == -EFLOWDOWN);
- return idx;
+ do {
+ idx = noblock ? shm_rbuff_read(rb) :
+ shm_rbuff_read_b(rb, abstime);
+ if (idx < 0)
+ return idx;
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ } while (frcti_rcv(flow->frcti, sdb) != 0);
}
n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
- if (n < 0)
- return -1;
+
+ assert(n >= 0);
memcpy(buf, sdu, MIN((size_t) n, count));
@@ -1432,7 +1150,7 @@ int ipcp_create_r(pid_t api,
if (recv_msg == NULL)
return -EIRMD;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -1509,7 +1227,7 @@ int ipcp_flow_alloc_reply(int fd,
if (recv_msg == NULL)
return -EIRMD;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -1524,30 +1242,37 @@ int ipcp_flow_alloc_reply(int fd,
int ipcp_flow_read(int fd,
struct shm_du_buff ** sdb)
{
- ssize_t idx = -1;
- int port_id = -1;
+ struct flow * flow;
+ struct shm_rbuff * rb;
+ ssize_t idx;
assert(fd >= 0);
assert(sdb);
+ flow = &ai.flows[fd];
+
pthread_rwlock_rdlock(&ai.lock);
- if ((port_id = ai.flows[fd].port_id) < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
- }
+ assert(flow->port_id >= 0);
- pthread_rwlock_unlock(&ai.lock);
+ rb = flow->rx_rb;
- if (!ai.frcti[fd].used)
- idx = shm_rbuff_read(ai.flows[fd].rx_rb);
- else
- idx = frcti_read(fd);
+ pthread_rwlock_unlock(&ai.lock);
- if (idx < 0)
- return idx;
+ if (flow->frcti != NULL) {
+ idx = frcti_queued_pdu(flow->frcti);
+ if (idx >= 0) {
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ return 0;
+ }
+ }
- *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ do {
+ idx = shm_rbuff_read(rb);
+ if (idx < 0)
+ return idx;
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ } while (frcti_rcv(flow->frcti, *sdb) != 0);
return 0;
}
@@ -1555,53 +1280,49 @@ int ipcp_flow_read(int fd,
int ipcp_flow_write(int fd,
struct shm_du_buff * sdb)
{
- int ret;
+ struct flow * flow;
+ int ret;
+ ssize_t idx;
- if (sdb == NULL)
- return -EINVAL;
+ assert(sdb);
+
+ flow = &ai.flows[fd];
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
- }
+ assert(flow->port_id >= 0);
- if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) {
+ if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) {
pthread_rwlock_unlock(&ai.lock);
return -EPERM;
}
- assert(ai.flows[fd].tx_rb);
+ assert(flow->tx_rb);
- if (!ai.frcti[fd].used) {
- ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
- if (ret < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return ret;
- }
+ idx = shm_du_buff_get_idx(sdb);
+ if (frcti_snd(flow->frcti, sdb) < 0) {
pthread_rwlock_unlock(&ai.lock);
- } else {
- pthread_rwlock_unlock(&ai.lock);
-
- ret = frcti_write(fd, sdb);
- if (ret < 0)
- return ret;
+ return -ENOMEM;
}
- return 0;
+ ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ if (ret == 0)
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ assert(ret <= 0);
+
+ return ret;
}
int ipcp_sdb_reserve(struct shm_du_buff ** sdb,
size_t len)
{
- struct shm_rdrbuff * rdrb;
- ssize_t idx;
-
- rdrb = ai.rdrb;
+ ssize_t idx;
- idx = shm_rdrbuff_write_b(rdrb,
+ idx = shm_rdrbuff_write_b(ai.rdrb,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
NULL,
@@ -1610,15 +1331,22 @@ int ipcp_sdb_reserve(struct shm_du_buff ** sdb,
if (idx < 0)
return -1;
- *sdb = shm_rdrbuff_get(rdrb, idx);
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
return 0;
}
+void ipcp_sdb_release(struct shm_du_buff * sdb)
+{
+ shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
+}
+
void ipcp_flow_fini(int fd)
{
struct shm_rbuff * rx_rb;
+ assert(fd >= 0);
+
fccntl(fd, FLOWSFLAGS, FLOWFWRONLY);
pthread_rwlock_rdlock(&ai.lock);
@@ -1633,15 +1361,12 @@ void ipcp_flow_fini(int fd)
int ipcp_flow_get_qoscube(int fd,
qoscube_t * cube)
{
- if (fd < 0 || fd > AP_MAX_FLOWS || cube == NULL)
- return -EINVAL;
+ assert(fd >= 0);
+ assert(cube);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
- }
+ assert(ai.flows[fd].port_id >= 0);
*cube = ai.flows[fd].cube;
@@ -1670,28 +1395,20 @@ int local_flow_write(int fd,
{
int ret;
- if (fd < 0)
- return -EINVAL;
+ assert(fd >= 0);
pthread_rwlock_rdlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- ret = finalize_write(fd, idx);
- if (ret < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return ret;
- }
+ ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ if (ret == 0)
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
pthread_rwlock_unlock(&ai.lock);
- return 0;
-}
-
-void ipcp_sdb_release(struct shm_du_buff * sdb)
-{
- shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
+ return ret;
}
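
For reference, the hunks above replace the open-coded FRCT bookkeeping in flow_write() with the frcti_snd() / shm_rbuff_write() / shm_flow_set_notify() sequence. The sketch below condenses that send path into a single hypothetical helper (send_sketch() is not part of this commit); locking, access-mode checks and the blocking/non-blocking split are omitted, and frcti_snd() is assumed to behave exactly as this diff uses it.

    /* Hypothetical condensed view of the new send path in flow_write(). */
    static ssize_t send_sketch(int fd, const void * buf, size_t count)
    {
            ssize_t idx;

            /* Copy the SDU into the shared rdrbuff (blocking variant). */
            idx = shm_rdrbuff_write_b(ai.rdrb, DU_BUFF_HEADSPACE,
                                      DU_BUFF_TAILSPACE, buf, count);
            if (idx < 0)
                    return idx;

            /* Let FRCT add its PCI to the du_buff; release it on failure. */
            if (frcti_snd(ai.flows[fd].frcti, shm_rdrbuff_get(ai.rdrb, idx)) < 0) {
                    shm_rdrbuff_remove(ai.rdrb, idx);
                    return -ENOMEM;
            }

            /* Queue the index on the peer's rbuff and signal the flow set. */
            if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) {
                    shm_rdrbuff_remove(ai.rdrb, idx);
                    return -1;
            }

            shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);

            return 0;
    }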