diff options
| -rw-r--r-- | src/ipcpd/unicast/fa.c | 283 | 
1 files changed, 170 insertions, 113 deletions
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index ef6adae6..7143a346 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -435,167 +435,224 @@ static void fa_post_packet(void *               comp,          pthread_mutex_unlock(&fa.mtx);  } -static void * fa_handle_packet(void * o) +static size_t fa_wait_for_fa_msg(struct fa_msg * msg)  { -        struct timespec ts  = {0, TIMEOUT * 1000}; +        struct cmd * cmd; +        size_t       len; -        (void) o; +        pthread_mutex_lock(&fa.mtx); -        while (true) { -                struct timespec  abstime; -                int              fd; -                uint8_t          buf[MSGBUFSZ]; -                struct fa_msg *  msg; -                qosspec_t        qs; -                struct cmd *     cmd; -                size_t           len; -                size_t           msg_len; -                struct fa_flow * flow; +        pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx); -                pthread_mutex_lock(&fa.mtx); +        while (list_is_empty(&fa.cmds)) +                pthread_cond_wait(&fa.cond, &fa.mtx); -                pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx); +        cmd = list_last_entry(&fa.cmds, struct cmd, next); +        list_del(&cmd->next); -                while (list_is_empty(&fa.cmds)) -                        pthread_cond_wait(&fa.cond, &fa.mtx); +        pthread_cleanup_pop(true); -                cmd = list_last_entry(&fa.cmds, struct cmd, next); -                list_del(&cmd->next); +        len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); +        if (len > MSGBUFSZ || len < sizeof(*msg)) { +                log_warn("Invalid flow allocation message (len: %zd)\n", len); +                free(cmd); +                return 0; /* No valid message */ +        } -                pthread_cleanup_pop(true); +        memcpy(msg, shm_du_buff_head(cmd->sdb), len); -                len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); +        ipcp_sdb_release(cmd->sdb); -                if (len > MSGBUFSZ) { -                        log_err("Message over buffer size."); -                        free(cmd); -                        continue; -                } +        free(cmd); -                msg = (struct fa_msg *) buf; +        return len; +} -                /* Depending on the message call the function in ipcp-dev.h */ +static int fa_wait_irmd_alloc(uint8_t *    dst, +                              qosspec_t    qs, +                              const void * data, +                              size_t       len) +{ +        struct timespec ts  = {0, TIMEOUT * 1000}; +        struct timespec abstime; +        int             fd; -                memcpy(msg, shm_du_buff_head(cmd->sdb), len); +        clock_gettime(PTHREAD_COND_CLOCK, &abstime); -                ipcp_sdb_release(cmd->sdb); +        pthread_mutex_lock(&ipcpi.alloc_lock); -                free(cmd); +        while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) { +                ts_add(&abstime, &ts, &abstime); +                pthread_cond_timedwait(&ipcpi.alloc_cond, +                                       &ipcpi.alloc_lock, +                                       &abstime); +        } -                switch (msg->code) { -                case FLOW_REQ: -                        msg_len = sizeof(*msg) + ipcp_dir_hash_len(); +        if (ipcp_get_state() != IPCP_OPERATIONAL) { +                pthread_mutex_unlock(&ipcpi.alloc_lock); +                log_dbg("Won't allocate over non-operational IPCP."); +                return -EIPCPSTATE; +        } -                        assert(len >= msg_len); +        assert(ipcpi.alloc_id == -1); -                        clock_gettime(PTHREAD_COND_CLOCK, &abstime); +        fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, data, len); +        if (fd < 0) { +                pthread_mutex_unlock(&ipcpi.alloc_lock); +                log_dbg("Failed to get fd for flow."); +                return -ENOTALLOC; +        } + +        ipcpi.alloc_id = fd; +        pthread_cond_broadcast(&ipcpi.alloc_cond); -                        pthread_mutex_lock(&ipcpi.alloc_lock); +        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        while (ipcpi.alloc_id != -1 && -                               ipcp_get_state() == IPCP_OPERATIONAL) { -                                ts_add(&abstime, &ts, &abstime); -                                pthread_cond_timedwait(&ipcpi.alloc_cond, -                                                       &ipcpi.alloc_lock, -                                                       &abstime); -                        } +        return fd; +} -                        if (ipcp_get_state() != IPCP_OPERATIONAL) { -                                pthread_mutex_unlock(&ipcpi.alloc_lock); -                                log_dbg("Won't allocate over non-operational" -                                        "IPCP."); -                                continue; -                        } +static int fa_handle_flow_req(struct fa_msg * msg, +                              size_t          len) +{ +        size_t           msg_len; +        int              fd; +        qosspec_t        qs; +        struct fa_flow * flow; +        uint8_t *        data;  /* Piggbacked data on flow alloc request. */ +        size_t           dlen;  /* Length of piggybacked data.            */ -                        assert(ipcpi.alloc_id == -1); +        msg_len = sizeof(*msg) + ipcp_dir_hash_len(); +        if (len < msg_len) { +                log_err("Invalid flow allocation request"); +                return -EPERM; +        } -                        qs.delay        = ntoh32(msg->delay); -                        qs.bandwidth    = ntoh64(msg->bandwidth); -                        qs.availability = msg->availability; -                        qs.loss         = ntoh32(msg->loss); -                        qs.ber          = ntoh32(msg->ber); -                        qs.in_order     = msg->in_order; -                        qs.max_gap      = ntoh32(msg->max_gap); -                        qs.cypher_s     = ntoh16(msg->cypher_s); +        data = (uint8_t *) msg + msg_len; +        dlen = len - msg_len; -                        fd = ipcp_flow_req_arr((uint8_t *) (msg + 1), -                                               ipcp_dir_hash_len(), -                                               qs, -                                               buf + msg_len, -                                               len - msg_len); -                        if (fd < 0) { -                                pthread_mutex_unlock(&ipcpi.alloc_lock); -                                log_err("Failed to get fd for flow."); -                                continue; -                        } +        qs.delay        = ntoh32(msg->delay); +        qs.bandwidth    = ntoh64(msg->bandwidth); +        qs.availability = msg->availability; +        qs.loss         = ntoh32(msg->loss); +        qs.ber          = ntoh32(msg->ber); +        qs.in_order     = msg->in_order; +        qs.max_gap      = ntoh32(msg->max_gap); +        qs.cypher_s     = ntoh16(msg->cypher_s); -                        flow = &fa.flows[fd]; +        fd = fa_wait_irmd_alloc((uint8_t *) (msg + 1), qs, data, dlen); +        if (fd < 0) +                return fd; -                        pthread_rwlock_wrlock(&fa.flows_lock); +        flow = &fa.flows[fd]; -                        fa_flow_init(flow); +        pthread_rwlock_wrlock(&fa.flows_lock); -                        flow->s_eid  = gen_eid(fd); -                        flow->r_eid  = ntoh64(msg->s_eid); -                        flow->r_addr = ntoh64(msg->s_addr); +        fa_flow_init(flow); -                        pthread_rwlock_unlock(&fa.flows_lock); +        flow->s_eid  = gen_eid(fd); +        flow->r_eid  = ntoh64(msg->s_eid); +        flow->r_addr = ntoh64(msg->s_addr); -                        ipcpi.alloc_id = fd; -                        pthread_cond_broadcast(&ipcpi.alloc_cond); +        pthread_rwlock_unlock(&fa.flows_lock); -                        pthread_mutex_unlock(&ipcpi.alloc_lock); +        return fd; +} -                        break; -                case FLOW_REPLY: -                        assert(len >= sizeof(*msg)); +static int fa_handle_flow_reply(struct fa_msg * msg, +                                size_t          len) +{ +        int              fd; +        struct fa_flow * flow; +        uint8_t *        data;  /* Piggbacked data on flow alloc request. */ +        size_t           dlen;  /* Length of piggybacked data.            */ -                        pthread_rwlock_wrlock(&fa.flows_lock); +        assert(len >= sizeof(*msg)); -                        fd = eid_to_fd(ntoh64(msg->r_eid)); -                        if (fd < 0) { -                                pthread_rwlock_unlock(&fa.flows_lock); -                                break; -                        } +        data = (uint8_t *) msg + sizeof(*msg); +        dlen = len - sizeof(*msg); -                        flow = &fa.flows[fd]; +        pthread_rwlock_wrlock(&fa.flows_lock); -                        flow->r_eid = ntoh64(msg->s_eid); +        fd = eid_to_fd(ntoh64(msg->r_eid)); +        if (fd < 0) { +                pthread_rwlock_unlock(&fa.flows_lock); +                return -ENOTALLOC; +        } -                        if (msg->response < 0) -                                fa_flow_fini(flow); -                        else -                                psched_add(fa.psched, fd); +        flow = &fa.flows[fd]; -                        pthread_rwlock_unlock(&fa.flows_lock); +        flow->r_eid = ntoh64(msg->s_eid); -                        ipcp_flow_alloc_reply(fd, -                                              msg->response, -                                              buf + sizeof(*msg), -                                              len - sizeof(*msg)); -                        break; -                case FLOW_UPDATE: -                        assert(len >= sizeof(*msg)); +        if (msg->response < 0) +                fa_flow_fini(flow); +        else +                psched_add(fa.psched, fd); + +        pthread_rwlock_unlock(&fa.flows_lock); + +        if (ipcp_flow_alloc_reply(fd, msg->response, data, dlen)) +                return -EIRMD; -                        pthread_rwlock_wrlock(&fa.flows_lock); +        return 0; +} -                        fd = eid_to_fd(ntoh64(msg->r_eid)); -                        if (fd < 0) { -                                pthread_rwlock_unlock(&fa.flows_lock); -                                break; -                        } +static int fa_handle_flow_update(struct fa_msg * msg, +                                 size_t          len) +{ +        struct fa_flow * flow; +        int              fd; + +        assert(len >= sizeof(*msg)); + +        pthread_rwlock_wrlock(&fa.flows_lock); + +        fd = eid_to_fd(ntoh64(msg->r_eid)); +        if (fd < 0) { +                pthread_rwlock_unlock(&fa.flows_lock); +                return -EPERM; +        } -                        flow = &fa.flows[fd]; +        flow = &fa.flows[fd];  #ifdef IPCP_FLOW_STATS -                        flow->u_rcv++; +        flow->u_rcv++;  #endif -                        ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); +        ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); -                        pthread_rwlock_unlock(&fa.flows_lock); +        pthread_rwlock_unlock(&fa.flows_lock); + +        return 0; +} + +static void * fa_handle_packet(void * o) +{ +        (void) o; + +        while (true) { +                uint8_t          buf[MSGBUFSZ]; +                struct fa_msg *  msg; +                size_t           len; + +                msg = (struct fa_msg *) buf; + +                len = fa_wait_for_fa_msg(msg); +                if (len == 0) +                        continue; +                switch (msg->code) { +                case FLOW_REQ: +                        if (fa_handle_flow_req(msg, len) < 0) +                                log_err("Error handling flow alloc request."); +                        break; +                case FLOW_REPLY: +                        if (fa_handle_flow_reply(msg, len) < 0) +                                log_err("Error handling flow reply."); +                        break; +                case FLOW_UPDATE: +                        if (fa_handle_flow_update(msg, len) < 0) +                                log_err("Error handling flow update.");                          break;                  default: -                        log_err("Got an unknown flow allocation message."); +                        log_warn("Recieved unknown flow allocation message.");                          break;                  }          }  | 
