diff options
| author | Dimitri Staessens <dimitri.staessens@ugent.be> | 2018-10-11 01:44:28 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2018-10-11 13:42:38 +0200 | 
| commit | ebf4e472b0065677cb3caf2b67b4bf31fd1f7343 (patch) | |
| tree | f11eb412c7e57258f25daa31913cc8da0b3f645a /src/ipcpd/normal | |
| parent | 4402b69c2535f9975d835438a52cebc89a147ba3 (diff) | |
| download | ouroboros-ebf4e472b0065677cb3caf2b67b4bf31fd1f7343.tar.gz ouroboros-ebf4e472b0065677cb3caf2b67b4bf31fd1f7343.zip | |
ipcpd: Decouple flow allocator from dt thread
The flow allocator passed a blocking callback to the forwarding
component, which blocks packet processing.
Signed-off-by: Dimitri Staessens <dimitri.staessens@ugent.be>
Signed-off-by: Sander Vrijders <sander.vrijders@ugent.be>
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/fa.c | 262 | 
1 files changed, 176 insertions, 86 deletions
| diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 027223b7..56864e1f 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -68,13 +68,23 @@ struct fa_msg {          uint32_t max_gap;  } __attribute__((packed)); +struct cmd { +        struct list_head     next; +        struct shm_du_buff * sdb; +}; +  struct { -        pthread_rwlock_t   flows_lock; -        int                r_eid[PROG_MAX_FLOWS]; -        uint64_t           r_addr[PROG_MAX_FLOWS]; -        int                fd; +        pthread_rwlock_t flows_lock; +        int              r_eid[PROG_MAX_FLOWS]; +        uint64_t         r_addr[PROG_MAX_FLOWS]; +        int              fd; -        struct psched *    psched; +        struct list_head cmds; +        pthread_cond_t   cond; +        pthread_mutex_t  mtx; +        pthread_t        worker; + +        struct psched *  psched;  } fa;  static void packet_handler(int                  fd, @@ -100,109 +110,161 @@ static void destroy_conn(int fd)  }  static void fa_post_packet(void *               comp, -                        struct shm_du_buff * sdb) +                           struct shm_du_buff * sdb)  { -        struct timespec ts  = {0, TIMEOUT * 1000}; -        struct timespec abstime; -        int             fd; -        uint8_t *       buf; -        struct fa_msg * msg; -        qosspec_t       qs; - -        (void) comp; +        struct cmd * cmd;          assert(comp == &fa); -        assert(sdb); -        buf = malloc(sizeof(*msg) + ipcp_dir_hash_len()); -        if (buf == NULL) +        (void) comp; + +        cmd = malloc(sizeof(*cmd)); +        if (cmd == NULL) { +                log_err("Command failed. Out of memory."); +                ipcp_sdb_release(sdb);                  return; +        } + +        cmd->sdb = sdb; + +        pthread_mutex_lock(&fa.mtx); + +        list_add(&cmd->next, &fa.cmds); -        msg = (struct fa_msg *) buf; +        pthread_cond_signal(&fa.cond); -        /* Depending on the message call the function in ipcp-dev.h */ +        pthread_mutex_unlock(&fa.mtx); +} -        memcpy(msg, shm_du_buff_head(sdb), -               shm_du_buff_tail(sdb) - shm_du_buff_head(sdb)); +static void * fa_handle_packet(void * o) +{ +        struct timespec ts  = {0, TIMEOUT * 1000}; -        ipcp_sdb_release(sdb); +        (void) o; -        switch (msg->code) { -        case FLOW_REQ: -                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +        while (true) { +                struct timespec abstime; +                int             fd; +                uint8_t *       buf; +                struct fa_msg * msg; +                qosspec_t       qs; +                struct cmd *    cmd; -                pthread_mutex_lock(&ipcpi.alloc_lock); +                pthread_mutex_lock(&fa.mtx); -                while (ipcpi.alloc_id != -1 && -                       ipcp_get_state() == IPCP_OPERATIONAL) { -                        ts_add(&abstime, &ts, &abstime); -                        pthread_cond_timedwait(&ipcpi.alloc_cond, -                                               &ipcpi.alloc_lock, -                                               &abstime); -                } +                pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, +                                     &fa.mtx); -                if (ipcp_get_state() != IPCP_OPERATIONAL) { -                        log_dbg("Won't allocate over non-operational IPCP."); -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        free(msg); -                        return; -                } +                while (list_is_empty(&fa.cmds)) +                        pthread_cond_wait(&fa.cond, &fa.mtx); -                assert(ipcpi.alloc_id == -1); - -                qs.delay        = ntoh32(msg->delay); -                qs.bandwidth    = ntoh64(msg->bandwidth); -                qs.availability = msg->availability; -                qs.loss         = ntoh32(msg->loss); -                qs.ber          = ntoh32(msg->ber); -                qs.in_order     = msg->in_order; -                qs.max_gap      = ntoh32(msg->max_gap); - -                fd = ipcp_flow_req_arr(getpid(), -                                       (uint8_t *) (msg + 1), -                                       ipcp_dir_hash_len(), -                                       qs); -                if (fd < 0) { -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        log_err("Failed to get fd for flow."); -                        free(msg); -                        return; +                cmd = list_last_entry(&fa.cmds, struct cmd, next); +                list_del(&cmd->next); + +                pthread_cleanup_pop(true); + +                buf = malloc(sizeof(*msg) + ipcp_dir_hash_len()); +                if (buf == NULL) { +                        log_err("Failed to allocate memory."); +                        free(cmd); +                        ipcp_sdb_release(cmd->sdb); +                        continue;                  } -                pthread_rwlock_wrlock(&fa.flows_lock); +                msg = (struct fa_msg *) buf; -                fa.r_eid[fd]  = ntoh32(msg->s_eid); -                fa.r_addr[fd] = ntoh64(msg->s_addr); +                /* Depending on the message call the function in ipcp-dev.h */ -                pthread_rwlock_unlock(&fa.flows_lock); +                assert(sizeof(*msg) + ipcp_dir_hash_len() >= +                       (unsigned long int) (shm_du_buff_tail(cmd->sdb) - +                                            shm_du_buff_head(cmd->sdb))); -                ipcpi.alloc_id = fd; -                pthread_cond_broadcast(&ipcpi.alloc_cond); +                memcpy(msg, shm_du_buff_head(cmd->sdb), +                       shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb)); -                pthread_mutex_unlock(&ipcpi.alloc_lock); +                ipcp_sdb_release(cmd->sdb); -                break; -        case FLOW_REPLY: -                pthread_rwlock_wrlock(&fa.flows_lock); +                free(cmd); -                fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); +                switch (msg->code) { +                case FLOW_REQ: +                        clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ipcp_flow_alloc_reply(ntoh32(msg->r_eid), msg->response); +                        pthread_mutex_lock(&ipcpi.alloc_lock); -                if (msg->response < 0) -                        destroy_conn(ntoh32(msg->r_eid)); -                else -                        psched_add(fa.psched, ntoh32(msg->r_eid)); +                        while (ipcpi.alloc_id != -1 && +                               ipcp_get_state() == IPCP_OPERATIONAL) { +                                ts_add(&abstime, &ts, &abstime); +                                pthread_cond_timedwait(&ipcpi.alloc_cond, +                                                       &ipcpi.alloc_lock, +                                                       &abstime); +                        } -                pthread_rwlock_unlock(&fa.flows_lock); +                        if (ipcp_get_state() != IPCP_OPERATIONAL) { +                                pthread_mutex_unlock(&ipcpi.alloc_lock); +                                log_dbg("Won't allocate over non-operational" +                                        "IPCP."); +                                free(msg); +                                continue; +                        } -                break; -        default: -                log_err("Got an unknown flow allocation message."); -                break; -        } +                        assert(ipcpi.alloc_id == -1); + +                        qs.delay        = ntoh32(msg->delay); +                        qs.bandwidth    = ntoh64(msg->bandwidth); +                        qs.availability = msg->availability; +                        qs.loss         = ntoh32(msg->loss); +                        qs.ber          = ntoh32(msg->ber); +                        qs.in_order     = msg->in_order; +                        qs.max_gap      = ntoh32(msg->max_gap); -        free(msg); +                        fd = ipcp_flow_req_arr(getpid(), +                                               (uint8_t *) (msg + 1), +                                               ipcp_dir_hash_len(), +                                               qs); +                        if (fd < 0) { +                                pthread_mutex_unlock(&ipcpi.alloc_lock); +                                log_err("Failed to get fd for flow."); +                                free(msg); +                                continue; +                        } + +                        pthread_rwlock_wrlock(&fa.flows_lock); + +                        fa.r_eid[fd]  = ntoh32(msg->s_eid); +                        fa.r_addr[fd] = ntoh64(msg->s_addr); + +                        pthread_rwlock_unlock(&fa.flows_lock); + +                        ipcpi.alloc_id = fd; +                        pthread_cond_broadcast(&ipcpi.alloc_cond); + +                        pthread_mutex_unlock(&ipcpi.alloc_lock); + +                        break; +                case FLOW_REPLY: +                        pthread_rwlock_wrlock(&fa.flows_lock); + +                        fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); + +                        ipcp_flow_alloc_reply(ntoh32(msg->r_eid), +                                              msg->response); + +                        if (msg->response < 0) +                                destroy_conn(ntoh32(msg->r_eid)); +                        else +                                psched_add(fa.psched, ntoh32(msg->r_eid)); + +                        pthread_rwlock_unlock(&fa.flows_lock); + +                        break; +                default: +                        log_err("Got an unknown flow allocation message."); +                        break; +                } + +                free(msg); +        }  }  int fa_init(void) @@ -213,31 +275,59 @@ int fa_init(void)                  destroy_conn(i);          if (pthread_rwlock_init(&fa.flows_lock, NULL)) -                return -1; +                goto fail_rwlock; + +        if (pthread_mutex_init(&fa.mtx, NULL)) +                goto fail_mtx; + +        if (pthread_cond_init(&fa.cond, NULL)) +                goto fail_cond; + +        list_head_init(&fa.cmds);          fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA);          return 0; + + fail_cond: +        pthread_mutex_destroy(&fa.mtx); + fail_mtx: +        pthread_rwlock_destroy(&fa.flows_lock); + fail_rwlock: +        log_err("Failed to initialize flow allocator."); +        return -1;  }  void fa_fini(void)  { +        pthread_cond_destroy(&fa.cond);; +        pthread_mutex_destroy(&fa.mtx);          pthread_rwlock_destroy(&fa.flows_lock);  }  int fa_start(void)  {          fa.psched = psched_create(packet_handler); -        if (fa.psched == NULL) { -                log_err("Failed to create packet scheduler."); -                return -1; -        } +        if (fa.psched == NULL) +                goto fail_psched; + +        if (pthread_create(&fa.worker, NULL, fa_handle_packet, NULL)) +                goto fail_thread;          return 0; + + fail_thread: +        psched_destroy(fa.psched); + fail_psched: +        log_err("Failed to start flow allocator."); +        return -1;  }  void fa_stop(void)  { +        pthread_cancel(fa.worker); +        pthread_join(fa.worker, NULL); +          psched_destroy(fa.psched);  } | 
