diff options
Diffstat (limited to 'src/irmd')
| -rw-r--r-- | src/irmd/main.c | 764 | 
1 files changed, 574 insertions, 190 deletions
| diff --git a/src/irmd/main.c b/src/irmd/main.c index 67254feb..a6403612 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -34,6 +34,7 @@  #include <ouroboros/utils.h>  #include <ouroboros/dif_config.h>  #include <ouroboros/shm_du_map.h> +#include <ouroboros/bitmap.h>  #include <sys/socket.h>  #include <sys/un.h> @@ -42,41 +43,146 @@  #include <errno.h>  #include <string.h>  #include <limits.h> +#include <pthread.h>  /* FIXME: this smells like part of namespace management */  #define ALL_DIFS "*" +#ifndef IRMD_MAX_FLOWS +  #define IRMD_MAX_FLOWS 4096 +#endif + +#ifndef IRMD_THREADPOOL_SIZE +  #define IRMD_THREADPOOL_SIZE 3 +#endif + + + +enum flow_state { +        FLOW_NULL = 0, +        FLOW_PENDING, +        FLOW_ALLOCATED +}; +  struct ipcp_entry {          struct list_head  next;          instance_name_t * api;          char *            dif_name; + +        pthread_mutex_t   lock;  }; -/* currently supports only registering whatevercast groups of a single AP */ +/* currently supports only registering whatevercast groups of a single AP-I */  struct reg_name_entry {          struct list_head next;          /* generic whatevercast name */          char *             name; -        /* FIXME: resolve name instead */ +        /* FIXME: make a list resolve to AP-I instead */          instance_name_t  * api; -        uint32_t           reg_ap_id; + +        bool   accept; +        char * req_ap_name; +        char * req_ae_name; +        bool   flow_arrived; + +        pthread_mutex_t fa_lock; +}; + +/* keeps track of port_id's between N and N - 1 */ +struct port_map_entry { +        struct list_head next; + +        uint32_t port_id; + +        pid_t    n_pid; +        pid_t    n_1_pid; + +        enum flow_state state;  };  struct irm { -        /* FIXME: list of ipcps can be merged with registered names */ +        /* FIXME: list of ipcps could be merged with registered names */          struct list_head ipcps;          struct list_head reg_names; +        int sockfd; + +        /* keep track of all flows in this processing system */ +        struct bmp * port_ids; + +        /* maps port_ids to pid pair */ +        struct list_head port_map; +          struct shm_du_map * dum; -}; -struct irm * instance = NULL; +        pthread_t * threadpool; + +        pthread_mutex_t lock; +} * instance = NULL; -static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api) +static struct port_map_entry * get_port_map_entry(uint32_t port_id) +{ +        struct list_head * pos = NULL; + +        list_for_each(pos, &instance->port_map) { +                struct port_map_entry * e = +                        list_entry(pos, struct port_map_entry, next); + +                if (e->port_id == port_id) +                        return e; +        } + +        return NULL; +} + +static struct port_map_entry * get_port_map_entry_n(pid_t n_pid) +{ +        struct list_head * pos = NULL; + +        list_for_each(pos, &instance->port_map) { +                struct port_map_entry * e = +                        list_entry(pos, struct port_map_entry, next); + +                if (e->n_pid == n_pid) +                        return e; +        } + +        return NULL; +} + +static struct ipcp_entry * ipcp_entry_create() +{ +        struct ipcp_entry * e = malloc(sizeof(*e)); +        if (e == NULL) +                return NULL; + +        e->api = NULL; +        e->dif_name = NULL; + +        INIT_LIST_HEAD(&e->next); +        pthread_mutex_init(&e->lock, NULL); + +        return e; +} + +static void ipcp_entry_destroy(struct ipcp_entry * e) +{ +        if (e == NULL) +                return; + +        if (e->api != NULL) +                instance_name_destroy(e->api); + +        if (e->dif_name != NULL) +                free(e->dif_name); + +        free(e); +} + +static struct ipcp_entry * get_ipcp_entry_by_name(instance_name_t * api)  { -        struct ipcp_entry * tmp = NULL;          struct list_head * pos = NULL;          list_for_each(pos, &instance->ipcps) { @@ -87,7 +193,7 @@ static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api)                          return tmp;          } -        return tmp; +        return NULL;  }  static instance_name_t * get_ipcp_by_name(char * ap_name) @@ -143,9 +249,14 @@ static struct reg_name_entry * reg_name_entry_create()          if (e == NULL)                  return NULL; -        e->reg_ap_id = rand() % INT_MAX; -        e->name  = NULL; +        e->name         = NULL; +        e->api          = NULL; +        e->accept       = false; +        e->req_ap_name  = NULL; +        e->req_ae_name  = NULL; +        e->flow_arrived = false; +        pthread_mutex_init(&e->fa_lock, NULL);          INIT_LIST_HEAD(&e->next);          return e; @@ -153,7 +264,7 @@ static struct reg_name_entry * reg_name_entry_create()  static struct reg_name_entry * reg_name_entry_init(struct reg_name_entry * e,                                                     char *                  name, -                                                   instance_name_t       * api) +                                                   instance_name_t *       api)  {          if (e == NULL || name == NULL || api == NULL)                  return NULL; @@ -171,10 +282,18 @@ static int reg_name_entry_destroy(struct reg_name_entry * e)          free(e->name);          instance_name_destroy(e->api); + +        if (e->req_ap_name != NULL) +                free(e->req_ap_name); +        if (e->req_ae_name != NULL) +                free(e->req_ae_name); + +        free(e); +          return 0;  } -static struct reg_name_entry * find_reg_name_entry_by_name(char * name) +static struct reg_name_entry * get_reg_name_entry_by_name(char * name)  {          struct list_head * pos = NULL; @@ -189,7 +308,7 @@ static struct reg_name_entry * find_reg_name_entry_by_name(char * name)          return NULL;  } -static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id) +static struct reg_name_entry * get_reg_name_entry_by_id(pid_t pid)  {          struct list_head * pos = NULL; @@ -197,7 +316,7 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id)                  struct reg_name_entry * e =                          list_entry(pos, struct reg_name_entry, next); -                if (reg_ap_id == e->reg_ap_id) +                if (e->api->id == pid)                          return e;          } @@ -207,10 +326,17 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id)  /* FIXME: add only name when we have NSM solved */  static int reg_name_entry_add_name_instance(char * name, instance_name_t * api)  { -        struct reg_name_entry * e = find_reg_name_entry_by_name(name); +        struct reg_name_entry * e = get_reg_name_entry_by_name(name);          if (e == NULL) {                  e = reg_name_entry_create(); -                e = reg_name_entry_init(e, name, api); +                if (e == NULL) +                        return -1; + +                if (reg_name_entry_init(e, name, api) == NULL) { +                        reg_name_entry_destroy(e); +                        return -1; +                } +                  list_add(&e->next, &instance->reg_names);                  return 0;          } @@ -221,7 +347,7 @@ static int reg_name_entry_add_name_instance(char * name, instance_name_t * api)  static int reg_name_entry_del_name(char * name)  { -        struct reg_name_entry * e = find_reg_name_entry_by_name(name); +        struct reg_name_entry * e = get_reg_name_entry_by_name(name);          if (e == NULL)                  return 0; @@ -240,34 +366,38 @@ static pid_t create_ipcp(char *         ap_name,          pid = ipcp_create(ap_name, ipcp_type);          if (pid == -1) { -                LOG_ERR("Failed to create IPCP"); +                LOG_ERR("Failed to create IPCP.");                  return -1;          } -        tmp = malloc(sizeof(*tmp)); -        if (tmp == NULL) { +        tmp = ipcp_entry_create(); +        if (tmp == NULL)                  return -1; -        }          INIT_LIST_HEAD(&tmp->next);          tmp->api = instance_name_create();          if (tmp->api == NULL) { -                free(tmp); +                ipcp_entry_destroy(tmp);                  return -1;          }          if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) {                  instance_name_destroy(tmp->api); -                free(tmp); +                ipcp_entry_destroy(tmp);                  return -1;          }          tmp->dif_name = NULL; -        LOG_DBG("Created IPC process with pid %d", pid); +        pthread_mutex_lock(&instance->lock);          list_add(&tmp->next, &instance->ipcps); + +        pthread_mutex_unlock(&instance->lock); + +        LOG_INFO("Created IPCP %s-%d ", ap_name, pid); +          return pid;  } @@ -276,18 +406,19 @@ static int destroy_ipcp(instance_name_t * api)          struct list_head * pos = NULL;          struct list_head * n = NULL; +        if (api == NULL) +                return 0; +          if (api->id == 0)                  api = get_ipcp_by_name(api->name);          if (api == NULL) {                  LOG_ERR("No such IPCP in the system."); -                return -1; +                return 0;          } -        LOG_DBG("Destroying ipcp %s-%d", api->name, api->id); -          if (ipcp_destroy(api->id)) -                LOG_ERR("Could not destroy IPCP"); +                LOG_ERR("Could not destroy IPCP.");          list_for_each_safe(pos, n, &(instance->ipcps)) {                  struct ipcp_entry * tmp = @@ -295,8 +426,12 @@ static int destroy_ipcp(instance_name_t * api)                  if (instance_name_cmp(api, tmp->api) == 0)                          list_del(&tmp->next); + +                ipcp_entry_destroy(tmp);          } +        LOG_INFO("Destroyed IPCP %s-%d.", api->name, api->id); +          return 0;  } @@ -313,25 +448,28 @@ static int bootstrap_ipcp(instance_name_t *  api,                  return -1;          } -        entry = find_ipcp_entry_by_name(api); +        entry = get_ipcp_entry_by_name(api);          if (entry == NULL) { -                LOG_ERR("No such IPCP"); +                LOG_ERR("No such IPCP.");                  return -1;          }          entry->dif_name = strdup(conf->dif_name);          if (entry->dif_name == NULL) { -                LOG_ERR("Failed to strdup"); +                LOG_ERR("Failed to strdup.");                  return -1;          }          if (ipcp_bootstrap(entry->api->id, conf)) { -                LOG_ERR("Could not bootstrap IPCP"); +                LOG_ERR("Could not bootstrap IPCP.");                  free(entry->dif_name);                  entry->dif_name = NULL;                  return -1;          } +        LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.", +                 api->name, api->id, conf->dif_name); +          return 0;  } @@ -343,21 +481,21 @@ static int enroll_ipcp(instance_name_t  * api,          ssize_t n_1_difs_size = 0;          struct ipcp_entry * entry = NULL; -        entry = find_ipcp_entry_by_name(api); +        entry = get_ipcp_entry_by_name(api);          if (entry == NULL) { -                LOG_ERR("No such IPCP"); +                LOG_ERR("No such IPCP.");                  return -1;          }          entry->dif_name = strdup(dif_name);          if (entry->dif_name == NULL) { -                LOG_ERR("Failed to strdup"); +                LOG_ERR("Failed to strdup.");                  return -1;          }          member = da_resolve_daf(dif_name);          if (member == NULL) { -                LOG_ERR("Could not find a member of that DIF"); +                LOG_ERR("Could not find a member of that DIF.");                  free(entry->dif_name);                  entry->dif_name = NULL;                  return -1; @@ -365,19 +503,22 @@ static int enroll_ipcp(instance_name_t  * api,          n_1_difs_size = da_resolve_dap(member, n_1_difs);          if (n_1_difs_size < 1) { -                LOG_ERR("Could not find N-1 DIFs"); +                LOG_ERR("Could not find N-1 DIFs.");                  free(entry->dif_name);                  entry->dif_name = NULL;                  return -1;          }          if (ipcp_enroll(entry->api->id, member, n_1_difs[0])) { -                LOG_ERR("Could not enroll IPCP"); +                LOG_ERR("Could not enroll IPCP.");                  free(entry->dif_name);                  entry->dif_name = NULL;                  return -1;          } +        LOG_INFO("Enrolled IPCP %s-%d in DIF %s.", +                 api->name, api->id, dif_name); +          return 0;  } @@ -386,7 +527,7 @@ static int reg_ipcp(instance_name_t * api,                      size_t            difs_size)  {          if (ipcp_reg(api->id, difs, difs_size)) { -                LOG_ERR("Could not register IPCP to N-1 DIF(s)"); +                LOG_ERR("Could not register IPCP to N-1 DIF(s).");                  return -1;          } @@ -399,24 +540,23 @@ static int unreg_ipcp(instance_name_t  * api,  {          if (ipcp_unreg(api->id, difs, difs_size)) { -                LOG_ERR("Could not unregister IPCP from N-1 DIF(s)"); +                LOG_ERR("Could not unregister IPCP from N-1 DIF(s).");                  return -1;          }          return 0;  } -static int ap_unreg_id(uint32_t reg_ap_id, -                       pid_t    pid, +static int ap_unreg_id(pid_t    pid,                         char **  difs,                         size_t   len)  {          int i;          int ret = 0; -        struct reg_name_entry * rne    = NULL; -        struct list_head      * pos  = NULL; +        struct reg_name_entry * rne = NULL; +        struct list_head      * pos = NULL; -        rne = find_reg_name_entry_by_id(reg_ap_id); +        rne = get_reg_name_entry_by_id(pid);          if (rne == NULL)                  return 0; /* no such id */ @@ -458,7 +598,6 @@ static int ap_reg(char *  ap_name,  {          int i;          int ret = 0; -        int reg_ap_id = 0;          struct list_head * pos = NULL;          struct reg_name_entry * rne = NULL; @@ -466,18 +605,18 @@ static int ap_reg(char *  ap_name,          instance_name_t * ipcpi = NULL;          if (instance->ipcps.next == NULL) -                LOG_ERR("No IPCPs in this system."); +                return -1;          /* check if this ap_name is already registered */ -        rne = find_reg_name_entry_by_name(ap_name); +        rne = get_reg_name_entry_by_name(ap_name);          if (rne != NULL)                  return -1; /* can only register one instance for now */ -        rne = reg_name_entry_create(); -        if (rne == NULL) +        api = instance_name_create(); +        if (api == NULL) {                  return -1; +        } -        api = instance_name_create();          if (instance_name_init_from(api, ap_name, ap_id) == NULL) {                  instance_name_destroy(api);                  return -1; @@ -488,12 +627,6 @@ static int ap_reg(char *  ap_name,           * contains a single instance only           */ -        if (reg_name_entry_init(rne, strdup(ap_name), api) == NULL) { -                reg_name_entry_destroy(rne); -                instance_name_destroy(api); -                return -1; -        } -          if (strcmp(difs[0], ALL_DIFS) == 0) {                  list_for_each(pos, &instance->ipcps) {                          struct ipcp_entry * e = @@ -528,11 +661,10 @@ static int ap_reg(char *  ap_name,                  return -1;          }          /* for now, we register single instances */ -        reg_name_entry_add_name_instance(strdup(ap_name), -                                         instance_name_dup(api)); -        instance_name_destroy(api); +        ret = reg_name_entry_add_name_instance(strdup(ap_name), +                                               api); -        return reg_ap_id; +        return ret;  }  static int ap_unreg(char *  ap_name, @@ -542,149 +674,304 @@ static int ap_unreg(char *  ap_name,  {          struct reg_name_entry * tmp = NULL; -        instance_name_t * api = instance_name_create(); -        if (api == NULL) -                return -1; - -        if (instance_name_init_from(api, ap_name, ap_id) == NULL) { -                instance_name_destroy(api); -                return -1; -        } -          /* check if ap_name is registered */ -        tmp = find_reg_name_entry_by_name(api->name); -        if (tmp == NULL) { -                instance_name_destroy(api); +        tmp = get_reg_name_entry_by_id(ap_id); +        if (tmp == NULL)                  return 0; -        } else { -                return ap_unreg_id(tmp->reg_ap_id, api->id, difs, len); -        } -} +        if (strcmp(ap_name, tmp->api->name)) +                return 0; -static int flow_accept(int fd, -                       pid_t pid, -                       char * ap_name, -                       char * ae_name) -{ -        return -1; +        return ap_unreg_id(ap_id, difs, len);  } -static int flow_alloc_resp(int fd, -                           int result) +static struct port_map_entry * flow_accept(pid_t    pid, +                                           char **  ap_name, +                                           char **  ae_name)  { -        return -1; +        bool arrived = false; + +        struct timespec         ts = {0, 100000}; + +        struct port_map_entry * pme; +        struct reg_name_entry * rne = get_reg_name_entry_by_id(pid); +        if (rne == NULL) { +                LOG_DBGF("Unregistered AP calling accept()."); +                return NULL; +        } + +        if (rne->accept) { +                LOG_DBGF("This AP still has a pending accept()."); +                return NULL; +        } + +        rne->accept = true; + +        /* FIXME: wait for a thread that runs select() on flow_arrived */ +        while (!arrived) { +                /* FIXME: this needs locking */ +                rne = get_reg_name_entry_by_id(pid); +                if (rne == NULL) +                        return NULL; +                arrived = rne->flow_arrived; +                nanosleep(&ts, NULL); +        } + +        pme = get_port_map_entry_n(pid); +        if (pme == NULL) { +                LOG_ERR("Port_id was not created yet."); +                return NULL; +        } + +        pthread_mutex_lock(&rne->fa_lock); +        *ap_name = rne->req_ap_name; +        if (ae_name != NULL) +                *ae_name = rne->req_ae_name; +        pthread_mutex_unlock(&rne->fa_lock); + +        return pme;  } -static int flow_alloc(char * dst_name, -                      char * src_ap_name, -                      char * src_ae_name, -                      struct qos_spec * qos, -                      int oflags) +static int flow_alloc_resp(pid_t     n_pid, +                           uint32_t  port_id, +                           int       response)  { -        int   port_id = 0; -        pid_t pid     = get_ipcp_by_dst_name(dst_name)->id; +        struct reg_name_entry * rne = get_reg_name_entry_by_id(n_pid); +        struct port_map_entry * pme = get_port_map_entry(port_id); -        LOG_DBG("flow alloc received from %s-%s to %s.", -                 src_ap_name, src_ae_name, dst_name); +        if (rne == NULL || pme == NULL) +                return -1; -        return ipcp_flow_alloc(pid, -                               port_id, -                               dst_name, -                               src_ap_name, -                               src_ae_name, -                               qos); +        /* FIXME: check all instances associated with the name */ +        if (!rne->accept) { +                LOG_ERR("No process listening for this name."); +                return -1; +        } + +        /* +         * consider the flow as handled +         * once we can handle a list of AP-I's, remove it from the list +         */ + +        rne->flow_arrived = false; +        rne->accept       = false; + +        if (!response) +                pme->state = FLOW_ALLOCATED; + +        return ipcp_flow_alloc_resp(pme->n_1_pid, +                                    port_id, +                                    pme->n_pid, +                                    response);  } -static int flow_alloc_res(int fd) +static struct port_map_entry * flow_alloc(pid_t  pid, +                                          char * dst_name, +                                          char * src_ap_name, +                                          char * src_ae_name, +                                          struct qos_spec * qos)  { +        struct port_map_entry * e = malloc(sizeof(*e)); +        if (e == NULL) { +                LOG_ERR("Failed malloc of port_map_entry."); +                return NULL; +        } -        return -1; +        e->port_id = bmp_allocate(instance->port_ids); +        e->n_pid   = pid; +        e->state   = FLOW_PENDING; +        e->n_1_pid = get_ipcp_by_dst_name(dst_name)->id; + +        list_add(&e->next, &instance->port_map); + +        if (ipcp_flow_alloc(get_ipcp_by_dst_name(dst_name)->id, +                            e->port_id, +                            e->n_pid, +                            dst_name, +                            src_ap_name, +                            src_ae_name, +                            qos) < 0) { +                list_del(&e->next); +                bmp_release(instance->port_ids, e->port_id); +                free(e); +                return NULL; +        } + +        return e;  } -static int flow_dealloc(int fd) +static int flow_alloc_res(uint32_t port_id)  { -        return -1; +        bool allocated = false; +        struct port_map_entry * e; +        struct timespec ts = {0,100000}; + +        while (!allocated) { +                /* FIXME: this needs locking */ +                e = get_port_map_entry(port_id); +                if (e == NULL) { +                        LOG_DBGF("Could not locate port_id %u", port_id); +                        return -1; +                } +                if (e->state == FLOW_ALLOCATED) +                        allocated = true; +                nanosleep(&ts, NULL); +        } + +        return 0;  } -static int flow_cntl(int fd, -                     int oflags) +static int flow_dealloc(uint32_t port_id)  { -        return -1; +        pid_t    n_1_pid; + +        struct port_map_entry * e = get_port_map_entry(port_id); +        if (e == NULL) +                return 0; + +        n_1_pid = e->n_1_pid; + +        list_del(&e->next); +        free(e); + +        return ipcp_flow_dealloc(n_1_pid, port_id);  } -static int flow_req_arr(char * dst_name, -                        char * ap_name, -                        char * ae_name) +static struct port_map_entry * flow_req_arr(pid_t  pid, +                                            char * dst_name, +                                            char * ap_name, +                                            char * ae_name)  { -        return -1; +        struct reg_name_entry * rne; +        struct port_map_entry * pme; + +        rne = get_reg_name_entry_by_name(dst_name); +        if (rne == NULL) { +                LOG_DBGF("Destination name %s unknown.", dst_name); +                return NULL; +        } + +        pme = malloc(sizeof(*pme)); +        if (pme == NULL) { +                LOG_ERR("Failed malloc of port_map_entry."); +                return NULL; +        } + +        pme->port_id = bmp_allocate(instance->port_ids); +        pme->n_pid   = rne->api->id; +        pme->state   = FLOW_PENDING; +        pme->n_1_pid = pid; + +        list_add(&pme->next, &instance->port_map); + +        pthread_mutex_lock(&rne->fa_lock); + +        rne->req_ap_name = strdup(ap_name); +        rne->req_ae_name = strdup(ae_name); + +        rne->flow_arrived = true; + +        pthread_mutex_unlock(&rne->fa_lock); + +        return pme;  }  static int flow_alloc_reply(uint32_t port_id, -                            int      result) +                            int      response)  { -        return -1; +        struct port_map_entry * e; + +        /* FIXME: do this under lock */ +        if (!response) { +                e = get_port_map_entry(port_id); +                if (e == NULL) +                        return -1; +                e->state = FLOW_ALLOCATED; +        } + +        /* FIXME: does this need to be propagated to the IPCP? */ + +        return 0;  }  static int flow_dealloc_ipcp(uint32_t port_id)  { -        return -1; +        struct port_map_entry * e = get_port_map_entry(port_id); +        if (e == NULL) +                return 0; + +        list_del(&e->next); +        free(e); + +        return 0; +} + +static void irm_destroy(struct irm * irm) +{ +        struct list_head * h; +        struct list_head * t; + +        if (irm == NULL) +                return; + +        if (irm->threadpool != NULL) +                free(irm->threadpool); + +        if (irm->port_ids != NULL) +                bmp_destroy(irm->port_ids); +        /* clear the lists */ +        list_for_each_safe(h, t, &irm->ipcps) { +                struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next); +                destroy_ipcp(e->api); +        } + +        list_for_each_safe(h, t, &irm->reg_names) { +                struct reg_name_entry * e = list_entry(h, +                                                       struct reg_name_entry, +                                                       next); +                char * difs [1] = {ALL_DIFS}; +                ap_unreg_id(e->api->id, difs, 1); +        } + +        list_for_each_safe(h, t, &irm->port_map) { +                struct port_map_entry * e = list_entry(h, +                                                       struct port_map_entry, +                                                       next); +                list_del(&e->next); +                free(e); +        } + +        if (irm->dum != NULL) +                shm_du_map_destroy(irm->dum); + +        close(irm->sockfd); +        free(irm);  }  void irmd_sig_handler(int sig, siginfo_t * info, void * c)  { +        int i; +          switch(sig) {          case SIGINT:          case SIGTERM:          case SIGHUP: -                shm_du_map_close(instance->dum); -                free(instance); -                exit(0); +                if (instance->threadpool != NULL) { +                        for (i = 0; i < IRMD_THREADPOOL_SIZE; i++) +                                pthread_cancel(instance->threadpool[i]); +                } + +        case SIGPIPE: +                LOG_DBG("Ignoring SIGPIPE.");          default:                  return;          }  } -int main() +void * mainloop()  { -        int     sockfd;          uint8_t buf[IRM_MSG_BUF_SIZE]; -        struct sigaction sig_act; - -        /* init sig_act */ -        memset(&sig_act, 0, sizeof sig_act); - -        /* install signal traps */ -        sig_act.sa_sigaction = &irmd_sig_handler; -        sig_act.sa_flags     = SA_SIGINFO; - -        sigaction(SIGINT,  &sig_act, NULL); -        sigaction(SIGTERM, &sig_act, NULL); -        sigaction(SIGHUP,  &sig_act, NULL); - -        if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) -                unlink("/dev/shm/" SHM_DU_MAP_FILENAME); - -        instance = malloc(sizeof(*instance)); -        if (instance == NULL) -                return -1; - -        if ((instance->dum = shm_du_map_create()) == NULL) { -                free(instance); -                return -1; -        } - -        INIT_LIST_HEAD(&instance->ipcps); -        INIT_LIST_HEAD(&instance->reg_names); - -        sockfd = server_socket_open(IRM_SOCK_PATH); -        if (sockfd < 0) { -                shm_du_map_close(instance->dum); -                free(instance); -                return -1; -        } -          while (true) {                  int cli_sockfd;                  irm_msg_t * msg; @@ -692,18 +979,19 @@ int main()                  instance_name_t api;                  buffer_t buffer;                  irm_msg_t ret_msg = IRM_MSG__INIT; +                struct port_map_entry * e = NULL;                  ret_msg.code = IRM_MSG_CODE__IRM_REPLY; -                cli_sockfd = accept(sockfd, 0, 0); +                cli_sockfd = accept(instance->sockfd, 0, 0);                  if (cli_sockfd < 0) { -                        LOG_ERR("Cannot accept new connection"); +                        LOG_ERR("Cannot accept new connection.");                          continue;                  }                  count = read(cli_sockfd, buf, IRM_MSG_BUF_SIZE);                  if (count <= 0) { -                        LOG_ERR("Failed to read from socket"); +                        LOG_ERR("Failed to read from socket.");                          close(cli_sockfd);                          continue;                  } @@ -750,11 +1038,11 @@ int main()                                                      msg->n_dif_name);                          break;                  case IRM_MSG_CODE__IRM_AP_REG: -                        ret_msg.has_fd = true; -                        ret_msg.fd = ap_reg(msg->ap_name, -                                            msg->pid, -                                            msg->dif_name, -                                            msg->n_dif_name); +                        ret_msg.has_result = true; +                        ret_msg.result = ap_reg(msg->ap_name, +                                                msg->pid, +                                                msg->dif_name, +                                                msg->n_dif_name);                          break;                  case IRM_MSG_CODE__IRM_AP_UNREG:                          ret_msg.has_result = true; @@ -764,43 +1052,57 @@ int main()                                                    msg->n_dif_name);                          break;                  case IRM_MSG_CODE__IRM_FLOW_ACCEPT: -                        ret_msg.has_fd = true; -                        ret_msg.fd = flow_accept(msg->fd, -                                                 msg->pid, -                                                 ret_msg.ap_name, -                                                 ret_msg.ae_name); +                        e = flow_accept(msg->pid, +                                        &ret_msg.ap_name, +                                        &ret_msg.ae_name); +                        if (e == NULL) +                                break; + +                        ret_msg.has_port_id = true; +                        ret_msg.port_id     = e->port_id; +                        ret_msg.has_pid     = true; +                        ret_msg.pid         = e->n_1_pid;                          break;                  case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP:                          ret_msg.has_result = true; -                        ret_msg.result = flow_alloc_resp(msg->fd, -                                                         msg->result); +                        ret_msg.result = flow_alloc_resp(msg->pid, +                                                         msg->port_id, +                                                         msg->response);                          break;                  case IRM_MSG_CODE__IRM_FLOW_ALLOC: -                        ret_msg.has_fd = true; -                        ret_msg.fd = flow_alloc(msg->dst_name, -                                                msg->ap_name, -                                                msg->ae_name, -                                                NULL, -                                                msg->oflags); +                        e = flow_alloc(msg->pid, +                                       msg->dst_name, +                                       msg->ap_name, +                                       msg->ae_name, +                                       NULL); +                        if (e == NULL) +                                break; + +                        ret_msg.has_port_id = true; +                        ret_msg.port_id     = e->port_id; +                        ret_msg.has_pid     = true; +                        ret_msg.pid         = e->n_1_pid;                          break;                  case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES: -                        ret_msg.has_response = true; -                        ret_msg.response = flow_alloc_res(msg->fd); -                        break; -                case IRM_MSG_CODE__IRM_FLOW_DEALLOC:                          ret_msg.has_result = true; -                        ret_msg.result = flow_dealloc(msg->fd); +                        ret_msg.result = flow_alloc_res(msg->port_id);                          break; -                case IRM_MSG_CODE__IRM_FLOW_CONTROL: +                case IRM_MSG_CODE__IRM_FLOW_DEALLOC:                          ret_msg.has_result = true; -                        ret_msg.result = flow_cntl(msg->fd, -                                                   msg->oflags); +                        ret_msg.result = flow_dealloc(msg->port_id);                          break;                  case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: +                        e = flow_req_arr(msg->pid, +                                         msg->dst_name, +                                         msg->ap_name, +                                         msg->ae_name); +                        if (e == NULL) +                                break; +                          ret_msg.has_port_id = true; -                        ret_msg.port_id = flow_req_arr(msg->dst_name, -                                                       msg->ap_name, -                                                       msg->ae_name); +                        ret_msg.port_id     = e->port_id; +                        ret_msg.has_pid     = true; +                        ret_msg.pid         = e->n_pid;                          break;                  case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY:                          ret_msg.has_result = true; @@ -812,7 +1114,7 @@ int main()                          ret_msg.result = flow_dealloc_ipcp(msg->port_id);                          break;                  default: -                        LOG_ERR("Don't know that message code"); +                        LOG_ERR("Don't know that message code.");                          break;                  } @@ -820,7 +1122,7 @@ int main()                  buffer.size = irm_msg__get_packed_size(&ret_msg);                  if (buffer.size == 0) { -                        LOG_ERR("Failed to send reply message"); +                        LOG_ERR("Failed to send reply message.");                          close(cli_sockfd);                          continue;                  } @@ -842,6 +1144,88 @@ int main()                  free(buffer.data);                  close(cli_sockfd);          } +} + +static struct irm * irm_create() +{ +        struct irm * i = malloc(sizeof(*i)); +        if (i == NULL) +                return NULL; + +        if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) +                unlink("/dev/shm/" SHM_DU_MAP_FILENAME); + +        i->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); +        if (i->threadpool == NULL) { +                irm_destroy(i); +                return NULL; +        } + +        if ((i->dum = shm_du_map_create()) == NULL) { +                irm_destroy(i); +                return NULL; +        } + +        INIT_LIST_HEAD(&i->ipcps); +        INIT_LIST_HEAD(&i->reg_names); +        INIT_LIST_HEAD(&i->port_map); + +        i->port_ids = bmp_create(IRMD_MAX_FLOWS, 0); +        if (i->port_ids == NULL) { +                irm_destroy(i); +                return NULL; +        } + +        i->sockfd = server_socket_open(IRM_SOCK_PATH); +        if (i->sockfd < 0) { +                irm_destroy(i); +                return NULL; +        } + +        pthread_mutex_init(&i->lock, NULL); + +        return i; +} + +int main() +{ +        struct sigaction sig_act; + +        int t = 0; + +        /* init sig_act */ +        memset(&sig_act, 0, sizeof sig_act); + +        /* install signal traps */ +        sig_act.sa_sigaction = &irmd_sig_handler; +        sig_act.sa_flags     = SA_SIGINFO; + +        if (sigaction(SIGINT,  &sig_act, NULL) < 0) +                exit(1); +        if (sigaction(SIGTERM, &sig_act, NULL) < 0) +                exit(1); +        if (sigaction(SIGHUP,  &sig_act, NULL) < 0) +                exit(1); +        if (sigaction(SIGPIPE, &sig_act, NULL) < 0) +                exit(1); + +        instance = irm_create(); +        if (instance == NULL) +                return 1; + +        /* +         * FIXME: we need a main loop that delegates messages to subthreads in a +         * way that avoids all possible deadlocks for local apps +         */ + +        for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) +                pthread_create(&instance->threadpool[t], NULL, mainloop, NULL); + +        /* wait for (all of them) to return */ +        for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) +                pthread_join(instance->threadpool[t], NULL); + +        irm_destroy(instance);          return 0;  } | 
