summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-05-14 16:39:27 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-05-14 21:34:04 +0200
commit037fec33cda726d0078e23798f462ad273153dd5 (patch)
tree25c9ef679a0aaa93e5f01f2a68512d8eaf76f3e7
parentc56a4ed3b865b4b240c6f01809c935b7b86d160b (diff)
downloadouroboros-037fec33cda726d0078e23798f462ad273153dd5.tar.gz
ouroboros-037fec33cda726d0078e23798f462ad273153dd5.zip
ipcpd: shim-udp: complete locking
Added necessary locks for the shim-udp. This PR also improves thread management, the main thread now starts a mainloop thread, which spawns sdu handler threads when it the IPCP is enrolled. If the IPCP exits the enrolled state, the sdu loop is cancelled.
-rw-r--r--src/ipcpd/ipcp.c34
-rw-r--r--src/ipcpd/ipcp.h6
-rw-r--r--src/ipcpd/shim-udp/main.c455
-rw-r--r--src/lib/shm_ap_rbuff.c9
4 files changed, 390 insertions, 114 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 13632a80..060178bf 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -29,6 +29,22 @@
#define OUROBOROS_PREFIX "ipcpd/ipcp"
#include <ouroboros/logs.h>
+struct ipcp * ipcp_instance_create()
+{
+ struct ipcp * i = malloc(sizeof *i);
+ if (i == NULL)
+ return NULL;
+
+ i->data = NULL;
+ i->ops = NULL;
+ i->irmd_fd = -1;
+ i->state = IPCP_INIT;
+
+ rw_lock_init(&i->state_lock);
+
+ return i;
+}
+
int ipcp_arg_check(int argc, char * argv[])
{
if (argc != 3)
@@ -52,25 +68,33 @@ void * ipcp_main_loop(void * o)
uint8_t buf[IPCP_MSG_BUF_SIZE];
struct ipcp * _ipcp = (struct ipcp *) o;
- ipcp_msg_t * msg;
- ssize_t count;
- buffer_t buffer;
- ipcp_msg_t ret_msg = IPCP_MSG__INIT;
+ ipcp_msg_t * msg;
+ ssize_t count;
+ buffer_t buffer;
+ ipcp_msg_t ret_msg = IPCP_MSG__INIT;
dif_config_msg_t * conf_msg;
struct dif_config conf;
+ char * sock_path;
+
if (_ipcp == NULL) {
LOG_ERR("Invalid ipcp struct.");
return (void *) 1;
}
- sockfd = server_socket_open(ipcp_sock_path(getpid()));
+ sock_path = ipcp_sock_path(getpid());
+ if (sock_path == NULL)
+ return (void *) 1;
+
+ sockfd = server_socket_open(sock_path);
if (sockfd < 0) {
LOG_ERR("Could not open server socket.");
return (void *) 1;
}
+ free(sock_path);
+
while (true) {
ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY;
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 393af994..c9002d4d 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -23,6 +23,8 @@
#ifndef IPCPD_IPCP_H
#define IPCPD_IPCP_H
+#include <ouroboros/rw_lock.h>
+
#include "ipcp-ops.h"
#include "ipcp-data.h"
@@ -38,11 +40,13 @@ enum ipcp_state {
struct ipcp {
struct ipcp_data * data;
struct ipcp_ops * ops;
+ int irmd_fd;
enum ipcp_state state;
- int irmd_fd;
+ rw_lock_t state_lock;
};
+struct ipcp * ipcp_instance_create();
void * ipcp_main_loop(void * o);
void * ipcp_sdu_loop(void * o);
int ipcp_arg_check(int argc, char * argv[]);
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 14a698ee..3296540e 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -33,6 +33,7 @@
#include <ouroboros/sockets.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/dev.h>
+#include <ouroboros/rw_lock.h>
#define OUROBOROS_PREFIX "ipcpd/shim-udp"
@@ -81,18 +82,25 @@ struct shim_ap_data {
instance_name_t * api;
struct shm_du_map * dum;
struct bmp * fds;
-
struct shm_ap_rbuff * rb;
+ rw_lock_t data_lock;
+
struct flow flows[AP_MAX_FLOWS];
+ rw_lock_t flows_lock;
+
+ pthread_t mainloop;
+ pthread_t sduloop;
+ pthread_t handler;
+ pthread_t sdu_reader;
+
+ rw_lock_t thread_lock;
- pthread_t mainloop;
- pthread_t sduloop;
- pthread_t handler;
- pthread_t sdu_reader;
} * _ap_instance;
static int shim_ap_init(char * ap_name)
{
+ int i;
+
_ap_instance = malloc(sizeof(struct shim_ap_data));
if (_ap_instance == NULL) {
return -1;
@@ -130,11 +138,22 @@ static int shim_ap_init(char * ap_name)
_ap_instance->rb = shm_ap_rbuff_create();
if (_ap_instance->rb == NULL) {
instance_name_destroy(_ap_instance->api);
+ shm_du_map_close(_ap_instance->dum);
bmp_destroy(_ap_instance->fds);
free(_ap_instance);
return -1;
}
+ for (i = 0; i < AP_MAX_FLOWS; i ++) {
+ _ap_instance->flows[i].rb = NULL;
+ _ap_instance->flows[i].port_id = -1;
+ _ap_instance->flows[i].state = FLOW_NULL;
+ }
+
+ rw_lock_init(&_ap_instance->flows_lock);
+ rw_lock_init(&_ap_instance->thread_lock);
+ rw_lock_init(&_ap_instance->data_lock);
+
return 0;
}
@@ -144,6 +163,9 @@ void shim_ap_fini()
if (_ap_instance == NULL)
return;
+
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
if (_ap_instance->api != NULL)
instance_name_destroy(_ap_instance->api);
if (_ap_instance->fds != NULL)
@@ -152,41 +174,76 @@ void shim_ap_fini()
shm_du_map_close(_ap_instance->dum);
if (_ap_instance->rb != NULL)
shm_ap_rbuff_destroy(_ap_instance->rb);
+
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
for (i = 0; i < AP_MAX_FLOWS; i ++)
if (_ap_instance->flows[i].rb != NULL)
shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
free(_ap_instance);
}
static int port_id_to_fd(int port_id)
{
int i;
- for (i = 0; i < AP_MAX_FLOWS; ++i)
+
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
+ for (i = 0; i < AP_MAX_FLOWS; ++i) {
if (_ap_instance->flows[i].port_id == port_id
- && _ap_instance->flows[i].state != FLOW_NULL)
+ && _ap_instance->flows[i].state != FLOW_NULL) {
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
return i;
+ }
+ }
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
return -1;
}
static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
{
/* the AP chooses the amount of headspace and tailspace */
- size_t index = shm_create_du_buff(_ap_instance->dum,
- count,
- 0,
- buf,
- count);
- struct rb_entry e = {index, _ap_instance->flows[fd].port_id};
-
- if (index == -1)
+ size_t index;
+ struct rb_entry e;
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ index = shm_create_du_buff(_ap_instance->dum, count, 0, buf, count);
+
+ if (index == -1) {
+ rw_lock_unlock(&_ap_instance->data_lock);
return -1;
+ }
+
+ e.index = index;
+
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
+ e.port_id = _ap_instance->flows[fd].port_id;
if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
shm_release_du_buff(_ap_instance->dum, index);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return -EPIPE;
}
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return 0;
}
@@ -206,8 +263,7 @@ struct ipcp_udp_data {
int s_fd;
fd_set flow_fd_s;
-
- pthread_mutex_t lock;
+ rw_lock_t fd_lock;
};
struct ipcp_udp_data * ipcp_udp_data_create()
@@ -229,6 +285,8 @@ struct ipcp_udp_data * ipcp_udp_data_create()
return NULL;
}
+ rw_lock_init(&udp_data->fd_lock);
+
FD_ZERO(&udp_data->flow_fd_s);
return udp_data;
@@ -236,21 +294,49 @@ struct ipcp_udp_data * ipcp_udp_data_create()
void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
{
+ sigset_t sigset;
+ sigemptyset(&sigset);
+ sigaddset(&sigset, SIGINT);
+ bool clean_threads = false;
+
switch(sig) {
case SIGINT:
case SIGTERM:
case SIGHUP:
if (info->si_pid == irmd_pid || info->si_pid == 0) {
+ pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+
LOG_DBG("Terminating by order of %d. Bye.",
info->si_pid);
+
+ rw_lock_wrlock(&_ipcp->state_lock);
+
+ if (_ipcp->state == IPCP_ENROLLED) {
+ clean_threads = true;
+ }
+
+ _ipcp->state = IPCP_SHUTDOWN;
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
+ if (clean_threads) {
+ rw_lock_wrlock(&_ap_instance->thread_lock);
+
+ pthread_cancel(_ap_instance->handler);
+ pthread_cancel(_ap_instance->sdu_reader);
+ pthread_cancel(_ap_instance->sduloop);
+
+ pthread_join(_ap_instance->sduloop, NULL);
+ pthread_join(_ap_instance->handler, NULL);
+ pthread_join(_ap_instance->sdu_reader, NULL);
+
+ rw_lock_unlock(&_ap_instance->thread_lock);
+ }
+
pthread_cancel(_ap_instance->mainloop);
- pthread_cancel(_ap_instance->handler);
- pthread_cancel(_ap_instance->sdu_reader);
- pthread_cancel(_ap_instance->sduloop);
- /* FIXME: should be called after join */
- shim_ap_fini();
- exit(0);
+ pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+
}
default:
return;
@@ -268,6 +354,7 @@ static void * ipcp_udp_listener()
while (true) {
int fd;
+ int port_id;
memset(&buf, 0, SHIM_UDP_BUF_SIZE);
n = sizeof c_saddr;
n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0,
@@ -315,21 +402,27 @@ static void * ipcp_udp_listener()
/* reply to IRM */
- _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(),
- buf,
- UNKNOWN_AP,
- UNKNOWN_AE);
- if (_ap_instance->flows[fd].port_id < 0) {
+ port_id = ipcp_flow_req_arr(getpid(),
+ buf,
+ UNKNOWN_AP,
+ UNKNOWN_AE);
+
+ if (port_id < 0) {
LOG_ERR("Could not get port id from IRMd");
close(fd);
continue;
}
- _ap_instance->flows[fd].rb = NULL;
- _ap_instance->flows[fd].state = FLOW_PENDING;
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
+ _ap_instance->flows[fd].port_id = port_id;
+ _ap_instance->flows[fd].rb = NULL;
+ _ap_instance->flows[fd].state = FLOW_PENDING;
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.",
- _ap_instance->flows[fd].port_id, fd);
+ port_id, fd);
}
return 0;
@@ -340,12 +433,26 @@ static void * ipcp_udp_sdu_reader()
int n;
int fd;
char buf[SHIM_UDP_MAX_SDU_SIZE];
- struct timeval tv = {0, 10};
+ struct timeval tv = {0, 1000};
struct sockaddr_in r_saddr;
fd_set read_fds;
while (true) {
+ rw_lock_rdlock(&_ipcp->state_lock);
+
+ if (_ipcp->state != IPCP_ENROLLED) {
+ rw_lock_unlock(&_ipcp->state_lock);
+ return (void *) 0;
+ }
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
+ rw_lock_rdlock(&shim_data(_ipcp)->fd_lock);
+
read_fds = shim_data(_ipcp)->flow_fd_s;
+
+ rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0)
continue;
@@ -369,22 +476,96 @@ static void * ipcp_udp_sdu_reader()
return (void *) 0;
}
+/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */
+static void * ipcp_udp_sdu_loop(void * o)
+{
+ while (true) {
+ struct rb_entry * e;
+ int fd;
+ int len = 0;
+ char * buf;
+
+ rw_lock_rdlock(&_ipcp->state_lock);
+
+ if (_ipcp->state != IPCP_ENROLLED) {
+ rw_lock_unlock(&_ipcp->state_lock);
+ return (void *) 0;
+ }
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ e = shm_ap_rbuff_read(_ap_instance->rb);
+
+ if (e == NULL) {
+ rw_lock_unlock(&_ap_instance->data_lock);
+ continue;
+ }
+
+ len = shm_du_map_read_sdu((uint8_t **) &buf,
+ _ap_instance->dum,
+ e->index);
+ if (len == -1) {
+ rw_lock_unlock(&_ap_instance->data_lock);
+ free(e);
+ continue;
+ }
+
+ fd = port_id_to_fd(e->port_id);
+
+ if (fd == -1) {
+ rw_lock_unlock(&_ap_instance->data_lock);
+ free(e);
+ continue;
+ }
+
+ if (len == 0) {
+ rw_lock_unlock(&_ap_instance->data_lock);
+ free(e);
+ continue;
+ }
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ send(fd, buf, len, 0);
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ shm_release_du_buff(_ap_instance->dum, e->index);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ free(e);
+ }
+
+ return (void *) 1;
+}
+
static int ipcp_udp_bootstrap(struct dif_config * conf)
{
char ipstr[INET_ADDRSTRLEN];
char dnsstr[INET_ADDRSTRLEN];
- int enable = 1;
+ int enable = 1;
+ int fd = -1;
if (conf->type != THIS_TYPE) {
LOG_ERR("Config doesn't match IPCP type.");
return -1;
}
+ rw_lock_wrlock(&_ipcp->state_lock);
+
if (_ipcp->state != IPCP_INIT) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("IPCP in wrong state.");
return -1;
}
+ _ipcp->state = IPCP_BOOTSTRAPPING;
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
if (inet_ntop(AF_INET,
&conf->ip_addr,
ipstr,
@@ -408,37 +589,36 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
strcpy(dnsstr, "not set");
}
- shim_data(_ipcp)->ip_addr = conf->ip_addr;
- shim_data(_ipcp)->dns_addr = conf->dns_addr;
-
/* UDP listen server */
-
- if ((shim_data(_ipcp)->s_fd =
- socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
- LOG_DBGF("Can't create socket.");
+ if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
+ LOG_ERR("Can't create socket.");
return -1;
}
- if (setsockopt(shim_data(_ipcp)->s_fd,
+ if (setsockopt(fd,
SOL_SOCKET,
- SO_REUSEADDR,
- &enable,
+ SO_REUSEADDR,
+ &enable,
sizeof(int)) < 0) {
- LOG_DBGF("Setsockopt(SO_REUSEADDR) failed.");
+ LOG_WARN("Setsockopt(SO_REUSEADDR) failed.");
}
+ shim_data(_ipcp)->s_fd = fd;
+ shim_data(_ipcp)->ip_addr = conf->ip_addr;
+ shim_data(_ipcp)->dns_addr = conf->dns_addr;
+
shim_data(_ipcp)->s_saddr.sin_family = AF_INET;
shim_data(_ipcp)->s_saddr.sin_addr.s_addr = conf->ip_addr;
shim_data(_ipcp)->s_saddr.sin_port = LISTEN_PORT;
- if (bind(shim_data(_ipcp)->s_fd,
+ if (bind(fd,
(struct sockaddr *) &shim_data(_ipcp)->s_saddr,
sizeof shim_data(_ipcp)->s_saddr ) < 0) {
LOG_ERR("Couldn't bind to %s.", ipstr);
return -1;
}
- FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s);
+ rw_lock_wrlock(&_ap_instance->thread_lock);
pthread_create(&_ap_instance->handler,
NULL,
@@ -449,8 +629,25 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
ipcp_udp_sdu_reader,
NULL);
+ pthread_create(&_ap_instance->sduloop,
+ NULL,
+ ipcp_udp_sdu_loop,
+ NULL);
+
+ rw_lock_unlock(&_ap_instance->thread_lock);
+
+ rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
+
+ FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s);
+
+ rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
+ rw_lock_wrlock(&_ipcp->state_lock);
+
_ipcp->state = IPCP_ENROLLED;
+ rw_lock_unlock(&_ipcp->state_lock);
+
LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.",
getpid());
@@ -595,16 +792,21 @@ static int ipcp_udp_name_reg(char * name)
uint32_t ip_addr;
#endif
- if (_ipcp->state != IPCP_ENROLLED) {
- LOG_DBGF("Won't register with non-enrolled IPCP.");
- return -1;
- }
-
if (strlen(name) > 24) {
LOG_ERR("DNS names cannot be longer than 24 chars.");
return -1;
}
+ rw_lock_rdlock(&_ipcp->state_lock);
+
+ if (_ipcp->state != IPCP_ENROLLED) {
+ rw_lock_unlock(&_ipcp->state_lock);
+ LOG_DBGF("Won't register with non-enrolled IPCP.");
+ return -1; /* -ENOTENROLLED */
+ }
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
if (ipcp_data_add_reg_entry(_ipcp->data, name)) {
LOG_ERR("Failed to add %s to local registry.", name);
return -1;
@@ -659,6 +861,16 @@ static int ipcp_udp_name_unreg(char * name)
#ifdef CONFIG_OUROBOROS_ENABLE_DNS
/* unregister application with DNS server */
+ rw_lock_rdlock(&_ipcp->state_lock);
+
+ if (_ipcp->state != IPCP_ENROLLED) {
+ rw_lock_unlock(&_ipcp->state_lock);
+ LOG_DBGF("IPCP is not enrolled");
+ return -1; /* -ENOTENROLLED */
+ }
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
dns_addr = shim_data(_ipcp)->dns_addr;
if (dns_addr != 0) {
if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN)
@@ -697,10 +909,21 @@ static int ipcp_udp_flow_alloc(int port_id,
#ifdef CONFIG_OUROBOROS_ENABLE_DNS
uint32_t dns_addr = 0;
#endif
+ struct shm_ap_rbuff * rb;
if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL)
return -1;
+ rw_lock_rdlock(&_ipcp->state_lock);
+
+ if (_ipcp->state != IPCP_ENROLLED) {
+ rw_lock_unlock(&_ipcp->state_lock);
+ LOG_DBGF("Won't allocate flow with non-enrolled IPCP.");
+ return -1; /* -ENOTENROLLED */
+ }
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
if (strlen(dst_name) > 255
|| strlen(src_ap_name) > 255
|| strlen(src_ae_name) > 255) {
@@ -726,6 +949,7 @@ static int ipcp_udp_flow_alloc(int port_id,
#ifdef CONFIG_OUROBOROS_ENABLE_DNS
dns_addr = shim_data(_ipcp)->dns_addr;
+
if (dns_addr != 0) {
ip_addr = ddns_resolve(dst_name, dns_addr);
if (ip_addr == 0) {
@@ -789,35 +1013,45 @@ static int ipcp_udp_flow_alloc(int port_id,
free(recv_buf);
- _ap_instance->flows[fd].port_id = port_id;
- _ap_instance->flows[fd].state = FLOW_ALLOCATED;
- _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid);
- if (_ap_instance->flows[fd].rb == NULL) {
+ rb = shm_ap_rbuff_open(n_pid);
+ if (rb == NULL) {
LOG_ERR("Could not open N + 1 ringbuffer.");
close(fd);
- return -1;
+ return -1; /* -ENORBUFF */
}
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
+ _ap_instance->flows[fd].port_id = port_id;
+ _ap_instance->flows[fd].state = FLOW_ALLOCATED;
+ _ap_instance->flows[fd].rb = rb;
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
/* tell IRMd that flow allocation "worked" */
if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) {
+ shm_ap_rbuff_close(rb);
LOG_ERR("Failed to notify IRMd about flow allocation reply");
close(fd);
- shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
return -1;
}
+ rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
+
FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
+ rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
LOG_DBG("Allocated flow with port_id %d on UDP fd %d.", port_id, fd);
return fd;
}
static int ipcp_udp_flow_alloc_resp(int port_id,
- pid_t n_pid,
- int response)
+ pid_t n_pid,
+ int response)
{
+ struct shm_ap_rbuff * rb;
int fd = port_id_to_fd(port_id);
if (fd < 0) {
LOG_DBGF("Could not find flow with port_id %d.", port_id);
@@ -829,22 +1063,44 @@ static int ipcp_udp_flow_alloc_resp(int port_id,
/* awaken pending flow */
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
if (_ap_instance->flows[fd].state != FLOW_PENDING) {
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
LOG_DBGF("Flow was not pending.");
return -1;
}
- _ap_instance->flows[fd].state = FLOW_ALLOCATED;
- _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid);
- if (_ap_instance->flows[fd].rb == NULL) {
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ rb = shm_ap_rbuff_open(n_pid);
+ if (rb == NULL) {
LOG_ERR("Could not open N + 1 ringbuffer.");
+
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
_ap_instance->flows[fd].state = FLOW_NULL;
_ap_instance->flows[fd].port_id = 0;
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
return 0;
}
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
+ _ap_instance->flows[fd].state = FLOW_ALLOCATED;
+ _ap_instance->flows[fd].rb = rb;
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
+
FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
+ rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd);
return 0;
@@ -853,18 +1109,33 @@ static int ipcp_udp_flow_alloc_resp(int port_id,
static int ipcp_udp_flow_dealloc(int port_id)
{
int fd = port_id_to_fd(port_id);
+ struct shm_ap_rbuff * rb;
+
if (fd < 0) {
LOG_DBGF("Could not find flow with port_id %d.", port_id);
return 0;
}
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
_ap_instance->flows[fd].state = FLOW_NULL;
_ap_instance->flows[fd].port_id = 0;
- if (_ap_instance->flows[fd].rb != NULL)
- shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
+ rb = _ap_instance->flows[fd].rb;
+ _ap_instance->flows[fd].rb = NULL;
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ if (rb != NULL)
+ shm_ap_rbuff_close(rb);
+
+ rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
+
+ rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
close(fd);
+
return 0;
}
@@ -877,7 +1148,7 @@ static struct ipcp * ipcp_udp_create(char * ap_name)
if (shim_ap_init(ap_name) < 0)
return NULL;
- i = malloc(sizeof *i);
+ i = ipcp_instance_create();
if (i == NULL)
return NULL;
@@ -914,45 +1185,17 @@ static struct ipcp * ipcp_udp_create(char * ap_name)
#ifndef MAKE_CHECK
-/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */
-/* FIXME: stop eating the CPU */
-static void * ipcp_udp_sdu_loop(void * o)
-{
- while (true) {
- struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb);
- int fd;
- int len = 0;
- char * buf;
- if (e == NULL)
- continue;
-
- len = shm_du_map_read_sdu((uint8_t **) &buf,
- _ap_instance->dum,
- e->index);
- if (len == -1)
- continue;
-
- fd = port_id_to_fd(e->port_id);
-
- if (fd == -1)
- continue;
-
- if (len == 0)
- continue;
-
- send(fd, buf, len, 0);
-
- shm_release_du_buff(_ap_instance->dum, e->index);
- }
-
- return (void *) 1;
-}
-
int main (int argc, char * argv[])
{
/* argument 1: pid of irmd ? */
/* argument 2: ap name */
struct sigaction sig_act;
+ sigset_t sigset;
+ sigemptyset(&sigset);
+ sigaddset(&sigset, SIGINT);
+ sigaddset(&sigset, SIGQUIT);
+ sigaddset(&sigset, SIGHUP);
+ sigaddset(&sigset, SIGPIPE);
if (ipcp_arg_check(argc, argv)) {
LOG_ERR("Wrong arguments.");
@@ -980,13 +1223,19 @@ int main (int argc, char * argv[])
exit(1);
}
+ pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+
pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp);
- pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL);
- pthread_join(_ap_instance->sduloop, NULL);
+ pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+
pthread_join(_ap_instance->mainloop, NULL);
- pthread_join(_ap_instance->handler, NULL);
- pthread_join(_ap_instance->sdu_reader, NULL);
+
+ shim_ap_fini();
+
+ free(_ipcp->data);
+ free(_ipcp->ops);
+ free(_ipcp);
exit(0);
}
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 6c977cbb..6c04ccc5 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -245,18 +245,17 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
if (rb == NULL)
return NULL;
- e = malloc(sizeof(*e));
- if (e == NULL)
- return NULL;
-
pthread_mutex_lock(rb->shm_mutex);
if (shm_rbuff_used(rb) == 0) {
pthread_mutex_unlock(rb->shm_mutex);
- free(e);
return NULL;
}
+ e = malloc(sizeof(*e));
+ if (e == NULL)
+ return NULL;
+
*e = *(rb->shm_base + *rb->ptr_tail);
*rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);