summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri.staessens@ugent.be>2018-03-12 17:27:10 +0100
committerSander Vrijders <sander.vrijders@ugent.be>2018-03-12 18:39:15 +0100
commite03dedf0a4c40ceeb063f95777bc99628a980ec9 (patch)
treea886fea671133bd678524b4430a0ada94f63785e /src/lib
parent9bf0d277416c342a8a9e0b2017b2b10f1d093245 (diff)
downloadouroboros-e03dedf0a4c40ceeb063f95777bc99628a980ec9.tar.gz
ouroboros-e03dedf0a4c40ceeb063f95777bc99628a980ec9.zip
lib: Allow partial read
This implements partial read of packets if the buffer supplied to flow_read() is smaller than the packet in the buffer. If the number of bytes returned by flow_read equals the size of the buffer, the next read() will deliver the next bytes of the packet (or 0 if the packet was exactly the size of the buffer on the previous read). Implements #7. Signed-off-by: Dimitri Staessens <dimitri.staessens@ugent.be> Signed-off-by: Sander Vrijders <sander.vrijders@ugent.be>
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/dev.c59
1 files changed, 39 insertions, 20 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 3564c293..115cd565 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -52,12 +52,14 @@
#include <stdbool.h>
#include <sys/types.h>
-#define BUF_SIZE 1500
-
#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
#endif
+/* Partial read information. */
+#define NO_PART -1
+#define DONE_PART -2
+
struct flow_set {
size_t idx;
};
@@ -92,6 +94,7 @@ struct flow {
int oflags;
qoscube_t cube;
qosspec_t spec;
+ ssize_t part_idx;
pid_t pid;
@@ -285,11 +288,12 @@ static int flow_init(int port_id,
if (ai.flows[fd].set == NULL)
goto fail;
- ai.flows[fd].port_id = port_id;
- ai.flows[fd].oflags = FLOWFDEFAULT;
- ai.flows[fd].pid = pid;
- ai.flows[fd].cube = qc;
- ai.flows[fd].spec = qos_cube_to_spec(qc);
+ ai.flows[fd].port_id = port_id;
+ ai.flows[fd].oflags = FLOWFDEFAULT;
+ ai.flows[fd].pid = pid;
+ ai.flows[fd].cube = qc;
+ ai.flows[fd].spec = qos_cube_to_spec(qc);
+ ai.flows[fd].part_idx = NO_PART;
ai.ports[port_id].fd = fd;
@@ -899,6 +903,11 @@ ssize_t flow_read(int fd,
flow = &ai.flows[fd];
+ if (flow->part_idx == DONE_PART) {
+ flow->part_idx = NO_PART;
+ return 0;
+ }
+
clock_gettime(PTHREAD_COND_CLOCK, &abs);
pthread_rwlock_rdlock(&ai.lock);
@@ -918,26 +927,36 @@ ssize_t flow_read(int fd,
pthread_rwlock_unlock(&ai.lock);
- idx = frcti_queued_pdu(flow->frcti);
+ idx = flow->part_idx;
if (idx < 0) {
- do {
- idx = noblock ? shm_rbuff_read(rb) :
- shm_rbuff_read_b(rb, abstime);
- if (idx < 0)
- return idx;
- sdb = shm_rdrbuff_get(ai.rdrb, idx);
- } while (frcti_rcv(flow->frcti, sdb) != 0);
+ idx = frcti_queued_pdu(flow->frcti);
+ if (idx < 0) {
+ do {
+ idx = noblock ? shm_rbuff_read(rb) :
+ shm_rbuff_read_b(rb, abstime);
+ if (idx < 0)
+ return idx;
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ } while (frcti_rcv(flow->frcti, sdb) != 0);
+ }
}
n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
assert(n >= 0);
- memcpy(buf, sdu, MIN((size_t) n, count));
-
- shm_rdrbuff_remove(ai.rdrb, idx);
-
- return n;
+ if (n <= (ssize_t) count) {
+ memcpy(buf, sdu, n);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ flow->part_idx = (n == (ssize_t) count) ? DONE_PART : NO_PART;
+ return n;
+ } else {
+ memcpy(buf, sdu, count);
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ shm_du_buff_head_release(sdb, n);
+ flow->part_idx = idx;
+ return count;
+ }
}
/* fqueue functions. */