diff options
author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-10-04 15:23:54 +0200 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-10-04 15:23:54 +0200 |
commit | 1a7c0923206cfb98d43122621a585027c67040ea (patch) | |
tree | acd08f09f5a094e897020e97961b2847209df043 | |
parent | ecdf47b97abb8c5107846f4ef4a17bd62ba6dc82 (diff) | |
parent | c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f (diff) | |
download | ouroboros-1a7c0923206cfb98d43122621a585027c67040ea.tar.gz ouroboros-1a7c0923206cfb98d43122621a585027c67040ea.zip |
Merged in dstaesse/ouroboros/be-unify (pull request #251)
lib, ipcp: Revise fast path and flow interfaces
44 files changed, 2065 insertions, 2840 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 1918165c..0bfb46d8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,14 +19,14 @@ set(PACKAGE_BUGREPORT "None" set(PACKAGE_VERSION "${PACKAGE_VERSION_MAJOR}.${PACKAGE_VERSION_MINOR}" CACHE STRING "Package version") -SET(CMAKE_SKIP_BUILD_RPATH FALSE) -SET(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE) -SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE) +set(CMAKE_SKIP_BUILD_RPATH FALSE) +set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE) +set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE) LIST(FIND CMAKE_PLATFORM_IMPLICIT_LINK_DIRECTORIES "${CMAKE_INSTALL_PREFIX}/usr/lib" isSystemDir) IF("${isSystemDir}" STREQUAL "-1") - SET(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/usr/lib") + set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/usr/lib") ENDIF("${isSystemDir}" STREQUAL "-1") message(STATUS "Package name is: ${PACKAGE_NAME}") diff --git a/include/ouroboros/CMakeLists.txt b/include/ouroboros/CMakeLists.txt index 78a7bb9c..f24857ed 100644 --- a/include/ouroboros/CMakeLists.txt +++ b/include/ouroboros/CMakeLists.txt @@ -6,7 +6,7 @@ set(HEADER_FILES cdap.h dev.h errno.h - flow.h + fcntl.h irm.h irm_config.h nsm.h diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h index fe5ff4b5..25299ee3 100644 --- a/include/ouroboros/dev.h +++ b/include/ouroboros/dev.h @@ -20,11 +20,9 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#include <unistd.h> -#include <stdint.h> - #include <ouroboros/qos.h> -#include <ouroboros/flow.h> + +#include <unistd.h> #ifndef OUROBOROS_DEV_H #define OUROBOROS_DEV_H @@ -36,32 +34,29 @@ int ap_init(char * ap_name); void ap_fini(void); -/* Returns file descriptor (> 0) and client AE name. */ +/* Returns flow descriptor (> 0) and client AE name. */ int flow_accept(char ** ae_name); -int flow_alloc_resp(int fd, int result); +int flow_alloc_resp(int fd, + int response); /* - * Returns file descriptor (> 0). + * Returns flow descriptor (> 0). * On returning, qos will contain the actual supplied QoS. */ -int flow_alloc(char * dst_name, - char * src_ae_name, +int flow_alloc(char * dst_name, + char * src_ae_name, struct qos_spec * qos); int flow_alloc_res(int fd); int flow_dealloc(int fd); -int flow_cntl(int fd, - int cmd, - int oflags); - -ssize_t flow_write(int fd, +ssize_t flow_write(int fd, void * buf, size_t count); -ssize_t flow_read(int fd, +ssize_t flow_read(int fd, void * buf, size_t count); diff --git a/include/ouroboros/flow.h b/include/ouroboros/fcntl.h index 754c7632..ccb45996 100644 --- a/include/ouroboros/flow.h +++ b/include/ouroboros/fcntl.h @@ -20,8 +20,8 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#ifndef OUROBOROS_FLOW_H -#define OUROBOROS_FLOW_H +#ifndef OUROBOROS_FCNTL_H +#define OUROBOROS_FCNTL_H /* same values as fcntl.h */ #define FLOW_O_RDONLY 00000000 @@ -37,4 +37,8 @@ #define FLOW_F_GETFL 00000001 #define FLOW_F_SETFL 00000002 -#endif /* OUROBOROS_FLOW_H */ +int flow_cntl(int fd, + int cmd, + int oflags); + +#endif /* OUROBOROS_FCNTL_H */ diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h new file mode 100644 index 00000000..3c2ff264 --- /dev/null +++ b/include/ouroboros/ipcp-dev.h @@ -0,0 +1,50 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Additional API for IPCPs + * + * Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * Sander Vrijders <sander.vrijders@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <unistd.h> +#include <time.h> + +#include <ouroboros/qos.h> +#include <ouroboros/shm_rdrbuff.h> + +#ifndef OUROBOROS_IPCP_DEV_H +#define OUROBOROS_IPCP_DEV_H + +int ipcp_create_r(pid_t api); + +int ipcp_flow_req_arr(pid_t api, + char * dst_name, + char * src_ae_name); + +int ipcp_flow_alloc_reply(int fd, + int response); + +/* returns flow descriptor and du buff */ +int ipcp_flow_read(struct shm_du_buff ** sdb); + +int ipcp_flow_write(int fd, + struct shm_du_buff * sdb); + +void ipcp_flow_del(struct shm_du_buff * sdb); + +#endif /* OUROBOROS_IPCP_DEV_H */ diff --git a/src/ipcpd/flow.h b/include/ouroboros/local-dev.h index 01226c1e..b4915672 100644 --- a/src/ipcpd/flow.h +++ b/include/ouroboros/local-dev.h @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 * - * Flows + * Optimized calls for the local IPCPs * * Dimitri Staessens <dimitri.staessens@intec.ugent.be> * @@ -20,20 +20,15 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#ifndef OUROBOROS_IPCP_FLOW_H -#define OUROBOROS_IPCP_FLOW_H - -#include <ouroboros/list.h> #include <ouroboros/shm_ap_rbuff.h> -#include <stdint.h> +#ifndef OUROBOROS_LOCAL_DEV_H +#define OUROBOROS_LOCAL_DEV_H -struct flow { - int port_id; - struct shm_ap_rbuff * rb; - enum flow_state state; +/* returns flow descriptor and rb_entry, no access to du_buff */ +int local_flow_read(struct rb_entry * e); - pid_t api; -}; +int local_flow_write(int fd, + struct rb_entry * e); -#endif /* OUROBOROS_FLOW_H */ +#endif /* OUROBOROS_LOCAL_DEV_H */ diff --git a/include/ouroboros/common.h b/include/ouroboros/np1_flow.h index dbd050f1..c89af70e 100644 --- a/include/ouroboros/common.h +++ b/include/ouroboros/np1_flow.h @@ -1,9 +1,9 @@ /* * Ouroboros - Copyright (C) 2016 * - * Common definitions + * Adapter functions for N + 1 flow descriptors * - * Sander Vrijders <sander.vrijders@intec.ugent.be> + * Dimitri Staessens <dimitri.staessens@intec.ugent.be> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -20,15 +20,23 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#ifndef OUROBOROS_COMMON_H -#define OUROBOROS_COMMON_H - -#include <stdint.h> #include <unistd.h> +#include <stdint.h> +#include <time.h> + +#include <ouroboros/qos.h> +#include <ouroboros/flow.h> +#include <ouroboros/shm_rdrbuff.h> + +#ifndef OUROBOROS_NP1_FLOW_H +#define OUROBOROS_NP1_FLOW_H + +int np1_flow_alloc(pid_t n_api, + int port_id); + +int np1_flow_resp(pid_t n_api, + int port_id); -typedef struct { - uint8_t * data; - size_t len; -} buffer_t; +int np1_flow_dealloc(int port_id); -#endif /* OUROBOROS_COMMON_H */ +#endif /* OUROBOROS_NP1_FLOW_H */ diff --git a/include/ouroboros/shared.h b/include/ouroboros/shared.h index bfd99eb0..c38b1bde 100644 --- a/include/ouroboros/shared.h +++ b/include/ouroboros/shared.h @@ -30,11 +30,4 @@ enum qos_cube { QOS_MAX }; -enum flow_state { - FLOW_NULL = 0, - FLOW_PENDING, - FLOW_ALLOCATED, - FLOW_DESTROY -}; - #endif /* OUROBOROS_SHARED_H */ diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h index 6b11fd2d..89d9876d 100644 --- a/include/ouroboros/shm_ap_rbuff.h +++ b/include/ouroboros/shm_ap_rbuff.h @@ -36,17 +36,9 @@ struct rb_entry { int port_id; }; -/* recv SDUs from N + 1 */ -struct shm_ap_rbuff * shm_ap_rbuff_create_n(); +struct shm_ap_rbuff * shm_ap_rbuff_create(); -/* recv SDUs from N - 1 */ -struct shm_ap_rbuff * shm_ap_rbuff_create_s(); - -/* write SDUs to N - 1 */ -struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api); - -/* write SDUs to N + 1 */ -struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api); +struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api); void shm_ap_rbuff_close(struct shm_ap_rbuff * rb); diff --git a/include/ouroboros/shm_rdrbuff.h b/include/ouroboros/shm_rdrbuff.h index 7a7049e3..f1be3652 100644 --- a/include/ouroboros/shm_rdrbuff.h +++ b/include/ouroboros/shm_rdrbuff.h @@ -33,6 +33,8 @@ struct shm_du_buff; struct shm_rdrbuff; +size_t shm_du_buff_get_idx(struct shm_du_buff * sdb); + struct shm_rdrbuff * shm_rdrbuff_create(); struct shm_rdrbuff * shm_rdrbuff_open(); @@ -44,40 +46,43 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb); void * shm_rdrbuff_sanitize(void * o); /* returns the index of the buffer in the DU map */ -ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, - pid_t dst_api, - size_t headspace, - size_t tailspace, - uint8_t * data, - size_t data_len); +ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, + pid_t dst_api, + size_t headspace, + size_t tailspace, + uint8_t * data, + size_t data_len); + +ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, + pid_t dst_api, + size_t headspace, + size_t tailspace, + uint8_t * data, + size_t data_len); -ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, - pid_t dst_api, - size_t headspace, - size_t tailspace, - uint8_t * data, - size_t data_len); +int shm_rdrbuff_read(uint8_t ** dst, + struct shm_rdrbuff * rdrb, + ssize_t idx); -int shm_rdrbuff_read(uint8_t ** dst, - struct shm_rdrbuff * rdrb, - ssize_t idx); +struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb, + ssize_t idx); -int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, - ssize_t idx); +int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, + ssize_t idx); -uint8_t * shm_du_buff_head(struct shm_du_buff * sdb); +uint8_t * shm_du_buff_head(struct shm_du_buff * sdb); -uint8_t * shm_du_buff_tail(struct shm_du_buff * sdb); +uint8_t * shm_du_buff_tail(struct shm_du_buff * sdb); -uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb, - size_t size); +uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb, + size_t size); -uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, - size_t size); +uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, + size_t size); -int shm_du_buff_head_release(struct shm_du_buff * sdb, - size_t size); +int shm_du_buff_head_release(struct shm_du_buff * sdb, + size_t size); -int shm_du_buff_tail_release(struct shm_du_buff * sdb, - size_t size); +int shm_du_buff_tail_release(struct shm_du_buff * sdb, + size_t size); #endif /* OUROBOROS_SHM_RDRBUFF_H */ diff --git a/include/ouroboros/sockets.h b/include/ouroboros/sockets.h index 5d654cb1..aef4259e 100644 --- a/include/ouroboros/sockets.h +++ b/include/ouroboros/sockets.h @@ -23,8 +23,6 @@ #ifndef OUROBOROS_SOCKETS_H #define OUROBOROS_SOCKETS_H -#include <ouroboros/common.h> - #include <sys/types.h> #include "dif_config.pb-c.h" @@ -49,9 +47,11 @@ typedef IpcpMsg ipcp_msg_t; char * ipcp_sock_path(pid_t api); int server_socket_open(char * file_name); + int client_socket_open(char * file_name); irm_msg_t * send_recv_irm_msg(irm_msg_t * msg); + irm_msg_t * send_recv_irm_msg_b(irm_msg_t * msg); #endif diff --git a/include/ouroboros/utils.h b/include/ouroboros/utils.h index 17bd41ee..cf9a49bc 100644 --- a/include/ouroboros/utils.h +++ b/include/ouroboros/utils.h @@ -20,8 +20,19 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#define MAX(a,b) (a > b ? a : b) -#define MIN(a,b) (a < b ? a : b) +#ifndef OUROBOROS_UTILS_H +#define OUROBOROS_UTILS_H + +#include <stdint.h> +#include <unistd.h> + +#define MIN(a,b) (((a) < (b)) ? (a) : (b)) +#define MAX(a,b) (((a) > (b)) ? (a) : (b)) + +typedef struct { + uint8_t * data; + size_t len; +} buffer_t; /* * Returns the number of characters a uint would @@ -34,3 +45,5 @@ char * strdup(const char * src); /* gets the application name */ char * path_strip(char * src); + +#endif /* OUROBOROS_UTILS_H */ diff --git a/include/ouroboros/wrap/ouroboros.i b/include/ouroboros/wrap/ouroboros.i index 2f66aa16..394b505a 100644 --- a/include/ouroboros/wrap/ouroboros.i +++ b/include/ouroboros/wrap/ouroboros.i @@ -25,7 +25,7 @@ #include "ouroboros/cdap.h" #include "ouroboros/dev.h" #include "ouroboros/errno.h" -#include "ouroboros/flow.h" +#include "ouroboros/fcntl.h" #include "ouroboros/irm.h" #include "ouroboros/irm_config.h" #include "ouroboros/nsm.h" @@ -38,7 +38,7 @@ typedef int pid_t; %include "ouroboros/cdap.h" %include "ouroboros/dev.h" %include "ouroboros/errno.h" -%include "ouroboros/flow.h" +%include "ouroboros/fcntl.h" %include "ouroboros/irm.h" %include "ouroboros/irm_config.h" %include "ouroboros/nsm.h" diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h index 36245eea..4971dbb5 100644 --- a/src/ipcpd/ipcp-data.h +++ b/src/ipcpd/ipcp-data.h @@ -30,7 +30,6 @@ #include <pthread.h> #include "ipcp-ops.h" -#include "flow.h" struct ipcp_data { enum ipcp_type type; @@ -46,24 +45,32 @@ struct ipcp_data { }; struct ipcp_data * ipcp_data_create(); + struct ipcp_data * ipcp_data_init(struct ipcp_data * dst, enum ipcp_type ipcp_type); + void ipcp_data_destroy(struct ipcp_data * data); -int ipcp_data_add_reg_entry(struct ipcp_data * data, - char * name); -int ipcp_data_del_reg_entry(struct ipcp_data * data, - const char * name); -int ipcp_data_add_dir_entry(struct ipcp_data * data, - char * ap_name, - uint64_t addr); -int ipcp_data_del_dir_entry(struct ipcp_data * data, - const char * ap_name, - uint64_t addr); -bool ipcp_data_is_in_registry(struct ipcp_data * data, - const char * name); -bool ipcp_data_is_in_directory(struct ipcp_data * data, - const char * ap_name); -uint64_t ipcp_data_get_addr(struct ipcp_data * data, - const char * ap_name); +int ipcp_data_add_reg_entry(struct ipcp_data * data, + char * name); + +int ipcp_data_del_reg_entry(struct ipcp_data * data, + const char * name); + +int ipcp_data_add_dir_entry(struct ipcp_data * data, + char * ap_name, + uint64_t addr); + +int ipcp_data_del_dir_entry(struct ipcp_data * data, + const char * ap_name, + uint64_t addr); + +bool ipcp_data_is_in_registry(struct ipcp_data * data, + const char * name); + +bool ipcp_data_is_in_directory(struct ipcp_data * data, + const char * ap_name); + +uint64_t ipcp_data_get_addr(struct ipcp_data * data, + const char * ap_name); #endif /* IPCPD_IPCP_DATA_H */ diff --git a/src/ipcpd/ipcp-ops.h b/src/ipcpd/ipcp-ops.h index e43c2c38..815cda09 100644 --- a/src/ipcpd/ipcp-ops.h +++ b/src/ipcpd/ipcp-ops.h @@ -25,23 +25,26 @@ #define IPCPD_IPCP_OPS_H #include <ouroboros/irm_config.h> -#include <ouroboros/common.h> -#include <sys/types.h> +#include <ouroboros/shared.h> struct ipcp_ops { int (* ipcp_bootstrap)(struct dif_config * conf); + int (* ipcp_enroll)(char * dif_name); - int (* ipcp_name_reg)(char * name); + + int (* ipcp_name_reg)(char * name); + int (* ipcp_name_unreg)(char * name); - int (* ipcp_flow_alloc)(pid_t n_api, - int port_id, + + int (* ipcp_flow_alloc)(int fd, char * dst_ap_name, char * src_ae_name, enum qos_cube qos); - int (* ipcp_flow_alloc_resp)(pid_t n_api, - int port_id, - int response); - int (* ipcp_flow_dealloc)(int port_id); + + int (* ipcp_flow_alloc_resp)(int fd, + int response); + + int (* ipcp_flow_dealloc)(int fd); }; #endif /* IPCPD_IPCP_OPS_H */ diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index ec5ab927..db72b88d 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -21,8 +21,12 @@ */ #include <ouroboros/config.h> -#include <ouroboros/ipcp.h> #include <ouroboros/time_utils.h> +#include <ouroboros/utils.h> +#include <ouroboros/sockets.h> +#include <ouroboros/errno.h> +#include <ouroboros/dev.h> +#include <ouroboros/np1_flow.h> #define OUROBOROS_PREFIX "ipcpd/ipcp" #include <ouroboros/logs.h> @@ -32,62 +36,68 @@ #include <stdlib.h> #include "ipcp.h" -struct ipcp * ipcp_instance_create() +int ipcp_init(enum ipcp_type type, struct ipcp_ops * ops) { pthread_condattr_t cattr; - struct ipcp * i = malloc(sizeof *i); - if (i == NULL) - return NULL; + ipcpi.irmd_fd = -1; + ipcpi.state = IPCP_INIT; - i->data = NULL; - i->ops = NULL; - i->irmd_fd = -1; - i->state = IPCP_INIT; + ipcpi.ops = ops; - pthread_rwlock_init(&i->state_lock, NULL); - pthread_mutex_init(&i->state_mtx, NULL); + ipcpi.data = ipcp_data_create(); + if (ipcpi.data == NULL) + return -ENOMEM; + + ipcp_data_init(ipcpi.data, type); + + pthread_rwlock_init(&ipcpi.state_lock, NULL); + pthread_mutex_init(&ipcpi.state_mtx, NULL); pthread_condattr_init(&cattr); #ifndef __APPLE__ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); #endif - pthread_cond_init(&i->state_cond, &cattr); + pthread_cond_init(&ipcpi.state_cond, &cattr); - return i; + pthread_create(&ipcpi.mainloop, NULL, ipcp_main_loop, NULL); + + return 0; } -void ipcp_set_state(struct ipcp * ipcp, - enum ipcp_state state) +void ipcp_fini() { - if (ipcp == NULL) - return; + pthread_join(ipcpi.mainloop, NULL); - pthread_mutex_lock(&ipcp->state_mtx); + ipcp_data_destroy(ipcpi.data); + pthread_cond_destroy(&ipcpi.state_cond); + pthread_mutex_destroy(&ipcpi.state_mtx); + pthread_rwlock_destroy(&ipcpi.state_lock); +} + +void ipcp_set_state(enum ipcp_state state) +{ + pthread_mutex_lock(&ipcpi.state_mtx); - ipcp->state = state; + ipcpi.state = state; - pthread_cond_broadcast(&ipcp->state_cond); - pthread_mutex_unlock(&ipcp->state_mtx); + pthread_cond_broadcast(&ipcpi.state_cond); + pthread_mutex_unlock(&ipcpi.state_mtx); } -enum ipcp_state ipcp_get_state(struct ipcp * ipcp) +enum ipcp_state ipcp_get_state() { enum ipcp_state state; - if (ipcp == NULL) - return IPCP_NULL; + pthread_mutex_lock(&ipcpi.state_mtx); - pthread_mutex_lock(&ipcp->state_mtx); + state = ipcpi.state; - state = ipcp->state; - - pthread_mutex_unlock(&ipcp->state_mtx); + pthread_mutex_unlock(&ipcpi.state_mtx); return state; } -int ipcp_wait_state(struct ipcp * ipcp, - enum ipcp_state state, +int ipcp_wait_state(enum ipcp_state state, const struct timespec * timeout) { struct timespec abstime; @@ -95,24 +105,24 @@ int ipcp_wait_state(struct ipcp * ipcp, clock_gettime(PTHREAD_COND_CLOCK, &abstime); ts_add(&abstime, timeout, &abstime); - pthread_mutex_lock(&ipcp->state_mtx); + pthread_mutex_lock(&ipcpi.state_mtx); - while (ipcp->state != state && ipcp->state != IPCP_SHUTDOWN) { + while (ipcpi.state != state && ipcpi.state != IPCP_SHUTDOWN) { int ret; if (timeout == NULL) - ret = pthread_cond_wait(&ipcp->state_cond, - &ipcp->state_mtx); + ret = pthread_cond_wait(&ipcpi.state_cond, + &ipcpi.state_mtx); else - ret = pthread_cond_timedwait(&ipcp->state_cond, - &ipcp->state_mtx, + ret = pthread_cond_timedwait(&ipcpi.state_cond, + &ipcpi.state_mtx, &abstime); if (ret) { - pthread_mutex_unlock(&ipcp->state_mtx); + pthread_mutex_unlock(&ipcpi.state_mtx); return -ret; } } - pthread_mutex_unlock(&ipcp->state_mtx); + pthread_mutex_unlock(&ipcpi.state_mtx); return 0; } @@ -161,7 +171,6 @@ void * ipcp_main_loop(void * o) int lsockfd; int sockfd; uint8_t buf[IPCP_MSG_BUF_SIZE]; - struct ipcp * _ipcp = (struct ipcp *) o; ipcp_msg_t * msg; ssize_t count; @@ -180,12 +189,6 @@ void * ipcp_main_loop(void * o) struct timeval ltv = {(SOCKET_TIMEOUT / 1000), (SOCKET_TIMEOUT % 1000) * 1000}; - - if (_ipcp == NULL) { - LOG_ERR("Invalid ipcp struct."); - return (void *) 1; - } - sock_path = ipcp_sock_path(getpid()); if (sock_path == NULL) return (void *) 1; @@ -202,13 +205,15 @@ void * ipcp_main_loop(void * o) LOG_WARN("Failed to set timeout on socket."); while (true) { - pthread_rwlock_rdlock(&_ipcp->state_lock); - if (ipcp_get_state(_ipcp) == IPCP_SHUTDOWN) { - pthread_rwlock_unlock(&_ipcp->state_lock); + int fd = -1; + + pthread_rwlock_rdlock(&ipcpi.state_lock); + if (ipcp_get_state() == IPCP_SHUTDOWN) { + pthread_rwlock_unlock(&ipcpi.state_lock); break; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -235,7 +240,7 @@ void * ipcp_main_loop(void * o) switch (msg->code) { case IPCP_MSG_CODE__IPCP_BOOTSTRAP: - if (_ipcp->ops->ipcp_bootstrap == NULL) { + if (ipcpi.ops->ipcp_bootstrap == NULL) { LOG_ERR("Bootstrap unsupported."); break; } @@ -267,72 +272,102 @@ void * ipcp_main_loop(void * o) conf.if_name = conf_msg->if_name; ret_msg.has_result = true; - ret_msg.result = _ipcp->ops->ipcp_bootstrap(&conf); + ret_msg.result = ipcpi.ops->ipcp_bootstrap(&conf); if (ret_msg.result < 0) free(conf.dif_name); break; case IPCP_MSG_CODE__IPCP_ENROLL: - if (_ipcp->ops->ipcp_enroll == NULL) { + if (ipcpi.ops->ipcp_enroll == NULL) { LOG_ERR("Enroll unsupported."); break; } ret_msg.has_result = true; - ret_msg.result = _ipcp->ops->ipcp_enroll(msg->dif_name); + ret_msg.result = ipcpi.ops->ipcp_enroll(msg->dif_name); break; case IPCP_MSG_CODE__IPCP_NAME_REG: - if (_ipcp->ops->ipcp_name_reg == NULL) { + if (ipcpi.ops->ipcp_name_reg == NULL) { LOG_ERR("Ap_reg unsupported."); break; } msg_name_dup = strdup(msg->name); ret_msg.has_result = true; ret_msg.result = - _ipcp->ops->ipcp_name_reg(msg_name_dup); + ipcpi.ops->ipcp_name_reg(msg_name_dup); if (ret_msg.result < 0) free(msg_name_dup); break; case IPCP_MSG_CODE__IPCP_NAME_UNREG: - if (_ipcp->ops->ipcp_name_unreg == NULL) { + if (ipcpi.ops->ipcp_name_unreg == NULL) { LOG_ERR("Ap_unreg unsupported."); break; } ret_msg.has_result = true; ret_msg.result = - _ipcp->ops->ipcp_name_unreg(msg->name); + ipcpi.ops->ipcp_name_unreg(msg->name); break; case IPCP_MSG_CODE__IPCP_FLOW_ALLOC: - if (_ipcp->ops->ipcp_flow_alloc == NULL) { + if (ipcpi.ops->ipcp_flow_alloc == NULL) { LOG_ERR("Flow_alloc unsupported."); break; } + fd = np1_flow_alloc(msg->api, msg->port_id); + if (fd < 0) { + LOG_ERR("Could not get fd for flow."); + ret_msg.has_result = true; + ret_msg.result = -1; + break; + } + ret_msg.has_result = true; ret_msg.result = - _ipcp->ops->ipcp_flow_alloc(msg->api, - msg->port_id, + ipcpi.ops->ipcp_flow_alloc(fd, msg->dst_name, msg->src_ae_name, msg->qos_cube); + if (ret_msg.result < 0) { + LOG_DBG("Deallocating failed flow on port_id %d.", + msg->port_id); + flow_dealloc(fd); + } break; case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP: - if (_ipcp->ops->ipcp_flow_alloc_resp == NULL) { + if (ipcpi.ops->ipcp_flow_alloc_resp == NULL) { LOG_ERR("Flow_alloc_resp unsupported."); break; } + + if (!msg->response) { + fd = np1_flow_resp(msg->api, msg->port_id); + if (fd < 0) { + LOG_ERR("Could not get fd for flow."); + ret_msg.has_result = true; + ret_msg.result = -1; + break; + } + } ret_msg.has_result = true; ret_msg.result = - _ipcp->ops->ipcp_flow_alloc_resp(msg->api, - msg->port_id, - msg->result); + ipcpi.ops->ipcp_flow_alloc_resp(fd, + msg->response); break; case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: - if (_ipcp->ops->ipcp_flow_dealloc == NULL) { + if (ipcpi.ops->ipcp_flow_dealloc == NULL) { LOG_ERR("Flow_dealloc unsupported."); break; } + + fd = np1_flow_dealloc(msg->port_id); + if (fd < 0) { + LOG_ERR("Could not get fd for flow."); + ret_msg.has_result = true; + ret_msg.result = -1; + break; + } + ret_msg.has_result = true; ret_msg.result = - _ipcp->ops->ipcp_flow_dealloc(msg->port_id); + ipcpi.ops->ipcp_flow_dealloc(fd); break; default: LOG_ERR("Don't know that message code"); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index edaea0fd..87c0c5d1 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -24,7 +24,6 @@ #define IPCPD_IPCP_H #include <ouroboros/config.h> -#include <ouroboros/shared.h> #include <pthread.h> #include <time.h> @@ -50,23 +49,23 @@ struct ipcp { pthread_rwlock_t state_lock; pthread_mutex_t state_mtx; pthread_cond_t state_cond; -}; -struct ipcp * ipcp_instance_create(); + pthread_t mainloop; +} ipcpi; + +int ipcp_init(); -void ipcp_set_state(struct ipcp * ipcp, - enum ipcp_state state); +void ipcp_fini(); -enum ipcp_state ipcp_get_state(struct ipcp * ipcp); +void ipcp_set_state(enum ipcp_state state); -int ipcp_wait_state(struct ipcp * ipcp, - enum ipcp_state state, +enum ipcp_state ipcp_get_state(); + +int ipcp_wait_state(enum ipcp_state state, const struct timespec * timeout); void * ipcp_main_loop(void * o); -void * ipcp_sdu_loop(void * o); - int ipcp_parse_arg(int argc, char * argv[]); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index c0809429..1ccec0c0 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -22,17 +22,10 @@ #include <ouroboros/config.h> #include "ipcp.h" -#include "flow.h" #include <ouroboros/errno.h> -#include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h> -#include <ouroboros/list.h> -#include <ouroboros/utils.h> -#include <ouroboros/ipcp.h> -#include <ouroboros/irm_config.h> -#include <ouroboros/bitmap.h> -#include <ouroboros/shared.h> #include <ouroboros/dev.h> +#include <ouroboros/ipcp-dev.h> +#include <ouroboros/local-dev.h> #define OUROBOROS_PREFIX "ipcpd/local" #include <ouroboros/logs.h> @@ -46,176 +39,51 @@ #define THIS_TYPE IPCP_LOCAL -#define shim_data(type) ((struct ipcp_local_data *) type->data) - /* global for trapping signal */ int irmd_api; -/* this IPCP's data */ -#ifdef MAKE_CHECK -extern struct ipcp * _ipcp; /* defined in test */ -#else -struct ipcp * _ipcp; -#endif - -/* - * copied from ouroboros/dev. The shim needs access to the internals - * because it doesn't follow all steps necessary steps to get - * the info - */ - -/* the shim needs access to these internals */ -struct shim_ap_data { - pid_t api; - struct shm_rdrbuff * rdrb; - struct bmp * fds; - struct shm_ap_rbuff * rb; - - int in_out[AP_MAX_FLOWS]; +struct { + int in_out[IRMD_MAX_FLOWS]; - struct flow flows[AP_MAX_FLOWS]; - pthread_rwlock_t flows_lock; - - pthread_t mainloop; + pthread_rwlock_t lock; pthread_t sduloop; +} local_data; -} * _ap_instance; - -static int shim_ap_init() +void local_data_init() { int i; + for (i = 0; i < IRMD_MAX_FLOWS; ++i) + local_data.in_out[i] = -1; - _ap_instance = malloc(sizeof(struct shim_ap_data)); - if (_ap_instance == NULL) { - return -1; - } - - _ap_instance->api = getpid(); - - _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); - if (_ap_instance->fds == NULL) { - free(_ap_instance); - return -1; - } - - _ap_instance->rdrb = shm_rdrbuff_open(); - if (_ap_instance->rdrb == NULL) { - bmp_destroy(_ap_instance->fds); - free(_ap_instance); - return -1; - } - - _ap_instance->rb = shm_ap_rbuff_create_n(); - if (_ap_instance->rb == NULL) { - shm_rdrbuff_close(_ap_instance->rdrb); - bmp_destroy(_ap_instance->fds); - free(_ap_instance); - return -1; - } - - for (i = 0; i < AP_MAX_FLOWS; i ++) { - _ap_instance->flows[i].rb = NULL; - _ap_instance->flows[i].port_id = -1; - _ap_instance->flows[i].state = FLOW_NULL; - _ap_instance->in_out[i] = -1; - } - - pthread_rwlock_init(&_ap_instance->flows_lock, NULL); - - return 0; -} - -void shim_ap_fini() -{ - int i = 0; - - if (_ap_instance == NULL) - return; - - pthread_rwlock_rdlock(&_ipcp->state_lock); - - if (_ipcp->state != IPCP_SHUTDOWN) - LOG_WARN("Cleaning up AP while not in shutdown."); - - if (_ap_instance->fds != NULL) - bmp_destroy(_ap_instance->fds); - - /* remove all remaining sdus */ - while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) - shm_rdrbuff_remove(_ap_instance->rdrb, i); - - if (_ap_instance->rdrb != NULL) - shm_rdrbuff_close(_ap_instance->rdrb); - if (_ap_instance->rb != NULL) - shm_ap_rbuff_destroy(_ap_instance->rb); - - pthread_rwlock_wrlock(&_ap_instance->flows_lock); - - for (i = 0; i < AP_MAX_FLOWS; i ++) - if (_ap_instance->flows[i].rb != NULL) - shm_ap_rbuff_close(_ap_instance->flows[i].rb); - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - - free(_ap_instance); + pthread_rwlock_init(&local_data.lock, NULL); } -/* only call this under flows_lock */ -static int port_id_to_fd(int port_id) +void local_data_fini() { - int i; - - for (i = 0; i < AP_MAX_FLOWS; ++i) { - if (_ap_instance->flows[i].port_id == port_id - && _ap_instance->flows[i].state != FLOW_NULL) - return i; - } - - return -1; + pthread_rwlock_destroy(&local_data.lock); } -/* - * end copy from dev.c - */ - -/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ static void * ipcp_local_sdu_loop(void * o) { while (true) { - struct rb_entry * e; - int fd; - - e = shm_ap_rbuff_read(_ap_instance->rb); - if (e == NULL) { - continue; - } + struct rb_entry e; + int fd = local_flow_read(&e); - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (_ipcp->state != IPCP_ENROLLED) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_ENROLLED) { + pthread_rwlock_unlock(&ipcpi.state_lock); return (void *) 1; /* -ENOTENROLLED */ } - pthread_rwlock_rdlock(&_ap_instance->flows_lock); - fd = _ap_instance->in_out[port_id_to_fd(e->port_id)]; - if (fd == -1) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - free(e); - continue; - } - - e->port_id = _ap_instance->flows[fd].port_id; - - while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, e) < 0) - ; + pthread_rwlock_rdlock(&local_data.lock); + fd = local_data.in_out[fd]; + pthread_rwlock_unlock(&local_data.lock); - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + if (fd != -1) + local_flow_write(fd, &e); - free(e); + pthread_rwlock_unlock(&ipcpi.state_lock); } return (void *) 1; @@ -223,10 +91,6 @@ static void * ipcp_local_sdu_loop(void * o) void ipcp_sig_handler(int sig, siginfo_t * info, void * c) { - sigset_t sigset; - sigemptyset(&sigset); - sigaddset(&sigset, SIGINT); - switch(sig) { case SIGINT: case SIGTERM: @@ -236,11 +100,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) LOG_DBG("IPCP %d terminating by order of %d. Bye.", getpid(), info->si_pid); - pthread_rwlock_wrlock(&_ipcp->state_lock); + pthread_rwlock_wrlock(&ipcpi.state_lock); - ipcp_set_state(_ipcp, IPCP_SHUTDOWN); + ipcp_set_state(IPCP_SHUTDOWN); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); } default: return; @@ -254,307 +118,154 @@ static int ipcp_local_bootstrap(struct dif_config * conf) return -1; } - pthread_rwlock_wrlock(&_ipcp->state_lock); + pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_INIT) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_INIT) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("IPCP in wrong state."); return -1; } - ipcp_set_state(_ipcp, IPCP_ENROLLED); + ipcp_set_state(IPCP_ENROLLED); - pthread_create(&_ap_instance->sduloop, - NULL, - ipcp_local_sdu_loop, - NULL); + pthread_create(&local_data.sduloop, NULL, ipcp_local_sdu_loop, NULL); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBG("Bootstrapped local IPCP with api %d.", - getpid()); + LOG_INFO("Bootstrapped local IPCP with api %d.", getpid()); return 0; } static int ipcp_local_name_reg(char * name) { - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_data_add_reg_entry(_ipcp->data, name)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_data_add_reg_entry(ipcpi.data, name)) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_DBGF("Failed to add %s to local registry.", name); return -1; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBG("Registered %s.", name); + LOG_INFO("Registered %s.", name); return 0; } static int ipcp_local_name_unreg(char * name) { - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - ipcp_data_del_reg_entry(_ipcp->data, name); + ipcp_data_del_reg_entry(ipcpi.data, name); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); + + LOG_INFO("Unregistered %s.", name); return 0; } -static int ipcp_local_flow_alloc(pid_t n_api, - int port_id, +static int ipcp_local_flow_alloc(int fd, char * dst_name, char * src_ae_name, enum qos_cube qos) { - int in_fd = -1; int out_fd = -1; - struct shm_ap_rbuff * rb; - - LOG_INFO("Allocating flow to %s.", dst_name); + LOG_DBG("Allocating flow to %s on fd %d.", dst_name, fd); if (dst_name == NULL || src_ae_name == NULL) return -1; /* This ipcpd has all QoS */ - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_ENROLLED) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_DBGF("Won't register with non-enrolled IPCP."); return -1; /* -ENOTENROLLED */ } - rb = shm_ap_rbuff_open_s(n_api); - if (rb == NULL) { - pthread_rwlock_unlock(&_ipcp->state_lock); - return -1; /* -ENORBUFF */ - } - - pthread_rwlock_wrlock(&_ap_instance->flows_lock); + pthread_rwlock_wrlock(&local_data.lock); - in_fd = bmp_allocate(_ap_instance->fds); - if (!bmp_is_id_valid(_ap_instance->fds, in_fd)) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - return -EMFILE; - } + out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name); - _ap_instance->flows[in_fd].port_id = port_id; - _ap_instance->flows[in_fd].state = FLOW_PENDING; - _ap_instance->flows[in_fd].rb = rb; + local_data.in_out[fd] = out_fd; + local_data.in_out[out_fd] = fd; - LOG_DBGF("Pending local flow with port_id %d.", port_id); + pthread_rwlock_unlock(&local_data.lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - /* reply to IRM */ - port_id = ipcp_flow_req_arr(getpid(), - dst_name, - src_ae_name); - - if (port_id < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_ERR("Could not get port id from IRMd"); - /* shm_ap_rbuff_close(n_api); */ - return -1; - } - - out_fd = bmp_allocate(_ap_instance->fds); - if (!bmp_is_id_valid(_ap_instance->fds, out_fd)) { - /* shm_ap_rbuff_close(n_api); */ - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - return -1; /* -ENOMOREFDS */ - } - - _ap_instance->flows[out_fd].port_id = port_id; - _ap_instance->flows[out_fd].rb = NULL; - _ap_instance->flows[out_fd].state = FLOW_PENDING; - - _ap_instance->in_out[in_fd] = out_fd; - _ap_instance->in_out[out_fd] = in_fd; - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - - LOG_DBGF("Pending local allocation request, port_id %d.", port_id); + LOG_INFO("Pending local allocation request on fd %d.", fd); return 0; } -static int ipcp_local_flow_alloc_resp(pid_t n_api, - int port_id, - int response) +static int ipcp_local_flow_alloc_resp(int fd, int response) { - struct shm_ap_rbuff * rb; - int in_fd = -1; int out_fd = -1; int ret = -1; + LOG_DBG("Received response for fd %d: %d.", fd, response); + if (response) return 0; - pthread_rwlock_rdlock(&_ipcp->state_lock); - - /* awaken pending flow */ - - pthread_rwlock_wrlock(&_ap_instance->flows_lock); - - in_fd = port_id_to_fd(port_id); - if (in_fd < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBGF("Could not find flow with port_id %d.", port_id); - return -1; - } - - if (_ap_instance->flows[in_fd].state != FLOW_PENDING) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBGF("Flow was not pending."); - return -1; - } - - rb = shm_ap_rbuff_open_s(n_api); - if (rb == NULL) { - LOG_ERR("Could not open N + 1 ringbuffer."); - _ap_instance->flows[in_fd].state = FLOW_NULL; - _ap_instance->flows[in_fd].port_id = -1; - _ap_instance->in_out[in_fd] = -1; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - return -1; - } - - _ap_instance->flows[in_fd].state = FLOW_ALLOCATED; - _ap_instance->flows[in_fd].rb = rb; - - LOG_DBGF("Accepted flow, port_id %d on fd %d.", port_id, in_fd); + pthread_rwlock_rdlock(&ipcpi.state_lock); - out_fd = _ap_instance->in_out[in_fd]; + out_fd = local_data.in_out[fd]; if (out_fd < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBGF("No pending local flow with port_id %d.", port_id); + pthread_rwlock_unlock(&local_data.lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - if (_ap_instance->flows[out_fd].state != FLOW_PENDING) { - /* FIXME: clean up other end */ - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBGF("Flow was not pending."); - return -1; - } - - _ap_instance->flows[out_fd].state = FLOW_ALLOCATED; - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - if ((ret = ipcp_flow_alloc_reply(getpid(), - _ap_instance->flows[out_fd].port_id, - response)) < 0) { - return -1; /* -EPIPE */ - } + if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0) + return -1; - LOG_INFO("Flow allocation completed, port_ids (%d, %d).", - _ap_instance->flows[out_fd].port_id, - _ap_instance->flows[in_fd].port_id); + LOG_INFO("Flow allocation completed, fds (%d, %d).", out_fd, fd); return ret; } -static int ipcp_local_flow_dealloc(int port_id) +static int ipcp_local_flow_dealloc(int fd) { - int fd = -1; - struct shm_ap_rbuff * rb; - - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_wrlock(&_ap_instance->flows_lock); - - fd = port_id_to_fd(port_id); - if (fd < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBGF("Could not find flow with port_id %d.", port_id); - return 0; - } - - bmp_release(_ap_instance->fds, fd); - - if (_ap_instance->in_out[fd] != -1) - _ap_instance->in_out[_ap_instance->in_out[fd]] = -1; - - _ap_instance->in_out[fd] = -1; - - _ap_instance->flows[fd].state = FLOW_NULL; - _ap_instance->flows[fd].port_id = -1; - rb = _ap_instance->flows[fd].rb; - _ap_instance->flows[fd].rb = NULL; - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - - if (rb != NULL) - shm_ap_rbuff_close(rb); - - pthread_rwlock_unlock(&_ipcp->state_lock); - - LOG_DBGF("Flow with port_id %d deallocated.", port_id); - - return 0; -} - -static struct ipcp * ipcp_local_create() -{ - struct ipcp * i; - struct ipcp_ops * ops; + int out_fd = -1; - i = ipcp_instance_create(); - if (i == NULL) - return NULL; + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(&local_data.lock); - i->data = ipcp_data_create(); - if (i->data == NULL) { - free(i); - return NULL; - } + out_fd = local_data.in_out[fd]; - if (ipcp_data_init(i->data, THIS_TYPE) == NULL) { - free(i->data); - free(i); - return NULL; + if (out_fd != -1) { + local_data.in_out[out_fd] = -1; + flow_dealloc(out_fd); } - ops = malloc(sizeof(*ops)); - if (ops == NULL) { - free(i->data); - free(i); - return NULL; - } + local_data.in_out[fd] = -1; - ops->ipcp_bootstrap = ipcp_local_bootstrap; - ops->ipcp_enroll = NULL; /* shim */ - ops->ipcp_name_reg = ipcp_local_name_reg; - ops->ipcp_name_unreg = ipcp_local_name_unreg; - ops->ipcp_flow_alloc = ipcp_local_flow_alloc; - ops->ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp; - ops->ipcp_flow_dealloc = ipcp_local_flow_dealloc; + pthread_rwlock_unlock(&local_data.lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - i->ops = ops; + LOG_INFO("Flow with fd %d deallocated.", fd); - i->state = IPCP_INIT; - - return i; + return 0; } -#ifndef MAKE_CHECK +static struct ipcp_ops local_ops = { + .ipcp_bootstrap = ipcp_local_bootstrap, + .ipcp_enroll = NULL, /* shim */ + .ipcp_name_reg = ipcp_local_name_reg, + .ipcp_name_unreg = ipcp_local_name_unreg, + .ipcp_flow_alloc = ipcp_local_flow_alloc, + .ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp, + .ipcp_flow_dealloc = ipcp_local_flow_dealloc +}; int main(int argc, char * argv[]) { @@ -571,7 +282,9 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - if (shim_ap_init() < 0) { + local_data_init(); + + if (ap_init(NULL) < 0) { close_logfile(); exit(EXIT_FAILURE); } @@ -591,17 +304,13 @@ int main(int argc, char * argv[]) sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - _ipcp = ipcp_local_create(); - if (_ipcp == NULL) { - LOG_ERR("Failed to create IPCP."); + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + + if (ipcp_init(THIS_TYPE, &local_ops) < 0) { close_logfile(); exit(EXIT_FAILURE); } - pthread_sigmask(SIG_BLOCK, &sigset, NULL); - - pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); - pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); if (ipcp_create_r(getpid())) { @@ -610,21 +319,16 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - pthread_join(_ap_instance->mainloop, NULL); - - pthread_cancel(_ap_instance->sduloop); - pthread_join(_ap_instance->sduloop, NULL); + ipcp_fini(); - shim_ap_fini(); + pthread_cancel(local_data.sduloop); + pthread_join(local_data.sduloop, NULL); - ipcp_data_destroy(_ipcp->data); + ap_fini(); - free(_ipcp->ops); - free(_ipcp); + local_data_fini(); close_logfile(); exit(EXIT_SUCCESS); } - -#endif /* MAKE_CHECK */ diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 79b1bb4b..b6ec1984 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -26,7 +26,7 @@ #include <ouroboros/logs.h> #include <ouroboros/dev.h> #include <ouroboros/list.h> -#include <ouroboros/ipcp.h> +#include <ouroboros/ipcp-dev.h> #include <stdlib.h> #include <stdbool.h> @@ -41,10 +41,8 @@ #include "flow_alloc.pb-c.h" typedef FlowAllocMsg flow_alloc_msg_t; -extern struct ipcp * _ipcp; - struct n_flow { - struct flow flow; + int fd; struct frct_i * frct_i; enum qos_cube qos; @@ -57,7 +55,7 @@ struct n_1_flow { struct list_head next; }; -struct fmgr { +struct { pthread_t listen_thread; struct list_head n_1_flows; @@ -66,10 +64,9 @@ struct fmgr { struct list_head n_flows; /* FIXME: Make this a read/write lock */ pthread_mutex_t n_flows_lock; -} * fmgr = NULL; +} fmgr; -static int add_n_1_fd(int fd, - char * ae_name) +static int add_n_1_fd(int fd, char * ae_name) { struct n_1_flow * tmp; @@ -85,9 +82,9 @@ static int add_n_1_fd(int fd, INIT_LIST_HEAD(&tmp->next); - pthread_mutex_lock(&fmgr->n_1_flows_lock); - list_add(&tmp->next, &fmgr->n_1_flows); - pthread_mutex_unlock(&fmgr->n_1_flows_lock); + pthread_mutex_lock(&fmgr.n_1_flows_lock); + list_add(&tmp->next, &fmgr.n_1_flows); + pthread_mutex_unlock(&fmgr.n_1_flows_lock); return 0; } @@ -98,16 +95,16 @@ static void * fmgr_listen(void * o) char * ae_name; while (true) { - ipcp_wait_state(_ipcp, IPCP_ENROLLED, NULL); + ipcp_wait_state(IPCP_ENROLLED, NULL); - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) == IPCP_SHUTDOWN) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() == IPCP_SHUTDOWN) { + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); fd = flow_accept(&ae_name); if (fd < 0) { @@ -161,17 +158,13 @@ static void * fmgr_listen(void * o) int fmgr_init() { - fmgr = malloc(sizeof(*fmgr)); - if (fmgr == NULL) - return -1; + INIT_LIST_HEAD(&fmgr.n_1_flows); + INIT_LIST_HEAD(&fmgr.n_flows); - INIT_LIST_HEAD(&fmgr->n_1_flows); - INIT_LIST_HEAD(&fmgr->n_flows); + pthread_mutex_init(&fmgr.n_1_flows_lock, NULL); + pthread_mutex_init(&fmgr.n_flows_lock, NULL); - pthread_mutex_init(&fmgr->n_1_flows_lock, NULL); - pthread_mutex_init(&fmgr->n_flows_lock, NULL); - - pthread_create(&fmgr->listen_thread, NULL, fmgr_listen, NULL); + pthread_create(&fmgr.listen_thread, NULL, fmgr_listen, NULL); return 0; } @@ -180,23 +173,20 @@ int fmgr_fini() { struct list_head * pos = NULL; - pthread_cancel(fmgr->listen_thread); + pthread_cancel(fmgr.listen_thread); - pthread_join(fmgr->listen_thread, NULL); + pthread_join(fmgr.listen_thread, NULL); - list_for_each(pos, &fmgr->n_1_flows) { - struct n_1_flow * e = - list_entry(pos, struct n_1_flow, next); + list_for_each(pos, &fmgr.n_1_flows) { + struct n_1_flow * e = list_entry(pos, struct n_1_flow, next); if (e->ae_name != NULL) free(e->ae_name); if (ribmgr_remove_flow(e->fd)) LOG_ERR("Failed to remove management flow."); } - pthread_mutex_destroy(&fmgr->n_1_flows_lock); - pthread_mutex_destroy(&fmgr->n_flows_lock); - - free(fmgr); + pthread_mutex_destroy(&fmgr.n_1_flows_lock); + pthread_mutex_destroy(&fmgr.n_flows_lock); return 0; } @@ -243,8 +233,7 @@ int fmgr_mgmt_flow(char * dst_name) return 0; } -int fmgr_dt_flow(char * dst_name, - enum qos_cube qos) +int fmgr_dt_flow(char * dst_name, enum qos_cube qos) { int fd; int result; @@ -288,14 +277,13 @@ int fmgr_dt_flow(char * dst_name, } /* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_port_id(int port_id) +static struct n_flow * get_n_flow_by_fd(int fd) { struct list_head * pos = NULL; - list_for_each(pos, &fmgr->n_flows) { - struct n_flow * e = - list_entry(pos, struct n_flow, next); - if (e->flow.port_id == port_id) + list_for_each(pos, &fmgr.n_flows) { + struct n_flow * e = list_entry(pos, struct n_flow, next); + if (e->fd == fd) return e; } @@ -307,9 +295,8 @@ static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i) { struct list_head * pos = NULL; - list_for_each(pos, &fmgr->n_flows) { - struct n_flow * e = - list_entry(pos, struct n_flow, next); + list_for_each(pos, &fmgr.n_flows) { + struct n_flow * e = list_entry(pos, struct n_flow, next); if (e->frct_i == frct_i) return e; } @@ -317,8 +304,7 @@ static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i) return NULL; } -int fmgr_flow_alloc(pid_t n_api, - int port_id, +int fmgr_flow_alloc(int fd, char * dst_ap_name, char * src_ae_name, enum qos_cube qos) @@ -355,49 +341,40 @@ int fmgr_flow_alloc(pid_t n_api, flow_alloc_msg__pack(&msg, buf.data); - pthread_mutex_lock(&fmgr->n_flows_lock); + pthread_mutex_lock(&fmgr.n_flows_lock); frct_i = frct_i_create(address, &buf, qos); if (frct_i == NULL) { free(buf.data); free(flow); - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return -1; } free(buf.data); - flow->flow.rb = shm_ap_rbuff_open_s(n_api); - if (flow->flow.rb == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); - free(flow); - return -1; - } - - flow->flow.api = n_api; - flow->flow.port_id = port_id; - flow->flow.state = FLOW_PENDING; + flow->fd = fd; flow->frct_i = frct_i; - flow->qos = qos; + flow->qos = qos; INIT_LIST_HEAD(&flow->next); - list_add(&flow->next, &fmgr->n_flows); + list_add(&flow->next, &fmgr.n_flows); - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return 0; } /* Call under n_flows lock */ -static int n_flow_dealloc(int port_id) +static int n_flow_dealloc(int fd) { struct n_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; int ret; - flow = get_n_flow_by_port_id(port_id); + flow = get_n_flow_by_fd(fd); if (flow == NULL) return -1; @@ -414,8 +391,6 @@ static int n_flow_dealloc(int port_id) flow_alloc_msg__pack(&msg, buf.data); ret = frct_i_destroy(flow->frct_i, &buf); - if (flow->flow.rb != NULL) - shm_ap_rbuff_close(flow->flow.rb); list_del(&flow->next); free(flow); @@ -424,25 +399,17 @@ static int n_flow_dealloc(int port_id) return ret; } -int fmgr_flow_alloc_resp(pid_t n_api, - int port_id, - int response) +int fmgr_flow_alloc_resp(int fd, int response) { struct n_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; - pthread_mutex_lock(&fmgr->n_flows_lock); + pthread_mutex_lock(&fmgr.n_flows_lock); - flow = get_n_flow_by_port_id(port_id); + flow = get_n_flow_by_fd(fd); if (flow == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); - return -1; - } - - if (flow->flow.state != FLOW_PENDING) { - pthread_mutex_unlock(&fmgr->n_flows_lock); - LOG_ERR("Flow is not pending."); + pthread_mutex_unlock(&fmgr.n_flows_lock); return -1; } @@ -452,13 +419,13 @@ int fmgr_flow_alloc_resp(pid_t n_api, buf.len = flow_alloc_msg__get_packed_size(&msg); if (buf.len == 0) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return -1; } buf.data = malloc(buf.len); if (buf.data == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return -1; } @@ -469,106 +436,85 @@ int fmgr_flow_alloc_resp(pid_t n_api, free(buf.data); list_del(&flow->next); free(flow); - } else { - if (frct_i_accept(flow->frct_i, &buf)) { - pthread_mutex_unlock(&fmgr->n_flows_lock); - return -1; - } - - flow->flow.state = FLOW_ALLOCATED; - flow->flow.api = n_api; - - flow->flow.rb = shm_ap_rbuff_open_s(n_api); - if (flow->flow.rb == NULL) { - n_flow_dealloc(port_id); - pthread_mutex_unlock(&fmgr->n_flows_lock); - return -1; - } + } else if (frct_i_accept(flow->frct_i, &buf)) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + return -1; } - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return 0; } -int fmgr_flow_dealloc(int port_id) +int fmgr_flow_dealloc(int fd) { int ret; - pthread_mutex_lock(&fmgr->n_flows_lock); - ret = n_flow_dealloc(port_id); - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_lock(&fmgr.n_flows_lock); + ret = n_flow_dealloc(fd); + pthread_mutex_unlock(&fmgr.n_flows_lock); return ret; } -int fmgr_flow_alloc_msg(struct frct_i * frct_i, - buffer_t * buf) +int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) { struct n_flow * flow; int ret = 0; - int port_id; + int fd; flow_alloc_msg_t * msg; - pthread_mutex_lock(&fmgr->n_flows_lock); + pthread_mutex_lock(&fmgr.n_flows_lock); - /* Depending on what is in the message call the function in ipcp.h */ + /* Depending on the message call the function in ipcp-dev.h */ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); if (msg == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); LOG_ERR("Failed to unpack flow alloc message"); return -1; } switch (msg->code) { case FLOW_ALLOC_CODE__FLOW_REQ: - flow = malloc(sizeof(*flow)); if (flow == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); return -1; } - flow->flow.state = FLOW_PENDING; flow->frct_i = frct_i; flow->qos = msg->qos_cube; - flow->flow.rb = NULL; - flow->flow.api = 0; - - port_id = ipcp_flow_req_arr(getpid(), - msg->dst_name, - msg->src_ae_name); - if (port_id < 0) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + + fd = ipcp_flow_req_arr(getpid(), + msg->dst_name, + msg->src_ae_name); + if (fd < 0) { + pthread_mutex_unlock(&fmgr.n_flows_lock); free(flow); flow_alloc_msg__free_unpacked(msg, NULL); - LOG_ERR("Failed to get port-id from IRMd."); + LOG_ERR("Failed to get fd for flow."); return -1; } - flow->flow.port_id = port_id; + flow->fd = fd; INIT_LIST_HEAD(&flow->next); - list_add(&flow->next, &fmgr->n_flows); + list_add(&flow->next, &fmgr.n_flows); break; case FLOW_ALLOC_CODE__FLOW_REPLY: flow = get_n_flow_by_frct_i(frct_i); if (flow == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("No such flow in flow manager."); return -1; } - ret = ipcp_flow_alloc_reply(getpid(), - flow->flow.port_id, - msg->response); - + ret = ipcp_flow_alloc_reply(flow->fd, msg->response); if (msg->response < 0) { - shm_ap_rbuff_close(flow->flow.rb); list_del(&flow->next); free(flow); } @@ -577,13 +523,13 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, case FLOW_ALLOC_CODE__FLOW_DEALLOC: flow = get_n_flow_by_frct_i(frct_i); if (flow == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("No such flow in flow manager."); return -1; } - ret = irm_flow_dealloc(flow->flow.port_id); + ret = flow_dealloc(flow->fd); break; default: LOG_ERR("Got an unknown flow allocation message."); @@ -591,7 +537,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, break; } - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index 342410ca..7e3ef5f4 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -35,25 +35,25 @@ #define DT_AE "Data transfer" int fmgr_init(); + int fmgr_fini(); /* N-flow ops */ int fmgr_mgmt_flow(char * dst_name); + int fmgr_dt_flow(char * dst_name, enum qos_cube qos); /* N+1-flow ops, local */ -int fmgr_flow_alloc(pid_t n_api, - int port_id, +int fmgr_flow_alloc(int fd, char * dst_ap_name, char * src_ae_name, enum qos_cube qos); -int fmgr_flow_alloc_resp(pid_t n_api, - int port_id, - int response); +int fmgr_flow_alloc_resp(int fd, + int response); -int fmgr_flow_dealloc(int port_id); +int fmgr_flow_dealloc(int fd); /* N+1-flow ops, remote */ int fmgr_flow_alloc_msg(struct frct_i * frct_i, diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h index 09873445..0ee87004 100644 --- a/src/ipcpd/normal/frct.h +++ b/src/ipcpd/normal/frct.h @@ -24,7 +24,7 @@ #define OUROBOROS_IPCP_FRCT_H #include <ouroboros/shared.h> -#include <ouroboros/common.h> +#include <ouroboros/utils.h> #include "dt_const.h" diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 082973f4..4611408d 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -24,10 +24,8 @@ #include <ouroboros/config.h> #include <ouroboros/logs.h> -#include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/dev.h> -#include <ouroboros/ipcp.h> +#include <ouroboros/ipcp-dev.h> #include <ouroboros/time_utils.h> #include <stdbool.h> @@ -47,26 +45,8 @@ /* global for trapping signal */ int irmd_api; -struct ipcp * _ipcp; - -#define normal_data(type) ((struct normal_ipcp_data *) type->data) - -struct normal_ipcp_data { - /* Keep ipcp_data first for polymorphism. */ - struct ipcp_data ipcp_data; - - struct shm_rdrbuff * rdrb; - struct shm_ap_rbuff * rb; - - pthread_t mainloop; -}; - void ipcp_sig_handler(int sig, siginfo_t * info, void * c) { - sigset_t sigset; - sigemptyset(&sigset); - sigaddset(&sigset, SIGINT); - switch(sig) { case SIGINT: case SIGTERM: @@ -75,11 +55,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) LOG_DBG("IPCP %d terminating by order of %d. Bye.", getpid(), info->si_pid); - pthread_rwlock_wrlock(&_ipcp->state_lock); + pthread_rwlock_wrlock(&ipcpi.state_lock); - ipcp_set_state(_ipcp, IPCP_SHUTDOWN); + ipcp_set_state(IPCP_SHUTDOWN); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); } default: return; @@ -88,15 +68,15 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) static int normal_ipcp_name_reg(char * name) { - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_data_add_reg_entry(_ipcp->data, name)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_data_add_reg_entry(ipcpi.data, name)) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to add %s to local registry.", name); return -1; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_DBG("Registered %s.", name); @@ -105,11 +85,11 @@ static int normal_ipcp_name_reg(char * name) static int normal_ipcp_name_unreg(char * name) { - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - ipcp_data_del_reg_entry(_ipcp->data, name); + ipcp_data_del_reg_entry(ipcpi.data, name); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } @@ -119,59 +99,59 @@ static int normal_ipcp_enroll(char * dif_name) struct timespec timeout = {(ENROLL_TIMEOUT / 1000), (ENROLL_TIMEOUT % 1000) * MILLION}; - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_INIT) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_INIT) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Won't enroll an IPCP that is not in INIT."); return -1; /* -ENOTINIT */ } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); if (fmgr_mgmt_flow(dif_name)) { LOG_ERR("Failed to establish management flow."); return -1; } - if (ipcp_wait_state(_ipcp, IPCP_ENROLLED, &timeout) == -ETIMEDOUT) { + if (ipcp_wait_state(IPCP_ENROLLED, &timeout) == -ETIMEDOUT) { LOG_ERR("Enrollment timed out."); return -1; } - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_ENROLLED) { + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } static int normal_ipcp_bootstrap(struct dif_config * conf) { - pthread_rwlock_wrlock(&_ipcp->state_lock); + pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_INIT) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_INIT) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Won't bootstrap an IPCP that is not in INIT."); return -1; /* -ENOTINIT */ } if (ribmgr_bootstrap(conf)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to bootstrap RIB manager."); return -1; } - ipcp_set_state(_ipcp, IPCP_ENROLLED); + ipcp_set_state(IPCP_ENROLLED); - _ipcp->data->dif_name = conf->dif_name; + ipcpi.data->dif_name = conf->dif_name; - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_DBG("Bootstrapped in DIF %s.", conf->dif_name); @@ -188,67 +168,6 @@ static struct ipcp_ops normal_ops = { .ipcp_flow_dealloc = fmgr_flow_dealloc }; -struct normal_ipcp_data * normal_ipcp_data_create() -{ - struct normal_ipcp_data * normal_data; - enum ipcp_type ipcp_type; - - normal_data = malloc(sizeof(*normal_data)); - if (normal_data == NULL) { - LOG_ERR("Failed to allocate."); - return NULL; - } - - ipcp_type = THIS_TYPE; - if (ipcp_data_init((struct ipcp_data *) normal_data, - ipcp_type) == NULL) { - free(normal_data); - return NULL; - } - - normal_data->rdrb = shm_rdrbuff_open(); - if (normal_data->rdrb == NULL) { - free(normal_data); - return NULL; - } - - normal_data->rb = shm_ap_rbuff_create_n(); - if (normal_data->rb == NULL) { - shm_rdrbuff_close(normal_data->rdrb); - free(normal_data); - return NULL; - } - - return normal_data; -} - - -void normal_ipcp_data_destroy() -{ - int idx = 0; - - if (_ipcp == NULL) - return; - - pthread_rwlock_rdlock(&_ipcp->state_lock); - - if (ipcp_get_state(_ipcp) != IPCP_SHUTDOWN) - LOG_WARN("Cleaning up while not in shutdown."); - - /* remove all remaining sdus */ - while ((idx = shm_ap_rbuff_peek_idx(normal_data(_ipcp)->rb)) >= 0) - shm_rdrbuff_remove(normal_data(_ipcp)->rdrb, idx); - - if (normal_data(_ipcp)->rdrb != NULL) - shm_rdrbuff_close(normal_data(_ipcp)->rdrb); - if (normal_data(_ipcp)->rb != NULL) - shm_ap_rbuff_close(normal_data(_ipcp)->rb); - - ipcp_data_destroy(_ipcp->data); - - pthread_rwlock_unlock(&_ipcp->state_lock); -} - int main(int argc, char * argv[]) { struct sigaction sig_act; @@ -285,56 +204,38 @@ int main(int argc, char * argv[]) sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - _ipcp = ipcp_instance_create(); - if (_ipcp == NULL) { - LOG_ERR("Failed to create instance."); - close_logfile(); - exit(EXIT_FAILURE); - } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); - _ipcp->data = (struct ipcp_data *) normal_ipcp_data_create(); - if (_ipcp->data == NULL) { - LOG_ERR("Failed to create instance data."); - free(_ipcp); + if (ipcp_init(THIS_TYPE, &normal_ops) < 0) { + LOG_ERR("Failed to create instance."); close_logfile(); exit(EXIT_FAILURE); } - _ipcp->ops = &normal_ops; - _ipcp->state = IPCP_INIT; + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); if (fmgr_init()) { - normal_ipcp_data_destroy(); - free(_ipcp); + ipcp_fini(); close_logfile(); exit(EXIT_FAILURE); } if (ribmgr_init()) { - normal_ipcp_data_destroy(); fmgr_fini(); - free(_ipcp); + ipcp_fini(); close_logfile(); exit(EXIT_FAILURE); } - pthread_sigmask(SIG_BLOCK, &sigset, NULL); - - pthread_create(&normal_data(_ipcp)->mainloop, NULL, - ipcp_main_loop, _ipcp); - - pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); - if (ipcp_create_r(getpid())) { LOG_ERR("Failed to notify IRMd we are initialized."); - normal_ipcp_data_destroy(); fmgr_fini(); - free(_ipcp); + ipcp_fini(); close_logfile(); exit(EXIT_FAILURE); } - pthread_join(normal_data(_ipcp)->mainloop, NULL); + ipcp_fini(); if (fmgr_fini()) LOG_ERR("Failed to finalize flow manager."); @@ -345,10 +246,9 @@ int main(int argc, char * argv[]) if (frct_fini()) LOG_ERR("Failed to finalize FRCT."); - normal_ipcp_data_destroy(); - free(_ipcp); close_logfile(); ap_fini(); + exit(EXIT_SUCCESS); } diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 9733abc9..99d156f5 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -27,6 +27,7 @@ #include <ouroboros/cdap.h> #include <ouroboros/list.h> #include <ouroboros/time_utils.h> +#include <ouroboros/ipcp-dev.h> #include <stdlib.h> #include <pthread.h> @@ -45,15 +46,13 @@ typedef StaticInfoMsg static_info_msg_t; #define ENROLLMENT "enrollment" #define STATIC_INFO "static DIF information" -extern struct ipcp * _ipcp; - struct mgmt_flow { struct cdap * instance; int fd; struct list_head next; }; -struct rib { +struct { struct dt_const dtc; uint32_t address; @@ -63,7 +62,7 @@ struct rib { struct list_head cdap_reqs; pthread_mutex_t cdap_reqs_lock; -} * rib = NULL; +} rib; /* Call while holding cdap_reqs_lock */ /* FIXME: better not to call blocking functions under any lock */ @@ -84,13 +83,13 @@ int cdap_result_wait(struct cdap * instance, return -1; } - list_add(&req->next, &rib->cdap_reqs); + list_add(&req->next, &rib.cdap_reqs); - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); ret = cdap_request_wait(req); - pthread_mutex_lock(&rib->cdap_reqs_lock); + pthread_mutex_lock(&rib.cdap_reqs_lock); if (ret == -1) /* should only be on ipcp shutdown */ LOG_DBG("Waiting CDAP request destroyed."); @@ -112,22 +111,16 @@ int cdap_result_wait(struct cdap * instance, int ribmgr_init() { - rib = malloc(sizeof(*rib)); - if (rib == NULL) - return -1; + INIT_LIST_HEAD(&rib.flows); + INIT_LIST_HEAD(&rib.cdap_reqs); - INIT_LIST_HEAD(&rib->flows); - INIT_LIST_HEAD(&rib->cdap_reqs); - - if (pthread_rwlock_init(&rib->flows_lock, NULL)) { + if (pthread_rwlock_init(&rib.flows_lock, NULL)) { LOG_ERR("Failed to initialize rwlock."); - free(rib); return -1; } - if (pthread_mutex_init(&rib->cdap_reqs_lock, NULL)) { + if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); - free(rib); return -1; } @@ -139,19 +132,18 @@ int ribmgr_fini() struct list_head * pos = NULL; struct list_head * n = NULL; - pthread_mutex_lock(&rib->cdap_reqs_lock); - list_for_each_safe(pos, n, &rib->cdap_reqs) { + pthread_mutex_lock(&rib.cdap_reqs_lock); + list_for_each_safe(pos, n, &rib.cdap_reqs) { struct cdap_request * req = list_entry(pos, struct cdap_request, next); - free(req->name); list_del(&req->next); free(req); } - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); - pthread_rwlock_wrlock(&rib->flows_lock); - list_for_each_safe(pos, n, &rib->flows) { + pthread_rwlock_wrlock(&rib.flows_lock); + list_for_each_safe(pos, n, &rib.flows) { struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); if (cdap_destroy(flow->instance)) @@ -159,9 +151,10 @@ int ribmgr_fini() list_del(&flow->next); free(flow); } - pthread_rwlock_unlock(&rib->flows_lock); + pthread_rwlock_unlock(&rib.flows_lock); - free(rib); + pthread_mutex_destroy(&rib.cdap_reqs_lock); + pthread_rwlock_destroy(&rib.flows_lock); return 0; } @@ -174,9 +167,9 @@ int ribmgr_cdap_reply(struct cdap * instance, { struct list_head * pos, * n = NULL; - pthread_mutex_lock(&rib->cdap_reqs_lock); + pthread_mutex_lock(&rib.cdap_reqs_lock); - list_for_each_safe(pos, n, &rib->cdap_reqs) { + list_for_each_safe(pos, n, &rib.cdap_reqs) { struct cdap_request * req = list_entry(pos, struct cdap_request, next); if (req->instance == instance && @@ -191,15 +184,15 @@ int ribmgr_cdap_reply(struct cdap * instance, "executed succesfully", req->code, req->name); - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); /* FIXME: In case of a read, update values here */ cdap_request_respond(req, result); - pthread_mutex_lock(&rib->cdap_reqs_lock); + pthread_mutex_lock(&rib.cdap_reqs_lock); } } - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); return 0; } @@ -223,34 +216,34 @@ int ribmgr_cdap_write(struct cdap * instance, static_info_msg_t * msg; int ret = 0; - pthread_rwlock_wrlock(&_ipcp->state_lock); - if (ipcp_get_state(_ipcp) == IPCP_PENDING_ENROLL && + pthread_rwlock_wrlock(&ipcpi.state_lock); + if (ipcp_get_state() == IPCP_PENDING_ENROLL && strcmp(name, STATIC_INFO) == 0) { LOG_DBG("Received static DIF information."); msg = static_info_msg__unpack(NULL, len, data); if (msg == NULL) { - ipcp_set_state(_ipcp, IPCP_INIT); - pthread_rwlock_unlock(&_ipcp->state_lock); + ipcp_set_state(IPCP_INIT); + pthread_rwlock_unlock(&ipcpi.state_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); LOG_ERR("Failed to unpack static info message."); return -1; } - rib->dtc.addr_size = msg->addr_size; - rib->dtc.cep_id_size = msg->cep_id_size; - rib->dtc.pdu_length_size = msg->pdu_length_size; - rib->dtc.seqno_size = msg->seqno_size; - rib->dtc.has_ttl = msg->has_ttl; - rib->dtc.has_chk = msg->has_chk; - rib->dtc.min_pdu_size = msg->min_pdu_size; - rib->dtc.max_pdu_size = msg->max_pdu_size; + rib.dtc.addr_size = msg->addr_size; + rib.dtc.cep_id_size = msg->cep_id_size; + rib.dtc.pdu_length_size = msg->pdu_length_size; + rib.dtc.seqno_size = msg->seqno_size; + rib.dtc.has_ttl = msg->has_ttl; + rib.dtc.has_chk = msg->has_chk; + rib.dtc.min_pdu_size = msg->min_pdu_size; + rib.dtc.max_pdu_size = msg->max_pdu_size; - rib->address = msg->address; + rib.address = msg->address; - if (frct_init(&rib->dtc, rib->address)) { - ipcp_set_state(_ipcp, IPCP_INIT); - pthread_rwlock_unlock(&_ipcp->state_lock); + if (frct_init(&rib.dtc, rib.address)) { + ipcp_set_state(IPCP_INIT); + pthread_rwlock_unlock(&ipcpi.state_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); static_info_msg__free_unpacked(msg, NULL); LOG_ERR("Failed to init FRCT"); @@ -262,7 +255,7 @@ int ribmgr_cdap_write(struct cdap * instance, ret = -1; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { LOG_ERR("Failed to send reply to write request."); @@ -303,39 +296,39 @@ int ribmgr_cdap_start(struct cdap * instance, size_t len = 0; int iid = 0; - pthread_rwlock_wrlock(&_ipcp->state_lock); - if (ipcp_get_state(_ipcp) == IPCP_ENROLLED && + pthread_rwlock_wrlock(&ipcpi.state_lock); + if (ipcp_get_state() == IPCP_ENROLLED && strcmp(name, ENROLLMENT) == 0) { LOG_DBG("New enrollment request."); if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to enrollment request."); return -1; } - stat_info.addr_size = rib->dtc.addr_size; - stat_info.cep_id_size = rib->dtc.cep_id_size; - stat_info.pdu_length_size = rib->dtc.pdu_length_size; - stat_info.seqno_size = rib->dtc.seqno_size; - stat_info.has_ttl = rib->dtc.has_ttl; - stat_info.has_chk = rib->dtc.has_chk; - stat_info.min_pdu_size = rib->dtc.min_pdu_size; - stat_info.max_pdu_size = rib->dtc.max_pdu_size; + stat_info.addr_size = rib.dtc.addr_size; + stat_info.cep_id_size = rib.dtc.cep_id_size; + stat_info.pdu_length_size = rib.dtc.pdu_length_size; + stat_info.seqno_size = rib.dtc.seqno_size; + stat_info.has_ttl = rib.dtc.has_ttl; + stat_info.has_chk = rib.dtc.has_chk; + stat_info.min_pdu_size = rib.dtc.min_pdu_size; + stat_info.max_pdu_size = rib.dtc.max_pdu_size; /* FIXME: Hand out an address. */ stat_info.address = 0; len = static_info_msg__get_packed_size(&stat_info); if (len == 0) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to get size of static information."); return -1; } data = malloc(len); if (data == NULL) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to allocate memory."); return -1; } @@ -344,59 +337,59 @@ int ribmgr_cdap_start(struct cdap * instance, LOG_DBGF("Sending static info..."); - pthread_mutex_lock(&rib->cdap_reqs_lock); + pthread_mutex_lock(&rib.cdap_reqs_lock); iid = cdap_send_write(instance, STATIC_INFO, data, len, 0); if (iid < 0) { - pthread_mutex_unlock(&rib->cdap_reqs_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); free(data); LOG_ERR("Failed to send static information."); return -1; } if (cdap_result_wait(instance, WRITE, STATIC_INFO, iid)) { - pthread_mutex_unlock(&rib->cdap_reqs_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); free(data); LOG_ERR("Remote did not receive static information."); return -1; } - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); /* FIXME: Send neighbors here. */ LOG_DBGF("Sending stop enrollment..."); - pthread_mutex_lock(&rib->cdap_reqs_lock); + pthread_mutex_lock(&rib.cdap_reqs_lock); iid = cdap_send_stop(instance, ENROLLMENT); if (iid < 0) { - pthread_mutex_unlock(&rib->cdap_reqs_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); free(data); LOG_ERR("Failed to send stop of enrollment."); return -1; } if (cdap_result_wait(instance, STOP, ENROLLMENT, iid)) { - pthread_mutex_unlock(&rib->cdap_reqs_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); free(data); LOG_ERR("Remote failed to complete enrollment."); return -1; } - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); free(data); } else { if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to start request."); return -1; } } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } @@ -407,21 +400,21 @@ int ribmgr_cdap_stop(struct cdap * instance, { int ret = 0; - pthread_rwlock_wrlock(&_ipcp->state_lock); - if (ipcp_get_state(_ipcp) == IPCP_PENDING_ENROLL && + pthread_rwlock_wrlock(&ipcpi.state_lock); + if (ipcp_get_state() == IPCP_PENDING_ENROLL && strcmp(name, ENROLLMENT) == 0) { LOG_DBG("Stop enrollment received."); - ipcp_set_state(_ipcp, IPCP_ENROLLED); + ipcp_set_state(IPCP_ENROLLED); } else ret = -1; if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to stop request."); return -1; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } @@ -457,19 +450,18 @@ int ribmgr_add_flow(int fd) flow->instance = instance; flow->fd = fd; - pthread_rwlock_wrlock(&_ipcp->state_lock); - pthread_rwlock_wrlock(&rib->flows_lock); - if (list_empty(&rib->flows) && - ipcp_get_state(_ipcp) == IPCP_INIT) { - ipcp_set_state(_ipcp, IPCP_PENDING_ENROLL); + pthread_rwlock_wrlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(&rib.flows_lock); + if (list_empty(&rib.flows) && ipcp_get_state() == IPCP_INIT) { + ipcp_set_state(IPCP_PENDING_ENROLL); - pthread_mutex_lock(&rib->cdap_reqs_lock); + pthread_mutex_lock(&rib.cdap_reqs_lock); iid = cdap_send_start(instance, ENROLLMENT); if (iid < 0) { - pthread_mutex_unlock(&rib->cdap_reqs_lock); - pthread_rwlock_unlock(&rib->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&rib.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start enrollment."); cdap_destroy(instance); free(flow); @@ -477,20 +469,20 @@ int ribmgr_add_flow(int fd) } if (cdap_result_wait(instance, START, ENROLLMENT, iid)) { - pthread_mutex_unlock(&rib->cdap_reqs_lock); - pthread_rwlock_unlock(&rib->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&rib.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start enrollment."); cdap_destroy(instance); free(flow); return -1; } - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); } - list_add(&flow->next, &rib->flows); - pthread_rwlock_unlock(&rib->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + list_add(&flow->next, &rib.flows); + pthread_rwlock_unlock(&rib.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } @@ -499,20 +491,20 @@ int ribmgr_remove_flow(int fd) { struct list_head * pos, * n = NULL; - pthread_rwlock_wrlock(&rib->flows_lock); - list_for_each_safe(pos, n, &rib->flows) { + pthread_rwlock_wrlock(&rib.flows_lock); + list_for_each_safe(pos, n, &rib.flows) { struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); if (flow->fd == fd) { if (cdap_destroy(flow->instance)) LOG_ERR("Failed to destroy CDAP instance."); list_del(&flow->next); - pthread_rwlock_unlock(&rib->flows_lock); + pthread_rwlock_unlock(&rib.flows_lock); free(flow); return 0; } } - pthread_rwlock_unlock(&rib->flows_lock); + pthread_rwlock_unlock(&rib.flows_lock); return -1; } @@ -525,19 +517,19 @@ int ribmgr_bootstrap(struct dif_config * conf) return -1; } - rib->dtc.addr_size = conf->addr_size; - rib->dtc.cep_id_size = conf->cep_id_size; - rib->dtc.pdu_length_size = conf->pdu_length_size; - rib->dtc.seqno_size = conf->seqno_size; - rib->dtc.has_ttl = conf->has_ttl; - rib->dtc.has_chk = conf->has_chk; - rib->dtc.min_pdu_size = conf->min_pdu_size; - rib->dtc.max_pdu_size = conf->max_pdu_size; + rib.dtc.addr_size = conf->addr_size; + rib.dtc.cep_id_size = conf->cep_id_size; + rib.dtc.pdu_length_size = conf->pdu_length_size; + rib.dtc.seqno_size = conf->seqno_size; + rib.dtc.has_ttl = conf->has_ttl; + rib.dtc.has_chk = conf->has_chk; + rib.dtc.min_pdu_size = conf->min_pdu_size; + rib.dtc.max_pdu_size = conf->max_pdu_size; /* FIXME: Set correct address. */ - rib->address = 0; + rib.address = 0; - if (frct_init(&rib->dtc, rib->address)) { + if (frct_init(&rib.dtc, rib.address)) { LOG_ERR("Failed to initialize FRCT."); return -1; } diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index d74984cc..2cf46e51 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -24,24 +24,19 @@ #define _DEFAULT_SOURCE -#include "ipcp.h" -#include "flow.h" #include <ouroboros/errno.h> -#include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> -#include <ouroboros/ipcp.h> -#include <ouroboros/irm_config.h> -#include <ouroboros/sockets.h> #include <ouroboros/bitmap.h> -#include <ouroboros/flow.h> #include <ouroboros/dev.h> +#include <ouroboros/ipcp-dev.h> #define OUROBOROS_PREFIX "ipcpd/shim-eth-llc" #include <ouroboros/logs.h> +#include "ipcp.h" + #include <net/if.h> #include <signal.h> #include <stdlib.h> @@ -79,18 +74,12 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t; #define LLC_HEADER_SIZE 3 #define MAX_SAPS 64 #define ETH_HEADER_SIZE (2 * MAC_SIZE + 2) -#define ETH_FRAME_SIZE (SHIM_ETH_LLC_MAX_SDU_SIZE + ETH_HEADER_SIZE + \ - LLC_HEADER_SIZE + 2) +#define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \ + + SHIM_ETH_LLC_MAX_SDU_SIZE) /* global for trapping signal */ int irmd_api; -struct ipcp * _ipcp; - -#define shim_data(type) ((struct eth_llc_ipcp_data *) type->data) - -#define ipcp_flow(index) ((struct flow *) &(shim_data(_ipcp)->flows[index])) - struct eth_llc_frame { uint8_t dst_hwaddr[MAC_SIZE]; uint8_t src_hwaddr[MAC_SIZE]; @@ -98,196 +87,78 @@ struct eth_llc_frame { uint8_t dsap; uint8_t ssap; uint8_t cf; - uint8_t size[2]; uint8_t payload; }; -struct eth_llc_flow { - struct flow flow; - uint8_t sap; - uint8_t r_sap; - uint8_t r_addr[MAC_SIZE]; +struct ef { + int8_t sap; + int8_t r_sap; + uint8_t r_addr[MAC_SIZE]; }; -struct eth_llc_ipcp_data { - /* Keep ipcp_data first for polymorphism. */ - struct ipcp_data ipcp_data; - +struct { #ifdef __FreeBSD__ - struct sockaddr_dl device; + struct sockaddr_dl device; #else - struct sockaddr_ll device; + struct sockaddr_ll device; #endif - int s_fd; - - struct bmp * indices; - struct bmp * saps; + int s_fd; - struct shm_rdrbuff * rdrb; - struct shm_ap_rbuff * rb; - - uint8_t * rx_ring; - uint8_t * tx_ring; - int tx_offset; - - struct eth_llc_flow flows[AP_MAX_FLOWS]; - pthread_rwlock_t flows_lock; + struct bmp * saps; +#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) + uint8_t * rx_ring; + uint8_t * tx_ring; + int tx_offset; +#endif + int * ef_to_fd; + struct ef * fd_to_ef; + pthread_rwlock_t flows_lock; - pthread_t mainloop; - pthread_t sdu_writer; - pthread_t sdu_reader; -}; + pthread_t sdu_writer; + pthread_t sdu_reader; +} eth_llc_data; -struct eth_llc_ipcp_data * eth_llc_ipcp_data_create() +static int eth_llc_data_init() { - struct eth_llc_ipcp_data * eth_llc_data; - enum ipcp_type ipcp_type; - - eth_llc_data = malloc(sizeof(*eth_llc_data)); - if (eth_llc_data == NULL) { - LOG_ERR("Failed to allocate."); - return NULL; - } - - ipcp_type = THIS_TYPE; - if (ipcp_data_init((struct ipcp_data *) eth_llc_data, - ipcp_type) == NULL) { - free(eth_llc_data); - return NULL; - } - - eth_llc_data->rdrb = shm_rdrbuff_open(); - if (eth_llc_data->rdrb == NULL) { - free(eth_llc_data); - return NULL; - } - - eth_llc_data->rb = shm_ap_rbuff_create_n(); - if (eth_llc_data->rb == NULL) { - shm_rdrbuff_close(eth_llc_data->rdrb); - free(eth_llc_data); - return NULL; - } + int i; - eth_llc_data->indices = bmp_create(AP_MAX_FLOWS, 0); - if (eth_llc_data->indices == NULL) { - shm_ap_rbuff_destroy(eth_llc_data->rb); - shm_rdrbuff_close(eth_llc_data->rdrb); - free(eth_llc_data); - return NULL; - } + eth_llc_data.fd_to_ef = malloc(sizeof(struct ef) * IRMD_MAX_FLOWS); + if (eth_llc_data.fd_to_ef == NULL) + return -ENOMEM; - eth_llc_data->saps = bmp_create(MAX_SAPS, 2); - if (eth_llc_data->indices == NULL) { - bmp_destroy(eth_llc_data->indices); - shm_ap_rbuff_destroy(eth_llc_data->rb); - shm_rdrbuff_close(eth_llc_data->rdrb); - free(eth_llc_data); - return NULL; + eth_llc_data.ef_to_fd = malloc(sizeof(struct ef) * MAX_SAPS); + if (eth_llc_data.ef_to_fd == NULL) { + free(eth_llc_data.fd_to_ef); + return -ENOMEM; } - pthread_rwlock_init(ð_llc_data->flows_lock, NULL); - - return eth_llc_data; -} - -void eth_llc_ipcp_data_destroy() -{ - int i = 0; - - if (_ipcp == NULL) - return; - - pthread_rwlock_rdlock(&_ipcp->state_lock); - - if (ipcp_get_state(_ipcp) != IPCP_SHUTDOWN) - LOG_WARN("Cleaning up while not in shutdown."); - - /* remove all remaining sdus */ - while ((i = shm_ap_rbuff_peek_idx(shim_data(_ipcp)->rb)) >= 0) - shm_rdrbuff_remove(shim_data(_ipcp)->rdrb, i); - - if (shim_data(_ipcp)->rdrb != NULL) - shm_rdrbuff_close(shim_data(_ipcp)->rdrb); - if (shim_data(_ipcp)->rb != NULL) - shm_ap_rbuff_destroy(shim_data(_ipcp)->rb); - if (shim_data(_ipcp)->indices != NULL) - bmp_destroy(shim_data(_ipcp)->indices); - if (shim_data(_ipcp)->saps != NULL) - bmp_destroy(shim_data(_ipcp)->saps); - - pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); - - for (i = 0; i < AP_MAX_FLOWS; i ++) - if (ipcp_flow(i)->rb != NULL) - shm_ap_rbuff_close(ipcp_flow(i)->rb); - - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - - ipcp_data_destroy(_ipcp->data); -} - -/* only call this under flows_lock */ -static int port_id_to_index(int port_id) -{ - int i; - - for (i = 0; i < AP_MAX_FLOWS; ++i) { - if (ipcp_flow(i)->port_id == port_id - && ipcp_flow(i)->state != FLOW_NULL) - return i; + eth_llc_data.saps = bmp_create(MAX_SAPS, 2); + if (eth_llc_data.saps == NULL) { + free(eth_llc_data.ef_to_fd); + free(eth_llc_data.fd_to_ef); + return -ENOMEM; } - return -1; -} + for (i = 0; i < MAX_SAPS; ++i) + eth_llc_data.ef_to_fd[i] = -1; -/* only call this under flows_lock */ -static int addr_and_saps_to_index(const uint8_t * r_addr, - uint8_t r_sap, - uint8_t sap) -{ - int i = 0; - - for (i = 0; i < AP_MAX_FLOWS; i++) { - if (ipcp_flow(i)->state == FLOW_ALLOCATED && - shim_data(_ipcp)->flows[i].r_sap == r_sap && - shim_data(_ipcp)->flows[i].sap == sap && - !memcmp(shim_data(_ipcp)->flows[i].r_addr, - r_addr, - MAC_SIZE)) { - return i; - } + for (i = 0; i < IRMD_MAX_FLOWS; ++i) { + eth_llc_data.fd_to_ef[i].sap = -1; + eth_llc_data.fd_to_ef[i].r_sap = -1; + memset(ð_llc_data.fd_to_ef[i].r_addr, 0, MAC_SIZE); } - return -1; -} + pthread_rwlock_init(ð_llc_data.flows_lock, NULL); -/* only call this under flows_lock */ -static int sap_to_index(uint8_t sap) -{ - int i = 0; - - for (i = 0; i < AP_MAX_FLOWS; i++) { - if (shim_data(_ipcp)->flows[i].sap == sap) { - return i; - } - } - - return -1; + return 0; } -/* only call this under flows_lock */ -static void destroy_ipcp_flow(int index) +void eth_llc_data_fini() { - ipcp_flow(index)->port_id = -1; - if (ipcp_flow(index)->rb != NULL) - shm_ap_rbuff_close(ipcp_flow(index)->rb); - ipcp_flow(index)->rb = NULL; - ipcp_flow(index)->state = FLOW_NULL; - bmp_release(shim_data(_ipcp)->indices, index); - bmp_release(shim_data(_ipcp)->saps, - shim_data(_ipcp)->flows[index].sap); + bmp_destroy(eth_llc_data.saps); + free(eth_llc_data.fd_to_ef); + free(eth_llc_data.ef_to_fd); + pthread_rwlock_destroy(ð_llc_data.flows_lock); } static uint8_t reverse_bits(uint8_t b) @@ -299,7 +170,7 @@ static uint8_t reverse_bits(uint8_t b) return b; } -static int eth_llc_ipcp_send_frame(uint8_t dst_addr[MAC_SIZE], +static int eth_llc_ipcp_send_frame(uint8_t * dst_addr, uint8_t dsap, uint8_t ssap, uint8_t * payload, @@ -307,24 +178,16 @@ static int eth_llc_ipcp_send_frame(uint8_t dst_addr[MAC_SIZE], { int frame_len = 0; uint8_t cf = 0x03; - int fd; - - uint16_t size; uint16_t length; - #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) struct pollfd pfd; struct tpacket_hdr * header; uint8_t * frame; #else uint8_t frame[SHIM_ETH_LLC_MAX_SDU_SIZE]; -#ifdef __FreeBSD__ - struct sockaddr_dl device; -#else - struct sockaddr_ll device; -#endif #endif struct eth_llc_frame * llc_frame; + if (payload == NULL) { LOG_ERR("Payload was NULL."); return -1; @@ -333,79 +196,75 @@ static int eth_llc_ipcp_send_frame(uint8_t dst_addr[MAC_SIZE], if (len > SHIM_ETH_LLC_MAX_SDU_SIZE) return -1; - fd = (shim_data(_ipcp))->s_fd; - #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) - header = (void *) shim_data(_ipcp)->tx_ring + - (shim_data(_ipcp)->tx_offset * SHM_RDRB_BLOCK_SIZE); + header = (void *) (eth_llc_data.tx_ring + + eth_llc_data.tx_offset * SHM_RDRB_BLOCK_SIZE); while (header->tp_status != TP_STATUS_AVAILABLE) { - pfd.fd = fd; + pfd.fd = eth_llc_data.s_fd; pfd.revents = 0; pfd.events = POLLIN | POLLRDNORM | POLLERR; if (poll(&pfd, 1, -1) <= 0) { - LOG_ERR("Failed to poll: %s.", strerror(errno)); + LOG_ERR("Failed to poll."); continue; } - header = (void *) shim_data(_ipcp)->tx_ring + - (shim_data(_ipcp)->tx_offset * SHM_RDRB_BLOCK_SIZE); + header = (void *) (eth_llc_data.tx_ring + + eth_llc_data.tx_offset + * SHM_RDRB_BLOCK_SIZE); } - frame = (void *) header + TPACKET_HDRLEN - sizeof(struct sockaddr_ll); + frame = (uint8_t *) header + + TPACKET_HDRLEN - sizeof(struct sockaddr_ll); #endif - llc_frame = (struct eth_llc_frame *) frame; - memcpy(&llc_frame->dst_hwaddr, dst_addr, MAC_SIZE); - memcpy(&llc_frame->src_hwaddr, + memcpy(llc_frame->dst_hwaddr, dst_addr, MAC_SIZE); + memcpy(llc_frame->src_hwaddr, #ifdef __FreeBSD__ - LLADDR(&shim_data(_ipcp)->device), + LLADDR(ð_llc_data.device), #else - shim_data(_ipcp)->device.sll_addr, + eth_llc_data.device.sll_addr, #endif MAC_SIZE); - length = htons(LLC_HEADER_SIZE + sizeof(size) + len); + length = htons(LLC_HEADER_SIZE + len); memcpy(&llc_frame->length, &length, sizeof(length)); llc_frame->dsap = dsap; llc_frame->ssap = ssap; - llc_frame->cf = cf; - /* write the payload length, can't trust the driver */ - size = htons(len); - memcpy(&llc_frame->size, &size, sizeof(size)); + llc_frame->cf = cf; memcpy(&llc_frame->payload, payload, len); - frame_len = ETH_HEADER_SIZE + LLC_HEADER_SIZE + sizeof(uint16_t) + len; + frame_len = ETH_HEADER_SIZE + LLC_HEADER_SIZE + len; #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) header->tp_len = frame_len; header->tp_status = TP_STATUS_SEND_REQUEST; - if (send(fd, NULL, 0, MSG_DONTWAIT) < 0) { + if (send(eth_llc_data.s_fd, NULL, 0, MSG_DONTWAIT) < 0) { LOG_ERR("Failed to write frame into TX_RING."); return -1; } - shim_data(_ipcp)->tx_offset = - (shim_data(_ipcp)->tx_offset + 1) - & (SHM_BUFFER_SIZE -1); + eth_llc_data.tx_offset = + (eth_llc_data.tx_offset + 1) & (SHM_BUFFER_SIZE - 1); #else - device = (shim_data(_ipcp))->device; - - if (sendto(fd, frame, frame_len, 0, - (struct sockaddr *) &device, sizeof(device)) <= 0) { + if (sendto(eth_llc_data.s_fd, + frame, + frame_len, + 0, + (struct sockaddr *) ð_llc_data.device, + sizeof(eth_llc_data.device)) <= 0) { LOG_ERR("Failed to send message."); return -1; } #endif - return 0; } static int eth_llc_ipcp_send_mgmt_frame(shim_eth_llc_msg_t * msg, - uint8_t dst_addr[MAC_SIZE]) + uint8_t * dst_addr) { size_t len; uint8_t * buf; @@ -423,6 +282,7 @@ static int eth_llc_ipcp_send_mgmt_frame(shim_eth_llc_msg_t * msg, if (eth_llc_ipcp_send_frame(dst_addr, reverse_bits(MGMT_SAP), reverse_bits(MGMT_SAP), buf, len)) { LOG_ERR("Failed to send management frame."); + free(buf); return -1; } @@ -431,10 +291,10 @@ static int eth_llc_ipcp_send_mgmt_frame(shim_eth_llc_msg_t * msg, return 0; } -static int eth_llc_ipcp_port_alloc(uint8_t dst_addr[MAC_SIZE], - uint8_t ssap, - char * dst_name, - char * src_ae_name) +static int eth_llc_ipcp_sap_alloc(uint8_t * dst_addr, + uint8_t ssap, + char * dst_name, + char * src_ae_name) { shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT; @@ -446,10 +306,10 @@ static int eth_llc_ipcp_port_alloc(uint8_t dst_addr[MAC_SIZE], return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr); } -static int eth_llc_ipcp_port_alloc_resp(uint8_t dst_addr[MAC_SIZE], - uint8_t ssap, - uint8_t dsap, - int response) +static int eth_llc_ipcp_sap_alloc_resp(uint8_t * dst_addr, + uint8_t ssap, + uint8_t dsap, + int response) { shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT; @@ -463,8 +323,7 @@ static int eth_llc_ipcp_port_alloc_resp(uint8_t dst_addr[MAC_SIZE], return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr); } -static int eth_llc_ipcp_port_dealloc(uint8_t dst_addr[MAC_SIZE], - uint8_t ssap) +static int eth_llc_ipcp_sap_dealloc(uint8_t * dst_addr, uint8_t ssap) { shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT; @@ -474,142 +333,102 @@ static int eth_llc_ipcp_port_dealloc(uint8_t dst_addr[MAC_SIZE], return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr); } -static int eth_llc_ipcp_port_req(uint8_t r_sap, - uint8_t r_addr[MAC_SIZE], - char * dst_name, - char * src_ae_name) +static int eth_llc_ipcp_sap_req(uint8_t r_sap, + uint8_t * r_addr, + char * dst_name, + char * src_ae_name) { - int port_id; - ssize_t index = 0; - int i; + int fd; - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); - - index = bmp_allocate(shim_data(_ipcp)->indices); - if (index < 0) { - pthread_rwlock_unlock(&_ipcp->state_lock); - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - LOG_ERR("Out of free indices."); - return -1; - } + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(ð_llc_data.flows_lock); /* reply to IRM */ - port_id = ipcp_flow_req_arr(getpid(), - dst_name, - src_ae_name); - - if (port_id < 0) { - bmp_release(shim_data(_ipcp)->indices, index); - pthread_rwlock_unlock(&_ipcp->state_lock); - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - LOG_ERR("Could not get port id from IRMd."); + fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name); + if (fd < 0) { + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("Could not get new flow from IRMd."); return -1; } - ipcp_flow(index)->port_id = port_id; - ipcp_flow(index)->rb = NULL; - ipcp_flow(index)->state = FLOW_PENDING; - shim_data(_ipcp)->flows[index].r_sap = r_sap; - for (i = 0; i < MAC_SIZE; i++) { - shim_data(_ipcp)->flows[index].r_addr[i] = r_addr[i]; - } + eth_llc_data.fd_to_ef[fd].r_sap = r_sap; + memcpy(eth_llc_data.fd_to_ef[fd].r_addr, r_addr, MAC_SIZE); - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBG("New flow request, port_id %d, remote SAP %d.", port_id, r_sap); + LOG_DBG("New flow request, fd %d, remote SAP %d.", fd, r_sap); return 0; } -static int eth_llc_ipcp_port_alloc_reply(uint8_t ssap, - uint8_t r_addr[MAC_SIZE], - int dsap, - int response) +static int eth_llc_ipcp_sap_alloc_reply(uint8_t ssap, + uint8_t * r_addr, + int dsap, + int response) { - int index = -1; int ret = 0; - int port_id = -1; - int i; + int fd = -1; - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_rdlock(&shim_data(_ipcp)->flows_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(& eth_llc_data.flows_lock); - index = sap_to_index(ssap); - if (index < 0) { - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + fd = eth_llc_data.ef_to_fd[dsap]; + if (fd < 0) { + pthread_rwlock_unlock(& eth_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("No flow found with that SAP."); return -1; /* -EFLOWNOTFOUND */ } - if (ipcp_flow(index)->state != FLOW_PENDING) { - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - return -1; /* -EFLOWNOTPENDING */ - } - - port_id = ipcp_flow(index)->port_id; - if (response) { - destroy_ipcp_flow(index); + bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); } else { - ipcp_flow(index)->state = FLOW_ALLOCATED; - shim_data(_ipcp)->flows[index].r_sap = dsap; - for (i = 0; i < MAC_SIZE; i++) { - shim_data(_ipcp)->flows[index].r_addr[i] = r_addr[i]; - } + eth_llc_data.fd_to_ef[fd].r_sap = ssap; + memcpy(eth_llc_data.fd_to_ef[fd].r_addr, r_addr, MAC_SIZE); } - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBG("Flow reply, port_id %d, remote SAP %d.", port_id, dsap); + LOG_DBG("Flow reply, fd %d, SSAP %d, DSAP %d.", fd, ssap, dsap); - if ((ret = ipcp_flow_alloc_reply(getpid(), - port_id, - response)) < 0) { - return -1; /* -EPIPE */ - } + if ((ret = ipcp_flow_alloc_reply(fd, response)) < 0) + return -1; return ret; } -static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, - uint8_t r_addr[MAC_SIZE]) +static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, uint8_t * r_addr) { - int port_id = -1; - int i = 0; + int fd = -1; - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(ð_llc_data.flows_lock); - i = sap_to_index(ssap); - if (i < 0) { - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + fd = eth_llc_data.ef_to_fd[ssap]; + if (fd < 0) { + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("No flow found for remote deallocation request."); return 0; } - port_id = ipcp_flow(i)->port_id; - destroy_ipcp_flow(i); + bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - irm_flow_dealloc(port_id); + flow_dealloc(fd); - LOG_DBG("Flow with port_id %d deallocated.", port_id); + LOG_DBG("Flow with fd %d deallocated.", fd); return 0; } -static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, - size_t len, - uint8_t r_addr[MAC_SIZE]) +static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr) { shim_eth_llc_msg_t * msg = NULL; @@ -621,27 +440,24 @@ static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, switch (msg->code) { case SHIM_ETH_LLC_MSG_CODE__FLOW_REQ: - if (ipcp_data_is_in_registry(_ipcp->data, - msg->dst_name)) { - eth_llc_ipcp_port_req(msg->ssap, - r_addr, - msg->dst_name, - msg->src_ae_name); + if (ipcp_data_is_in_registry(ipcpi.data, msg->dst_name)) { + eth_llc_ipcp_sap_req(msg->ssap, + r_addr, + msg->dst_name, + msg->src_ae_name); } break; case SHIM_ETH_LLC_MSG_CODE__FLOW_REPLY: - eth_llc_ipcp_port_alloc_reply(msg->ssap, - r_addr, - msg->dsap, - msg->response); + eth_llc_ipcp_sap_alloc_reply(msg->ssap, + r_addr, + msg->dsap, + msg->response); break; case SHIM_ETH_LLC_MSG_CODE__FLOW_DEALLOC: - eth_llc_ipcp_flow_dealloc_req(msg->ssap, - r_addr); + eth_llc_ipcp_flow_dealloc_req(msg->ssap, r_addr); break; default: - LOG_ERR("Unknown message received %d.", - msg->code); + LOG_ERR("Unknown message received %d.", msg->code); shim_eth_llc_msg__free_unpacked(msg, NULL); return -1; } @@ -652,15 +468,11 @@ static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, static void * eth_llc_ipcp_sdu_reader(void * o) { - ssize_t index; - struct rb_entry e; uint8_t br_addr[MAC_SIZE]; + uint16_t length; uint8_t dsap; uint8_t ssap; - int i = 0; - struct eth_llc_frame * llc_frame; - uint16_t size; - + int fd; #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) struct pollfd pfd; int offset = 0; @@ -670,43 +482,43 @@ static void * eth_llc_ipcp_sdu_reader(void * o) uint8_t buf[ETH_FRAME_SIZE]; int frame_len = 0; #endif + struct eth_llc_frame * llc_frame; memset(br_addr, 0xff, MAC_SIZE * sizeof(uint8_t)); while (true) { #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) - header = (void *) shim_data(_ipcp)->rx_ring + - (offset * SHM_RDRB_BLOCK_SIZE); + header = (void *) (eth_llc_data.rx_ring + + offset * SHM_RDRB_BLOCK_SIZE); while (!(header->tp_status & TP_STATUS_USER)) { - pfd.fd = shim_data(_ipcp)->s_fd; + pfd.fd = eth_llc_data.s_fd; pfd.revents = 0; pfd.events = POLLIN | POLLRDNORM | POLLERR; if (poll(&pfd, 1, -1) <= 0) { - LOG_ERR("Failed to poll: %s.", strerror(errno)); + LOG_ERR("Failed to poll."); continue; } - header = (void *) shim_data(_ipcp)->rx_ring + - (offset * SHM_RDRB_BLOCK_SIZE); + header = (void *) (eth_llc_data.rx_ring + + offset * SHM_RDRB_BLOCK_SIZE); } - buf = (void * ) header + header->tp_mac; + buf = (uint8_t * ) header + header->tp_mac; #else - frame_len = recv(shim_data(_ipcp)->s_fd, buf, + frame_len = recv(eth_llc_data.s_fd, buf, SHIM_ETH_LLC_MAX_SDU_SIZE, 0); if (frame_len < 0) { - LOG_ERR("Failed to recv frame."); + LOG_ERR("Failed to receive frame."); continue; } #endif - llc_frame = (struct eth_llc_frame *) buf; #ifdef __FreeBSD__ - if (memcmp(LLADDR(&shim_data(_ipcp)->device), + if (memcmp(LLADDR(ð_llc_data.device), #else - if (memcmp(shim_data(_ipcp)->device.sll_addr, + if (memcmp(eth_llc_data.device.sll_addr, #endif &llc_frame->dst_hwaddr, MAC_SIZE) && @@ -721,46 +533,29 @@ static void * eth_llc_ipcp_sdu_reader(void * o) dsap = reverse_bits(llc_frame->dsap); ssap = reverse_bits(llc_frame->ssap); - memcpy(&size, &llc_frame->size, sizeof(size)); + memcpy(&length, &llc_frame->length, sizeof(length)); + length = ntohs(length) - LLC_HEADER_SIZE; if (ssap == MGMT_SAP && dsap == MGMT_SAP) { eth_llc_ipcp_mgmt_frame(&llc_frame->payload, - ntohs(size), + length, llc_frame->src_hwaddr); } else { - pthread_rwlock_rdlock(&shim_data(_ipcp)->flows_lock); - - i = addr_and_saps_to_index(llc_frame->src_hwaddr, - ssap, - dsap); - if (i < 0) { - pthread_rwlock_unlock(&shim_data(_ipcp)-> - flows_lock); + pthread_rwlock_rdlock(ð_llc_data.flows_lock); + + fd = eth_llc_data.ef_to_fd[dsap]; + if (fd < 0) { + pthread_rwlock_unlock(ð_llc_data.flows_lock); #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) - offset = (offset + 1) - & (SHM_BUFFER_SIZE - 1); + offset = (offset + 1) & (SHM_BUFFER_SIZE - 1); header->tp_status = TP_STATUS_KERNEL; #endif continue; } - while ((index = - shm_rdrbuff_write(shim_data(_ipcp)->rdrb, - ipcp_flow(i)->api, - 0, - 0, - &llc_frame->payload, - ntohs(size))) - < 0) - ; - - e.index = index; - e.port_id = ipcp_flow(i)->port_id; - - while (shm_ap_rbuff_write(ipcp_flow(i)->rb, &e) < 0) - ; + pthread_rwlock_unlock(ð_llc_data.flows_lock); - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); + flow_write(fd, &llc_frame->payload, length); } #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) offset = (offset + 1) & (SHM_BUFFER_SIZE -1); @@ -774,51 +569,34 @@ static void * eth_llc_ipcp_sdu_reader(void * o) static void * eth_llc_ipcp_sdu_writer(void * o) { while (true) { - struct rb_entry * e; - int i; - int len = 0; - uint8_t * buf; + int fd; + struct shm_du_buff * sdb; uint8_t ssap; uint8_t dsap; + uint8_t r_addr[MAC_SIZE]; - e = shm_ap_rbuff_read(shim_data(_ipcp)->rb); - if (e == NULL) - continue; - - pthread_rwlock_rdlock(&_ipcp->state_lock); - - len = shm_rdrbuff_read((uint8_t **) &buf, - shim_data(_ipcp)->rdrb, - e->index); - if (len <= 0) { - free(e); - LOG_ERR("Length of du map read was %d.", len); - continue; - } + pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_rdlock(&shim_data(_ipcp)->flows_lock); - - i = port_id_to_index(e->port_id); - if (i < 0) { - free(e); - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); + fd = ipcp_flow_read(&sdb); + if (fd < 0) { + pthread_rwlock_unlock(&ipcpi.state_lock); continue; } - ssap = reverse_bits(shim_data(_ipcp)->flows[i].sap); - dsap = reverse_bits(shim_data(_ipcp)->flows[i].r_sap); - - eth_llc_ipcp_send_frame(shim_data(_ipcp)->flows[i].r_addr, - dsap, ssap, buf, len); - - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); + pthread_rwlock_rdlock(ð_llc_data.flows_lock); - if (shim_data(_ipcp)->rdrb != NULL) - shm_rdrbuff_remove(shim_data(_ipcp)->rdrb, e->index); + ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); + dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); + memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - free(e); + eth_llc_ipcp_send_frame(r_addr, dsap, ssap, + shm_du_buff_head(sdb), + shm_du_buff_tail(sdb) + - shm_du_buff_head(sdb)); + ipcp_flow_del(sdb); } return (void *) 1; @@ -826,10 +604,6 @@ static void * eth_llc_ipcp_sdu_writer(void * o) void ipcp_sig_handler(int sig, siginfo_t * info, void * c) { - sigset_t sigset; - sigemptyset(&sigset); - sigaddset(&sigset, SIGINT); - switch(sig) { case SIGINT: case SIGTERM: @@ -838,12 +612,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) LOG_DBG("IPCP %d terminating by order of %d. Bye.", getpid(), info->si_pid); - pthread_rwlock_wrlock(&_ipcp->state_lock); - - ipcp_set_state(_ipcp, IPCP_SHUTDOWN); + pthread_rwlock_wrlock(&ipcpi.state_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + ipcp_set_state(IPCP_SHUTDOWN); + pthread_rwlock_unlock(&ipcpi.state_lock); } default: return; @@ -852,7 +625,7 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) static int eth_llc_ipcp_bootstrap(struct dif_config * conf) { - int fd = -1; + int skfd = -1; struct ifreq ifr; int idx; #ifdef __FreeBSD__ @@ -882,17 +655,10 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) memset(&ifr, 0, sizeof(ifr)); - fd = socket(AF_UNIX, SOCK_STREAM, 0); - if (fd < 0) { - LOG_ERR("Failed to open socket."); - return -1; - } - memcpy(ifr.ifr_name, conf->if_name, strlen(conf->if_name)); #ifdef __FreeBSD__ if (getifaddrs(&ifaddr) < 0) { - close(fd); LOG_ERR("Could not get interfaces."); return -1; } @@ -900,8 +666,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) for (ifa = ifaddr, idx = 0; ifa != NULL; ifa = ifa->ifa_next, ++idx) { if (strcmp(ifa->ifa_name, conf->if_name)) continue; - LOG_DBGF("Interface %s found.", conf->if_name); - + LOG_DBG("Interface %s found.", conf->if_name); memcpy(&ifr.ifr_addr, ifa->ifa_addr, sizeof(*ifa->ifa_addr)); break; } @@ -913,30 +678,33 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) } freeifaddrs(ifaddr); - #else - if (ioctl(fd, SIOCGIFHWADDR, &ifr)) { - close(fd); - LOG_ERR("Failed to ioctl: %s.", strerror(errno)); + skfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (skfd < 0) { + LOG_ERR("Failed to open socket."); + return -1; + } + + if (ioctl(skfd, SIOCGIFHWADDR, &ifr)) { + LOG_ERR("Failed to ioctl."); + close(skfd); return -1; } + close(skfd); + idx = if_nametoindex(conf->if_name); if (idx == 0) { LOG_ERR("Failed to retrieve interface index."); + close(skfd); return -1; } #endif - - close(fd); - memset(&(device), 0, sizeof(device)); #ifdef __FreeBSD__ device.sdl_index = idx; device.sdl_family = AF_LINK; - memcpy(LLADDR(&device), - ifr.ifr_addr.sa_data, - MAC_SIZE * sizeof (uint8_t)); + memcpy(LLADDR(&device), ifr.ifr_addr.sa_data, MAC_SIZE); device.sdl_alen = MAC_SIZE; /* TODO: replace socket calls with bpf for BSD */ LOG_MISSING; @@ -944,23 +712,21 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) #else device.sll_ifindex = idx; device.sll_family = AF_PACKET; - memcpy(device.sll_addr, - ifr.ifr_hwaddr.sa_data, - MAC_SIZE * sizeof (uint8_t)); + memcpy(device.sll_addr, ifr.ifr_hwaddr.sa_data, MAC_SIZE); device.sll_halen = MAC_SIZE; device.sll_protocol = htons(ETH_P_ALL); - fd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_802_2)); + skfd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_802_2)); #endif - if (fd < 0) { - LOG_ERR("Failed to create socket: %s.", strerror(errno)); + if (skfd < 0) { + LOG_ERR("Failed to create socket."); return -1; } #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) if (SHIM_ETH_LLC_MAX_SDU_SIZE > SHM_RDRB_BLOCK_SIZE) { LOG_ERR("Max SDU size is bigger than DU map block size."); - close(fd); + close(skfd); return -1; } @@ -969,68 +735,68 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) req.tp_block_nr = SHM_BUFFER_SIZE; req.tp_frame_nr = SHM_BUFFER_SIZE; - if (setsockopt(fd, SOL_PACKET, PACKET_RX_RING, + if (setsockopt(skfd, SOL_PACKET, PACKET_RX_RING, (void *) &req, sizeof(req))) { LOG_ERR("Failed to set sockopt PACKET_RX_RING"); - close(fd); + close(skfd); return -1; } - if (setsockopt(fd, SOL_PACKET, PACKET_TX_RING, + if (setsockopt(skfd, SOL_PACKET, PACKET_TX_RING, (void *) &req, sizeof(req))) { LOG_ERR("Failed to set sockopt PACKET_TX_RING"); - close(fd); + close(skfd); return -1; } #endif - - if (bind(fd,(struct sockaddr *) &device, sizeof(device))) { + if (bind(skfd, (struct sockaddr *) &device, sizeof(device))) { LOG_ERR("Failed to bind socket to interface"); - close(fd); + close(skfd); return -1; } #if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) - shim_data(_ipcp)->rx_ring = mmap(NULL, - 2 * SHM_RDRB_BLOCK_SIZE - * SHM_BUFFER_SIZE, - PROT_READ | PROT_WRITE, MAP_SHARED, - fd, 0); - if (shim_data(_ipcp)->rx_ring == NULL) { + eth_llc_data.rx_ring = mmap(NULL, 2 * SHM_RDRB_BLOCK_SIZE + * SHM_BUFFER_SIZE, + PROT_READ | PROT_WRITE, MAP_SHARED, + skfd, 0); + if (eth_llc_data.rx_ring == NULL) { LOG_ERR("Failed to mmap"); - close(fd); + close(skfd); return -1; } - shim_data(_ipcp)->tx_ring = shim_data(_ipcp)->rx_ring - + (SHM_RDRB_BLOCK_SIZE * SHM_BUFFER_SIZE); + eth_llc_data.tx_ring = eth_llc_data.rx_ring + + SHM_RDRB_BLOCK_SIZE * SHM_BUFFER_SIZE; #endif - pthread_rwlock_wrlock(&_ipcp->state_lock); + pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_INIT) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_INIT) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("IPCP in wrong state."); - close(fd); + close(skfd); return -1; } - shim_data(_ipcp)->s_fd = fd; - shim_data(_ipcp)->device = device; - shim_data(_ipcp)->tx_offset = 0; + eth_llc_data.s_fd = skfd; + eth_llc_data.device = device; +#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING) + eth_llc_data.tx_offset = 0; +#endif - ipcp_set_state(_ipcp, IPCP_ENROLLED); + ipcp_set_state(IPCP_ENROLLED); - pthread_create(&shim_data(_ipcp)->sdu_reader, + pthread_create(ð_llc_data.sdu_reader, NULL, eth_llc_ipcp_sdu_reader, NULL); - pthread_create(&shim_data(_ipcp)->sdu_writer, + pthread_create(ð_llc_data.sdu_writer, NULL, eth_llc_ipcp_sdu_writer, NULL); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_DBG("Bootstrapped shim IPCP over Ethernet with LLC with api %d.", getpid()); @@ -1040,15 +806,15 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) static int eth_llc_ipcp_name_reg(char * name) { - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_data_add_reg_entry(_ipcp->data, name)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_data_add_reg_entry(ipcpi.data, name)) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to add %s to local registry.", name); return -1; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_DBG("Registered %s.", name); @@ -1057,25 +823,22 @@ static int eth_llc_ipcp_name_reg(char * name) static int eth_llc_ipcp_name_unreg(char * name) { - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - ipcp_data_del_reg_entry(_ipcp->data, name); + ipcp_data_del_reg_entry(ipcpi.data, name); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } -static int eth_llc_ipcp_flow_alloc(pid_t n_api, - int port_id, +static int eth_llc_ipcp_flow_alloc(int fd, char * dst_name, char * src_ae_name, enum qos_cube qos) { - struct shm_ap_rbuff * rb; uint8_t ssap = 0; uint8_t r_addr[MAC_SIZE]; - int index = 0; LOG_INFO("Allocating flow to %s.", dst_name); @@ -1083,182 +846,122 @@ static int eth_llc_ipcp_flow_alloc(pid_t n_api, return -1; if (qos != QOS_CUBE_BE) - LOG_DBGF("QoS requested. Ethernet LLC can't do that. For now."); - - rb = shm_ap_rbuff_open_s(n_api); - if (rb == NULL) - return -1; /* -ENORBUFF */ + LOG_DBG("QoS requested. Ethernet LLC can't do that. For now."); - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) { - pthread_rwlock_unlock(&_ipcp->state_lock); - shm_ap_rbuff_close(rb); - LOG_DBGF("Won't allocate flow with non-enrolled IPCP."); + if (ipcp_get_state() != IPCP_ENROLLED) { + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_DBG("Won't allocate flow with non-enrolled IPCP."); return -1; /* -ENOTENROLLED */ } - index = bmp_allocate(shim_data(_ipcp)->indices); - if (index < 0) { - pthread_rwlock_unlock(&_ipcp->state_lock); - shm_ap_rbuff_close(rb); - return -1; - } - - pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); + pthread_rwlock_wrlock(ð_llc_data.flows_lock); - ssap = bmp_allocate(shim_data(_ipcp)->saps); + ssap = bmp_allocate(eth_llc_data.saps); if (ssap < 0) { - shm_ap_rbuff_close(rb); - bmp_release(shim_data(_ipcp)->indices, index); - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - ipcp_flow(index)->port_id = port_id; - ipcp_flow(index)->state = FLOW_PENDING; - ipcp_flow(index)->rb = rb; - shim_data(_ipcp)->flows[index].sap = ssap; - - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - - memset(r_addr, 0xff, MAC_SIZE * sizeof(uint8_t)); - - if (eth_llc_ipcp_port_alloc(r_addr, ssap, - dst_name, - src_ae_name) < 0) { - LOG_DBGF("Port alloc returned -1."); - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); - destroy_ipcp_flow(index); - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + eth_llc_data.fd_to_ef[fd].sap = ssap; + eth_llc_data.ef_to_fd[ssap] = fd; + + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); + + memset(r_addr, 0xff, MAC_SIZE); + + if (eth_llc_ipcp_sap_alloc(r_addr, ssap, dst_name, src_ae_name) < 0) { + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(ð_llc_data.flows_lock); + bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); + eth_llc_data.fd_to_ef[fd].sap = -1; + eth_llc_data.ef_to_fd[ssap] = -1; + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - LOG_DBG("Pending flow with port_id %d on SAP %d.", - port_id, ssap); + LOG_DBG("Pending flow with fd %d on SAP %d.", fd, ssap); - return index; + return 0; } -static int eth_llc_ipcp_flow_alloc_resp(pid_t n_api, - int port_id, - int response) +static int eth_llc_ipcp_flow_alloc_resp(int fd, int response) { - struct shm_ap_rbuff * rb; - int index = -1; uint8_t ssap = 0; + uint8_t r_sap = 0; + uint8_t r_addr[MAC_SIZE]; - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); - - index = port_id_to_index(port_id); - if (index < 0) { - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBGF("Could not find flow with port_id %d.", port_id); - return -1; - } - - if (ipcp_flow(index)->state != FLOW_PENDING) { - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBGF("Flow was not pending."); - return -1; - } - - rb = shm_ap_rbuff_open_s(n_api); - if (rb == NULL) { - LOG_ERR("Could not open N + 1 ringbuffer."); - ipcp_flow(index)->state = FLOW_NULL; - ipcp_flow(index)->port_id = -1; - bmp_release(shim_data(_ipcp)->indices, index); - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - return -1; - } + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(ð_llc_data.flows_lock); - ssap = bmp_allocate(shim_data(_ipcp)->saps); + ssap = bmp_allocate(eth_llc_data.saps); if (ssap < 0) { - ipcp_flow(index)->state = FLOW_NULL; - ipcp_flow(index)->port_id = -1; - shm_ap_rbuff_close(ipcp_flow(index)->rb); - bmp_release(shim_data(_ipcp)->indices, index); - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - ipcp_flow(index)->state = FLOW_ALLOCATED; - ipcp_flow(index)->rb = rb; - shim_data(_ipcp)->flows[index].sap = ssap; - - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + eth_llc_data.fd_to_ef[fd].sap = ssap; + memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); + r_sap = eth_llc_data.fd_to_ef[fd].r_sap; + eth_llc_data.ef_to_fd[ssap] = fd; - if (eth_llc_ipcp_port_alloc_resp(shim_data(_ipcp)->flows[index].r_addr, - shim_data(_ipcp)->flows[index].r_sap, - ssap, - response) < 0) { - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); - destroy_ipcp_flow(index); - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBGF("Could not send response."); + if (eth_llc_ipcp_sap_alloc_resp(r_addr, ssap, r_sap, response) < 0) { + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(ð_llc_data.flows_lock); + bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - LOG_DBG("Accepted flow, port_id %d, SAP %d.", port_id, ssap); + LOG_DBG("Accepted flow, fd %d, SAP %d.", fd, ssap); return 0; } -static int eth_llc_ipcp_flow_dealloc(int port_id) +static int eth_llc_ipcp_flow_dealloc(int fd) { - int index = -1; uint8_t sap; uint8_t addr[MAC_SIZE]; - int i; int ret; - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(ð_llc_data.flows_lock); - index = port_id_to_index(port_id); - if (index < 0) { - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - return 0; - } + sap = eth_llc_data.fd_to_ef[fd].r_sap; + memcpy(addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); - sap = shim_data(_ipcp)->flows[index].r_sap; - for (i = 0; i < MAC_SIZE; i++) { - addr[i] = shim_data(_ipcp)->flows[index].r_addr[i]; - } + bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); + eth_llc_data.fd_to_ef[fd].sap = -1; + eth_llc_data.fd_to_ef[fd].r_sap = -1; + memset(ð_llc_data.fd_to_ef[fd].r_addr, 0, MAC_SIZE); - destroy_ipcp_flow(index); + eth_llc_data.ef_to_fd[sap] = -1; - pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock); + pthread_rwlock_unlock(ð_llc_data.flows_lock); - ret = eth_llc_ipcp_port_dealloc(addr, sap); - pthread_rwlock_unlock(&_ipcp->state_lock); + ret = eth_llc_ipcp_sap_dealloc(addr, sap); + pthread_rwlock_unlock(&ipcpi.state_lock); if (ret < 0) - LOG_DBGF("Could not notify remote."); + LOG_DBG("Could not notify remote."); - LOG_DBG("Flow with port_id %d deallocated.", port_id); + LOG_DBG("Flow with fd %d deallocated.", fd); return 0; } static struct ipcp_ops eth_llc_ops = { .ipcp_bootstrap = eth_llc_ipcp_bootstrap, - .ipcp_enroll = NULL, /* shim */ + .ipcp_enroll = NULL, .ipcp_name_reg = eth_llc_ipcp_name_reg, .ipcp_name_unreg = eth_llc_ipcp_name_unreg, .ipcp_flow_alloc = eth_llc_ipcp_flow_alloc, @@ -1270,7 +973,6 @@ int main(int argc, char * argv[]) { struct sigaction sig_act; sigset_t sigset; - int i = 0; sigemptyset(&sigset); sigaddset(&sigset, SIGINT); @@ -1283,6 +985,14 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } + if (eth_llc_data_init() < 0) + exit(EXIT_FAILURE); + + if (ap_init(NULL) < 0) { + close_logfile(); + exit(EXIT_FAILURE); + } + /* store the process id of the irmd */ irmd_api = atoi(argv[1]); @@ -1298,35 +1008,13 @@ int main(int argc, char * argv[]) sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - _ipcp = ipcp_instance_create(); - if (_ipcp == NULL) { - LOG_ERR("Failed to create instance."); - close_logfile(); - exit(EXIT_FAILURE); - } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); - _ipcp->data = (struct ipcp_data *) eth_llc_ipcp_data_create(); - if (_ipcp->data == NULL) { - LOG_ERR("Failed to create instance data."); - free(_ipcp); + if (ipcp_init(THIS_TYPE, ð_llc_ops) < 0) { close_logfile(); exit(EXIT_FAILURE); } - for (i = 0; i < AP_MAX_FLOWS; i++) { - ipcp_flow(i)->rb = NULL; - ipcp_flow(i)->port_id = -1; - ipcp_flow(i)->state = FLOW_NULL; - } - - _ipcp->ops = ð_llc_ops; - _ipcp->state = IPCP_INIT; - - pthread_sigmask(SIG_BLOCK, &sigset, NULL); - - pthread_create(&shim_data(_ipcp)->mainloop, NULL, - ipcp_main_loop, _ipcp); - pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); if (ipcp_create_r(getpid())) { @@ -1335,17 +1023,17 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - pthread_join(shim_data(_ipcp)->mainloop, NULL); + ipcp_fini(); - pthread_cancel(shim_data(_ipcp)->sdu_reader); - pthread_cancel(shim_data(_ipcp)->sdu_writer); + pthread_cancel(eth_llc_data.sdu_reader); + pthread_cancel(eth_llc_data.sdu_writer); - pthread_join(shim_data(_ipcp)->sdu_writer, NULL); - pthread_join(shim_data(_ipcp)->sdu_reader, NULL); + pthread_join(eth_llc_data.sdu_writer, NULL); + pthread_join(eth_llc_data.sdu_reader, NULL); - eth_llc_ipcp_data_destroy(); + ap_fini(); - free(_ipcp); + eth_llc_data_fini(); close_logfile(); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index c35bd244..8c31e11a 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -22,18 +22,11 @@ #include <ouroboros/config.h> #include "ipcp.h" -#include "flow.h" #include "shim_udp_config.h" -#include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> -#include <ouroboros/ipcp.h> -#include <ouroboros/irm_config.h> -#include <ouroboros/sockets.h> -#include <ouroboros/bitmap.h> -#include <ouroboros/flow.h> #include <ouroboros/dev.h> +#include <ouroboros/ipcp-dev.h> #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -63,268 +56,93 @@ typedef ShimUdpMsg shim_udp_msg_t; #define DNS_TTL 86400 #define FD_UPDATE_TIMEOUT 100 /* microseconds */ -#define shim_data(type) ((struct ipcp_udp_data *) type->data) +#define local_ip (udp_data.s_saddr.sin_addr.s_addr) -#define local_ip (((struct ipcp_udp_data *) \ - _ipcp->data)->s_saddr.sin_addr.s_addr) +#define UDP_MAX_PORTS 0xFFFF /* global for trapping signal */ int irmd_api; -/* this IPCP's data */ -#ifdef MAKE_CHECK -extern struct ipcp * _ipcp; /* defined in test */ -#else -struct ipcp * _ipcp; -#endif - -/* - * copied from ouroboros/dev. The shim needs access to the internals - * because it doesn't follow all steps necessary steps to get - * the info - */ - -/* the shim needs access to these internals */ -struct shim_ap_data { - pid_t api; - struct shm_rdrbuff * rdrb; - struct bmp * fds; - struct shm_ap_rbuff * rb; - - struct flow flows[AP_MAX_FLOWS]; - pthread_rwlock_t flows_lock; - - pthread_t mainloop; - pthread_t sduloop; - pthread_t handler; - pthread_t sdu_reader; - - bool fd_set_mod; - pthread_cond_t fd_set_cond; - pthread_mutex_t fd_set_lock; -} * _ap_instance; - -static int shim_ap_init() -{ - int i; - - _ap_instance = malloc(sizeof(struct shim_ap_data)); - if (_ap_instance == NULL) { - return -1; - } - - _ap_instance->api = getpid(); - - _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); - if (_ap_instance->fds == NULL) { - free(_ap_instance); - return -1; - } - - _ap_instance->rdrb = shm_rdrbuff_open(); - if (_ap_instance->rdrb == NULL) { - bmp_destroy(_ap_instance->fds); - free(_ap_instance); - return -1; - } - - _ap_instance->rb = shm_ap_rbuff_create_n(); - if (_ap_instance->rb == NULL) { - shm_rdrbuff_close(_ap_instance->rdrb); - bmp_destroy(_ap_instance->fds); - free(_ap_instance); - return -1; - } - - for (i = 0; i < AP_MAX_FLOWS; i ++) { - _ap_instance->flows[i].rb = NULL; - _ap_instance->flows[i].port_id = -1; - _ap_instance->flows[i].state = FLOW_NULL; - } - - pthread_rwlock_init(&_ap_instance->flows_lock, NULL); - pthread_cond_init(&_ap_instance->fd_set_cond, NULL); - pthread_mutex_init(&_ap_instance->fd_set_lock, NULL); - - return 0; -} - -void shim_ap_fini() -{ - int i = 0; - - if (_ap_instance == NULL) - return; - - pthread_rwlock_rdlock(&_ipcp->state_lock); - - if (_ipcp->state != IPCP_SHUTDOWN) - LOG_WARN("Cleaning up AP while not in shutdown."); - - if (_ap_instance->fds != NULL) - bmp_destroy(_ap_instance->fds); - - /* remove all remaining sdus */ - while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) - shm_rdrbuff_remove(_ap_instance->rdrb, i); - - if (_ap_instance->rdrb != NULL) - shm_rdrbuff_close(_ap_instance->rdrb); - if (_ap_instance->rb != NULL) - shm_ap_rbuff_destroy(_ap_instance->rb); +struct uf { + int udp; + int skfd; +}; - pthread_rwlock_wrlock(&_ap_instance->flows_lock); +struct { + uint32_t ip_addr; + uint32_t dns_addr; + /* listen server */ + struct sockaddr_in s_saddr; + int s_fd; - for (i = 0; i < AP_MAX_FLOWS; i ++) - if (_ap_instance->flows[i].rb != NULL) - shm_ap_rbuff_close(_ap_instance->flows[i].rb); + fd_set flow_fd_s; + /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */ + int uf_to_fd[FD_SETSIZE]; + struct uf fd_to_uf[IRMD_MAX_FLOWS]; + pthread_rwlock_t flows_lock; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_t sduloop; + pthread_t handler; + pthread_t sdu_reader; - free(_ap_instance); -} + bool fd_set_mod; + pthread_cond_t fd_set_cond; + pthread_mutex_t fd_set_lock; +} udp_data; -/* only call this under flows_lock */ -static int port_id_to_fd(int port_id) +static void udp_data_init() { int i; - for (i = 0; i < AP_MAX_FLOWS; ++i) { - if (_ap_instance->flows[i].port_id == port_id - && _ap_instance->flows[i].state != FLOW_NULL) - return i; - } + for (i = 0; i < FD_SETSIZE; ++i) + udp_data.uf_to_fd[i] = -1; - return -1; -} - -static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) -{ - ssize_t index; - struct rb_entry e; - - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); - - index = shm_rdrbuff_write_b(_ap_instance->rdrb, - _ap_instance->flows[fd].api, - 0, - 0, - (uint8_t *) buf, - count); - if (index < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - return -1; + for (i = 0; i < IRMD_MAX_FLOWS; ++i) { + udp_data.fd_to_uf[i].skfd = -1; + udp_data.fd_to_uf[i].udp = -1; } - e.index = index; - e.port_id = _ap_instance->flows[fd].port_id; - - shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e); + FD_ZERO(&udp_data.flow_fd_s); - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - - return 0; + pthread_rwlock_init(&udp_data.flows_lock, NULL); + pthread_cond_init(&udp_data.fd_set_cond, NULL); + pthread_mutex_init(&udp_data.fd_set_lock, NULL); } -/* - * end copy from dev.c - */ - -/* only call this under flows_lock */ -static int udp_port_to_fd(int udp_port) +static void udp_data_fini() { - int i; - - struct sockaddr_in f_saddr; - socklen_t len = sizeof(f_saddr); - - for (i = 0; i < AP_MAX_FLOWS; ++i) { - if (_ap_instance->flows[i].state == FLOW_NULL) - continue; - - if (getsockname(i, (struct sockaddr *) &f_saddr, &len) < 0) - continue; - - if (f_saddr.sin_port == udp_port) - return i; - } - - return -1; -} - -struct ipcp_udp_data { - /* keep ipcp_data first for polymorphism */ - struct ipcp_data ipcp_data; - - uint32_t ip_addr; - uint32_t dns_addr; - /* listen server */ - struct sockaddr_in s_saddr; - int s_fd; - - /* only modify under _ap_instance->flows_lock */ - fd_set flow_fd_s; -}; - -struct ipcp_udp_data * ipcp_udp_data_create() -{ - struct ipcp_udp_data * udp_data; - struct ipcp_data * data; - enum ipcp_type ipcp_type; - - udp_data = malloc(sizeof(*udp_data)); - if (udp_data == NULL) { - LOG_ERR("Failed to allocate."); - return NULL; - } - - ipcp_type = THIS_TYPE; - data = (struct ipcp_data *) udp_data; - if (ipcp_data_init(data, ipcp_type) == NULL) { - free(udp_data); - return NULL; - } - - FD_ZERO(&udp_data->flow_fd_s); - - return udp_data; + pthread_rwlock_destroy(&udp_data.flows_lock); + pthread_mutex_destroy(&udp_data.fd_set_lock); + pthread_cond_destroy(&udp_data.fd_set_cond); } static void set_fd(int fd) { - pthread_mutex_lock(&_ap_instance->fd_set_lock); + pthread_mutex_lock(&udp_data.fd_set_lock); - _ap_instance->fd_set_mod = true; - FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + udp_data.fd_set_mod = true; + FD_SET(fd, &udp_data.flow_fd_s); - while (_ap_instance->fd_set_mod) - pthread_cond_wait(&_ap_instance->fd_set_cond, - &_ap_instance->fd_set_lock); + while (udp_data.fd_set_mod) + pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock); - pthread_mutex_unlock(&_ap_instance->fd_set_lock); + pthread_mutex_unlock(&udp_data.fd_set_lock); } static void clr_fd(int fd) { - pthread_mutex_lock(&_ap_instance->fd_set_lock); + pthread_mutex_lock(&udp_data.fd_set_lock); - _ap_instance->fd_set_mod = true; - FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + udp_data.fd_set_mod = true; + FD_CLR(fd, &udp_data.flow_fd_s); - while (_ap_instance->fd_set_mod) - pthread_cond_wait(&_ap_instance->fd_set_cond, - &_ap_instance->fd_set_lock); + while (udp_data.fd_set_mod) + pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock); - pthread_mutex_unlock(&_ap_instance->fd_set_lock); + pthread_mutex_unlock(&udp_data.fd_set_lock); } - -static int send_shim_udp_msg(shim_udp_msg_t * msg, - uint32_t dst_ip_addr) +static int send_shim_udp_msg(shim_udp_msg_t * msg, uint32_t dst_ip_addr) { buffer_t buf; struct sockaddr_in r_saddr; @@ -340,13 +158,12 @@ static int send_shim_udp_msg(shim_udp_msg_t * msg, } buf.data = malloc(SHIM_UDP_MSG_SIZE); - if (buf.data == NULL) { + if (buf.data == NULL) return -1; - } shim_udp_msg__pack(msg, buf.data); - if (sendto(shim_data(_ipcp)->s_fd, + if (sendto(udp_data.s_fd, buf.data, buf.len, 0, @@ -409,8 +226,8 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, char * dst_name, char * src_ae_name) { - int fd; - int port_id; + int skfd; + int fd; struct sockaddr_in f_saddr; socklen_t f_saddr_len = sizeof(f_saddr); @@ -418,7 +235,7 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, LOG_DBG("Port request arrived from UDP port %d", ntohs(c_saddr->sin_port)); - if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + if ((skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { LOG_ERR("Could not create UDP socket."); return -1; } @@ -426,73 +243,72 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, memset((char *) &f_saddr, 0, sizeof(f_saddr)); f_saddr.sin_family = AF_INET; f_saddr.sin_addr.s_addr = local_ip; - - /* - * FIXME: we could have a port dedicated per registered AP - * Not that critical for UDP, but will be for LLC - */ - f_saddr.sin_port = 0; - if (bind(fd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { + if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { LOG_ERR("Could not bind to socket."); - close(fd); + close(skfd); return -1; } - if (getsockname(fd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { + if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { LOG_ERR("Could not get address from fd."); return -1; } - /* - * store the remote address in the file descriptor - * this avoids having to store the sockaddr_in in - * the flow structure - */ - - if (connect(fd, (struct sockaddr *) c_saddr, sizeof(*c_saddr)) < 0) { + /* connect stores the remote address in the file descriptor */ + if (connect(skfd, (struct sockaddr *) c_saddr, sizeof(*c_saddr)) < 0) { LOG_ERR("Could not connect to remote UDP client."); - close(fd); + close(skfd); return -1; } - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_wrlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); /* reply to IRM */ - port_id = ipcp_flow_req_arr(getpid(), - dst_name, - src_ae_name); - - if (port_id < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_ERR("Could not get port id from IRMd"); - close(fd); + fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name); + if (fd < 0) { + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_ERR("Could not get new flow from IRMd."); + close(skfd); return -1; } - _ap_instance->flows[fd].port_id = port_id; - _ap_instance->flows[fd].rb = NULL; - _ap_instance->flows[fd].state = FLOW_PENDING; + pthread_rwlock_wrlock(&udp_data.flows_lock); + + udp_data.uf_to_fd[skfd] = fd; + udp_data.fd_to_uf[fd].skfd = skfd; + udp_data.fd_to_uf[fd].udp = f_saddr.sin_port; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBG("Pending allocation request, port_id %d, UDP port (%d, %d).", - port_id, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port)); + LOG_DBG("Pending allocation request, fd %d, UDP port (%d, %d).", + fd, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port)); return 0; } +/* returns the n flow descriptor */ +static int udp_port_to_fd(int udp_port) +{ + int i; + + for (i = 0; i < IRMD_MAX_FLOWS; ++i) + if (udp_data.fd_to_uf[i].udp == udp_port) + return i; + + return -1; +} + static int ipcp_udp_port_alloc_reply(int src_udp_port, int dst_udp_port, int response) { - int fd = -1; - int ret = 0; - int port_id = -1; + int fd = -1; + int ret = 0; + int skfd = -1; struct sockaddr_in t_saddr; socklen_t t_saddr_len = sizeof(t_saddr); @@ -500,117 +316,82 @@ static int ipcp_udp_port_alloc_reply(int src_udp_port, LOG_DBG("Received reply for flow on udp port %d.", ntohs(dst_udp_port)); - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); fd = udp_port_to_fd(dst_udp_port); - if (fd == -1) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBG("Unknown flow on UDP port %d.", ntohs(dst_udp_port)); - return -1; /* -EUNKNOWNFLOW */ - } + skfd = udp_data.fd_to_uf[fd].skfd; - if (_ap_instance->flows[fd].state != FLOW_PENDING) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBG("Flow on UDP port %d not pending.", - ntohs(dst_udp_port)); - return -1; /* -EFLOWNOTPENDING */ - } - - port_id = _ap_instance->flows[fd].port_id; - - if (response) { - _ap_instance->flows[fd].port_id = -1; - _ap_instance->flows[fd].rb = NULL; - shm_ap_rbuff_close(_ap_instance->flows[fd].rb); - _ap_instance->flows[fd].state = FLOW_NULL; - } else { - /* get the original address with the LISTEN PORT */ - if (getpeername(fd, - (struct sockaddr *) &t_saddr, - &t_saddr_len) < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBG("Flow with port_id %d has no peer.", port_id); - return -1; - } + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - /* connect to the flow udp port */ - t_saddr.sin_port = src_udp_port; + /* get the original address with the LISTEN PORT */ + if (getpeername(skfd, (struct sockaddr *) &t_saddr, &t_saddr_len) < 0) { + LOG_DBG("Flow with fd %d has no peer.", fd); + return -1; + } - if (connect(fd, - (struct sockaddr *) &t_saddr, - sizeof(t_saddr)) < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - close(fd); - return -1; - } + /* connect to the flow udp port */ + t_saddr.sin_port = src_udp_port; - _ap_instance->flows[fd].state = FLOW_ALLOCATED; + if (connect(skfd, (struct sockaddr *) &t_saddr, sizeof(t_saddr)) < 0) { + close(skfd); + return -1; } - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); + set_fd(skfd); - if ((ret = ipcp_flow_alloc_reply(getpid(), - port_id, - response)) < 0) { - return -1; /* -EPIPE */ - } + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); + + if (ipcp_flow_alloc_reply(fd, response) < 0) + return -1; - LOG_INFO("Flow allocation completed, UDP ports: (%d, %d).", + LOG_DBG("Flow allocation completed, UDP ports: (%d, %d).", ntohs(dst_udp_port), ntohs(src_udp_port)); return ret; - } static int ipcp_udp_flow_dealloc_req(int udp_port) { - int fd = -1; - int port_id = -1; - - struct shm_ap_rbuff * rb; + int skfd = -1; + int fd = -1; - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(&udp_data.flows_lock); fd = udp_port_to_fd(udp_port); if (fd < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_DBG("Could not find flow on UDP port %d.", ntohs(udp_port)); return 0; } - clr_fd(fd); - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_wrlock(&_ap_instance->flows_lock); + skfd = udp_data.fd_to_uf[fd].skfd; - _ap_instance->flows[fd].state = FLOW_NULL; - port_id = _ap_instance->flows[fd].port_id; - _ap_instance->flows[fd].port_id = -1; - rb = _ap_instance->flows[fd].rb; - _ap_instance->flows[fd].rb = NULL; + udp_data.uf_to_fd[skfd] = -1; + udp_data.fd_to_uf[fd].udp = -1; + udp_data.fd_to_uf[fd].skfd = -1; - pthread_rwlock_unlock(&_ap_instance->flows_lock); + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); - if (rb != NULL) - shm_ap_rbuff_close(rb); + clr_fd(skfd); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - irm_flow_dealloc(port_id); + flow_dealloc(fd); - close(fd); + close(skfd); - LOG_DBG("Flow with port_id %d deallocated.", port_id); + LOG_DBG("Flow with fd %d deallocated.", fd); return 0; } @@ -619,39 +400,28 @@ static void * ipcp_udp_listener() { uint8_t buf[SHIM_UDP_MSG_SIZE]; int n = 0; - struct sockaddr_in c_saddr; + int sfd = udp_data.s_fd; while (true) { - int sfd = 0; shim_udp_msg_t * msg = NULL; - pthread_rwlock_rdlock(&_ipcp->state_lock); - - sfd = shim_data(_ipcp)->s_fd; - - pthread_rwlock_unlock(&_ipcp->state_lock); - memset(&buf, 0, SHIM_UDP_MSG_SIZE); n = sizeof(c_saddr); n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0, (struct sockaddr *) &c_saddr, (unsigned *) &n); - - if (n < 0) { + if (n < 0) continue; - } /* flow alloc request from other host */ if (gethostbyaddr((const char *) &c_saddr.sin_addr.s_addr, sizeof(c_saddr.sin_addr.s_addr), AF_INET) - == NULL) { + == NULL) continue; - } msg = shim_udp_msg__unpack(NULL, n, buf); - if (msg == NULL) { + if (msg == NULL) continue; - } switch (msg->code) { case SHIM_UDP_MSG_CODE__FLOW_REQ: @@ -685,103 +455,80 @@ static void * ipcp_udp_listener() static void * ipcp_udp_sdu_reader() { int n; + int skfd; int fd; + /* FIXME: avoid this copy */ char buf[SHIM_UDP_MAX_SDU_SIZE]; struct sockaddr_in r_saddr; fd_set read_fds; int flags; + struct timeval tv = {0, FD_UPDATE_TIMEOUT}; while (true) { - struct timeval tv = {0, FD_UPDATE_TIMEOUT}; - - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); - pthread_mutex_lock(&_ap_instance->fd_set_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); + pthread_mutex_lock(&udp_data.fd_set_lock); - read_fds = shim_data(_ipcp)->flow_fd_s; - _ap_instance->fd_set_mod = false; - pthread_cond_broadcast(&_ap_instance->fd_set_cond); + read_fds = udp_data.flow_fd_s; + udp_data.fd_set_mod = false; + pthread_cond_broadcast(&udp_data.fd_set_cond); - pthread_mutex_unlock(&_ap_instance->fd_set_lock); - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_unlock(&udp_data.fd_set_lock); + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) { + if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) continue; - } - for (fd = 0; fd < FD_SETSIZE; ++fd) { - if (!FD_ISSET(fd, &read_fds)) + for (skfd = 0; skfd < FD_SETSIZE; ++skfd) { + if (!FD_ISSET(skfd, &read_fds)) continue; - flags = fcntl(fd, F_GETFL, 0); - fcntl(fd, F_SETFL, flags | O_NONBLOCK); - + flags = fcntl(skfd, F_GETFL, 0); + fcntl(skfd, F_SETFL, flags | O_NONBLOCK); + fd = udp_data.uf_to_fd[skfd]; n = sizeof(r_saddr); - if ((n = recvfrom(fd, - buf, + if ((n = recvfrom(skfd, + &buf, SHIM_UDP_MAX_SDU_SIZE, 0, (struct sockaddr *) &r_saddr, (unsigned *) &n)) <= 0) continue; - /* send the sdu to the correct port_id */ - ipcp_udp_flow_write(fd, buf, n); + /* send the sdu to the correct fd */ + flow_write(fd, buf, n); } } return (void *) 0; } -/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ static void * ipcp_udp_sdu_loop(void * o) { while (true) { - struct rb_entry * e; int fd; - int len = 0; - char * buf; + struct shm_du_buff * sdb; - e = shm_ap_rbuff_read(_ap_instance->rb); - if (e == NULL) { + fd = ipcp_flow_read(&sdb); + if (fd < 0) continue; - } - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); - len = shm_rdrbuff_read((uint8_t **) &buf, - _ap_instance->rdrb, - e->index); - if (len <= 0) { - pthread_rwlock_unlock(&_ipcp->state_lock); - free(e); - continue; - } - - pthread_rwlock_rdlock(&_ap_instance->flows_lock); + fd = udp_data.fd_to_uf[fd].skfd; - fd = port_id_to_fd(e->port_id); - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - - if (fd == -1) { - free(e); - continue; - } + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - if (send(fd, buf, len, 0) < 0) + if (send(fd, + shm_du_buff_head(sdb), + shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), + 0) < 0) LOG_ERR("Failed to send SDU."); - pthread_rwlock_rdlock(&_ipcp->state_lock); - - if (_ap_instance->rdrb != NULL) - shm_rdrbuff_remove(_ap_instance->rdrb, e->index); - - pthread_rwlock_unlock(&_ipcp->state_lock); - - free(e); + ipcp_flow_del(sdb); } return (void *) 1; @@ -789,23 +536,16 @@ static void * ipcp_udp_sdu_loop(void * o) void ipcp_sig_handler(int sig, siginfo_t * info, void * c) { - sigset_t sigset; - sigemptyset(&sigset); - sigaddset(&sigset, SIGINT); - switch(sig) { case SIGINT: case SIGTERM: case SIGHUP: if (info->si_pid == irmd_api) { - LOG_DBG("IPCP %d terminating by order of %d. Bye.", - getpid(), info->si_pid); - - pthread_rwlock_wrlock(&_ipcp->state_lock); + pthread_rwlock_wrlock(&ipcpi.state_lock); - ipcp_set_state(_ipcp, IPCP_SHUTDOWN); + ipcp_set_state(IPCP_SHUTDOWN); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); } default: return; @@ -865,54 +605,52 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) LOG_WARN("Failed to set SO_REUSEADDR."); memset((char *) &s_saddr, 0, sizeof(s_saddr)); - shim_data(_ipcp)->s_saddr.sin_family = AF_INET; - shim_data(_ipcp)->s_saddr.sin_addr.s_addr = conf->ip_addr; - shim_data(_ipcp)->s_saddr.sin_port = LISTEN_PORT; + udp_data.s_saddr.sin_family = AF_INET; + udp_data.s_saddr.sin_addr.s_addr = conf->ip_addr; + udp_data.s_saddr.sin_port = LISTEN_PORT; if (bind(fd, - (struct sockaddr *) &shim_data(_ipcp)->s_saddr, - sizeof(shim_data(_ipcp)->s_saddr)) < 0) { + (struct sockaddr *) &udp_data.s_saddr, + sizeof(udp_data.s_saddr)) < 0) { LOG_ERR("Couldn't bind to %s.", ipstr); close(fd); return -1; } - pthread_rwlock_wrlock(&_ipcp->state_lock); + pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_INIT) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_INIT) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("IPCP in wrong state."); close(fd); return -1; } - shim_data(_ipcp)->s_fd = fd; - shim_data(_ipcp)->ip_addr = conf->ip_addr; - shim_data(_ipcp)->dns_addr = conf->dns_addr; + udp_data.s_fd = fd; + udp_data.ip_addr = conf->ip_addr; + udp_data.dns_addr = conf->dns_addr; - FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + FD_CLR(udp_data.s_fd, &udp_data.flow_fd_s); - ipcp_set_state(_ipcp, IPCP_ENROLLED); + ipcp_set_state(IPCP_ENROLLED); - pthread_create(&_ap_instance->handler, + pthread_create(&udp_data.handler, NULL, ipcp_udp_listener, NULL); - pthread_create(&_ap_instance->sdu_reader, + pthread_create(&udp_data.sdu_reader, NULL, ipcp_udp_sdu_reader, NULL); - pthread_create(&_ap_instance->sduloop, + pthread_create(&udp_data.sduloop, NULL, ipcp_udp_sdu_loop, NULL); - pthread_rwlock_unlock(&_ipcp->state_lock); - - LOG_DBG("Bootstrapped shim IPCP over UDP with api %d.", - getpid()); + pthread_rwlock_unlock(&ipcpi.state_lock); + LOG_DBG("Bootstrapped shim IPCP over UDP with api %d.", getpid()); LOG_DBG("Bound to IP address %s.", ipstr); LOG_DBG("DNS server address is %s.", dnsstr); @@ -1059,10 +797,10 @@ static int ipcp_udp_name_reg(char * name) return -1; } - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_data_add_reg_entry(_ipcp->data, name)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_data_add_reg_entry(ipcpi.data, name)) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to add %s to local registry.", name); return -1; } @@ -1070,12 +808,12 @@ static int ipcp_udp_name_reg(char * name) #ifdef CONFIG_OUROBOROS_ENABLE_DNS /* register application with DNS server */ - dns_addr = shim_data(_ipcp)->dns_addr; + dns_addr = udp_data.dns_addr; - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); if (dns_addr != 0) { - ip_addr = shim_data(_ipcp)->ip_addr; + ip_addr = udp_data.ip_addr; if (inet_ntop(AF_INET, &ip_addr, ipstr, INET_ADDRSTRLEN) == NULL) { @@ -1091,14 +829,14 @@ static int ipcp_udp_name_reg(char * name) dnsstr, name, DNS_TTL, ipstr); if (ddns_send(cmd)) { - pthread_rwlock_rdlock(&_ipcp->state_lock); - ipcp_data_del_reg_entry(_ipcp->data, name); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + ipcp_data_del_reg_entry(ipcpi.data, name); + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } } #else - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); #endif LOG_DBG("Registered %s.", name); @@ -1122,11 +860,11 @@ static int ipcp_udp_name_unreg(char * name) #ifdef CONFIG_OUROBOROS_ENABLE_DNS /* unregister application with DNS server */ - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - dns_addr = shim_data(_ipcp)->dns_addr; + dns_addr = udp_data.dns_addr; - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); if (dns_addr != 0) { if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) @@ -1140,17 +878,16 @@ static int ipcp_udp_name_unreg(char * name) } #endif - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - ipcp_data_del_reg_entry(_ipcp->data, name); + ipcp_data_del_reg_entry(ipcpi.data, name); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } -static int ipcp_udp_flow_alloc(pid_t n_api, - int port_id, +static int ipcp_udp_flow_alloc(int fd, char * dst_name, char * src_ae_name, enum qos_cube qos) @@ -1158,15 +895,13 @@ static int ipcp_udp_flow_alloc(pid_t n_api, struct sockaddr_in r_saddr; /* server address */ struct sockaddr_in f_saddr; /* flow */ socklen_t f_saddr_len = sizeof(f_saddr); - int fd; + int skfd; struct hostent * h; uint32_t ip_addr = 0; #ifdef CONFIG_OUROBOROS_ENABLE_DNS uint32_t dns_addr = 0; #endif - struct shm_ap_rbuff * rb; - - LOG_INFO("Allocating flow to %s.", dst_name); + LOG_DBG("Allocating flow to %s.", dst_name); if (dst_name == NULL || src_ae_name == NULL) return -1; @@ -1179,11 +914,7 @@ static int ipcp_udp_flow_alloc(pid_t n_api, if (qos != QOS_CUBE_BE) LOG_DBG("QoS requested. UDP/IP can't do that."); - rb = shm_ap_rbuff_open_s(n_api); - if (rb == NULL) - return -1; /* -ENORBUFF */ - - fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); /* this socket is for the flow */ memset((char *) &f_saddr, 0, sizeof(f_saddr)); @@ -1191,31 +922,31 @@ static int ipcp_udp_flow_alloc(pid_t n_api, f_saddr.sin_addr.s_addr = local_ip; f_saddr.sin_port = 0; - if (bind(fd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { - close(fd); + if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { + close(skfd); return -1; } - if (getsockname(fd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { + if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { LOG_ERR("Could not get address from fd."); - close(fd); + close(skfd); return -1; } - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_ENROLLED) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_DBG("Won't allocate flow with non-enrolled IPCP."); - close(fd); + close(skfd); return -1; /* -ENOTENROLLED */ } #ifdef CONFIG_OUROBOROS_ENABLE_DNS - dns_addr = shim_data(_ipcp)->dns_addr; + dns_addr = udp_data.dns_addr; if (dns_addr != 0) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); ip_addr = ddns_resolve(dst_name, dns_addr); if (ip_addr == 0) { @@ -1224,11 +955,11 @@ static int ipcp_udp_flow_alloc(pid_t n_api, return -1; } - pthread_rwlock_rdlock(&_ipcp->state_lock); - if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + if (ipcp_get_state() != IPCP_ENROLLED) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_DBG("Won't allocate flow with non-enrolled IPCP."); - close(fd); + close(skfd); return -1; /* -ENOTENROLLED */ } } else { @@ -1236,7 +967,7 @@ static int ipcp_udp_flow_alloc(pid_t n_api, h = gethostbyname(dst_name); if (h == NULL) { LOG_DBG("Could not resolve %s.", dst_name); - close(fd); + close(skfd); return -1; } @@ -1251,60 +982,46 @@ static int ipcp_udp_flow_alloc(pid_t n_api, r_saddr.sin_addr.s_addr = ip_addr; r_saddr.sin_port = LISTEN_PORT; - if (connect(fd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { - close(fd); + if (connect(skfd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { + close(skfd); return -1; } - pthread_rwlock_wrlock(&_ap_instance->flows_lock); - - _ap_instance->flows[fd].port_id = port_id; - _ap_instance->flows[fd].state = FLOW_PENDING; - _ap_instance->flows[fd].rb = rb; - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); + pthread_rwlock_wrlock(&udp_data.flows_lock); - set_fd(fd); + udp_data.fd_to_uf[fd].udp = f_saddr.sin_port; + udp_data.fd_to_uf[fd].skfd = skfd; + udp_data.uf_to_fd[skfd] = fd; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); if (ipcp_udp_port_alloc(ip_addr, f_saddr.sin_port, dst_name, src_ae_name) < 0) { - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); - - clr_fd(fd); - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_wrlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(&udp_data.flows_lock); - _ap_instance->flows[fd].port_id = -1; - _ap_instance->flows[fd].state = FLOW_NULL; - shm_ap_rbuff_close(_ap_instance->flows[fd].rb); - _ap_instance->flows[fd].rb = NULL; + udp_data.fd_to_uf[fd].udp = -1; + udp_data.fd_to_uf[fd].skfd = -1; + udp_data.uf_to_fd[skfd] = -1; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - close(fd); + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); + close(skfd); return -1; } - LOG_DBG("Flow pending on port_id %d.", port_id); + LOG_DBG("Flow pending on fd %d, UDP port %d.", + fd, ntohs(f_saddr.sin_port)); return fd; } -static int ipcp_udp_flow_alloc_resp(pid_t n_api, - int port_id, - int response) +static int ipcp_udp_flow_alloc_resp(int fd, int response) { - struct shm_ap_rbuff * rb; - int fd = -1; + int skfd = -1; struct sockaddr_in f_saddr; struct sockaddr_in r_saddr; socklen_t len = sizeof(r_saddr); @@ -1312,148 +1029,95 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_api, if (response) return 0; - pthread_rwlock_rdlock(&_ipcp->state_lock); - - /* awaken pending flow */ - - pthread_rwlock_wrlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(&udp_data.flows_lock); - fd = port_id_to_fd(port_id); - if (fd < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBG("Could not find flow with port_id %d.", port_id); - return -1; - } + skfd = udp_data.fd_to_uf[fd].skfd; - if (_ap_instance->flows[fd].state != FLOW_PENDING) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBG("Flow was not pending."); + if (getsockname(skfd, (struct sockaddr *) &f_saddr, &len) < 0) { + LOG_DBG("Socket with fd %d has no address.", skfd); return -1; } - rb = shm_ap_rbuff_open_s(n_api); - if (rb == NULL) { - LOG_ERR("Could not open N + 1 ringbuffer."); - _ap_instance->flows[fd].state = FLOW_NULL; - _ap_instance->flows[fd].port_id = -1; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + if (getpeername(skfd, (struct sockaddr *) &r_saddr, &len) < 0) { + LOG_DBG("Socket with fd %d has no peer.", skfd); return -1; } - if (getsockname(fd, (struct sockaddr *) &f_saddr, &len) < 0) { - LOG_DBG("Flow with port_id %d has no socket.", port_id); - return -1; - } + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); - if (getpeername(fd, (struct sockaddr *) &r_saddr, &len) < 0) { - LOG_DBG("Flow with port_id %d has no peer.", port_id); - return -1; - } + set_fd(skfd); - _ap_instance->flows[fd].state = FLOW_ALLOCATED; - _ap_instance->flows[fd].rb = rb; - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); - - set_fd(fd); - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); if (ipcp_udp_port_alloc_resp(r_saddr.sin_addr.s_addr, f_saddr.sin_port, r_saddr.sin_port, response) < 0) { - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); - - clr_fd(fd); - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_wrlock(&_ap_instance->flows_lock); - - _ap_instance->flows[fd].state = FLOW_NULL; - shm_ap_rbuff_close(_ap_instance->flows[fd].rb); - _ap_instance->flows[fd].rb = NULL; - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); + clr_fd(skfd); + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd); + LOG_DBG("Accepted flow, fd %d on UDP port %d.", + fd, ntohs(f_saddr.sin_port)); return 0; } -static int ipcp_udp_flow_dealloc(int port_id) +static int ipcp_udp_flow_dealloc(int fd) { - int fd = -1; + int skfd = -1; int remote_udp = -1; - struct shm_ap_rbuff * rb; struct sockaddr_in r_saddr; socklen_t r_saddr_len = sizeof(r_saddr); - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_rdlock(&_ap_instance->flows_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(&udp_data.flows_lock); - fd = port_id_to_fd(port_id); - if (fd < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBG("Could not find flow with port_id %d.", port_id); - return 0; - } + skfd = udp_data.fd_to_uf[fd].skfd; - clr_fd(fd); + udp_data.uf_to_fd[skfd] = -1; + udp_data.fd_to_uf[fd].udp = -1; + udp_data.fd_to_uf[fd].skfd = -1; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_wrlock(&_ap_instance->flows_lock); + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); - _ap_instance->flows[fd].state = FLOW_NULL; - _ap_instance->flows[fd].port_id = -1; - rb = _ap_instance->flows[fd].rb; - _ap_instance->flows[fd].rb = NULL; + clr_fd(skfd); - pthread_rwlock_unlock(&_ap_instance->flows_lock); + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - if (rb != NULL) - shm_ap_rbuff_close(rb); - - if (getpeername(fd, (struct sockaddr *) &r_saddr, &r_saddr_len) < 0) { - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBG("Flow with port_id %d has no peer.", port_id); - close(fd); + if (getpeername(skfd, (struct sockaddr *) &r_saddr, &r_saddr_len) < 0) { + LOG_DBG("Socket with fd %d has no peer.", skfd); + close(skfd); return 0; } remote_udp = r_saddr.sin_port; r_saddr.sin_port = LISTEN_PORT; - if (connect(fd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { - pthread_rwlock_unlock(&_ipcp->state_lock); - close(fd); + if (connect(skfd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { + close(skfd); return 0 ; } - if (ipcp_udp_port_dealloc(r_saddr.sin_addr.s_addr, - remote_udp) < 0) { + if (ipcp_udp_port_dealloc(r_saddr.sin_addr.s_addr, remote_udp) < 0) { LOG_DBG("Could not notify remote."); - pthread_rwlock_unlock(&_ipcp->state_lock); - close(fd); + close(skfd); return 0; } - pthread_rwlock_unlock(&_ipcp->state_lock); - - close(fd); + close(skfd); - LOG_DBG("Flow with port_id %d deallocated.", port_id); + LOG_DBG("Flow with fd %d deallocated.", fd); return 0; } @@ -1468,31 +1132,6 @@ static struct ipcp_ops udp_ops = { .ipcp_flow_dealloc = ipcp_udp_flow_dealloc }; -static struct ipcp * ipcp_udp_create() -{ - struct ipcp * i; - struct ipcp_udp_data * data; - - i = ipcp_instance_create(); - if (i == NULL) - return NULL; - - data = ipcp_udp_data_create(); - if (data == NULL) { - free(i); - return NULL; - } - - i->data = (struct ipcp_data *) data; - i->ops = &udp_ops; - - i->state = IPCP_INIT; - - return i; -} - -#ifndef MAKE_CHECK - int main(int argc, char * argv[]) { struct sigaction sig_act; @@ -1508,7 +1147,9 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - if (shim_ap_init() < 0) { + udp_data_init(); + + if (ap_init(NULL) < 0) { close_logfile(); exit(EXIT_FAILURE); } @@ -1528,17 +1169,13 @@ int main(int argc, char * argv[]) sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - _ipcp = ipcp_udp_create(); - if (_ipcp == NULL) { - LOG_ERR("Failed to create IPCP."); + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + + if (ipcp_init(THIS_TYPE, &udp_ops) < 0) { close_logfile(); exit(EXIT_FAILURE); } - pthread_sigmask(SIG_BLOCK, &sigset, NULL); - - pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); - pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); if (ipcp_create_r(getpid())) { @@ -1547,24 +1184,21 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - pthread_join(_ap_instance->mainloop, NULL); + ipcp_fini(); - pthread_cancel(_ap_instance->handler); - pthread_cancel(_ap_instance->sdu_reader); - pthread_cancel(_ap_instance->sduloop); + pthread_cancel(udp_data.handler); + pthread_cancel(udp_data.sdu_reader); + pthread_cancel(udp_data.sduloop); - pthread_join(_ap_instance->sduloop, NULL); - pthread_join(_ap_instance->handler, NULL); - pthread_join(_ap_instance->sdu_reader, NULL); + pthread_join(udp_data.sduloop, NULL); + pthread_join(udp_data.handler, NULL); + pthread_join(udp_data.sdu_reader, NULL); - shim_ap_fini(); + ap_fini(); - ipcp_data_destroy(_ipcp->data); - free(_ipcp); + udp_data_fini(); close_logfile(); exit(EXIT_SUCCESS); } - -#endif /* MAKE_CHECK */ diff --git a/src/irmd/CMakeLists.txt b/src/irmd/CMakeLists.txt index 05919326..16b53414 100644 --- a/src/irmd/CMakeLists.txt +++ b/src/irmd/CMakeLists.txt @@ -8,6 +8,7 @@ set(SOURCE_FILES # Add source files here api_table.c apn_table.c + ipcp.c irm_flow.c main.c registry.c diff --git a/src/lib/ipcp.c b/src/irmd/ipcp.c index 01741121..f79e6caf 100644 --- a/src/lib/ipcp.c +++ b/src/irmd/ipcp.c @@ -20,16 +20,17 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#define OUROBOROS_PREFIX "lib-ipcp" - #include <ouroboros/config.h> #include <ouroboros/errno.h> -#include <ouroboros/ipcp.h> -#include <ouroboros/common.h> -#include <ouroboros/logs.h> #include <ouroboros/utils.h> #include <ouroboros/sockets.h> +#define OUROBOROS_PREFIX "irmd/ipcp" + +#include <ouroboros/logs.h> + +#include "ipcp.h" + #include <stdlib.h> #include <string.h> #include <signal.h> @@ -42,11 +43,10 @@ static void close_ptr(void * o) { - close(*((int *) o)); + close(*(int *) o); } -static ipcp_msg_t * send_recv_ipcp_msg(pid_t api, - ipcp_msg_t * msg) +ipcp_msg_t * send_recv_ipcp_msg(pid_t api, ipcp_msg_t * msg) { int sockfd = 0; buffer_t buf; @@ -177,31 +177,6 @@ pid_t ipcp_create(enum ipcp_type ipcp_type) exit(EXIT_FAILURE); } -int ipcp_create_r(pid_t api) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int ret = -1; - - msg.code = IRM_MSG_CODE__IPCP_CREATE_R; - msg.has_api = true; - msg.api = api; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -1; - - if (recv_msg->has_result == false) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; -} - int ipcp_destroy(pid_t api) { int status; @@ -399,68 +374,6 @@ int ipcp_flow_alloc_resp(pid_t api, return ret; } -int ipcp_flow_req_arr(pid_t api, - char * dst_name, - char * src_ae_name) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int port_id = -1; - - if (dst_name == NULL || src_ae_name == NULL) - return -EINVAL; - - msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; - msg.has_api = true; - msg.api = api; - msg.dst_name = dst_name; - msg.ae_name = src_ae_name; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -1; - - if (!recv_msg->has_port_id) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - port_id = recv_msg->port_id; - irm_msg__free_unpacked(recv_msg, NULL); - - return port_id; -} - -int ipcp_flow_alloc_reply(pid_t api, - int port_id, - int response) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int ret = -1; - - msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; - msg.port_id = port_id; - msg.has_port_id = true; - msg.response = response; - msg.has_response = true; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -1; - - if (recv_msg->has_result == false) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; -} - - int ipcp_flow_dealloc(pid_t api, int port_id) { @@ -487,28 +400,3 @@ int ipcp_flow_dealloc(pid_t api, return ret; } - -int irm_flow_dealloc(int port_id) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int ret = -1; - - msg.code = IRM_MSG_CODE__IPCP_FLOW_DEALLOC; - msg.has_port_id = true; - msg.port_id = port_id; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return 0; - - if (recv_msg->has_result == false) { - irm_msg__free_unpacked(recv_msg, NULL); - return 0; - } - - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; -} diff --git a/include/ouroboros/ipcp.h b/src/irmd/ipcp.h index 98337da6..930695fa 100644 --- a/include/ouroboros/ipcp.h +++ b/src/irmd/ipcp.h @@ -29,14 +29,9 @@ #ifndef OUROBOROS_IPCP_H #define OUROBOROS_IPCP_H -struct ipcp; - /* Returns the process id */ pid_t ipcp_create(enum ipcp_type ipcp_type); -/* IPCP calls this when it is initialized */ -int ipcp_create_r(pid_t api); - int ipcp_destroy(pid_t api); int ipcp_enroll(pid_t api, @@ -45,8 +40,6 @@ int ipcp_enroll(pid_t api, int ipcp_bootstrap(pid_t api, dif_config_msg_t * conf); -/* Flow related ops, these go from IRMd to IPCP */ - int ipcp_name_reg(pid_t api, char * name); int ipcp_name_unreg(pid_t api, @@ -63,19 +56,7 @@ int ipcp_flow_alloc_resp(pid_t api, pid_t n_api, int response); -/* These operations go from the IPCP to the IRMd */ - -/* Returns the port_id */ -int ipcp_flow_req_arr(pid_t api, - char * dst_name, - char * src_ae_name); -int ipcp_flow_alloc_reply(pid_t api, - int port_id, - int response); - int ipcp_flow_dealloc(pid_t api, int port_id); -int irm_flow_dealloc(int port_id); - #endif /* OUROBOROS_IPCP_H */ diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index d9fe3fb3..b99c6f97 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -58,6 +58,11 @@ void irm_flow_destroy(struct irm_flow * f) { pthread_mutex_lock(&f->state_lock); + if (f->state == FLOW_DESTROY) { + pthread_mutex_unlock(&f->state_lock); + return; + } + if (f->state == FLOW_PENDING) f->state = FLOW_DESTROY; else @@ -75,3 +80,45 @@ void irm_flow_destroy(struct irm_flow * f) free(f); } + +enum flow_state irm_flow_get_state(struct irm_flow * f) +{ + enum flow_state state; + + pthread_mutex_lock(&f->state_lock); + + state = f->state; + + pthread_mutex_unlock(&f->state_lock); + + return state; +} + +void irm_flow_set_state(struct irm_flow * f, enum flow_state state) +{ + pthread_mutex_lock(&f->state_lock); + + f->state = state; + pthread_cond_broadcast(&f->state_cond); + + pthread_mutex_unlock(&f->state_lock); +} + +enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state) +{ + pthread_mutex_lock(&f->state_lock); + + while (!(f->state == state || f->state == FLOW_DESTROY)) + pthread_cond_wait(&f->state_cond, &f->state_lock); + + if (state == FLOW_DESTROY) { + f->state = FLOW_NULL; + pthread_cond_broadcast(&f->state_cond); + } + + state = f->state; + + pthread_mutex_unlock(&f->state_lock); + + return state; +} diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index b7e5a1be..db6598bf 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -24,12 +24,18 @@ #define OUROBOROS_IRMD_IRM_FLOW_H #include <ouroboros/list.h> -#include <ouroboros/shared.h> #include <sys/types.h> #include <pthread.h> #include <time.h> +enum flow_state { + FLOW_NULL = 0, + FLOW_PENDING, + FLOW_ALLOCATED, + FLOW_DESTROY +}; + struct irm_flow { struct list_head next; @@ -46,6 +52,16 @@ struct irm_flow { }; struct irm_flow * irm_flow_create(); + void irm_flow_destroy(struct irm_flow * f); +enum flow_state irm_flow_get_state(struct irm_flow * f); + + +void irm_flow_set_state(struct irm_flow * f, + enum flow_state state); + +enum flow_state irm_flow_wait_state(struct irm_flow * f, + enum flow_state state); + #endif /* OUROBOROS_IRMD_IRM_FLOW_H */ diff --git a/src/irmd/main.c b/src/irmd/main.c index cc9160bf..523741ef 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -21,14 +21,9 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#define OUROBOROS_PREFIX "irmd" - #include <ouroboros/config.h> #include <ouroboros/errno.h> -#include <ouroboros/logs.h> #include <ouroboros/sockets.h> -#include <ouroboros/ipcp.h> -#include <ouroboros/nsm.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> #include <ouroboros/irm_config.h> @@ -36,14 +31,19 @@ #include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/shm_rdrbuff.h> #include <ouroboros/bitmap.h> -#include <ouroboros/flow.h> #include <ouroboros/qos.h> #include <ouroboros/time_utils.h> +#define OUROBOROS_PREFIX "irmd" + +#include <ouroboros/logs.h> + + #include "utils.h" #include "registry.h" #include "irm_flow.h" #include "api_table.h" +#include "ipcp.h" #include <sys/socket.h> #include <sys/un.h> @@ -60,10 +60,12 @@ struct ipcp_entry { struct list_head next; + char * name; pid_t api; enum ipcp_type type; char * dif_name; + pthread_cond_t init_cond; pthread_mutex_t init_lock; bool init; @@ -100,7 +102,7 @@ struct irm { pthread_t irm_sanitize; pthread_t shm_sanitize; -} * irmd = NULL; +} * irmd; static struct irm_flow * get_irm_flow(int port_id) { @@ -108,7 +110,6 @@ static struct irm_flow * get_irm_flow(int port_id) list_for_each(pos, &irmd->irm_flows) { struct irm_flow * e = list_entry(pos, struct irm_flow, next); - if (e->port_id == port_id) return e; } @@ -122,7 +123,6 @@ static struct irm_flow * get_irm_flow_n(pid_t n_api) list_for_each(pos, &irmd->irm_flows) { struct irm_flow * e = list_entry(pos, struct irm_flow, next); - if (e->n_api == n_api) return e; } @@ -965,8 +965,7 @@ static struct irm_flow * flow_accept(pid_t api, char ** dst_ae_name) return NULL; } - LOG_INFO("New instance (%d) of %s added.", api, e->apn); - + LOG_DBG("New instance (%d) of %s added.", api, e->apn); LOG_DBG("This instance accepts flows for:"); list_for_each(p, &e->names) { struct str_el * s = list_entry(p, struct str_el, next); @@ -1053,8 +1052,8 @@ static int flow_alloc_resp(pid_t n_api, struct api_entry * e = NULL; int ret = -1; - pid_t f_n_1_api; - pid_t f_n_api; + pid_t api_n1; + pid_t api_n; pthread_rwlock_rdlock(&irmd->state_lock); @@ -1107,21 +1106,17 @@ static int flow_alloc_resp(pid_t n_api, return -1; } - f_n_api = f->n_api; - f_n_1_api = f->n_1_api; - - if (!response) { - f->state = FLOW_ALLOCATED; - pthread_cond_signal(&f->state_cond); - } + api_n = f->n_api; + api_n1 = f->n_1_api; pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - ret = ipcp_flow_alloc_resp(f_n_1_api, - port_id, - f_n_api, - response); + ret = ipcp_flow_alloc_resp(api_n1, port_id, api_n, response); + + if (!(response || ret)) + irm_flow_set_state(f, FLOW_ALLOCATED); + return ret; } @@ -1132,6 +1127,7 @@ static struct irm_flow * flow_alloc(pid_t api, { struct irm_flow * f; pid_t ipcp; + int port_id; /* FIXME: Map qos_spec to qos_cube */ @@ -1151,6 +1147,7 @@ static struct irm_flow * flow_alloc(pid_t api, f->n_api = api; f->state = FLOW_PENDING; + if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0) LOG_WARN("Failed to set timestamp."); @@ -1167,7 +1164,7 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_wrlock(&irmd->flows_lock); - f->port_id = bmp_allocate(irmd->port_ids); + port_id = f->port_id = bmp_allocate(irmd->port_ids); f->n_1_api = ipcp; list_add(&f->next, &irmd->irm_flows); @@ -1175,19 +1172,15 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - if (ipcp_flow_alloc(ipcp, - f->port_id, - f->n_api, - dst_name, - src_ae_name, - QOS_CUBE_BE) < 0) { + if (ipcp_flow_alloc(ipcp, port_id, api, + dst_name, src_ae_name, QOS_CUBE_BE) < 0) { pthread_rwlock_rdlock(&irmd->state_lock); pthread_rwlock_wrlock(&irmd->flows_lock); list_del(&f->next); bmp_release(irmd->port_ids, f->port_id); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - free(f); + irm_flow_destroy(f); return NULL; } @@ -1208,20 +1201,20 @@ static int flow_alloc_res(int port_id) f = get_irm_flow(port_id); if (f == NULL) { - LOG_ERR("Could not find port %d.", port_id); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); + LOG_ERR("Could not find port %d.", port_id); return -1; } - if (f->state == FLOW_NULL) { - LOG_INFO("Port %d is deprecated.", port_id); + if (irm_flow_get_state(f) == FLOW_NULL) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); + LOG_INFO("Port %d is deprecated.", port_id); return -1; } - if (f->state == FLOW_ALLOCATED) { + if (irm_flow_get_state(f) == FLOW_ALLOCATED) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); return 0; @@ -1230,35 +1223,13 @@ static int flow_alloc_res(int port_id) pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - pthread_mutex_lock(&f->state_lock); - - while (f->state == FLOW_PENDING) - pthread_cond_wait(&f->state_cond, &f->state_lock); - - pthread_mutex_unlock(&f->state_lock); - - pthread_rwlock_rdlock(&irmd->state_lock); - pthread_rwlock_wrlock(&irmd->flows_lock); - pthread_mutex_lock(&f->state_lock); - - if (f->state == FLOW_ALLOCATED) { - pthread_cond_broadcast(&f->state_cond); - pthread_mutex_unlock(&f->state_lock); - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); + if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED) return 0; - } - - f->state = FLOW_NULL; - pthread_cond_broadcast(&f->state_cond); - pthread_mutex_unlock(&f->state_lock); - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); return -1; } -static int flow_dealloc(int port_id) +static int flow_dealloc(pid_t api, int port_id) { pid_t n_1_api; int ret = 0; @@ -1282,7 +1253,8 @@ static int flow_dealloc(int port_id) pthread_rwlock_unlock(&irmd->flows_lock); - ret = ipcp_flow_dealloc(n_1_api, port_id); + if (api != n_1_api) + ret = ipcp_flow_dealloc(n_1_api, port_id); pthread_rwlock_unlock(&irmd->state_lock); @@ -1340,6 +1312,9 @@ static struct irm_flow * flow_req_arr(pid_t api, struct pid_el * c_api; pid_t h_api = -1; + LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.", + api, dst_name, ae_name); + f = irm_flow_create(); if (f == NULL) { LOG_ERR("Failed to create irm_flow."); @@ -1490,8 +1465,7 @@ static struct irm_flow * flow_req_arr(pid_t api, return f; } -static int flow_alloc_reply(int port_id, - int response) +static int flow_alloc_reply(int port_id, int response) { struct irm_flow * f; @@ -1505,18 +1479,10 @@ static int flow_alloc_reply(int port_id, return -1; } - pthread_mutex_lock(&f->state_lock); - if (!response) - f->state = FLOW_ALLOCATED; - + irm_flow_set_state(f, FLOW_ALLOCATED); else - f->state = FLOW_NULL; - - if (pthread_cond_signal(&f->state_cond)) - LOG_ERR("Failed to send signal."); - - pthread_mutex_unlock(&f->state_lock); + irm_flow_set_state(f, FLOW_NULL); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); @@ -1524,30 +1490,6 @@ static int flow_alloc_reply(int port_id, return 0; } -static int flow_dealloc_ipcp(int port_id) -{ - struct irm_flow * f = NULL; - - pthread_rwlock_rdlock(&irmd->state_lock); - pthread_rwlock_wrlock(&irmd->flows_lock); - - f = get_irm_flow(port_id); - if (f == NULL) { - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - return 0; - } - - list_del(&f->next); - - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - - irm_flow_destroy(f); - - return 0; -} - static void irm_destroy() { struct list_head * p; @@ -1729,46 +1671,35 @@ void * irm_sanitize() struct irm_flow * f = list_entry(p, struct irm_flow, next); - pthread_mutex_lock(&f->state_lock); - - if (f->state == FLOW_PENDING && - ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) { + if (irm_flow_get_state(f) == FLOW_PENDING + && ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) { LOG_INFO("Pending port_id %d timed out.", f->port_id); - f->state = FLOW_NULL; - pthread_cond_signal(&f->state_cond); - pthread_mutex_unlock(&f->state_lock); + irm_flow_set_state(f, FLOW_NULL); continue; } - pthread_mutex_unlock(&f->state_lock); - if (kill(f->n_api, 0) < 0) { - struct shm_ap_rbuff * n_rb = - shm_ap_rbuff_open_s(f->n_api); + struct shm_ap_rbuff * rb = + shm_ap_rbuff_open(f->n_api); bmp_release(irmd->port_ids, f->port_id); - list_del(&f->next); LOG_INFO("AP-I %d gone, flow %d deallocated.", f->n_api, f->port_id); ipcp_flow_dealloc(f->n_1_api, f->port_id); - if (n_rb != NULL) - shm_ap_rbuff_destroy(n_rb); + if (rb != NULL) + shm_ap_rbuff_destroy(rb); irm_flow_destroy(f); continue; } if (kill(f->n_1_api, 0) < 0) { - struct shm_ap_rbuff * n_1_rb_s = - shm_ap_rbuff_open_s(f->n_1_api); - struct shm_ap_rbuff * n_1_rb_n = - shm_ap_rbuff_open_n(f->n_1_api); + struct shm_ap_rbuff * rb = + shm_ap_rbuff_open(f->n_1_api); list_del(&f->next); LOG_ERR("IPCP %d gone, flow %d removed.", f->n_1_api, f->port_id); - if (n_1_rb_n != NULL) - shm_ap_rbuff_destroy(n_1_rb_n); - if (n_1_rb_s != NULL) - shm_ap_rbuff_destroy(n_1_rb_s); + if (rb != NULL) + shm_ap_rbuff_destroy(rb); irm_flow_destroy(f); } } @@ -1939,7 +1870,7 @@ void * mainloop() break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; - ret_msg.result = flow_dealloc(msg->port_id); + ret_msg.result = flow_dealloc(msg->api, msg->port_id); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: e = flow_req_arr(msg->api, @@ -1950,7 +1881,6 @@ void * mainloop() ret_msg.result = -1; break; } - /* FIXME: badly timed dealloc may give SEGV */ ret_msg.has_port_id = true; ret_msg.port_id = e->port_id; ret_msg.has_api = true; @@ -1961,10 +1891,6 @@ void * mainloop() ret_msg.result = flow_alloc_reply(msg->port_id, msg->response); break; - case IRM_MSG_CODE__IPCP_FLOW_DEALLOC: - ret_msg.has_result = true; - ret_msg.result = flow_dealloc_ipcp(msg->port_id); - break; default: LOG_ERR("Don't know that message code."); break; diff --git a/src/irmd/utils.h b/src/irmd/utils.h index 37c745af..2fbc8ef2 100644 --- a/src/irmd/utils.h +++ b/src/irmd/utils.h @@ -40,7 +40,8 @@ struct pid_el { pid_t pid; }; -int wildcard_match(const char * pattern, const char * string); +int wildcard_match(const char * pattern, + const char * string); /* functions for copying and destroying arguments list */ char ** argvdup(char ** argv); diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 14e7051a..b94d0eea 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -30,7 +30,6 @@ set(SOURCE_FILES bitmap.c cdap.c dev.c - ipcp.c irm.c list.c lockfile.c diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 8b1b3bc6..92a05221 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -24,6 +24,7 @@ #include <ouroboros/cdap.h> #include <ouroboros/bitmap.h> #include <ouroboros/dev.h> +#include <ouroboros/fcntl.h> #include <stdlib.h> #include <pthread.h> diff --git a/src/lib/dev.c b/src/lib/dev.c index 391563da..178ee287 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -24,6 +24,7 @@ #include <ouroboros/errno.h> #include <ouroboros/dev.h> #include <ouroboros/sockets.h> +#include <ouroboros/fcntl.h> #include <ouroboros/bitmap.h> #include <ouroboros/shm_rdrbuff.h> #include <ouroboros/shm_ap_rbuff.h> @@ -41,6 +42,87 @@ struct flow_set { pthread_rwlock_t lock; }; +enum port_state { + PORT_NULL = 0, + PORT_ID_PENDING, + PORT_ID_ASSIGNED, + PORT_DESTROY +}; + +struct port { + int fd; + + enum port_state state; + pthread_mutex_t state_lock; + pthread_cond_t state_cond; +}; + +static void port_destroy(struct port * p) +{ + pthread_mutex_lock(&p->state_lock); + + if (p->state == PORT_DESTROY) { + pthread_mutex_unlock(&p->state_lock); + return; + } + + if (p->state == PORT_ID_PENDING) + p->state = PORT_DESTROY; + else + p->state = PORT_NULL; + + pthread_cond_signal(&p->state_cond); + + while (p->state != PORT_NULL) + pthread_cond_wait(&p->state_cond, &p->state_lock); + + p->fd = -1; + p->state = PORT_ID_PENDING; + + pthread_mutex_unlock(&p->state_lock); +} + +static void port_set_state(struct port * p, enum port_state state) +{ + pthread_mutex_lock(&p->state_lock); + + if (p->state == PORT_DESTROY) { + pthread_mutex_unlock(&p->state_lock); + return; + } + + p->state = state; + pthread_cond_broadcast(&p->state_cond); + + pthread_mutex_unlock(&p->state_lock); +} + +enum port_state port_wait_assign(struct port * p) +{ + enum port_state state; + + pthread_mutex_lock(&p->state_lock); + + if (p->state != PORT_ID_PENDING) { + pthread_mutex_unlock(&p->state_lock); + return -1; + } + + while (!(p->state == PORT_ID_ASSIGNED || p->state == PORT_DESTROY)) + pthread_cond_wait(&p->state_cond, &p->state_lock); + + if (p->state == PORT_DESTROY) { + p->state = PORT_NULL; + pthread_cond_broadcast(&p->state_cond); + } + + state = p->state; + + pthread_mutex_unlock(&p->state_lock); + + return state; +} + struct flow { struct shm_ap_rbuff * rb; int port_id; @@ -48,24 +130,24 @@ struct flow { pid_t api; - struct timespec * timeout; + struct timespec timeout; }; -struct ap_instance { +struct { char * ap_name; char * daf_name; pid_t api; struct shm_rdrbuff * rdrb; - struct bmp * fds; struct shm_ap_rbuff * rb; pthread_rwlock_t data_lock; - struct flow flows[AP_MAX_FLOWS]; - int ports[AP_MAX_FLOWS]; + struct bmp * fds; + struct flow * flows; + struct port * ports; pthread_rwlock_t flows_lock; -} * ai; +} ai; static int api_announce(char * ap_name) { @@ -76,12 +158,12 @@ static int api_announce(char * ap_name) msg.code = IRM_MSG_CODE__IRM_API_ANNOUNCE; msg.has_api = true; - pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_rdlock(&ai.data_lock); - msg.api = ai->api; + msg.api = ai.api; msg.ap_name = ap_name; - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) { @@ -104,47 +186,61 @@ int ap_init(char * ap_name) ap_name = path_strip(ap_name); - ai = malloc(sizeof(*ai)); - if (ai == NULL) { - return -ENOMEM; - } - - ai->api = getpid(); - ai->ap_name = ap_name; - ai->daf_name = NULL; + ai.api = getpid(); + ai.ap_name = ap_name; + ai.daf_name = NULL; - ai->fds = bmp_create(AP_MAX_FLOWS, 0); - if (ai->fds == NULL) { - free(ai); + ai.fds = bmp_create(AP_MAX_FLOWS, 0); + if (ai.fds == NULL) return -ENOMEM; + + ai.rdrb = shm_rdrbuff_open(); + if (ai.rdrb == NULL) { + bmp_destroy(ai.fds); + return -1; } - ai->rdrb = shm_rdrbuff_open(); - if (ai->rdrb == NULL) { - bmp_destroy(ai->fds); - free(ai); + ai.rb = shm_ap_rbuff_create(); + if (ai.rb == NULL) { + shm_rdrbuff_close(ai.rdrb); + bmp_destroy(ai.fds); return -1; } - ai->rb = shm_ap_rbuff_create_s(); - if (ai->rb == NULL) { - shm_rdrbuff_close(ai->rdrb); - bmp_destroy(ai->fds); - free(ai); + ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS); + if (ai.flows == NULL) { + shm_ap_rbuff_destroy(ai.rb); + shm_rdrbuff_close(ai.rdrb); + bmp_destroy(ai.fds); return -1; } for (i = 0; i < AP_MAX_FLOWS; ++i) { - ai->flows[i].rb = NULL; - ai->flows[i].port_id = -1; - ai->flows[i].oflags = 0; - ai->flows[i].api = -1; - ai->flows[i].timeout = NULL; - ai->ports[i] = -1; + ai.flows[i].rb = NULL; + ai.flows[i].port_id = -1; + ai.flows[i].oflags = 0; + ai.flows[i].api = -1; + ai.flows[i].timeout.tv_sec = 0; + ai.flows[i].timeout.tv_nsec = 0; } - pthread_rwlock_init(&ai->flows_lock, NULL); - pthread_rwlock_init(&ai->data_lock, NULL); + ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); + if (ai.flows == NULL) { + free(ai.flows); + shm_ap_rbuff_destroy(ai.rb); + shm_rdrbuff_close(ai.rdrb); + bmp_destroy(ai.fds); + return -1; + } + + for (i = 0; i < IRMD_MAX_FLOWS; ++i) { + ai.ports[i].state = PORT_ID_PENDING; + pthread_mutex_init(&ai.ports[i].state_lock, NULL); + pthread_cond_init(&ai.ports[i].state_cond, NULL); + } + + pthread_rwlock_init(&ai.flows_lock, NULL); + pthread_rwlock_init(&ai.data_lock, NULL); if (ap_name != NULL) return api_announce(ap_name); @@ -152,46 +248,49 @@ int ap_init(char * ap_name) return 0; } -void ap_fini(void) +void ap_fini() { int i = 0; - if (ai == NULL) - return; - - pthread_rwlock_wrlock(&ai->data_lock); + pthread_rwlock_wrlock(&ai.data_lock); /* remove all remaining sdus */ - while ((i = shm_ap_rbuff_peek_idx(ai->rb)) >= 0) - shm_rdrbuff_remove(ai->rdrb, i); + while ((i = shm_ap_rbuff_peek_idx(ai.rb)) >= 0) + shm_rdrbuff_remove(ai.rdrb, i); - if (ai->fds != NULL) - bmp_destroy(ai->fds); - if (ai->rb != NULL) - shm_ap_rbuff_destroy(ai->rb); - if (ai->rdrb != NULL) - shm_rdrbuff_close(ai->rdrb); + if (ai.fds != NULL) + bmp_destroy(ai.fds); + if (ai.rb != NULL) + shm_ap_rbuff_destroy(ai.rb); + if (ai.rdrb != NULL) + shm_rdrbuff_close(ai.rdrb); - if (ai->daf_name != NULL) - free(ai->daf_name); + if (ai.daf_name != NULL) + free(ai.daf_name); - pthread_rwlock_rdlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.flows_lock); - for (i = 0; i < AP_MAX_FLOWS; ++i) { - if (ai->flows[i].rb != NULL) - shm_ap_rbuff_close(ai->flows[i].rb); - ai->ports[ai->flows[i].port_id] = -1; + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (ai.flows[i].rb != NULL) + shm_ap_rbuff_close(ai.flows[i].rb); + + for (i = 0; i < IRMD_MAX_FLOWS; ++i) { + ai.ports[i].state = PORT_NULL; + pthread_mutex_destroy(&ai.ports[i].state_lock); + pthread_cond_destroy(&ai.ports[i].state_cond); } - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + free(ai.flows); + free(ai.ports); - pthread_rwlock_destroy(&ai->flows_lock); - pthread_rwlock_destroy(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); - free(ai); + pthread_rwlock_destroy(&ai.flows_lock); + pthread_rwlock_destroy(&ai.data_lock); } + int flow_accept(char ** ae_name) { irm_msg_t msg = IRM_MSG__INIT; @@ -201,11 +300,11 @@ int flow_accept(char ** ae_name) msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_api = true; - pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_rdlock(&ai.data_lock); - msg.api = ai->api; + msg.api = ai.api; - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); recv_msg = send_recv_irm_msg_b(&msg); if (recv_msg == NULL) @@ -216,22 +315,22 @@ int flow_accept(char ** ae_name) return -1; } - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_wrlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); - fd = bmp_allocate(ai->fds); - if (!bmp_is_id_valid(ai->fds, fd)) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + fd = bmp_allocate(ai.fds); + if (!bmp_is_id_valid(ai.fds, fd)) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api); - if (ai->flows[fd].rb == NULL) { - bmp_release(ai->fds, fd); - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); + if (ai.flows[fd].rb == NULL) { + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -239,31 +338,31 @@ int flow_accept(char ** ae_name) if (ae_name != NULL) { *ae_name = strdup(recv_msg->ae_name); if (*ae_name == NULL) { - shm_ap_rbuff_close(ai->flows[fd].rb); - bmp_release(ai->fds, fd); - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + shm_ap_rbuff_close(ai.flows[fd].rb); + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -ENOMEM; } } - ai->flows[fd].port_id = recv_msg->port_id; - ai->flows[fd].oflags = FLOW_O_DEFAULT; - ai->flows[fd].api = recv_msg->api; + ai.flows[fd].port_id = recv_msg->port_id; + ai.flows[fd].oflags = FLOW_O_DEFAULT; + ai.flows[fd].api = recv_msg->api; - ai->ports[recv_msg->port_id] = fd; + ai.ports[recv_msg->port_id].fd = fd; + ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return fd; } -int flow_alloc_resp(int fd, - int response) +int flow_alloc_resp(int fd, int response) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; @@ -274,49 +373,47 @@ int flow_alloc_resp(int fd, msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP; msg.has_api = true; - msg.api = ai->api; + msg.api = ai.api; msg.has_port_id = true; - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_rdlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); - if (ai->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } - msg.port_id = ai->flows[fd].port_id; + msg.port_id = ai.flows[fd].port_id; - pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai.flows_lock); msg.has_response = true; msg.response = response; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) { - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); return -1; } if (!recv_msg->has_result) { - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } ret = recv_msg->result; - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return ret; } -int flow_alloc(char * dst_name, - char * src_ae_name, - struct qos_spec * qos) +int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; @@ -333,11 +430,11 @@ int flow_alloc(char * dst_name, msg.ae_name = src_ae_name; msg.has_api = true; - pthread_rwlock_rdlock(&ai->data_lock); + pthread_rwlock_rdlock(&ai.data_lock); - msg.api = ai->api; + msg.api = ai.api; - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) { @@ -349,34 +446,35 @@ int flow_alloc(char * dst_name, return -1; } - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_wrlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); - fd = bmp_allocate(ai->fds); - if (!bmp_is_id_valid(ai->fds, fd)) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + fd = bmp_allocate(ai.fds); + if (!bmp_is_id_valid(ai.fds, fd)) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api); - if (ai->flows[fd].rb == NULL) { - bmp_release(ai->fds, fd); - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api); + if (ai.flows[fd].rb == NULL) { + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - ai->flows[fd].port_id = recv_msg->port_id; - ai->flows[fd].oflags = FLOW_O_DEFAULT; - ai->flows[fd].api = recv_msg->api; + ai.flows[fd].port_id = recv_msg->port_id; + ai.flows[fd].oflags = FLOW_O_DEFAULT; + ai.flows[fd].api = recv_msg->api; - ai->ports[recv_msg->port_id] = fd; + ai.ports[recv_msg->port_id].fd = fd; + ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); @@ -395,19 +493,19 @@ int flow_alloc_res(int fd) msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; msg.has_port_id = true; - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_rdlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); - if (ai->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } - msg.port_id = ai->flows[fd].port_id; + msg.port_id = ai.flows[fd].port_id; - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); recv_msg = send_recv_irm_msg_b(&msg); if (recv_msg == NULL) { @@ -437,43 +535,43 @@ int flow_dealloc(int fd) msg.has_api = true; msg.api = getpid(); - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_wrlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); - if (ai->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } - msg.port_id = ai->flows[fd].port_id; + msg.port_id = ai.flows[fd].port_id; - ai->ports[msg.port_id] = -1; + port_destroy(&ai.ports[msg.port_id]); - ai->flows[fd].port_id = -1; - shm_ap_rbuff_close(ai->flows[fd].rb); - ai->flows[fd].rb = NULL; - ai->flows[fd].api = -1; + ai.flows[fd].port_id = -1; + shm_ap_rbuff_close(ai.flows[fd].rb); + ai.flows[fd].rb = NULL; + ai.flows[fd].api = -1; - bmp_release(ai->fds, fd); + bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai->flows_lock); + pthread_rwlock_unlock(&ai.flows_lock); recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) { - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); return -1; } if (!recv_msg->has_result) { - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } ret = recv_msg->result; - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); @@ -487,30 +585,30 @@ int flow_cntl(int fd, int cmd, int oflags) if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_wrlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); - if (ai->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } - old = ai->flows[fd].oflags; + old = ai.flows[fd].oflags; switch (cmd) { case FLOW_F_GETFL: /* GET FLOW FLAGS */ - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return old; case FLOW_F_SETFL: /* SET FLOW FLAGS */ - ai->flows[fd].oflags = oflags; - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + ai.flows[fd].oflags = oflags; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return old; default: - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return FLOW_O_INVALID; /* unknown command */ } } @@ -526,62 +624,62 @@ ssize_t flow_write(int fd, void * buf, size_t count) if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_rdlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); - if (ai->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } - if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_rdrbuff_write(ai->rdrb, - ai->flows[fd].api, + if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { + idx = shm_rdrbuff_write(ai.rdrb, + ai.flows[fd].api, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, - (uint8_t *) buf, + buf, count); if (idx == -1) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -EAGAIN; } e.index = idx; - e.port_id = ai->flows[fd].port_id; + e.port_id = ai.flows[fd].port_id; - if (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) { - shm_rdrbuff_remove(ai->rdrb, idx); - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) { + shm_rdrbuff_remove(ai.rdrb, idx); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -1; } } else { /* blocking */ - struct shm_rdrbuff * rdrb = ai->rdrb; - pid_t api = ai->flows[fd].api; - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + struct shm_rdrbuff * rdrb = ai.rdrb; + pid_t api = ai.flows[fd].api; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); idx = shm_rdrbuff_write_b(rdrb, - api, - DU_BUFF_HEADSPACE, - DU_BUFF_TAILSPACE, - (uint8_t *) buf, - count); + api, + DU_BUFF_HEADSPACE, + DU_BUFF_TAILSPACE, + buf, + count); - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_rdlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); e.index = idx; - e.port_id = ai->flows[fd].port_id; + e.port_id = ai.flows[fd].port_id; - while (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) + while (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) ; } - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return 0; } @@ -595,47 +693,44 @@ ssize_t flow_read(int fd, void * buf, size_t count) if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; - pthread_rwlock_rdlock(&ai->data_lock); - pthread_rwlock_rdlock(&ai->flows_lock); + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); - if (ai->flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); + if (ai.flows[fd].port_id < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -ENOTALLOC; } - if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_ap_rbuff_read_port(ai->rb, - ai->flows[fd].port_id); - pthread_rwlock_unlock(&ai->flows_lock); + if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { + idx = shm_ap_rbuff_read_port(ai.rb, ai.flows[fd].port_id); + pthread_rwlock_unlock(&ai.flows_lock); } else { - struct shm_ap_rbuff * rb = ai->rb; - int port_id = ai->flows[fd].port_id; - struct timespec * timeout = ai->flows[fd].timeout; - pthread_rwlock_unlock(&ai->flows_lock); - pthread_rwlock_unlock(&ai->data_lock); - - idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout); - - pthread_rwlock_rdlock(&ai->data_lock); + struct shm_ap_rbuff * rb = ai.rb; + int port_id = ai.flows[fd].port_id; + struct timespec timeout = ai.flows[fd].timeout; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + idx = shm_ap_rbuff_read_port_b(rb, port_id, &timeout); + pthread_rwlock_rdlock(&ai.data_lock); } if (idx < 0) { - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); return -EAGAIN; } - n = shm_rdrbuff_read(&sdu, ai->rdrb, idx); + n = shm_rdrbuff_read(&sdu, ai.rdrb, idx); if (n < 0) { - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); return -1; } memcpy(buf, sdu, MIN(n, count)); - shm_rdrbuff_remove(ai->rdrb, idx); + shm_rdrbuff_remove(ai.rdrb, idx); - pthread_rwlock_unlock(&ai->data_lock); + pthread_rwlock_unlock(&ai.data_lock); return n; } @@ -671,7 +766,7 @@ void flow_set_zero(struct flow_set * set) void flow_set_add(struct flow_set * set, int fd) { pthread_rwlock_wrlock(&set->lock); - set->b[ai->flows[fd].port_id] = true; + set->b[ai.flows[fd].port_id] = true; set->dirty = true; pthread_rwlock_unlock(&set->lock); } @@ -679,7 +774,7 @@ void flow_set_add(struct flow_set * set, int fd) void flow_set_del(struct flow_set * set, int fd) { pthread_rwlock_wrlock(&set->lock); - set->b[ai->flows[fd].port_id] = false; + set->b[ai.flows[fd].port_id] = false; set->dirty = true; pthread_rwlock_unlock(&set->lock); } @@ -688,7 +783,7 @@ bool flow_set_has(struct flow_set * set, int fd) { bool ret; pthread_rwlock_rdlock(&set->lock); - ret = set->b[ai->flows[fd].port_id]; + ret = set->b[ai.flows[fd].port_id]; pthread_rwlock_unlock(&set->lock); return ret; } @@ -712,12 +807,324 @@ int flow_select(struct flow_set * set, const struct timespec * timeout) { int port_id; if (set == NULL) { - port_id = shm_ap_rbuff_peek_b(ai->rb, NULL, timeout); + port_id = shm_ap_rbuff_peek_b(ai.rb, NULL, timeout); } else { flow_set_cpy(set); - port_id = shm_ap_rbuff_peek_b(ai->rb, (bool *) set->s, timeout); + port_id = shm_ap_rbuff_peek_b(ai.rb, (bool *) set->s, timeout); } if (port_id < 0) return port_id; - return ai->ports[port_id]; + return ai.ports[port_id].fd; +} + +/* ipcp-dev functions */ + +int np1_flow_alloc(pid_t n_api, int port_id) +{ + int fd; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + fd = bmp_allocate(ai.fds); + if (!bmp_is_id_valid(ai.fds, fd)) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + ai.flows[fd].rb = shm_ap_rbuff_open(n_api); + if (ai.flows[fd].rb == NULL) { + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + ai.flows[fd].port_id = port_id; + ai.flows[fd].oflags = FLOW_O_DEFAULT; + ai.flows[fd].api = n_api; + + ai.ports[port_id].fd = fd; + port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + +int np1_flow_dealloc(int port_id) +{ + int fd; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + fd = ai.ports[port_id].fd; + if (fd < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return fd; + } + + ai.flows[fd].port_id = -1; + shm_ap_rbuff_close(ai.flows[fd].rb); + ai.flows[fd].rb = NULL; + ai.flows[fd].api = -1; + + bmp_release(ai.fds, fd); + + port_destroy(&ai.ports[port_id]); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + + +int np1_flow_resp(pid_t n_api, int port_id) +{ + int fd; + struct shm_ap_rbuff * rb; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + port_wait_assign(&ai.ports[port_id]); + + fd = ai.ports[port_id].fd; + if (fd < 0) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return fd; + } + + rb = shm_ap_rbuff_open(n_api); + if (rb == NULL) { + ai.flows[fd].port_id = -1; + port_destroy(&ai.ports[port_id]); + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; + } + + ai.flows[fd].rb = rb; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + +int ipcp_create_r(pid_t api) +{ + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg = NULL; + int ret = -1; + + msg.code = IRM_MSG_CODE__IPCP_CREATE_R; + msg.has_api = true; + msg.api = api; + + recv_msg = send_recv_irm_msg(&msg); + if (recv_msg == NULL) + return -1; + + if (recv_msg->has_result == false) { + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + ret = recv_msg->result; + irm_msg__free_unpacked(recv_msg, NULL); + + return ret; +} + +int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name) +{ + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg = NULL; + int port_id = -1; + int fd = -1; + + if (dst_name == NULL || src_ae_name == NULL) + return -EINVAL; + + msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg.has_api = true; + msg.api = api; + msg.dst_name = dst_name; + msg.ae_name = src_ae_name; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + fd = bmp_allocate(ai.fds); + if (!bmp_is_id_valid(ai.fds, fd)) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -1; /* -ENOMOREFDS */ + } + + ai.flows[fd].rb = NULL; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + recv_msg = send_recv_irm_msg(&msg); + if (recv_msg == NULL) + return -1; + + if (!recv_msg->has_port_id) { + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + port_id = recv_msg->port_id; + irm_msg__free_unpacked(recv_msg, NULL); + if (port_id < 0) + return -1; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + ai.flows[fd].port_id = port_id; + ai.flows[fd].rb = NULL; + + ai.ports[port_id].fd = fd; + port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + +int ipcp_flow_alloc_reply(int fd, int response) +{ + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg = NULL; + int ret = -1; + + msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; + msg.has_port_id = true; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + msg.port_id = ai.flows[fd].port_id; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + msg.has_response = true; + msg.response = response; + + recv_msg = send_recv_irm_msg(&msg); + if (recv_msg == NULL) + return -1; + + if (recv_msg->has_result == false) { + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + ret = recv_msg->result; + irm_msg__free_unpacked(recv_msg, NULL); + + return ret; +} + +int ipcp_flow_read(struct shm_du_buff ** sdb) +{ + int fd; + struct rb_entry * e; + + e = shm_ap_rbuff_read(ai.rb); + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + fd = ai.ports[e->port_id].fd; + + *sdb = shm_rdrbuff_get(ai.rdrb, e->index); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + +int ipcp_flow_write(int fd, struct shm_du_buff * sdb) +{ + struct rb_entry e; + + if (sdb == NULL) + return -EINVAL; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + if (ai.flows[fd].rb == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -EPERM; + } + + e.index = shm_du_buff_get_idx(sdb); + e.port_id = ai.flows[fd].port_id; + + shm_ap_rbuff_write(ai.flows[fd].rb, &e); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return 0; +} + +int local_flow_read(struct rb_entry * e) +{ + int fd; + + *e = *(shm_ap_rbuff_read(ai.rb)); + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + fd = ai.ports[e->port_id].fd; + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return fd; +} + +int local_flow_write(int fd, struct rb_entry * e) +{ + if (e == NULL) + return -EINVAL; + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_rdlock(&ai.flows_lock); + + if (ai.flows[fd].rb == NULL) { + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + return -EPERM; + } + + e->port_id = ai.flows[fd].port_id; + + shm_ap_rbuff_write(ai.flows[fd].rb, e); + + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + + return 0; +} + +void ipcp_flow_del(struct shm_du_buff * sdb) +{ + shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); } diff --git a/src/lib/irm.c b/src/lib/irm.c index fce11ba5..c4c6395b 100644 --- a/src/lib/irm.c +++ b/src/lib/irm.c @@ -25,7 +25,7 @@ #include <ouroboros/config.h> #include <ouroboros/errno.h> #include <ouroboros/irm.h> -#include <ouroboros/common.h> +#include <ouroboros/utils.h> #include <ouroboros/logs.h> #include <ouroboros/sockets.h> diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 7a634201..61c27d01 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -43,8 +43,7 @@ enum irm_msg_code { IRM_FLOW_DEALLOC = 18; IPCP_FLOW_REQ_ARR = 19; IPCP_FLOW_ALLOC_REPLY = 20; - IPCP_FLOW_DEALLOC = 21; - IRM_REPLY = 22; + IRM_REPLY = 21; }; message irm_msg { diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index d9e332fe..184a1bf2 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -21,14 +21,14 @@ */ #include <ouroboros/config.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/lockfile.h> +#include <ouroboros/time_utils.h> #include <ouroboros/errno.h> #define OUROBOROS_PREFIX "shm_ap_rbuff" #include <ouroboros/logs.h> -#include <ouroboros/shm_ap_rbuff.h> -#include <ouroboros/lockfile.h> -#include <ouroboros/time_utils.h> #include <pthread.h> #include <sys/mman.h> @@ -41,8 +41,6 @@ #include <sys/stat.h> #define FN_MAX_CHARS 255 -#define NORTH false -#define SOUTH true #define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \ + 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \ @@ -63,11 +61,10 @@ struct shm_ap_rbuff { pthread_cond_t * add; /* SDU arrived */ pthread_cond_t * del; /* SDU removed */ pid_t api; /* api to which this rb belongs */ - bool dir; /* direction, false = N */ int fd; }; -static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir) +struct shm_ap_rbuff * shm_ap_rbuff_create() { struct shm_ap_rbuff * rb; int shm_fd; @@ -77,10 +74,7 @@ static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir) char fn[FN_MAX_CHARS]; mode_t mask; - if (dir == SOUTH) - sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", getpid()); - else - sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", getpid()); + sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); rb = malloc(sizeof(*rb)); if (rb == NULL) { @@ -157,22 +151,18 @@ static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir) rb->fd = shm_fd; rb->api = getpid(); - rb->dir = dir; return rb; } -static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir) +struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api) { struct shm_ap_rbuff * rb; int shm_fd; struct rb_entry * shm_base; char fn[FN_MAX_CHARS]; - if (dir == SOUTH) - sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", api); - else - sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", api); + sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api); rb = malloc(sizeof(*rb)); if (rb == NULL) { @@ -215,31 +205,10 @@ static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir) rb->fd = shm_fd; rb->api = api; - rb->dir = dir; return rb; } -struct shm_ap_rbuff * shm_ap_rbuff_create_n() -{ - return shm_ap_rbuff_create(NORTH); -} - -struct shm_ap_rbuff * shm_ap_rbuff_create_s() -{ - return shm_ap_rbuff_create(SOUTH); -} - -struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api) -{ - return shm_ap_rbuff_open(api, NORTH); -} - -struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api) -{ - return shm_ap_rbuff_open(api, SOUTH); -} - void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) { if (rb == NULL) { @@ -285,10 +254,7 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) if (close(rb->fd) < 0) LOG_DBG("Couldn't close shared memory."); - if (rb->dir == SOUTH) - sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", rb->api); - else - sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", rb->api); + sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api); if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) LOG_DBG("Couldn't unmap shared memory."); diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index bf5c7f16..fb58a4d6 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -24,7 +24,6 @@ #include <ouroboros/config.h> #include <ouroboros/errno.h> #include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/time_utils.h> #include <pthread.h> @@ -35,6 +34,7 @@ #include <string.h> #include <signal.h> #include <sys/stat.h> +#include <stdbool.h> #define OUROBOROS_PREFIX "shm_rdrbuff" @@ -76,6 +76,7 @@ struct shm_du_buff { size_t du_head; size_t du_tail; pid_t dst_api; + size_t idx; }; struct shm_rdrbuff { @@ -458,7 +459,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, #endif int sz = size + sizeof *sdb; uint8_t * write_pos; - ssize_t idx = -1; if (rdrb == NULL || data == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -505,6 +505,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, sdb->dst_api = -1; sdb->du_head = 0; sdb->du_tail = 0; + sdb->idx = *rdrb->ptr_head; *rdrb->ptr_head = 0; } @@ -521,7 +522,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, memcpy(write_pos, data, len); - idx = *rdrb->ptr_head; + sdb->idx = *rdrb->ptr_head; #ifdef SHM_RDRB_MULTI_BLOCK *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); #else @@ -529,7 +530,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, #endif pthread_mutex_unlock(rdrb->lock); - return idx; + return sdb->idx; } ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, @@ -547,7 +548,6 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, #endif int sz = size + sizeof *sdb; uint8_t * write_pos; - ssize_t idx = -1; if (rdrb == NULL || data == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -596,6 +596,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, sdb->dst_api = -1; sdb->du_head = 0; sdb->du_tail = 0; + sdb->idx = *rdrb->ptr_head; *rdrb->ptr_head = 0; } @@ -612,7 +613,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, memcpy(write_pos, data, len); - idx = *rdrb->ptr_head; + sdb->idx = *rdrb->ptr_head; #ifdef SHM_RDRB_MULTI_BLOCK *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); #else @@ -620,7 +621,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, #endif pthread_cleanup_pop(true); - return idx; + return sdb->idx; } int shm_rdrbuff_read(uint8_t ** dst, @@ -654,6 +655,32 @@ int shm_rdrbuff_read(uint8_t ** dst, return len; } +struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb, ssize_t idx) +{ + struct shm_du_buff * sdb; + + if (idx > SHM_BUFFER_SIZE) + return NULL; +#ifdef __APPLE__ + pthread_mutex_lock(rdrb->lock); +#else + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } +#endif + if (shm_rdrb_empty(rdrb)) { + pthread_mutex_unlock(rdrb->lock); + return NULL; + } + + sdb = idx_to_du_buff_ptr(rdrb, idx); + + pthread_mutex_unlock(rdrb->lock); + + return sdb; +} + int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx) { if (idx > SHM_BUFFER_SIZE) @@ -688,6 +715,11 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx) return 0; } +size_t shm_du_buff_get_idx(struct shm_du_buff * sdb) +{ + return sdb->idx; +} + uint8_t * shm_du_buff_head(struct shm_du_buff * sdb) { if (sdb == NULL) diff --git a/src/lib/sockets.c b/src/lib/sockets.c index 751c61b2..408e79e7 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -25,7 +25,6 @@ #include <ouroboros/config.h> #include <ouroboros/errno.h> #include <ouroboros/logs.h> -#include <ouroboros/common.h> #include <ouroboros/sockets.h> #include <ouroboros/utils.h> @@ -102,13 +101,12 @@ int server_socket_open(char * file_name) return sockfd; } -void close_ptr(void * o) +static void close_ptr(void * o) { close(*(int *) o); } -static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, - bool timed) +static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, bool timed) { int sockfd; buffer_t buf; diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c index 8eff4a4c..c5664d8b 100644 --- a/src/tools/cbr/cbr_server.c +++ b/src/tools/cbr/cbr_server.c @@ -21,6 +21,10 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ +#include <ouroboros/dev.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/fcntl.h> + #include <stdbool.h> #ifdef __FreeBSD__ @@ -32,9 +36,6 @@ #include <stdlib.h> #include <pthread.h> -#include <ouroboros/dev.h> -#include <ouroboros/time_utils.h> - #define THREADS_SIZE 10 pthread_t listen_thread; diff --git a/src/tools/irm/irm.c b/src/tools/irm/irm.c index c260feb9..a674c7ba 100644 --- a/src/tools/irm/irm.c +++ b/src/tools/irm/irm.c @@ -20,7 +20,6 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#include <ouroboros/common.h> #include <ouroboros/irm.h> #include <stdio.h> #include <string.h> diff --git a/src/tools/irm/irm_utils.c b/src/tools/irm/irm_utils.c index feb8ac98..41a1e811 100644 --- a/src/tools/irm/irm_utils.c +++ b/src/tools/irm/irm_utils.c @@ -23,7 +23,6 @@ #include <string.h> #include <stdbool.h> #include <stdlib.h> -#include <ouroboros/common.h> #include "irm_utils.h" diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 3a254984..47b40118 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -22,7 +22,7 @@ */ #include <ouroboros/dev.h> -#include <ouroboros/errno.h> +#include <ouroboros/fcntl.h> #include <ouroboros/time_utils.h> #ifdef __FreeBSD__ @@ -34,6 +34,7 @@ #include <sys/time.h> #include <arpa/inet.h> #include <math.h> +#include <errno.h> #include <float.h> void shutdown_client(int signo, siginfo_t * info, void * c) |