From a46f96fbfc017963a5578498c93bc85650461020 Mon Sep 17 00:00:00 2001
From: Dimitri Staessens
Date: Wed, 10 Oct 2018 11:11:09 +0200
Subject: ipcpd: Remove stale QoS cubes

There were QoS cubes defined for raw and data flows; these flows now
run on the best-effort cube, so their priorities are removed.

Signed-off-by: Dimitri Staessens
Signed-off-by: Sander Vrijders
---
 src/ipcpd/normal/psched.c | 2 --
 1 file changed, 2 deletions(-)

(limited to 'src/ipcpd/normal')

diff --git a/src/ipcpd/normal/psched.c b/src/ipcpd/normal/psched.c
index 27e5f1de..c38c072d 100644
--- a/src/ipcpd/normal/psched.c
+++ b/src/ipcpd/normal/psched.c
@@ -42,11 +42,9 @@
 #include

 static int qos_prio [] = {
-        QOS_PRIO_RAW,
         QOS_PRIO_BE,
         QOS_PRIO_VIDEO,
         QOS_PRIO_VOICE,
-        QOS_PRIO_DATA
 };

 struct psched {
--
cgit v1.2.3
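The qos_prio table pairs one priority with each remaining QoS cube, so dropping the raw and data entries keeps it aligned with the cubes the scheduler still serves. A minimal compilable sketch of how such a table is consumed, with a hypothetical cube enum and made-up priority values rather than the project's actual definitions:

/* Hypothetical illustration; the real enum and values live in the
 * ouroboros headers, not here. */
enum qos_cube {
        QOS_CUBE_BE = 0, /* best effort, now also carries raw and data */
        QOS_CUBE_VIDEO,
        QOS_CUBE_VOICE,
        QOS_CUBE_MAX
};

static int qos_prio[QOS_CUBE_MAX] = {
        0,  /* QOS_PRIO_BE    (assumed value) */
        90, /* QOS_PRIO_VIDEO (assumed value) */
        99  /* QOS_PRIO_VOICE (assumed value) */
};

/* A scheduler can look up a packet's queue priority by its cube. */
static int prio_of(enum qos_cube qc)
{
        return qos_prio[qc];
}

Stale entries with no matching cube would shift every lookup past them by one, so deleting them outright is the safe option.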
Out of memory."); + ipcp_sdb_release(sdb); return; + } + + cmd->sdb = sdb; + + pthread_mutex_lock(&fa.mtx); + + list_add(&cmd->next, &fa.cmds); - msg = (struct fa_msg *) buf; + pthread_cond_signal(&fa.cond); - /* Depending on the message call the function in ipcp-dev.h */ + pthread_mutex_unlock(&fa.mtx); +} - memcpy(msg, shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - shm_du_buff_head(sdb)); +static void * fa_handle_packet(void * o) +{ + struct timespec ts = {0, TIMEOUT * 1000}; - ipcp_sdb_release(sdb); + (void) o; - switch (msg->code) { - case FLOW_REQ: - clock_gettime(PTHREAD_COND_CLOCK, &abstime); + while (true) { + struct timespec abstime; + int fd; + uint8_t * buf; + struct fa_msg * msg; + qosspec_t qs; + struct cmd * cmd; - pthread_mutex_lock(&ipcpi.alloc_lock); + pthread_mutex_lock(&fa.mtx); - while (ipcpi.alloc_id != -1 && - ipcp_get_state() == IPCP_OPERATIONAL) { - ts_add(&abstime, &ts, &abstime); - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &abstime); - } + pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, + &fa.mtx); - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_dbg("Won't allocate over non-operational IPCP."); - pthread_mutex_unlock(&ipcpi.alloc_lock); - free(msg); - return; - } + while (list_is_empty(&fa.cmds)) + pthread_cond_wait(&fa.cond, &fa.mtx); - assert(ipcpi.alloc_id == -1); - - qs.delay = ntoh32(msg->delay); - qs.bandwidth = ntoh64(msg->bandwidth); - qs.availability = msg->availability; - qs.loss = ntoh32(msg->loss); - qs.ber = ntoh32(msg->ber); - qs.in_order = msg->in_order; - qs.max_gap = ntoh32(msg->max_gap); - - fd = ipcp_flow_req_arr(getpid(), - (uint8_t *) (msg + 1), - ipcp_dir_hash_len(), - qs); - if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - log_err("Failed to get fd for flow."); - free(msg); - return; + cmd = list_last_entry(&fa.cmds, struct cmd, next); + list_del(&cmd->next); + + pthread_cleanup_pop(true); + + buf = malloc(sizeof(*msg) + ipcp_dir_hash_len()); + if (buf == NULL) { + log_err("Failed to allocate memory."); + free(cmd); + ipcp_sdb_release(cmd->sdb); + continue; } - pthread_rwlock_wrlock(&fa.flows_lock); + msg = (struct fa_msg *) buf; - fa.r_eid[fd] = ntoh32(msg->s_eid); - fa.r_addr[fd] = ntoh64(msg->s_addr); + /* Depending on the message call the function in ipcp-dev.h */ - pthread_rwlock_unlock(&fa.flows_lock); + assert(sizeof(*msg) + ipcp_dir_hash_len() >= + (unsigned long int) (shm_du_buff_tail(cmd->sdb) - + shm_du_buff_head(cmd->sdb))); - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); + memcpy(msg, shm_du_buff_head(cmd->sdb), + shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb)); - pthread_mutex_unlock(&ipcpi.alloc_lock); + ipcp_sdb_release(cmd->sdb); - break; - case FLOW_REPLY: - pthread_rwlock_wrlock(&fa.flows_lock); + free(cmd); - fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); + switch (msg->code) { + case FLOW_REQ: + clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ipcp_flow_alloc_reply(ntoh32(msg->r_eid), msg->response); + pthread_mutex_lock(&ipcpi.alloc_lock); - if (msg->response < 0) - destroy_conn(ntoh32(msg->r_eid)); - else - psched_add(fa.psched, ntoh32(msg->r_eid)); + while (ipcpi.alloc_id != -1 && + ipcp_get_state() == IPCP_OPERATIONAL) { + ts_add(&abstime, &ts, &abstime); + pthread_cond_timedwait(&ipcpi.alloc_cond, + &ipcpi.alloc_lock, + &abstime); + } - pthread_rwlock_unlock(&fa.flows_lock); + if (ipcp_get_state() != IPCP_OPERATIONAL) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + log_dbg("Won't allocate over non-operational" + "IPCP."); + free(msg); 
+ continue; + } - break; - default: - log_err("Got an unknown flow allocation message."); - break; - } + assert(ipcpi.alloc_id == -1); + + qs.delay = ntoh32(msg->delay); + qs.bandwidth = ntoh64(msg->bandwidth); + qs.availability = msg->availability; + qs.loss = ntoh32(msg->loss); + qs.ber = ntoh32(msg->ber); + qs.in_order = msg->in_order; + qs.max_gap = ntoh32(msg->max_gap); - free(msg); + fd = ipcp_flow_req_arr(getpid(), + (uint8_t *) (msg + 1), + ipcp_dir_hash_len(), + qs); + if (fd < 0) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + log_err("Failed to get fd for flow."); + free(msg); + continue; + } + + pthread_rwlock_wrlock(&fa.flows_lock); + + fa.r_eid[fd] = ntoh32(msg->s_eid); + fa.r_addr[fd] = ntoh64(msg->s_addr); + + pthread_rwlock_unlock(&fa.flows_lock); + + ipcpi.alloc_id = fd; + pthread_cond_broadcast(&ipcpi.alloc_cond); + + pthread_mutex_unlock(&ipcpi.alloc_lock); + + break; + case FLOW_REPLY: + pthread_rwlock_wrlock(&fa.flows_lock); + + fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); + + ipcp_flow_alloc_reply(ntoh32(msg->r_eid), + msg->response); + + if (msg->response < 0) + destroy_conn(ntoh32(msg->r_eid)); + else + psched_add(fa.psched, ntoh32(msg->r_eid)); + + pthread_rwlock_unlock(&fa.flows_lock); + + break; + default: + log_err("Got an unknown flow allocation message."); + break; + } + + free(msg); + } } int fa_init(void) @@ -213,31 +275,59 @@ int fa_init(void) destroy_conn(i); if (pthread_rwlock_init(&fa.flows_lock, NULL)) - return -1; + goto fail_rwlock; + + if (pthread_mutex_init(&fa.mtx, NULL)) + goto fail_mtx; + + if (pthread_cond_init(&fa.cond, NULL)) + goto fail_cond; + + list_head_init(&fa.cmds); fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA); return 0; + + fail_cond: + pthread_mutex_destroy(&fa.mtx); + fail_mtx: + pthread_rwlock_destroy(&fa.flows_lock); + fail_rwlock: + log_err("Failed to initialize flow allocator."); + return -1; } void fa_fini(void) { + pthread_cond_destroy(&fa.cond);; + pthread_mutex_destroy(&fa.mtx); pthread_rwlock_destroy(&fa.flows_lock); } int fa_start(void) { fa.psched = psched_create(packet_handler); - if (fa.psched == NULL) { - log_err("Failed to create packet scheduler."); - return -1; - } + if (fa.psched == NULL) + goto fail_psched; + + if (pthread_create(&fa.worker, NULL, fa_handle_packet, NULL)) + goto fail_thread; return 0; + + fail_thread: + psched_destroy(fa.psched); + fail_psched: + log_err("Failed to start flow allocator."); + return -1; } void fa_stop(void) { + pthread_cancel(fa.worker); + pthread_join(fa.worker, NULL); + psched_destroy(fa.psched); } -- cgit v1.2.3 From 9b8d2830250ecffb298f6c72196cffb94991f4d1 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Thu, 11 Oct 2018 01:44:29 +0200 Subject: ipcpd: Speed up enrolment of DHT The link-state algorithm will now quickly recalculate for link additions (but not for removals, for stability). Upon notification of a new link, the DHT will wait for a brief moment to enroll. This reduces enrolment for large networks by some orders of magnitude. 
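The change follows a classic producer/consumer shape: the data-transfer thread only enqueues a command and signals a condition variable, while a dedicated, cancellable worker does the blocking work. A self-contained sketch of that pattern with invented names (post(), worker(), handle()), not the fa.c API itself:

#include <pthread.h>
#include <stdlib.h>

/* Stand-in for the slow, possibly blocking handling. */
static void handle(void * data)
{
        (void) data; /* e.g. parse the message and allocate the flow */
}

struct cmd {
        struct cmd * next;
        void *       data;
};

static struct cmd *    head; /* oldest queued command */
static struct cmd *    tail; /* newest queued command */
static pthread_mutex_t mtx  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;

/* Fast path: called from the packet-processing thread, never blocks
 * for long. */
static int post(void * data)
{
        struct cmd * c;

        c = malloc(sizeof(*c));
        if (c == NULL)
                return -1;

        c->data = data;
        c->next = NULL;

        pthread_mutex_lock(&mtx);

        if (tail == NULL)
                head = c;
        else
                tail->next = c;
        tail = c;

        pthread_cond_signal(&cond);

        pthread_mutex_unlock(&mtx);

        return 0;
}

/* Slow path: the worker blocks here instead of in the dt thread. */
static void * worker(void * o)
{
        (void) o;

        while (1) {
                struct cmd * c;

                pthread_mutex_lock(&mtx);

                pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
                                     &mtx);

                while (head == NULL)
                        pthread_cond_wait(&cond, &mtx);

                c = head; /* take the oldest entry to preserve order */
                head = c->next;
                if (head == NULL)
                        tail = NULL;

                pthread_cleanup_pop(1); /* unlocks mtx */

                handle(c->data);
                free(c);
        }

        return NULL;
}

The pthread_cleanup_push()/pthread_cleanup_pop() pair matters for shutdown: pthread_cond_wait() is a cancellation point, and the cleanup handler guarantees the mutex is released when fa_stop() cancels the worker. Taking the oldest entry first mirrors the patch's list_add()/list_last_entry() combination, which yields FIFO ordering.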
From 9b8d2830250ecffb298f6c72196cffb94991f4d1 Mon Sep 17 00:00:00 2001
From: Dimitri Staessens
Date: Thu, 11 Oct 2018 01:44:29 +0200
Subject: ipcpd: Speed up enrolment of DHT

The link-state algorithm now recalculates the forwarding tables
immediately when a link is added (but not when one is removed, for
stability). Upon notification of a new link, the DHT waits a brief
moment for the forwarding tables to update before enrolling. This
reduces enrolment time for large networks by some orders of magnitude.

Signed-off-by: Dimitri Staessens
Signed-off-by: Sander Vrijders
---
 src/ipcpd/normal/dht.c            |   4 ++
 src/ipcpd/normal/pol/link_state.c | 126 +++++++++++++++++++------------------
 2 files changed, 68 insertions(+), 62 deletions(-)

(limited to 'src/ipcpd/normal')

diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index 4064bf5c..aa1909e9 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -2729,6 +2729,10 @@ static void handle_event(void * self,
         pthread_t thr;
         struct join_info * inf;
         struct conn * c = (struct conn *) o;
+        struct timespec slack = {0, 10 * MILLION};
+
+        /* Give the pff some time to update for the new link. */
+        nanosleep(&slack, NULL);

         switch(dht_get_state(dht)) {
         case DHT_INIT:
diff --git a/src/ipcpd/normal/pol/link_state.c b/src/ipcpd/normal/pol/link_state.c
index e2e9eab5..8db1a9c5 100644
--- a/src/ipcpd/normal/pol/link_state.c
+++ b/src/ipcpd/normal/pol/link_state.c
@@ -374,7 +374,66 @@ static int lsdb_del_nb(uint64_t addr,
         return -EPERM;
 }

-static void set_pff_modified(void)
+static int nbr_to_fd(uint64_t addr)
+{
+        struct list_head * p;
+
+        pthread_rwlock_rdlock(&ls.db_lock);
+
+        list_for_each(p, &ls.nbs) {
+                struct nb * nb = list_entry(p, struct nb, next);
+                if (nb->addr == addr && nb->type == NB_DT) {
+                        pthread_rwlock_unlock(&ls.db_lock);
+                        return nb->fd;
+                }
+        }
+
+        pthread_rwlock_unlock(&ls.db_lock);
+
+        return -1;
+}
+
+static void calculate_pff(struct routing_i * instance)
+{
+        int fd;
+        struct list_head table;
+        struct list_head * p;
+        struct list_head * q;
+        int fds[PROG_MAX_FLOWS];
+
+        if (graph_routing_table(ls.graph, ls.routing_algo,
+                                ipcpi.dt_addr, &table))
+                return;
+
+        pff_lock(instance->pff);
+
+        pff_flush(instance->pff);
+
+        /* Calculate forwarding table from routing table. */
+        list_for_each(p, &table) {
+                int i = 0;
+                struct routing_table * t =
+                        list_entry(p, struct routing_table, next);
+
+                list_for_each(q, &t->nhops) {
+                        struct nhop * n = list_entry(q, struct nhop, next);
+
+                        fd = nbr_to_fd(n->nhop);
+                        if (fd == -1)
+                                continue;
+
+                        fds[i++] = fd;
+                }
+
+                pff_add(instance->pff, t->dst, fds, i);
+        }
+
+        pff_unlock(instance->pff);
+
+        graph_free_routing_table(ls.graph, &table);
+}
+
+static void set_pff_modified(bool calc)
 {
         struct list_head * p;

@@ -385,6 +444,8 @@ static void set_pff_modified(void)
                 pthread_mutex_lock(&inst->lock);
                 inst->modified = true;
                 pthread_mutex_unlock(&inst->lock);
+                if (calc)
+                        calculate_pff(inst);
         }
         pthread_mutex_unlock(&ls.routing_i_lock);
 }
@@ -439,7 +500,7 @@ static int lsdb_add_link(uint64_t src,

                 pthread_rwlock_unlock(&ls.db_lock);

-                set_pff_modified();
+                set_pff_modified(true);

                 return 0;
         }
@@ -462,7 +523,7 @@ static int lsdb_del_link(uint64_t src,
                         ls.db_len--;

                         pthread_rwlock_unlock(&ls.db_lock);
-                        set_pff_modified();
+                        set_pff_modified(false);
                         free(a);
                         return 0;
                 }
@@ -473,65 +534,6 @@ static int lsdb_del_link(uint64_t src,
         return -EPERM;
 }

-static int nbr_to_fd(uint64_t addr)
-{
-        struct list_head * p;
-
-        pthread_rwlock_rdlock(&ls.db_lock);
-
-        list_for_each(p, &ls.nbs) {
-                struct nb * nb = list_entry(p, struct nb, next);
-                if (nb->addr == addr && nb->type == NB_DT) {
-                        pthread_rwlock_unlock(&ls.db_lock);
-                        return nb->fd;
-                }
-        }
-
-        pthread_rwlock_unlock(&ls.db_lock);
-
-        return -1;
-}
-
-static void calculate_pff(struct routing_i * instance)
-{
-        int fd;
-        struct list_head table;
-        struct list_head * p;
-        struct list_head * q;
-        int fds[PROG_MAX_FLOWS];
-
-        if (graph_routing_table(ls.graph, ls.routing_algo,
-                                ipcpi.dt_addr, &table))
-                return;
-
-        pff_lock(instance->pff);
-
-        pff_flush(instance->pff);
-
-        /* Calulcate forwarding table from routing table. */
-        list_for_each(p, &table) {
-                int i = 0;
-                struct routing_table * t =
-                        list_entry(p, struct routing_table, next);
-
-                list_for_each(q, &t->nhops) {
-                        struct nhop * n = list_entry(q, struct nhop, next);
-
-                        fd = nbr_to_fd(n->nhop);
-                        if (fd == -1)
-                                continue;
-
-                        fds[i++] = fd;
-                }
-
-                pff_add(instance->pff, t->dst, fds, i);
-        }
-
-        pff_unlock(instance->pff);
-
-        graph_free_routing_table(ls.graph, &table);
-}
-
 static void * periodic_recalc_pff(void * o)
 {
         bool modified;
--
cgit v1.2.3
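The routing change boils down to one policy: recalculate immediately on link additions, but on removals only mark the state dirty and leave the work to the periodic recalculation thread. A compressed sketch of that policy with invented names (recalculate(), on_link_event(), periodic_tick()); locking is omitted here, whereas the real code guards the modified flag with a per-instance mutex:

#include <stdbool.h>

/* Stand-in for the expensive routing-table/PFF recalculation. */
static void recalculate(void)
{
}

static bool modified; /* checked by the periodic recalc thread */

/* Called from the link-state database on topology changes. */
static void on_link_event(bool added)
{
        modified = true;       /* periodic thread will recalculate */
        if (added)
                recalculate(); /* new links take effect right away */
}

/* Body of the periodic recalculation thread's loop. */
static void periodic_tick(void)
{
        if (modified) {
                recalculate();
                modified = false;
        }
}

The 10 ms slack added in dht.c complements this: by the time the DHT tries to enrol over a newly reported link, the immediately recalculated forwarding tables should already be in place, while removals still settle lazily for stability.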