summaryrefslogtreecommitdiff
path: root/src/irmd
diff options
context:
space:
mode:
Diffstat (limited to 'src/irmd')
-rw-r--r--src/irmd/ipcp.c36
-rw-r--r--src/irmd/ipcp.h14
-rw-r--r--src/irmd/irm_flow.c4
-rw-r--r--src/irmd/irm_flow.h2
-rw-r--r--src/irmd/main.c99
5 files changed, 119 insertions, 36 deletions
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c
index 85698ec1..78408185 100644
--- a/src/irmd/ipcp.c
+++ b/src/irmd/ipcp.c
@@ -441,7 +441,9 @@ static int __ipcp_flow_alloc(pid_t pid,
const uint8_t * dst,
size_t len,
qosspec_t qs,
- bool join)
+ bool join,
+ const void * data,
+ size_t dlen)
{
ipcp_msg_t msg = IPCP_MSG__INIT;
qosspec_msg_t qs_msg;
@@ -450,10 +452,8 @@ static int __ipcp_flow_alloc(pid_t pid,
assert(dst);
- if (join)
- msg.code = IPCP_MSG_CODE__IPCP_FLOW_JOIN;
- else
- msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC;
+ msg.code = join ? IPCP_MSG_CODE__IPCP_FLOW_JOIN
+ : IPCP_MSG_CODE__IPCP_FLOW_ALLOC;
msg.has_flow_id = true;
msg.flow_id = flow_id;
msg.has_pid = true;
@@ -463,6 +463,9 @@ static int __ipcp_flow_alloc(pid_t pid,
msg.hash.data = (uint8_t *) dst;
qs_msg = spec_to_msg(&qs);
msg.qosspec = &qs_msg;
+ msg.has_pk = true;
+ msg.pk.data = (uint8_t *) data;
+ msg.pk.len = (uint32_t) dlen;
recv_msg = send_recv_ipcp_msg(pid, &msg);
if (recv_msg == NULL)
@@ -484,9 +487,12 @@ int ipcp_flow_alloc(pid_t pid,
pid_t n_pid,
const uint8_t * dst,
size_t len,
- qosspec_t qs)
+ qosspec_t qs,
+ const void * data,
+ size_t dlen)
{
- return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, false);
+ return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, false,
+ data, dlen);
}
int ipcp_flow_join(pid_t pid,
@@ -496,13 +502,16 @@ int ipcp_flow_join(pid_t pid,
size_t len,
qosspec_t qs)
{
- return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, true);
+ return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, true,
+ NULL, 0);
}
-int ipcp_flow_alloc_resp(pid_t pid,
- int flow_id,
- pid_t n_pid,
- int response)
+int ipcp_flow_alloc_resp(pid_t pid,
+ int flow_id,
+ pid_t n_pid,
+ int response,
+ const void * data,
+ size_t len)
{
ipcp_msg_t msg = IPCP_MSG__INIT;
ipcp_msg_t * recv_msg = NULL;
@@ -515,6 +524,9 @@ int ipcp_flow_alloc_resp(pid_t pid,
msg.pid = n_pid;
msg.has_response = true;
msg.response = response;
+ msg.has_pk = true;
+ msg.pk.data = (uint8_t *) data;
+ msg.pk.len = (uint32_t) len;
recv_msg = send_recv_ipcp_msg(pid, &msg);
if (recv_msg == NULL)
diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h
index 398255e8..ae00792b 100644
--- a/src/irmd/ipcp.h
+++ b/src/irmd/ipcp.h
@@ -67,7 +67,9 @@ int ipcp_flow_alloc(pid_t pid,
pid_t n_pid,
const uint8_t * dst,
size_t len,
- qosspec_t qs);
+ qosspec_t qs,
+ const void * data,
+ size_t dlen);
int ipcp_flow_join(pid_t pid,
int flow_id,
@@ -76,10 +78,12 @@ int ipcp_flow_join(pid_t pid,
size_t len,
qosspec_t qs);
-int ipcp_flow_alloc_resp(pid_t pid,
- int flow_id,
- pid_t n_pid,
- int response);
+int ipcp_flow_alloc_resp(pid_t pid,
+ int flow_id,
+ pid_t n_pid,
+ int response,
+ const void * data,
+ size_t len);
int ipcp_flow_dealloc(pid_t pid,
int flow_id);
diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c
index 70d2a789..10395a35 100644
--- a/src/irmd/irm_flow.c
+++ b/src/irmd/irm_flow.c
@@ -62,6 +62,8 @@ struct irm_flow * irm_flow_create(pid_t n_pid,
f->n_1_pid = n_1_pid;
f->flow_id = flow_id;
f->qs = qs;
+ f->data = NULL;
+ f->len = 0;
f->n_rb = shm_rbuff_create(n_pid, flow_id);
if (f->n_rb == NULL) {
@@ -119,6 +121,8 @@ void irm_flow_destroy(struct irm_flow * f)
pthread_mutex_lock(&f->state_lock);
+ assert(f->len == 0);
+
if (f->state == FLOW_DESTROY) {
pthread_mutex_unlock(&f->state_lock);
return;
diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h
index 28369e03..051a60a6 100644
--- a/src/irmd/irm_flow.h
+++ b/src/irmd/irm_flow.h
@@ -48,6 +48,8 @@ struct irm_flow {
pid_t n_1_pid;
qosspec_t qs;
+ void * data;
+ size_t len;
struct shm_rbuff * n_rb;
struct shm_rbuff * n_1_rb;
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 6b672756..65354382 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -166,6 +166,11 @@ static void clear_irm_flow(struct irm_flow * f) {
assert(f);
+ if (f->len != 0) {
+ free(f->data);
+ f->len = 0;
+ }
+
while ((idx = shm_rbuff_read(f->n_rb)) >= 0)
shm_rdrbuff_remove(irmd.rdrb, idx);
@@ -1161,7 +1166,9 @@ static int proc_announce(pid_t pid,
static int flow_accept(pid_t pid,
struct timespec * timeo,
- struct irm_flow ** fl)
+ struct irm_flow ** fl,
+ const void * data,
+ size_t len)
{
struct irm_flow * f = NULL;
struct proc_entry * e = NULL;
@@ -1228,7 +1235,7 @@ static int flow_accept(pid_t pid,
list_del(&f->next);
bmp_release(irmd.flow_ids, f->flow_id);
pthread_rwlock_unlock(&irmd.flows_lock);
- ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, -1);
+ ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, -1, NULL, 0);
clear_irm_flow(f);
irm_flow_set_state(f, FLOW_NULL);
irm_flow_destroy(f);
@@ -1248,7 +1255,7 @@ static int flow_accept(pid_t pid,
list_del(&f->next);
bmp_release(irmd.flow_ids, f->flow_id);
pthread_rwlock_unlock(&irmd.flows_lock);
- ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, -1);
+ ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, -1, NULL, 0);
clear_irm_flow(f);
irm_flow_set_state(f, FLOW_NULL);
irm_flow_destroy(f);
@@ -1260,7 +1267,7 @@ static int flow_accept(pid_t pid,
pthread_rwlock_unlock(&irmd.reg_lock);
- if (ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, 0)) {
+ if (ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, 0, data, len)) {
pthread_rwlock_wrlock(&irmd.flows_lock);
list_del(&f->next);
pthread_rwlock_unlock(&irmd.flows_lock);
@@ -1285,7 +1292,9 @@ static int flow_alloc(pid_t pid,
qosspec_t qs,
struct timespec * timeo,
struct irm_flow ** e,
- bool join)
+ bool join,
+ const void * data,
+ size_t len)
{
struct irm_flow * f;
struct ipcp_entry * ipcp;
@@ -1293,10 +1302,8 @@ static int flow_alloc(pid_t pid,
int state;
uint8_t * hash;
- if (join)
- ipcp = get_ipcp_entry_by_layer(dst);
- else
- ipcp = get_ipcp_by_dst_name(dst, pid);
+ ipcp = join ? get_ipcp_entry_by_layer(dst)
+ : get_ipcp_by_dst_name(dst, pid);
if (ipcp == NULL) {
log_info("Destination %s unreachable.", dst);
return -1;
@@ -1341,7 +1348,7 @@ static int flow_alloc(pid_t pid,
}
} else {
if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash,
- IPCP_HASH_LEN(ipcp), qs)) {
+ IPCP_HASH_LEN(ipcp), qs, data, len)) {
/* sanitizer cleans this */
log_info("Flow_allocation failed.");
free(hash);
@@ -1450,7 +1457,9 @@ static pid_t auto_execute(char ** argv)
static struct irm_flow * flow_req_arr(pid_t pid,
const uint8_t * hash,
- qosspec_t qs)
+ qosspec_t qs,
+ const void * data,
+ size_t len)
{
struct reg_entry * re = NULL;
struct prog_entry * a = NULL;
@@ -1547,6 +1556,7 @@ static struct irm_flow * flow_req_arr(pid_t pid,
pthread_rwlock_unlock(&irmd.reg_lock);
pthread_rwlock_wrlock(&irmd.flows_lock);
+
flow_id = bmp_allocate(irmd.flow_ids);
if (!bmp_is_id_valid(irmd.flow_ids, flow_id)) {
pthread_rwlock_unlock(&irmd.flows_lock);
@@ -1561,6 +1571,21 @@ static struct irm_flow * flow_req_arr(pid_t pid,
return NULL;
}
+ if (len != 0) {
+ assert(data);
+ f->data = malloc(len);
+ if (f->data == NULL) {
+ bmp_release(irmd.flow_ids, flow_id);
+ pthread_rwlock_unlock(&irmd.flows_lock);
+ log_err("Could not piggyback data.");
+ return NULL;
+ }
+
+ f->len = len;
+
+ memcpy(f->data, data, len);
+ }
+
list_add(&f->next, &irmd.irm_flows);
pthread_rwlock_unlock(&irmd.flows_lock);
@@ -1577,6 +1602,8 @@ static struct irm_flow * flow_req_arr(pid_t pid,
list_del(&f->next);
pthread_rwlock_unlock(&irmd.flows_lock);
log_err("Could not get process table entry for %d.", h_pid);
+ free(f->data);
+ f->len = 0;
irm_flow_destroy(f);
return NULL;
}
@@ -1590,8 +1617,10 @@ static struct irm_flow * flow_req_arr(pid_t pid,
return f;
}
-static int flow_alloc_reply(int flow_id,
- int response)
+static int flow_alloc_reply(int flow_id,
+ int response,
+ const void * data,
+ size_t len)
{
struct irm_flow * f;
@@ -1608,6 +1637,14 @@ static int flow_alloc_reply(int flow_id,
else
irm_flow_set_state(f, FLOW_NULL);
+ f->data = malloc(len);
+ if (f->data == NULL) {
+ pthread_rwlock_unlock(&irmd.flows_lock);
+ return -1;
+ }
+ memcpy(f->data, data, len);
+ f->len = len;
+
pthread_rwlock_unlock(&irmd.flows_lock);
return 0;
@@ -1921,7 +1958,7 @@ static void * mainloop(void * o)
if (msg == NULL) {
close(sfd);
- irm_msg__free_unpacked(ret_msg, NULL);
+ irm_msg__free_unpacked(msg, NULL);
continue;
}
@@ -1991,7 +2028,10 @@ static void * mainloop(void * o)
result = name_unreg(msg->pid, msg->name);
break;
case IRM_MSG_CODE__IRM_FLOW_ACCEPT:
- result = flow_accept(msg->pid, timeo, &e);
+ assert(msg->pk.len > 0 ? msg->pk.data != NULL
+ : msg->pk.data == NULL);
+ result = flow_accept(msg->pid, timeo, &e,
+ msg->pk.data, msg->pk.len);
if (result == 0) {
qosspec_msg_t qs_msg;
ret_msg->has_flow_id = true;
@@ -2000,23 +2040,35 @@ static void * mainloop(void * o)
ret_msg->pid = e->n_1_pid;
qs_msg = spec_to_msg(&e->qs);
ret_msg->qosspec = &qs_msg;
+ ret_msg->has_pk = true;
+ ret_msg->pk.data = e->data;
+ ret_msg->pk.len = e->len;
+ e->len = 0; /* Data is free'd with ret_msg */
}
break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC:
+ assert(msg->pk.len > 0 ? msg->pk.data != NULL
+ : msg->pk.data == NULL);
result = flow_alloc(msg->pid, msg->dst,
msg_to_spec(msg->qosspec),
- timeo, &e, false);
+ timeo, &e, false, msg->pk.data,
+ msg->pk.len);
if (result == 0) {
ret_msg->has_flow_id = true;
ret_msg->flow_id = e->flow_id;
ret_msg->has_pid = true;
ret_msg->pid = e->n_1_pid;
+ ret_msg->has_pk = true;
+ ret_msg->pk.data = e->data;
+ ret_msg->pk.len = e->len;
+ e->len = 0; /* Data is free'd with ret_msg */
}
break;
case IRM_MSG_CODE__IRM_FLOW_JOIN:
+ assert(msg->pk.len == 0 && msg->pk.data == NULL);
result = flow_alloc(msg->pid, msg->dst,
msg_to_spec(msg->qosspec),
- timeo, &e, true);
+ timeo, &e, true, NULL, 0);
if (result == 0) {
ret_msg->has_flow_id = true;
ret_msg->flow_id = e->flow_id;
@@ -2028,9 +2080,13 @@ static void * mainloop(void * o)
result = flow_dealloc(msg->pid, msg->flow_id);
break;
case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:
+ assert(msg->pk.len > 0 ? msg->pk.data != NULL
+ : msg->pk.data == NULL);
e = flow_req_arr(msg->pid,
msg->hash.data,
- msg_to_spec(msg->qosspec));
+ msg_to_spec(msg->qosspec),
+ msg->pk.data,
+ msg->pk.len);
result = (e == NULL ? -1 : 0);
if (result == 0) {
ret_msg->has_flow_id = true;
@@ -2040,7 +2096,12 @@ static void * mainloop(void * o)
}
break;
case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY:
- result = flow_alloc_reply(msg->flow_id, msg->response);
+ assert(msg->pk.len > 0 ? msg->pk.data != NULL
+ : msg->pk.data == NULL);
+ result = flow_alloc_reply(msg->flow_id,
+ msg->response,
+ msg->pk.data,
+ msg->pk.len);
break;
default:
log_err("Don't know that message code.");