summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-07-05 16:36:40 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-07-05 16:36:40 +0200
commit51ccc34e0fe15aaf711f30fa8b63de1e1881029f (patch)
tree6c75a9574860e436287c5344ad8364d412c73543 /src
parent627c11526e57b94d466a7d7acd4fe0bf8cd2b776 (diff)
parentdaa4e408b3e34bdc228d26816de09d7d1fb9b043 (diff)
downloadouroboros-51ccc34e0fe15aaf711f30fa8b63de1e1881029f.tar.gz
ouroboros-51ccc34e0fe15aaf711f30fa8b63de1e1881029f.zip
Merged in sandervrijders/ouroboros/be (pull request #154)
lib: Provide first implementation of revised CDAP
Diffstat (limited to 'src')
-rw-r--r--src/ipcpd/ipcp-data.h2
-rw-r--r--src/ipcpd/ipcp.c8
-rw-r--r--src/ipcpd/ipcp.h2
-rw-r--r--src/ipcpd/local/main.c2
-rw-r--r--src/ipcpd/shim-udp/main.c6
-rw-r--r--src/irmd/main.c10
-rw-r--r--src/lib/CMakeLists.txt7
-rw-r--r--src/lib/cdap.c438
-rw-r--r--src/lib/cdap.proto18
-rw-r--r--src/lib/ipcp.c8
-rw-r--r--src/lib/irm.c1
-rw-r--r--src/lib/shm_du_map.c1
-rw-r--r--src/lib/sockets.c7
13 files changed, 488 insertions, 22 deletions
diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h
index ce20730b..1e183dca 100644
--- a/src/ipcpd/ipcp-data.h
+++ b/src/ipcpd/ipcp-data.h
@@ -24,7 +24,7 @@
#ifndef IPCPD_IPCP_DATA_H
#define IPCPD_IPCP_DATA_H
-#include <ouroboros/common.h>
+#include <ouroboros/shared.h>
#include <ouroboros/list.h>
#include <sys/types.h>
#include <pthread.h>
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 4acbffa2..579203c2 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -236,14 +236,14 @@ void * ipcp_main_loop(void * o)
ipcp_msg__free_unpacked(msg, NULL);
- buffer.size = ipcp_msg__get_packed_size(&ret_msg);
- if (buffer.size == 0) {
+ buffer.len = ipcp_msg__get_packed_size(&ret_msg);
+ if (buffer.len == 0) {
LOG_ERR("Failed to send reply message");
close(lsockfd);
continue;
}
- buffer.data = malloc(buffer.size);
+ buffer.data = malloc(buffer.len);
if (buffer.data == NULL) {
close(lsockfd);
continue;
@@ -251,7 +251,7 @@ void * ipcp_main_loop(void * o)
ipcp_msg__pack(&ret_msg, buffer.data);
- if (write(lsockfd, buffer.data, buffer.size) == -1) {
+ if (write(lsockfd, buffer.data, buffer.len) == -1) {
free(buffer.data);
close(lsockfd);
continue;
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 1e9e9763..27c3cf8e 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -24,6 +24,8 @@
#define IPCPD_IPCP_H
#include <ouroboros/config.h>
+#include <ouroboros/shared.h>
+
#include <pthread.h>
#include "ipcp-ops.h"
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 837cbf8c..4802a161 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -31,7 +31,7 @@
#include <ouroboros/irm_config.h>
#include <ouroboros/sockets.h>
#include <ouroboros/bitmap.h>
-#include <ouroboros/common.h>
+#include <ouroboros/shared.h>
#include <ouroboros/dev.h>
#define OUROBOROS_PREFIX "ipcpd/local"
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 68d393af..c22947fa 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -340,8 +340,8 @@ static int send_shim_udp_msg(shim_udp_msg_t * msg,
r_saddr.sin_addr.s_addr = dst_ip_addr;
r_saddr.sin_port = LISTEN_PORT;
- buf.size = shim_udp_msg__get_packed_size(msg);
- if (buf.size == 0) {
+ buf.len = shim_udp_msg__get_packed_size(msg);
+ if (buf.len == 0) {
return -1;
}
@@ -354,7 +354,7 @@ static int send_shim_udp_msg(shim_udp_msg_t * msg,
if (sendto(shim_data(_ipcp)->s_fd,
buf.data,
- buf.size,
+ buf.len,
0,
(struct sockaddr *) &r_saddr,
sizeof(r_saddr)) == -1) {
diff --git a/src/irmd/main.c b/src/irmd/main.c
index ab637789..38e10cc5 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1936,7 +1936,7 @@ void * irm_flow_cleaner()
pthread_rwlock_rdlock(&instance->state_lock);
- if (&instance->state == IRMD_NULL) {
+ if (instance->state == IRMD_NULL) {
pthread_rwlock_unlock(&instance->state_lock);
return (void *) 0;
}
@@ -2185,8 +2185,8 @@ void * mainloop()
pthread_cleanup_pop(true);
- buffer.size = irm_msg__get_packed_size(&ret_msg);
- if (buffer.size == 0) {
+ buffer.len = irm_msg__get_packed_size(&ret_msg);
+ if (buffer.len == 0) {
LOG_ERR("Failed to send reply message.");
if (apis != NULL)
free(apis);
@@ -2194,7 +2194,7 @@ void * mainloop()
continue;
}
- buffer.data = malloc(buffer.size);
+ buffer.data = malloc(buffer.len);
if (buffer.data == NULL) {
if (apis != NULL)
free(apis);
@@ -2204,7 +2204,7 @@ void * mainloop()
irm_msg__pack(&ret_msg, buffer.data);
- if (write(cli_sockfd, buffer.data, buffer.size) == -1) {
+ if (write(cli_sockfd, buffer.data, buffer.len) == -1) {
free(buffer.data);
if (apis != NULL)
free(apis);
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index ae46f5bc..57f44f15 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -9,7 +9,9 @@ include_directories(${PROTOBUF_INCLUDE_DIRS})
protobuf_generate_c(IRM_PROTO_SRCS IRM_PROTO_HDRS irmd_messages.proto)
protobuf_generate_c(IPCP_PROTO_SRCS IPCP_PROTO_HDRS ipcpd_messages.proto)
protobuf_generate_c(DIF_CONFIG_PROTO_SRCS DIF_CONFIG_PROTO_HDRS
- dif_config.proto)
+ dif_config.proto)
+protobuf_generate_c(CDAP_PROTO_SRCS CDAP_PROTO_HDRS
+ cdap.proto)
find_library(LIBRT_LIBRARIES rt)
if(NOT LIBRT_LIBRARIES)
@@ -38,7 +40,8 @@ set(SOURCE_FILES
)
add_library(ouroboros SHARED ${SOURCE_FILES}
- ${IRM_PROTO_SRCS} ${IPCP_PROTO_SRCS} ${DIF_CONFIG_PROTO_SRCS})
+ ${IRM_PROTO_SRCS} ${IPCP_PROTO_SRCS}
+ ${DIF_CONFIG_PROTO_SRCS} ${CDAP_PROTO_SRCS})
target_link_libraries(ouroboros ${LIBRT_LIBRARIES}
${LIBPTHREAD_LIBRARIES} ${PROTOBUF_C_LIBRARY})
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index e69de29b..4275bfc7 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -0,0 +1,438 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * The Common Distributed Application Protocol
+ *
+ * Sander Vrijders <sander.vrijders@intec.ugent.be>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <ouroboros/cdap.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/common.h>
+#include <ouroboros/dev.h>
+
+#include <stdlib.h>
+#include <pthread.h>
+
+#include "cdap.pb-c.h"
+typedef Cdap cdap_t;
+typedef Opcode opcode_t;
+
+#define IDS_SIZE 256
+#define BUF_SIZE 2048
+
+struct cdap {
+ int fd;
+ struct bmp * ids;
+ pthread_mutex_t ids_lock;
+ pthread_t reader;
+ struct cdap_ops * ops;
+};
+
+
+static ssize_t cdap_msg_to_buffer(cdap_t * msg,
+ buffer_t ** val)
+{
+ int i;
+ size_t len;
+
+ len = msg->n_value;
+
+ *val = malloc(len * sizeof(**val));
+ if (*val == NULL)
+ return -1;
+
+ for (i = 0; i < len; i++) {
+ if (msg->value[i].data == NULL) {
+ free(*val);
+ return -1;
+ }
+
+ (*val)[i].data = msg->value[i].data;
+ (*val)[i].len = msg->value[i].len;
+ }
+
+ return len;
+}
+
+
+static void * sdu_reader(void * o)
+{
+ struct cdap * instance = (struct cdap *) o;
+ cdap_t * msg;
+ uint8_t buf[BUF_SIZE];
+ ssize_t len;
+ ssize_t length;
+ buffer_t * val;
+
+ while (true) {
+ len = flow_read(instance->fd, buf, BUF_SIZE);
+ if (len < 0)
+ return (void *) -1;
+
+ msg = cdap__unpack(NULL, len, buf);
+ if (msg == NULL)
+ continue;
+
+ switch (msg->opcode) {
+ case OPCODE__READ:
+ if (msg->name != NULL)
+ instance->ops->cdap_read(instance,
+ msg->name);
+ break;
+ case OPCODE__WRITE:
+ length = cdap_msg_to_buffer(msg, &val);
+ if (msg->name != NULL &&
+ msg->value != NULL &&
+ len > 0) {
+ instance->ops->cdap_write(instance,
+ msg->name,
+ val,
+ length,
+ msg->flags);
+ free(val);
+ }
+ break;
+ case OPCODE__CREATE:
+ length = cdap_msg_to_buffer(msg, &val);
+ if (msg->name != NULL &&
+ length == 1) {
+ instance->ops->cdap_create(instance,
+ msg->name,
+ val[0]);
+ free(val);
+ }
+ break;
+ case OPCODE__DELETE:
+ length = cdap_msg_to_buffer(msg, &val);
+ if (msg->name != NULL &&
+ length == 1) {
+ instance->ops->cdap_create(instance,
+ msg->name,
+ val[0]);
+ free(val);
+ }
+ break;
+ case OPCODE__START:
+ if (msg->name != NULL)
+ instance->ops->cdap_start(instance,
+ msg->name);
+ break;
+ case OPCODE__STOP:
+ if (msg->name != NULL)
+ instance->ops->cdap_stop(instance,
+ msg->name);
+ break;
+ case OPCODE__REPLY:
+ length = cdap_msg_to_buffer(msg, &val);
+ if (msg->name != NULL &&
+ length > 0) {
+ instance->ops->cdap_reply(instance,
+ msg->invoke_id,
+ msg->result,
+ val,
+ length);
+ free(val);
+ }
+ break;
+ default:
+ break;
+ }
+
+ cdap__free_unpacked(msg, NULL);
+ }
+
+ return (void *) 0;
+}
+
+struct cdap * cdap_create(struct cdap_ops * ops,
+ int fd)
+{
+ struct cdap * instance = NULL;
+ int flags;
+
+ if (ops == NULL || fd < 0 ||
+ ops->cdap_reply == NULL ||
+ ops->cdap_read == NULL ||
+ ops->cdap_write == NULL ||
+ ops->cdap_create == NULL ||
+ ops->cdap_delete == NULL ||
+ ops->cdap_start == NULL ||
+ ops->cdap_stop == NULL)
+ return NULL;
+
+ flags = flow_cntl(fd, FLOW_F_GETFL, 0);
+ if (flags & FLOW_O_NONBLOCK)
+ return NULL;
+
+ instance = malloc(sizeof(*instance));
+ if (instance == NULL)
+ return NULL;
+
+ if (pthread_mutex_init(&instance->ids_lock, NULL)) {
+ free(instance);
+ return NULL;
+ }
+
+ instance->ops = ops;
+
+ instance->ids = bmp_create(IDS_SIZE, 0);
+ if (instance->ids == NULL) {
+ free(instance);
+ return NULL;
+ }
+
+ pthread_create(&instance->reader,
+ NULL,
+ sdu_reader,
+ (void *) instance);
+
+ return instance;
+}
+
+int cdap_destroy(struct cdap * instance)
+{
+ if (instance == NULL)
+ return -1;
+
+ pthread_cancel(instance->reader);
+
+ pthread_mutex_lock(&instance->ids_lock);
+
+ bmp_destroy(instance->ids);
+
+ pthread_mutex_unlock(&instance->ids_lock);
+
+ pthread_join(instance->reader,
+ NULL);
+
+ free(instance);
+
+ return 0;
+}
+
+static int next_invoke_id(struct cdap * instance)
+{
+ int ret;
+
+ pthread_mutex_lock(&instance->ids_lock);
+ ret = bmp_allocate(instance->ids);
+ pthread_mutex_unlock(&instance->ids_lock);
+
+ return ret;
+}
+
+static int release_invoke_id(struct cdap * instance,
+ int id)
+{
+ int ret;
+
+ pthread_mutex_lock(&instance->ids_lock);
+ ret = bmp_release(instance->ids, id);
+ pthread_mutex_unlock(&instance->ids_lock);
+
+ return ret;
+}
+
+static int write_msg(struct cdap * instance,
+ cdap_t * msg)
+{
+ buffer_t buf;
+ int ret;
+
+ buf.len = cdap__get_packed_size(msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(BUF_SIZE);
+ if (buf.data == NULL)
+ return -1;
+
+ cdap__pack(msg, buf.data);
+
+ ret = flow_write(instance->fd, buf.data, buf.len);
+
+ free(buf.data);
+
+ return ret;
+}
+
+static int buffer_to_cdap_msg(cdap_t * msg,
+ buffer_t * val,
+ size_t len)
+{
+ int i;
+
+ msg->value = malloc(len * sizeof(*msg->value));
+ if (msg->value == NULL)
+ return -1;
+
+ msg->n_value = len;
+ for (i = 0; i < len; i++) {
+ if (val[i].data == NULL) {
+ free(msg->value);
+ return -1;
+ }
+
+ msg->value[i].data = val[i].data;
+ msg->value[i].len = val[i].len;
+ }
+
+ return 0;
+}
+
+static int send_read_or_start_or_stop(struct cdap * instance,
+ char * name,
+ opcode_t code)
+{
+ int id;
+ cdap_t msg = CDAP__INIT;
+
+ if (instance == NULL || name == NULL)
+ return -1;
+
+ id = next_invoke_id(instance);
+ if (!bmp_is_id_valid(instance->ids, id))
+ return -1;
+
+ msg.opcode = code;
+ msg.invoke_id = id;
+ msg.name = name;
+
+ return write_msg(instance, &msg);
+}
+
+static int send_create_or_delete(struct cdap * instance,
+ char * name,
+ buffer_t val,
+ opcode_t code)
+{
+ int id;
+ cdap_t msg = CDAP__INIT;
+ int ret;
+
+ if (instance == NULL || name == NULL)
+ return -1;
+
+ id = next_invoke_id(instance);
+ if (!bmp_is_id_valid(instance->ids, id))
+ return -1;
+
+ msg.opcode = code;
+ msg.name = name;
+ msg.invoke_id = id;
+
+ if (buffer_to_cdap_msg(&msg, &val, 1)) {
+ release_invoke_id(instance, id);
+ return -1;
+ }
+
+ ret = write_msg(instance, &msg);
+
+ free(msg.value);
+
+ return ret;
+}
+
+int cdap_send_read(struct cdap * instance,
+ char * name)
+{
+ return send_read_or_start_or_stop(instance, name, OPCODE__READ);
+}
+
+int cdap_send_write(struct cdap * instance,
+ char * name,
+ buffer_t * val,
+ size_t len,
+ uint32_t flags)
+{
+ int id;
+ int ret;
+ cdap_t msg = CDAP__INIT;
+
+ if (instance == NULL || name == NULL ||
+ val == NULL || len < 1)
+ return -1;
+
+ id = next_invoke_id(instance);
+ if (!bmp_is_id_valid(instance->ids, id))
+ return -1;
+
+ msg.opcode = OPCODE__WRITE;
+ msg.name = name;
+ msg.has_flags = true;
+ msg.flags = flags;
+ msg.invoke_id = id;
+
+ if (buffer_to_cdap_msg(&msg, val, len)) {
+ release_invoke_id(instance, id);
+ return -1;
+ }
+
+ ret = write_msg(instance, &msg);
+
+ free(msg.value);
+
+ return ret;
+}
+
+int cdap_send_create(struct cdap * instance,
+ char * name,
+ buffer_t val)
+{
+ return send_create_or_delete(instance, name, val, OPCODE__CREATE);
+}
+
+int cdap_send_delete(struct cdap * instance,
+ char * name,
+ buffer_t val)
+{
+ return send_create_or_delete(instance, name, val, OPCODE__DELETE);
+}
+
+int cdap_send_start(struct cdap * instance,
+ char * name)
+{
+ return send_read_or_start_or_stop(instance, name, OPCODE__START);
+}
+
+int cdap_send_stop(struct cdap * instance,
+ char * name)
+{
+ return send_read_or_start_or_stop(instance, name, OPCODE__STOP);
+}
+
+int cdap_send_reply(struct cdap * instance,
+ int invoke_id,
+ int result,
+ buffer_t * val,
+ size_t len)
+{
+ cdap_t msg = CDAP__INIT;
+
+ if (instance == NULL || val == NULL)
+ return -1;
+
+ msg.invoke_id = invoke_id;
+ msg.has_result = true;
+ msg.result = result;
+
+ if (buffer_to_cdap_msg(&msg, val, len))
+ return -1;
+
+ return write_msg(instance, &msg);
+}
diff --git a/src/lib/cdap.proto b/src/lib/cdap.proto
new file mode 100644
index 00000000..a5e0306d
--- /dev/null
+++ b/src/lib/cdap.proto
@@ -0,0 +1,18 @@
+enum opcode {
+ CREATE = 1;
+ DELETE = 2;
+ READ = 3;
+ WRITE = 4;
+ START = 5;
+ STOP = 6;
+ REPLY = 7;
+}
+
+message cdap {
+ required opcode opcode = 1;
+ required uint32 invoke_id = 2;
+ optional uint32 flags = 3;
+ optional string name = 4;
+ repeated bytes value = 5;
+ optional int32 result = 6;
+}
diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c
index 2b6b6825..b336155e 100644
--- a/src/lib/ipcp.c
+++ b/src/lib/ipcp.c
@@ -32,6 +32,8 @@
#include <stdlib.h>
#include <string.h>
#include <signal.h>
+#include <errno.h>
+#include <stdbool.h>
#include <sys/types.h>
#include <sys/wait.h>
@@ -54,8 +56,8 @@ static ipcp_msg_t * send_recv_ipcp_msg(pid_t api,
return NULL;
}
- buf.size = ipcp_msg__get_packed_size(msg);
- if (buf.size == 0) {
+ buf.len = ipcp_msg__get_packed_size(msg);
+ if (buf.len == 0) {
close(sockfd);
free(sock_path);
return NULL;
@@ -70,7 +72,7 @@ static ipcp_msg_t * send_recv_ipcp_msg(pid_t api,
ipcp_msg__pack(msg, buf.data);
- if (write(sockfd, buf.data, buf.size) == -1) {
+ if (write(sockfd, buf.data, buf.len) == -1) {
free(sock_path);
free(buf.data);
close(sockfd);
diff --git a/src/lib/irm.c b/src/lib/irm.c
index a81fdad3..ee1851a3 100644
--- a/src/lib/irm.c
+++ b/src/lib/irm.c
@@ -29,6 +29,7 @@
#include <ouroboros/logs.h>
#include <ouroboros/sockets.h>
+#include <stdbool.h>
#include <string.h>
#include <stdlib.h>
#include <sys/stat.h>
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c
index 31fcca8e..cf0bad19 100644
--- a/src/lib/shm_du_map.c
+++ b/src/lib/shm_du_map.c
@@ -25,6 +25,7 @@
#include <ouroboros/shm_du_map.h>
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/time_utils.h>
+
#include <pthread.h>
#include <sys/mman.h>
#include <fcntl.h>
diff --git a/src/lib/sockets.c b/src/lib/sockets.c
index 9bfbad5e..d60ea91a 100644
--- a/src/lib/sockets.c
+++ b/src/lib/sockets.c
@@ -32,6 +32,7 @@
#include <sys/un.h>
#include <string.h>
#include <stdlib.h>
+#include <errno.h>
int client_socket_open(char * file_name)
{
@@ -109,8 +110,8 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)
if (sockfd < 0)
return NULL;
- buf.size = irm_msg__get_packed_size(msg);
- if (buf.size == 0) {
+ buf.len = irm_msg__get_packed_size(msg);
+ if (buf.len == 0) {
close(sockfd);
return NULL;
}
@@ -123,7 +124,7 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)
irm_msg__pack(msg, buf.data);
- if (write(sockfd, buf.data, buf.size) == -1) {
+ if (write(sockfd, buf.data, buf.len) == -1) {
free(buf.data);
close(sockfd);
return NULL;