From c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sun, 18 Sep 2016 06:27:43 +0200 Subject: lib, ipcp: Revise fast path and flow interfaces IPCPs can now use ap_init() to initialize the memory. All flows are accessed using flow descriptors, this greatly simplifies IPCP development. Reverts the fast path to a single ap_rbuff per process. Splits lib/ipcp into irmd/ipcp and lib/ipcp-dev. Adds a lib/shim-dev holding tailored functions for shims. Moves the buffer_t to utils.h. Fixes the shim-eth-llc length field. Removes the flow from shared.h. Fixes #4 Fixes #5 --- src/ipcpd/local/main.c | 494 ++++++++++--------------------------------------- 1 file changed, 99 insertions(+), 395 deletions(-) (limited to 'src/ipcpd/local') diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index c0809429..1ccec0c0 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -22,17 +22,10 @@ #include #include "ipcp.h" -#include "flow.h" #include -#include -#include -#include -#include -#include -#include -#include -#include #include +#include +#include #define OUROBOROS_PREFIX "ipcpd/local" #include @@ -46,176 +39,51 @@ #define THIS_TYPE IPCP_LOCAL -#define shim_data(type) ((struct ipcp_local_data *) type->data) - /* global for trapping signal */ int irmd_api; -/* this IPCP's data */ -#ifdef MAKE_CHECK -extern struct ipcp * _ipcp; /* defined in test */ -#else -struct ipcp * _ipcp; -#endif - -/* - * copied from ouroboros/dev. The shim needs access to the internals - * because it doesn't follow all steps necessary steps to get - * the info - */ - -/* the shim needs access to these internals */ -struct shim_ap_data { - pid_t api; - struct shm_rdrbuff * rdrb; - struct bmp * fds; - struct shm_ap_rbuff * rb; - - int in_out[AP_MAX_FLOWS]; +struct { + int in_out[IRMD_MAX_FLOWS]; - struct flow flows[AP_MAX_FLOWS]; - pthread_rwlock_t flows_lock; - - pthread_t mainloop; + pthread_rwlock_t lock; pthread_t sduloop; +} local_data; -} * _ap_instance; - -static int shim_ap_init() +void local_data_init() { int i; + for (i = 0; i < IRMD_MAX_FLOWS; ++i) + local_data.in_out[i] = -1; - _ap_instance = malloc(sizeof(struct shim_ap_data)); - if (_ap_instance == NULL) { - return -1; - } - - _ap_instance->api = getpid(); - - _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); - if (_ap_instance->fds == NULL) { - free(_ap_instance); - return -1; - } - - _ap_instance->rdrb = shm_rdrbuff_open(); - if (_ap_instance->rdrb == NULL) { - bmp_destroy(_ap_instance->fds); - free(_ap_instance); - return -1; - } - - _ap_instance->rb = shm_ap_rbuff_create_n(); - if (_ap_instance->rb == NULL) { - shm_rdrbuff_close(_ap_instance->rdrb); - bmp_destroy(_ap_instance->fds); - free(_ap_instance); - return -1; - } - - for (i = 0; i < AP_MAX_FLOWS; i ++) { - _ap_instance->flows[i].rb = NULL; - _ap_instance->flows[i].port_id = -1; - _ap_instance->flows[i].state = FLOW_NULL; - _ap_instance->in_out[i] = -1; - } - - pthread_rwlock_init(&_ap_instance->flows_lock, NULL); - - return 0; -} - -void shim_ap_fini() -{ - int i = 0; - - if (_ap_instance == NULL) - return; - - pthread_rwlock_rdlock(&_ipcp->state_lock); - - if (_ipcp->state != IPCP_SHUTDOWN) - LOG_WARN("Cleaning up AP while not in shutdown."); - - if (_ap_instance->fds != NULL) - bmp_destroy(_ap_instance->fds); - - /* remove all remaining sdus */ - while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) - shm_rdrbuff_remove(_ap_instance->rdrb, i); - - if (_ap_instance->rdrb != NULL) - shm_rdrbuff_close(_ap_instance->rdrb); - if (_ap_instance->rb != NULL) - shm_ap_rbuff_destroy(_ap_instance->rb); - - pthread_rwlock_wrlock(&_ap_instance->flows_lock); - - for (i = 0; i < AP_MAX_FLOWS; i ++) - if (_ap_instance->flows[i].rb != NULL) - shm_ap_rbuff_close(_ap_instance->flows[i].rb); - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - - free(_ap_instance); + pthread_rwlock_init(&local_data.lock, NULL); } -/* only call this under flows_lock */ -static int port_id_to_fd(int port_id) +void local_data_fini() { - int i; - - for (i = 0; i < AP_MAX_FLOWS; ++i) { - if (_ap_instance->flows[i].port_id == port_id - && _ap_instance->flows[i].state != FLOW_NULL) - return i; - } - - return -1; + pthread_rwlock_destroy(&local_data.lock); } -/* - * end copy from dev.c - */ - -/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ static void * ipcp_local_sdu_loop(void * o) { while (true) { - struct rb_entry * e; - int fd; - - e = shm_ap_rbuff_read(_ap_instance->rb); - if (e == NULL) { - continue; - } + struct rb_entry e; + int fd = local_flow_read(&e); - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (_ipcp->state != IPCP_ENROLLED) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_ENROLLED) { + pthread_rwlock_unlock(&ipcpi.state_lock); return (void *) 1; /* -ENOTENROLLED */ } - pthread_rwlock_rdlock(&_ap_instance->flows_lock); - fd = _ap_instance->in_out[port_id_to_fd(e->port_id)]; - if (fd == -1) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - free(e); - continue; - } - - e->port_id = _ap_instance->flows[fd].port_id; - - while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, e) < 0) - ; + pthread_rwlock_rdlock(&local_data.lock); + fd = local_data.in_out[fd]; + pthread_rwlock_unlock(&local_data.lock); - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + if (fd != -1) + local_flow_write(fd, &e); - free(e); + pthread_rwlock_unlock(&ipcpi.state_lock); } return (void *) 1; @@ -223,10 +91,6 @@ static void * ipcp_local_sdu_loop(void * o) void ipcp_sig_handler(int sig, siginfo_t * info, void * c) { - sigset_t sigset; - sigemptyset(&sigset); - sigaddset(&sigset, SIGINT); - switch(sig) { case SIGINT: case SIGTERM: @@ -236,11 +100,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) LOG_DBG("IPCP %d terminating by order of %d. Bye.", getpid(), info->si_pid); - pthread_rwlock_wrlock(&_ipcp->state_lock); + pthread_rwlock_wrlock(&ipcpi.state_lock); - ipcp_set_state(_ipcp, IPCP_SHUTDOWN); + ipcp_set_state(IPCP_SHUTDOWN); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); } default: return; @@ -254,307 +118,154 @@ static int ipcp_local_bootstrap(struct dif_config * conf) return -1; } - pthread_rwlock_wrlock(&_ipcp->state_lock); + pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_INIT) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_INIT) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("IPCP in wrong state."); return -1; } - ipcp_set_state(_ipcp, IPCP_ENROLLED); + ipcp_set_state(IPCP_ENROLLED); - pthread_create(&_ap_instance->sduloop, - NULL, - ipcp_local_sdu_loop, - NULL); + pthread_create(&local_data.sduloop, NULL, ipcp_local_sdu_loop, NULL); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBG("Bootstrapped local IPCP with api %d.", - getpid()); + LOG_INFO("Bootstrapped local IPCP with api %d.", getpid()); return 0; } static int ipcp_local_name_reg(char * name) { - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_data_add_reg_entry(_ipcp->data, name)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_data_add_reg_entry(ipcpi.data, name)) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_DBGF("Failed to add %s to local registry.", name); return -1; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBG("Registered %s.", name); + LOG_INFO("Registered %s.", name); return 0; } static int ipcp_local_name_unreg(char * name) { - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - ipcp_data_del_reg_entry(_ipcp->data, name); + ipcp_data_del_reg_entry(ipcpi.data, name); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); + + LOG_INFO("Unregistered %s.", name); return 0; } -static int ipcp_local_flow_alloc(pid_t n_api, - int port_id, +static int ipcp_local_flow_alloc(int fd, char * dst_name, char * src_ae_name, enum qos_cube qos) { - int in_fd = -1; int out_fd = -1; - struct shm_ap_rbuff * rb; - - LOG_INFO("Allocating flow to %s.", dst_name); + LOG_DBG("Allocating flow to %s on fd %d.", dst_name, fd); if (dst_name == NULL || src_ae_name == NULL) return -1; /* This ipcpd has all QoS */ - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_ENROLLED) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_DBGF("Won't register with non-enrolled IPCP."); return -1; /* -ENOTENROLLED */ } - rb = shm_ap_rbuff_open_s(n_api); - if (rb == NULL) { - pthread_rwlock_unlock(&_ipcp->state_lock); - return -1; /* -ENORBUFF */ - } - - pthread_rwlock_wrlock(&_ap_instance->flows_lock); + pthread_rwlock_wrlock(&local_data.lock); - in_fd = bmp_allocate(_ap_instance->fds); - if (!bmp_is_id_valid(_ap_instance->fds, in_fd)) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - return -EMFILE; - } + out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name); - _ap_instance->flows[in_fd].port_id = port_id; - _ap_instance->flows[in_fd].state = FLOW_PENDING; - _ap_instance->flows[in_fd].rb = rb; + local_data.in_out[fd] = out_fd; + local_data.in_out[out_fd] = fd; - LOG_DBGF("Pending local flow with port_id %d.", port_id); + pthread_rwlock_unlock(&local_data.lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - /* reply to IRM */ - port_id = ipcp_flow_req_arr(getpid(), - dst_name, - src_ae_name); - - if (port_id < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_ERR("Could not get port id from IRMd"); - /* shm_ap_rbuff_close(n_api); */ - return -1; - } - - out_fd = bmp_allocate(_ap_instance->fds); - if (!bmp_is_id_valid(_ap_instance->fds, out_fd)) { - /* shm_ap_rbuff_close(n_api); */ - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - return -1; /* -ENOMOREFDS */ - } - - _ap_instance->flows[out_fd].port_id = port_id; - _ap_instance->flows[out_fd].rb = NULL; - _ap_instance->flows[out_fd].state = FLOW_PENDING; - - _ap_instance->in_out[in_fd] = out_fd; - _ap_instance->in_out[out_fd] = in_fd; - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - - LOG_DBGF("Pending local allocation request, port_id %d.", port_id); + LOG_INFO("Pending local allocation request on fd %d.", fd); return 0; } -static int ipcp_local_flow_alloc_resp(pid_t n_api, - int port_id, - int response) +static int ipcp_local_flow_alloc_resp(int fd, int response) { - struct shm_ap_rbuff * rb; - int in_fd = -1; int out_fd = -1; int ret = -1; + LOG_DBG("Received response for fd %d: %d.", fd, response); + if (response) return 0; - pthread_rwlock_rdlock(&_ipcp->state_lock); - - /* awaken pending flow */ - - pthread_rwlock_wrlock(&_ap_instance->flows_lock); - - in_fd = port_id_to_fd(port_id); - if (in_fd < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBGF("Could not find flow with port_id %d.", port_id); - return -1; - } - - if (_ap_instance->flows[in_fd].state != FLOW_PENDING) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBGF("Flow was not pending."); - return -1; - } - - rb = shm_ap_rbuff_open_s(n_api); - if (rb == NULL) { - LOG_ERR("Could not open N + 1 ringbuffer."); - _ap_instance->flows[in_fd].state = FLOW_NULL; - _ap_instance->flows[in_fd].port_id = -1; - _ap_instance->in_out[in_fd] = -1; - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - return -1; - } - - _ap_instance->flows[in_fd].state = FLOW_ALLOCATED; - _ap_instance->flows[in_fd].rb = rb; - - LOG_DBGF("Accepted flow, port_id %d on fd %d.", port_id, in_fd); + pthread_rwlock_rdlock(&ipcpi.state_lock); - out_fd = _ap_instance->in_out[in_fd]; + out_fd = local_data.in_out[fd]; if (out_fd < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBGF("No pending local flow with port_id %d.", port_id); + pthread_rwlock_unlock(&local_data.lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - if (_ap_instance->flows[out_fd].state != FLOW_PENDING) { - /* FIXME: clean up other end */ - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBGF("Flow was not pending."); - return -1; - } - - _ap_instance->flows[out_fd].state = FLOW_ALLOCATED; - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - if ((ret = ipcp_flow_alloc_reply(getpid(), - _ap_instance->flows[out_fd].port_id, - response)) < 0) { - return -1; /* -EPIPE */ - } + if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0) + return -1; - LOG_INFO("Flow allocation completed, port_ids (%d, %d).", - _ap_instance->flows[out_fd].port_id, - _ap_instance->flows[in_fd].port_id); + LOG_INFO("Flow allocation completed, fds (%d, %d).", out_fd, fd); return ret; } -static int ipcp_local_flow_dealloc(int port_id) +static int ipcp_local_flow_dealloc(int fd) { - int fd = -1; - struct shm_ap_rbuff * rb; - - pthread_rwlock_rdlock(&_ipcp->state_lock); - pthread_rwlock_wrlock(&_ap_instance->flows_lock); - - fd = port_id_to_fd(port_id); - if (fd < 0) { - pthread_rwlock_unlock(&_ap_instance->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); - LOG_DBGF("Could not find flow with port_id %d.", port_id); - return 0; - } - - bmp_release(_ap_instance->fds, fd); - - if (_ap_instance->in_out[fd] != -1) - _ap_instance->in_out[_ap_instance->in_out[fd]] = -1; - - _ap_instance->in_out[fd] = -1; - - _ap_instance->flows[fd].state = FLOW_NULL; - _ap_instance->flows[fd].port_id = -1; - rb = _ap_instance->flows[fd].rb; - _ap_instance->flows[fd].rb = NULL; - - pthread_rwlock_unlock(&_ap_instance->flows_lock); - - if (rb != NULL) - shm_ap_rbuff_close(rb); - - pthread_rwlock_unlock(&_ipcp->state_lock); - - LOG_DBGF("Flow with port_id %d deallocated.", port_id); - - return 0; -} - -static struct ipcp * ipcp_local_create() -{ - struct ipcp * i; - struct ipcp_ops * ops; + int out_fd = -1; - i = ipcp_instance_create(); - if (i == NULL) - return NULL; + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(&local_data.lock); - i->data = ipcp_data_create(); - if (i->data == NULL) { - free(i); - return NULL; - } + out_fd = local_data.in_out[fd]; - if (ipcp_data_init(i->data, THIS_TYPE) == NULL) { - free(i->data); - free(i); - return NULL; + if (out_fd != -1) { + local_data.in_out[out_fd] = -1; + flow_dealloc(out_fd); } - ops = malloc(sizeof(*ops)); - if (ops == NULL) { - free(i->data); - free(i); - return NULL; - } + local_data.in_out[fd] = -1; - ops->ipcp_bootstrap = ipcp_local_bootstrap; - ops->ipcp_enroll = NULL; /* shim */ - ops->ipcp_name_reg = ipcp_local_name_reg; - ops->ipcp_name_unreg = ipcp_local_name_unreg; - ops->ipcp_flow_alloc = ipcp_local_flow_alloc; - ops->ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp; - ops->ipcp_flow_dealloc = ipcp_local_flow_dealloc; + pthread_rwlock_unlock(&local_data.lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - i->ops = ops; + LOG_INFO("Flow with fd %d deallocated.", fd); - i->state = IPCP_INIT; - - return i; + return 0; } -#ifndef MAKE_CHECK +static struct ipcp_ops local_ops = { + .ipcp_bootstrap = ipcp_local_bootstrap, + .ipcp_enroll = NULL, /* shim */ + .ipcp_name_reg = ipcp_local_name_reg, + .ipcp_name_unreg = ipcp_local_name_unreg, + .ipcp_flow_alloc = ipcp_local_flow_alloc, + .ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp, + .ipcp_flow_dealloc = ipcp_local_flow_dealloc +}; int main(int argc, char * argv[]) { @@ -571,7 +282,9 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - if (shim_ap_init() < 0) { + local_data_init(); + + if (ap_init(NULL) < 0) { close_logfile(); exit(EXIT_FAILURE); } @@ -591,17 +304,13 @@ int main(int argc, char * argv[]) sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - _ipcp = ipcp_local_create(); - if (_ipcp == NULL) { - LOG_ERR("Failed to create IPCP."); + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + + if (ipcp_init(THIS_TYPE, &local_ops) < 0) { close_logfile(); exit(EXIT_FAILURE); } - pthread_sigmask(SIG_BLOCK, &sigset, NULL); - - pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); - pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); if (ipcp_create_r(getpid())) { @@ -610,21 +319,16 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - pthread_join(_ap_instance->mainloop, NULL); - - pthread_cancel(_ap_instance->sduloop); - pthread_join(_ap_instance->sduloop, NULL); + ipcp_fini(); - shim_ap_fini(); + pthread_cancel(local_data.sduloop); + pthread_join(local_data.sduloop, NULL); - ipcp_data_destroy(_ipcp->data); + ap_fini(); - free(_ipcp->ops); - free(_ipcp); + local_data_fini(); close_logfile(); exit(EXIT_SUCCESS); } - -#endif /* MAKE_CHECK */ -- cgit v1.2.3