diff options
-rw-r--r-- | src/irmd/main.c | 1065 |
1 files changed, 807 insertions, 258 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index dff052a1..bc57c4b2 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -50,46 +50,170 @@ #include <pthread.h> #include <sys/stat.h> -#ifndef IRMD_MAX_FLOWS #define IRMD_MAX_FLOWS 4096 -#endif -#ifndef IRMD_THREADPOOL_SIZE #define IRMD_THREADPOOL_SIZE 3 -#endif -#ifndef IRMD_FLOW_TIMEOUT #define IRMD_FLOW_TIMEOUT 5000 /* ms */ -#endif #define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */ +#define REG_AP_AUTO 0x0001 +/* FIXME: add support for unique */ +#define REG_AP_UNIQUE 0x0002 + +#define reg_entry_has_api(e, id) (reg_entry_get_reg_instance(e, id) != NULL) +#define reg_entry_has_ap_name(e, name) (reg_entry_get_ap_name(e, name) != NULL) +#define reg_entry_has_ap_auto(e, name) (reg_entry_get_reg_auto(e, name) != NULL) + struct ipcp_entry { struct list_head next; instance_name_t * api; char * dif_name; }; -/* currently supports only registering whatevercast groups of a single AP-I */ -struct reg_name_entry { +enum irm_state { + IRMD_NULL = 0, + IRMD_RUNNING, + IRMD_SHUTDOWN +}; + +enum reg_name_state { + REG_NAME_NULL = 0, + REG_NAME_IDLE, + REG_NAME_AUTO_ACCEPT, + REG_NAME_AUTO_EXEC, + REG_NAME_FLOW_ACCEPT, + REG_NAME_FLOW_ARRIVED +}; + +enum reg_i_state { + REG_I_NULL = 0, + REG_I_SLEEP, + REG_I_WAKE +}; + +struct reg_instance { + struct list_head next; + pid_t pid; + + /* the pid will block on this */ + enum reg_i_state state; + pthread_cond_t wakeup; + pthread_mutex_t mutex; +}; + +static struct reg_instance * reg_instance_create(pid_t pid) +{ + struct reg_instance * i; + i = malloc(sizeof(*i)); + if (i == NULL) + return NULL; + + i->pid = pid; + i->state = REG_I_WAKE; + + pthread_mutex_init(&i->mutex, NULL); + pthread_cond_init(&i->wakeup, NULL); + + INIT_LIST_HEAD(&i->next); + + return i; +} + +static void reg_instance_sleep(struct reg_instance * i) +{ + pthread_mutex_lock(&i->mutex); + if (i->state != REG_I_WAKE) { + pthread_mutex_unlock(&i->mutex); + return; + } + + i->state = REG_I_SLEEP; + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) &i->mutex); + + while (i->state == REG_I_SLEEP) + pthread_cond_wait(&i->wakeup, &i->mutex); + + pthread_cleanup_pop(true); +} + +static void reg_instance_wake(struct reg_instance * i) +{ + pthread_mutex_lock(&i->mutex); + + if (i->state == REG_I_NULL) { + pthread_mutex_unlock(&i->mutex); + return; + } + + i->state = REG_I_WAKE; + + pthread_mutex_unlock(&i->mutex); + pthread_cond_signal(&i->wakeup); +} + +static void reg_instance_destroy(struct reg_instance * i) +{ + bool wait = true; + pthread_mutex_lock(&i->mutex); + i->state = REG_I_NULL; + + pthread_cond_broadcast(&i->wakeup); + pthread_mutex_unlock(&i->mutex); + + while (wait) { + pthread_mutex_lock(&i->mutex); + if (pthread_cond_destroy(&i->wakeup) < 0) + pthread_cond_broadcast(&i->wakeup); + else + wait = false; + pthread_mutex_unlock(&i->mutex); + } + + pthread_mutex_destroy(&i->mutex); + + free(i); +} + +struct reg_auto { + struct list_head next; + char * ap_name; + char ** argv; +}; + +struct reg_ap_name { + struct list_head next; + char * ap_name; +}; + +/* an entry in the registry */ +struct reg_entry { struct list_head next; - /* generic whatevercast name */ - char * name; + /* generic name */ + char * name; + + /* names of the aps that can listen to this name */ + struct list_head ap_names; + + enum reg_name_state state; + + uint32_t flags; - /* FIXME: make a list resolve to AP-I instead */ - instance_name_t * api; - char ** argv; - bool autoexec; + /* auto execution info */ + struct list_head auto_ap_info; + + /* known instances */ + struct list_head ap_instances; - bool accept; char * req_ae_name; int response; - int flow_arrived; pthread_cond_t acc_signal; - pthread_cond_t acc_arr_signal; - pthread_mutex_t acc_lock; + pthread_mutex_t state_lock; }; /* keeps track of port_id's between N and N - 1 */ @@ -105,13 +229,15 @@ struct port_map_entry { pthread_mutex_t res_lock; enum flow_state state; + struct timespec t0; }; struct irm { - /* FIXME: list of ipcps could be merged with registered names */ + /* FIXME: list of ipcps could be merged into the registry */ struct list_head ipcps; - struct list_head reg_names; + + struct list_head registry; pthread_rwlock_t reg_lock; /* keep track of all flows in this processing system */ @@ -120,13 +246,14 @@ struct irm { struct list_head port_map; pthread_rwlock_t flows_lock; + enum irm_state state; struct shm_du_map * dum; pthread_t * threadpool; int sockfd; pthread_rwlock_t state_lock; - pthread_t cleanup_flows; - pthread_t shm_sanitize; + pthread_t cleanup_flows; + pthread_t shm_sanitize; } * instance = NULL; static struct port_map_entry * port_map_entry_create() @@ -156,6 +283,29 @@ static struct port_map_entry * port_map_entry_create() return e; } +static void port_map_entry_destroy(struct port_map_entry * e) +{ + bool wait = true; + pthread_mutex_lock(&e->res_lock); + e->state = FLOW_NULL; + + pthread_cond_broadcast(&e->res_signal); + pthread_mutex_unlock(&e->res_lock); + + while (wait) { + pthread_mutex_lock(&e->res_lock); + if (pthread_cond_destroy(&e->res_signal) < 0) + pthread_cond_broadcast(&e->res_signal); + else + wait = false; + pthread_mutex_unlock(&e->res_lock); + } + + pthread_mutex_destroy(&e->res_lock); + + free(e); +} + static struct port_map_entry * get_port_map_entry(int port_id) { struct list_head * pos = NULL; @@ -272,87 +422,181 @@ static instance_name_t * get_ipcp_by_dst_name(char * dst_name, return NULL; } -static struct reg_name_entry * reg_name_entry_create() +static struct reg_entry * reg_entry_create() { - struct reg_name_entry * e = malloc(sizeof(*e)); + struct reg_entry * e = malloc(sizeof(*e)); if (e == NULL) return NULL; e->name = NULL; - e->api = NULL; - e->argv = NULL; - e->autoexec = false; - e->accept = false; + e->state = REG_NAME_NULL; + e->flags = 0; + e->req_ae_name = NULL; - e->flow_arrived = -1; + e->response = -1; - if (pthread_cond_init(&e->acc_arr_signal, NULL)) { - free(e); + return e; +} + +static struct reg_entry * reg_entry_init(struct reg_entry * e, + char * name, + char * ap_name, + uint32_t flags) +{ + if (e == NULL || name == NULL || ap_name == NULL) return NULL; - } + + struct reg_ap_name * n = malloc(sizeof(*n)); + if (n == NULL) + return NULL; + + INIT_LIST_HEAD(&e->next); + INIT_LIST_HEAD(&e->ap_names); + INIT_LIST_HEAD(&e->auto_ap_info); + INIT_LIST_HEAD(&e->ap_instances); + + e->name = name; + e->flags = flags; + n->ap_name = ap_name; + + list_add(&n->next, &e->ap_names); if (pthread_cond_init(&e->acc_signal, NULL)) { free(e); return NULL; } - if (pthread_mutex_init(&e->acc_lock, NULL)) { + if (pthread_mutex_init(&e->state_lock, NULL)) { free(e); return NULL; } - INIT_LIST_HEAD(&e->next); + e->state = REG_NAME_IDLE; return e; } -static struct reg_name_entry * reg_name_entry_init(struct reg_name_entry * e, - char * name, - instance_name_t * api, - char ** argv, - bool ae) +static void reg_entry_destroy(struct reg_entry * e) { - if (e == NULL || name == NULL || api == NULL) - return NULL; + struct list_head * pos = NULL; + struct list_head * n = NULL; - e->name = name; - e->api = api; - e->argv = argv; - e->autoexec = ae; + bool wait = true; - return e; + if (e == NULL) + return; + + pthread_mutex_lock(&e->state_lock); + + e->state = REG_NAME_NULL; + + pthread_cond_broadcast(&e->acc_signal); + pthread_mutex_unlock(&e->state_lock); + + while (wait) { + pthread_mutex_lock(&e->state_lock); + if (pthread_cond_destroy(&e->acc_signal) < 0) + pthread_cond_broadcast(&e->acc_signal); + else + wait = false; + pthread_mutex_unlock(&e->state_lock); + } + + pthread_mutex_destroy(&e->state_lock); + + if (e->name != NULL) + free(e->name); + + if (e->req_ae_name != NULL) + free(e->req_ae_name); + + list_for_each_safe(pos, n, &e->ap_instances) { + struct reg_instance * i = + list_entry(pos, struct reg_instance, next); + reg_instance_destroy(i); + } + + list_for_each_safe(pos, n, &e->auto_ap_info) { + struct reg_auto * a = + list_entry(pos, struct reg_auto, next); + + if (a->argv != NULL) { + char ** t = a->argv; + while (*a->argv != NULL) + free(*(a->argv++)); + free(t); + } + + free(a->ap_name); + free(a); + } + + list_for_each_safe(pos, n, &e->ap_names) { + struct reg_ap_name * n = + list_entry(pos, struct reg_ap_name, next); + + free(n->ap_name); + free(n); + } + + free(e); } -static int reg_name_entry_destroy(struct reg_name_entry * e) +static struct reg_ap_name * reg_entry_get_ap_name(struct reg_entry * e, + char * ap_name) { - if (e == NULL) - return 0; + struct list_head * pos = NULL; - if (e->accept) { - pthread_mutex_lock(&e->acc_lock); - e->flow_arrived = -2; - pthread_mutex_unlock(&e->acc_lock); - pthread_cond_broadcast(&e->acc_arr_signal); - sched_yield(); + list_for_each(pos, &e->ap_names) { + struct reg_ap_name * n = + list_entry(pos, struct reg_ap_name, next); + + if (strcmp(ap_name, n->ap_name) == 0) + return n; } - free(e->name); - instance_name_destroy(e->api); + return NULL; +} - free(e); +static struct reg_instance * reg_entry_get_reg_instance(struct reg_entry * e, + pid_t pid) +{ + struct list_head * pos = NULL; - e = NULL; + list_for_each(pos, &e->ap_instances) { + struct reg_instance * r = + list_entry(pos, struct reg_instance, next); - return 0; + if (r->pid == pid) + return r; + } + + return NULL; +} + +static struct reg_auto * reg_entry_get_reg_auto(struct reg_entry * e, + char * ap_name) +{ + struct list_head * pos = NULL; + + list_for_each(pos, &e->auto_ap_info) { + struct reg_auto * a = + list_entry(pos, struct reg_auto, next); + + if (strcmp(ap_name, a->ap_name) == 0) + return a; + } + + return NULL; } -static struct reg_name_entry * get_reg_name_entry_by_name(char * name) +static struct reg_entry * get_reg_entry_by_name(char * name) { struct list_head * pos = NULL; - list_for_each(pos, &instance->reg_names) { - struct reg_name_entry * e = - list_entry(pos, struct reg_name_entry, next); + list_for_each(pos, &instance->registry) { + struct reg_entry * e = + list_entry(pos, struct reg_entry, next); if (strcmp(name, e->name) == 0) return e; @@ -361,72 +605,287 @@ static struct reg_name_entry * get_reg_name_entry_by_name(char * name) return NULL; } -static struct reg_name_entry * get_reg_name_entry_by_ap_name(char * ap_name) +static struct reg_entry * get_reg_entry_by_ap_name(char * ap_name) { struct list_head * pos = NULL; - list_for_each(pos, &instance->reg_names) { - struct reg_name_entry * e = - list_entry(pos, struct reg_name_entry, next); + list_for_each(pos, &instance->registry) { + struct list_head * p = NULL; + struct reg_entry * e = + list_entry(pos, struct reg_entry, next); - if (strcmp(ap_name, e->api->name) == 0) - return e; + list_for_each(p, &e->ap_names) { + struct reg_ap_name * n = + list_entry(p, struct reg_ap_name, next); + + if (strcmp(n->ap_name, ap_name) == 0) + return e; + } } return NULL; } -static struct reg_name_entry * get_reg_name_entry_by_id(pid_t pid) +static struct reg_entry * get_reg_entry_by_ap_id(pid_t pid) { struct list_head * pos = NULL; - list_for_each(pos, &instance->reg_names) { - struct reg_name_entry * e = - list_entry(pos, struct reg_name_entry, next); + list_for_each(pos, &instance->registry) { + struct list_head * p = NULL; + struct reg_entry * e = + list_entry(pos, struct reg_entry, next); - if (e->api->id == pid) - return e; + list_for_each(p, &e->ap_instances) { + struct reg_instance * r = + list_entry(p, struct reg_instance, next); + + if (r->pid == pid) + return e; + } } return NULL; } -/* FIXME: add only name when we have NSM solved */ -static int reg_name_entry_add_name_instance(char * name, - instance_name_t * api, - char ** argv, - bool autoexec) +static int registry_add_entry(char * name, char * ap_name, uint32_t flags) { - struct reg_name_entry * e = get_reg_name_entry_by_name(name); + struct reg_entry * e = NULL; + + if (name == NULL || ap_name == NULL) + return -EINVAL; + + e = get_reg_entry_by_name(name); + if (e != NULL) { + LOG_DBG("Name %s already registered.", name); + return -1; + } + + e = reg_entry_create(); if (e == NULL) { - e = reg_name_entry_create(); - if (e == NULL) - return -1; + LOG_DBG("Could not create registry entry."); + return -1; + } - if (reg_name_entry_init(e, name, api, argv, autoexec) - == NULL) { - reg_name_entry_destroy(e); - return -1; + e = reg_entry_init(e, name, ap_name, flags); + if (e == NULL) { + LOG_DBG("Could not initialize registry entry."); + reg_entry_destroy(e); + return -1; + } + + list_add(&e->next, &instance->registry); + + return 0; +} + +static int registry_add_ap_auto(char * name, + char * ap_name, + char ** argv) +{ + struct reg_entry * e; + struct reg_auto * a; + + if (name == NULL || ap_name == NULL) + return -EINVAL; + + e = get_reg_entry_by_name(name); + if (e == NULL) { + LOG_DBG("Name %s not found in registry.", name); + return -1; + } + + if (!(e->flags & REG_AP_AUTO)) { + LOG_DBG("%s does not allow auto-instantiation.", name); + return -1; + } + + if (!reg_entry_has_ap_name(e, ap_name)) { + LOG_DBG("AP name %s not associated with %s.", ap_name, name); + return -1; + } + + if (e->state == REG_NAME_NULL) { + LOG_DBG("Tried to add instantiation info in NULL state."); + return -1; + } + + a = reg_entry_get_reg_auto(e, ap_name); + if (a != NULL) { + LOG_DBG("Updating auto-instantiation info for %s.", ap_name); + list_del(&a->next); + free(a->ap_name); + if (a->argv != NULL) { + while (*a->argv != NULL) + free(*a->argv++); } + } else { + a = malloc(sizeof(*a)); + if (a == NULL) + return -1; + } - list_add(&e->next, &instance->reg_names); - return 0; + a->ap_name = ap_name; + a->argv = argv; + + if(e->state == REG_NAME_IDLE) + e->state = REG_NAME_AUTO_ACCEPT; + + list_add(&a->next, &e->auto_ap_info); + + return 0; +} + +#if 0 +static int registry_remove_ap_auto(char * name, + char * ap_name) +{ + struct reg_entry * e; + struct reg_auto * a; + + if (name == NULL || ap_name == NULL) + return -EINVAL; + + e = get_reg_entry_by_name(name); + if (e == NULL) { + LOG_DBG("Name %s not found in registry.", name); + return -1; } - /* already exists, we don't have NSM yet */ - return -1; + a = reg_entry_get_reg_auto(e, ap_name); + if (a == NULL) { + LOG_DBG("Auto-instantiation info for %s not found.", ap_name); + return -1; + } + + list_del(&a->next); + + if(e->state == REG_NAME_AUTO_ACCEPT && list_empty(&e->auto_ap_info)) + e->state = REG_NAME_IDLE; + + return 0; } +#endif -static int reg_name_entry_del_name(char * name) +static struct reg_instance * registry_add_ap_instance(char * name, + pid_t pid) { - struct reg_name_entry * e = get_reg_name_entry_by_name(name); + struct reg_entry * e = NULL; + struct reg_instance * i = NULL; + + if (name == NULL || pid == 0) + return NULL; + + e = get_reg_entry_by_name(name); + if (e == NULL) { + LOG_DBG("Name %s not found in registry.", name); + return NULL; + } + + if (pid == 0) { + LOG_DBG("Invalid pid."); + return NULL; + } + + if (reg_entry_has_api(e, pid)) { + LOG_DBG("Instance already registered with this name."); + return NULL; + } + + if (e->state == REG_NAME_NULL) { + LOG_DBG("Tried to add instance in NULL state."); + return NULL; + } + + i = reg_instance_create(pid); + if (i == NULL) { + LOG_DBG("Failed to create reg_instance"); + return NULL; + } + + if(e->state == REG_NAME_IDLE || e->state == REG_NAME_AUTO_EXEC) { + e->state = REG_NAME_FLOW_ACCEPT; + pthread_cond_signal(&e->acc_signal); + } + + list_add(&i->next, &e->ap_instances); + + return i; +} + +static int registry_remove_ap_instance(char * name, pid_t pid) +{ + struct reg_entry * e = NULL; + struct reg_instance * i = NULL; + + if (name == NULL || pid == 0) + return -1; + + e = get_reg_entry_by_name(name); + if (e == NULL) { + LOG_DBG("Name %s is not registered.", name); + return -1; + } + + i = reg_entry_get_reg_instance(e, pid); + if (i == NULL) { + LOG_DBG("Instance %d is not accepting flows for %s.", + pid, name); + return -1; + } + + list_del(&i->next); + + if (list_empty(&e->ap_instances)) { + if ((e->flags & REG_AP_AUTO) && + !list_empty(&e->auto_ap_info)) + e->state = REG_NAME_AUTO_ACCEPT; + else + e->state = REG_NAME_IDLE; + } else { + e->state = REG_NAME_FLOW_ACCEPT; + } + + return 0; +} + +static pid_t registry_resolve_api(struct reg_entry * e) +{ + struct list_head * pos = NULL; + + /* FIXME: now just returns the first accepting instance */ + list_for_each(pos, &e->ap_instances) { + struct reg_instance * r = + list_entry(pos, struct reg_instance, next); + return r->pid; + } + + return 0; +} + +static char ** registry_resolve_auto(struct reg_entry * e) +{ + struct list_head * pos = NULL; + + /* FIXME: now just returns the first accepting instance */ + list_for_each(pos, &e->auto_ap_info) { + struct reg_auto * r = + list_entry(pos, struct reg_auto, next); + return r->argv; + } + + return NULL; +} + +static void registry_del_name(char * name) +{ + struct reg_entry * e = get_reg_entry_by_name(name); if (e == NULL) - return 0; + return; list_del(&e->next); - reg_name_entry_destroy(e); + reg_entry_destroy(e); - return 0; + return; } static pid_t create_ipcp(char * ap_name, @@ -437,6 +896,11 @@ static pid_t create_ipcp(char * ap_name, pthread_rwlock_rdlock(&instance->state_lock); + if (instance->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&instance->state_lock); + return 0; + } + pid = ipcp_create(ap_name, ipcp_type); if (pid == -1) { pthread_rwlock_unlock(&instance->state_lock); @@ -483,16 +947,21 @@ static pid_t create_ipcp(char * ap_name, static int destroy_ipcp(instance_name_t * api) { struct list_head * pos = NULL; - struct list_head * n = NULL; + struct list_head * n = NULL; pid_t pid = 0; if (api == NULL) return 0; + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_wrlock(&instance->reg_lock); + if (api->id == 0) api = get_ipcp_by_name(api->name); if (api == NULL) { + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_ERR("No such IPCP in the system."); return 0; } @@ -511,6 +980,8 @@ static int destroy_ipcp(instance_name_t * api) } } + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); LOG_INFO("Destroyed IPCP %d.", pid); @@ -523,6 +994,12 @@ static int bootstrap_ipcp(instance_name_t * api, struct ipcp_entry * entry = NULL; pthread_rwlock_rdlock(&instance->state_lock); + + if (instance->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&instance->state_lock); + return -1; + } + pthread_rwlock_wrlock(&instance->reg_lock); if (api->id == 0) @@ -577,6 +1054,12 @@ static int enroll_ipcp(instance_name_t * api, struct ipcp_entry * entry = NULL; pthread_rwlock_rdlock(&instance->state_lock); + + if (instance->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&instance->state_lock); + return -1; + } + pthread_rwlock_rdlock(&instance->reg_lock); entry = get_ipcp_entry_by_name(api); @@ -623,6 +1106,7 @@ static int enroll_ipcp(instance_name_t * api, return 0; } +/* FIXME: distinction between registering names and associating instances */ static int ap_reg(char * name, char * ap_name, pid_t ap_id, @@ -634,42 +1118,36 @@ static int ap_reg(char * name, { int i; int ret = 0; + struct list_head * pos = NULL; - struct reg_name_entry * rne = NULL; + char ** argv_dup = NULL; + char * apn = path_strip(ap_name); - instance_name_t * api = NULL; - char ** argv_dup = NULL; + uint32_t flags = 0; pthread_rwlock_rdlock(&instance->state_lock); - pthread_rwlock_wrlock(&instance->reg_lock); - if (instance->ipcps.next == NULL) { - pthread_rwlock_unlock(&instance->reg_lock); + if (instance->state != IRMD_RUNNING) { pthread_rwlock_unlock(&instance->state_lock); return -1; } - api = instance_name_create(); - if (api == NULL) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); - return -1; - } + pthread_rwlock_wrlock(&instance->reg_lock); - if (instance_name_init_from(api, path_strip(ap_name), ap_id) == NULL) { + if (list_empty(&instance->ipcps)) { pthread_rwlock_unlock(&instance->reg_lock); pthread_rwlock_unlock(&instance->state_lock); - instance_name_destroy(api); return -1; } - /* check if this name is already registered */ - rne = get_reg_name_entry_by_name(name); - if (rne != NULL) { + if (autoexec) + flags |= REG_AP_AUTO; + + if (registry_add_entry(strdup(name), strdup(apn), flags) < 0) { pthread_rwlock_unlock(&instance->reg_lock); pthread_rwlock_unlock(&instance->state_lock); - instance_name_destroy(api); - return -1; /* can only register one instance for now */ + LOG_ERR("Failed to register %s.", name); + return -1; } list_for_each(pos, &instance->ipcps) { @@ -684,39 +1162,36 @@ static int ap_reg(char * name, if (ipcp_name_reg(e->api->id, name)) { LOG_ERR("Could not register " "%s in DIF %s as %s.", - api->name, e->dif_name, name); + apn, e->dif_name, name); } else { LOG_INFO("Registered %s as %s in %s", - api->name, name, e->dif_name); + apn, name, e->dif_name); ++ret; } } } } - if (ret == 0) { + + if (ret == 0) { pthread_rwlock_unlock(&instance->reg_lock); pthread_rwlock_unlock(&instance->state_lock); - instance_name_destroy(api); return -1; } - /* we need to duplicate argv */ - if (argc != 0) { - argv_dup = malloc((argc + 2) * sizeof(*argv_dup)); - argv_dup[0] = strdup(api->name); - for (i = 1; i <= argc; ++i) - argv_dup[i] = strdup(argv[i - 1]); - argv_dup[argc + 1] = NULL; - } - + if (autoexec) { + /* we need to duplicate argv */ + if (argc != 0) { + argv_dup = malloc((argc + 2) * sizeof(*argv_dup)); + argv_dup[0] = strdup(ap_name); + for (i = 1; i <= argc; ++i) + argv_dup[i] = strdup(argv[i - 1]); + argv_dup[argc + 1] = NULL; + } - /* for now, we register single instances */ - if ((ret = reg_name_entry_add_name_instance(strdup(name), - api, - argv_dup, - autoexec)) - < 0) - LOG_DBGF("Failed to add application %s.", api->name); + registry_add_ap_auto(name, strdup(apn), argv_dup); + } else { + registry_add_ap_instance(name, ap_id); + } pthread_rwlock_unlock(&instance->reg_lock); pthread_rwlock_unlock(&instance->state_lock); @@ -733,13 +1208,19 @@ static int ap_unreg(char * name, { int i; int ret = 0; - struct reg_name_entry * rne = NULL; - struct list_head * pos = NULL; + struct reg_entry * rne = NULL; + struct list_head * pos = NULL; if (name == NULL || len == 0 || difs == NULL || difs[0] == NULL) return -1; pthread_rwlock_rdlock(&instance->state_lock); + + if (instance->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&instance->state_lock); + return -1; + } + pthread_rwlock_wrlock(&instance->reg_lock); if (!hard && strcmp(difs[0], "*") != 0) { @@ -768,7 +1249,7 @@ static int ap_unreg(char * name, } } - reg_name_entry_del_name(rne->name); + registry_del_name(rne->name); pthread_rwlock_unlock(&instance->reg_lock); pthread_rwlock_unlock(&instance->state_lock); @@ -776,69 +1257,62 @@ static int ap_unreg(char * name, return ret; } -static struct port_map_entry * flow_accept(pid_t pid, - char * srv_ap_name, - char ** dst_ae_name) +static struct port_map_entry * flow_accept(pid_t pid, + char * srv_ap_name, + char ** dst_ae_name) { - struct port_map_entry * pme; - struct reg_name_entry * rne = NULL; + struct port_map_entry * pme = NULL; + struct reg_entry * rne = NULL; + struct reg_instance * rgi = NULL; pthread_rwlock_rdlock(&instance->state_lock); - pthread_rwlock_wrlock(&instance->reg_lock); - rne = get_reg_name_entry_by_ap_name(srv_ap_name); - if (rne == NULL) { - pthread_rwlock_unlock(&instance->reg_lock); + if (instance->state != IRMD_RUNNING) { pthread_rwlock_unlock(&instance->state_lock); - LOG_DBGF("AP %s is unknown.", srv_ap_name); return NULL; } - if (rne->api->id == 0) { - rne->api->id = pid; - } else if (rne->api->id != pid) { - pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_unlock(&instance->state_lock); - LOG_DBGF("Can only register one instance."); - LOG_MISSING; - return NULL; - } + pthread_rwlock_wrlock(&instance->reg_lock); - if (rne->accept) { + rne = get_reg_entry_by_ap_name(srv_ap_name); + if (rne == NULL) { pthread_rwlock_unlock(&instance->reg_lock); pthread_rwlock_unlock(&instance->state_lock); - LOG_DBGF("This AP still has a pending accept()."); + LOG_ERR("AP %s is unknown.", srv_ap_name); return NULL; } - rne->accept = true; - rne->flow_arrived = -1; - - pthread_cond_broadcast(&rne->acc_signal); + if (!reg_entry_has_api(rne, pid)) { + rgi = registry_add_ap_instance(rne->name, pid); + if (rgi == NULL) { + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); + LOG_ERR("Failed to register instance %d with %s.", + pid,srv_ap_name); + return NULL; + } + LOG_INFO("New instance (%d) of %s added.", pid, srv_ap_name); + } pthread_rwlock_unlock(&instance->reg_lock); pthread_rwlock_unlock(&instance->state_lock); - pthread_mutex_lock(&rne->acc_lock); - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) &rne->acc_lock); + reg_instance_sleep(rgi); - while (rne->flow_arrived == -1) - pthread_cond_wait(&rne->acc_arr_signal, &rne->acc_lock); - - pthread_cleanup_pop(true); - - pthread_mutex_lock(&rne->acc_lock); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&instance->reg_lock); + pthread_mutex_lock(&rne->state_lock); - /* ap with pending accept being unregistered */ - if (rne->flow_arrived == -2 ) { - pthread_mutex_unlock(&rne->acc_lock); + if (rne->state != REG_NAME_FLOW_ARRIVED) { + pthread_mutex_unlock(&rne->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); return NULL; } - pthread_mutex_unlock(&rne->acc_lock); + pthread_mutex_unlock(&rne->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); - pthread_rwlock_rdlock(&instance->state_lock); pthread_rwlock_rdlock(&instance->flows_lock); pme = get_port_map_entry_n(pid); @@ -849,6 +1323,8 @@ static struct port_map_entry * flow_accept(pid_t pid, return NULL; } + rne->req_ae_name = NULL; + if (dst_ae_name != NULL) *dst_ae_name = rne->req_ae_name; @@ -863,38 +1339,37 @@ static int flow_alloc_resp(pid_t n_pid, int response) { struct port_map_entry * pme = NULL; - struct reg_name_entry * rne = NULL; + struct reg_entry * rne = NULL; int ret = -1; pthread_rwlock_rdlock(&instance->state_lock); - pthread_rwlock_rdlock(&instance->reg_lock); - rne = get_reg_name_entry_by_id(n_pid); + if (instance->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&instance->state_lock); + return -1; + } + + pthread_rwlock_wrlock(&instance->reg_lock); + + rne = get_reg_entry_by_ap_id(n_pid); if (rne == NULL) { pthread_rwlock_unlock(&instance->reg_lock); pthread_rwlock_unlock(&instance->state_lock); return -1; } - /* FIXME: check all instances associated with the name */ - if (!rne->accept) { + if (rne->state != REG_NAME_FLOW_ARRIVED) { pthread_rwlock_unlock(&instance->reg_lock); pthread_rwlock_unlock(&instance->state_lock); - LOG_ERR("No process listening for this name."); + LOG_ERR("Process not listening for this name."); return -1; } - /* - * consider the flow as handled - * once we can handle a list of AP-I's, remove it from the list - */ - - pthread_mutex_lock(&rne->acc_lock); + pthread_mutex_lock(&rne->state_lock); - rne->accept = false; - rne->flow_arrived = -1; + registry_remove_ap_instance(rne->name, n_pid); - pthread_mutex_unlock(&rne->acc_lock); + pthread_mutex_unlock(&rne->state_lock); pthread_rwlock_unlock(&instance->reg_lock); @@ -909,14 +1384,12 @@ static int flow_alloc_resp(pid_t n_pid, } pme->state = FLOW_ALLOCATED; - pthread_rwlock_unlock(&instance->flows_lock); ret = ipcp_flow_alloc_resp(pme->n_1_pid, port_id, pme->n_pid, response); - } pthread_rwlock_unlock(&instance->state_lock); @@ -935,18 +1408,25 @@ static struct port_map_entry * flow_alloc(pid_t pid, /* FIXME: Map qos_spec to qos_cube */ + pthread_rwlock_rdlock(&instance->state_lock); + + if (instance->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&instance->state_lock); + return NULL; + } + pme = port_map_entry_create(); if (pme == NULL) { - LOG_ERR("Failed malloc of port_map_entry."); + pthread_rwlock_unlock(&instance->state_lock); + LOG_ERR("Failed to create port_map_entry."); return NULL; } - pme->n_pid = pid; - pme->state = FLOW_PENDING; + pme->n_pid = pid; + pme->state = FLOW_PENDING; if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0) LOG_WARN("Failed to set timestamp."); - pthread_rwlock_rdlock(&instance->state_lock); pthread_rwlock_rdlock(&instance->reg_lock); if (qos != NULL) @@ -956,7 +1436,7 @@ static struct port_map_entry * flow_alloc(pid_t pid, if (ipcp == NULL) { pthread_rwlock_unlock(&instance->reg_lock); pthread_rwlock_unlock(&instance->state_lock); - LOG_DBG("Unknown DIF name."); + LOG_ERR("Unknown DIF name."); return NULL; } @@ -995,22 +1475,30 @@ static int flow_alloc_res(int port_id) struct port_map_entry * e; pthread_rwlock_rdlock(&instance->state_lock); + + if (instance->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&instance->state_lock); + return -1; + } pthread_rwlock_rdlock(&instance->flows_lock); e = get_port_map_entry(port_id); if (e == NULL) { + LOG_ERR("Could not find port %d.", port_id); pthread_rwlock_unlock(&instance->flows_lock); pthread_rwlock_unlock(&instance->state_lock); return -1; } if (e->state == FLOW_NULL) { + LOG_ERR("Port %d is deprecated.", port_id); pthread_rwlock_unlock(&instance->flows_lock); pthread_rwlock_unlock(&instance->state_lock); return -1; } if (e->state == FLOW_ALLOCATED) { + LOG_ERR("Port %d already allocated.", port_id); pthread_rwlock_unlock(&instance->flows_lock); pthread_rwlock_unlock(&instance->state_lock); return 0; @@ -1092,10 +1580,10 @@ static int flow_dealloc(int port_id) return ret; } -static int auto_execute(char * ap, char ** argv) +static int auto_execute(char ** argv) { pid_t pid; - LOG_INFO("Executing %s.", ap); + LOG_INFO("Executing %s.", argv[0]); pid = fork(); if (pid == -1) { LOG_ERR("Failed to fork"); @@ -1106,90 +1594,117 @@ static int auto_execute(char * ap, char ** argv) return pid; } - execv(ap, argv); + execv(argv[0], argv); - LOG_ERR("Failed to execute."); + LOG_ERR("Failed to execute %s.", argv[0]); exit(EXIT_FAILURE); - return 0; } static struct port_map_entry * flow_req_arr(pid_t pid, char * dst_name, char * ae_name) { - struct reg_name_entry * rne; - struct port_map_entry * pme; - bool acc_wait = true; + struct reg_entry * rne = NULL; + struct port_map_entry * pme = NULL; - pme = malloc(sizeof(*pme)); + bool acc_wait = true; + + pme = port_map_entry_create(); if (pme == NULL) { - LOG_ERR("Failed malloc of port_map_entry."); + LOG_ERR("Failed to create port_map_entry."); return NULL; } pme->state = FLOW_PENDING; pme->n_1_pid = pid; + if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0) + LOG_WARN("Failed to set timestamp."); pthread_rwlock_rdlock(&instance->state_lock); pthread_rwlock_rdlock(&instance->reg_lock); - pthread_rwlock_wrlock(&instance->flows_lock); - - pme->port_id = bmp_allocate(instance->port_ids); - rne = get_reg_name_entry_by_name(dst_name); + rne = get_reg_entry_by_name(dst_name); if (rne == NULL) { - pthread_rwlock_unlock(&instance->flows_lock); pthread_rwlock_unlock(&instance->reg_lock); pthread_rwlock_unlock(&instance->state_lock); - LOG_DBGF("Destination name %s unknown.", dst_name); + LOG_ERR("Unknown name: %s.", dst_name); free(pme); return NULL; } - pme->n_pid = rne->api->id; + pthread_mutex_lock(&rne->state_lock); - list_add(&pme->next, &instance->port_map); + switch (rne->state) { + case REG_NAME_IDLE: + pthread_mutex_unlock(&rne->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); + LOG_ERR("No AP's for %s.", dst_name); + free(pme); + return NULL; + case REG_NAME_AUTO_ACCEPT: + rne->state = REG_NAME_AUTO_EXEC; + pthread_mutex_unlock(&rne->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); - pthread_rwlock_unlock(&instance->flows_lock); + if (auto_execute(registry_resolve_auto(rne)) < 0) { + free(pme); + return NULL; + } - pthread_mutex_lock(&rne->acc_lock); + pthread_mutex_lock(&rne->state_lock); + pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock, + (void *) &rne->state_lock); - pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock, - (void *) &rne->acc_lock); + while (rne->state == REG_NAME_AUTO_EXEC) + pthread_cond_wait(&rne->acc_signal, + &rne->state_lock); - rne->req_ae_name = ae_name; + pthread_cleanup_pop(true); - if (rne->accept == false) { - if (rne->autoexec) { - pthread_rwlock_wrlock(&instance->flows_lock); - pme->n_pid = auto_execute(rne->api->name, rne->argv); - pthread_rwlock_unlock(&instance->flows_lock); - while (rne->accept == false) - pthread_cond_wait(&rne->acc_signal, - &rne->acc_lock); - } else { - pthread_mutex_unlock(&rne->acc_lock); - LOG_WARN("%s is not accepting flow allocations.", - rne->name); + pthread_rwlock_rdlock(&instance->state_lock); + pthread_rwlock_rdlock(&instance->reg_lock); + pthread_mutex_lock(&rne->state_lock); + case REG_NAME_FLOW_ACCEPT: + pme->n_pid = registry_resolve_api(rne); + if(pme->n_pid == 0) { + pthread_mutex_unlock(&rne->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + pthread_rwlock_unlock(&instance->state_lock); + LOG_ERR("Invalid pid returned."); return NULL; } + pthread_mutex_unlock(&rne->state_lock); + pthread_rwlock_unlock(&instance->reg_lock); + break; + default: + LOG_ERR("IRMd in wrong state."); + break; } - rne->flow_arrived = 0; + pthread_rwlock_wrlock(&instance->flows_lock); + pme->port_id = bmp_allocate(instance->port_ids); - if (pthread_cond_signal(&rne->acc_arr_signal)) - LOG_ERR("Failed to send signal."); + list_add(&pme->next, &instance->port_map); - pthread_cleanup_pop(true); + pthread_rwlock_unlock(&instance->flows_lock); + + rne->req_ae_name = ae_name; + + rne->state = REG_NAME_FLOW_ARRIVED; + + reg_instance_wake(reg_entry_get_reg_instance(rne, pme->n_pid)); + + pthread_mutex_unlock(&rne->state_lock); while (acc_wait) { - pthread_mutex_lock(&rne->acc_lock); - acc_wait = (rne->flow_arrived != -1); - pthread_mutex_unlock(&rne->acc_lock); + pthread_mutex_lock(&rne->state_lock); + acc_wait = (rne->state == REG_NAME_FLOW_ARRIVED); + pthread_mutex_unlock(&rne->state_lock); } - pthread_rwlock_unlock(&instance->reg_lock); pthread_rwlock_unlock(&instance->state_lock); return pme; @@ -1258,26 +1773,30 @@ static void irm_destroy() struct list_head * h; struct list_head * t; - pthread_rwlock_wrlock(&instance->state_lock); - pthread_rwlock_wrlock(&instance->reg_lock); + + pthread_rwlock_rdlock(&instance->state_lock); + + if (instance->state != IRMD_NULL) + LOG_WARN("Unsafe destroy."); if (instance->threadpool != NULL) free(instance->threadpool); + pthread_rwlock_wrlock(&instance->reg_lock); + if (instance->port_ids != NULL) bmp_destroy(instance->port_ids); /* clear the lists */ list_for_each_safe(h, t, &instance->ipcps) { struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next); - destroy_ipcp(e->api); + list_del(&e->next); + ipcp_entry_destroy(e); } - list_for_each_safe(h, t, &instance->reg_names) { - struct reg_name_entry * e = list_entry(h, - struct reg_name_entry, - next); + list_for_each_safe(h, t, &instance->registry) { + struct reg_entry * e = list_entry(h, struct reg_entry, next); list_del(&e->next); - reg_name_entry_destroy(e); + reg_entry_destroy(e); } pthread_rwlock_unlock(&instance->reg_lock); @@ -1290,7 +1809,7 @@ static void irm_destroy() next); list_del(&e->next); - free(e); + port_map_entry_destroy(e); } pthread_rwlock_unlock(&instance->flows_lock); @@ -1316,7 +1835,7 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c) case SIGHUP: pthread_rwlock_wrlock(&instance->state_lock); - + instance->state = IRMD_NULL; pthread_rwlock_unlock(&instance->state_lock); @@ -1343,6 +1862,10 @@ void * irm_flow_cleaner() struct timespec now; struct list_head * pos = NULL; struct list_head * n = NULL; + + struct list_head * pos2 = NULL; + struct list_head * n2 = NULL; + struct timespec timeout = {IRMD_CLEANUP_TIMER / BILLION, IRMD_CLEANUP_TIMER % BILLION}; @@ -1357,11 +1880,12 @@ void * irm_flow_cleaner() list_for_each_safe(pos, n, &(instance->port_map)) { struct port_map_entry * e = list_entry(pos, struct port_map_entry, next); + pthread_mutex_lock(&e->res_lock); if (e->state == FLOW_PENDING && ts_diff_ms(&e->t0, &now) > IRMD_FLOW_TIMEOUT) { - LOG_DBGF("Pending port_id %d timed out.", + LOG_INFO("Pending port_id %d timed out.", e->port_id); e->state = FLOW_NULL; pthread_cond_signal(&e->res_signal); @@ -1375,7 +1899,7 @@ void * irm_flow_cleaner() bmp_release(instance->port_ids, e->port_id); list_del(&e->next); - LOG_DBGF("Process %d gone, %d deallocated.", + LOG_INFO("Process %d gone, %d deallocated.", e->n_pid, e->port_id); ipcp_flow_dealloc(e->n_1_pid, e->port_id); free(e); @@ -1389,6 +1913,29 @@ void * irm_flow_cleaner() } pthread_rwlock_unlock(&instance->flows_lock); + + pthread_rwlock_wrlock(&instance->reg_lock); + + list_for_each_safe(pos, n, &(instance->registry)) { + struct reg_entry * e = + list_entry(pos, struct reg_entry, next); + + list_for_each_safe(pos2, n2, &e->ap_instances) { + struct reg_instance * r = + list_entry(pos2, + struct reg_instance, + next); + if (kill(r->pid, 0) < 0) { + LOG_INFO("Process %d gone, " + "instance deleted.", + r->pid); + registry_remove_ap_instance(e->name, + r->pid); + } + } + } + + pthread_rwlock_unlock(&instance->reg_lock); pthread_rwlock_unlock(&instance->state_lock); nanosleep(&timeout, NULL); @@ -1631,7 +2178,7 @@ static struct irm * irm_create() } INIT_LIST_HEAD(&instance->ipcps); - INIT_LIST_HEAD(&instance->reg_names); + INIT_LIST_HEAD(&instance->registry); INIT_LIST_HEAD(&instance->port_map); instance->port_ids = bmp_create(IRMD_MAX_FLOWS, 0); @@ -1660,6 +2207,8 @@ static struct irm * irm_create() return NULL; } + instance->state = IRMD_RUNNING; + return instance; } |