summaryrefslogtreecommitdiff
path: root/src/irmd/main.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r--src/irmd/main.c320
1 files changed, 131 insertions, 189 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 69ce765c..a79330ef 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -42,6 +42,7 @@
#include "utils.h"
#include "registry.h"
+#include "irm_flow.h"
#include <sys/socket.h>
#include <sys/un.h>
@@ -57,11 +58,11 @@
#define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */
struct ipcp_entry {
- struct list_head next;
- char * name;
- pid_t api;
- enum ipcp_type type;
- char * dif_name;
+ struct list_head next;
+ char * name;
+ pid_t api;
+ enum ipcp_type type;
+ char * dif_name;
};
enum irm_state {
@@ -76,20 +77,6 @@ struct spawned_api {
};
/* keeps track of port_id's between N and N - 1 */
-struct irm_flow {
- struct list_head next;
-
- int port_id;
-
- pid_t n_api;
- pid_t n_1_api;
-
- struct timespec t0;
-
- enum flow_state state;
- pthread_cond_t state_cond;
- pthread_mutex_t state_lock;
-};
struct irm {
/* FIXME: list of ipcps could be merged into the registry */
@@ -118,59 +105,6 @@ struct irm {
pthread_t shm_sanitize;
} * irmd = NULL;
-static struct irm_flow * irm_flow_create()
-{
- struct irm_flow * e = malloc(sizeof(*e));
- if (e == NULL)
- return NULL;
-
- e->n_api = -1;
- e->n_1_api = -1;
- e->port_id = -1;
- e->state = FLOW_NULL;
-
- if (pthread_cond_init(&e->state_cond, NULL)) {
- free(e);
- return NULL;
- }
-
- if (pthread_mutex_init(&e->state_lock, NULL)) {
- free(e);
- return NULL;
- }
-
- e->t0.tv_sec = 0;
- e->t0.tv_nsec = 0;
-
- return e;
-}
-
-static void irm_flow_destroy(struct irm_flow * e)
-{
- pthread_mutex_lock(&e->state_lock);
-
- if (e->state == FLOW_PENDING)
- e->state = FLOW_DESTROY;
- else
- e->state = FLOW_NULL;
-
- pthread_cond_signal(&e->state_cond);
- pthread_mutex_unlock(&e->state_lock);
-
- pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
- (void *) &e->state_lock);
-
- while (e->state != FLOW_NULL)
- pthread_cond_wait(&e->state_cond, &e->state_lock);
-
- pthread_cleanup_pop(true);
-
- pthread_cond_destroy(&e->state_cond);
- pthread_mutex_destroy(&e->state_lock);
-
- free(e);
-}
-
static struct irm_flow * get_irm_flow(int port_id)
{
struct list_head * pos = NULL;
@@ -294,8 +228,8 @@ static pid_t get_ipcp_by_dst_name(char * dst_name)
static pid_t create_ipcp(char * name,
enum ipcp_type ipcp_type)
{
- struct spawned_api * api;
- struct ipcp_entry * tmp = NULL;
+ struct spawned_api * api = NULL;
+ struct ipcp_entry * tmp = NULL;
struct list_head * pos;
@@ -738,9 +672,9 @@ static struct irm_flow * flow_accept(pid_t api,
char * srv_ap_name,
char ** dst_ae_name)
{
- struct irm_flow * pme = NULL;
- struct reg_entry * rne = NULL;
- struct reg_api * rgi = NULL;
+ struct irm_flow * f = NULL;
+ struct reg_entry * rne = NULL;
+ struct reg_api * rgi = NULL;
pthread_rwlock_rdlock(&irmd->state_lock);
@@ -759,7 +693,7 @@ static struct irm_flow * flow_accept(pid_t api,
return NULL;
}
- if (!reg_entry_get_reg_api(rne, api)) {
+ if ((rgi = reg_entry_get_reg_api(rne, api)) == NULL) {
rgi = registry_add_api_name(&irmd->registry,
api,
rne->name);
@@ -794,8 +728,8 @@ static struct irm_flow * flow_accept(pid_t api,
pthread_rwlock_rdlock(&irmd->flows_lock);
- pme = get_irm_flow_n(api);
- if (pme == NULL) {
+ f = get_irm_flow_n(api);
+ if (f == NULL) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Port_id was not created yet.");
@@ -808,15 +742,15 @@ static struct irm_flow * flow_accept(pid_t api,
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- return pme;
+ return f;
}
static int flow_alloc_resp(pid_t n_api,
int port_id,
int response)
{
- struct irm_flow * pme = NULL;
- struct reg_entry * rne = NULL;
+ struct irm_flow * f = NULL;
+ struct reg_entry * rne = NULL;
int ret = -1;
pthread_rwlock_rdlock(&irmd->state_lock);
@@ -835,15 +769,16 @@ static int flow_alloc_resp(pid_t n_api,
return -1;
}
+ pthread_mutex_lock(&rne->state_lock);
+
if (rne->state != REG_NAME_FLOW_ARRIVED) {
+ pthread_mutex_unlock(&rne->state_lock);
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Process not listening for this name.");
return -1;
}
- pthread_mutex_lock(&rne->state_lock);
-
registry_del_api(&irmd->registry, n_api);
pthread_mutex_unlock(&rne->state_lock);
@@ -853,20 +788,20 @@ static int flow_alloc_resp(pid_t n_api,
if (!response) {
pthread_rwlock_wrlock(&irmd->flows_lock);
- pme = get_irm_flow(port_id);
- if (pme == NULL) {
+ f = get_irm_flow(port_id);
+ if (f == NULL) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- pme->state = FLOW_ALLOCATED;
- pthread_cond_signal(&pme->state_cond);
+ f->state = FLOW_ALLOCATED;
+ pthread_cond_signal(&f->state_cond);
pthread_rwlock_unlock(&irmd->flows_lock);
- ret = ipcp_flow_alloc_resp(pme->n_1_api,
+ ret = ipcp_flow_alloc_resp(f->n_1_api,
port_id,
- pme->n_api,
+ f->n_api,
response);
}
@@ -880,7 +815,7 @@ static struct irm_flow * flow_alloc(pid_t api,
char * src_ae_name,
struct qos_spec * qos)
{
- struct irm_flow * pme;
+ struct irm_flow * f;
pid_t ipcp;
/* FIXME: Map qos_spec to qos_cube */
@@ -892,16 +827,16 @@ static struct irm_flow * flow_alloc(pid_t api,
return NULL;
}
- pme = irm_flow_create();
- if (pme == NULL) {
+ f = irm_flow_create();
+ if (f == NULL) {
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Failed to create irm_flow.");
return NULL;
}
- pme->n_api = api;
- pme->state = FLOW_PENDING;
- if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0)
+ f->n_api = api;
+ f->state = FLOW_PENDING;
+ if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0)
LOG_WARN("Failed to set timestamp.");
pthread_rwlock_rdlock(&irmd->reg_lock);
@@ -917,44 +852,44 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
- pme->port_id = bmp_allocate(irmd->port_ids);
- pme->n_1_api = ipcp;
+ f->port_id = bmp_allocate(irmd->port_ids);
+ f->n_1_api = ipcp;
- list_add(&pme->next, &irmd->irm_flows);
+ list_add(&f->next, &irmd->irm_flows);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
if (ipcp_flow_alloc(ipcp,
- pme->port_id,
- pme->n_api,
+ f->port_id,
+ f->n_api,
dst_name,
src_ae_name,
QOS_CUBE_BE) < 0) {
pthread_rwlock_rdlock(&irmd->state_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
- list_del(&pme->next);
- bmp_release(irmd->port_ids, pme->port_id);
+ list_del(&f->next);
+ bmp_release(irmd->port_ids, f->port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- free(pme);
+ free(f);
return NULL;
}
- return pme;
+ return f;
}
static void cleanup_alloc_res(void * o)
{
- struct irm_flow * e = (struct irm_flow *) o;
- if (e->state == FLOW_PENDING)
- e->state = FLOW_NULL;
- pthread_mutex_unlock(&e->state_lock);
+ struct irm_flow * f = (struct irm_flow *) o;
+ if (f->state == FLOW_PENDING)
+ f->state = FLOW_NULL;
+ pthread_mutex_unlock(&f->state_lock);
}
static int flow_alloc_res(int port_id)
{
- struct irm_flow * e;
+ struct irm_flow * f;
pthread_rwlock_rdlock(&irmd->state_lock);
@@ -964,22 +899,22 @@ static int flow_alloc_res(int port_id)
}
pthread_rwlock_rdlock(&irmd->flows_lock);
- e = get_irm_flow(port_id);
- if (e == NULL) {
+ f = get_irm_flow(port_id);
+ if (f == NULL) {
LOG_ERR("Could not find port %d.", port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- if (e->state == FLOW_NULL) {
+ if (f->state == FLOW_NULL) {
LOG_INFO("Port %d is deprecated.", port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- if (e->state == FLOW_ALLOCATED) {
+ if (f->state == FLOW_ALLOCATED) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return 0;
@@ -988,28 +923,28 @@ static int flow_alloc_res(int port_id)
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- pthread_mutex_lock(&e->state_lock);
- pthread_cleanup_push(cleanup_alloc_res, (void *) e);
+ pthread_mutex_lock(&f->state_lock);
+ pthread_cleanup_push(cleanup_alloc_res, (void *) f);
- while (e->state == FLOW_PENDING)
- pthread_cond_wait(&e->state_cond, &e->state_lock);
+ while (f->state == FLOW_PENDING)
+ pthread_cond_wait(&f->state_cond, &f->state_lock);
pthread_cleanup_pop(true);
pthread_rwlock_rdlock(&irmd->state_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
- pthread_mutex_lock(&e->state_lock);
+ pthread_mutex_lock(&f->state_lock);
- if (e->state == FLOW_ALLOCATED) {
- pthread_mutex_unlock(&e->state_lock);
+ if (f->state == FLOW_ALLOCATED) {
+ pthread_mutex_unlock(&f->state_lock);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return 0;
}
- e->state = FLOW_NULL;
- pthread_cond_signal(&e->state_cond);
- pthread_mutex_unlock(&e->state_lock);
+ f->state = FLOW_NULL;
+ pthread_cond_signal(&f->state_cond);
+ pthread_mutex_unlock(&f->state_lock);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1021,22 +956,22 @@ static int flow_dealloc(int port_id)
pid_t n_1_api;
int ret = 0;
- struct irm_flow * e = NULL;
+ struct irm_flow * f = NULL;
pthread_rwlock_rdlock(&irmd->state_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
bmp_release(irmd->port_ids, port_id);
- e = get_irm_flow(port_id);
- if (e == NULL) {
+ f = get_irm_flow(port_id);
+ if (f == NULL) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return 0;
}
- n_1_api = e->n_1_api;
+ n_1_api = f->n_1_api;
- list_del(&e->next);
+ list_del(&f->next);
pthread_rwlock_unlock(&irmd->flows_lock);
@@ -1044,7 +979,7 @@ static int flow_dealloc(int port_id)
pthread_rwlock_unlock(&irmd->state_lock);
- irm_flow_destroy(e);
+ irm_flow_destroy(f);
return ret;
}
@@ -1085,22 +1020,23 @@ static struct irm_flow * flow_req_arr(pid_t api,
char * dst_name,
char * ae_name)
{
- struct reg_entry * rne = NULL;
- struct irm_flow * pme = NULL;
+ struct reg_entry * rne = NULL;
+ struct irm_flow * f = NULL;
+ struct reg_api * rgi = NULL;
enum reg_name_state state;
struct spawned_api * c_api;
- pme = irm_flow_create();
- if (pme == NULL) {
+ f = irm_flow_create();
+ if (f == NULL) {
LOG_ERR("Failed to create irm_flow.");
return NULL;
}
- pme->state = FLOW_PENDING;
- pme->n_1_api = api;
- if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0)
+ f->state = FLOW_PENDING;
+ f->n_1_api = api;
+ if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0)
LOG_WARN("Failed to set timestamp.");
pthread_rwlock_rdlock(&irmd->state_lock);
@@ -1111,7 +1047,7 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Unknown name: %s.", dst_name);
- free(pme);
+ free(f);
return NULL;
}
@@ -1124,14 +1060,14 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("No AP's for %s.", dst_name);
- free(pme);
+ free(f);
return NULL;
case REG_NAME_AUTO_ACCEPT:
c_api = malloc(sizeof(*c_api));
if (c_api == NULL) {
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- free(pme);
+ free(f);
return NULL;
}
@@ -1146,7 +1082,7 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_mutex_unlock(&rne->state_lock);
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- free(pme);
+ free(f);
free(c_api);
return NULL;
}
@@ -1154,6 +1090,7 @@ static struct irm_flow * flow_req_arr(pid_t api,
list_add(&c_api->next, &irmd->spawned_apis);
pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
pthread_mutex_lock(&rne->state_lock);
pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
@@ -1164,18 +1101,20 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_cleanup_pop(true);
+ pthread_rwlock_rdlock(&irmd->state_lock);
pthread_rwlock_rdlock(&irmd->reg_lock);
pthread_mutex_lock(&rne->state_lock);
if (rne->state == REG_NAME_DESTROY) {
rne->state = REG_NAME_NULL;
pthread_mutex_unlock(&rne->state_lock);
pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
return NULL;
}
pthread_mutex_unlock(&rne->state_lock);
case REG_NAME_FLOW_ACCEPT:
- pme->n_api = reg_entry_resolve_api(rne);
- if (pme->n_api == -1) {
+ f->n_api = reg_entry_resolve_api(rne);
+ if (f->n_api == -1) {
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("Invalid api returned.");
@@ -1187,70 +1126,73 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
LOG_ERR("IRMd in wrong state.");
- free(pme);
+ free(f);
return NULL;
}
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
- pme->port_id = bmp_allocate(irmd->port_ids);
+ f->port_id = bmp_allocate(irmd->port_ids);
- list_add(&pme->next, &irmd->irm_flows);
+ list_add(&f->next, &irmd->irm_flows);
pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_rdlock(&irmd->reg_lock);
pthread_mutex_lock(&rne->state_lock);
rne->req_ae_name = ae_name;
rne->state = REG_NAME_FLOW_ARRIVED;
- reg_api_wake(reg_entry_get_reg_api(rne, pme->n_api));
+ rgi = reg_entry_get_reg_api(rne, f->n_api);
pthread_mutex_unlock(&rne->state_lock);
-
+ pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
+ reg_api_wake(rgi);
+
+ pthread_mutex_lock(&rne->state_lock);
pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
(void *) &rne->state_lock);
- while (rne->state == REG_NAME_FLOW_ARRIVED &&
- irmd->state == IRMD_RUNNING)
+ while (rne->state == REG_NAME_FLOW_ARRIVED)
pthread_cond_wait(&rne->state_cond, &rne->state_lock);
pthread_cleanup_pop(true);
- return pme;
+ return f;
}
static int flow_alloc_reply(int port_id,
int response)
{
- struct irm_flow * e;
+ struct irm_flow * f;
pthread_rwlock_rdlock(&irmd->state_lock);
pthread_rwlock_rdlock(&irmd->flows_lock);
- e = get_irm_flow(port_id);
- if (e == NULL) {
+ f = get_irm_flow(port_id);
+ if (f == NULL) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
- pthread_mutex_lock(&e->state_lock);
+ pthread_mutex_lock(&f->state_lock);
if (!response)
- e->state = FLOW_ALLOCATED;
+ f->state = FLOW_ALLOCATED;
else
- e->state = FLOW_NULL;
+ f->state = FLOW_NULL;
- if (pthread_cond_signal(&e->state_cond))
+ if (pthread_cond_signal(&f->state_cond))
LOG_ERR("Failed to send signal.");
- pthread_mutex_unlock(&e->state_lock);
+ pthread_mutex_unlock(&f->state_lock);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1260,24 +1202,24 @@ static int flow_alloc_reply(int port_id,
static int flow_dealloc_ipcp(int port_id)
{
- struct irm_flow * e = NULL;
+ struct irm_flow * f = NULL;
pthread_rwlock_rdlock(&irmd->state_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
- e = get_irm_flow(port_id);
- if (e == NULL) {
+ f = get_irm_flow(port_id);
+ if (f == NULL) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return 0;
}
- list_del(&e->next);
+ list_del(&f->next);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- irm_flow_destroy(e);
+ irm_flow_destroy(f);
return 0;
}
@@ -1324,9 +1266,9 @@ static void irm_destroy()
pthread_rwlock_wrlock(&irmd->flows_lock);
list_for_each_safe(h, t, &irmd->irm_flows) {
- struct irm_flow * e = list_entry(h, struct irm_flow, next);
- list_del(&e->next);
- irm_flow_destroy(e);
+ struct irm_flow * f = list_entry(h, struct irm_flow, next);
+ list_del(&f->next);
+ irm_flow_destroy(f);
}
if (irmd->port_ids != NULL)
@@ -1405,46 +1347,46 @@ void * irm_flow_cleaner()
pthread_rwlock_wrlock(&irmd->flows_lock);
list_for_each_safe(pos, n, &(irmd->irm_flows)) {
- struct irm_flow * e =
+ struct irm_flow * f =
list_entry(pos, struct irm_flow, next);
- pthread_mutex_lock(&e->state_lock);
+ pthread_mutex_lock(&f->state_lock);
- if (e->state == FLOW_PENDING &&
- ts_diff_ms(&e->t0, &now) > IRMD_FLOW_TIMEOUT) {
+ if (f->state == FLOW_PENDING &&
+ ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) {
LOG_INFO("Pending port_id %d timed out.",
- e->port_id);
- e->state = FLOW_NULL;
- pthread_cond_signal(&e->state_cond);
- pthread_mutex_unlock(&e->state_lock);
+ f->port_id);
+ f->state = FLOW_NULL;
+ pthread_cond_signal(&f->state_cond);
+ pthread_mutex_unlock(&f->state_lock);
continue;
}
- pthread_mutex_unlock(&e->state_lock);
+ pthread_mutex_unlock(&f->state_lock);
- if (kill(e->n_api, 0) < 0) {
+ if (kill(f->n_api, 0) < 0) {
struct shm_ap_rbuff * n_rb =
- shm_ap_rbuff_open(e->n_api);
- bmp_release(irmd->port_ids, e->port_id);
+ shm_ap_rbuff_open(f->n_api);
+ bmp_release(irmd->port_ids, f->port_id);
- list_del(&e->next);
+ list_del(&f->next);
LOG_INFO("Process %d gone, %d deallocated.",
- e->n_api, e->port_id);
- ipcp_flow_dealloc(e->n_1_api, e->port_id);
+ f->n_api, f->port_id);
+ ipcp_flow_dealloc(f->n_1_api, f->port_id);
if (n_rb != NULL)
shm_ap_rbuff_destroy(n_rb);
- irm_flow_destroy(e);
+ irm_flow_destroy(f);
continue;
}
- if (kill(e->n_1_api, 0) < 0) {
+ if (kill(f->n_1_api, 0) < 0) {
struct shm_ap_rbuff * n_1_rb =
- shm_ap_rbuff_open(e->n_1_api);
- list_del(&e->next);
+ shm_ap_rbuff_open(f->n_1_api);
+ list_del(&f->next);
LOG_ERR("IPCP %d gone, flow %d removed.",
- e->n_1_api, e->port_id);
+ f->n_1_api, f->port_id);
if (n_1_rb != NULL)
shm_ap_rbuff_destroy(n_1_rb);
- irm_flow_destroy(e);
+ irm_flow_destroy(f);
}
}