From 9dab3985812e75071271ce69000561156d0d9374 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Thu, 27 Dec 2018 15:42:00 +0100 Subject: include: Add a flow_join operation for broadcast This adds a new flow_join operaiton for broadcast, which is a much safer solution than overloading destination name semantics. The internal API now also has a different IPCP_FLOW_JOIN operation. The IRMd doesn't need to query broadcasts IPCPs for the name, it can just check if an IPCP with the layer name exists. The broadcast IPCP doesn't need to implement the query proxy call anymore. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- include/ouroboros/dev.h | 5 ++++ src/ipcpd/broadcast/main.c | 30 +++++++----------------- src/ipcpd/eth/eth.c | 1 + src/ipcpd/ipcp.c | 33 ++++++++++++++++++++++++++ src/ipcpd/ipcp.h | 4 ++++ src/ipcpd/local/main.c | 1 + src/ipcpd/normal/main.c | 1 + src/ipcpd/raptor/main.c | 1 + src/ipcpd/udp/main.c | 1 + src/irmd/ipcp.c | 38 ++++++++++++++++++++++++------ src/irmd/ipcp.h | 7 ++++++ src/irmd/main.c | 56 +++++++++++++++++++++++++++++++++++++------- src/lib/dev.c | 26 ++++++++++++++++---- src/lib/ipcpd_messages.proto | 11 +++++---- src/lib/irmd_messages.proto | 9 +++---- src/tools/obc/obc.c | 8 +++---- 16 files changed, 178 insertions(+), 54 deletions(-) diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h index 7c5ab460..cd87068e 100644 --- a/include/ouroboros/dev.h +++ b/include/ouroboros/dev.h @@ -40,6 +40,11 @@ int flow_alloc(const char * dst_name, int flow_accept(qosspec_t * qs, const struct timespec * timeo); +/* Returns flow descriptor, qs updates to supplied QoS. */ +int flow_join(const char * bc, + qosspec_t * qs, + const struct timespec * timeo); + int flow_dealloc(int fd); ssize_t flow_write(int fd, diff --git a/src/ipcpd/broadcast/main.c b/src/ipcpd/broadcast/main.c index 8c6bfa71..af39dd34 100644 --- a/src/ipcpd/broadcast/main.c +++ b/src/ipcpd/broadcast/main.c @@ -198,31 +198,18 @@ static int broadcast_ipcp_bootstrap(const struct ipcp_config * conf) return -1; } -static int broadcast_ipcp_query(const uint8_t * dst) +static int name_check(const uint8_t * dst) { uint8_t * buf; size_t len; int ret; - char * multicast_name; - char * suffix = ".mc"; len = hash_len(ipcpi.dir_hash_algo); buf = malloc(len); if (buf == NULL) return -ENOMEM; - multicast_name = malloc(strlen(ipcpi.layer_name) + strlen(suffix) + 1); - if (multicast_name == NULL) { - free(buf); - return -ENOMEM; - } - - strcpy(multicast_name, ipcpi.layer_name); - strcat(multicast_name, suffix); - - str_hash(ipcpi.dir_hash_algo, buf, multicast_name); - - free(multicast_name); + str_hash(ipcpi.dir_hash_algo, buf, ipcpi.layer_name); ret = memcmp(buf, dst, len); @@ -231,9 +218,9 @@ static int broadcast_ipcp_query(const uint8_t * dst) return ret; } -static int broadcast_ipcp_alloc(int fd, - const uint8_t * dst, - qosspec_t qs) +static int broadcast_ipcp_join(int fd, + const uint8_t * dst, + qosspec_t qs) { struct conn conn; @@ -243,7 +230,7 @@ static int broadcast_ipcp_alloc(int fd, conn.flow_info.fd = fd; - if (broadcast_ipcp_query(dst) != 0) + if (name_check(dst) != 0) return -1; notifier_event(NOTIFY_DT_CONN_ADD, &conn); @@ -276,8 +263,9 @@ static struct ipcp_ops broadcast_ops = { .ipcp_disconnect = connmgr_ipcp_disconnect, .ipcp_reg = NULL, .ipcp_unreg = NULL, - .ipcp_query = broadcast_ipcp_query, - .ipcp_flow_alloc = broadcast_ipcp_alloc, + .ipcp_query = NULL, + .ipcp_flow_alloc = NULL, + .ipcp_flow_join = broadcast_ipcp_join, .ipcp_flow_alloc_resp = NULL, .ipcp_flow_dealloc = broadcast_ipcp_dealloc }; diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index f9691626..68f39c5d 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -1777,6 +1777,7 @@ static struct ipcp_ops eth_ops = { .ipcp_unreg = eth_ipcp_unreg, .ipcp_query = eth_ipcp_query, .ipcp_flow_alloc = eth_ipcp_flow_alloc, + .ipcp_flow_join = NULL, .ipcp_flow_alloc_resp = eth_ipcp_flow_alloc_resp, .ipcp_flow_dealloc = eth_ipcp_flow_dealloc }; diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 6376bedb..dced6f64 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -428,6 +428,39 @@ static void * mainloop(void * o) msg->hash.data, qs); break; + case IPCP_MSG_CODE__IPCP_FLOW_JOIN: + ret_msg.has_result = true; + + if (ipcpi.ops->ipcp_flow_join == NULL) { + log_err("Broadcast unsupported."); + ret_msg.result = -ENOTSUP; + break; + } + + assert(msg->hash.len == ipcp_dir_hash_len()); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_err("IPCP in wrong state."); + ret_msg.result = -EIPCPSTATE; + break; + } + + qs = msg_to_spec(msg->qosspec); + fd = np1_flow_alloc(msg->pid, + msg->flow_id, + qs); + if (fd < 0) { + log_err("Failed allocating fd on flow_id %d.", + msg->flow_id); + ret_msg.result = -1; + break; + } + + ret_msg.result = + ipcpi.ops->ipcp_flow_join(fd, + msg->hash.data, + qs); + break; case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP: ret_msg.has_result = true; if (ipcpi.ops->ipcp_flow_alloc_resp == NULL) { diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index fabd35fe..b6e79413 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -62,6 +62,10 @@ struct ipcp_ops { const uint8_t * dst, qosspec_t qs); + int (* ipcp_flow_join)(int fd, + const uint8_t * dst, + qosspec_t qs); + int (* ipcp_flow_alloc_resp)(int fd, int response); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index ab43f1f8..88cf2352 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -325,6 +325,7 @@ static struct ipcp_ops local_ops = { .ipcp_unreg = ipcp_local_unreg, .ipcp_query = ipcp_local_query, .ipcp_flow_alloc = ipcp_local_flow_alloc, + .ipcp_flow_join = NULL, .ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp, .ipcp_flow_dealloc = ipcp_local_flow_dealloc }; diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 3f05f421..5e013eb8 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -295,6 +295,7 @@ static struct ipcp_ops normal_ops = { .ipcp_unreg = dir_unreg, .ipcp_query = normal_ipcp_query, .ipcp_flow_alloc = fa_alloc, + .ipcp_flow_join = NULL, .ipcp_flow_alloc_resp = fa_alloc_resp, .ipcp_flow_dealloc = fa_dealloc }; diff --git a/src/ipcpd/raptor/main.c b/src/ipcpd/raptor/main.c index 8f578611..d3c9040e 100644 --- a/src/ipcpd/raptor/main.c +++ b/src/ipcpd/raptor/main.c @@ -1055,6 +1055,7 @@ static struct ipcp_ops raptor_ops = { .ipcp_unreg = raptor_unreg, .ipcp_query = raptor_query, .ipcp_flow_alloc = raptor_flow_alloc, + .ipcp_flow_join = NULL, .ipcp_flow_alloc_resp = raptor_flow_alloc_resp, .ipcp_flow_dealloc = raptor_flow_dealloc }; diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index a1af1e85..31e6166b 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -1183,6 +1183,7 @@ static struct ipcp_ops udp_ops = { .ipcp_unreg = ipcp_udp_unreg, .ipcp_query = ipcp_udp_query, .ipcp_flow_alloc = ipcp_udp_flow_alloc, + .ipcp_flow_join = NULL, .ipcp_flow_alloc_resp = ipcp_udp_flow_alloc_resp, .ipcp_flow_dealloc = ipcp_udp_flow_dealloc }; diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 7f3f4807..08547d01 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -435,12 +435,13 @@ int ipcp_query(pid_t pid, return ret; } -int ipcp_flow_alloc(pid_t pid, - int flow_id, - pid_t n_pid, - const uint8_t * dst, - size_t len, - qosspec_t qs) +static int __ipcp_flow_alloc(pid_t pid, + int flow_id, + pid_t n_pid, + const uint8_t * dst, + size_t len, + qosspec_t qs, + bool join) { ipcp_msg_t msg = IPCP_MSG__INIT; qosspec_msg_t qs_msg; @@ -449,7 +450,10 @@ int ipcp_flow_alloc(pid_t pid, assert(dst); - msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; + if (join) + msg.code = IPCP_MSG_CODE__IPCP_FLOW_JOIN; + else + msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; msg.has_flow_id = true; msg.flow_id = flow_id; msg.has_pid = true; @@ -475,6 +479,26 @@ int ipcp_flow_alloc(pid_t pid, return ret; } +int ipcp_flow_alloc(pid_t pid, + int flow_id, + pid_t n_pid, + const uint8_t * dst, + size_t len, + qosspec_t qs) +{ + return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, false); +} + +int ipcp_flow_join(pid_t pid, + int flow_id, + pid_t n_pid, + const uint8_t * dst, + size_t len, + qosspec_t qs) +{ + return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, true); +} + int ipcp_flow_alloc_resp(pid_t pid, int flow_id, pid_t n_pid, diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index 07b9c44a..611bada2 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -69,6 +69,13 @@ int ipcp_flow_alloc(pid_t pid, size_t len, qosspec_t qs); +int ipcp_flow_join(pid_t pid, + int flow_id, + pid_t n_pid, + const uint8_t * dst, + size_t len, + qosspec_t qs); + int ipcp_flow_alloc_resp(pid_t pid, int flow_id, pid_t n_pid, diff --git a/src/irmd/main.c b/src/irmd/main.c index 67e16de0..802b01f0 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -330,6 +330,19 @@ static struct ipcp_entry * get_ipcp_entry_by_name(const char * name) return NULL; } +static struct ipcp_entry * get_ipcp_entry_by_layer(const char * layer) +{ + struct list_head * p; + + list_for_each(p, &irmd.ipcps) { + struct ipcp_entry * e = list_entry(p, struct ipcp_entry, next); + if (strcmp(layer, e->layer) == 0) + return e; + } + + return NULL; +} + static struct ipcp_entry * get_ipcp_by_dst_name(const char * name, pid_t src) { @@ -1267,7 +1280,8 @@ static int flow_alloc(pid_t pid, const char * dst, qosspec_t qs, struct timespec * timeo, - struct irm_flow ** e) + struct irm_flow ** e, + bool join) { struct irm_flow * f; struct ipcp_entry * ipcp; @@ -1275,7 +1289,10 @@ static int flow_alloc(pid_t pid, int state; uint8_t * hash; - ipcp = get_ipcp_by_dst_name(dst, pid); + if (join) + ipcp = get_ipcp_entry_by_layer(dst); + else + ipcp = get_ipcp_by_dst_name(dst, pid); if (ipcp == NULL) { log_info("Destination %s unreachable.", dst); return -1; @@ -1310,12 +1327,22 @@ static int flow_alloc(pid_t pid, str_hash(ipcp->dir_hash_algo, hash, dst); - if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash, - IPCP_HASH_LEN(ipcp), qs)) { - /* sanitizer cleans this */ - log_info("Flow_allocation failed."); - free(hash); - return -EAGAIN; + if (join) { + if (ipcp_flow_join(ipcp->pid, flow_id, pid, hash, + IPCP_HASH_LEN(ipcp), qs)) { + /* sanitizer cleans this */ + log_info("Flow_join failed."); + free(hash); + return -EAGAIN; + } + } else { + if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash, + IPCP_HASH_LEN(ipcp), qs)) { + /* sanitizer cleans this */ + log_info("Flow_allocation failed."); + free(hash); + return -EAGAIN; + } } free(hash); @@ -1978,7 +2005,18 @@ static void * mainloop(void * o) case IRM_MSG_CODE__IRM_FLOW_ALLOC: result = flow_alloc(msg->pid, msg->dst, msg_to_spec(msg->qosspec), - timeo, &e); + timeo, &e, false); + if (result == 0) { + ret_msg->has_flow_id = true; + ret_msg->flow_id = e->flow_id; + ret_msg->has_pid = true; + ret_msg->pid = e->n_1_pid; + } + break; + case IRM_MSG_CODE__IRM_FLOW_JOIN: + result = flow_alloc(msg->pid, msg->dst, + msg_to_spec(msg->qosspec), + timeo, &e, true); if (result == 0) { ret_msg->has_flow_id = true; ret_msg->flow_id = e->flow_id; diff --git a/src/lib/dev.c b/src/lib/dev.c index a2ec836f..57dfc3f2 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -561,9 +561,10 @@ int flow_accept(qosspec_t * qs, return fd; } -int flow_alloc(const char * dst, - qosspec_t * qs, - const struct timespec * timeo) +static int __flow_alloc(const char * dst, + qosspec_t * qs, + const struct timespec * timeo, + bool join) { irm_msg_t msg = IRM_MSG__INIT; qosspec_msg_t qs_msg = QOSSPEC_MSG__INIT; @@ -574,7 +575,10 @@ int flow_alloc(const char * dst, if (qs != NULL) qs->ber = 1; #endif - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; + if (join) + msg.code = IRM_MSG_CODE__IRM_FLOW_JOIN; + else + msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst = (char *) dst; msg.has_pid = true; msg.pid = ai.pid; @@ -634,6 +638,20 @@ int flow_alloc(const char * dst, return fd; } +int flow_alloc(const char * dst, + qosspec_t * qs, + const struct timespec * timeo) +{ + return __flow_alloc(dst, qs, timeo, false); +} + +int flow_join(const char * dst, + qosspec_t * qs, + const struct timespec * timeo) +{ + return __flow_alloc(dst, qs, timeo, true); +} + int flow_dealloc(int fd) { irm_msg_t msg = IRM_MSG__INIT; diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index ae1014ac..1793aee7 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -32,11 +32,12 @@ enum ipcp_msg_code { IPCP_UNREG = 4; IPCP_QUERY = 5; IPCP_FLOW_ALLOC = 6; - IPCP_FLOW_ALLOC_RESP = 7; - IPCP_FLOW_DEALLOC = 8; - IPCP_CONNECT = 9; - IPCP_DISCONNECT = 10; - IPCP_REPLY = 11; + IPCP_FLOW_JOIN = 7; + IPCP_FLOW_ALLOC_RESP = 8; + IPCP_FLOW_DEALLOC = 9; + IPCP_CONNECT = 10; + IPCP_DISCONNECT = 11; + IPCP_REPLY = 12; }; message ipcp_msg { diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 351b4a8e..9b935f57 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -43,10 +43,11 @@ enum irm_msg_code { IRM_UNREG = 15; IRM_FLOW_ALLOC = 16; IRM_FLOW_ACCEPT = 17; - IRM_FLOW_DEALLOC = 18; - IPCP_FLOW_REQ_ARR = 19; - IPCP_FLOW_ALLOC_REPLY = 20; - IRM_REPLY = 21; + IRM_FLOW_JOIN = 18; + IRM_FLOW_DEALLOC = 19; + IPCP_FLOW_REQ_ARR = 20; + IPCP_FLOW_ALLOC_REPLY = 21; + IRM_REPLY = 22; }; message ipcp_info_msg { diff --git a/src/tools/obc/obc.c b/src/tools/obc/obc.c index 747d01d3..e3fba557 100644 --- a/src/tools/obc/obc.c +++ b/src/tools/obc/obc.c @@ -63,9 +63,9 @@ static int reader_main(const char * dst) printf("Starting a reader.\n"); - fd = flow_alloc(dst, NULL, NULL); + fd = flow_join(dst, NULL, NULL); if (fd < 0) { - printf("Failed to allocate multicast flow.\n"); + printf("Failed to join broadcast.\n"); return -1; } @@ -90,9 +90,9 @@ static int writer_main(const char * dst, { int fd = 0; - fd = flow_alloc(dst, NULL, NULL); + fd = flow_join(dst, NULL, NULL); if (fd < 0) { - printf("Failed to allocate multicast flow.\n"); + printf("Failed to join broadcast.\n"); return -1; } -- cgit v1.2.3