summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2016-05-08 16:34:19 +0200
committerSander Vrijders <sander.vrijders@intec.ugent.be>2016-05-08 16:34:19 +0200
commit5812dfb832e513dc455a0d48624bcad62334d457 (patch)
tree93a02e1b20f54bb869eadc856f201412c633315c
parentde8f2015cbd015b1cced366cb12c054be62c23b1 (diff)
parent021af9e01ce6c6376534b33ef1a06ea4189028d4 (diff)
downloadouroboros-5812dfb832e513dc455a0d48624bcad62334d457.tar.gz
ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.zip
Merged in dstaesse/ouroboros/be-fast-path (pull request #65)
irmd: flow allocation and fast path
-rw-r--r--include/ouroboros/CMakeLists.txt1
-rw-r--r--include/ouroboros/dev.h19
-rw-r--r--include/ouroboros/flow.h43
-rw-r--r--include/ouroboros/ipcp.h4
-rw-r--r--include/ouroboros/shm_ap_rbuff.h52
-rw-r--r--include/ouroboros/shm_du_map.h21
-rw-r--r--include/ouroboros/utils.h1
-rw-r--r--src/ipcpd/flow.c38
-rw-r--r--src/ipcpd/flow.h12
-rw-r--r--src/ipcpd/ipcp-data.c104
-rw-r--r--src/ipcpd/ipcp-data.h16
-rw-r--r--src/ipcpd/ipcp-ops.h9
-rw-r--r--src/ipcpd/ipcp.c20
-rw-r--r--src/ipcpd/ipcp.h3
-rw-r--r--src/ipcpd/shim-udp/main.c524
-rw-r--r--src/ipcpd/shim-udp/tests/shim_udp_test.c12
-rw-r--r--src/irmd/main.c764
-rw-r--r--src/lib/CMakeLists.txt1
-rw-r--r--src/lib/bitmap.c19
-rw-r--r--src/lib/dev.c344
-rw-r--r--src/lib/ipcp.c65
-rw-r--r--src/lib/ipcpd_messages.proto6
-rw-r--r--src/lib/irmd_messages.proto26
-rw-r--r--src/lib/shm_ap_rbuff.c268
-rw-r--r--src/lib/shm_du_map.c143
-rw-r--r--src/lib/tests/shm_du_map_test.c53
-rw-r--r--src/tools/echo/echo_client.c17
-rw-r--r--src/tools/echo/echo_server.c47
28 files changed, 1859 insertions, 773 deletions
diff --git a/include/ouroboros/CMakeLists.txt b/include/ouroboros/CMakeLists.txt
index cfa299ca..6a247e8e 100644
--- a/include/ouroboros/CMakeLists.txt
+++ b/include/ouroboros/CMakeLists.txt
@@ -15,6 +15,7 @@ set(HEADER_FILES
irm.h
list.h
logs.h
+ shm_ap_rbuff.h
shm_du_map.h
sockets.h
utils.h
diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h
index 414273ef..e857e211 100644
--- a/include/ouroboros/dev.h
+++ b/include/ouroboros/dev.h
@@ -24,29 +24,34 @@
#define OUROBOROS_DEV_H
#include <ouroboros/common.h>
+#include <ouroboros/flow.h>
#define UNKNOWN_AP "__UNKNOWN_AP__"
#define UNKNOWN_AE "__UNKNOWN_AE__"
+/* These calls should be removed once we write the ouroboros OS. */
+int ap_init(char * ap_name);
+void ap_fini();
+
/* Returns file descriptor */
-int ap_reg(char * ap_name, char ** difs, size_t difs_size);
-int ap_unreg(char * ap_name, char ** difs, size_t difs_size);
+int ap_reg(char ** difs, size_t difs_size);
+int ap_unreg(char ** difs, size_t difs_size);
/* Returns file descriptor (> 0) and client name(s) */
-int flow_accept(int fd, char * ap_name, char * ae_name);
+int flow_accept(int fd, char ** ap_name, char ** ae_name);
int flow_alloc_resp(int fd, int result);
/* Returns file descriptor */
-int flow_alloc(char * dst_ap_name, char * src_ap_name,
- char * src_ae_name, struct qos_spec * qos,
- int oflags);
+int flow_alloc(char * dst_name,
+ char * src_ae_name,
+ struct qos_spec * qos);
/* If flow is accepted returns a value > 0 */
int flow_alloc_res(int fd);
int flow_dealloc(int fd);
/* Wraps around fnctl */
-int flow_cntl(int fd, int oflags);
+int flow_cntl(int fd, int cmd, int oflags);
ssize_t flow_write(int fd, void * buf, size_t count);
ssize_t flow_read(int fd, void * buf, size_t count);
diff --git a/include/ouroboros/flow.h b/include/ouroboros/flow.h
new file mode 100644
index 00000000..ff9085f7
--- /dev/null
+++ b/include/ouroboros/flow.h
@@ -0,0 +1,43 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * Flows
+ *
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_FLOW_H
+#define OUROBOROS_FLOW_H
+
+/* same values as fcntl.h */
+#define FLOW_O_RDONLY 00000000
+#define FLOW_O_WRONLY 00000001
+#define FLOW_O_RDWR 00000002
+#define FLOW_O_ACCMODE 00000003
+
+#define FLOW_O_NONBLOCK 00004000
+#define FLOW_O_DEFAULT 00000002
+
+#define FLOW_O_INVALID (FLOW_O_WRONLY | FLOW_O_RDWR)
+
+enum flow_state {
+ FLOW_NULL = 0,
+ FLOW_ALLOCATED,
+ FLOW_PENDING
+};
+
+#endif /* OUROBOROS_FLOW_H */
diff --git a/include/ouroboros/ipcp.h b/include/ouroboros/ipcp.h
index e3c17bda..570eca67 100644
--- a/include/ouroboros/ipcp.h
+++ b/include/ouroboros/ipcp.h
@@ -61,13 +61,15 @@ int ipcp_name_unreg(pid_t pid,
int ipcp_flow_alloc(pid_t pid,
uint32_t port_id,
+ pid_t n_pid,
char * dst_name,
char * src_ap_name,
char * src_ae_name,
struct qos_spec * qos);
int ipcp_flow_alloc_resp(pid_t pid,
uint32_t port_id,
- int result);
+ pid_t n_pid,
+ int response);
/* These operations go from the IPCP to the IRMd */
diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h
new file mode 100644
index 00000000..0ee3e81e
--- /dev/null
+++ b/include/ouroboros/shm_ap_rbuff.h
@@ -0,0 +1,52 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * Ring buffer for application processes
+ *
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_SHM_AP_RBUFF_H
+#define OUROBOROS_SHM_AP_RBUFF_H
+
+#ifndef SHM_AP_RBUFF
+#define SHM_AP_RBUFF_PREFIX "ouroboros_rb_"
+#endif
+
+#ifndef SHM_RBUFF_SIZE
+#define SHM_RBUFF_SIZE (1 << 12)
+#endif
+
+#include <sys/types.h>
+
+struct shm_ap_rbuff;
+
+struct rb_entry {
+ size_t index;
+ int port_id;
+};
+
+struct shm_ap_rbuff * shm_ap_rbuff_create();
+struct shm_ap_rbuff * shm_ap_rbuff_open();
+void shm_ap_rbuff_close(struct shm_ap_rbuff * rb);
+void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb);
+int shm_ap_rbuff_write(struct shm_ap_rbuff * rb,
+ struct rb_entry * e);
+struct rb_entry * shm_ap_rbuff_read();
+
+#endif /* OUROBOROS_SHM_AP_RBUFF_H */
diff --git a/include/ouroboros/shm_du_map.h b/include/ouroboros/shm_du_map.h
index fb51768d..f575aa42 100644
--- a/include/ouroboros/shm_du_map.h
+++ b/include/ouroboros/shm_du_map.h
@@ -44,14 +44,23 @@ struct shm_du_map;
struct shm_du_map * shm_du_map_create();
struct shm_du_map * shm_du_map_open();
void shm_du_map_close(struct shm_du_map * dum);
+void shm_du_map_destroy(struct shm_du_map * dum);
-struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum,
- size_t size,
- size_t headspace,
- uint8_t * data,
- size_t len);
-int shm_release_du_buff(struct shm_du_map * dum);
+/* returns the index of the buffer in the DU map */
+int shm_create_du_buff(struct shm_du_map * dum,
+ size_t size,
+ size_t headspace,
+ uint8_t * data,
+ size_t len);
+/* FIXME: revise these */
+int shm_du_map_read_sdu(uint8_t ** dst,
+ struct shm_du_map * dum,
+ size_t idx);
+int shm_release_du_buff(struct shm_du_map * dum, size_t idx);
+
+
+/* FIXME: use shm_du_map * and index */
uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb,
size_t size);
uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb,
diff --git a/include/ouroboros/utils.h b/include/ouroboros/utils.h
index 2e5a4944..a1d2ac96 100644
--- a/include/ouroboros/utils.h
+++ b/include/ouroboros/utils.h
@@ -21,6 +21,7 @@
*/
#define MAX(a,b) (a > b ? a : b)
+#define MIN(a,b) (a < b ? a : b)
/*
* Returns the number of characters a uint would
diff --git a/src/ipcpd/flow.c b/src/ipcpd/flow.c
index c436733b..ae8f848c 100644
--- a/src/ipcpd/flow.c
+++ b/src/ipcpd/flow.c
@@ -27,7 +27,7 @@
#include <ouroboros/logs.h>
-flow_t * flow_create(int32_t port_id)
+flow_t * flow_create(uint32_t port_id)
{
flow_t * flow = malloc(sizeof *flow);
if (flow == NULL) {
@@ -38,8 +38,7 @@ flow_t * flow_create(int32_t port_id)
INIT_LIST_HEAD(&flow->list);
flow->port_id = port_id;
- flow->oflags = FLOW_O_DEFAULT;
- flow->state = FLOW_NULL;
+ flow->state = FLOW_NULL;
pthread_mutex_init(&flow->lock, NULL);
@@ -52,36 +51,3 @@ void flow_destroy(flow_t * flow)
return;
free(flow);
}
-
-int flow_set_opts(flow_t * flow, uint16_t opts)
-{
- if (flow == NULL) {
- LOG_DBGF("Non-existing flow.");
- return -1;
- }
-
- pthread_mutex_lock(&flow->lock);
-
- if ((opts & FLOW_O_ACCMODE) == FLOW_O_ACCMODE) {
- flow->oflags = FLOW_O_DEFAULT;
- pthread_mutex_unlock(&flow->lock);
- LOG_WARN("Invalid flow options. Setting default.");
- return -1;
- }
-
- flow->oflags = opts;
-
- pthread_mutex_unlock(&flow->lock);
-
- return 0;
-}
-
-uint16_t flow_get_opts(const flow_t * flow)
-{
- if (flow == NULL) {
- LOG_DBGF("Non-existing flow.");
- return FLOW_O_INVALID;
- }
-
- return flow->oflags;
-}
diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h
index 000de5ad..0a3e90d1 100644
--- a/src/ipcpd/flow.h
+++ b/src/ipcpd/flow.h
@@ -25,6 +25,7 @@
#include <ouroboros/common.h>
#include <ouroboros/list.h>
+#include <ouroboros/shm_ap_rbuff.h>
#include <pthread.h>
/* same values as fcntl.h */
@@ -47,17 +48,14 @@ enum flow_state {
typedef struct flow {
struct list_head list;
- int32_t port_id;
- uint16_t oflags;
- enum flow_state state;
+ uint32_t port_id;
+ struct shm_ap_rbuff * rb;
+ enum flow_state state;
pthread_mutex_t lock;
} flow_t;
-flow_t * flow_create(int32_t port_id);
+flow_t * flow_create(uint32_t port_id);
void flow_destroy(flow_t * flow);
-int flow_set_opts(flow_t * flow, uint16_t opts);
-uint16_t flow_get_opts(const flow_t * flow);
-
#endif /* OUROBOROS_FLOW_H */
diff --git a/src/ipcpd/ipcp-data.c b/src/ipcpd/ipcp-data.c
index 72407a53..76fc4bcd 100644
--- a/src/ipcpd/ipcp-data.c
+++ b/src/ipcpd/ipcp-data.c
@@ -96,46 +96,26 @@ struct ipcp_data * ipcp_data_create()
if (data == NULL)
return NULL;
- data->iname = NULL;
data->type = 0;
- data->dum = NULL;
return data;
}
struct ipcp_data * ipcp_data_init(struct ipcp_data * dst,
- const char * ipcp_name,
enum ipcp_type ipcp_type)
{
if (dst == NULL)
return NULL;
- dst->iname = instance_name_create();
- if (dst->iname == NULL)
- return NULL;
-
- if(instance_name_init_from(dst->iname, ipcp_name, getpid()) == NULL) {
- instance_name_destroy(dst->iname);
- return NULL;
- }
-
dst->type = ipcp_type;
- dst->dum = shm_du_map_open();
- if (dst->dum == NULL) {
- instance_name_destroy(dst->iname);
- return NULL;
- }
-
/* init the lists */
INIT_LIST_HEAD(&dst->registry);
- INIT_LIST_HEAD(&dst->flows);
INIT_LIST_HEAD(&dst->directory);
/* init the mutexes */
pthread_mutex_init(&dst->reg_lock, NULL);
pthread_mutex_init(&dst->dir_lock, NULL);
- pthread_mutex_init(&dst->flow_lock, NULL);
return dst;
}
@@ -156,42 +136,22 @@ static void clear_directory(struct ipcp_data * data)
dir_entry_destroy(list_entry(h, struct dir_entry, list));
}
-static void clear_flows(struct ipcp_data * data)
-{
- struct list_head * h;
- struct list_head * t;
- list_for_each_safe(h, t, &data->flows)
- flow_destroy(list_entry(h, flow_t, list));
-
-}
-
void ipcp_data_destroy(struct ipcp_data * data)
{
if (data == NULL)
return;
- /* FIXME: finish all pending operations here */
-
- if (data->iname != NULL)
- instance_name_destroy(data->iname);
- data->iname = NULL;
-
- if (data->dum != NULL)
- shm_du_map_close(data->dum);
- data->dum = NULL;
+ /* FIXME: finish all pending operations here and cancel all threads */
pthread_mutex_lock(&data->reg_lock);
pthread_mutex_lock(&data->dir_lock);
- pthread_mutex_lock(&data->flow_lock);
/* clear the lists */
clear_registry(data);
clear_directory(data);
- clear_flows(data);
/*
* no need to unlock, just free the entire thing
- * pthread_mutex_unlock(&data->flow_lock);
* pthread_mutex_unlock(&data->dir_lock);
* pthread_mutex_unlock(&data->reg_lock);
*/
@@ -380,65 +340,3 @@ uint64_t ipcp_data_get_addr(struct ipcp_data * data,
return addr;
}
-
-flow_t * ipcp_data_find_flow(struct ipcp_data * data,
- uint32_t port_id)
-{
- struct list_head * h;
- list_for_each(h, &data->flows) {
- flow_t * f = list_entry(h, flow_t, list);
- if (f->port_id == port_id)
- return f;
- }
-
- return NULL;
-}
-
-bool ipcp_data_has_flow(struct ipcp_data * data,
- uint32_t port_id)
-{
- return ipcp_data_find_flow(data, port_id) != NULL;
-}
-
-int ipcp_data_add_flow(struct ipcp_data * data,
- flow_t * flow)
-{
- if (data == NULL || flow == NULL)
- return -1;
-
- pthread_mutex_lock(&data->flow_lock);
-
- if (ipcp_data_has_flow(data, flow->port_id)) {
- pthread_mutex_unlock(&data->flow_lock);
- return -2;
- }
-
- list_add(&flow->list,&data->flows);
-
- pthread_mutex_unlock(&data->flow_lock);
-
- return 0;
-}
-
-int ipcp_data_del_flow(struct ipcp_data * data,
- uint32_t port_id)
-{
- flow_t * f;
-
- if (data == NULL)
- return -1;
-
- pthread_mutex_lock(&data->flow_lock);
-
- f = ipcp_data_find_flow(data, port_id);
- if (f == NULL)
- return -1;
-
- list_del(&f->list);
-
- free(f);
-
- pthread_mutex_unlock(&data->flow_lock);
-
- return 0;
-}
diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h
index 1dea8c3c..2e86ba11 100644
--- a/src/ipcpd/ipcp-data.h
+++ b/src/ipcpd/ipcp-data.h
@@ -34,17 +34,11 @@
#include "flow.h"
struct ipcp_data {
- instance_name_t * iname;
enum ipcp_type type;
- struct shm_du_map * dum;
-
struct list_head registry;
pthread_mutex_t reg_lock;
- struct list_head flows;
- pthread_mutex_t flow_lock;
-
struct list_head directory;
pthread_mutex_t dir_lock;
@@ -53,7 +47,6 @@ struct ipcp_data {
struct ipcp_data * ipcp_data_create();
struct ipcp_data * ipcp_data_init(struct ipcp_data * dst,
- const char * ipcp_name,
enum ipcp_type ipcp_type);
void ipcp_data_destroy(struct ipcp_data * data);
@@ -73,13 +66,4 @@ bool ipcp_data_is_in_directory(struct ipcp_data * data,
const char * ap_name);
uint64_t ipcp_data_get_addr(struct ipcp_data * data,
const char * ap_name);
-bool ipcp_data_has_flow(struct ipcp_data * data,
- uint32_t port_id);
-flow_t * ipcp_data_find_flow(struct ipcp_data * data,
- uint32_t port_id);
-int ipcp_data_add_flow(struct ipcp_data * data,
- flow_t * flow);
-int ipcp_data_del_flow(struct ipcp_data * data,
- uint32_t port_id);
-
#endif /* IPCPD_IPCP_DATA_H */
diff --git a/src/ipcpd/ipcp-ops.h b/src/ipcpd/ipcp-ops.h
index 2ccb2e59..91b6cac9 100644
--- a/src/ipcpd/ipcp-ops.h
+++ b/src/ipcpd/ipcp-ops.h
@@ -39,20 +39,15 @@ struct ipcp_ops {
int (* ipcp_name_reg)(char * name);
int (* ipcp_name_unreg)(char * name);
int (* ipcp_flow_alloc)(uint32_t port_id,
+ pid_t n_pid,
char * dst_ap_name,
char * src_ap_name,
char * src_ae_name,
struct qos_spec * qos);
int (* ipcp_flow_alloc_resp)(uint32_t port_id,
+ pid_t n_pid,
int response);
int (* ipcp_flow_dealloc)(uint32_t port_id);
-
- /* FIXME: let's see how this will work with the shm_du_map */
- int (* ipcp_du_write)(uint32_t port_id,
- size_t map_index);
-
- int (* ipcp_du_read)(uint32_t port_id,
- size_t map_index);
};
#endif /* IPCPD_IPCP_OPS_H */
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index d6f373cd..13632a80 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -45,11 +45,12 @@ int ipcp_arg_check(int argc, char * argv[])
return 0;
}
-int ipcp_main_loop(struct ipcp * _ipcp)
+void * ipcp_main_loop(void * o)
{
int lsockfd;
int sockfd;
uint8_t buf[IPCP_MSG_BUF_SIZE];
+ struct ipcp * _ipcp = (struct ipcp *) o;
ipcp_msg_t * msg;
ssize_t count;
@@ -61,13 +62,13 @@ int ipcp_main_loop(struct ipcp * _ipcp)
if (_ipcp == NULL) {
LOG_ERR("Invalid ipcp struct.");
- return 1;
+ return (void *) 1;
}
sockfd = server_socket_open(ipcp_sock_path(getpid()));
if (sockfd < 0) {
LOG_ERR("Could not open server socket.");
- return 1;
+ return (void *) 1;
}
while (true) {
@@ -113,7 +114,7 @@ int ipcp_main_loop(struct ipcp * _ipcp)
conf.max_pdu_size = conf_msg->max_pdu_size;
}
if (conf_msg->ipcp_type == IPCP_SHIM_UDP) {
- conf.ip_addr = conf_msg->ip_addr;
+ conf.ip_addr = conf_msg->ip_addr;
conf.dns_addr = conf_msg->dns_addr;
}
@@ -149,7 +150,8 @@ int ipcp_main_loop(struct ipcp * _ipcp)
}
ret_msg.has_result = true;
ret_msg.result =
- _ipcp->ops->ipcp_unreg(msg->dif_names, msg->len);
+ _ipcp->ops->ipcp_unreg(msg->dif_names,
+ msg->len);
break;
case IPCP_MSG_CODE__IPCP_NAME_REG:
if (_ipcp->ops->ipcp_name_reg == NULL) {
@@ -172,9 +174,10 @@ int ipcp_main_loop(struct ipcp * _ipcp)
LOG_ERR("Flow_alloc unsupported.");
break;
}
- ret_msg.has_fd = true;
- ret_msg.fd =
+ ret_msg.has_result = true;
+ ret_msg.result =
_ipcp->ops->ipcp_flow_alloc(msg->port_id,
+ msg->pid,
msg->dst_name,
msg->src_ap_name,
msg->src_ae_name,
@@ -188,6 +191,7 @@ int ipcp_main_loop(struct ipcp * _ipcp)
ret_msg.has_result = true;
ret_msg.result =
_ipcp->ops->ipcp_flow_alloc_resp(msg->port_id,
+ msg->pid,
msg->result);
break;
case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC:
@@ -231,5 +235,5 @@ int ipcp_main_loop(struct ipcp * _ipcp)
close(lsockfd);
}
- return 0;
+ return NULL;
}
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 9decac8b..393af994 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -43,7 +43,8 @@ struct ipcp {
int irmd_fd;
};
-int ipcp_main_loop();
+void * ipcp_main_loop(void * o);
+void * ipcp_sdu_loop(void * o);
int ipcp_arg_check(int argc, char * argv[]);
#endif
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 460fe9e3..1f7bb12f 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -24,12 +24,13 @@
#include "ipcp.h"
#include "flow.h"
#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
#include <ouroboros/ipcp.h>
#include <ouroboros/dif_config.h>
#include <ouroboros/sockets.h>
-#include <ouroboros/dev.h>
+#include <ouroboros/bitmap.h>
#define OUROBOROS_PREFIX "ipcpd/shim-udp"
@@ -67,6 +68,144 @@ extern struct ipcp * _ipcp; /* defined in test */
struct ipcp * _ipcp;
#endif
+/*
+ * copied from ouroboros/dev. The shim needs access to the internals
+ * because it doesn't follow all steps necessary steps to get
+ * the info
+ */
+
+#define UNKNOWN_AP "__UNKNOWN_AP__"
+#define UNKNOWN_AE "__UNKNOWN_AE__"
+
+#define AP_MAX_FLOWS 256
+
+#ifndef DU_BUFF_HEADSPACE
+ #define DU_BUFF_HEADSPACE 128
+#endif
+
+#ifndef DU_BUFF_TAILSPACE
+ #define DU_BUFF_TAILSPACE 0
+#endif
+
+/* the shim needs access to these internals */
+struct shim_ap_data {
+ instance_name_t * api;
+ struct shm_du_map * dum;
+ struct bmp * fds;
+
+ struct shm_ap_rbuff * rb;
+ struct flow flows[AP_MAX_FLOWS];
+
+ pthread_t mainloop;
+ pthread_t sduloop;
+ pthread_t handler;
+ pthread_t sdu_reader[2];
+ int ping_pong;
+} * _ap_instance;
+
+int shim_ap_init(char * ap_name)
+{
+ _ap_instance = malloc(sizeof(struct shim_ap_data));
+ if (_ap_instance == NULL) {
+ return -1;
+ }
+
+ _ap_instance->api = instance_name_create();
+ if (_ap_instance->api == NULL) {
+ free(_ap_instance);
+ return -1;
+ }
+
+ if (instance_name_init_from(_ap_instance->api,
+ ap_name,
+ getpid()) == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0);
+ if (_ap_instance->fds == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->dum = shm_du_map_open();
+ if (_ap_instance->dum == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ bmp_destroy(_ap_instance->fds);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->rb = shm_ap_rbuff_create();
+ if (_ap_instance->rb == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ bmp_destroy(_ap_instance->fds);
+ free(_ap_instance);
+ return -1;
+ }
+
+ return 0;
+}
+
+void shim_ap_fini()
+{
+ int i = 0;
+
+ if (_ap_instance == NULL)
+ return;
+ if (_ap_instance->api != NULL)
+ instance_name_destroy(_ap_instance->api);
+ if (_ap_instance->fds != NULL)
+ bmp_destroy(_ap_instance->fds);
+ if (_ap_instance->dum != NULL)
+ shm_du_map_close(_ap_instance->dum);
+ if (_ap_instance->rb != NULL)
+ shm_ap_rbuff_destroy(_ap_instance->rb);
+ for (i = 0; i < AP_MAX_FLOWS; i ++)
+ if (_ap_instance->flows[i].rb != NULL)
+ shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+
+ free(_ap_instance);
+}
+
+static int port_id_to_fd(uint32_t port_id)
+{
+ int i;
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ if (_ap_instance->flows[i].port_id == port_id
+ && _ap_instance->flows[i].state != FLOW_NULL)
+ return i;
+ return -1;
+}
+
+static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
+{
+ /* the AP chooses the amount of headspace and tailspace */
+ size_t index = shm_create_du_buff(_ap_instance->dum,
+ count,
+ 0,
+ buf,
+ count);
+ struct rb_entry e = {index, _ap_instance->flows[fd].port_id};
+
+ if (index == -1)
+ return -1;
+
+ if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
+ shm_release_du_buff(_ap_instance->dum, index);
+ return -EPIPE;
+ }
+
+ return 0;
+}
+
+/*
+ * end copy from dev.c
+ */
+
struct ipcp_udp_data {
/* keep ipcp_data first for polymorphism */
struct ipcp_data ipcp_data;
@@ -79,39 +218,15 @@ struct ipcp_udp_data {
int s_fd;
fd_set flow_fd_s;
- flow_t * fd_to_flow_ptr[FD_SETSIZE];
- pthread_mutex_t lock;
+ pthread_mutex_t lock;
};
-struct udp_flow {
- /* keep flow first for polymorphism */
- flow_t flow;
- int fd;
-};
-
-void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
-{
- switch(sig) {
- case SIGINT:
- case SIGTERM:
- case SIGHUP:
- LOG_DBG("Terminating by order of %d. Bye.", info->si_pid);
- if (info->si_pid == irmd_pid) {
- /* shm_du_map_close(_ipcp->data->dum); */
- exit(0);
- }
- default:
- return;
- }
-}
-
-struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name)
+struct ipcp_udp_data * ipcp_udp_data_create()
{
struct ipcp_udp_data * udp_data;
struct ipcp_data * data;
enum ipcp_type ipcp_type;
- int n;
udp_data = malloc(sizeof *udp_data);
if (udp_data == NULL) {
@@ -121,18 +236,52 @@ struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name)
ipcp_type = THIS_TYPE;
data = (struct ipcp_data *) udp_data;
- if (ipcp_data_init(data, ap_name, ipcp_type) == NULL) {
+ if (ipcp_data_init(data, ipcp_type) == NULL) {
free(udp_data);
return NULL;
}
FD_ZERO(&udp_data->flow_fd_s);
- for (n = 0; n < FD_SETSIZE; ++n)
- udp_data->fd_to_flow_ptr[n] = NULL;
return udp_data;
}
+void ipcp_udp_data_destroy(struct ipcp_udp_data * data)
+{
+ if (data == NULL)
+ return;
+
+ ipcp_data_destroy((struct ipcp_data *) data);
+}
+
+void ipcp_udp_destroy(struct ipcp * ipcp)
+{
+ ipcp_udp_data_destroy((struct ipcp_udp_data *) ipcp->data);
+ shim_ap_fini();
+ free(ipcp);
+}
+
+void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
+{
+ switch(sig) {
+ case SIGINT:
+ case SIGTERM:
+ case SIGHUP:
+ if (info->si_pid == irmd_pid || info->si_pid == 0) {
+ LOG_DBG("Terminating by order of %d. Bye.",
+ info->si_pid);
+ pthread_cancel(_ap_instance->mainloop);
+ pthread_cancel(_ap_instance->handler);
+ pthread_cancel(_ap_instance->sdu_reader[0]);
+ pthread_cancel(_ap_instance->sdu_reader[1]);
+ pthread_cancel(_ap_instance->sduloop);
+ exit(0);
+ }
+ default:
+ return;
+ }
+}
+
static void * ipcp_udp_listener()
{
char buf[SHIM_UDP_BUF_SIZE];
@@ -141,10 +290,10 @@ static void * ipcp_udp_listener()
struct sockaddr_in f_saddr;
struct sockaddr_in c_saddr;
struct hostent * hostp;
- struct udp_flow * flow;
int sfd = shim_data(_ipcp)->s_fd;
while (true) {
+ int fd;
n = sizeof c_saddr;
n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0,
(struct sockaddr *) &c_saddr, (unsigned *) &n);
@@ -157,16 +306,7 @@ static void * ipcp_udp_listener()
if (hostp == NULL)
continue;
- /* create a new socket for the server */
- flow = malloc(sizeof *flow);
- if (flow == NULL)
- continue;
-
- flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
- if (flow->fd == -1) {
- free(flow);
- continue;
- }
+ fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
memset((char *) &f_saddr, 0, sizeof f_saddr);
f_saddr.sin_family = AF_INET;
@@ -185,36 +325,33 @@ static void * ipcp_udp_listener()
* the flow structure
*/
- if (connect(flow->fd,
+ if (connect(fd,
(struct sockaddr *) &c_saddr, sizeof c_saddr) < 0) {
- close(flow->fd);
- free(flow);
+ close(fd);
continue;
}
+ /* echo back the packet */
+ while(send(fd, buf, strlen(buf), 0) < 0)
+ ;
+
/* reply to IRM */
- flow->flow.port_id = ipcp_flow_req_arr(getpid(), buf,
- UNKNOWN_AP, "");
- if (flow->flow.port_id < 0) {
+ _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(),
+ buf,
+ UNKNOWN_AP,
+ UNKNOWN_AE);
+ if (_ap_instance->flows[fd].port_id < 0) {
LOG_ERR("Could not get port id from IRMd");
- close(flow->fd);
- free(flow);
+ close(fd);
continue;
}
- flow->flow.oflags = FLOW_O_DEFAULT;
- flow->flow.state = FLOW_PENDING;
-
- if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) {
- LOG_DBGF("Could not add flow.");
- close(flow->fd);
- free(flow);
- continue;
- }
+ _ap_instance->flows[fd].rb = NULL;
+ _ap_instance->flows[fd].state = FLOW_PENDING;
- FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s);
- shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow;
+ LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.",
+ _ap_instance->flows[fd].port_id, fd);
}
return 0;
@@ -229,8 +366,6 @@ static void * ipcp_udp_sdu_reader()
struct sockaddr_in r_saddr;
while (true) {
- flow_t * flow;
-
if (select(FD_SETSIZE,
&shim_data(_ipcp)->flow_fd_s,
NULL, NULL, NULL)
@@ -249,18 +384,8 @@ static void * ipcp_udp_sdu_reader()
(struct sockaddr *) &r_saddr,
(unsigned *) &n);
- flow = shim_data(_ipcp)->fd_to_flow_ptr[fd];
- if (flow->state == FLOW_PENDING) {
- if (connect(fd,
- (struct sockaddr *) &r_saddr,
- sizeof r_saddr)
- < 0)
- continue;
- flow->state = FLOW_ALLOCATED;
- }
-
/* send the sdu to the correct port_id */
- LOG_MISSING;
+ ipcp_udp_flow_write(fd, buf, n);
}
}
@@ -271,8 +396,6 @@ int ipcp_udp_bootstrap(struct dif_config * conf)
{
char ipstr[INET_ADDRSTRLEN];
char dnsstr[INET_ADDRSTRLEN];
- pthread_t handler;
- pthread_t sdu_reader;
int enable = 1;
if (conf->type != THIS_TYPE) {
@@ -296,7 +419,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf)
dnsstr,
INET_ADDRSTRLEN);
else
- strcpy(dnsstr, "not set.\n");
+ strcpy(dnsstr, "not set");
shim_data(_ipcp)->ip_addr = conf->ip_addr;
shim_data(_ipcp)->dns_addr = conf->dns_addr;
@@ -304,7 +427,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf)
/* UDP listen server */
if ((shim_data(_ipcp)->s_fd =
- socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
+ socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
LOG_DBGF("Can't create socket.");
return -1;
}
@@ -328,13 +451,28 @@ int ipcp_udp_bootstrap(struct dif_config * conf)
return -1;
}
- pthread_create(&handler, NULL, ipcp_udp_listener, NULL);
- pthread_create(&sdu_reader, NULL, ipcp_udp_sdu_reader, NULL);
+ FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s);
+
+ pthread_create(&_ap_instance->handler,
+ NULL,
+ ipcp_udp_listener,
+ NULL);
+ pthread_create(&_ap_instance->sdu_reader[0],
+ NULL,
+ ipcp_udp_sdu_reader,
+ NULL);
+
+ pthread_create(&_ap_instance->sdu_reader[1],
+ NULL,
+ ipcp_udp_sdu_reader,
+ NULL);
+
+ _ap_instance->ping_pong = 0;
_ipcp->state = IPCP_ENROLLED;
- LOG_DBG("Bootstrapped shim IPCP over UDP %s-%d.",
- _ipcp->data->iname->name, _ipcp->data->iname->id);
+ LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.",
+ getpid());
LOG_DBG("Bound to IP address %s.", ipstr);
LOG_DBG("DNS server address is %s.", dnsstr);
@@ -464,23 +602,25 @@ int ipcp_udp_name_unreg(char * name)
}
int ipcp_udp_flow_alloc(uint32_t port_id,
+ pid_t n_pid,
char * dst_name,
char * src_ap_name,
char * src_ae_name,
struct qos_spec * qos)
{
- struct udp_flow * flow = NULL;
struct sockaddr_in l_saddr;
struct sockaddr_in r_saddr;
+ struct sockaddr_in rf_saddr;
+ int fd;
+ int n;
+
+ char * recv_buf = NULL;
struct hostent * h;
if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL)
return -1;
- LOG_DBG("Received flow allocation request from %s to %s.",
- src_ap_name, dst_name);
-
if (strlen(dst_name) > 255
|| strlen(src_ap_name) > 255
|| strlen(src_ae_name) > 255) {
@@ -491,15 +631,7 @@ int ipcp_udp_flow_alloc(uint32_t port_id,
if (qos != NULL)
LOG_DBGF("QoS requested. UDP/IP can't do that.");
- flow = malloc(sizeof *flow);
- if (flow == NULL)
- return -1;
-
- flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
- if (flow->fd == -1) {
- free(flow);
- return -1;
- }
+ fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
/* this socket is for the flow */
memset((char *) &l_saddr, 0, sizeof l_saddr);
@@ -507,108 +639,161 @@ int ipcp_udp_flow_alloc(uint32_t port_id,
l_saddr.sin_addr.s_addr = local_ip;
l_saddr.sin_port = 0;
- if (bind(flow->fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) {
- char ipstr[INET_ADDRSTRLEN];
- inet_ntop(AF_INET,
- &l_saddr.sin_addr.s_addr,
- ipstr,
- INET_ADDRSTRLEN);
- close(flow->fd);
- free(flow);
+ if (bind(fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) {
+ close(fd);
return -1;
}
h = gethostbyname(dst_name);
if (h == NULL) {
LOG_DBGF("Could not resolve %s.", dst_name);
- close(flow->fd);
- free(flow);
+ close(fd);
return -1;
}
-
memset((char *) &r_saddr, 0, sizeof r_saddr);
r_saddr.sin_family = AF_INET;
- r_saddr.sin_addr.s_addr = (uint32_t) *(h->h_addr_list[0]);
+ r_saddr.sin_addr.s_addr = *((uint32_t *) (h->h_addr_list[0]));
r_saddr.sin_port = LISTEN_PORT;
+
/* at least try to get the packet on the wire */
- while (sendto(flow->fd, dst_name, strlen(dst_name), 0,
+ while (sendto(fd, dst_name, strlen(dst_name), 0,
(struct sockaddr *) &r_saddr, sizeof r_saddr) < 0) {
}
- flow->flow.port_id = port_id;
- flow->flow.oflags = FLOW_O_DEFAULT;
- flow->flow.state = FLOW_PENDING;
-
- /* add flow to the list */
+ /* wait for the other shim IPCP to respond */
- pthread_mutex_lock(&_ipcp->data->flow_lock);
+ recv_buf = malloc(strlen(dst_name) + 1);
+ n = sizeof(rf_saddr);
+ n = recvfrom(fd,
+ recv_buf,
+ strlen(dst_name) + 1,
+ 0,
+ (struct sockaddr *) &rf_saddr,
+ (unsigned *) &n);
- if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) {
- LOG_DBGF("Could not add flow.");
- pthread_mutex_unlock(&_ipcp->data->flow_lock);
- close(flow->fd);
- free(flow);
+ if (connect(fd,
+ (struct sockaddr *) &rf_saddr,
+ sizeof rf_saddr)
+ < 0) {
+ free(recv_buf);
return -1;
}
- pthread_mutex_unlock(&_ipcp->data->flow_lock);
+ if (!strcmp(recv_buf, dst_name))
+ LOG_WARN("Incorrect echo from server");
+
+ free(recv_buf);
+
+ _ap_instance->flows[fd].port_id = port_id;
+ _ap_instance->flows[fd].state = FLOW_ALLOCATED;
+ _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid);
+ if (_ap_instance->flows[fd].rb == NULL) {
+ LOG_ERR("Could not open N + 1 ringbuffer.");
+ close(fd);
+ }
/* tell IRMd that flow allocation "worked" */
- if (ipcp_flow_alloc_reply(getpid(), flow->flow.port_id, 0)) {
+ if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) {
LOG_ERR("Failed to notify IRMd about flow allocation reply");
- close(flow->fd);
- ipcp_data_del_flow(_ipcp->data, flow->flow.port_id);
+ close(fd);
+ shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
return -1;
}
- FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s);
- shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow;
+ FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
- return 0;
+ pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]);
+ pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong],
+ NULL,
+ ipcp_udp_sdu_reader,
+ NULL);
+ _ap_instance->ping_pong = !_ap_instance->ping_pong;
+
+ LOG_DBG("Allocated flow with port_id %u on UDP fd %d.", port_id, fd);
+
+ return fd;
}
int ipcp_udp_flow_alloc_resp(uint32_t port_id,
+ pid_t n_pid,
int response)
{
- struct udp_flow * flow =
- (struct udp_flow *) ipcp_data_find_flow(_ipcp->data, port_id);
- if (flow == NULL) {
- return -1;
+ int fd = port_id_to_fd(port_id);
+ if (fd < 0) {
+ LOG_DBGF("Could not find flow with port_id %u.", port_id);
+ return 0;
}
- if (response) {
- ipcp_data_del_flow(_ipcp->data, port_id);
+ if (response)
return 0;
- }
/* awaken pending flow */
- if (flow->flow.state != FLOW_PENDING)
+ if (_ap_instance->flows[fd].state != FLOW_PENDING) {
+ LOG_DBGF("Flow was not pending.");
return -1;
+ }
+
+ _ap_instance->flows[fd].state = FLOW_ALLOCATED;
+ _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid);
+ if (_ap_instance->flows[fd].rb == NULL) {
+ LOG_ERR("Could not open N + 1 ringbuffer.");
+ _ap_instance->flows[fd].state = FLOW_NULL;
+ _ap_instance->flows[fd].port_id = 0;
+ return 0;
+ }
- flow->flow.state = FLOW_ALLOCATED;
+ FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
+
+ pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]);
+ pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong],
+ NULL,
+ ipcp_udp_sdu_reader,
+ NULL);
+ _ap_instance->ping_pong = !_ap_instance->ping_pong;
+
+ LOG_DBG("Accepted flow, port_id %u on UDP fd %d.", port_id, fd);
return 0;
}
int ipcp_udp_flow_dealloc(uint32_t port_id)
{
- return 0;
-}
+ int fd = port_id_to_fd(port_id);
+ if (fd < 0) {
+ LOG_DBGF("Could not find flow with port_id %u.", port_id);
+ return 0;
+ }
-int ipcp_udp_du_write(uint32_t port_id,
- size_t map_index)
-{
+ _ap_instance->flows[fd].state = FLOW_NULL;
+ _ap_instance->flows[fd].port_id = 0;
+ if (_ap_instance->flows[fd].rb != NULL)
+ shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
+
+ FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
return 0;
}
-int ipcp_udp_du_read(uint32_t port_id,
- size_t map_index)
+/* FIXME: may be crap, didn't think this one through */
+int ipcp_udp_flow_dealloc_arr(uint32_t port_id)
{
- return 0;
+ int fd = port_id_to_fd(port_id);
+ if (fd < 0) {
+ LOG_DBGF("Could not find flow with port_id %u.", port_id);
+ return 0;
+ }
+
+ _ap_instance->flows[fd].state = FLOW_NULL;
+ _ap_instance->flows[fd].port_id = 0;
+ if (_ap_instance->flows[fd].rb != NULL)
+ shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
+
+ FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
+
+ return ipcp_flow_dealloc(0, port_id);
}
struct ipcp * ipcp_udp_create(char * ap_name)
@@ -617,11 +802,14 @@ struct ipcp * ipcp_udp_create(char * ap_name)
struct ipcp_udp_data * data;
struct ipcp_ops * ops;
+ if (shim_ap_init(ap_name) < 0)
+ return NULL;
+
i = malloc(sizeof *i);
if (i == NULL)
return NULL;
- data = ipcp_udp_data_create(ap_name);
+ data = ipcp_udp_data_create();
if (data == NULL) {
free(i);
return NULL;
@@ -643,8 +831,6 @@ struct ipcp * ipcp_udp_create(char * ap_name)
ops->ipcp_flow_alloc = ipcp_udp_flow_alloc;
ops->ipcp_flow_alloc_resp = ipcp_udp_flow_alloc_resp;
ops->ipcp_flow_dealloc = ipcp_udp_flow_dealloc;
- ops->ipcp_du_read = ipcp_udp_du_read;
- ops->ipcp_du_write = ipcp_udp_du_write;
i->data = (struct ipcp_data *) data;
i->ops = ops;
@@ -656,6 +842,40 @@ struct ipcp * ipcp_udp_create(char * ap_name)
#ifndef MAKE_CHECK
+/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */
+/* FIXME: stop eating the CPU */
+void * ipcp_udp_sdu_loop(void * o)
+{
+ while (true) {
+ struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb);
+ int fd;
+ int len = 0;
+ char * buf;
+ if (e == NULL)
+ continue;
+
+ len = shm_du_map_read_sdu((uint8_t **) &buf,
+ _ap_instance->dum,
+ e->index);
+ if (len == -1)
+ continue;
+
+ fd = port_id_to_fd(e->port_id);
+
+ if (fd == -1)
+ continue;
+
+ if (len == 0)
+ continue;
+
+ send(fd, buf, len, 0);
+
+ shm_release_du_buff(_ap_instance->dum, e->index);
+ }
+
+ return (void *) 1;
+}
+
int main (int argc, char * argv[])
{
/* argument 1: pid of irmd ? */
@@ -680,6 +900,7 @@ int main (int argc, char * argv[])
sigaction(SIGINT, &sig_act, NULL);
sigaction(SIGTERM, &sig_act, NULL);
sigaction(SIGHUP, &sig_act, NULL);
+ sigaction(SIGPIPE, &sig_act, NULL);
_ipcp = ipcp_udp_create(argv[2]);
if (_ipcp == NULL) {
@@ -687,7 +908,18 @@ int main (int argc, char * argv[])
exit(1);
}
- ipcp_main_loop(_ipcp);
+ pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp);
+ pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL);
+
+ pthread_join(_ap_instance->sduloop, NULL);
+ pthread_join(_ap_instance->mainloop, NULL);
+ pthread_join(_ap_instance->handler, NULL);
+ pthread_join(_ap_instance->sdu_reader[0], NULL);
+ pthread_join(_ap_instance->sdu_reader[1], NULL);
+
+ ipcp_udp_destroy(_ipcp);
+
+ shim_ap_fini();
exit(0);
}
diff --git a/src/ipcpd/shim-udp/tests/shim_udp_test.c b/src/ipcpd/shim-udp/tests/shim_udp_test.c
index 036f5877..e5e8b32d 100644
--- a/src/ipcpd/shim-udp/tests/shim_udp_test.c
+++ b/src/ipcpd/shim-udp/tests/shim_udp_test.c
@@ -59,7 +59,7 @@ int shim_udp_test(int argc, char ** argv)
_ipcp = ipcp_udp_create(ipcp_name);
if (_ipcp == NULL) {
LOG_ERR("Could not instantiate shim IPCP.");
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
exit(1);
}
@@ -69,13 +69,13 @@ int shim_udp_test(int argc, char ** argv)
if (ipcp_udp_name_reg("bogus name")) {
LOG_ERR("Failed to register application.");
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
exit(1);
}
if (ipcp_udp_name_unreg("bogus name")) {
LOG_ERR("Failed to unregister application.");
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
exit(1);
}
@@ -83,7 +83,7 @@ int shim_udp_test(int argc, char ** argv)
sprintf(bogus, "bogus name %4d", i);
if (ipcp_udp_name_reg(bogus)) {
LOG_ERR("Failed to register application %s.", bogus);
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
exit(1);
}
}
@@ -92,12 +92,12 @@ int shim_udp_test(int argc, char ** argv)
sprintf(bogus, "bogus name %4d", i);
if(ipcp_udp_name_unreg(bogus)) {
LOG_ERR("Failed to unregister application %s.", bogus);
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
exit(1);
}
}
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
exit(0);
}
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 67254feb..a6403612 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -34,6 +34,7 @@
#include <ouroboros/utils.h>
#include <ouroboros/dif_config.h>
#include <ouroboros/shm_du_map.h>
+#include <ouroboros/bitmap.h>
#include <sys/socket.h>
#include <sys/un.h>
@@ -42,41 +43,146 @@
#include <errno.h>
#include <string.h>
#include <limits.h>
+#include <pthread.h>
/* FIXME: this smells like part of namespace management */
#define ALL_DIFS "*"
+#ifndef IRMD_MAX_FLOWS
+ #define IRMD_MAX_FLOWS 4096
+#endif
+
+#ifndef IRMD_THREADPOOL_SIZE
+ #define IRMD_THREADPOOL_SIZE 3
+#endif
+
+
+
+enum flow_state {
+ FLOW_NULL = 0,
+ FLOW_PENDING,
+ FLOW_ALLOCATED
+};
+
struct ipcp_entry {
struct list_head next;
instance_name_t * api;
char * dif_name;
+
+ pthread_mutex_t lock;
};
-/* currently supports only registering whatevercast groups of a single AP */
+/* currently supports only registering whatevercast groups of a single AP-I */
struct reg_name_entry {
struct list_head next;
/* generic whatevercast name */
char * name;
- /* FIXME: resolve name instead */
+ /* FIXME: make a list resolve to AP-I instead */
instance_name_t * api;
- uint32_t reg_ap_id;
+
+ bool accept;
+ char * req_ap_name;
+ char * req_ae_name;
+ bool flow_arrived;
+
+ pthread_mutex_t fa_lock;
+};
+
+/* keeps track of port_id's between N and N - 1 */
+struct port_map_entry {
+ struct list_head next;
+
+ uint32_t port_id;
+
+ pid_t n_pid;
+ pid_t n_1_pid;
+
+ enum flow_state state;
};
struct irm {
- /* FIXME: list of ipcps can be merged with registered names */
+ /* FIXME: list of ipcps could be merged with registered names */
struct list_head ipcps;
struct list_head reg_names;
+ int sockfd;
+
+ /* keep track of all flows in this processing system */
+ struct bmp * port_ids;
+
+ /* maps port_ids to pid pair */
+ struct list_head port_map;
+
struct shm_du_map * dum;
-};
-struct irm * instance = NULL;
+ pthread_t * threadpool;
+
+ pthread_mutex_t lock;
+} * instance = NULL;
-static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api)
+static struct port_map_entry * get_port_map_entry(uint32_t port_id)
+{
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &instance->port_map) {
+ struct port_map_entry * e =
+ list_entry(pos, struct port_map_entry, next);
+
+ if (e->port_id == port_id)
+ return e;
+ }
+
+ return NULL;
+}
+
+static struct port_map_entry * get_port_map_entry_n(pid_t n_pid)
+{
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &instance->port_map) {
+ struct port_map_entry * e =
+ list_entry(pos, struct port_map_entry, next);
+
+ if (e->n_pid == n_pid)
+ return e;
+ }
+
+ return NULL;
+}
+
+static struct ipcp_entry * ipcp_entry_create()
+{
+ struct ipcp_entry * e = malloc(sizeof(*e));
+ if (e == NULL)
+ return NULL;
+
+ e->api = NULL;
+ e->dif_name = NULL;
+
+ INIT_LIST_HEAD(&e->next);
+ pthread_mutex_init(&e->lock, NULL);
+
+ return e;
+}
+
+static void ipcp_entry_destroy(struct ipcp_entry * e)
+{
+ if (e == NULL)
+ return;
+
+ if (e->api != NULL)
+ instance_name_destroy(e->api);
+
+ if (e->dif_name != NULL)
+ free(e->dif_name);
+
+ free(e);
+}
+
+static struct ipcp_entry * get_ipcp_entry_by_name(instance_name_t * api)
{
- struct ipcp_entry * tmp = NULL;
struct list_head * pos = NULL;
list_for_each(pos, &instance->ipcps) {
@@ -87,7 +193,7 @@ static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api)
return tmp;
}
- return tmp;
+ return NULL;
}
static instance_name_t * get_ipcp_by_name(char * ap_name)
@@ -143,9 +249,14 @@ static struct reg_name_entry * reg_name_entry_create()
if (e == NULL)
return NULL;
- e->reg_ap_id = rand() % INT_MAX;
- e->name = NULL;
+ e->name = NULL;
+ e->api = NULL;
+ e->accept = false;
+ e->req_ap_name = NULL;
+ e->req_ae_name = NULL;
+ e->flow_arrived = false;
+ pthread_mutex_init(&e->fa_lock, NULL);
INIT_LIST_HEAD(&e->next);
return e;
@@ -153,7 +264,7 @@ static struct reg_name_entry * reg_name_entry_create()
static struct reg_name_entry * reg_name_entry_init(struct reg_name_entry * e,
char * name,
- instance_name_t * api)
+ instance_name_t * api)
{
if (e == NULL || name == NULL || api == NULL)
return NULL;
@@ -171,10 +282,18 @@ static int reg_name_entry_destroy(struct reg_name_entry * e)
free(e->name);
instance_name_destroy(e->api);
+
+ if (e->req_ap_name != NULL)
+ free(e->req_ap_name);
+ if (e->req_ae_name != NULL)
+ free(e->req_ae_name);
+
+ free(e);
+
return 0;
}
-static struct reg_name_entry * find_reg_name_entry_by_name(char * name)
+static struct reg_name_entry * get_reg_name_entry_by_name(char * name)
{
struct list_head * pos = NULL;
@@ -189,7 +308,7 @@ static struct reg_name_entry * find_reg_name_entry_by_name(char * name)
return NULL;
}
-static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id)
+static struct reg_name_entry * get_reg_name_entry_by_id(pid_t pid)
{
struct list_head * pos = NULL;
@@ -197,7 +316,7 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id)
struct reg_name_entry * e =
list_entry(pos, struct reg_name_entry, next);
- if (reg_ap_id == e->reg_ap_id)
+ if (e->api->id == pid)
return e;
}
@@ -207,10 +326,17 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id)
/* FIXME: add only name when we have NSM solved */
static int reg_name_entry_add_name_instance(char * name, instance_name_t * api)
{
- struct reg_name_entry * e = find_reg_name_entry_by_name(name);
+ struct reg_name_entry * e = get_reg_name_entry_by_name(name);
if (e == NULL) {
e = reg_name_entry_create();
- e = reg_name_entry_init(e, name, api);
+ if (e == NULL)
+ return -1;
+
+ if (reg_name_entry_init(e, name, api) == NULL) {
+ reg_name_entry_destroy(e);
+ return -1;
+ }
+
list_add(&e->next, &instance->reg_names);
return 0;
}
@@ -221,7 +347,7 @@ static int reg_name_entry_add_name_instance(char * name, instance_name_t * api)
static int reg_name_entry_del_name(char * name)
{
- struct reg_name_entry * e = find_reg_name_entry_by_name(name);
+ struct reg_name_entry * e = get_reg_name_entry_by_name(name);
if (e == NULL)
return 0;
@@ -240,34 +366,38 @@ static pid_t create_ipcp(char * ap_name,
pid = ipcp_create(ap_name, ipcp_type);
if (pid == -1) {
- LOG_ERR("Failed to create IPCP");
+ LOG_ERR("Failed to create IPCP.");
return -1;
}
- tmp = malloc(sizeof(*tmp));
- if (tmp == NULL) {
+ tmp = ipcp_entry_create();
+ if (tmp == NULL)
return -1;
- }
INIT_LIST_HEAD(&tmp->next);
tmp->api = instance_name_create();
if (tmp->api == NULL) {
- free(tmp);
+ ipcp_entry_destroy(tmp);
return -1;
}
if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) {
instance_name_destroy(tmp->api);
- free(tmp);
+ ipcp_entry_destroy(tmp);
return -1;
}
tmp->dif_name = NULL;
- LOG_DBG("Created IPC process with pid %d", pid);
+ pthread_mutex_lock(&instance->lock);
list_add(&tmp->next, &instance->ipcps);
+
+ pthread_mutex_unlock(&instance->lock);
+
+ LOG_INFO("Created IPCP %s-%d ", ap_name, pid);
+
return pid;
}
@@ -276,18 +406,19 @@ static int destroy_ipcp(instance_name_t * api)
struct list_head * pos = NULL;
struct list_head * n = NULL;
+ if (api == NULL)
+ return 0;
+
if (api->id == 0)
api = get_ipcp_by_name(api->name);
if (api == NULL) {
LOG_ERR("No such IPCP in the system.");
- return -1;
+ return 0;
}
- LOG_DBG("Destroying ipcp %s-%d", api->name, api->id);
-
if (ipcp_destroy(api->id))
- LOG_ERR("Could not destroy IPCP");
+ LOG_ERR("Could not destroy IPCP.");
list_for_each_safe(pos, n, &(instance->ipcps)) {
struct ipcp_entry * tmp =
@@ -295,8 +426,12 @@ static int destroy_ipcp(instance_name_t * api)
if (instance_name_cmp(api, tmp->api) == 0)
list_del(&tmp->next);
+
+ ipcp_entry_destroy(tmp);
}
+ LOG_INFO("Destroyed IPCP %s-%d.", api->name, api->id);
+
return 0;
}
@@ -313,25 +448,28 @@ static int bootstrap_ipcp(instance_name_t * api,
return -1;
}
- entry = find_ipcp_entry_by_name(api);
+ entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
- LOG_ERR("No such IPCP");
+ LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(conf->dif_name);
if (entry->dif_name == NULL) {
- LOG_ERR("Failed to strdup");
+ LOG_ERR("Failed to strdup.");
return -1;
}
if (ipcp_bootstrap(entry->api->id, conf)) {
- LOG_ERR("Could not bootstrap IPCP");
+ LOG_ERR("Could not bootstrap IPCP.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
+ LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.",
+ api->name, api->id, conf->dif_name);
+
return 0;
}
@@ -343,21 +481,21 @@ static int enroll_ipcp(instance_name_t * api,
ssize_t n_1_difs_size = 0;
struct ipcp_entry * entry = NULL;
- entry = find_ipcp_entry_by_name(api);
+ entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
- LOG_ERR("No such IPCP");
+ LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(dif_name);
if (entry->dif_name == NULL) {
- LOG_ERR("Failed to strdup");
+ LOG_ERR("Failed to strdup.");
return -1;
}
member = da_resolve_daf(dif_name);
if (member == NULL) {
- LOG_ERR("Could not find a member of that DIF");
+ LOG_ERR("Could not find a member of that DIF.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
@@ -365,19 +503,22 @@ static int enroll_ipcp(instance_name_t * api,
n_1_difs_size = da_resolve_dap(member, n_1_difs);
if (n_1_difs_size < 1) {
- LOG_ERR("Could not find N-1 DIFs");
+ LOG_ERR("Could not find N-1 DIFs.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
if (ipcp_enroll(entry->api->id, member, n_1_difs[0])) {
- LOG_ERR("Could not enroll IPCP");
+ LOG_ERR("Could not enroll IPCP.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
+ LOG_INFO("Enrolled IPCP %s-%d in DIF %s.",
+ api->name, api->id, dif_name);
+
return 0;
}
@@ -386,7 +527,7 @@ static int reg_ipcp(instance_name_t * api,
size_t difs_size)
{
if (ipcp_reg(api->id, difs, difs_size)) {
- LOG_ERR("Could not register IPCP to N-1 DIF(s)");
+ LOG_ERR("Could not register IPCP to N-1 DIF(s).");
return -1;
}
@@ -399,24 +540,23 @@ static int unreg_ipcp(instance_name_t * api,
{
if (ipcp_unreg(api->id, difs, difs_size)) {
- LOG_ERR("Could not unregister IPCP from N-1 DIF(s)");
+ LOG_ERR("Could not unregister IPCP from N-1 DIF(s).");
return -1;
}
return 0;
}
-static int ap_unreg_id(uint32_t reg_ap_id,
- pid_t pid,
+static int ap_unreg_id(pid_t pid,
char ** difs,
size_t len)
{
int i;
int ret = 0;
- struct reg_name_entry * rne = NULL;
- struct list_head * pos = NULL;
+ struct reg_name_entry * rne = NULL;
+ struct list_head * pos = NULL;
- rne = find_reg_name_entry_by_id(reg_ap_id);
+ rne = get_reg_name_entry_by_id(pid);
if (rne == NULL)
return 0; /* no such id */
@@ -458,7 +598,6 @@ static int ap_reg(char * ap_name,
{
int i;
int ret = 0;
- int reg_ap_id = 0;
struct list_head * pos = NULL;
struct reg_name_entry * rne = NULL;
@@ -466,18 +605,18 @@ static int ap_reg(char * ap_name,
instance_name_t * ipcpi = NULL;
if (instance->ipcps.next == NULL)
- LOG_ERR("No IPCPs in this system.");
+ return -1;
/* check if this ap_name is already registered */
- rne = find_reg_name_entry_by_name(ap_name);
+ rne = get_reg_name_entry_by_name(ap_name);
if (rne != NULL)
return -1; /* can only register one instance for now */
- rne = reg_name_entry_create();
- if (rne == NULL)
+ api = instance_name_create();
+ if (api == NULL) {
return -1;
+ }
- api = instance_name_create();
if (instance_name_init_from(api, ap_name, ap_id) == NULL) {
instance_name_destroy(api);
return -1;
@@ -488,12 +627,6 @@ static int ap_reg(char * ap_name,
* contains a single instance only
*/
- if (reg_name_entry_init(rne, strdup(ap_name), api) == NULL) {
- reg_name_entry_destroy(rne);
- instance_name_destroy(api);
- return -1;
- }
-
if (strcmp(difs[0], ALL_DIFS) == 0) {
list_for_each(pos, &instance->ipcps) {
struct ipcp_entry * e =
@@ -528,11 +661,10 @@ static int ap_reg(char * ap_name,
return -1;
}
/* for now, we register single instances */
- reg_name_entry_add_name_instance(strdup(ap_name),
- instance_name_dup(api));
- instance_name_destroy(api);
+ ret = reg_name_entry_add_name_instance(strdup(ap_name),
+ api);
- return reg_ap_id;
+ return ret;
}
static int ap_unreg(char * ap_name,
@@ -542,149 +674,304 @@ static int ap_unreg(char * ap_name,
{
struct reg_name_entry * tmp = NULL;
- instance_name_t * api = instance_name_create();
- if (api == NULL)
- return -1;
-
- if (instance_name_init_from(api, ap_name, ap_id) == NULL) {
- instance_name_destroy(api);
- return -1;
- }
-
/* check if ap_name is registered */
- tmp = find_reg_name_entry_by_name(api->name);
- if (tmp == NULL) {
- instance_name_destroy(api);
+ tmp = get_reg_name_entry_by_id(ap_id);
+ if (tmp == NULL)
return 0;
- } else {
- return ap_unreg_id(tmp->reg_ap_id, api->id, difs, len);
- }
-}
+ if (strcmp(ap_name, tmp->api->name))
+ return 0;
-static int flow_accept(int fd,
- pid_t pid,
- char * ap_name,
- char * ae_name)
-{
- return -1;
+ return ap_unreg_id(ap_id, difs, len);
}
-static int flow_alloc_resp(int fd,
- int result)
+static struct port_map_entry * flow_accept(pid_t pid,
+ char ** ap_name,
+ char ** ae_name)
{
- return -1;
+ bool arrived = false;
+
+ struct timespec ts = {0, 100000};
+
+ struct port_map_entry * pme;
+ struct reg_name_entry * rne = get_reg_name_entry_by_id(pid);
+ if (rne == NULL) {
+ LOG_DBGF("Unregistered AP calling accept().");
+ return NULL;
+ }
+
+ if (rne->accept) {
+ LOG_DBGF("This AP still has a pending accept().");
+ return NULL;
+ }
+
+ rne->accept = true;
+
+ /* FIXME: wait for a thread that runs select() on flow_arrived */
+ while (!arrived) {
+ /* FIXME: this needs locking */
+ rne = get_reg_name_entry_by_id(pid);
+ if (rne == NULL)
+ return NULL;
+ arrived = rne->flow_arrived;
+ nanosleep(&ts, NULL);
+ }
+
+ pme = get_port_map_entry_n(pid);
+ if (pme == NULL) {
+ LOG_ERR("Port_id was not created yet.");
+ return NULL;
+ }
+
+ pthread_mutex_lock(&rne->fa_lock);
+ *ap_name = rne->req_ap_name;
+ if (ae_name != NULL)
+ *ae_name = rne->req_ae_name;
+ pthread_mutex_unlock(&rne->fa_lock);
+
+ return pme;
}
-static int flow_alloc(char * dst_name,
- char * src_ap_name,
- char * src_ae_name,
- struct qos_spec * qos,
- int oflags)
+static int flow_alloc_resp(pid_t n_pid,
+ uint32_t port_id,
+ int response)
{
- int port_id = 0;
- pid_t pid = get_ipcp_by_dst_name(dst_name)->id;
+ struct reg_name_entry * rne = get_reg_name_entry_by_id(n_pid);
+ struct port_map_entry * pme = get_port_map_entry(port_id);
- LOG_DBG("flow alloc received from %s-%s to %s.",
- src_ap_name, src_ae_name, dst_name);
+ if (rne == NULL || pme == NULL)
+ return -1;
- return ipcp_flow_alloc(pid,
- port_id,
- dst_name,
- src_ap_name,
- src_ae_name,
- qos);
+ /* FIXME: check all instances associated with the name */
+ if (!rne->accept) {
+ LOG_ERR("No process listening for this name.");
+ return -1;
+ }
+
+ /*
+ * consider the flow as handled
+ * once we can handle a list of AP-I's, remove it from the list
+ */
+
+ rne->flow_arrived = false;
+ rne->accept = false;
+
+ if (!response)
+ pme->state = FLOW_ALLOCATED;
+
+ return ipcp_flow_alloc_resp(pme->n_1_pid,
+ port_id,
+ pme->n_pid,
+ response);
}
-static int flow_alloc_res(int fd)
+static struct port_map_entry * flow_alloc(pid_t pid,
+ char * dst_name,
+ char * src_ap_name,
+ char * src_ae_name,
+ struct qos_spec * qos)
{
+ struct port_map_entry * e = malloc(sizeof(*e));
+ if (e == NULL) {
+ LOG_ERR("Failed malloc of port_map_entry.");
+ return NULL;
+ }
- return -1;
+ e->port_id = bmp_allocate(instance->port_ids);
+ e->n_pid = pid;
+ e->state = FLOW_PENDING;
+ e->n_1_pid = get_ipcp_by_dst_name(dst_name)->id;
+
+ list_add(&e->next, &instance->port_map);
+
+ if (ipcp_flow_alloc(get_ipcp_by_dst_name(dst_name)->id,
+ e->port_id,
+ e->n_pid,
+ dst_name,
+ src_ap_name,
+ src_ae_name,
+ qos) < 0) {
+ list_del(&e->next);
+ bmp_release(instance->port_ids, e->port_id);
+ free(e);
+ return NULL;
+ }
+
+ return e;
}
-static int flow_dealloc(int fd)
+static int flow_alloc_res(uint32_t port_id)
{
- return -1;
+ bool allocated = false;
+ struct port_map_entry * e;
+ struct timespec ts = {0,100000};
+
+ while (!allocated) {
+ /* FIXME: this needs locking */
+ e = get_port_map_entry(port_id);
+ if (e == NULL) {
+ LOG_DBGF("Could not locate port_id %u", port_id);
+ return -1;
+ }
+ if (e->state == FLOW_ALLOCATED)
+ allocated = true;
+ nanosleep(&ts, NULL);
+ }
+
+ return 0;
}
-static int flow_cntl(int fd,
- int oflags)
+static int flow_dealloc(uint32_t port_id)
{
- return -1;
+ pid_t n_1_pid;
+
+ struct port_map_entry * e = get_port_map_entry(port_id);
+ if (e == NULL)
+ return 0;
+
+ n_1_pid = e->n_1_pid;
+
+ list_del(&e->next);
+ free(e);
+
+ return ipcp_flow_dealloc(n_1_pid, port_id);
}
-static int flow_req_arr(char * dst_name,
- char * ap_name,
- char * ae_name)
+static struct port_map_entry * flow_req_arr(pid_t pid,
+ char * dst_name,
+ char * ap_name,
+ char * ae_name)
{
- return -1;
+ struct reg_name_entry * rne;
+ struct port_map_entry * pme;
+
+ rne = get_reg_name_entry_by_name(dst_name);
+ if (rne == NULL) {
+ LOG_DBGF("Destination name %s unknown.", dst_name);
+ return NULL;
+ }
+
+ pme = malloc(sizeof(*pme));
+ if (pme == NULL) {
+ LOG_ERR("Failed malloc of port_map_entry.");
+ return NULL;
+ }
+
+ pme->port_id = bmp_allocate(instance->port_ids);
+ pme->n_pid = rne->api->id;
+ pme->state = FLOW_PENDING;
+ pme->n_1_pid = pid;
+
+ list_add(&pme->next, &instance->port_map);
+
+ pthread_mutex_lock(&rne->fa_lock);
+
+ rne->req_ap_name = strdup(ap_name);
+ rne->req_ae_name = strdup(ae_name);
+
+ rne->flow_arrived = true;
+
+ pthread_mutex_unlock(&rne->fa_lock);
+
+ return pme;
}
static int flow_alloc_reply(uint32_t port_id,
- int result)
+ int response)
{
- return -1;
+ struct port_map_entry * e;
+
+ /* FIXME: do this under lock */
+ if (!response) {
+ e = get_port_map_entry(port_id);
+ if (e == NULL)
+ return -1;
+ e->state = FLOW_ALLOCATED;
+ }
+
+ /* FIXME: does this need to be propagated to the IPCP? */
+
+ return 0;
}
static int flow_dealloc_ipcp(uint32_t port_id)
{
- return -1;
+ struct port_map_entry * e = get_port_map_entry(port_id);
+ if (e == NULL)
+ return 0;
+
+ list_del(&e->next);
+ free(e);
+
+ return 0;
+}
+
+static void irm_destroy(struct irm * irm)
+{
+ struct list_head * h;
+ struct list_head * t;
+
+ if (irm == NULL)
+ return;
+
+ if (irm->threadpool != NULL)
+ free(irm->threadpool);
+
+ if (irm->port_ids != NULL)
+ bmp_destroy(irm->port_ids);
+ /* clear the lists */
+ list_for_each_safe(h, t, &irm->ipcps) {
+ struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next);
+ destroy_ipcp(e->api);
+ }
+
+ list_for_each_safe(h, t, &irm->reg_names) {
+ struct reg_name_entry * e = list_entry(h,
+ struct reg_name_entry,
+ next);
+ char * difs [1] = {ALL_DIFS};
+ ap_unreg_id(e->api->id, difs, 1);
+ }
+
+ list_for_each_safe(h, t, &irm->port_map) {
+ struct port_map_entry * e = list_entry(h,
+ struct port_map_entry,
+ next);
+ list_del(&e->next);
+ free(e);
+ }
+
+ if (irm->dum != NULL)
+ shm_du_map_destroy(irm->dum);
+
+ close(irm->sockfd);
+ free(irm);
}
void irmd_sig_handler(int sig, siginfo_t * info, void * c)
{
+ int i;
+
switch(sig) {
case SIGINT:
case SIGTERM:
case SIGHUP:
- shm_du_map_close(instance->dum);
- free(instance);
- exit(0);
+ if (instance->threadpool != NULL) {
+ for (i = 0; i < IRMD_THREADPOOL_SIZE; i++)
+ pthread_cancel(instance->threadpool[i]);
+ }
+
+ case SIGPIPE:
+ LOG_DBG("Ignoring SIGPIPE.");
default:
return;
}
}
-int main()
+void * mainloop()
{
- int sockfd;
uint8_t buf[IRM_MSG_BUF_SIZE];
- struct sigaction sig_act;
-
- /* init sig_act */
- memset(&sig_act, 0, sizeof sig_act);
-
- /* install signal traps */
- sig_act.sa_sigaction = &irmd_sig_handler;
- sig_act.sa_flags = SA_SIGINFO;
-
- sigaction(SIGINT, &sig_act, NULL);
- sigaction(SIGTERM, &sig_act, NULL);
- sigaction(SIGHUP, &sig_act, NULL);
-
- if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1)
- unlink("/dev/shm/" SHM_DU_MAP_FILENAME);
-
- instance = malloc(sizeof(*instance));
- if (instance == NULL)
- return -1;
-
- if ((instance->dum = shm_du_map_create()) == NULL) {
- free(instance);
- return -1;
- }
-
- INIT_LIST_HEAD(&instance->ipcps);
- INIT_LIST_HEAD(&instance->reg_names);
-
- sockfd = server_socket_open(IRM_SOCK_PATH);
- if (sockfd < 0) {
- shm_du_map_close(instance->dum);
- free(instance);
- return -1;
- }
-
while (true) {
int cli_sockfd;
irm_msg_t * msg;
@@ -692,18 +979,19 @@ int main()
instance_name_t api;
buffer_t buffer;
irm_msg_t ret_msg = IRM_MSG__INIT;
+ struct port_map_entry * e = NULL;
ret_msg.code = IRM_MSG_CODE__IRM_REPLY;
- cli_sockfd = accept(sockfd, 0, 0);
+ cli_sockfd = accept(instance->sockfd, 0, 0);
if (cli_sockfd < 0) {
- LOG_ERR("Cannot accept new connection");
+ LOG_ERR("Cannot accept new connection.");
continue;
}
count = read(cli_sockfd, buf, IRM_MSG_BUF_SIZE);
if (count <= 0) {
- LOG_ERR("Failed to read from socket");
+ LOG_ERR("Failed to read from socket.");
close(cli_sockfd);
continue;
}
@@ -750,11 +1038,11 @@ int main()
msg->n_dif_name);
break;
case IRM_MSG_CODE__IRM_AP_REG:
- ret_msg.has_fd = true;
- ret_msg.fd = ap_reg(msg->ap_name,
- msg->pid,
- msg->dif_name,
- msg->n_dif_name);
+ ret_msg.has_result = true;
+ ret_msg.result = ap_reg(msg->ap_name,
+ msg->pid,
+ msg->dif_name,
+ msg->n_dif_name);
break;
case IRM_MSG_CODE__IRM_AP_UNREG:
ret_msg.has_result = true;
@@ -764,43 +1052,57 @@ int main()
msg->n_dif_name);
break;
case IRM_MSG_CODE__IRM_FLOW_ACCEPT:
- ret_msg.has_fd = true;
- ret_msg.fd = flow_accept(msg->fd,
- msg->pid,
- ret_msg.ap_name,
- ret_msg.ae_name);
+ e = flow_accept(msg->pid,
+ &ret_msg.ap_name,
+ &ret_msg.ae_name);
+ if (e == NULL)
+ break;
+
+ ret_msg.has_port_id = true;
+ ret_msg.port_id = e->port_id;
+ ret_msg.has_pid = true;
+ ret_msg.pid = e->n_1_pid;
break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP:
ret_msg.has_result = true;
- ret_msg.result = flow_alloc_resp(msg->fd,
- msg->result);
+ ret_msg.result = flow_alloc_resp(msg->pid,
+ msg->port_id,
+ msg->response);
break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC:
- ret_msg.has_fd = true;
- ret_msg.fd = flow_alloc(msg->dst_name,
- msg->ap_name,
- msg->ae_name,
- NULL,
- msg->oflags);
+ e = flow_alloc(msg->pid,
+ msg->dst_name,
+ msg->ap_name,
+ msg->ae_name,
+ NULL);
+ if (e == NULL)
+ break;
+
+ ret_msg.has_port_id = true;
+ ret_msg.port_id = e->port_id;
+ ret_msg.has_pid = true;
+ ret_msg.pid = e->n_1_pid;
break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES:
- ret_msg.has_response = true;
- ret_msg.response = flow_alloc_res(msg->fd);
- break;
- case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
ret_msg.has_result = true;
- ret_msg.result = flow_dealloc(msg->fd);
+ ret_msg.result = flow_alloc_res(msg->port_id);
break;
- case IRM_MSG_CODE__IRM_FLOW_CONTROL:
+ case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
ret_msg.has_result = true;
- ret_msg.result = flow_cntl(msg->fd,
- msg->oflags);
+ ret_msg.result = flow_dealloc(msg->port_id);
break;
case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:
+ e = flow_req_arr(msg->pid,
+ msg->dst_name,
+ msg->ap_name,
+ msg->ae_name);
+ if (e == NULL)
+ break;
+
ret_msg.has_port_id = true;
- ret_msg.port_id = flow_req_arr(msg->dst_name,
- msg->ap_name,
- msg->ae_name);
+ ret_msg.port_id = e->port_id;
+ ret_msg.has_pid = true;
+ ret_msg.pid = e->n_pid;
break;
case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY:
ret_msg.has_result = true;
@@ -812,7 +1114,7 @@ int main()
ret_msg.result = flow_dealloc_ipcp(msg->port_id);
break;
default:
- LOG_ERR("Don't know that message code");
+ LOG_ERR("Don't know that message code.");
break;
}
@@ -820,7 +1122,7 @@ int main()
buffer.size = irm_msg__get_packed_size(&ret_msg);
if (buffer.size == 0) {
- LOG_ERR("Failed to send reply message");
+ LOG_ERR("Failed to send reply message.");
close(cli_sockfd);
continue;
}
@@ -842,6 +1144,88 @@ int main()
free(buffer.data);
close(cli_sockfd);
}
+}
+
+static struct irm * irm_create()
+{
+ struct irm * i = malloc(sizeof(*i));
+ if (i == NULL)
+ return NULL;
+
+ if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1)
+ unlink("/dev/shm/" SHM_DU_MAP_FILENAME);
+
+ i->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE);
+ if (i->threadpool == NULL) {
+ irm_destroy(i);
+ return NULL;
+ }
+
+ if ((i->dum = shm_du_map_create()) == NULL) {
+ irm_destroy(i);
+ return NULL;
+ }
+
+ INIT_LIST_HEAD(&i->ipcps);
+ INIT_LIST_HEAD(&i->reg_names);
+ INIT_LIST_HEAD(&i->port_map);
+
+ i->port_ids = bmp_create(IRMD_MAX_FLOWS, 0);
+ if (i->port_ids == NULL) {
+ irm_destroy(i);
+ return NULL;
+ }
+
+ i->sockfd = server_socket_open(IRM_SOCK_PATH);
+ if (i->sockfd < 0) {
+ irm_destroy(i);
+ return NULL;
+ }
+
+ pthread_mutex_init(&i->lock, NULL);
+
+ return i;
+}
+
+int main()
+{
+ struct sigaction sig_act;
+
+ int t = 0;
+
+ /* init sig_act */
+ memset(&sig_act, 0, sizeof sig_act);
+
+ /* install signal traps */
+ sig_act.sa_sigaction = &irmd_sig_handler;
+ sig_act.sa_flags = SA_SIGINFO;
+
+ if (sigaction(SIGINT, &sig_act, NULL) < 0)
+ exit(1);
+ if (sigaction(SIGTERM, &sig_act, NULL) < 0)
+ exit(1);
+ if (sigaction(SIGHUP, &sig_act, NULL) < 0)
+ exit(1);
+ if (sigaction(SIGPIPE, &sig_act, NULL) < 0)
+ exit(1);
+
+ instance = irm_create();
+ if (instance == NULL)
+ return 1;
+
+ /*
+ * FIXME: we need a main loop that delegates messages to subthreads in a
+ * way that avoids all possible deadlocks for local apps
+ */
+
+ for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
+ pthread_create(&instance->threadpool[t], NULL, mainloop, NULL);
+
+ /* wait for (all of them) to return */
+ for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
+ pthread_join(instance->threadpool[t], NULL);
+
+ irm_destroy(instance);
return 0;
}
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 4922e07c..53a7b354 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -32,6 +32,7 @@ set(SOURCE_FILES
ipcp.c
irm.c
list.c
+ shm_ap_rbuff.c
shm_du_map.c
sockets.c
utils.c
diff --git a/src/lib/bitmap.c b/src/lib/bitmap.c
index 8aabb4f4..e84145b2 100644
--- a/src/lib/bitmap.c
+++ b/src/lib/bitmap.c
@@ -108,12 +108,14 @@ struct bmp * bmp_create(size_t bits, ssize_t offset)
return NULL;
tmp = malloc(sizeof(*tmp));
- if (!tmp)
+ if (tmp == NULL)
return NULL;
- tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(*(tmp->bitmap)));
- if (!tmp->bitmap)
+ tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(unsigned long));
+ if (tmp->bitmap == NULL) {
+ free(tmp);
return NULL;
+ }
tmp->size = bits;
tmp->offset = offset;
@@ -140,7 +142,8 @@ int bmp_destroy(struct bmp * b)
static ssize_t bad_id(struct bmp * b)
{
- assert(b);
+ if (b == NULL)
+ return -1;
return b->offset - 1;
}
@@ -149,8 +152,8 @@ ssize_t bmp_allocate(struct bmp * b)
{
ssize_t id;
- if (!b)
- return bad_id(b);
+ if (b == NULL)
+ return -1;
id = (ssize_t) find_next_zero_bit(b->bitmap,
b->size);
@@ -177,7 +180,7 @@ static bool is_id_valid(struct bmp * b,
bool bmp_is_id_valid(struct bmp * b,
ssize_t id)
{
- if (!b)
+ if (b == NULL)
return false;
return is_id_valid(b, id);
@@ -188,7 +191,7 @@ int bmp_release(struct bmp * b,
{
ssize_t rid;
- if (!b)
+ if (b == NULL)
return -1;
if (!is_id_valid(b, id))
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 6d8411c5..c99e8cdb 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -25,73 +25,190 @@
#include <ouroboros/logs.h>
#include <ouroboros/dev.h>
#include <ouroboros/sockets.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/instance_name.h>
+#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/utils.h>
#include <stdlib.h>
+#include <string.h>
-int ap_reg(char * ap_name,
- char ** difs,
- size_t difs_size)
+#define AP_MAX_FLOWS 256
+
+#ifndef DU_BUFF_HEADSPACE
+ #define DU_BUFF_HEADSPACE 128
+#endif
+
+#ifndef DU_BUFF_TAILSPACE
+ #define DU_BUFF_TAILSPACE 0
+#endif
+
+struct flow {
+ struct shm_ap_rbuff * rb;
+ uint32_t port_id;
+ uint32_t oflags;
+
+ /* don't think this needs locking */
+};
+
+struct ap_data {
+ instance_name_t * api;
+ struct shm_du_map * dum;
+ struct bmp * fds;
+
+ struct shm_ap_rbuff * rb;
+ struct flow flows[AP_MAX_FLOWS];
+} * _ap_instance;
+
+
+int ap_init(char * ap_name)
{
- irm_msg_t msg = IRM_MSG__INIT;
+ _ap_instance = malloc(sizeof(struct ap_data));
+ if (_ap_instance == NULL) {
+ return -1;
+ }
+
+ _ap_instance->api = instance_name_create();
+ if (_ap_instance->api == NULL) {
+ free(_ap_instance);
+ return -1;
+ }
+
+ if (instance_name_init_from(_ap_instance->api,
+ ap_name,
+ getpid()) == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0);
+ if (_ap_instance->fds == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->dum = shm_du_map_open();
+ if (_ap_instance->dum == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ bmp_destroy(_ap_instance->fds);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->rb = shm_ap_rbuff_create();
+ if (_ap_instance->rb == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ bmp_destroy(_ap_instance->fds);
+ free(_ap_instance);
+ return -1;
+ }
+
+ return 0;
+}
+
+void ap_fini()
+{
+ int i = 0;
+
+ if (_ap_instance == NULL)
+ return;
+ if (_ap_instance->api != NULL)
+ instance_name_destroy(_ap_instance->api);
+ if (_ap_instance->fds != NULL)
+ bmp_destroy(_ap_instance->fds);
+ if (_ap_instance->dum != NULL)
+ shm_du_map_close(_ap_instance->dum);
+ if (_ap_instance->rb != NULL)
+ shm_ap_rbuff_destroy(_ap_instance->rb);
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ if (_ap_instance->flows[i].rb != NULL)
+ shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+
+ free(_ap_instance);
+}
+
+#if 0
+static int port_id_to_fd(uint32_t port_id)
+{
+ int i;
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ if (_ap_instance->flows[i].port_id == port_id
+ && _ap_instance->flows[i].state != FLOW_NULL)
+ return i;
+ return -1;
+}
+#endif
+
+int ap_reg(char ** difs,
+ size_t len)
+{
+ irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int fd = 0;
+ int fd = bmp_allocate(_ap_instance->fds);
- if (ap_name == NULL ||
- difs == NULL ||
- difs_size == 0 ||
+ if (difs == NULL ||
+ len == 0 ||
difs[0] == NULL) {
return -EINVAL;
}
+ if (_ap_instance == NULL) {
+ LOG_DBG("ap_init was not called");
+ return -1;
+ }
+
msg.code = IRM_MSG_CODE__IRM_AP_REG;
msg.has_pid = true;
- msg.pid = getpid();
- msg.ap_name = ap_name;
+ msg.pid = _ap_instance->api->id;
+ msg.ap_name = _ap_instance->api->name;
msg.dif_name = difs;
- msg.n_dif_name = difs_size;
+ msg.n_dif_name = len;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- fd = recv_msg->fd;
+ if (recv_msg->result < 0)
+ fd = -1;
+
irm_msg__free_unpacked(recv_msg, NULL);
return fd;
}
-int ap_unreg(char * ap_name,
- char ** difs,
- size_t difs_size)
+int ap_unreg(char ** difs,
+ size_t len)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
int ret = -1;
- if (ap_name == NULL ||
- difs == NULL ||
- difs_size == 0 ||
+ if (difs == NULL ||
+ len == 0 ||
difs[0] == NULL) {
return -EINVAL;
}
msg.code = IRM_MSG_CODE__IRM_AP_UNREG;
msg.has_pid = true;
- msg.pid = getpid();
- msg.ap_name = ap_name;
+ msg.pid = _ap_instance->api->id;
+ msg.ap_name = _ap_instance->api->name;
msg.dif_name = difs;
- msg.n_dif_name = difs_size;
+ msg.n_dif_name = len;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -102,38 +219,62 @@ int ap_unreg(char * ap_name,
return ret;
}
-int flow_accept(int fd,
- char * ap_name,
- char * ae_name)
+int flow_accept(int fd,
+ char ** ap_name,
+ char ** ae_name)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int cli_fd = 0;
-
- if (ap_name == NULL) {
- return -EINVAL;
- }
+ int cfd = -1;
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.pid = _ap_instance->api->id;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_pid || !recv_msg->has_port_id) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- cli_fd = recv_msg->fd;
- ap_name = recv_msg->ap_name;
- ae_name = recv_msg->ae_name;
+
+ cfd = bmp_allocate(_ap_instance->fds);
+
+ _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid);
+ if (_ap_instance->flows[cfd].rb == NULL) {
+ bmp_release(_ap_instance->fds, cfd);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ *ap_name = strdup(recv_msg->ap_name);
+ if (*ap_name == NULL) {
+ bmp_release(_ap_instance->fds, cfd);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ if (ae_name != NULL) {
+ *ae_name = strdup(recv_msg->ae_name);
+ if (*ae_name == NULL) {
+ bmp_release(_ap_instance->fds, cfd);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+ }
+
+ _ap_instance->flows[cfd].port_id = recv_msg->port_id;
+ _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT;
+
+
irm_msg__free_unpacked(recv_msg, NULL);
- return cli_fd;
+
+ bmp_release(_ap_instance->fds, fd);
+
+ return cfd;
}
int flow_alloc_resp(int fd,
@@ -145,9 +286,9 @@ int flow_alloc_resp(int fd,
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;
msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.pid = _ap_instance->api->id;
+ msg.has_port_id = true;
+ msg.port_id = _ap_instance->flows[fd].port_id;
msg.has_response = true;
msg.response = response;
@@ -155,7 +296,7 @@ int flow_alloc_resp(int fd,
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -167,41 +308,49 @@ int flow_alloc_resp(int fd,
}
int flow_alloc(char * dst_name,
- char * src_ap_name,
char * src_ae_name,
- struct qos_spec * qos,
- int oflags)
+ struct qos_spec * qos)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int fd = 0;
+ int fd = -1;
- if (dst_name == NULL ||
- src_ap_name == NULL) {
+ if (dst_name == NULL)
return -EINVAL;
- }
if (src_ae_name == NULL)
src_ae_name = UNKNOWN_AE;
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
msg.dst_name = dst_name;
- msg.ap_name = src_ap_name;
+ msg.ap_name = _ap_instance->api->name;
+ msg.has_pid = true;
+ msg.pid = _ap_instance->api->id;
msg.ae_name = src_ae_name;
- msg.has_oflags = true;
- msg.oflags = oflags;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_pid || !recv_msg->has_port_id) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- fd = recv_msg->fd;
+ fd = bmp_allocate(_ap_instance->fds);
+
+ _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid);
+ if (_ap_instance->flows[fd].rb == NULL) {
+ bmp_release(_ap_instance->fds, fd);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ _ap_instance->flows[fd].port_id = recv_msg->port_id;
+ _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT;
+
irm_msg__free_unpacked(recv_msg, NULL);
+
return fd;
}
@@ -211,17 +360,15 @@ int flow_alloc_res(int fd)
irm_msg_t * recv_msg = NULL;
int result = 0;
- msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
+ msg.has_port_id = true;
+ msg.port_id = _ap_instance->flows[fd].port_id;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -238,17 +385,15 @@ int flow_dealloc(int fd)
irm_msg_t * recv_msg = NULL;
int ret = -1;
- msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
+ msg.has_port_id = true;
+ msg.port_id = _ap_instance->flows[fd].port_id;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -259,47 +404,50 @@ int flow_dealloc(int fd)
return ret;
}
-int flow_cntl(int fd, int oflags)
+int flow_cntl(int fd, int cmd, int oflags)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
- msg.oflags = oflags;
+ return -1;
+}
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
+ssize_t flow_write(int fd, void * buf, size_t count)
+{
+ /* the AP chooses the amount of headspace and tailspace */
+ size_t index = shm_create_du_buff(_ap_instance->dum,
+ count + DU_BUFF_HEADSPACE +
+ DU_BUFF_TAILSPACE,
+ DU_BUFF_HEADSPACE,
+ (uint8_t *) buf,
+ count);
+ struct rb_entry e = {index, _ap_instance->flows[fd].port_id};
+ if (index == -1)
return -1;
- if (recv_msg->has_result == false) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
+ if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
+ shm_release_du_buff(_ap_instance->dum, index);
+ return -EPIPE;
}
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
+ return 0;
}
-ssize_t flow_write(int fd,
- void * buf,
- size_t count)
+ssize_t flow_read(int fd, void * buf, size_t count)
{
- LOG_MISSING;
+ struct rb_entry * e = NULL;
+ int n;
+ uint8_t * sdu;
+ /* FIXME: move this to a thread */
+ while (e == NULL || e->port_id != _ap_instance->flows[fd].port_id)
+ e = shm_ap_rbuff_read(_ap_instance->rb);
+
+ n = shm_du_map_read_sdu(&sdu,
+ _ap_instance->dum,
+ e->index);
+ if (n < 0)
+ return -1;
- return -1;
-}
+ memcpy(buf, sdu, MIN(n, count));
-ssize_t flow_read(int fd,
- void * buf,
- size_t count)
-{
- LOG_MISSING;
+ shm_release_du_buff(_ap_instance->dum, e->index);
- return -1;
+ return n;
}
diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c
index 387572b3..75676915 100644
--- a/src/lib/ipcp.c
+++ b/src/lib/ipcp.c
@@ -121,6 +121,8 @@ pid_t ipcp_create(char * ipcp_name,
return pid;
}
+ /* clear fd table */
+
if (ipcp_type == IPCP_NORMAL)
exec_name = IPCP_NORMAL_EXEC;
else if (ipcp_type == IPCP_SHIM_UDP)
@@ -286,13 +288,8 @@ int ipcp_enroll(pid_t pid,
return -EINVAL;
msg.code = IPCP_MSG_CODE__IPCP_ENROLL;
- msg.member_name = malloc(sizeof(*(msg.member_name)));
- if (msg.member_name == NULL) {
- LOG_ERR("Failed to malloc.");
- return -1;
- }
- msg.n_1_dif = n_1_dif;
msg.member_name = member_name;
+ msg.n_1_dif = n_1_dif;
recv_msg = send_recv_ipcp_msg(pid, &msg);
if (recv_msg == NULL) {
@@ -323,8 +320,8 @@ int ipcp_name_reg(pid_t pid,
if (name == NULL)
return -1;
- msg.code = IPCP_MSG_CODE__IPCP_NAME_REG;
- msg.name = name;
+ msg.code = IPCP_MSG_CODE__IPCP_NAME_REG;
+ msg.name = name;
recv_msg = send_recv_ipcp_msg(pid, &msg);
if (recv_msg == NULL)
@@ -368,6 +365,7 @@ int ipcp_name_unreg(pid_t pid,
int ipcp_flow_alloc(pid_t pid,
uint32_t port_id,
+ pid_t n_pid,
char * dst_name,
char * src_ap_name,
char * src_ae_name,
@@ -381,17 +379,19 @@ int ipcp_flow_alloc(pid_t pid,
return -EINVAL;
msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC;
+ msg.has_port_id = true;
+ msg.port_id = port_id;
+ msg.has_pid = true;
+ msg.pid = n_pid;
msg.src_ap_name = src_ap_name;
msg.src_ae_name = src_ae_name;
msg.dst_name = dst_name;
- msg.port_id = port_id;
- msg.has_port_id = true;
recv_msg = send_recv_ipcp_msg(pid, &msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
ipcp_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -404,17 +404,20 @@ int ipcp_flow_alloc(pid_t pid,
int ipcp_flow_alloc_resp(pid_t pid,
uint32_t port_id,
- int result)
+ pid_t n_pid,
+ int response)
{
ipcp_msg_t msg = IPCP_MSG__INIT;
ipcp_msg_t * recv_msg = NULL;
int ret = -1;
- msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP;
- msg.has_port_id = true;
- msg.port_id = port_id;
- msg.has_result = true;
- msg.result = result;
+ msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP;
+ msg.has_port_id = true;
+ msg.port_id = port_id;
+ msg.has_pid = true;
+ msg.pid = n_pid;
+ msg.has_response = true;
+ msg.response = response;
recv_msg = send_recv_ipcp_msg(pid, &msg);
if (recv_msg == NULL)
@@ -431,38 +434,38 @@ int ipcp_flow_alloc_resp(pid_t pid,
return ret;
}
-int ipcp_flow_req_arr(pid_t pid,
- char * dst_name,
- char * src_ap_name,
- char * src_ae_name)
+int ipcp_flow_req_arr(pid_t pid,
+ char * dst_name,
+ char * src_ap_name,
+ char * src_ae_name)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int fd = -1;
+ int port_id = -1;
if (src_ap_name == NULL || src_ae_name == NULL)
return -EINVAL;
msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
+ msg.has_pid = true;
+ msg.pid = pid;
msg.dst_name = dst_name;
msg.ap_name = src_ap_name;
msg.ae_name = src_ae_name;
- msg.pid = pid;
- msg.has_pid = true;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_port_id) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- fd = recv_msg->fd;
+ port_id = recv_msg->port_id;
irm_msg__free_unpacked(recv_msg, NULL);
- return fd;
+ return port_id;
}
int ipcp_flow_alloc_reply(pid_t pid,
@@ -509,11 +512,11 @@ int ipcp_flow_dealloc(pid_t pid,
recv_msg = send_recv_ipcp_msg(pid, &msg);
if (recv_msg == NULL)
- return -1;
+ return 0;
if (recv_msg->has_result == false) {
ipcp_msg__free_unpacked(recv_msg, NULL);
- return -1;
+ return 0;
}
ret = recv_msg->result;
@@ -531,11 +534,11 @@ int ipcp_flow_dealloc(pid_t pid,
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
- return -1;
+ return 0;
if (recv_msg->has_result == false) {
irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
+ return 0;
}
ret = recv_msg->result;
diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto
index da4bb469..daca011d 100644
--- a/src/lib/ipcpd_messages.proto
+++ b/src/lib/ipcpd_messages.proto
@@ -25,6 +25,8 @@ message ipcp_msg {
optional string src_ap_name = 9;
optional string src_ae_name = 10;
optional dif_config_msg conf = 11;
- optional int32 result = 12;
- optional int32 fd = 13;
+ optional int32 fd = 12;
+ optional int32 pid = 13;
+ optional int32 response = 14;
+ optional int32 result = 15;
};
diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto
index 89e2c882..c336614e 100644
--- a/src/lib/irmd_messages.proto
+++ b/src/lib/irmd_messages.proto
@@ -36,13 +36,10 @@ enum irm_msg_code {
IRM_FLOW_ALLOC = 11;
IRM_FLOW_ALLOC_RES = 12;
IRM_FLOW_DEALLOC = 13;
- IRM_FLOW_CONTROL = 14;
- IRM_FLOW_WRITE = 15;
- IRM_FLOW_READ = 16;
- IPCP_FLOW_REQ_ARR = 17;
- IPCP_FLOW_ALLOC_REPLY = 18;
- IPCP_FLOW_DEALLOC = 19;
- IRM_REPLY = 20;
+ IPCP_FLOW_REQ_ARR = 14;
+ IPCP_FLOW_ALLOC_REPLY = 15;
+ IPCP_FLOW_DEALLOC = 16;
+ IRM_REPLY = 17;
};
message irm_msg {
@@ -52,12 +49,11 @@ message irm_msg {
optional uint32 api_id = 3;
optional uint32 ipcp_type = 5;
repeated string dif_name = 6;
- optional int32 fd = 7;
- optional int32 response = 8;
- optional int32 oflags = 9;
- optional string dst_name = 10;
- optional uint32 port_id = 11;
- optional int32 pid = 12;
- optional dif_config_msg conf = 13;
- optional int32 result = 14;
+ optional int32 response = 7;
+ optional string dst_name = 8;
+ optional uint32 port_id = 9;
+ optional int32 pid = 10;
+ optional dif_config_msg conf = 11;
+ optional int32 cfd = 12;
+ optional int32 result = 13;
};
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
new file mode 100644
index 00000000..0a41dfb3
--- /dev/null
+++ b/src/lib/shm_ap_rbuff.c
@@ -0,0 +1,268 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * Ring buffer for application processes
+ *
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <ouroboros/shm_ap_rbuff.h>
+#define OUROBOROS_PREFIX "shm_ap_rbuff"
+
+#include <ouroboros/logs.h>
+
+#include <pthread.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <errno.h>
+
+#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry) \
+ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t))
+
+#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail)\
+ & (SHM_RBUFF_SIZE - 1))
+#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE)
+
+struct shm_ap_rbuff {
+ struct rb_entry * shm_base; /* start of entry */
+ size_t * ptr_head; /* start of ringbuffer head */
+ size_t * ptr_tail; /* start of ringbuffer tail */
+ pthread_mutex_t * shm_mutex; /* lock all free space in shm */
+ pid_t pid; /* pid to which this rb belongs */
+ int fd;
+};
+
+struct shm_ap_rbuff * shm_ap_rbuff_create()
+{
+ struct shm_ap_rbuff * rb;
+ int shm_fd;
+ struct rb_entry * shm_base;
+ pthread_mutexattr_t attr;
+ char fn[25];
+
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid());
+
+ rb = malloc(sizeof(*rb));
+ if (rb == NULL) {
+ LOG_DBGF("Could not allocate struct.");
+ return NULL;
+ }
+
+ shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666);
+ if (shm_fd == -1) {
+ LOG_DBGF("Failed creating ring buffer.");
+ free(rb);
+ return NULL;
+ }
+
+ if (lseek(shm_fd, SHM_RBUFF_FILE_SIZE - 1, SEEK_SET) < 0) {
+ LOG_DBGF("Failed to extend ringbuffer.");
+ free(rb);
+ return NULL;
+ }
+
+ if (write(shm_fd, "", 1) != 1) {
+ LOG_DBGF("Failed to finalise extension of ringbuffer.");
+ free(rb);
+ return NULL;
+ }
+
+ shm_base = mmap(NULL,
+ SHM_RBUFF_FILE_SIZE,
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED,
+ shm_fd,
+ 0);
+
+ if (shm_base == MAP_FAILED) {
+ LOG_DBGF("Failed to map shared memory.");
+ if (close(shm_fd) == -1)
+ LOG_DBGF("Failed to close invalid shm.");
+
+ if (shm_unlink(fn) == -1)
+ LOG_DBGF("Failed to remove invalid shm.");
+
+ free(rb);
+ return NULL;
+ }
+
+ rb->shm_base = shm_base;
+ rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
+ rb->ptr_tail = (size_t *)
+ ((uint8_t *) rb->ptr_head + sizeof(size_t));
+ rb->shm_mutex = (pthread_mutex_t *)
+ ((uint8_t *) rb->ptr_tail + sizeof(size_t));
+
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+ pthread_mutex_init(rb->shm_mutex, &attr);
+
+ *rb->ptr_head = 0;
+ *rb->ptr_tail = 0;
+
+ rb->fd = shm_fd;
+ rb->pid = getpid();
+
+ return rb;
+}
+
+struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t pid)
+{
+ struct shm_ap_rbuff * rb;
+ int shm_fd;
+ struct rb_entry * shm_base;
+ char fn[25];
+
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", pid);
+
+ rb = malloc(sizeof(*rb));
+ if (rb == NULL) {
+ LOG_DBGF("Could not allocate struct.");
+ return NULL;
+ }
+
+ shm_fd = shm_open(fn, O_RDWR, 0666);
+ if (shm_fd == -1) {
+ LOG_DBGF("Failed opening shared memory %s.", fn);
+ return NULL;
+ }
+
+ shm_base = mmap(NULL,
+ SHM_RBUFF_FILE_SIZE,
+ PROT_READ | PROT_WRITE,
+ MAP_SHARED,
+ shm_fd,
+ 0);
+
+ if (shm_base == MAP_FAILED) {
+ LOG_DBGF("Failed to map shared memory.");
+ if (close(shm_fd) == -1)
+ LOG_DBGF("Failed to close invalid shm.");
+
+ if (shm_unlink(fn) == -1)
+ LOG_DBGF("Failed to remove invalid shm.");
+
+ free(rb);
+ return NULL;
+ }
+
+ rb->shm_base = shm_base;
+ rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
+ rb->ptr_tail = (size_t *)
+ ((uint8_t *) rb->ptr_head + sizeof(size_t));
+ rb->shm_mutex = (pthread_mutex_t *)
+ ((uint8_t *) rb->ptr_tail + sizeof(size_t));
+
+ rb->fd = shm_fd;
+ rb->pid = pid;
+
+ return rb;
+}
+void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
+{
+ char fn[25];
+
+ if (rb == NULL) {
+ LOG_DBGF("Bogus input. Bugging out.");
+ return;
+ }
+
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->pid);
+
+ if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
+ LOG_DBGF("Couldn't unmap shared memory.");
+
+ free(rb);
+}
+
+void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
+{
+ char fn[25];
+
+
+ if (rb == NULL) {
+ LOG_DBGF("Bogus input. Bugging out.");
+ return;
+ }
+
+ if (rb->pid != getpid()) {
+ LOG_ERR("Tried to destroy other AP's rbuff.");
+ return;
+ }
+
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->pid);
+
+ if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
+ LOG_DBGF("Couldn't unmap shared memory.");
+
+ if (shm_unlink(fn) == -1)
+ LOG_DBGF("Failed to unlink shm.");
+
+ free(rb);
+}
+
+int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
+{
+ struct rb_entry * pos;
+
+ if (rb == NULL || e == NULL)
+ return -1;
+
+ pthread_mutex_lock(rb->shm_mutex);
+
+ if (!shm_rbuff_free(rb)) {
+ pthread_mutex_unlock(rb->shm_mutex);
+ return -1;
+ }
+
+ pos = rb->shm_base + *rb->ptr_head;
+ *pos = *e;
+ *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1);
+
+ pthread_mutex_unlock(rb->shm_mutex);
+
+ return 0;
+}
+struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
+{
+ struct rb_entry * e = malloc(sizeof(*e));
+ if (e == NULL)
+ return NULL;
+
+ if (rb == NULL)
+ return NULL;
+
+ pthread_mutex_lock(rb->shm_mutex);
+
+ if (shm_rbuff_used(rb) == 0) {
+ pthread_mutex_unlock(rb->shm_mutex);
+ return NULL;
+ }
+
+ *e = *(rb->shm_base + *rb->ptr_tail);
+
+ *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
+
+ pthread_mutex_unlock(rb->shm_mutex);
+
+ return e;
+}
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c
index dfccca6a..56062c9d 100644
--- a/src/lib/shm_du_map.c
+++ b/src/lib/shm_du_map.c
@@ -45,6 +45,9 @@
((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_tail * \
SHM_DU_BUFF_BLOCK_SIZE)))
+#define idx_to_du_buff_ptr(dum, idx) \
+ ((struct shm_du_buff *)(dum->shm_base + (idx * SHM_DU_BUFF_BLOCK_SIZE)))
+
#define block_ptr_to_idx(dum, sdb) \
(((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE)
@@ -52,27 +55,31 @@
& (SHM_BLOCKS_IN_MAP - 1))
#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP)
+#define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \
+ idx_to_du_buff_ptr(dum, idx)->du_head)
+
#define MIN(a,b)(a < b ? a : b)
struct shm_du_buff {
- size_t size;
- size_t du_head;
- size_t du_tail;
+ size_t size;
+ size_t du_head;
+ size_t du_tail;
+ size_t garbage;
};
struct shm_du_map {
- uint8_t * shm_base; /* start of blocks */
- size_t * ptr_head; /* start of ringbuffer head */
- size_t * ptr_tail; /* start of ringbuffer tail */
- pthread_mutex_t * shm_mutex; /* lock all free space in shm */
- int fd;
+ uint8_t * shm_base; /* start of blocks */
+ size_t * ptr_head; /* start of ringbuffer head */
+ size_t * ptr_tail; /* start of ringbuffer tail */
+ pthread_mutex_t * shm_mutex; /* lock all free space in shm */
+ int fd;
};
struct shm_du_map * shm_du_map_create()
{
struct shm_du_map * dum;
int shm_fd;
- uint8_t * shm_base;
+ uint8_t * shm_base;
pthread_mutexattr_t attr;
dum = malloc(sizeof *dum);
@@ -141,7 +148,13 @@ struct shm_du_map * shm_du_map_open()
{
struct shm_du_map * dum;
int shm_fd;
- uint8_t * shm_base;
+ uint8_t * shm_base;
+
+ dum = malloc(sizeof *dum);
+ if (dum == NULL) {
+ LOG_DBGF("Could not allocate struct.");
+ return NULL;
+ }
shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_RDWR, 0666);
if (shm_fd == -1) {
@@ -166,12 +179,6 @@ struct shm_du_map * shm_du_map_open()
return NULL;
}
- dum = malloc(sizeof *dum);
- if (dum == NULL) {
- LOG_DBGF("Could not allocate struct.");
- return NULL;
- }
-
dum->shm_base = shm_base;
dum->ptr_head = (size_t *)
((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
@@ -195,38 +202,52 @@ void shm_du_map_close(struct shm_du_map * dum)
if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1)
LOG_DBGF("Couldn't unmap shared memory.");
+ free(dum);
+}
+
+void shm_du_map_destroy(struct shm_du_map * dum)
+{
+ if (dum == NULL) {
+ LOG_DBGF("Bogus input. Bugging out.");
+ return;
+ }
+
+ if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1)
+ LOG_DBGF("Couldn't unmap shared memory.");
+
if (shm_unlink(SHM_DU_MAP_FILENAME) == -1)
LOG_DBGF("Failed to unlink shm.");
free(dum);
}
-struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum,
- size_t size,
- size_t headspace,
- uint8_t * data,
- size_t len)
+int shm_create_du_buff(struct shm_du_map * dum,
+ size_t size,
+ size_t headspace,
+ uint8_t * data,
+ size_t len)
{
struct shm_du_buff * sdb;
long blocks = 0;
int sz = size + sizeof *sdb;
int sz2 = headspace + len + sizeof *sdb;
- uint8_t * write_pos;
+ uint8_t * write_pos;
size_t copy_len;
+ size_t index;
if (dum == NULL || data == NULL) {
LOG_DBGF("Bogus input, bugging out.");
- return NULL;
+ return -1;
}
if (headspace >= size) {
LOG_DBGF("Index out of bounds.");
- return NULL;
+ return -1;
}
if (headspace + len > size) {
LOG_DBGF("Buffer too small for data.");
- return NULL;
+ return -1;
}
pthread_mutex_lock(dum->shm_mutex);
@@ -237,20 +258,20 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum,
if (sz2 < 0 && sz > 0) {
pthread_mutex_unlock(dum->shm_mutex);
LOG_DBG("Can't handle this packet now");
- return NULL;
+ return -1;
}
++blocks;
}
if (!shm_map_free(dum, blocks)) {
pthread_mutex_unlock(dum->shm_mutex);
- LOG_DBGF("Allocation failed, Out of Memory.");
- return NULL;
+ return -1;
}
sdb = get_head_ptr(dum);
sdb->size = size;
+ sdb->garbage = 0;
sdb->du_head = headspace;
sdb->du_tail = sdb->du_head + len;
@@ -267,32 +288,76 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum,
--blocks;
}
+ index = *dum->ptr_head - 1;
+
pthread_mutex_unlock(dum->shm_mutex);
- return sdb;
+ return index;
}
-int shm_release_du_buff(struct shm_du_map * dum)
+/* FIXME: this cannot handle packets stretching beyond the ringbuffer border */
+int shm_du_map_read_sdu(uint8_t ** dst,
+ struct shm_du_map * dum,
+ size_t idx)
+{
+ size_t len = 0;
+
+ if (idx > SHM_BLOCKS_IN_MAP)
+ return -1;
+
+ pthread_mutex_lock(dum->shm_mutex);
+
+ if (*dum->ptr_head == *dum->ptr_tail) {
+ pthread_mutex_unlock(dum->shm_mutex);
+ return -1;
+ }
+
+ *dst = ((uint8_t *) idx_to_du_buff_ptr(dum, idx)) +
+ sizeof(struct shm_du_buff) +
+ idx_to_du_buff_ptr(dum, idx)->du_head;
+ len = sdu_size(dum, idx);
+
+ pthread_mutex_unlock(dum->shm_mutex);
+
+ return len;
+}
+
+int shm_release_du_buff(struct shm_du_map * dum, size_t idx)
{
long sz;
long blocks = 0;
+
+ /* FIXME: this is crap for the test */
+ if (idx > SHM_BLOCKS_IN_MAP)
+ idx = *dum->ptr_tail;
+
pthread_mutex_lock(dum->shm_mutex);
if (*dum->ptr_head == *dum->ptr_tail) {
- LOG_DBGF("Attempt to free empty ringbuffer. Nothing to do.");
pthread_mutex_unlock(dum->shm_mutex);
return -1;
}
- sz = get_tail_ptr(dum)->size;
+ idx_to_du_buff_ptr(dum, idx)->garbage = 1;
- while (sz + (long) sizeof (struct shm_du_buff) > 0) {
- sz -= SHM_DU_BUFF_BLOCK_SIZE;
- ++blocks;
+ if (idx != *dum->ptr_tail) {
+ pthread_mutex_unlock(dum->shm_mutex);
+ return 0;
+ }
+
+ while (get_tail_ptr(dum)->garbage == 1) {
+ sz = get_tail_ptr(dum)->size;
+
+ while (sz + (long) sizeof (struct shm_du_buff) > 0) {
+ sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ ++blocks;
+ }
+
+ *(dum->ptr_tail) =
+ (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);
}
- *(dum->ptr_tail) = (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1);
pthread_mutex_unlock(dum->shm_mutex);
return 0;
@@ -317,7 +382,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb,
}
uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb,
- size_t size)
+ size_t size)
{
if (sdb == NULL) {
LOG_DBGF("Bogus input, bugging out.");
@@ -335,7 +400,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb,
}
int shm_du_buff_head_release(struct shm_du_buff * sdb,
- size_t size)
+ size_t size)
{
if (sdb == NULL) {
LOG_DBGF("Bogus input, bugging out.");
@@ -353,7 +418,7 @@ int shm_du_buff_head_release(struct shm_du_buff * sdb,
}
int shm_du_buff_tail_release(struct shm_du_buff * sdb,
- size_t size)
+ size_t size)
{
if (sdb == NULL) {
LOG_DBGF("Bogus input, bugging out.");
diff --git a/src/lib/tests/shm_du_map_test.c b/src/lib/tests/shm_du_map_test.c
index 85a82e4d..55938a62 100644
--- a/src/lib/tests/shm_du_map_test.c
+++ b/src/lib/tests/shm_du_map_test.c
@@ -32,7 +32,7 @@
#include <ouroboros/logs.h>
-#define SIZE_OF_DU_BUFF 24
+#define SIZE_OF_DU_BUFF 32
#define TEST_BUFF_SIZE (SHM_DU_BUFF_BLOCK_SIZE - SIZE_OF_DU_BUFF)
#define MAX(a,b) (a > b ? a : b)
@@ -44,7 +44,7 @@ void * produce()
{
struct shm_du_map * dum;
long test_buf_size = 0;
- uint8_t * test_values;
+ uint8_t * test_values;
int headspace;
int tailspace;
long i;
@@ -66,9 +66,8 @@ void * produce()
test_values[i] = 170;
clock_gettime(CLOCK_MONOTONIC, &starttime);
- for (i = 1; i < SHM_BLOCKS_IN_MAP; i++) {
- struct shm_du_buff * sdb;
- size_t len;
+ for (i = 1; i < 16 * SHM_BLOCKS_IN_MAP; i++) {
+ size_t len;
test_buf_size = TEST_BUFF_SIZE;
@@ -77,21 +76,19 @@ void * produce()
len = test_buf_size - (headspace + tailspace);
- sdb = shm_create_du_buff(dum,
- test_buf_size,
- headspace,
- test_values,
- len);
-
- if (sdb != NULL) {
- bytes_written += len;
- }
- else {
- sync = -2;
- break;
+ if (shm_create_du_buff(dum,
+ test_buf_size,
+ headspace,
+ test_values,
+ len) < 0) {
+ continue;
}
+
+ bytes_written += len;
}
+ sync = -2;
+
clock_gettime(CLOCK_MONOTONIC, &stoptime);
elapsed =(stoptime.tv_sec + stoptime.tv_nsec / 1000000000.0) -
(starttime.tv_sec + starttime.tv_nsec / 1000000000.0);
@@ -104,13 +101,14 @@ void * produce()
sync = -1;
+ shm_du_map_close(dum);
+
return 0;
}
void * consume()
{
struct shm_du_map * dum;
-
struct timespec ts;
ts.tv_sec = 0;
@@ -123,10 +121,15 @@ void * consume()
return (void *)-1;
}
- while (!sync) {
- while (!shm_release_du_buff(dum));
- nanosleep(&ts, NULL);
+ while (true) {
+ shm_release_du_buff(dum, 1823429173941);
+ if (sync)
+ break;
}
+ nanosleep(&ts, NULL);
+
+
+ shm_du_map_close(dum);
return 0;
}
@@ -149,7 +152,7 @@ int shm_du_map_test(int argc, char ** argv)
return -1;
}
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
LOG_INFO("done.");
@@ -165,7 +168,7 @@ int shm_du_map_test(int argc, char ** argv)
pthread_create(&consumer, NULL, consume, NULL);
pthread_join(consumer, NULL);
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
LOG_INFO("done.");
@@ -173,6 +176,8 @@ int shm_du_map_test(int argc, char ** argv)
LOG_INFO("starting concurrency test.");
+ sync = 0;
+
dum = shm_du_map_create();
res1 = (int) pthread_create(&producer, NULL, produce, NULL);
@@ -181,7 +186,7 @@ int shm_du_map_test(int argc, char ** argv)
pthread_join(producer, NULL);
pthread_join(consumer, NULL);
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
LOG_INFO("done.");
diff --git a/src/tools/echo/echo_client.c b/src/tools/echo/echo_client.c
index 8d3fc322..9cf56cee 100644
--- a/src/tools/echo/echo_client.c
+++ b/src/tools/echo/echo_client.c
@@ -23,19 +23,25 @@
#define CLIENT_AP_NAME "echo-client"
#include <ouroboros/dev.h>
+#include <stdlib.h>
int client_main()
{
int fd = 0;
int result = 0;
- uint8_t buf[BUF_SIZE];
+ char buf[BUF_SIZE];
char * message = "Client says hi!";
ssize_t count = 0;
- fd = flow_alloc(SERVER_AP_NAME, CLIENT_AP_NAME,
- NULL, NULL, 0);
+ if(ap_init(CLIENT_AP_NAME)) {
+ printf("Failed to init AP.");
+ return -1;
+ }
+
+ fd = flow_alloc(SERVER_AP_NAME, NULL, NULL);
if (fd < 0) {
printf("Failed to allocate flow\n");
+ ap_fini();
return -1;
}
@@ -43,12 +49,14 @@ int client_main()
if (result < 0) {
printf("Flow allocation refused\n");
flow_dealloc(fd);
+ ap_fini();
return -1;
}
if (flow_write(fd, message, strlen(message) + 1) == -1) {
printf("Failed to write SDU\n");
flow_dealloc(fd);
+ ap_fini();
return -1;
}
@@ -56,6 +64,7 @@ int client_main()
if (count < 0) {
printf("Failed to read SDU\n");
flow_dealloc(fd);
+ ap_fini();
return -1;
}
@@ -63,5 +72,7 @@ int client_main()
flow_dealloc(fd);
+ ap_fini();
+
return 0;
}
diff --git a/src/tools/echo/echo_server.c b/src/tools/echo/echo_server.c
index e457e22b..d9af1c1f 100644
--- a/src/tools/echo/echo_server.c
+++ b/src/tools/echo/echo_server.c
@@ -33,65 +33,72 @@ void shutdown_server(int signo)
{
char * dif = DIF_NAME;
- if (ap_unreg(SERVER_AP_NAME, &dif, 1)) {
- printf("Failed to unregister application\n");
+ if (ap_unreg(&dif, 1)) {
+ printf("Failed to unregister application.\n");
+ ap_fini();
exit(EXIT_FAILURE);
}
+ ap_fini();
exit(EXIT_SUCCESS);
}
int server_main()
{
- int server_fd = 0;
- int client_fd = 0;
+ int server_fd = 0;
+ int client_fd = 0;
char * dif = DIF_NAME;
char * client_name = NULL;
- uint8_t buf[BUF_SIZE];
+ char buf[BUF_SIZE];
ssize_t count = 0;
- printf("Starting the server\n");
+ printf("Starting the server.\n");
/* Manual cleanup is required for now */
if (signal(SIGINT, shutdown_server) == SIG_ERR) {
- printf("Can't install signal handler\n");
+ printf("Can't install signal handler.\n");
return -1;
}
- server_fd = ap_reg(SERVER_AP_NAME, &dif, 1);
- if (server_fd < 0) {
- printf("Failed to register application\n");
+ if(ap_init(SERVER_AP_NAME)) {
+ printf("Failed to init AP.");
return -1;
}
- printf("Echo server started...\n");
+ server_fd = ap_reg(&dif, 1);
+ if (server_fd < 0) {
+ printf("Failed to register application.\n");
+ ap_fini();
+ return -1;
+ }
while (true) {
client_fd = flow_accept(server_fd,
- client_name, NULL);
+ &client_name, NULL);
if (client_fd < 0) {
- continue;
+ printf("Failed to accept flow.\n");
+ break;
}
- printf("New flow from %s\n", client_name);
+ printf("New flow from %s.\n", client_name);
if (flow_alloc_resp(client_fd, 0)) {
- printf("Failed to give an allocate response\n");
+ printf("Failed to give an allocate response.\n");
flow_dealloc(client_fd);
continue;
}
- count = flow_read(client_fd, buf, BUF_SIZE);
+ count = flow_read(client_fd, (void **) &buf, BUF_SIZE);
if (count < 0) {
- printf("Failed to read SDU\n");
+ printf("Failed to read SDU.\n");
flow_dealloc(client_fd);
continue;
}
- printf("Message from client is %.*s\n", (int) count, buf);
+ printf("Message from client is %.*s.\n", (int) count, buf);
if (flow_write(client_fd, buf, count) == -1) {
- printf("Failed to write SDU\n");
+ printf("Failed to write SDU.\n");
flow_dealloc(client_fd);
continue;
}
@@ -99,5 +106,7 @@ int server_main()
flow_dealloc(client_fd);
}
+ ap_fini();
+
return 0;
}