summaryrefslogtreecommitdiff
path: root/src/tools/operf/operf_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/tools/operf/operf_client.c')
-rw-r--r--src/tools/operf/operf_client.c68
1 files changed, 14 insertions, 54 deletions
diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c
index 902a7b41..44f25893 100644
--- a/src/tools/operf/operf_client.c
+++ b/src/tools/operf/operf_client.c
@@ -22,6 +22,7 @@
*/
#include <ouroboros/dev.h>
+#include <ouroboros/fcntl.h>
#include <ouroboros/time_utils.h>
#ifdef __FreeBSD__
@@ -45,6 +46,7 @@ static void busy_wait_until(const struct timespec * deadline)
while (now.tv_sec == deadline->tv_sec
&& now.tv_nsec < deadline->tv_nsec)
clock_gettime(CLOCK_REALTIME, &now);
+ pthread_testcancel();
}
void shutdown_client(int signo, siginfo_t * info, void * c)
@@ -68,23 +70,20 @@ void * reader(void * o)
struct timespec timeout = {2, 0};
char buf[OPERF_BUF_SIZE];
- int fd = 0;
+ int fd = *((int *) o);
int msg_len = 0;
- (void) o;
+ flow_set_timeout(fd, &timeout);
- /* FIXME: use flow timeout option once we have it */
- while (flow_event_wait(client.flows, client.fq, &timeout) != -ETIMEDOUT)
- while ((fd = fqueue_next(client.fq)) >= 0) {
- msg_len = flow_read(fd, buf, OPERF_BUF_SIZE);
- if (msg_len != client.size) {
- printf("Invalid message on fd %d.\n", fd);
- continue;
- }
-
- ++client.rcvd;
+ while ((msg_len = flow_read(fd, buf, OPERF_BUF_SIZE)) != -ETIMEDOUT) {
+ if (msg_len != client.size) {
+ printf("Invalid message on fd %d.\n", fd);
+ continue;
}
+ ++client.rcvd;
+ }
+
return (void *) 0;
}
@@ -160,33 +159,6 @@ void * writer(void * o)
return (void *) 0;
}
-static int client_init(void)
-{
- client.flows = flow_set_create();
- if (client.flows == NULL)
- return -ENOMEM;
-
- client.fq = fqueue_create();
- if (client.fq == NULL) {
- flow_set_destroy(client.flows);
- return -ENOMEM;
- }
-
- client.sent = 0;
- client.rcvd = 0;
-
- return 0;
-}
-
-void client_fini(void)
-{
- if (client.flows != NULL)
- flow_set_destroy(client.flows);
-
- if (client.fq != NULL)
- fqueue_destroy(client.fq);
-}
-
int client_main(void)
{
struct sigaction sig_act;
@@ -208,32 +180,24 @@ int client_main(void)
return -1;
}
- if (client_init()) {
- printf("Failed to initialize client.\n");
- return -1;
- }
+ client.sent = 0;
+ client.rcvd = 0;
fd = flow_alloc(client.s_apn, NULL, NULL);
if (fd < 0) {
- flow_set_destroy(client.flows);
- fqueue_destroy(client.fq);
printf("Failed to allocate flow.\n");
return -1;
}
- flow_set_add(client.flows, fd);
-
if (flow_alloc_res(fd)) {
printf("Flow allocation refused.\n");
- flow_set_del(client.flows, fd);
flow_dealloc(fd);
- client_fini();
return -1;
}
clock_gettime(CLOCK_REALTIME, &tic);
- pthread_create(&client.reader_pt, NULL, reader, NULL);
+ pthread_create(&client.reader_pt, NULL, reader, &fd);
pthread_create(&client.writer_pt, NULL, writer, &fd);
pthread_join(client.writer_pt, NULL);
@@ -253,11 +217,7 @@ int client_main(void)
(client.rcvd * client.size * 8)
/ (double) ts_diff_us(&tic, &toc));
- flow_set_del(client.flows, fd);
-
flow_dealloc(fd);
- client_fini();
-
return 0;
}