From eb9f44379d5316e7f7e9311d7a66d2041eca743a Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sat, 7 May 2016 16:11:09 +0200 Subject: irmd: flow allocation and fast path This commit has a first implementation of flow allocation (the "slow path") and read/write (the "fast path") for ouroboros. It provides basic but unstable communications over the shared memory. It required a lot of changes all over the stack, and fixes a number of previously undetected issues. This PR still need heavy revision regarding data model, locking and cleanup. lib/dev: modifications to the API. It now uses an ap_init() call to set the AP name and sets the Instance ID to the pid of the process. It also binds the AP to the shared memory and creates tables for mappings in the fast path. A call to ap_fini() releases the resources. lib/shm_ap_rbuff: added ring buffer for data exchange between processes in the fast path. It passes an index in the shm_du_map. lib/shm_du_map: rewrote API to work with calls from dev.c. Garbage collector added. Tests updated to new API. ipcpd/ipcp-data: removed everything related to flows, as these are universal for all ap's and kept in ap_data (dev.c), or similar structs for shim ipcps. shim-udp: added flow allocator and read/write functions and shm elements. irmd: revised data model and structures necessary for flow allocation. tools: echo updated to new dev.h API. messaging system was updated to comply with new flow allocation messages. All exchanges use pid and port_id to bootstrap the fast path. --- src/ipcpd/flow.c | 38 +- src/ipcpd/flow.h | 12 +- src/ipcpd/ipcp-data.c | 104 +---- src/ipcpd/ipcp-data.h | 16 - src/ipcpd/ipcp-ops.h | 9 +- src/ipcpd/ipcp.c | 20 +- src/ipcpd/ipcp.h | 3 +- src/ipcpd/shim-udp/main.c | 524 +++++++++++++++------ src/ipcpd/shim-udp/tests/shim_udp_test.c | 12 +- src/irmd/main.c | 760 +++++++++++++++++++++++-------- src/lib/CMakeLists.txt | 1 + src/lib/bitmap.c | 14 +- src/lib/dev.c | 330 ++++++++++---- src/lib/ipcp.c | 65 +-- src/lib/ipcpd_messages.proto | 6 +- src/lib/irmd_messages.proto | 26 +- src/lib/shm_ap_rbuff.c | 268 +++++++++++ src/lib/shm_du_map.c | 143 ++++-- src/lib/tests/shm_du_map_test.c | 53 ++- src/tools/echo/echo_client.c | 15 +- src/tools/echo/echo_server.c | 53 ++- 21 files changed, 1714 insertions(+), 758 deletions(-) create mode 100644 src/lib/shm_ap_rbuff.c (limited to 'src') diff --git a/src/ipcpd/flow.c b/src/ipcpd/flow.c index c436733b..ae8f848c 100644 --- a/src/ipcpd/flow.c +++ b/src/ipcpd/flow.c @@ -27,7 +27,7 @@ #include -flow_t * flow_create(int32_t port_id) +flow_t * flow_create(uint32_t port_id) { flow_t * flow = malloc(sizeof *flow); if (flow == NULL) { @@ -38,8 +38,7 @@ flow_t * flow_create(int32_t port_id) INIT_LIST_HEAD(&flow->list); flow->port_id = port_id; - flow->oflags = FLOW_O_DEFAULT; - flow->state = FLOW_NULL; + flow->state = FLOW_NULL; pthread_mutex_init(&flow->lock, NULL); @@ -52,36 +51,3 @@ void flow_destroy(flow_t * flow) return; free(flow); } - -int flow_set_opts(flow_t * flow, uint16_t opts) -{ - if (flow == NULL) { - LOG_DBGF("Non-existing flow."); - return -1; - } - - pthread_mutex_lock(&flow->lock); - - if ((opts & FLOW_O_ACCMODE) == FLOW_O_ACCMODE) { - flow->oflags = FLOW_O_DEFAULT; - pthread_mutex_unlock(&flow->lock); - LOG_WARN("Invalid flow options. Setting default."); - return -1; - } - - flow->oflags = opts; - - pthread_mutex_unlock(&flow->lock); - - return 0; -} - -uint16_t flow_get_opts(const flow_t * flow) -{ - if (flow == NULL) { - LOG_DBGF("Non-existing flow."); - return FLOW_O_INVALID; - } - - return flow->oflags; -} diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h index 000de5ad..0a3e90d1 100644 --- a/src/ipcpd/flow.h +++ b/src/ipcpd/flow.h @@ -25,6 +25,7 @@ #include #include +#include #include /* same values as fcntl.h */ @@ -47,17 +48,14 @@ enum flow_state { typedef struct flow { struct list_head list; - int32_t port_id; - uint16_t oflags; - enum flow_state state; + uint32_t port_id; + struct shm_ap_rbuff * rb; + enum flow_state state; pthread_mutex_t lock; } flow_t; -flow_t * flow_create(int32_t port_id); +flow_t * flow_create(uint32_t port_id); void flow_destroy(flow_t * flow); -int flow_set_opts(flow_t * flow, uint16_t opts); -uint16_t flow_get_opts(const flow_t * flow); - #endif /* OUROBOROS_FLOW_H */ diff --git a/src/ipcpd/ipcp-data.c b/src/ipcpd/ipcp-data.c index 72407a53..76fc4bcd 100644 --- a/src/ipcpd/ipcp-data.c +++ b/src/ipcpd/ipcp-data.c @@ -96,46 +96,26 @@ struct ipcp_data * ipcp_data_create() if (data == NULL) return NULL; - data->iname = NULL; data->type = 0; - data->dum = NULL; return data; } struct ipcp_data * ipcp_data_init(struct ipcp_data * dst, - const char * ipcp_name, enum ipcp_type ipcp_type) { if (dst == NULL) return NULL; - dst->iname = instance_name_create(); - if (dst->iname == NULL) - return NULL; - - if(instance_name_init_from(dst->iname, ipcp_name, getpid()) == NULL) { - instance_name_destroy(dst->iname); - return NULL; - } - dst->type = ipcp_type; - dst->dum = shm_du_map_open(); - if (dst->dum == NULL) { - instance_name_destroy(dst->iname); - return NULL; - } - /* init the lists */ INIT_LIST_HEAD(&dst->registry); - INIT_LIST_HEAD(&dst->flows); INIT_LIST_HEAD(&dst->directory); /* init the mutexes */ pthread_mutex_init(&dst->reg_lock, NULL); pthread_mutex_init(&dst->dir_lock, NULL); - pthread_mutex_init(&dst->flow_lock, NULL); return dst; } @@ -156,42 +136,22 @@ static void clear_directory(struct ipcp_data * data) dir_entry_destroy(list_entry(h, struct dir_entry, list)); } -static void clear_flows(struct ipcp_data * data) -{ - struct list_head * h; - struct list_head * t; - list_for_each_safe(h, t, &data->flows) - flow_destroy(list_entry(h, flow_t, list)); - -} - void ipcp_data_destroy(struct ipcp_data * data) { if (data == NULL) return; - /* FIXME: finish all pending operations here */ - - if (data->iname != NULL) - instance_name_destroy(data->iname); - data->iname = NULL; - - if (data->dum != NULL) - shm_du_map_close(data->dum); - data->dum = NULL; + /* FIXME: finish all pending operations here and cancel all threads */ pthread_mutex_lock(&data->reg_lock); pthread_mutex_lock(&data->dir_lock); - pthread_mutex_lock(&data->flow_lock); /* clear the lists */ clear_registry(data); clear_directory(data); - clear_flows(data); /* * no need to unlock, just free the entire thing - * pthread_mutex_unlock(&data->flow_lock); * pthread_mutex_unlock(&data->dir_lock); * pthread_mutex_unlock(&data->reg_lock); */ @@ -380,65 +340,3 @@ uint64_t ipcp_data_get_addr(struct ipcp_data * data, return addr; } - -flow_t * ipcp_data_find_flow(struct ipcp_data * data, - uint32_t port_id) -{ - struct list_head * h; - list_for_each(h, &data->flows) { - flow_t * f = list_entry(h, flow_t, list); - if (f->port_id == port_id) - return f; - } - - return NULL; -} - -bool ipcp_data_has_flow(struct ipcp_data * data, - uint32_t port_id) -{ - return ipcp_data_find_flow(data, port_id) != NULL; -} - -int ipcp_data_add_flow(struct ipcp_data * data, - flow_t * flow) -{ - if (data == NULL || flow == NULL) - return -1; - - pthread_mutex_lock(&data->flow_lock); - - if (ipcp_data_has_flow(data, flow->port_id)) { - pthread_mutex_unlock(&data->flow_lock); - return -2; - } - - list_add(&flow->list,&data->flows); - - pthread_mutex_unlock(&data->flow_lock); - - return 0; -} - -int ipcp_data_del_flow(struct ipcp_data * data, - uint32_t port_id) -{ - flow_t * f; - - if (data == NULL) - return -1; - - pthread_mutex_lock(&data->flow_lock); - - f = ipcp_data_find_flow(data, port_id); - if (f == NULL) - return -1; - - list_del(&f->list); - - free(f); - - pthread_mutex_unlock(&data->flow_lock); - - return 0; -} diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h index 1dea8c3c..2e86ba11 100644 --- a/src/ipcpd/ipcp-data.h +++ b/src/ipcpd/ipcp-data.h @@ -34,17 +34,11 @@ #include "flow.h" struct ipcp_data { - instance_name_t * iname; enum ipcp_type type; - struct shm_du_map * dum; - struct list_head registry; pthread_mutex_t reg_lock; - struct list_head flows; - pthread_mutex_t flow_lock; - struct list_head directory; pthread_mutex_t dir_lock; @@ -53,7 +47,6 @@ struct ipcp_data { struct ipcp_data * ipcp_data_create(); struct ipcp_data * ipcp_data_init(struct ipcp_data * dst, - const char * ipcp_name, enum ipcp_type ipcp_type); void ipcp_data_destroy(struct ipcp_data * data); @@ -73,13 +66,4 @@ bool ipcp_data_is_in_directory(struct ipcp_data * data, const char * ap_name); uint64_t ipcp_data_get_addr(struct ipcp_data * data, const char * ap_name); -bool ipcp_data_has_flow(struct ipcp_data * data, - uint32_t port_id); -flow_t * ipcp_data_find_flow(struct ipcp_data * data, - uint32_t port_id); -int ipcp_data_add_flow(struct ipcp_data * data, - flow_t * flow); -int ipcp_data_del_flow(struct ipcp_data * data, - uint32_t port_id); - #endif /* IPCPD_IPCP_DATA_H */ diff --git a/src/ipcpd/ipcp-ops.h b/src/ipcpd/ipcp-ops.h index 2ccb2e59..91b6cac9 100644 --- a/src/ipcpd/ipcp-ops.h +++ b/src/ipcpd/ipcp-ops.h @@ -39,20 +39,15 @@ struct ipcp_ops { int (* ipcp_name_reg)(char * name); int (* ipcp_name_unreg)(char * name); int (* ipcp_flow_alloc)(uint32_t port_id, + pid_t n_pid, char * dst_ap_name, char * src_ap_name, char * src_ae_name, struct qos_spec * qos); int (* ipcp_flow_alloc_resp)(uint32_t port_id, + pid_t n_pid, int response); int (* ipcp_flow_dealloc)(uint32_t port_id); - - /* FIXME: let's see how this will work with the shm_du_map */ - int (* ipcp_du_write)(uint32_t port_id, - size_t map_index); - - int (* ipcp_du_read)(uint32_t port_id, - size_t map_index); }; #endif /* IPCPD_IPCP_OPS_H */ diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index d6f373cd..13632a80 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -45,11 +45,12 @@ int ipcp_arg_check(int argc, char * argv[]) return 0; } -int ipcp_main_loop(struct ipcp * _ipcp) +void * ipcp_main_loop(void * o) { int lsockfd; int sockfd; uint8_t buf[IPCP_MSG_BUF_SIZE]; + struct ipcp * _ipcp = (struct ipcp *) o; ipcp_msg_t * msg; ssize_t count; @@ -61,13 +62,13 @@ int ipcp_main_loop(struct ipcp * _ipcp) if (_ipcp == NULL) { LOG_ERR("Invalid ipcp struct."); - return 1; + return (void *) 1; } sockfd = server_socket_open(ipcp_sock_path(getpid())); if (sockfd < 0) { LOG_ERR("Could not open server socket."); - return 1; + return (void *) 1; } while (true) { @@ -113,7 +114,7 @@ int ipcp_main_loop(struct ipcp * _ipcp) conf.max_pdu_size = conf_msg->max_pdu_size; } if (conf_msg->ipcp_type == IPCP_SHIM_UDP) { - conf.ip_addr = conf_msg->ip_addr; + conf.ip_addr = conf_msg->ip_addr; conf.dns_addr = conf_msg->dns_addr; } @@ -149,7 +150,8 @@ int ipcp_main_loop(struct ipcp * _ipcp) } ret_msg.has_result = true; ret_msg.result = - _ipcp->ops->ipcp_unreg(msg->dif_names, msg->len); + _ipcp->ops->ipcp_unreg(msg->dif_names, + msg->len); break; case IPCP_MSG_CODE__IPCP_NAME_REG: if (_ipcp->ops->ipcp_name_reg == NULL) { @@ -172,9 +174,10 @@ int ipcp_main_loop(struct ipcp * _ipcp) LOG_ERR("Flow_alloc unsupported."); break; } - ret_msg.has_fd = true; - ret_msg.fd = + ret_msg.has_result = true; + ret_msg.result = _ipcp->ops->ipcp_flow_alloc(msg->port_id, + msg->pid, msg->dst_name, msg->src_ap_name, msg->src_ae_name, @@ -188,6 +191,7 @@ int ipcp_main_loop(struct ipcp * _ipcp) ret_msg.has_result = true; ret_msg.result = _ipcp->ops->ipcp_flow_alloc_resp(msg->port_id, + msg->pid, msg->result); break; case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: @@ -231,5 +235,5 @@ int ipcp_main_loop(struct ipcp * _ipcp) close(lsockfd); } - return 0; + return NULL; } diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 9decac8b..393af994 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -43,7 +43,8 @@ struct ipcp { int irmd_fd; }; -int ipcp_main_loop(); +void * ipcp_main_loop(void * o); +void * ipcp_sdu_loop(void * o); int ipcp_arg_check(int argc, char * argv[]); #endif diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 460fe9e3..1f7bb12f 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -24,12 +24,13 @@ #include "ipcp.h" #include "flow.h" #include +#include #include #include #include #include #include -#include +#include #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -67,6 +68,144 @@ extern struct ipcp * _ipcp; /* defined in test */ struct ipcp * _ipcp; #endif +/* + * copied from ouroboros/dev. The shim needs access to the internals + * because it doesn't follow all steps necessary steps to get + * the info + */ + +#define UNKNOWN_AP "__UNKNOWN_AP__" +#define UNKNOWN_AE "__UNKNOWN_AE__" + +#define AP_MAX_FLOWS 256 + +#ifndef DU_BUFF_HEADSPACE + #define DU_BUFF_HEADSPACE 128 +#endif + +#ifndef DU_BUFF_TAILSPACE + #define DU_BUFF_TAILSPACE 0 +#endif + +/* the shim needs access to these internals */ +struct shim_ap_data { + instance_name_t * api; + struct shm_du_map * dum; + struct bmp * fds; + + struct shm_ap_rbuff * rb; + struct flow flows[AP_MAX_FLOWS]; + + pthread_t mainloop; + pthread_t sduloop; + pthread_t handler; + pthread_t sdu_reader[2]; + int ping_pong; +} * _ap_instance; + +int shim_ap_init(char * ap_name) +{ + _ap_instance = malloc(sizeof(struct shim_ap_data)); + if (_ap_instance == NULL) { + return -1; + } + + _ap_instance->api = instance_name_create(); + if (_ap_instance->api == NULL) { + free(_ap_instance); + return -1; + } + + if (instance_name_init_from(_ap_instance->api, + ap_name, + getpid()) == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); + if (_ap_instance->fds == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->dum = shm_du_map_open(); + if (_ap_instance->dum == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + _ap_instance->rb = shm_ap_rbuff_create(); + if (_ap_instance->rb == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + return 0; +} + +void shim_ap_fini() +{ + int i = 0; + + if (_ap_instance == NULL) + return; + if (_ap_instance->api != NULL) + instance_name_destroy(_ap_instance->api); + if (_ap_instance->fds != NULL) + bmp_destroy(_ap_instance->fds); + if (_ap_instance->dum != NULL) + shm_du_map_close(_ap_instance->dum); + if (_ap_instance->rb != NULL) + shm_ap_rbuff_destroy(_ap_instance->rb); + for (i = 0; i < AP_MAX_FLOWS; i ++) + if (_ap_instance->flows[i].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[i].rb); + + free(_ap_instance); +} + +static int port_id_to_fd(uint32_t port_id) +{ + int i; + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].port_id == port_id + && _ap_instance->flows[i].state != FLOW_NULL) + return i; + return -1; +} + +static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) +{ + /* the AP chooses the amount of headspace and tailspace */ + size_t index = shm_create_du_buff(_ap_instance->dum, + count, + 0, + buf, + count); + struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; + + if (index == -1) + return -1; + + if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { + shm_release_du_buff(_ap_instance->dum, index); + return -EPIPE; + } + + return 0; +} + +/* + * end copy from dev.c + */ + struct ipcp_udp_data { /* keep ipcp_data first for polymorphism */ struct ipcp_data ipcp_data; @@ -79,39 +218,15 @@ struct ipcp_udp_data { int s_fd; fd_set flow_fd_s; - flow_t * fd_to_flow_ptr[FD_SETSIZE]; - pthread_mutex_t lock; + pthread_mutex_t lock; }; -struct udp_flow { - /* keep flow first for polymorphism */ - flow_t flow; - int fd; -}; - -void ipcp_sig_handler(int sig, siginfo_t * info, void * c) -{ - switch(sig) { - case SIGINT: - case SIGTERM: - case SIGHUP: - LOG_DBG("Terminating by order of %d. Bye.", info->si_pid); - if (info->si_pid == irmd_pid) { - /* shm_du_map_close(_ipcp->data->dum); */ - exit(0); - } - default: - return; - } -} - -struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name) +struct ipcp_udp_data * ipcp_udp_data_create() { struct ipcp_udp_data * udp_data; struct ipcp_data * data; enum ipcp_type ipcp_type; - int n; udp_data = malloc(sizeof *udp_data); if (udp_data == NULL) { @@ -121,18 +236,52 @@ struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name) ipcp_type = THIS_TYPE; data = (struct ipcp_data *) udp_data; - if (ipcp_data_init(data, ap_name, ipcp_type) == NULL) { + if (ipcp_data_init(data, ipcp_type) == NULL) { free(udp_data); return NULL; } FD_ZERO(&udp_data->flow_fd_s); - for (n = 0; n < FD_SETSIZE; ++n) - udp_data->fd_to_flow_ptr[n] = NULL; return udp_data; } +void ipcp_udp_data_destroy(struct ipcp_udp_data * data) +{ + if (data == NULL) + return; + + ipcp_data_destroy((struct ipcp_data *) data); +} + +void ipcp_udp_destroy(struct ipcp * ipcp) +{ + ipcp_udp_data_destroy((struct ipcp_udp_data *) ipcp->data); + shim_ap_fini(); + free(ipcp); +} + +void ipcp_sig_handler(int sig, siginfo_t * info, void * c) +{ + switch(sig) { + case SIGINT: + case SIGTERM: + case SIGHUP: + if (info->si_pid == irmd_pid || info->si_pid == 0) { + LOG_DBG("Terminating by order of %d. Bye.", + info->si_pid); + pthread_cancel(_ap_instance->mainloop); + pthread_cancel(_ap_instance->handler); + pthread_cancel(_ap_instance->sdu_reader[0]); + pthread_cancel(_ap_instance->sdu_reader[1]); + pthread_cancel(_ap_instance->sduloop); + exit(0); + } + default: + return; + } +} + static void * ipcp_udp_listener() { char buf[SHIM_UDP_BUF_SIZE]; @@ -141,10 +290,10 @@ static void * ipcp_udp_listener() struct sockaddr_in f_saddr; struct sockaddr_in c_saddr; struct hostent * hostp; - struct udp_flow * flow; int sfd = shim_data(_ipcp)->s_fd; while (true) { + int fd; n = sizeof c_saddr; n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0, (struct sockaddr *) &c_saddr, (unsigned *) &n); @@ -157,16 +306,7 @@ static void * ipcp_udp_listener() if (hostp == NULL) continue; - /* create a new socket for the server */ - flow = malloc(sizeof *flow); - if (flow == NULL) - continue; - - flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (flow->fd == -1) { - free(flow); - continue; - } + fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); memset((char *) &f_saddr, 0, sizeof f_saddr); f_saddr.sin_family = AF_INET; @@ -185,36 +325,33 @@ static void * ipcp_udp_listener() * the flow structure */ - if (connect(flow->fd, + if (connect(fd, (struct sockaddr *) &c_saddr, sizeof c_saddr) < 0) { - close(flow->fd); - free(flow); + close(fd); continue; } + /* echo back the packet */ + while(send(fd, buf, strlen(buf), 0) < 0) + ; + /* reply to IRM */ - flow->flow.port_id = ipcp_flow_req_arr(getpid(), buf, - UNKNOWN_AP, ""); - if (flow->flow.port_id < 0) { + _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(), + buf, + UNKNOWN_AP, + UNKNOWN_AE); + if (_ap_instance->flows[fd].port_id < 0) { LOG_ERR("Could not get port id from IRMd"); - close(flow->fd); - free(flow); + close(fd); continue; } - flow->flow.oflags = FLOW_O_DEFAULT; - flow->flow.state = FLOW_PENDING; - - if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) { - LOG_DBGF("Could not add flow."); - close(flow->fd); - free(flow); - continue; - } + _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].state = FLOW_PENDING; - FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s); - shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow; + LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.", + _ap_instance->flows[fd].port_id, fd); } return 0; @@ -229,8 +366,6 @@ static void * ipcp_udp_sdu_reader() struct sockaddr_in r_saddr; while (true) { - flow_t * flow; - if (select(FD_SETSIZE, &shim_data(_ipcp)->flow_fd_s, NULL, NULL, NULL) @@ -249,18 +384,8 @@ static void * ipcp_udp_sdu_reader() (struct sockaddr *) &r_saddr, (unsigned *) &n); - flow = shim_data(_ipcp)->fd_to_flow_ptr[fd]; - if (flow->state == FLOW_PENDING) { - if (connect(fd, - (struct sockaddr *) &r_saddr, - sizeof r_saddr) - < 0) - continue; - flow->state = FLOW_ALLOCATED; - } - /* send the sdu to the correct port_id */ - LOG_MISSING; + ipcp_udp_flow_write(fd, buf, n); } } @@ -271,8 +396,6 @@ int ipcp_udp_bootstrap(struct dif_config * conf) { char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; - pthread_t handler; - pthread_t sdu_reader; int enable = 1; if (conf->type != THIS_TYPE) { @@ -296,7 +419,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf) dnsstr, INET_ADDRSTRLEN); else - strcpy(dnsstr, "not set.\n"); + strcpy(dnsstr, "not set"); shim_data(_ipcp)->ip_addr = conf->ip_addr; shim_data(_ipcp)->dns_addr = conf->dns_addr; @@ -304,7 +427,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf) /* UDP listen server */ if ((shim_data(_ipcp)->s_fd = - socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { + socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { LOG_DBGF("Can't create socket."); return -1; } @@ -328,13 +451,28 @@ int ipcp_udp_bootstrap(struct dif_config * conf) return -1; } - pthread_create(&handler, NULL, ipcp_udp_listener, NULL); - pthread_create(&sdu_reader, NULL, ipcp_udp_sdu_reader, NULL); + FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + + pthread_create(&_ap_instance->handler, + NULL, + ipcp_udp_listener, + NULL); + pthread_create(&_ap_instance->sdu_reader[0], + NULL, + ipcp_udp_sdu_reader, + NULL); + + pthread_create(&_ap_instance->sdu_reader[1], + NULL, + ipcp_udp_sdu_reader, + NULL); + + _ap_instance->ping_pong = 0; _ipcp->state = IPCP_ENROLLED; - LOG_DBG("Bootstrapped shim IPCP over UDP %s-%d.", - _ipcp->data->iname->name, _ipcp->data->iname->id); + LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.", + getpid()); LOG_DBG("Bound to IP address %s.", ipstr); LOG_DBG("DNS server address is %s.", dnsstr); @@ -464,23 +602,25 @@ int ipcp_udp_name_unreg(char * name) } int ipcp_udp_flow_alloc(uint32_t port_id, + pid_t n_pid, char * dst_name, char * src_ap_name, char * src_ae_name, struct qos_spec * qos) { - struct udp_flow * flow = NULL; struct sockaddr_in l_saddr; struct sockaddr_in r_saddr; + struct sockaddr_in rf_saddr; + int fd; + int n; + + char * recv_buf = NULL; struct hostent * h; if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL) return -1; - LOG_DBG("Received flow allocation request from %s to %s.", - src_ap_name, dst_name); - if (strlen(dst_name) > 255 || strlen(src_ap_name) > 255 || strlen(src_ae_name) > 255) { @@ -491,15 +631,7 @@ int ipcp_udp_flow_alloc(uint32_t port_id, if (qos != NULL) LOG_DBGF("QoS requested. UDP/IP can't do that."); - flow = malloc(sizeof *flow); - if (flow == NULL) - return -1; - - flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (flow->fd == -1) { - free(flow); - return -1; - } + fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); /* this socket is for the flow */ memset((char *) &l_saddr, 0, sizeof l_saddr); @@ -507,108 +639,161 @@ int ipcp_udp_flow_alloc(uint32_t port_id, l_saddr.sin_addr.s_addr = local_ip; l_saddr.sin_port = 0; - if (bind(flow->fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { - char ipstr[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, - &l_saddr.sin_addr.s_addr, - ipstr, - INET_ADDRSTRLEN); - close(flow->fd); - free(flow); + if (bind(fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { + close(fd); return -1; } h = gethostbyname(dst_name); if (h == NULL) { LOG_DBGF("Could not resolve %s.", dst_name); - close(flow->fd); - free(flow); + close(fd); return -1; } - memset((char *) &r_saddr, 0, sizeof r_saddr); r_saddr.sin_family = AF_INET; - r_saddr.sin_addr.s_addr = (uint32_t) *(h->h_addr_list[0]); + r_saddr.sin_addr.s_addr = *((uint32_t *) (h->h_addr_list[0])); r_saddr.sin_port = LISTEN_PORT; + /* at least try to get the packet on the wire */ - while (sendto(flow->fd, dst_name, strlen(dst_name), 0, + while (sendto(fd, dst_name, strlen(dst_name), 0, (struct sockaddr *) &r_saddr, sizeof r_saddr) < 0) { } - flow->flow.port_id = port_id; - flow->flow.oflags = FLOW_O_DEFAULT; - flow->flow.state = FLOW_PENDING; - - /* add flow to the list */ + /* wait for the other shim IPCP to respond */ - pthread_mutex_lock(&_ipcp->data->flow_lock); + recv_buf = malloc(strlen(dst_name) + 1); + n = sizeof(rf_saddr); + n = recvfrom(fd, + recv_buf, + strlen(dst_name) + 1, + 0, + (struct sockaddr *) &rf_saddr, + (unsigned *) &n); - if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) { - LOG_DBGF("Could not add flow."); - pthread_mutex_unlock(&_ipcp->data->flow_lock); - close(flow->fd); - free(flow); + if (connect(fd, + (struct sockaddr *) &rf_saddr, + sizeof rf_saddr) + < 0) { + free(recv_buf); return -1; } - pthread_mutex_unlock(&_ipcp->data->flow_lock); + if (!strcmp(recv_buf, dst_name)) + LOG_WARN("Incorrect echo from server"); + + free(recv_buf); + + _ap_instance->flows[fd].port_id = port_id; + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); + if (_ap_instance->flows[fd].rb == NULL) { + LOG_ERR("Could not open N + 1 ringbuffer."); + close(fd); + } /* tell IRMd that flow allocation "worked" */ - if (ipcp_flow_alloc_reply(getpid(), flow->flow.port_id, 0)) { + if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) { LOG_ERR("Failed to notify IRMd about flow allocation reply"); - close(flow->fd); - ipcp_data_del_flow(_ipcp->data, flow->flow.port_id); + close(fd); + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); return -1; } - FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s); - shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow; + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); - return 0; + pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]); + pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong], + NULL, + ipcp_udp_sdu_reader, + NULL); + _ap_instance->ping_pong = !_ap_instance->ping_pong; + + LOG_DBG("Allocated flow with port_id %u on UDP fd %d.", port_id, fd); + + return fd; } int ipcp_udp_flow_alloc_resp(uint32_t port_id, + pid_t n_pid, int response) { - struct udp_flow * flow = - (struct udp_flow *) ipcp_data_find_flow(_ipcp->data, port_id); - if (flow == NULL) { - return -1; + int fd = port_id_to_fd(port_id); + if (fd < 0) { + LOG_DBGF("Could not find flow with port_id %u.", port_id); + return 0; } - if (response) { - ipcp_data_del_flow(_ipcp->data, port_id); + if (response) return 0; - } /* awaken pending flow */ - if (flow->flow.state != FLOW_PENDING) + if (_ap_instance->flows[fd].state != FLOW_PENDING) { + LOG_DBGF("Flow was not pending."); return -1; + } + + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); + if (_ap_instance->flows[fd].rb == NULL) { + LOG_ERR("Could not open N + 1 ringbuffer."); + _ap_instance->flows[fd].state = FLOW_NULL; + _ap_instance->flows[fd].port_id = 0; + return 0; + } - flow->flow.state = FLOW_ALLOCATED; + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + + pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]); + pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong], + NULL, + ipcp_udp_sdu_reader, + NULL); + _ap_instance->ping_pong = !_ap_instance->ping_pong; + + LOG_DBG("Accepted flow, port_id %u on UDP fd %d.", port_id, fd); return 0; } int ipcp_udp_flow_dealloc(uint32_t port_id) { - return 0; -} + int fd = port_id_to_fd(port_id); + if (fd < 0) { + LOG_DBGF("Could not find flow with port_id %u.", port_id); + return 0; + } -int ipcp_udp_du_write(uint32_t port_id, - size_t map_index) -{ + _ap_instance->flows[fd].state = FLOW_NULL; + _ap_instance->flows[fd].port_id = 0; + if (_ap_instance->flows[fd].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + + FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); return 0; } -int ipcp_udp_du_read(uint32_t port_id, - size_t map_index) +/* FIXME: may be crap, didn't think this one through */ +int ipcp_udp_flow_dealloc_arr(uint32_t port_id) { - return 0; + int fd = port_id_to_fd(port_id); + if (fd < 0) { + LOG_DBGF("Could not find flow with port_id %u.", port_id); + return 0; + } + + _ap_instance->flows[fd].state = FLOW_NULL; + _ap_instance->flows[fd].port_id = 0; + if (_ap_instance->flows[fd].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + + FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + + return ipcp_flow_dealloc(0, port_id); } struct ipcp * ipcp_udp_create(char * ap_name) @@ -617,11 +802,14 @@ struct ipcp * ipcp_udp_create(char * ap_name) struct ipcp_udp_data * data; struct ipcp_ops * ops; + if (shim_ap_init(ap_name) < 0) + return NULL; + i = malloc(sizeof *i); if (i == NULL) return NULL; - data = ipcp_udp_data_create(ap_name); + data = ipcp_udp_data_create(); if (data == NULL) { free(i); return NULL; @@ -643,8 +831,6 @@ struct ipcp * ipcp_udp_create(char * ap_name) ops->ipcp_flow_alloc = ipcp_udp_flow_alloc; ops->ipcp_flow_alloc_resp = ipcp_udp_flow_alloc_resp; ops->ipcp_flow_dealloc = ipcp_udp_flow_dealloc; - ops->ipcp_du_read = ipcp_udp_du_read; - ops->ipcp_du_write = ipcp_udp_du_write; i->data = (struct ipcp_data *) data; i->ops = ops; @@ -656,6 +842,40 @@ struct ipcp * ipcp_udp_create(char * ap_name) #ifndef MAKE_CHECK +/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ +/* FIXME: stop eating the CPU */ +void * ipcp_udp_sdu_loop(void * o) +{ + while (true) { + struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb); + int fd; + int len = 0; + char * buf; + if (e == NULL) + continue; + + len = shm_du_map_read_sdu((uint8_t **) &buf, + _ap_instance->dum, + e->index); + if (len == -1) + continue; + + fd = port_id_to_fd(e->port_id); + + if (fd == -1) + continue; + + if (len == 0) + continue; + + send(fd, buf, len, 0); + + shm_release_du_buff(_ap_instance->dum, e->index); + } + + return (void *) 1; +} + int main (int argc, char * argv[]) { /* argument 1: pid of irmd ? */ @@ -680,6 +900,7 @@ int main (int argc, char * argv[]) sigaction(SIGINT, &sig_act, NULL); sigaction(SIGTERM, &sig_act, NULL); sigaction(SIGHUP, &sig_act, NULL); + sigaction(SIGPIPE, &sig_act, NULL); _ipcp = ipcp_udp_create(argv[2]); if (_ipcp == NULL) { @@ -687,7 +908,18 @@ int main (int argc, char * argv[]) exit(1); } - ipcp_main_loop(_ipcp); + pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); + pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL); + + pthread_join(_ap_instance->sduloop, NULL); + pthread_join(_ap_instance->mainloop, NULL); + pthread_join(_ap_instance->handler, NULL); + pthread_join(_ap_instance->sdu_reader[0], NULL); + pthread_join(_ap_instance->sdu_reader[1], NULL); + + ipcp_udp_destroy(_ipcp); + + shim_ap_fini(); exit(0); } diff --git a/src/ipcpd/shim-udp/tests/shim_udp_test.c b/src/ipcpd/shim-udp/tests/shim_udp_test.c index 036f5877..e5e8b32d 100644 --- a/src/ipcpd/shim-udp/tests/shim_udp_test.c +++ b/src/ipcpd/shim-udp/tests/shim_udp_test.c @@ -59,7 +59,7 @@ int shim_udp_test(int argc, char ** argv) _ipcp = ipcp_udp_create(ipcp_name); if (_ipcp == NULL) { LOG_ERR("Could not instantiate shim IPCP."); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } @@ -69,13 +69,13 @@ int shim_udp_test(int argc, char ** argv) if (ipcp_udp_name_reg("bogus name")) { LOG_ERR("Failed to register application."); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } if (ipcp_udp_name_unreg("bogus name")) { LOG_ERR("Failed to unregister application."); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } @@ -83,7 +83,7 @@ int shim_udp_test(int argc, char ** argv) sprintf(bogus, "bogus name %4d", i); if (ipcp_udp_name_reg(bogus)) { LOG_ERR("Failed to register application %s.", bogus); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } } @@ -92,12 +92,12 @@ int shim_udp_test(int argc, char ** argv) sprintf(bogus, "bogus name %4d", i); if(ipcp_udp_name_unreg(bogus)) { LOG_ERR("Failed to unregister application %s.", bogus); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } } - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(0); } diff --git a/src/irmd/main.c b/src/irmd/main.c index 67254feb..75b8506e 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -42,41 +43,146 @@ #include #include #include +#include /* FIXME: this smells like part of namespace management */ #define ALL_DIFS "*" +#ifndef IRMD_MAX_FLOWS + #define IRMD_MAX_FLOWS 4096 +#endif + +#ifndef IRMD_THREADPOOL_SIZE + #define IRMD_THREADPOOL_SIZE 10 +#endif + + + +enum flow_state { + FLOW_NULL = 0, + FLOW_PENDING, + FLOW_ALLOCATED +}; + struct ipcp_entry { struct list_head next; instance_name_t * api; char * dif_name; + + pthread_mutex_t lock; }; -/* currently supports only registering whatevercast groups of a single AP */ +/* currently supports only registering whatevercast groups of a single AP-I */ struct reg_name_entry { struct list_head next; /* generic whatevercast name */ char * name; - /* FIXME: resolve name instead */ + /* FIXME: make a list resolve to AP-I instead */ instance_name_t * api; - uint32_t reg_ap_id; + + bool accept; + char * req_ap_name; + char * req_ae_name; + bool flow_arrived; + + pthread_mutex_t fa_lock; +}; + +/* keeps track of port_id's between N and N - 1 */ +struct port_map_entry { + struct list_head next; + + uint32_t port_id; + + pid_t n_pid; + pid_t n_1_pid; + + enum flow_state state; }; struct irm { - /* FIXME: list of ipcps can be merged with registered names */ + /* FIXME: list of ipcps could be merged with registered names */ struct list_head ipcps; struct list_head reg_names; + int sockfd; + + /* keep track of all flows in this processing system */ + struct bmp * port_ids; + + /* maps port_ids to pid pair */ + struct list_head port_map; + struct shm_du_map * dum; -}; -struct irm * instance = NULL; + pthread_t * threadpool; + + pthread_mutex_t lock; +} * instance = NULL; -static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api) +static struct port_map_entry * get_port_map_entry(uint32_t port_id) +{ + struct list_head * pos = NULL; + + list_for_each(pos, &instance->port_map) { + struct port_map_entry * e = + list_entry(pos, struct port_map_entry, next); + + if (e->port_id == port_id) + return e; + } + + return NULL; +} + +static struct port_map_entry * get_port_map_entry_n(pid_t n_pid) +{ + struct list_head * pos = NULL; + + list_for_each(pos, &instance->port_map) { + struct port_map_entry * e = + list_entry(pos, struct port_map_entry, next); + + if (e->n_pid == n_pid) + return e; + } + + return NULL; +} + +static struct ipcp_entry * ipcp_entry_create() +{ + struct ipcp_entry * e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + e->api = NULL; + e->dif_name = NULL; + + INIT_LIST_HEAD(&e->next); + pthread_mutex_init(&e->lock, NULL); + + return e; +} + +static void ipcp_entry_destroy(struct ipcp_entry * e) +{ + if (e == NULL) + return; + + if (e->api != NULL) + instance_name_destroy(e->api); + + if (e->dif_name != NULL) + free(e->dif_name); + + free(e); +} + +static struct ipcp_entry * get_ipcp_entry_by_name(instance_name_t * api) { - struct ipcp_entry * tmp = NULL; struct list_head * pos = NULL; list_for_each(pos, &instance->ipcps) { @@ -87,7 +193,7 @@ static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api) return tmp; } - return tmp; + return NULL; } static instance_name_t * get_ipcp_by_name(char * ap_name) @@ -143,9 +249,14 @@ static struct reg_name_entry * reg_name_entry_create() if (e == NULL) return NULL; - e->reg_ap_id = rand() % INT_MAX; - e->name = NULL; + e->name = NULL; + e->api = NULL; + e->accept = false; + e->req_ap_name = NULL; + e->req_ae_name = NULL; + e->flow_arrived = false; + pthread_mutex_init(&e->fa_lock, NULL); INIT_LIST_HEAD(&e->next); return e; @@ -153,7 +264,7 @@ static struct reg_name_entry * reg_name_entry_create() static struct reg_name_entry * reg_name_entry_init(struct reg_name_entry * e, char * name, - instance_name_t * api) + instance_name_t * api) { if (e == NULL || name == NULL || api == NULL) return NULL; @@ -171,10 +282,18 @@ static int reg_name_entry_destroy(struct reg_name_entry * e) free(e->name); instance_name_destroy(e->api); + + if (e->req_ap_name != NULL) + free(e->req_ap_name); + if (e->req_ae_name != NULL) + free(e->req_ae_name); + + free(e); + return 0; } -static struct reg_name_entry * find_reg_name_entry_by_name(char * name) +static struct reg_name_entry * get_reg_name_entry_by_name(char * name) { struct list_head * pos = NULL; @@ -189,7 +308,7 @@ static struct reg_name_entry * find_reg_name_entry_by_name(char * name) return NULL; } -static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id) +static struct reg_name_entry * get_reg_name_entry_by_id(pid_t pid) { struct list_head * pos = NULL; @@ -197,7 +316,7 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id) struct reg_name_entry * e = list_entry(pos, struct reg_name_entry, next); - if (reg_ap_id == e->reg_ap_id) + if (e->api->id == pid) return e; } @@ -207,10 +326,17 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id) /* FIXME: add only name when we have NSM solved */ static int reg_name_entry_add_name_instance(char * name, instance_name_t * api) { - struct reg_name_entry * e = find_reg_name_entry_by_name(name); + struct reg_name_entry * e = get_reg_name_entry_by_name(name); if (e == NULL) { e = reg_name_entry_create(); - e = reg_name_entry_init(e, name, api); + if (e == NULL) + return -1; + + if (reg_name_entry_init(e, name, api) == NULL) { + reg_name_entry_destroy(e); + return -1; + } + list_add(&e->next, &instance->reg_names); return 0; } @@ -221,7 +347,7 @@ static int reg_name_entry_add_name_instance(char * name, instance_name_t * api) static int reg_name_entry_del_name(char * name) { - struct reg_name_entry * e = find_reg_name_entry_by_name(name); + struct reg_name_entry * e = get_reg_name_entry_by_name(name); if (e == NULL) return 0; @@ -240,34 +366,38 @@ static pid_t create_ipcp(char * ap_name, pid = ipcp_create(ap_name, ipcp_type); if (pid == -1) { - LOG_ERR("Failed to create IPCP"); + LOG_ERR("Failed to create IPCP."); return -1; } - tmp = malloc(sizeof(*tmp)); - if (tmp == NULL) { + tmp = ipcp_entry_create(); + if (tmp == NULL) return -1; - } INIT_LIST_HEAD(&tmp->next); tmp->api = instance_name_create(); if (tmp->api == NULL) { - free(tmp); + ipcp_entry_destroy(tmp); return -1; } if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) { instance_name_destroy(tmp->api); - free(tmp); + ipcp_entry_destroy(tmp); return -1; } tmp->dif_name = NULL; - LOG_DBG("Created IPC process with pid %d", pid); + pthread_mutex_lock(&instance->lock); list_add(&tmp->next, &instance->ipcps); + + pthread_mutex_unlock(&instance->lock); + + LOG_INFO("Created IPCP %s-%d ", ap_name, pid); + return pid; } @@ -276,18 +406,19 @@ static int destroy_ipcp(instance_name_t * api) struct list_head * pos = NULL; struct list_head * n = NULL; + if (api == NULL) + return 0; + if (api->id == 0) api = get_ipcp_by_name(api->name); if (api == NULL) { LOG_ERR("No such IPCP in the system."); - return -1; + return 0; } - LOG_DBG("Destroying ipcp %s-%d", api->name, api->id); - if (ipcp_destroy(api->id)) - LOG_ERR("Could not destroy IPCP"); + LOG_ERR("Could not destroy IPCP."); list_for_each_safe(pos, n, &(instance->ipcps)) { struct ipcp_entry * tmp = @@ -295,8 +426,12 @@ static int destroy_ipcp(instance_name_t * api) if (instance_name_cmp(api, tmp->api) == 0) list_del(&tmp->next); + + ipcp_entry_destroy(tmp); } + LOG_INFO("Destroyed IPCP %s-%d.", api->name, api->id); + return 0; } @@ -313,25 +448,28 @@ static int bootstrap_ipcp(instance_name_t * api, return -1; } - entry = find_ipcp_entry_by_name(api); + entry = get_ipcp_entry_by_name(api); if (entry == NULL) { - LOG_ERR("No such IPCP"); + LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(conf->dif_name); if (entry->dif_name == NULL) { - LOG_ERR("Failed to strdup"); + LOG_ERR("Failed to strdup."); return -1; } if (ipcp_bootstrap(entry->api->id, conf)) { - LOG_ERR("Could not bootstrap IPCP"); + LOG_ERR("Could not bootstrap IPCP."); free(entry->dif_name); entry->dif_name = NULL; return -1; } + LOG_INFO("Bootstrapped IPCP %s-%d. in DIF %s", + api->name, api->id, conf->dif_name); + return 0; } @@ -343,21 +481,21 @@ static int enroll_ipcp(instance_name_t * api, ssize_t n_1_difs_size = 0; struct ipcp_entry * entry = NULL; - entry = find_ipcp_entry_by_name(api); + entry = get_ipcp_entry_by_name(api); if (entry == NULL) { - LOG_ERR("No such IPCP"); + LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(dif_name); if (entry->dif_name == NULL) { - LOG_ERR("Failed to strdup"); + LOG_ERR("Failed to strdup."); return -1; } member = da_resolve_daf(dif_name); if (member == NULL) { - LOG_ERR("Could not find a member of that DIF"); + LOG_ERR("Could not find a member of that DIF."); free(entry->dif_name); entry->dif_name = NULL; return -1; @@ -365,19 +503,22 @@ static int enroll_ipcp(instance_name_t * api, n_1_difs_size = da_resolve_dap(member, n_1_difs); if (n_1_difs_size < 1) { - LOG_ERR("Could not find N-1 DIFs"); + LOG_ERR("Could not find N-1 DIFs."); free(entry->dif_name); entry->dif_name = NULL; return -1; } if (ipcp_enroll(entry->api->id, member, n_1_difs[0])) { - LOG_ERR("Could not enroll IPCP"); + LOG_ERR("Could not enroll IPCP."); free(entry->dif_name); entry->dif_name = NULL; return -1; } + LOG_INFO("Enrolled IPCP %s-%d in DIF %s.", + api->name, api->id, dif_name); + return 0; } @@ -386,7 +527,7 @@ static int reg_ipcp(instance_name_t * api, size_t difs_size) { if (ipcp_reg(api->id, difs, difs_size)) { - LOG_ERR("Could not register IPCP to N-1 DIF(s)"); + LOG_ERR("Could not register IPCP to N-1 DIF(s)."); return -1; } @@ -399,24 +540,23 @@ static int unreg_ipcp(instance_name_t * api, { if (ipcp_unreg(api->id, difs, difs_size)) { - LOG_ERR("Could not unregister IPCP from N-1 DIF(s)"); + LOG_ERR("Could not unregister IPCP from N-1 DIF(s)."); return -1; } return 0; } -static int ap_unreg_id(uint32_t reg_ap_id, - pid_t pid, +static int ap_unreg_id(pid_t pid, char ** difs, size_t len) { int i; int ret = 0; - struct reg_name_entry * rne = NULL; - struct list_head * pos = NULL; + struct reg_name_entry * rne = NULL; + struct list_head * pos = NULL; - rne = find_reg_name_entry_by_id(reg_ap_id); + rne = get_reg_name_entry_by_id(pid); if (rne == NULL) return 0; /* no such id */ @@ -458,7 +598,6 @@ static int ap_reg(char * ap_name, { int i; int ret = 0; - int reg_ap_id = 0; struct list_head * pos = NULL; struct reg_name_entry * rne = NULL; @@ -466,18 +605,18 @@ static int ap_reg(char * ap_name, instance_name_t * ipcpi = NULL; if (instance->ipcps.next == NULL) - LOG_ERR("No IPCPs in this system."); + return -1; /* check if this ap_name is already registered */ - rne = find_reg_name_entry_by_name(ap_name); + rne = get_reg_name_entry_by_name(ap_name); if (rne != NULL) return -1; /* can only register one instance for now */ - rne = reg_name_entry_create(); - if (rne == NULL) + api = instance_name_create(); + if (api == NULL) { return -1; + } - api = instance_name_create(); if (instance_name_init_from(api, ap_name, ap_id) == NULL) { instance_name_destroy(api); return -1; @@ -488,12 +627,6 @@ static int ap_reg(char * ap_name, * contains a single instance only */ - if (reg_name_entry_init(rne, strdup(ap_name), api) == NULL) { - reg_name_entry_destroy(rne); - instance_name_destroy(api); - return -1; - } - if (strcmp(difs[0], ALL_DIFS) == 0) { list_for_each(pos, &instance->ipcps) { struct ipcp_entry * e = @@ -528,11 +661,10 @@ static int ap_reg(char * ap_name, return -1; } /* for now, we register single instances */ - reg_name_entry_add_name_instance(strdup(ap_name), - instance_name_dup(api)); - instance_name_destroy(api); + ret = reg_name_entry_add_name_instance(strdup(ap_name), + api); - return reg_ap_id; + return ret; } static int ap_unreg(char * ap_name, @@ -542,149 +674,304 @@ static int ap_unreg(char * ap_name, { struct reg_name_entry * tmp = NULL; - instance_name_t * api = instance_name_create(); - if (api == NULL) - return -1; - - if (instance_name_init_from(api, ap_name, ap_id) == NULL) { - instance_name_destroy(api); - return -1; - } - /* check if ap_name is registered */ - tmp = find_reg_name_entry_by_name(api->name); - if (tmp == NULL) { - instance_name_destroy(api); + tmp = get_reg_name_entry_by_id(ap_id); + if (tmp == NULL) return 0; - } else { - return ap_unreg_id(tmp->reg_ap_id, api->id, difs, len); - } -} + if (strcmp(ap_name, tmp->api->name)) + return 0; -static int flow_accept(int fd, - pid_t pid, - char * ap_name, - char * ae_name) -{ - return -1; + return ap_unreg_id(ap_id, difs, len); } -static int flow_alloc_resp(int fd, - int result) +static struct port_map_entry * flow_accept(pid_t pid, + char ** ap_name, + char ** ae_name) { - return -1; + bool arrived = false; + + struct timespec ts = {0, 100000}; + + struct port_map_entry * pme; + struct reg_name_entry * rne = get_reg_name_entry_by_id(pid); + if (rne == NULL) { + LOG_DBGF("Unregistered AP calling accept()."); + return NULL; + } + + if (rne->accept) { + LOG_DBGF("This AP still has a pending accept()."); + return NULL; + } + + rne->accept = true; + + /* FIXME: wait for a thread that runs select() on flow_arrived */ + while (!arrived) { + /* FIXME: this needs locking */ + rne = get_reg_name_entry_by_id(pid); + if (rne == NULL) + return NULL; + arrived = rne->flow_arrived; + nanosleep(&ts, NULL); + } + + pme = get_port_map_entry_n(pid); + if (pme == NULL) { + LOG_ERR("Port_id was not created yet."); + return NULL; + } + + pthread_mutex_lock(&rne->fa_lock); + *ap_name = rne->req_ap_name; + if (ae_name != NULL) + *ae_name = rne->req_ae_name; + pthread_mutex_unlock(&rne->fa_lock); + + return pme; } -static int flow_alloc(char * dst_name, - char * src_ap_name, - char * src_ae_name, - struct qos_spec * qos, - int oflags) +static int flow_alloc_resp(pid_t n_pid, + uint32_t port_id, + int response) { - int port_id = 0; - pid_t pid = get_ipcp_by_dst_name(dst_name)->id; + struct reg_name_entry * rne = get_reg_name_entry_by_id(n_pid); + struct port_map_entry * pme = get_port_map_entry(port_id); - LOG_DBG("flow alloc received from %s-%s to %s.", - src_ap_name, src_ae_name, dst_name); + if (rne == NULL || pme == NULL) + return -1; + + /* FIXME: check all instances associated with the name */ + if (!rne->accept) { + LOG_ERR("No process listening for this name."); + return -1; + } + + /* + * consider the flow as handled + * once we can handle a list of AP-I's, remove it from the list + */ + + rne->flow_arrived = false; + rne->accept = false; + + if (!response) + pme->state = FLOW_ALLOCATED; - return ipcp_flow_alloc(pid, - port_id, - dst_name, - src_ap_name, - src_ae_name, - qos); + return ipcp_flow_alloc_resp(pme->n_1_pid, + port_id, + pme->n_pid, + response); } -static int flow_alloc_res(int fd) +static struct port_map_entry * flow_alloc(pid_t pid, + char * dst_name, + char * src_ap_name, + char * src_ae_name, + struct qos_spec * qos) { + struct port_map_entry * e = malloc(sizeof(*e)); + if (e == NULL) { + LOG_ERR("Failed malloc of port_map_entry."); + return NULL; + } - return -1; + e->port_id = bmp_allocate(instance->port_ids); + e->n_pid = pid; + e->state = FLOW_PENDING; + e->n_1_pid = get_ipcp_by_dst_name(dst_name)->id; + + list_add(&e->next, &instance->port_map); + + if (ipcp_flow_alloc(get_ipcp_by_dst_name(dst_name)->id, + e->port_id, + e->n_pid, + dst_name, + src_ap_name, + src_ae_name, + qos) < 0) { + list_del(&e->next); + bmp_release(instance->port_ids, e->port_id); + free(e); + return NULL; + } + + return e; } -static int flow_dealloc(int fd) +static int flow_alloc_res(uint32_t port_id) { - return -1; + bool allocated = false; + struct port_map_entry * e; + struct timespec ts = {0,100000}; + + while (!allocated) { + /* FIXME: this needs locking */ + e = get_port_map_entry(port_id); + if (e == NULL) { + LOG_DBGF("Could not locate port_id %u", port_id); + return -1; + } + if (e->state == FLOW_ALLOCATED) + allocated = true; + nanosleep(&ts, NULL); + } + + return 0; } -static int flow_cntl(int fd, - int oflags) +static int flow_dealloc(uint32_t port_id) { - return -1; + pid_t n_1_pid; + + struct port_map_entry * e = get_port_map_entry(port_id); + if (e == NULL) + return 0; + + n_1_pid = e->n_1_pid; + + list_del(&e->next); + free(e); + + return ipcp_flow_dealloc(n_1_pid, port_id); } -static int flow_req_arr(char * dst_name, - char * ap_name, - char * ae_name) +static struct port_map_entry * flow_req_arr(pid_t pid, + char * dst_name, + char * ap_name, + char * ae_name) { - return -1; + struct reg_name_entry * rne; + struct port_map_entry * pme; + + rne = get_reg_name_entry_by_name(dst_name); + if (rne == NULL) { + LOG_DBGF("Destination name %s unknown.", dst_name); + return NULL; + } + + pme = malloc(sizeof(*pme)); + if (pme == NULL) { + LOG_ERR("Failed malloc of port_map_entry."); + return NULL; + } + + pme->port_id = bmp_allocate(instance->port_ids); + pme->n_pid = rne->api->id; + pme->state = FLOW_PENDING; + pme->n_1_pid = pid; + + list_add(&pme->next, &instance->port_map); + + pthread_mutex_lock(&rne->fa_lock); + + rne->req_ap_name = strdup(ap_name); + rne->req_ae_name = strdup(ae_name); + + rne->flow_arrived = true; + + pthread_mutex_unlock(&rne->fa_lock); + + return pme; } static int flow_alloc_reply(uint32_t port_id, - int result) + int response) { - return -1; + struct port_map_entry * e; + + /* FIXME: do this under lock */ + if (!response) { + e = get_port_map_entry(port_id); + if (e == NULL) + return -1; + e->state = FLOW_ALLOCATED; + } + + /* FIXME: does this need to be propagated to the IPCP? */ + + return 0; } static int flow_dealloc_ipcp(uint32_t port_id) { - return -1; + struct port_map_entry * e = get_port_map_entry(port_id); + if (e == NULL) + return 0; + + list_del(&e->next); + free(e); + + return 0; +} + +static void irm_destroy(struct irm * irm) +{ + struct list_head * h; + struct list_head * t; + + if (irm == NULL) + return; + + if (irm->threadpool != NULL) + free(irm->threadpool); + + if (irm->port_ids != NULL) + bmp_destroy(irm->port_ids); + /* clear the lists */ + list_for_each_safe(h, t, &irm->ipcps) { + struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next); + destroy_ipcp(e->api); + } + + list_for_each_safe(h, t, &irm->reg_names) { + struct reg_name_entry * e = list_entry(h, + struct reg_name_entry, + next); + char * difs [1] = {ALL_DIFS}; + ap_unreg_id(e->api->id, difs, 1); + } + + list_for_each_safe(h, t, &irm->port_map) { + struct port_map_entry * e = list_entry(h, + struct port_map_entry, + next); + list_del(&e->next); + free(e); + } + + if (irm->dum != NULL) + shm_du_map_destroy(irm->dum); + + close(irm->sockfd); + free(irm); } void irmd_sig_handler(int sig, siginfo_t * info, void * c) { + int i; + switch(sig) { case SIGINT: case SIGTERM: case SIGHUP: - shm_du_map_close(instance->dum); - free(instance); - exit(0); + if (instance->threadpool != NULL) { + for (i = 0; i < IRMD_THREADPOOL_SIZE; i++) + pthread_cancel(instance->threadpool[i]); + } + + case SIGPIPE: + LOG_DBG("Ignoring SIGPIPE."); default: return; } } -int main() +void * mainloop() { - int sockfd; uint8_t buf[IRM_MSG_BUF_SIZE]; - struct sigaction sig_act; - - /* init sig_act */ - memset(&sig_act, 0, sizeof sig_act); - - /* install signal traps */ - sig_act.sa_sigaction = &irmd_sig_handler; - sig_act.sa_flags = SA_SIGINFO; - - sigaction(SIGINT, &sig_act, NULL); - sigaction(SIGTERM, &sig_act, NULL); - sigaction(SIGHUP, &sig_act, NULL); - - if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) - unlink("/dev/shm/" SHM_DU_MAP_FILENAME); - - instance = malloc(sizeof(*instance)); - if (instance == NULL) - return -1; - - if ((instance->dum = shm_du_map_create()) == NULL) { - free(instance); - return -1; - } - - INIT_LIST_HEAD(&instance->ipcps); - INIT_LIST_HEAD(&instance->reg_names); - - sockfd = server_socket_open(IRM_SOCK_PATH); - if (sockfd < 0) { - shm_du_map_close(instance->dum); - free(instance); - return -1; - } - while (true) { int cli_sockfd; irm_msg_t * msg; @@ -692,18 +979,19 @@ int main() instance_name_t api; buffer_t buffer; irm_msg_t ret_msg = IRM_MSG__INIT; + struct port_map_entry * e = NULL; ret_msg.code = IRM_MSG_CODE__IRM_REPLY; - cli_sockfd = accept(sockfd, 0, 0); + cli_sockfd = accept(instance->sockfd, 0, 0); if (cli_sockfd < 0) { - LOG_ERR("Cannot accept new connection"); + LOG_ERR("Cannot accept new connection."); continue; } count = read(cli_sockfd, buf, IRM_MSG_BUF_SIZE); if (count <= 0) { - LOG_ERR("Failed to read from socket"); + LOG_ERR("Failed to read from socket."); close(cli_sockfd); continue; } @@ -750,11 +1038,11 @@ int main() msg->n_dif_name); break; case IRM_MSG_CODE__IRM_AP_REG: - ret_msg.has_fd = true; - ret_msg.fd = ap_reg(msg->ap_name, - msg->pid, - msg->dif_name, - msg->n_dif_name); + ret_msg.has_result = true; + ret_msg.result = ap_reg(msg->ap_name, + msg->pid, + msg->dif_name, + msg->n_dif_name); break; case IRM_MSG_CODE__IRM_AP_UNREG: ret_msg.has_result = true; @@ -764,43 +1052,57 @@ int main() msg->n_dif_name); break; case IRM_MSG_CODE__IRM_FLOW_ACCEPT: - ret_msg.has_fd = true; - ret_msg.fd = flow_accept(msg->fd, - msg->pid, - ret_msg.ap_name, - ret_msg.ae_name); + e = flow_accept(msg->pid, + &ret_msg.ap_name, + &ret_msg.ae_name); + if (e == NULL) + break; + + ret_msg.has_port_id = true; + ret_msg.port_id = e->port_id; + ret_msg.has_pid = true; + ret_msg.pid = e->n_1_pid; break; case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP: ret_msg.has_result = true; - ret_msg.result = flow_alloc_resp(msg->fd, - msg->result); + ret_msg.result = flow_alloc_resp(msg->pid, + msg->port_id, + msg->response); break; case IRM_MSG_CODE__IRM_FLOW_ALLOC: - ret_msg.has_fd = true; - ret_msg.fd = flow_alloc(msg->dst_name, - msg->ap_name, - msg->ae_name, - NULL, - msg->oflags); + e = flow_alloc(msg->pid, + msg->dst_name, + msg->ap_name, + msg->ae_name, + NULL); + if (e == NULL) + break; + + ret_msg.has_port_id = true; + ret_msg.port_id = e->port_id; + ret_msg.has_pid = true; + ret_msg.pid = e->n_1_pid; break; case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES: - ret_msg.has_response = true; - ret_msg.response = flow_alloc_res(msg->fd); - break; - case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; - ret_msg.result = flow_dealloc(msg->fd); + ret_msg.result = flow_alloc_res(msg->port_id); break; - case IRM_MSG_CODE__IRM_FLOW_CONTROL: + case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; - ret_msg.result = flow_cntl(msg->fd, - msg->oflags); + ret_msg.result = flow_dealloc(msg->port_id); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: + e = flow_req_arr(msg->pid, + msg->dst_name, + msg->ap_name, + msg->ae_name); + if (e == NULL) + break; + ret_msg.has_port_id = true; - ret_msg.port_id = flow_req_arr(msg->dst_name, - msg->ap_name, - msg->ae_name); + ret_msg.port_id = e->port_id; + ret_msg.has_pid = true; + ret_msg.pid = e->n_pid; break; case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY: ret_msg.has_result = true; @@ -812,7 +1114,7 @@ int main() ret_msg.result = flow_dealloc_ipcp(msg->port_id); break; default: - LOG_ERR("Don't know that message code"); + LOG_ERR("Don't know that message code."); break; } @@ -820,7 +1122,7 @@ int main() buffer.size = irm_msg__get_packed_size(&ret_msg); if (buffer.size == 0) { - LOG_ERR("Failed to send reply message"); + LOG_ERR("Failed to send reply message."); close(cli_sockfd); continue; } @@ -842,6 +1144,84 @@ int main() free(buffer.data); close(cli_sockfd); } +} + +static struct irm * irm_create() +{ + struct irm * i = malloc(sizeof(*i)); + if (i == NULL) + return NULL; + + if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) + unlink("/dev/shm/" SHM_DU_MAP_FILENAME); + + i->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); + if (i->threadpool == NULL) { + irm_destroy(i); + return NULL; + } + + if ((i->dum = shm_du_map_create()) == NULL) { + irm_destroy(i); + return NULL; + } + + INIT_LIST_HEAD(&i->ipcps); + INIT_LIST_HEAD(&i->reg_names); + INIT_LIST_HEAD(&i->port_map); + + i->port_ids = bmp_create(IRMD_MAX_FLOWS, 0); + if (i->port_ids == NULL) { + irm_destroy(i); + return NULL; + } + + i->sockfd = server_socket_open(IRM_SOCK_PATH); + if (i->sockfd < 0) { + irm_destroy(instance); + return NULL; + } + + pthread_mutex_init(&i->lock, NULL); + + return i; +} + +int main() +{ + struct sigaction sig_act; + + int t = 0; + + /* init sig_act */ + memset(&sig_act, 0, sizeof sig_act); + + /* install signal traps */ + sig_act.sa_sigaction = &irmd_sig_handler; + sig_act.sa_flags = SA_SIGINFO; + + sigaction(SIGINT, &sig_act, NULL); + sigaction(SIGTERM, &sig_act, NULL); + sigaction(SIGHUP, &sig_act, NULL); + sigaction(SIGPIPE, &sig_act, NULL); + + instance = irm_create(); + if (instance == NULL) + return 1; + + /* + * FIXME: we need a main loop that delegates messages to subthreads in a + * way that avoids all possible deadlocks for local apps + */ + + for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) + pthread_create(&instance->threadpool[t], NULL, mainloop, NULL); + + /* wait for (all of them) to return */ + for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) + pthread_join(instance->threadpool[t], NULL); + + irm_destroy(instance); return 0; } diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 4922e07c..53a7b354 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -32,6 +32,7 @@ set(SOURCE_FILES ipcp.c irm.c list.c + shm_ap_rbuff.c shm_du_map.c sockets.c utils.c diff --git a/src/lib/bitmap.c b/src/lib/bitmap.c index 8aabb4f4..0e3c968f 100644 --- a/src/lib/bitmap.c +++ b/src/lib/bitmap.c @@ -108,12 +108,14 @@ struct bmp * bmp_create(size_t bits, ssize_t offset) return NULL; tmp = malloc(sizeof(*tmp)); - if (!tmp) + if (tmp == NULL) return NULL; - tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(*(tmp->bitmap))); - if (!tmp->bitmap) + tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(unsigned long)); + if (!tmp->bitmap) { + free(tmp); return NULL; + } tmp->size = bits; tmp->offset = offset; @@ -140,8 +142,6 @@ int bmp_destroy(struct bmp * b) static ssize_t bad_id(struct bmp * b) { - assert(b); - return b->offset - 1; } @@ -149,8 +149,8 @@ ssize_t bmp_allocate(struct bmp * b) { ssize_t id; - if (!b) - return bad_id(b); + if (b == NULL) + return -1; id = (ssize_t) find_next_zero_bit(b->bitmap, b->size); diff --git a/src/lib/dev.c b/src/lib/dev.c index 6d8411c5..40bf2dc3 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -25,73 +25,190 @@ #include #include #include +#include +#include +#include +#include +#include #include +#include -int ap_reg(char * ap_name, - char ** difs, - size_t difs_size) +#define AP_MAX_FLOWS 256 + +#ifndef DU_BUFF_HEADSPACE + #define DU_BUFF_HEADSPACE 128 +#endif + +#ifndef DU_BUFF_TAILSPACE + #define DU_BUFF_TAILSPACE 0 +#endif + +struct flow { + struct shm_ap_rbuff * rb; + uint32_t port_id; + uint32_t oflags; + + /* don't think this needs locking */ +}; + +struct ap_data { + instance_name_t * api; + struct shm_du_map * dum; + struct bmp * fds; + + struct shm_ap_rbuff * rb; + struct flow flows[AP_MAX_FLOWS]; +} * _ap_instance; + + +int ap_init(char * ap_name) { - irm_msg_t msg = IRM_MSG__INIT; + _ap_instance = malloc(sizeof(struct ap_data)); + if (_ap_instance == NULL) { + return -1; + } + + _ap_instance->api = instance_name_create(); + if (_ap_instance->api == NULL) { + free(_ap_instance); + return -1; + } + + if (instance_name_init_from(_ap_instance->api, + ap_name, + getpid()) == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); + if (_ap_instance->fds == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->dum = shm_du_map_open(); + if (_ap_instance->dum == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + _ap_instance->rb = shm_ap_rbuff_create(); + if (_ap_instance->rb == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + return 0; +} + +void ap_fini() +{ + int i = 0; + + if (_ap_instance == NULL) + return; + if (_ap_instance->api != NULL) + instance_name_destroy(_ap_instance->api); + if (_ap_instance->fds != NULL) + bmp_destroy(_ap_instance->fds); + if (_ap_instance->dum != NULL) + shm_du_map_close(_ap_instance->dum); + if (_ap_instance->rb != NULL) + shm_ap_rbuff_destroy(_ap_instance->rb); + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[i].rb); + + free(_ap_instance); +} + +#if 0 +static int port_id_to_fd(uint32_t port_id) +{ + int i; + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].port_id == port_id + && _ap_instance->flows[i].state != FLOW_NULL) + return i; + return -1; +} +#endif + +int ap_reg(char ** difs, + size_t len) +{ + irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = 0; + int fd = bmp_allocate(_ap_instance->fds); - if (ap_name == NULL || - difs == NULL || - difs_size == 0 || + if (difs == NULL || + len == 0 || difs[0] == NULL) { return -EINVAL; } + if (_ap_instance == NULL) { + LOG_DBG("ap_init was not called"); + return -1; + } + msg.code = IRM_MSG_CODE__IRM_AP_REG; msg.has_pid = true; - msg.pid = getpid(); - msg.ap_name = ap_name; + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; - msg.n_dif_name = difs_size; + msg.n_dif_name = len; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + if (recv_msg->result < 0) + fd = -1; + irm_msg__free_unpacked(recv_msg, NULL); return fd; } -int ap_unreg(char * ap_name, - char ** difs, - size_t difs_size) +int ap_unreg(char ** difs, + size_t len) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; int ret = -1; - if (ap_name == NULL || - difs == NULL || - difs_size == 0 || + if (difs == NULL || + len == 0 || difs[0] == NULL) { return -EINVAL; } msg.code = IRM_MSG_CODE__IRM_AP_UNREG; msg.has_pid = true; - msg.pid = getpid(); - msg.ap_name = ap_name; + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; - msg.n_dif_name = difs_size; + msg.n_dif_name = len; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -102,38 +219,48 @@ int ap_unreg(char * ap_name, return ret; } -int flow_accept(int fd, - char * ap_name, - char * ae_name) +int flow_accept(int fd, + char ** ap_name, + char ** ae_name) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int cli_fd = 0; - - if (ap_name == NULL) { - return -EINVAL; - } + int cfd = -1; msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.pid = _ap_instance->api->id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_pid || !recv_msg->has_port_id) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - cli_fd = recv_msg->fd; - ap_name = recv_msg->ap_name; - ae_name = recv_msg->ae_name; + + cfd = bmp_allocate(_ap_instance->fds); + + _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid); + if (_ap_instance->flows[cfd].rb == NULL) { + bmp_release(_ap_instance->fds, cfd); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + _ap_instance->flows[cfd].port_id = recv_msg->port_id; + _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT; + + *ap_name = strdup(recv_msg->ap_name); + if (ae_name != NULL) + *ae_name = strdup(recv_msg->ae_name); irm_msg__free_unpacked(recv_msg, NULL); - return cli_fd; + + bmp_release(_ap_instance->fds, fd); + + return cfd; } int flow_alloc_resp(int fd, @@ -145,9 +272,9 @@ int flow_alloc_resp(int fd, msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP; msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.pid = _ap_instance->api->id; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; msg.has_response = true; msg.response = response; @@ -155,7 +282,7 @@ int flow_alloc_resp(int fd, if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -167,41 +294,49 @@ int flow_alloc_resp(int fd, } int flow_alloc(char * dst_name, - char * src_ap_name, char * src_ae_name, - struct qos_spec * qos, - int oflags) + struct qos_spec * qos) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = 0; + int fd = -1; - if (dst_name == NULL || - src_ap_name == NULL) { + if (dst_name == NULL) return -EINVAL; - } if (src_ae_name == NULL) src_ae_name = UNKNOWN_AE; msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst_name = dst_name; - msg.ap_name = src_ap_name; + msg.ap_name = _ap_instance->api->name; + msg.has_pid = true; + msg.pid = _ap_instance->api->id; msg.ae_name = src_ae_name; - msg.has_oflags = true; - msg.oflags = oflags; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_pid || !recv_msg->has_port_id) { + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + fd = bmp_allocate(_ap_instance->fds); + + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid); + if (_ap_instance->flows[fd].rb == NULL) { + bmp_release(_ap_instance->fds, fd); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + _ap_instance->flows[fd].port_id = recv_msg->port_id; + _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT; + irm_msg__free_unpacked(recv_msg, NULL); + return fd; } @@ -211,17 +346,15 @@ int flow_alloc_res(int fd) irm_msg_t * recv_msg = NULL; int result = 0; - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -238,17 +371,15 @@ int flow_dealloc(int fd) irm_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -259,47 +390,50 @@ int flow_dealloc(int fd) return ret; } -int flow_cntl(int fd, int oflags) +int flow_cntl(int fd, int cmd, int oflags) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int ret = -1; - - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; - msg.oflags = oflags; + return -1; +} - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) +ssize_t flow_write(int fd, void * buf, size_t count) +{ + /* the AP chooses the amount of headspace and tailspace */ + size_t index = shm_create_du_buff(_ap_instance->dum, + count + DU_BUFF_HEADSPACE + + DU_BUFF_TAILSPACE, + DU_BUFF_HEADSPACE, + (uint8_t *) buf, + count); + struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; + if (index == -1) return -1; - if (recv_msg->has_result == false) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; + if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { + shm_release_du_buff(_ap_instance->dum, index); + return -EPIPE; } - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; + return 0; } -ssize_t flow_write(int fd, - void * buf, - size_t count) +ssize_t flow_read(int fd, void * buf, size_t count) { - LOG_MISSING; + struct rb_entry * e = NULL; + int n; + uint8_t * sdu; + /* FIXME: move this to a thread */ + while (e == NULL || e->port_id != _ap_instance->flows[fd].port_id) + e = shm_ap_rbuff_read(_ap_instance->rb); + + n = shm_du_map_read_sdu(&sdu, + _ap_instance->dum, + e->index); + if (n < 0) + return -1; - return -1; -} + memcpy(buf, sdu, MIN(n, count)); -ssize_t flow_read(int fd, - void * buf, - size_t count) -{ - LOG_MISSING; + shm_release_du_buff(_ap_instance->dum, e->index); - return -1; + return n; } diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c index 387572b3..75676915 100644 --- a/src/lib/ipcp.c +++ b/src/lib/ipcp.c @@ -121,6 +121,8 @@ pid_t ipcp_create(char * ipcp_name, return pid; } + /* clear fd table */ + if (ipcp_type == IPCP_NORMAL) exec_name = IPCP_NORMAL_EXEC; else if (ipcp_type == IPCP_SHIM_UDP) @@ -286,13 +288,8 @@ int ipcp_enroll(pid_t pid, return -EINVAL; msg.code = IPCP_MSG_CODE__IPCP_ENROLL; - msg.member_name = malloc(sizeof(*(msg.member_name))); - if (msg.member_name == NULL) { - LOG_ERR("Failed to malloc."); - return -1; - } - msg.n_1_dif = n_1_dif; msg.member_name = member_name; + msg.n_1_dif = n_1_dif; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) { @@ -323,8 +320,8 @@ int ipcp_name_reg(pid_t pid, if (name == NULL) return -1; - msg.code = IPCP_MSG_CODE__IPCP_NAME_REG; - msg.name = name; + msg.code = IPCP_MSG_CODE__IPCP_NAME_REG; + msg.name = name; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) @@ -368,6 +365,7 @@ int ipcp_name_unreg(pid_t pid, int ipcp_flow_alloc(pid_t pid, uint32_t port_id, + pid_t n_pid, char * dst_name, char * src_ap_name, char * src_ae_name, @@ -381,17 +379,19 @@ int ipcp_flow_alloc(pid_t pid, return -EINVAL; msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; + msg.has_port_id = true; + msg.port_id = port_id; + msg.has_pid = true; + msg.pid = n_pid; msg.src_ap_name = src_ap_name; msg.src_ae_name = src_ae_name; msg.dst_name = dst_name; - msg.port_id = port_id; - msg.has_port_id = true; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { ipcp_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -404,17 +404,20 @@ int ipcp_flow_alloc(pid_t pid, int ipcp_flow_alloc_resp(pid_t pid, uint32_t port_id, - int result) + pid_t n_pid, + int response) { ipcp_msg_t msg = IPCP_MSG__INIT; ipcp_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; - msg.has_port_id = true; - msg.port_id = port_id; - msg.has_result = true; - msg.result = result; + msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; + msg.has_port_id = true; + msg.port_id = port_id; + msg.has_pid = true; + msg.pid = n_pid; + msg.has_response = true; + msg.response = response; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) @@ -431,38 +434,38 @@ int ipcp_flow_alloc_resp(pid_t pid, return ret; } -int ipcp_flow_req_arr(pid_t pid, - char * dst_name, - char * src_ap_name, - char * src_ae_name) +int ipcp_flow_req_arr(pid_t pid, + char * dst_name, + char * src_ap_name, + char * src_ae_name) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = -1; + int port_id = -1; if (src_ap_name == NULL || src_ae_name == NULL) return -EINVAL; msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg.has_pid = true; + msg.pid = pid; msg.dst_name = dst_name; msg.ap_name = src_ap_name; msg.ae_name = src_ae_name; - msg.pid = pid; - msg.has_pid = true; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_port_id) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + port_id = recv_msg->port_id; irm_msg__free_unpacked(recv_msg, NULL); - return fd; + return port_id; } int ipcp_flow_alloc_reply(pid_t pid, @@ -509,11 +512,11 @@ int ipcp_flow_dealloc(pid_t pid, recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) - return -1; + return 0; if (recv_msg->has_result == false) { ipcp_msg__free_unpacked(recv_msg, NULL); - return -1; + return 0; } ret = recv_msg->result; @@ -531,11 +534,11 @@ int ipcp_flow_dealloc(pid_t pid, recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) - return -1; + return 0; if (recv_msg->has_result == false) { irm_msg__free_unpacked(recv_msg, NULL); - return -1; + return 0; } ret = recv_msg->result; diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index da4bb469..daca011d 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -25,6 +25,8 @@ message ipcp_msg { optional string src_ap_name = 9; optional string src_ae_name = 10; optional dif_config_msg conf = 11; - optional int32 result = 12; - optional int32 fd = 13; + optional int32 fd = 12; + optional int32 pid = 13; + optional int32 response = 14; + optional int32 result = 15; }; diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 89e2c882..c336614e 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -36,13 +36,10 @@ enum irm_msg_code { IRM_FLOW_ALLOC = 11; IRM_FLOW_ALLOC_RES = 12; IRM_FLOW_DEALLOC = 13; - IRM_FLOW_CONTROL = 14; - IRM_FLOW_WRITE = 15; - IRM_FLOW_READ = 16; - IPCP_FLOW_REQ_ARR = 17; - IPCP_FLOW_ALLOC_REPLY = 18; - IPCP_FLOW_DEALLOC = 19; - IRM_REPLY = 20; + IPCP_FLOW_REQ_ARR = 14; + IPCP_FLOW_ALLOC_REPLY = 15; + IPCP_FLOW_DEALLOC = 16; + IRM_REPLY = 17; }; message irm_msg { @@ -52,12 +49,11 @@ message irm_msg { optional uint32 api_id = 3; optional uint32 ipcp_type = 5; repeated string dif_name = 6; - optional int32 fd = 7; - optional int32 response = 8; - optional int32 oflags = 9; - optional string dst_name = 10; - optional uint32 port_id = 11; - optional int32 pid = 12; - optional dif_config_msg conf = 13; - optional int32 result = 14; + optional int32 response = 7; + optional string dst_name = 8; + optional uint32 port_id = 9; + optional int32 pid = 10; + optional dif_config_msg conf = 11; + optional int32 cfd = 12; + optional int32 result = 13; }; diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c new file mode 100644 index 00000000..4bd64775 --- /dev/null +++ b/src/lib/shm_ap_rbuff.c @@ -0,0 +1,268 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for application processes + * + * Dimitri Staessens + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include +#define OUROBOROS_PREFIX "shm_ap_rbuff" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry) \ + + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)) + +#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail)\ + & (SHM_RBUFF_SIZE - 1)) +#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE) + +struct shm_ap_rbuff { + struct rb_entry * shm_base; /* start of entry */ + size_t * ptr_head; /* start of ringbuffer head */ + size_t * ptr_tail; /* start of ringbuffer tail */ + pthread_mutex_t * shm_mutex; /* lock all free space in shm */ + pid_t pid; /* pid to which this rb belongs */ + int fd; +}; + +struct shm_ap_rbuff * shm_ap_rbuff_create() +{ + struct shm_ap_rbuff * rb; + int shm_fd; + struct rb_entry * shm_base; + pthread_mutexattr_t attr; + char fn[25]; + + sprintf(fn, SHM_AP_RBUFF "%d", getpid()); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBGF("Failed creating ring buffer."); + free(rb); + return NULL; + } + + if (lseek(shm_fd, SHM_RBUFF_FILE_SIZE - 1, SEEK_SET) < 0) { + LOG_DBGF("Failed to extend ringbuffer."); + free(rb); + return NULL; + } + + if (write(shm_fd, "", 1) != 1) { + LOG_DBGF("Failed to finalise extension of ringbuffer."); + free(rb); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + if (shm_base == MAP_FAILED) { + LOG_DBGF("Failed to map shared memory."); + if (close(shm_fd) == -1) + LOG_DBGF("Failed to close invalid shm."); + + if (shm_unlink(fn) == -1) + LOG_DBGF("Failed to remove invalid shm."); + + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); + rb->ptr_tail = (size_t *) + ((uint8_t *) rb->ptr_head + sizeof(size_t)); + rb->shm_mutex = (pthread_mutex_t *) + ((uint8_t *) rb->ptr_tail + sizeof(size_t)); + + pthread_mutexattr_init(&attr); + pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(rb->shm_mutex, &attr); + + *rb->ptr_head = 0; + *rb->ptr_tail = 0; + + rb->fd = shm_fd; + rb->pid = getpid(); + + return rb; +} + +struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t pid) +{ + struct shm_ap_rbuff * rb; + int shm_fd; + struct rb_entry * shm_base; + char fn[25]; + + sprintf(fn, SHM_AP_RBUFF "%d", pid); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBGF("Failed opening shared memory %s.", fn); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + if (shm_base == MAP_FAILED) { + LOG_DBGF("Failed to map shared memory."); + if (close(shm_fd) == -1) + LOG_DBGF("Failed to close invalid shm."); + + if (shm_unlink(fn) == -1) + LOG_DBGF("Failed to remove invalid shm."); + + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); + rb->ptr_tail = (size_t *) + ((uint8_t *) rb->ptr_head + sizeof(size_t)); + rb->shm_mutex = (pthread_mutex_t *) + ((uint8_t *) rb->ptr_tail + sizeof(size_t)); + + rb->fd = shm_fd; + rb->pid = pid; + + return rb; +} +void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) +{ + char fn[25]; + + if (rb == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + sprintf(fn, SHM_AP_RBUFF "%d", rb->pid); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + + free(rb); +} + +void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) +{ + char fn[25]; + + + if (rb == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + if (rb->pid != getpid()) { + LOG_ERR("Tried to destroy other AP's rbuff."); + return; + } + + sprintf(fn, SHM_AP_RBUFF "%d", rb->pid); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + + if (shm_unlink(fn) == -1) + LOG_DBGF("Failed to unlink shm."); + + free(rb); +} + +int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) +{ + struct rb_entry * pos; + + if (rb == NULL || e == NULL) + return -1; + + pthread_mutex_lock(rb->shm_mutex); + + if (!shm_rbuff_free(rb)) { + pthread_mutex_unlock(rb->shm_mutex); + return -1; + } + + pos = rb->shm_base + *rb->ptr_head; + *pos = *e; + *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1); + + pthread_mutex_unlock(rb->shm_mutex); + + return 0; +} +struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) +{ + struct rb_entry * e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + if (rb == NULL) + return NULL; + + pthread_mutex_lock(rb->shm_mutex); + + if (shm_rbuff_used(rb) == 0) { + pthread_mutex_unlock(rb->shm_mutex); + return NULL; + } + + *e = *(rb->shm_base + *rb->ptr_tail); + + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + + pthread_mutex_unlock(rb->shm_mutex); + + return e; +} diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index dfccca6a..56062c9d 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -45,6 +45,9 @@ ((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_tail * \ SHM_DU_BUFF_BLOCK_SIZE))) +#define idx_to_du_buff_ptr(dum, idx) \ + ((struct shm_du_buff *)(dum->shm_base + (idx * SHM_DU_BUFF_BLOCK_SIZE))) + #define block_ptr_to_idx(dum, sdb) \ (((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE) @@ -52,27 +55,31 @@ & (SHM_BLOCKS_IN_MAP - 1)) #define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP) +#define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \ + idx_to_du_buff_ptr(dum, idx)->du_head) + #define MIN(a,b)(a < b ? a : b) struct shm_du_buff { - size_t size; - size_t du_head; - size_t du_tail; + size_t size; + size_t du_head; + size_t du_tail; + size_t garbage; }; struct shm_du_map { - uint8_t * shm_base; /* start of blocks */ - size_t * ptr_head; /* start of ringbuffer head */ - size_t * ptr_tail; /* start of ringbuffer tail */ - pthread_mutex_t * shm_mutex; /* lock all free space in shm */ - int fd; + uint8_t * shm_base; /* start of blocks */ + size_t * ptr_head; /* start of ringbuffer head */ + size_t * ptr_tail; /* start of ringbuffer tail */ + pthread_mutex_t * shm_mutex; /* lock all free space in shm */ + int fd; }; struct shm_du_map * shm_du_map_create() { struct shm_du_map * dum; int shm_fd; - uint8_t * shm_base; + uint8_t * shm_base; pthread_mutexattr_t attr; dum = malloc(sizeof *dum); @@ -141,7 +148,13 @@ struct shm_du_map * shm_du_map_open() { struct shm_du_map * dum; int shm_fd; - uint8_t * shm_base; + uint8_t * shm_base; + + dum = malloc(sizeof *dum); + if (dum == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_RDWR, 0666); if (shm_fd == -1) { @@ -166,12 +179,6 @@ struct shm_du_map * shm_du_map_open() return NULL; } - dum = malloc(sizeof *dum); - if (dum == NULL) { - LOG_DBGF("Could not allocate struct."); - return NULL; - } - dum->shm_base = shm_base; dum->ptr_head = (size_t *) ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); @@ -192,6 +199,19 @@ void shm_du_map_close(struct shm_du_map * dum) return; } + if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + + free(dum); +} + +void shm_du_map_destroy(struct shm_du_map * dum) +{ + if (dum == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) LOG_DBGF("Couldn't unmap shared memory."); @@ -201,32 +221,33 @@ void shm_du_map_close(struct shm_du_map * dum) free(dum); } -struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, - size_t size, - size_t headspace, - uint8_t * data, - size_t len) +int shm_create_du_buff(struct shm_du_map * dum, + size_t size, + size_t headspace, + uint8_t * data, + size_t len) { struct shm_du_buff * sdb; long blocks = 0; int sz = size + sizeof *sdb; int sz2 = headspace + len + sizeof *sdb; - uint8_t * write_pos; + uint8_t * write_pos; size_t copy_len; + size_t index; if (dum == NULL || data == NULL) { LOG_DBGF("Bogus input, bugging out."); - return NULL; + return -1; } if (headspace >= size) { LOG_DBGF("Index out of bounds."); - return NULL; + return -1; } if (headspace + len > size) { LOG_DBGF("Buffer too small for data."); - return NULL; + return -1; } pthread_mutex_lock(dum->shm_mutex); @@ -237,20 +258,20 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, if (sz2 < 0 && sz > 0) { pthread_mutex_unlock(dum->shm_mutex); LOG_DBG("Can't handle this packet now"); - return NULL; + return -1; } ++blocks; } if (!shm_map_free(dum, blocks)) { pthread_mutex_unlock(dum->shm_mutex); - LOG_DBGF("Allocation failed, Out of Memory."); - return NULL; + return -1; } sdb = get_head_ptr(dum); sdb->size = size; + sdb->garbage = 0; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; @@ -267,32 +288,76 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, --blocks; } + index = *dum->ptr_head - 1; + pthread_mutex_unlock(dum->shm_mutex); - return sdb; + return index; } -int shm_release_du_buff(struct shm_du_map * dum) +/* FIXME: this cannot handle packets stretching beyond the ringbuffer border */ +int shm_du_map_read_sdu(uint8_t ** dst, + struct shm_du_map * dum, + size_t idx) +{ + size_t len = 0; + + if (idx > SHM_BLOCKS_IN_MAP) + return -1; + + pthread_mutex_lock(dum->shm_mutex); + + if (*dum->ptr_head == *dum->ptr_tail) { + pthread_mutex_unlock(dum->shm_mutex); + return -1; + } + + *dst = ((uint8_t *) idx_to_du_buff_ptr(dum, idx)) + + sizeof(struct shm_du_buff) + + idx_to_du_buff_ptr(dum, idx)->du_head; + len = sdu_size(dum, idx); + + pthread_mutex_unlock(dum->shm_mutex); + + return len; +} + +int shm_release_du_buff(struct shm_du_map * dum, size_t idx) { long sz; long blocks = 0; + + /* FIXME: this is crap for the test */ + if (idx > SHM_BLOCKS_IN_MAP) + idx = *dum->ptr_tail; + pthread_mutex_lock(dum->shm_mutex); if (*dum->ptr_head == *dum->ptr_tail) { - LOG_DBGF("Attempt to free empty ringbuffer. Nothing to do."); pthread_mutex_unlock(dum->shm_mutex); return -1; } - sz = get_tail_ptr(dum)->size; + idx_to_du_buff_ptr(dum, idx)->garbage = 1; - while (sz + (long) sizeof (struct shm_du_buff) > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; - ++blocks; + if (idx != *dum->ptr_tail) { + pthread_mutex_unlock(dum->shm_mutex); + return 0; + } + + while (get_tail_ptr(dum)->garbage == 1) { + sz = get_tail_ptr(dum)->size; + + while (sz + (long) sizeof (struct shm_du_buff) > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + ++blocks; + } + + *(dum->ptr_tail) = + (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); } - *(dum->ptr_tail) = (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); pthread_mutex_unlock(dum->shm_mutex); return 0; @@ -317,7 +382,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb, } uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, - size_t size) + size_t size) { if (sdb == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -335,7 +400,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, } int shm_du_buff_head_release(struct shm_du_buff * sdb, - size_t size) + size_t size) { if (sdb == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -353,7 +418,7 @@ int shm_du_buff_head_release(struct shm_du_buff * sdb, } int shm_du_buff_tail_release(struct shm_du_buff * sdb, - size_t size) + size_t size) { if (sdb == NULL) { LOG_DBGF("Bogus input, bugging out."); diff --git a/src/lib/tests/shm_du_map_test.c b/src/lib/tests/shm_du_map_test.c index 85a82e4d..55938a62 100644 --- a/src/lib/tests/shm_du_map_test.c +++ b/src/lib/tests/shm_du_map_test.c @@ -32,7 +32,7 @@ #include -#define SIZE_OF_DU_BUFF 24 +#define SIZE_OF_DU_BUFF 32 #define TEST_BUFF_SIZE (SHM_DU_BUFF_BLOCK_SIZE - SIZE_OF_DU_BUFF) #define MAX(a,b) (a > b ? a : b) @@ -44,7 +44,7 @@ void * produce() { struct shm_du_map * dum; long test_buf_size = 0; - uint8_t * test_values; + uint8_t * test_values; int headspace; int tailspace; long i; @@ -66,9 +66,8 @@ void * produce() test_values[i] = 170; clock_gettime(CLOCK_MONOTONIC, &starttime); - for (i = 1; i < SHM_BLOCKS_IN_MAP; i++) { - struct shm_du_buff * sdb; - size_t len; + for (i = 1; i < 16 * SHM_BLOCKS_IN_MAP; i++) { + size_t len; test_buf_size = TEST_BUFF_SIZE; @@ -77,21 +76,19 @@ void * produce() len = test_buf_size - (headspace + tailspace); - sdb = shm_create_du_buff(dum, - test_buf_size, - headspace, - test_values, - len); - - if (sdb != NULL) { - bytes_written += len; - } - else { - sync = -2; - break; + if (shm_create_du_buff(dum, + test_buf_size, + headspace, + test_values, + len) < 0) { + continue; } + + bytes_written += len; } + sync = -2; + clock_gettime(CLOCK_MONOTONIC, &stoptime); elapsed =(stoptime.tv_sec + stoptime.tv_nsec / 1000000000.0) - (starttime.tv_sec + starttime.tv_nsec / 1000000000.0); @@ -104,13 +101,14 @@ void * produce() sync = -1; + shm_du_map_close(dum); + return 0; } void * consume() { struct shm_du_map * dum; - struct timespec ts; ts.tv_sec = 0; @@ -123,10 +121,15 @@ void * consume() return (void *)-1; } - while (!sync) { - while (!shm_release_du_buff(dum)); - nanosleep(&ts, NULL); + while (true) { + shm_release_du_buff(dum, 1823429173941); + if (sync) + break; } + nanosleep(&ts, NULL); + + + shm_du_map_close(dum); return 0; } @@ -149,7 +152,7 @@ int shm_du_map_test(int argc, char ** argv) return -1; } - shm_du_map_close(dum); + shm_du_map_destroy(dum); LOG_INFO("done."); @@ -165,7 +168,7 @@ int shm_du_map_test(int argc, char ** argv) pthread_create(&consumer, NULL, consume, NULL); pthread_join(consumer, NULL); - shm_du_map_close(dum); + shm_du_map_destroy(dum); LOG_INFO("done."); @@ -173,6 +176,8 @@ int shm_du_map_test(int argc, char ** argv) LOG_INFO("starting concurrency test."); + sync = 0; + dum = shm_du_map_create(); res1 = (int) pthread_create(&producer, NULL, produce, NULL); @@ -181,7 +186,7 @@ int shm_du_map_test(int argc, char ** argv) pthread_join(producer, NULL); pthread_join(consumer, NULL); - shm_du_map_close(dum); + shm_du_map_destroy(dum); LOG_INFO("done."); diff --git a/src/tools/echo/echo_client.c b/src/tools/echo/echo_client.c index 8d3fc322..36942028 100644 --- a/src/tools/echo/echo_client.c +++ b/src/tools/echo/echo_client.c @@ -23,19 +23,23 @@ #define CLIENT_AP_NAME "echo-client" #include +#include int client_main() { int fd = 0; int result = 0; - uint8_t buf[BUF_SIZE]; + char buf[BUF_SIZE]; char * message = "Client says hi!"; ssize_t count = 0; - fd = flow_alloc(SERVER_AP_NAME, CLIENT_AP_NAME, - NULL, NULL, 0); + if(ap_init(CLIENT_AP_NAME)) + return -1; + + fd = flow_alloc(SERVER_AP_NAME, NULL, NULL); if (fd < 0) { printf("Failed to allocate flow\n"); + ap_fini(); return -1; } @@ -43,12 +47,14 @@ int client_main() if (result < 0) { printf("Flow allocation refused\n"); flow_dealloc(fd); + ap_fini(); return -1; } if (flow_write(fd, message, strlen(message) + 1) == -1) { printf("Failed to write SDU\n"); flow_dealloc(fd); + ap_fini(); return -1; } @@ -56,6 +62,7 @@ int client_main() if (count < 0) { printf("Failed to read SDU\n"); flow_dealloc(fd); + ap_fini(); return -1; } @@ -63,5 +70,7 @@ int client_main() flow_dealloc(fd); + ap_fini(); + return 0; } diff --git a/src/tools/echo/echo_server.c b/src/tools/echo/echo_server.c index e457e22b..4b1a17b1 100644 --- a/src/tools/echo/echo_server.c +++ b/src/tools/echo/echo_server.c @@ -20,6 +20,8 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ +#include + #include #include #include @@ -27,71 +29,82 @@ #include +#ifdef OUROBOROS_CONFIG_DEBUG + #define OUROBOROS_PREFIX "echo-server" + #include +#endif + #define DIF_NAME "*" void shutdown_server(int signo) { char * dif = DIF_NAME; - if (ap_unreg(SERVER_AP_NAME, &dif, 1)) { - printf("Failed to unregister application\n"); + if (ap_unreg(&dif, 1)) { + printf("Failed to unregister application.\n"); + ap_fini(); exit(EXIT_FAILURE); } + ap_fini(); exit(EXIT_SUCCESS); } int server_main() { - int server_fd = 0; - int client_fd = 0; + int server_fd = 0; + int client_fd = 0; char * dif = DIF_NAME; char * client_name = NULL; - uint8_t buf[BUF_SIZE]; + char buf[BUF_SIZE]; ssize_t count = 0; - printf("Starting the server\n"); + printf("Starting the server.\n"); /* Manual cleanup is required for now */ if (signal(SIGINT, shutdown_server) == SIG_ERR) { - printf("Can't install signal handler\n"); + printf("Can't install signal handler.\n"); return -1; } - server_fd = ap_reg(SERVER_AP_NAME, &dif, 1); - if (server_fd < 0) { - printf("Failed to register application\n"); + if(ap_init(SERVER_AP_NAME)) { return -1; } - printf("Echo server started...\n"); + server_fd = ap_reg(&dif, 1); + if (server_fd < 0) { + printf("Failed to register application.\n"); + ap_fini(); + return -1; + } while (true) { client_fd = flow_accept(server_fd, - client_name, NULL); + &client_name, NULL); if (client_fd < 0) { - continue; + printf("Failed to accept flow.\n"); + break; } - printf("New flow from %s\n", client_name); + printf("New flow from %s.\n", client_name); if (flow_alloc_resp(client_fd, 0)) { - printf("Failed to give an allocate response\n"); + printf("Failed to give an allocate response.\n"); flow_dealloc(client_fd); continue; } - count = flow_read(client_fd, buf, BUF_SIZE); + count = flow_read(client_fd, (void **) &buf, BUF_SIZE); if (count < 0) { - printf("Failed to read SDU\n"); + printf("Failed to read SDU.\n"); flow_dealloc(client_fd); continue; } - printf("Message from client is %.*s\n", (int) count, buf); + printf("Message from client is %.*s.\n", (int) count, buf); if (flow_write(client_fd, buf, count) == -1) { - printf("Failed to write SDU\n"); + printf("Failed to write SDU.\n"); flow_dealloc(client_fd); continue; } @@ -99,5 +112,7 @@ int server_main() flow_dealloc(client_fd); } + ap_fini(); + return 0; } -- cgit v1.2.3