summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-05-26 15:11:56 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-05-26 15:11:56 +0200
commit4ede581f562e1f4b2f924559ce582cec5389e056 (patch)
tree2fa38dcbe15ba78b3327b10589a12df5c04a5403
parent22fe93c00ce5a2b3a64b64f38dc749c83ed42206 (diff)
parentc1dd73319e5bc80a158cb1c4cffae3c9a7b13a87 (diff)
downloadouroboros-4ede581f562e1f4b2f924559ce582cec5389e056.tar.gz
ouroboros-4ede581f562e1f4b2f924559ce582cec5389e056.zip
Merged in sandervrijders/ouroboros/be-cbr (pull request #112)
Be cbr
-rw-r--r--src/ipcpd/ipcp.c5
-rw-r--r--src/tools/cbr/cbr_client.c1
-rw-r--r--src/tools/cbr/cbr_server.c160
3 files changed, 130 insertions, 36 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index dd370005..e0dd1b60 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -93,6 +93,9 @@ void * ipcp_main_loop(void * o)
return (void *) 1;
}
+ pthread_cleanup_push((void(*)(void *)) close,
+ (void *) &sockfd);
+
free(sock_path);
while (true) {
@@ -259,5 +262,7 @@ void * ipcp_main_loop(void * o)
close(lsockfd);
}
+ pthread_cleanup_pop(0);
+
return NULL;
}
diff --git a/src/tools/cbr/cbr_client.c b/src/tools/cbr/cbr_client.c
index 78b95255..ff7d4057 100644
--- a/src/tools/cbr/cbr_client.c
+++ b/src/tools/cbr/cbr_client.c
@@ -67,6 +67,7 @@ int client_main(int duration, int size, long rate)
if (flow_write(fd, buf, size) == -1) {
printf("Failed to write SDU.\n");
+ stop = true;
continue;
}
diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c
index 553f954d..eef1acc9 100644
--- a/src/tools/cbr/cbr_server.c
+++ b/src/tools/cbr/cbr_server.c
@@ -31,26 +31,45 @@
#include <ouroboros/time_utils.h>
#define DIF_NAME "*"
+#define THREADS_SIZE 10
+pthread_t listen_thread;
+pthread_t threads[THREADS_SIZE];
+int fds[THREADS_SIZE];
+int fds_count = 0;
+int fds_index = 0;
+pthread_mutex_t fds_lock;
+pthread_cond_t fds_signal;
-void shutdown_server(int signo)
+void shutdown_server(int signo, siginfo_t * info, void * c)
{
char * dif = DIF_NAME;
+ int i;
+
+ switch(signo) {
+ case SIGINT:
+ case SIGTERM:
+ case SIGHUP:
+ if (ap_unreg(&dif, 1)) {
+ printf("Failed to unregister application.\n");
+ ap_fini();
+ exit(EXIT_FAILURE);
+ }
- if (ap_unreg(&dif, 1)) {
- printf("Failed to unregister application.\n");
- ap_fini();
- exit(EXIT_FAILURE);
- }
+ pthread_cancel(listen_thread);
- ap_fini();
- exit(EXIT_SUCCESS);
+ for (i = 0; i < THREADS_SIZE; i++) {
+ pthread_cancel(threads[i]);
+ }
+
+ default:
+ return;
+ }
}
-void * handleflow(void * o)
+void handle_flow(int fd)
{
ssize_t count = 0;
- int fd = *((int *) o);
char buf[BUF_SIZE];
struct timespec now;
@@ -108,45 +127,57 @@ void * handleflow(void * o)
ts_add(&iv_start, &intv, &iv_end);
}
}
-
- return 0;
}
-int server_main()
+void * worker(void * o)
{
- int server_fd = 0;
- int client_fd = 0;
+ int cli_fd;
- char * dif = DIF_NAME;
- char * client_name = NULL;
+ while (true) {
+ pthread_mutex_lock(&fds_lock);
+ pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock,
+ (void *) &fds_lock);
+ while (fds[fds_index] == -1) {
+ pthread_cond_wait(&fds_signal, &fds_lock);
+ }
- int i = 0;
+ cli_fd = fds[fds_index];
+ fds[fds_index] = -1;
+ pthread_cleanup_pop(1);
- pthread_t * threads = malloc(sizeof(*threads) * 10);
- if (threads == NULL)
- EXIT_FAILURE;
+ handle_flow(cli_fd);
- printf("Server started, interval is %ld s, timeout is %ld s.\n",
- server_settings.interval, server_settings.timeout);
-
- /* Manual cleanup is required for now */
- if (signal(SIGINT, shutdown_server) == SIG_ERR) {
- printf("Can't install signal handler.\n");
- return -1;
+ pthread_mutex_lock(&fds_lock);
+ fds_count--;
+ pthread_mutex_unlock(&fds_lock);
}
+ return 0;
+}
+
+void * listener(void * o)
+{
+ char * dif = DIF_NAME;
+ int server_fd;
+ char * client_name = NULL;
+ int client_fd = 0;
+ int response = 0;
+
if (ap_init(SERVER_AP_NAME)) {
printf("Failed to init AP.\n");
- return -1;
+ exit(EXIT_FAILURE);
}
server_fd = ap_reg(&dif, 1);
if (server_fd < 0) {
printf("Failed to register application.\n");
ap_fini();
- return -1;
+ exit(EXIT_FAILURE);
}
+ printf("Server started, interval is %ld s, timeout is %ld s.\n",
+ server_settings.interval, server_settings.timeout);
+
while (true) {
client_fd = flow_accept(server_fd,
&client_name, NULL);
@@ -157,18 +188,75 @@ int server_main()
printf("New flow from %s.\n", client_name);
- if (flow_alloc_resp(client_fd, 0)) {
+ pthread_mutex_lock(&fds_lock);
+
+ response = (fds_count < THREADS_SIZE) ? 0 : -1;
+
+ if (flow_alloc_resp(client_fd, response)) {
printf("Failed to give an allocate response.\n");
flow_dealloc(client_fd);
+ pthread_mutex_unlock(&fds_lock);
continue;
}
- if (i < 10) {
- pthread_create(&threads[i++],
- NULL,
- handleflow,
- &client_fd);
+ if (response) {
+ printf("Can't accept any more flows, denying.\n");
+ continue;
}
+
+ fds_count++;
+ fds_index = (fds_index + 1) % THREADS_SIZE;
+ fds[fds_index] = client_fd;
+
+ pthread_mutex_unlock(&fds_lock);
+ pthread_cond_signal(&fds_signal);
+ }
+
+ return 0;
+}
+
+int server_main()
+{
+ struct sigaction sig_act;
+ int i;
+
+ memset(&sig_act, 0, sizeof sig_act);
+ sig_act.sa_sigaction = &shutdown_server;
+ sig_act.sa_flags = 0;
+
+ for (i = 0; i < THREADS_SIZE; i++)
+ fds[i] = -1;
+
+ if (sigaction(SIGINT, &sig_act, NULL) ||
+ sigaction(SIGTERM, &sig_act, NULL) ||
+ sigaction(SIGHUP, &sig_act, NULL) ||
+ sigaction(SIGPIPE, &sig_act, NULL)) {
+ printf("Failed to install sighandler.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (pthread_mutex_init(&fds_lock, NULL)) {
+ printf("Failed to init mutex.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (pthread_cond_init(&fds_signal, NULL)) {
+ printf("Failed to init cond.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ for (i = 0; i < THREADS_SIZE; i++) {
+ pthread_create(&threads[i], NULL,
+ worker, NULL);
+ }
+
+ pthread_create(&listen_thread, NULL,
+ listener, NULL);
+
+ pthread_join(listen_thread, NULL);
+
+ for (i = 0; i < THREADS_SIZE; i++) {
+ pthread_join(threads[i], NULL);
}
ap_fini();