diff options
| -rw-r--r-- | include/ouroboros/qoscube.h | 4 | ||||
| -rw-r--r-- | src/ipcpd/CMakeLists.txt | 14 | ||||
| -rw-r--r-- | src/ipcpd/config.h.in | 2 | ||||
| -rw-r--r-- | src/ipcpd/eth/eth.c | 7 | ||||
| -rw-r--r-- | src/ipcpd/normal/dht.c | 4 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.c | 262 | ||||
| -rw-r--r-- | src/ipcpd/normal/pol/link_state.c | 126 | ||||
| -rw-r--r-- | src/ipcpd/normal/psched.c | 2 | ||||
| -rw-r--r-- | src/lib/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/lib/dev.c | 5 | 
10 files changed, 257 insertions, 171 deletions
| diff --git a/include/ouroboros/qoscube.h b/include/ouroboros/qoscube.h index e0e93a82..bce9b361 100644 --- a/include/ouroboros/qoscube.h +++ b/include/ouroboros/qoscube.h @@ -26,9 +26,7 @@  #include <ouroboros/qos.h>  typedef enum qos_cube { -        QOS_CUBE_RAW = 0, -        QOS_CUBE_BE, -        QOS_CUBE_DATA, +        QOS_CUBE_BE = 0,          QOS_CUBE_VIDEO,          QOS_CUBE_VOICE,          QOS_CUBE_MAX diff --git a/src/ipcpd/CMakeLists.txt b/src/ipcpd/CMakeLists.txt index b706d432..50d23f8e 100644 --- a/src/ipcpd/CMakeLists.txt +++ b/src/ipcpd/CMakeLists.txt @@ -1,15 +1,11 @@  set(IPCP_ACCEPT_TIMEOUT 100 CACHE STRING    "Timeout for accept in IPCP mainloop threads (ms)") -set(IPCP_QOS_CUBE_RAW_PRIO 50 CACHE STRING -  "Priority for raw flow processing (0-99)") -set(IPCP_QOS_CUBE_BE_PRIO 0 CACHE STRING +set(IPCP_QOS_CUBE_BE_PRIO 50 CACHE STRING    "Priority for best effort QoS cube (0-99)")  set(IPCP_QOS_CUBE_VIDEO_PRIO 90 CACHE STRING    "Priority for video QoS cube (0-99)")  set(IPCP_QOS_CUBE_VOICE_PRIO 99 CACHE STRING    "Priority for voice QoS cube (0-99)") -set(IPCP_QOS_CUBE_DATA_PRIO 0 CACHE STRING -  "Priority for data QoS cube (0-99)")  set(IPCP_MIN_THREADS 4 CACHE STRING    "Minimum number of worker threads in the IPCP")  set(IPCP_ADD_THREADS 4 CACHE STRING @@ -21,18 +17,10 @@ set(DISABLE_CORE_LOCK FALSE CACHE BOOL  set(IPCP_CONN_WAIT_DIR TRUE CACHE BOOL    "Check the running state of the directory when adding a dt connection") -if ((IPCP_QOS_CUBE_RAW_PRIO LESS 0) OR (IPCP_QOS_CUBE_RAW_PRIO GREATER 99)) -  message(FATAL_ERROR "Invalid priority for raw flow") -endif () -  if ((IPCP_QOS_CUBE_BE_PRIO LESS 0) OR (IPCP_QOS_CUBE_BE_PRIO GREATER 99))    message(FATAL_ERROR "Invalid priority for best effort QoS cube")  endif () -if ((IPCP_QOS_CUBE_DATA_PRIO LESS 0) OR (IPCP_QOS_CUBE_DATA_PRIO GREATER 99)) -  message(FATAL_ERROR "Invalid priority for data QoS cube") -endif () -  if ((IPCP_QOS_CUBE_VIDEO_PRIO LESS 0) OR (IPCP_QOS_CUBE_VIDEO_PRIO GREATER 99))    message(FATAL_ERROR "Invalid priority for video QoS cube")  endif () diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index afce5e86..567fe971 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -42,11 +42,9 @@  #cmakedefine HAVE_LIBGCRYPT  /* normal IPCP */ -#define QOS_PRIO_RAW        @IPCP_QOS_CUBE_RAW_PRIO@  #define QOS_PRIO_BE         @IPCP_QOS_CUBE_BE_PRIO@  #define QOS_PRIO_VIDEO      @IPCP_QOS_CUBE_VIDEO_PRIO@  #define QOS_PRIO_VOICE      @IPCP_QOS_CUBE_VOICE_PRIO@ -#define QOS_PRIO_DATA       @IPCP_QOS_CUBE_DATA_PRIO@  #define IPCP_SCHED_THR_MUL  @IPCP_SCHED_THR_MUL@  #define PFT_SIZE            @PFT_SIZE@ diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 1bbfac5b..e7a1580c 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -1242,8 +1242,13 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf)                  return -1;          } +        if (strlen(conf->dev) >= IFNAMSIZ) { +                log_err("Invalid device name: %s.", conf->dev); +                return -1; +        } +          memset(&ifr, 0, sizeof(ifr)); -        memcpy(ifr.ifr_name, conf->dev, IFNAMSIZ); +        strcpy(ifr.ifr_name, conf->dev);  #ifdef BUILD_ETH_DIX          if (conf->ethertype < 0x0600 || conf->ethertype == 0xFFFF) { diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index 4064bf5c..aa1909e9 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -2729,6 +2729,10 @@ static void handle_event(void *       self,                  pthread_t          thr;                  struct join_info * inf;                  struct conn *      c     = (struct conn *) o; +                struct timespec    slack = {0, 10 * MILLION}; + +                /* Give the pff some time to update for the new link. */ +                nanosleep(&slack, NULL);                  switch(dht_get_state(dht)) {                  case DHT_INIT: diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 027223b7..56864e1f 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -68,13 +68,23 @@ struct fa_msg {          uint32_t max_gap;  } __attribute__((packed)); +struct cmd { +        struct list_head     next; +        struct shm_du_buff * sdb; +}; +  struct { -        pthread_rwlock_t   flows_lock; -        int                r_eid[PROG_MAX_FLOWS]; -        uint64_t           r_addr[PROG_MAX_FLOWS]; -        int                fd; +        pthread_rwlock_t flows_lock; +        int              r_eid[PROG_MAX_FLOWS]; +        uint64_t         r_addr[PROG_MAX_FLOWS]; +        int              fd; -        struct psched *    psched; +        struct list_head cmds; +        pthread_cond_t   cond; +        pthread_mutex_t  mtx; +        pthread_t        worker; + +        struct psched *  psched;  } fa;  static void packet_handler(int                  fd, @@ -100,109 +110,161 @@ static void destroy_conn(int fd)  }  static void fa_post_packet(void *               comp, -                        struct shm_du_buff * sdb) +                           struct shm_du_buff * sdb)  { -        struct timespec ts  = {0, TIMEOUT * 1000}; -        struct timespec abstime; -        int             fd; -        uint8_t *       buf; -        struct fa_msg * msg; -        qosspec_t       qs; - -        (void) comp; +        struct cmd * cmd;          assert(comp == &fa); -        assert(sdb); -        buf = malloc(sizeof(*msg) + ipcp_dir_hash_len()); -        if (buf == NULL) +        (void) comp; + +        cmd = malloc(sizeof(*cmd)); +        if (cmd == NULL) { +                log_err("Command failed. Out of memory."); +                ipcp_sdb_release(sdb);                  return; +        } + +        cmd->sdb = sdb; + +        pthread_mutex_lock(&fa.mtx); + +        list_add(&cmd->next, &fa.cmds); -        msg = (struct fa_msg *) buf; +        pthread_cond_signal(&fa.cond); -        /* Depending on the message call the function in ipcp-dev.h */ +        pthread_mutex_unlock(&fa.mtx); +} -        memcpy(msg, shm_du_buff_head(sdb), -               shm_du_buff_tail(sdb) - shm_du_buff_head(sdb)); +static void * fa_handle_packet(void * o) +{ +        struct timespec ts  = {0, TIMEOUT * 1000}; -        ipcp_sdb_release(sdb); +        (void) o; -        switch (msg->code) { -        case FLOW_REQ: -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +        while (true) { +                struct timespec abstime; +                int             fd; +                uint8_t *       buf; +                struct fa_msg * msg; +                qosspec_t       qs; +                struct cmd *    cmd; -                pthread_mutex_lock(&ipcpi.alloc_lock); +                pthread_mutex_lock(&fa.mtx); -                while (ipcpi.alloc_id != -1 && -                       ipcp_get_state() == IPCP_OPERATIONAL) { -                        ts_add(&abstime, &ts, &abstime); -                        pthread_cond_timedwait(&ipcpi.alloc_cond, -                                               &ipcpi.alloc_lock, -                                               &abstime); -                } +                pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, +                                     &fa.mtx); -                if (ipcp_get_state() != IPCP_OPERATIONAL) { -                        log_dbg("Won't allocate over non-operational IPCP."); -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        free(msg); -                        return; -                } +                while (list_is_empty(&fa.cmds)) +                        pthread_cond_wait(&fa.cond, &fa.mtx); -                assert(ipcpi.alloc_id == -1); - -                qs.delay        = ntoh32(msg->delay); -                qs.bandwidth    = ntoh64(msg->bandwidth); -                qs.availability = msg->availability; -                qs.loss         = ntoh32(msg->loss); -                qs.ber          = ntoh32(msg->ber); -                qs.in_order     = msg->in_order; -                qs.max_gap      = ntoh32(msg->max_gap); - -                fd = ipcp_flow_req_arr(getpid(), -                                       (uint8_t *) (msg + 1), -                                       ipcp_dir_hash_len(), -                                       qs); -                if (fd < 0) { -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        log_err("Failed to get fd for flow."); -                        free(msg); -                        return; +                cmd = list_last_entry(&fa.cmds, struct cmd, next); +                list_del(&cmd->next); + +                pthread_cleanup_pop(true); + +                buf = malloc(sizeof(*msg) + ipcp_dir_hash_len()); +                if (buf == NULL) { +                        log_err("Failed to allocate memory."); +                        free(cmd); +                        ipcp_sdb_release(cmd->sdb); +                        continue;                  } -                pthread_rwlock_wrlock(&fa.flows_lock); +                msg = (struct fa_msg *) buf; -                fa.r_eid[fd]  = ntoh32(msg->s_eid); -                fa.r_addr[fd] = ntoh64(msg->s_addr); +                /* Depending on the message call the function in ipcp-dev.h */ -                pthread_rwlock_unlock(&fa.flows_lock); +                assert(sizeof(*msg) + ipcp_dir_hash_len() >= +                       (unsigned long int) (shm_du_buff_tail(cmd->sdb) - +                                            shm_du_buff_head(cmd->sdb))); -                ipcpi.alloc_id = fd; -                pthread_cond_broadcast(&ipcpi.alloc_cond); +                memcpy(msg, shm_du_buff_head(cmd->sdb), +                       shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb)); -                pthread_mutex_unlock(&ipcpi.alloc_lock); +                ipcp_sdb_release(cmd->sdb); -                break; -        case FLOW_REPLY: -                pthread_rwlock_wrlock(&fa.flows_lock); +                free(cmd); -                fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); +                switch (msg->code) { +                case FLOW_REQ: +                        clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ipcp_flow_alloc_reply(ntoh32(msg->r_eid), msg->response); +                        pthread_mutex_lock(&ipcpi.alloc_lock); -                if (msg->response < 0) -                        destroy_conn(ntoh32(msg->r_eid)); -                else -                        psched_add(fa.psched, ntoh32(msg->r_eid)); +                        while (ipcpi.alloc_id != -1 && +                               ipcp_get_state() == IPCP_OPERATIONAL) { +                                ts_add(&abstime, &ts, &abstime); +                                pthread_cond_timedwait(&ipcpi.alloc_cond, +                                                       &ipcpi.alloc_lock, +                                                       &abstime); +                        } -                pthread_rwlock_unlock(&fa.flows_lock); +                        if (ipcp_get_state() != IPCP_OPERATIONAL) { +                                pthread_mutex_unlock(&ipcpi.alloc_lock); +                                log_dbg("Won't allocate over non-operational" +                                        "IPCP."); +                                free(msg); +                                continue; +                        } -                break; -        default: -                log_err("Got an unknown flow allocation message."); -                break; -        } +                        assert(ipcpi.alloc_id == -1); + +                        qs.delay        = ntoh32(msg->delay); +                        qs.bandwidth    = ntoh64(msg->bandwidth); +                        qs.availability = msg->availability; +                        qs.loss         = ntoh32(msg->loss); +                        qs.ber          = ntoh32(msg->ber); +                        qs.in_order     = msg->in_order; +                        qs.max_gap      = ntoh32(msg->max_gap); -        free(msg); +                        fd = ipcp_flow_req_arr(getpid(), +                                               (uint8_t *) (msg + 1), +                                               ipcp_dir_hash_len(), +                                               qs); +                        if (fd < 0) { +                                pthread_mutex_unlock(&ipcpi.alloc_lock); +                                log_err("Failed to get fd for flow."); +                                free(msg); +                                continue; +                        } + +                        pthread_rwlock_wrlock(&fa.flows_lock); + +                        fa.r_eid[fd]  = ntoh32(msg->s_eid); +                        fa.r_addr[fd] = ntoh64(msg->s_addr); + +                        pthread_rwlock_unlock(&fa.flows_lock); + +                        ipcpi.alloc_id = fd; +                        pthread_cond_broadcast(&ipcpi.alloc_cond); + +                        pthread_mutex_unlock(&ipcpi.alloc_lock); + +                        break; +                case FLOW_REPLY: +                        pthread_rwlock_wrlock(&fa.flows_lock); + +                        fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); + +                        ipcp_flow_alloc_reply(ntoh32(msg->r_eid), +                                              msg->response); + +                        if (msg->response < 0) +                                destroy_conn(ntoh32(msg->r_eid)); +                        else +                                psched_add(fa.psched, ntoh32(msg->r_eid)); + +                        pthread_rwlock_unlock(&fa.flows_lock); + +                        break; +                default: +                        log_err("Got an unknown flow allocation message."); +                        break; +                } + +                free(msg); +        }  }  int fa_init(void) @@ -213,31 +275,59 @@ int fa_init(void)                  destroy_conn(i);          if (pthread_rwlock_init(&fa.flows_lock, NULL)) -                return -1; +                goto fail_rwlock; + +        if (pthread_mutex_init(&fa.mtx, NULL)) +                goto fail_mtx; + +        if (pthread_cond_init(&fa.cond, NULL)) +                goto fail_cond; + +        list_head_init(&fa.cmds);          fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA);          return 0; + + fail_cond: +        pthread_mutex_destroy(&fa.mtx); + fail_mtx: +        pthread_rwlock_destroy(&fa.flows_lock); + fail_rwlock: +        log_err("Failed to initialize flow allocator."); +        return -1;  }  void fa_fini(void)  { +        pthread_cond_destroy(&fa.cond);; +        pthread_mutex_destroy(&fa.mtx);          pthread_rwlock_destroy(&fa.flows_lock);  }  int fa_start(void)  {          fa.psched = psched_create(packet_handler); -        if (fa.psched == NULL) { -                log_err("Failed to create packet scheduler."); -                return -1; -        } +        if (fa.psched == NULL) +                goto fail_psched; + +        if (pthread_create(&fa.worker, NULL, fa_handle_packet, NULL)) +                goto fail_thread;          return 0; + + fail_thread: +        psched_destroy(fa.psched); + fail_psched: +        log_err("Failed to start flow allocator."); +        return -1;  }  void fa_stop(void)  { +        pthread_cancel(fa.worker); +        pthread_join(fa.worker, NULL); +          psched_destroy(fa.psched);  } diff --git a/src/ipcpd/normal/pol/link_state.c b/src/ipcpd/normal/pol/link_state.c index e2e9eab5..8db1a9c5 100644 --- a/src/ipcpd/normal/pol/link_state.c +++ b/src/ipcpd/normal/pol/link_state.c @@ -374,7 +374,66 @@ static int lsdb_del_nb(uint64_t     addr,          return -EPERM;  } -static void set_pff_modified(void) +static int nbr_to_fd(uint64_t addr) +{ +        struct list_head * p; + +        pthread_rwlock_rdlock(&ls.db_lock); + +        list_for_each(p, &ls.nbs) { +                struct nb * nb = list_entry(p, struct nb, next); +                if (nb->addr == addr && nb->type == NB_DT) { +                        pthread_rwlock_unlock(&ls.db_lock); +                        return nb->fd; +                } +        } + +        pthread_rwlock_unlock(&ls.db_lock); + +        return -1; +} + +static void calculate_pff(struct routing_i * instance) +{ +        int                fd; +        struct list_head   table; +        struct list_head * p; +        struct list_head * q; +        int                fds[PROG_MAX_FLOWS]; + +        if (graph_routing_table(ls.graph, ls.routing_algo, +                                ipcpi.dt_addr, &table)) +                return; + +        pff_lock(instance->pff); + +        pff_flush(instance->pff); + +        /* Calulcate forwarding table from routing table. */ +        list_for_each(p, &table) { +                int                    i = 0; +                struct routing_table * t = +                        list_entry(p, struct routing_table, next); + +                list_for_each(q, &t->nhops) { +                        struct nhop * n = list_entry(q, struct nhop, next); + +                        fd = nbr_to_fd(n->nhop); +                        if (fd == -1) +                                continue; + +                        fds[i++] = fd; +                } + +                pff_add(instance->pff, t->dst, fds, i); +        } + +        pff_unlock(instance->pff); + +        graph_free_routing_table(ls.graph, &table); +} + +static void set_pff_modified(bool calc)  {          struct list_head * p; @@ -385,6 +444,8 @@ static void set_pff_modified(void)                  pthread_mutex_lock(&inst->lock);                  inst->modified = true;                  pthread_mutex_unlock(&inst->lock); +                if (calc) +                        calculate_pff(inst);          }          pthread_mutex_unlock(&ls.routing_i_lock);  } @@ -439,7 +500,7 @@ static int lsdb_add_link(uint64_t    src,          pthread_rwlock_unlock(&ls.db_lock); -        set_pff_modified(); +        set_pff_modified(true);          return 0;  } @@ -462,7 +523,7 @@ static int lsdb_del_link(uint64_t src,                          ls.db_len--;                          pthread_rwlock_unlock(&ls.db_lock); -                        set_pff_modified(); +                        set_pff_modified(false);                          free(a);                          return 0;                  } @@ -473,65 +534,6 @@ static int lsdb_del_link(uint64_t src,          return -EPERM;  } -static int nbr_to_fd(uint64_t addr) -{ -        struct list_head * p; - -        pthread_rwlock_rdlock(&ls.db_lock); - -        list_for_each(p, &ls.nbs) { -                struct nb * nb = list_entry(p, struct nb, next); -                if (nb->addr == addr && nb->type == NB_DT) { -                        pthread_rwlock_unlock(&ls.db_lock); -                        return nb->fd; -                } -        } - -        pthread_rwlock_unlock(&ls.db_lock); - -        return -1; -} - -static void calculate_pff(struct routing_i * instance) -{ -        int                fd; -        struct list_head   table; -        struct list_head * p; -        struct list_head * q; -        int                fds[PROG_MAX_FLOWS]; - -        if (graph_routing_table(ls.graph, ls.routing_algo, -                                ipcpi.dt_addr, &table)) -                return; - -        pff_lock(instance->pff); - -        pff_flush(instance->pff); - -        /* Calulcate forwarding table from routing table. */ -        list_for_each(p, &table) { -                int                    i = 0; -                struct routing_table * t = -                        list_entry(p, struct routing_table, next); - -                list_for_each(q, &t->nhops) { -                        struct nhop * n = list_entry(q, struct nhop, next); - -                        fd = nbr_to_fd(n->nhop); -                        if (fd == -1) -                                continue; - -                        fds[i++] = fd; -                } - -                pff_add(instance->pff, t->dst, fds, i); -        } - -        pff_unlock(instance->pff); - -        graph_free_routing_table(ls.graph, &table); -} -  static void * periodic_recalc_pff(void * o)  {          bool               modified; diff --git a/src/ipcpd/normal/psched.c b/src/ipcpd/normal/psched.c index 27e5f1de..c38c072d 100644 --- a/src/ipcpd/normal/psched.c +++ b/src/ipcpd/normal/psched.c @@ -42,11 +42,9 @@  #include <string.h>  static int qos_prio [] = { -        QOS_PRIO_RAW,          QOS_PRIO_BE,          QOS_PRIO_VIDEO,          QOS_PRIO_VOICE, -        QOS_PRIO_DATA  };  struct psched { diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index a6dc00c6..42164fac 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -176,7 +176,7 @@ set(SHM_RDRB_MULTI_BLOCK true CACHE BOOL    "Packet buffer multiblock packet support")  set(SHM_RBUFF_LOCKLESS 0 CACHE BOOL    "Enable shared memory lockless rbuff support") -set(QOS_DISABLE_CRC 0 CACHE BOOL +set(QOS_DISABLE_CRC TRUE CACHE BOOL    "Ignores ber setting on all QoS cubes")  set(SOURCE_FILES_DEV diff --git a/src/lib/dev.c b/src/lib/dev.c index 2a5c3f83..6dbb925e 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -345,7 +345,10 @@ static void init(int     argc,                  prog = argv[1];          ai.pid = getpid(); - +#ifdef HAVE_LIBGCRYPT +        if (!gcry_check_version(GCRYPT_VERSION)) +                goto fail_fds; +#endif          ai.fds = bmp_create(PROG_MAX_FLOWS - PROG_RES_FDS, PROG_RES_FDS);          if (ai.fds == NULL)                  goto fail_fds; | 
