summaryrefslogtreecommitdiff
path: root/src/irmd/main.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r--src/irmd/main.c422
1 files changed, 282 insertions, 140 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 8cf03400..2be5e1b6 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -53,7 +53,7 @@
#endif
#ifndef IRMD_THREADPOOL_SIZE
- #define IRMD_THREADPOOL_SIZE 3
+ #define IRMD_THREADPOOL_SIZE 5
#endif
enum flow_state {
@@ -66,8 +66,6 @@ struct ipcp_entry {
struct list_head next;
instance_name_t * api;
char * dif_name;
-
- pthread_mutex_t lock;
};
/* currently supports only registering whatevercast groups of a single AP-I */
@@ -84,9 +82,9 @@ struct reg_name_entry {
char * req_ap_name;
char * req_ae_name;
int response;
+ bool flow_arrived;
- pthread_cond_t flow_arrived;
- pthread_mutex_t fa_lock;
+ pthread_cond_t acc_signal;
};
/* keeps track of port_id's between N and N - 1 */
@@ -98,6 +96,8 @@ struct port_map_entry {
pid_t n_pid;
pid_t n_1_pid;
+ pthread_cond_t res_signal;
+
enum flow_state state;
};
@@ -118,9 +118,28 @@ struct irm {
pthread_t * threadpool;
- pthread_mutex_t lock;
+ pthread_mutex_t r_lock;
} * instance = NULL;
+static struct port_map_entry * port_map_entry_create()
+{
+ struct port_map_entry * e = malloc(sizeof(*e));
+ if (e == NULL)
+ return NULL;
+
+ e->n_pid = 0;
+ e->n_1_pid = 0;
+ e->port_id = 0;
+ e->state = FLOW_NULL;
+
+ if (pthread_cond_init(&e->res_signal, NULL)) {
+ free(e);
+ return NULL;
+ }
+
+ return e;
+}
+
static struct port_map_entry * get_port_map_entry(int port_id)
{
struct list_head * pos = NULL;
@@ -161,7 +180,6 @@ static struct ipcp_entry * ipcp_entry_create()
e->dif_name = NULL;
INIT_LIST_HEAD(&e->next);
- pthread_mutex_init(&e->lock, NULL);
return e;
}
@@ -251,13 +269,14 @@ static struct reg_name_entry * reg_name_entry_create()
e->name = NULL;
e->api = NULL;
e->accept = false;
- /* pending response */
- e->response = 1;
e->req_ap_name = NULL;
e->req_ae_name = NULL;
+ e->flow_arrived = false;
- pthread_mutex_init(&e->fa_lock, NULL);
- pthread_cond_init(&e->flow_arrived, NULL);
+ if (pthread_cond_init(&e->acc_signal, NULL)) {
+ free(e);
+ return NULL;
+ }
INIT_LIST_HEAD(&e->next);
@@ -290,8 +309,6 @@ static int reg_name_entry_destroy(struct reg_name_entry * e)
if (e->req_ae_name != NULL)
free(e->req_ae_name);
- pthread_mutex_destroy(&e->fa_lock);
-
free(e);
return 0;
@@ -394,13 +411,13 @@ static pid_t create_ipcp(char * ap_name,
tmp->dif_name = NULL;
- pthread_mutex_lock(&instance->lock);
+ pthread_mutex_lock(&instance->r_lock);
list_add(&tmp->next, &instance->ipcps);
- pthread_mutex_unlock(&instance->lock);
+ pthread_mutex_unlock(&instance->r_lock);
- LOG_INFO("Created IPCP %s-%d ", ap_name, pid);
+ LOG_INFO("Created IPCP %s-%d.", ap_name, pid);
return pid;
}
@@ -409,6 +426,8 @@ static int destroy_ipcp(instance_name_t * api)
{
struct list_head * pos = NULL;
struct list_head * n = NULL;
+ pid_t pid = 0;
+
if (api == NULL)
return 0;
@@ -421,6 +440,7 @@ static int destroy_ipcp(instance_name_t * api)
return 0;
}
+ pid =api->id;
if (ipcp_destroy(api->id))
LOG_ERR("Could not destroy IPCP.");
@@ -434,7 +454,7 @@ static int destroy_ipcp(instance_name_t * api)
ipcp_entry_destroy(tmp);
}
- LOG_INFO("Destroyed IPCP %s-%d.", api->name, api->id);
+ LOG_INFO("Destroyed IPCP %d.", pid);
return 0;
}
@@ -444,33 +464,41 @@ static int bootstrap_ipcp(instance_name_t * api,
{
struct ipcp_entry * entry = NULL;
+ pthread_mutex_lock(&instance->r_lock);
+
if (api->id == 0)
api = get_ipcp_by_name(api->name);
if (api == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("No such IPCP in the system.");
return -1;
}
entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(conf->dif_name);
if (entry->dif_name == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("Failed to strdup.");
return -1;
}
if (ipcp_bootstrap(entry->api->id, conf)) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("Could not bootstrap IPCP.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
+ pthread_mutex_unlock(&instance->r_lock);
+
LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.",
api->name, api->id, conf->dif_name);
@@ -485,14 +513,18 @@ static int enroll_ipcp(instance_name_t * api,
ssize_t n_1_difs_size = 0;
struct ipcp_entry * entry = NULL;
+ pthread_mutex_lock(&instance->r_lock);
+
entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(dif_name);
if (entry->dif_name == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("Failed to strdup.");
return -1;
}
@@ -502,6 +534,7 @@ static int enroll_ipcp(instance_name_t * api,
LOG_ERR("Could not find a member of that DIF.");
free(entry->dif_name);
entry->dif_name = NULL;
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
}
@@ -510,6 +543,7 @@ static int enroll_ipcp(instance_name_t * api,
LOG_ERR("Could not find N-1 DIFs.");
free(entry->dif_name);
entry->dif_name = NULL;
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
}
@@ -517,9 +551,12 @@ static int enroll_ipcp(instance_name_t * api,
LOG_ERR("Could not enroll IPCP.");
free(entry->dif_name);
entry->dif_name = NULL;
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
}
+ pthread_mutex_unlock(&instance->r_lock);
+
LOG_INFO("Enrolled IPCP %s-%d in DIF %s.",
api->name, api->id, dif_name);
@@ -551,50 +588,6 @@ static int unreg_ipcp(instance_name_t * api,
return 0;
}
-static int ap_unreg_id(pid_t pid,
- char ** difs,
- size_t len)
-{
- int i;
- int ret = 0;
- struct reg_name_entry * rne = NULL;
- struct list_head * pos = NULL;
-
- rne = get_reg_name_entry_by_id(pid);
- if (rne == NULL)
- return 0; /* no such id */
-
- if (instance->ipcps.next == NULL) {
- LOG_ERR("No IPCPs in this system.");
- return 0;
- }
-
- if (strcmp(difs[0], ALL_DIFS) == 0) {
- list_for_each(pos, &instance->ipcps) {
- struct ipcp_entry * e =
- list_entry(pos, struct ipcp_entry, next);
-
- if (ipcp_name_unreg(e->api->id, rne->name)) {
- LOG_ERR("Could not unregister %s in DIF %s.",
- rne->name, e->dif_name);
- --ret;
- }
- }
- } else {
- for (i = 0; i < len; ++i) {
- if (ipcp_name_unreg(pid, rne->name)) {
- LOG_ERR("Could not unregister %s in DIF %s.",
- rne->name, difs[i]);
- --ret;
- }
- }
- }
-
- reg_name_entry_del_name(rne->name);
-
- return ret;
-}
-
static int ap_reg(char * ap_name,
pid_t ap_id,
char ** difs,
@@ -608,24 +601,35 @@ static int ap_reg(char * ap_name,
instance_name_t * api = NULL;
instance_name_t * ipcpi = NULL;
- if (instance->ipcps.next == NULL)
- return -1;
- /* check if this ap_name is already registered */
- rne = get_reg_name_entry_by_name(ap_name);
- if (rne != NULL)
- return -1; /* can only register one instance for now */
+ pthread_mutex_lock(&instance->r_lock);
+
+ if (instance->ipcps.next == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
+ return -1;
+ }
api = instance_name_create();
if (api == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
}
if (instance_name_init_from(api, ap_name, ap_id) == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
instance_name_destroy(api);
return -1;
}
+ /* check if this ap_name is already registered */
+
+ rne = get_reg_name_entry_by_name(ap_name);
+ if (rne != NULL) {
+ instance_name_destroy(api);
+ pthread_mutex_unlock(&instance->r_lock);
+ return -1; /* can only register one instance for now */
+ }
+
/*
* for now, the whatevercast name is the same as the ap_name and
* contains a single instance only
@@ -662,12 +666,15 @@ static int ap_reg(char * ap_name,
if (ret == 0) {
instance_name_destroy(api);
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
}
/* for now, we register single instances */
ret = reg_name_entry_add_name_instance(strdup(ap_name),
api);
+ pthread_mutex_unlock(&instance->r_lock);
+
return ret;
}
@@ -676,17 +683,57 @@ static int ap_unreg(char * ap_name,
char ** difs,
size_t len)
{
- struct reg_name_entry * tmp = NULL;
+ int i;
+ int ret = 0;
+ struct reg_name_entry * rne = NULL;
+ struct list_head * pos = NULL;
+
+ pthread_mutex_lock(&instance->r_lock);
/* check if ap_name is registered */
- tmp = get_reg_name_entry_by_id(ap_id);
- if (tmp == NULL)
+ rne = get_reg_name_entry_by_id(ap_id);
+ if (rne == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
+ return 0; /* no such id */
+ }
+
+ if (strcmp(ap_name, rne->api->name)) {
+ pthread_mutex_unlock(&instance->r_lock);
return 0;
+ }
- if (strcmp(ap_name, tmp->api->name))
+ if (instance->ipcps.next == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
+ LOG_ERR("No IPCPs in this system.");
return 0;
+ }
+
+ if (strcmp(difs[0], ALL_DIFS) == 0) {
+ list_for_each(pos, &instance->ipcps) {
+ struct ipcp_entry * e =
+ list_entry(pos, struct ipcp_entry, next);
+
+ if (ipcp_name_unreg(e->api->id, rne->name)) {
+ LOG_ERR("Could not unregister %s in DIF %s.",
+ rne->name, e->dif_name);
+ --ret;
+ }
+ }
+ } else {
+ for (i = 0; i < len; ++i) {
+ if (ipcp_name_unreg(ap_id, rne->name)) {
+ LOG_ERR("Could not unregister %s in DIF %s.",
+ rne->name, difs[i]);
+ --ret;
+ }
+ }
+ }
- return ap_unreg_id(ap_id, difs, len);
+ reg_name_entry_del_name(rne->name);
+
+ pthread_mutex_unlock(&instance->r_lock);
+
+ return ret;
}
static struct port_map_entry * flow_accept(pid_t pid,
@@ -694,40 +741,40 @@ static struct port_map_entry * flow_accept(pid_t pid,
char ** ae_name)
{
struct port_map_entry * pme;
- struct reg_name_entry * rne = get_reg_name_entry_by_id(pid);
+ struct reg_name_entry * rne = NULL;
+
+ pthread_mutex_lock(&instance->r_lock);
+
+ rne = get_reg_name_entry_by_id(pid);
if (rne == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_DBGF("Unregistered AP calling accept().");
return NULL;
}
-
if (rne->accept) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_DBGF("This AP still has a pending accept().");
return NULL;
}
- rne->accept = true;
+ rne->accept = true;
+ rne->flow_arrived = false;
- pthread_mutex_lock(&rne->fa_lock);
-
- pthread_cond_wait(&rne->flow_arrived, &rne->fa_lock);
-
- if (rne->response == -1) {
- list_del(&rne->next);
- reg_name_entry_destroy(rne);
- return NULL;
- }
+ while (!rne->flow_arrived)
+ pthread_cond_wait(&rne->acc_signal, &instance->r_lock);
pme = get_port_map_entry_n(pid);
if (pme == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("Port_id was not created yet.");
- pthread_mutex_unlock(&rne->fa_lock);
return NULL;
}
*ap_name = rne->req_ap_name;
if (ae_name != NULL)
*ae_name = rne->req_ae_name;
- pthread_mutex_unlock(&rne->fa_lock);
+
+ pthread_mutex_unlock(&instance->r_lock);
return pme;
}
@@ -737,14 +784,19 @@ static int flow_alloc_resp(pid_t n_pid,
int response)
{
struct port_map_entry * pme = NULL;
- struct reg_name_entry * rne = get_reg_name_entry_by_id(n_pid);
- if (rne == NULL)
+ struct reg_name_entry * rne = NULL;
+
+ pthread_mutex_lock(&instance->r_lock);
+
+ rne = get_reg_name_entry_by_id(n_pid);
+ if (rne == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
+ }
- pthread_mutex_lock(&rne->fa_lock);
/* FIXME: check all instances associated with the name */
if (!rne->accept) {
- pthread_mutex_unlock(&rne->fa_lock);
+ pthread_mutex_unlock(&instance->r_lock);
LOG_ERR("No process listening for this name.");
return -1;
}
@@ -754,15 +806,15 @@ static int flow_alloc_resp(pid_t n_pid,
* once we can handle a list of AP-I's, remove it from the list
*/
- rne->accept = false;
- rne->response = response;
+ rne->accept = false;
+ rne->flow_arrived = false;
if (!response) {
pme = get_port_map_entry(port_id);
pme->state = FLOW_ALLOCATED;
}
- pthread_mutex_unlock(&rne->fa_lock);
+ pthread_mutex_unlock(&instance->r_lock);
return ipcp_flow_alloc_resp(pme->n_1_pid,
port_id,
@@ -776,53 +828,96 @@ static struct port_map_entry * flow_alloc(pid_t pid,
char * src_ae_name,
struct qos_spec * qos)
{
- struct port_map_entry * e = malloc(sizeof(*e));
- if (e == NULL) {
+ struct port_map_entry * pme;
+ instance_name_t * ipcp;
+
+ pme = port_map_entry_create();
+ if (pme == NULL) {
LOG_ERR("Failed malloc of port_map_entry.");
return NULL;
}
- e->port_id = bmp_allocate(instance->port_ids);
- e->n_pid = pid;
- e->state = FLOW_PENDING;
- e->n_1_pid = get_ipcp_by_dst_name(dst_name)->id;
+ pme->port_id = bmp_allocate(instance->port_ids);
+ pme->n_pid = pid;
+ pme->state = FLOW_PENDING;
+ pme->n_1_pid = get_ipcp_by_dst_name(dst_name)->id;
+
+ pthread_mutex_lock(&instance->r_lock);
+
+ list_add(&pme->next, &instance->port_map);
+
+ ipcp = get_ipcp_by_dst_name(dst_name);
- list_add(&e->next, &instance->port_map);
+ pthread_mutex_unlock(&instance->r_lock);
- if (ipcp_flow_alloc(get_ipcp_by_dst_name(dst_name)->id,
- e->port_id,
- e->n_pid,
+ if (ipcp == NULL) {
+ LOG_DBG("unknown ipcp");
+ return NULL;
+ }
+
+ if (ipcp_flow_alloc(ipcp->id,
+ pme->port_id,
+ pme->n_pid,
dst_name,
src_ap_name,
src_ae_name,
qos) < 0) {
- list_del(&e->next);
- bmp_release(instance->port_ids, e->port_id);
- free(e);
+ pthread_mutex_lock(&instance->r_lock);
+
+ list_del(&pme->next);
+
+ pthread_mutex_unlock(&instance->r_lock);
+
+ bmp_release(instance->port_ids, pme->port_id);
+ free(pme);
+
return NULL;
}
- return e;
+ return pme;
}
static int flow_alloc_res(int port_id)
{
- bool allocated = false;
struct port_map_entry * e;
- struct timespec ts = {0,100000};
- while (!allocated) {
- /* FIXME: this needs locking */
+ pthread_mutex_lock(&instance->r_lock);
+
+ e = get_port_map_entry(port_id);
+ if (e == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
+ return -1;
+ }
+
+ if (e->state == FLOW_ALLOCATED) {
+ pthread_mutex_unlock(&instance->r_lock);
+ return 0;
+ }
+
+ while (true) {
+ pthread_cond_wait(&e->res_signal, &instance->r_lock);
e = get_port_map_entry(port_id);
if (e == NULL) {
- LOG_DBGF("Could not locate port_id %d", port_id);
+ pthread_mutex_unlock(&instance->r_lock);
return -1;
}
- if (e->state == FLOW_ALLOCATED)
- allocated = true;
- nanosleep(&ts, NULL);
+ if (e->state == FLOW_ALLOCATED) {
+ pthread_mutex_unlock(&instance->r_lock);
+ LOG_DBGF("Returning 0.");
+ return 0;
+ }
+ if (e->state == FLOW_NULL) {
+ list_del(&e->next);
+ pthread_mutex_unlock(&instance->r_lock);
+ free(e);
+ return -1;
+
+ }
+ /* still pending, spurious wake */
}
+ pthread_mutex_unlock(&instance->r_lock);
+
return 0;
}
@@ -830,13 +925,22 @@ static int flow_dealloc(int port_id)
{
pid_t n_1_pid;
- struct port_map_entry * e = get_port_map_entry(port_id);
- if (e == NULL)
+ struct port_map_entry * e = NULL;
+
+ pthread_mutex_lock(&instance->r_lock);
+
+ e = get_port_map_entry(port_id);
+ if (e == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
return 0;
+ }
n_1_pid = e->n_1_pid;
list_del(&e->next);
+
+ pthread_mutex_unlock(&instance->r_lock);
+
free(e);
return ipcp_flow_dealloc(n_1_pid, port_id);
@@ -850,12 +954,6 @@ static struct port_map_entry * flow_req_arr(pid_t pid,
struct reg_name_entry * rne;
struct port_map_entry * pme;
- rne = get_reg_name_entry_by_name(dst_name);
- if (rne == NULL) {
- LOG_DBGF("Destination name %s unknown.", dst_name);
- return NULL;
- }
-
pme = malloc(sizeof(*pme));
if (pme == NULL) {
LOG_ERR("Failed malloc of port_map_entry.");
@@ -863,19 +961,32 @@ static struct port_map_entry * flow_req_arr(pid_t pid,
}
pme->port_id = bmp_allocate(instance->port_ids);
- pme->n_pid = rne->api->id;
pme->state = FLOW_PENDING;
pme->n_1_pid = pid;
- list_add(&pme->next, &instance->port_map);
+ pthread_mutex_lock(&instance->r_lock);
- pthread_mutex_lock(&rne->fa_lock);
+ rne = get_reg_name_entry_by_name(dst_name);
+ if (rne == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
+ LOG_DBGF("Destination name %s unknown.", dst_name);
+ free(pme);
+ return NULL;
+ }
+
+ pme->n_pid = rne->api->id;
+
+ list_add(&pme->next, &instance->port_map);
rne->req_ap_name = strdup(ap_name);
rne->req_ae_name = strdup(ae_name);
- pthread_mutex_unlock(&rne->fa_lock);
- pthread_cond_signal(&rne->flow_arrived);
+ rne->flow_arrived = true;
+
+ pthread_mutex_unlock(&instance->r_lock);
+
+ if (pthread_cond_signal(&rne->acc_signal))
+ LOG_ERR("Failed to send signal.");
return pme;
}
@@ -885,26 +996,48 @@ static int flow_alloc_reply(int port_id,
{
struct port_map_entry * e;
- /* FIXME: do this under lock */
- if (!response) {
- e = get_port_map_entry(port_id);
- if (e == NULL)
- return -1;
- e->state = FLOW_ALLOCATED;
+ pthread_mutex_lock(&instance->r_lock);
+
+ e = get_port_map_entry(port_id);
+ if (e == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
+ return -1;
+ }
+
+ if (e->state == FLOW_ALLOCATED) {
+ pthread_mutex_unlock(&instance->r_lock);
+ return 0;
}
- /* FIXME: does this need to be propagated to the IPCP? */
+ if (!response)
+ e->state = FLOW_ALLOCATED;
+
+ else
+ e->state = FLOW_NULL;
+
+ pthread_mutex_unlock(&instance->r_lock);
+ if (pthread_cond_signal(&e->res_signal))
+ LOG_ERR("Failed to send signal.");
return 0;
}
static int flow_dealloc_ipcp(int port_id)
{
- struct port_map_entry * e = get_port_map_entry(port_id);
- if (e == NULL)
+ struct port_map_entry * e = NULL;
+
+ pthread_mutex_lock(&instance->r_lock);
+
+ e = get_port_map_entry(port_id);
+ if (e == NULL) {
+ pthread_mutex_unlock(&instance->r_lock);
return 0;
+ }
list_del(&e->next);
+
+ pthread_mutex_unlock(&instance->r_lock);
+
free(e);
return 0;
@@ -933,8 +1066,8 @@ static void irm_destroy(struct irm * irm)
struct reg_name_entry * e = list_entry(h,
struct reg_name_entry,
next);
- char * difs [1] = {ALL_DIFS};
- ap_unreg_id(e->api->id, difs, 1);
+ list_del(&e->next);
+ free(e);
}
list_for_each_safe(h, t, &irm->port_map) {
@@ -1059,6 +1192,7 @@ void * mainloop()
e = flow_accept(msg->pid,
&ret_msg.ap_name,
&ret_msg.ae_name);
+
if (e == NULL)
break;
@@ -1186,7 +1320,15 @@ static struct irm * irm_create()
return NULL;
}
- pthread_mutex_init(&i->lock, NULL);
+ if (pthread_mutex_init(&i->r_lock, NULL)) {
+ irm_destroy(i);
+ return NULL;
+ }
+
+ if (pthread_mutex_init(&i->r_lock, NULL)) {
+ irm_destroy(i);
+ return NULL;
+ }
return i;
}