From c75f20d2ef73b0193e75fa59c4679be713a342c7 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 10 May 2017 16:39:58 +0200 Subject: ipcpd: Remove FRCT from normal IPCP In order to ensure 100% reliable transfer, the protocol state machine that takes care of retransmission and SDU ordering has to be in the application. Flow allocation in the normal now uses fds. The PDU_type field was deprecated and AE's within the DIF can use reserved fds. --- src/ipcpd/normal/fa.c | 134 ++++++++++++++------------------------------------ 1 file changed, 37 insertions(+), 97 deletions(-) (limited to 'src/ipcpd/normal/fa.c') diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 3aa2634a..d6c36a17 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -48,8 +48,8 @@ typedef FlowAllocMsg flow_alloc_msg_t; struct { pthread_rwlock_t flows_lock; - cep_id_t fd_to_cep_id[AP_MAX_FLOWS]; - int cep_id_to_fd[IPCPD_MAX_CONNS]; + int r_fd[AP_MAX_FLOWS]; + uint64_t r_addr[AP_MAX_FLOWS]; struct sdu_sched * sdu_sched; } fa; @@ -58,14 +58,12 @@ static int sdu_handler(int fd, qoscube_t qc, struct shm_du_buff * sdb) { - (void) qc; - pthread_rwlock_rdlock(&fa.flows_lock); - if (frct_i_write_sdu(fa.fd_to_cep_id[fd], sdb)) { + if (dt_write_sdu(fa.r_addr[fd], qc, fa.r_fd[fd], sdb)) { pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); - log_warn("Failed to hand SDU to FRCT."); + log_warn("Failed to forward SDU."); return -1; } @@ -74,15 +72,18 @@ static int sdu_handler(int fd, return 0; } +static void destroy_conn(int fd) +{ + fa.r_fd[fd] = -1; + fa.r_addr[fd] = INVALID_ADDR; +} + int fa_init(void) { int i; for (i = 0; i < AP_MAX_FLOWS; ++i) - fa.fd_to_cep_id[i] = INVALID_CEP_ID; - - for (i = 0; i < IPCPD_MAX_CONNS; ++i) - fa.cep_id_to_fd[i] = -1; + destroy_conn(i); if (pthread_rwlock_init(&fa.flows_lock, NULL)) return -1; @@ -128,20 +129,10 @@ static struct shm_du_buff * create_fa_sdb(flow_alloc_msg_t * msg) return sdb; } -/* FIXME: Revise when Delta-t is fully implemented */ -static void destroy_conn(int fd, - cep_id_t cep_id) -{ - fa.fd_to_cep_id[fd] = INVALID_CEP_ID; - fa.cep_id_to_fd[cep_id] = -1; - frct_i_destroy(cep_id); -} - int fa_alloc(int fd, const uint8_t * dst, qoscube_t qc) { - cep_id_t cep_id; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; char path[RIB_MAX_PATH_LEN + 1]; uint64_t addr; @@ -185,38 +176,30 @@ int fa_alloc(int fd, if (rib_read(path, &addr, sizeof(addr)) != sizeof(addr)) return -1; - cep_id = frct_i_create(addr, qc); - if (cep_id == INVALID_CEP_ID) - return -1; - msg.code = FLOW_ALLOC_CODE__FLOW_REQ; msg.has_hash = true; msg.hash.len = ipcp_dir_hash_len(); msg.hash.data = (uint8_t *) dst; msg.has_qc = true; msg.qc = qc; - msg.has_s_cep_id = true; - msg.s_cep_id = cep_id; + msg.has_s_fd = true; + msg.s_fd = fd; msg.has_s_addr = true; msg.s_addr = ipcpi.dt_addr; sdb = create_fa_sdb(&msg); - if (sdb == NULL) { - frct_i_destroy(cep_id); + if (sdb == NULL) return -1; - } - pthread_rwlock_wrlock(&fa.flows_lock); - - if (dt_write_sdu(addr, qc, PDU_TYPE_FA, sdb)) { - frct_i_destroy(cep_id); - pthread_rwlock_unlock(&fa.flows_lock); + if (dt_write_sdu(addr, qc, FD_FA, sdb)) { ipcp_sdb_release(sdb); return -1; } - fa.fd_to_cep_id[fd] = cep_id; - fa.cep_id_to_fd[cep_id] = fd; + pthread_rwlock_wrlock(&fa.flows_lock); + + fa.r_fd[fd] = fd; + fa.r_addr[fd] = addr; pthread_rwlock_unlock(&fa.flows_lock); @@ -251,22 +234,22 @@ int fa_alloc_resp(int fd, pthread_rwlock_wrlock(&fa.flows_lock); msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; - msg.has_cep_id = true; - msg.cep_id = frct_i_get_id(fa.fd_to_cep_id[fd]); - msg.s_cep_id = fa.fd_to_cep_id[fd]; - msg.has_s_cep_id = true; + msg.has_r_fd = true; + msg.r_fd = fa.r_fd[fd]; + msg.has_s_fd = true; + msg.s_fd = fd; msg.response = response; msg.has_response = true; sdb = create_fa_sdb(&msg); if (sdb == NULL) { - destroy_conn(fd, fa.fd_to_cep_id[fd]); + destroy_conn(fd); pthread_rwlock_unlock(&fa.flows_lock); return -1; } if (response < 0) { - destroy_conn(fd, fa.fd_to_cep_id[fd]); + destroy_conn(fd); ipcp_sdb_release(sdb); } else { sdu_sched_add(fa.sdu_sched, fd); @@ -276,11 +259,8 @@ int fa_alloc_resp(int fd, assert(qc >= 0 && qc < QOS_CUBE_MAX); - if (dt_write_sdu(frct_i_get_addr(fa.fd_to_cep_id[fd]), - qc, - PDU_TYPE_FA, - sdb)) { - destroy_conn(fd, fa.fd_to_cep_id[fd]); + if (dt_write_sdu(fa.r_addr[fd], qc, FD_FA, sdb)) { + destroy_conn(fd); pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); return -1; @@ -299,7 +279,7 @@ int fa_dealloc(int fd) sdu_sched_del(fa.sdu_sched, fd); - destroy_conn(fd, fa.fd_to_cep_id[fd]); + destroy_conn(fd); pthread_rwlock_unlock(&fa.flows_lock); @@ -313,7 +293,6 @@ int fa_post_sdu(struct shm_du_buff * sdb) struct timespec ts = {0, TIMEOUT * 1000}; int fd; flow_alloc_msg_t * msg; - cep_id_t cep_id; assert(sdb); @@ -332,7 +311,7 @@ int fa_post_sdu(struct shm_du_buff * sdb) case FLOW_ALLOC_CODE__FLOW_REQ: pthread_mutex_lock(&ipcpi.alloc_lock); - if (!msg->has_hash) { + if (!msg->has_hash || !msg->has_s_fd || !msg->has_s_addr) { log_err("Bad flow request."); pthread_mutex_unlock(&ipcpi.alloc_lock); flow_alloc_msg__free_unpacked(msg, NULL); @@ -354,27 +333,12 @@ int fa_post_sdu(struct shm_du_buff * sdb) assert(ipcpi.alloc_id == -1); - cep_id = frct_i_create(msg->s_addr, msg->qc); - if (cep_id == INVALID_CEP_ID) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - return -1; - } - - if (frct_i_set_id(cep_id, msg->s_cep_id)) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - frct_i_destroy(cep_id); - flow_alloc_msg__free_unpacked(msg, NULL); - return -1; - } - fd = ipcp_flow_req_arr(getpid(), msg->hash.data, ipcp_dir_hash_len(), msg->qc); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); - frct_i_destroy(cep_id); flow_alloc_msg__free_unpacked(msg, NULL); log_err("Failed to get fd for flow."); return -1; @@ -382,8 +346,8 @@ int fa_post_sdu(struct shm_du_buff * sdb) pthread_rwlock_wrlock(&fa.flows_lock); - fa.fd_to_cep_id[fd] = cep_id; - fa.cep_id_to_fd[cep_id] = fd; + fa.r_fd[fd] = msg->s_fd; + fa.r_addr[fd] = msg->s_addr; pthread_rwlock_unlock(&fa.flows_lock); @@ -396,15 +360,12 @@ int fa_post_sdu(struct shm_du_buff * sdb) case FLOW_ALLOC_CODE__FLOW_REPLY: pthread_rwlock_wrlock(&fa.flows_lock); - fd = fa.cep_id_to_fd[msg->cep_id]; - ipcp_flow_alloc_reply(fd, msg->response); - if (msg->response < 0) { - destroy_conn(fd, msg->cep_id); - } else { - frct_i_set_id(msg->cep_id, msg->s_cep_id); - sdu_sched_add(fa.sdu_sched, - fa.cep_id_to_fd[msg->cep_id]); - } + ipcp_flow_alloc_reply(msg->r_fd, msg->response); + + if (msg->response < 0) + destroy_conn(msg->r_fd); + else + sdu_sched_add(fa.sdu_sched, fa.r_fd[msg->r_fd]); pthread_rwlock_unlock(&fa.flows_lock); @@ -420,24 +381,3 @@ int fa_post_sdu(struct shm_du_buff * sdb) return 0; } - -int fa_post_sdu_user(cep_id_t cep_id, - struct shm_du_buff * sdb) -{ - int fd; - - assert(sdb); - - pthread_rwlock_rdlock(&fa.flows_lock); - - fd = fa.cep_id_to_fd[cep_id]; - if (ipcp_flow_write(fd, sdb)) { - pthread_rwlock_unlock(&fa.flows_lock); - log_err("Failed to hand SDU to N flow."); - return -1; - } - - pthread_rwlock_unlock(&fa.flows_lock); - - return 0; -} -- cgit v1.2.3