/* * Ouroboros - Copyright (C) 2016 * * The Common Distributed Application Protocol * * Sander Vrijders <sander.vrijders@intec.ugent.be> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ #include <ouroboros/config.h> #include <ouroboros/cdap.h> #include <ouroboros/bitmap.h> #include <ouroboros/dev.h> #include <ouroboros/fcntl.h> #include <stdlib.h> #include <pthread.h> #include "cdap.pb-c.h" typedef Cdap cdap_t; typedef Opcode opcode_t; #define IDS_SIZE 256 #define BUF_SIZE 2048 struct cdap { int fd; struct bmp * ids; pthread_mutex_t ids_lock; pthread_t reader; struct cdap_ops * ops; }; struct cdap_info { pthread_t thread; struct cdap * instance; cdap_t * msg; }; static int next_invoke_id(struct cdap * instance) { int ret; pthread_mutex_lock(&instance->ids_lock); ret = bmp_allocate(instance->ids); if (!bmp_is_id_valid(instance->ids, ret)) ret = -1; /* INVALID_INVOKE_ID */ pthread_mutex_unlock(&instance->ids_lock); return ret; } static int release_invoke_id(struct cdap * instance, int id) { int ret; pthread_mutex_lock(&instance->ids_lock); ret = bmp_release(instance->ids, id); pthread_mutex_unlock(&instance->ids_lock); return ret; } static void * handle_cdap_msg(void * o) { struct cdap_info * info = (struct cdap_info *) o; struct cdap * instance = info->instance; cdap_t * msg = info->msg; switch (msg->opcode) { case OPCODE__READ: if (msg->name != NULL) instance->ops->cdap_read(instance, msg->invoke_id, msg->name); break; case OPCODE__WRITE: if (msg->name != NULL && msg->has_value) instance->ops->cdap_write(instance, msg->invoke_id, msg->name, msg->value.data, msg->value.len, msg->flags); break; case OPCODE__CREATE: if (msg->name != NULL && msg->has_value) instance->ops->cdap_create(instance, msg->invoke_id, msg->name, msg->value.data, msg->value.len); break; case OPCODE__DELETE: if (msg->name != NULL && msg->has_value) instance->ops->cdap_delete(instance, msg->invoke_id, msg->name, msg->value.data, msg->value.len); break; case OPCODE__START: if (msg->name != NULL) instance->ops->cdap_start(instance, msg->invoke_id, msg->name); break; case OPCODE__STOP: if (msg->name != NULL) instance->ops->cdap_stop(instance, msg->invoke_id, msg->name); break; case OPCODE__REPLY: instance->ops->cdap_reply(instance, msg->invoke_id, msg->result, msg->value.data, msg->value.len); release_invoke_id(instance, msg->invoke_id); break; default: break; } free(info); cdap__free_unpacked(msg, NULL); return (void *) 0; } static void * sdu_reader(void * o) { struct cdap * instance = (struct cdap *) o; cdap_t * msg; uint8_t buf[BUF_SIZE]; ssize_t len; struct cdap_info * cdap_info; while (true) { len = flow_read(instance->fd, buf, BUF_SIZE); if (len < 0) return (void *) -1; msg = cdap__unpack(NULL, len, buf); if (msg == NULL) continue; cdap_info = malloc(sizeof(*cdap_info)); if (cdap_info == NULL) { cdap__free_unpacked(msg, NULL); continue; } cdap_info->instance = instance; cdap_info->msg = msg; pthread_create(&cdap_info->thread, NULL, handle_cdap_msg, (void *) cdap_info); pthread_detach(cdap_info->thread); } return (void *) 0; } struct cdap * cdap_create(struct cdap_ops * ops, int fd) { struct cdap * instance = NULL; int flags; if (ops == NULL || fd < 0 || ops->cdap_reply == NULL || ops->cdap_read == NULL || ops->cdap_write == NULL || ops->cdap_create == NULL || ops->cdap_delete == NULL || ops->cdap_start == NULL || ops->cdap_stop == NULL) return NULL; flags = flow_cntl(fd, FLOW_F_GETFL, 0); if (flags & FLOW_O_NONBLOCK) return NULL; instance = malloc(sizeof(*instance)); if (instance == NULL) return NULL; if (pthread_mutex_init(&instance->ids_lock, NULL)) { free(instance); return NULL; } instance->ops = ops; instance->fd = fd; instance->ids = bmp_create(IDS_SIZE, 0); if (instance->ids == NULL) { free(instance); return NULL; } pthread_create(&instance->reader, NULL, sdu_reader, (void *) instance); return instance; } int cdap_destroy(struct cdap * instance) { if (instance == NULL) return 0; pthread_cancel(instance->reader); pthread_join(instance->reader, NULL); pthread_mutex_lock(&instance->ids_lock); bmp_destroy(instance->ids); pthread_mutex_unlock(&instance->ids_lock); flow_dealloc(instance->fd); free(instance); return 0; } static int write_msg(struct cdap * instance, cdap_t * msg) { int ret; uint8_t * data; size_t len; len = cdap__get_packed_size(msg); if (len == 0) return -1; data = malloc(BUF_SIZE); if (data == NULL) return -1; cdap__pack(msg, data); ret = flow_write(instance->fd, data, len); free(data); return ret; } static int send_read_or_start_or_stop(struct cdap * instance, char * name, opcode_t code) { int id; cdap_t msg = CDAP__INIT; if (instance == NULL || name == NULL) return -1; id = next_invoke_id(instance); if (!bmp_is_id_valid(instance->ids, id)) return -1; msg.opcode = code; msg.invoke_id = id; msg.name = name; if (write_msg(instance, &msg)) return -1; return id; } static int send_create_or_delete(struct cdap * instance, char * name, uint8_t * data, size_t len, opcode_t code) { int id; cdap_t msg = CDAP__INIT; if (instance == NULL || name == NULL || data == NULL) return -1; id = next_invoke_id(instance); if (!bmp_is_id_valid(instance->ids, id)) return -1; msg.opcode = code; msg.name = name; msg.invoke_id = id; msg.has_value = true; msg.value.data = data; msg.value.len = len; if (write_msg(instance, &msg)) return -1; return id; } int cdap_send_read(struct cdap * instance, char * name) { return send_read_or_start_or_stop(instance, name, OPCODE__READ); } int cdap_send_write(struct cdap * instance, char * name, uint8_t * data, size_t len, uint32_t flags) { int id; cdap_t msg = CDAP__INIT; if (instance == NULL || name == NULL || data == NULL) return -1; id = next_invoke_id(instance); if (!bmp_is_id_valid(instance->ids, id)) return -1; msg.opcode = OPCODE__WRITE; msg.name = name; msg.has_flags = true; msg.flags = flags; msg.invoke_id = id; msg.has_value = true; msg.value.data = data; msg.value.len = len; if (write_msg(instance, &msg)) return -1; return id; } int cdap_send_create(struct cdap * instance, char * name, uint8_t * data, size_t len) { return send_create_or_delete(instance, name, data, len, OPCODE__CREATE); } int cdap_send_delete(struct cdap * instance, char * name, uint8_t * data, size_t len) { return send_create_or_delete(instance, name, data, len, OPCODE__DELETE); } int cdap_send_start(struct cdap * instance, char * name) { return send_read_or_start_or_stop(instance, name, OPCODE__START); } int cdap_send_stop(struct cdap * instance, char * name) { return send_read_or_start_or_stop(instance, name, OPCODE__STOP); } int cdap_send_reply(struct cdap * instance, int invoke_id, int result, uint8_t * data, size_t len) { cdap_t msg = CDAP__INIT; if (instance == NULL) return -1; msg.opcode = OPCODE__REPLY; msg.invoke_id = invoke_id; msg.has_result = true; msg.result = result; if (data != NULL) { msg.has_value = true; msg.value.data = data; msg.value.len = len; } return write_msg(instance, &msg); }