summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/ouroboros/dev.h5
-rw-r--r--src/ipcpd/broadcast/main.c30
-rw-r--r--src/ipcpd/eth/eth.c1
-rw-r--r--src/ipcpd/ipcp.c33
-rw-r--r--src/ipcpd/ipcp.h4
-rw-r--r--src/ipcpd/local/main.c1
-rw-r--r--src/ipcpd/normal/main.c1
-rw-r--r--src/ipcpd/raptor/main.c1
-rw-r--r--src/ipcpd/udp/main.c1
-rw-r--r--src/irmd/ipcp.c38
-rw-r--r--src/irmd/ipcp.h7
-rw-r--r--src/irmd/main.c56
-rw-r--r--src/lib/dev.c26
-rw-r--r--src/lib/ipcpd_messages.proto11
-rw-r--r--src/lib/irmd_messages.proto9
-rw-r--r--src/tools/obc/obc.c8
16 files changed, 178 insertions, 54 deletions
diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h
index 7c5ab460..cd87068e 100644
--- a/include/ouroboros/dev.h
+++ b/include/ouroboros/dev.h
@@ -40,6 +40,11 @@ int flow_alloc(const char * dst_name,
int flow_accept(qosspec_t * qs,
const struct timespec * timeo);
+/* Returns flow descriptor, qs updates to supplied QoS. */
+int flow_join(const char * bc,
+ qosspec_t * qs,
+ const struct timespec * timeo);
+
int flow_dealloc(int fd);
ssize_t flow_write(int fd,
diff --git a/src/ipcpd/broadcast/main.c b/src/ipcpd/broadcast/main.c
index 8c6bfa71..af39dd34 100644
--- a/src/ipcpd/broadcast/main.c
+++ b/src/ipcpd/broadcast/main.c
@@ -198,31 +198,18 @@ static int broadcast_ipcp_bootstrap(const struct ipcp_config * conf)
return -1;
}
-static int broadcast_ipcp_query(const uint8_t * dst)
+static int name_check(const uint8_t * dst)
{
uint8_t * buf;
size_t len;
int ret;
- char * multicast_name;
- char * suffix = ".mc";
len = hash_len(ipcpi.dir_hash_algo);
buf = malloc(len);
if (buf == NULL)
return -ENOMEM;
- multicast_name = malloc(strlen(ipcpi.layer_name) + strlen(suffix) + 1);
- if (multicast_name == NULL) {
- free(buf);
- return -ENOMEM;
- }
-
- strcpy(multicast_name, ipcpi.layer_name);
- strcat(multicast_name, suffix);
-
- str_hash(ipcpi.dir_hash_algo, buf, multicast_name);
-
- free(multicast_name);
+ str_hash(ipcpi.dir_hash_algo, buf, ipcpi.layer_name);
ret = memcmp(buf, dst, len);
@@ -231,9 +218,9 @@ static int broadcast_ipcp_query(const uint8_t * dst)
return ret;
}
-static int broadcast_ipcp_alloc(int fd,
- const uint8_t * dst,
- qosspec_t qs)
+static int broadcast_ipcp_join(int fd,
+ const uint8_t * dst,
+ qosspec_t qs)
{
struct conn conn;
@@ -243,7 +230,7 @@ static int broadcast_ipcp_alloc(int fd,
conn.flow_info.fd = fd;
- if (broadcast_ipcp_query(dst) != 0)
+ if (name_check(dst) != 0)
return -1;
notifier_event(NOTIFY_DT_CONN_ADD, &conn);
@@ -276,8 +263,9 @@ static struct ipcp_ops broadcast_ops = {
.ipcp_disconnect = connmgr_ipcp_disconnect,
.ipcp_reg = NULL,
.ipcp_unreg = NULL,
- .ipcp_query = broadcast_ipcp_query,
- .ipcp_flow_alloc = broadcast_ipcp_alloc,
+ .ipcp_query = NULL,
+ .ipcp_flow_alloc = NULL,
+ .ipcp_flow_join = broadcast_ipcp_join,
.ipcp_flow_alloc_resp = NULL,
.ipcp_flow_dealloc = broadcast_ipcp_dealloc
};
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c
index f9691626..68f39c5d 100644
--- a/src/ipcpd/eth/eth.c
+++ b/src/ipcpd/eth/eth.c
@@ -1777,6 +1777,7 @@ static struct ipcp_ops eth_ops = {
.ipcp_unreg = eth_ipcp_unreg,
.ipcp_query = eth_ipcp_query,
.ipcp_flow_alloc = eth_ipcp_flow_alloc,
+ .ipcp_flow_join = NULL,
.ipcp_flow_alloc_resp = eth_ipcp_flow_alloc_resp,
.ipcp_flow_dealloc = eth_ipcp_flow_dealloc
};
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 6376bedb..dced6f64 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -428,6 +428,39 @@ static void * mainloop(void * o)
msg->hash.data,
qs);
break;
+ case IPCP_MSG_CODE__IPCP_FLOW_JOIN:
+ ret_msg.has_result = true;
+
+ if (ipcpi.ops->ipcp_flow_join == NULL) {
+ log_err("Broadcast unsupported.");
+ ret_msg.result = -ENOTSUP;
+ break;
+ }
+
+ assert(msg->hash.len == ipcp_dir_hash_len());
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ log_err("IPCP in wrong state.");
+ ret_msg.result = -EIPCPSTATE;
+ break;
+ }
+
+ qs = msg_to_spec(msg->qosspec);
+ fd = np1_flow_alloc(msg->pid,
+ msg->flow_id,
+ qs);
+ if (fd < 0) {
+ log_err("Failed allocating fd on flow_id %d.",
+ msg->flow_id);
+ ret_msg.result = -1;
+ break;
+ }
+
+ ret_msg.result =
+ ipcpi.ops->ipcp_flow_join(fd,
+ msg->hash.data,
+ qs);
+ break;
case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP:
ret_msg.has_result = true;
if (ipcpi.ops->ipcp_flow_alloc_resp == NULL) {
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index fabd35fe..b6e79413 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -62,6 +62,10 @@ struct ipcp_ops {
const uint8_t * dst,
qosspec_t qs);
+ int (* ipcp_flow_join)(int fd,
+ const uint8_t * dst,
+ qosspec_t qs);
+
int (* ipcp_flow_alloc_resp)(int fd,
int response);
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index ab43f1f8..88cf2352 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -325,6 +325,7 @@ static struct ipcp_ops local_ops = {
.ipcp_unreg = ipcp_local_unreg,
.ipcp_query = ipcp_local_query,
.ipcp_flow_alloc = ipcp_local_flow_alloc,
+ .ipcp_flow_join = NULL,
.ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp,
.ipcp_flow_dealloc = ipcp_local_flow_dealloc
};
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index 3f05f421..5e013eb8 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -295,6 +295,7 @@ static struct ipcp_ops normal_ops = {
.ipcp_unreg = dir_unreg,
.ipcp_query = normal_ipcp_query,
.ipcp_flow_alloc = fa_alloc,
+ .ipcp_flow_join = NULL,
.ipcp_flow_alloc_resp = fa_alloc_resp,
.ipcp_flow_dealloc = fa_dealloc
};
diff --git a/src/ipcpd/raptor/main.c b/src/ipcpd/raptor/main.c
index 8f578611..d3c9040e 100644
--- a/src/ipcpd/raptor/main.c
+++ b/src/ipcpd/raptor/main.c
@@ -1055,6 +1055,7 @@ static struct ipcp_ops raptor_ops = {
.ipcp_unreg = raptor_unreg,
.ipcp_query = raptor_query,
.ipcp_flow_alloc = raptor_flow_alloc,
+ .ipcp_flow_join = NULL,
.ipcp_flow_alloc_resp = raptor_flow_alloc_resp,
.ipcp_flow_dealloc = raptor_flow_dealloc
};
diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c
index a1af1e85..31e6166b 100644
--- a/src/ipcpd/udp/main.c
+++ b/src/ipcpd/udp/main.c
@@ -1183,6 +1183,7 @@ static struct ipcp_ops udp_ops = {
.ipcp_unreg = ipcp_udp_unreg,
.ipcp_query = ipcp_udp_query,
.ipcp_flow_alloc = ipcp_udp_flow_alloc,
+ .ipcp_flow_join = NULL,
.ipcp_flow_alloc_resp = ipcp_udp_flow_alloc_resp,
.ipcp_flow_dealloc = ipcp_udp_flow_dealloc
};
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c
index 7f3f4807..08547d01 100644
--- a/src/irmd/ipcp.c
+++ b/src/irmd/ipcp.c
@@ -435,12 +435,13 @@ int ipcp_query(pid_t pid,
return ret;
}
-int ipcp_flow_alloc(pid_t pid,
- int flow_id,
- pid_t n_pid,
- const uint8_t * dst,
- size_t len,
- qosspec_t qs)
+static int __ipcp_flow_alloc(pid_t pid,
+ int flow_id,
+ pid_t n_pid,
+ const uint8_t * dst,
+ size_t len,
+ qosspec_t qs,
+ bool join)
{
ipcp_msg_t msg = IPCP_MSG__INIT;
qosspec_msg_t qs_msg;
@@ -449,7 +450,10 @@ int ipcp_flow_alloc(pid_t pid,
assert(dst);
- msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC;
+ if (join)
+ msg.code = IPCP_MSG_CODE__IPCP_FLOW_JOIN;
+ else
+ msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC;
msg.has_flow_id = true;
msg.flow_id = flow_id;
msg.has_pid = true;
@@ -475,6 +479,26 @@ int ipcp_flow_alloc(pid_t pid,
return ret;
}
+int ipcp_flow_alloc(pid_t pid,
+ int flow_id,
+ pid_t n_pid,
+ const uint8_t * dst,
+ size_t len,
+ qosspec_t qs)
+{
+ return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, false);
+}
+
+int ipcp_flow_join(pid_t pid,
+ int flow_id,
+ pid_t n_pid,
+ const uint8_t * dst,
+ size_t len,
+ qosspec_t qs)
+{
+ return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, true);
+}
+
int ipcp_flow_alloc_resp(pid_t pid,
int flow_id,
pid_t n_pid,
diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h
index 07b9c44a..611bada2 100644
--- a/src/irmd/ipcp.h
+++ b/src/irmd/ipcp.h
@@ -69,6 +69,13 @@ int ipcp_flow_alloc(pid_t pid,
size_t len,
qosspec_t qs);
+int ipcp_flow_join(pid_t pid,
+ int flow_id,
+ pid_t n_pid,
+ const uint8_t * dst,
+ size_t len,
+ qosspec_t qs);
+
int ipcp_flow_alloc_resp(pid_t pid,
int flow_id,
pid_t n_pid,
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 67e16de0..802b01f0 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -330,6 +330,19 @@ static struct ipcp_entry * get_ipcp_entry_by_name(const char * name)
return NULL;
}
+static struct ipcp_entry * get_ipcp_entry_by_layer(const char * layer)
+{
+ struct list_head * p;
+
+ list_for_each(p, &irmd.ipcps) {
+ struct ipcp_entry * e = list_entry(p, struct ipcp_entry, next);
+ if (strcmp(layer, e->layer) == 0)
+ return e;
+ }
+
+ return NULL;
+}
+
static struct ipcp_entry * get_ipcp_by_dst_name(const char * name,
pid_t src)
{
@@ -1267,7 +1280,8 @@ static int flow_alloc(pid_t pid,
const char * dst,
qosspec_t qs,
struct timespec * timeo,
- struct irm_flow ** e)
+ struct irm_flow ** e,
+ bool join)
{
struct irm_flow * f;
struct ipcp_entry * ipcp;
@@ -1275,7 +1289,10 @@ static int flow_alloc(pid_t pid,
int state;
uint8_t * hash;
- ipcp = get_ipcp_by_dst_name(dst, pid);
+ if (join)
+ ipcp = get_ipcp_entry_by_layer(dst);
+ else
+ ipcp = get_ipcp_by_dst_name(dst, pid);
if (ipcp == NULL) {
log_info("Destination %s unreachable.", dst);
return -1;
@@ -1310,12 +1327,22 @@ static int flow_alloc(pid_t pid,
str_hash(ipcp->dir_hash_algo, hash, dst);
- if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash,
- IPCP_HASH_LEN(ipcp), qs)) {
- /* sanitizer cleans this */
- log_info("Flow_allocation failed.");
- free(hash);
- return -EAGAIN;
+ if (join) {
+ if (ipcp_flow_join(ipcp->pid, flow_id, pid, hash,
+ IPCP_HASH_LEN(ipcp), qs)) {
+ /* sanitizer cleans this */
+ log_info("Flow_join failed.");
+ free(hash);
+ return -EAGAIN;
+ }
+ } else {
+ if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash,
+ IPCP_HASH_LEN(ipcp), qs)) {
+ /* sanitizer cleans this */
+ log_info("Flow_allocation failed.");
+ free(hash);
+ return -EAGAIN;
+ }
}
free(hash);
@@ -1978,7 +2005,18 @@ static void * mainloop(void * o)
case IRM_MSG_CODE__IRM_FLOW_ALLOC:
result = flow_alloc(msg->pid, msg->dst,
msg_to_spec(msg->qosspec),
- timeo, &e);
+ timeo, &e, false);
+ if (result == 0) {
+ ret_msg->has_flow_id = true;
+ ret_msg->flow_id = e->flow_id;
+ ret_msg->has_pid = true;
+ ret_msg->pid = e->n_1_pid;
+ }
+ break;
+ case IRM_MSG_CODE__IRM_FLOW_JOIN:
+ result = flow_alloc(msg->pid, msg->dst,
+ msg_to_spec(msg->qosspec),
+ timeo, &e, true);
if (result == 0) {
ret_msg->has_flow_id = true;
ret_msg->flow_id = e->flow_id;
diff --git a/src/lib/dev.c b/src/lib/dev.c
index a2ec836f..57dfc3f2 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -561,9 +561,10 @@ int flow_accept(qosspec_t * qs,
return fd;
}
-int flow_alloc(const char * dst,
- qosspec_t * qs,
- const struct timespec * timeo)
+static int __flow_alloc(const char * dst,
+ qosspec_t * qs,
+ const struct timespec * timeo,
+ bool join)
{
irm_msg_t msg = IRM_MSG__INIT;
qosspec_msg_t qs_msg = QOSSPEC_MSG__INIT;
@@ -574,7 +575,10 @@ int flow_alloc(const char * dst,
if (qs != NULL)
qs->ber = 1;
#endif
- msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
+ if (join)
+ msg.code = IRM_MSG_CODE__IRM_FLOW_JOIN;
+ else
+ msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
msg.dst = (char *) dst;
msg.has_pid = true;
msg.pid = ai.pid;
@@ -634,6 +638,20 @@ int flow_alloc(const char * dst,
return fd;
}
+int flow_alloc(const char * dst,
+ qosspec_t * qs,
+ const struct timespec * timeo)
+{
+ return __flow_alloc(dst, qs, timeo, false);
+}
+
+int flow_join(const char * dst,
+ qosspec_t * qs,
+ const struct timespec * timeo)
+{
+ return __flow_alloc(dst, qs, timeo, true);
+}
+
int flow_dealloc(int fd)
{
irm_msg_t msg = IRM_MSG__INIT;
diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto
index ae1014ac..1793aee7 100644
--- a/src/lib/ipcpd_messages.proto
+++ b/src/lib/ipcpd_messages.proto
@@ -32,11 +32,12 @@ enum ipcp_msg_code {
IPCP_UNREG = 4;
IPCP_QUERY = 5;
IPCP_FLOW_ALLOC = 6;
- IPCP_FLOW_ALLOC_RESP = 7;
- IPCP_FLOW_DEALLOC = 8;
- IPCP_CONNECT = 9;
- IPCP_DISCONNECT = 10;
- IPCP_REPLY = 11;
+ IPCP_FLOW_JOIN = 7;
+ IPCP_FLOW_ALLOC_RESP = 8;
+ IPCP_FLOW_DEALLOC = 9;
+ IPCP_CONNECT = 10;
+ IPCP_DISCONNECT = 11;
+ IPCP_REPLY = 12;
};
message ipcp_msg {
diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto
index 351b4a8e..9b935f57 100644
--- a/src/lib/irmd_messages.proto
+++ b/src/lib/irmd_messages.proto
@@ -43,10 +43,11 @@ enum irm_msg_code {
IRM_UNREG = 15;
IRM_FLOW_ALLOC = 16;
IRM_FLOW_ACCEPT = 17;
- IRM_FLOW_DEALLOC = 18;
- IPCP_FLOW_REQ_ARR = 19;
- IPCP_FLOW_ALLOC_REPLY = 20;
- IRM_REPLY = 21;
+ IRM_FLOW_JOIN = 18;
+ IRM_FLOW_DEALLOC = 19;
+ IPCP_FLOW_REQ_ARR = 20;
+ IPCP_FLOW_ALLOC_REPLY = 21;
+ IRM_REPLY = 22;
};
message ipcp_info_msg {
diff --git a/src/tools/obc/obc.c b/src/tools/obc/obc.c
index 747d01d3..e3fba557 100644
--- a/src/tools/obc/obc.c
+++ b/src/tools/obc/obc.c
@@ -63,9 +63,9 @@ static int reader_main(const char * dst)
printf("Starting a reader.\n");
- fd = flow_alloc(dst, NULL, NULL);
+ fd = flow_join(dst, NULL, NULL);
if (fd < 0) {
- printf("Failed to allocate multicast flow.\n");
+ printf("Failed to join broadcast.\n");
return -1;
}
@@ -90,9 +90,9 @@ static int writer_main(const char * dst,
{
int fd = 0;
- fd = flow_alloc(dst, NULL, NULL);
+ fd = flow_join(dst, NULL, NULL);
if (fd < 0) {
- printf("Failed to allocate multicast flow.\n");
+ printf("Failed to join broadcast.\n");
return -1;
}