diff options
| author | Dimitri Staessens <dimitri.staessens@ugent.be> | 2018-03-12 17:27:10 +0100 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2018-03-12 18:39:15 +0100 | 
| commit | e03dedf0a4c40ceeb063f95777bc99628a980ec9 (patch) | |
| tree | a886fea671133bd678524b4430a0ada94f63785e /src/lib | |
| parent | 9bf0d277416c342a8a9e0b2017b2b10f1d093245 (diff) | |
| download | ouroboros-e03dedf0a4c40ceeb063f95777bc99628a980ec9.tar.gz ouroboros-e03dedf0a4c40ceeb063f95777bc99628a980ec9.zip | |
lib: Allow partial read
This implements partial read of packets if the buffer supplied to
flow_read() is smaller than the packet in the buffer. If the number of
bytes returned by flow_read equals the size of the buffer, the next
read() will deliver the next bytes of the packet (or 0 if the packet
was exactly the size of the buffer on the previous read).
Implements #7.
Signed-off-by: Dimitri Staessens <dimitri.staessens@ugent.be>
Signed-off-by: Sander Vrijders <sander.vrijders@ugent.be>
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dev.c | 59 | 
1 files changed, 39 insertions, 20 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index 3564c293..115cd565 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -52,12 +52,14 @@  #include <stdbool.h>  #include <sys/types.h> -#define BUF_SIZE       1500 -  #ifndef CLOCK_REALTIME_COARSE  #define CLOCK_REALTIME_COARSE CLOCK_REALTIME  #endif +/* Partial read information. */ +#define NO_PART      -1 +#define DONE_PART    -2 +  struct flow_set {          size_t   idx;  }; @@ -92,6 +94,7 @@ struct flow {          int                   oflags;          qoscube_t             cube;          qosspec_t             spec; +        ssize_t               part_idx;          pid_t                 pid; @@ -285,11 +288,12 @@ static int flow_init(int       port_id,          if (ai.flows[fd].set == NULL)                  goto fail; -        ai.flows[fd].port_id = port_id; -        ai.flows[fd].oflags  = FLOWFDEFAULT; -        ai.flows[fd].pid     = pid; -        ai.flows[fd].cube    = qc; -        ai.flows[fd].spec    = qos_cube_to_spec(qc); +        ai.flows[fd].port_id  = port_id; +        ai.flows[fd].oflags   = FLOWFDEFAULT; +        ai.flows[fd].pid      = pid; +        ai.flows[fd].cube     = qc; +        ai.flows[fd].spec     = qos_cube_to_spec(qc); +        ai.flows[fd].part_idx = NO_PART;          ai.ports[port_id].fd = fd; @@ -899,6 +903,11 @@ ssize_t flow_read(int    fd,          flow = &ai.flows[fd]; +        if (flow->part_idx == DONE_PART) { +                flow->part_idx = NO_PART; +                return 0; +        } +          clock_gettime(PTHREAD_COND_CLOCK, &abs);          pthread_rwlock_rdlock(&ai.lock); @@ -918,26 +927,36 @@ ssize_t flow_read(int    fd,          pthread_rwlock_unlock(&ai.lock); -        idx = frcti_queued_pdu(flow->frcti); +        idx = flow->part_idx;          if (idx < 0) { -                do { -                        idx = noblock ? shm_rbuff_read(rb) : -                                shm_rbuff_read_b(rb, abstime); -                        if (idx < 0) -                                return idx; -                        sdb = shm_rdrbuff_get(ai.rdrb, idx); -                } while (frcti_rcv(flow->frcti, sdb) != 0); +                idx = frcti_queued_pdu(flow->frcti); +                if (idx < 0) { +                        do { +                                idx = noblock ? shm_rbuff_read(rb) : +                                        shm_rbuff_read_b(rb, abstime); +                                if (idx < 0) +                                        return idx; +                                sdb = shm_rdrbuff_get(ai.rdrb, idx); +                        } while (frcti_rcv(flow->frcti, sdb) != 0); +                }          }          n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);          assert(n >= 0); -        memcpy(buf, sdu, MIN((size_t) n, count)); - -        shm_rdrbuff_remove(ai.rdrb, idx); - -        return n; +        if (n <= (ssize_t) count) { +                memcpy(buf, sdu, n); +                shm_rdrbuff_remove(ai.rdrb, idx); +                flow->part_idx = (n == (ssize_t) count) ? DONE_PART : NO_PART; +                return n; +        } else { +                memcpy(buf, sdu, count); +                sdb = shm_rdrbuff_get(ai.rdrb, idx); +                shm_du_buff_head_release(sdb, n); +                flow->part_idx = idx; +                return count; +        }  }  /* fqueue functions. */ | 
