diff options
| -rw-r--r-- | include/ouroboros/bitmap.h | 5 | ||||
| -rw-r--r-- | include/ouroboros/config.h.in | 11 | ||||
| -rw-r--r-- | src/irmd/main.c | 227 | ||||
| -rw-r--r-- | src/lib/bitmap.c | 61 | ||||
| -rw-r--r-- | src/lib/tests/bitmap_test.c | 35 | 
5 files changed, 273 insertions, 66 deletions
| diff --git a/include/ouroboros/bitmap.h b/include/ouroboros/bitmap.h index cb62312a..d6bb250b 100644 --- a/include/ouroboros/bitmap.h +++ b/include/ouroboros/bitmap.h @@ -33,7 +33,7 @@ struct bmp;  struct bmp * bmp_create(size_t  bits,                          ssize_t offset); -int          bmp_destroy(struct bmp * b); +void         bmp_destroy(struct bmp * b);  ssize_t      bmp_allocate(struct bmp * instance); @@ -43,4 +43,7 @@ int          bmp_release(struct bmp * instance,  bool         bmp_is_id_valid(struct bmp * b,                               ssize_t      id); +bool         bmp_is_id_used(struct bmp * b, +                            ssize_t      id); +  #endif /* OUROBOROS_BITMAP_H */ diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index 7616961c..067a2f85 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -35,7 +35,7 @@  #define IPCP_SHIM_ETH_LLC_EXEC "@IPCP_SHIM_ETH_LLC_TARGET@"  #define IPCP_NORMAL_EXEC       "@IPCP_NORMAL_TARGET@"  #define IPCP_LOCAL_EXEC        "@IPCP_LOCAL_TARGET@" -#define AP_MAX_FLOWS           256 +#define AP_MAX_FLOWS           2048  #define AP_MAX_FQUEUES         64  #define SHM_RDRB_BLOCK_SIZE    sysconf(_SC_PAGESIZE)  #define SHM_RDRB_MULTI_BLOCK @@ -47,14 +47,19 @@  #define SHM_RBUFF_PREFIX       "/ouroboros.rbuff."  #define SHM_FLOW_SET_PREFIX    "/ouroboros.sets."  #define IRMD_MAX_FLOWS         4096 -#define IRMD_THREADPOOL_SIZE   16 +/* IRMD dynamic threadpooling */ +#define IRMD_MIN_AV_THREADS    16 +#define IRMD_MAX_AV_THREADS    64 +#define IRMD_MAX_THREADS       256 +  #define IPCPD_THREADPOOL_SIZE  16  #define IPCPD_MAX_CONNS        IRMD_MAX_FLOWS  #define PTHREAD_COND_CLOCK     CLOCK_MONOTONIC  #define PFT_SIZE               1 << 12  /* Timeout values */ +#define IRMD_TPM_TIMEOUT       1000  #define IRMD_ACCEPT_TIMEOUT    100 -#define IRMD_REQ_ARR_TIMEOUT   200 +#define IRMD_REQ_ARR_TIMEOUT   500  #define IRMD_FLOW_TIMEOUT      5000  #define IPCP_ACCEPT_TIMEOUT    100  #define SOCKET_TIMEOUT         4000 diff --git a/src/irmd/main.c b/src/irmd/main.c index 39f44c44..966be500 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -75,31 +75,37 @@ enum irm_state {  };  struct irm { -        struct list_head     registry; +        struct list_head     registry;     /* registered names known     */ -        struct list_head     ipcps; +        struct list_head     ipcps;        /* list of ipcps in system    */ -        struct list_head     api_table; -        struct list_head     apn_table; -        struct list_head     spawned_apis; -        pthread_rwlock_t     reg_lock; +        struct list_head     api_table;    /* ap instances               */ +        struct list_head     apn_table;    /* ap names known             */ +        struct list_head     spawned_apis; /* child ap instances         */ +        pthread_rwlock_t     reg_lock;     /* lock for registration info */ -        /* keep track of all flows in this processing system */ -        struct bmp *         port_ids; -        /* maps port_ids to api pair */ -        struct list_head     irm_flows; -        pthread_rwlock_t     flows_lock; +        struct bmp *         port_ids;     /* port_ids for flows         */ +        struct list_head     irm_flows;    /* flow information           */ +        pthread_rwlock_t     flows_lock;   /* lock for flows             */ -        struct lockfile *    lf; -        struct shm_rdrbuff * rdrb; -        pthread_t *          threadpool; -        int                  sockfd; +        struct lockfile *    lf;           /* single irmd per system     */ +        struct shm_rdrbuff * rdrb;         /* rdrbuff for SDUs           */ +        int                  sockfd;       /* UNIX socket                */ -        enum irm_state       state; -        pthread_rwlock_t     state_lock; +        pthread_t *          threadpool;   /* pool of mainloop threads   */ -        pthread_t            irm_sanitize; -        pthread_t            shm_sanitize; +        struct bmp *         thread_ids;   /* ids for mainloop threads   */ +        size_t               max_threads;  /* max threads set by tpm     */ +        size_t               threads;      /* available mainloop threads */ +        pthread_cond_t       threads_cond; /* signal thread entry/exit   */ +        pthread_mutex_t      threads_lock; /* mutex for threads/condvar  */ + +        enum irm_state       state;        /* state of the irmd          */ +        pthread_rwlock_t     state_lock;   /* lock for the entire irmd   */ + +        pthread_t            tpm;          /* threadpool manager         */ +        pthread_t            irm_sanitize; /* clean up irmd resources    */ +        pthread_t            shm_sanitize; /* keep track of rdrbuff use  */  } * irmd;  static void clear_irm_flow(struct irm_flow * f) { @@ -1449,6 +1455,13 @@ static void irm_destroy(void)          if (irmd->state != IRMD_NULL)                  log_warn("Unsafe destroy."); +        pthread_mutex_lock(&irmd->threads_lock); + +        if (irmd->thread_ids != NULL) +                bmp_destroy(irmd->thread_ids); + +        pthread_mutex_unlock(&irmd->threads_lock); +          if (irmd->threadpool != NULL)                  free(irmd->threadpool); @@ -1724,11 +1737,55 @@ void * irm_sanitize(void * o)          }  } +static void thread_inc(void) +{ +        pthread_mutex_lock(&irmd->threads_lock); + +        ++irmd->threads; +        pthread_cond_signal(&irmd->threads_cond); + +        pthread_mutex_unlock(&irmd->threads_lock); +} + +static void thread_dec(void) +{ +        pthread_mutex_lock(&irmd->threads_lock); + +        --irmd->threads; +        pthread_cond_signal(&irmd->threads_cond); + +        pthread_mutex_unlock(&irmd->threads_lock); +} + +static bool thread_check(void) +{ +        int ret; + +        pthread_mutex_lock(&irmd->threads_lock); + +        ret = irmd->threads > irmd->max_threads; + +        pthread_mutex_unlock(&irmd->threads_lock); + +        return ret; +} + +static void thread_exit(ssize_t id) +{ +        pthread_mutex_lock(&irmd->threads_lock); +        bmp_release(irmd->thread_ids, id); + +        --irmd->threads; +        pthread_cond_signal(&irmd->threads_cond); + +        pthread_mutex_unlock(&irmd->threads_lock); +} +  void * mainloop(void * o)  {          uint8_t buf[IRM_MSG_BUF_SIZE]; -        (void) o; +        ssize_t id = (ssize_t) o;          while (true) {  #ifdef __FreeBSD__ @@ -1747,10 +1804,13 @@ void * mainloop(void * o)                                       (SOCKET_TIMEOUT % 1000) * 1000};                  pthread_rwlock_rdlock(&irmd->state_lock); -                if (irmd->state != IRMD_RUNNING) { + +                if (irmd->state != IRMD_RUNNING || thread_check()) { +                        thread_exit(id);                          pthread_rwlock_unlock(&irmd->state_lock);                          break;                  } +                  pthread_rwlock_unlock(&irmd->state_lock);                  ret_msg.code = IRM_MSG_CODE__IRM_REPLY; @@ -1760,6 +1820,7 @@ void * mainloop(void * o)                  if (select(irmd->sockfd, &fds, NULL, NULL, &timeout) <= 0)                          continue;  #endif +                  cli_sockfd = accept(irmd->sockfd, 0, 0);                  if (cli_sockfd < 0)                          continue; @@ -1781,6 +1842,8 @@ void * mainloop(void * o)                          continue;                  } +                thread_dec(); +                  switch (msg->code) {                  case IRM_MSG_CODE__IRM_CREATE_IPCP:                          ret_msg.has_result = true; @@ -1909,6 +1972,7 @@ void * mainloop(void * o)                          if (apis != NULL)                                  free(apis);                          close(cli_sockfd); +                        thread_inc();                          continue;                  } @@ -1917,6 +1981,7 @@ void * mainloop(void * o)                          if (apis != NULL)                                  free(apis);                          close(cli_sockfd); +                        thread_inc();                          continue;                  } @@ -1930,6 +1995,82 @@ void * mainloop(void * o)                  free(buffer.data);                  close(cli_sockfd); + +                thread_inc(); +        } + +        return (void *) 0; +} + +static bool is_thread_alive(ssize_t id) +{ +        bool ret; +        pthread_mutex_lock(&irmd->threads_lock); + +        ret = bmp_is_id_used(irmd->thread_ids, id); + +        pthread_mutex_unlock(&irmd->threads_lock); + +        return ret; +} + +void * threadpoolmgr(void * o) +{ +        struct timespec to = {(IRMD_TPM_TIMEOUT / 1000), +                              (IRMD_TPM_TIMEOUT % 1000) * MILLION}; +        struct timespec dl; +        size_t t; + +        (void) o; + +        while (true) { +                clock_gettime(PTHREAD_COND_CLOCK, &dl); +                ts_add(&dl, &to, &dl); + +                pthread_rwlock_rdlock(&irmd->state_lock); +                if (irmd->state != IRMD_RUNNING) { +                        pthread_rwlock_unlock(&irmd->state_lock); +                        log_dbg("Threadpool manager exiting."); +                        for (t = 0; t < IRMD_MAX_THREADS; ++t) +                                if (is_thread_alive(t)) { +                                        log_dbg("Waiting for thread %zd.", t); +                                        pthread_join(irmd->threadpool[t], NULL); +                                } + +                        log_dbg("Threadpool manager done."); +                        break; +                } + +                pthread_rwlock_unlock(&irmd->state_lock); + +                pthread_mutex_lock(&irmd->threads_lock); + +                if (irmd->threads < IRMD_MIN_AV_THREADS) { +                        log_dbg("Increasing threadpool."); +                        irmd->max_threads = IRMD_MAX_AV_THREADS; + +                        while (irmd->threads < irmd->max_threads) { +                                ssize_t id = bmp_allocate(irmd->thread_ids); +                                if (!bmp_is_id_valid(irmd->thread_ids, id)) { +                                        log_warn("IRMd threadpool exhausted."); +                                        break; +                                } + +                                if (pthread_create(&irmd->threadpool[id], +                                                   NULL, mainloop, (void *) id)) +                                        log_warn("Failed to start new thread."); +                                else +                                        ++irmd->threads; +                        } +                } + +                if (pthread_cond_timedwait(&irmd->threads_cond, +                                           &irmd->threads_lock, +                                           &dl) == ETIMEDOUT) +                        if (irmd->threads > IRMD_MIN_AV_THREADS) +                                --irmd->max_threads; + +                pthread_mutex_unlock(&irmd->threads_lock);          }          return (void *) 0; @@ -1938,6 +2079,7 @@ void * mainloop(void * o)  static int irm_create(void)  {          struct stat st; +        pthread_condattr_t cattr;          struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000),                                    (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; @@ -1967,6 +2109,27 @@ static int irm_create(void)                  return -1;          } +        if (pthread_mutex_init(&irmd->threads_lock, NULL)) { +                log_err("Failed to initialize mutex."); +                free(irmd); +                return -1; +        } + +        if (pthread_condattr_init(&cattr)) { +                log_err("Failed to initialize condattr."); +                free(irmd); +                return -1; +        } + +#ifndef __APPLE__ +        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif +        if (pthread_cond_init(&irmd->threads_cond, &cattr)) { +                log_err("Failed to initialize cond."); +                free(irmd); +                return -1; +        } +          list_head_init(&irmd->ipcps);          list_head_init(&irmd->api_table);          list_head_init(&irmd->apn_table); @@ -1980,7 +2143,13 @@ static int irm_create(void)                  return -ENOMEM;          } -        irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); +        irmd->thread_ids = bmp_create(IRMD_MAX_THREADS, 0); +        if (irmd->thread_ids == NULL) { +                irm_destroy(); +                return -ENOMEM; +        } + +        irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS);          if (irmd->threadpool == NULL) {                  irm_destroy();                  return -ENOMEM; @@ -2045,7 +2214,9 @@ static int irm_create(void)                  return -1;          } -        irmd->state = IRMD_RUNNING; +        irmd->threads     = 0; +        irmd->max_threads = IRMD_MIN_AV_THREADS; +        irmd->state       = IRMD_RUNNING;          log_info("Ouroboros IPC Resource Manager daemon started..."); @@ -2063,8 +2234,6 @@ int main(int     argc,  {          struct sigaction sig_act; -        int t = 0; -          bool use_stdout = false;          if (geteuid() != 0) { @@ -2108,16 +2277,12 @@ int main(int     argc,                  exit(EXIT_FAILURE);          } -        for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) -                pthread_create(&irmd->threadpool[t], NULL, mainloop, NULL); +        pthread_create(&irmd->tpm, NULL, threadpoolmgr, NULL); +        pthread_join(irmd->tpm, NULL);          pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL);          pthread_create(&irmd->shm_sanitize, NULL, shm_sanitize, irmd->rdrb); -        /* Wait for (all of them) to return. */ -        for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) -                pthread_join(irmd->threadpool[t], NULL); -          pthread_join(irmd->irm_sanitize, NULL);          pthread_cancel(irmd->shm_sanitize); diff --git a/src/lib/bitmap.c b/src/lib/bitmap.c index 93ffda77..bf9bb99d 100644 --- a/src/lib/bitmap.c +++ b/src/lib/bitmap.c @@ -38,7 +38,8 @@  #define BITS_TO_LONGS(nr) \          DIV_ROUND_UP(nr, BITS_PER_BYTE * sizeof(size_t)) -static size_t find_next_zero_bit(const size_t * addr, size_t nbits) +static size_t find_next_zero_bit(const size_t * addr, +                                 size_t         nbits)  {          size_t tmp;          size_t start = 0; @@ -65,13 +66,15 @@ static size_t find_next_zero_bit(const size_t * addr, size_t nbits)          return (start * BITS_PER_LONG) + pos;  } -static void bitmap_zero(size_t * dst, size_t nbits) +static void bitmap_zero(size_t * dst, +                        size_t   nbits)  {          size_t len = BITS_TO_LONGS(nbits) * sizeof(size_t);          memset(dst, 0, len);  } -static void bitmap_clear(size_t * map, size_t start) +static void bitmap_clear(size_t * map, +                         size_t   start)  {          size_t * p = map + BIT_WORD(start);          size_t mask = ~(1UL << (start % (BITS_PER_LONG))); @@ -79,7 +82,8 @@ static void bitmap_clear(size_t * map, size_t start)          *p &= mask;  } -static void bitmap_set(size_t * map, size_t start) +static void bitmap_set(size_t * map, +                       size_t   start)  {          size_t * p = map + BIT_WORD(start);          size_t mask = 1UL << (start % (BITS_PER_LONG)); @@ -94,7 +98,8 @@ struct bmp {          size_t * bitmap;  }; -struct bmp * bmp_create(size_t bits, ssize_t offset) +struct bmp * bmp_create(size_t  bits, +                        ssize_t offset)  {          struct bmp * tmp; @@ -118,20 +123,15 @@ struct bmp * bmp_create(size_t bits, ssize_t offset)          return tmp;  } -int bmp_destroy(struct bmp * b) +void bmp_destroy(struct bmp * b)  {          if (b == NULL) -                return -1; +                return; -        if (b->bitmap == NULL) { -                free(b); -                return -1; -        } +        if (b->bitmap != NULL) +                free(b->bitmap); -        free(b->bitmap);          free(b); - -        return 0;  }  static ssize_t bad_id(struct bmp * b) @@ -158,7 +158,8 @@ ssize_t bmp_allocate(struct bmp * b)          return id + b->offset;  } -static bool is_id_valid(struct bmp * b, ssize_t id) +static bool is_id_valid(struct bmp * b, +                        ssize_t      id)  {          assert(b); @@ -168,7 +169,17 @@ static bool is_id_valid(struct bmp * b, ssize_t id)          return true;  } -bool bmp_is_id_valid(struct bmp * b, ssize_t id) +static bool is_id_used(size_t * map, +                       size_t   start) +{ +        size_t * p = map + BIT_WORD(start); +        size_t mask = 1UL << (start % (BITS_PER_LONG)); + +        return (*p & mask) != 0; +} + +bool bmp_is_id_valid(struct bmp * b, +                     ssize_t      id)  {          if (b == NULL)                  return false; @@ -176,19 +187,25 @@ bool bmp_is_id_valid(struct bmp * b, ssize_t id)          return is_id_valid(b, id);  } -int bmp_release(struct bmp * b, ssize_t id) +int bmp_release(struct bmp * b, +                ssize_t      id)  { -        size_t rid; -          if (b == NULL)                  return -1;          if (!is_id_valid(b, id))                  return -1; -        rid = id - b->offset; - -        bitmap_clear(b->bitmap, rid); +        bitmap_clear(b->bitmap, id - b->offset);          return 0;  } + +bool bmp_is_id_used(struct bmp * b, +                    ssize_t      id) +{ +        if (b == NULL) +                return false; + +        return is_id_used(b->bitmap, id - b->offset); +} diff --git a/src/lib/tests/bitmap_test.c b/src/lib/tests/bitmap_test.c index 7480600e..e438f217 100644 --- a/src/lib/tests/bitmap_test.c +++ b/src/lib/tests/bitmap_test.c @@ -23,6 +23,7 @@  #include "bitmap.c"  #include <time.h>  #include <stdlib.h> +#include <stdio.h>  #define BITMAP_SIZE 200 @@ -41,40 +42,56 @@ int bitmap_test(int argc, char ** argv)          srand(time(NULL));          bmp = bmp_create(bits, offset); -        if (bmp == NULL) +        if (bmp == NULL) { +                printf("Failed to create bmp.\n");                  return -1; +        } -        if (bmp_destroy(bmp)) -                return -1; +        bmp_destroy(bmp);          bmp = bmp_create(bits, offset); -        if (bmp == NULL) +        if (bmp == NULL) { +                printf("Failed to re-create bmp.\n");                  return -1; +        }          for (i = offset; i < BITMAP_SIZE + 5 + offset; i++) {                  id = bmp_allocate(bmp);                  if (!bmp_is_id_valid(bmp, id))                          continue; -                if (id != i) +                if (!bmp_is_id_used(bmp, id)) { +                        printf("ID not marked in use.\n"); +                        bmp_destroy(bmp);                          return -1; +                } + +                if (id != i) { +                        printf("Wrong ID returned.\n"); +                        bmp_destroy(bmp); +                        return -1; +                }          }          for (i = 0; i < BITMAP_SIZE + 5; i++) {                  r = (ssize_t) (rand() % BITMAP_SIZE) + offset; -                if (bmp_release(bmp, r)) +                if (bmp_release(bmp, r)) { +                        printf("Failed to release ID.\n");                          return -1; +                }                  id = bmp_allocate(bmp);                  if (!bmp_is_id_valid(bmp, id))                          continue; -                if (id != r) +                if (id != r) { +                        printf("Wrong prev ID returned.\n"); +                        bmp_destroy(bmp);                          return -1; +                }          } -        if (bmp_destroy(bmp)) -                return -1; +        bmp_destroy(bmp);          return 0;  } | 
