From c9fb31c7ebadc076b7c3bfd9dbe1c492c6ef7172 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Fri, 3 Mar 2017 15:31:42 +0100 Subject: Change email addresses to ugent.be Our mailserver was migrated from intec.ugent.be to the central ugent.be emailserver. This PR updates the header files to reflect this change as well. Some header files were also homogenized if the parameters within the functions were badly aligned. --- src/lib/cdap_req.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/lib/cdap_req.c') diff --git a/src/lib/cdap_req.c b/src/lib/cdap_req.c index 810ec9bf..a0348a14 100644 --- a/src/lib/cdap_req.c +++ b/src/lib/cdap_req.c @@ -3,8 +3,8 @@ * * CDAP - CDAP request management * - * Sander Vrijders - * Dimitri Staessens + * Dimitri Staessens + * Sander Vrijders * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License -- cgit v1.2.3 From a688b8a38d7eb9f42406eeb611717db737b0d257 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 3 Mar 2017 15:41:11 +0100 Subject: lib: Manage multiple flows with a single CDAP instance You can now add multiple flows to a CDAP instance. This will simplify sending messages to different peers (e.g. for syncing the RIB). A request will now return an array of keys terminated by CDAP_KEY_INVALID. Removes the enum from the CDAP proto file to just take the opcode as an integer. --- include/ouroboros/cdap.h | 11 +- src/ipcpd/normal/enroll.c | 48 +++++-- src/lib/cdap.c | 341 +++++++++++++++++++++++++++++++--------------- src/lib/cdap.proto | 12 +- src/lib/cdap_req.c | 9 +- src/lib/cdap_req.h | 5 +- 6 files changed, 287 insertions(+), 139 deletions(-) (limited to 'src/lib/cdap_req.c') diff --git a/include/ouroboros/cdap.h b/include/ouroboros/cdap.h index 19ab39a8..32edb51c 100644 --- a/include/ouroboros/cdap.h +++ b/include/ouroboros/cdap.h @@ -46,12 +46,17 @@ struct cdap; typedef int32_t cdap_key_t; -/* Assumes flow is blocking */ -struct cdap * cdap_create(int fd); +struct cdap * cdap_create(void); int cdap_destroy(struct cdap * instance); -cdap_key_t cdap_request_send(struct cdap * instance, +int cdap_add_flow(struct cdap * instance, + int fd); + +int cdap_del_flow(struct cdap * instance, + int fd); + +cdap_key_t * cdap_request_send(struct cdap * instance, enum cdap_opcode code, const char * name, const void * data, diff --git a/src/ipcpd/normal/enroll.c b/src/ipcpd/normal/enroll.c index 680cfbba..4e510038 100644 --- a/src/ipcpd/normal/enroll.c +++ b/src/ipcpd/normal/enroll.c @@ -78,13 +78,20 @@ static void * enroll_handle(void * o) continue; } - cdap = cdap_create(conn.flow_info.fd); + cdap = cdap_create(); if (cdap == NULL) { log_err("Failed to instantiate CDAP."); flow_dealloc(conn.flow_info.fd); continue; } + if (cdap_add_flow(cdap, conn.flow_info.fd)) { + log_warn("Failed to add flow to CDAP."); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); + continue; + } + while (!(boot_r && members_r && dif_name_r)) { key = cdap_request_wait(cdap, &oc, &name, &data, (size_t *) &len , &flags); @@ -167,7 +174,7 @@ static void * enroll_handle(void * o) int enroll_boot(char * dst_name) { struct cdap * cdap; - cdap_key_t key; + cdap_key_t * key; uint8_t * data; size_t len; struct conn conn; @@ -186,31 +193,41 @@ int enroll_boot(char * dst_name) return -1; } - cdap = cdap_create(conn.flow_info.fd); + cdap = cdap_create(); if (cdap == NULL) { log_err("Failed to instantiate CDAP."); return -1; } + if (cdap_add_flow(cdap, conn.flow_info.fd)) { + log_warn("Failed to add flow to CDAP."); + cdap_destroy(cdap); + flow_dealloc(conn.flow_info.fd); + return -1; + } + log_dbg("Getting boot information from %s.", dst_name); clock_gettime(CLOCK_REALTIME, &t0); key = cdap_request_send(cdap, CDAP_READ, TIME_PATH, NULL, 0, 0); - if (key < 0) { + if (key == NULL) { log_err("Failed to send CDAP request."); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); return -1; } - if (cdap_reply_wait(cdap, key, &data, &len)) { + if (cdap_reply_wait(cdap, key[0], &data, &len)) { log_err("Failed to get CDAP reply."); + free(key); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); return -1; } + free(key); + clock_gettime(CLOCK_REALTIME, &rtt); delta_t = ts_diff_ms(&t0, &rtt); @@ -226,20 +243,23 @@ int enroll_boot(char * dst_name) free(data); key = cdap_request_send(cdap, CDAP_READ, boot_ro, NULL, 0, 0); - if (key < 0) { + if (key == NULL) { log_err("Failed to send CDAP request."); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); return -1; } - if (cdap_reply_wait(cdap, key, &data, &len)) { + if (cdap_reply_wait(cdap, key[0], &data, &len)) { log_err("Failed to get CDAP reply."); + free(key); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); return -1; } + free(key); + log_dbg("Packed information received (%zu bytes).", len); if (rib_unpack(data, len, UNPACK_CREATE)) { @@ -254,20 +274,23 @@ int enroll_boot(char * dst_name) log_dbg("Packed information inserted into RIB."); key = cdap_request_send(cdap, CDAP_READ, members_ro, NULL, 0, 0); - if (key < 0) { + if (key == NULL) { log_err("Failed to send CDAP request."); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); return -1; } - if (cdap_reply_wait(cdap, key, &data, &len)) { + if (cdap_reply_wait(cdap, key[0], &data, &len)) { log_err("Failed to get CDAP reply."); + free(key); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); return -1; } + free(key); + log_dbg("Packed information received (%zu bytes).", len); if (rib_unpack(data, len, UNPACK_CREATE)) { @@ -282,20 +305,23 @@ int enroll_boot(char * dst_name) log_dbg("Packed information inserted into RIB."); key = cdap_request_send(cdap, CDAP_READ, dif_ro, NULL, 0, 0); - if (key < 0) { + if (key == NULL) { log_err("Failed to send CDAP request."); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); return -1; } - if (cdap_reply_wait(cdap, key, &data, &len)) { + if (cdap_reply_wait(cdap, key[0], &data, &len)) { log_err("Failed to get CDAP reply."); + free(key); cdap_destroy(cdap); flow_dealloc(conn.flow_info.fd); return -1; } + free(key); + log_dbg("Packed information received (%zu bytes).", len); if (rib_unpack(data, len, UNPACK_CREATE)) { diff --git a/src/lib/cdap.c b/src/lib/cdap.c index ba4a2a21..86554dd2 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -37,28 +38,39 @@ #include "cdap.pb-c.h" typedef Cdap cdap_t; -typedef Opcode opcode_t; typedef int32_t invoke_id_t; +#define CDAP_REPLY (CDAP_DELETE + 1) + #define INVALID_INVOKE_ID -1 #define IDS_SIZE 256 #define BUF_SIZE 2048 -struct cdap { +struct fd_el { + struct list_head next; + int fd; +}; + +struct cdap { + flow_set_t * set; + + size_t n_flows; + struct list_head flows; + pthread_rwlock_t flows_lock; struct bmp * ids; pthread_mutex_t ids_lock; - pthread_t reader; - struct list_head sent; pthread_rwlock_t sent_lock; struct list_head rcvd; pthread_cond_t rcvd_cond; pthread_mutex_t rcvd_lock; + + pthread_t reader; }; struct cdap_rcvd { @@ -133,6 +145,7 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance, } static struct cdap_req * cdap_sent_add(struct cdap * instance, + int fd, cdap_key_t key) { struct cdap_req * req; @@ -141,7 +154,7 @@ static struct cdap_req * cdap_sent_add(struct cdap * instance, assert(key >= 0); assert(!cdap_sent_has_key(instance, key)); - req = cdap_req_create(key); + req = cdap_req_create(fd, key); if (req == NULL) return NULL; @@ -220,9 +233,14 @@ static void * sdu_reader(void * o) uint8_t buf[BUF_SIZE]; ssize_t len; buffer_t data; + fqueue_t * fq; - while (true) { - len = flow_read(instance->fd, buf, BUF_SIZE); + fq = fqueue_create(); + if (fq == NULL) + return (void *) -1; + + while (flow_event_wait(instance->set, fq, NULL)) { + len = flow_read(fqueue_next(fq), buf, BUF_SIZE); if (len < 0) continue; @@ -230,41 +248,17 @@ static void * sdu_reader(void * o) if (msg == NULL) continue; - if (msg->opcode != OPCODE__REPLY) { + if (msg->opcode != CDAP_REPLY) { rcvd = malloc(sizeof(*rcvd)); if (rcvd == NULL) { cdap__free_unpacked(msg, NULL); continue; } - switch (msg->opcode) { - case OPCODE__START: - rcvd->opcode = CDAP_START; - break; - case OPCODE__STOP: - rcvd->opcode = CDAP_STOP; - break; - case OPCODE__READ: - rcvd->opcode = CDAP_READ; - break; - case OPCODE__WRITE: - rcvd->opcode = CDAP_WRITE; - break; - case OPCODE__CREATE: - rcvd->opcode = CDAP_CREATE; - break; - case OPCODE__DELETE: - rcvd->opcode = CDAP_DELETE; - break; - default: - cdap__free_unpacked(msg, NULL); - free(rcvd); - continue; - } - - rcvd->iid = msg->invoke_id; - rcvd->flags = msg->flags; - rcvd->name = strdup(msg->name); + rcvd->opcode = msg->opcode; + rcvd->iid = msg->invoke_id; + rcvd->flags = msg->flags; + rcvd->name = strdup(msg->name); if (rcvd->name == NULL) { cdap__free_unpacked(msg, NULL); free(rcvd); @@ -303,7 +297,7 @@ static void * sdu_reader(void * o) cdap__free_unpacked(msg, NULL); continue; } - memcpy(data.data, msg->value.data, data.len); + memcpy(data.data, msg->value.data Iata.len); } else { data.len = 0; data.data = NULL; @@ -311,36 +305,32 @@ static void * sdu_reader(void * o) cdap_req_respond(req, msg->result, data); } - - cdap__free_unpacked(msg, NULL); } - return (void *) 0; } -struct cdap * cdap_create(int fd) +struct cdap * cdap_create() { struct cdap * instance = NULL; - int flags; - - if (fd < 0) - return NULL; - - flags = flow_get_flags(fd); - if (flags & FLOW_O_NONBLOCK) - return NULL; instance = malloc(sizeof(*instance)); if (instance == NULL) return NULL; + if (pthread_rwlock_init(&instance->flows_lock, NULL)) { + free(instance); + return NULL; + } + if (pthread_mutex_init(&instance->ids_lock, NULL)) { + pthread_rwlock_destroy(&instance->flows_lock); free(instance); return NULL; } if (pthread_mutex_init(&instance->rcvd_lock, NULL)) { pthread_mutex_destroy(&instance->ids_lock); + pthread_rwlock_destroy(&instance->flows_lock); free(instance); return NULL; } @@ -348,6 +338,7 @@ struct cdap * cdap_create(int fd) if (pthread_rwlock_init(&instance->sent_lock, NULL)) { pthread_mutex_destroy(&instance->rcvd_lock); pthread_mutex_destroy(&instance->ids_lock); + pthread_rwlock_destroy(&instance->flows_lock); free(instance); return NULL; } @@ -356,6 +347,7 @@ struct cdap * cdap_create(int fd) pthread_rwlock_destroy(&instance->sent_lock); pthread_mutex_destroy(&instance->rcvd_lock); pthread_mutex_destroy(&instance->ids_lock); + pthread_rwlock_destroy(&instance->flows_lock); free(instance); return NULL; } @@ -366,15 +358,29 @@ struct cdap * cdap_create(int fd) pthread_rwlock_destroy(&instance->sent_lock); pthread_mutex_destroy(&instance->rcvd_lock); pthread_mutex_destroy(&instance->ids_lock); + pthread_rwlock_destroy(&instance->flows_lock); free(instance); return NULL; } + instance->set = flow_set_create(); + if (instance->set == NULL) { + bmp_destroy(instance->ids); + pthread_cond_destroy(&instance->rcvd_cond); + pthread_rwlock_destroy(&instance->sent_lock); + pthread_mutex_destroy(&instance->rcvd_lock); + pthread_mutex_destroy(&instance->ids_lock); + pthread_rwlock_destroy(&instance->flows_lock); + free(instance); + return NULL; + } + + instance->n_flows = 0; + + list_head_init(&instance->flows); list_head_init(&instance->sent); list_head_init(&instance->rcvd); - instance->fd = fd; - pthread_create(&instance->reader, NULL, sdu_reader, instance); return instance; @@ -382,12 +388,29 @@ struct cdap * cdap_create(int fd) int cdap_destroy(struct cdap * instance) { + struct list_head * p; + struct list_head * h; + if (instance == NULL) return 0; pthread_cancel(instance->reader); pthread_join(instance->reader, NULL); + flow_set_destroy(instance->set); + + pthread_rwlock_wrlock(&instance->flows_lock); + + list_for_each_safe(p,h, &instance->flows) { + struct fd_el * e = list_entry(p, struct fd_el, next); + list_del(&e->next); + free(e); + } + + pthread_rwlock_unlock(&instance->flows_lock); + + pthread_rwlock_destroy(&instance->flows_lock); + pthread_mutex_lock(&instance->ids_lock); bmp_destroy(instance->ids); @@ -409,14 +432,71 @@ int cdap_destroy(struct cdap * instance) return 0; } -static int write_msg(struct cdap * instance, +int cdap_add_flow(struct cdap * instance, + int fd) +{ + struct fd_el * e; + + if (fd < 0) + return -EINVAL; + + e = malloc(sizeof(*e)); + if (e == NULL) + return -ENOMEM; + + e->fd = fd; + + pthread_rwlock_wrlock(&instance->flows_lock); + + if (flow_set_add(instance->set, fd)) { + pthread_rwlock_unlock(&instance->flows_lock); + return -1; + } + + list_add(&e->next, &instance->flows); + + ++instance->n_flows; + + pthread_rwlock_unlock(&instance->flows_lock); + + return 0; +} + +int cdap_del_flow(struct cdap * instance, + int fd) +{ + struct list_head * p; + struct list_head * h; + + if (fd < 0) + return -EINVAL; + + pthread_rwlock_wrlock(&instance->flows_lock); + + flow_set_del(instance->set, fd); + + list_for_each_safe(p, h, &instance->flows) { + struct fd_el * e = list_entry(p, struct fd_el, next); + if (e->fd == fd) { + list_del(&e->next); + free(e); + break; + } + } + + --instance->n_flows; + + pthread_rwlock_unlock(&instance->flows_lock); + + return 0; +} + +static int write_msg(int fd, cdap_t * msg) { - int ret; uint8_t * data; size_t len; - assert(instance); assert(msg); len = cdap__get_packed_size(msg); @@ -429,11 +509,14 @@ static int write_msg(struct cdap * instance, cdap__pack(msg, data); - ret = flow_write(instance->fd, data, len); + if (flow_write(fd, data, len)) { + free(data); + return -1; + } free(data); - return ret; + return 0; } static cdap_key_t invoke_id_to_key(invoke_id_t iid) @@ -452,75 +535,114 @@ static invoke_id_t key_to_invoke_id(cdap_key_t key) return (invoke_id_t) key; } -cdap_key_t cdap_request_send(struct cdap * instance, - enum cdap_opcode code, - const char * name, - const void * data, - size_t len, - uint32_t flags) +cdap_key_t * cdap_request_send(struct cdap * instance, + enum cdap_opcode code, + const char * name, + const void * data, + size_t len, + uint32_t flags) { - cdap_t msg = CDAP__INIT; - struct cdap_req * req; - invoke_id_t iid; - cdap_key_t key; + cdap_key_t * keys; + cdap_key_t * key; + cdap_t msg = CDAP__INIT; + struct list_head * p; + int ret; - if (instance == NULL || name == NULL) - return -EINVAL; + if (instance == NULL || name == NULL || code > CDAP_DELETE) + return NULL; + pthread_rwlock_rdlock(&instance->flows_lock); - iid = next_invoke_id(instance); - if (iid == INVALID_INVOKE_ID) - return INVALID_CDAP_KEY; + keys = malloc(sizeof(*keys) * (instance->n_flows + 1)); + if (keys == NULL) + return NULL; - switch (code) { - case CDAP_READ: - msg.opcode = OPCODE__READ; - break; - case CDAP_WRITE: - msg.opcode = OPCODE__WRITE; - break; - case CDAP_CREATE: - msg.opcode = OPCODE__CREATE; - break; - case CDAP_DELETE: - msg.opcode = OPCODE__DELETE; - break; - case CDAP_START: - msg.opcode = OPCODE__START; - break; - case CDAP_STOP: - msg.opcode = OPCODE__STOP; - break; - default: - release_invoke_id(instance, iid); - return -EINVAL; - } + memset(keys, INVALID_CDAP_KEY, sizeof(*keys) * (instance->n_flows + 1)); + key = keys; + + msg.opcode = code; msg.name = (char *) name; msg.has_flags = true; msg.flags = flags; - msg.invoke_id = iid; + if (data != NULL) { msg.has_value = true; msg.value.data = (uint8_t *) data; msg.value.len = len; } - key = invoke_id_to_key(iid); + list_for_each(p, &instance->flows) { + struct cdap_req * req; + invoke_id_t iid; + struct fd_el * e; + cdap__init(&msg); + + iid = next_invoke_id(instance); + if (iid == INVALID_INVOKE_ID) { + pthread_rwlock_unlock(&instance->flows_lock); + while(key > keys) { + struct cdap_req * r = + cdap_sent_get_by_key(instance, + *(--key)); + cdap_sent_del(instance, r); + cdap_req_destroy(r); + } - req = cdap_sent_add(instance, key); - if (req == NULL) { - release_invoke_id(instance, iid); - return INVALID_CDAP_KEY; - } + free(keys); + return NULL; + } - if (write_msg(instance, &msg)) { - cdap_sent_del(instance, req); - release_invoke_id(instance, iid); - return INVALID_CDAP_KEY; + msg.invoke_id = iid; + + *key = invoke_id_to_key(iid); + + e = list_entry(p, struct fd_el, next); + + req = cdap_sent_add(instance, e->fd, *key); + if (req == NULL) { + pthread_rwlock_unlock(&instance->flows_lock); + while(key > keys) { + struct cdap_req * r = + cdap_sent_get_by_key(instance, + *(--key)); + release_invoke_id(instance, iid); + cdap_sent_del(instance, r); + release_invoke_id(instance, + key_to_invoke_id(r->key)); + cdap_req_destroy(r); + } + free(keys); + return NULL; + } + + ret = write_msg(e->fd, &msg); + if (ret == -ENOMEM) { + pthread_rwlock_unlock(&instance->flows_lock); + while(key >= keys) { + struct cdap_req * r = + cdap_sent_get_by_key(instance, *key); + cdap_sent_del(instance, r); + release_invoke_id(instance, + key_to_invoke_id(r->key)); + cdap_req_destroy(r); + } + + free(keys); + return NULL; + } + + if (ret < 0) { + release_invoke_id(instance, iid); + cdap_sent_del(instance, req); + } + + ++key; } - return key; + pthread_rwlock_unlock(&instance->flows_lock); + + return keys; } int cdap_reply_wait(struct cdap * instance, @@ -609,11 +731,14 @@ int cdap_reply_send(struct cdap * instance, { cdap_t msg = CDAP__INIT; invoke_id_t iid = key_to_invoke_id(key); + struct cdap_req * req = cdap_sent_get_by_key(instance, key); + if (req == NULL) + return -EINVAL; if (instance == NULL) return -EINVAL; - msg.opcode = OPCODE__REPLY; + msg.opcode = CDAP_REPLY; msg.invoke_id = iid; msg.has_result = true; msg.result = result; @@ -624,5 +749,5 @@ int cdap_reply_send(struct cdap * instance, msg.value.len = len; } - return write_msg(instance, &msg); + return write_msg(req->fd, &msg); } diff --git a/src/lib/cdap.proto b/src/lib/cdap.proto index 5fde1658..120b2c97 100644 --- a/src/lib/cdap.proto +++ b/src/lib/cdap.proto @@ -23,18 +23,8 @@ syntax = "proto2"; -enum opcode { - CREATE = 1; - DELETE = 2; - READ = 3; - WRITE = 4; - START = 5; - STOP = 6; - REPLY = 7; -} - message cdap { - required opcode opcode = 1; + required uint32 opcode = 1; required uint32 invoke_id = 2; optional uint32 flags = 3; optional string name = 4; diff --git a/src/lib/cdap_req.c b/src/lib/cdap_req.c index a0348a14..b60e73ad 100644 --- a/src/lib/cdap_req.c +++ b/src/lib/cdap_req.c @@ -30,7 +30,8 @@ #include #include -struct cdap_req * cdap_req_create(cdap_key_t key) +struct cdap_req * cdap_req_create(int fd, + cdap_key_t key) { struct cdap_req * creq = malloc(sizeof(*creq)); pthread_condattr_t cattr; @@ -38,10 +39,10 @@ struct cdap_req * cdap_req_create(cdap_key_t key) if (creq == NULL) return NULL; - creq->key = key; + creq->fd = fd; + creq->key = key; creq->state = REQ_INIT; - - creq->response = -1; + creq->response = -1; creq->data.data = NULL; creq->data.len = 0; diff --git a/src/lib/cdap_req.h b/src/lib/cdap_req.h index 9023357d..fe8e3613 100644 --- a/src/lib/cdap_req.h +++ b/src/lib/cdap_req.h @@ -43,8 +43,8 @@ enum creq_state { struct cdap_req { struct list_head next; + int fd; struct timespec birth; - cdap_key_t key; int response; @@ -55,7 +55,8 @@ struct cdap_req { pthread_mutex_t lock; }; -struct cdap_req * cdap_req_create(cdap_key_t key); +struct cdap_req * cdap_req_create(int fd, + cdap_key_t key); void cdap_req_destroy(struct cdap_req * creq); -- cgit v1.2.3