From ebf4e472b0065677cb3caf2b67b4bf31fd1f7343 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Thu, 11 Oct 2018 01:44:28 +0200 Subject: ipcpd: Decouple flow allocator from dt thread The flow allocator passed a blocking callback to the forwarding component, which blocks packet processing. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/ipcpd/normal/fa.c | 262 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 176 insertions(+), 86 deletions(-) (limited to 'src') diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 027223b7..56864e1f 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -68,13 +68,23 @@ struct fa_msg { uint32_t max_gap; } __attribute__((packed)); +struct cmd { + struct list_head next; + struct shm_du_buff * sdb; +}; + struct { - pthread_rwlock_t flows_lock; - int r_eid[PROG_MAX_FLOWS]; - uint64_t r_addr[PROG_MAX_FLOWS]; - int fd; + pthread_rwlock_t flows_lock; + int r_eid[PROG_MAX_FLOWS]; + uint64_t r_addr[PROG_MAX_FLOWS]; + int fd; - struct psched * psched; + struct list_head cmds; + pthread_cond_t cond; + pthread_mutex_t mtx; + pthread_t worker; + + struct psched * psched; } fa; static void packet_handler(int fd, @@ -100,109 +110,161 @@ static void destroy_conn(int fd) } static void fa_post_packet(void * comp, - struct shm_du_buff * sdb) + struct shm_du_buff * sdb) { - struct timespec ts = {0, TIMEOUT * 1000}; - struct timespec abstime; - int fd; - uint8_t * buf; - struct fa_msg * msg; - qosspec_t qs; - - (void) comp; + struct cmd * cmd; assert(comp == &fa); - assert(sdb); - buf = malloc(sizeof(*msg) + ipcp_dir_hash_len()); - if (buf == NULL) + (void) comp; + + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Command failed. Out of memory."); + ipcp_sdb_release(sdb); return; + } + + cmd->sdb = sdb; + + pthread_mutex_lock(&fa.mtx); + + list_add(&cmd->next, &fa.cmds); - msg = (struct fa_msg *) buf; + pthread_cond_signal(&fa.cond); - /* Depending on the message call the function in ipcp-dev.h */ + pthread_mutex_unlock(&fa.mtx); +} - memcpy(msg, shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - shm_du_buff_head(sdb)); +static void * fa_handle_packet(void * o) +{ + struct timespec ts = {0, TIMEOUT * 1000}; - ipcp_sdb_release(sdb); + (void) o; - switch (msg->code) { - case FLOW_REQ: - clock_gettime(PTHREAD_COND_CLOCK, &abstime); + while (true) { + struct timespec abstime; + int fd; + uint8_t * buf; + struct fa_msg * msg; + qosspec_t qs; + struct cmd * cmd; - pthread_mutex_lock(&ipcpi.alloc_lock); + pthread_mutex_lock(&fa.mtx); - while (ipcpi.alloc_id != -1 && - ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } + pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, + &fa.mtx); - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_dbg("Won't allocate over non-operational IPCP."); - pthread_mutex_unlock(&ipcpi.alloc_lock); - free(msg); - return; - } + while (list_is_empty(&fa.cmds)) + pthread_cond_wait(&fa.cond, &fa.mtx); - assert(ipcpi.alloc_id == -1); - - qs.delay = ntoh32(msg->delay); - qs.bandwidth = ntoh64(msg->bandwidth); - qs.availability = msg->availability; - qs.loss = ntoh32(msg->loss); - qs.ber = ntoh32(msg->ber); - qs.in_order = msg->in_order; - qs.max_gap = ntoh32(msg->max_gap); - - fd = ipcp_flow_req_arr(getpid(), - (uint8_t *) (msg + 1), - ipcp_dir_hash_len(), - qs); - if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_err("Failed to get fd for flow."); - free(msg); - return; + cmd = list_last_entry(&fa.cmds, struct cmd, next); + list_del(&cmd->next); + + pthread_cleanup_pop(true); + + buf = malloc(sizeof(*msg) + ipcp_dir_hash_len()); + if (buf == NULL) { + log_err("Failed to allocate memory."); + free(cmd); + ipcp_sdb_release(cmd->sdb); + continue; } - pthread_rwlock_wrlock(&fa.flows_lock); + msg = (struct fa_msg *) buf; - fa.r_eid[fd] = ntoh32(msg->s_eid); - fa.r_addr[fd] = ntoh64(msg->s_addr); + /* Depending on the message call the function in ipcp-dev.h */ - pthread_rwlock_unlock(&fa.flows_lock); + assert(sizeof(*msg) + ipcp_dir_hash_len() >= + (unsigned long int) (shm_du_buff_tail(cmd->sdb) - + shm_du_buff_head(cmd->sdb))); - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); + memcpy(msg, shm_du_buff_head(cmd->sdb), + shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb)); - pthread_mutex_unlock(&ipcpi.alloc_lock); + ipcp_sdb_release(cmd->sdb); - break; - case FLOW_REPLY: - pthread_rwlock_wrlock(&fa.flows_lock); + free(cmd); - fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); + switch (msg->code) { + case FLOW_REQ: + clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ipcp_flow_alloc_reply(ntoh32(msg->r_eid), msg->response); + pthread_mutex_lock(&ipcpi.alloc_lock); - if (msg->response < 0) - destroy_conn(ntoh32(msg->r_eid)); - else - psched_add(fa.psched, ntoh32(msg->r_eid)); + while (ipcpi.alloc_id != -1 && + ipcp_get_state() == IPCP_OPERATIONAL) { + ts_add(&abstime, &ts, &abstime); + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &abstime); + } - pthread_rwlock_unlock(&fa.flows_lock); + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + log_dbg("Won't allocate over non-operational" + "IPCP."); + free(msg); + continue; + } - break; - default: - log_err("Got an unknown flow allocation message."); - break; - } + assert(ipcpi.alloc_id == -1); + + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); - free(msg); + fd = ipcp_flow_req_arr(getpid(), + (uint8_t *) (msg + 1), + ipcp_dir_hash_len(), + qs); + if (fd < 0) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + log_err("Failed to get fd for flow."); + free(msg); + continue; + } + + pthread_rwlock_wrlock(&fa.flows_lock); + + fa.r_eid[fd] = ntoh32(msg->s_eid); + fa.r_addr[fd] = ntoh64(msg->s_addr); + + pthread_rwlock_unlock(&fa.flows_lock); + + ipcpi.alloc_id = fd; + pthread_cond_broadcast(&ipcpi.alloc_cond); + + pthread_mutex_unlock(&ipcpi.alloc_lock); + + break; + case FLOW_REPLY: + pthread_rwlock_wrlock(&fa.flows_lock); + + fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); + + ipcp_flow_alloc_reply(ntoh32(msg->r_eid), + msg->response); + + if (msg->response < 0) + destroy_conn(ntoh32(msg->r_eid)); + else + psched_add(fa.psched, ntoh32(msg->r_eid)); + + pthread_rwlock_unlock(&fa.flows_lock); + + break; + default: + log_err("Got an unknown flow allocation message."); + break; + } + + free(msg); + } } int fa_init(void) @@ -213,31 +275,59 @@ int fa_init(void) destroy_conn(i); if (pthread_rwlock_init(&fa.flows_lock, NULL)) - return -1; + goto fail_rwlock; + + if (pthread_mutex_init(&fa.mtx, NULL)) + goto fail_mtx; + + if (pthread_cond_init(&fa.cond, NULL)) + goto fail_cond; + + list_head_init(&fa.cmds); fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA); return 0; + + fail_cond: + pthread_mutex_destroy(&fa.mtx); + fail_mtx: + pthread_rwlock_destroy(&fa.flows_lock); + fail_rwlock: + log_err("Failed to initialize flow allocator."); + return -1; } void fa_fini(void) { + pthread_cond_destroy(&fa.cond);; + pthread_mutex_destroy(&fa.mtx); pthread_rwlock_destroy(&fa.flows_lock); } int fa_start(void) { fa.psched = psched_create(packet_handler); - if (fa.psched == NULL) { - log_err("Failed to create packet scheduler."); - return -1; - } + if (fa.psched == NULL) + goto fail_psched; + + if (pthread_create(&fa.worker, NULL, fa_handle_packet, NULL)) + goto fail_thread; return 0; + + fail_thread: + psched_destroy(fa.psched); + fail_psched: + log_err("Failed to start flow allocator."); + return -1; } void fa_stop(void) { + pthread_cancel(fa.worker); + pthread_join(fa.worker, NULL); + psched_destroy(fa.psched); } -- cgit v1.2.3