diff options
| author | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-12-12 13:24:17 +0100 | 
|---|---|---|
| committer | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-12-12 15:10:30 +0100 | 
| commit | f8c14e0246a6c9cb5e8ff47869b5968abb63f010 (patch) | |
| tree | d91c005451a74822516669f3f7cc3ade34971abb | |
| parent | b731adbf7b6fa16490f7abf94e2662d82d76cce0 (diff) | |
| download | ouroboros-f8c14e0246a6c9cb5e8ff47869b5968abb63f010.tar.gz ouroboros-f8c14e0246a6c9cb5e8ff47869b5968abb63f010.zip | |
src, tools: Set/get timeout and get qos for flows
Receiver timeouts can now be set on a flow using the flow_set_timeout
function. Specifying NULL disables the timeout.  The flow_get_timeout
function gets the value for the timeout.
This commit also deprecates fcntl in favor of flow_get_flags and
flow_set_flags functions.
struct qos_spec is typedef'd as a qosspec_t.
The tools and cdap.c are updated to use the new API.
Fixes a bug in operf client where the client's writer thread wouldn't
cancel on SIGINT.
| -rw-r--r-- | include/ouroboros/dev.h | 10 | ||||
| -rw-r--r-- | include/ouroboros/fcntl.h | 19 | ||||
| -rw-r--r-- | include/ouroboros/ipcp-dev.h | 7 | ||||
| -rw-r--r-- | include/ouroboros/qos.h | 4 | ||||
| -rw-r--r-- | src/ipcpd/normal/fmgr.c | 6 | ||||
| -rw-r--r-- | src/irmd/main.c | 10 | ||||
| -rw-r--r-- | src/lib/cdap.c | 2 | ||||
| -rw-r--r-- | src/lib/dev.c | 190 | ||||
| -rw-r--r-- | src/tools/cbr/cbr_server.c | 4 | ||||
| -rw-r--r-- | src/tools/echo/echo_server.c | 2 | ||||
| -rw-r--r-- | src/tools/operf/operf.c | 3 | ||||
| -rw-r--r-- | src/tools/operf/operf_client.c | 68 | ||||
| -rw-r--r-- | src/tools/operf/operf_server.c | 2 | ||||
| -rw-r--r-- | src/tools/oping/oping.c | 3 | ||||
| -rw-r--r-- | src/tools/oping/oping_client.c | 115 | ||||
| -rw-r--r-- | src/tools/oping/oping_server.c | 4 | 
16 files changed, 251 insertions, 198 deletions
| diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h index f2e42d03..47665ca4 100644 --- a/include/ouroboros/dev.h +++ b/include/ouroboros/dev.h @@ -35,8 +35,8 @@ int     ap_init(char * ap_name);  void    ap_fini(void);  /* Returns flow descriptor (> 0), client AE name and qos spec. */ -int     flow_accept(char **           ae_name, -                    struct qos_spec * qos); +int     flow_accept(char **     ae_name, +                    qosspec_t * qos);  int     flow_alloc_resp(int fd,                          int response); @@ -45,9 +45,9 @@ int     flow_alloc_resp(int fd,   * Returns flow descriptor (> 0).   * On returning, qos will contain the actual supplied QoS.   */ -int     flow_alloc(char *            dst_name, -                   char *            src_ae_name, -                   struct qos_spec * qos); +int     flow_alloc(char *      dst_name, +                   char *      src_ae_name, +                   qosspec_t * qos);  int     flow_alloc_res(int fd); diff --git a/include/ouroboros/fcntl.h b/include/ouroboros/fcntl.h index ccb45996..3ad3ccac 100644 --- a/include/ouroboros/fcntl.h +++ b/include/ouroboros/fcntl.h @@ -23,6 +23,8 @@  #ifndef OUROBOROS_FCNTL_H  #define OUROBOROS_FCNTL_H +#include <sys/time.h> +  /* same values as fcntl.h */  #define FLOW_O_RDONLY   00000000  #define FLOW_O_WRONLY   00000001 @@ -34,11 +36,18 @@  #define FLOW_O_INVALID  (FLOW_O_WRONLY | FLOW_O_RDWR) -#define FLOW_F_GETFL    00000001 -#define FLOW_F_SETFL    00000002 +int               flow_set_flags(int fd, +                                 int flags); + +int               flow_get_flags(int fd); + +int               flow_set_timeout(int               fd, +                                   struct timespec * to); + +int               flow_get_timeout(int               fd, +                                   struct timespec * to); -int     flow_cntl(int fd, -                  int cmd, -                  int oflags); +int               flow_get_qosspec(int         fd, +                                   qosspec_t * spec);  #endif /* OUROBOROS_FCNTL_H */ diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h index ee7c83ad..19a66762 100644 --- a/include/ouroboros/ipcp-dev.h +++ b/include/ouroboros/ipcp-dev.h @@ -21,10 +21,6 @@   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */ -#include <unistd.h> -#include <time.h> - -#include <ouroboros/qos.h>  #include <ouroboros/shm_rdrbuff.h>  #ifndef OUROBOROS_IPCP_DEV_H @@ -49,4 +45,7 @@ void ipcp_flow_fini(int fd);  void ipcp_flow_del(struct shm_du_buff * sdb); +int  ipcp_flow_get_qoscube(int             fd, +                           enum qos_cube * cube); +  #endif /* OUROBOROS_IPCP_DEV_H */ diff --git a/include/ouroboros/qos.h b/include/ouroboros/qos.h index 8f573b7d..047e6288 100644 --- a/include/ouroboros/qos.h +++ b/include/ouroboros/qos.h @@ -26,9 +26,9 @@  #include <stdint.h>  /* FIXME: may need revision */ -struct qos_spec { +typedef struct qos_spec {          uint32_t delay;          uint32_t jitter; -}; +} qosspec_t;  #endif diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 41785ae4..8e416aa4 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -123,9 +123,9 @@ static int add_np1_fd(int           fd,  static void * fmgr_nm1_acceptor(void * o)  { -        int    fd; -        char * ae_name; -        struct qos_spec qs; +        int       fd; +        char *    ae_name; +        qosspec_t qs;          (void) o; diff --git a/src/irmd/main.c b/src/irmd/main.c index dc0c26f2..6a6aedf8 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1125,16 +1125,16 @@ static int flow_alloc_resp(pid_t n_api,          return ret;  } -static struct irm_flow * flow_alloc(pid_t  api, -                                    char * dst_name, -                                    char * src_ae_name, -                                    struct qos_spec * qos) +static struct irm_flow * flow_alloc(pid_t       api, +                                    char *      dst_name, +                                    char *      src_ae_name, +                                    qosspec_t * qos)  {          struct irm_flow * f;          pid_t ipcp;          int port_id; -        /* FIXME: Map qos_spec to qos_cube */ +        /* FIXME: Map qosspec to qos_cube */          (void) qos;          pthread_rwlock_rdlock(&irmd->state_lock); diff --git a/src/lib/cdap.c b/src/lib/cdap.c index d06a7d39..df79be54 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -203,7 +203,7 @@ struct cdap * cdap_create(struct cdap_ops * ops,              ops->cdap_request == NULL)                  return NULL; -        flags = flow_cntl(fd, FLOW_F_GETFL, 0); +        flags = flow_get_flags(fd);          if (flags & FLOW_O_NONBLOCK)                  return NULL; diff --git a/src/lib/dev.c b/src/lib/dev.c index 1c0d73a1..bad56129 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -138,7 +138,8 @@ struct flow {          pid_t                 api; -        struct timespec *     timeout; +        bool                  timesout; +        struct timespec       rcv_timeo;  };  struct { @@ -213,11 +214,7 @@ static void reset_flow(int fd)          ai.flows[fd].oflags = 0;          ai.flows[fd].api = -1; - -        if (ai.flows[fd].timeout != NULL) { -                free(ai.flows[fd].timeout); -                ai.flows[fd].timeout = NULL; -        } +        ai.flows[fd].timesout = false;  }  int ap_init(char * ap_name) @@ -265,13 +262,13 @@ int ap_init(char * ap_name)          }          for (i = 0; i < AP_MAX_FLOWS; ++i) { -                ai.flows[i].rx_rb   = NULL; -                ai.flows[i].tx_rb   = NULL; -                ai.flows[i].set     = NULL; -                ai.flows[i].port_id = -1; -                ai.flows[i].oflags  = 0; -                ai.flows[i].api     = -1; -                ai.flows[i].timeout = NULL; +                ai.flows[i].rx_rb    = NULL; +                ai.flows[i].tx_rb    = NULL; +                ai.flows[i].set      = NULL; +                ai.flows[i].port_id  = -1; +                ai.flows[i].oflags   = 0; +                ai.flows[i].api      = -1; +                ai.flows[i].timesout = false;          }          ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); @@ -341,7 +338,7 @@ void ap_fini()          pthread_rwlock_destroy(&ai.data_lock);  } -int flow_accept(char ** ae_name, struct qos_spec * qos) +int flow_accept(char ** ae_name, qosspec_t * qos)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; @@ -490,7 +487,7 @@ int flow_alloc_resp(int fd, int response)          return ret;  } -int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) +int flow_alloc(char * dst_name, char * src_ae_name, qosspec_t * qos)  {          irm_msg_t msg = IRM_MSG__INIT;          irm_msg_t * recv_msg = NULL; @@ -680,7 +677,7 @@ int flow_dealloc(int fd)          return 0;  } -int flow_cntl(int fd, int cmd, int oflags) +int flow_set_flags(int fd, int flags)  {          int old; @@ -698,25 +695,115 @@ int flow_cntl(int fd, int cmd, int oflags)          old = ai.flows[fd].oflags; -        switch (cmd) { -        case FLOW_F_GETFL: /* GET FLOW FLAGS */ +        ai.flows[fd].oflags = flags; +        if (flags & FLOW_O_WRONLY) +                shm_rbuff_block(ai.flows[fd].rx_rb); +        if (flags & FLOW_O_RDWR) +                shm_rbuff_unblock(ai.flows[fd].rx_rb); +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return old; +} + +int flow_get_flags(int fd) +{ +        int old; + +        if (fd < 0 || fd >= AP_MAX_FLOWS) +                return -EBADF; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); + +        if (ai.flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -ENOTALLOC; +        } + +        old = ai.flows[fd].oflags; + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return old; +} + +int flow_get_timeout(int fd, struct timespec * timeo) +{ +        int ret = 0; + +        if (fd < 0 || fd >= AP_MAX_FLOWS || timeo == NULL) +                return -EINVAL; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); + +        if (ai.flows[fd].port_id < 0) {                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); -                return old; -        case FLOW_F_SETFL: /* SET FLOW FLAGS */ -                ai.flows[fd].oflags = oflags; -                if (oflags & FLOW_O_WRONLY) -                        shm_rbuff_block(ai.flows[fd].rx_rb); -                if (oflags & FLOW_O_RDWR) -                        shm_rbuff_unblock(ai.flows[fd].rx_rb); +                return -ENOTALLOC; +        } + +        if (ai.flows[fd].timesout) +                *timeo = ai.flows[fd].rcv_timeo; +        else +                ret = -EPERM; + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return ret; +} + +int flow_set_timeout(int fd, struct timespec * timeo) +{ +        if (fd < 0 || fd >= AP_MAX_FLOWS) +                return -EINVAL; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); + +        if (ai.flows[fd].port_id < 0) {                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); -                return old; -        default: +                return -ENOTALLOC; +        } + +        if (timeo == NULL) { +                ai.flows[fd].timesout = false; +        } else { +                ai.flows[fd].timesout = true; +                ai.flows[fd].rcv_timeo = *timeo; +        } + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return 0; +} + +int flow_get_qosspec(int fd, qosspec_t * spec) +{ +        if (fd < 0 || fd >= AP_MAX_FLOWS || spec == NULL) +                return -EINVAL; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); + +        if (ai.flows[fd].port_id < 0) {                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); -                return FLOW_O_INVALID; /* unknown command */ +                return -ENOTALLOC;          } + +        /* FIXME: map cube to spec */ + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return 0;  }  ssize_t flow_write(int fd, void * buf, size_t count) @@ -764,7 +851,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)                  }          } else { /* blocking */                  struct shm_rdrbuff * rdrb = ai.rdrb; -                struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb; +                struct shm_rbuff * tx_rb  = ai.flows[fd].tx_rb;                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); @@ -816,19 +903,28 @@ ssize_t flow_read(int fd, void * buf, size_t count)                  idx = shm_rbuff_read(ai.flows[fd].rx_rb);                  pthread_rwlock_unlock(&ai.flows_lock);          } else { -                struct shm_rbuff * rb     = ai.flows[fd].rx_rb; -                struct timespec * timeout = ai.flows[fd].timeout; +                struct shm_rbuff * rb   = ai.flows[fd].rx_rb; +                bool timeo = ai.flows[fd].timesout; +                struct timespec timeout = ai.flows[fd].rcv_timeo; +                  pthread_rwlock_unlock(&ai.flows_lock);                  pthread_rwlock_unlock(&ai.data_lock); -                idx = shm_rbuff_read_b(rb, timeout); + +                if (timeo) +                        idx = shm_rbuff_read_b(rb, &timeout); +                else +                        idx = shm_rbuff_read_b(rb, NULL); +                  pthread_rwlock_rdlock(&ai.data_lock);          } -        if (idx < 0) { +        if (idx == -ETIMEDOUT) {                  pthread_rwlock_unlock(&ai.data_lock); -                return -EAGAIN; +                return -ETIMEDOUT;          } +        assert(idx >= 0); +          n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);          if (n < 0) {                  pthread_rwlock_unlock(&ai.data_lock); @@ -844,7 +940,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)          return n;  } -/* select functions */ +/* fqueue functions */  struct flow_set * flow_set_create()  { @@ -1328,7 +1424,7 @@ int ipcp_flow_fini(int fd)  {          struct shm_rbuff * rb; -        flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); +        flow_set_flags(fd, FLOW_O_WRONLY);          pthread_rwlock_rdlock(&ai.data_lock);          pthread_rwlock_rdlock(&ai.flows_lock); @@ -1343,6 +1439,28 @@ int ipcp_flow_fini(int fd)          return 0;  } +int ipcp_flow_get_qoscube(int fd, enum qos_cube * cube) +{ +        if (fd < 0 || fd >= AP_MAX_FLOWS || cube == NULL) +                return -EINVAL; + +        pthread_rwlock_rdlock(&ai.data_lock); +        pthread_rwlock_wrlock(&ai.flows_lock); + +        if (ai.flows[fd].port_id < 0) { +                pthread_rwlock_unlock(&ai.flows_lock); +                pthread_rwlock_unlock(&ai.data_lock); +                return -ENOTALLOC; +        } + +        *cube = ai.flows[fd].qos; + +        pthread_rwlock_unlock(&ai.flows_lock); +        pthread_rwlock_unlock(&ai.data_lock); + +        return 0; +} +  ssize_t local_flow_read(int fd)  {          ssize_t ret; diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c index 104c5e9e..64055cfb 100644 --- a/src/tools/cbr/cbr_server.c +++ b/src/tools/cbr/cbr_server.c @@ -85,7 +85,7 @@ static void handle_flow(int fd)          alive = iv_start;          ts_add(&iv_start, &intv, &iv_end); -        flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK); +        flow_set_flags(fd, FLOW_O_NONBLOCK);          while (!stop) {                  clock_gettime(CLOCK_REALTIME, &now); @@ -157,7 +157,7 @@ static void * listener(void * o)  {          int client_fd = 0;          int response = 0; -        struct qos_spec qs; +        qosspec_t qs;          (void) o; diff --git a/src/tools/echo/echo_server.c b/src/tools/echo/echo_server.c index 09575364..c369d3e6 100644 --- a/src/tools/echo/echo_server.c +++ b/src/tools/echo/echo_server.c @@ -40,7 +40,7 @@ int server_main(void)          int    client_fd = 0;          char   buf[BUF_SIZE];          ssize_t count = 0; -        struct qos_spec qs; +        qosspec_t qs;          printf("Starting the server.\n"); diff --git a/src/tools/operf/operf.c b/src/tools/operf/operf.c index bc7ade3a..1716a598 100644 --- a/src/tools/operf/operf.c +++ b/src/tools/operf/operf.c @@ -47,9 +47,6 @@ struct c {          unsigned long sent;          unsigned long rcvd; -        flow_set_t * flows; -        fqueue_t *   fq; -          pthread_t reader_pt;          pthread_t writer_pt;  } client; diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c index 902a7b41..44f25893 100644 --- a/src/tools/operf/operf_client.c +++ b/src/tools/operf/operf_client.c @@ -22,6 +22,7 @@   */  #include <ouroboros/dev.h> +#include <ouroboros/fcntl.h>  #include <ouroboros/time_utils.h>  #ifdef __FreeBSD__ @@ -45,6 +46,7 @@ static void busy_wait_until(const struct timespec * deadline)          while (now.tv_sec == deadline->tv_sec                 && now.tv_nsec < deadline->tv_nsec)                  clock_gettime(CLOCK_REALTIME, &now); +        pthread_testcancel();  }  void shutdown_client(int signo, siginfo_t * info, void * c) @@ -68,23 +70,20 @@ void * reader(void * o)          struct timespec timeout = {2, 0};          char buf[OPERF_BUF_SIZE]; -        int fd = 0; +        int fd = *((int *) o);          int msg_len = 0; -        (void) o; +        flow_set_timeout(fd, &timeout); -        /* FIXME: use flow timeout option once we have it */ -        while (flow_event_wait(client.flows, client.fq, &timeout) != -ETIMEDOUT) -                while ((fd = fqueue_next(client.fq)) >= 0) { -                        msg_len = flow_read(fd, buf, OPERF_BUF_SIZE); -                        if (msg_len != client.size) { -                                printf("Invalid message on fd %d.\n", fd); -                                continue; -                        } - -                        ++client.rcvd; +        while ((msg_len = flow_read(fd, buf, OPERF_BUF_SIZE)) != -ETIMEDOUT) { +                if (msg_len != client.size) { +                        printf("Invalid message on fd %d.\n", fd); +                        continue;                  } +                ++client.rcvd; +        } +          return (void *) 0;  } @@ -160,33 +159,6 @@ void * writer(void * o)          return (void *) 0;  } -static int client_init(void) -{ -        client.flows = flow_set_create(); -        if (client.flows == NULL) -                return -ENOMEM; - -        client.fq = fqueue_create(); -        if (client.fq == NULL) { -                flow_set_destroy(client.flows); -                return -ENOMEM; -        } - -        client.sent = 0; -        client.rcvd = 0; - -        return 0; -} - -void client_fini(void) -{ -        if (client.flows != NULL) -                flow_set_destroy(client.flows); - -        if (client.fq != NULL) -                fqueue_destroy(client.fq); -} -  int client_main(void)  {          struct sigaction sig_act; @@ -208,32 +180,24 @@ int client_main(void)                  return -1;          } -        if (client_init()) { -                printf("Failed to initialize client.\n"); -                return -1; -        } +        client.sent = 0; +        client.rcvd = 0;          fd = flow_alloc(client.s_apn, NULL, NULL);          if (fd < 0) { -                flow_set_destroy(client.flows); -                fqueue_destroy(client.fq);                  printf("Failed to allocate flow.\n");                  return -1;          } -        flow_set_add(client.flows, fd); -          if (flow_alloc_res(fd)) {                  printf("Flow allocation refused.\n"); -                flow_set_del(client.flows, fd);                  flow_dealloc(fd); -                client_fini();                  return -1;          }          clock_gettime(CLOCK_REALTIME, &tic); -        pthread_create(&client.reader_pt, NULL, reader, NULL); +        pthread_create(&client.reader_pt, NULL, reader, &fd);          pthread_create(&client.writer_pt, NULL, writer, &fd);          pthread_join(client.writer_pt, NULL); @@ -253,11 +217,7 @@ int client_main(void)                 (client.rcvd * client.size * 8)                 / (double) ts_diff_us(&tic, &toc)); -        flow_set_del(client.flows, fd); -          flow_dealloc(fd); -        client_fini(); -          return 0;  } diff --git a/src/tools/operf/operf_server.c b/src/tools/operf/operf_server.c index 4eb93879..340103d2 100644 --- a/src/tools/operf/operf_server.c +++ b/src/tools/operf/operf_server.c @@ -102,7 +102,7 @@ void * accept_thread(void * o)  {          int fd = 0;          struct timespec now = {0, 0}; -        struct qos_spec qs; +        qosspec_t qs;          (void) o; diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c index 98d12a7b..224b182b 100644 --- a/src/tools/oping/oping.c +++ b/src/tools/oping/oping.c @@ -54,9 +54,6 @@ struct c {          double rtt_avg;          double rtt_m2; -        flow_set_t * flows; -        fqueue_t *   fq; -          /* needs locking */          struct timespec * times;          pthread_mutex_t lock; diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index b30ba5f4..c439cf46 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -60,56 +60,54 @@ void * reader(void * o)          char buf[OPING_BUF_SIZE];          struct oping_msg * msg = (struct oping_msg *) buf; -        int fd = 0; +        int fd = *((int *) o);          int msg_len = 0;          double ms = 0;          double d = 0; -        (void) o; - -        /* FIXME: use flow timeout option once we have it */ -        while (client.rcvd != client.count -               && (flow_event_wait(client.flows, client.fq, &timeout) -                   != -ETIMEDOUT)) { -                while ((fd = fqueue_next(client.fq)) >= 0) { -                        msg_len = flow_read(fd, buf, OPING_BUF_SIZE); -                        if (msg_len < 0) -                                continue; - -                        if (ntohl(msg->type) != ECHO_REPLY) { -                                printf("Invalid message on fd %d.\n", fd); -                                continue; -                        } - -                        if (ntohl(msg->id) >= client.count) { -                                printf("Invalid id.\n"); -                                continue; -                        } - -                        ++client.rcvd; - -                        clock_gettime(CLOCK_REALTIME, &now); - -                        pthread_mutex_lock(&client.lock); -                        ms = ts_diff_us(&client.times[ntohl(msg->id)], &now) -                                / 1000.0; -                        pthread_mutex_unlock(&client.lock); - -                        printf("%d bytes from %s: seq=%d time=%.3f ms\n", -                               msg_len, -                               client.s_apn, -                               ntohl(msg->id), -                               ms); - -                        if (ms < client.rtt_min) -                                client.rtt_min = ms; -                        if (ms > client.rtt_max) -                                client.rtt_max = ms; - -                        d = (ms - client.rtt_avg); -                        client.rtt_avg += d  / (float) client.rcvd; -                        client.rtt_m2 += d * (ms - client.rtt_avg); +        flow_set_timeout(fd, &timeout); + +        while (client.rcvd != client.count) { +                msg_len = flow_read(fd, buf, OPING_BUF_SIZE); +                if (msg_len == -ETIMEDOUT) +                        break; + +                if (msg_len < 0) +                        continue; + +                if (ntohl(msg->type) != ECHO_REPLY) { +                        printf("Invalid message on fd %d.\n", fd); +                        continue;                  } + +                if (ntohl(msg->id) >= client.count) { +                        printf("Invalid id.\n"); +                        continue; +                } + +                ++client.rcvd; + +                clock_gettime(CLOCK_REALTIME, &now); + +                pthread_mutex_lock(&client.lock); +                ms = ts_diff_us(&client.times[ntohl(msg->id)], &now) +                        / 1000.0; +                pthread_mutex_unlock(&client.lock); + +                printf("%d bytes from %s: seq=%d time=%.3f ms\n", +                       msg_len, +                       client.s_apn, +                       ntohl(msg->id), +                       ms); + +                if (ms < client.rtt_min) +                        client.rtt_min = ms; +                if (ms > client.rtt_max) +                        client.rtt_max = ms; + +                d = (ms - client.rtt_avg); +                client.rtt_avg += d  / (float) client.rcvd; +                client.rtt_m2 += d * (ms - client.rtt_avg);          }          return (void *) 0; @@ -164,20 +162,8 @@ void * writer(void * o)  static int client_init(void)  { -        client.flows = flow_set_create(); -        if (client.flows == NULL) -                return -ENOMEM; - -        client.fq = fqueue_create(); -        if (client.fq == NULL) { -                flow_set_destroy(client.flows); -                return -ENOMEM; -        } -          client.times = malloc(sizeof(struct timespec) * client.count);          if (client.times == NULL) { -                flow_set_destroy(client.flows); -                fqueue_destroy(client.fq);                  pthread_mutex_unlock(&client.lock);                  return -ENOMEM;          } @@ -197,12 +183,6 @@ static int client_init(void)  void client_fini(void)  { -        if (client.flows != NULL) -                flow_set_destroy(client.flows); - -        if (client.fq != NULL) -                fqueue_destroy(client.fq); -          if (client.times != NULL)                  free(client.times);  } @@ -235,17 +215,12 @@ int client_main(void)          fd = flow_alloc(client.s_apn, NULL, NULL);          if (fd < 0) { -                flow_set_destroy(client.flows); -                fqueue_destroy(client.fq);                  printf("Failed to allocate flow.\n");                  return -1;          } -        flow_set_add(client.flows, fd); -          if (flow_alloc_res(fd)) {                  printf("Flow allocation refused.\n"); -                flow_set_del(client.flows, fd);                  flow_dealloc(fd);                  client_fini();                  return -1; @@ -255,7 +230,7 @@ int client_main(void)          clock_gettime(CLOCK_REALTIME, &tic); -        pthread_create(&client.reader_pt, NULL, reader, NULL); +        pthread_create(&client.reader_pt, NULL, reader, &fd);          pthread_create(&client.writer_pt, NULL, writer, &fd);          pthread_join(client.writer_pt, NULL); @@ -283,8 +258,6 @@ int client_main(void)                          printf("NaN ms\n");          } -        flow_set_del(client.flows, fd); -          flow_dealloc(fd);          client_fini(); diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index 8d7ab1db..63fca567 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -115,7 +115,7 @@ void * accept_thread(void * o)  {          int fd = 0;          struct timespec now = {0, 0}; -        struct qos_spec qs; +        qosspec_t qs;          (void) o; @@ -143,7 +143,7 @@ void * accept_thread(void * o)                  server.times[fd] = now;                  pthread_mutex_unlock(&server.lock); -                flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK | FLOW_O_RDWR); +                flow_set_flags(fd, FLOW_O_NONBLOCK | FLOW_O_RDWR);          }          return (void *) 0; | 
