diff options
| -rw-r--r-- | doc/man/flow_read.3 | 6 | ||||
| -rw-r--r-- | src/lib/dev.c | 59 | 
2 files changed, 44 insertions, 21 deletions
diff --git a/doc/man/flow_read.3 b/doc/man/flow_read.3 index 96f76766..0110d0a2 100644 --- a/doc/man/flow_read.3 +++ b/doc/man/flow_read.3 @@ -30,7 +30,11 @@ from the supplied buffer \fIbuf\fR to the flow specified by \fIfd\fR.  .SH RETURN VALUE  On success, \fBflow_read\fR() returns the number of bytes read. On -failure, a negative value indicating the error will be returned. +failure, a negative value indicating the error will be returned. If +the number of bytes read equals count, a subsequent call to +\fBflow_read\fR() should be performed to check if there were more +bytes to read. This call to \fBflow_read\fR will return 0 if there +was no more data and mark the end of the datagram.  On success, \fBflow_write\fR() returns 0. On failure, a negative value  indicating the error will be returned. Passing a NULL pointer for diff --git a/src/lib/dev.c b/src/lib/dev.c index 3564c293..115cd565 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -52,12 +52,14 @@  #include <stdbool.h>  #include <sys/types.h> -#define BUF_SIZE       1500 -  #ifndef CLOCK_REALTIME_COARSE  #define CLOCK_REALTIME_COARSE CLOCK_REALTIME  #endif +/* Partial read information. */ +#define NO_PART      -1 +#define DONE_PART    -2 +  struct flow_set {          size_t   idx;  }; @@ -92,6 +94,7 @@ struct flow {          int                   oflags;          qoscube_t             cube;          qosspec_t             spec; +        ssize_t               part_idx;          pid_t                 pid; @@ -285,11 +288,12 @@ static int flow_init(int       port_id,          if (ai.flows[fd].set == NULL)                  goto fail; -        ai.flows[fd].port_id = port_id; -        ai.flows[fd].oflags  = FLOWFDEFAULT; -        ai.flows[fd].pid     = pid; -        ai.flows[fd].cube    = qc; -        ai.flows[fd].spec    = qos_cube_to_spec(qc); +        ai.flows[fd].port_id  = port_id; +        ai.flows[fd].oflags   = FLOWFDEFAULT; +        ai.flows[fd].pid      = pid; +        ai.flows[fd].cube     = qc; +        ai.flows[fd].spec     = qos_cube_to_spec(qc); +        ai.flows[fd].part_idx = NO_PART;          ai.ports[port_id].fd = fd; @@ -899,6 +903,11 @@ ssize_t flow_read(int    fd,          flow = &ai.flows[fd]; +        if (flow->part_idx == DONE_PART) { +                flow->part_idx = NO_PART; +                return 0; +        } +          clock_gettime(PTHREAD_COND_CLOCK, &abs);          pthread_rwlock_rdlock(&ai.lock); @@ -918,26 +927,36 @@ ssize_t flow_read(int    fd,          pthread_rwlock_unlock(&ai.lock); -        idx = frcti_queued_pdu(flow->frcti); +        idx = flow->part_idx;          if (idx < 0) { -                do { -                        idx = noblock ? shm_rbuff_read(rb) : -                                shm_rbuff_read_b(rb, abstime); -                        if (idx < 0) -                                return idx; -                        sdb = shm_rdrbuff_get(ai.rdrb, idx); -                } while (frcti_rcv(flow->frcti, sdb) != 0); +                idx = frcti_queued_pdu(flow->frcti); +                if (idx < 0) { +                        do { +                                idx = noblock ? shm_rbuff_read(rb) : +                                        shm_rbuff_read_b(rb, abstime); +                                if (idx < 0) +                                        return idx; +                                sdb = shm_rdrbuff_get(ai.rdrb, idx); +                        } while (frcti_rcv(flow->frcti, sdb) != 0); +                }          }          n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);          assert(n >= 0); -        memcpy(buf, sdu, MIN((size_t) n, count)); - -        shm_rdrbuff_remove(ai.rdrb, idx); - -        return n; +        if (n <= (ssize_t) count) { +                memcpy(buf, sdu, n); +                shm_rdrbuff_remove(ai.rdrb, idx); +                flow->part_idx = (n == (ssize_t) count) ? DONE_PART : NO_PART; +                return n; +        } else { +                memcpy(buf, sdu, count); +                sdb = shm_rdrbuff_get(ai.rdrb, idx); +                shm_du_buff_head_release(sdb, n); +                flow->part_idx = idx; +                return count; +        }  }  /* fqueue functions. */  | 
