diff options
author | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-05-07 16:11:09 +0200 |
---|---|---|
committer | dimitri staessens <dimitri.staessens@intec.ugent.be> | 2016-05-07 16:11:09 +0200 |
commit | eb9f44379d5316e7f7e9311d7a66d2041eca743a (patch) | |
tree | 2489605a42bb2c9582c0c4e912c2de0c40512b2a /src/ipcpd | |
parent | de8f2015cbd015b1cced366cb12c054be62c23b1 (diff) | |
download | ouroboros-eb9f44379d5316e7f7e9311d7a66d2041eca743a.tar.gz ouroboros-eb9f44379d5316e7f7e9311d7a66d2041eca743a.zip |
irmd: flow allocation and fast path
This commit has a first implementation of flow allocation (the "slow
path") and read/write (the "fast path") for ouroboros. It provides
basic but unstable communications over the shared memory.
It required a lot of changes all over the stack, and fixes a number of
previously undetected issues.
This PR still need heavy revision regarding data model, locking and
cleanup.
lib/dev: modifications to the API. It now uses an ap_init() call to
set the AP name and sets the Instance ID to the pid of the process. It
also binds the AP to the shared memory and creates tables for mappings
in the fast path. A call to ap_fini() releases the resources.
lib/shm_ap_rbuff: added ring buffer for data exchange between
processes in the fast path. It passes an index in the shm_du_map.
lib/shm_du_map: rewrote API to work with calls from dev.c. Garbage
collector added. Tests updated to new API.
ipcpd/ipcp-data: removed everything related to flows, as these are
universal for all ap's and kept in ap_data (dev.c), or similar structs
for shim ipcps.
shim-udp: added flow allocator and read/write functions and shm
elements.
irmd: revised data model and structures necessary for flow allocation.
tools: echo updated to new dev.h API.
messaging system was updated to comply with new flow allocation
messages. All exchanges use pid and port_id to bootstrap the fast
path.
Diffstat (limited to 'src/ipcpd')
-rw-r--r-- | src/ipcpd/flow.c | 38 | ||||
-rw-r--r-- | src/ipcpd/flow.h | 12 | ||||
-rw-r--r-- | src/ipcpd/ipcp-data.c | 104 | ||||
-rw-r--r-- | src/ipcpd/ipcp-data.h | 16 | ||||
-rw-r--r-- | src/ipcpd/ipcp-ops.h | 9 | ||||
-rw-r--r-- | src/ipcpd/ipcp.c | 20 | ||||
-rw-r--r-- | src/ipcpd/ipcp.h | 3 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 524 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/tests/shim_udp_test.c | 12 |
9 files changed, 408 insertions, 330 deletions
diff --git a/src/ipcpd/flow.c b/src/ipcpd/flow.c index c436733b..ae8f848c 100644 --- a/src/ipcpd/flow.c +++ b/src/ipcpd/flow.c @@ -27,7 +27,7 @@ #include <ouroboros/logs.h> -flow_t * flow_create(int32_t port_id) +flow_t * flow_create(uint32_t port_id) { flow_t * flow = malloc(sizeof *flow); if (flow == NULL) { @@ -38,8 +38,7 @@ flow_t * flow_create(int32_t port_id) INIT_LIST_HEAD(&flow->list); flow->port_id = port_id; - flow->oflags = FLOW_O_DEFAULT; - flow->state = FLOW_NULL; + flow->state = FLOW_NULL; pthread_mutex_init(&flow->lock, NULL); @@ -52,36 +51,3 @@ void flow_destroy(flow_t * flow) return; free(flow); } - -int flow_set_opts(flow_t * flow, uint16_t opts) -{ - if (flow == NULL) { - LOG_DBGF("Non-existing flow."); - return -1; - } - - pthread_mutex_lock(&flow->lock); - - if ((opts & FLOW_O_ACCMODE) == FLOW_O_ACCMODE) { - flow->oflags = FLOW_O_DEFAULT; - pthread_mutex_unlock(&flow->lock); - LOG_WARN("Invalid flow options. Setting default."); - return -1; - } - - flow->oflags = opts; - - pthread_mutex_unlock(&flow->lock); - - return 0; -} - -uint16_t flow_get_opts(const flow_t * flow) -{ - if (flow == NULL) { - LOG_DBGF("Non-existing flow."); - return FLOW_O_INVALID; - } - - return flow->oflags; -} diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h index 000de5ad..0a3e90d1 100644 --- a/src/ipcpd/flow.h +++ b/src/ipcpd/flow.h @@ -25,6 +25,7 @@ #include <ouroboros/common.h> #include <ouroboros/list.h> +#include <ouroboros/shm_ap_rbuff.h> #include <pthread.h> /* same values as fcntl.h */ @@ -47,17 +48,14 @@ enum flow_state { typedef struct flow { struct list_head list; - int32_t port_id; - uint16_t oflags; - enum flow_state state; + uint32_t port_id; + struct shm_ap_rbuff * rb; + enum flow_state state; pthread_mutex_t lock; } flow_t; -flow_t * flow_create(int32_t port_id); +flow_t * flow_create(uint32_t port_id); void flow_destroy(flow_t * flow); -int flow_set_opts(flow_t * flow, uint16_t opts); -uint16_t flow_get_opts(const flow_t * flow); - #endif /* OUROBOROS_FLOW_H */ diff --git a/src/ipcpd/ipcp-data.c b/src/ipcpd/ipcp-data.c index 72407a53..76fc4bcd 100644 --- a/src/ipcpd/ipcp-data.c +++ b/src/ipcpd/ipcp-data.c @@ -96,46 +96,26 @@ struct ipcp_data * ipcp_data_create() if (data == NULL) return NULL; - data->iname = NULL; data->type = 0; - data->dum = NULL; return data; } struct ipcp_data * ipcp_data_init(struct ipcp_data * dst, - const char * ipcp_name, enum ipcp_type ipcp_type) { if (dst == NULL) return NULL; - dst->iname = instance_name_create(); - if (dst->iname == NULL) - return NULL; - - if(instance_name_init_from(dst->iname, ipcp_name, getpid()) == NULL) { - instance_name_destroy(dst->iname); - return NULL; - } - dst->type = ipcp_type; - dst->dum = shm_du_map_open(); - if (dst->dum == NULL) { - instance_name_destroy(dst->iname); - return NULL; - } - /* init the lists */ INIT_LIST_HEAD(&dst->registry); - INIT_LIST_HEAD(&dst->flows); INIT_LIST_HEAD(&dst->directory); /* init the mutexes */ pthread_mutex_init(&dst->reg_lock, NULL); pthread_mutex_init(&dst->dir_lock, NULL); - pthread_mutex_init(&dst->flow_lock, NULL); return dst; } @@ -156,42 +136,22 @@ static void clear_directory(struct ipcp_data * data) dir_entry_destroy(list_entry(h, struct dir_entry, list)); } -static void clear_flows(struct ipcp_data * data) -{ - struct list_head * h; - struct list_head * t; - list_for_each_safe(h, t, &data->flows) - flow_destroy(list_entry(h, flow_t, list)); - -} - void ipcp_data_destroy(struct ipcp_data * data) { if (data == NULL) return; - /* FIXME: finish all pending operations here */ - - if (data->iname != NULL) - instance_name_destroy(data->iname); - data->iname = NULL; - - if (data->dum != NULL) - shm_du_map_close(data->dum); - data->dum = NULL; + /* FIXME: finish all pending operations here and cancel all threads */ pthread_mutex_lock(&data->reg_lock); pthread_mutex_lock(&data->dir_lock); - pthread_mutex_lock(&data->flow_lock); /* clear the lists */ clear_registry(data); clear_directory(data); - clear_flows(data); /* * no need to unlock, just free the entire thing - * pthread_mutex_unlock(&data->flow_lock); * pthread_mutex_unlock(&data->dir_lock); * pthread_mutex_unlock(&data->reg_lock); */ @@ -380,65 +340,3 @@ uint64_t ipcp_data_get_addr(struct ipcp_data * data, return addr; } - -flow_t * ipcp_data_find_flow(struct ipcp_data * data, - uint32_t port_id) -{ - struct list_head * h; - list_for_each(h, &data->flows) { - flow_t * f = list_entry(h, flow_t, list); - if (f->port_id == port_id) - return f; - } - - return NULL; -} - -bool ipcp_data_has_flow(struct ipcp_data * data, - uint32_t port_id) -{ - return ipcp_data_find_flow(data, port_id) != NULL; -} - -int ipcp_data_add_flow(struct ipcp_data * data, - flow_t * flow) -{ - if (data == NULL || flow == NULL) - return -1; - - pthread_mutex_lock(&data->flow_lock); - - if (ipcp_data_has_flow(data, flow->port_id)) { - pthread_mutex_unlock(&data->flow_lock); - return -2; - } - - list_add(&flow->list,&data->flows); - - pthread_mutex_unlock(&data->flow_lock); - - return 0; -} - -int ipcp_data_del_flow(struct ipcp_data * data, - uint32_t port_id) -{ - flow_t * f; - - if (data == NULL) - return -1; - - pthread_mutex_lock(&data->flow_lock); - - f = ipcp_data_find_flow(data, port_id); - if (f == NULL) - return -1; - - list_del(&f->list); - - free(f); - - pthread_mutex_unlock(&data->flow_lock); - - return 0; -} diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h index 1dea8c3c..2e86ba11 100644 --- a/src/ipcpd/ipcp-data.h +++ b/src/ipcpd/ipcp-data.h @@ -34,17 +34,11 @@ #include "flow.h" struct ipcp_data { - instance_name_t * iname; enum ipcp_type type; - struct shm_du_map * dum; - struct list_head registry; pthread_mutex_t reg_lock; - struct list_head flows; - pthread_mutex_t flow_lock; - struct list_head directory; pthread_mutex_t dir_lock; @@ -53,7 +47,6 @@ struct ipcp_data { struct ipcp_data * ipcp_data_create(); struct ipcp_data * ipcp_data_init(struct ipcp_data * dst, - const char * ipcp_name, enum ipcp_type ipcp_type); void ipcp_data_destroy(struct ipcp_data * data); @@ -73,13 +66,4 @@ bool ipcp_data_is_in_directory(struct ipcp_data * data, const char * ap_name); uint64_t ipcp_data_get_addr(struct ipcp_data * data, const char * ap_name); -bool ipcp_data_has_flow(struct ipcp_data * data, - uint32_t port_id); -flow_t * ipcp_data_find_flow(struct ipcp_data * data, - uint32_t port_id); -int ipcp_data_add_flow(struct ipcp_data * data, - flow_t * flow); -int ipcp_data_del_flow(struct ipcp_data * data, - uint32_t port_id); - #endif /* IPCPD_IPCP_DATA_H */ diff --git a/src/ipcpd/ipcp-ops.h b/src/ipcpd/ipcp-ops.h index 2ccb2e59..91b6cac9 100644 --- a/src/ipcpd/ipcp-ops.h +++ b/src/ipcpd/ipcp-ops.h @@ -39,20 +39,15 @@ struct ipcp_ops { int (* ipcp_name_reg)(char * name); int (* ipcp_name_unreg)(char * name); int (* ipcp_flow_alloc)(uint32_t port_id, + pid_t n_pid, char * dst_ap_name, char * src_ap_name, char * src_ae_name, struct qos_spec * qos); int (* ipcp_flow_alloc_resp)(uint32_t port_id, + pid_t n_pid, int response); int (* ipcp_flow_dealloc)(uint32_t port_id); - - /* FIXME: let's see how this will work with the shm_du_map */ - int (* ipcp_du_write)(uint32_t port_id, - size_t map_index); - - int (* ipcp_du_read)(uint32_t port_id, - size_t map_index); }; #endif /* IPCPD_IPCP_OPS_H */ diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index d6f373cd..13632a80 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -45,11 +45,12 @@ int ipcp_arg_check(int argc, char * argv[]) return 0; } -int ipcp_main_loop(struct ipcp * _ipcp) +void * ipcp_main_loop(void * o) { int lsockfd; int sockfd; uint8_t buf[IPCP_MSG_BUF_SIZE]; + struct ipcp * _ipcp = (struct ipcp *) o; ipcp_msg_t * msg; ssize_t count; @@ -61,13 +62,13 @@ int ipcp_main_loop(struct ipcp * _ipcp) if (_ipcp == NULL) { LOG_ERR("Invalid ipcp struct."); - return 1; + return (void *) 1; } sockfd = server_socket_open(ipcp_sock_path(getpid())); if (sockfd < 0) { LOG_ERR("Could not open server socket."); - return 1; + return (void *) 1; } while (true) { @@ -113,7 +114,7 @@ int ipcp_main_loop(struct ipcp * _ipcp) conf.max_pdu_size = conf_msg->max_pdu_size; } if (conf_msg->ipcp_type == IPCP_SHIM_UDP) { - conf.ip_addr = conf_msg->ip_addr; + conf.ip_addr = conf_msg->ip_addr; conf.dns_addr = conf_msg->dns_addr; } @@ -149,7 +150,8 @@ int ipcp_main_loop(struct ipcp * _ipcp) } ret_msg.has_result = true; ret_msg.result = - _ipcp->ops->ipcp_unreg(msg->dif_names, msg->len); + _ipcp->ops->ipcp_unreg(msg->dif_names, + msg->len); break; case IPCP_MSG_CODE__IPCP_NAME_REG: if (_ipcp->ops->ipcp_name_reg == NULL) { @@ -172,9 +174,10 @@ int ipcp_main_loop(struct ipcp * _ipcp) LOG_ERR("Flow_alloc unsupported."); break; } - ret_msg.has_fd = true; - ret_msg.fd = + ret_msg.has_result = true; + ret_msg.result = _ipcp->ops->ipcp_flow_alloc(msg->port_id, + msg->pid, msg->dst_name, msg->src_ap_name, msg->src_ae_name, @@ -188,6 +191,7 @@ int ipcp_main_loop(struct ipcp * _ipcp) ret_msg.has_result = true; ret_msg.result = _ipcp->ops->ipcp_flow_alloc_resp(msg->port_id, + msg->pid, msg->result); break; case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: @@ -231,5 +235,5 @@ int ipcp_main_loop(struct ipcp * _ipcp) close(lsockfd); } - return 0; + return NULL; } diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 9decac8b..393af994 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -43,7 +43,8 @@ struct ipcp { int irmd_fd; }; -int ipcp_main_loop(); +void * ipcp_main_loop(void * o); +void * ipcp_sdu_loop(void * o); int ipcp_arg_check(int argc, char * argv[]); #endif diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 460fe9e3..1f7bb12f 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -24,12 +24,13 @@ #include "ipcp.h" #include "flow.h" #include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> #include <ouroboros/ipcp.h> #include <ouroboros/dif_config.h> #include <ouroboros/sockets.h> -#include <ouroboros/dev.h> +#include <ouroboros/bitmap.h> #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -67,6 +68,144 @@ extern struct ipcp * _ipcp; /* defined in test */ struct ipcp * _ipcp; #endif +/* + * copied from ouroboros/dev. The shim needs access to the internals + * because it doesn't follow all steps necessary steps to get + * the info + */ + +#define UNKNOWN_AP "__UNKNOWN_AP__" +#define UNKNOWN_AE "__UNKNOWN_AE__" + +#define AP_MAX_FLOWS 256 + +#ifndef DU_BUFF_HEADSPACE + #define DU_BUFF_HEADSPACE 128 +#endif + +#ifndef DU_BUFF_TAILSPACE + #define DU_BUFF_TAILSPACE 0 +#endif + +/* the shim needs access to these internals */ +struct shim_ap_data { + instance_name_t * api; + struct shm_du_map * dum; + struct bmp * fds; + + struct shm_ap_rbuff * rb; + struct flow flows[AP_MAX_FLOWS]; + + pthread_t mainloop; + pthread_t sduloop; + pthread_t handler; + pthread_t sdu_reader[2]; + int ping_pong; +} * _ap_instance; + +int shim_ap_init(char * ap_name) +{ + _ap_instance = malloc(sizeof(struct shim_ap_data)); + if (_ap_instance == NULL) { + return -1; + } + + _ap_instance->api = instance_name_create(); + if (_ap_instance->api == NULL) { + free(_ap_instance); + return -1; + } + + if (instance_name_init_from(_ap_instance->api, + ap_name, + getpid()) == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); + if (_ap_instance->fds == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->dum = shm_du_map_open(); + if (_ap_instance->dum == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + _ap_instance->rb = shm_ap_rbuff_create(); + if (_ap_instance->rb == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + return 0; +} + +void shim_ap_fini() +{ + int i = 0; + + if (_ap_instance == NULL) + return; + if (_ap_instance->api != NULL) + instance_name_destroy(_ap_instance->api); + if (_ap_instance->fds != NULL) + bmp_destroy(_ap_instance->fds); + if (_ap_instance->dum != NULL) + shm_du_map_close(_ap_instance->dum); + if (_ap_instance->rb != NULL) + shm_ap_rbuff_destroy(_ap_instance->rb); + for (i = 0; i < AP_MAX_FLOWS; i ++) + if (_ap_instance->flows[i].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[i].rb); + + free(_ap_instance); +} + +static int port_id_to_fd(uint32_t port_id) +{ + int i; + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].port_id == port_id + && _ap_instance->flows[i].state != FLOW_NULL) + return i; + return -1; +} + +static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) +{ + /* the AP chooses the amount of headspace and tailspace */ + size_t index = shm_create_du_buff(_ap_instance->dum, + count, + 0, + buf, + count); + struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; + + if (index == -1) + return -1; + + if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { + shm_release_du_buff(_ap_instance->dum, index); + return -EPIPE; + } + + return 0; +} + +/* + * end copy from dev.c + */ + struct ipcp_udp_data { /* keep ipcp_data first for polymorphism */ struct ipcp_data ipcp_data; @@ -79,39 +218,15 @@ struct ipcp_udp_data { int s_fd; fd_set flow_fd_s; - flow_t * fd_to_flow_ptr[FD_SETSIZE]; - pthread_mutex_t lock; + pthread_mutex_t lock; }; -struct udp_flow { - /* keep flow first for polymorphism */ - flow_t flow; - int fd; -}; - -void ipcp_sig_handler(int sig, siginfo_t * info, void * c) -{ - switch(sig) { - case SIGINT: - case SIGTERM: - case SIGHUP: - LOG_DBG("Terminating by order of %d. Bye.", info->si_pid); - if (info->si_pid == irmd_pid) { - /* shm_du_map_close(_ipcp->data->dum); */ - exit(0); - } - default: - return; - } -} - -struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name) +struct ipcp_udp_data * ipcp_udp_data_create() { struct ipcp_udp_data * udp_data; struct ipcp_data * data; enum ipcp_type ipcp_type; - int n; udp_data = malloc(sizeof *udp_data); if (udp_data == NULL) { @@ -121,18 +236,52 @@ struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name) ipcp_type = THIS_TYPE; data = (struct ipcp_data *) udp_data; - if (ipcp_data_init(data, ap_name, ipcp_type) == NULL) { + if (ipcp_data_init(data, ipcp_type) == NULL) { free(udp_data); return NULL; } FD_ZERO(&udp_data->flow_fd_s); - for (n = 0; n < FD_SETSIZE; ++n) - udp_data->fd_to_flow_ptr[n] = NULL; return udp_data; } +void ipcp_udp_data_destroy(struct ipcp_udp_data * data) +{ + if (data == NULL) + return; + + ipcp_data_destroy((struct ipcp_data *) data); +} + +void ipcp_udp_destroy(struct ipcp * ipcp) +{ + ipcp_udp_data_destroy((struct ipcp_udp_data *) ipcp->data); + shim_ap_fini(); + free(ipcp); +} + +void ipcp_sig_handler(int sig, siginfo_t * info, void * c) +{ + switch(sig) { + case SIGINT: + case SIGTERM: + case SIGHUP: + if (info->si_pid == irmd_pid || info->si_pid == 0) { + LOG_DBG("Terminating by order of %d. Bye.", + info->si_pid); + pthread_cancel(_ap_instance->mainloop); + pthread_cancel(_ap_instance->handler); + pthread_cancel(_ap_instance->sdu_reader[0]); + pthread_cancel(_ap_instance->sdu_reader[1]); + pthread_cancel(_ap_instance->sduloop); + exit(0); + } + default: + return; + } +} + static void * ipcp_udp_listener() { char buf[SHIM_UDP_BUF_SIZE]; @@ -141,10 +290,10 @@ static void * ipcp_udp_listener() struct sockaddr_in f_saddr; struct sockaddr_in c_saddr; struct hostent * hostp; - struct udp_flow * flow; int sfd = shim_data(_ipcp)->s_fd; while (true) { + int fd; n = sizeof c_saddr; n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0, (struct sockaddr *) &c_saddr, (unsigned *) &n); @@ -157,16 +306,7 @@ static void * ipcp_udp_listener() if (hostp == NULL) continue; - /* create a new socket for the server */ - flow = malloc(sizeof *flow); - if (flow == NULL) - continue; - - flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (flow->fd == -1) { - free(flow); - continue; - } + fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); memset((char *) &f_saddr, 0, sizeof f_saddr); f_saddr.sin_family = AF_INET; @@ -185,36 +325,33 @@ static void * ipcp_udp_listener() * the flow structure */ - if (connect(flow->fd, + if (connect(fd, (struct sockaddr *) &c_saddr, sizeof c_saddr) < 0) { - close(flow->fd); - free(flow); + close(fd); continue; } + /* echo back the packet */ + while(send(fd, buf, strlen(buf), 0) < 0) + ; + /* reply to IRM */ - flow->flow.port_id = ipcp_flow_req_arr(getpid(), buf, - UNKNOWN_AP, ""); - if (flow->flow.port_id < 0) { + _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(), + buf, + UNKNOWN_AP, + UNKNOWN_AE); + if (_ap_instance->flows[fd].port_id < 0) { LOG_ERR("Could not get port id from IRMd"); - close(flow->fd); - free(flow); + close(fd); continue; } - flow->flow.oflags = FLOW_O_DEFAULT; - flow->flow.state = FLOW_PENDING; - - if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) { - LOG_DBGF("Could not add flow."); - close(flow->fd); - free(flow); - continue; - } + _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].state = FLOW_PENDING; - FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s); - shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow; + LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.", + _ap_instance->flows[fd].port_id, fd); } return 0; @@ -229,8 +366,6 @@ static void * ipcp_udp_sdu_reader() struct sockaddr_in r_saddr; while (true) { - flow_t * flow; - if (select(FD_SETSIZE, &shim_data(_ipcp)->flow_fd_s, NULL, NULL, NULL) @@ -249,18 +384,8 @@ static void * ipcp_udp_sdu_reader() (struct sockaddr *) &r_saddr, (unsigned *) &n); - flow = shim_data(_ipcp)->fd_to_flow_ptr[fd]; - if (flow->state == FLOW_PENDING) { - if (connect(fd, - (struct sockaddr *) &r_saddr, - sizeof r_saddr) - < 0) - continue; - flow->state = FLOW_ALLOCATED; - } - /* send the sdu to the correct port_id */ - LOG_MISSING; + ipcp_udp_flow_write(fd, buf, n); } } @@ -271,8 +396,6 @@ int ipcp_udp_bootstrap(struct dif_config * conf) { char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; - pthread_t handler; - pthread_t sdu_reader; int enable = 1; if (conf->type != THIS_TYPE) { @@ -296,7 +419,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf) dnsstr, INET_ADDRSTRLEN); else - strcpy(dnsstr, "not set.\n"); + strcpy(dnsstr, "not set"); shim_data(_ipcp)->ip_addr = conf->ip_addr; shim_data(_ipcp)->dns_addr = conf->dns_addr; @@ -304,7 +427,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf) /* UDP listen server */ if ((shim_data(_ipcp)->s_fd = - socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { + socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { LOG_DBGF("Can't create socket."); return -1; } @@ -328,13 +451,28 @@ int ipcp_udp_bootstrap(struct dif_config * conf) return -1; } - pthread_create(&handler, NULL, ipcp_udp_listener, NULL); - pthread_create(&sdu_reader, NULL, ipcp_udp_sdu_reader, NULL); + FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + + pthread_create(&_ap_instance->handler, + NULL, + ipcp_udp_listener, + NULL); + pthread_create(&_ap_instance->sdu_reader[0], + NULL, + ipcp_udp_sdu_reader, + NULL); + + pthread_create(&_ap_instance->sdu_reader[1], + NULL, + ipcp_udp_sdu_reader, + NULL); + + _ap_instance->ping_pong = 0; _ipcp->state = IPCP_ENROLLED; - LOG_DBG("Bootstrapped shim IPCP over UDP %s-%d.", - _ipcp->data->iname->name, _ipcp->data->iname->id); + LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.", + getpid()); LOG_DBG("Bound to IP address %s.", ipstr); LOG_DBG("DNS server address is %s.", dnsstr); @@ -464,23 +602,25 @@ int ipcp_udp_name_unreg(char * name) } int ipcp_udp_flow_alloc(uint32_t port_id, + pid_t n_pid, char * dst_name, char * src_ap_name, char * src_ae_name, struct qos_spec * qos) { - struct udp_flow * flow = NULL; struct sockaddr_in l_saddr; struct sockaddr_in r_saddr; + struct sockaddr_in rf_saddr; + int fd; + int n; + + char * recv_buf = NULL; struct hostent * h; if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL) return -1; - LOG_DBG("Received flow allocation request from %s to %s.", - src_ap_name, dst_name); - if (strlen(dst_name) > 255 || strlen(src_ap_name) > 255 || strlen(src_ae_name) > 255) { @@ -491,15 +631,7 @@ int ipcp_udp_flow_alloc(uint32_t port_id, if (qos != NULL) LOG_DBGF("QoS requested. UDP/IP can't do that."); - flow = malloc(sizeof *flow); - if (flow == NULL) - return -1; - - flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (flow->fd == -1) { - free(flow); - return -1; - } + fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); /* this socket is for the flow */ memset((char *) &l_saddr, 0, sizeof l_saddr); @@ -507,108 +639,161 @@ int ipcp_udp_flow_alloc(uint32_t port_id, l_saddr.sin_addr.s_addr = local_ip; l_saddr.sin_port = 0; - if (bind(flow->fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { - char ipstr[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, - &l_saddr.sin_addr.s_addr, - ipstr, - INET_ADDRSTRLEN); - close(flow->fd); - free(flow); + if (bind(fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { + close(fd); return -1; } h = gethostbyname(dst_name); if (h == NULL) { LOG_DBGF("Could not resolve %s.", dst_name); - close(flow->fd); - free(flow); + close(fd); return -1; } - memset((char *) &r_saddr, 0, sizeof r_saddr); r_saddr.sin_family = AF_INET; - r_saddr.sin_addr.s_addr = (uint32_t) *(h->h_addr_list[0]); + r_saddr.sin_addr.s_addr = *((uint32_t *) (h->h_addr_list[0])); r_saddr.sin_port = LISTEN_PORT; + /* at least try to get the packet on the wire */ - while (sendto(flow->fd, dst_name, strlen(dst_name), 0, + while (sendto(fd, dst_name, strlen(dst_name), 0, (struct sockaddr *) &r_saddr, sizeof r_saddr) < 0) { } - flow->flow.port_id = port_id; - flow->flow.oflags = FLOW_O_DEFAULT; - flow->flow.state = FLOW_PENDING; - - /* add flow to the list */ + /* wait for the other shim IPCP to respond */ - pthread_mutex_lock(&_ipcp->data->flow_lock); + recv_buf = malloc(strlen(dst_name) + 1); + n = sizeof(rf_saddr); + n = recvfrom(fd, + recv_buf, + strlen(dst_name) + 1, + 0, + (struct sockaddr *) &rf_saddr, + (unsigned *) &n); - if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) { - LOG_DBGF("Could not add flow."); - pthread_mutex_unlock(&_ipcp->data->flow_lock); - close(flow->fd); - free(flow); + if (connect(fd, + (struct sockaddr *) &rf_saddr, + sizeof rf_saddr) + < 0) { + free(recv_buf); return -1; } - pthread_mutex_unlock(&_ipcp->data->flow_lock); + if (!strcmp(recv_buf, dst_name)) + LOG_WARN("Incorrect echo from server"); + + free(recv_buf); + + _ap_instance->flows[fd].port_id = port_id; + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); + if (_ap_instance->flows[fd].rb == NULL) { + LOG_ERR("Could not open N + 1 ringbuffer."); + close(fd); + } /* tell IRMd that flow allocation "worked" */ - if (ipcp_flow_alloc_reply(getpid(), flow->flow.port_id, 0)) { + if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) { LOG_ERR("Failed to notify IRMd about flow allocation reply"); - close(flow->fd); - ipcp_data_del_flow(_ipcp->data, flow->flow.port_id); + close(fd); + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); return -1; } - FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s); - shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow; + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); - return 0; + pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]); + pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong], + NULL, + ipcp_udp_sdu_reader, + NULL); + _ap_instance->ping_pong = !_ap_instance->ping_pong; + + LOG_DBG("Allocated flow with port_id %u on UDP fd %d.", port_id, fd); + + return fd; } int ipcp_udp_flow_alloc_resp(uint32_t port_id, + pid_t n_pid, int response) { - struct udp_flow * flow = - (struct udp_flow *) ipcp_data_find_flow(_ipcp->data, port_id); - if (flow == NULL) { - return -1; + int fd = port_id_to_fd(port_id); + if (fd < 0) { + LOG_DBGF("Could not find flow with port_id %u.", port_id); + return 0; } - if (response) { - ipcp_data_del_flow(_ipcp->data, port_id); + if (response) return 0; - } /* awaken pending flow */ - if (flow->flow.state != FLOW_PENDING) + if (_ap_instance->flows[fd].state != FLOW_PENDING) { + LOG_DBGF("Flow was not pending."); return -1; + } + + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); + if (_ap_instance->flows[fd].rb == NULL) { + LOG_ERR("Could not open N + 1 ringbuffer."); + _ap_instance->flows[fd].state = FLOW_NULL; + _ap_instance->flows[fd].port_id = 0; + return 0; + } - flow->flow.state = FLOW_ALLOCATED; + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + + pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]); + pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong], + NULL, + ipcp_udp_sdu_reader, + NULL); + _ap_instance->ping_pong = !_ap_instance->ping_pong; + + LOG_DBG("Accepted flow, port_id %u on UDP fd %d.", port_id, fd); return 0; } int ipcp_udp_flow_dealloc(uint32_t port_id) { - return 0; -} + int fd = port_id_to_fd(port_id); + if (fd < 0) { + LOG_DBGF("Could not find flow with port_id %u.", port_id); + return 0; + } -int ipcp_udp_du_write(uint32_t port_id, - size_t map_index) -{ + _ap_instance->flows[fd].state = FLOW_NULL; + _ap_instance->flows[fd].port_id = 0; + if (_ap_instance->flows[fd].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + + FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); return 0; } -int ipcp_udp_du_read(uint32_t port_id, - size_t map_index) +/* FIXME: may be crap, didn't think this one through */ +int ipcp_udp_flow_dealloc_arr(uint32_t port_id) { - return 0; + int fd = port_id_to_fd(port_id); + if (fd < 0) { + LOG_DBGF("Could not find flow with port_id %u.", port_id); + return 0; + } + + _ap_instance->flows[fd].state = FLOW_NULL; + _ap_instance->flows[fd].port_id = 0; + if (_ap_instance->flows[fd].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + + FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + + return ipcp_flow_dealloc(0, port_id); } struct ipcp * ipcp_udp_create(char * ap_name) @@ -617,11 +802,14 @@ struct ipcp * ipcp_udp_create(char * ap_name) struct ipcp_udp_data * data; struct ipcp_ops * ops; + if (shim_ap_init(ap_name) < 0) + return NULL; + i = malloc(sizeof *i); if (i == NULL) return NULL; - data = ipcp_udp_data_create(ap_name); + data = ipcp_udp_data_create(); if (data == NULL) { free(i); return NULL; @@ -643,8 +831,6 @@ struct ipcp * ipcp_udp_create(char * ap_name) ops->ipcp_flow_alloc = ipcp_udp_flow_alloc; ops->ipcp_flow_alloc_resp = ipcp_udp_flow_alloc_resp; ops->ipcp_flow_dealloc = ipcp_udp_flow_dealloc; - ops->ipcp_du_read = ipcp_udp_du_read; - ops->ipcp_du_write = ipcp_udp_du_write; i->data = (struct ipcp_data *) data; i->ops = ops; @@ -656,6 +842,40 @@ struct ipcp * ipcp_udp_create(char * ap_name) #ifndef MAKE_CHECK +/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ +/* FIXME: stop eating the CPU */ +void * ipcp_udp_sdu_loop(void * o) +{ + while (true) { + struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb); + int fd; + int len = 0; + char * buf; + if (e == NULL) + continue; + + len = shm_du_map_read_sdu((uint8_t **) &buf, + _ap_instance->dum, + e->index); + if (len == -1) + continue; + + fd = port_id_to_fd(e->port_id); + + if (fd == -1) + continue; + + if (len == 0) + continue; + + send(fd, buf, len, 0); + + shm_release_du_buff(_ap_instance->dum, e->index); + } + + return (void *) 1; +} + int main (int argc, char * argv[]) { /* argument 1: pid of irmd ? */ @@ -680,6 +900,7 @@ int main (int argc, char * argv[]) sigaction(SIGINT, &sig_act, NULL); sigaction(SIGTERM, &sig_act, NULL); sigaction(SIGHUP, &sig_act, NULL); + sigaction(SIGPIPE, &sig_act, NULL); _ipcp = ipcp_udp_create(argv[2]); if (_ipcp == NULL) { @@ -687,7 +908,18 @@ int main (int argc, char * argv[]) exit(1); } - ipcp_main_loop(_ipcp); + pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); + pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL); + + pthread_join(_ap_instance->sduloop, NULL); + pthread_join(_ap_instance->mainloop, NULL); + pthread_join(_ap_instance->handler, NULL); + pthread_join(_ap_instance->sdu_reader[0], NULL); + pthread_join(_ap_instance->sdu_reader[1], NULL); + + ipcp_udp_destroy(_ipcp); + + shim_ap_fini(); exit(0); } diff --git a/src/ipcpd/shim-udp/tests/shim_udp_test.c b/src/ipcpd/shim-udp/tests/shim_udp_test.c index 036f5877..e5e8b32d 100644 --- a/src/ipcpd/shim-udp/tests/shim_udp_test.c +++ b/src/ipcpd/shim-udp/tests/shim_udp_test.c @@ -59,7 +59,7 @@ int shim_udp_test(int argc, char ** argv) _ipcp = ipcp_udp_create(ipcp_name); if (_ipcp == NULL) { LOG_ERR("Could not instantiate shim IPCP."); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } @@ -69,13 +69,13 @@ int shim_udp_test(int argc, char ** argv) if (ipcp_udp_name_reg("bogus name")) { LOG_ERR("Failed to register application."); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } if (ipcp_udp_name_unreg("bogus name")) { LOG_ERR("Failed to unregister application."); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } @@ -83,7 +83,7 @@ int shim_udp_test(int argc, char ** argv) sprintf(bogus, "bogus name %4d", i); if (ipcp_udp_name_reg(bogus)) { LOG_ERR("Failed to register application %s.", bogus); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } } @@ -92,12 +92,12 @@ int shim_udp_test(int argc, char ** argv) sprintf(bogus, "bogus name %4d", i); if(ipcp_udp_name_unreg(bogus)) { LOG_ERR("Failed to unregister application %s.", bogus); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } } - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(0); } |