diff options
-rw-r--r-- | include/ouroboros/dev.h | 5 | ||||
-rw-r--r-- | src/ipcpd/broadcast/main.c | 30 | ||||
-rw-r--r-- | src/ipcpd/eth/eth.c | 1 | ||||
-rw-r--r-- | src/ipcpd/ipcp.c | 33 | ||||
-rw-r--r-- | src/ipcpd/ipcp.h | 4 | ||||
-rw-r--r-- | src/ipcpd/local/main.c | 1 | ||||
-rw-r--r-- | src/ipcpd/normal/main.c | 1 | ||||
-rw-r--r-- | src/ipcpd/raptor/main.c | 1 | ||||
-rw-r--r-- | src/ipcpd/udp/main.c | 1 | ||||
-rw-r--r-- | src/irmd/ipcp.c | 38 | ||||
-rw-r--r-- | src/irmd/ipcp.h | 7 | ||||
-rw-r--r-- | src/irmd/main.c | 56 | ||||
-rw-r--r-- | src/lib/dev.c | 26 | ||||
-rw-r--r-- | src/lib/ipcpd_messages.proto | 11 | ||||
-rw-r--r-- | src/lib/irmd_messages.proto | 9 | ||||
-rw-r--r-- | src/tools/obc/obc.c | 8 |
16 files changed, 178 insertions, 54 deletions
diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h index 7c5ab460..cd87068e 100644 --- a/include/ouroboros/dev.h +++ b/include/ouroboros/dev.h @@ -40,6 +40,11 @@ int flow_alloc(const char * dst_name, int flow_accept(qosspec_t * qs, const struct timespec * timeo); +/* Returns flow descriptor, qs updates to supplied QoS. */ +int flow_join(const char * bc, + qosspec_t * qs, + const struct timespec * timeo); + int flow_dealloc(int fd); ssize_t flow_write(int fd, diff --git a/src/ipcpd/broadcast/main.c b/src/ipcpd/broadcast/main.c index 8c6bfa71..af39dd34 100644 --- a/src/ipcpd/broadcast/main.c +++ b/src/ipcpd/broadcast/main.c @@ -198,31 +198,18 @@ static int broadcast_ipcp_bootstrap(const struct ipcp_config * conf) return -1; } -static int broadcast_ipcp_query(const uint8_t * dst) +static int name_check(const uint8_t * dst) { uint8_t * buf; size_t len; int ret; - char * multicast_name; - char * suffix = ".mc"; len = hash_len(ipcpi.dir_hash_algo); buf = malloc(len); if (buf == NULL) return -ENOMEM; - multicast_name = malloc(strlen(ipcpi.layer_name) + strlen(suffix) + 1); - if (multicast_name == NULL) { - free(buf); - return -ENOMEM; - } - - strcpy(multicast_name, ipcpi.layer_name); - strcat(multicast_name, suffix); - - str_hash(ipcpi.dir_hash_algo, buf, multicast_name); - - free(multicast_name); + str_hash(ipcpi.dir_hash_algo, buf, ipcpi.layer_name); ret = memcmp(buf, dst, len); @@ -231,9 +218,9 @@ static int broadcast_ipcp_query(const uint8_t * dst) return ret; } -static int broadcast_ipcp_alloc(int fd, - const uint8_t * dst, - qosspec_t qs) +static int broadcast_ipcp_join(int fd, + const uint8_t * dst, + qosspec_t qs) { struct conn conn; @@ -243,7 +230,7 @@ static int broadcast_ipcp_alloc(int fd, conn.flow_info.fd = fd; - if (broadcast_ipcp_query(dst) != 0) + if (name_check(dst) != 0) return -1; notifier_event(NOTIFY_DT_CONN_ADD, &conn); @@ -276,8 +263,9 @@ static struct ipcp_ops broadcast_ops = { .ipcp_disconnect = connmgr_ipcp_disconnect, .ipcp_reg = NULL, .ipcp_unreg = NULL, - .ipcp_query = broadcast_ipcp_query, - .ipcp_flow_alloc = broadcast_ipcp_alloc, + .ipcp_query = NULL, + .ipcp_flow_alloc = NULL, + .ipcp_flow_join = broadcast_ipcp_join, .ipcp_flow_alloc_resp = NULL, .ipcp_flow_dealloc = broadcast_ipcp_dealloc }; diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index f9691626..68f39c5d 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -1777,6 +1777,7 @@ static struct ipcp_ops eth_ops = { .ipcp_unreg = eth_ipcp_unreg, .ipcp_query = eth_ipcp_query, .ipcp_flow_alloc = eth_ipcp_flow_alloc, + .ipcp_flow_join = NULL, .ipcp_flow_alloc_resp = eth_ipcp_flow_alloc_resp, .ipcp_flow_dealloc = eth_ipcp_flow_dealloc }; diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 6376bedb..dced6f64 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -428,6 +428,39 @@ static void * mainloop(void * o) msg->hash.data, qs); break; + case IPCP_MSG_CODE__IPCP_FLOW_JOIN: + ret_msg.has_result = true; + + if (ipcpi.ops->ipcp_flow_join == NULL) { + log_err("Broadcast unsupported."); + ret_msg.result = -ENOTSUP; + break; + } + + assert(msg->hash.len == ipcp_dir_hash_len()); + + if (ipcp_get_state() != IPCP_OPERATIONAL) { + log_err("IPCP in wrong state."); + ret_msg.result = -EIPCPSTATE; + break; + } + + qs = msg_to_spec(msg->qosspec); + fd = np1_flow_alloc(msg->pid, + msg->flow_id, + qs); + if (fd < 0) { + log_err("Failed allocating fd on flow_id %d.", + msg->flow_id); + ret_msg.result = -1; + break; + } + + ret_msg.result = + ipcpi.ops->ipcp_flow_join(fd, + msg->hash.data, + qs); + break; case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP: ret_msg.has_result = true; if (ipcpi.ops->ipcp_flow_alloc_resp == NULL) { diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index fabd35fe..b6e79413 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -62,6 +62,10 @@ struct ipcp_ops { const uint8_t * dst, qosspec_t qs); + int (* ipcp_flow_join)(int fd, + const uint8_t * dst, + qosspec_t qs); + int (* ipcp_flow_alloc_resp)(int fd, int response); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index ab43f1f8..88cf2352 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -325,6 +325,7 @@ static struct ipcp_ops local_ops = { .ipcp_unreg = ipcp_local_unreg, .ipcp_query = ipcp_local_query, .ipcp_flow_alloc = ipcp_local_flow_alloc, + .ipcp_flow_join = NULL, .ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp, .ipcp_flow_dealloc = ipcp_local_flow_dealloc }; diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 3f05f421..5e013eb8 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -295,6 +295,7 @@ static struct ipcp_ops normal_ops = { .ipcp_unreg = dir_unreg, .ipcp_query = normal_ipcp_query, .ipcp_flow_alloc = fa_alloc, + .ipcp_flow_join = NULL, .ipcp_flow_alloc_resp = fa_alloc_resp, .ipcp_flow_dealloc = fa_dealloc }; diff --git a/src/ipcpd/raptor/main.c b/src/ipcpd/raptor/main.c index 8f578611..d3c9040e 100644 --- a/src/ipcpd/raptor/main.c +++ b/src/ipcpd/raptor/main.c @@ -1055,6 +1055,7 @@ static struct ipcp_ops raptor_ops = { .ipcp_unreg = raptor_unreg, .ipcp_query = raptor_query, .ipcp_flow_alloc = raptor_flow_alloc, + .ipcp_flow_join = NULL, .ipcp_flow_alloc_resp = raptor_flow_alloc_resp, .ipcp_flow_dealloc = raptor_flow_dealloc }; diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c index a1af1e85..31e6166b 100644 --- a/src/ipcpd/udp/main.c +++ b/src/ipcpd/udp/main.c @@ -1183,6 +1183,7 @@ static struct ipcp_ops udp_ops = { .ipcp_unreg = ipcp_udp_unreg, .ipcp_query = ipcp_udp_query, .ipcp_flow_alloc = ipcp_udp_flow_alloc, + .ipcp_flow_join = NULL, .ipcp_flow_alloc_resp = ipcp_udp_flow_alloc_resp, .ipcp_flow_dealloc = ipcp_udp_flow_dealloc }; diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 7f3f4807..08547d01 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -435,12 +435,13 @@ int ipcp_query(pid_t pid, return ret; } -int ipcp_flow_alloc(pid_t pid, - int flow_id, - pid_t n_pid, - const uint8_t * dst, - size_t len, - qosspec_t qs) +static int __ipcp_flow_alloc(pid_t pid, + int flow_id, + pid_t n_pid, + const uint8_t * dst, + size_t len, + qosspec_t qs, + bool join) { ipcp_msg_t msg = IPCP_MSG__INIT; qosspec_msg_t qs_msg; @@ -449,7 +450,10 @@ int ipcp_flow_alloc(pid_t pid, assert(dst); - msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; + if (join) + msg.code = IPCP_MSG_CODE__IPCP_FLOW_JOIN; + else + msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; msg.has_flow_id = true; msg.flow_id = flow_id; msg.has_pid = true; @@ -475,6 +479,26 @@ int ipcp_flow_alloc(pid_t pid, return ret; } +int ipcp_flow_alloc(pid_t pid, + int flow_id, + pid_t n_pid, + const uint8_t * dst, + size_t len, + qosspec_t qs) +{ + return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, false); +} + +int ipcp_flow_join(pid_t pid, + int flow_id, + pid_t n_pid, + const uint8_t * dst, + size_t len, + qosspec_t qs) +{ + return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, true); +} + int ipcp_flow_alloc_resp(pid_t pid, int flow_id, pid_t n_pid, diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index 07b9c44a..611bada2 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -69,6 +69,13 @@ int ipcp_flow_alloc(pid_t pid, size_t len, qosspec_t qs); +int ipcp_flow_join(pid_t pid, + int flow_id, + pid_t n_pid, + const uint8_t * dst, + size_t len, + qosspec_t qs); + int ipcp_flow_alloc_resp(pid_t pid, int flow_id, pid_t n_pid, diff --git a/src/irmd/main.c b/src/irmd/main.c index 67e16de0..802b01f0 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -330,6 +330,19 @@ static struct ipcp_entry * get_ipcp_entry_by_name(const char * name) return NULL; } +static struct ipcp_entry * get_ipcp_entry_by_layer(const char * layer) +{ + struct list_head * p; + + list_for_each(p, &irmd.ipcps) { + struct ipcp_entry * e = list_entry(p, struct ipcp_entry, next); + if (strcmp(layer, e->layer) == 0) + return e; + } + + return NULL; +} + static struct ipcp_entry * get_ipcp_by_dst_name(const char * name, pid_t src) { @@ -1267,7 +1280,8 @@ static int flow_alloc(pid_t pid, const char * dst, qosspec_t qs, struct timespec * timeo, - struct irm_flow ** e) + struct irm_flow ** e, + bool join) { struct irm_flow * f; struct ipcp_entry * ipcp; @@ -1275,7 +1289,10 @@ static int flow_alloc(pid_t pid, int state; uint8_t * hash; - ipcp = get_ipcp_by_dst_name(dst, pid); + if (join) + ipcp = get_ipcp_entry_by_layer(dst); + else + ipcp = get_ipcp_by_dst_name(dst, pid); if (ipcp == NULL) { log_info("Destination %s unreachable.", dst); return -1; @@ -1310,12 +1327,22 @@ static int flow_alloc(pid_t pid, str_hash(ipcp->dir_hash_algo, hash, dst); - if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash, - IPCP_HASH_LEN(ipcp), qs)) { - /* sanitizer cleans this */ - log_info("Flow_allocation failed."); - free(hash); - return -EAGAIN; + if (join) { + if (ipcp_flow_join(ipcp->pid, flow_id, pid, hash, + IPCP_HASH_LEN(ipcp), qs)) { + /* sanitizer cleans this */ + log_info("Flow_join failed."); + free(hash); + return -EAGAIN; + } + } else { + if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash, + IPCP_HASH_LEN(ipcp), qs)) { + /* sanitizer cleans this */ + log_info("Flow_allocation failed."); + free(hash); + return -EAGAIN; + } } free(hash); @@ -1978,7 +2005,18 @@ static void * mainloop(void * o) case IRM_MSG_CODE__IRM_FLOW_ALLOC: result = flow_alloc(msg->pid, msg->dst, msg_to_spec(msg->qosspec), - timeo, &e); + timeo, &e, false); + if (result == 0) { + ret_msg->has_flow_id = true; + ret_msg->flow_id = e->flow_id; + ret_msg->has_pid = true; + ret_msg->pid = e->n_1_pid; + } + break; + case IRM_MSG_CODE__IRM_FLOW_JOIN: + result = flow_alloc(msg->pid, msg->dst, + msg_to_spec(msg->qosspec), + timeo, &e, true); if (result == 0) { ret_msg->has_flow_id = true; ret_msg->flow_id = e->flow_id; diff --git a/src/lib/dev.c b/src/lib/dev.c index a2ec836f..57dfc3f2 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -561,9 +561,10 @@ int flow_accept(qosspec_t * qs, return fd; } -int flow_alloc(const char * dst, - qosspec_t * qs, - const struct timespec * timeo) +static int __flow_alloc(const char * dst, + qosspec_t * qs, + const struct timespec * timeo, + bool join) { irm_msg_t msg = IRM_MSG__INIT; qosspec_msg_t qs_msg = QOSSPEC_MSG__INIT; @@ -574,7 +575,10 @@ int flow_alloc(const char * dst, if (qs != NULL) qs->ber = 1; #endif - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; + if (join) + msg.code = IRM_MSG_CODE__IRM_FLOW_JOIN; + else + msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst = (char *) dst; msg.has_pid = true; msg.pid = ai.pid; @@ -634,6 +638,20 @@ int flow_alloc(const char * dst, return fd; } +int flow_alloc(const char * dst, + qosspec_t * qs, + const struct timespec * timeo) +{ + return __flow_alloc(dst, qs, timeo, false); +} + +int flow_join(const char * dst, + qosspec_t * qs, + const struct timespec * timeo) +{ + return __flow_alloc(dst, qs, timeo, true); +} + int flow_dealloc(int fd) { irm_msg_t msg = IRM_MSG__INIT; diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index ae1014ac..1793aee7 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -32,11 +32,12 @@ enum ipcp_msg_code { IPCP_UNREG = 4; IPCP_QUERY = 5; IPCP_FLOW_ALLOC = 6; - IPCP_FLOW_ALLOC_RESP = 7; - IPCP_FLOW_DEALLOC = 8; - IPCP_CONNECT = 9; - IPCP_DISCONNECT = 10; - IPCP_REPLY = 11; + IPCP_FLOW_JOIN = 7; + IPCP_FLOW_ALLOC_RESP = 8; + IPCP_FLOW_DEALLOC = 9; + IPCP_CONNECT = 10; + IPCP_DISCONNECT = 11; + IPCP_REPLY = 12; }; message ipcp_msg { diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 351b4a8e..9b935f57 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -43,10 +43,11 @@ enum irm_msg_code { IRM_UNREG = 15; IRM_FLOW_ALLOC = 16; IRM_FLOW_ACCEPT = 17; - IRM_FLOW_DEALLOC = 18; - IPCP_FLOW_REQ_ARR = 19; - IPCP_FLOW_ALLOC_REPLY = 20; - IRM_REPLY = 21; + IRM_FLOW_JOIN = 18; + IRM_FLOW_DEALLOC = 19; + IPCP_FLOW_REQ_ARR = 20; + IPCP_FLOW_ALLOC_REPLY = 21; + IRM_REPLY = 22; }; message ipcp_info_msg { diff --git a/src/tools/obc/obc.c b/src/tools/obc/obc.c index 747d01d3..e3fba557 100644 --- a/src/tools/obc/obc.c +++ b/src/tools/obc/obc.c @@ -63,9 +63,9 @@ static int reader_main(const char * dst) printf("Starting a reader.\n"); - fd = flow_alloc(dst, NULL, NULL); + fd = flow_join(dst, NULL, NULL); if (fd < 0) { - printf("Failed to allocate multicast flow.\n"); + printf("Failed to join broadcast.\n"); return -1; } @@ -90,9 +90,9 @@ static int writer_main(const char * dst, { int fd = 0; - fd = flow_alloc(dst, NULL, NULL); + fd = flow_join(dst, NULL, NULL); if (fd < 0) { - printf("Failed to allocate multicast flow.\n"); + printf("Failed to join broadcast.\n"); return -1; } |