summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ipcpd/eth/eth.c54
-rw-r--r--src/ipcpd/ipcp.c77
-rw-r--r--src/ipcpd/ipcp.h9
-rw-r--r--src/ipcpd/local/main.c58
-rw-r--r--src/ipcpd/udp/main.c56
-rw-r--r--src/ipcpd/unicast/fa.c81
6 files changed, 100 insertions, 235 deletions
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c
index 12bd294e..ef64c07e 100644
--- a/src/ipcpd/eth/eth.c
+++ b/src/ipcpd/eth/eth.c
@@ -136,7 +136,6 @@
#define ETH_FRAME_SIZE (ETH_HEADER_SIZE + ETH_MTU_MAX)
#endif
-#define ALLOC_TIMEO 10 /* ms */
#define NAME_QUERY_TIMEO 2000 /* ms */
#define MGMT_TIMEO 100 /* ms */
#define MGMT_FRAME_SIZE 2048
@@ -575,32 +574,10 @@ static int eth_ipcp_req(uint8_t * r_addr,
const void * data,
size_t len)
{
- struct timespec ts = {0, ALLOC_TIMEO * MILLION};
- struct timespec abstime;
- int fd;
- time_t mpl = IPCP_ETH_MPL;
+ int fd;
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
- }
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- log_dbg("Won't allocate over non-operational IPCP.");
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- return -1;
- }
-
- /* reply to IRM, called under lock to prevent race */
- fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len);
+ fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_ETH_MPL, data, len);
if (fd < 0) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
log_err("Could not get new flow from IRMd.");
return -1;
}
@@ -615,11 +592,6 @@ static int eth_ipcp_req(uint8_t * r_addr,
pthread_rwlock_unlock(&eth_data.flows_lock);
- ipcpi.alloc_id = fd;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
#if defined(BUILD_ETH_DIX)
log_dbg("New flow request, fd %d, remote endpoint %d.", fd, r_eid);
#elif defined(BUILD_ETH_LLC)
@@ -1704,8 +1676,6 @@ static int eth_ipcp_flow_alloc_resp(int fd,
const void * data,
size_t len)
{
- struct timespec ts = {0, ALLOC_TIMEO * MILLION};
- struct timespec abstime;
#if defined(BUILD_ETH_DIX)
uint16_t r_eid;
#elif defined(BUILD_ETH_LLC)
@@ -1714,26 +1684,8 @@ static int eth_ipcp_flow_alloc_resp(int fd,
#endif
uint8_t r_addr[MAC_SIZE];
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
- }
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
+ if (ipcp_wait_flow_resp(fd) < 0)
return -1;
- }
-
- ipcpi.alloc_id = -1;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
pthread_rwlock_wrlock(&eth_data.flows_lock);
#if defined(BUILD_ETH_DIX)
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index f40c70e6..eb83f3cc 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -35,6 +35,7 @@
#define OUROBOROS_PREFIX "ipcpd/ipcp"
#define IPCP_INFO "info"
+#define ALLOC_TIMEOUT 10 * MILLION /* 10 ms */
#include <ouroboros/hash.h>
#include <ouroboros/logs.h>
@@ -256,6 +257,82 @@ static void * acceptloop(void * o)
return (void *) 0;
}
+int ipcp_wait_flow_req_arr(const uint8_t * dst,
+ qosspec_t qs,
+ time_t mpl,
+ const void * data,
+ size_t len)
+{
+ struct timespec ts = {0, ALLOC_TIMEOUT};
+ struct timespec abstime;
+ int fd;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) {
+ ts_add(&abstime, &ts, &abstime);
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &abstime);
+ }
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ log_err("Won't allocate over non-operational IPCP.");
+ return -EIPCPSTATE;
+ }
+
+ assert(ipcpi.alloc_id == -1);
+
+ fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len);
+ if (fd < 0) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ log_err("Failed to get fd for flow.");
+ return -ENOTALLOC;
+ }
+
+ ipcpi.alloc_id = fd;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+ return fd;
+
+}
+
+int ipcp_wait_flow_resp(const int fd)
+{
+ struct timespec ts = {0, ALLOC_TIMEOUT};
+ struct timespec abstime;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) {
+ ts_add(&abstime, &ts, &abstime);
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &abstime);
+ }
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ assert(ipcpi.alloc_id == fd);
+
+ ipcpi.alloc_id = -1;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+ return 0;
+}
+
static void free_msg(void * o)
{
ipcp_msg__free_unpacked((ipcp_msg_t *) o, NULL);
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 2d0e39de..9b08d66e 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -136,6 +136,15 @@ enum ipcp_state ipcp_get_state(void);
int ipcp_parse_arg(int argc,
char * argv[]);
+/* Helper functions to handle races during flow allocation */
+int ipcp_wait_flow_req_arr(const uint8_t * dst,
+ qosspec_t qs,
+ time_t mpl,
+ const void * data,
+ size_t len);
+
+int ipcp_wait_flow_resp(const int fd);
+
/* Helper functions for directory entries, could be moved */
uint8_t * ipcp_hash_dup(const uint8_t * hash);
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 2a5199bb..a2f8c6dc 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -48,8 +48,7 @@
#include <sys/wait.h>
#include <assert.h>
-#define THIS_TYPE IPCP_LOCAL
-#define ALLOC_TIMEOUT 10 /* ms */
+#define THIS_TYPE IPCP_LOCAL
struct ipcp ipcpi;
@@ -196,38 +195,14 @@ static int local_ipcp_flow_alloc(int fd,
const void * data,
size_t len)
{
- struct timespec ts = {0, ALLOC_TIMEOUT * MILLION};
- struct timespec abstime;
- int out_fd = -1;
- time_t mpl = IPCP_LOCAL_MPL;
+ int out_fd = -1;
log_dbg("Allocating flow to " HASH_FMT32 " on fd %d.",
HASH_VAL32(dst), fd);
assert(dst);
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
- }
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- log_dbg("Won't allocate over non-operational IPCP.");
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- return -1;
- }
-
- assert(ipcpi.alloc_id == -1);
-
- out_fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl,
- data, len);
+ out_fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_LOCAL_MPL, data, len);
if (out_fd < 0) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
log_dbg("Flow allocation failed: %d", out_fd);
return -1;
}
@@ -239,11 +214,6 @@ static int local_ipcp_flow_alloc(int fd,
pthread_rwlock_unlock(&local_data.lock);
- ipcpi.alloc_id = out_fd;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
fset_add(local_data.flows, fd);
log_info("Pending local allocation request on fd %d.", fd);
@@ -256,31 +226,11 @@ static int local_ipcp_flow_alloc_resp(int fd,
const void * data,
size_t len)
{
- struct timespec ts = {0, ALLOC_TIMEOUT * MILLION};
- struct timespec abstime;
int out_fd = -1;
time_t mpl = IPCP_LOCAL_MPL;
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
- }
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
+ if (ipcp_wait_flow_resp(fd) < 0)
return -1;
- }
-
- ipcpi.alloc_id = -1;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
pthread_rwlock_wrlock(&local_data.lock);
diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c
index 86bb1afe..aacf976e 100644
--- a/src/ipcpd/udp/main.c
+++ b/src/ipcpd/udp/main.c
@@ -66,7 +66,6 @@
#define IPCP_UDP_BUF_SIZE 8980
#define IPCP_UDP_MSG_SIZE 8980
#define DNS_TTL 86400
-#define FD_UPDATE_TIMEOUT 100 /* microseconds */
#define SADDR ((struct sockaddr *) &udp_data.s_saddr)
#define SADDR_SIZE (sizeof(udp_data.s_saddr))
@@ -290,31 +289,10 @@ static int udp_ipcp_port_req(struct sockaddr_in * c_saddr,
const void * data,
size_t len)
{
- struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
- struct timespec abstime;
- int fd;
- time_t mpl = IPCP_UDP_MPL;
+ int fd;
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond, &ipcpi.alloc_lock,
- &abstime);
- }
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- log_dbg("Won't allocate over non-operational IPCP.");
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- return -1;
- }
-
- /* reply to IRM */
- fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len);
+ fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UDP_MPL, data, len);
if (fd < 0) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
log_err("Could not get new flow from IRMd.");
return -1;
}
@@ -326,11 +304,6 @@ static int udp_ipcp_port_req(struct sockaddr_in * c_saddr,
pthread_rwlock_unlock(&udp_data.flows_lock);
- ipcpi.alloc_id = fd;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
log_dbg("Pending allocation request, fd %d, remote eid %d.",
fd, d_eid);
@@ -1056,34 +1029,11 @@ static int udp_ipcp_flow_alloc_resp(int fd,
const void * data,
size_t len)
{
- struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000};
- struct timespec abstime;
struct sockaddr_in saddr;
int d_eid;
- if (resp)
- return 0;
-
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
- }
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
+ if (ipcp_wait_flow_resp(fd) < 0)
return -1;
- }
-
- ipcpi.alloc_id = -1;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
pthread_rwlock_rdlock(&udp_data.flows_lock);
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index 52ef9684..c1560de0 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -470,81 +470,6 @@ static size_t fa_wait_for_fa_msg(struct fa_msg * msg)
return len;
}
-static int fa_wait_irmd_alloc(uint8_t * dst,
- qosspec_t qs,
- const void * data,
- size_t len)
-{
- struct timespec ts = {0, TIMEOUT};
- struct timespec abstime;
- int fd;
- time_t mpl = IPCP_UNICAST_MPL;
-
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
- }
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- log_dbg("Won't allocate over non-operational IPCP.");
- return -EIPCPSTATE;
- }
-
- assert(ipcpi.alloc_id == -1);
-
- fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len);
- if (fd < 0) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- log_dbg("Failed to get fd for flow.");
- return -ENOTALLOC;
- }
-
- ipcpi.alloc_id = fd;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
- return fd;
-}
-
-static int fa_wait_irmd_alloc_resp(int fd)
-{
- struct timespec ts = {0, TIMEOUT};
- struct timespec abstime;
-
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-
- pthread_mutex_lock(&ipcpi.alloc_lock);
-
- while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) {
- ts_add(&abstime, &ts, &abstime);
- pthread_cond_timedwait(&ipcpi.alloc_cond,
- &ipcpi.alloc_lock,
- &abstime);
- }
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_mutex_unlock(&ipcpi.alloc_lock);
- return -1;
- }
-
- assert(ipcpi.alloc_id == fd);
-
- ipcpi.alloc_id = -1;
- pthread_cond_broadcast(&ipcpi.alloc_cond);
-
- pthread_mutex_unlock(&ipcpi.alloc_lock);
-
- return 0;
-}
-
static int fa_handle_flow_req(struct fa_msg * msg,
size_t len)
{
@@ -552,6 +477,7 @@ static int fa_handle_flow_req(struct fa_msg * msg,
int fd;
qosspec_t qs;
struct fa_flow * flow;
+ uint8_t * dst;
uint8_t * data; /* Piggbacked data on flow alloc request. */
size_t dlen; /* Length of piggybacked data. */
@@ -561,6 +487,7 @@ static int fa_handle_flow_req(struct fa_msg * msg,
return -EPERM;
}
+ dst = (uint8_t *)(msg + 1);
data = (uint8_t *) msg + msg_len;
dlen = len - msg_len;
@@ -574,7 +501,7 @@ static int fa_handle_flow_req(struct fa_msg * msg,
qs.cypher_s = ntoh16(msg->cypher_s);
qs.timeout = ntoh32(msg->timeout);
- fd = fa_wait_irmd_alloc((uint8_t *) (msg + 1), qs, data, dlen);
+ fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, data, dlen);
if (fd < 0)
return fd;
@@ -881,7 +808,7 @@ int fa_alloc_resp(int fd,
flow = &fa.flows[fd];
- if (fa_wait_irmd_alloc_resp(fd) < 0)
+ if (ipcp_wait_flow_resp(fd) < 0)
goto fail_alloc_resp;
if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) {