diff options
author | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-07-05 16:36:40 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-07-05 16:36:40 +0200 |
commit | 51ccc34e0fe15aaf711f30fa8b63de1e1881029f (patch) | |
tree | 6c75a9574860e436287c5344ad8364d412c73543 /src/lib | |
parent | 627c11526e57b94d466a7d7acd4fe0bf8cd2b776 (diff) | |
parent | daa4e408b3e34bdc228d26816de09d7d1fb9b043 (diff) | |
download | ouroboros-51ccc34e0fe15aaf711f30fa8b63de1e1881029f.tar.gz ouroboros-51ccc34e0fe15aaf711f30fa8b63de1e1881029f.zip |
Merged in sandervrijders/ouroboros/be (pull request #154)
lib: Provide first implementation of revised CDAP
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/CMakeLists.txt | 7 | ||||
-rw-r--r-- | src/lib/cdap.c | 438 | ||||
-rw-r--r-- | src/lib/cdap.proto | 18 | ||||
-rw-r--r-- | src/lib/ipcp.c | 8 | ||||
-rw-r--r-- | src/lib/irm.c | 1 | ||||
-rw-r--r-- | src/lib/shm_du_map.c | 1 | ||||
-rw-r--r-- | src/lib/sockets.c | 7 |
7 files changed, 472 insertions, 8 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index ae46f5bc..57f44f15 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -9,7 +9,9 @@ include_directories(${PROTOBUF_INCLUDE_DIRS}) protobuf_generate_c(IRM_PROTO_SRCS IRM_PROTO_HDRS irmd_messages.proto) protobuf_generate_c(IPCP_PROTO_SRCS IPCP_PROTO_HDRS ipcpd_messages.proto) protobuf_generate_c(DIF_CONFIG_PROTO_SRCS DIF_CONFIG_PROTO_HDRS - dif_config.proto) + dif_config.proto) +protobuf_generate_c(CDAP_PROTO_SRCS CDAP_PROTO_HDRS + cdap.proto) find_library(LIBRT_LIBRARIES rt) if(NOT LIBRT_LIBRARIES) @@ -38,7 +40,8 @@ set(SOURCE_FILES ) add_library(ouroboros SHARED ${SOURCE_FILES} - ${IRM_PROTO_SRCS} ${IPCP_PROTO_SRCS} ${DIF_CONFIG_PROTO_SRCS}) + ${IRM_PROTO_SRCS} ${IPCP_PROTO_SRCS} + ${DIF_CONFIG_PROTO_SRCS} ${CDAP_PROTO_SRCS}) target_link_libraries(ouroboros ${LIBRT_LIBRARIES} ${LIBPTHREAD_LIBRARIES} ${PROTOBUF_C_LIBRARY}) diff --git a/src/lib/cdap.c b/src/lib/cdap.c index e69de29b..4275bfc7 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -0,0 +1,438 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * The Common Distributed Application Protocol + * + * Sander Vrijders <sander.vrijders@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/cdap.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/common.h> +#include <ouroboros/dev.h> + +#include <stdlib.h> +#include <pthread.h> + +#include "cdap.pb-c.h" +typedef Cdap cdap_t; +typedef Opcode opcode_t; + +#define IDS_SIZE 256 +#define BUF_SIZE 2048 + +struct cdap { + int fd; + struct bmp * ids; + pthread_mutex_t ids_lock; + pthread_t reader; + struct cdap_ops * ops; +}; + + +static ssize_t cdap_msg_to_buffer(cdap_t * msg, + buffer_t ** val) +{ + int i; + size_t len; + + len = msg->n_value; + + *val = malloc(len * sizeof(**val)); + if (*val == NULL) + return -1; + + for (i = 0; i < len; i++) { + if (msg->value[i].data == NULL) { + free(*val); + return -1; + } + + (*val)[i].data = msg->value[i].data; + (*val)[i].len = msg->value[i].len; + } + + return len; +} + + +static void * sdu_reader(void * o) +{ + struct cdap * instance = (struct cdap *) o; + cdap_t * msg; + uint8_t buf[BUF_SIZE]; + ssize_t len; + ssize_t length; + buffer_t * val; + + while (true) { + len = flow_read(instance->fd, buf, BUF_SIZE); + if (len < 0) + return (void *) -1; + + msg = cdap__unpack(NULL, len, buf); + if (msg == NULL) + continue; + + switch (msg->opcode) { + case OPCODE__READ: + if (msg->name != NULL) + instance->ops->cdap_read(instance, + msg->name); + break; + case OPCODE__WRITE: + length = cdap_msg_to_buffer(msg, &val); + if (msg->name != NULL && + msg->value != NULL && + len > 0) { + instance->ops->cdap_write(instance, + msg->name, + val, + length, + msg->flags); + free(val); + } + break; + case OPCODE__CREATE: + length = cdap_msg_to_buffer(msg, &val); + if (msg->name != NULL && + length == 1) { + instance->ops->cdap_create(instance, + msg->name, + val[0]); + free(val); + } + break; + case OPCODE__DELETE: + length = cdap_msg_to_buffer(msg, &val); + if (msg->name != NULL && + length == 1) { + instance->ops->cdap_create(instance, + msg->name, + val[0]); + free(val); + } + break; + case OPCODE__START: + if (msg->name != NULL) + instance->ops->cdap_start(instance, + msg->name); + break; + case OPCODE__STOP: + if (msg->name != NULL) + instance->ops->cdap_stop(instance, + msg->name); + break; + case OPCODE__REPLY: + length = cdap_msg_to_buffer(msg, &val); + if (msg->name != NULL && + length > 0) { + instance->ops->cdap_reply(instance, + msg->invoke_id, + msg->result, + val, + length); + free(val); + } + break; + default: + break; + } + + cdap__free_unpacked(msg, NULL); + } + + return (void *) 0; +} + +struct cdap * cdap_create(struct cdap_ops * ops, + int fd) +{ + struct cdap * instance = NULL; + int flags; + + if (ops == NULL || fd < 0 || + ops->cdap_reply == NULL || + ops->cdap_read == NULL || + ops->cdap_write == NULL || + ops->cdap_create == NULL || + ops->cdap_delete == NULL || + ops->cdap_start == NULL || + ops->cdap_stop == NULL) + return NULL; + + flags = flow_cntl(fd, FLOW_F_GETFL, 0); + if (flags & FLOW_O_NONBLOCK) + return NULL; + + instance = malloc(sizeof(*instance)); + if (instance == NULL) + return NULL; + + if (pthread_mutex_init(&instance->ids_lock, NULL)) { + free(instance); + return NULL; + } + + instance->ops = ops; + + instance->ids = bmp_create(IDS_SIZE, 0); + if (instance->ids == NULL) { + free(instance); + return NULL; + } + + pthread_create(&instance->reader, + NULL, + sdu_reader, + (void *) instance); + + return instance; +} + +int cdap_destroy(struct cdap * instance) +{ + if (instance == NULL) + return -1; + + pthread_cancel(instance->reader); + + pthread_mutex_lock(&instance->ids_lock); + + bmp_destroy(instance->ids); + + pthread_mutex_unlock(&instance->ids_lock); + + pthread_join(instance->reader, + NULL); + + free(instance); + + return 0; +} + +static int next_invoke_id(struct cdap * instance) +{ + int ret; + + pthread_mutex_lock(&instance->ids_lock); + ret = bmp_allocate(instance->ids); + pthread_mutex_unlock(&instance->ids_lock); + + return ret; +} + +static int release_invoke_id(struct cdap * instance, + int id) +{ + int ret; + + pthread_mutex_lock(&instance->ids_lock); + ret = bmp_release(instance->ids, id); + pthread_mutex_unlock(&instance->ids_lock); + + return ret; +} + +static int write_msg(struct cdap * instance, + cdap_t * msg) +{ + buffer_t buf; + int ret; + + buf.len = cdap__get_packed_size(msg); + if (buf.len == 0) + return -1; + + buf.data = malloc(BUF_SIZE); + if (buf.data == NULL) + return -1; + + cdap__pack(msg, buf.data); + + ret = flow_write(instance->fd, buf.data, buf.len); + + free(buf.data); + + return ret; +} + +static int buffer_to_cdap_msg(cdap_t * msg, + buffer_t * val, + size_t len) +{ + int i; + + msg->value = malloc(len * sizeof(*msg->value)); + if (msg->value == NULL) + return -1; + + msg->n_value = len; + for (i = 0; i < len; i++) { + if (val[i].data == NULL) { + free(msg->value); + return -1; + } + + msg->value[i].data = val[i].data; + msg->value[i].len = val[i].len; + } + + return 0; +} + +static int send_read_or_start_or_stop(struct cdap * instance, + char * name, + opcode_t code) +{ + int id; + cdap_t msg = CDAP__INIT; + + if (instance == NULL || name == NULL) + return -1; + + id = next_invoke_id(instance); + if (!bmp_is_id_valid(instance->ids, id)) + return -1; + + msg.opcode = code; + msg.invoke_id = id; + msg.name = name; + + return write_msg(instance, &msg); +} + +static int send_create_or_delete(struct cdap * instance, + char * name, + buffer_t val, + opcode_t code) +{ + int id; + cdap_t msg = CDAP__INIT; + int ret; + + if (instance == NULL || name == NULL) + return -1; + + id = next_invoke_id(instance); + if (!bmp_is_id_valid(instance->ids, id)) + return -1; + + msg.opcode = code; + msg.name = name; + msg.invoke_id = id; + + if (buffer_to_cdap_msg(&msg, &val, 1)) { + release_invoke_id(instance, id); + return -1; + } + + ret = write_msg(instance, &msg); + + free(msg.value); + + return ret; +} + +int cdap_send_read(struct cdap * instance, + char * name) +{ + return send_read_or_start_or_stop(instance, name, OPCODE__READ); +} + +int cdap_send_write(struct cdap * instance, + char * name, + buffer_t * val, + size_t len, + uint32_t flags) +{ + int id; + int ret; + cdap_t msg = CDAP__INIT; + + if (instance == NULL || name == NULL || + val == NULL || len < 1) + return -1; + + id = next_invoke_id(instance); + if (!bmp_is_id_valid(instance->ids, id)) + return -1; + + msg.opcode = OPCODE__WRITE; + msg.name = name; + msg.has_flags = true; + msg.flags = flags; + msg.invoke_id = id; + + if (buffer_to_cdap_msg(&msg, val, len)) { + release_invoke_id(instance, id); + return -1; + } + + ret = write_msg(instance, &msg); + + free(msg.value); + + return ret; +} + +int cdap_send_create(struct cdap * instance, + char * name, + buffer_t val) +{ + return send_create_or_delete(instance, name, val, OPCODE__CREATE); +} + +int cdap_send_delete(struct cdap * instance, + char * name, + buffer_t val) +{ + return send_create_or_delete(instance, name, val, OPCODE__DELETE); +} + +int cdap_send_start(struct cdap * instance, + char * name) +{ + return send_read_or_start_or_stop(instance, name, OPCODE__START); +} + +int cdap_send_stop(struct cdap * instance, + char * name) +{ + return send_read_or_start_or_stop(instance, name, OPCODE__STOP); +} + +int cdap_send_reply(struct cdap * instance, + int invoke_id, + int result, + buffer_t * val, + size_t len) +{ + cdap_t msg = CDAP__INIT; + + if (instance == NULL || val == NULL) + return -1; + + msg.invoke_id = invoke_id; + msg.has_result = true; + msg.result = result; + + if (buffer_to_cdap_msg(&msg, val, len)) + return -1; + + return write_msg(instance, &msg); +} diff --git a/src/lib/cdap.proto b/src/lib/cdap.proto new file mode 100644 index 00000000..a5e0306d --- /dev/null +++ b/src/lib/cdap.proto @@ -0,0 +1,18 @@ +enum opcode { + CREATE = 1; + DELETE = 2; + READ = 3; + WRITE = 4; + START = 5; + STOP = 6; + REPLY = 7; +} + +message cdap { + required opcode opcode = 1; + required uint32 invoke_id = 2; + optional uint32 flags = 3; + optional string name = 4; + repeated bytes value = 5; + optional int32 result = 6; +} diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c index 2b6b6825..b336155e 100644 --- a/src/lib/ipcp.c +++ b/src/lib/ipcp.c @@ -32,6 +32,8 @@ #include <stdlib.h> #include <string.h> #include <signal.h> +#include <errno.h> +#include <stdbool.h> #include <sys/types.h> #include <sys/wait.h> @@ -54,8 +56,8 @@ static ipcp_msg_t * send_recv_ipcp_msg(pid_t api, return NULL; } - buf.size = ipcp_msg__get_packed_size(msg); - if (buf.size == 0) { + buf.len = ipcp_msg__get_packed_size(msg); + if (buf.len == 0) { close(sockfd); free(sock_path); return NULL; @@ -70,7 +72,7 @@ static ipcp_msg_t * send_recv_ipcp_msg(pid_t api, ipcp_msg__pack(msg, buf.data); - if (write(sockfd, buf.data, buf.size) == -1) { + if (write(sockfd, buf.data, buf.len) == -1) { free(sock_path); free(buf.data); close(sockfd); diff --git a/src/lib/irm.c b/src/lib/irm.c index a81fdad3..ee1851a3 100644 --- a/src/lib/irm.c +++ b/src/lib/irm.c @@ -29,6 +29,7 @@ #include <ouroboros/logs.h> #include <ouroboros/sockets.h> +#include <stdbool.h> #include <string.h> #include <stdlib.h> #include <sys/stat.h> diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index 31fcca8e..cf0bad19 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -25,6 +25,7 @@ #include <ouroboros/shm_du_map.h> #include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/time_utils.h> + #include <pthread.h> #include <sys/mman.h> #include <fcntl.h> diff --git a/src/lib/sockets.c b/src/lib/sockets.c index 9bfbad5e..d60ea91a 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -32,6 +32,7 @@ #include <sys/un.h> #include <string.h> #include <stdlib.h> +#include <errno.h> int client_socket_open(char * file_name) { @@ -109,8 +110,8 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg) if (sockfd < 0) return NULL; - buf.size = irm_msg__get_packed_size(msg); - if (buf.size == 0) { + buf.len = irm_msg__get_packed_size(msg); + if (buf.len == 0) { close(sockfd); return NULL; } @@ -123,7 +124,7 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg) irm_msg__pack(msg, buf.data); - if (write(sockfd, buf.data, buf.size) == -1) { + if (write(sockfd, buf.data, buf.len) == -1) { free(buf.data); close(sockfd); return NULL; |