diff options
author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-05-10 15:11:51 +0200 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-05-10 15:11:51 +0200 |
commit | b4397485d19dc8bd4c109641e0495f8c5a5f4d16 (patch) | |
tree | 03b2e0c45029be54f6372b3d5de2135cefb80bf0 | |
parent | a952b77225418ce0e92f14e58f2db792ce9a72b0 (diff) | |
parent | f45237ea48631f2d7bc345f27da0f6fac795718b (diff) | |
download | ouroboros-b4397485d19dc8bd4c109641e0495f8c5a5f4d16.tar.gz ouroboros-b4397485d19dc8bd4c109641e0495f8c5a5f4d16.zip |
Merged in dstaesse/ouroboros/irmd-threads (pull request #70)
irmd: use pthread_cond_wait
-rw-r--r-- | src/irmd/main.c | 451 | ||||
-rw-r--r-- | src/lib/dev.c | 5 | ||||
-rw-r--r-- | src/lib/ipcp.c | 2 |
3 files changed, 321 insertions, 137 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index 946ed13d..b660511c 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -66,8 +66,6 @@ struct ipcp_entry { struct list_head next; instance_name_t * api; char * dif_name; - - pthread_mutex_t lock; }; /* currently supports only registering whatevercast groups of a single AP-I */ @@ -83,9 +81,11 @@ struct reg_name_entry { bool accept; char * req_ap_name; char * req_ae_name; + int response; bool flow_arrived; - pthread_mutex_t fa_lock; + pthread_cond_t acc_signal; + pthread_mutex_t acc_lock; }; /* keeps track of port_id's between N and N - 1 */ @@ -97,6 +97,9 @@ struct port_map_entry { pid_t n_pid; pid_t n_1_pid; + pthread_cond_t res_signal; + pthread_mutex_t res_lock; + enum flow_state state; }; @@ -117,9 +120,33 @@ struct irm { pthread_t * threadpool; - pthread_mutex_t lock; + pthread_mutex_t r_lock; } * instance = NULL; +static struct port_map_entry * port_map_entry_create() +{ + struct port_map_entry * e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + e->n_pid = 0; + e->n_1_pid = 0; + e->port_id = 0; + e->state = FLOW_NULL; + + if (pthread_cond_init(&e->res_signal, NULL)) { + free(e); + return NULL; + } + + if (pthread_mutex_init(&e->res_lock, NULL)) { + free(e); + return NULL; + } + + return e; +} + static struct port_map_entry * get_port_map_entry(int port_id) { struct list_head * pos = NULL; @@ -160,7 +187,6 @@ static struct ipcp_entry * ipcp_entry_create() e->dif_name = NULL; INIT_LIST_HEAD(&e->next); - pthread_mutex_init(&e->lock, NULL); return e; } @@ -254,7 +280,16 @@ static struct reg_name_entry * reg_name_entry_create() e->req_ae_name = NULL; e->flow_arrived = false; - pthread_mutex_init(&e->fa_lock, NULL); + if (pthread_cond_init(&e->acc_signal, NULL)) { + free(e); + return NULL; + } + + if (pthread_mutex_init(&e->acc_lock, NULL)) { + free(e); + return NULL; + } + INIT_LIST_HEAD(&e->next); return e; @@ -388,13 +423,13 @@ static pid_t create_ipcp(char * ap_name, tmp->dif_name = NULL; - pthread_mutex_lock(&instance->lock); + pthread_mutex_lock(&instance->r_lock); list_add(&tmp->next, &instance->ipcps); - pthread_mutex_unlock(&instance->lock); + pthread_mutex_unlock(&instance->r_lock); - LOG_INFO("Created IPCP %s-%d ", ap_name, pid); + LOG_INFO("Created IPCP %s-%d.", ap_name, pid); return pid; } @@ -403,6 +438,8 @@ static int destroy_ipcp(instance_name_t * api) { struct list_head * pos = NULL; struct list_head * n = NULL; + pid_t pid = 0; + if (api == NULL) return 0; @@ -415,6 +452,7 @@ static int destroy_ipcp(instance_name_t * api) return 0; } + pid = api->id; if (ipcp_destroy(api->id)) LOG_ERR("Could not destroy IPCP."); @@ -428,7 +466,7 @@ static int destroy_ipcp(instance_name_t * api) ipcp_entry_destroy(tmp); } - LOG_INFO("Destroyed IPCP %s-%d.", api->name, api->id); + LOG_INFO("Destroyed IPCP %d.", pid); return 0; } @@ -438,33 +476,41 @@ static int bootstrap_ipcp(instance_name_t * api, { struct ipcp_entry * entry = NULL; + pthread_mutex_lock(&instance->r_lock); + if (api->id == 0) api = get_ipcp_by_name(api->name); if (api == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("No such IPCP in the system."); return -1; } entry = get_ipcp_entry_by_name(api); if (entry == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(conf->dif_name); if (entry->dif_name == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("Failed to strdup."); return -1; } if (ipcp_bootstrap(entry->api->id, conf)) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("Could not bootstrap IPCP."); free(entry->dif_name); entry->dif_name = NULL; return -1; } + pthread_mutex_unlock(&instance->r_lock); + LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.", api->name, api->id, conf->dif_name); @@ -479,14 +525,18 @@ static int enroll_ipcp(instance_name_t * api, ssize_t n_1_difs_size = 0; struct ipcp_entry * entry = NULL; + pthread_mutex_lock(&instance->r_lock); + entry = get_ipcp_entry_by_name(api); if (entry == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(dif_name); if (entry->dif_name == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("Failed to strdup."); return -1; } @@ -496,6 +546,7 @@ static int enroll_ipcp(instance_name_t * api, LOG_ERR("Could not find a member of that DIF."); free(entry->dif_name); entry->dif_name = NULL; + pthread_mutex_unlock(&instance->r_lock); return -1; } @@ -504,13 +555,18 @@ static int enroll_ipcp(instance_name_t * api, LOG_ERR("Could not find N-1 DIFs."); free(entry->dif_name); entry->dif_name = NULL; + pthread_mutex_unlock(&instance->r_lock); return -1; } - if (ipcp_enroll(entry->api->id, member, n_1_difs[0])) { + pthread_mutex_unlock(&instance->r_lock); + + if (ipcp_enroll(api->id, member, n_1_difs[0])) { LOG_ERR("Could not enroll IPCP."); + pthread_mutex_lock(&instance->r_lock); free(entry->dif_name); entry->dif_name = NULL; + pthread_mutex_unlock(&instance->r_lock); return -1; } @@ -545,50 +601,6 @@ static int unreg_ipcp(instance_name_t * api, return 0; } -static int ap_unreg_id(pid_t pid, - char ** difs, - size_t len) -{ - int i; - int ret = 0; - struct reg_name_entry * rne = NULL; - struct list_head * pos = NULL; - - rne = get_reg_name_entry_by_id(pid); - if (rne == NULL) - return 0; /* no such id */ - - if (instance->ipcps.next == NULL) { - LOG_ERR("No IPCPs in this system."); - return 0; - } - - if (strcmp(difs[0], ALL_DIFS) == 0) { - list_for_each(pos, &instance->ipcps) { - struct ipcp_entry * e = - list_entry(pos, struct ipcp_entry, next); - - if (ipcp_name_unreg(e->api->id, rne->name)) { - LOG_ERR("Could not unregister %s in DIF %s.", - rne->name, e->dif_name); - --ret; - } - } - } else { - for (i = 0; i < len; ++i) { - if (ipcp_name_unreg(pid, rne->name)) { - LOG_ERR("Could not unregister %s in DIF %s.", - rne->name, difs[i]); - --ret; - } - } - } - - reg_name_entry_del_name(rne->name); - - return ret; -} - static int ap_reg(char * ap_name, pid_t ap_id, char ** difs, @@ -602,24 +614,35 @@ static int ap_reg(char * ap_name, instance_name_t * api = NULL; instance_name_t * ipcpi = NULL; - if (instance->ipcps.next == NULL) - return -1; - /* check if this ap_name is already registered */ - rne = get_reg_name_entry_by_name(ap_name); - if (rne != NULL) - return -1; /* can only register one instance for now */ + pthread_mutex_lock(&instance->r_lock); + + if (instance->ipcps.next == NULL) { + pthread_mutex_unlock(&instance->r_lock); + return -1; + } api = instance_name_create(); if (api == NULL) { + pthread_mutex_unlock(&instance->r_lock); return -1; } if (instance_name_init_from(api, ap_name, ap_id) == NULL) { + pthread_mutex_unlock(&instance->r_lock); instance_name_destroy(api); return -1; } + /* check if this ap_name is already registered */ + + rne = get_reg_name_entry_by_name(ap_name); + if (rne != NULL) { + instance_name_destroy(api); + pthread_mutex_unlock(&instance->r_lock); + return -1; /* can only register one instance for now */ + } + /* * for now, the whatevercast name is the same as the ap_name and * contains a single instance only @@ -656,12 +679,15 @@ static int ap_reg(char * ap_name, if (ret == 0) { instance_name_destroy(api); + pthread_mutex_unlock(&instance->r_lock); return -1; } /* for now, we register single instances */ ret = reg_name_entry_add_name_instance(strdup(ap_name), api); + pthread_mutex_unlock(&instance->r_lock); + return ret; } @@ -670,62 +696,104 @@ static int ap_unreg(char * ap_name, char ** difs, size_t len) { - struct reg_name_entry * tmp = NULL; + int i; + int ret = 0; + struct reg_name_entry * rne = NULL; + struct list_head * pos = NULL; + + pthread_mutex_lock(&instance->r_lock); /* check if ap_name is registered */ - tmp = get_reg_name_entry_by_id(ap_id); - if (tmp == NULL) + rne = get_reg_name_entry_by_id(ap_id); + if (rne == NULL) { + pthread_mutex_unlock(&instance->r_lock); + return 0; /* no such id */ + } + + if (strcmp(ap_name, rne->api->name)) { + pthread_mutex_unlock(&instance->r_lock); return 0; + } - if (strcmp(ap_name, tmp->api->name)) + if (instance->ipcps.next == NULL) { + pthread_mutex_unlock(&instance->r_lock); + LOG_ERR("No IPCPs in this system."); return 0; + } + + if (strcmp(difs[0], ALL_DIFS) == 0) { + list_for_each(pos, &instance->ipcps) { + struct ipcp_entry * e = + list_entry(pos, struct ipcp_entry, next); + + if (ipcp_name_unreg(e->api->id, rne->name)) { + LOG_ERR("Could not unregister %s in DIF %s.", + rne->name, e->dif_name); + --ret; + } + } + } else { + for (i = 0; i < len; ++i) { + if (ipcp_name_unreg(ap_id, rne->name)) { + LOG_ERR("Could not unregister %s in DIF %s.", + rne->name, difs[i]); + --ret; + } + } + } + + reg_name_entry_del_name(rne->name); - return ap_unreg_id(ap_id, difs, len); + pthread_mutex_unlock(&instance->r_lock); + + return ret; } static struct port_map_entry * flow_accept(pid_t pid, char ** ap_name, char ** ae_name) { - bool arrived = false; + struct port_map_entry * pme; + struct reg_name_entry * rne = NULL; - struct timespec ts = {0, 100000}; + pthread_mutex_lock(&instance->r_lock); - struct port_map_entry * pme; - struct reg_name_entry * rne = get_reg_name_entry_by_id(pid); + rne = get_reg_name_entry_by_id(pid); if (rne == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_DBGF("Unregistered AP calling accept()."); return NULL; } - if (rne->accept) { + pthread_mutex_unlock(&instance->r_lock); LOG_DBGF("This AP still has a pending accept()."); return NULL; } - rne->accept = true; + rne->accept = true; + rne->flow_arrived = false; + + pthread_mutex_unlock(&instance->r_lock); + pthread_mutex_lock(&rne->acc_lock); - /* FIXME: wait for a thread that runs select() on flow_arrived */ - while (!arrived) { - /* FIXME: this needs locking */ - rne = get_reg_name_entry_by_id(pid); - if (rne == NULL) - return NULL; - arrived = rne->flow_arrived; - nanosleep(&ts, NULL); - } + while (!rne->flow_arrived) + pthread_cond_wait(&rne->acc_signal, &rne->acc_lock); + + pthread_mutex_unlock(&rne->acc_lock); + pthread_mutex_lock(&instance->r_lock); pme = get_port_map_entry_n(pid); if (pme == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("Port_id was not created yet."); return NULL; } - pthread_mutex_lock(&rne->fa_lock); *ap_name = rne->req_ap_name; if (ae_name != NULL) *ae_name = rne->req_ae_name; - pthread_mutex_unlock(&rne->fa_lock); + + pthread_mutex_unlock(&instance->r_lock); return pme; } @@ -734,14 +802,20 @@ static int flow_alloc_resp(pid_t n_pid, int port_id, int response) { - struct reg_name_entry * rne = get_reg_name_entry_by_id(n_pid); - struct port_map_entry * pme = get_port_map_entry(port_id); + struct port_map_entry * pme = NULL; + struct reg_name_entry * rne = NULL; + + pthread_mutex_lock(&instance->r_lock); - if (rne == NULL || pme == NULL) + rne = get_reg_name_entry_by_id(n_pid); + if (rne == NULL) { + pthread_mutex_unlock(&instance->r_lock); return -1; + } /* FIXME: check all instances associated with the name */ if (!rne->accept) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("No process listening for this name."); return -1; } @@ -751,11 +825,15 @@ static int flow_alloc_resp(pid_t n_pid, * once we can handle a list of AP-I's, remove it from the list */ - rne->flow_arrived = false; rne->accept = false; + rne->flow_arrived = false; - if (!response) + if (!response) { + pme = get_port_map_entry(port_id); pme->state = FLOW_ALLOCATED; + } + + pthread_mutex_unlock(&instance->r_lock); return ipcp_flow_alloc_resp(pme->n_1_pid, port_id, @@ -769,53 +847,104 @@ static struct port_map_entry * flow_alloc(pid_t pid, char * src_ae_name, struct qos_spec * qos) { - struct port_map_entry * e = malloc(sizeof(*e)); - if (e == NULL) { + struct port_map_entry * pme; + instance_name_t * ipcp; + + pme = port_map_entry_create(); + if (pme == NULL) { LOG_ERR("Failed malloc of port_map_entry."); return NULL; } - e->port_id = bmp_allocate(instance->port_ids); - e->n_pid = pid; - e->state = FLOW_PENDING; - e->n_1_pid = get_ipcp_by_dst_name(dst_name)->id; + pme->n_pid = pid; + pme->state = FLOW_PENDING; + + pthread_mutex_lock(&instance->r_lock); + + pme->port_id = bmp_allocate(instance->port_ids); + pme->n_1_pid = get_ipcp_by_dst_name(dst_name)->id; + + list_add(&pme->next, &instance->port_map); + + ipcp = get_ipcp_by_dst_name(dst_name); + + pthread_mutex_unlock(&instance->r_lock); - list_add(&e->next, &instance->port_map); + if (ipcp == NULL) { + LOG_DBG("unknown ipcp"); + return NULL; + } - if (ipcp_flow_alloc(get_ipcp_by_dst_name(dst_name)->id, - e->port_id, - e->n_pid, + if (ipcp_flow_alloc(ipcp->id, + pme->port_id, + pme->n_pid, dst_name, src_ap_name, src_ae_name, qos) < 0) { - list_del(&e->next); - bmp_release(instance->port_ids, e->port_id); - free(e); + pthread_mutex_lock(&instance->r_lock); + + list_del(&pme->next); + + pthread_mutex_unlock(&instance->r_lock); + + bmp_release(instance->port_ids, pme->port_id); + free(pme); + return NULL; } - return e; + return pme; } static int flow_alloc_res(int port_id) { - bool allocated = false; struct port_map_entry * e; - struct timespec ts = {0,100000}; - while (!allocated) { - /* FIXME: this needs locking */ + pthread_mutex_lock(&instance->r_lock); + + e = get_port_map_entry(port_id); + if (e == NULL) { + pthread_mutex_unlock(&instance->r_lock); + return -1; + } + + if (e->state == FLOW_ALLOCATED) { + pthread_mutex_unlock(&instance->r_lock); + return 0; + } + + pthread_mutex_unlock(&instance->r_lock); + pthread_mutex_lock(&e->res_lock); + + while (true) { + pthread_cond_wait(&e->res_signal, &e->res_lock); + + pthread_mutex_unlock(&e->res_lock); + pthread_mutex_lock(&instance->r_lock); + e = get_port_map_entry(port_id); if (e == NULL) { - LOG_DBGF("Could not locate port_id %d", port_id); + pthread_mutex_unlock(&instance->r_lock); return -1; } - if (e->state == FLOW_ALLOCATED) - allocated = true; - nanosleep(&ts, NULL); + if (e->state == FLOW_ALLOCATED) { + pthread_mutex_unlock(&instance->r_lock); + LOG_DBGF("Returning 0."); + return 0; + } + if (e->state == FLOW_NULL) { + list_del(&e->next); + pthread_mutex_unlock(&instance->r_lock); + free(e); + return -1; + + } + /* still pending, spurious wake */ } + pthread_mutex_unlock(&instance->r_lock); + return 0; } @@ -823,13 +952,22 @@ static int flow_dealloc(int port_id) { pid_t n_1_pid; - struct port_map_entry * e = get_port_map_entry(port_id); - if (e == NULL) + struct port_map_entry * e = NULL; + + pthread_mutex_lock(&instance->r_lock); + + e = get_port_map_entry(port_id); + if (e == NULL) { + pthread_mutex_unlock(&instance->r_lock); return 0; + } n_1_pid = e->n_1_pid; list_del(&e->next); + + pthread_mutex_unlock(&instance->r_lock); + free(e); return ipcp_flow_dealloc(n_1_pid, port_id); @@ -843,33 +981,45 @@ static struct port_map_entry * flow_req_arr(pid_t pid, struct reg_name_entry * rne; struct port_map_entry * pme; - rne = get_reg_name_entry_by_name(dst_name); - if (rne == NULL) { - LOG_DBGF("Destination name %s unknown.", dst_name); - return NULL; - } - pme = malloc(sizeof(*pme)); if (pme == NULL) { LOG_ERR("Failed malloc of port_map_entry."); return NULL; } - pme->port_id = bmp_allocate(instance->port_ids); - pme->n_pid = rne->api->id; pme->state = FLOW_PENDING; pme->n_1_pid = pid; - list_add(&pme->next, &instance->port_map); + pthread_mutex_lock(&instance->r_lock); - pthread_mutex_lock(&rne->fa_lock); + pme->port_id = bmp_allocate(instance->port_ids); + + rne = get_reg_name_entry_by_name(dst_name); + if (rne == NULL) { + pthread_mutex_unlock(&instance->r_lock); + LOG_DBGF("Destination name %s unknown.", dst_name); + free(pme); + return NULL; + } + + pme->n_pid = rne->api->id; + + list_add(&pme->next, &instance->port_map); rne->req_ap_name = strdup(ap_name); rne->req_ae_name = strdup(ae_name); + pthread_mutex_lock(&rne->acc_lock); + rne->flow_arrived = true; - pthread_mutex_unlock(&rne->fa_lock); + if (pthread_cond_signal(&rne->acc_signal)) + LOG_ERR("Failed to send signal."); + + pthread_mutex_unlock(&rne->acc_lock); + + pthread_mutex_unlock(&instance->r_lock); + return pme; } @@ -879,26 +1029,48 @@ static int flow_alloc_reply(int port_id, { struct port_map_entry * e; - /* FIXME: do this under lock */ - if (!response) { - e = get_port_map_entry(port_id); - if (e == NULL) - return -1; - e->state = FLOW_ALLOCATED; + pthread_mutex_lock(&instance->r_lock); + + e = get_port_map_entry(port_id); + if (e == NULL) { + pthread_mutex_unlock(&instance->r_lock); + return -1; } - /* FIXME: does this need to be propagated to the IPCP? */ + pthread_mutex_lock(&e->res_lock); + + if (!response) + e->state = FLOW_ALLOCATED; + + else + e->state = FLOW_NULL; + + if (pthread_cond_signal(&e->res_signal)) + LOG_ERR("Failed to send signal."); + + pthread_mutex_unlock(&e->res_lock); + + pthread_mutex_unlock(&instance->r_lock); return 0; } static int flow_dealloc_ipcp(int port_id) { - struct port_map_entry * e = get_port_map_entry(port_id); - if (e == NULL) + struct port_map_entry * e = NULL; + + pthread_mutex_lock(&instance->r_lock); + + e = get_port_map_entry(port_id); + if (e == NULL) { + pthread_mutex_unlock(&instance->r_lock); return 0; + } list_del(&e->next); + + pthread_mutex_unlock(&instance->r_lock); + free(e); return 0; @@ -927,8 +1099,8 @@ static void irm_destroy(struct irm * irm) struct reg_name_entry * e = list_entry(h, struct reg_name_entry, next); - char * difs [1] = {ALL_DIFS}; - ap_unreg_id(e->api->id, difs, 1); + list_del(&e->next); + free(e); } list_for_each_safe(h, t, &irm->port_map) { @@ -1053,6 +1225,7 @@ void * mainloop() e = flow_accept(msg->pid, &ret_msg.ap_name, &ret_msg.ae_name); + if (e == NULL) break; @@ -1180,7 +1353,15 @@ static struct irm * irm_create() return NULL; } - pthread_mutex_init(&i->lock, NULL); + if (pthread_mutex_init(&i->r_lock, NULL)) { + irm_destroy(i); + return NULL; + } + + if (pthread_mutex_init(&i->r_lock, NULL)) { + irm_destroy(i); + return NULL; + } return i; } diff --git a/src/lib/dev.c b/src/lib/dev.c index d574363b..c1cfe043 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -64,6 +64,7 @@ struct ap_data { int ap_init(char * ap_name) { + int i = 0; _ap_instance = malloc(sizeof(struct ap_data)); if (_ap_instance == NULL) { return -1; @@ -106,6 +107,10 @@ int ap_init(char * ap_name) return -1; } + for (i = 0; i < AP_MAX_FLOWS; ++i) + _ap_instance->flows[i].rb = NULL; + + return 0; } diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c index 843582b9..1f1e5c99 100644 --- a/src/lib/ipcp.c +++ b/src/lib/ipcp.c @@ -148,8 +148,6 @@ pid_t ipcp_create(char * ipcp_name, strcat(full_name, exec_name); full_name[len] = '\0'; - LOG_DBG("Full name is %s", full_name); - char * argv[] = {full_name, irmd_pid, ipcp_name, |