summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri.staessens@ugent.be>2018-10-11 01:44:28 +0200
committerSander Vrijders <sander.vrijders@ugent.be>2018-10-11 13:42:38 +0200
commitebf4e472b0065677cb3caf2b67b4bf31fd1f7343 (patch)
treef11eb412c7e57258f25daa31913cc8da0b3f645a
parent4402b69c2535f9975d835438a52cebc89a147ba3 (diff)
downloadouroboros-ebf4e472b0065677cb3caf2b67b4bf31fd1f7343.tar.gz
ouroboros-ebf4e472b0065677cb3caf2b67b4bf31fd1f7343.zip
ipcpd: Decouple flow allocator from dt thread
The flow allocator passed a blocking callback to the forwarding component, which blocks packet processing. Signed-off-by: Dimitri Staessens <dimitri.staessens@ugent.be> Signed-off-by: Sander Vrijders <sander.vrijders@ugent.be>
-rw-r--r--src/ipcpd/normal/fa.c262
1 files changed, 176 insertions, 86 deletions
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
index 027223b7..56864e1f 100644
--- a/src/ipcpd/normal/fa.c
+++ b/src/ipcpd/normal/fa.c
@@ -68,13 +68,23 @@ struct fa_msg {
uint32_t max_gap;
} __attribute__((packed));
+struct cmd {
+ struct list_head next;
+ struct shm_du_buff * sdb;
+};
+
struct {
- pthread_rwlock_t flows_lock;
- int r_eid[PROG_MAX_FLOWS];
- uint64_t r_addr[PROG_MAX_FLOWS];
- int fd;
+ pthread_rwlock_t flows_lock;
+ int r_eid[PROG_MAX_FLOWS];
+ uint64_t r_addr[PROG_MAX_FLOWS];
+ int fd;
- struct psched * psched;
+ struct list_head cmds;
+ pthread_cond_t cond;
+ pthread_mutex_t mtx;
+ pthread_t worker;
+
+ struct psched * psched;
} fa;
static void packet_handler(int fd,
@@ -100,109 +110,161 @@ static void destroy_conn(int fd)
}
static void fa_post_packet(void * comp,
- struct shm_du_buff * sdb)
+ struct shm_du_buff * sdb)
{
- struct timespec ts = {0, TIMEOUT * 1000};
- struct timespec abstime;
- int fd;
- uint8_t * buf;
- struct fa_msg * msg;
- qosspec_t qs;
-
- (void) comp;
+ struct cmd * cmd;
assert(comp == &fa);
- assert(sdb);
- buf = malloc(sizeof(*msg) + ipcp_dir_hash_len());
- if (buf == NULL)
+ (void) comp;
+
+ cmd = malloc(sizeof(*cmd));
+ if (cmd == NULL) {
+ log_err("Command failed. Out of memory.");
+ ipcp_sdb_release(sdb);
return;
+ }
+
+ cmd->sdb = sdb;
+
+ pthread_mutex_lock(&fa.mtx);
+
+ list_add(&cmd->next, &fa.cmds);
- msg = (struct fa_msg *) buf;
+ pthread_cond_signal(&fa.cond);
- /* Depending on the message call the function in ipcp-dev.h */
+ pthread_mutex_unlock(&fa.mtx);
+}
- memcpy(msg, shm_du_buff_head(sdb),
- shm_du_buff_tail(sdb) - shm_du_buff_head(sdb));
+static void * fa_handle_packet(void * o)
+{
+ struct timespec ts = {0, TIMEOUT * 1000};
- ipcp_sdb_release(sdb);
+ (void) o;
- switch (msg->code) {
- case FLOW_REQ:
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ while (true) {
+ struct timespec abstime;
+ int fd;
+ uint8_t * buf;
+ struct fa_msg * msg;
+ qosspec_t qs;
+ struct cmd * cmd;
- pthread_mutex_lock(&ipcpi.alloc_lock);
+ pthread_mutex_lock(&fa.mtx);
- while (ipcpi.alloc_id != -1 &&
- ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
- }
+ pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
+ &fa.mtx);
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- log_dbg("Won't allocate over non-operational IPCP.");
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- free(msg);
- return;
- }
+ while (list_is_empty(&fa.cmds))
+ pthread_cond_wait(&fa.cond, &fa.mtx);
- assert(ipcpi.alloc_id == -1);
-
- qs.delay = ntoh32(msg->delay);
- qs.bandwidth = ntoh64(msg->bandwidth);
- qs.availability = msg->availability;
- qs.loss = ntoh32(msg->loss);
- qs.ber = ntoh32(msg->ber);
- qs.in_order = msg->in_order;
- qs.max_gap = ntoh32(msg->max_gap);
-
- fd = ipcp_flow_req_arr(getpid(),
- (uint8_t *) (msg + 1),
- ipcp_dir_hash_len(),
- qs);
- if (fd < 0) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- log_err("Failed to get fd for flow.");
- free(msg);
- return;
+ cmd = list_last_entry(&fa.cmds, struct cmd, next);
+ list_del(&cmd->next);
+
+ pthread_cleanup_pop(true);
+
+ buf = malloc(sizeof(*msg) + ipcp_dir_hash_len());
+ if (buf == NULL) {
+ log_err("Failed to allocate memory.");
+ free(cmd);
+ ipcp_sdb_release(cmd->sdb);
+ continue;
}
- pthread_rwlock_wrlock(&fa.flows_lock);
+ msg = (struct fa_msg *) buf;
- fa.r_eid[fd] = ntoh32(msg->s_eid);
- fa.r_addr[fd] = ntoh64(msg->s_addr);
+ /* Depending on the message call the function in ipcp-dev.h */
- pthread_rwlock_unlock(&fa.flows_lock);
+ assert(sizeof(*msg) + ipcp_dir_hash_len() >=
+ (unsigned long int) (shm_du_buff_tail(cmd->sdb) -
+ shm_du_buff_head(cmd->sdb)));
- ipcpi.alloc_id = fd;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
+ memcpy(msg, shm_du_buff_head(cmd->sdb),
+ shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb));
- pthread_mutex_unlock(&ipcpi.alloc_lock);
+ ipcp_sdb_release(cmd->sdb);
- break;
- case FLOW_REPLY:
- pthread_rwlock_wrlock(&fa.flows_lock);
+ free(cmd);
- fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid);
+ switch (msg->code) {
+ case FLOW_REQ:
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ipcp_flow_alloc_reply(ntoh32(msg->r_eid), msg->response);
+ pthread_mutex_lock(&ipcpi.alloc_lock);
- if (msg->response < 0)
- destroy_conn(ntoh32(msg->r_eid));
- else
- psched_add(fa.psched, ntoh32(msg->r_eid));
+ while (ipcpi.alloc_id != -1 &&
+ ipcp_get_state() == IPCP_OPERATIONAL) {
+ ts_add(&abstime, &ts, &abstime);
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &abstime);
+ }
- pthread_rwlock_unlock(&fa.flows_lock);
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ log_dbg("Won't allocate over non-operational"
+ "IPCP.");
+ free(msg);
+ continue;
+ }
- break;
- default:
- log_err("Got an unknown flow allocation message.");
- break;
- }
+ assert(ipcpi.alloc_id == -1);
+
+ qs.delay = ntoh32(msg->delay);
+ qs.bandwidth = ntoh64(msg->bandwidth);
+ qs.availability = msg->availability;
+ qs.loss = ntoh32(msg->loss);
+ qs.ber = ntoh32(msg->ber);
+ qs.in_order = msg->in_order;
+ qs.max_gap = ntoh32(msg->max_gap);
- free(msg);
+ fd = ipcp_flow_req_arr(getpid(),
+ (uint8_t *) (msg + 1),
+ ipcp_dir_hash_len(),
+ qs);
+ if (fd < 0) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ log_err("Failed to get fd for flow.");
+ free(msg);
+ continue;
+ }
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fa.r_eid[fd] = ntoh32(msg->s_eid);
+ fa.r_addr[fd] = ntoh64(msg->s_addr);
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ ipcpi.alloc_id = fd;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+ break;
+ case FLOW_REPLY:
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid);
+
+ ipcp_flow_alloc_reply(ntoh32(msg->r_eid),
+ msg->response);
+
+ if (msg->response < 0)
+ destroy_conn(ntoh32(msg->r_eid));
+ else
+ psched_add(fa.psched, ntoh32(msg->r_eid));
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ break;
+ default:
+ log_err("Got an unknown flow allocation message.");
+ break;
+ }
+
+ free(msg);
+ }
}
int fa_init(void)
@@ -213,31 +275,59 @@ int fa_init(void)
destroy_conn(i);
if (pthread_rwlock_init(&fa.flows_lock, NULL))
- return -1;
+ goto fail_rwlock;
+
+ if (pthread_mutex_init(&fa.mtx, NULL))
+ goto fail_mtx;
+
+ if (pthread_cond_init(&fa.cond, NULL))
+ goto fail_cond;
+
+ list_head_init(&fa.cmds);
fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA);
return 0;
+
+ fail_cond:
+ pthread_mutex_destroy(&fa.mtx);
+ fail_mtx:
+ pthread_rwlock_destroy(&fa.flows_lock);
+ fail_rwlock:
+ log_err("Failed to initialize flow allocator.");
+ return -1;
}
void fa_fini(void)
{
+ pthread_cond_destroy(&fa.cond);;
+ pthread_mutex_destroy(&fa.mtx);
pthread_rwlock_destroy(&fa.flows_lock);
}
int fa_start(void)
{
fa.psched = psched_create(packet_handler);
- if (fa.psched == NULL) {
- log_err("Failed to create packet scheduler.");
- return -1;
- }
+ if (fa.psched == NULL)
+ goto fail_psched;
+
+ if (pthread_create(&fa.worker, NULL, fa_handle_packet, NULL))
+ goto fail_thread;
return 0;
+
+ fail_thread:
+ psched_destroy(fa.psched);
+ fail_psched:
+ log_err("Failed to start flow allocator.");
+ return -1;
}
void fa_stop(void)
{
+ pthread_cancel(fa.worker);
+ pthread_join(fa.worker, NULL);
+
psched_destroy(fa.psched);
}