diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ipcpd/flow.c | 38 | ||||
-rw-r--r-- | src/ipcpd/flow.h | 12 | ||||
-rw-r--r-- | src/ipcpd/ipcp-data.c | 104 | ||||
-rw-r--r-- | src/ipcpd/ipcp-data.h | 16 | ||||
-rw-r--r-- | src/ipcpd/ipcp-ops.h | 9 | ||||
-rw-r--r-- | src/ipcpd/ipcp.c | 20 | ||||
-rw-r--r-- | src/ipcpd/ipcp.h | 3 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 524 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/tests/shim_udp_test.c | 12 | ||||
-rw-r--r-- | src/irmd/main.c | 760 | ||||
-rw-r--r-- | src/lib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/lib/bitmap.c | 14 | ||||
-rw-r--r-- | src/lib/dev.c | 330 | ||||
-rw-r--r-- | src/lib/ipcp.c | 65 | ||||
-rw-r--r-- | src/lib/ipcpd_messages.proto | 6 | ||||
-rw-r--r-- | src/lib/irmd_messages.proto | 26 | ||||
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 268 | ||||
-rw-r--r-- | src/lib/shm_du_map.c | 143 | ||||
-rw-r--r-- | src/lib/tests/shm_du_map_test.c | 53 | ||||
-rw-r--r-- | src/tools/echo/echo_client.c | 15 | ||||
-rw-r--r-- | src/tools/echo/echo_server.c | 53 |
21 files changed, 1714 insertions, 758 deletions
diff --git a/src/ipcpd/flow.c b/src/ipcpd/flow.c index c436733b..ae8f848c 100644 --- a/src/ipcpd/flow.c +++ b/src/ipcpd/flow.c @@ -27,7 +27,7 @@ #include <ouroboros/logs.h> -flow_t * flow_create(int32_t port_id) +flow_t * flow_create(uint32_t port_id) { flow_t * flow = malloc(sizeof *flow); if (flow == NULL) { @@ -38,8 +38,7 @@ flow_t * flow_create(int32_t port_id) INIT_LIST_HEAD(&flow->list); flow->port_id = port_id; - flow->oflags = FLOW_O_DEFAULT; - flow->state = FLOW_NULL; + flow->state = FLOW_NULL; pthread_mutex_init(&flow->lock, NULL); @@ -52,36 +51,3 @@ void flow_destroy(flow_t * flow) return; free(flow); } - -int flow_set_opts(flow_t * flow, uint16_t opts) -{ - if (flow == NULL) { - LOG_DBGF("Non-existing flow."); - return -1; - } - - pthread_mutex_lock(&flow->lock); - - if ((opts & FLOW_O_ACCMODE) == FLOW_O_ACCMODE) { - flow->oflags = FLOW_O_DEFAULT; - pthread_mutex_unlock(&flow->lock); - LOG_WARN("Invalid flow options. Setting default."); - return -1; - } - - flow->oflags = opts; - - pthread_mutex_unlock(&flow->lock); - - return 0; -} - -uint16_t flow_get_opts(const flow_t * flow) -{ - if (flow == NULL) { - LOG_DBGF("Non-existing flow."); - return FLOW_O_INVALID; - } - - return flow->oflags; -} diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h index 000de5ad..0a3e90d1 100644 --- a/src/ipcpd/flow.h +++ b/src/ipcpd/flow.h @@ -25,6 +25,7 @@ #include <ouroboros/common.h> #include <ouroboros/list.h> +#include <ouroboros/shm_ap_rbuff.h> #include <pthread.h> /* same values as fcntl.h */ @@ -47,17 +48,14 @@ enum flow_state { typedef struct flow { struct list_head list; - int32_t port_id; - uint16_t oflags; - enum flow_state state; + uint32_t port_id; + struct shm_ap_rbuff * rb; + enum flow_state state; pthread_mutex_t lock; } flow_t; -flow_t * flow_create(int32_t port_id); +flow_t * flow_create(uint32_t port_id); void flow_destroy(flow_t * flow); -int flow_set_opts(flow_t * flow, uint16_t opts); -uint16_t flow_get_opts(const flow_t * flow); - #endif /* OUROBOROS_FLOW_H */ diff --git a/src/ipcpd/ipcp-data.c b/src/ipcpd/ipcp-data.c index 72407a53..76fc4bcd 100644 --- a/src/ipcpd/ipcp-data.c +++ b/src/ipcpd/ipcp-data.c @@ -96,46 +96,26 @@ struct ipcp_data * ipcp_data_create() if (data == NULL) return NULL; - data->iname = NULL; data->type = 0; - data->dum = NULL; return data; } struct ipcp_data * ipcp_data_init(struct ipcp_data * dst, - const char * ipcp_name, enum ipcp_type ipcp_type) { if (dst == NULL) return NULL; - dst->iname = instance_name_create(); - if (dst->iname == NULL) - return NULL; - - if(instance_name_init_from(dst->iname, ipcp_name, getpid()) == NULL) { - instance_name_destroy(dst->iname); - return NULL; - } - dst->type = ipcp_type; - dst->dum = shm_du_map_open(); - if (dst->dum == NULL) { - instance_name_destroy(dst->iname); - return NULL; - } - /* init the lists */ INIT_LIST_HEAD(&dst->registry); - INIT_LIST_HEAD(&dst->flows); INIT_LIST_HEAD(&dst->directory); /* init the mutexes */ pthread_mutex_init(&dst->reg_lock, NULL); pthread_mutex_init(&dst->dir_lock, NULL); - pthread_mutex_init(&dst->flow_lock, NULL); return dst; } @@ -156,42 +136,22 @@ static void clear_directory(struct ipcp_data * data) dir_entry_destroy(list_entry(h, struct dir_entry, list)); } -static void clear_flows(struct ipcp_data * data) -{ - struct list_head * h; - struct list_head * t; - list_for_each_safe(h, t, &data->flows) - flow_destroy(list_entry(h, flow_t, list)); - -} - void ipcp_data_destroy(struct ipcp_data * data) { if (data == NULL) return; - /* FIXME: finish all pending operations here */ - - if (data->iname != NULL) - instance_name_destroy(data->iname); - data->iname = NULL; - - if (data->dum != NULL) - shm_du_map_close(data->dum); - data->dum = NULL; + /* FIXME: finish all pending operations here and cancel all threads */ pthread_mutex_lock(&data->reg_lock); pthread_mutex_lock(&data->dir_lock); - pthread_mutex_lock(&data->flow_lock); /* clear the lists */ clear_registry(data); clear_directory(data); - clear_flows(data); /* * no need to unlock, just free the entire thing - * pthread_mutex_unlock(&data->flow_lock); * pthread_mutex_unlock(&data->dir_lock); * pthread_mutex_unlock(&data->reg_lock); */ @@ -380,65 +340,3 @@ uint64_t ipcp_data_get_addr(struct ipcp_data * data, return addr; } - -flow_t * ipcp_data_find_flow(struct ipcp_data * data, - uint32_t port_id) -{ - struct list_head * h; - list_for_each(h, &data->flows) { - flow_t * f = list_entry(h, flow_t, list); - if (f->port_id == port_id) - return f; - } - - return NULL; -} - -bool ipcp_data_has_flow(struct ipcp_data * data, - uint32_t port_id) -{ - return ipcp_data_find_flow(data, port_id) != NULL; -} - -int ipcp_data_add_flow(struct ipcp_data * data, - flow_t * flow) -{ - if (data == NULL || flow == NULL) - return -1; - - pthread_mutex_lock(&data->flow_lock); - - if (ipcp_data_has_flow(data, flow->port_id)) { - pthread_mutex_unlock(&data->flow_lock); - return -2; - } - - list_add(&flow->list,&data->flows); - - pthread_mutex_unlock(&data->flow_lock); - - return 0; -} - -int ipcp_data_del_flow(struct ipcp_data * data, - uint32_t port_id) -{ - flow_t * f; - - if (data == NULL) - return -1; - - pthread_mutex_lock(&data->flow_lock); - - f = ipcp_data_find_flow(data, port_id); - if (f == NULL) - return -1; - - list_del(&f->list); - - free(f); - - pthread_mutex_unlock(&data->flow_lock); - - return 0; -} diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h index 1dea8c3c..2e86ba11 100644 --- a/src/ipcpd/ipcp-data.h +++ b/src/ipcpd/ipcp-data.h @@ -34,17 +34,11 @@ #include "flow.h" struct ipcp_data { - instance_name_t * iname; enum ipcp_type type; - struct shm_du_map * dum; - struct list_head registry; pthread_mutex_t reg_lock; - struct list_head flows; - pthread_mutex_t flow_lock; - struct list_head directory; pthread_mutex_t dir_lock; @@ -53,7 +47,6 @@ struct ipcp_data { struct ipcp_data * ipcp_data_create(); struct ipcp_data * ipcp_data_init(struct ipcp_data * dst, - const char * ipcp_name, enum ipcp_type ipcp_type); void ipcp_data_destroy(struct ipcp_data * data); @@ -73,13 +66,4 @@ bool ipcp_data_is_in_directory(struct ipcp_data * data, const char * ap_name); uint64_t ipcp_data_get_addr(struct ipcp_data * data, const char * ap_name); -bool ipcp_data_has_flow(struct ipcp_data * data, - uint32_t port_id); -flow_t * ipcp_data_find_flow(struct ipcp_data * data, - uint32_t port_id); -int ipcp_data_add_flow(struct ipcp_data * data, - flow_t * flow); -int ipcp_data_del_flow(struct ipcp_data * data, - uint32_t port_id); - #endif /* IPCPD_IPCP_DATA_H */ diff --git a/src/ipcpd/ipcp-ops.h b/src/ipcpd/ipcp-ops.h index 2ccb2e59..91b6cac9 100644 --- a/src/ipcpd/ipcp-ops.h +++ b/src/ipcpd/ipcp-ops.h @@ -39,20 +39,15 @@ struct ipcp_ops { int (* ipcp_name_reg)(char * name); int (* ipcp_name_unreg)(char * name); int (* ipcp_flow_alloc)(uint32_t port_id, + pid_t n_pid, char * dst_ap_name, char * src_ap_name, char * src_ae_name, struct qos_spec * qos); int (* ipcp_flow_alloc_resp)(uint32_t port_id, + pid_t n_pid, int response); int (* ipcp_flow_dealloc)(uint32_t port_id); - - /* FIXME: let's see how this will work with the shm_du_map */ - int (* ipcp_du_write)(uint32_t port_id, - size_t map_index); - - int (* ipcp_du_read)(uint32_t port_id, - size_t map_index); }; #endif /* IPCPD_IPCP_OPS_H */ diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index d6f373cd..13632a80 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -45,11 +45,12 @@ int ipcp_arg_check(int argc, char * argv[]) return 0; } -int ipcp_main_loop(struct ipcp * _ipcp) +void * ipcp_main_loop(void * o) { int lsockfd; int sockfd; uint8_t buf[IPCP_MSG_BUF_SIZE]; + struct ipcp * _ipcp = (struct ipcp *) o; ipcp_msg_t * msg; ssize_t count; @@ -61,13 +62,13 @@ int ipcp_main_loop(struct ipcp * _ipcp) if (_ipcp == NULL) { LOG_ERR("Invalid ipcp struct."); - return 1; + return (void *) 1; } sockfd = server_socket_open(ipcp_sock_path(getpid())); if (sockfd < 0) { LOG_ERR("Could not open server socket."); - return 1; + return (void *) 1; } while (true) { @@ -113,7 +114,7 @@ int ipcp_main_loop(struct ipcp * _ipcp) conf.max_pdu_size = conf_msg->max_pdu_size; } if (conf_msg->ipcp_type == IPCP_SHIM_UDP) { - conf.ip_addr = conf_msg->ip_addr; + conf.ip_addr = conf_msg->ip_addr; conf.dns_addr = conf_msg->dns_addr; } @@ -149,7 +150,8 @@ int ipcp_main_loop(struct ipcp * _ipcp) } ret_msg.has_result = true; ret_msg.result = - _ipcp->ops->ipcp_unreg(msg->dif_names, msg->len); + _ipcp->ops->ipcp_unreg(msg->dif_names, + msg->len); break; case IPCP_MSG_CODE__IPCP_NAME_REG: if (_ipcp->ops->ipcp_name_reg == NULL) { @@ -172,9 +174,10 @@ int ipcp_main_loop(struct ipcp * _ipcp) LOG_ERR("Flow_alloc unsupported."); break; } - ret_msg.has_fd = true; - ret_msg.fd = + ret_msg.has_result = true; + ret_msg.result = _ipcp->ops->ipcp_flow_alloc(msg->port_id, + msg->pid, msg->dst_name, msg->src_ap_name, msg->src_ae_name, @@ -188,6 +191,7 @@ int ipcp_main_loop(struct ipcp * _ipcp) ret_msg.has_result = true; ret_msg.result = _ipcp->ops->ipcp_flow_alloc_resp(msg->port_id, + msg->pid, msg->result); break; case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: @@ -231,5 +235,5 @@ int ipcp_main_loop(struct ipcp * _ipcp) close(lsockfd); } - return 0; + return NULL; } diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 9decac8b..393af994 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -43,7 +43,8 @@ struct ipcp { int irmd_fd; }; -int ipcp_main_loop(); +void * ipcp_main_loop(void * o); +void * ipcp_sdu_loop(void * o); int ipcp_arg_check(int argc, char * argv[]); #endif diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 460fe9e3..1f7bb12f 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -24,12 +24,13 @@ #include "ipcp.h" #include "flow.h" #include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> #include <ouroboros/ipcp.h> #include <ouroboros/dif_config.h> #include <ouroboros/sockets.h> -#include <ouroboros/dev.h> +#include <ouroboros/bitmap.h> #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -67,6 +68,144 @@ extern struct ipcp * _ipcp; /* defined in test */ struct ipcp * _ipcp; #endif +/* + * copied from ouroboros/dev. The shim needs access to the internals + * because it doesn't follow all steps necessary steps to get + * the info + */ + +#define UNKNOWN_AP "__UNKNOWN_AP__" +#define UNKNOWN_AE "__UNKNOWN_AE__" + +#define AP_MAX_FLOWS 256 + +#ifndef DU_BUFF_HEADSPACE + #define DU_BUFF_HEADSPACE 128 +#endif + +#ifndef DU_BUFF_TAILSPACE + #define DU_BUFF_TAILSPACE 0 +#endif + +/* the shim needs access to these internals */ +struct shim_ap_data { + instance_name_t * api; + struct shm_du_map * dum; + struct bmp * fds; + + struct shm_ap_rbuff * rb; + struct flow flows[AP_MAX_FLOWS]; + + pthread_t mainloop; + pthread_t sduloop; + pthread_t handler; + pthread_t sdu_reader[2]; + int ping_pong; +} * _ap_instance; + +int shim_ap_init(char * ap_name) +{ + _ap_instance = malloc(sizeof(struct shim_ap_data)); + if (_ap_instance == NULL) { + return -1; + } + + _ap_instance->api = instance_name_create(); + if (_ap_instance->api == NULL) { + free(_ap_instance); + return -1; + } + + if (instance_name_init_from(_ap_instance->api, + ap_name, + getpid()) == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); + if (_ap_instance->fds == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->dum = shm_du_map_open(); + if (_ap_instance->dum == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + _ap_instance->rb = shm_ap_rbuff_create(); + if (_ap_instance->rb == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + return 0; +} + +void shim_ap_fini() +{ + int i = 0; + + if (_ap_instance == NULL) + return; + if (_ap_instance->api != NULL) + instance_name_destroy(_ap_instance->api); + if (_ap_instance->fds != NULL) + bmp_destroy(_ap_instance->fds); + if (_ap_instance->dum != NULL) + shm_du_map_close(_ap_instance->dum); + if (_ap_instance->rb != NULL) + shm_ap_rbuff_destroy(_ap_instance->rb); + for (i = 0; i < AP_MAX_FLOWS; i ++) + if (_ap_instance->flows[i].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[i].rb); + + free(_ap_instance); +} + +static int port_id_to_fd(uint32_t port_id) +{ + int i; + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].port_id == port_id + && _ap_instance->flows[i].state != FLOW_NULL) + return i; + return -1; +} + +static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) +{ + /* the AP chooses the amount of headspace and tailspace */ + size_t index = shm_create_du_buff(_ap_instance->dum, + count, + 0, + buf, + count); + struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; + + if (index == -1) + return -1; + + if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { + shm_release_du_buff(_ap_instance->dum, index); + return -EPIPE; + } + + return 0; +} + +/* + * end copy from dev.c + */ + struct ipcp_udp_data { /* keep ipcp_data first for polymorphism */ struct ipcp_data ipcp_data; @@ -79,39 +218,15 @@ struct ipcp_udp_data { int s_fd; fd_set flow_fd_s; - flow_t * fd_to_flow_ptr[FD_SETSIZE]; - pthread_mutex_t lock; + pthread_mutex_t lock; }; -struct udp_flow { - /* keep flow first for polymorphism */ - flow_t flow; - int fd; -}; - -void ipcp_sig_handler(int sig, siginfo_t * info, void * c) -{ - switch(sig) { - case SIGINT: - case SIGTERM: - case SIGHUP: - LOG_DBG("Terminating by order of %d. Bye.", info->si_pid); - if (info->si_pid == irmd_pid) { - /* shm_du_map_close(_ipcp->data->dum); */ - exit(0); - } - default: - return; - } -} - -struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name) +struct ipcp_udp_data * ipcp_udp_data_create() { struct ipcp_udp_data * udp_data; struct ipcp_data * data; enum ipcp_type ipcp_type; - int n; udp_data = malloc(sizeof *udp_data); if (udp_data == NULL) { @@ -121,18 +236,52 @@ struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name) ipcp_type = THIS_TYPE; data = (struct ipcp_data *) udp_data; - if (ipcp_data_init(data, ap_name, ipcp_type) == NULL) { + if (ipcp_data_init(data, ipcp_type) == NULL) { free(udp_data); return NULL; } FD_ZERO(&udp_data->flow_fd_s); - for (n = 0; n < FD_SETSIZE; ++n) - udp_data->fd_to_flow_ptr[n] = NULL; return udp_data; } +void ipcp_udp_data_destroy(struct ipcp_udp_data * data) +{ + if (data == NULL) + return; + + ipcp_data_destroy((struct ipcp_data *) data); +} + +void ipcp_udp_destroy(struct ipcp * ipcp) +{ + ipcp_udp_data_destroy((struct ipcp_udp_data *) ipcp->data); + shim_ap_fini(); + free(ipcp); +} + +void ipcp_sig_handler(int sig, siginfo_t * info, void * c) +{ + switch(sig) { + case SIGINT: + case SIGTERM: + case SIGHUP: + if (info->si_pid == irmd_pid || info->si_pid == 0) { + LOG_DBG("Terminating by order of %d. Bye.", + info->si_pid); + pthread_cancel(_ap_instance->mainloop); + pthread_cancel(_ap_instance->handler); + pthread_cancel(_ap_instance->sdu_reader[0]); + pthread_cancel(_ap_instance->sdu_reader[1]); + pthread_cancel(_ap_instance->sduloop); + exit(0); + } + default: + return; + } +} + static void * ipcp_udp_listener() { char buf[SHIM_UDP_BUF_SIZE]; @@ -141,10 +290,10 @@ static void * ipcp_udp_listener() struct sockaddr_in f_saddr; struct sockaddr_in c_saddr; struct hostent * hostp; - struct udp_flow * flow; int sfd = shim_data(_ipcp)->s_fd; while (true) { + int fd; n = sizeof c_saddr; n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0, (struct sockaddr *) &c_saddr, (unsigned *) &n); @@ -157,16 +306,7 @@ static void * ipcp_udp_listener() if (hostp == NULL) continue; - /* create a new socket for the server */ - flow = malloc(sizeof *flow); - if (flow == NULL) - continue; - - flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (flow->fd == -1) { - free(flow); - continue; - } + fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); memset((char *) &f_saddr, 0, sizeof f_saddr); f_saddr.sin_family = AF_INET; @@ -185,36 +325,33 @@ static void * ipcp_udp_listener() * the flow structure */ - if (connect(flow->fd, + if (connect(fd, (struct sockaddr *) &c_saddr, sizeof c_saddr) < 0) { - close(flow->fd); - free(flow); + close(fd); continue; } + /* echo back the packet */ + while(send(fd, buf, strlen(buf), 0) < 0) + ; + /* reply to IRM */ - flow->flow.port_id = ipcp_flow_req_arr(getpid(), buf, - UNKNOWN_AP, ""); - if (flow->flow.port_id < 0) { + _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(), + buf, + UNKNOWN_AP, + UNKNOWN_AE); + if (_ap_instance->flows[fd].port_id < 0) { LOG_ERR("Could not get port id from IRMd"); - close(flow->fd); - free(flow); + close(fd); continue; } - flow->flow.oflags = FLOW_O_DEFAULT; - flow->flow.state = FLOW_PENDING; - - if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) { - LOG_DBGF("Could not add flow."); - close(flow->fd); - free(flow); - continue; - } + _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].state = FLOW_PENDING; - FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s); - shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow; + LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.", + _ap_instance->flows[fd].port_id, fd); } return 0; @@ -229,8 +366,6 @@ static void * ipcp_udp_sdu_reader() struct sockaddr_in r_saddr; while (true) { - flow_t * flow; - if (select(FD_SETSIZE, &shim_data(_ipcp)->flow_fd_s, NULL, NULL, NULL) @@ -249,18 +384,8 @@ static void * ipcp_udp_sdu_reader() (struct sockaddr *) &r_saddr, (unsigned *) &n); - flow = shim_data(_ipcp)->fd_to_flow_ptr[fd]; - if (flow->state == FLOW_PENDING) { - if (connect(fd, - (struct sockaddr *) &r_saddr, - sizeof r_saddr) - < 0) - continue; - flow->state = FLOW_ALLOCATED; - } - /* send the sdu to the correct port_id */ - LOG_MISSING; + ipcp_udp_flow_write(fd, buf, n); } } @@ -271,8 +396,6 @@ int ipcp_udp_bootstrap(struct dif_config * conf) { char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; - pthread_t handler; - pthread_t sdu_reader; int enable = 1; if (conf->type != THIS_TYPE) { @@ -296,7 +419,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf) dnsstr, INET_ADDRSTRLEN); else - strcpy(dnsstr, "not set.\n"); + strcpy(dnsstr, "not set"); shim_data(_ipcp)->ip_addr = conf->ip_addr; shim_data(_ipcp)->dns_addr = conf->dns_addr; @@ -304,7 +427,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf) /* UDP listen server */ if ((shim_data(_ipcp)->s_fd = - socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { + socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { LOG_DBGF("Can't create socket."); return -1; } @@ -328,13 +451,28 @@ int ipcp_udp_bootstrap(struct dif_config * conf) return -1; } - pthread_create(&handler, NULL, ipcp_udp_listener, NULL); - pthread_create(&sdu_reader, NULL, ipcp_udp_sdu_reader, NULL); + FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + + pthread_create(&_ap_instance->handler, + NULL, + ipcp_udp_listener, + NULL); + pthread_create(&_ap_instance->sdu_reader[0], + NULL, + ipcp_udp_sdu_reader, + NULL); + + pthread_create(&_ap_instance->sdu_reader[1], + NULL, + ipcp_udp_sdu_reader, + NULL); + + _ap_instance->ping_pong = 0; _ipcp->state = IPCP_ENROLLED; - LOG_DBG("Bootstrapped shim IPCP over UDP %s-%d.", - _ipcp->data->iname->name, _ipcp->data->iname->id); + LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.", + getpid()); LOG_DBG("Bound to IP address %s.", ipstr); LOG_DBG("DNS server address is %s.", dnsstr); @@ -464,23 +602,25 @@ int ipcp_udp_name_unreg(char * name) } int ipcp_udp_flow_alloc(uint32_t port_id, + pid_t n_pid, char * dst_name, char * src_ap_name, char * src_ae_name, struct qos_spec * qos) { - struct udp_flow * flow = NULL; struct sockaddr_in l_saddr; struct sockaddr_in r_saddr; + struct sockaddr_in rf_saddr; + int fd; + int n; + + char * recv_buf = NULL; struct hostent * h; if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL) return -1; - LOG_DBG("Received flow allocation request from %s to %s.", - src_ap_name, dst_name); - if (strlen(dst_name) > 255 || strlen(src_ap_name) > 255 || strlen(src_ae_name) > 255) { @@ -491,15 +631,7 @@ int ipcp_udp_flow_alloc(uint32_t port_id, if (qos != NULL) LOG_DBGF("QoS requested. UDP/IP can't do that."); - flow = malloc(sizeof *flow); - if (flow == NULL) - return -1; - - flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (flow->fd == -1) { - free(flow); - return -1; - } + fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); /* this socket is for the flow */ memset((char *) &l_saddr, 0, sizeof l_saddr); @@ -507,108 +639,161 @@ int ipcp_udp_flow_alloc(uint32_t port_id, l_saddr.sin_addr.s_addr = local_ip; l_saddr.sin_port = 0; - if (bind(flow->fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { - char ipstr[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, - &l_saddr.sin_addr.s_addr, - ipstr, - INET_ADDRSTRLEN); - close(flow->fd); - free(flow); + if (bind(fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { + close(fd); return -1; } h = gethostbyname(dst_name); if (h == NULL) { LOG_DBGF("Could not resolve %s.", dst_name); - close(flow->fd); - free(flow); + close(fd); return -1; } - memset((char *) &r_saddr, 0, sizeof r_saddr); r_saddr.sin_family = AF_INET; - r_saddr.sin_addr.s_addr = (uint32_t) *(h->h_addr_list[0]); + r_saddr.sin_addr.s_addr = *((uint32_t *) (h->h_addr_list[0])); r_saddr.sin_port = LISTEN_PORT; + /* at least try to get the packet on the wire */ - while (sendto(flow->fd, dst_name, strlen(dst_name), 0, + while (sendto(fd, dst_name, strlen(dst_name), 0, (struct sockaddr *) &r_saddr, sizeof r_saddr) < 0) { } - flow->flow.port_id = port_id; - flow->flow.oflags = FLOW_O_DEFAULT; - flow->flow.state = FLOW_PENDING; - - /* add flow to the list */ + /* wait for the other shim IPCP to respond */ - pthread_mutex_lock(&_ipcp->data->flow_lock); + recv_buf = malloc(strlen(dst_name) + 1); + n = sizeof(rf_saddr); + n = recvfrom(fd, + recv_buf, + strlen(dst_name) + 1, + 0, + (struct sockaddr *) &rf_saddr, + (unsigned *) &n); - if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) { - LOG_DBGF("Could not add flow."); - pthread_mutex_unlock(&_ipcp->data->flow_lock); - close(flow->fd); - free(flow); + if (connect(fd, + (struct sockaddr *) &rf_saddr, + sizeof rf_saddr) + < 0) { + free(recv_buf); return -1; } - pthread_mutex_unlock(&_ipcp->data->flow_lock); + if (!strcmp(recv_buf, dst_name)) + LOG_WARN("Incorrect echo from server"); + + free(recv_buf); + + _ap_instance->flows[fd].port_id = port_id; + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); + if (_ap_instance->flows[fd].rb == NULL) { + LOG_ERR("Could not open N + 1 ringbuffer."); + close(fd); + } /* tell IRMd that flow allocation "worked" */ - if (ipcp_flow_alloc_reply(getpid(), flow->flow.port_id, 0)) { + if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) { LOG_ERR("Failed to notify IRMd about flow allocation reply"); - close(flow->fd); - ipcp_data_del_flow(_ipcp->data, flow->flow.port_id); + close(fd); + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); return -1; } - FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s); - shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow; + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); - return 0; + pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]); + pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong], + NULL, + ipcp_udp_sdu_reader, + NULL); + _ap_instance->ping_pong = !_ap_instance->ping_pong; + + LOG_DBG("Allocated flow with port_id %u on UDP fd %d.", port_id, fd); + + return fd; } int ipcp_udp_flow_alloc_resp(uint32_t port_id, + pid_t n_pid, int response) { - struct udp_flow * flow = - (struct udp_flow *) ipcp_data_find_flow(_ipcp->data, port_id); - if (flow == NULL) { - return -1; + int fd = port_id_to_fd(port_id); + if (fd < 0) { + LOG_DBGF("Could not find flow with port_id %u.", port_id); + return 0; } - if (response) { - ipcp_data_del_flow(_ipcp->data, port_id); + if (response) return 0; - } /* awaken pending flow */ - if (flow->flow.state != FLOW_PENDING) + if (_ap_instance->flows[fd].state != FLOW_PENDING) { + LOG_DBGF("Flow was not pending."); return -1; + } + + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); + if (_ap_instance->flows[fd].rb == NULL) { + LOG_ERR("Could not open N + 1 ringbuffer."); + _ap_instance->flows[fd].state = FLOW_NULL; + _ap_instance->flows[fd].port_id = 0; + return 0; + } - flow->flow.state = FLOW_ALLOCATED; + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + + pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]); + pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong], + NULL, + ipcp_udp_sdu_reader, + NULL); + _ap_instance->ping_pong = !_ap_instance->ping_pong; + + LOG_DBG("Accepted flow, port_id %u on UDP fd %d.", port_id, fd); return 0; } int ipcp_udp_flow_dealloc(uint32_t port_id) { - return 0; -} + int fd = port_id_to_fd(port_id); + if (fd < 0) { + LOG_DBGF("Could not find flow with port_id %u.", port_id); + return 0; + } -int ipcp_udp_du_write(uint32_t port_id, - size_t map_index) -{ + _ap_instance->flows[fd].state = FLOW_NULL; + _ap_instance->flows[fd].port_id = 0; + if (_ap_instance->flows[fd].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + + FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); return 0; } -int ipcp_udp_du_read(uint32_t port_id, - size_t map_index) +/* FIXME: may be crap, didn't think this one through */ +int ipcp_udp_flow_dealloc_arr(uint32_t port_id) { - return 0; + int fd = port_id_to_fd(port_id); + if (fd < 0) { + LOG_DBGF("Could not find flow with port_id %u.", port_id); + return 0; + } + + _ap_instance->flows[fd].state = FLOW_NULL; + _ap_instance->flows[fd].port_id = 0; + if (_ap_instance->flows[fd].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + + FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + + return ipcp_flow_dealloc(0, port_id); } struct ipcp * ipcp_udp_create(char * ap_name) @@ -617,11 +802,14 @@ struct ipcp * ipcp_udp_create(char * ap_name) struct ipcp_udp_data * data; struct ipcp_ops * ops; + if (shim_ap_init(ap_name) < 0) + return NULL; + i = malloc(sizeof *i); if (i == NULL) return NULL; - data = ipcp_udp_data_create(ap_name); + data = ipcp_udp_data_create(); if (data == NULL) { free(i); return NULL; @@ -643,8 +831,6 @@ struct ipcp * ipcp_udp_create(char * ap_name) ops->ipcp_flow_alloc = ipcp_udp_flow_alloc; ops->ipcp_flow_alloc_resp = ipcp_udp_flow_alloc_resp; ops->ipcp_flow_dealloc = ipcp_udp_flow_dealloc; - ops->ipcp_du_read = ipcp_udp_du_read; - ops->ipcp_du_write = ipcp_udp_du_write; i->data = (struct ipcp_data *) data; i->ops = ops; @@ -656,6 +842,40 @@ struct ipcp * ipcp_udp_create(char * ap_name) #ifndef MAKE_CHECK +/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ +/* FIXME: stop eating the CPU */ +void * ipcp_udp_sdu_loop(void * o) +{ + while (true) { + struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb); + int fd; + int len = 0; + char * buf; + if (e == NULL) + continue; + + len = shm_du_map_read_sdu((uint8_t **) &buf, + _ap_instance->dum, + e->index); + if (len == -1) + continue; + + fd = port_id_to_fd(e->port_id); + + if (fd == -1) + continue; + + if (len == 0) + continue; + + send(fd, buf, len, 0); + + shm_release_du_buff(_ap_instance->dum, e->index); + } + + return (void *) 1; +} + int main (int argc, char * argv[]) { /* argument 1: pid of irmd ? */ @@ -680,6 +900,7 @@ int main (int argc, char * argv[]) sigaction(SIGINT, &sig_act, NULL); sigaction(SIGTERM, &sig_act, NULL); sigaction(SIGHUP, &sig_act, NULL); + sigaction(SIGPIPE, &sig_act, NULL); _ipcp = ipcp_udp_create(argv[2]); if (_ipcp == NULL) { @@ -687,7 +908,18 @@ int main (int argc, char * argv[]) exit(1); } - ipcp_main_loop(_ipcp); + pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); + pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL); + + pthread_join(_ap_instance->sduloop, NULL); + pthread_join(_ap_instance->mainloop, NULL); + pthread_join(_ap_instance->handler, NULL); + pthread_join(_ap_instance->sdu_reader[0], NULL); + pthread_join(_ap_instance->sdu_reader[1], NULL); + + ipcp_udp_destroy(_ipcp); + + shim_ap_fini(); exit(0); } diff --git a/src/ipcpd/shim-udp/tests/shim_udp_test.c b/src/ipcpd/shim-udp/tests/shim_udp_test.c index 036f5877..e5e8b32d 100644 --- a/src/ipcpd/shim-udp/tests/shim_udp_test.c +++ b/src/ipcpd/shim-udp/tests/shim_udp_test.c @@ -59,7 +59,7 @@ int shim_udp_test(int argc, char ** argv) _ipcp = ipcp_udp_create(ipcp_name); if (_ipcp == NULL) { LOG_ERR("Could not instantiate shim IPCP."); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } @@ -69,13 +69,13 @@ int shim_udp_test(int argc, char ** argv) if (ipcp_udp_name_reg("bogus name")) { LOG_ERR("Failed to register application."); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } if (ipcp_udp_name_unreg("bogus name")) { LOG_ERR("Failed to unregister application."); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } @@ -83,7 +83,7 @@ int shim_udp_test(int argc, char ** argv) sprintf(bogus, "bogus name %4d", i); if (ipcp_udp_name_reg(bogus)) { LOG_ERR("Failed to register application %s.", bogus); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } } @@ -92,12 +92,12 @@ int shim_udp_test(int argc, char ** argv) sprintf(bogus, "bogus name %4d", i); if(ipcp_udp_name_unreg(bogus)) { LOG_ERR("Failed to unregister application %s.", bogus); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } } - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(0); } diff --git a/src/irmd/main.c b/src/irmd/main.c index 67254feb..75b8506e 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -34,6 +34,7 @@ #include <ouroboros/utils.h> #include <ouroboros/dif_config.h> #include <ouroboros/shm_du_map.h> +#include <ouroboros/bitmap.h> #include <sys/socket.h> #include <sys/un.h> @@ -42,41 +43,146 @@ #include <errno.h> #include <string.h> #include <limits.h> +#include <pthread.h> /* FIXME: this smells like part of namespace management */ #define ALL_DIFS "*" +#ifndef IRMD_MAX_FLOWS + #define IRMD_MAX_FLOWS 4096 +#endif + +#ifndef IRMD_THREADPOOL_SIZE + #define IRMD_THREADPOOL_SIZE 10 +#endif + + + +enum flow_state { + FLOW_NULL = 0, + FLOW_PENDING, + FLOW_ALLOCATED +}; + struct ipcp_entry { struct list_head next; instance_name_t * api; char * dif_name; + + pthread_mutex_t lock; }; -/* currently supports only registering whatevercast groups of a single AP */ +/* currently supports only registering whatevercast groups of a single AP-I */ struct reg_name_entry { struct list_head next; /* generic whatevercast name */ char * name; - /* FIXME: resolve name instead */ + /* FIXME: make a list resolve to AP-I instead */ instance_name_t * api; - uint32_t reg_ap_id; + + bool accept; + char * req_ap_name; + char * req_ae_name; + bool flow_arrived; + + pthread_mutex_t fa_lock; +}; + +/* keeps track of port_id's between N and N - 1 */ +struct port_map_entry { + struct list_head next; + + uint32_t port_id; + + pid_t n_pid; + pid_t n_1_pid; + + enum flow_state state; }; struct irm { - /* FIXME: list of ipcps can be merged with registered names */ + /* FIXME: list of ipcps could be merged with registered names */ struct list_head ipcps; struct list_head reg_names; + int sockfd; + + /* keep track of all flows in this processing system */ + struct bmp * port_ids; + + /* maps port_ids to pid pair */ + struct list_head port_map; + struct shm_du_map * dum; -}; -struct irm * instance = NULL; + pthread_t * threadpool; + + pthread_mutex_t lock; +} * instance = NULL; -static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api) +static struct port_map_entry * get_port_map_entry(uint32_t port_id) +{ + struct list_head * pos = NULL; + + list_for_each(pos, &instance->port_map) { + struct port_map_entry * e = + list_entry(pos, struct port_map_entry, next); + + if (e->port_id == port_id) + return e; + } + + return NULL; +} + +static struct port_map_entry * get_port_map_entry_n(pid_t n_pid) +{ + struct list_head * pos = NULL; + + list_for_each(pos, &instance->port_map) { + struct port_map_entry * e = + list_entry(pos, struct port_map_entry, next); + + if (e->n_pid == n_pid) + return e; + } + + return NULL; +} + +static struct ipcp_entry * ipcp_entry_create() +{ + struct ipcp_entry * e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + e->api = NULL; + e->dif_name = NULL; + + INIT_LIST_HEAD(&e->next); + pthread_mutex_init(&e->lock, NULL); + + return e; +} + +static void ipcp_entry_destroy(struct ipcp_entry * e) +{ + if (e == NULL) + return; + + if (e->api != NULL) + instance_name_destroy(e->api); + + if (e->dif_name != NULL) + free(e->dif_name); + + free(e); +} + +static struct ipcp_entry * get_ipcp_entry_by_name(instance_name_t * api) { - struct ipcp_entry * tmp = NULL; struct list_head * pos = NULL; list_for_each(pos, &instance->ipcps) { @@ -87,7 +193,7 @@ static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api) return tmp; } - return tmp; + return NULL; } static instance_name_t * get_ipcp_by_name(char * ap_name) @@ -143,9 +249,14 @@ static struct reg_name_entry * reg_name_entry_create() if (e == NULL) return NULL; - e->reg_ap_id = rand() % INT_MAX; - e->name = NULL; + e->name = NULL; + e->api = NULL; + e->accept = false; + e->req_ap_name = NULL; + e->req_ae_name = NULL; + e->flow_arrived = false; + pthread_mutex_init(&e->fa_lock, NULL); INIT_LIST_HEAD(&e->next); return e; @@ -153,7 +264,7 @@ static struct reg_name_entry * reg_name_entry_create() static struct reg_name_entry * reg_name_entry_init(struct reg_name_entry * e, char * name, - instance_name_t * api) + instance_name_t * api) { if (e == NULL || name == NULL || api == NULL) return NULL; @@ -171,10 +282,18 @@ static int reg_name_entry_destroy(struct reg_name_entry * e) free(e->name); instance_name_destroy(e->api); + + if (e->req_ap_name != NULL) + free(e->req_ap_name); + if (e->req_ae_name != NULL) + free(e->req_ae_name); + + free(e); + return 0; } -static struct reg_name_entry * find_reg_name_entry_by_name(char * name) +static struct reg_name_entry * get_reg_name_entry_by_name(char * name) { struct list_head * pos = NULL; @@ -189,7 +308,7 @@ static struct reg_name_entry * find_reg_name_entry_by_name(char * name) return NULL; } -static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id) +static struct reg_name_entry * get_reg_name_entry_by_id(pid_t pid) { struct list_head * pos = NULL; @@ -197,7 +316,7 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id) struct reg_name_entry * e = list_entry(pos, struct reg_name_entry, next); - if (reg_ap_id == e->reg_ap_id) + if (e->api->id == pid) return e; } @@ -207,10 +326,17 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id) /* FIXME: add only name when we have NSM solved */ static int reg_name_entry_add_name_instance(char * name, instance_name_t * api) { - struct reg_name_entry * e = find_reg_name_entry_by_name(name); + struct reg_name_entry * e = get_reg_name_entry_by_name(name); if (e == NULL) { e = reg_name_entry_create(); - e = reg_name_entry_init(e, name, api); + if (e == NULL) + return -1; + + if (reg_name_entry_init(e, name, api) == NULL) { + reg_name_entry_destroy(e); + return -1; + } + list_add(&e->next, &instance->reg_names); return 0; } @@ -221,7 +347,7 @@ static int reg_name_entry_add_name_instance(char * name, instance_name_t * api) static int reg_name_entry_del_name(char * name) { - struct reg_name_entry * e = find_reg_name_entry_by_name(name); + struct reg_name_entry * e = get_reg_name_entry_by_name(name); if (e == NULL) return 0; @@ -240,34 +366,38 @@ static pid_t create_ipcp(char * ap_name, pid = ipcp_create(ap_name, ipcp_type); if (pid == -1) { - LOG_ERR("Failed to create IPCP"); + LOG_ERR("Failed to create IPCP."); return -1; } - tmp = malloc(sizeof(*tmp)); - if (tmp == NULL) { + tmp = ipcp_entry_create(); + if (tmp == NULL) return -1; - } INIT_LIST_HEAD(&tmp->next); tmp->api = instance_name_create(); if (tmp->api == NULL) { - free(tmp); + ipcp_entry_destroy(tmp); return -1; } if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) { instance_name_destroy(tmp->api); - free(tmp); + ipcp_entry_destroy(tmp); return -1; } tmp->dif_name = NULL; - LOG_DBG("Created IPC process with pid %d", pid); + pthread_mutex_lock(&instance->lock); list_add(&tmp->next, &instance->ipcps); + + pthread_mutex_unlock(&instance->lock); + + LOG_INFO("Created IPCP %s-%d ", ap_name, pid); + return pid; } @@ -276,18 +406,19 @@ static int destroy_ipcp(instance_name_t * api) struct list_head * pos = NULL; struct list_head * n = NULL; + if (api == NULL) + return 0; + if (api->id == 0) api = get_ipcp_by_name(api->name); if (api == NULL) { LOG_ERR("No such IPCP in the system."); - return -1; + return 0; } - LOG_DBG("Destroying ipcp %s-%d", api->name, api->id); - if (ipcp_destroy(api->id)) - LOG_ERR("Could not destroy IPCP"); + LOG_ERR("Could not destroy IPCP."); list_for_each_safe(pos, n, &(instance->ipcps)) { struct ipcp_entry * tmp = @@ -295,8 +426,12 @@ static int destroy_ipcp(instance_name_t * api) if (instance_name_cmp(api, tmp->api) == 0) list_del(&tmp->next); + + ipcp_entry_destroy(tmp); } + LOG_INFO("Destroyed IPCP %s-%d.", api->name, api->id); + return 0; } @@ -313,25 +448,28 @@ static int bootstrap_ipcp(instance_name_t * api, return -1; } - entry = find_ipcp_entry_by_name(api); + entry = get_ipcp_entry_by_name(api); if (entry == NULL) { - LOG_ERR("No such IPCP"); + LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(conf->dif_name); if (entry->dif_name == NULL) { - LOG_ERR("Failed to strdup"); + LOG_ERR("Failed to strdup."); return -1; } if (ipcp_bootstrap(entry->api->id, conf)) { - LOG_ERR("Could not bootstrap IPCP"); + LOG_ERR("Could not bootstrap IPCP."); free(entry->dif_name); entry->dif_name = NULL; return -1; } + LOG_INFO("Bootstrapped IPCP %s-%d. in DIF %s", + api->name, api->id, conf->dif_name); + return 0; } @@ -343,21 +481,21 @@ static int enroll_ipcp(instance_name_t * api, ssize_t n_1_difs_size = 0; struct ipcp_entry * entry = NULL; - entry = find_ipcp_entry_by_name(api); + entry = get_ipcp_entry_by_name(api); if (entry == NULL) { - LOG_ERR("No such IPCP"); + LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(dif_name); if (entry->dif_name == NULL) { - LOG_ERR("Failed to strdup"); + LOG_ERR("Failed to strdup."); return -1; } member = da_resolve_daf(dif_name); if (member == NULL) { - LOG_ERR("Could not find a member of that DIF"); + LOG_ERR("Could not find a member of that DIF."); free(entry->dif_name); entry->dif_name = NULL; return -1; @@ -365,19 +503,22 @@ static int enroll_ipcp(instance_name_t * api, n_1_difs_size = da_resolve_dap(member, n_1_difs); if (n_1_difs_size < 1) { - LOG_ERR("Could not find N-1 DIFs"); + LOG_ERR("Could not find N-1 DIFs."); free(entry->dif_name); entry->dif_name = NULL; return -1; } if (ipcp_enroll(entry->api->id, member, n_1_difs[0])) { - LOG_ERR("Could not enroll IPCP"); + LOG_ERR("Could not enroll IPCP."); free(entry->dif_name); entry->dif_name = NULL; return -1; } + LOG_INFO("Enrolled IPCP %s-%d in DIF %s.", + api->name, api->id, dif_name); + return 0; } @@ -386,7 +527,7 @@ static int reg_ipcp(instance_name_t * api, size_t difs_size) { if (ipcp_reg(api->id, difs, difs_size)) { - LOG_ERR("Could not register IPCP to N-1 DIF(s)"); + LOG_ERR("Could not register IPCP to N-1 DIF(s)."); return -1; } @@ -399,24 +540,23 @@ static int unreg_ipcp(instance_name_t * api, { if (ipcp_unreg(api->id, difs, difs_size)) { - LOG_ERR("Could not unregister IPCP from N-1 DIF(s)"); + LOG_ERR("Could not unregister IPCP from N-1 DIF(s)."); return -1; } return 0; } -static int ap_unreg_id(uint32_t reg_ap_id, - pid_t pid, +static int ap_unreg_id(pid_t pid, char ** difs, size_t len) { int i; int ret = 0; - struct reg_name_entry * rne = NULL; - struct list_head * pos = NULL; + struct reg_name_entry * rne = NULL; + struct list_head * pos = NULL; - rne = find_reg_name_entry_by_id(reg_ap_id); + rne = get_reg_name_entry_by_id(pid); if (rne == NULL) return 0; /* no such id */ @@ -458,7 +598,6 @@ static int ap_reg(char * ap_name, { int i; int ret = 0; - int reg_ap_id = 0; struct list_head * pos = NULL; struct reg_name_entry * rne = NULL; @@ -466,18 +605,18 @@ static int ap_reg(char * ap_name, instance_name_t * ipcpi = NULL; if (instance->ipcps.next == NULL) - LOG_ERR("No IPCPs in this system."); + return -1; /* check if this ap_name is already registered */ - rne = find_reg_name_entry_by_name(ap_name); + rne = get_reg_name_entry_by_name(ap_name); if (rne != NULL) return -1; /* can only register one instance for now */ - rne = reg_name_entry_create(); - if (rne == NULL) + api = instance_name_create(); + if (api == NULL) { return -1; + } - api = instance_name_create(); if (instance_name_init_from(api, ap_name, ap_id) == NULL) { instance_name_destroy(api); return -1; @@ -488,12 +627,6 @@ static int ap_reg(char * ap_name, * contains a single instance only */ - if (reg_name_entry_init(rne, strdup(ap_name), api) == NULL) { - reg_name_entry_destroy(rne); - instance_name_destroy(api); - return -1; - } - if (strcmp(difs[0], ALL_DIFS) == 0) { list_for_each(pos, &instance->ipcps) { struct ipcp_entry * e = @@ -528,11 +661,10 @@ static int ap_reg(char * ap_name, return -1; } /* for now, we register single instances */ - reg_name_entry_add_name_instance(strdup(ap_name), - instance_name_dup(api)); - instance_name_destroy(api); + ret = reg_name_entry_add_name_instance(strdup(ap_name), + api); - return reg_ap_id; + return ret; } static int ap_unreg(char * ap_name, @@ -542,149 +674,304 @@ static int ap_unreg(char * ap_name, { struct reg_name_entry * tmp = NULL; - instance_name_t * api = instance_name_create(); - if (api == NULL) - return -1; - - if (instance_name_init_from(api, ap_name, ap_id) == NULL) { - instance_name_destroy(api); - return -1; - } - /* check if ap_name is registered */ - tmp = find_reg_name_entry_by_name(api->name); - if (tmp == NULL) { - instance_name_destroy(api); + tmp = get_reg_name_entry_by_id(ap_id); + if (tmp == NULL) return 0; - } else { - return ap_unreg_id(tmp->reg_ap_id, api->id, difs, len); - } -} + if (strcmp(ap_name, tmp->api->name)) + return 0; -static int flow_accept(int fd, - pid_t pid, - char * ap_name, - char * ae_name) -{ - return -1; + return ap_unreg_id(ap_id, difs, len); } -static int flow_alloc_resp(int fd, - int result) +static struct port_map_entry * flow_accept(pid_t pid, + char ** ap_name, + char ** ae_name) { - return -1; + bool arrived = false; + + struct timespec ts = {0, 100000}; + + struct port_map_entry * pme; + struct reg_name_entry * rne = get_reg_name_entry_by_id(pid); + if (rne == NULL) { + LOG_DBGF("Unregistered AP calling accept()."); + return NULL; + } + + if (rne->accept) { + LOG_DBGF("This AP still has a pending accept()."); + return NULL; + } + + rne->accept = true; + + /* FIXME: wait for a thread that runs select() on flow_arrived */ + while (!arrived) { + /* FIXME: this needs locking */ + rne = get_reg_name_entry_by_id(pid); + if (rne == NULL) + return NULL; + arrived = rne->flow_arrived; + nanosleep(&ts, NULL); + } + + pme = get_port_map_entry_n(pid); + if (pme == NULL) { + LOG_ERR("Port_id was not created yet."); + return NULL; + } + + pthread_mutex_lock(&rne->fa_lock); + *ap_name = rne->req_ap_name; + if (ae_name != NULL) + *ae_name = rne->req_ae_name; + pthread_mutex_unlock(&rne->fa_lock); + + return pme; } -static int flow_alloc(char * dst_name, - char * src_ap_name, - char * src_ae_name, - struct qos_spec * qos, - int oflags) +static int flow_alloc_resp(pid_t n_pid, + uint32_t port_id, + int response) { - int port_id = 0; - pid_t pid = get_ipcp_by_dst_name(dst_name)->id; + struct reg_name_entry * rne = get_reg_name_entry_by_id(n_pid); + struct port_map_entry * pme = get_port_map_entry(port_id); - LOG_DBG("flow alloc received from %s-%s to %s.", - src_ap_name, src_ae_name, dst_name); + if (rne == NULL || pme == NULL) + return -1; + + /* FIXME: check all instances associated with the name */ + if (!rne->accept) { + LOG_ERR("No process listening for this name."); + return -1; + } + + /* + * consider the flow as handled + * once we can handle a list of AP-I's, remove it from the list + */ + + rne->flow_arrived = false; + rne->accept = false; + + if (!response) + pme->state = FLOW_ALLOCATED; - return ipcp_flow_alloc(pid, - port_id, - dst_name, - src_ap_name, - src_ae_name, - qos); + return ipcp_flow_alloc_resp(pme->n_1_pid, + port_id, + pme->n_pid, + response); } -static int flow_alloc_res(int fd) +static struct port_map_entry * flow_alloc(pid_t pid, + char * dst_name, + char * src_ap_name, + char * src_ae_name, + struct qos_spec * qos) { + struct port_map_entry * e = malloc(sizeof(*e)); + if (e == NULL) { + LOG_ERR("Failed malloc of port_map_entry."); + return NULL; + } - return -1; + e->port_id = bmp_allocate(instance->port_ids); + e->n_pid = pid; + e->state = FLOW_PENDING; + e->n_1_pid = get_ipcp_by_dst_name(dst_name)->id; + + list_add(&e->next, &instance->port_map); + + if (ipcp_flow_alloc(get_ipcp_by_dst_name(dst_name)->id, + e->port_id, + e->n_pid, + dst_name, + src_ap_name, + src_ae_name, + qos) < 0) { + list_del(&e->next); + bmp_release(instance->port_ids, e->port_id); + free(e); + return NULL; + } + + return e; } -static int flow_dealloc(int fd) +static int flow_alloc_res(uint32_t port_id) { - return -1; + bool allocated = false; + struct port_map_entry * e; + struct timespec ts = {0,100000}; + + while (!allocated) { + /* FIXME: this needs locking */ + e = get_port_map_entry(port_id); + if (e == NULL) { + LOG_DBGF("Could not locate port_id %u", port_id); + return -1; + } + if (e->state == FLOW_ALLOCATED) + allocated = true; + nanosleep(&ts, NULL); + } + + return 0; } -static int flow_cntl(int fd, - int oflags) +static int flow_dealloc(uint32_t port_id) { - return -1; + pid_t n_1_pid; + + struct port_map_entry * e = get_port_map_entry(port_id); + if (e == NULL) + return 0; + + n_1_pid = e->n_1_pid; + + list_del(&e->next); + free(e); + + return ipcp_flow_dealloc(n_1_pid, port_id); } -static int flow_req_arr(char * dst_name, - char * ap_name, - char * ae_name) +static struct port_map_entry * flow_req_arr(pid_t pid, + char * dst_name, + char * ap_name, + char * ae_name) { - return -1; + struct reg_name_entry * rne; + struct port_map_entry * pme; + + rne = get_reg_name_entry_by_name(dst_name); + if (rne == NULL) { + LOG_DBGF("Destination name %s unknown.", dst_name); + return NULL; + } + + pme = malloc(sizeof(*pme)); + if (pme == NULL) { + LOG_ERR("Failed malloc of port_map_entry."); + return NULL; + } + + pme->port_id = bmp_allocate(instance->port_ids); + pme->n_pid = rne->api->id; + pme->state = FLOW_PENDING; + pme->n_1_pid = pid; + + list_add(&pme->next, &instance->port_map); + + pthread_mutex_lock(&rne->fa_lock); + + rne->req_ap_name = strdup(ap_name); + rne->req_ae_name = strdup(ae_name); + + rne->flow_arrived = true; + + pthread_mutex_unlock(&rne->fa_lock); + + return pme; } static int flow_alloc_reply(uint32_t port_id, - int result) + int response) { - return -1; + struct port_map_entry * e; + + /* FIXME: do this under lock */ + if (!response) { + e = get_port_map_entry(port_id); + if (e == NULL) + return -1; + e->state = FLOW_ALLOCATED; + } + + /* FIXME: does this need to be propagated to the IPCP? */ + + return 0; } static int flow_dealloc_ipcp(uint32_t port_id) { - return -1; + struct port_map_entry * e = get_port_map_entry(port_id); + if (e == NULL) + return 0; + + list_del(&e->next); + free(e); + + return 0; +} + +static void irm_destroy(struct irm * irm) +{ + struct list_head * h; + struct list_head * t; + + if (irm == NULL) + return; + + if (irm->threadpool != NULL) + free(irm->threadpool); + + if (irm->port_ids != NULL) + bmp_destroy(irm->port_ids); + /* clear the lists */ + list_for_each_safe(h, t, &irm->ipcps) { + struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next); + destroy_ipcp(e->api); + } + + list_for_each_safe(h, t, &irm->reg_names) { + struct reg_name_entry * e = list_entry(h, + struct reg_name_entry, + next); + char * difs [1] = {ALL_DIFS}; + ap_unreg_id(e->api->id, difs, 1); + } + + list_for_each_safe(h, t, &irm->port_map) { + struct port_map_entry * e = list_entry(h, + struct port_map_entry, + next); + list_del(&e->next); + free(e); + } + + if (irm->dum != NULL) + shm_du_map_destroy(irm->dum); + + close(irm->sockfd); + free(irm); } void irmd_sig_handler(int sig, siginfo_t * info, void * c) { + int i; + switch(sig) { case SIGINT: case SIGTERM: case SIGHUP: - shm_du_map_close(instance->dum); - free(instance); - exit(0); + if (instance->threadpool != NULL) { + for (i = 0; i < IRMD_THREADPOOL_SIZE; i++) + pthread_cancel(instance->threadpool[i]); + } + + case SIGPIPE: + LOG_DBG("Ignoring SIGPIPE."); default: return; } } -int main() +void * mainloop() { - int sockfd; uint8_t buf[IRM_MSG_BUF_SIZE]; - struct sigaction sig_act; - - /* init sig_act */ - memset(&sig_act, 0, sizeof sig_act); - - /* install signal traps */ - sig_act.sa_sigaction = &irmd_sig_handler; - sig_act.sa_flags = SA_SIGINFO; - - sigaction(SIGINT, &sig_act, NULL); - sigaction(SIGTERM, &sig_act, NULL); - sigaction(SIGHUP, &sig_act, NULL); - - if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) - unlink("/dev/shm/" SHM_DU_MAP_FILENAME); - - instance = malloc(sizeof(*instance)); - if (instance == NULL) - return -1; - - if ((instance->dum = shm_du_map_create()) == NULL) { - free(instance); - return -1; - } - - INIT_LIST_HEAD(&instance->ipcps); - INIT_LIST_HEAD(&instance->reg_names); - - sockfd = server_socket_open(IRM_SOCK_PATH); - if (sockfd < 0) { - shm_du_map_close(instance->dum); - free(instance); - return -1; - } - while (true) { int cli_sockfd; irm_msg_t * msg; @@ -692,18 +979,19 @@ int main() instance_name_t api; buffer_t buffer; irm_msg_t ret_msg = IRM_MSG__INIT; + struct port_map_entry * e = NULL; ret_msg.code = IRM_MSG_CODE__IRM_REPLY; - cli_sockfd = accept(sockfd, 0, 0); + cli_sockfd = accept(instance->sockfd, 0, 0); if (cli_sockfd < 0) { - LOG_ERR("Cannot accept new connection"); + LOG_ERR("Cannot accept new connection."); continue; } count = read(cli_sockfd, buf, IRM_MSG_BUF_SIZE); if (count <= 0) { - LOG_ERR("Failed to read from socket"); + LOG_ERR("Failed to read from socket."); close(cli_sockfd); continue; } @@ -750,11 +1038,11 @@ int main() msg->n_dif_name); break; case IRM_MSG_CODE__IRM_AP_REG: - ret_msg.has_fd = true; - ret_msg.fd = ap_reg(msg->ap_name, - msg->pid, - msg->dif_name, - msg->n_dif_name); + ret_msg.has_result = true; + ret_msg.result = ap_reg(msg->ap_name, + msg->pid, + msg->dif_name, + msg->n_dif_name); break; case IRM_MSG_CODE__IRM_AP_UNREG: ret_msg.has_result = true; @@ -764,43 +1052,57 @@ int main() msg->n_dif_name); break; case IRM_MSG_CODE__IRM_FLOW_ACCEPT: - ret_msg.has_fd = true; - ret_msg.fd = flow_accept(msg->fd, - msg->pid, - ret_msg.ap_name, - ret_msg.ae_name); + e = flow_accept(msg->pid, + &ret_msg.ap_name, + &ret_msg.ae_name); + if (e == NULL) + break; + + ret_msg.has_port_id = true; + ret_msg.port_id = e->port_id; + ret_msg.has_pid = true; + ret_msg.pid = e->n_1_pid; break; case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP: ret_msg.has_result = true; - ret_msg.result = flow_alloc_resp(msg->fd, - msg->result); + ret_msg.result = flow_alloc_resp(msg->pid, + msg->port_id, + msg->response); break; case IRM_MSG_CODE__IRM_FLOW_ALLOC: - ret_msg.has_fd = true; - ret_msg.fd = flow_alloc(msg->dst_name, - msg->ap_name, - msg->ae_name, - NULL, - msg->oflags); + e = flow_alloc(msg->pid, + msg->dst_name, + msg->ap_name, + msg->ae_name, + NULL); + if (e == NULL) + break; + + ret_msg.has_port_id = true; + ret_msg.port_id = e->port_id; + ret_msg.has_pid = true; + ret_msg.pid = e->n_1_pid; break; case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES: - ret_msg.has_response = true; - ret_msg.response = flow_alloc_res(msg->fd); - break; - case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; - ret_msg.result = flow_dealloc(msg->fd); + ret_msg.result = flow_alloc_res(msg->port_id); break; - case IRM_MSG_CODE__IRM_FLOW_CONTROL: + case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; - ret_msg.result = flow_cntl(msg->fd, - msg->oflags); + ret_msg.result = flow_dealloc(msg->port_id); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: + e = flow_req_arr(msg->pid, + msg->dst_name, + msg->ap_name, + msg->ae_name); + if (e == NULL) + break; + ret_msg.has_port_id = true; - ret_msg.port_id = flow_req_arr(msg->dst_name, - msg->ap_name, - msg->ae_name); + ret_msg.port_id = e->port_id; + ret_msg.has_pid = true; + ret_msg.pid = e->n_pid; break; case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY: ret_msg.has_result = true; @@ -812,7 +1114,7 @@ int main() ret_msg.result = flow_dealloc_ipcp(msg->port_id); break; default: - LOG_ERR("Don't know that message code"); + LOG_ERR("Don't know that message code."); break; } @@ -820,7 +1122,7 @@ int main() buffer.size = irm_msg__get_packed_size(&ret_msg); if (buffer.size == 0) { - LOG_ERR("Failed to send reply message"); + LOG_ERR("Failed to send reply message."); close(cli_sockfd); continue; } @@ -842,6 +1144,84 @@ int main() free(buffer.data); close(cli_sockfd); } +} + +static struct irm * irm_create() +{ + struct irm * i = malloc(sizeof(*i)); + if (i == NULL) + return NULL; + + if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) + unlink("/dev/shm/" SHM_DU_MAP_FILENAME); + + i->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); + if (i->threadpool == NULL) { + irm_destroy(i); + return NULL; + } + + if ((i->dum = shm_du_map_create()) == NULL) { + irm_destroy(i); + return NULL; + } + + INIT_LIST_HEAD(&i->ipcps); + INIT_LIST_HEAD(&i->reg_names); + INIT_LIST_HEAD(&i->port_map); + + i->port_ids = bmp_create(IRMD_MAX_FLOWS, 0); + if (i->port_ids == NULL) { + irm_destroy(i); + return NULL; + } + + i->sockfd = server_socket_open(IRM_SOCK_PATH); + if (i->sockfd < 0) { + irm_destroy(instance); + return NULL; + } + + pthread_mutex_init(&i->lock, NULL); + + return i; +} + +int main() +{ + struct sigaction sig_act; + + int t = 0; + + /* init sig_act */ + memset(&sig_act, 0, sizeof sig_act); + + /* install signal traps */ + sig_act.sa_sigaction = &irmd_sig_handler; + sig_act.sa_flags = SA_SIGINFO; + + sigaction(SIGINT, &sig_act, NULL); + sigaction(SIGTERM, &sig_act, NULL); + sigaction(SIGHUP, &sig_act, NULL); + sigaction(SIGPIPE, &sig_act, NULL); + + instance = irm_create(); + if (instance == NULL) + return 1; + + /* + * FIXME: we need a main loop that delegates messages to subthreads in a + * way that avoids all possible deadlocks for local apps + */ + + for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) + pthread_create(&instance->threadpool[t], NULL, mainloop, NULL); + + /* wait for (all of them) to return */ + for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) + pthread_join(instance->threadpool[t], NULL); + + irm_destroy(instance); return 0; } diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 4922e07c..53a7b354 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -32,6 +32,7 @@ set(SOURCE_FILES ipcp.c irm.c list.c + shm_ap_rbuff.c shm_du_map.c sockets.c utils.c diff --git a/src/lib/bitmap.c b/src/lib/bitmap.c index 8aabb4f4..0e3c968f 100644 --- a/src/lib/bitmap.c +++ b/src/lib/bitmap.c @@ -108,12 +108,14 @@ struct bmp * bmp_create(size_t bits, ssize_t offset) return NULL; tmp = malloc(sizeof(*tmp)); - if (!tmp) + if (tmp == NULL) return NULL; - tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(*(tmp->bitmap))); - if (!tmp->bitmap) + tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(unsigned long)); + if (!tmp->bitmap) { + free(tmp); return NULL; + } tmp->size = bits; tmp->offset = offset; @@ -140,8 +142,6 @@ int bmp_destroy(struct bmp * b) static ssize_t bad_id(struct bmp * b) { - assert(b); - return b->offset - 1; } @@ -149,8 +149,8 @@ ssize_t bmp_allocate(struct bmp * b) { ssize_t id; - if (!b) - return bad_id(b); + if (b == NULL) + return -1; id = (ssize_t) find_next_zero_bit(b->bitmap, b->size); diff --git a/src/lib/dev.c b/src/lib/dev.c index 6d8411c5..40bf2dc3 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -25,73 +25,190 @@ #include <ouroboros/logs.h> #include <ouroboros/dev.h> #include <ouroboros/sockets.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/instance_name.h> +#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/utils.h> #include <stdlib.h> +#include <string.h> -int ap_reg(char * ap_name, - char ** difs, - size_t difs_size) +#define AP_MAX_FLOWS 256 + +#ifndef DU_BUFF_HEADSPACE + #define DU_BUFF_HEADSPACE 128 +#endif + +#ifndef DU_BUFF_TAILSPACE + #define DU_BUFF_TAILSPACE 0 +#endif + +struct flow { + struct shm_ap_rbuff * rb; + uint32_t port_id; + uint32_t oflags; + + /* don't think this needs locking */ +}; + +struct ap_data { + instance_name_t * api; + struct shm_du_map * dum; + struct bmp * fds; + + struct shm_ap_rbuff * rb; + struct flow flows[AP_MAX_FLOWS]; +} * _ap_instance; + + +int ap_init(char * ap_name) { - irm_msg_t msg = IRM_MSG__INIT; + _ap_instance = malloc(sizeof(struct ap_data)); + if (_ap_instance == NULL) { + return -1; + } + + _ap_instance->api = instance_name_create(); + if (_ap_instance->api == NULL) { + free(_ap_instance); + return -1; + } + + if (instance_name_init_from(_ap_instance->api, + ap_name, + getpid()) == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); + if (_ap_instance->fds == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->dum = shm_du_map_open(); + if (_ap_instance->dum == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + _ap_instance->rb = shm_ap_rbuff_create(); + if (_ap_instance->rb == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + return 0; +} + +void ap_fini() +{ + int i = 0; + + if (_ap_instance == NULL) + return; + if (_ap_instance->api != NULL) + instance_name_destroy(_ap_instance->api); + if (_ap_instance->fds != NULL) + bmp_destroy(_ap_instance->fds); + if (_ap_instance->dum != NULL) + shm_du_map_close(_ap_instance->dum); + if (_ap_instance->rb != NULL) + shm_ap_rbuff_destroy(_ap_instance->rb); + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[i].rb); + + free(_ap_instance); +} + +#if 0 +static int port_id_to_fd(uint32_t port_id) +{ + int i; + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].port_id == port_id + && _ap_instance->flows[i].state != FLOW_NULL) + return i; + return -1; +} +#endif + +int ap_reg(char ** difs, + size_t len) +{ + irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = 0; + int fd = bmp_allocate(_ap_instance->fds); - if (ap_name == NULL || - difs == NULL || - difs_size == 0 || + if (difs == NULL || + len == 0 || difs[0] == NULL) { return -EINVAL; } + if (_ap_instance == NULL) { + LOG_DBG("ap_init was not called"); + return -1; + } + msg.code = IRM_MSG_CODE__IRM_AP_REG; msg.has_pid = true; - msg.pid = getpid(); - msg.ap_name = ap_name; + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; - msg.n_dif_name = difs_size; + msg.n_dif_name = len; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + if (recv_msg->result < 0) + fd = -1; + irm_msg__free_unpacked(recv_msg, NULL); return fd; } -int ap_unreg(char * ap_name, - char ** difs, - size_t difs_size) +int ap_unreg(char ** difs, + size_t len) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; int ret = -1; - if (ap_name == NULL || - difs == NULL || - difs_size == 0 || + if (difs == NULL || + len == 0 || difs[0] == NULL) { return -EINVAL; } msg.code = IRM_MSG_CODE__IRM_AP_UNREG; msg.has_pid = true; - msg.pid = getpid(); - msg.ap_name = ap_name; + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; - msg.n_dif_name = difs_size; + msg.n_dif_name = len; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -102,38 +219,48 @@ int ap_unreg(char * ap_name, return ret; } -int flow_accept(int fd, - char * ap_name, - char * ae_name) +int flow_accept(int fd, + char ** ap_name, + char ** ae_name) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int cli_fd = 0; - - if (ap_name == NULL) { - return -EINVAL; - } + int cfd = -1; msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.pid = _ap_instance->api->id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_pid || !recv_msg->has_port_id) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - cli_fd = recv_msg->fd; - ap_name = recv_msg->ap_name; - ae_name = recv_msg->ae_name; + + cfd = bmp_allocate(_ap_instance->fds); + + _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid); + if (_ap_instance->flows[cfd].rb == NULL) { + bmp_release(_ap_instance->fds, cfd); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + _ap_instance->flows[cfd].port_id = recv_msg->port_id; + _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT; + + *ap_name = strdup(recv_msg->ap_name); + if (ae_name != NULL) + *ae_name = strdup(recv_msg->ae_name); irm_msg__free_unpacked(recv_msg, NULL); - return cli_fd; + + bmp_release(_ap_instance->fds, fd); + + return cfd; } int flow_alloc_resp(int fd, @@ -145,9 +272,9 @@ int flow_alloc_resp(int fd, msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP; msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.pid = _ap_instance->api->id; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; msg.has_response = true; msg.response = response; @@ -155,7 +282,7 @@ int flow_alloc_resp(int fd, if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -167,41 +294,49 @@ int flow_alloc_resp(int fd, } int flow_alloc(char * dst_name, - char * src_ap_name, char * src_ae_name, - struct qos_spec * qos, - int oflags) + struct qos_spec * qos) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = 0; + int fd = -1; - if (dst_name == NULL || - src_ap_name == NULL) { + if (dst_name == NULL) return -EINVAL; - } if (src_ae_name == NULL) src_ae_name = UNKNOWN_AE; msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst_name = dst_name; - msg.ap_name = src_ap_name; + msg.ap_name = _ap_instance->api->name; + msg.has_pid = true; + msg.pid = _ap_instance->api->id; msg.ae_name = src_ae_name; - msg.has_oflags = true; - msg.oflags = oflags; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_pid || !recv_msg->has_port_id) { + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + fd = bmp_allocate(_ap_instance->fds); + + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid); + if (_ap_instance->flows[fd].rb == NULL) { + bmp_release(_ap_instance->fds, fd); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + _ap_instance->flows[fd].port_id = recv_msg->port_id; + _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT; + irm_msg__free_unpacked(recv_msg, NULL); + return fd; } @@ -211,17 +346,15 @@ int flow_alloc_res(int fd) irm_msg_t * recv_msg = NULL; int result = 0; - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -238,17 +371,15 @@ int flow_dealloc(int fd) irm_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -259,47 +390,50 @@ int flow_dealloc(int fd) return ret; } -int flow_cntl(int fd, int oflags) +int flow_cntl(int fd, int cmd, int oflags) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int ret = -1; - - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; - msg.oflags = oflags; + return -1; +} - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) +ssize_t flow_write(int fd, void * buf, size_t count) +{ + /* the AP chooses the amount of headspace and tailspace */ + size_t index = shm_create_du_buff(_ap_instance->dum, + count + DU_BUFF_HEADSPACE + + DU_BUFF_TAILSPACE, + DU_BUFF_HEADSPACE, + (uint8_t *) buf, + count); + struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; + if (index == -1) return -1; - if (recv_msg->has_result == false) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; + if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { + shm_release_du_buff(_ap_instance->dum, index); + return -EPIPE; } - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; + return 0; } -ssize_t flow_write(int fd, - void * buf, - size_t count) +ssize_t flow_read(int fd, void * buf, size_t count) { - LOG_MISSING; + struct rb_entry * e = NULL; + int n; + uint8_t * sdu; + /* FIXME: move this to a thread */ + while (e == NULL || e->port_id != _ap_instance->flows[fd].port_id) + e = shm_ap_rbuff_read(_ap_instance->rb); + + n = shm_du_map_read_sdu(&sdu, + _ap_instance->dum, + e->index); + if (n < 0) + return -1; - return -1; -} + memcpy(buf, sdu, MIN(n, count)); -ssize_t flow_read(int fd, - void * buf, - size_t count) -{ - LOG_MISSING; + shm_release_du_buff(_ap_instance->dum, e->index); - return -1; + return n; } diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c index 387572b3..75676915 100644 --- a/src/lib/ipcp.c +++ b/src/lib/ipcp.c @@ -121,6 +121,8 @@ pid_t ipcp_create(char * ipcp_name, return pid; } + /* clear fd table */ + if (ipcp_type == IPCP_NORMAL) exec_name = IPCP_NORMAL_EXEC; else if (ipcp_type == IPCP_SHIM_UDP) @@ -286,13 +288,8 @@ int ipcp_enroll(pid_t pid, return -EINVAL; msg.code = IPCP_MSG_CODE__IPCP_ENROLL; - msg.member_name = malloc(sizeof(*(msg.member_name))); - if (msg.member_name == NULL) { - LOG_ERR("Failed to malloc."); - return -1; - } - msg.n_1_dif = n_1_dif; msg.member_name = member_name; + msg.n_1_dif = n_1_dif; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) { @@ -323,8 +320,8 @@ int ipcp_name_reg(pid_t pid, if (name == NULL) return -1; - msg.code = IPCP_MSG_CODE__IPCP_NAME_REG; - msg.name = name; + msg.code = IPCP_MSG_CODE__IPCP_NAME_REG; + msg.name = name; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) @@ -368,6 +365,7 @@ int ipcp_name_unreg(pid_t pid, int ipcp_flow_alloc(pid_t pid, uint32_t port_id, + pid_t n_pid, char * dst_name, char * src_ap_name, char * src_ae_name, @@ -381,17 +379,19 @@ int ipcp_flow_alloc(pid_t pid, return -EINVAL; msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; + msg.has_port_id = true; + msg.port_id = port_id; + msg.has_pid = true; + msg.pid = n_pid; msg.src_ap_name = src_ap_name; msg.src_ae_name = src_ae_name; msg.dst_name = dst_name; - msg.port_id = port_id; - msg.has_port_id = true; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { ipcp_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -404,17 +404,20 @@ int ipcp_flow_alloc(pid_t pid, int ipcp_flow_alloc_resp(pid_t pid, uint32_t port_id, - int result) + pid_t n_pid, + int response) { ipcp_msg_t msg = IPCP_MSG__INIT; ipcp_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; - msg.has_port_id = true; - msg.port_id = port_id; - msg.has_result = true; - msg.result = result; + msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; + msg.has_port_id = true; + msg.port_id = port_id; + msg.has_pid = true; + msg.pid = n_pid; + msg.has_response = true; + msg.response = response; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) @@ -431,38 +434,38 @@ int ipcp_flow_alloc_resp(pid_t pid, return ret; } -int ipcp_flow_req_arr(pid_t pid, - char * dst_name, - char * src_ap_name, - char * src_ae_name) +int ipcp_flow_req_arr(pid_t pid, + char * dst_name, + char * src_ap_name, + char * src_ae_name) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = -1; + int port_id = -1; if (src_ap_name == NULL || src_ae_name == NULL) return -EINVAL; msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg.has_pid = true; + msg.pid = pid; msg.dst_name = dst_name; msg.ap_name = src_ap_name; msg.ae_name = src_ae_name; - msg.pid = pid; - msg.has_pid = true; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_port_id) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + port_id = recv_msg->port_id; irm_msg__free_unpacked(recv_msg, NULL); - return fd; + return port_id; } int ipcp_flow_alloc_reply(pid_t pid, @@ -509,11 +512,11 @@ int ipcp_flow_dealloc(pid_t pid, recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) - return -1; + return 0; if (recv_msg->has_result == false) { ipcp_msg__free_unpacked(recv_msg, NULL); - return -1; + return 0; } ret = recv_msg->result; @@ -531,11 +534,11 @@ int ipcp_flow_dealloc(pid_t pid, recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) - return -1; + return 0; if (recv_msg->has_result == false) { irm_msg__free_unpacked(recv_msg, NULL); - return -1; + return 0; } ret = recv_msg->result; diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index da4bb469..daca011d 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -25,6 +25,8 @@ message ipcp_msg { optional string src_ap_name = 9; optional string src_ae_name = 10; optional dif_config_msg conf = 11; - optional int32 result = 12; - optional int32 fd = 13; + optional int32 fd = 12; + optional int32 pid = 13; + optional int32 response = 14; + optional int32 result = 15; }; diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 89e2c882..c336614e 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -36,13 +36,10 @@ enum irm_msg_code { IRM_FLOW_ALLOC = 11; IRM_FLOW_ALLOC_RES = 12; IRM_FLOW_DEALLOC = 13; - IRM_FLOW_CONTROL = 14; - IRM_FLOW_WRITE = 15; - IRM_FLOW_READ = 16; - IPCP_FLOW_REQ_ARR = 17; - IPCP_FLOW_ALLOC_REPLY = 18; - IPCP_FLOW_DEALLOC = 19; - IRM_REPLY = 20; + IPCP_FLOW_REQ_ARR = 14; + IPCP_FLOW_ALLOC_REPLY = 15; + IPCP_FLOW_DEALLOC = 16; + IRM_REPLY = 17; }; message irm_msg { @@ -52,12 +49,11 @@ message irm_msg { optional uint32 api_id = 3; optional uint32 ipcp_type = 5; repeated string dif_name = 6; - optional int32 fd = 7; - optional int32 response = 8; - optional int32 oflags = 9; - optional string dst_name = 10; - optional uint32 port_id = 11; - optional int32 pid = 12; - optional dif_config_msg conf = 13; - optional int32 result = 14; + optional int32 response = 7; + optional string dst_name = 8; + optional uint32 port_id = 9; + optional int32 pid = 10; + optional dif_config_msg conf = 11; + optional int32 cfd = 12; + optional int32 result = 13; }; diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c new file mode 100644 index 00000000..4bd64775 --- /dev/null +++ b/src/lib/shm_ap_rbuff.c @@ -0,0 +1,268 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for application processes + * + * Dimitri Staessens <dimitri.staessens@intec.ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/shm_ap_rbuff.h> +#define OUROBOROS_PREFIX "shm_ap_rbuff" + +#include <ouroboros/logs.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include <unistd.h> +#include <stdbool.h> +#include <errno.h> + +#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry) \ + + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)) + +#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail)\ + & (SHM_RBUFF_SIZE - 1)) +#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE) + +struct shm_ap_rbuff { + struct rb_entry * shm_base; /* start of entry */ + size_t * ptr_head; /* start of ringbuffer head */ + size_t * ptr_tail; /* start of ringbuffer tail */ + pthread_mutex_t * shm_mutex; /* lock all free space in shm */ + pid_t pid; /* pid to which this rb belongs */ + int fd; +}; + +struct shm_ap_rbuff * shm_ap_rbuff_create() +{ + struct shm_ap_rbuff * rb; + int shm_fd; + struct rb_entry * shm_base; + pthread_mutexattr_t attr; + char fn[25]; + + sprintf(fn, SHM_AP_RBUFF "%d", getpid()); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBGF("Failed creating ring buffer."); + free(rb); + return NULL; + } + + if (lseek(shm_fd, SHM_RBUFF_FILE_SIZE - 1, SEEK_SET) < 0) { + LOG_DBGF("Failed to extend ringbuffer."); + free(rb); + return NULL; + } + + if (write(shm_fd, "", 1) != 1) { + LOG_DBGF("Failed to finalise extension of ringbuffer."); + free(rb); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + if (shm_base == MAP_FAILED) { + LOG_DBGF("Failed to map shared memory."); + if (close(shm_fd) == -1) + LOG_DBGF("Failed to close invalid shm."); + + if (shm_unlink(fn) == -1) + LOG_DBGF("Failed to remove invalid shm."); + + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); + rb->ptr_tail = (size_t *) + ((uint8_t *) rb->ptr_head + sizeof(size_t)); + rb->shm_mutex = (pthread_mutex_t *) + ((uint8_t *) rb->ptr_tail + sizeof(size_t)); + + pthread_mutexattr_init(&attr); + pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(rb->shm_mutex, &attr); + + *rb->ptr_head = 0; + *rb->ptr_tail = 0; + + rb->fd = shm_fd; + rb->pid = getpid(); + + return rb; +} + +struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t pid) +{ + struct shm_ap_rbuff * rb; + int shm_fd; + struct rb_entry * shm_base; + char fn[25]; + + sprintf(fn, SHM_AP_RBUFF "%d", pid); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBGF("Failed opening shared memory %s.", fn); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + if (shm_base == MAP_FAILED) { + LOG_DBGF("Failed to map shared memory."); + if (close(shm_fd) == -1) + LOG_DBGF("Failed to close invalid shm."); + + if (shm_unlink(fn) == -1) + LOG_DBGF("Failed to remove invalid shm."); + + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); + rb->ptr_tail = (size_t *) + ((uint8_t *) rb->ptr_head + sizeof(size_t)); + rb->shm_mutex = (pthread_mutex_t *) + ((uint8_t *) rb->ptr_tail + sizeof(size_t)); + + rb->fd = shm_fd; + rb->pid = pid; + + return rb; +} +void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) +{ + char fn[25]; + + if (rb == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + sprintf(fn, SHM_AP_RBUFF "%d", rb->pid); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + + free(rb); +} + +void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) +{ + char fn[25]; + + + if (rb == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + if (rb->pid != getpid()) { + LOG_ERR("Tried to destroy other AP's rbuff."); + return; + } + + sprintf(fn, SHM_AP_RBUFF "%d", rb->pid); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + + if (shm_unlink(fn) == -1) + LOG_DBGF("Failed to unlink shm."); + + free(rb); +} + +int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) +{ + struct rb_entry * pos; + + if (rb == NULL || e == NULL) + return -1; + + pthread_mutex_lock(rb->shm_mutex); + + if (!shm_rbuff_free(rb)) { + pthread_mutex_unlock(rb->shm_mutex); + return -1; + } + + pos = rb->shm_base + *rb->ptr_head; + *pos = *e; + *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1); + + pthread_mutex_unlock(rb->shm_mutex); + + return 0; +} +struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) +{ + struct rb_entry * e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + if (rb == NULL) + return NULL; + + pthread_mutex_lock(rb->shm_mutex); + + if (shm_rbuff_used(rb) == 0) { + pthread_mutex_unlock(rb->shm_mutex); + return NULL; + } + + *e = *(rb->shm_base + *rb->ptr_tail); + + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + + pthread_mutex_unlock(rb->shm_mutex); + + return e; +} diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index dfccca6a..56062c9d 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -45,6 +45,9 @@ ((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_tail * \ SHM_DU_BUFF_BLOCK_SIZE))) +#define idx_to_du_buff_ptr(dum, idx) \ + ((struct shm_du_buff *)(dum->shm_base + (idx * SHM_DU_BUFF_BLOCK_SIZE))) + #define block_ptr_to_idx(dum, sdb) \ (((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE) @@ -52,27 +55,31 @@ & (SHM_BLOCKS_IN_MAP - 1)) #define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP) +#define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \ + idx_to_du_buff_ptr(dum, idx)->du_head) + #define MIN(a,b)(a < b ? a : b) struct shm_du_buff { - size_t size; - size_t du_head; - size_t du_tail; + size_t size; + size_t du_head; + size_t du_tail; + size_t garbage; }; struct shm_du_map { - uint8_t * shm_base; /* start of blocks */ - size_t * ptr_head; /* start of ringbuffer head */ - size_t * ptr_tail; /* start of ringbuffer tail */ - pthread_mutex_t * shm_mutex; /* lock all free space in shm */ - int fd; + uint8_t * shm_base; /* start of blocks */ + size_t * ptr_head; /* start of ringbuffer head */ + size_t * ptr_tail; /* start of ringbuffer tail */ + pthread_mutex_t * shm_mutex; /* lock all free space in shm */ + int fd; }; struct shm_du_map * shm_du_map_create() { struct shm_du_map * dum; int shm_fd; - uint8_t * shm_base; + uint8_t * shm_base; pthread_mutexattr_t attr; dum = malloc(sizeof *dum); @@ -141,7 +148,13 @@ struct shm_du_map * shm_du_map_open() { struct shm_du_map * dum; int shm_fd; - uint8_t * shm_base; + uint8_t * shm_base; + + dum = malloc(sizeof *dum); + if (dum == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_RDWR, 0666); if (shm_fd == -1) { @@ -166,12 +179,6 @@ struct shm_du_map * shm_du_map_open() return NULL; } - dum = malloc(sizeof *dum); - if (dum == NULL) { - LOG_DBGF("Could not allocate struct."); - return NULL; - } - dum->shm_base = shm_base; dum->ptr_head = (size_t *) ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); @@ -195,38 +202,52 @@ void shm_du_map_close(struct shm_du_map * dum) if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) LOG_DBGF("Couldn't unmap shared memory."); + free(dum); +} + +void shm_du_map_destroy(struct shm_du_map * dum) +{ + if (dum == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + if (shm_unlink(SHM_DU_MAP_FILENAME) == -1) LOG_DBGF("Failed to unlink shm."); free(dum); } -struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, - size_t size, - size_t headspace, - uint8_t * data, - size_t len) +int shm_create_du_buff(struct shm_du_map * dum, + size_t size, + size_t headspace, + uint8_t * data, + size_t len) { struct shm_du_buff * sdb; long blocks = 0; int sz = size + sizeof *sdb; int sz2 = headspace + len + sizeof *sdb; - uint8_t * write_pos; + uint8_t * write_pos; size_t copy_len; + size_t index; if (dum == NULL || data == NULL) { LOG_DBGF("Bogus input, bugging out."); - return NULL; + return -1; } if (headspace >= size) { LOG_DBGF("Index out of bounds."); - return NULL; + return -1; } if (headspace + len > size) { LOG_DBGF("Buffer too small for data."); - return NULL; + return -1; } pthread_mutex_lock(dum->shm_mutex); @@ -237,20 +258,20 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, if (sz2 < 0 && sz > 0) { pthread_mutex_unlock(dum->shm_mutex); LOG_DBG("Can't handle this packet now"); - return NULL; + return -1; } ++blocks; } if (!shm_map_free(dum, blocks)) { pthread_mutex_unlock(dum->shm_mutex); - LOG_DBGF("Allocation failed, Out of Memory."); - return NULL; + return -1; } sdb = get_head_ptr(dum); sdb->size = size; + sdb->garbage = 0; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; @@ -267,32 +288,76 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, --blocks; } + index = *dum->ptr_head - 1; + pthread_mutex_unlock(dum->shm_mutex); - return sdb; + return index; } -int shm_release_du_buff(struct shm_du_map * dum) +/* FIXME: this cannot handle packets stretching beyond the ringbuffer border */ +int shm_du_map_read_sdu(uint8_t ** dst, + struct shm_du_map * dum, + size_t idx) +{ + size_t len = 0; + + if (idx > SHM_BLOCKS_IN_MAP) + return -1; + + pthread_mutex_lock(dum->shm_mutex); + + if (*dum->ptr_head == *dum->ptr_tail) { + pthread_mutex_unlock(dum->shm_mutex); + return -1; + } + + *dst = ((uint8_t *) idx_to_du_buff_ptr(dum, idx)) + + sizeof(struct shm_du_buff) + + idx_to_du_buff_ptr(dum, idx)->du_head; + len = sdu_size(dum, idx); + + pthread_mutex_unlock(dum->shm_mutex); + + return len; +} + +int shm_release_du_buff(struct shm_du_map * dum, size_t idx) { long sz; long blocks = 0; + + /* FIXME: this is crap for the test */ + if (idx > SHM_BLOCKS_IN_MAP) + idx = *dum->ptr_tail; + pthread_mutex_lock(dum->shm_mutex); if (*dum->ptr_head == *dum->ptr_tail) { - LOG_DBGF("Attempt to free empty ringbuffer. Nothing to do."); pthread_mutex_unlock(dum->shm_mutex); return -1; } - sz = get_tail_ptr(dum)->size; + idx_to_du_buff_ptr(dum, idx)->garbage = 1; - while (sz + (long) sizeof (struct shm_du_buff) > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; - ++blocks; + if (idx != *dum->ptr_tail) { + pthread_mutex_unlock(dum->shm_mutex); + return 0; + } + + while (get_tail_ptr(dum)->garbage == 1) { + sz = get_tail_ptr(dum)->size; + + while (sz + (long) sizeof (struct shm_du_buff) > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + ++blocks; + } + + *(dum->ptr_tail) = + (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); } - *(dum->ptr_tail) = (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); pthread_mutex_unlock(dum->shm_mutex); return 0; @@ -317,7 +382,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb, } uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, - size_t size) + size_t size) { if (sdb == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -335,7 +400,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, } int shm_du_buff_head_release(struct shm_du_buff * sdb, - size_t size) + size_t size) { if (sdb == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -353,7 +418,7 @@ int shm_du_buff_head_release(struct shm_du_buff * sdb, } int shm_du_buff_tail_release(struct shm_du_buff * sdb, - size_t size) + size_t size) { if (sdb == NULL) { LOG_DBGF("Bogus input, bugging out."); diff --git a/src/lib/tests/shm_du_map_test.c b/src/lib/tests/shm_du_map_test.c index 85a82e4d..55938a62 100644 --- a/src/lib/tests/shm_du_map_test.c +++ b/src/lib/tests/shm_du_map_test.c @@ -32,7 +32,7 @@ #include <ouroboros/logs.h> -#define SIZE_OF_DU_BUFF 24 +#define SIZE_OF_DU_BUFF 32 #define TEST_BUFF_SIZE (SHM_DU_BUFF_BLOCK_SIZE - SIZE_OF_DU_BUFF) #define MAX(a,b) (a > b ? a : b) @@ -44,7 +44,7 @@ void * produce() { struct shm_du_map * dum; long test_buf_size = 0; - uint8_t * test_values; + uint8_t * test_values; int headspace; int tailspace; long i; @@ -66,9 +66,8 @@ void * produce() test_values[i] = 170; clock_gettime(CLOCK_MONOTONIC, &starttime); - for (i = 1; i < SHM_BLOCKS_IN_MAP; i++) { - struct shm_du_buff * sdb; - size_t len; + for (i = 1; i < 16 * SHM_BLOCKS_IN_MAP; i++) { + size_t len; test_buf_size = TEST_BUFF_SIZE; @@ -77,21 +76,19 @@ void * produce() len = test_buf_size - (headspace + tailspace); - sdb = shm_create_du_buff(dum, - test_buf_size, - headspace, - test_values, - len); - - if (sdb != NULL) { - bytes_written += len; - } - else { - sync = -2; - break; + if (shm_create_du_buff(dum, + test_buf_size, + headspace, + test_values, + len) < 0) { + continue; } + + bytes_written += len; } + sync = -2; + clock_gettime(CLOCK_MONOTONIC, &stoptime); elapsed =(stoptime.tv_sec + stoptime.tv_nsec / 1000000000.0) - (starttime.tv_sec + starttime.tv_nsec / 1000000000.0); @@ -104,13 +101,14 @@ void * produce() sync = -1; + shm_du_map_close(dum); + return 0; } void * consume() { struct shm_du_map * dum; - struct timespec ts; ts.tv_sec = 0; @@ -123,10 +121,15 @@ void * consume() return (void *)-1; } - while (!sync) { - while (!shm_release_du_buff(dum)); - nanosleep(&ts, NULL); + while (true) { + shm_release_du_buff(dum, 1823429173941); + if (sync) + break; } + nanosleep(&ts, NULL); + + + shm_du_map_close(dum); return 0; } @@ -149,7 +152,7 @@ int shm_du_map_test(int argc, char ** argv) return -1; } - shm_du_map_close(dum); + shm_du_map_destroy(dum); LOG_INFO("done."); @@ -165,7 +168,7 @@ int shm_du_map_test(int argc, char ** argv) pthread_create(&consumer, NULL, consume, NULL); pthread_join(consumer, NULL); - shm_du_map_close(dum); + shm_du_map_destroy(dum); LOG_INFO("done."); @@ -173,6 +176,8 @@ int shm_du_map_test(int argc, char ** argv) LOG_INFO("starting concurrency test."); + sync = 0; + dum = shm_du_map_create(); res1 = (int) pthread_create(&producer, NULL, produce, NULL); @@ -181,7 +186,7 @@ int shm_du_map_test(int argc, char ** argv) pthread_join(producer, NULL); pthread_join(consumer, NULL); - shm_du_map_close(dum); + shm_du_map_destroy(dum); LOG_INFO("done."); diff --git a/src/tools/echo/echo_client.c b/src/tools/echo/echo_client.c index 8d3fc322..36942028 100644 --- a/src/tools/echo/echo_client.c +++ b/src/tools/echo/echo_client.c @@ -23,19 +23,23 @@ #define CLIENT_AP_NAME "echo-client" #include <ouroboros/dev.h> +#include <stdlib.h> int client_main() { int fd = 0; int result = 0; - uint8_t buf[BUF_SIZE]; + char buf[BUF_SIZE]; char * message = "Client says hi!"; ssize_t count = 0; - fd = flow_alloc(SERVER_AP_NAME, CLIENT_AP_NAME, - NULL, NULL, 0); + if(ap_init(CLIENT_AP_NAME)) + return -1; + + fd = flow_alloc(SERVER_AP_NAME, NULL, NULL); if (fd < 0) { printf("Failed to allocate flow\n"); + ap_fini(); return -1; } @@ -43,12 +47,14 @@ int client_main() if (result < 0) { printf("Flow allocation refused\n"); flow_dealloc(fd); + ap_fini(); return -1; } if (flow_write(fd, message, strlen(message) + 1) == -1) { printf("Failed to write SDU\n"); flow_dealloc(fd); + ap_fini(); return -1; } @@ -56,6 +62,7 @@ int client_main() if (count < 0) { printf("Failed to read SDU\n"); flow_dealloc(fd); + ap_fini(); return -1; } @@ -63,5 +70,7 @@ int client_main() flow_dealloc(fd); + ap_fini(); + return 0; } diff --git a/src/tools/echo/echo_server.c b/src/tools/echo/echo_server.c index e457e22b..4b1a17b1 100644 --- a/src/tools/echo/echo_server.c +++ b/src/tools/echo/echo_server.c @@ -20,6 +20,8 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ +#include <ouroboros/config.h> + #include <stdbool.h> #include <signal.h> #include <unistd.h> @@ -27,71 +29,82 @@ #include <ouroboros/dev.h> +#ifdef OUROBOROS_CONFIG_DEBUG + #define OUROBOROS_PREFIX "echo-server" + #include <ouroboros/logs.h> +#endif + #define DIF_NAME "*" void shutdown_server(int signo) { char * dif = DIF_NAME; - if (ap_unreg(SERVER_AP_NAME, &dif, 1)) { - printf("Failed to unregister application\n"); + if (ap_unreg(&dif, 1)) { + printf("Failed to unregister application.\n"); + ap_fini(); exit(EXIT_FAILURE); } + ap_fini(); exit(EXIT_SUCCESS); } int server_main() { - int server_fd = 0; - int client_fd = 0; + int server_fd = 0; + int client_fd = 0; char * dif = DIF_NAME; char * client_name = NULL; - uint8_t buf[BUF_SIZE]; + char buf[BUF_SIZE]; ssize_t count = 0; - printf("Starting the server\n"); + printf("Starting the server.\n"); /* Manual cleanup is required for now */ if (signal(SIGINT, shutdown_server) == SIG_ERR) { - printf("Can't install signal handler\n"); + printf("Can't install signal handler.\n"); return -1; } - server_fd = ap_reg(SERVER_AP_NAME, &dif, 1); - if (server_fd < 0) { - printf("Failed to register application\n"); + if(ap_init(SERVER_AP_NAME)) { return -1; } - printf("Echo server started...\n"); + server_fd = ap_reg(&dif, 1); + if (server_fd < 0) { + printf("Failed to register application.\n"); + ap_fini(); + return -1; + } while (true) { client_fd = flow_accept(server_fd, - client_name, NULL); + &client_name, NULL); if (client_fd < 0) { - continue; + printf("Failed to accept flow.\n"); + break; } - printf("New flow from %s\n", client_name); + printf("New flow from %s.\n", client_name); if (flow_alloc_resp(client_fd, 0)) { - printf("Failed to give an allocate response\n"); + printf("Failed to give an allocate response.\n"); flow_dealloc(client_fd); continue; } - count = flow_read(client_fd, buf, BUF_SIZE); + count = flow_read(client_fd, (void **) &buf, BUF_SIZE); if (count < 0) { - printf("Failed to read SDU\n"); + printf("Failed to read SDU.\n"); flow_dealloc(client_fd); continue; } - printf("Message from client is %.*s\n", (int) count, buf); + printf("Message from client is %.*s.\n", (int) count, buf); if (flow_write(client_fd, buf, count) == -1) { - printf("Failed to write SDU\n"); + printf("Failed to write SDU.\n"); flow_dealloc(client_fd); continue; } @@ -99,5 +112,7 @@ int server_main() flow_dealloc(client_fd); } + ap_fini(); + return 0; } |