summaryrefslogtreecommitdiff
path: root/src/tools/operf/operf_server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/tools/operf/operf_server.c')
-rw-r--r--src/tools/operf/operf_server.c69
1 files changed, 54 insertions, 15 deletions
diff --git a/src/tools/operf/operf_server.c b/src/tools/operf/operf_server.c
index 11eb92fc..de32f320 100644
--- a/src/tools/operf/operf_server.c
+++ b/src/tools/operf/operf_server.c
@@ -36,6 +36,8 @@
* OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+bool stop;
+
void shutdown_server(int signo, siginfo_t * info, void * c)
{
(void) info;
@@ -46,6 +48,7 @@ void shutdown_server(int signo, siginfo_t * info, void * c)
case SIGTERM:
case SIGHUP:
pthread_cancel(server.accept_pt);
+ stop = true;
default:
return;
}
@@ -58,7 +61,7 @@ void * cleaner_thread(void * o)
(void) o;
- while (true) {
+ while (!stop) {
clock_gettime(CLOCK_REALTIME, &now);
pthread_mutex_lock(&server.lock);
for (i = 0; i < OPERF_MAX_FLOWS; ++i)
@@ -73,18 +76,21 @@ void * cleaner_thread(void * o)
pthread_mutex_unlock(&server.lock);
sleep(1);
}
+
+ return (void *) 0;
}
-void * server_thread(void *o)
+void * server_thread(void * o)
{
- int msg_len = 0;
+ int msg_len = 0;
struct timespec timeout = {0, 100 * MILLION};
- struct timespec now = {0, 0};
- int fd;
+ struct timespec now = {0, 0};
+ int fd;
+ struct msg * msg;
(void) o;
- while (fevent(server.flows, server.fq, &timeout))
+ while (!stop && fevent(server.flows, server.fq, &timeout))
while ((fd = fqueue_next(server.fq)) >= 0) {
msg_len = flow_read(fd, server.buffer, OPERF_BUF_SIZE);
if (msg_len < 0)
@@ -96,9 +102,21 @@ void * server_thread(void *o)
server.times[fd] = now;
pthread_mutex_unlock(&server.lock);
- if (flow_write(fd, server.buffer, msg_len) < 0) {
- printf("Error writing to flow (fd %d).\n", fd);
- flow_dealloc(fd);
+ msg = (struct msg *) server.buffer;
+
+ if (server.conf[fd].test_type == TEST_TYPE_UNI)
+ printf("Seqno %d from fd %d: %zd.%06zu\n",
+ msg->id, fd,
+ (ssize_t) now.tv_sec,
+ (size_t) now.tv_nsec / 1000);
+
+ if (server.conf[fd].test_type == TEST_TYPE_BI) {
+ if (flow_write(fd, server.buffer,
+ msg_len) < 0) {
+ printf("Error writing to flow "
+ "(fd %d).\n", fd);
+ flow_dealloc(fd);
+ }
}
}
@@ -110,12 +128,13 @@ void * accept_thread(void * o)
int fd = 0;
struct timespec now = {0, 0};
qosspec_t qs;
+ int len = 0;
(void) o;
printf("Ouroboros perf server started.\n");
- while (true) {
+ while (!stop) {
fd = flow_accept(&qs, NULL);
if (fd < 0) {
printf("Failed to accept flow.\n");
@@ -124,6 +143,27 @@ void * accept_thread(void * o)
printf("New flow %d.\n", fd);
+ /* Read test type. */
+ len = flow_read(fd, &(server.conf[fd]),
+ sizeof(server.conf[fd]));
+ if (len == -ETIMEDOUT) {
+ printf("Failed to read config message.\n");
+ flow_dealloc(fd);
+ break;
+ }
+
+ /* Check if length was correct. */
+ if (flow_read(fd, NULL, 0) != 0) {
+ printf("Invalid config message.\n");
+ flow_dealloc(fd);
+ break;
+ }
+
+ if (server.conf[fd].test_type == TEST_TYPE_BI)
+ printf("Doing a bidirectional test.\n");
+ else
+ printf("Doing a unidirectional test.\n");
+
clock_gettime(CLOCK_REALTIME, &now);
pthread_mutex_lock(&server.lock);
@@ -161,20 +201,19 @@ int server_main(void)
return -1;
}
+ stop = false;
+
pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL);
pthread_create(&server.accept_pt, NULL, accept_thread, NULL);
pthread_create(&server.server_pt, NULL, server_thread, NULL);
pthread_join(server.accept_pt, NULL);
- pthread_cancel(server.server_pt);
- pthread_cancel(server.cleaner_pt);
+ pthread_join(server.server_pt, NULL);
+ pthread_join(server.cleaner_pt, NULL);
fset_destroy(server.flows);
fqueue_destroy(server.fq);
- pthread_join(server.server_pt, NULL);
- pthread_join(server.cleaner_pt, NULL);
-
return 0;
}