diff options
author | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-10-19 22:25:46 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-10-21 14:17:51 +0200 |
commit | f516b51169020ea1957010fbd1005d746f01b1d9 (patch) | |
tree | 03d19b0dfb6eab68f8ee5a3ecac5300c7bef2f4b | |
parent | c79ab46894053312f80390bf13a52c238a7d4704 (diff) | |
download | ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.tar.gz ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.zip |
lib: Demultiplex the fast path
The fast path will now use an incoming ring buffer per flow per
process. This necessitated the development of a new method for the
asynchronous io call, which is now based on an event queue system for
scalability (fqueue). The ipcpd's and tools have been updated to this
API.
27 files changed, 1642 insertions, 1145 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 0bfb46d8..95ed6b8a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,6 +43,7 @@ test_and_set_c_compiler_flag_global(-std=c89) test_and_set_c_compiler_flag_global(-Wall) test_and_set_c_compiler_flag_global(-Werror) test_and_set_c_compiler_flag_global(-Wundef) +test_and_set_c_compiler_flag_global(-Wdeclaration-after-statement) test_and_set_c_compiler_flag_global(-fmax-errors=5) configure_file( diff --git a/include/ouroboros/CMakeLists.txt b/include/ouroboros/CMakeLists.txt index f24857ed..41feb65e 100644 --- a/include/ouroboros/CMakeLists.txt +++ b/include/ouroboros/CMakeLists.txt @@ -7,11 +7,11 @@ set(HEADER_FILES dev.h errno.h fcntl.h + fqueue.h irm.h irm_config.h nsm.h - qos.h - select.h) + qos.h) install(FILES ${HEADER_FILES} DESTINATION usr/include/ouroboros) diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index 143ae7c8..a9d65aec 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -36,6 +36,7 @@ #define IPCP_NORMAL_EXEC "@IPCP_NORMAL_TARGET@" #define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@" #define AP_MAX_FLOWS 256 +#define AP_MAX_FQUEUES 64 #define SHM_RDRB_BLOCK_SIZE sysconf(_SC_PAGESIZE) #define SHM_RDRB_MULTI_BLOCK #define SHM_RDRB_PREFIX "/ouroboros.rdrb." @@ -43,7 +44,8 @@ #define SHM_BUFFER_SIZE (1 << 14) #define DU_BUFF_HEADSPACE 128 #define DU_BUFF_TAILSPACE 0 -#define SHM_AP_RBUFF_PREFIX "/ouroboros.rbuff." +#define SHM_RBUFF_PREFIX "/ouroboros.rbuff." +#define SHM_FLOW_SET_PREFIX "/ouroboros.sets." #define IRMD_MAX_FLOWS 4096 #define IRMD_THREADPOOL_SIZE 5 #define LOG_DIR "/@LOG_DIR@/" diff --git a/include/ouroboros/select.h b/include/ouroboros/fqueue.h index de309b8d..943d6510 100644 --- a/include/ouroboros/select.h +++ b/include/ouroboros/fqueue.h @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 * - * A select call for flows + * Flow queues * * Dimitri Staessens <dimitri.staessens@intec.ugent.be> * Sander Vrijders <sander.vrijders@intec.ugent.be> @@ -21,32 +21,42 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#ifndef OUROBOROS_SELECT_H -#define OUROBOROS_SELECT_H +#ifndef OUROBOROS_FQUEUE_H +#define OUROBOROS_FQUEUE_H #include <stdbool.h> #include <time.h> struct flow_set; +struct fqueue; + typedef struct flow_set flow_set_t; +typedef struct fqueue fqueue_t; flow_set_t * flow_set_create(); void flow_set_destroy(flow_set_t * set); +fqueue_t * fqueue_create(); + +void fqueue_destroy(struct fqueue * fq); + void flow_set_zero(flow_set_t * set); -void flow_set_add(flow_set_t * set, +int flow_set_add(flow_set_t * set, int fd); -void flow_set_del(flow_set_t * set, +bool flow_set_has(flow_set_t * set, int fd); -bool flow_set_has(flow_set_t * set, +void flow_set_del(flow_set_t * set, int fd); -int flow_select(flow_set_t * set, - const struct timespec * timeout); +int fqueue_next(fqueue_t * fq); + +int flow_event_wait(flow_set_t * set, + fqueue_t * fq, + const struct timespec * timeout); #endif /* OUROBOROS_SELECT_H */ diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h index 9343aeaa..3ab05bd7 100644 --- a/include/ouroboros/ipcp-dev.h +++ b/include/ouroboros/ipcp-dev.h @@ -47,7 +47,4 @@ int ipcp_flow_write(int fd, void ipcp_flow_del(struct shm_du_buff * sdb); -/* returns flow descriptor and du buff */ -int ipcp_read_shim(struct shm_du_buff ** sdb); - #endif /* OUROBOROS_IPCP_DEV_H */ diff --git a/include/ouroboros/local-dev.h b/include/ouroboros/local-dev.h index 77ff47e9..30f440b1 100644 --- a/include/ouroboros/local-dev.h +++ b/include/ouroboros/local-dev.h @@ -20,14 +20,12 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#include <ouroboros/shm_ap_rbuff.h> - #ifndef OUROBOROS_LOCAL_DEV_H #define OUROBOROS_LOCAL_DEV_H -struct rb_entry * local_flow_read(int fd); +ssize_t local_flow_read(int fd); -int local_flow_write(int fd, - struct rb_entry * e); +int local_flow_write(int fd, + ssize_t idx); #endif /* OUROBOROS_LOCAL_DEV_H */ diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h deleted file mode 100644 index 453e4bf8..00000000 --- a/include/ouroboros/shm_ap_rbuff.h +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Ring buffer for application processes - * - * Dimitri Staessens <dimitri.staessens@intec.ugent.be> - * Sander Vrijders <sander.vrijders@intec.ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_SHM_AP_RBUFF_H -#define OUROBOROS_SHM_AP_RBUFF_H - -#include <ouroboros/select.h> -#include <sys/types.h> -#include <sys/time.h> -#include <stdbool.h> - -struct shm_ap_rbuff; - -struct rb_entry { - ssize_t index; - int port_id; -}; - -struct shm_ap_rbuff * shm_ap_rbuff_create(); - -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api); - -void shm_ap_rbuff_close(struct shm_ap_rbuff * rb); - -void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb); - -void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, - int port_id); - -int shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, - int port_id); - -int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, - struct rb_entry * e); - -struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb); - -int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb); - -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, - bool * set, - const struct timespec * timeout); - -ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, - int port_id); - -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, - int port_id, - const struct timespec * timeout); - -void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb); - -#endif /* OUROBOROS_SHM_AP_RBUFF_H */ diff --git a/include/ouroboros/shm_flow_set.h b/include/ouroboros/shm_flow_set.h new file mode 100644 index 00000000..32db5d36 --- /dev/null +++ b/include/ouroboros/shm_flow_set.h @@ -0,0 +1,63 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Management of flow_sets for fqueue + * + * Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_SHM_FLOW_SET_H +#define OUROBOROS_SHM_FLOW_SET_H + +#include <ouroboros/fqueue.h> + +#include <sys/time.h> + +struct shm_flow_set; + +struct shm_flow_set * shm_flow_set_create(); + +void shm_flow_set_destroy(struct shm_flow_set * set); + +struct shm_flow_set * shm_flow_set_open(pid_t api); + +void shm_flow_set_close(struct shm_flow_set * set); + +void shm_flow_set_zero(struct shm_flow_set * shm_set, + ssize_t idx); + +int shm_flow_set_add(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id); + +int shm_flow_set_has(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id); + +void shm_flow_set_del(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id); + +void shm_flow_set_notify(struct shm_flow_set * set, + int port_id); + +int shm_flow_set_wait(const struct shm_flow_set * shm_set, + ssize_t idx, + int * fqueue, + const struct timespec * timeout); + +#endif /* OUROBOROS_SHM_FLOW_SET_H */ diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h new file mode 100644 index 00000000..03660b88 --- /dev/null +++ b/include/ouroboros/shm_rbuff.h @@ -0,0 +1,53 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for incoming SDUs + * + * Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_SHM_RBUFF_H +#define OUROBOROS_SHM_RBUFF_H + +#include <sys/types.h> +#include <sys/time.h> + +struct shm_rbuff; + +struct shm_rbuff * shm_rbuff_create(int port_id); + +struct shm_rbuff * shm_rbuff_open(pid_t api, int port_id); + +void shm_rbuff_close(struct shm_rbuff * rb); + +void shm_rbuff_destroy(struct shm_rbuff * rb); + +int shm_rbuff_block(struct shm_rbuff * rb); + +void shm_rbuff_unblock(struct shm_rbuff * rb); + +int shm_rbuff_write(struct shm_rbuff * rb, + ssize_t idx); + +ssize_t shm_rbuff_read(struct shm_rbuff * rb); + +ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, + const struct timespec * timeout); + +void shm_rbuff_reset(struct shm_rbuff * rb); + +#endif /* OUROBOROS_SHM_RBUFF_H */ diff --git a/include/ouroboros/wrap/ouroboros.i b/include/ouroboros/wrap/ouroboros.i index 394b505a..26cc6076 100644 --- a/include/ouroboros/wrap/ouroboros.i +++ b/include/ouroboros/wrap/ouroboros.i @@ -26,11 +26,11 @@ #include "ouroboros/dev.h" #include "ouroboros/errno.h" #include "ouroboros/fcntl.h" +#include "ouroboros/fqueue.h" #include "ouroboros/irm.h" #include "ouroboros/irm_config.h" #include "ouroboros/nsm.h" #include "ouroboros/qos.h" -#include "ouroboros/select.h" %} typedef int pid_t; @@ -39,8 +39,8 @@ typedef int pid_t; %include "ouroboros/dev.h" %include "ouroboros/errno.h" %include "ouroboros/fcntl.h" +%include "ouroboros/fqueue.h" %include "ouroboros/irm.h" %include "ouroboros/irm_config.h" %include "ouroboros/nsm.h" %include "ouroboros/qos.h" -%include "ouroboros/select.h" diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index a9f80ee7..f9246c7a 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -323,9 +323,9 @@ void * ipcp_main_loop(void * o) ret_msg.has_result = true; ret_msg.result = ipcpi.ops->ipcp_flow_alloc(fd, - msg->dst_name, - msg->src_ae_name, - msg->qos_cube); + msg->dst_name, + msg->src_ae_name, + msg->qos_cube); if (ret_msg.result < 0) { LOG_DBG("Deallocate failed on port_id %d.", msg->port_id); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 4e500a8a..68c9ae8c 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -25,7 +25,7 @@ #include <ouroboros/errno.h> #include <ouroboros/dev.h> #include <ouroboros/fcntl.h> -#include <ouroboros/select.h> +#include <ouroboros/fqueue.h> #include <ouroboros/ipcp-dev.h> #include <ouroboros/local-dev.h> #define OUROBOROS_PREFIX "ipcpd/local" @@ -39,6 +39,7 @@ #include <sys/wait.h> #include <fcntl.h> +#define EVENT_WAIT_TIMEOUT 100 /* us */ #define THIS_TYPE IPCP_LOCAL /* global for trapping signal */ @@ -46,18 +47,25 @@ int irmd_api; struct { int in_out[IRMD_MAX_FLOWS]; + flow_set_t * flows; pthread_rwlock_t lock; pthread_t sduloop; } local_data; -void local_data_init() +int local_data_init() { int i; for (i = 0; i < IRMD_MAX_FLOWS; ++i) local_data.in_out[i] = -1; + local_data.flows = flow_set_create(); + if (local_data.flows == NULL) + return -ENFILE; + pthread_rwlock_init(&local_data.lock, NULL); + + return 0; } void local_data_fini() @@ -67,11 +75,24 @@ void local_data_fini() static void * ipcp_local_sdu_loop(void * o) { + struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; + while (true) { int fd; - struct rb_entry * e; + int ret; + ssize_t idx; + + ret = flow_event_wait(local_data.flows, fq, &timeout); + if (ret == -ETIMEDOUT) + continue; - fd = flow_select(NULL, NULL); + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); + continue; + } pthread_rwlock_rdlock(&ipcpi.state_lock); @@ -82,20 +103,20 @@ static void * ipcp_local_sdu_loop(void * o) pthread_rwlock_rdlock(&local_data.lock); - e = local_flow_read(fd); + while ((fd = fqueue_next(fq)) >= 0) { + idx = local_flow_read(fd); - fd = local_data.in_out[fd]; + fd = local_data.in_out[fd]; - if (fd != -1) - local_flow_write(fd, e); + if (fd != -1) + local_flow_write(fd, idx); + } pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); - - free(e); } - return (void *) 1; + return (void *) 0; } void ipcp_sig_handler(int sig, siginfo_t * info, void * c) @@ -152,7 +173,7 @@ static int ipcp_local_name_reg(char * name) if (ipcp_data_add_reg_entry(ipcpi.data, name)) { pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBGF("Failed to add %s to local registry.", name); + LOG_DBG("Failed to add %s to local registry.", name); return -1; } @@ -194,12 +215,14 @@ static int ipcp_local_flow_alloc(int fd, if (ipcp_get_state() != IPCP_ENROLLED) { pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBGF("Won't register with non-enrolled IPCP."); + LOG_DBG("Won't register with non-enrolled IPCP."); return -1; /* -ENOTENROLLED */ } pthread_rwlock_wrlock(&local_data.lock); + flow_set_add(local_data.flows, fd); + out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name); local_data.in_out[fd] = out_fd; @@ -222,6 +245,7 @@ static int ipcp_local_flow_alloc_resp(int fd, int response) return 0; pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&local_data.lock); out_fd = local_data.in_out[fd]; if (out_fd < 0) { @@ -230,6 +254,9 @@ static int ipcp_local_flow_alloc_resp(int fd, int response) return -1; } + flow_set_add(local_data.flows, fd); + + pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0) @@ -247,6 +274,8 @@ static int ipcp_local_flow_dealloc(int fd) if (fd < 0) return -EINVAL; + flow_set_del(local_data.flows, fd); + while (flow_dealloc(fd) == -EBUSY) nanosleep(&t, NULL); @@ -289,9 +318,14 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - local_data_init(); - if (ap_init(NULL) < 0) { + LOG_ERR("Failed to init application."); + close_logfile(); + exit(EXIT_FAILURE); + } + + if (local_data_init() < 0) { + LOG_ERR("Failed to init local data."); close_logfile(); exit(EXIT_FAILURE); } @@ -331,10 +365,10 @@ int main(int argc, char * argv[]) pthread_cancel(local_data.sduloop); pthread_join(local_data.sduloop, NULL); - ap_fini(); - local_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 8c627641..2800dcb2 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -27,7 +27,7 @@ #include <ouroboros/dev.h> #include <ouroboros/list.h> #include <ouroboros/ipcp-dev.h> -#include <ouroboros/select.h> +#include <ouroboros/fqueue.h> #include <ouroboros/errno.h> #include <stdlib.h> @@ -185,39 +185,47 @@ static void * fmgr_np1_sdu_reader(void * o) struct shm_du_buff * sdb; struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; struct np1_flow * flow; + int fd; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd = flow_select(fmgr.np1_set, &timeout); - if (fd == -ETIMEDOUT) + int ret = flow_event_wait(fmgr.np1_set, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - if (fd < 0) { - LOG_ERR("Failed to get active fd."); + if (ret < 0) { + LOG_ERR("Event error: %d.", ret); continue; } - if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); - continue; - } + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } - pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = fmgr.np1_flows[fd]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - LOG_ERR("Failed to retrieve flow."); - continue; - } + pthread_rwlock_rdlock(&fmgr.np1_flows_lock); + + flow = fmgr.np1_flows[fd]; + if (flow == NULL) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to retrieve flow."); + continue; + } + + if (frct_i_write_sdu(flow->cep_id, sdb)) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to hand SDU to FRCT."); + continue; + } - if (frct_i_write_sdu(flow->cep_id, sdb)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - LOG_ERR("Failed to hand SDU to FRCT."); - continue; - } - pthread_rwlock_unlock(&fmgr.np1_flows_lock); + } } return (void *) 0; @@ -228,66 +236,71 @@ void * fmgr_nm1_sdu_reader(void * o) struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; struct shm_du_buff * sdb; struct pci * pci; - + int fd; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd = flow_select(fmgr.nm1_set, &timeout); - if (fd == -ETIMEDOUT) - continue; - - if (fd < 0) { - LOG_ERR("Failed to get active fd."); + int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - } - if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); + if (ret < 0) { + LOG_ERR("Event error: %d.", ret); continue; } - pci = shm_pci_des(sdb); - if (pci == NULL) { - LOG_ERR("Failed to get PCI."); - ipcp_flow_del(sdb); - continue; - } + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } - if (pci->dst_addr != ribmgr_address()) { - LOG_DBG("PDU needs to be forwarded."); + pci = shm_pci_des(sdb); + if (pci == NULL) { + LOG_ERR("Failed to get PCI."); + ipcp_flow_del(sdb); + continue; + } - if (pci->ttl == 0) { - LOG_DBG("TTL was zero."); + if (pci->dst_addr != ribmgr_address()) { + LOG_DBG("PDU needs to be forwarded."); + + if (pci->ttl == 0) { + LOG_DBG("TTL was zero."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + + if (shm_pci_dec_ttl(sdb)) { + LOG_ERR("Failed to decrease TTL."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + /* + * FIXME: Dropping for now, since + * we don't have a PFF yet + */ ipcp_flow_del(sdb); free(pci); continue; } - if (shm_pci_dec_ttl(sdb)) { - LOG_ERR("Failed to decrease TTL."); + if (shm_pci_shrink(sdb)) { + LOG_ERR("Failed to shrink PDU."); ipcp_flow_del(sdb); free(pci); continue; } - /* - * FIXME: Dropping for now, since - * we don't have a PFF yet - */ - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (shm_pci_shrink(sdb)) { - LOG_ERR("Failed to shrink PDU."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - if (frct_nm1_post_sdu(pci, sdb)) { - LOG_ERR("Failed to hand PDU to FRCT."); - ipcp_flow_del(sdb); - free(pci); - continue; + if (frct_nm1_post_sdu(pci, sdb)) { + LOG_ERR("Failed to hand PDU to FRCT."); + ipcp_flow_del(sdb); + free(pci); + continue; + } } } diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 399d3dc8..db258c8b 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -30,6 +30,8 @@ #include <ouroboros/bitmap.h> #include <ouroboros/dev.h> #include <ouroboros/ipcp-dev.h> +#include <ouroboros/fcntl.h> +#include <ouroboros/fqueue.h> #define OUROBOROS_PREFIX "ipcpd/shim-eth-llc" @@ -77,6 +79,8 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t; #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \ + SHIM_ETH_LLC_MAX_SDU_SIZE) +#define EVENT_WAIT_TIMEOUT 100 /* us */ + /* global for trapping signal */ int irmd_api; @@ -110,6 +114,7 @@ struct { uint8_t * tx_ring; int tx_offset; #endif + flow_set_t * np1_flows; int * ef_to_fd; struct ef * fd_to_ef; pthread_rwlock_t flows_lock; @@ -139,6 +144,14 @@ static int eth_llc_data_init() return -ENOMEM; } + eth_llc_data.np1_flows = flow_set_create(); + if (eth_llc_data.np1_flows == NULL) { + bmp_destroy(eth_llc_data.saps); + free(eth_llc_data.ef_to_fd); + free(eth_llc_data.fd_to_ef); + return -ENOMEM; + } + for (i = 0; i < MAX_SAPS; ++i) eth_llc_data.ef_to_fd[i] = -1; @@ -156,6 +169,7 @@ static int eth_llc_data_init() void eth_llc_data_fini() { bmp_destroy(eth_llc_data.saps); + flow_set_destroy(eth_llc_data.np1_flows); free(eth_llc_data.fd_to_ef); free(eth_llc_data.ef_to_fd); pthread_rwlock_destroy(ð_llc_data.flows_lock); @@ -416,23 +430,17 @@ static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, uint8_t * r_addr) return 0; } - bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); - pthread_rwlock_unlock(ð_llc_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - flow_dealloc(fd); - - LOG_DBG("Flow with fd %d deallocated.", fd); + flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); return 0; } static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr) { - shim_eth_llc_msg_t * msg = NULL; - - msg = shim_eth_llc_msg__unpack(NULL, len, buf); + shim_eth_llc_msg_t * msg = shim_eth_llc_msg__unpack(NULL, len, buf); if (msg == NULL) { LOG_ERR("Failed to unpack."); return -1; @@ -590,32 +598,49 @@ static void * eth_llc_ipcp_sdu_reader(void * o) static void * eth_llc_ipcp_sdu_writer(void * o) { + int fd; + struct shm_du_buff * sdb; + uint8_t ssap; + uint8_t dsap; + uint8_t r_addr[MAC_SIZE]; + struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; + while (true) { - int fd; - struct shm_du_buff * sdb; - uint8_t ssap; - uint8_t dsap; - uint8_t r_addr[MAC_SIZE]; - - fd = ipcp_read_shim(&sdb); - if (fd < 0) + int ret = flow_event_wait(eth_llc_data.np1_flows, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_rdlock(ð_llc_data.flows_lock); + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); + continue; + } - ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); - dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); - memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Bad read from fd %d.", fd); + continue; + } + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(ð_llc_data.flows_lock); - pthread_rwlock_unlock(ð_llc_data.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); + ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); + dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); + memcpy(r_addr, + eth_llc_data.fd_to_ef[fd].r_addr, + MAC_SIZE); + + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - eth_llc_ipcp_send_frame(r_addr, dsap, ssap, - shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - - shm_du_buff_head(sdb)); - ipcp_flow_del(sdb); + eth_llc_ipcp_send_frame(r_addr, dsap, ssap, + shm_du_buff_head(sdb), + shm_du_buff_tail(sdb) + - shm_du_buff_head(sdb)); + ipcp_flow_del(sdb); + } } return (void *) 1; @@ -859,7 +884,7 @@ static int eth_llc_ipcp_flow_alloc(int fd, uint8_t ssap = 0; uint8_t r_addr[MAC_SIZE]; - LOG_INFO("Allocating flow to %s.", dst_name); + LOG_DBG("Allocating flow to %s.", dst_name); if (dst_name == NULL || src_ae_name == NULL) return -1; @@ -903,6 +928,8 @@ static int eth_llc_ipcp_flow_alloc(int fd, return -1; } + flow_set_add(eth_llc_data.np1_flows, fd); + LOG_DBG("Pending flow with fd %d on SAP %d.", fd, ssap); return 0; @@ -941,6 +968,8 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response) return -1; } + flow_set_add(eth_llc_data.np1_flows, fd); + LOG_DBG("Accepted flow, fd %d, SAP %d.", fd, ssap); return 0; @@ -948,11 +977,18 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response) static int eth_llc_ipcp_flow_dealloc(int fd) { + struct timespec t = {0, 10000}; + uint8_t sap; uint8_t r_sap; uint8_t addr[MAC_SIZE]; int ret; + flow_set_del(eth_llc_data.np1_flows, fd); + + while (flow_dealloc(fd) == -EBUSY) + nanosleep(&t, NULL); + pthread_rwlock_rdlock(&ipcpi.state_lock); pthread_rwlock_wrlock(ð_llc_data.flows_lock); @@ -975,8 +1011,6 @@ static int eth_llc_ipcp_flow_dealloc(int fd) if (ret < 0) LOG_DBG("Could not notify remote."); - flow_dealloc(fd); - LOG_DBG("Flow with fd %d deallocated.", fd); return 0; @@ -1008,10 +1042,12 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - if (eth_llc_data_init() < 0) + if (ap_init(NULL) < 0) { + close_logfile(); exit(EXIT_FAILURE); + } - if (ap_init(NULL) < 0) { + if (eth_llc_data_init() < 0) { close_logfile(); exit(EXIT_FAILURE); } @@ -1054,10 +1090,10 @@ int main(int argc, char * argv[]) pthread_join(eth_llc_data.sdu_writer, NULL); pthread_join(eth_llc_data.sdu_reader, NULL); - ap_fini(); - eth_llc_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 7c109a8a..050623e4 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -27,6 +27,9 @@ #include <ouroboros/utils.h> #include <ouroboros/dev.h> #include <ouroboros/ipcp-dev.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/fcntl.h> +#include <ouroboros/errno.h> #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -75,6 +78,7 @@ struct { struct sockaddr_in s_saddr; int s_fd; + flow_set_t * np1_flows; fd_set flow_fd_s; /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */ int uf_to_fd[FD_SETSIZE]; @@ -90,7 +94,7 @@ struct { pthread_mutex_t fd_set_lock; } udp_data; -static void udp_data_init() +static int udp_data_init() { int i; @@ -104,13 +108,21 @@ static void udp_data_init() FD_ZERO(&udp_data.flow_fd_s); + udp_data.np1_flows = flow_set_create(); + if (udp_data.np1_flows == NULL) + return -ENOMEM; + pthread_rwlock_init(&udp_data.flows_lock, NULL); pthread_cond_init(&udp_data.fd_set_cond, NULL); pthread_mutex_init(&udp_data.fd_set_lock, NULL); + + return 0; } static void udp_data_fini() { + flow_set_destroy(udp_data.np1_flows); + pthread_rwlock_destroy(&udp_data.flows_lock); pthread_mutex_destroy(&udp_data.fd_set_lock); pthread_cond_destroy(&udp_data.fd_set_cond); @@ -387,7 +399,7 @@ static int ipcp_udp_flow_dealloc_req(int udp_port) pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - flow_dealloc(fd); + flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); close(skfd); @@ -505,30 +517,45 @@ static void * ipcp_udp_sdu_reader() static void * ipcp_udp_sdu_loop(void * o) { + int fd; + struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000}; + struct shm_du_buff * sdb; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd; - struct shm_du_buff * sdb; + int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout); + if (ret == -ETIMEDOUT) + continue; - fd = ipcp_read_shim(&sdb); - if (fd < 0) + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); continue; + } - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_rdlock(&udp_data.flows_lock); + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Bad read from fd %d.", fd); + continue; + } - fd = udp_data.fd_to_uf[fd].skfd; + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); - pthread_rwlock_unlock(&udp_data.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); + fd = udp_data.fd_to_uf[fd].skfd; + + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - if (send(fd, - shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), - 0) < 0) - LOG_ERR("Failed to send SDU."); + if (send(fd, + shm_du_buff_head(sdb), + shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), + 0) < 0) + LOG_ERR("Failed to send SDU."); - ipcp_flow_del(sdb); + ipcp_flow_del(sdb); + } } return (void *) 1; @@ -993,6 +1020,8 @@ static int ipcp_udp_flow_alloc(int fd, udp_data.fd_to_uf[fd].skfd = skfd; udp_data.uf_to_fd[skfd] = fd; + flow_set_add(udp_data.np1_flows, fd); + pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1049,6 +1078,8 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response) set_fd(skfd); + flow_set_add(udp_data.np1_flows, fd); + pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1075,9 +1106,15 @@ static int ipcp_udp_flow_dealloc(int fd) { int skfd = -1; int remote_udp = -1; + struct timespec t = {0, 10000}; struct sockaddr_in r_saddr; socklen_t r_saddr_len = sizeof(r_saddr); + flow_set_del(udp_data.np1_flows, fd); + + while (flow_dealloc(fd) == -EBUSY) + nanosleep(&t, NULL); + pthread_rwlock_rdlock(&ipcpi.state_lock); pthread_rwlock_wrlock(&udp_data.flows_lock); @@ -1117,8 +1154,6 @@ static int ipcp_udp_flow_dealloc(int fd) close(skfd); - flow_dealloc(fd); - LOG_DBG("Flow with fd %d deallocated.", fd); return 0; @@ -1149,13 +1184,16 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - udp_data_init(); - if (ap_init(NULL) < 0) { close_logfile(); exit(EXIT_FAILURE); } + if (udp_data_init() < 0) { + close_logfile(); + exit(EXIT_FAILURE); + } + /* store the process id of the irmd */ irmd_api = atoi(argv[1]); @@ -1196,10 +1234,10 @@ int main(int argc, char * argv[]) pthread_join(udp_data.handler, NULL); pthread_join(udp_data.sdu_reader, NULL); - ap_fini(); - udp_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index f79e6caf..33f7650a 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -111,6 +111,7 @@ pid_t ipcp_create(enum ipcp_type ipcp_type) char * full_name = NULL; char * exec_name = NULL; char * log_file = NULL; + char * argv[4]; sprintf(irmd_api, "%u", getpid()); @@ -161,14 +162,12 @@ pid_t ipcp_create(enum ipcp_type ipcp_type) } /* log_file to be placed at the end */ - char * argv[] = {full_name, - irmd_api, - log_file, - 0}; + argv[0] = full_name; + argv[1] = irmd_api; + argv[2] = log_file; + argv[3] = NULL; - char * envp[] = {0}; - - execve(argv[0], &argv[0], envp); + execv(argv[0], &argv[0]); LOG_DBG("%s", strerror(errno)); LOG_ERR("Failed to load IPCP daemon"); diff --git a/src/irmd/main.c b/src/irmd/main.c index 157fd8eb..67941e41 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -28,7 +28,7 @@ #include <ouroboros/utils.h> #include <ouroboros/irm_config.h> #include <ouroboros/lockfile.h> -#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/shm_rbuff.h> #include <ouroboros/shm_rdrbuff.h> #include <ouroboros/bitmap.h> #include <ouroboros/qos.h> @@ -1692,26 +1692,26 @@ void * irm_sanitize() } if (kill(f->n_api, 0) < 0) { - struct shm_ap_rbuff * rb = - shm_ap_rbuff_open(f->n_api); + struct shm_rbuff * rb = + shm_rbuff_open(f->n_api, f->port_id); bmp_release(irmd->port_ids, f->port_id); list_del(&f->next); LOG_INFO("AP-I %d gone, flow %d deallocated.", f->n_api, f->port_id); ipcp_flow_dealloc(f->n_1_api, f->port_id); if (rb != NULL) - shm_ap_rbuff_destroy(rb); + shm_rbuff_destroy(rb); irm_flow_destroy(f); continue; } if (kill(f->n_1_api, 0) < 0) { - struct shm_ap_rbuff * rb = - shm_ap_rbuff_open(f->n_1_api); + struct shm_rbuff * rb = + shm_rbuff_open(f->n_1_api, f->port_id); list_del(&f->next); LOG_ERR("IPCP %d gone, flow %d removed.", f->n_1_api, f->port_id); if (rb != NULL) - shm_ap_rbuff_destroy(rb); + shm_rbuff_destroy(rb); irm_flow_destroy(f); } } diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index b94d0eea..20ea473d 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -35,7 +35,8 @@ set(SOURCE_FILES lockfile.c logs.c nsm.c - shm_ap_rbuff.c + shm_flow_set.c + shm_rbuff.c shm_rdrbuff.c sockets.c time_utils.c diff --git a/src/lib/dev.c b/src/lib/dev.c index 77c2d06a..f735e72b 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -3,7 +3,8 @@ * * API for applications * - * Sander Vrijders <sander.vrijders@intec.ugent.be> + * Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * Sander Vrijders <sander.vrijders@intec.ugent.be> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -26,20 +27,24 @@ #include <ouroboros/sockets.h> #include <ouroboros/fcntl.h> #include <ouroboros/bitmap.h> +#include <ouroboros/shm_flow_set.h> #include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/shm_rbuff.h> #include <ouroboros/utils.h> -#include <ouroboros/select.h> +#include <ouroboros/fqueue.h> #include <stdlib.h> #include <string.h> #include <stdio.h> struct flow_set { - bool dirty; - bool b[IRMD_MAX_FLOWS]; /* working copy */ - bool s[IRMD_MAX_FLOWS]; /* safe copy */ - pthread_rwlock_t lock; + size_t idx; +}; + +struct fqueue { + int fqueue[SHM_BUFFER_SIZE]; /* safe copy from shm */ + size_t fqsize; + size_t next; }; enum port_state { @@ -124,7 +129,9 @@ enum port_state port_wait_assign(struct port * p) } struct flow { - struct shm_ap_rbuff * rb; + struct shm_rbuff * rx_rb; + struct shm_rbuff * tx_rb; + struct shm_flow_set * set; int port_id; int oflags; @@ -139,10 +146,11 @@ struct { pid_t api; struct shm_rdrbuff * rdrb; - struct shm_ap_rbuff * rb; + struct shm_flow_set * fqset; pthread_rwlock_t data_lock; struct bmp * fds; + struct bmp * fqueues; struct flow * flows; struct port * ports; @@ -194,40 +202,52 @@ int ap_init(char * ap_name) if (ai.fds == NULL) return -ENOMEM; - ai.rdrb = shm_rdrbuff_open(); - if (ai.rdrb == NULL) { + ai.fqueues = bmp_create(AP_MAX_FQUEUES, 0); + if (ai.fqueues == NULL) { + bmp_destroy(ai.fds); + return -ENOMEM; + } + + ai.fqset = shm_flow_set_create(); + if (ai.fqset == NULL) { + bmp_destroy(ai.fqueues); bmp_destroy(ai.fds); return -1; } - ai.rb = shm_ap_rbuff_create(); - if (ai.rb == NULL) { - shm_rdrbuff_close(ai.rdrb); + ai.rdrb = shm_rdrbuff_open(); + if (ai.rdrb == NULL) { + shm_flow_set_destroy(ai.fqset); + bmp_destroy(ai.fqueues); bmp_destroy(ai.fds); return -1; } ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS); if (ai.flows == NULL) { - shm_ap_rbuff_destroy(ai.rb); shm_rdrbuff_close(ai.rdrb); + shm_flow_set_destroy(ai.fqset); + bmp_destroy(ai.fqueues); bmp_destroy(ai.fds); return -1; } for (i = 0; i < AP_MAX_FLOWS; ++i) { - ai.flows[i].rb = NULL; + ai.flows[i].rx_rb = NULL; + ai.flows[i].tx_rb = NULL; + ai.flows[i].set = NULL; ai.flows[i].port_id = -1; - ai.flows[i].oflags = 0; - ai.flows[i].api = -1; + ai.flows[i].oflags = 0; + ai.flows[i].api = -1; ai.flows[i].timeout = NULL; } ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); - if (ai.flows == NULL) { + if (ai.ports == NULL) { free(ai.flows); - shm_ap_rbuff_destroy(ai.rb); shm_rdrbuff_close(ai.rdrb); + shm_flow_set_destroy(ai.fqset); + bmp_destroy(ai.fqueues); bmp_destroy(ai.fds); return -1; } @@ -253,16 +273,10 @@ void ap_fini() pthread_rwlock_wrlock(&ai.data_lock); - /* remove all remaining sdus */ - while ((i = shm_ap_rbuff_pop_idx(ai.rb)) >= 0) - shm_rdrbuff_remove(ai.rdrb, i); - - if (ai.fds != NULL) - bmp_destroy(ai.fds); - if (ai.rb != NULL) - shm_ap_rbuff_destroy(ai.rb); - if (ai.rdrb != NULL) - shm_rdrbuff_close(ai.rdrb); + bmp_destroy(ai.fds); + bmp_destroy(ai.fqueues); + shm_flow_set_destroy(ai.fqset); + shm_rdrbuff_close(ai.rdrb); if (ai.daf_name != NULL) free(ai.daf_name); @@ -270,8 +284,15 @@ void ap_fini() pthread_rwlock_rdlock(&ai.flows_lock); for (i = 0; i < AP_MAX_FLOWS; ++i) { - if (ai.flows[i].rb != NULL) - shm_ap_rbuff_close(ai.flows[i].rb); + if (ai.flows[i].tx_rb != NULL) { + int idx; + while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) + shm_rdrbuff_remove(ai.rdrb, idx); + shm_rbuff_destroy(ai.flows[i].rx_rb); + shm_rbuff_close(ai.flows[i].tx_rb); + shm_flow_set_close(ai.flows[i].set); + } + if (ai.flows[i].timeout != NULL) free(ai.flows[i].timeout); } @@ -328,8 +349,8 @@ int flow_accept(char ** ae_name) return -1; } - ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); - if (ai.flows[fd].rb == NULL) { + ai.flows[fd].rx_rb = shm_rbuff_create(recv_msg->port_id); + if (ai.flows[fd].rx_rb == NULL) { bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -337,10 +358,24 @@ int flow_accept(char ** ae_name) return -1; } + ai.flows[fd].set = shm_flow_set_open(recv_msg->api); + if (ai.flows[fd].set == NULL) { + bmp_release(ai.fds, fd); + shm_rbuff_destroy(ai.flows[fd].rx_rb); + shm_rbuff_close(ai.flows[fd].tx_rb); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + if (ae_name != NULL) { *ae_name = strdup(recv_msg->ae_name); if (*ae_name == NULL) { - shm_ap_rbuff_close(ai.flows[fd].rb); + shm_rbuff_destroy(ai.flows[fd].tx_rb); + shm_rbuff_close(ai.flows[fd].tx_rb); + shm_flow_set_close(ai.flows[fd].set); bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -356,8 +391,6 @@ int flow_accept(char ** ae_name) ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; - shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); - pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -410,6 +443,17 @@ int flow_alloc_resp(int fd, int response) ret = recv_msg->result; + pthread_rwlock_wrlock(&ai.flows_lock); + + ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, + ai.flows[fd].port_id); + if (ai.flows[fd].tx_rb == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); @@ -461,8 +505,11 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) return -1; } - ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); - if (ai.flows[fd].rb == NULL) { + ai.flows[fd].port_id = recv_msg->port_id; + ai.flows[fd].oflags = FLOW_O_DEFAULT; + ai.flows[fd].api = recv_msg->api; + ai.flows[fd].rx_rb = shm_rbuff_create(recv_msg->port_id); + if (ai.flows[fd].rx_rb == NULL) { bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -470,9 +517,26 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) return -1; } - ai.flows[fd].port_id = recv_msg->port_id; - ai.flows[fd].oflags = FLOW_O_DEFAULT; - ai.flows[fd].api = recv_msg->api; + ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id); + if (ai.flows[fd].tx_rb == NULL) { + shm_rbuff_destroy(ai.flows[fd].rx_rb); + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + ai.flows[fd].set = shm_flow_set_open(recv_msg->api); + if (ai.flows[fd].set == NULL) { + shm_rbuff_close(ai.flows[fd].tx_rb); + shm_rbuff_destroy(ai.flows[fd].rx_rb); + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -480,8 +544,6 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id); - irm_msg__free_unpacked(recv_msg, NULL); return fd; @@ -548,7 +610,7 @@ int flow_dealloc(int fd) return -ENOTALLOC; } - if (shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id) == -EBUSY) { + if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -EBUSY; @@ -559,8 +621,10 @@ int flow_dealloc(int fd) port_destroy(&ai.ports[msg.port_id]); ai.flows[fd].port_id = -1; - shm_ap_rbuff_close(ai.flows[fd].rb); - ai.flows[fd].rb = NULL; + shm_rbuff_destroy(ai.flows[fd].rx_rb); + ai.flows[fd].rx_rb = NULL; + shm_rbuff_close(ai.flows[fd].tx_rb); + ai.flows[fd].tx_rb = NULL; ai.flows[fd].oflags = 0; ai.flows[fd].api = -1; if (ai.flows[fd].timeout != NULL) { @@ -604,9 +668,9 @@ int flow_cntl(int fd, int cmd, int oflags) case FLOW_F_SETFL: /* SET FLOW FLAGS */ ai.flows[fd].oflags = oflags; if (oflags & FLOW_O_WRONLY) - shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id); + shm_rbuff_block(ai.flows[fd].rx_rb); if (oflags & FLOW_O_RDWR) - shm_ap_rbuff_open_port(ai.rb, ai.flows[fd].port_id); + shm_rbuff_unblock(ai.flows[fd].rx_rb); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return old; @@ -620,7 +684,6 @@ int flow_cntl(int fd, int cmd, int oflags) ssize_t flow_write(int fd, void * buf, size_t count) { ssize_t idx; - struct rb_entry e; if (buf == NULL) return 0; @@ -653,13 +716,10 @@ ssize_t flow_write(int fd, void * buf, size_t count) if (idx < 0) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return -idx; + return idx; } - e.index = idx; - e.port_id = ai.flows[fd].port_id; - - if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { + if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -667,7 +727,7 @@ ssize_t flow_write(int fd, void * buf, size_t count) } } else { /* blocking */ struct shm_rdrbuff * rdrb = ai.rdrb; - pid_t api = ai.flows[fd].api; + pid_t api = ai.flows[fd].api; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -681,17 +741,16 @@ ssize_t flow_write(int fd, void * buf, size_t count) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - e.index = idx; - e.port_id = ai.flows[fd].port_id; - - if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { - shm_rdrbuff_remove(ai.rdrb, e.index); + if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { + shm_rdrbuff_remove(ai.rdrb, idx); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } } + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -717,15 +776,14 @@ ssize_t flow_read(int fd, void * buf, size_t count) } if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_ap_rbuff_read_port(ai.rb, ai.flows[fd].port_id); + idx = shm_rbuff_read(ai.flows[fd].rx_rb); pthread_rwlock_unlock(&ai.flows_lock); } else { - struct shm_ap_rbuff * rb = ai.rb; - int port_id = ai.flows[fd].port_id; - struct timespec * timeout = ai.flows[fd].timeout; + struct shm_rbuff * rb = ai.flows[fd].rx_rb; + struct timespec * timeout = ai.flows[fd].timeout; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout); + idx = shm_rbuff_read_b(rb, timeout); pthread_rwlock_rdlock(&ai.data_lock); } @@ -757,79 +815,163 @@ struct flow_set * flow_set_create() if (set == NULL) return NULL; - if (pthread_rwlock_init(&set->lock, NULL)) { + assert(ai.fqueues); + + set->idx = bmp_allocate(ai.fqueues); + if (!bmp_is_id_valid(ai.fqueues, set->idx)) { free(set); return NULL; } - memset(set->b, 0, IRMD_MAX_FLOWS); - memset(set->s, 0, IRMD_MAX_FLOWS); + return set; +} - set->dirty = true; +void flow_set_destroy(struct flow_set * set) +{ + if (set == NULL) + return; - return set; + flow_set_zero(set); + bmp_release(ai.fqueues, set->idx); + free(set); } -void flow_set_zero(struct flow_set * set) +struct fqueue * fqueue_create() { - pthread_rwlock_wrlock(&set->lock); - memset(set->b, 0, IRMD_MAX_FLOWS); - set->dirty = true; - pthread_rwlock_unlock(&set->lock); + struct fqueue * fq = malloc(sizeof(*fq)); + if (fq == NULL) + return NULL; + + memset(fq->fqueue, -1, SHM_BUFFER_SIZE); + fq->fqsize = 0; + fq->next = 0; + + return fq; } -void flow_set_add(struct flow_set * set, int fd) +void fqueue_destroy(struct fqueue * fq) { - pthread_rwlock_wrlock(&set->lock); - set->b[ai.flows[fd].port_id] = true; - set->dirty = true; - pthread_rwlock_unlock(&set->lock); + if (fq == NULL) + return + free(fq); } -void flow_set_del(struct flow_set * set, int fd) +void flow_set_zero(struct flow_set * set) { - pthread_rwlock_wrlock(&set->lock); - set->b[ai.flows[fd].port_id] = false; - set->dirty = true; - pthread_rwlock_unlock(&set->lock); + if (set == NULL) + return; + + pthread_rwlock_rdlock(&ai.data_lock); + + shm_flow_set_zero(ai.fqset, set->idx); + + pthread_rwlock_unlock(&ai.data_lock); } -bool flow_set_has(struct flow_set * set, int fd) +int flow_set_add(struct flow_set * set, int fd) { - bool ret; - pthread_rwlock_rdlock(&set->lock); - ret = set->b[ai.flows[fd].port_id]; - pthread_rwlock_unlock(&set->lock); + int ret; + + if (set == NULL) + return -EINVAL; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return ret; } -void flow_set_destroy(struct flow_set * set) +void flow_set_del(struct flow_set * set, int fd) { - pthread_rwlock_destroy(&set->lock); - free(set); + if (set == NULL) + return; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + if (ai.flows[fd].port_id >= 0) + shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); } -static void flow_set_cpy(struct flow_set * set) +bool flow_set_has(struct flow_set * set, int fd) { - pthread_rwlock_rdlock(&set->lock); - if (set->dirty) - memcpy(set->s, set->b, IRMD_MAX_FLOWS); - set->dirty = false; - pthread_rwlock_unlock(&set->lock); + bool ret = false; + + if (set == NULL || fd < 0) + return false; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return false; + } + + ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return ret; } -int flow_select(struct flow_set * set, const struct timespec * timeout) +int fqueue_next(struct fqueue * fq) { - int port_id; - if (set == NULL) { - port_id = shm_ap_rbuff_peek_b(ai.rb, NULL, timeout); - } else { - flow_set_cpy(set); - port_id = shm_ap_rbuff_peek_b(ai.rb, (bool *) set->s, timeout); + int fd; + + if (fq == NULL) + return -EINVAL; + + if (fq->next == fq->fqsize) { + fq->fqsize = 0; + fq->next = 0; + return -EPERM; } - if (port_id < 0) - return port_id; - return ai.ports[port_id].fd; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + fd = ai.ports[fq->fqueue[fq->next++]].fd; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + +int flow_event_wait(struct flow_set * set, + struct fqueue * fq, + const struct timespec * timeout) +{ + int ret; + + if (set == NULL) + return -EINVAL; + + if (fq->fqsize > 0) + return 0; + + ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); + if (ret == -ETIMEDOUT) + return -ETIMEDOUT; + + if (ret < 0) + return ret; + + fq->fqsize = ret; + fq->next = 0; + + return 0; } /* ipcp-dev functions */ @@ -848,8 +990,8 @@ int np1_flow_alloc(pid_t n_api, int port_id) return -1; } - ai.flows[fd].rb = shm_ap_rbuff_open(n_api); - if (ai.flows[fd].rb == NULL) { + ai.flows[fd].rx_rb = shm_rbuff_create(port_id); + if (ai.flows[fd].rx_rb == NULL) { bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -863,8 +1005,6 @@ int np1_flow_alloc(pid_t n_api, int port_id) ai.ports[port_id].fd = fd; port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); - shm_ap_rbuff_open_port(ai.rb, port_id); - pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -890,7 +1030,6 @@ int np1_flow_dealloc(int port_id) int np1_flow_resp(pid_t n_api, int port_id) { int fd; - struct shm_ap_rbuff * rb; port_wait_assign(&ai.ports[port_id]); @@ -904,18 +1043,26 @@ int np1_flow_resp(pid_t n_api, int port_id) return fd; } - rb = shm_ap_rbuff_open(n_api); - if (rb == NULL) { + ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id); + if (ai.flows[fd].tx_rb == NULL) { ai.flows[fd].port_id = -1; + shm_rbuff_destroy(ai.flows[fd].rx_rb); port_destroy(&ai.ports[port_id]); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -1; } - ai.flows[fd].rb = rb; - - shm_ap_rbuff_open_port(ai.rb, port_id); + ai.flows[fd].set = shm_flow_set_open(n_api); + if (ai.flows[fd].set == NULL) { + shm_rbuff_close(ai.flows[fd].tx_rb); + ai.flows[fd].port_id = -1; + shm_rbuff_destroy(ai.flows[fd].rx_rb); + port_destroy(&ai.ports[port_id]); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -929,9 +1076,9 @@ int ipcp_create_r(pid_t api) irm_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IRM_MSG_CODE__IPCP_CREATE_R; - msg.has_api = true; - msg.api = api; + msg.code = IRM_MSG_CODE__IPCP_CREATE_R; + msg.has_api = true; + msg.api = api; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) @@ -958,11 +1105,11 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name) if (dst_name == NULL || src_ae_name == NULL) return -EINVAL; - msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; - msg.has_api = true; - msg.api = api; - msg.dst_name = dst_name; - msg.ae_name = src_ae_name; + msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg.has_api = true; + msg.api = api; + msg.dst_name = dst_name; + msg.ae_name = src_ae_name; pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_wrlock(&ai.flows_lock); @@ -974,7 +1121,7 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name) return -1; /* -ENOMOREFDS */ } - ai.flows[fd].rb = NULL; + ai.flows[fd].tx_rb = NULL; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -996,8 +1143,16 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_wrlock(&ai.flows_lock); + ai.flows[fd].rx_rb = shm_rbuff_create(port_id); + if (ai.flows[fd].rx_rb == NULL) { + ai.flows[fd].port_id = -1; + port_destroy(&ai.ports[port_id]); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + ai.flows[fd].port_id = port_id; - ai.flows[fd].rb = NULL; ai.ports[port_id].fd = fd; port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED); @@ -1019,16 +1174,13 @@ int ipcp_flow_alloc_reply(int fd, int response) pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - msg.port_id = ai.flows[fd].port_id; + msg.port_id = ai.flows[fd].port_id; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); msg.has_response = true; msg.response = response; - if (response) - shm_ap_rbuff_open_port(ai.rb, msg.port_id); - recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; @@ -1039,6 +1191,26 @@ int ipcp_flow_alloc_reply(int fd, int response) } ret = recv_msg->result; + + pthread_rwlock_wrlock(&ai.flows_lock); + + ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api, + ai.flows[fd].port_id); + if (ai.flows[fd].tx_rb == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + ai.flows[fd].set = shm_flow_set_open(ai.flows[fd].api); + if (ai.flows[fd].set == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + pthread_rwlock_unlock(&ai.flows_lock); + irm_msg__free_unpacked(recv_msg, NULL); return ret; @@ -1061,7 +1233,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb) pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - idx = shm_ap_rbuff_read_port(ai.rb, port_id); + idx = shm_rbuff_read(ai.flows[fd].rx_rb); if (idx < 0) { pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); @@ -1081,7 +1253,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb) int ipcp_flow_write(int fd, struct shm_du_buff * sdb) { - struct rb_entry e; + ssize_t idx; if (sdb == NULL) return -EINVAL; @@ -1095,16 +1267,16 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb) return -EPERM; } - if (ai.flows[fd].rb == NULL) { + if (ai.flows[fd].tx_rb == NULL) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -EPERM; } - e.index = shm_du_buff_get_idx(sdb); - e.port_id = ai.flows[fd].port_id; + idx = shm_du_buff_get_idx(sdb); - shm_ap_rbuff_write(ai.flows[fd].rb, &e); + shm_rbuff_write(ai.flows[fd].tx_rb, idx); + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -1112,46 +1284,28 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb) return 0; } -struct rb_entry * local_flow_read(int fd) +ssize_t local_flow_read(int fd) { - int port_id; - struct rb_entry * e = NULL; - - pthread_rwlock_rdlock(&ai.data_lock); - pthread_rwlock_rdlock(&ai.flows_lock); - - port_id = ai.flows[fd].port_id; - - pthread_rwlock_unlock(&ai.flows_lock); - pthread_rwlock_unlock(&ai.data_lock); - - if (port_id != -1) { - e = malloc(sizeof(*e)); - if (e == NULL) - return NULL; - e->index = shm_ap_rbuff_read_port(ai.rb, port_id); - } - - return e; + return shm_rbuff_read(ai.flows[fd].rx_rb); } -int local_flow_write(int fd, struct rb_entry * e) +int local_flow_write(int fd, ssize_t idx) { - if (e == NULL || fd < 0) + if (fd < 0) return -EINVAL; pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - if (ai.flows[fd].rb == NULL) { + if (ai.flows[fd].tx_rb == NULL) { pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); return -EPERM; } - e->port_id = ai.flows[fd].port_id; + shm_rbuff_write(ai.flows[fd].tx_rb, idx); - shm_ap_rbuff_write(ai.flows[fd].rb, e); + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -1159,22 +1313,26 @@ int local_flow_write(int fd, struct rb_entry * e) return 0; } -int ipcp_read_shim(struct shm_du_buff ** sdb) +int ipcp_read_shim(int fd, struct shm_du_buff ** sdb) { - int fd; - struct rb_entry * e = shm_ap_rbuff_read(ai.rb); + ssize_t idx; pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - fd = ai.ports[e->port_id].fd; + if (ai.flows[fd].rx_rb == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -EPERM; + } - *sdb = shm_rdrbuff_get(ai.rdrb, e->index); + idx = shm_rbuff_read(ai.flows[fd].rx_rb); + *sdb = shm_rdrbuff_get(ai.rdrb, idx); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - return fd; + return 0; } void ipcp_flow_del(struct shm_du_buff * sdb) diff --git a/src/lib/lockfile.c b/src/lib/lockfile.c index 04ce9324..a0222f18 100644 --- a/src/lib/lockfile.c +++ b/src/lib/lockfile.c @@ -39,10 +39,10 @@ struct lockfile { pid_t * api; - int fd; }; struct lockfile * lockfile_create() { + int fd; mode_t mask; struct lockfile * lf = malloc(sizeof(*lf)); if (lf == NULL) @@ -50,8 +50,8 @@ struct lockfile * lockfile_create() { mask = umask(0); - lf->fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666); - if (lf->fd == -1) { + fd = shm_open(LOCKFILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666); + if (fd == -1) { LOG_DBGF("Could not create lock file."); free(lf); return NULL; @@ -59,30 +59,24 @@ struct lockfile * lockfile_create() { umask(mask); - if (ftruncate(lf->fd, LF_SIZE - 1) < 0) { + if (ftruncate(fd, LF_SIZE - 1) < 0) { LOG_DBGF("Failed to extend lockfile."); free(lf); return NULL; } -#ifndef __APPLE__ - if (write(lf->fd, "", 1) != 1) { - LOG_DBGF("Failed to finalise lockfile."); - free(lf); - return NULL; - } -#endif + lf->api = mmap(NULL, LF_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, - lf->fd, + fd, 0); + close (fd); + if (lf->api == MAP_FAILED) { LOG_DBGF("Failed to map lockfile."); - if (shm_unlink(LOCKFILE_NAME) == -1) LOG_DBGF("Failed to remove invalid lockfile."); - free(lf); return NULL; } @@ -93,12 +87,13 @@ struct lockfile * lockfile_create() { } struct lockfile * lockfile_open() { + int fd; struct lockfile * lf = malloc(sizeof(*lf)); if (lf == NULL) return NULL; - lf->fd = shm_open(LOCKFILE_NAME, O_RDWR, 0666); - if (lf->fd < 0) { + fd = shm_open(LOCKFILE_NAME, O_RDWR, 0666); + if (fd < 0) { LOG_DBGF("Could not open lock file."); free(lf); return NULL; @@ -107,15 +102,15 @@ struct lockfile * lockfile_open() { lf->api = mmap(NULL, LF_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, - lf->fd, + fd, 0); + close(fd); + if (lf->api == MAP_FAILED) { LOG_DBGF("Failed to map lockfile."); - if (shm_unlink(LOCKFILE_NAME) == -1) LOG_DBGF("Failed to remove invalid lockfile."); - free(lf); return NULL; } @@ -130,9 +125,6 @@ void lockfile_close(struct lockfile * lf) return; } - if (close(lf->fd) < 0) - LOG_DBGF("Couldn't close lockfile."); - if (munmap(lf->api, LF_SIZE) == -1) LOG_DBGF("Couldn't unmap lockfile."); @@ -151,9 +143,6 @@ void lockfile_destroy(struct lockfile * lf) return; } - if (close(lf->fd) < 0) - LOG_DBGF("Couldn't close lockfile."); - if (munmap(lf->api, LF_SIZE) == -1) LOG_DBGF("Couldn't unmap lockfile."); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c deleted file mode 100644 index 5cbf5bd0..00000000 --- a/src/lib/shm_ap_rbuff.c +++ /dev/null @@ -1,661 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Ring buffer for application processes - * - * Dimitri Staessens <dimitri.staessens@intec.ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include <ouroboros/config.h> -#include <ouroboros/shm_ap_rbuff.h> -#include <ouroboros/lockfile.h> -#include <ouroboros/time_utils.h> -#include <ouroboros/errno.h> - -#define OUROBOROS_PREFIX "shm_ap_rbuff" - -#include <ouroboros/logs.h> - -#include <pthread.h> -#include <sys/mman.h> -#include <fcntl.h> -#include <stdlib.h> -#include <string.h> -#include <stdint.h> -#include <unistd.h> -#include <signal.h> -#include <sys/stat.h> -#include <assert.h> - -#define FN_MAX_CHARS 255 - -#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ - + IRMD_MAX_FLOWS * sizeof(int8_t) \ - + IRMD_MAX_FLOWS * sizeof (ssize_t) \ - + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ - + 2 * sizeof (pthread_cond_t)) - -#define shm_rbuff_used(rb)((*rb->head + SHM_BUFFER_SIZE - *rb->tail) \ - & (SHM_BUFFER_SIZE - 1)) -#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) -#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) -#define head_el_ptr(rb) (rb->shm_base + *rb->head) -#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) - -struct shm_ap_rbuff { - struct rb_entry * shm_base; /* start of entry */ - size_t * head; /* start of ringbuffer head */ - size_t * tail; /* start of ringbuffer tail */ - int8_t * acl; /* start of port_id access table */ - ssize_t * cntrs; /* start of port_id counters */ - pthread_mutex_t * lock; /* lock all free space in shm */ - pthread_cond_t * add; /* SDU arrived */ - pthread_cond_t * del; /* SDU removed */ - pid_t api; /* api to which this rb belongs */ - int fd; -}; - -struct shm_ap_rbuff * shm_ap_rbuff_create() -{ - struct shm_ap_rbuff * rb; - int shm_fd; - struct rb_entry * shm_base; - pthread_mutexattr_t mattr; - pthread_condattr_t cattr; - char fn[FN_MAX_CHARS]; - mode_t mask; - int i; - - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); - - rb = malloc(sizeof(*rb)); - if (rb == NULL) { - LOG_DBG("Could not allocate struct."); - return NULL; - } - - mask = umask(0); - - shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); - if (shm_fd == -1) { - LOG_DBG("Failed creating ring buffer."); - free(rb); - return NULL; - } - - umask(mask); - - if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) { - LOG_DBG("Failed to extend ringbuffer."); - free(rb); - return NULL; - } -#ifndef __APPLE__ - if (write(shm_fd, "", 1) != 1) { - LOG_DBG("Failed to finalise extension of ringbuffer."); - free(rb); - return NULL; - } -#endif - shm_base = mmap(NULL, - SHM_RBUFF_FILE_SIZE, - PROT_READ | PROT_WRITE, - MAP_SHARED, - shm_fd, - 0); - - if (shm_base == MAP_FAILED) { - LOG_DBG("Failed to map shared memory."); - if (close(shm_fd) == -1) - LOG_DBG("Failed to close invalid shm."); - - if (shm_unlink(fn) == -1) - LOG_DBG("Failed to remove invalid shm."); - - free(rb); - return NULL; - } - - rb->shm_base = shm_base; - rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); - rb->tail = rb->head + 1; - rb->acl = (int8_t *) (rb->tail + 1); - rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); - rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); - rb->add = (pthread_cond_t *) (rb->lock + 1); - rb->del = rb->add + 1; - - pthread_mutexattr_init(&mattr); -#ifndef __APPLE__ - pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); -#endif - pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(rb->lock, &mattr); - - pthread_condattr_init(&cattr); - pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif - for (i = 0; i < IRMD_MAX_FLOWS; ++i) { - rb->cntrs[i] = 0; - rb->acl[i] = -1; - } - - pthread_cond_init(rb->add, &cattr); - pthread_cond_init(rb->del, &cattr); - - *rb->head = 0; - *rb->tail = 0; - - rb->fd = shm_fd; - rb->api = getpid(); - - return rb; -} - -struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) -{ - struct shm_ap_rbuff * rb; - int shm_fd; - struct rb_entry * shm_base; - char fn[FN_MAX_CHARS]; - - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api); - - rb = malloc(sizeof(*rb)); - if (rb == NULL) { - LOG_DBG("Could not allocate struct."); - return NULL; - } - - shm_fd = shm_open(fn, O_RDWR, 0666); - if (shm_fd == -1) { - LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); - free(rb); - return NULL; - } - - shm_base = mmap(NULL, - SHM_RBUFF_FILE_SIZE, - PROT_READ | PROT_WRITE, - MAP_SHARED, - shm_fd, - 0); - - if (shm_base == MAP_FAILED) { - LOG_DBG("Failed to map shared memory."); - if (close(shm_fd) == -1) - LOG_DBG("Failed to close invalid shm."); - - if (shm_unlink(fn) == -1) - LOG_DBG("Failed to remove invalid shm."); - - free(rb); - return NULL; - } - - rb->shm_base = shm_base; - rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); - rb->tail = rb->head + 1; - rb->acl = (int8_t *) (rb->tail + 1); - rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS); - rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS); - rb->add = (pthread_cond_t *) (rb->lock + 1); - rb->del = rb->add + 1; - - rb->fd = shm_fd; - rb->api = api; - - return rb; -} - -void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) -{ - assert(rb); - - if (close(rb->fd) < 0) - LOG_DBG("Couldn't close shared memory."); - - if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) - LOG_DBG("Couldn't unmap shared memory."); - - free(rb); -} - -void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, int port_id) -{ - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - rb->acl[port_id] = 0; /* open */ - - pthread_mutex_unlock(rb->lock); -} - -int shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id) -{ - int ret = 0; - - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - rb->acl[port_id] = -1; - - if (rb->cntrs[port_id] > 0) - ret = -EBUSY; - - pthread_mutex_unlock(rb->lock); - - return ret; -} - -void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) -{ - char fn[25]; - struct lockfile * lf = NULL; - - assert(rb); - - if (rb->api != getpid()) { - lf = lockfile_open(); - if (lf == NULL) - return; - if (lockfile_owner(lf) == getpid()) { - LOG_DBG("Ringbuffer %d destroyed by IRMd %d.", - rb->api, getpid()); - lockfile_close(lf); - } else { - LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", - getpid(), rb->api); - lockfile_close(lf); - return; - } - } - - if (close(rb->fd) < 0) - LOG_DBG("Couldn't close shared memory."); - - sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api); - - if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) - LOG_DBG("Couldn't unmap shared memory."); - - if (shm_unlink(fn) == -1) - LOG_DBG("Failed to unlink shm."); - - free(rb); -} - -int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) -{ - assert(rb); - assert(e); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (rb->acl[e->port_id]) { - pthread_mutex_unlock(rb->lock); - return -ENOTALLOC; - } - - if (!shm_rbuff_free(rb)) { - pthread_mutex_unlock(rb->lock); - return -1; - } - - if (shm_rbuff_empty(rb)) - pthread_cond_broadcast(rb->add); - - *head_el_ptr(rb) = *e; - *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1); - - ++rb->cntrs[e->port_id]; - - pthread_mutex_unlock(rb->lock); - - return 0; -} - -int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb) -{ - int ret = 0; - - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (shm_rbuff_empty(rb)) { - pthread_mutex_unlock(rb->lock); - return -1; - } - - ret = tail_el_ptr(rb)->index; - --rb->cntrs[tail_el_ptr(rb)->port_id]; - *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - - pthread_mutex_unlock(rb->lock); - - return ret; -} - -static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, - const struct timespec * timeout) -{ - struct timespec abstime; - int ret = 0; - - assert(rb); - - if (timeout != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - while (shm_rbuff_empty(rb)) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->add, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - - if (ret != ETIMEDOUT) - ret = tail_el_ptr(rb)->port_id; - else - ret = -ETIMEDOUT; - - pthread_cleanup_pop(true); - - return ret; -} - -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, - bool * set, - const struct timespec * timeout) -{ - struct timespec abstime; - int ret; - - assert(rb); - - if (set == NULL) - return shm_ap_rbuff_peek_b_all(rb, timeout); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (timeout != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); - - while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id]) - && (ret != ETIMEDOUT)) { - while (shm_rbuff_empty(rb)) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->add, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->add, rb->lock); - -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - - while (!set[tail_el_ptr(rb)->port_id]) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->del, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->del, rb->lock); - -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - } - - if (ret != ETIMEDOUT) - ret = tail_el_ptr(rb)->port_id; - else - ret = -ETIMEDOUT; - - pthread_cleanup_pop(true); - - return ret; -} - - -struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) -{ - struct rb_entry * e = NULL; - - assert(rb); - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - while (shm_rbuff_empty(rb)) -#ifdef __APPLE__ - pthread_cond_wait(rb->add, rb->lock); -#else - if (pthread_cond_wait(rb->add, rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - e = malloc(sizeof(*e)); - if (e != NULL) { - *e = *(rb->shm_base + *rb->tail); - --rb->cntrs[e->port_id]; - *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - } - - pthread_cleanup_pop(true); - - return e; -} - -ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) -{ - ssize_t idx = -1; - - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) { - pthread_mutex_unlock(rb->lock); - return -1; - } - - idx = tail_el_ptr(rb)->index; - --rb->cntrs[port_id]; - *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - - pthread_cond_broadcast(rb->del); - pthread_mutex_unlock(rb->lock); - - return idx; -} - -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, - int port_id, - const struct timespec * timeout) -{ - struct timespec abstime; - int ret = 0; - ssize_t idx = -1; - - assert(rb); - -#ifdef __APPLE__ - pthread_mutex_lock(rb->lock); -#else - if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (timeout != NULL) { - idx = -ETIMEDOUT; - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeout, &abstime); - } - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); - - while ((shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) - && (ret != ETIMEDOUT)) { - while (shm_rbuff_empty(rb)) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->add, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - - while (tail_el_ptr(rb)->port_id != port_id) { - if (timeout != NULL) - ret = pthread_cond_timedwait(rb->del, - rb->lock, - &abstime); - else - ret = pthread_cond_wait(rb->del, rb->lock); -#ifndef __APPLE__ - if (ret == EOWNERDEAD) { - LOG_DBG("Recovering dead mutex."); - pthread_mutex_consistent(rb->lock); - } -#endif - if (ret == ETIMEDOUT) - break; - } - } - - if (ret != ETIMEDOUT) { - idx = tail_el_ptr(rb)->index; - --rb->cntrs[port_id]; - *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); - - pthread_cond_broadcast(rb->del); - } - - pthread_cleanup_pop(true); - - return idx; -} - -void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb) -{ - assert(rb); - - pthread_mutex_lock(rb->lock); - *rb->tail = 0; - *rb->head = 0; - pthread_mutex_unlock(rb->lock); -} diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c new file mode 100644 index 00000000..c960bd25 --- /dev/null +++ b/src/lib/shm_flow_set.c @@ -0,0 +1,408 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Management of flow_sets for fqueue + * + * Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/config.h> +#include <ouroboros/lockfile.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/shm_flow_set.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/errno.h> + +#define OUROBOROS_PREFIX "shm_flow_set" + +#include <ouroboros/logs.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <stdlib.h> +#include <unistd.h> +#include <signal.h> +#include <sys/stat.h> +#include <string.h> +#include <assert.h> + +#define FN_MAX_CHARS 255 + +#define FQUEUESIZE (SHM_BUFFER_SIZE * sizeof(int)) + +#define SHM_FLOW_SET_FILE_SIZE (IRMD_MAX_FLOWS * sizeof(ssize_t) \ + + AP_MAX_FQUEUES * sizeof(size_t) \ + + AP_MAX_FQUEUES * sizeof(pthread_cond_t) \ + + AP_MAX_FQUEUES * FQUEUESIZE \ + + sizeof(pthread_mutex_t)) + +#define fqueue_ptr(fs, idx) (fs->fqueues + SHM_BUFFER_SIZE * idx) + +struct shm_flow_set { + ssize_t * mtable; + size_t * heads; + pthread_cond_t * conds; + int * fqueues; + pthread_mutex_t * lock; + + pid_t api; +}; + +struct shm_flow_set * shm_flow_set_create() +{ + struct shm_flow_set * set; + ssize_t * shm_base; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + char fn[FN_MAX_CHARS]; + mode_t mask; + int shm_fd; + int i; + + sprintf(fn, SHM_FLOW_SET_PREFIX "%d", getpid()); + + set = malloc(sizeof(*set)); + if (set == NULL) { + LOG_DBG("Could not allocate struct."); + return NULL; + } + + mask = umask(0); + + shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBG("Failed creating flag file."); + free(set); + return NULL; + } + + umask(mask); + + if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0) { + LOG_DBG("Failed to extend flag file."); + free(set); + close(shm_fd); + return NULL; + } + + shm_base = mmap(NULL, + SHM_FLOW_SET_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + close(shm_fd); + + if (shm_base == MAP_FAILED) { + LOG_DBG("Failed to map shared memory."); + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to remove invalid shm."); + + free(set); + return NULL; + } + + set->mtable = shm_base; + set->heads = (size_t *) (set->mtable + IRMD_MAX_FLOWS); + set->conds = (pthread_cond_t *)(set->heads + AP_MAX_FQUEUES); + set->fqueues = (int *) (set->conds + AP_MAX_FQUEUES); + set->lock = (pthread_mutex_t *) + (set->fqueues + AP_MAX_FQUEUES * SHM_BUFFER_SIZE); + + pthread_mutexattr_init(&mattr); +#ifndef __APPLE__ + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(set->lock, &mattr); + + pthread_condattr_init(&cattr); + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + for (i = 0; i < AP_MAX_FQUEUES; ++i) { + set->heads[i] = 0; + pthread_cond_init(&set->conds[i], &cattr); + } + + for (i = 0; i < IRMD_MAX_FLOWS; ++i) + set->mtable[i] = -1; + + set->api = getpid(); + + return set; +} + +struct shm_flow_set * shm_flow_set_open(pid_t api) +{ + struct shm_flow_set * set; + ssize_t * shm_base; + char fn[FN_MAX_CHARS]; + int shm_fd; + + sprintf(fn, SHM_FLOW_SET_PREFIX "%d", api); + + set = malloc(sizeof(*set)); + if (set == NULL) { + LOG_DBG("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); + free(set); + return NULL; + } + + shm_base = mmap(NULL, + SHM_FLOW_SET_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + close(shm_fd); + + if (shm_base == MAP_FAILED) { + LOG_DBG("Failed to map shared memory."); + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to remove invalid shm."); + free(set); + return NULL; + } + + set->mtable = shm_base; + set->heads = (size_t *) (set->mtable + IRMD_MAX_FLOWS); + set->conds = (pthread_cond_t *)(set->heads + AP_MAX_FQUEUES); + set->fqueues = (int *) (set->conds + AP_MAX_FQUEUES); + set->lock = (pthread_mutex_t *) + (set->fqueues + AP_MAX_FQUEUES * SHM_BUFFER_SIZE); + + set->api = api; + + return set; +} + +void shm_flow_set_destroy(struct shm_flow_set * set) +{ + char fn[25]; + struct lockfile * lf = NULL; + + assert(set); + + if (set->api != getpid()) { + lf = lockfile_open(); + if (lf == NULL) { + LOG_ERR("Failed to open lockfile."); + return; + } + + if (lockfile_owner(lf) == getpid()) { + LOG_DBG("Flow set %d destroyed by IRMd %d.", + set->api, getpid()); + lockfile_close(lf); + } else { + LOG_ERR("AP-I %d tried to destroy flowset owned by %d.", + getpid(), set->api); + lockfile_close(lf); + return; + } + } + + sprintf(fn, SHM_FLOW_SET_PREFIX "%d", set->api); + + if (munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to unlink shm."); + + free(set); +} + +void shm_flow_set_close(struct shm_flow_set * set) +{ + assert(set); + + if (munmap(set->mtable, SHM_FLOW_SET_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + free(set); +} + +void shm_flow_set_zero(struct shm_flow_set * shm_set, + ssize_t idx) +{ + ssize_t i = 0; + + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + + pthread_mutex_lock(shm_set->lock); + + for (i = 0; i < IRMD_MAX_FLOWS; ++i) + if (shm_set->mtable[i] == idx) + shm_set->mtable[i] = -1; + + shm_set->heads[idx] = 0; + + pthread_mutex_unlock(shm_set->lock); +} + + +int shm_flow_set_add(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id) +{ + assert(shm_set); + assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + + pthread_mutex_lock(shm_set->lock); + + if (shm_set->mtable[port_id] != -1) { + pthread_mutex_unlock(shm_set->lock); + return -EPERM; + } + + shm_set->mtable[port_id] = idx; + + pthread_mutex_unlock(shm_set->lock); + + return 0; +} + +void shm_flow_set_del(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id) +{ + assert(shm_set); + assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + + pthread_mutex_lock(shm_set->lock); + + if (shm_set->mtable[port_id] == idx) + shm_set->mtable[port_id] = -1; + + pthread_mutex_unlock(shm_set->lock); +} + +int shm_flow_set_has(struct shm_flow_set * shm_set, + ssize_t idx, + int port_id) +{ + int ret = 0; + + assert(shm_set); + assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + + + pthread_mutex_lock(shm_set->lock); + + if (shm_set->mtable[port_id] == idx) + ret = 1; + + pthread_mutex_unlock(shm_set->lock); + + return ret; +} + +void shm_flow_set_notify(struct shm_flow_set * shm_set, int port_id) +{ + assert(shm_set); + assert(!(port_id < 0) && port_id < IRMD_MAX_FLOWS); + + pthread_mutex_lock(shm_set->lock); + + if (shm_set->mtable[port_id] == -1) { + pthread_mutex_unlock(shm_set->lock); + return; + } + + *(fqueue_ptr(shm_set, shm_set->mtable[port_id]) + + (shm_set->heads[shm_set->mtable[port_id]])++) = port_id; + + pthread_cond_signal(&shm_set->conds[shm_set->mtable[port_id]]); + + pthread_mutex_unlock(shm_set->lock); +} + + +int shm_flow_set_wait(const struct shm_flow_set * shm_set, + ssize_t idx, + int * fqueue, + const struct timespec * timeout) +{ + int ret = 0; + struct timespec abstime; + + assert(shm_set); + assert(!(idx < 0) && idx < AP_MAX_FQUEUES); + +#ifdef __APPLE__ + pthread_mutex_lock(shm_set->lock); +#else + if (pthread_mutex_lock(shm_set->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(shm_set->lock); + } +#endif + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) shm_set->lock); + + while (shm_set->heads[idx] == 0 && ret != -ETIMEDOUT) { + if (timeout != NULL) + ret = pthread_cond_timedwait(shm_set->conds + idx, + shm_set->lock, + &abstime); + else + ret = pthread_cond_wait(shm_set->conds + idx, + shm_set->lock); +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(shm_set->lock); + } +#endif + if (ret == ETIMEDOUT) { + ret = -ETIMEDOUT; + break; + } + } + + if (ret != -ETIMEDOUT) { + memcpy(fqueue, + fqueue_ptr(shm_set, idx), + shm_set->heads[idx] * sizeof(int)); + ret = shm_set->heads[idx]; + shm_set->heads[idx] = 0; + } + + pthread_cleanup_pop(true); + + return ret; +} diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c new file mode 100644 index 00000000..cf094488 --- /dev/null +++ b/src/lib/shm_rbuff.c @@ -0,0 +1,424 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for incoming SDUs + * + * Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/config.h> +#include <ouroboros/shm_rbuff.h> +#include <ouroboros/lockfile.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/errno.h> + +#define OUROBOROS_PREFIX "shm_rbuff" + +#include <ouroboros/logs.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include <unistd.h> +#include <signal.h> +#include <sys/stat.h> +#include <assert.h> +#include <stdbool.h> + +#define FN_MAX_CHARS 255 + +#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(ssize_t) \ + + 2 * sizeof(size_t) + sizeof(int8_t) \ + + sizeof(pthread_mutex_t) \ + + 2 * sizeof (pthread_cond_t)) + +#define shm_rbuff_used(rb) ((*rb->head + SHM_BUFFER_SIZE - *rb->tail) \ + & (SHM_BUFFER_SIZE - 1)) +#define shm_rbuff_free(rb) (shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE) +#define shm_rbuff_empty(rb) (*rb->head == *rb->tail) +#define head_el_ptr(rb) (rb->shm_base + *rb->head) +#define tail_el_ptr(rb) (rb->shm_base + *rb->tail) + +struct shm_rbuff { + ssize_t * shm_base; /* start of entry */ + size_t * head; /* start of ringbuffer head */ + size_t * tail; /* start of ringbuffer tail */ + int8_t * acl; /* access control */ + pthread_mutex_t * lock; /* lock all free space in shm */ + pthread_cond_t * add; /* SDU arrived */ + pthread_cond_t * del; /* SDU removed */ + pid_t api; /* api of the owner */ + int port_id; /* port_id of the flow */ +}; + +struct shm_rbuff * shm_rbuff_create(int port_id) +{ + struct shm_rbuff * rb; + int shm_fd; + ssize_t * shm_base; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + char fn[FN_MAX_CHARS]; + mode_t mask; + + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", getpid(), port_id); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBG("Could not allocate struct."); + return NULL; + } + + mask = umask(0); + + shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBG("Failed creating ring buffer."); + free(rb); + return NULL; + } + + umask(mask); + + if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) { + LOG_DBG("Failed to extend ringbuffer."); + free(rb); + close(shm_fd); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + close(shm_fd); + + if (shm_base == MAP_FAILED) { + LOG_DBG("Failed to map shared memory."); + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to remove invalid shm."); + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); + rb->tail = rb->head + 1; + rb->acl = (int8_t *) (rb->tail + 1); + rb->lock = (pthread_mutex_t *) (rb->acl + 1); + rb->add = (pthread_cond_t *) (rb->lock + 1); + rb->del = rb->add + 1; + + pthread_mutexattr_init(&mattr); +#ifndef __APPLE__ + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(rb->lock, &mattr); + + pthread_condattr_init(&cattr); + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + pthread_cond_init(rb->add, &cattr); + pthread_cond_init(rb->del, &cattr); + + *rb->acl = 0; + *rb->head = 0; + *rb->tail = 0; + + rb->api = getpid(); + rb->port_id = port_id; + + return rb; +} + +struct shm_rbuff * shm_rbuff_open(pid_t api, int port_id) +{ + struct shm_rbuff * rb; + int shm_fd; + ssize_t * shm_base; + char fn[FN_MAX_CHARS]; + + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", api, port_id); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBG("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBG("%d failed opening shared memory %s.", getpid(), fn); + free(rb); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + close(shm_fd); + + if (shm_base == MAP_FAILED) { + LOG_DBG("Failed to map shared memory."); + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to remove invalid shm."); + + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE); + rb->tail = rb->head + 1; + rb->acl = (int8_t *) (rb->tail + 1); + rb->lock = (pthread_mutex_t *) (rb->acl + 1); + rb->add = (pthread_cond_t *) (rb->lock + 1); + rb->del = rb->add + 1; + + rb->api = api; + rb->port_id = port_id; + + return rb; +} + +void shm_rbuff_close(struct shm_rbuff * rb) +{ + assert(rb); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + free(rb); +} + +void shm_rbuff_destroy(struct shm_rbuff * rb) +{ + char fn[25]; + struct lockfile * lf = NULL; + + assert(rb); + + if (rb->api != getpid()) { + lf = lockfile_open(); + if (lf == NULL) { + LOG_ERR("Failed to open lockfile."); + return; + } + + if (lockfile_owner(lf) == getpid()) { + LOG_DBG("Ringbuffer %d destroyed by IRMd %d.", + rb->api, getpid()); + lockfile_close(lf); + } else { + LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", + getpid(), rb->api); + lockfile_close(lf); + return; + } + } + + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->api, rb->port_id); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + if (shm_unlink(fn) == -1) + LOG_DBG("Failed to unlink shm."); + + free(rb); +} + +int shm_rbuff_write(struct shm_rbuff * rb, ssize_t idx) +{ + assert(rb); + assert(idx >= 0); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (*rb->acl) { + pthread_mutex_unlock(rb->lock); + return -ENOTALLOC; + } + + if (!shm_rbuff_free(rb)) { + pthread_mutex_unlock(rb->lock); + return -1; + } + + if (shm_rbuff_empty(rb)) + pthread_cond_broadcast(rb->add); + + *head_el_ptr(rb) = idx; + *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1); + + pthread_mutex_unlock(rb->lock); + + return 0; +} + +ssize_t shm_rbuff_read(struct shm_rbuff * rb) +{ + int ret = 0; + + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (shm_rbuff_empty(rb)) { + pthread_mutex_unlock(rb->lock); + return -1; + } + + ret = *tail_el_ptr(rb); + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); + + pthread_mutex_unlock(rb->lock); + + return ret; +} + +ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, + const struct timespec * timeout) +{ + struct timespec abstime; + int ret = 0; + ssize_t idx = -1; + + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (timeout != NULL) { + idx = -ETIMEDOUT; + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rb->lock); + + while (shm_rbuff_empty(rb) && (ret != ETIMEDOUT)) { + if (timeout != NULL) + ret = pthread_cond_timedwait(rb->add, + rb->lock, + &abstime); + else + ret = pthread_cond_wait(rb->add, rb->lock); +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (ret == ETIMEDOUT) { + idx = -ETIMEDOUT; + break; + } + } + + if (idx != -ETIMEDOUT) { + idx = *tail_el_ptr(rb); + *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1); + pthread_cond_broadcast(rb->del); + } + + pthread_cleanup_pop(true); + + return idx; +} + +int shm_rbuff_block(struct shm_rbuff * rb) +{ + int ret = 0; + + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + *rb->acl = -1; + + if (!shm_rbuff_empty(rb)) + ret = -EBUSY; + + pthread_mutex_unlock(rb->lock); + + return ret; +} + +void shm_rbuff_unblock(struct shm_rbuff * rb) +{ + assert(rb); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + *rb->acl = 0; /* open */ + + pthread_mutex_unlock(rb->lock); +} + +void shm_rbuff_reset(struct shm_rbuff * rb) +{ + assert(rb); + + pthread_mutex_lock(rb->lock); + *rb->tail = 0; + *rb->head = 0; + pthread_mutex_unlock(rb->lock); +} diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index f6683dc2..e5a37577 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -90,7 +90,6 @@ struct shm_rdrbuff { pthread_cond_t * full; /* run sanitizer when buffer full */ pid_t * api; /* api of the irmd owner */ enum qos_cube qos; /* qos id which this buffer serves */ - int fd; }; static void garbage_collect(struct shm_rdrbuff * rdrb) @@ -189,17 +188,11 @@ struct shm_rdrbuff * shm_rdrbuff_create() if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) { LOG_DBGF("Failed to extend shared memory map."); free(shm_rdrb_fn); + close(shm_fd); free(rdrb); return NULL; } -#ifndef __APPLE - if (write(shm_fd, "", 1) != 1) { - LOG_DBGF("Failed to finalise extension of shared memory map."); - free(shm_rdrb_fn); - free(rdrb); - return NULL; - } -#endif + shm_base = mmap(NULL, SHM_FILE_SIZE, PROT_READ | PROT_WRITE, @@ -207,6 +200,8 @@ struct shm_rdrbuff * shm_rdrbuff_create() shm_fd, 0); + close(shm_fd); + if (shm_base == MAP_FAILED) { LOG_DBGF("Failed to map shared memory."); if (shm_unlink(shm_rdrb_fn) == -1) @@ -235,6 +230,9 @@ struct shm_rdrbuff * shm_rdrbuff_create() pthread_condattr_init(&cattr); pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif pthread_cond_init(rdrb->full, &cattr); pthread_cond_init(rdrb->healthy, &cattr); @@ -246,7 +244,6 @@ struct shm_rdrbuff * shm_rdrbuff_create() *rdrb->api = getpid(); rdrb->qos = qos; - rdrb->fd = shm_fd; free(shm_rdrb_fn); @@ -287,10 +284,11 @@ struct shm_rdrbuff * shm_rdrbuff_open() MAP_SHARED, shm_fd, 0); + + close(shm_fd); + if (shm_base == MAP_FAILED) { LOG_DBGF("Failed to map shared memory."); - if (close(shm_fd) == -1) - LOG_DBG("Failed to close invalid shm."); if (shm_unlink(shm_rdrb_fn) == -1) LOG_DBG("Failed to unlink invalid shm."); free(shm_rdrb_fn); @@ -309,7 +307,6 @@ struct shm_rdrbuff * shm_rdrbuff_open() rdrb->api = (pid_t *) (rdrb->full + 1); rdrb->qos = qos; - rdrb->fd = shm_fd; free(shm_rdrb_fn); @@ -400,9 +397,6 @@ void shm_rdrbuff_close(struct shm_rdrbuff * rdrb) { assert(rdrb); - if (close(rdrb->fd) < 0) - LOG_DBGF("Couldn't close shared memory."); - if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) LOG_DBGF("Couldn't unmap shared memory."); @@ -420,9 +414,6 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb) return; } - if (close(rdrb->fd) < 0) - LOG_DBG("Couldn't close shared memory."); - if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) LOG_DBG("Couldn't unmap shared memory."); diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c index 7d41b497..0ca40326 100644 --- a/src/tools/oping/oping.c +++ b/src/tools/oping/oping.c @@ -23,7 +23,7 @@ #define _POSIX_C_SOURCE 199506L -#include <ouroboros/select.h> +#include <ouroboros/fqueue.h> #include <ouroboros/dev.h> #include <stdio.h> @@ -53,6 +53,8 @@ struct c { float rtt_avg; float rtt_m2; + flow_set_t * flows; + /* needs locking */ struct timespec * times; pthread_mutex_t lock; diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 4742d0de..40f75785 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -55,20 +55,21 @@ void * reader(void * o) struct timespec timeout = {2, 0}; struct timespec now = {0, 0}; - struct oping_msg * msg; char buf[OPING_BUF_SIZE]; + struct oping_msg * msg = (struct oping_msg *) buf; int fd = 0; int msg_len = 0; float ms = 0; float d = 0; - - msg = (struct oping_msg *) buf; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; /* FIXME: use flow timeout option once we have it */ - while(client.rcvd != client.count && - (fd = flow_select(NULL, &timeout)) != -ETIMEDOUT) { - flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK); - while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) { + while (client.rcvd != client.count + && flow_event_wait(client.flows, fq, &timeout) != -ETIMEDOUT) { + while ((fd = fqueue_next(fq)) >= 0) { + msg_len = flow_read(fd, buf, OPING_BUF_SIZE); if (msg_len < 0) continue; @@ -165,12 +166,20 @@ int client_main() struct timespec tic; struct timespec toc; - int fd = flow_alloc(client.s_apn, NULL, NULL); + int fd; + + client.flows = flow_set_create(); + if (client.flows == NULL) + return 0; + + fd = flow_alloc(client.s_apn, NULL, NULL); if (fd < 0) { printf("Failed to allocate flow.\n"); return -1; } + flow_set_add(client.flows, fd); + if (flow_alloc_res(fd)) { printf("Flow allocation refused.\n"); flow_dealloc(fd); diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index 845f0cbd..8a5a3512 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -69,16 +69,23 @@ void * server_thread(void *o) struct oping_msg * msg = (struct oping_msg *) buf; struct timespec now = {0, 0}; struct timespec timeout = {0, 100 * MILLION}; + int fd; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd = flow_select(server.flows, &timeout); - if (fd == -ETIMEDOUT) - continue; - if (fd < 0) { - printf("Failed to get active fd.\n"); + int ret = flow_event_wait(server.flows, fq, &timeout); + if (ret == -ETIMEDOUT) continue; + + if (ret < 0) { + printf("Event error.\n"); + break; } - while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) { + + while ((fd = fqueue_next(fq)) >= 0) { + msg_len = flow_read(fd, buf, OPING_BUF_SIZE); if (msg_len < 0) continue; @@ -160,8 +167,6 @@ int server_main() if (server.flows == NULL) return 0; - flow_set_zero(server.flows); - pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL); pthread_create(&server.accept_pt, NULL, accept_thread, NULL); pthread_create(&server.server_pt, NULL, server_thread, NULL); |