diff options
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/dht.c | 4 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.c | 262 | ||||
| -rw-r--r-- | src/ipcpd/normal/pol/link_state.c | 126 | ||||
| -rw-r--r-- | src/ipcpd/normal/psched.c | 2 | 
4 files changed, 244 insertions, 150 deletions
| diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index 4064bf5c..aa1909e9 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -2729,6 +2729,10 @@ static void handle_event(void *       self,                  pthread_t          thr;                  struct join_info * inf;                  struct conn *      c     = (struct conn *) o; +                struct timespec    slack = {0, 10 * MILLION}; + +                /* Give the pff some time to update for the new link. */ +                nanosleep(&slack, NULL);                  switch(dht_get_state(dht)) {                  case DHT_INIT: diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 027223b7..56864e1f 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -68,13 +68,23 @@ struct fa_msg {          uint32_t max_gap;  } __attribute__((packed)); +struct cmd { +        struct list_head     next; +        struct shm_du_buff * sdb; +}; +  struct { -        pthread_rwlock_t   flows_lock; -        int                r_eid[PROG_MAX_FLOWS]; -        uint64_t           r_addr[PROG_MAX_FLOWS]; -        int                fd; +        pthread_rwlock_t flows_lock; +        int              r_eid[PROG_MAX_FLOWS]; +        uint64_t         r_addr[PROG_MAX_FLOWS]; +        int              fd; -        struct psched *    psched; +        struct list_head cmds; +        pthread_cond_t   cond; +        pthread_mutex_t  mtx; +        pthread_t        worker; + +        struct psched *  psched;  } fa;  static void packet_handler(int                  fd, @@ -100,109 +110,161 @@ static void destroy_conn(int fd)  }  static void fa_post_packet(void *               comp, -                        struct shm_du_buff * sdb) +                           struct shm_du_buff * sdb)  { -        struct timespec ts  = {0, TIMEOUT * 1000}; -        struct timespec abstime; -        int             fd; -        uint8_t *       buf; -        struct fa_msg * msg; -        qosspec_t       qs; - -        (void) comp; +        struct cmd * cmd;          assert(comp == &fa); -        assert(sdb); -        buf = malloc(sizeof(*msg) + ipcp_dir_hash_len()); -        if (buf == NULL) +        (void) comp; + +        cmd = malloc(sizeof(*cmd)); +        if (cmd == NULL) { +                log_err("Command failed. Out of memory."); +                ipcp_sdb_release(sdb);                  return; +        } + +        cmd->sdb = sdb; + +        pthread_mutex_lock(&fa.mtx); + +        list_add(&cmd->next, &fa.cmds); -        msg = (struct fa_msg *) buf; +        pthread_cond_signal(&fa.cond); -        /* Depending on the message call the function in ipcp-dev.h */ +        pthread_mutex_unlock(&fa.mtx); +} -        memcpy(msg, shm_du_buff_head(sdb), -               shm_du_buff_tail(sdb) - shm_du_buff_head(sdb)); +static void * fa_handle_packet(void * o) +{ +        struct timespec ts  = {0, TIMEOUT * 1000}; -        ipcp_sdb_release(sdb); +        (void) o; -        switch (msg->code) { -        case FLOW_REQ: -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +        while (true) { +                struct timespec abstime; +                int             fd; +                uint8_t *       buf; +                struct fa_msg * msg; +                qosspec_t       qs; +                struct cmd *    cmd; -                pthread_mutex_lock(&ipcpi.alloc_lock); +                pthread_mutex_lock(&fa.mtx); -                while (ipcpi.alloc_id != -1 && -                       ipcp_get_state() == IPCP_OPERATIONAL) { -                        ts_add(&abstime, &ts, &abstime); -                        pthread_cond_timedwait(&ipcpi.alloc_cond, -                                               &ipcpi.alloc_lock, -                                               &abstime); -                } +                pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, +                                     &fa.mtx); -                if (ipcp_get_state() != IPCP_OPERATIONAL) { -                        log_dbg("Won't allocate over non-operational IPCP."); -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        free(msg); -                        return; -                } +                while (list_is_empty(&fa.cmds)) +                        pthread_cond_wait(&fa.cond, &fa.mtx); -                assert(ipcpi.alloc_id == -1); - -                qs.delay        = ntoh32(msg->delay); -                qs.bandwidth    = ntoh64(msg->bandwidth); -                qs.availability = msg->availability; -                qs.loss         = ntoh32(msg->loss); -                qs.ber          = ntoh32(msg->ber); -                qs.in_order     = msg->in_order; -                qs.max_gap      = ntoh32(msg->max_gap); - -                fd = ipcp_flow_req_arr(getpid(), -                                       (uint8_t *) (msg + 1), -                                       ipcp_dir_hash_len(), -                                       qs); -                if (fd < 0) { -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        log_err("Failed to get fd for flow."); -                        free(msg); -                        return; +                cmd = list_last_entry(&fa.cmds, struct cmd, next); +                list_del(&cmd->next); + +                pthread_cleanup_pop(true); + +                buf = malloc(sizeof(*msg) + ipcp_dir_hash_len()); +                if (buf == NULL) { +                        log_err("Failed to allocate memory."); +                        free(cmd); +                        ipcp_sdb_release(cmd->sdb); +                        continue;                  } -                pthread_rwlock_wrlock(&fa.flows_lock); +                msg = (struct fa_msg *) buf; -                fa.r_eid[fd]  = ntoh32(msg->s_eid); -                fa.r_addr[fd] = ntoh64(msg->s_addr); +                /* Depending on the message call the function in ipcp-dev.h */ -                pthread_rwlock_unlock(&fa.flows_lock); +                assert(sizeof(*msg) + ipcp_dir_hash_len() >= +                       (unsigned long int) (shm_du_buff_tail(cmd->sdb) - +                                            shm_du_buff_head(cmd->sdb))); -                ipcpi.alloc_id = fd; -                pthread_cond_broadcast(&ipcpi.alloc_cond); +                memcpy(msg, shm_du_buff_head(cmd->sdb), +                       shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb)); -                pthread_mutex_unlock(&ipcpi.alloc_lock); +                ipcp_sdb_release(cmd->sdb); -                break; -        case FLOW_REPLY: -                pthread_rwlock_wrlock(&fa.flows_lock); +                free(cmd); -                fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); +                switch (msg->code) { +                case FLOW_REQ: +                        clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ipcp_flow_alloc_reply(ntoh32(msg->r_eid), msg->response); +                        pthread_mutex_lock(&ipcpi.alloc_lock); -                if (msg->response < 0) -                        destroy_conn(ntoh32(msg->r_eid)); -                else -                        psched_add(fa.psched, ntoh32(msg->r_eid)); +                        while (ipcpi.alloc_id != -1 && +                               ipcp_get_state() == IPCP_OPERATIONAL) { +                                ts_add(&abstime, &ts, &abstime); +                                pthread_cond_timedwait(&ipcpi.alloc_cond, +                                                       &ipcpi.alloc_lock, +                                                       &abstime); +                        } -                pthread_rwlock_unlock(&fa.flows_lock); +                        if (ipcp_get_state() != IPCP_OPERATIONAL) { +                                pthread_mutex_unlock(&ipcpi.alloc_lock); +                                log_dbg("Won't allocate over non-operational" +                                        "IPCP."); +                                free(msg); +                                continue; +                        } -                break; -        default: -                log_err("Got an unknown flow allocation message."); -                break; -        } +                        assert(ipcpi.alloc_id == -1); + +                        qs.delay        = ntoh32(msg->delay); +                        qs.bandwidth    = ntoh64(msg->bandwidth); +                        qs.availability = msg->availability; +                        qs.loss         = ntoh32(msg->loss); +                        qs.ber          = ntoh32(msg->ber); +                        qs.in_order     = msg->in_order; +                        qs.max_gap      = ntoh32(msg->max_gap); -        free(msg); +                        fd = ipcp_flow_req_arr(getpid(), +                                               (uint8_t *) (msg + 1), +                                               ipcp_dir_hash_len(), +                                               qs); +                        if (fd < 0) { +                                pthread_mutex_unlock(&ipcpi.alloc_lock); +                                log_err("Failed to get fd for flow."); +                                free(msg); +                                continue; +                        } + +                        pthread_rwlock_wrlock(&fa.flows_lock); + +                        fa.r_eid[fd]  = ntoh32(msg->s_eid); +                        fa.r_addr[fd] = ntoh64(msg->s_addr); + +                        pthread_rwlock_unlock(&fa.flows_lock); + +                        ipcpi.alloc_id = fd; +                        pthread_cond_broadcast(&ipcpi.alloc_cond); + +                        pthread_mutex_unlock(&ipcpi.alloc_lock); + +                        break; +                case FLOW_REPLY: +                        pthread_rwlock_wrlock(&fa.flows_lock); + +                        fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); + +                        ipcp_flow_alloc_reply(ntoh32(msg->r_eid), +                                              msg->response); + +                        if (msg->response < 0) +                                destroy_conn(ntoh32(msg->r_eid)); +                        else +                                psched_add(fa.psched, ntoh32(msg->r_eid)); + +                        pthread_rwlock_unlock(&fa.flows_lock); + +                        break; +                default: +                        log_err("Got an unknown flow allocation message."); +                        break; +                } + +                free(msg); +        }  }  int fa_init(void) @@ -213,31 +275,59 @@ int fa_init(void)                  destroy_conn(i);          if (pthread_rwlock_init(&fa.flows_lock, NULL)) -                return -1; +                goto fail_rwlock; + +        if (pthread_mutex_init(&fa.mtx, NULL)) +                goto fail_mtx; + +        if (pthread_cond_init(&fa.cond, NULL)) +                goto fail_cond; + +        list_head_init(&fa.cmds);          fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA);          return 0; + + fail_cond: +        pthread_mutex_destroy(&fa.mtx); + fail_mtx: +        pthread_rwlock_destroy(&fa.flows_lock); + fail_rwlock: +        log_err("Failed to initialize flow allocator."); +        return -1;  }  void fa_fini(void)  { +        pthread_cond_destroy(&fa.cond);; +        pthread_mutex_destroy(&fa.mtx);          pthread_rwlock_destroy(&fa.flows_lock);  }  int fa_start(void)  {          fa.psched = psched_create(packet_handler); -        if (fa.psched == NULL) { -                log_err("Failed to create packet scheduler."); -                return -1; -        } +        if (fa.psched == NULL) +                goto fail_psched; + +        if (pthread_create(&fa.worker, NULL, fa_handle_packet, NULL)) +                goto fail_thread;          return 0; + + fail_thread: +        psched_destroy(fa.psched); + fail_psched: +        log_err("Failed to start flow allocator."); +        return -1;  }  void fa_stop(void)  { +        pthread_cancel(fa.worker); +        pthread_join(fa.worker, NULL); +          psched_destroy(fa.psched);  } diff --git a/src/ipcpd/normal/pol/link_state.c b/src/ipcpd/normal/pol/link_state.c index e2e9eab5..8db1a9c5 100644 --- a/src/ipcpd/normal/pol/link_state.c +++ b/src/ipcpd/normal/pol/link_state.c @@ -374,7 +374,66 @@ static int lsdb_del_nb(uint64_t     addr,          return -EPERM;  } -static void set_pff_modified(void) +static int nbr_to_fd(uint64_t addr) +{ +        struct list_head * p; + +        pthread_rwlock_rdlock(&ls.db_lock); + +        list_for_each(p, &ls.nbs) { +                struct nb * nb = list_entry(p, struct nb, next); +                if (nb->addr == addr && nb->type == NB_DT) { +                        pthread_rwlock_unlock(&ls.db_lock); +                        return nb->fd; +                } +        } + +        pthread_rwlock_unlock(&ls.db_lock); + +        return -1; +} + +static void calculate_pff(struct routing_i * instance) +{ +        int                fd; +        struct list_head   table; +        struct list_head * p; +        struct list_head * q; +        int                fds[PROG_MAX_FLOWS]; + +        if (graph_routing_table(ls.graph, ls.routing_algo, +                                ipcpi.dt_addr, &table)) +                return; + +        pff_lock(instance->pff); + +        pff_flush(instance->pff); + +        /* Calulcate forwarding table from routing table. */ +        list_for_each(p, &table) { +                int                    i = 0; +                struct routing_table * t = +                        list_entry(p, struct routing_table, next); + +                list_for_each(q, &t->nhops) { +                        struct nhop * n = list_entry(q, struct nhop, next); + +                        fd = nbr_to_fd(n->nhop); +                        if (fd == -1) +                                continue; + +                        fds[i++] = fd; +                } + +                pff_add(instance->pff, t->dst, fds, i); +        } + +        pff_unlock(instance->pff); + +        graph_free_routing_table(ls.graph, &table); +} + +static void set_pff_modified(bool calc)  {          struct list_head * p; @@ -385,6 +444,8 @@ static void set_pff_modified(void)                  pthread_mutex_lock(&inst->lock);                  inst->modified = true;                  pthread_mutex_unlock(&inst->lock); +                if (calc) +                        calculate_pff(inst);          }          pthread_mutex_unlock(&ls.routing_i_lock);  } @@ -439,7 +500,7 @@ static int lsdb_add_link(uint64_t    src,          pthread_rwlock_unlock(&ls.db_lock); -        set_pff_modified(); +        set_pff_modified(true);          return 0;  } @@ -462,7 +523,7 @@ static int lsdb_del_link(uint64_t src,                          ls.db_len--;                          pthread_rwlock_unlock(&ls.db_lock); -                        set_pff_modified(); +                        set_pff_modified(false);                          free(a);                          return 0;                  } @@ -473,65 +534,6 @@ static int lsdb_del_link(uint64_t src,          return -EPERM;  } -static int nbr_to_fd(uint64_t addr) -{ -        struct list_head * p; - -        pthread_rwlock_rdlock(&ls.db_lock); - -        list_for_each(p, &ls.nbs) { -                struct nb * nb = list_entry(p, struct nb, next); -                if (nb->addr == addr && nb->type == NB_DT) { -                        pthread_rwlock_unlock(&ls.db_lock); -                        return nb->fd; -                } -        } - -        pthread_rwlock_unlock(&ls.db_lock); - -        return -1; -} - -static void calculate_pff(struct routing_i * instance) -{ -        int                fd; -        struct list_head   table; -        struct list_head * p; -        struct list_head * q; -        int                fds[PROG_MAX_FLOWS]; - -        if (graph_routing_table(ls.graph, ls.routing_algo, -                                ipcpi.dt_addr, &table)) -                return; - -        pff_lock(instance->pff); - -        pff_flush(instance->pff); - -        /* Calulcate forwarding table from routing table. */ -        list_for_each(p, &table) { -                int                    i = 0; -                struct routing_table * t = -                        list_entry(p, struct routing_table, next); - -                list_for_each(q, &t->nhops) { -                        struct nhop * n = list_entry(q, struct nhop, next); - -                        fd = nbr_to_fd(n->nhop); -                        if (fd == -1) -                                continue; - -                        fds[i++] = fd; -                } - -                pff_add(instance->pff, t->dst, fds, i); -        } - -        pff_unlock(instance->pff); - -        graph_free_routing_table(ls.graph, &table); -} -  static void * periodic_recalc_pff(void * o)  {          bool               modified; diff --git a/src/ipcpd/normal/psched.c b/src/ipcpd/normal/psched.c index 27e5f1de..c38c072d 100644 --- a/src/ipcpd/normal/psched.c +++ b/src/ipcpd/normal/psched.c @@ -42,11 +42,9 @@  #include <string.h>  static int qos_prio [] = { -        QOS_PRIO_RAW,          QOS_PRIO_BE,          QOS_PRIO_VIDEO,          QOS_PRIO_VOICE, -        QOS_PRIO_DATA  };  struct psched { | 
