summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@ugent.be>2018-10-11 14:10:03 +0200
committerSander Vrijders <sander.vrijders@ugent.be>2018-10-11 14:10:03 +0200
commit6856f3e24fa00b99747bab790b478866212b2e2d (patch)
treeca2a4f818bb1844408f015aa2ebe32cdb8a9e3a1 /src/ipcpd/normal
parent91299fe1de31465b0b389ba6fee76db23db830fb (diff)
parent9b8d2830250ecffb298f6c72196cffb94991f4d1 (diff)
downloadouroboros-6856f3e24fa00b99747bab790b478866212b2e2d.tar.gz
ouroboros-6856f3e24fa00b99747bab790b478866212b2e2d.zip
Merge branch 'testing' into be
Diffstat (limited to 'src/ipcpd/normal')
-rw-r--r--src/ipcpd/normal/dht.c4
-rw-r--r--src/ipcpd/normal/fa.c262
-rw-r--r--src/ipcpd/normal/pol/link_state.c126
-rw-r--r--src/ipcpd/normal/psched.c2
4 files changed, 244 insertions, 150 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index 4064bf5c..aa1909e9 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -2729,6 +2729,10 @@ static void handle_event(void * self,
pthread_t thr;
struct join_info * inf;
struct conn * c = (struct conn *) o;
+ struct timespec slack = {0, 10 * MILLION};
+
+ /* Give the pff some time to update for the new link. */
+ nanosleep(&slack, NULL);
switch(dht_get_state(dht)) {
case DHT_INIT:
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
index 027223b7..56864e1f 100644
--- a/src/ipcpd/normal/fa.c
+++ b/src/ipcpd/normal/fa.c
@@ -68,13 +68,23 @@ struct fa_msg {
uint32_t max_gap;
} __attribute__((packed));
+struct cmd {
+ struct list_head next;
+ struct shm_du_buff * sdb;
+};
+
struct {
- pthread_rwlock_t flows_lock;
- int r_eid[PROG_MAX_FLOWS];
- uint64_t r_addr[PROG_MAX_FLOWS];
- int fd;
+ pthread_rwlock_t flows_lock;
+ int r_eid[PROG_MAX_FLOWS];
+ uint64_t r_addr[PROG_MAX_FLOWS];
+ int fd;
- struct psched * psched;
+ struct list_head cmds;
+ pthread_cond_t cond;
+ pthread_mutex_t mtx;
+ pthread_t worker;
+
+ struct psched * psched;
} fa;
static void packet_handler(int fd,
@@ -100,109 +110,161 @@ static void destroy_conn(int fd)
}
static void fa_post_packet(void * comp,
- struct shm_du_buff * sdb)
+ struct shm_du_buff * sdb)
{
- struct timespec ts = {0, TIMEOUT * 1000};
- struct timespec abstime;
- int fd;
- uint8_t * buf;
- struct fa_msg * msg;
- qosspec_t qs;
-
- (void) comp;
+ struct cmd * cmd;
assert(comp == &fa);
- assert(sdb);
- buf = malloc(sizeof(*msg) + ipcp_dir_hash_len());
- if (buf == NULL)
+ (void) comp;
+
+ cmd = malloc(sizeof(*cmd));
+ if (cmd == NULL) {
+ log_err("Command failed. Out of memory.");
+ ipcp_sdb_release(sdb);
return;
+ }
+
+ cmd->sdb = sdb;
+
+ pthread_mutex_lock(&fa.mtx);
+
+ list_add(&cmd->next, &fa.cmds);
- msg = (struct fa_msg *) buf;
+ pthread_cond_signal(&fa.cond);
- /* Depending on the message call the function in ipcp-dev.h */
+ pthread_mutex_unlock(&fa.mtx);
+}
- memcpy(msg, shm_du_buff_head(sdb),
- shm_du_buff_tail(sdb) - shm_du_buff_head(sdb));
+static void * fa_handle_packet(void * o)
+{
+ struct timespec ts = {0, TIMEOUT * 1000};
- ipcp_sdb_release(sdb);
+ (void) o;
- switch (msg->code) {
- case FLOW_REQ:
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ while (true) {
+ struct timespec abstime;
+ int fd;
+ uint8_t * buf;
+ struct fa_msg * msg;
+ qosspec_t qs;
+ struct cmd * cmd;
- pthread_mutex_lock(&ipcpi.alloc_lock);
+ pthread_mutex_lock(&fa.mtx);
- while (ipcpi.alloc_id != -1 &&
- ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
- }
+ pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
+ &fa.mtx);
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- log_dbg("Won't allocate over non-operational IPCP.");
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- free(msg);
- return;
- }
+ while (list_is_empty(&fa.cmds))
+ pthread_cond_wait(&fa.cond, &fa.mtx);
- assert(ipcpi.alloc_id == -1);
-
- qs.delay = ntoh32(msg->delay);
- qs.bandwidth = ntoh64(msg->bandwidth);
- qs.availability = msg->availability;
- qs.loss = ntoh32(msg->loss);
- qs.ber = ntoh32(msg->ber);
- qs.in_order = msg->in_order;
- qs.max_gap = ntoh32(msg->max_gap);
-
- fd = ipcp_flow_req_arr(getpid(),
- (uint8_t *) (msg + 1),
- ipcp_dir_hash_len(),
- qs);
- if (fd < 0) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- log_err("Failed to get fd for flow.");
- free(msg);
- return;
+ cmd = list_last_entry(&fa.cmds, struct cmd, next);
+ list_del(&cmd->next);
+
+ pthread_cleanup_pop(true);
+
+ buf = malloc(sizeof(*msg) + ipcp_dir_hash_len());
+ if (buf == NULL) {
+ log_err("Failed to allocate memory.");
+ free(cmd);
+ ipcp_sdb_release(cmd->sdb);
+ continue;
}
- pthread_rwlock_wrlock(&fa.flows_lock);
+ msg = (struct fa_msg *) buf;
- fa.r_eid[fd] = ntoh32(msg->s_eid);
- fa.r_addr[fd] = ntoh64(msg->s_addr);
+ /* Depending on the message call the function in ipcp-dev.h */
- pthread_rwlock_unlock(&fa.flows_lock);
+ assert(sizeof(*msg) + ipcp_dir_hash_len() >=
+ (unsigned long int) (shm_du_buff_tail(cmd->sdb) -
+ shm_du_buff_head(cmd->sdb)));
- ipcpi.alloc_id = fd;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
+ memcpy(msg, shm_du_buff_head(cmd->sdb),
+ shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb));
- pthread_mutex_unlock(&ipcpi.alloc_lock);
+ ipcp_sdb_release(cmd->sdb);
- break;
- case FLOW_REPLY:
- pthread_rwlock_wrlock(&fa.flows_lock);
+ free(cmd);
- fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid);
+ switch (msg->code) {
+ case FLOW_REQ:
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ipcp_flow_alloc_reply(ntoh32(msg->r_eid), msg->response);
+ pthread_mutex_lock(&ipcpi.alloc_lock);
- if (msg->response < 0)
- destroy_conn(ntoh32(msg->r_eid));
- else
- psched_add(fa.psched, ntoh32(msg->r_eid));
+ while (ipcpi.alloc_id != -1 &&
+ ipcp_get_state() == IPCP_OPERATIONAL) {
+ ts_add(&abstime, &ts, &abstime);
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &abstime);
+ }
- pthread_rwlock_unlock(&fa.flows_lock);
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ log_dbg("Won't allocate over non-operational"
+ "IPCP.");
+ free(msg);
+ continue;
+ }
- break;
- default:
- log_err("Got an unknown flow allocation message.");
- break;
- }
+ assert(ipcpi.alloc_id == -1);
+
+ qs.delay = ntoh32(msg->delay);
+ qs.bandwidth = ntoh64(msg->bandwidth);
+ qs.availability = msg->availability;
+ qs.loss = ntoh32(msg->loss);
+ qs.ber = ntoh32(msg->ber);
+ qs.in_order = msg->in_order;
+ qs.max_gap = ntoh32(msg->max_gap);
- free(msg);
+ fd = ipcp_flow_req_arr(getpid(),
+ (uint8_t *) (msg + 1),
+ ipcp_dir_hash_len(),
+ qs);
+ if (fd < 0) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ log_err("Failed to get fd for flow.");
+ free(msg);
+ continue;
+ }
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fa.r_eid[fd] = ntoh32(msg->s_eid);
+ fa.r_addr[fd] = ntoh64(msg->s_addr);
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ ipcpi.alloc_id = fd;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+ break;
+ case FLOW_REPLY:
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid);
+
+ ipcp_flow_alloc_reply(ntoh32(msg->r_eid),
+ msg->response);
+
+ if (msg->response < 0)
+ destroy_conn(ntoh32(msg->r_eid));
+ else
+ psched_add(fa.psched, ntoh32(msg->r_eid));
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ break;
+ default:
+ log_err("Got an unknown flow allocation message.");
+ break;
+ }
+
+ free(msg);
+ }
}
int fa_init(void)
@@ -213,31 +275,59 @@ int fa_init(void)
destroy_conn(i);
if (pthread_rwlock_init(&fa.flows_lock, NULL))
- return -1;
+ goto fail_rwlock;
+
+ if (pthread_mutex_init(&fa.mtx, NULL))
+ goto fail_mtx;
+
+ if (pthread_cond_init(&fa.cond, NULL))
+ goto fail_cond;
+
+ list_head_init(&fa.cmds);
fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA);
return 0;
+
+ fail_cond:
+ pthread_mutex_destroy(&fa.mtx);
+ fail_mtx:
+ pthread_rwlock_destroy(&fa.flows_lock);
+ fail_rwlock:
+ log_err("Failed to initialize flow allocator.");
+ return -1;
}
void fa_fini(void)
{
+ pthread_cond_destroy(&fa.cond);;
+ pthread_mutex_destroy(&fa.mtx);
pthread_rwlock_destroy(&fa.flows_lock);
}
int fa_start(void)
{
fa.psched = psched_create(packet_handler);
- if (fa.psched == NULL) {
- log_err("Failed to create packet scheduler.");
- return -1;
- }
+ if (fa.psched == NULL)
+ goto fail_psched;
+
+ if (pthread_create(&fa.worker, NULL, fa_handle_packet, NULL))
+ goto fail_thread;
return 0;
+
+ fail_thread:
+ psched_destroy(fa.psched);
+ fail_psched:
+ log_err("Failed to start flow allocator.");
+ return -1;
}
void fa_stop(void)
{
+ pthread_cancel(fa.worker);
+ pthread_join(fa.worker, NULL);
+
psched_destroy(fa.psched);
}
diff --git a/src/ipcpd/normal/pol/link_state.c b/src/ipcpd/normal/pol/link_state.c
index e2e9eab5..8db1a9c5 100644
--- a/src/ipcpd/normal/pol/link_state.c
+++ b/src/ipcpd/normal/pol/link_state.c
@@ -374,7 +374,66 @@ static int lsdb_del_nb(uint64_t addr,
return -EPERM;
}
-static void set_pff_modified(void)
+static int nbr_to_fd(uint64_t addr)
+{
+ struct list_head * p;
+
+ pthread_rwlock_rdlock(&ls.db_lock);
+
+ list_for_each(p, &ls.nbs) {
+ struct nb * nb = list_entry(p, struct nb, next);
+ if (nb->addr == addr && nb->type == NB_DT) {
+ pthread_rwlock_unlock(&ls.db_lock);
+ return nb->fd;
+ }
+ }
+
+ pthread_rwlock_unlock(&ls.db_lock);
+
+ return -1;
+}
+
+static void calculate_pff(struct routing_i * instance)
+{
+ int fd;
+ struct list_head table;
+ struct list_head * p;
+ struct list_head * q;
+ int fds[PROG_MAX_FLOWS];
+
+ if (graph_routing_table(ls.graph, ls.routing_algo,
+ ipcpi.dt_addr, &table))
+ return;
+
+ pff_lock(instance->pff);
+
+ pff_flush(instance->pff);
+
+ /* Calulcate forwarding table from routing table. */
+ list_for_each(p, &table) {
+ int i = 0;
+ struct routing_table * t =
+ list_entry(p, struct routing_table, next);
+
+ list_for_each(q, &t->nhops) {
+ struct nhop * n = list_entry(q, struct nhop, next);
+
+ fd = nbr_to_fd(n->nhop);
+ if (fd == -1)
+ continue;
+
+ fds[i++] = fd;
+ }
+
+ pff_add(instance->pff, t->dst, fds, i);
+ }
+
+ pff_unlock(instance->pff);
+
+ graph_free_routing_table(ls.graph, &table);
+}
+
+static void set_pff_modified(bool calc)
{
struct list_head * p;
@@ -385,6 +444,8 @@ static void set_pff_modified(void)
pthread_mutex_lock(&inst->lock);
inst->modified = true;
pthread_mutex_unlock(&inst->lock);
+ if (calc)
+ calculate_pff(inst);
}
pthread_mutex_unlock(&ls.routing_i_lock);
}
@@ -439,7 +500,7 @@ static int lsdb_add_link(uint64_t src,
pthread_rwlock_unlock(&ls.db_lock);
- set_pff_modified();
+ set_pff_modified(true);
return 0;
}
@@ -462,7 +523,7 @@ static int lsdb_del_link(uint64_t src,
ls.db_len--;
pthread_rwlock_unlock(&ls.db_lock);
- set_pff_modified();
+ set_pff_modified(false);
free(a);
return 0;
}
@@ -473,65 +534,6 @@ static int lsdb_del_link(uint64_t src,
return -EPERM;
}
-static int nbr_to_fd(uint64_t addr)
-{
- struct list_head * p;
-
- pthread_rwlock_rdlock(&ls.db_lock);
-
- list_for_each(p, &ls.nbs) {
- struct nb * nb = list_entry(p, struct nb, next);
- if (nb->addr == addr && nb->type == NB_DT) {
- pthread_rwlock_unlock(&ls.db_lock);
- return nb->fd;
- }
- }
-
- pthread_rwlock_unlock(&ls.db_lock);
-
- return -1;
-}
-
-static void calculate_pff(struct routing_i * instance)
-{
- int fd;
- struct list_head table;
- struct list_head * p;
- struct list_head * q;
- int fds[PROG_MAX_FLOWS];
-
- if (graph_routing_table(ls.graph, ls.routing_algo,
- ipcpi.dt_addr, &table))
- return;
-
- pff_lock(instance->pff);
-
- pff_flush(instance->pff);
-
- /* Calulcate forwarding table from routing table. */
- list_for_each(p, &table) {
- int i = 0;
- struct routing_table * t =
- list_entry(p, struct routing_table, next);
-
- list_for_each(q, &t->nhops) {
- struct nhop * n = list_entry(q, struct nhop, next);
-
- fd = nbr_to_fd(n->nhop);
- if (fd == -1)
- continue;
-
- fds[i++] = fd;
- }
-
- pff_add(instance->pff, t->dst, fds, i);
- }
-
- pff_unlock(instance->pff);
-
- graph_free_routing_table(ls.graph, &table);
-}
-
static void * periodic_recalc_pff(void * o)
{
bool modified;
diff --git a/src/ipcpd/normal/psched.c b/src/ipcpd/normal/psched.c
index 27e5f1de..c38c072d 100644
--- a/src/ipcpd/normal/psched.c
+++ b/src/ipcpd/normal/psched.c
@@ -42,11 +42,9 @@
#include <string.h>
static int qos_prio [] = {
- QOS_PRIO_RAW,
QOS_PRIO_BE,
QOS_PRIO_VIDEO,
QOS_PRIO_VOICE,
- QOS_PRIO_DATA
};
struct psched {