diff options
Diffstat (limited to 'src/irmd')
-rw-r--r-- | src/irmd/ipcp.c | 36 | ||||
-rw-r--r-- | src/irmd/ipcp.h | 14 | ||||
-rw-r--r-- | src/irmd/irm_flow.c | 4 | ||||
-rw-r--r-- | src/irmd/irm_flow.h | 2 | ||||
-rw-r--r-- | src/irmd/main.c | 99 |
5 files changed, 119 insertions, 36 deletions
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 85698ec1..78408185 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -441,7 +441,9 @@ static int __ipcp_flow_alloc(pid_t pid, const uint8_t * dst, size_t len, qosspec_t qs, - bool join) + bool join, + const void * data, + size_t dlen) { ipcp_msg_t msg = IPCP_MSG__INIT; qosspec_msg_t qs_msg; @@ -450,10 +452,8 @@ static int __ipcp_flow_alloc(pid_t pid, assert(dst); - if (join) - msg.code = IPCP_MSG_CODE__IPCP_FLOW_JOIN; - else - msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; + msg.code = join ? IPCP_MSG_CODE__IPCP_FLOW_JOIN + : IPCP_MSG_CODE__IPCP_FLOW_ALLOC; msg.has_flow_id = true; msg.flow_id = flow_id; msg.has_pid = true; @@ -463,6 +463,9 @@ static int __ipcp_flow_alloc(pid_t pid, msg.hash.data = (uint8_t *) dst; qs_msg = spec_to_msg(&qs); msg.qosspec = &qs_msg; + msg.has_pk = true; + msg.pk.data = (uint8_t *) data; + msg.pk.len = (uint32_t) dlen; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) @@ -484,9 +487,12 @@ int ipcp_flow_alloc(pid_t pid, pid_t n_pid, const uint8_t * dst, size_t len, - qosspec_t qs) + qosspec_t qs, + const void * data, + size_t dlen) { - return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, false); + return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, false, + data, dlen); } int ipcp_flow_join(pid_t pid, @@ -496,13 +502,16 @@ int ipcp_flow_join(pid_t pid, size_t len, qosspec_t qs) { - return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, true); + return __ipcp_flow_alloc(pid, flow_id, n_pid, dst, len, qs, true, + NULL, 0); } -int ipcp_flow_alloc_resp(pid_t pid, - int flow_id, - pid_t n_pid, - int response) +int ipcp_flow_alloc_resp(pid_t pid, + int flow_id, + pid_t n_pid, + int response, + const void * data, + size_t len) { ipcp_msg_t msg = IPCP_MSG__INIT; ipcp_msg_t * recv_msg = NULL; @@ -515,6 +524,9 @@ int ipcp_flow_alloc_resp(pid_t pid, msg.pid = n_pid; msg.has_response = true; msg.response = response; + msg.has_pk = true; + msg.pk.data = (uint8_t *) data; + msg.pk.len = (uint32_t) len; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index 398255e8..ae00792b 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -67,7 +67,9 @@ int ipcp_flow_alloc(pid_t pid, pid_t n_pid, const uint8_t * dst, size_t len, - qosspec_t qs); + qosspec_t qs, + const void * data, + size_t dlen); int ipcp_flow_join(pid_t pid, int flow_id, @@ -76,10 +78,12 @@ int ipcp_flow_join(pid_t pid, size_t len, qosspec_t qs); -int ipcp_flow_alloc_resp(pid_t pid, - int flow_id, - pid_t n_pid, - int response); +int ipcp_flow_alloc_resp(pid_t pid, + int flow_id, + pid_t n_pid, + int response, + const void * data, + size_t len); int ipcp_flow_dealloc(pid_t pid, int flow_id); diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index 70d2a789..10395a35 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -62,6 +62,8 @@ struct irm_flow * irm_flow_create(pid_t n_pid, f->n_1_pid = n_1_pid; f->flow_id = flow_id; f->qs = qs; + f->data = NULL; + f->len = 0; f->n_rb = shm_rbuff_create(n_pid, flow_id); if (f->n_rb == NULL) { @@ -119,6 +121,8 @@ void irm_flow_destroy(struct irm_flow * f) pthread_mutex_lock(&f->state_lock); + assert(f->len == 0); + if (f->state == FLOW_DESTROY) { pthread_mutex_unlock(&f->state_lock); return; diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index 28369e03..051a60a6 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -48,6 +48,8 @@ struct irm_flow { pid_t n_1_pid; qosspec_t qs; + void * data; + size_t len; struct shm_rbuff * n_rb; struct shm_rbuff * n_1_rb; diff --git a/src/irmd/main.c b/src/irmd/main.c index 6b672756..65354382 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -166,6 +166,11 @@ static void clear_irm_flow(struct irm_flow * f) { assert(f); + if (f->len != 0) { + free(f->data); + f->len = 0; + } + while ((idx = shm_rbuff_read(f->n_rb)) >= 0) shm_rdrbuff_remove(irmd.rdrb, idx); @@ -1161,7 +1166,9 @@ static int proc_announce(pid_t pid, static int flow_accept(pid_t pid, struct timespec * timeo, - struct irm_flow ** fl) + struct irm_flow ** fl, + const void * data, + size_t len) { struct irm_flow * f = NULL; struct proc_entry * e = NULL; @@ -1228,7 +1235,7 @@ static int flow_accept(pid_t pid, list_del(&f->next); bmp_release(irmd.flow_ids, f->flow_id); pthread_rwlock_unlock(&irmd.flows_lock); - ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, -1); + ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, -1, NULL, 0); clear_irm_flow(f); irm_flow_set_state(f, FLOW_NULL); irm_flow_destroy(f); @@ -1248,7 +1255,7 @@ static int flow_accept(pid_t pid, list_del(&f->next); bmp_release(irmd.flow_ids, f->flow_id); pthread_rwlock_unlock(&irmd.flows_lock); - ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, -1); + ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, -1, NULL, 0); clear_irm_flow(f); irm_flow_set_state(f, FLOW_NULL); irm_flow_destroy(f); @@ -1260,7 +1267,7 @@ static int flow_accept(pid_t pid, pthread_rwlock_unlock(&irmd.reg_lock); - if (ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, 0)) { + if (ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, 0, data, len)) { pthread_rwlock_wrlock(&irmd.flows_lock); list_del(&f->next); pthread_rwlock_unlock(&irmd.flows_lock); @@ -1285,7 +1292,9 @@ static int flow_alloc(pid_t pid, qosspec_t qs, struct timespec * timeo, struct irm_flow ** e, - bool join) + bool join, + const void * data, + size_t len) { struct irm_flow * f; struct ipcp_entry * ipcp; @@ -1293,10 +1302,8 @@ static int flow_alloc(pid_t pid, int state; uint8_t * hash; - if (join) - ipcp = get_ipcp_entry_by_layer(dst); - else - ipcp = get_ipcp_by_dst_name(dst, pid); + ipcp = join ? get_ipcp_entry_by_layer(dst) + : get_ipcp_by_dst_name(dst, pid); if (ipcp == NULL) { log_info("Destination %s unreachable.", dst); return -1; @@ -1341,7 +1348,7 @@ static int flow_alloc(pid_t pid, } } else { if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash, - IPCP_HASH_LEN(ipcp), qs)) { + IPCP_HASH_LEN(ipcp), qs, data, len)) { /* sanitizer cleans this */ log_info("Flow_allocation failed."); free(hash); @@ -1450,7 +1457,9 @@ static pid_t auto_execute(char ** argv) static struct irm_flow * flow_req_arr(pid_t pid, const uint8_t * hash, - qosspec_t qs) + qosspec_t qs, + const void * data, + size_t len) { struct reg_entry * re = NULL; struct prog_entry * a = NULL; @@ -1547,6 +1556,7 @@ static struct irm_flow * flow_req_arr(pid_t pid, pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_wrlock(&irmd.flows_lock); + flow_id = bmp_allocate(irmd.flow_ids); if (!bmp_is_id_valid(irmd.flow_ids, flow_id)) { pthread_rwlock_unlock(&irmd.flows_lock); @@ -1561,6 +1571,21 @@ static struct irm_flow * flow_req_arr(pid_t pid, return NULL; } + if (len != 0) { + assert(data); + f->data = malloc(len); + if (f->data == NULL) { + bmp_release(irmd.flow_ids, flow_id); + pthread_rwlock_unlock(&irmd.flows_lock); + log_err("Could not piggyback data."); + return NULL; + } + + f->len = len; + + memcpy(f->data, data, len); + } + list_add(&f->next, &irmd.irm_flows); pthread_rwlock_unlock(&irmd.flows_lock); @@ -1577,6 +1602,8 @@ static struct irm_flow * flow_req_arr(pid_t pid, list_del(&f->next); pthread_rwlock_unlock(&irmd.flows_lock); log_err("Could not get process table entry for %d.", h_pid); + free(f->data); + f->len = 0; irm_flow_destroy(f); return NULL; } @@ -1590,8 +1617,10 @@ static struct irm_flow * flow_req_arr(pid_t pid, return f; } -static int flow_alloc_reply(int flow_id, - int response) +static int flow_alloc_reply(int flow_id, + int response, + const void * data, + size_t len) { struct irm_flow * f; @@ -1608,6 +1637,14 @@ static int flow_alloc_reply(int flow_id, else irm_flow_set_state(f, FLOW_NULL); + f->data = malloc(len); + if (f->data == NULL) { + pthread_rwlock_unlock(&irmd.flows_lock); + return -1; + } + memcpy(f->data, data, len); + f->len = len; + pthread_rwlock_unlock(&irmd.flows_lock); return 0; @@ -1921,7 +1958,7 @@ static void * mainloop(void * o) if (msg == NULL) { close(sfd); - irm_msg__free_unpacked(ret_msg, NULL); + irm_msg__free_unpacked(msg, NULL); continue; } @@ -1991,7 +2028,10 @@ static void * mainloop(void * o) result = name_unreg(msg->pid, msg->name); break; case IRM_MSG_CODE__IRM_FLOW_ACCEPT: - result = flow_accept(msg->pid, timeo, &e); + assert(msg->pk.len > 0 ? msg->pk.data != NULL + : msg->pk.data == NULL); + result = flow_accept(msg->pid, timeo, &e, + msg->pk.data, msg->pk.len); if (result == 0) { qosspec_msg_t qs_msg; ret_msg->has_flow_id = true; @@ -2000,23 +2040,35 @@ static void * mainloop(void * o) ret_msg->pid = e->n_1_pid; qs_msg = spec_to_msg(&e->qs); ret_msg->qosspec = &qs_msg; + ret_msg->has_pk = true; + ret_msg->pk.data = e->data; + ret_msg->pk.len = e->len; + e->len = 0; /* Data is free'd with ret_msg */ } break; case IRM_MSG_CODE__IRM_FLOW_ALLOC: + assert(msg->pk.len > 0 ? msg->pk.data != NULL + : msg->pk.data == NULL); result = flow_alloc(msg->pid, msg->dst, msg_to_spec(msg->qosspec), - timeo, &e, false); + timeo, &e, false, msg->pk.data, + msg->pk.len); if (result == 0) { ret_msg->has_flow_id = true; ret_msg->flow_id = e->flow_id; ret_msg->has_pid = true; ret_msg->pid = e->n_1_pid; + ret_msg->has_pk = true; + ret_msg->pk.data = e->data; + ret_msg->pk.len = e->len; + e->len = 0; /* Data is free'd with ret_msg */ } break; case IRM_MSG_CODE__IRM_FLOW_JOIN: + assert(msg->pk.len == 0 && msg->pk.data == NULL); result = flow_alloc(msg->pid, msg->dst, msg_to_spec(msg->qosspec), - timeo, &e, true); + timeo, &e, true, NULL, 0); if (result == 0) { ret_msg->has_flow_id = true; ret_msg->flow_id = e->flow_id; @@ -2028,9 +2080,13 @@ static void * mainloop(void * o) result = flow_dealloc(msg->pid, msg->flow_id); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: + assert(msg->pk.len > 0 ? msg->pk.data != NULL + : msg->pk.data == NULL); e = flow_req_arr(msg->pid, msg->hash.data, - msg_to_spec(msg->qosspec)); + msg_to_spec(msg->qosspec), + msg->pk.data, + msg->pk.len); result = (e == NULL ? -1 : 0); if (result == 0) { ret_msg->has_flow_id = true; @@ -2040,7 +2096,12 @@ static void * mainloop(void * o) } break; case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY: - result = flow_alloc_reply(msg->flow_id, msg->response); + assert(msg->pk.len > 0 ? msg->pk.data != NULL + : msg->pk.data == NULL); + result = flow_alloc_reply(msg->flow_id, + msg->response, + msg->pk.data, + msg->pk.len); break; default: log_err("Don't know that message code."); |