diff options
| author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-05-25 18:55:37 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-05-26 14:41:33 +0200 | 
| commit | 98f5a4cd845035a7fbeaa8ea95e58023c52e2b9c (patch) | |
| tree | d8d843e83759caeb805dd415c83bd3fee7e52b51 /src/tools/cbr | |
| parent | 8ade85d2084433198f050978b2f0ec8347efd37d (diff) | |
| download | ouroboros-98f5a4cd845035a7fbeaa8ea95e58023c52e2b9c.tar.gz ouroboros-98f5a4cd845035a7fbeaa8ea95e58023c52e2b9c.zip | |
tools: Add threadpool to cbr
This adds a threadpool to cbr, so that it is not overflooded with
client requests.
Diffstat (limited to 'src/tools/cbr')
| -rw-r--r-- | src/tools/cbr/cbr_client.c | 1 | ||||
| -rw-r--r-- | src/tools/cbr/cbr_server.c | 160 | 
2 files changed, 125 insertions, 36 deletions
| diff --git a/src/tools/cbr/cbr_client.c b/src/tools/cbr/cbr_client.c index 78b95255..ff7d4057 100644 --- a/src/tools/cbr/cbr_client.c +++ b/src/tools/cbr/cbr_client.c @@ -67,6 +67,7 @@ int client_main(int duration, int size, long rate)                  if (flow_write(fd, buf, size) == -1) {                          printf("Failed to write SDU.\n"); +                        stop = true;                          continue;                  } diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c index 553f954d..eef1acc9 100644 --- a/src/tools/cbr/cbr_server.c +++ b/src/tools/cbr/cbr_server.c @@ -31,26 +31,45 @@  #include <ouroboros/time_utils.h>  #define DIF_NAME "*" +#define THREADS_SIZE 10 +pthread_t       listen_thread; +pthread_t       threads[THREADS_SIZE]; +int             fds[THREADS_SIZE]; +int             fds_count = 0; +int             fds_index = 0; +pthread_mutex_t fds_lock; +pthread_cond_t  fds_signal; -void shutdown_server(int signo) +void shutdown_server(int signo, siginfo_t * info, void * c)  {          char * dif = DIF_NAME; +        int i; + +        switch(signo) { +        case SIGINT: +        case SIGTERM: +        case SIGHUP: +                if (ap_unreg(&dif, 1)) { +                        printf("Failed to unregister application.\n"); +                        ap_fini(); +                        exit(EXIT_FAILURE); +                } -        if (ap_unreg(&dif, 1)) { -                printf("Failed to unregister application.\n"); -                ap_fini(); -                exit(EXIT_FAILURE); -        } +                pthread_cancel(listen_thread); -        ap_fini(); -        exit(EXIT_SUCCESS); +                for (i = 0; i < THREADS_SIZE; i++) { +                        pthread_cancel(threads[i]); +                } + +        default: +                return; +        }  } -void * handleflow(void * o) +void handle_flow(int fd)  {          ssize_t count = 0; -        int fd = *((int *) o);          char buf[BUF_SIZE];          struct timespec now; @@ -108,45 +127,57 @@ void * handleflow(void * o)                          ts_add(&iv_start, &intv, &iv_end);                  }          } - -        return 0;  } -int server_main() +void * worker(void * o)  { -        int    server_fd = 0; -        int    client_fd = 0; +        int cli_fd; -        char * dif         = DIF_NAME; -        char * client_name = NULL; +        while (true) { +                pthread_mutex_lock(&fds_lock); +                pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock, +                                     (void *) &fds_lock); +                while (fds[fds_index] == -1) { +                        pthread_cond_wait(&fds_signal, &fds_lock); +                } -        int i = 0; +                cli_fd = fds[fds_index]; +                fds[fds_index] = -1; +                pthread_cleanup_pop(1); -        pthread_t * threads = malloc(sizeof(*threads) * 10); -        if (threads == NULL) -                EXIT_FAILURE; +                handle_flow(cli_fd); -        printf("Server started, interval is %ld s, timeout is %ld s.\n", -               server_settings.interval, server_settings.timeout); - -        /* Manual cleanup is required for now */ -        if (signal(SIGINT, shutdown_server) == SIG_ERR) { -                printf("Can't install signal handler.\n"); -                return -1; +                pthread_mutex_lock(&fds_lock); +                fds_count--; +                pthread_mutex_unlock(&fds_lock);          } +        return 0; +} + +void * listener(void * o) +{ +        char * dif = DIF_NAME; +        int server_fd; +        char * client_name = NULL; +        int client_fd = 0; +        int response = 0; +          if (ap_init(SERVER_AP_NAME)) {                  printf("Failed to init AP.\n"); -                return -1; +                exit(EXIT_FAILURE);          }          server_fd = ap_reg(&dif, 1);          if (server_fd < 0) {                  printf("Failed to register application.\n");                  ap_fini(); -                return -1; +                exit(EXIT_FAILURE);          } +        printf("Server started, interval is %ld s, timeout is %ld s.\n", +               server_settings.interval, server_settings.timeout); +          while (true) {                  client_fd = flow_accept(server_fd,                                          &client_name, NULL); @@ -157,18 +188,75 @@ int server_main()                  printf("New flow from %s.\n", client_name); -                if (flow_alloc_resp(client_fd, 0)) { +                pthread_mutex_lock(&fds_lock); + +                response = (fds_count < THREADS_SIZE) ? 0 : -1; + +                if (flow_alloc_resp(client_fd, response)) {                          printf("Failed to give an allocate response.\n");                          flow_dealloc(client_fd); +                        pthread_mutex_unlock(&fds_lock);                          continue;                  } -                if (i < 10) { -                        pthread_create(&threads[i++], -                                       NULL, -                                       handleflow, -                                       &client_fd); +                if (response) { +                        printf("Can't accept any more flows, denying.\n"); +                        continue;                  } + +                fds_count++; +                fds_index = (fds_index + 1) % THREADS_SIZE; +                fds[fds_index] = client_fd; + +                pthread_mutex_unlock(&fds_lock); +                pthread_cond_signal(&fds_signal); +        } + +        return 0; +} + +int server_main() +{ +        struct sigaction sig_act; +        int i; + +        memset(&sig_act, 0, sizeof sig_act); +        sig_act.sa_sigaction = &shutdown_server; +        sig_act.sa_flags = 0; + +        for (i = 0; i < THREADS_SIZE; i++) +                fds[i] = -1; + +        if (sigaction(SIGINT,  &sig_act, NULL) || +            sigaction(SIGTERM, &sig_act, NULL) || +            sigaction(SIGHUP,  &sig_act, NULL) || +            sigaction(SIGPIPE, &sig_act, NULL)) { +                printf("Failed to install sighandler.\n"); +                exit(EXIT_FAILURE); +        } + +        if (pthread_mutex_init(&fds_lock, NULL)) { +                printf("Failed to init mutex.\n"); +                exit(EXIT_FAILURE); +        } + +        if (pthread_cond_init(&fds_signal, NULL)) { +                printf("Failed to init cond.\n"); +                exit(EXIT_FAILURE); +        } + +        for (i = 0; i < THREADS_SIZE; i++) { +                pthread_create(&threads[i], NULL, +                               worker, NULL); +        } + +        pthread_create(&listen_thread, NULL, +                       listener, NULL); + +        pthread_join(listen_thread, NULL); + +        for (i = 0; i < THREADS_SIZE; i++) { +                pthread_join(threads[i], NULL);          }          ap_fini(); | 
