diff options
| -rw-r--r-- | include/ouroboros/CMakeLists.txt | 18 | ||||
| -rw-r--r-- | include/ouroboros/cdap.h | 184 | ||||
| -rw-r--r-- | include/ouroboros/common.h | 16 | ||||
| -rw-r--r-- | include/ouroboros/dev.h | 1 | ||||
| -rw-r--r-- | include/ouroboros/ipcp.h | 2 | ||||
| -rw-r--r-- | include/ouroboros/shared.h | 38 | ||||
| -rw-r--r-- | include/ouroboros/shm_du_map.h | 4 | ||||
| -rw-r--r-- | src/ipcpd/ipcp-data.h | 2 | ||||
| -rw-r--r-- | src/ipcpd/ipcp.c | 8 | ||||
| -rw-r--r-- | src/ipcpd/ipcp.h | 2 | ||||
| -rw-r--r-- | src/ipcpd/local/main.c | 2 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/main.c | 6 | ||||
| -rw-r--r-- | src/irmd/main.c | 10 | ||||
| -rw-r--r-- | src/lib/CMakeLists.txt | 7 | ||||
| -rw-r--r-- | src/lib/cdap.c | 438 | ||||
| -rw-r--r-- | src/lib/cdap.proto | 18 | ||||
| -rw-r--r-- | src/lib/ipcp.c | 8 | ||||
| -rw-r--r-- | src/lib/irm.c | 1 | ||||
| -rw-r--r-- | src/lib/shm_du_map.c | 1 | ||||
| -rw-r--r-- | src/lib/sockets.c | 7 | 
20 files changed, 596 insertions, 177 deletions
diff --git a/include/ouroboros/CMakeLists.txt b/include/ouroboros/CMakeLists.txt index 68c88a18..ee339294 100644 --- a/include/ouroboros/CMakeLists.txt +++ b/include/ouroboros/CMakeLists.txt @@ -3,14 +3,14 @@ configure_file(    "${CMAKE_CURRENT_BINARY_DIR}/config.h")  set(HEADER_FILES -        cdap.h -        dev.h -        errno.h -        flow.h -        irm.h -        irm_config.h -        nsm.h -        qos.h -) +  cdap.h +  common.h +  dev.h +  errno.h +  flow.h +  irm.h +  irm_config.h +  nsm.h +  qos.h)  install(FILES ${HEADER_FILES} DESTINATION usr/include/ouroboros) diff --git a/include/ouroboros/cdap.h b/include/ouroboros/cdap.h index 72788ad6..e26f192b 100644 --- a/include/ouroboros/cdap.h +++ b/include/ouroboros/cdap.h @@ -3,7 +3,8 @@   *   * The Common Distributed Application Protocol   * - *    Sander Vrijders <sander.vrijders@intec.ugent.be> + *    Sander Vrijders   <sander.vrijders@intec.ugent.be> + *    Dimitri Staessens <dimitri.staessens@intec.ugent.be>   *   * This program is free software; you can redistribute it and/or modify   * it under the terms of the GNU General Public License as published by @@ -23,148 +24,75 @@  #ifndef OUROBOROS_CDAP_H  #define OUROBOROS_CDAP_H +#include <ouroboros/common.h> +  #include <stdbool.h> +#include <stdint.h> +#include <unistd.h> + +#define F_SYNC 0x0001  struct cdap; +/* Callback functions that work on the application's RIB */  struct cdap_ops { -        /* Sender related callbacks */ -        int (* handle_connect_r)(int fd, -                                 int invoke_id, -                                 int result); -        int (* handle_release_r)(int fd, -                                 int invoke_id, -                                 int result); -        int (* handle_read_r)(int fd, -                              int invoke_id, -                              int result, -                              char * reason, -                              char * obj_val, -                              bool complete); -        int (* handle_cancelread_r)(int fd, -                                    int invoke_id, -                                    int result); -        int (* handle_write_r)(int fd, -                               int invoke_id, -                               int result, -                               char * reason, -                               char * obj_val); -        int (* handle_create_r)(int fd, -                                int invoke_id, -                                int result); -        int (* handle_delete_r)(int fd, -                                int invoke_id, -                                int result); -        int (* handle_start_r)(int fd, -                               int invoke_id, -                               int result); -        int (* handle_stop_r)(int fd, -                              int invoke_id, -                              int result); +        int (* cdap_reply)(struct cdap * instance, +                           int           invoke_id, +                           int           result, +                           buffer_t *    val, +                           size_t        len); + +        int (* cdap_read)(struct cdap * instance, +                          char *        name); +        int (* cdap_write)(struct cdap * instance, +                           char *        name, +                           buffer_t *    val, +                           size_t        len, +                           uint32_t      flags); -        /* Receiver related callbacks */ -        int (* handle_connect)(int fd, -                               int invoke_id, -                               rina_name_t src, -                               rina_name_t dst, -                               char * auth_mech, -                               char * auth_val); -        int (* handle_release)(int fd, -                               int invoke_id); -        int (* handle_cancelread)(int fd, -                                  int invoke_id); -        int (* handle_write)(int fd, -                             int invoke_id, -                             char * obj_name, -                             char * obj_val); -        int (* handle_create)(int fd, -                              int invoke_id, -                              char * obj_class, -                              char * obj_name, -                              char * obj_val); -        int (* handle_delete)(int fd, -                              int invoke_id, -                              char * obj_name); -        int (* handle_start)(int fd, -                             int invoke_id, -                             char * obj_name, -                             char * obj_val); -        int (* handle_stop)(int fd, -                            int invoke_id, -                            char * obj_name, -                            char * obj_val); +        int (* cdap_create)(struct cdap * instance, +                            char *        name, +                            buffer_t      val); +        int (* cdap_delete)(struct cdap * instance, +                            char *        name, +                            buffer_t      val); + +        int (* cdap_start)(struct cdap * instance, +                           char *        name); +        int (* cdap_stop)(struct cdap * instance, +                          char *        name);  }; -struct cdap * cdap_create(struct cdap_ops ops, -                          int fd); +/* Assumes flow is blocking */ +struct cdap * cdap_create(struct cdap_ops * ops, +                          int               fd);  int           cdap_destroy(struct cdap * instance); -/* Sender related functions */ -int           cdap_send_connect(struct cdap * instance, -                                int invoke_id, -                                rina_name_t src, -                                rina_name_t dst, -                                char * auth_mech, -                                char * auth_val); -int           cdap_send_release(struct cdap * instance, -                                int invoke_id); +/* Returns a positive invoke-id on success to be used in the callback */  int           cdap_send_read(struct cdap * instance, -                             int invoke_id, -                             char * obj_name); -int           cdap_send_cancelread(struct cdap * instance, -                                   int invoke_id, -                                   char * obj_name); +                             char *        name);  int           cdap_send_write(struct cdap * instance, -                              int invoke_id, -                              char * obj_name, -                              char * obj_val); +                              char *        name, +                              buffer_t *    val, +                              size_t        len, +                              uint32_t      flags); +  int           cdap_send_create(struct cdap * instance, -                               int invoke_id, -                               char * obj_name, -                               char * obj_val); +                               char *        name, +                               buffer_t      val);  int           cdap_send_delete(struct cdap * instance, -                               int invoke_id, -                               char * obj_name); +                               char *        name, +                               buffer_t      val); +  int           cdap_send_start(struct cdap * instance, -                              int invoke_id, -                              char * obj_name, -                              char * obj_val); +                              char *        name);  int           cdap_send_stop(struct cdap * instance, -                             int invoke_id, -                             char * obj_name, -                             char * obj_val); +                             char *        name); -/* Receiver related functions */ -int           cdap_send_connect_r(struct cdap * instance, -                                  int invoke_id, -                                  int result); -int           cdap_send_release_r(struct cdap * instance, -                                  int invoke_id, -                                  int result); -int           cdap_send_read_r(struct cdap * instance, -                               int invoke_id, -                               int result, -                               char * reason, -                               char * obj_val, -                               bool complete); -int           cdap_send_cancelread_r(struct cdap * instance, -                                     int invoke_id, -                                     int result); -int           cdap_send_write_r(struct cdap * instance, -                                int invoke_id, -                                int result, -                                char * obj_name, -                                char * obj_val); -int           cdap_send_create_r(struct cdap * instance, -                                 int invoke_id, -                                 int result); -int           cdap_send_delete_r(struct cdap * instance, -                                 int invoke_id, -                                 int result); -int           cdap_send_start_r(struct cdap * instance, -                                int invoke_id, -                                int result); -int           cdap_send_stop_r(struct cdap * instance, -                               int invoke_id, -                               int result); +/* Can only be called following a callback function */ +int           cdap_send_reply(struct cdap * instance, +                              int           invoke_id, +                              int           result, +                              buffer_t *    val, +                              size_t        len);  #endif diff --git a/include/ouroboros/common.h b/include/ouroboros/common.h index 039e1a83..dbd050f1 100644 --- a/include/ouroboros/common.h +++ b/include/ouroboros/common.h @@ -25,24 +25,10 @@  #include <stdint.h>  #include <unistd.h> -#include <stdbool.h> -#include <errno.h>  typedef struct {          uint8_t * data; -        size_t    size; +        size_t    len;  } buffer_t; -/* FIXME: To be decided which QoS cubes we support */ -enum qos_cube { -        QOS_CUBE_BE = 0, -        QOS_CUBE_VIDEO -}; - -enum flow_state { -        FLOW_NULL = 0, -        FLOW_PENDING, -        FLOW_ALLOCATED -}; -  #endif /* OUROBOROS_COMMON_H */ diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h index 897bc124..699973a3 100644 --- a/include/ouroboros/dev.h +++ b/include/ouroboros/dev.h @@ -48,7 +48,6 @@ int     flow_alloc(char * dst_name,  int     flow_alloc_res(int fd);  int     flow_dealloc(int fd); -/* Wraps around fnctl */  int     flow_cntl(int fd, int cmd, int oflags);  ssize_t flow_write(int fd, void * buf, size_t count);  ssize_t flow_read(int fd, void * buf, size_t count); diff --git a/include/ouroboros/ipcp.h b/include/ouroboros/ipcp.h index f5657b64..c1aa452b 100644 --- a/include/ouroboros/ipcp.h +++ b/include/ouroboros/ipcp.h @@ -22,7 +22,7 @@  #include <ouroboros/irm_config.h>  #include <ouroboros/sockets.h> -#include <ouroboros/common.h> +#include <ouroboros/shared.h>  #include <sys/types.h> diff --git a/include/ouroboros/shared.h b/include/ouroboros/shared.h new file mode 100644 index 00000000..f5e34dc8 --- /dev/null +++ b/include/ouroboros/shared.h @@ -0,0 +1,38 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Shared definitions between IRMd and IPCPs + * + *    Sander Vrijders <sander.vrijders@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_SHARED_H +#define OUROBOROS_SHARED_H + +/* FIXME: To be decided which QoS cubes we support */ +enum qos_cube { +        QOS_CUBE_BE = 0, +        QOS_CUBE_VIDEO +}; + +enum flow_state { +        FLOW_NULL = 0, +        FLOW_PENDING, +        FLOW_ALLOCATED +}; + +#endif /* OUROBOROS_SHARED_H */ diff --git a/include/ouroboros/shm_du_map.h b/include/ouroboros/shm_du_map.h index e8934bae..9d6d7aaf 100644 --- a/include/ouroboros/shm_du_map.h +++ b/include/ouroboros/shm_du_map.h @@ -24,7 +24,9 @@  #ifndef OUROBOROS_SHM_DU_MAP_H  #define OUROBOROS_SHM_DU_MAP_H -#include "common.h" +#include <errno.h> +#include <unistd.h> +#include <stdint.h>  #include <sys/types.h>  #include <pthread.h> diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h index ce20730b..1e183dca 100644 --- a/src/ipcpd/ipcp-data.h +++ b/src/ipcpd/ipcp-data.h @@ -24,7 +24,7 @@  #ifndef IPCPD_IPCP_DATA_H  #define IPCPD_IPCP_DATA_H -#include <ouroboros/common.h> +#include <ouroboros/shared.h>  #include <ouroboros/list.h>  #include <sys/types.h>  #include <pthread.h> diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 4acbffa2..579203c2 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -236,14 +236,14 @@ void * ipcp_main_loop(void * o)                  ipcp_msg__free_unpacked(msg, NULL); -                buffer.size = ipcp_msg__get_packed_size(&ret_msg); -                if (buffer.size == 0) { +                buffer.len = ipcp_msg__get_packed_size(&ret_msg); +                if (buffer.len == 0) {                          LOG_ERR("Failed to send reply message");                          close(lsockfd);                          continue;                  } -                buffer.data = malloc(buffer.size); +                buffer.data = malloc(buffer.len);                  if (buffer.data == NULL) {                          close(lsockfd);                          continue; @@ -251,7 +251,7 @@ void * ipcp_main_loop(void * o)                  ipcp_msg__pack(&ret_msg, buffer.data); -                if (write(lsockfd, buffer.data, buffer.size) == -1) { +                if (write(lsockfd, buffer.data, buffer.len) == -1) {                          free(buffer.data);                          close(lsockfd);                          continue; diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 1e9e9763..27c3cf8e 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -24,6 +24,8 @@  #define IPCPD_IPCP_H  #include <ouroboros/config.h> +#include <ouroboros/shared.h> +  #include <pthread.h>  #include "ipcp-ops.h" diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 837cbf8c..4802a161 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -31,7 +31,7 @@  #include <ouroboros/irm_config.h>  #include <ouroboros/sockets.h>  #include <ouroboros/bitmap.h> -#include <ouroboros/common.h> +#include <ouroboros/shared.h>  #include <ouroboros/dev.h>  #define OUROBOROS_PREFIX "ipcpd/local" diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 68d393af..c22947fa 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -340,8 +340,8 @@ static int send_shim_udp_msg(shim_udp_msg_t * msg,         r_saddr.sin_addr.s_addr = dst_ip_addr;         r_saddr.sin_port        = LISTEN_PORT; -       buf.size = shim_udp_msg__get_packed_size(msg); -       if (buf.size == 0) { +       buf.len = shim_udp_msg__get_packed_size(msg); +       if (buf.len == 0) {                 return -1;         } @@ -354,7 +354,7 @@ static int send_shim_udp_msg(shim_udp_msg_t * msg,         if (sendto(shim_data(_ipcp)->s_fd,                    buf.data, -                  buf.size, +                  buf.len,                    0,                    (struct sockaddr *) &r_saddr,                    sizeof(r_saddr)) == -1) { diff --git a/src/irmd/main.c b/src/irmd/main.c index ab637789..38e10cc5 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1936,7 +1936,7 @@ void * irm_flow_cleaner()                  pthread_rwlock_rdlock(&instance->state_lock); -                if (&instance->state == IRMD_NULL) { +                if (instance->state == IRMD_NULL) {                          pthread_rwlock_unlock(&instance->state_lock);                          return (void *) 0;                  } @@ -2185,8 +2185,8 @@ void * mainloop()                  pthread_cleanup_pop(true); -                buffer.size = irm_msg__get_packed_size(&ret_msg); -                if (buffer.size == 0) { +                buffer.len = irm_msg__get_packed_size(&ret_msg); +                if (buffer.len == 0) {                          LOG_ERR("Failed to send reply message.");                          if (apis != NULL)                                  free(apis); @@ -2194,7 +2194,7 @@ void * mainloop()                          continue;                  } -                buffer.data = malloc(buffer.size); +                buffer.data = malloc(buffer.len);                  if (buffer.data == NULL) {                          if (apis != NULL)                                  free(apis); @@ -2204,7 +2204,7 @@ void * mainloop()                  irm_msg__pack(&ret_msg, buffer.data); -                if (write(cli_sockfd, buffer.data, buffer.size) == -1) { +                if (write(cli_sockfd, buffer.data, buffer.len) == -1) {                          free(buffer.data);                          if (apis != NULL)                                  free(apis); diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index ae46f5bc..57f44f15 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -9,7 +9,9 @@ include_directories(${PROTOBUF_INCLUDE_DIRS})  protobuf_generate_c(IRM_PROTO_SRCS IRM_PROTO_HDRS irmd_messages.proto)  protobuf_generate_c(IPCP_PROTO_SRCS IPCP_PROTO_HDRS ipcpd_messages.proto)  protobuf_generate_c(DIF_CONFIG_PROTO_SRCS DIF_CONFIG_PROTO_HDRS -                                          dif_config.proto) +  dif_config.proto) +protobuf_generate_c(CDAP_PROTO_SRCS CDAP_PROTO_HDRS +  cdap.proto)  find_library(LIBRT_LIBRARIES rt)  if(NOT LIBRT_LIBRARIES) @@ -38,7 +40,8 @@ set(SOURCE_FILES    )  add_library(ouroboros SHARED ${SOURCE_FILES} -  ${IRM_PROTO_SRCS} ${IPCP_PROTO_SRCS} ${DIF_CONFIG_PROTO_SRCS}) +  ${IRM_PROTO_SRCS} ${IPCP_PROTO_SRCS} +  ${DIF_CONFIG_PROTO_SRCS} ${CDAP_PROTO_SRCS})  target_link_libraries(ouroboros ${LIBRT_LIBRARIES}    ${LIBPTHREAD_LIBRARIES} ${PROTOBUF_C_LIBRARY}) diff --git a/src/lib/cdap.c b/src/lib/cdap.c index e69de29b..4275bfc7 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -0,0 +1,438 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * The Common Distributed Application Protocol + * + *    Sander Vrijders <sander.vrijders@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/cdap.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/common.h> +#include <ouroboros/dev.h> + +#include <stdlib.h> +#include <pthread.h> + +#include "cdap.pb-c.h" +typedef Cdap cdap_t; +typedef Opcode opcode_t; + +#define IDS_SIZE 256 +#define BUF_SIZE 2048 + +struct cdap { +        int               fd; +        struct bmp *      ids; +        pthread_mutex_t   ids_lock; +        pthread_t         reader; +        struct cdap_ops * ops; +}; + + +static ssize_t cdap_msg_to_buffer(cdap_t * msg, +                                  buffer_t ** val) +{ +        int i; +        size_t len; + +        len = msg->n_value; + +        *val = malloc(len * sizeof(**val)); +        if (*val == NULL) +                return -1; + +        for (i = 0; i < len; i++) { +                if (msg->value[i].data == NULL) { +                        free(*val); +                        return -1; +                } + +                (*val)[i].data = msg->value[i].data; +                (*val)[i].len = msg->value[i].len; +        } + +        return len; +} + + +static void * sdu_reader(void * o) +{ +        struct cdap * instance = (struct cdap *) o; +        cdap_t * msg; +        uint8_t buf[BUF_SIZE]; +        ssize_t len; +        ssize_t length; +        buffer_t * val; + +        while (true) { +                len = flow_read(instance->fd, buf, BUF_SIZE); +                if (len < 0) +                        return (void *) -1; + +                msg = cdap__unpack(NULL, len, buf); +                if (msg == NULL) +                        continue; + +                switch (msg->opcode) { +                case OPCODE__READ: +                        if (msg->name != NULL) +                                instance->ops->cdap_read(instance, +                                                         msg->name); +                        break; +                case OPCODE__WRITE: +                        length = cdap_msg_to_buffer(msg, &val); +                        if (msg->name != NULL && +                            msg->value != NULL && +                            len > 0) { +                                instance->ops->cdap_write(instance, +                                                          msg->name, +                                                          val, +                                                          length, +                                                          msg->flags); +                                free(val); +                        } +                        break; +                case OPCODE__CREATE: +                        length = cdap_msg_to_buffer(msg, &val); +                        if (msg->name != NULL && +                            length == 1) { +                                instance->ops->cdap_create(instance, +                                                           msg->name, +                                                           val[0]); +                                free(val); +                        } +                        break; +                case OPCODE__DELETE: +                        length = cdap_msg_to_buffer(msg, &val); +                        if (msg->name != NULL && +                            length == 1) { +                                instance->ops->cdap_create(instance, +                                                           msg->name, +                                                           val[0]); +                                free(val); +                        } +                        break; +                case OPCODE__START: +                        if (msg->name != NULL) +                                instance->ops->cdap_start(instance, +                                                          msg->name); +                        break; +                case OPCODE__STOP: +                        if (msg->name != NULL) +                                instance->ops->cdap_stop(instance, +                                                         msg->name); +                        break; +                case OPCODE__REPLY: +                        length = cdap_msg_to_buffer(msg, &val); +                        if (msg->name != NULL && +                            length > 0) { +                                instance->ops->cdap_reply(instance, +                                                          msg->invoke_id, +                                                          msg->result, +                                                          val, +                                                          length); +                                free(val); +                        } +                        break; +                default: +                        break; +                } + +                cdap__free_unpacked(msg, NULL); +        } + +        return (void *) 0; +} + +struct cdap * cdap_create(struct cdap_ops * ops, +                          int               fd) +{ +        struct cdap * instance = NULL; +        int flags; + +        if (ops == NULL || fd < 0 || +            ops->cdap_reply == NULL || +            ops->cdap_read == NULL || +            ops->cdap_write == NULL || +            ops->cdap_create == NULL || +            ops->cdap_delete == NULL || +            ops->cdap_start == NULL || +            ops->cdap_stop == NULL) +                return NULL; + +        flags = flow_cntl(fd, FLOW_F_GETFL, 0); +        if (flags & FLOW_O_NONBLOCK) +                return NULL; + +        instance = malloc(sizeof(*instance)); +        if (instance == NULL) +                return NULL; + +        if (pthread_mutex_init(&instance->ids_lock, NULL)) { +                free(instance); +                return NULL; +        } + +        instance->ops = ops; + +        instance->ids = bmp_create(IDS_SIZE, 0); +        if (instance->ids == NULL) { +                free(instance); +                return NULL; +        } + +        pthread_create(&instance->reader, +                       NULL, +                       sdu_reader, +                       (void *) instance); + +        return instance; +} + +int cdap_destroy(struct cdap * instance) +{ +        if (instance == NULL) +                return -1; + +        pthread_cancel(instance->reader); + +        pthread_mutex_lock(&instance->ids_lock); + +        bmp_destroy(instance->ids); + +        pthread_mutex_unlock(&instance->ids_lock); + +        pthread_join(instance->reader, +                     NULL); + +        free(instance); + +        return 0; +} + +static int next_invoke_id(struct cdap * instance) +{ +        int ret; + +        pthread_mutex_lock(&instance->ids_lock); +        ret = bmp_allocate(instance->ids); +        pthread_mutex_unlock(&instance->ids_lock); + +        return ret; +} + +static int release_invoke_id(struct cdap * instance, +                             int id) +{ +        int ret; + +        pthread_mutex_lock(&instance->ids_lock); +        ret = bmp_release(instance->ids, id); +        pthread_mutex_unlock(&instance->ids_lock); + +        return ret; +} + +static int write_msg(struct cdap * instance, +                     cdap_t * msg) +{ +        buffer_t buf; +        int ret; + +        buf.len = cdap__get_packed_size(msg); +        if (buf.len == 0) +                return -1; + +        buf.data = malloc(BUF_SIZE); +        if (buf.data == NULL) +                return -1; + +        cdap__pack(msg, buf.data); + +        ret = flow_write(instance->fd, buf.data, buf.len); + +        free(buf.data); + +        return ret; +} + +static int buffer_to_cdap_msg(cdap_t * msg, +                              buffer_t * val, +                              size_t len) +{ +        int i; + +        msg->value = malloc(len * sizeof(*msg->value)); +        if (msg->value == NULL) +                return -1; + +        msg->n_value = len; +        for (i = 0; i < len; i++) { +                if (val[i].data == NULL) { +                        free(msg->value); +                        return -1; +                } + +                msg->value[i].data = val[i].data; +                msg->value[i].len = val[i].len; +        } + +        return 0; +} + +static int send_read_or_start_or_stop(struct cdap * instance, +                                      char *        name, +                                      opcode_t      code) +{ +        int id; +        cdap_t msg = CDAP__INIT; + +        if (instance == NULL || name == NULL) +                return -1; + +        id = next_invoke_id(instance); +        if (!bmp_is_id_valid(instance->ids, id)) +                return -1; + +        msg.opcode = code; +        msg.invoke_id = id; +        msg.name = name; + +        return write_msg(instance, &msg); +} + +static int send_create_or_delete(struct cdap * instance, +                                 char *        name, +                                 buffer_t      val, +                                 opcode_t      code) +{ +        int id; +        cdap_t msg = CDAP__INIT; +        int ret; + +        if (instance == NULL || name == NULL) +                return -1; + +        id = next_invoke_id(instance); +        if (!bmp_is_id_valid(instance->ids, id)) +                return -1; + +        msg.opcode = code; +        msg.name = name; +        msg.invoke_id = id; + +        if (buffer_to_cdap_msg(&msg, &val, 1)) { +                release_invoke_id(instance, id); +                return -1; +        } + +        ret = write_msg(instance, &msg); + +        free(msg.value); + +        return ret; +} + +int cdap_send_read(struct cdap * instance, +                   char *        name) +{ +        return send_read_or_start_or_stop(instance, name, OPCODE__READ); +} + +int cdap_send_write(struct cdap * instance, +                    char *        name, +                    buffer_t *    val, +                    size_t        len, +                    uint32_t      flags) +{ +        int id; +        int ret; +        cdap_t msg = CDAP__INIT; + +        if (instance == NULL || name == NULL || +            val == NULL || len < 1) +                return -1; + +        id = next_invoke_id(instance); +        if (!bmp_is_id_valid(instance->ids, id)) +                return -1; + +        msg.opcode = OPCODE__WRITE; +        msg.name = name; +        msg.has_flags = true; +        msg.flags = flags; +        msg.invoke_id = id; + +        if (buffer_to_cdap_msg(&msg, val, len)) { +                release_invoke_id(instance, id); +                return -1; +        } + +        ret = write_msg(instance, &msg); + +        free(msg.value); + +        return ret; +} + +int cdap_send_create(struct cdap * instance, +                     char *        name, +                     buffer_t      val) +{ +        return send_create_or_delete(instance, name, val, OPCODE__CREATE); +} + +int cdap_send_delete(struct cdap * instance, +                     char *        name, +                     buffer_t      val) +{ +        return send_create_or_delete(instance, name, val, OPCODE__DELETE); +} + +int cdap_send_start(struct cdap * instance, +                    char *        name) +{ +        return send_read_or_start_or_stop(instance, name, OPCODE__START); +} + +int cdap_send_stop(struct cdap * instance, +                   char *        name) +{ +        return send_read_or_start_or_stop(instance, name, OPCODE__STOP); +} + +int cdap_send_reply(struct cdap * instance, +                    int           invoke_id, +                    int           result, +                    buffer_t *    val, +                    size_t        len) +{ +        cdap_t msg = CDAP__INIT; + +        if (instance == NULL || val == NULL) +                return -1; + +        msg.invoke_id = invoke_id; +        msg.has_result = true; +        msg.result = result; + +        if (buffer_to_cdap_msg(&msg, val, len)) +                return -1; + +        return write_msg(instance, &msg); +} diff --git a/src/lib/cdap.proto b/src/lib/cdap.proto new file mode 100644 index 00000000..a5e0306d --- /dev/null +++ b/src/lib/cdap.proto @@ -0,0 +1,18 @@ +enum opcode { +	CREATE = 1; +	DELETE = 2; +	READ   = 3; +	WRITE  = 4; +	START  = 5; +	STOP   = 6; +	REPLY  = 7; +} + +message cdap { +	required opcode opcode    = 1; +	required uint32 invoke_id = 2; +	optional uint32 flags     = 3; +	optional string name      = 4; +	repeated bytes value      = 5; +	optional int32 result     = 6; +} diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c index 2b6b6825..b336155e 100644 --- a/src/lib/ipcp.c +++ b/src/lib/ipcp.c @@ -32,6 +32,8 @@  #include <stdlib.h>  #include <string.h>  #include <signal.h> +#include <errno.h> +#include <stdbool.h>  #include <sys/types.h>  #include <sys/wait.h> @@ -54,8 +56,8 @@ static ipcp_msg_t * send_recv_ipcp_msg(pid_t api,                 return NULL;         } -       buf.size = ipcp_msg__get_packed_size(msg); -       if (buf.size == 0) { +       buf.len = ipcp_msg__get_packed_size(msg); +       if (buf.len == 0) {                 close(sockfd);                 free(sock_path);                 return NULL; @@ -70,7 +72,7 @@ static ipcp_msg_t * send_recv_ipcp_msg(pid_t api,         ipcp_msg__pack(msg, buf.data); -       if (write(sockfd, buf.data, buf.size) == -1) { +       if (write(sockfd, buf.data, buf.len) == -1) {                 free(sock_path);                 free(buf.data);                 close(sockfd); diff --git a/src/lib/irm.c b/src/lib/irm.c index a81fdad3..ee1851a3 100644 --- a/src/lib/irm.c +++ b/src/lib/irm.c @@ -29,6 +29,7 @@  #include <ouroboros/logs.h>  #include <ouroboros/sockets.h> +#include <stdbool.h>  #include <string.h>  #include <stdlib.h>  #include <sys/stat.h> diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index 31fcca8e..cf0bad19 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -25,6 +25,7 @@  #include <ouroboros/shm_du_map.h>  #include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/time_utils.h> +  #include <pthread.h>  #include <sys/mman.h>  #include <fcntl.h> diff --git a/src/lib/sockets.c b/src/lib/sockets.c index 9bfbad5e..d60ea91a 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -32,6 +32,7 @@  #include <sys/un.h>  #include <string.h>  #include <stdlib.h> +#include <errno.h>  int client_socket_open(char * file_name)  { @@ -109,8 +110,8 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)          if (sockfd < 0)                  return NULL; -        buf.size = irm_msg__get_packed_size(msg); -        if (buf.size == 0) { +        buf.len = irm_msg__get_packed_size(msg); +        if (buf.len == 0) {                  close(sockfd);                  return NULL;          } @@ -123,7 +124,7 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)          irm_msg__pack(msg, buf.data); -        if (write(sockfd, buf.data, buf.size) == -1) { +        if (write(sockfd, buf.data, buf.len) == -1) {                  free(buf.data);                  close(sockfd);                  return NULL;  | 
