author    Dimitri Staessens <dimitri@ouroboros.rocks>  2022-02-19 16:20:22 +0100
committer Sander Vrijders <sander@ouroboros.rocks>     2022-02-21 09:05:26 +0100
commit    c7025b1060ba250caa81f589c19e38d46949a3e7 (patch)
tree      b0e2a05f8b3f59707b611ab050ddb1ac3ac3c92c /src/ipcpd/unicast/fa.c
parent    5b61e1f163afcb292185ede20c4682ef5ea92081 (diff)
ipcpd: Refactor flow allocator message handling
This refactors the single long function that handles incoming packets
destined for the flow allocator.

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
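In outline, the change replaces the monolithic receive loop with a
wait/validate step (fa_wait_for_fa_msg) and one handler per message code
(fa_handle_flow_req, fa_handle_flow_reply, fa_handle_flow_update),
dispatched from a small switch in fa_handle_packet. The standalone sketch
below illustrates only that dispatch shape; every name in it (demo_msg,
handle_req, ...) is hypothetical and not part of the Ouroboros tree.

/*
 * Standalone illustration of the dispatch pattern used by this refactor:
 * validate once, then switch on the message code and call a dedicated
 * handler that reports failure through its return value.
 */
#include <stdint.h>
#include <stdio.h>

enum demo_code { DEMO_REQ, DEMO_REPLY, DEMO_UPDATE };

struct demo_msg {
        uint8_t code;    /* selects the handler */
        uint8_t payload; /* stand-in for message contents */
};

static int handle_req(const struct demo_msg * m)    { return m->payload ? 0 : -1; }
static int handle_reply(const struct demo_msg * m)  { (void) m; return 0; }
static int handle_update(const struct demo_msg * m) { (void) m; return 0; }

static void dispatch(const struct demo_msg * m)
{
        switch (m->code) {
        case DEMO_REQ:
                if (handle_req(m) < 0)
                        fprintf(stderr, "Error handling request.\n");
                break;
        case DEMO_REPLY:
                if (handle_reply(m) < 0)
                        fprintf(stderr, "Error handling reply.\n");
                break;
        case DEMO_UPDATE:
                if (handle_update(m) < 0)
                        fprintf(stderr, "Error handling update.\n");
                break;
        default:
                fprintf(stderr, "Unknown message code %u.\n", m->code);
                break;
        }
}

int main(void)
{
        struct demo_msg m = { DEMO_REQ, 1 };

        dispatch(&m); /* calls handle_req(), which succeeds */

        return 0;
}

The new fa_handle_packet at the bottom of the diff follows the same shape:
each handler returns a negative value on failure, and the loop only logs
the error and continues with the next message.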
Diffstat (limited to 'src/ipcpd/unicast/fa.c')
-rw-r--r--  src/ipcpd/unicast/fa.c  283
1 file changed, 170 insertions(+), 113 deletions(-)
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index ef6adae6..7143a346 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -435,167 +435,224 @@ static void fa_post_packet(void * comp,
pthread_mutex_unlock(&fa.mtx);
}
-static void * fa_handle_packet(void * o)
+static size_t fa_wait_for_fa_msg(struct fa_msg * msg)
{
- struct timespec ts = {0, TIMEOUT * 1000};
+ struct cmd * cmd;
+ size_t len;
- (void) o;
+ pthread_mutex_lock(&fa.mtx);
- while (true) {
- struct timespec abstime;
- int fd;
- uint8_t buf[MSGBUFSZ];
- struct fa_msg * msg;
- qosspec_t qs;
- struct cmd * cmd;
- size_t len;
- size_t msg_len;
- struct fa_flow * flow;
+ pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx);
- pthread_mutex_lock(&fa.mtx);
+ while (list_is_empty(&fa.cmds))
+ pthread_cond_wait(&fa.cond, &fa.mtx);
- pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx);
+ cmd = list_last_entry(&fa.cmds, struct cmd, next);
+ list_del(&cmd->next);
- while (list_is_empty(&fa.cmds))
- pthread_cond_wait(&fa.cond, &fa.mtx);
+ pthread_cleanup_pop(true);
- cmd = list_last_entry(&fa.cmds, struct cmd, next);
- list_del(&cmd->next);
+ len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb);
+ if (len > MSGBUFSZ || len < sizeof(*msg)) {
+ log_warn("Invalid flow allocation message (len: %zu).", len);
+ free(cmd);
+ return 0; /* No valid message */
+ }
- pthread_cleanup_pop(true);
+ memcpy(msg, shm_du_buff_head(cmd->sdb), len);
- len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb);
+ ipcp_sdb_release(cmd->sdb);
- if (len > MSGBUFSZ) {
- log_err("Message over buffer size.");
- free(cmd);
- continue;
- }
+ free(cmd);
- msg = (struct fa_msg *) buf;
+ return len;
+}
- /* Depending on the message call the function in ipcp-dev.h */
+static int fa_wait_irmd_alloc(uint8_t * dst,
+ qosspec_t qs,
+ const void * data,
+ size_t len)
+{
+ struct timespec ts = {0, TIMEOUT * 1000};
+ struct timespec abstime;
+ int fd;
- memcpy(msg, shm_du_buff_head(cmd->sdb), len);
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ipcp_sdb_release(cmd->sdb);
+ pthread_mutex_lock(&ipcpi.alloc_lock);
- free(cmd);
+ while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) {
+ ts_add(&abstime, &ts, &abstime);
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &abstime);
+ }
- switch (msg->code) {
- case FLOW_REQ:
- msg_len = sizeof(*msg) + ipcp_dir_hash_len();
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ log_dbg("Won't allocate over non-operational IPCP.");
+ return -EIPCPSTATE;
+ }
- assert(len >= msg_len);
+ assert(ipcpi.alloc_id == -1);
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, data, len);
+ if (fd < 0) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ log_dbg("Failed to get fd for flow.");
+ return -ENOTALLOC;
+ }
+
+ ipcpi.alloc_id = fd;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
- pthread_mutex_lock(&ipcpi.alloc_lock);
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
- while (ipcpi.alloc_id != -1 &&
- ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
- }
+ return fd;
+}
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- log_dbg("Won't allocate over non-operational"
- "IPCP.");
- continue;
- }
+static int fa_handle_flow_req(struct fa_msg * msg,
+ size_t len)
+{
+ size_t msg_len;
+ int fd;
+ qosspec_t qs;
+ struct fa_flow * flow;
+ uint8_t * data; /* Piggybacked data on flow alloc request. */
+ size_t dlen; /* Length of piggybacked data. */
- assert(ipcpi.alloc_id == -1);
+ msg_len = sizeof(*msg) + ipcp_dir_hash_len();
+ if (len < msg_len) {
+ log_err("Invalid flow allocation request.");
+ return -EPERM;
+ }
- qs.delay = ntoh32(msg->delay);
- qs.bandwidth = ntoh64(msg->bandwidth);
- qs.availability = msg->availability;
- qs.loss = ntoh32(msg->loss);
- qs.ber = ntoh32(msg->ber);
- qs.in_order = msg->in_order;
- qs.max_gap = ntoh32(msg->max_gap);
- qs.cypher_s = ntoh16(msg->cypher_s);
+ data = (uint8_t *) msg + msg_len;
+ dlen = len - msg_len;
- fd = ipcp_flow_req_arr((uint8_t *) (msg + 1),
- ipcp_dir_hash_len(),
- qs,
- buf + msg_len,
- len - msg_len);
- if (fd < 0) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- log_err("Failed to get fd for flow.");
- continue;
- }
+ qs.delay = ntoh32(msg->delay);
+ qs.bandwidth = ntoh64(msg->bandwidth);
+ qs.availability = msg->availability;
+ qs.loss = ntoh32(msg->loss);
+ qs.ber = ntoh32(msg->ber);
+ qs.in_order = msg->in_order;
+ qs.max_gap = ntoh32(msg->max_gap);
+ qs.cypher_s = ntoh16(msg->cypher_s);
- flow = &fa.flows[fd];
+ fd = fa_wait_irmd_alloc((uint8_t *) (msg + 1), qs, data, dlen);
+ if (fd < 0)
+ return fd;
- pthread_rwlock_wrlock(&fa.flows_lock);
+ flow = &fa.flows[fd];
- fa_flow_init(flow);
+ pthread_rwlock_wrlock(&fa.flows_lock);
- flow->s_eid = gen_eid(fd);
- flow->r_eid = ntoh64(msg->s_eid);
- flow->r_addr = ntoh64(msg->s_addr);
+ fa_flow_init(flow);
- pthread_rwlock_unlock(&fa.flows_lock);
+ flow->s_eid = gen_eid(fd);
+ flow->r_eid = ntoh64(msg->s_eid);
+ flow->r_addr = ntoh64(msg->s_addr);
- ipcpi.alloc_id = fd;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
+ pthread_rwlock_unlock(&fa.flows_lock);
- pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return fd;
+}
- break;
- case FLOW_REPLY:
- assert(len >= sizeof(*msg));
+static int fa_handle_flow_reply(struct fa_msg * msg,
+ size_t len)
+{
+ int fd;
+ struct fa_flow * flow;
+ uint8_t * data; /* Piggybacked data on flow alloc reply. */
+ size_t dlen; /* Length of piggybacked data. */
- pthread_rwlock_wrlock(&fa.flows_lock);
+ assert(len >= sizeof(*msg));
- fd = eid_to_fd(ntoh64(msg->r_eid));
- if (fd < 0) {
- pthread_rwlock_unlock(&fa.flows_lock);
- break;
- }
+ data = (uint8_t *) msg + sizeof(*msg);
+ dlen = len - sizeof(*msg);
- flow = &fa.flows[fd];
+ pthread_rwlock_wrlock(&fa.flows_lock);
- flow->r_eid = ntoh64(msg->s_eid);
+ fd = eid_to_fd(ntoh64(msg->r_eid));
+ if (fd < 0) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ return -ENOTALLOC;
+ }
- if (msg->response < 0)
- fa_flow_fini(flow);
- else
- psched_add(fa.psched, fd);
+ flow = &fa.flows[fd];
- pthread_rwlock_unlock(&fa.flows_lock);
+ flow->r_eid = ntoh64(msg->s_eid);
- ipcp_flow_alloc_reply(fd,
- msg->response,
- buf + sizeof(*msg),
- len - sizeof(*msg));
- break;
- case FLOW_UPDATE:
- assert(len >= sizeof(*msg));
+ if (msg->response < 0)
+ fa_flow_fini(flow);
+ else
+ psched_add(fa.psched, fd);
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ if (ipcp_flow_alloc_reply(fd, msg->response, data, dlen))
+ return -EIRMD;
- pthread_rwlock_wrlock(&fa.flows_lock);
+ return 0;
+}
- fd = eid_to_fd(ntoh64(msg->r_eid));
- if (fd < 0) {
- pthread_rwlock_unlock(&fa.flows_lock);
- break;
- }
+static int fa_handle_flow_update(struct fa_msg * msg,
+ size_t len)
+{
+ struct fa_flow * flow;
+ int fd;
+
+ assert(len >= sizeof(*msg));
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fd = eid_to_fd(ntoh64(msg->r_eid));
+ if (fd < 0) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ return -EPERM;
+ }
- flow = &fa.flows[fd];
+ flow = &fa.flows[fd];
#ifdef IPCP_FLOW_STATS
- flow->u_rcv++;
+ flow->u_rcv++;
#endif
- ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece));
+ ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece));
- pthread_rwlock_unlock(&fa.flows_lock);
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return 0;
+}
+
+static void * fa_handle_packet(void * o)
+{
+ (void) o;
+
+ while (true) {
+ uint8_t buf[MSGBUFSZ];
+ struct fa_msg * msg;
+ size_t len;
+
+ msg = (struct fa_msg *) buf;
+
+ len = fa_wait_for_fa_msg(msg);
+ if (len == 0)
+ continue;
+ switch (msg->code) {
+ case FLOW_REQ:
+ if (fa_handle_flow_req(msg, len) < 0)
+ log_err("Error handling flow alloc request.");
+ break;
+ case FLOW_REPLY:
+ if (fa_handle_flow_reply(msg, len) < 0)
+ log_err("Error handling flow reply.");
+ break;
+ case FLOW_UPDATE:
+ if (fa_handle_flow_update(msg, len) < 0)
+ log_err("Error handling flow update.");
break;
default:
- log_err("Got an unknown flow allocation message.");
+ log_warn("Received unknown flow allocation message.");
break;
}
}