summaryrefslogtreecommitdiff
path: root/src/ipcpd/shim-udp
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-10-19 22:25:46 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-10-21 14:17:51 +0200
commitf516b51169020ea1957010fbd1005d746f01b1d9 (patch)
tree03d19b0dfb6eab68f8ee5a3ecac5300c7bef2f4b /src/ipcpd/shim-udp
parentc79ab46894053312f80390bf13a52c238a7d4704 (diff)
downloadouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.tar.gz
ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.zip
lib: Demultiplex the fast path
The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API.
Diffstat (limited to 'src/ipcpd/shim-udp')
-rw-r--r--src/ipcpd/shim-udp/main.c84
1 files changed, 61 insertions, 23 deletions
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 7c109a8a..050623e4 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -27,6 +27,9 @@
#include <ouroboros/utils.h>
#include <ouroboros/dev.h>
#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/fcntl.h>
+#include <ouroboros/errno.h>
#define OUROBOROS_PREFIX "ipcpd/shim-udp"
@@ -75,6 +78,7 @@ struct {
struct sockaddr_in s_saddr;
int s_fd;
+ flow_set_t * np1_flows;
fd_set flow_fd_s;
/* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */
int uf_to_fd[FD_SETSIZE];
@@ -90,7 +94,7 @@ struct {
pthread_mutex_t fd_set_lock;
} udp_data;
-static void udp_data_init()
+static int udp_data_init()
{
int i;
@@ -104,13 +108,21 @@ static void udp_data_init()
FD_ZERO(&udp_data.flow_fd_s);
+ udp_data.np1_flows = flow_set_create();
+ if (udp_data.np1_flows == NULL)
+ return -ENOMEM;
+
pthread_rwlock_init(&udp_data.flows_lock, NULL);
pthread_cond_init(&udp_data.fd_set_cond, NULL);
pthread_mutex_init(&udp_data.fd_set_lock, NULL);
+
+ return 0;
}
static void udp_data_fini()
{
+ flow_set_destroy(udp_data.np1_flows);
+
pthread_rwlock_destroy(&udp_data.flows_lock);
pthread_mutex_destroy(&udp_data.fd_set_lock);
pthread_cond_destroy(&udp_data.fd_set_cond);
@@ -387,7 +399,7 @@ static int ipcp_udp_flow_dealloc_req(int udp_port)
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- flow_dealloc(fd);
+ flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
close(skfd);
@@ -505,30 +517,45 @@ static void * ipcp_udp_sdu_reader()
static void * ipcp_udp_sdu_loop(void * o)
{
+ int fd;
+ struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000};
+ struct shm_du_buff * sdb;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
while (true) {
- int fd;
- struct shm_du_buff * sdb;
+ int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout);
+ if (ret == -ETIMEDOUT)
+ continue;
- fd = ipcp_read_shim(&sdb);
- if (fd < 0)
+ if (ret < 0) {
+ LOG_ERR("Event wait returned error code %d.", -ret);
continue;
+ }
- pthread_rwlock_rdlock(&ipcpi.state_lock);
- pthread_rwlock_rdlock(&udp_data.flows_lock);
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Bad read from fd %d.", fd);
+ continue;
+ }
- fd = udp_data.fd_to_uf[fd].skfd;
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&udp_data.flows_lock);
- pthread_rwlock_unlock(&udp_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
+ fd = udp_data.fd_to_uf[fd].skfd;
+
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- if (send(fd,
- shm_du_buff_head(sdb),
- shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
- 0) < 0)
- LOG_ERR("Failed to send SDU.");
+ if (send(fd,
+ shm_du_buff_head(sdb),
+ shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
+ 0) < 0)
+ LOG_ERR("Failed to send SDU.");
- ipcp_flow_del(sdb);
+ ipcp_flow_del(sdb);
+ }
}
return (void *) 1;
@@ -993,6 +1020,8 @@ static int ipcp_udp_flow_alloc(int fd,
udp_data.fd_to_uf[fd].skfd = skfd;
udp_data.uf_to_fd[skfd] = fd;
+ flow_set_add(udp_data.np1_flows, fd);
+
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1049,6 +1078,8 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response)
set_fd(skfd);
+ flow_set_add(udp_data.np1_flows, fd);
+
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1075,9 +1106,15 @@ static int ipcp_udp_flow_dealloc(int fd)
{
int skfd = -1;
int remote_udp = -1;
+ struct timespec t = {0, 10000};
struct sockaddr_in r_saddr;
socklen_t r_saddr_len = sizeof(r_saddr);
+ flow_set_del(udp_data.np1_flows, fd);
+
+ while (flow_dealloc(fd) == -EBUSY)
+ nanosleep(&t, NULL);
+
pthread_rwlock_rdlock(&ipcpi.state_lock);
pthread_rwlock_wrlock(&udp_data.flows_lock);
@@ -1117,8 +1154,6 @@ static int ipcp_udp_flow_dealloc(int fd)
close(skfd);
- flow_dealloc(fd);
-
LOG_DBG("Flow with fd %d deallocated.", fd);
return 0;
@@ -1149,13 +1184,16 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- udp_data_init();
-
if (ap_init(NULL) < 0) {
close_logfile();
exit(EXIT_FAILURE);
}
+ if (udp_data_init() < 0) {
+ close_logfile();
+ exit(EXIT_FAILURE);
+ }
+
/* store the process id of the irmd */
irmd_api = atoi(argv[1]);
@@ -1196,10 +1234,10 @@ int main(int argc, char * argv[])
pthread_join(udp_data.handler, NULL);
pthread_join(udp_data.sdu_reader, NULL);
- ap_fini();
-
udp_data_fini();
+ ap_fini();
+
close_logfile();
exit(EXIT_SUCCESS);