diff options
Diffstat (limited to 'src/tools/operf/operf_server.c')
-rw-r--r-- | src/tools/operf/operf_server.c | 69 |
1 files changed, 54 insertions, 15 deletions
diff --git a/src/tools/operf/operf_server.c b/src/tools/operf/operf_server.c index 11eb92fc..de32f320 100644 --- a/src/tools/operf/operf_server.c +++ b/src/tools/operf/operf_server.c @@ -36,6 +36,8 @@ * OF THE POSSIBILITY OF SUCH DAMAGE. */ +bool stop; + void shutdown_server(int signo, siginfo_t * info, void * c) { (void) info; @@ -46,6 +48,7 @@ void shutdown_server(int signo, siginfo_t * info, void * c) case SIGTERM: case SIGHUP: pthread_cancel(server.accept_pt); + stop = true; default: return; } @@ -58,7 +61,7 @@ void * cleaner_thread(void * o) (void) o; - while (true) { + while (!stop) { clock_gettime(CLOCK_REALTIME, &now); pthread_mutex_lock(&server.lock); for (i = 0; i < OPERF_MAX_FLOWS; ++i) @@ -73,18 +76,21 @@ void * cleaner_thread(void * o) pthread_mutex_unlock(&server.lock); sleep(1); } + + return (void *) 0; } -void * server_thread(void *o) +void * server_thread(void * o) { - int msg_len = 0; + int msg_len = 0; struct timespec timeout = {0, 100 * MILLION}; - struct timespec now = {0, 0}; - int fd; + struct timespec now = {0, 0}; + int fd; + struct msg * msg; (void) o; - while (fevent(server.flows, server.fq, &timeout)) + while (!stop && fevent(server.flows, server.fq, &timeout)) while ((fd = fqueue_next(server.fq)) >= 0) { msg_len = flow_read(fd, server.buffer, OPERF_BUF_SIZE); if (msg_len < 0) @@ -96,9 +102,21 @@ void * server_thread(void *o) server.times[fd] = now; pthread_mutex_unlock(&server.lock); - if (flow_write(fd, server.buffer, msg_len) < 0) { - printf("Error writing to flow (fd %d).\n", fd); - flow_dealloc(fd); + msg = (struct msg *) server.buffer; + + if (server.conf[fd].test_type == TEST_TYPE_UNI) + printf("Seqno %d from fd %d: %zd.%06zu\n", + msg->id, fd, + (ssize_t) now.tv_sec, + (size_t) now.tv_nsec / 1000); + + if (server.conf[fd].test_type == TEST_TYPE_BI) { + if (flow_write(fd, server.buffer, + msg_len) < 0) { + printf("Error writing to flow " + "(fd %d).\n", fd); + flow_dealloc(fd); + } } } @@ -110,12 +128,13 @@ void * accept_thread(void * o) int fd = 0; struct timespec now = {0, 0}; qosspec_t qs; + int len = 0; (void) o; printf("Ouroboros perf server started.\n"); - while (true) { + while (!stop) { fd = flow_accept(&qs, NULL); if (fd < 0) { printf("Failed to accept flow.\n"); @@ -124,6 +143,27 @@ void * accept_thread(void * o) printf("New flow %d.\n", fd); + /* Read test type. */ + len = flow_read(fd, &(server.conf[fd]), + sizeof(server.conf[fd])); + if (len == -ETIMEDOUT) { + printf("Failed to read config message.\n"); + flow_dealloc(fd); + break; + } + + /* Check if length was correct. */ + if (flow_read(fd, NULL, 0) != 0) { + printf("Invalid config message.\n"); + flow_dealloc(fd); + break; + } + + if (server.conf[fd].test_type == TEST_TYPE_BI) + printf("Doing a bidirectional test.\n"); + else + printf("Doing a unidirectional test.\n"); + clock_gettime(CLOCK_REALTIME, &now); pthread_mutex_lock(&server.lock); @@ -161,20 +201,19 @@ int server_main(void) return -1; } + stop = false; + pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL); pthread_create(&server.accept_pt, NULL, accept_thread, NULL); pthread_create(&server.server_pt, NULL, server_thread, NULL); pthread_join(server.accept_pt, NULL); - pthread_cancel(server.server_pt); - pthread_cancel(server.cleaner_pt); + pthread_join(server.server_pt, NULL); + pthread_join(server.cleaner_pt, NULL); fset_destroy(server.flows); fqueue_destroy(server.fq); - pthread_join(server.server_pt, NULL); - pthread_join(server.cleaner_pt, NULL); - return 0; } |