summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/ouroboros/CMakeLists.txt18
-rw-r--r--include/ouroboros/cdap.h184
-rw-r--r--include/ouroboros/common.h16
-rw-r--r--include/ouroboros/dev.h1
-rw-r--r--include/ouroboros/ipcp.h2
-rw-r--r--include/ouroboros/shared.h38
-rw-r--r--include/ouroboros/shm_du_map.h4
-rw-r--r--src/ipcpd/ipcp-data.h2
-rw-r--r--src/ipcpd/ipcp.c8
-rw-r--r--src/ipcpd/ipcp.h2
-rw-r--r--src/ipcpd/local/main.c2
-rw-r--r--src/ipcpd/shim-udp/main.c6
-rw-r--r--src/irmd/main.c10
-rw-r--r--src/lib/CMakeLists.txt7
-rw-r--r--src/lib/cdap.c438
-rw-r--r--src/lib/cdap.proto18
-rw-r--r--src/lib/ipcp.c8
-rw-r--r--src/lib/irm.c1
-rw-r--r--src/lib/shm_du_map.c1
-rw-r--r--src/lib/sockets.c7
20 files changed, 596 insertions, 177 deletions
diff --git a/include/ouroboros/CMakeLists.txt b/include/ouroboros/CMakeLists.txt
index 68c88a18..ee339294 100644
--- a/include/ouroboros/CMakeLists.txt
+++ b/include/ouroboros/CMakeLists.txt
@@ -3,14 +3,14 @@ configure_file(
"${CMAKE_CURRENT_BINARY_DIR}/config.h")
set(HEADER_FILES
- cdap.h
- dev.h
- errno.h
- flow.h
- irm.h
- irm_config.h
- nsm.h
- qos.h
-)
+ cdap.h
+ common.h
+ dev.h
+ errno.h
+ flow.h
+ irm.h
+ irm_config.h
+ nsm.h
+ qos.h)
install(FILES ${HEADER_FILES} DESTINATION usr/include/ouroboros)
diff --git a/include/ouroboros/cdap.h b/include/ouroboros/cdap.h
index 72788ad6..e26f192b 100644
--- a/include/ouroboros/cdap.h
+++ b/include/ouroboros/cdap.h
@@ -3,7 +3,8 @@
*
* The Common Distributed Application Protocol
*
- * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ * Dimitri Staessens <dimitri.staessens@intec.ugent.be>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -23,148 +24,75 @@
#ifndef OUROBOROS_CDAP_H
#define OUROBOROS_CDAP_H
+#include <ouroboros/common.h>
+
#include <stdbool.h>
+#include <stdint.h>
+#include <unistd.h>
+
+#define F_SYNC 0x0001
struct cdap;
+/* Callback functions that work on the application's RIB */
struct cdap_ops {
- /* Sender related callbacks */
- int (* handle_connect_r)(int fd,
- int invoke_id,
- int result);
- int (* handle_release_r)(int fd,
- int invoke_id,
- int result);
- int (* handle_read_r)(int fd,
- int invoke_id,
- int result,
- char * reason,
- char * obj_val,
- bool complete);
- int (* handle_cancelread_r)(int fd,
- int invoke_id,
- int result);
- int (* handle_write_r)(int fd,
- int invoke_id,
- int result,
- char * reason,
- char * obj_val);
- int (* handle_create_r)(int fd,
- int invoke_id,
- int result);
- int (* handle_delete_r)(int fd,
- int invoke_id,
- int result);
- int (* handle_start_r)(int fd,
- int invoke_id,
- int result);
- int (* handle_stop_r)(int fd,
- int invoke_id,
- int result);
+ int (* cdap_reply)(struct cdap * instance,
+ int invoke_id,
+ int result,
+ buffer_t * val,
+ size_t len);
+
+ int (* cdap_read)(struct cdap * instance,
+ char * name);
+ int (* cdap_write)(struct cdap * instance,
+ char * name,
+ buffer_t * val,
+ size_t len,
+ uint32_t flags);
- /* Receiver related callbacks */
- int (* handle_connect)(int fd,
- int invoke_id,
- rina_name_t src,
- rina_name_t dst,
- char * auth_mech,
- char * auth_val);
- int (* handle_release)(int fd,
- int invoke_id);
- int (* handle_cancelread)(int fd,
- int invoke_id);
- int (* handle_write)(int fd,
- int invoke_id,
- char * obj_name,
- char * obj_val);
- int (* handle_create)(int fd,
- int invoke_id,
- char * obj_class,
- char * obj_name,
- char * obj_val);
- int (* handle_delete)(int fd,
- int invoke_id,
- char * obj_name);
- int (* handle_start)(int fd,
- int invoke_id,
- char * obj_name,
- char * obj_val);
- int (* handle_stop)(int fd,
- int invoke_id,
- char * obj_name,
- char * obj_val);
+ int (* cdap_create)(struct cdap * instance,
+ char * name,
+ buffer_t val);
+ int (* cdap_delete)(struct cdap * instance,
+ char * name,
+ buffer_t val);
+
+ int (* cdap_start)(struct cdap * instance,
+ char * name);
+ int (* cdap_stop)(struct cdap * instance,
+ char * name);
};
-struct cdap * cdap_create(struct cdap_ops ops,
- int fd);
+/* Assumes flow is blocking */
+struct cdap * cdap_create(struct cdap_ops * ops,
+ int fd);
int cdap_destroy(struct cdap * instance);
-/* Sender related functions */
-int cdap_send_connect(struct cdap * instance,
- int invoke_id,
- rina_name_t src,
- rina_name_t dst,
- char * auth_mech,
- char * auth_val);
-int cdap_send_release(struct cdap * instance,
- int invoke_id);
+/* Returns a positive invoke-id on success to be used in the callback */
int cdap_send_read(struct cdap * instance,
- int invoke_id,
- char * obj_name);
-int cdap_send_cancelread(struct cdap * instance,
- int invoke_id,
- char * obj_name);
+ char * name);
int cdap_send_write(struct cdap * instance,
- int invoke_id,
- char * obj_name,
- char * obj_val);
+ char * name,
+ buffer_t * val,
+ size_t len,
+ uint32_t flags);
+
int cdap_send_create(struct cdap * instance,
- int invoke_id,
- char * obj_name,
- char * obj_val);
+ char * name,
+ buffer_t val);
int cdap_send_delete(struct cdap * instance,
- int invoke_id,
- char * obj_name);
+ char * name,
+ buffer_t val);
+
int cdap_send_start(struct cdap * instance,
- int invoke_id,
- char * obj_name,
- char * obj_val);
+ char * name);
int cdap_send_stop(struct cdap * instance,
- int invoke_id,
- char * obj_name,
- char * obj_val);
+ char * name);
-/* Receiver related functions */
-int cdap_send_connect_r(struct cdap * instance,
- int invoke_id,
- int result);
-int cdap_send_release_r(struct cdap * instance,
- int invoke_id,
- int result);
-int cdap_send_read_r(struct cdap * instance,
- int invoke_id,
- int result,
- char * reason,
- char * obj_val,
- bool complete);
-int cdap_send_cancelread_r(struct cdap * instance,
- int invoke_id,
- int result);
-int cdap_send_write_r(struct cdap * instance,
- int invoke_id,
- int result,
- char * obj_name,
- char * obj_val);
-int cdap_send_create_r(struct cdap * instance,
- int invoke_id,
- int result);
-int cdap_send_delete_r(struct cdap * instance,
- int invoke_id,
- int result);
-int cdap_send_start_r(struct cdap * instance,
- int invoke_id,
- int result);
-int cdap_send_stop_r(struct cdap * instance,
- int invoke_id,
- int result);
+/* Can only be called following a callback function */
+int cdap_send_reply(struct cdap * instance,
+ int invoke_id,
+ int result,
+ buffer_t * val,
+ size_t len);
#endif
diff --git a/include/ouroboros/common.h b/include/ouroboros/common.h
index 039e1a83..dbd050f1 100644
--- a/include/ouroboros/common.h
+++ b/include/ouroboros/common.h
@@ -25,24 +25,10 @@
#include <stdint.h>
#include <unistd.h>
-#include <stdbool.h>
-#include <errno.h>
typedef struct {
uint8_t * data;
- size_t size;
+ size_t len;
} buffer_t;
-/* FIXME: To be decided which QoS cubes we support */
-enum qos_cube {
- QOS_CUBE_BE = 0,
- QOS_CUBE_VIDEO
-};
-
-enum flow_state {
- FLOW_NULL = 0,
- FLOW_PENDING,
- FLOW_ALLOCATED
-};
-
#endif /* OUROBOROS_COMMON_H */
diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h
index 897bc124..699973a3 100644
--- a/include/ouroboros/dev.h
+++ b/include/ouroboros/dev.h
@@ -48,7 +48,6 @@ int flow_alloc(char * dst_name,
int flow_alloc_res(int fd);
int flow_dealloc(int fd);
-/* Wraps around fnctl */
int flow_cntl(int fd, int cmd, int oflags);
ssize_t flow_write(int fd, void * buf, size_t count);
ssize_t flow_read(int fd, void * buf, size_t count);
diff --git a/include/ouroboros/ipcp.h b/include/ouroboros/ipcp.h
index f5657b64..c1aa452b 100644
--- a/include/ouroboros/ipcp.h
+++ b/include/ouroboros/ipcp.h
@@ -22,7 +22,7 @@
#include <ouroboros/irm_config.h>
#include <ouroboros/sockets.h>
-#include <ouroboros/common.h>
+#include <ouroboros/shared.h>
#include <sys/types.h>
diff --git a/include/ouroboros/shared.h b/include/ouroboros/shared.h
new file mode 100644
index 00000000..f5e34dc8
--- /dev/null
+++ b/include/ouroboros/shared.h
@@ -0,0 +1,38 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * Shared definitions between IRMd and IPCPs
+ *
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_SHARED_H
+#define OUROBOROS_SHARED_H
+
+/* FIXME: To be decided which QoS cubes we support */
+enum qos_cube {
+ QOS_CUBE_BE = 0,
+ QOS_CUBE_VIDEO
+};
+
+enum flow_state {
+ FLOW_NULL = 0,
+ FLOW_PENDING,
+ FLOW_ALLOCATED
+};
+
+#endif /* OUROBOROS_SHARED_H */
diff --git a/include/ouroboros/shm_du_map.h b/include/ouroboros/shm_du_map.h
index e8934bae..9d6d7aaf 100644
--- a/include/ouroboros/shm_du_map.h
+++ b/include/ouroboros/shm_du_map.h
@@ -24,7 +24,9 @@
#ifndef OUROBOROS_SHM_DU_MAP_H
#define OUROBOROS_SHM_DU_MAP_H
-#include "common.h"
+#include <errno.h>
+#include <unistd.h>
+#include <stdint.h>
#include <sys/types.h>
#include <pthread.h>
diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h
index ce20730b..1e183dca 100644
--- a/src/ipcpd/ipcp-data.h
+++ b/src/ipcpd/ipcp-data.h
@@ -24,7 +24,7 @@
#ifndef IPCPD_IPCP_DATA_H
#define IPCPD_IPCP_DATA_H
-#include <ouroboros/common.h>
+#include <ouroboros/shared.h>
#include <ouroboros/list.h>
#include <sys/types.h>
#include <pthread.h>
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 4acbffa2..579203c2 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -236,14 +236,14 @@ void * ipcp_main_loop(void * o)
ipcp_msg__free_unpacked(msg, NULL);
- buffer.size = ipcp_msg__get_packed_size(&ret_msg);
- if (buffer.size == 0) {
+ buffer.len = ipcp_msg__get_packed_size(&ret_msg);
+ if (buffer.len == 0) {
LOG_ERR("Failed to send reply message");
close(lsockfd);
continue;
}
- buffer.data = malloc(buffer.size);
+ buffer.data = malloc(buffer.len);
if (buffer.data == NULL) {
close(lsockfd);
continue;
@@ -251,7 +251,7 @@ void * ipcp_main_loop(void * o)
ipcp_msg__pack(&ret_msg, buffer.data);
- if (write(lsockfd, buffer.data, buffer.size) == -1) {
+ if (write(lsockfd, buffer.data, buffer.len) == -1) {
free(buffer.data);
close(lsockfd);
continue;
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 1e9e9763..27c3cf8e 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -24,6 +24,8 @@
#define IPCPD_IPCP_H
#include <ouroboros/config.h>
+#include <ouroboros/shared.h>
+
#include <pthread.h>
#include "ipcp-ops.h"
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 837cbf8c..4802a161 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -31,7 +31,7 @@
#include <ouroboros/irm_config.h>
#include <ouroboros/sockets.h>
#include <ouroboros/bitmap.h>
-#include <ouroboros/common.h>
+#include <ouroboros/shared.h>
#include <ouroboros/dev.h>
#define OUROBOROS_PREFIX "ipcpd/local"
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 68d393af..c22947fa 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -340,8 +340,8 @@ static int send_shim_udp_msg(shim_udp_msg_t * msg,
r_saddr.sin_addr.s_addr = dst_ip_addr;
r_saddr.sin_port = LISTEN_PORT;
- buf.size = shim_udp_msg__get_packed_size(msg);
- if (buf.size == 0) {
+ buf.len = shim_udp_msg__get_packed_size(msg);
+ if (buf.len == 0) {
return -1;
}
@@ -354,7 +354,7 @@ static int send_shim_udp_msg(shim_udp_msg_t * msg,
if (sendto(shim_data(_ipcp)->s_fd,
buf.data,
- buf.size,
+ buf.len,
0,
(struct sockaddr *) &r_saddr,
sizeof(r_saddr)) == -1) {
diff --git a/src/irmd/main.c b/src/irmd/main.c
index ab637789..38e10cc5 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1936,7 +1936,7 @@ void * irm_flow_cleaner()
pthread_rwlock_rdlock(&instance->state_lock);
- if (&instance->state == IRMD_NULL) {
+ if (instance->state == IRMD_NULL) {
pthread_rwlock_unlock(&instance->state_lock);
return (void *) 0;
}
@@ -2185,8 +2185,8 @@ void * mainloop()
pthread_cleanup_pop(true);
- buffer.size = irm_msg__get_packed_size(&ret_msg);
- if (buffer.size == 0) {
+ buffer.len = irm_msg__get_packed_size(&ret_msg);
+ if (buffer.len == 0) {
LOG_ERR("Failed to send reply message.");
if (apis != NULL)
free(apis);
@@ -2194,7 +2194,7 @@ void * mainloop()
continue;
}
- buffer.data = malloc(buffer.size);
+ buffer.data = malloc(buffer.len);
if (buffer.data == NULL) {
if (apis != NULL)
free(apis);
@@ -2204,7 +2204,7 @@ void * mainloop()
irm_msg__pack(&ret_msg, buffer.data);
- if (write(cli_sockfd, buffer.data, buffer.size) == -1) {
+ if (write(cli_sockfd, buffer.data, buffer.len) == -1) {
free(buffer.data);
if (apis != NULL)
free(apis);
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index ae46f5bc..57f44f15 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -9,7 +9,9 @@ include_directories(${PROTOBUF_INCLUDE_DIRS})
protobuf_generate_c(IRM_PROTO_SRCS IRM_PROTO_HDRS irmd_messages.proto)
protobuf_generate_c(IPCP_PROTO_SRCS IPCP_PROTO_HDRS ipcpd_messages.proto)
protobuf_generate_c(DIF_CONFIG_PROTO_SRCS DIF_CONFIG_PROTO_HDRS
- dif_config.proto)
+ dif_config.proto)
+protobuf_generate_c(CDAP_PROTO_SRCS CDAP_PROTO_HDRS
+ cdap.proto)
find_library(LIBRT_LIBRARIES rt)
if(NOT LIBRT_LIBRARIES)
@@ -38,7 +40,8 @@ set(SOURCE_FILES
)
add_library(ouroboros SHARED ${SOURCE_FILES}
- ${IRM_PROTO_SRCS} ${IPCP_PROTO_SRCS} ${DIF_CONFIG_PROTO_SRCS})
+ ${IRM_PROTO_SRCS} ${IPCP_PROTO_SRCS}
+ ${DIF_CONFIG_PROTO_SRCS} ${CDAP_PROTO_SRCS})
target_link_libraries(ouroboros ${LIBRT_LIBRARIES}
${LIBPTHREAD_LIBRARIES} ${PROTOBUF_C_LIBRARY})
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index e69de29b..4275bfc7 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -0,0 +1,438 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * The Common Distributed Application Protocol
+ *
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <ouroboros/cdap.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/common.h>
+#include <ouroboros/dev.h>
+
+#include <stdlib.h>
+#include <pthread.h>
+
+#include "cdap.pb-c.h"
+typedef Cdap cdap_t;
+typedef Opcode opcode_t;
+
+#define IDS_SIZE 256
+#define BUF_SIZE 2048
+
+struct cdap {
+ int fd;
+ struct bmp * ids;
+ pthread_mutex_t ids_lock;
+ pthread_t reader;
+ struct cdap_ops * ops;
+};
+
+
+static ssize_t cdap_msg_to_buffer(cdap_t * msg,
+ buffer_t ** val)
+{
+ int i;
+ size_t len;
+
+ len = msg->n_value;
+
+ *val = malloc(len * sizeof(**val));
+ if (*val == NULL)
+ return -1;
+
+ for (i = 0; i < len; i++) {
+ if (msg->value[i].data == NULL) {
+ free(*val);
+ return -1;
+ }
+
+ (*val)[i].data = msg->value[i].data;
+ (*val)[i].len = msg->value[i].len;
+ }
+
+ return len;
+}
+
+
+static void * sdu_reader(void * o)
+{
+ struct cdap * instance = (struct cdap *) o;
+ cdap_t * msg;
+ uint8_t buf[BUF_SIZE];
+ ssize_t len;
+ ssize_t length;
+ buffer_t * val;
+
+ while (true) {
+ len = flow_read(instance->fd, buf, BUF_SIZE);
+ if (len < 0)
+ return (void *) -1;
+
+ msg = cdap__unpack(NULL, len, buf);
+ if (msg == NULL)
+ continue;
+
+ switch (msg->opcode) {
+ case OPCODE__READ:
+ if (msg->name != NULL)
+ instance->ops->cdap_read(instance,
+ msg->name);
+ break;
+ case OPCODE__WRITE:
+ length = cdap_msg_to_buffer(msg, &val);
+ if (msg->name != NULL &&
+ msg->value != NULL &&
+ len > 0) {
+ instance->ops->cdap_write(instance,
+ msg->name,
+ val,
+ length,
+ msg->flags);
+ free(val);
+ }
+ break;
+ case OPCODE__CREATE:
+ length = cdap_msg_to_buffer(msg, &val);
+ if (msg->name != NULL &&
+ length == 1) {
+ instance->ops->cdap_create(instance,
+ msg->name,
+ val[0]);
+ free(val);
+ }
+ break;
+ case OPCODE__DELETE:
+ length = cdap_msg_to_buffer(msg, &val);
+ if (msg->name != NULL &&
+ length == 1) {
+ instance->ops->cdap_create(instance,
+ msg->name,
+ val[0]);
+ free(val);
+ }
+ break;
+ case OPCODE__START:
+ if (msg->name != NULL)
+ instance->ops->cdap_start(instance,
+ msg->name);
+ break;
+ case OPCODE__STOP:
+ if (msg->name != NULL)
+ instance->ops->cdap_stop(instance,
+ msg->name);
+ break;
+ case OPCODE__REPLY:
+ length = cdap_msg_to_buffer(msg, &val);
+ if (msg->name != NULL &&
+ length > 0) {
+ instance->ops->cdap_reply(instance,
+ msg->invoke_id,
+ msg->result,
+ val,
+ length);
+ free(val);
+ }
+ break;
+ default:
+ break;
+ }
+
+ cdap__free_unpacked(msg, NULL);
+ }
+
+ return (void *) 0;
+}
+
+struct cdap * cdap_create(struct cdap_ops * ops,
+ int fd)
+{
+ struct cdap * instance = NULL;
+ int flags;
+
+ if (ops == NULL || fd < 0 ||
+ ops->cdap_reply == NULL ||
+ ops->cdap_read == NULL ||
+ ops->cdap_write == NULL ||
+ ops->cdap_create == NULL ||
+ ops->cdap_delete == NULL ||
+ ops->cdap_start == NULL ||
+ ops->cdap_stop == NULL)
+ return NULL;
+
+ flags = flow_cntl(fd, FLOW_F_GETFL, 0);
+ if (flags & FLOW_O_NONBLOCK)
+ return NULL;
+
+ instance = malloc(sizeof(*instance));
+ if (instance == NULL)
+ return NULL;
+
+ if (pthread_mutex_init(&instance->ids_lock, NULL)) {
+ free(instance);
+ return NULL;
+ }
+
+ instance->ops = ops;
+
+ instance->ids = bmp_create(IDS_SIZE, 0);
+ if (instance->ids == NULL) {
+ free(instance);
+ return NULL;
+ }
+
+ pthread_create(&instance->reader,
+ NULL,
+ sdu_reader,
+ (void *) instance);
+
+ return instance;
+}
+
+int cdap_destroy(struct cdap * instance)
+{
+ if (instance == NULL)
+ return -1;
+
+ pthread_cancel(instance->reader);
+
+ pthread_mutex_lock(&instance->ids_lock);
+
+ bmp_destroy(instance->ids);
+
+ pthread_mutex_unlock(&instance->ids_lock);
+
+ pthread_join(instance->reader,
+ NULL);
+
+ free(instance);
+
+ return 0;
+}
+
+static int next_invoke_id(struct cdap * instance)
+{
+ int ret;
+
+ pthread_mutex_lock(&instance->ids_lock);
+ ret = bmp_allocate(instance->ids);
+ pthread_mutex_unlock(&instance->ids_lock);
+
+ return ret;
+}
+
+static int release_invoke_id(struct cdap * instance,
+ int id)
+{
+ int ret;
+
+ pthread_mutex_lock(&instance->ids_lock);
+ ret = bmp_release(instance->ids, id);
+ pthread_mutex_unlock(&instance->ids_lock);
+
+ return ret;
+}
+
+static int write_msg(struct cdap * instance,
+ cdap_t * msg)
+{
+ buffer_t buf;
+ int ret;
+
+ buf.len = cdap__get_packed_size(msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(BUF_SIZE);
+ if (buf.data == NULL)
+ return -1;
+
+ cdap__pack(msg, buf.data);
+
+ ret = flow_write(instance->fd, buf.data, buf.len);
+
+ free(buf.data);
+
+ return ret;
+}
+
+static int buffer_to_cdap_msg(cdap_t * msg,
+ buffer_t * val,
+ size_t len)
+{
+ int i;
+
+ msg->value = malloc(len * sizeof(*msg->value));
+ if (msg->value == NULL)
+ return -1;
+
+ msg->n_value = len;
+ for (i = 0; i < len; i++) {
+ if (val[i].data == NULL) {
+ free(msg->value);
+ return -1;
+ }
+
+ msg->value[i].data = val[i].data;
+ msg->value[i].len = val[i].len;
+ }
+
+ return 0;
+}
+
+static int send_read_or_start_or_stop(struct cdap * instance,
+ char * name,
+ opcode_t code)
+{
+ int id;
+ cdap_t msg = CDAP__INIT;
+
+ if (instance == NULL || name == NULL)
+ return -1;
+
+ id = next_invoke_id(instance);
+ if (!bmp_is_id_valid(instance->ids, id))
+ return -1;
+
+ msg.opcode = code;
+ msg.invoke_id = id;
+ msg.name = name;
+
+ return write_msg(instance, &msg);
+}
+
+static int send_create_or_delete(struct cdap * instance,
+ char * name,
+ buffer_t val,
+ opcode_t code)
+{
+ int id;
+ cdap_t msg = CDAP__INIT;
+ int ret;
+
+ if (instance == NULL || name == NULL)
+ return -1;
+
+ id = next_invoke_id(instance);
+ if (!bmp_is_id_valid(instance->ids, id))
+ return -1;
+
+ msg.opcode = code;
+ msg.name = name;
+ msg.invoke_id = id;
+
+ if (buffer_to_cdap_msg(&msg, &val, 1)) {
+ release_invoke_id(instance, id);
+ return -1;
+ }
+
+ ret = write_msg(instance, &msg);
+
+ free(msg.value);
+
+ return ret;
+}
+
+int cdap_send_read(struct cdap * instance,
+ char * name)
+{
+ return send_read_or_start_or_stop(instance, name, OPCODE__READ);
+}
+
+int cdap_send_write(struct cdap * instance,
+ char * name,
+ buffer_t * val,
+ size_t len,
+ uint32_t flags)
+{
+ int id;
+ int ret;
+ cdap_t msg = CDAP__INIT;
+
+ if (instance == NULL || name == NULL ||
+ val == NULL || len < 1)
+ return -1;
+
+ id = next_invoke_id(instance);
+ if (!bmp_is_id_valid(instance->ids, id))
+ return -1;
+
+ msg.opcode = OPCODE__WRITE;
+ msg.name = name;
+ msg.has_flags = true;
+ msg.flags = flags;
+ msg.invoke_id = id;
+
+ if (buffer_to_cdap_msg(&msg, val, len)) {
+ release_invoke_id(instance, id);
+ return -1;
+ }
+
+ ret = write_msg(instance, &msg);
+
+ free(msg.value);
+
+ return ret;
+}
+
+int cdap_send_create(struct cdap * instance,
+ char * name,
+ buffer_t val)
+{
+ return send_create_or_delete(instance, name, val, OPCODE__CREATE);
+}
+
+int cdap_send_delete(struct cdap * instance,
+ char * name,
+ buffer_t val)
+{
+ return send_create_or_delete(instance, name, val, OPCODE__DELETE);
+}
+
+int cdap_send_start(struct cdap * instance,
+ char * name)
+{
+ return send_read_or_start_or_stop(instance, name, OPCODE__START);
+}
+
+int cdap_send_stop(struct cdap * instance,
+ char * name)
+{
+ return send_read_or_start_or_stop(instance, name, OPCODE__STOP);
+}
+
+int cdap_send_reply(struct cdap * instance,
+ int invoke_id,
+ int result,
+ buffer_t * val,
+ size_t len)
+{
+ cdap_t msg = CDAP__INIT;
+
+ if (instance == NULL || val == NULL)
+ return -1;
+
+ msg.invoke_id = invoke_id;
+ msg.has_result = true;
+ msg.result = result;
+
+ if (buffer_to_cdap_msg(&msg, val, len))
+ return -1;
+
+ return write_msg(instance, &msg);
+}
diff --git a/src/lib/cdap.proto b/src/lib/cdap.proto
new file mode 100644
index 00000000..a5e0306d
--- /dev/null
+++ b/src/lib/cdap.proto
@@ -0,0 +1,18 @@
+enum opcode {
+ CREATE = 1;
+ DELETE = 2;
+ READ = 3;
+ WRITE = 4;
+ START = 5;
+ STOP = 6;
+ REPLY = 7;
+}
+
+message cdap {
+ required opcode opcode = 1;
+ required uint32 invoke_id = 2;
+ optional uint32 flags = 3;
+ optional string name = 4;
+ repeated bytes value = 5;
+ optional int32 result = 6;
+}
diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c
index 2b6b6825..b336155e 100644
--- a/src/lib/ipcp.c
+++ b/src/lib/ipcp.c
@@ -32,6 +32,8 @@
#include <stdlib.h>
#include <string.h>
#include <signal.h>
+#include <errno.h>
+#include <stdbool.h>
#include <sys/types.h>
#include <sys/wait.h>
@@ -54,8 +56,8 @@ static ipcp_msg_t * send_recv_ipcp_msg(pid_t api,
return NULL;
}
- buf.size = ipcp_msg__get_packed_size(msg);
- if (buf.size == 0) {
+ buf.len = ipcp_msg__get_packed_size(msg);
+ if (buf.len == 0) {
close(sockfd);
free(sock_path);
return NULL;
@@ -70,7 +72,7 @@ static ipcp_msg_t * send_recv_ipcp_msg(pid_t api,
ipcp_msg__pack(msg, buf.data);
- if (write(sockfd, buf.data, buf.size) == -1) {
+ if (write(sockfd, buf.data, buf.len) == -1) {
free(sock_path);
free(buf.data);
close(sockfd);
diff --git a/src/lib/irm.c b/src/lib/irm.c
index a81fdad3..ee1851a3 100644
--- a/src/lib/irm.c
+++ b/src/lib/irm.c
@@ -29,6 +29,7 @@
#include <ouroboros/logs.h>
#include <ouroboros/sockets.h>
+#include <stdbool.h>
#include <string.h>
#include <stdlib.h>
#include <sys/stat.h>
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c
index 31fcca8e..cf0bad19 100644
--- a/src/lib/shm_du_map.c
+++ b/src/lib/shm_du_map.c
@@ -25,6 +25,7 @@
#include <ouroboros/shm_du_map.h>
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/time_utils.h>
+
#include <pthread.h>
#include <sys/mman.h>
#include <fcntl.h>
diff --git a/src/lib/sockets.c b/src/lib/sockets.c
index 9bfbad5e..d60ea91a 100644
--- a/src/lib/sockets.c
+++ b/src/lib/sockets.c
@@ -32,6 +32,7 @@
#include <sys/un.h>
#include <string.h>
#include <stdlib.h>
+#include <errno.h>
int client_socket_open(char * file_name)
{
@@ -109,8 +110,8 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)
if (sockfd < 0)
return NULL;
- buf.size = irm_msg__get_packed_size(msg);
- if (buf.size == 0) {
+ buf.len = irm_msg__get_packed_size(msg);
+ if (buf.len == 0) {
close(sockfd);
return NULL;
}
@@ -123,7 +124,7 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)
irm_msg__pack(msg, buf.data);
- if (write(sockfd, buf.data, buf.size) == -1) {
+ if (write(sockfd, buf.data, buf.len) == -1) {
free(buf.data);
close(sockfd);
return NULL;