summaryrefslogtreecommitdiff
path: root/src/irmd
diff options
context:
space:
mode:
Diffstat (limited to 'src/irmd')
-rw-r--r--src/irmd/main.c451
1 files changed, 316 insertions, 135 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 946ed13d..b660511c 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -66,8 +66,6 @@ struct ipcp_entry {
struct list_head next;
instance_name_t * api;
char * dif_name;
-
- pthread_mutex_t lock;
};
/* currently supports only registering whatevercast groups of a single AP-I */
@@ -83,9 +81,11 @@ struct reg_name_entry {
bool accept;
char * req_ap_name;
char * req_ae_name;
+ int response;
bool flow_arrived;
- pthread_mutex_t fa_lock;
+ pthread_cond_t acc_signal;
+ pthread_mutex_t acc_lock;
};
/* keeps track of port_id's between N and N - 1 */
@@ -97,6 +97,9 @@ struct port_map_entry {
pid_t n_pid;
pid_t n_1_pid;
+ pthread_cond_t res_signal;
+ pthread_mutex_t res_lock;
+
enum flow_state state;
};
@@ -117,9 +120,33 @@ struct irm {
pthread_t * threadpool;
- pthread_mutex_t lock;
+ pthread_mutex_t r_lock;
} * instance = NULL;
+static struct port_map_entry * port_map_entry_create()
+{
+ struct port_map_entry * e = malloc(sizeof(*e));
+ if (e == NULL)
+ return NULL;
+
+ e->n_pid = 0;
+ e->n_1_pid = 0;
+ e->port_id = 0;
+ e->state = FLOW_NULL;
+
+ if (pthread_cond_init(&e->res_signal, NULL)) {
+ free(e);
+ return NULL;
+ }
+
+ if (pthread_mutex_init(&e->res_lock, NULL)) {
+ free(e);
+ return NULL;
+ }
+
+ return e;
+}
+
static struct port_map_entry * get_port_map_entry(int port_id)
{
struct list_head * pos = NULL;
@@ -160,7 +187,6 @@ static struct ipcp_entry * ipcp_entry_create()
e->dif_name = NULL;
INIT_LIST_HEAD(&e->next);
- pthread_mutex_init(&e->lock, NULL);
return e;
}
@@ -254,7 +280,16 @@ static struct reg_name_entry * reg_name_entry_create()
e->req_ae_name = NULL;
e->flow_arrived = false;
- pthread_mutex_init(&e->fa_lock, NULL);
+ if (pthread_cond_init(&e->acc_signal, NULL)) {
+ free(e);
+ return NULL;
+ }
+
+ if (pthread_mutex_init(&e->acc_lock, NULL)) {
+ free(e);
+ return NULL;
+ }
+
INIT_LIST_HEAD(&e->next);
return e;
@@ -388,13 +423,13 @@ static pid_t create_ipcp(char * ap_name,
tmp->dif_name = NULL;
- pthread_mutex_lock(&instance->lock);
+ pthread_mutex_lock(&instance->r_lock);
list_add(&tmp->next, &instance->ipcps);
- pthread_mutex_unlock(&instance->lock);
+ pthread_mutex_unlock(&instance->r_lock);
- LOG_INFO("Created IPCP %s-%d ", ap_name, pid);
+ LOG_INFO("Created IPCP %s-%d.", ap_name, pid);
return pid;
}
@@ -403,6 +438,8 @@ static int destroy_ipcp(instance_name_t * api)
{
struct list_head * pos = NULL;
struct list_head * n = NULL;
+ pid_t pid = 0;
+
if (api == NULL)
return 0;
@@ -415,6 +452,7 @@ static int destroy_ipcp(instance_name_t * api)
return 0;
}
+ pid = api->id;
if (ipcp_destroy(api->id))
LOG_ERR("Could not destroy IPCP.");
@@ -428,7 +466,7 @@ static int destroy_ipcp(instance_name_t * api)
ipcp_entry_destroy(tmp);
}
- LOG_INFO("Destroyed IPCP %s-%d.", api->name, api->id);
+ LOG_INFO("Destroyed IPCP %d.", pid);
return 0;
}
@@ -438,33 +476,41 @@ static int bootstrap_ipcp(instance_name_t * api,
{
struct ipcp_entry * entry = NULL;
+ pthread_mutex_lock(&instance->r_lock);
+
if (api->id == 0)
api = get_ipcp_by_name(api->name);
if (api == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("No such IPCP in the system.");
return -1;
}
entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(conf->dif_name);
if (entry->dif_name == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("Failed to strdup.");
return -1;
}
if (ipcp_bootstrap(entry->api->id, conf)) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("Could not bootstrap IPCP.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
+ pthread_mutex_unlock(&instance->r_lock);
+
LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.",
api->name, api->id, conf->dif_name);
@@ -479,14 +525,18 @@ static int enroll_ipcp(instance_name_t * api,
ssize_t n_1_difs_size = 0;
struct ipcp_entry * entry = NULL;
+ pthread_mutex_lock(&instance->r_lock);
+
entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(dif_name);
if (entry->dif_name == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("Failed to strdup.");
return -1;
}
@@ -496,6 +546,7 @@ static int enroll_ipcp(instance_name_t * api,
LOG_ERR("Could not find a member of that DIF.");
free(entry->dif_name);
entry->dif_name = NULL;
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
}
@@ -504,13 +555,18 @@ static int enroll_ipcp(instance_name_t * api,
LOG_ERR("Could not find N-1 DIFs.");
free(entry->dif_name);
entry->dif_name = NULL;
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
}
- if (ipcp_enroll(entry->api->id, member, n_1_difs[0])) {
+ pthread_mutex_unlock(&instance->r_lock);
+
+ if (ipcp_enroll(api->id, member, n_1_difs[0])) {
LOG_ERR("Could not enroll IPCP.");
+ pthread_mutex_lock(&instance->r_lock);
free(entry->dif_name);
entry->dif_name = NULL;
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
}
@@ -545,50 +601,6 @@ static int unreg_ipcp(instance_name_t * api,
return 0;
}
-static int ap_unreg_id(pid_t pid,
- char ** difs,
- size_t len)
-{
- int i;
- int ret = 0;
- struct reg_name_entry * rne = NULL;
- struct list_head * pos = NULL;
-
- rne = get_reg_name_entry_by_id(pid);
- if (rne == NULL)
- return 0; /* no such id */
-
- if (instance->ipcps.next == NULL) {
- LOG_ERR("No IPCPs in this system.");
- return 0;
- }
-
- if (strcmp(difs[0], ALL_DIFS) == 0) {
- list_for_each(pos, &instance->ipcps) {
- struct ipcp_entry * e =
- list_entry(pos, struct ipcp_entry, next);
-
- if (ipcp_name_unreg(e->api->id, rne->name)) {
- LOG_ERR("Could not unregister %s in DIF %s.",
- rne->name, e->dif_name);
- --ret;
- }
- }
- } else {
- for (i = 0; i < len; ++i) {
- if (ipcp_name_unreg(pid, rne->name)) {
- LOG_ERR("Could not unregister %s in DIF %s.",
- rne->name, difs[i]);
- --ret;
- }
- }
- }
-
- reg_name_entry_del_name(rne->name);
-
- return ret;
-}
-
static int ap_reg(char * ap_name,
pid_t ap_id,
char ** difs,
@@ -602,24 +614,35 @@ static int ap_reg(char * ap_name,
instance_name_t * api = NULL;
instance_name_t * ipcpi = NULL;
- if (instance->ipcps.next == NULL)
- return -1;
- /* check if this ap_name is already registered */
- rne = get_reg_name_entry_by_name(ap_name);
- if (rne != NULL)
- return -1; /* can only register one instance for now */
+ pthread_mutex_lock(&instance->r_lock);
+
+ if (instance->ipcps.next == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
+ return -1;
+ }
api = instance_name_create();
if (api == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
}
if (instance_name_init_from(api, ap_name, ap_id) == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
instance_name_destroy(api);
return -1;
}
+ /* check if this ap_name is already registered */
+
+ rne = get_reg_name_entry_by_name(ap_name);
+ if (rne != NULL) {
+ instance_name_destroy(api);
+ pthread_mutex_unlock(&instance->r_lock);
+ return -1; /* can only register one instance for now */
+ }
+
/*
* for now, the whatevercast name is the same as the ap_name and
* contains a single instance only
@@ -656,12 +679,15 @@ static int ap_reg(char * ap_name,
if (ret == 0) {
instance_name_destroy(api);
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
}
/* for now, we register single instances */
ret = reg_name_entry_add_name_instance(strdup(ap_name),
api);
+ pthread_mutex_unlock(&instance->r_lock);
+
return ret;
}
@@ -670,62 +696,104 @@ static int ap_unreg(char * ap_name,
char ** difs,
size_t len)
{
- struct reg_name_entry * tmp = NULL;
+ int i;
+ int ret = 0;
+ struct reg_name_entry * rne = NULL;
+ struct list_head * pos = NULL;
+
+ pthread_mutex_lock(&instance->r_lock);
/* check if ap_name is registered */
- tmp = get_reg_name_entry_by_id(ap_id);
- if (tmp == NULL)
+ rne = get_reg_name_entry_by_id(ap_id);
+ if (rne == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
+ return 0; /* no such id */
+ }
+
+ if (strcmp(ap_name, rne->api->name)) {
+ pthread_mutex_unlock(&instance->r_lock);
return 0;
+ }
- if (strcmp(ap_name, tmp->api->name))
+ if (instance->ipcps.next == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
+ LOG_ERR("No IPCPs in this system.");
return 0;
+ }
+
+ if (strcmp(difs[0], ALL_DIFS) == 0) {
+ list_for_each(pos, &instance->ipcps) {
+ struct ipcp_entry * e =
+ list_entry(pos, struct ipcp_entry, next);
+
+ if (ipcp_name_unreg(e->api->id, rne->name)) {
+ LOG_ERR("Could not unregister %s in DIF %s.",
+ rne->name, e->dif_name);
+ --ret;
+ }
+ }
+ } else {
+ for (i = 0; i < len; ++i) {
+ if (ipcp_name_unreg(ap_id, rne->name)) {
+ LOG_ERR("Could not unregister %s in DIF %s.",
+ rne->name, difs[i]);
+ --ret;
+ }
+ }
+ }
+
+ reg_name_entry_del_name(rne->name);
- return ap_unreg_id(ap_id, difs, len);
+ pthread_mutex_unlock(&instance->r_lock);
+
+ return ret;
}
static struct port_map_entry * flow_accept(pid_t pid,
char ** ap_name,
char ** ae_name)
{
- bool arrived = false;
+ struct port_map_entry * pme;
+ struct reg_name_entry * rne = NULL;
- struct timespec ts = {0, 100000};
+ pthread_mutex_lock(&instance->r_lock);
- struct port_map_entry * pme;
- struct reg_name_entry * rne = get_reg_name_entry_by_id(pid);
+ rne = get_reg_name_entry_by_id(pid);
if (rne == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_DBGF("Unregistered AP calling accept().");
return NULL;
}
-
if (rne->accept) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_DBGF("This AP still has a pending accept().");
return NULL;
}
- rne->accept = true;
+ rne->accept = true;
+ rne->flow_arrived = false;
+
+ pthread_mutex_unlock(&instance->r_lock);
+ pthread_mutex_lock(&rne->acc_lock);
- /* FIXME: wait for a thread that runs select() on flow_arrived */
- while (!arrived) {
- /* FIXME: this needs locking */
- rne = get_reg_name_entry_by_id(pid);
- if (rne == NULL)
- return NULL;
- arrived = rne->flow_arrived;
- nanosleep(&ts, NULL);
- }
+ while (!rne->flow_arrived)
+ pthread_cond_wait(&rne->acc_signal, &rne->acc_lock);
+
+ pthread_mutex_unlock(&rne->acc_lock);
+ pthread_mutex_lock(&instance->r_lock);
pme = get_port_map_entry_n(pid);
if (pme == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("Port_id was not created yet.");
return NULL;
}
- pthread_mutex_lock(&rne->fa_lock);
*ap_name = rne->req_ap_name;
if (ae_name != NULL)
*ae_name = rne->req_ae_name;
- pthread_mutex_unlock(&rne->fa_lock);
+
+ pthread_mutex_unlock(&instance->r_lock);
return pme;
}
@@ -734,14 +802,20 @@ static int flow_alloc_resp(pid_t n_pid,
int port_id,
int response)
{
- struct reg_name_entry * rne = get_reg_name_entry_by_id(n_pid);
- struct port_map_entry * pme = get_port_map_entry(port_id);
+ struct port_map_entry * pme = NULL;
+ struct reg_name_entry * rne = NULL;
+
+ pthread_mutex_lock(&instance->r_lock);
- if (rne == NULL || pme == NULL)
+ rne = get_reg_name_entry_by_id(n_pid);
+ if (rne == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
+ }
/* FIXME: check all instances associated with the name */
if (!rne->accept) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("No process listening for this name.");
return -1;
}
@@ -751,11 +825,15 @@ static int flow_alloc_resp(pid_t n_pid,
* once we can handle a list of AP-I's, remove it from the list
*/
- rne->flow_arrived = false;
rne->accept = false;
+ rne->flow_arrived = false;
- if (!response)
+ if (!response) {
+ pme = get_port_map_entry(port_id);
pme->state = FLOW_ALLOCATED;
+ }
+
+ pthread_mutex_unlock(&instance->r_lock);
return ipcp_flow_alloc_resp(pme->n_1_pid,
port_id,
@@ -769,53 +847,104 @@ static struct port_map_entry * flow_alloc(pid_t pid,
char * src_ae_name,
struct qos_spec * qos)
{
- struct port_map_entry * e = malloc(sizeof(*e));
- if (e == NULL) {
+ struct port_map_entry * pme;
+ instance_name_t * ipcp;
+
+ pme = port_map_entry_create();
+ if (pme == NULL) {
LOG_ERR("Failed malloc of port_map_entry.");
return NULL;
}
- e->port_id = bmp_allocate(instance->port_ids);
- e->n_pid = pid;
- e->state = FLOW_PENDING;
- e->n_1_pid = get_ipcp_by_dst_name(dst_name)->id;
+ pme->n_pid = pid;
+ pme->state = FLOW_PENDING;
+
+ pthread_mutex_lock(&instance->r_lock);
+
+ pme->port_id = bmp_allocate(instance->port_ids);
+ pme->n_1_pid = get_ipcp_by_dst_name(dst_name)->id;
+
+ list_add(&pme->next, &instance->port_map);
+
+ ipcp = get_ipcp_by_dst_name(dst_name);
+
+ pthread_mutex_unlock(&instance->r_lock);
- list_add(&e->next, &instance->port_map);
+ if (ipcp == NULL) {
+ LOG_DBG("unknown ipcp");
+ return NULL;
+ }
- if (ipcp_flow_alloc(get_ipcp_by_dst_name(dst_name)->id,
- e->port_id,
- e->n_pid,
+ if (ipcp_flow_alloc(ipcp->id,
+ pme->port_id,
+ pme->n_pid,
dst_name,
src_ap_name,
src_ae_name,
qos) < 0) {
- list_del(&e->next);
- bmp_release(instance->port_ids, e->port_id);
- free(e);
+ pthread_mutex_lock(&instance->r_lock);
+
+ list_del(&pme->next);
+
+ pthread_mutex_unlock(&instance->r_lock);
+
+ bmp_release(instance->port_ids, pme->port_id);
+ free(pme);
+
return NULL;
}
- return e;
+ return pme;
}
static int flow_alloc_res(int port_id)
{
- bool allocated = false;
struct port_map_entry * e;
- struct timespec ts = {0,100000};
- while (!allocated) {
- /* FIXME: this needs locking */
+ pthread_mutex_lock(&instance->r_lock);
+
+ e = get_port_map_entry(port_id);
+ if (e == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
+ return -1;
+ }
+
+ if (e->state == FLOW_ALLOCATED) {
+ pthread_mutex_unlock(&instance->r_lock);
+ return 0;
+ }
+
+ pthread_mutex_unlock(&instance->r_lock);
+ pthread_mutex_lock(&e->res_lock);
+
+ while (true) {
+ pthread_cond_wait(&e->res_signal, &e->res_lock);
+
+ pthread_mutex_unlock(&e->res_lock);
+ pthread_mutex_lock(&instance->r_lock);
+
e = get_port_map_entry(port_id);
if (e == NULL) {
- LOG_DBGF("Could not locate port_id %d", port_id);
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
}
- if (e->state == FLOW_ALLOCATED)
- allocated = true;
- nanosleep(&ts, NULL);
+ if (e->state == FLOW_ALLOCATED) {
+ pthread_mutex_unlock(&instance->r_lock);
+ LOG_DBGF("Returning 0.");
+ return 0;
+ }
+ if (e->state == FLOW_NULL) {
+ list_del(&e->next);
+ pthread_mutex_unlock(&instance->r_lock);
+ free(e);
+ return -1;
+
+ }
+ /* still pending, spurious wake */
}
+ pthread_mutex_unlock(&instance->r_lock);
+
return 0;
}
@@ -823,13 +952,22 @@ static int flow_dealloc(int port_id)
{
pid_t n_1_pid;
- struct port_map_entry * e = get_port_map_entry(port_id);
- if (e == NULL)
+ struct port_map_entry * e = NULL;
+
+ pthread_mutex_lock(&instance->r_lock);
+
+ e = get_port_map_entry(port_id);
+ if (e == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
return 0;
+ }
n_1_pid = e->n_1_pid;
list_del(&e->next);
+
+ pthread_mutex_unlock(&instance->r_lock);
+
free(e);
return ipcp_flow_dealloc(n_1_pid, port_id);
@@ -843,33 +981,45 @@ static struct port_map_entry * flow_req_arr(pid_t pid,
struct reg_name_entry * rne;
struct port_map_entry * pme;
- rne = get_reg_name_entry_by_name(dst_name);
- if (rne == NULL) {
- LOG_DBGF("Destination name %s unknown.", dst_name);
- return NULL;
- }
-
pme = malloc(sizeof(*pme));
if (pme == NULL) {
LOG_ERR("Failed malloc of port_map_entry.");
return NULL;
}
- pme->port_id = bmp_allocate(instance->port_ids);
- pme->n_pid = rne->api->id;
pme->state = FLOW_PENDING;
pme->n_1_pid = pid;
- list_add(&pme->next, &instance->port_map);
+ pthread_mutex_lock(&instance->r_lock);
- pthread_mutex_lock(&rne->fa_lock);
+ pme->port_id = bmp_allocate(instance->port_ids);
+
+ rne = get_reg_name_entry_by_name(dst_name);
+ if (rne == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
+ LOG_DBGF("Destination name %s unknown.", dst_name);
+ free(pme);
+ return NULL;
+ }
+
+ pme->n_pid = rne->api->id;
+
+ list_add(&pme->next, &instance->port_map);
rne->req_ap_name = strdup(ap_name);
rne->req_ae_name = strdup(ae_name);
+ pthread_mutex_lock(&rne->acc_lock);
+
rne->flow_arrived = true;
- pthread_mutex_unlock(&rne->fa_lock);
+ if (pthread_cond_signal(&rne->acc_signal))
+ LOG_ERR("Failed to send signal.");
+
+ pthread_mutex_unlock(&rne->acc_lock);
+
+ pthread_mutex_unlock(&instance->r_lock);
+
return pme;
}
@@ -879,26 +1029,48 @@ static int flow_alloc_reply(int port_id,
{
struct port_map_entry * e;
- /* FIXME: do this under lock */
- if (!response) {
- e = get_port_map_entry(port_id);
- if (e == NULL)
- return -1;
- e->state = FLOW_ALLOCATED;
+ pthread_mutex_lock(&instance->r_lock);
+
+ e = get_port_map_entry(port_id);
+ if (e == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
+ return -1;
}
- /* FIXME: does this need to be propagated to the IPCP? */
+ pthread_mutex_lock(&e->res_lock);
+
+ if (!response)
+ e->state = FLOW_ALLOCATED;
+
+ else
+ e->state = FLOW_NULL;
+
+ if (pthread_cond_signal(&e->res_signal))
+ LOG_ERR("Failed to send signal.");
+
+ pthread_mutex_unlock(&e->res_lock);
+
+ pthread_mutex_unlock(&instance->r_lock);
return 0;
}
static int flow_dealloc_ipcp(int port_id)
{
- struct port_map_entry * e = get_port_map_entry(port_id);
- if (e == NULL)
+ struct port_map_entry * e = NULL;
+
+ pthread_mutex_lock(&instance->r_lock);
+
+ e = get_port_map_entry(port_id);
+ if (e == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
return 0;
+ }
list_del(&e->next);
+
+ pthread_mutex_unlock(&instance->r_lock);
+
free(e);
return 0;
@@ -927,8 +1099,8 @@ static void irm_destroy(struct irm * irm)
struct reg_name_entry * e = list_entry(h,
struct reg_name_entry,
next);
- char * difs [1] = {ALL_DIFS};
- ap_unreg_id(e->api->id, difs, 1);
+ list_del(&e->next);
+ free(e);
}
list_for_each_safe(h, t, &irm->port_map) {
@@ -1053,6 +1225,7 @@ void * mainloop()
e = flow_accept(msg->pid,
&ret_msg.ap_name,
&ret_msg.ae_name);
+
if (e == NULL)
break;
@@ -1180,7 +1353,15 @@ static struct irm * irm_create()
return NULL;
}
- pthread_mutex_init(&i->lock, NULL);
+ if (pthread_mutex_init(&i->r_lock, NULL)) {
+ irm_destroy(i);
+ return NULL;
+ }
+
+ if (pthread_mutex_init(&i->r_lock, NULL)) {
+ irm_destroy(i);
+ return NULL;
+ }
return i;
}