summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ipcpd/ipcp.c34
-rw-r--r--src/ipcpd/ipcp.h6
-rw-r--r--src/ipcpd/shim-udp/main.c456
-rw-r--r--src/lib/dev.c212
-rw-r--r--src/lib/shm_ap_rbuff.c4
5 files changed, 566 insertions, 146 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 1e122b73..76d3620b 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -29,6 +29,22 @@
#define OUROBOROS_PREFIX "ipcpd/ipcp"
#include <ouroboros/logs.h>
+struct ipcp * ipcp_instance_create()
+{
+ struct ipcp * i = malloc(sizeof *i);
+ if (i == NULL)
+ return NULL;
+
+ i->data = NULL;
+ i->ops = NULL;
+ i->irmd_fd = -1;
+ i->state = IPCP_INIT;
+
+ rw_lock_init(&i->state_lock);
+
+ return i;
+}
+
int ipcp_arg_check(int argc, char * argv[])
{
if (argc != 3)
@@ -52,25 +68,33 @@ void * ipcp_main_loop(void * o)
uint8_t buf[IPCP_MSG_BUF_SIZE];
struct ipcp * _ipcp = (struct ipcp *) o;
- ipcp_msg_t * msg;
- ssize_t count;
- buffer_t buffer;
- ipcp_msg_t ret_msg = IPCP_MSG__INIT;
+ ipcp_msg_t * msg;
+ ssize_t count;
+ buffer_t buffer;
+ ipcp_msg_t ret_msg = IPCP_MSG__INIT;
dif_config_msg_t * conf_msg;
struct dif_config conf;
+ char * sock_path;
+
if (_ipcp == NULL) {
LOG_ERR("Invalid ipcp struct.");
return (void *) 1;
}
- sockfd = server_socket_open(ipcp_sock_path(getpid()));
+ sock_path = ipcp_sock_path(getpid());
+ if (sock_path == NULL)
+ return (void *) 1;
+
+ sockfd = server_socket_open(sock_path);
if (sockfd < 0) {
LOG_ERR("Could not open server socket.");
return (void *) 1;
}
+ free(sock_path);
+
while (true) {
ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY;
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 393af994..c9002d4d 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -23,6 +23,8 @@
#ifndef IPCPD_IPCP_H
#define IPCPD_IPCP_H
+#include <ouroboros/rw_lock.h>
+
#include "ipcp-ops.h"
#include "ipcp-data.h"
@@ -38,11 +40,13 @@ enum ipcp_state {
struct ipcp {
struct ipcp_data * data;
struct ipcp_ops * ops;
+ int irmd_fd;
enum ipcp_state state;
- int irmd_fd;
+ rw_lock_t state_lock;
};
+struct ipcp * ipcp_instance_create();
void * ipcp_main_loop(void * o);
void * ipcp_sdu_loop(void * o);
int ipcp_arg_check(int argc, char * argv[]);
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index f9a8c42b..917c343b 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -34,6 +34,7 @@
#include <ouroboros/bitmap.h>
#include <ouroboros/flow.h>
#include <ouroboros/dev.h>
+#include <ouroboros/rw_lock.h>
#define OUROBOROS_PREFIX "ipcpd/shim-udp"
@@ -82,18 +83,25 @@ struct shim_ap_data {
instance_name_t * api;
struct shm_du_map * dum;
struct bmp * fds;
-
struct shm_ap_rbuff * rb;
+ rw_lock_t data_lock;
+
struct flow flows[AP_MAX_FLOWS];
+ rw_lock_t flows_lock;
+
+ pthread_t mainloop;
+ pthread_t sduloop;
+ pthread_t handler;
+ pthread_t sdu_reader;
+
+ rw_lock_t thread_lock;
- pthread_t mainloop;
- pthread_t sduloop;
- pthread_t handler;
- pthread_t sdu_reader;
} * _ap_instance;
static int shim_ap_init(char * ap_name)
{
+ int i;
+
_ap_instance = malloc(sizeof(struct shim_ap_data));
if (_ap_instance == NULL) {
return -1;
@@ -131,11 +139,22 @@ static int shim_ap_init(char * ap_name)
_ap_instance->rb = shm_ap_rbuff_create();
if (_ap_instance->rb == NULL) {
instance_name_destroy(_ap_instance->api);
+ shm_du_map_close(_ap_instance->dum);
bmp_destroy(_ap_instance->fds);
free(_ap_instance);
return -1;
}
+ for (i = 0; i < AP_MAX_FLOWS; i ++) {
+ _ap_instance->flows[i].rb = NULL;
+ _ap_instance->flows[i].port_id = -1;
+ _ap_instance->flows[i].state = FLOW_NULL;
+ }
+
+ rw_lock_init(&_ap_instance->flows_lock);
+ rw_lock_init(&_ap_instance->thread_lock);
+ rw_lock_init(&_ap_instance->data_lock);
+
return 0;
}
@@ -145,6 +164,9 @@ void shim_ap_fini()
if (_ap_instance == NULL)
return;
+
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
if (_ap_instance->api != NULL)
instance_name_destroy(_ap_instance->api);
if (_ap_instance->fds != NULL)
@@ -153,41 +175,76 @@ void shim_ap_fini()
shm_du_map_close(_ap_instance->dum);
if (_ap_instance->rb != NULL)
shm_ap_rbuff_destroy(_ap_instance->rb);
+
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
for (i = 0; i < AP_MAX_FLOWS; i ++)
if (_ap_instance->flows[i].rb != NULL)
shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
free(_ap_instance);
}
static int port_id_to_fd(int port_id)
{
int i;
- for (i = 0; i < AP_MAX_FLOWS; ++i)
+
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
+ for (i = 0; i < AP_MAX_FLOWS; ++i) {
if (_ap_instance->flows[i].port_id == port_id
- && _ap_instance->flows[i].state != FLOW_NULL)
+ && _ap_instance->flows[i].state != FLOW_NULL) {
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
return i;
+ }
+ }
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
return -1;
}
static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
{
/* the AP chooses the amount of headspace and tailspace */
- size_t index = shm_create_du_buff(_ap_instance->dum,
- count,
- 0,
- buf,
- count);
- struct rb_entry e = {index, _ap_instance->flows[fd].port_id};
-
- if (index == -1)
+ size_t index;
+ struct rb_entry e;
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ index = shm_create_du_buff(_ap_instance->dum, count, 0, buf, count);
+
+ if (index == -1) {
+ rw_lock_unlock(&_ap_instance->data_lock);
return -1;
+ }
+
+ e.index = index;
+
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
+ e.port_id = _ap_instance->flows[fd].port_id;
if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
shm_release_du_buff(_ap_instance->dum, index);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return -EPIPE;
}
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return 0;
}
@@ -207,8 +264,7 @@ struct ipcp_udp_data {
int s_fd;
fd_set flow_fd_s;
-
- pthread_mutex_t lock;
+ rw_lock_t fd_lock;
};
struct ipcp_udp_data * ipcp_udp_data_create()
@@ -230,6 +286,8 @@ struct ipcp_udp_data * ipcp_udp_data_create()
return NULL;
}
+ rw_lock_init(&udp_data->fd_lock);
+
FD_ZERO(&udp_data->flow_fd_s);
return udp_data;
@@ -237,21 +295,49 @@ struct ipcp_udp_data * ipcp_udp_data_create()
void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
{
+ sigset_t sigset;
+ sigemptyset(&sigset);
+ sigaddset(&sigset, SIGINT);
+ bool clean_threads = false;
+
switch(sig) {
case SIGINT:
case SIGTERM:
case SIGHUP:
if (info->si_pid == irmd_pid || info->si_pid == 0) {
+ pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+
LOG_DBG("Terminating by order of %d. Bye.",
info->si_pid);
+
+ rw_lock_wrlock(&_ipcp->state_lock);
+
+ if (_ipcp->state == IPCP_ENROLLED) {
+ clean_threads = true;
+ }
+
+ _ipcp->state = IPCP_SHUTDOWN;
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
+ if (clean_threads) {
+ rw_lock_wrlock(&_ap_instance->thread_lock);
+
+ pthread_cancel(_ap_instance->handler);
+ pthread_cancel(_ap_instance->sdu_reader);
+ pthread_cancel(_ap_instance->sduloop);
+
+ pthread_join(_ap_instance->sduloop, NULL);
+ pthread_join(_ap_instance->handler, NULL);
+ pthread_join(_ap_instance->sdu_reader, NULL);
+
+ rw_lock_unlock(&_ap_instance->thread_lock);
+ }
+
pthread_cancel(_ap_instance->mainloop);
- pthread_cancel(_ap_instance->handler);
- pthread_cancel(_ap_instance->sdu_reader);
- pthread_cancel(_ap_instance->sduloop);
- /* FIXME: should be called after join */
- shim_ap_fini();
- exit(0);
+ pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+
}
default:
return;
@@ -269,6 +355,7 @@ static void * ipcp_udp_listener()
while (true) {
int fd;
+ int port_id;
memset(&buf, 0, SHIM_UDP_BUF_SIZE);
n = sizeof c_saddr;
n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0,
@@ -316,21 +403,27 @@ static void * ipcp_udp_listener()
/* reply to IRM */
- _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(),
- buf,
- UNKNOWN_AP,
- UNKNOWN_AE);
- if (_ap_instance->flows[fd].port_id < 0) {
+ port_id = ipcp_flow_req_arr(getpid(),
+ buf,
+ UNKNOWN_AP,
+ UNKNOWN_AE);
+
+ if (port_id < 0) {
LOG_ERR("Could not get port id from IRMd");
close(fd);
continue;
}
- _ap_instance->flows[fd].rb = NULL;
- _ap_instance->flows[fd].state = FLOW_PENDING;
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
+ _ap_instance->flows[fd].port_id = port_id;
+ _ap_instance->flows[fd].rb = NULL;
+ _ap_instance->flows[fd].state = FLOW_PENDING;
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.",
- _ap_instance->flows[fd].port_id, fd);
+ port_id, fd);
}
return 0;
@@ -341,12 +434,26 @@ static void * ipcp_udp_sdu_reader()
int n;
int fd;
char buf[SHIM_UDP_MAX_SDU_SIZE];
- struct timeval tv = {0, 10};
+ struct timeval tv = {0, 1000};
struct sockaddr_in r_saddr;
fd_set read_fds;
while (true) {
+ rw_lock_rdlock(&_ipcp->state_lock);
+
+ if (_ipcp->state != IPCP_ENROLLED) {
+ rw_lock_unlock(&_ipcp->state_lock);
+ return (void *) 0;
+ }
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
+ rw_lock_rdlock(&shim_data(_ipcp)->fd_lock);
+
read_fds = shim_data(_ipcp)->flow_fd_s;
+
+ rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0)
continue;
@@ -370,22 +477,97 @@ static void * ipcp_udp_sdu_reader()
return (void *) 0;
}
+/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */
+static void * ipcp_udp_sdu_loop(void * o)
+{
+ while (true) {
+ struct rb_entry * e;
+ int fd;
+ int len = 0;
+ char * buf;
+
+ rw_lock_rdlock(&_ipcp->state_lock);
+
+ if (_ipcp->state != IPCP_ENROLLED) {
+ rw_lock_unlock(&_ipcp->state_lock);
+ return (void *) 0;
+ }
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ e = shm_ap_rbuff_read(_ap_instance->rb);
+
+ if (e == NULL) {
+ rw_lock_unlock(&_ap_instance->data_lock);
+ continue;
+ }
+
+ len = shm_du_map_read_sdu((uint8_t **) &buf,
+ _ap_instance->dum,
+ e->index);
+ if (len == -1) {
+ rw_lock_unlock(&_ap_instance->data_lock);
+ free(e);
+ continue;
+ }
+
+ fd = port_id_to_fd(e->port_id);
+
+ if (fd == -1) {
+ rw_lock_unlock(&_ap_instance->data_lock);
+ free(e);
+ continue;
+ }
+
+ if (len == 0) {
+ rw_lock_unlock(&_ap_instance->data_lock);
+ free(e);
+ continue;
+ }
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ if (send(fd, buf, len, 0) < 0)
+ LOG_ERR("Failed to send SDU.");
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ shm_release_du_buff(_ap_instance->dum, e->index);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ free(e);
+ }
+
+ return (void *) 1;
+}
+
static int ipcp_udp_bootstrap(struct dif_config * conf)
{
char ipstr[INET_ADDRSTRLEN];
char dnsstr[INET_ADDRSTRLEN];
- int enable = 1;
+ int enable = 1;
+ int fd = -1;
if (conf->type != THIS_TYPE) {
LOG_ERR("Config doesn't match IPCP type.");
return -1;
}
+ rw_lock_wrlock(&_ipcp->state_lock);
+
if (_ipcp->state != IPCP_INIT) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("IPCP in wrong state.");
return -1;
}
+ _ipcp->state = IPCP_BOOTSTRAPPING;
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
if (inet_ntop(AF_INET,
&conf->ip_addr,
ipstr,
@@ -409,37 +591,36 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
strcpy(dnsstr, "not set");
}
- shim_data(_ipcp)->ip_addr = conf->ip_addr;
- shim_data(_ipcp)->dns_addr = conf->dns_addr;
-
/* UDP listen server */
-
- if ((shim_data(_ipcp)->s_fd =
- socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
- LOG_DBGF("Can't create socket.");
+ if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
+ LOG_ERR("Can't create socket.");
return -1;
}
- if (setsockopt(shim_data(_ipcp)->s_fd,
+ if (setsockopt(fd,
SOL_SOCKET,
- SO_REUSEADDR,
- &enable,
+ SO_REUSEADDR,
+ &enable,
sizeof(int)) < 0) {
- LOG_DBGF("Setsockopt(SO_REUSEADDR) failed.");
+ LOG_WARN("Setsockopt(SO_REUSEADDR) failed.");
}
+ shim_data(_ipcp)->s_fd = fd;
+ shim_data(_ipcp)->ip_addr = conf->ip_addr;
+ shim_data(_ipcp)->dns_addr = conf->dns_addr;
+
shim_data(_ipcp)->s_saddr.sin_family = AF_INET;
shim_data(_ipcp)->s_saddr.sin_addr.s_addr = conf->ip_addr;
shim_data(_ipcp)->s_saddr.sin_port = LISTEN_PORT;
- if (bind(shim_data(_ipcp)->s_fd,
+ if (bind(fd,
(struct sockaddr *) &shim_data(_ipcp)->s_saddr,
sizeof shim_data(_ipcp)->s_saddr ) < 0) {
LOG_ERR("Couldn't bind to %s.", ipstr);
return -1;
}
- FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s);
+ rw_lock_wrlock(&_ap_instance->thread_lock);
pthread_create(&_ap_instance->handler,
NULL,
@@ -450,8 +631,25 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
ipcp_udp_sdu_reader,
NULL);
+ pthread_create(&_ap_instance->sduloop,
+ NULL,
+ ipcp_udp_sdu_loop,
+ NULL);
+
+ rw_lock_unlock(&_ap_instance->thread_lock);
+
+ rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
+
+ FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s);
+
+ rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
+ rw_lock_wrlock(&_ipcp->state_lock);
+
_ipcp->state = IPCP_ENROLLED;
+ rw_lock_unlock(&_ipcp->state_lock);
+
LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.",
getpid());
@@ -596,16 +794,21 @@ static int ipcp_udp_name_reg(char * name)
uint32_t ip_addr;
#endif
- if (_ipcp->state != IPCP_ENROLLED) {
- LOG_DBGF("Won't register with non-enrolled IPCP.");
- return -1;
- }
-
if (strlen(name) > 24) {
LOG_ERR("DNS names cannot be longer than 24 chars.");
return -1;
}
+ rw_lock_rdlock(&_ipcp->state_lock);
+
+ if (_ipcp->state != IPCP_ENROLLED) {
+ rw_lock_unlock(&_ipcp->state_lock);
+ LOG_DBGF("Won't register with non-enrolled IPCP.");
+ return -1; /* -ENOTENROLLED */
+ }
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
if (ipcp_data_add_reg_entry(_ipcp->data, name)) {
LOG_ERR("Failed to add %s to local registry.", name);
return -1;
@@ -660,6 +863,16 @@ static int ipcp_udp_name_unreg(char * name)
#ifdef CONFIG_OUROBOROS_ENABLE_DNS
/* unregister application with DNS server */
+ rw_lock_rdlock(&_ipcp->state_lock);
+
+ if (_ipcp->state != IPCP_ENROLLED) {
+ rw_lock_unlock(&_ipcp->state_lock);
+ LOG_DBGF("IPCP is not enrolled");
+ return -1; /* -ENOTENROLLED */
+ }
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
dns_addr = shim_data(_ipcp)->dns_addr;
if (dns_addr != 0) {
if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN)
@@ -698,10 +911,21 @@ static int ipcp_udp_flow_alloc(int port_id,
#ifdef CONFIG_OUROBOROS_ENABLE_DNS
uint32_t dns_addr = 0;
#endif
+ struct shm_ap_rbuff * rb;
if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL)
return -1;
+ rw_lock_rdlock(&_ipcp->state_lock);
+
+ if (_ipcp->state != IPCP_ENROLLED) {
+ rw_lock_unlock(&_ipcp->state_lock);
+ LOG_DBGF("Won't allocate flow with non-enrolled IPCP.");
+ return -1; /* -ENOTENROLLED */
+ }
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
if (strlen(dst_name) > 255
|| strlen(src_ap_name) > 255
|| strlen(src_ae_name) > 255) {
@@ -727,6 +951,7 @@ static int ipcp_udp_flow_alloc(int port_id,
#ifdef CONFIG_OUROBOROS_ENABLE_DNS
dns_addr = shim_data(_ipcp)->dns_addr;
+
if (dns_addr != 0) {
ip_addr = ddns_resolve(dst_name, dns_addr);
if (ip_addr == 0) {
@@ -790,35 +1015,45 @@ static int ipcp_udp_flow_alloc(int port_id,
free(recv_buf);
- _ap_instance->flows[fd].port_id = port_id;
- _ap_instance->flows[fd].state = FLOW_ALLOCATED;
- _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid);
- if (_ap_instance->flows[fd].rb == NULL) {
+ rb = shm_ap_rbuff_open(n_pid);
+ if (rb == NULL) {
LOG_ERR("Could not open N + 1 ringbuffer.");
close(fd);
- return -1;
+ return -1; /* -ENORBUFF */
}
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
+ _ap_instance->flows[fd].port_id = port_id;
+ _ap_instance->flows[fd].state = FLOW_ALLOCATED;
+ _ap_instance->flows[fd].rb = rb;
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
/* tell IRMd that flow allocation "worked" */
if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) {
+ shm_ap_rbuff_close(rb);
LOG_ERR("Failed to notify IRMd about flow allocation reply");
close(fd);
- shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
return -1;
}
+ rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
+
FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
+ rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
LOG_DBG("Allocated flow with port_id %d on UDP fd %d.", port_id, fd);
return fd;
}
static int ipcp_udp_flow_alloc_resp(int port_id,
- pid_t n_pid,
- int response)
+ pid_t n_pid,
+ int response)
{
+ struct shm_ap_rbuff * rb;
int fd = port_id_to_fd(port_id);
if (fd < 0) {
LOG_DBGF("Could not find flow with port_id %d.", port_id);
@@ -830,22 +1065,44 @@ static int ipcp_udp_flow_alloc_resp(int port_id,
/* awaken pending flow */
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
if (_ap_instance->flows[fd].state != FLOW_PENDING) {
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
LOG_DBGF("Flow was not pending.");
return -1;
}
- _ap_instance->flows[fd].state = FLOW_ALLOCATED;
- _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid);
- if (_ap_instance->flows[fd].rb == NULL) {
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ rb = shm_ap_rbuff_open(n_pid);
+ if (rb == NULL) {
LOG_ERR("Could not open N + 1 ringbuffer.");
+
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
_ap_instance->flows[fd].state = FLOW_NULL;
_ap_instance->flows[fd].port_id = 0;
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
return 0;
}
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
+ _ap_instance->flows[fd].state = FLOW_ALLOCATED;
+ _ap_instance->flows[fd].rb = rb;
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
+
FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
+ rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd);
return 0;
@@ -854,18 +1111,33 @@ static int ipcp_udp_flow_alloc_resp(int port_id,
static int ipcp_udp_flow_dealloc(int port_id)
{
int fd = port_id_to_fd(port_id);
+ struct shm_ap_rbuff * rb;
+
if (fd < 0) {
LOG_DBGF("Could not find flow with port_id %d.", port_id);
return 0;
}
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
_ap_instance->flows[fd].state = FLOW_NULL;
_ap_instance->flows[fd].port_id = 0;
- if (_ap_instance->flows[fd].rb != NULL)
- shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
+ rb = _ap_instance->flows[fd].rb;
+ _ap_instance->flows[fd].rb = NULL;
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ if (rb != NULL)
+ shm_ap_rbuff_close(rb);
+
+ rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
+
+ rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+
close(fd);
+
return 0;
}
@@ -878,7 +1150,7 @@ static struct ipcp * ipcp_udp_create(char * ap_name)
if (shim_ap_init(ap_name) < 0)
return NULL;
- i = malloc(sizeof *i);
+ i = ipcp_instance_create();
if (i == NULL)
return NULL;
@@ -915,45 +1187,17 @@ static struct ipcp * ipcp_udp_create(char * ap_name)
#ifndef MAKE_CHECK
-/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */
-/* FIXME: stop eating the CPU */
-static void * ipcp_udp_sdu_loop(void * o)
-{
- while (true) {
- struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb);
- int fd;
- int len = 0;
- char * buf;
- if (e == NULL)
- continue;
-
- len = shm_du_map_read_sdu((uint8_t **) &buf,
- _ap_instance->dum,
- e->index);
- if (len == -1)
- continue;
-
- fd = port_id_to_fd(e->port_id);
-
- if (fd == -1)
- continue;
-
- if (len == 0)
- continue;
-
- send(fd, buf, len, 0);
-
- shm_release_du_buff(_ap_instance->dum, e->index);
- }
-
- return (void *) 1;
-}
-
int main (int argc, char * argv[])
{
/* argument 1: pid of irmd ? */
/* argument 2: ap name */
struct sigaction sig_act;
+ sigset_t sigset;
+ sigemptyset(&sigset);
+ sigaddset(&sigset, SIGINT);
+ sigaddset(&sigset, SIGQUIT);
+ sigaddset(&sigset, SIGHUP);
+ sigaddset(&sigset, SIGPIPE);
if (ipcp_arg_check(argc, argv)) {
LOG_ERR("Wrong arguments.");
@@ -981,13 +1225,19 @@ int main (int argc, char * argv[])
exit(1);
}
+ pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+
pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp);
- pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL);
- pthread_join(_ap_instance->sduloop, NULL);
+ pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+
pthread_join(_ap_instance->mainloop, NULL);
- pthread_join(_ap_instance->handler, NULL);
- pthread_join(_ap_instance->sdu_reader, NULL);
+
+ shim_ap_fini();
+
+ free(_ipcp->data);
+ free(_ipcp->ops);
+ free(_ipcp);
exit(0);
}
diff --git a/src/lib/dev.c b/src/lib/dev.c
index ae27a05f..440f40f9 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -31,6 +31,7 @@
#include <ouroboros/shm_du_map.h>
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/utils.h>
+#include <ouroboros/rw_lock.h>
#include <stdlib.h>
#include <string.h>
@@ -47,9 +48,11 @@ struct ap_data {
instance_name_t * api;
struct shm_du_map * dum;
struct bmp * fds;
-
struct shm_ap_rbuff * rb;
+ rw_lock_t data_lock;
+
struct flow flows[AP_MAX_FLOWS];
+ rw_lock_t flows_lock;
} * _ap_instance;
int ap_init(char * ap_name)
@@ -92,14 +95,19 @@ int ap_init(char * ap_name)
_ap_instance->rb = shm_ap_rbuff_create();
if (_ap_instance->rb == NULL) {
instance_name_destroy(_ap_instance->api);
+ shm_du_map_close(_ap_instance->dum);
bmp_destroy(_ap_instance->fds);
free(_ap_instance);
return -1;
}
- for (i = 0; i < AP_MAX_FLOWS; ++i)
+ for (i = 0; i < AP_MAX_FLOWS; ++i) {
_ap_instance->flows[i].rb = NULL;
+ _ap_instance->flows[i].port_id = -1;
+ }
+ rw_lock_init(&_ap_instance->flows_lock);
+ rw_lock_init(&_ap_instance->data_lock);
return 0;
}
@@ -110,6 +118,9 @@ void ap_fini(void)
if (_ap_instance == NULL)
return;
+
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
if (_ap_instance->api != NULL)
instance_name_destroy(_ap_instance->api);
if (_ap_instance->fds != NULL)
@@ -122,6 +133,8 @@ void ap_fini(void)
if (_ap_instance->flows[i].rb != NULL)
shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+ rw_lock_unlock(&_ap_instance->data_lock);
+
free(_ap_instance);
}
@@ -142,7 +155,7 @@ int ap_reg(char ** difs,
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int fd = bmp_allocate(_ap_instance->fds);
+ int fd = -1;
if (difs == NULL ||
len == 0 ||
@@ -157,11 +170,16 @@ int ap_reg(char ** difs,
msg.code = IRM_MSG_CODE__IRM_AP_REG;
msg.has_pid = true;
- msg.pid = _ap_instance->api->id;
- msg.ap_name = _ap_instance->api->name;
msg.dif_name = difs;
msg.n_dif_name = len;
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ msg.pid = _ap_instance->api->id;
+ msg.ap_name = _ap_instance->api->name;
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
@@ -176,6 +194,12 @@ int ap_reg(char ** difs,
irm_msg__free_unpacked(recv_msg, NULL);
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
+ fd = bmp_allocate(_ap_instance->fds);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return fd;
}
@@ -194,11 +218,16 @@ int ap_unreg(char ** difs,
msg.code = IRM_MSG_CODE__IRM_AP_UNREG;
msg.has_pid = true;
- msg.pid = _ap_instance->api->id;
- msg.ap_name = _ap_instance->api->name;
msg.dif_name = difs;
msg.n_dif_name = len;
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ msg.pid = _ap_instance->api->id;
+ msg.ap_name = _ap_instance->api->name;
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
@@ -224,8 +253,13 @@ int flow_accept(int fd,
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_pid = true;
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
msg.pid = _ap_instance->api->id;
+ rw_lock_unlock(&_ap_instance->data_lock);
+
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
@@ -235,18 +269,8 @@ int flow_accept(int fd,
return -1;
}
- cfd = bmp_allocate(_ap_instance->fds);
-
- _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid);
- if (_ap_instance->flows[cfd].rb == NULL) {
- bmp_release(_ap_instance->fds, cfd);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
*ap_name = strdup(recv_msg->ap_name);
if (*ap_name == NULL) {
- bmp_release(_ap_instance->fds, cfd);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -254,21 +278,46 @@ int flow_accept(int fd,
if (ae_name != NULL) {
*ae_name = strdup(recv_msg->ae_name);
if (*ae_name == NULL) {
- bmp_release(_ap_instance->fds, cfd);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
}
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
+ cfd = bmp_allocate(_ap_instance->fds);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
+ _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid);
+ if (_ap_instance->flows[cfd].rb == NULL) {
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
+ bmp_release(_ap_instance->fds, cfd);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ irm_msg__free_unpacked(recv_msg, NULL);
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ return -1;
+ }
+
_ap_instance->flows[cfd].port_id = recv_msg->port_id;
_ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT;
-
+ rw_lock_unlock(&_ap_instance->flows_lock);
irm_msg__free_unpacked(recv_msg, NULL);
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
bmp_release(_ap_instance->fds, fd);
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return cfd;
}
@@ -281,9 +330,21 @@ int flow_alloc_resp(int fd,
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;
msg.has_pid = true;
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
msg.pid = _ap_instance->api->id;
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
msg.has_port_id = true;
+
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
msg.port_id = _ap_instance->flows[fd].port_id;
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
msg.has_response = true;
msg.response = response;
@@ -318,10 +379,15 @@ int flow_alloc(char * dst_name,
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
msg.dst_name = dst_name;
- msg.ap_name = _ap_instance->api->name;
+ msg.ae_name = src_ae_name;
msg.has_pid = true;
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
msg.pid = _ap_instance->api->id;
- msg.ae_name = src_ae_name;
+ msg.ap_name = _ap_instance->api->name;
+
+ rw_lock_unlock(&_ap_instance->data_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
@@ -332,11 +398,23 @@ int flow_alloc(char * dst_name,
return -1;
}
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
fd = bmp_allocate(_ap_instance->fds);
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
_ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid);
if (_ap_instance->flows[fd].rb == NULL) {
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
bmp_release(_ap_instance->fds, fd);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -344,6 +422,8 @@ int flow_alloc(char * dst_name,
_ap_instance->flows[fd].port_id = recv_msg->port_id;
_ap_instance->flows[fd].oflags = FLOW_O_DEFAULT;
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
irm_msg__free_unpacked(recv_msg, NULL);
return fd;
@@ -357,8 +437,13 @@ int flow_alloc_res(int fd)
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
msg.has_port_id = true;
+
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
msg.port_id = _ap_instance->flows[fd].port_id;
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
@@ -382,8 +467,14 @@ int flow_dealloc(int fd)
msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
msg.has_port_id = true;
+
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
msg.port_id = _ap_instance->flows[fd].port_id;
+ rw_lock_unlock(&_ap_instance->data_lock);
+
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
@@ -401,42 +492,77 @@ int flow_dealloc(int fd)
int flow_cntl(int fd, int cmd, int oflags)
{
- int old = _ap_instance->flows[fd].oflags;
+ int old;
+
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
+ old = _ap_instance->flows[fd].oflags;
+
switch (cmd) {
case FLOW_F_GETFL: /* GET FLOW FLAGS */
- return _ap_instance->flows[fd].oflags;
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ return old;
case FLOW_F_SETFL: /* SET FLOW FLAGS */
_ap_instance->flows[fd].oflags = oflags;
+ rw_lock_unlock(&_ap_instance->flows_lock);
return old;
default:
+ rw_lock_unlock(&_ap_instance->flows_lock);
return FLOW_O_INVALID; /* unknown command */
}
}
ssize_t flow_write(int fd, void * buf, size_t count)
{
- size_t index = shm_create_du_buff(_ap_instance->dum,
- count + DU_BUFF_HEADSPACE +
- DU_BUFF_TAILSPACE,
- DU_BUFF_HEADSPACE,
- (uint8_t *) buf,
- count);
- struct rb_entry e = {index, _ap_instance->flows[fd].port_id};
- if (index == -1)
+ size_t index;
+ struct rb_entry e;
+
+ if (buf == NULL)
+ return 0;
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ index = shm_create_du_buff(_ap_instance->dum,
+ count + DU_BUFF_HEADSPACE +
+ DU_BUFF_TAILSPACE,
+ DU_BUFF_HEADSPACE,
+ (uint8_t *) buf,
+ count);
+ if (index == -1) {
+ rw_lock_unlock(&_ap_instance->data_lock);
return -1;
+ }
+
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
+ e.index = index;
+ e.port_id = _ap_instance->flows[fd].port_id;
if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
shm_release_du_buff(_ap_instance->dum, index);
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return -EPIPE;
}
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return 0;
} else {
while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0)
- ;
+ LOG_DBGF("Couldn't write to rbuff.");
}
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
return 0;
}
@@ -446,27 +572,41 @@ ssize_t flow_read(int fd, void * buf, size_t count)
int n;
uint8_t * sdu;
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
e = shm_ap_rbuff_read(_ap_instance->rb);
} else {
- /* FIXME: move this to a thread */
+
+ /* FIXME: this will throw away packets for other fd's */
while (e == NULL ||
- e->port_id != _ap_instance->flows[fd].port_id)
+ e->port_id != _ap_instance->flows[fd].port_id) {
e = shm_ap_rbuff_read(_ap_instance->rb);
+ }
}
- if (e == NULL)
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ if (e == NULL) {
+ rw_lock_unlock(&_ap_instance->data_lock);
return -1;
+ }
n = shm_du_map_read_sdu(&sdu,
_ap_instance->dum,
e->index);
- if (n < 0)
+ if (n < 0) {
+ rw_lock_unlock(&_ap_instance->data_lock);
return -1;
+ }
memcpy(buf, sdu, MIN(n, count));
shm_release_du_buff(_ap_instance->dum, e->index);
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return n;
}
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 6c04ccc5..da6f0e33 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -253,8 +253,10 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
}
e = malloc(sizeof(*e));
- if (e == NULL)
+ if (e == NULL) {
+ pthread_mutex_unlock(rb->shm_mutex);
return NULL;
+ }
*e = *(rb->shm_base + *rb->ptr_tail);