summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ipcpd/normal/dht.c23
-rw-r--r--src/irmd/ipcp.c66
-rw-r--r--src/irmd/main.c25
3 files changed, 80 insertions, 34 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index 74618658..954ca670 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -813,7 +813,14 @@ static void lookup_new_addrs(struct lookup * lu,
static enum lookup_state lookup_wait(struct lookup * lu)
{
+ struct timespec timeo = {KAD_T_RESP, 0};
+ struct timespec abs;
enum lookup_state state;
+ int ret = 0;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &abs);
+
+ ts_add(&abs, &timeo, &abs);
pthread_mutex_lock(&lu->lock);
@@ -823,11 +830,14 @@ static enum lookup_state lookup_wait(struct lookup * lu)
pthread_cleanup_push((void (*)(void *)) lookup_destroy, (void *) lu);
while (lu->state == LU_PENDING)
- pthread_cond_wait(&lu->cond, &lu->lock);
+ ret = -pthread_cond_timedwait(&lu->cond, &lu->lock, &abs);
pthread_cleanup_pop(false);
- state = lu->state;
+ if (ret == -ETIMEDOUT)
+ state = LU_COMPLETE;
+ else
+ state = lu->state;
pthread_mutex_unlock(&lu->lock);
@@ -1483,6 +1493,7 @@ static struct lookup * kad_lookup(struct dht * dht,
uint64_t addrs[KAD_ALPHA + 1];
enum lookup_state state;
struct lookup * lu;
+ size_t out = 0;
lu = lookup_create(dht, id);
if (lu == NULL)
@@ -1498,7 +1509,8 @@ static struct lookup * kad_lookup(struct dht * dht,
return NULL;
}
- if (kad_find(dht, id, addrs, code) == 0) {
+ out += kad_find(dht, id, addrs, code);
+ if (out == 0) {
pthread_rwlock_wrlock(&dht->lock);
list_del(&lu->next);
pthread_rwlock_unlock(&dht->lock);
@@ -1507,17 +1519,18 @@ static struct lookup * kad_lookup(struct dht * dht,
}
while ((state = lookup_wait(lu)) != LU_COMPLETE) {
+ --out;
switch (state) {
case LU_UPDATE:
lookup_new_addrs(lu, addrs);
- if (addrs[0] == 0) {
+ if (addrs[0] == 0 && out == 0) {
pthread_rwlock_wrlock(&dht->lock);
list_del(&lu->next);
pthread_rwlock_unlock(&dht->lock);
return lu;
}
- kad_find(dht, id, addrs, code);
+ out += kad_find(dht, id, addrs, code);
break;
case LU_DESTROY:
pthread_rwlock_wrlock(&dht->lock);
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c
index 28e91b18..bf71bc3d 100644
--- a/src/irmd/ipcp.c
+++ b/src/irmd/ipcp.c
@@ -48,13 +48,12 @@ static void close_ptr(void * o)
ipcp_msg_t * send_recv_ipcp_msg(pid_t api,
ipcp_msg_t * msg)
{
- int sockfd = 0;
- buffer_t buf;
- char * sock_path = NULL;
- ssize_t count = 0;
- ipcp_msg_t * recv_msg = NULL;
- struct timeval tv = {(SOCKET_TIMEOUT / 1000),
- (SOCKET_TIMEOUT % 1000) * 1000};
+ int sockfd = 0;
+ buffer_t buf;
+ char * sock_path = NULL;
+ ssize_t count = 0;
+ ipcp_msg_t * recv_msg = NULL;
+ struct timeval tv;
if (kill(api, 0) < 0)
return NULL;
@@ -83,6 +82,29 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t api,
return NULL;
}
+ switch (msg->code) {
+ case IPCP_MSG_CODE__IPCP_BOOTSTRAP:
+ tv.tv_sec = BOOTSTRAP_TIMEOUT / 1000;
+ tv.tv_usec = (BOOTSTRAP_TIMEOUT % 1000) * 1000;
+ break;
+ case IPCP_MSG_CODE__IPCP_ENROLL:
+ tv.tv_sec = ENROLL_TIMEOUT / 1000;
+ tv.tv_usec = (ENROLL_TIMEOUT % 1000) * 1000;
+ break;
+ case IPCP_MSG_CODE__IPCP_REG:
+ tv.tv_sec = REG_TIMEOUT / 1000;
+ tv.tv_usec = (REG_TIMEOUT % 1000) * 1000;
+ break;
+ case IPCP_MSG_CODE__IPCP_QUERY:
+ tv.tv_sec = QUERY_TIMEOUT / 1000;
+ tv.tv_usec = (QUERY_TIMEOUT % 1000) * 1000;
+ break;
+ default:
+ tv.tv_sec = SOCKET_TIMEOUT / 1000;
+ tv.tv_usec = (SOCKET_TIMEOUT % 1000) * 1000;
+ break;
+ }
+
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
(void *) &tv, sizeof(tv)))
log_warn("Failed to set timeout on socket.");
@@ -187,9 +209,9 @@ int ipcp_destroy(pid_t api)
int ipcp_bootstrap(pid_t api,
ipcp_config_msg_t * conf)
{
- ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t msg = IPCP_MSG__INIT;
ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
+ int ret = -1;
if (conf == NULL)
return -EINVAL;
@@ -223,7 +245,7 @@ int ipcp_enroll(pid_t api,
if (dst == NULL)
return -EINVAL;
- msg.code = IPCP_MSG_CODE__IPCP_ENROLL;
+ msg.code = IPCP_MSG_CODE__IPCP_ENROLL;
msg.dst_name = (char *) dst;
recv_msg = send_recv_ipcp_msg(api, &msg);
@@ -259,9 +281,9 @@ int ipcp_reg(pid_t api,
const uint8_t * hash,
size_t len)
{
- ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t msg = IPCP_MSG__INIT;
ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
+ int ret = -1;
assert(hash);
@@ -289,9 +311,9 @@ int ipcp_unreg(pid_t api,
const uint8_t * hash,
size_t len)
{
- ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t msg = IPCP_MSG__INIT;
ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
+ int ret = -1;
msg.code = IPCP_MSG_CODE__IPCP_UNREG;
msg.has_hash = true;
@@ -317,9 +339,9 @@ int ipcp_query(pid_t api,
const uint8_t * hash,
size_t len)
{
- ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t msg = IPCP_MSG__INIT;
ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
+ int ret = -1;
msg.code = IPCP_MSG_CODE__IPCP_QUERY;
msg.has_hash = true;
@@ -348,9 +370,9 @@ int ipcp_flow_alloc(pid_t api,
size_t len,
qoscube_t cube)
{
- ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t msg = IPCP_MSG__INIT;
ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
+ int ret = -1;
assert(dst);
@@ -385,9 +407,9 @@ int ipcp_flow_alloc_resp(pid_t api,
pid_t n_api,
int response)
{
- ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t msg = IPCP_MSG__INIT;
ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
+ int ret = -1;
msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP;
msg.has_port_id = true;
@@ -415,9 +437,9 @@ int ipcp_flow_alloc_resp(pid_t api,
int ipcp_flow_dealloc(pid_t api,
int port_id)
{
- ipcp_msg_t msg = IPCP_MSG__INIT;
+ ipcp_msg_t msg = IPCP_MSG__INIT;
ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
+ int ret = -1;
msg.code = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC;
msg.has_port_id = true;
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 96b0b729..3f83ab2c 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -222,7 +222,8 @@ static struct ipcp_entry * get_ipcp_entry_by_name(const char * name)
return NULL;
}
-static struct ipcp_entry * get_ipcp_by_dst_name(const char * name)
+static struct ipcp_entry * get_ipcp_by_dst_name(const char * name,
+ pid_t src)
{
struct list_head * p;
struct list_head * h;
@@ -233,7 +234,7 @@ static struct ipcp_entry * get_ipcp_by_dst_name(const char * name)
list_for_each_safe(p, h, &irmd.ipcps) {
struct ipcp_entry * e = list_entry(p, struct ipcp_entry, next);
- if (e->dif_name == NULL)
+ if (e->dif_name == NULL || e->api == src)
continue;
hash = malloc(IPCP_HASH_LEN(e));
@@ -1103,7 +1104,7 @@ static int flow_alloc(pid_t api,
int state;
uint8_t * hash;
- ipcp = get_ipcp_by_dst_name(dst);
+ ipcp = get_ipcp_by_dst_name(dst, api);
if (ipcp == NULL) {
log_info("Destination %s unreachable.", dst);
return -1;
@@ -1199,7 +1200,7 @@ static int flow_dealloc(pid_t api,
if (irm_flow_get_state(f) == FLOW_DEALLOC_PENDING) {
list_del(&f->next);
if ((kill(f->n_api, 0) < 0 && f->n_1_api == -1) ||
- (kill (f->n_1_api, 0) < 0 && f->n_api == -1))
+ (kill(f->n_1_api, 0) < 0 && f->n_api == -1))
irm_flow_set_state(f, FLOW_NULL);
clear_irm_flow(f);
irm_flow_destroy(f);
@@ -1638,6 +1639,8 @@ void * irm_sanitize(void * o)
pthread_rwlock_wrlock(&irmd.flows_lock);
list_for_each_safe(p, h, &irmd.irm_flows) {
+ int ipcpi;
+ int port_id;
struct irm_flow * f =
list_entry(p, struct irm_flow, next);
@@ -1645,9 +1648,13 @@ void * irm_sanitize(void * o)
&& ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) {
log_dbg("Pending port_id %d timed out.",
f->port_id);
- f->n_1_api = -1;
+ f->n_api = -1;
irm_flow_set_state(f, FLOW_DEALLOC_PENDING);
- ipcp_flow_dealloc(f->n_1_api, f->port_id);
+ ipcpi = f->n_1_api;
+ port_id = f->port_id;
+ pthread_rwlock_unlock(&irmd.flows_lock);
+ ipcp_flow_dealloc(ipcpi, port_id);
+ pthread_rwlock_wrlock(&irmd.flows_lock);
continue;
}
@@ -1660,7 +1667,11 @@ void * irm_sanitize(void * o)
shm_flow_set_destroy(set);
f->n_api = -1;
irm_flow_set_state(f, FLOW_DEALLOC_PENDING);
- ipcp_flow_dealloc(f->n_1_api, f->port_id);
+ ipcpi = f->n_1_api;
+ port_id = f->port_id;
+ pthread_rwlock_unlock(&irmd.flows_lock);
+ ipcp_flow_dealloc(ipcpi, port_id);
+ pthread_rwlock_wrlock(&irmd.flows_lock);
continue;
}