summaryrefslogtreecommitdiff
path: root/src/tools/cbr/cbr_server.c
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2016-05-25 18:55:37 +0200
committerSander Vrijders <sander.vrijders@intec.ugent.be>2016-05-26 14:41:33 +0200
commit98f5a4cd845035a7fbeaa8ea95e58023c52e2b9c (patch)
treed8d843e83759caeb805dd415c83bd3fee7e52b51 /src/tools/cbr/cbr_server.c
parent8ade85d2084433198f050978b2f0ec8347efd37d (diff)
downloadouroboros-98f5a4cd845035a7fbeaa8ea95e58023c52e2b9c.tar.gz
ouroboros-98f5a4cd845035a7fbeaa8ea95e58023c52e2b9c.zip
tools: Add threadpool to cbr
This adds a threadpool to cbr, so that it is not overflooded with client requests.
Diffstat (limited to 'src/tools/cbr/cbr_server.c')
-rw-r--r--src/tools/cbr/cbr_server.c160
1 files changed, 124 insertions, 36 deletions
diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c
index 553f954d..eef1acc9 100644
--- a/src/tools/cbr/cbr_server.c
+++ b/src/tools/cbr/cbr_server.c
@@ -31,26 +31,45 @@
#include <ouroboros/time_utils.h>
#define DIF_NAME "*"
+#define THREADS_SIZE 10
+pthread_t listen_thread;
+pthread_t threads[THREADS_SIZE];
+int fds[THREADS_SIZE];
+int fds_count = 0;
+int fds_index = 0;
+pthread_mutex_t fds_lock;
+pthread_cond_t fds_signal;
-void shutdown_server(int signo)
+void shutdown_server(int signo, siginfo_t * info, void * c)
{
char * dif = DIF_NAME;
+ int i;
+
+ switch(signo) {
+ case SIGINT:
+ case SIGTERM:
+ case SIGHUP:
+ if (ap_unreg(&dif, 1)) {
+ printf("Failed to unregister application.\n");
+ ap_fini();
+ exit(EXIT_FAILURE);
+ }
- if (ap_unreg(&dif, 1)) {
- printf("Failed to unregister application.\n");
- ap_fini();
- exit(EXIT_FAILURE);
- }
+ pthread_cancel(listen_thread);
- ap_fini();
- exit(EXIT_SUCCESS);
+ for (i = 0; i < THREADS_SIZE; i++) {
+ pthread_cancel(threads[i]);
+ }
+
+ default:
+ return;
+ }
}
-void * handleflow(void * o)
+void handle_flow(int fd)
{
ssize_t count = 0;
- int fd = *((int *) o);
char buf[BUF_SIZE];
struct timespec now;
@@ -108,45 +127,57 @@ void * handleflow(void * o)
ts_add(&iv_start, &intv, &iv_end);
}
}
-
- return 0;
}
-int server_main()
+void * worker(void * o)
{
- int server_fd = 0;
- int client_fd = 0;
+ int cli_fd;
- char * dif = DIF_NAME;
- char * client_name = NULL;
+ while (true) {
+ pthread_mutex_lock(&fds_lock);
+ pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock,
+ (void *) &fds_lock);
+ while (fds[fds_index] == -1) {
+ pthread_cond_wait(&fds_signal, &fds_lock);
+ }
- int i = 0;
+ cli_fd = fds[fds_index];
+ fds[fds_index] = -1;
+ pthread_cleanup_pop(1);
- pthread_t * threads = malloc(sizeof(*threads) * 10);
- if (threads == NULL)
- EXIT_FAILURE;
+ handle_flow(cli_fd);
- printf("Server started, interval is %ld s, timeout is %ld s.\n",
- server_settings.interval, server_settings.timeout);
-
- /* Manual cleanup is required for now */
- if (signal(SIGINT, shutdown_server) == SIG_ERR) {
- printf("Can't install signal handler.\n");
- return -1;
+ pthread_mutex_lock(&fds_lock);
+ fds_count--;
+ pthread_mutex_unlock(&fds_lock);
}
+ return 0;
+}
+
+void * listener(void * o)
+{
+ char * dif = DIF_NAME;
+ int server_fd;
+ char * client_name = NULL;
+ int client_fd = 0;
+ int response = 0;
+
if (ap_init(SERVER_AP_NAME)) {
printf("Failed to init AP.\n");
- return -1;
+ exit(EXIT_FAILURE);
}
server_fd = ap_reg(&dif, 1);
if (server_fd < 0) {
printf("Failed to register application.\n");
ap_fini();
- return -1;
+ exit(EXIT_FAILURE);
}
+ printf("Server started, interval is %ld s, timeout is %ld s.\n",
+ server_settings.interval, server_settings.timeout);
+
while (true) {
client_fd = flow_accept(server_fd,
&client_name, NULL);
@@ -157,18 +188,75 @@ int server_main()
printf("New flow from %s.\n", client_name);
- if (flow_alloc_resp(client_fd, 0)) {
+ pthread_mutex_lock(&fds_lock);
+
+ response = (fds_count < THREADS_SIZE) ? 0 : -1;
+
+ if (flow_alloc_resp(client_fd, response)) {
printf("Failed to give an allocate response.\n");
flow_dealloc(client_fd);
+ pthread_mutex_unlock(&fds_lock);
continue;
}
- if (i < 10) {
- pthread_create(&threads[i++],
- NULL,
- handleflow,
- &client_fd);
+ if (response) {
+ printf("Can't accept any more flows, denying.\n");
+ continue;
}
+
+ fds_count++;
+ fds_index = (fds_index + 1) % THREADS_SIZE;
+ fds[fds_index] = client_fd;
+
+ pthread_mutex_unlock(&fds_lock);
+ pthread_cond_signal(&fds_signal);
+ }
+
+ return 0;
+}
+
+int server_main()
+{
+ struct sigaction sig_act;
+ int i;
+
+ memset(&sig_act, 0, sizeof sig_act);
+ sig_act.sa_sigaction = &shutdown_server;
+ sig_act.sa_flags = 0;
+
+ for (i = 0; i < THREADS_SIZE; i++)
+ fds[i] = -1;
+
+ if (sigaction(SIGINT, &sig_act, NULL) ||
+ sigaction(SIGTERM, &sig_act, NULL) ||
+ sigaction(SIGHUP, &sig_act, NULL) ||
+ sigaction(SIGPIPE, &sig_act, NULL)) {
+ printf("Failed to install sighandler.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (pthread_mutex_init(&fds_lock, NULL)) {
+ printf("Failed to init mutex.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (pthread_cond_init(&fds_signal, NULL)) {
+ printf("Failed to init cond.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ for (i = 0; i < THREADS_SIZE; i++) {
+ pthread_create(&threads[i], NULL,
+ worker, NULL);
+ }
+
+ pthread_create(&listen_thread, NULL,
+ listener, NULL);
+
+ pthread_join(listen_thread, NULL);
+
+ for (i = 0; i < THREADS_SIZE; i++) {
+ pthread_join(threads[i], NULL);
}
ap_fini();