summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/man/flow_read.36
-rw-r--r--src/lib/dev.c59
2 files changed, 44 insertions, 21 deletions
diff --git a/doc/man/flow_read.3 b/doc/man/flow_read.3
index 96f76766..0110d0a2 100644
--- a/doc/man/flow_read.3
+++ b/doc/man/flow_read.3
@@ -30,7 +30,11 @@ from the supplied buffer \fIbuf\fR to the flow specified by \fIfd\fR.
.SH RETURN VALUE
On success, \fBflow_read\fR() returns the number of bytes read. On
-failure, a negative value indicating the error will be returned.
+failure, a negative value indicating the error will be returned. If
+the number of bytes read equals count, a subsequent call to
+\fBflow_read\fR() should be performed to check if there were more
+bytes to read. This call to \fBflow_read\fR will return 0 if there
+was no more data and mark the end of the datagram.
On success, \fBflow_write\fR() returns 0. On failure, a negative value
indicating the error will be returned. Passing a NULL pointer for
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 3564c293..115cd565 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -52,12 +52,14 @@
#include <stdbool.h>
#include <sys/types.h>
-#define BUF_SIZE 1500
-
#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
#endif
+/* Partial read information. */
+#define NO_PART -1
+#define DONE_PART -2
+
struct flow_set {
size_t idx;
};
@@ -92,6 +94,7 @@ struct flow {
int oflags;
qoscube_t cube;
qosspec_t spec;
+ ssize_t part_idx;
pid_t pid;
@@ -285,11 +288,12 @@ static int flow_init(int port_id,
if (ai.flows[fd].set == NULL)
goto fail;
- ai.flows[fd].port_id = port_id;
- ai.flows[fd].oflags = FLOWFDEFAULT;
- ai.flows[fd].pid = pid;
- ai.flows[fd].cube = qc;
- ai.flows[fd].spec = qos_cube_to_spec(qc);
+ ai.flows[fd].port_id = port_id;
+ ai.flows[fd].oflags = FLOWFDEFAULT;
+ ai.flows[fd].pid = pid;
+ ai.flows[fd].cube = qc;
+ ai.flows[fd].spec = qos_cube_to_spec(qc);
+ ai.flows[fd].part_idx = NO_PART;
ai.ports[port_id].fd = fd;
@@ -899,6 +903,11 @@ ssize_t flow_read(int fd,
flow = &ai.flows[fd];
+ if (flow->part_idx == DONE_PART) {
+ flow->part_idx = NO_PART;
+ return 0;
+ }
+
clock_gettime(PTHREAD_COND_CLOCK, &abs);
pthread_rwlock_rdlock(&ai.lock);
@@ -918,26 +927,36 @@ ssize_t flow_read(int fd,
pthread_rwlock_unlock(&ai.lock);
- idx = frcti_queued_pdu(flow->frcti);
+ idx = flow->part_idx;
if (idx < 0) {
- do {
- idx = noblock ? shm_rbuff_read(rb) :
- shm_rbuff_read_b(rb, abstime);
- if (idx < 0)
- return idx;
- sdb = shm_rdrbuff_get(ai.rdrb, idx);
- } while (frcti_rcv(flow->frcti, sdb) != 0);
+ idx = frcti_queued_pdu(flow->frcti);
+ if (idx < 0) {
+ do {
+ idx = noblock ? shm_rbuff_read(rb) :
+ shm_rbuff_read_b(rb, abstime);
+ if (idx < 0)
+ return idx;
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ } while (frcti_rcv(flow->frcti, sdb) != 0);
+ }
}
n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
assert(n >= 0);
- memcpy(buf, sdu, MIN((size_t) n, count));
-
- shm_rdrbuff_remove(ai.rdrb, idx);
-
- return n;
+ if (n <= (ssize_t) count) {
+ memcpy(buf, sdu, n);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ flow->part_idx = (n == (ssize_t) count) ? DONE_PART : NO_PART;
+ return n;
+ } else {
+ memcpy(buf, sdu, count);
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ shm_du_buff_head_release(sdb, n);
+ flow->part_idx = idx;
+ return count;
+ }
}
/* fqueue functions. */