summaryrefslogtreecommitdiff
path: root/src/irmd/main.c
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-05-07 16:11:09 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-05-07 16:11:09 +0200
commiteb9f44379d5316e7f7e9311d7a66d2041eca743a (patch)
tree2489605a42bb2c9582c0c4e912c2de0c40512b2a /src/irmd/main.c
parentde8f2015cbd015b1cced366cb12c054be62c23b1 (diff)
downloadouroboros-eb9f44379d5316e7f7e9311d7a66d2041eca743a.tar.gz
ouroboros-eb9f44379d5316e7f7e9311d7a66d2041eca743a.zip
irmd: flow allocation and fast path
This commit has a first implementation of flow allocation (the "slow path") and read/write (the "fast path") for ouroboros. It provides basic but unstable communications over the shared memory. It required a lot of changes all over the stack, and fixes a number of previously undetected issues. This PR still need heavy revision regarding data model, locking and cleanup. lib/dev: modifications to the API. It now uses an ap_init() call to set the AP name and sets the Instance ID to the pid of the process. It also binds the AP to the shared memory and creates tables for mappings in the fast path. A call to ap_fini() releases the resources. lib/shm_ap_rbuff: added ring buffer for data exchange between processes in the fast path. It passes an index in the shm_du_map. lib/shm_du_map: rewrote API to work with calls from dev.c. Garbage collector added. Tests updated to new API. ipcpd/ipcp-data: removed everything related to flows, as these are universal for all ap's and kept in ap_data (dev.c), or similar structs for shim ipcps. shim-udp: added flow allocator and read/write functions and shm elements. irmd: revised data model and structures necessary for flow allocation. tools: echo updated to new dev.h API. messaging system was updated to comply with new flow allocation messages. All exchanges use pid and port_id to bootstrap the fast path.
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r--src/irmd/main.c760
1 files changed, 570 insertions, 190 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 67254feb..75b8506e 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -34,6 +34,7 @@
#include <ouroboros/utils.h>
#include <ouroboros/dif_config.h>
#include <ouroboros/shm_du_map.h>
+#include <ouroboros/bitmap.h>
#include <sys/socket.h>
#include <sys/un.h>
@@ -42,41 +43,146 @@
#include <errno.h>
#include <string.h>
#include <limits.h>
+#include <pthread.h>
/* FIXME: this smells like part of namespace management */
#define ALL_DIFS "*"
+#ifndef IRMD_MAX_FLOWS
+ #define IRMD_MAX_FLOWS 4096
+#endif
+
+#ifndef IRMD_THREADPOOL_SIZE
+ #define IRMD_THREADPOOL_SIZE 10
+#endif
+
+
+
+enum flow_state {
+ FLOW_NULL = 0,
+ FLOW_PENDING,
+ FLOW_ALLOCATED
+};
+
struct ipcp_entry {
struct list_head next;
instance_name_t * api;
char * dif_name;
+
+ pthread_mutex_t lock;
};
-/* currently supports only registering whatevercast groups of a single AP */
+/* currently supports only registering whatevercast groups of a single AP-I */
struct reg_name_entry {
struct list_head next;
/* generic whatevercast name */
char * name;
- /* FIXME: resolve name instead */
+ /* FIXME: make a list resolve to AP-I instead */
instance_name_t * api;
- uint32_t reg_ap_id;
+
+ bool accept;
+ char * req_ap_name;
+ char * req_ae_name;
+ bool flow_arrived;
+
+ pthread_mutex_t fa_lock;
+};
+
+/* keeps track of port_id's between N and N - 1 */
+struct port_map_entry {
+ struct list_head next;
+
+ uint32_t port_id;
+
+ pid_t n_pid;
+ pid_t n_1_pid;
+
+ enum flow_state state;
};
struct irm {
- /* FIXME: list of ipcps can be merged with registered names */
+ /* FIXME: list of ipcps could be merged with registered names */
struct list_head ipcps;
struct list_head reg_names;
+ int sockfd;
+
+ /* keep track of all flows in this processing system */
+ struct bmp * port_ids;
+
+ /* maps port_ids to pid pair */
+ struct list_head port_map;
+
struct shm_du_map * dum;
-};
-struct irm * instance = NULL;
+ pthread_t * threadpool;
+
+ pthread_mutex_t lock;
+} * instance = NULL;
-static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api)
+static struct port_map_entry * get_port_map_entry(uint32_t port_id)
+{
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &instance->port_map) {
+ struct port_map_entry * e =
+ list_entry(pos, struct port_map_entry, next);
+
+ if (e->port_id == port_id)
+ return e;
+ }
+
+ return NULL;
+}
+
+static struct port_map_entry * get_port_map_entry_n(pid_t n_pid)
+{
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &instance->port_map) {
+ struct port_map_entry * e =
+ list_entry(pos, struct port_map_entry, next);
+
+ if (e->n_pid == n_pid)
+ return e;
+ }
+
+ return NULL;
+}
+
+static struct ipcp_entry * ipcp_entry_create()
+{
+ struct ipcp_entry * e = malloc(sizeof(*e));
+ if (e == NULL)
+ return NULL;
+
+ e->api = NULL;
+ e->dif_name = NULL;
+
+ INIT_LIST_HEAD(&e->next);
+ pthread_mutex_init(&e->lock, NULL);
+
+ return e;
+}
+
+static void ipcp_entry_destroy(struct ipcp_entry * e)
+{
+ if (e == NULL)
+ return;
+
+ if (e->api != NULL)
+ instance_name_destroy(e->api);
+
+ if (e->dif_name != NULL)
+ free(e->dif_name);
+
+ free(e);
+}
+
+static struct ipcp_entry * get_ipcp_entry_by_name(instance_name_t * api)
{
- struct ipcp_entry * tmp = NULL;
struct list_head * pos = NULL;
list_for_each(pos, &instance->ipcps) {
@@ -87,7 +193,7 @@ static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api)
return tmp;
}
- return tmp;
+ return NULL;
}
static instance_name_t * get_ipcp_by_name(char * ap_name)
@@ -143,9 +249,14 @@ static struct reg_name_entry * reg_name_entry_create()
if (e == NULL)
return NULL;
- e->reg_ap_id = rand() % INT_MAX;
- e->name = NULL;
+ e->name = NULL;
+ e->api = NULL;
+ e->accept = false;
+ e->req_ap_name = NULL;
+ e->req_ae_name = NULL;
+ e->flow_arrived = false;
+ pthread_mutex_init(&e->fa_lock, NULL);
INIT_LIST_HEAD(&e->next);
return e;
@@ -153,7 +264,7 @@ static struct reg_name_entry * reg_name_entry_create()
static struct reg_name_entry * reg_name_entry_init(struct reg_name_entry * e,
char * name,
- instance_name_t * api)
+ instance_name_t * api)
{
if (e == NULL || name == NULL || api == NULL)
return NULL;
@@ -171,10 +282,18 @@ static int reg_name_entry_destroy(struct reg_name_entry * e)
free(e->name);
instance_name_destroy(e->api);
+
+ if (e->req_ap_name != NULL)
+ free(e->req_ap_name);
+ if (e->req_ae_name != NULL)
+ free(e->req_ae_name);
+
+ free(e);
+
return 0;
}
-static struct reg_name_entry * find_reg_name_entry_by_name(char * name)
+static struct reg_name_entry * get_reg_name_entry_by_name(char * name)
{
struct list_head * pos = NULL;
@@ -189,7 +308,7 @@ static struct reg_name_entry * find_reg_name_entry_by_name(char * name)
return NULL;
}
-static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id)
+static struct reg_name_entry * get_reg_name_entry_by_id(pid_t pid)
{
struct list_head * pos = NULL;
@@ -197,7 +316,7 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id)
struct reg_name_entry * e =
list_entry(pos, struct reg_name_entry, next);
- if (reg_ap_id == e->reg_ap_id)
+ if (e->api->id == pid)
return e;
}
@@ -207,10 +326,17 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id)
/* FIXME: add only name when we have NSM solved */
static int reg_name_entry_add_name_instance(char * name, instance_name_t * api)
{
- struct reg_name_entry * e = find_reg_name_entry_by_name(name);
+ struct reg_name_entry * e = get_reg_name_entry_by_name(name);
if (e == NULL) {
e = reg_name_entry_create();
- e = reg_name_entry_init(e, name, api);
+ if (e == NULL)
+ return -1;
+
+ if (reg_name_entry_init(e, name, api) == NULL) {
+ reg_name_entry_destroy(e);
+ return -1;
+ }
+
list_add(&e->next, &instance->reg_names);
return 0;
}
@@ -221,7 +347,7 @@ static int reg_name_entry_add_name_instance(char * name, instance_name_t * api)
static int reg_name_entry_del_name(char * name)
{
- struct reg_name_entry * e = find_reg_name_entry_by_name(name);
+ struct reg_name_entry * e = get_reg_name_entry_by_name(name);
if (e == NULL)
return 0;
@@ -240,34 +366,38 @@ static pid_t create_ipcp(char * ap_name,
pid = ipcp_create(ap_name, ipcp_type);
if (pid == -1) {
- LOG_ERR("Failed to create IPCP");
+ LOG_ERR("Failed to create IPCP.");
return -1;
}
- tmp = malloc(sizeof(*tmp));
- if (tmp == NULL) {
+ tmp = ipcp_entry_create();
+ if (tmp == NULL)
return -1;
- }
INIT_LIST_HEAD(&tmp->next);
tmp->api = instance_name_create();
if (tmp->api == NULL) {
- free(tmp);
+ ipcp_entry_destroy(tmp);
return -1;
}
if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) {
instance_name_destroy(tmp->api);
- free(tmp);
+ ipcp_entry_destroy(tmp);
return -1;
}
tmp->dif_name = NULL;
- LOG_DBG("Created IPC process with pid %d", pid);
+ pthread_mutex_lock(&instance->lock);
list_add(&tmp->next, &instance->ipcps);
+
+ pthread_mutex_unlock(&instance->lock);
+
+ LOG_INFO("Created IPCP %s-%d ", ap_name, pid);
+
return pid;
}
@@ -276,18 +406,19 @@ static int destroy_ipcp(instance_name_t * api)
struct list_head * pos = NULL;
struct list_head * n = NULL;
+ if (api == NULL)
+ return 0;
+
if (api->id == 0)
api = get_ipcp_by_name(api->name);
if (api == NULL) {
LOG_ERR("No such IPCP in the system.");
- return -1;
+ return 0;
}
- LOG_DBG("Destroying ipcp %s-%d", api->name, api->id);
-
if (ipcp_destroy(api->id))
- LOG_ERR("Could not destroy IPCP");
+ LOG_ERR("Could not destroy IPCP.");
list_for_each_safe(pos, n, &(instance->ipcps)) {
struct ipcp_entry * tmp =
@@ -295,8 +426,12 @@ static int destroy_ipcp(instance_name_t * api)
if (instance_name_cmp(api, tmp->api) == 0)
list_del(&tmp->next);
+
+ ipcp_entry_destroy(tmp);
}
+ LOG_INFO("Destroyed IPCP %s-%d.", api->name, api->id);
+
return 0;
}
@@ -313,25 +448,28 @@ static int bootstrap_ipcp(instance_name_t * api,
return -1;
}
- entry = find_ipcp_entry_by_name(api);
+ entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
- LOG_ERR("No such IPCP");
+ LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(conf->dif_name);
if (entry->dif_name == NULL) {
- LOG_ERR("Failed to strdup");
+ LOG_ERR("Failed to strdup.");
return -1;
}
if (ipcp_bootstrap(entry->api->id, conf)) {
- LOG_ERR("Could not bootstrap IPCP");
+ LOG_ERR("Could not bootstrap IPCP.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
+ LOG_INFO("Bootstrapped IPCP %s-%d. in DIF %s",
+ api->name, api->id, conf->dif_name);
+
return 0;
}
@@ -343,21 +481,21 @@ static int enroll_ipcp(instance_name_t * api,
ssize_t n_1_difs_size = 0;
struct ipcp_entry * entry = NULL;
- entry = find_ipcp_entry_by_name(api);
+ entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
- LOG_ERR("No such IPCP");
+ LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(dif_name);
if (entry->dif_name == NULL) {
- LOG_ERR("Failed to strdup");
+ LOG_ERR("Failed to strdup.");
return -1;
}
member = da_resolve_daf(dif_name);
if (member == NULL) {
- LOG_ERR("Could not find a member of that DIF");
+ LOG_ERR("Could not find a member of that DIF.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
@@ -365,19 +503,22 @@ static int enroll_ipcp(instance_name_t * api,
n_1_difs_size = da_resolve_dap(member, n_1_difs);
if (n_1_difs_size < 1) {
- LOG_ERR("Could not find N-1 DIFs");
+ LOG_ERR("Could not find N-1 DIFs.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
if (ipcp_enroll(entry->api->id, member, n_1_difs[0])) {
- LOG_ERR("Could not enroll IPCP");
+ LOG_ERR("Could not enroll IPCP.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
+ LOG_INFO("Enrolled IPCP %s-%d in DIF %s.",
+ api->name, api->id, dif_name);
+
return 0;
}
@@ -386,7 +527,7 @@ static int reg_ipcp(instance_name_t * api,
size_t difs_size)
{
if (ipcp_reg(api->id, difs, difs_size)) {
- LOG_ERR("Could not register IPCP to N-1 DIF(s)");
+ LOG_ERR("Could not register IPCP to N-1 DIF(s).");
return -1;
}
@@ -399,24 +540,23 @@ static int unreg_ipcp(instance_name_t * api,
{
if (ipcp_unreg(api->id, difs, difs_size)) {
- LOG_ERR("Could not unregister IPCP from N-1 DIF(s)");
+ LOG_ERR("Could not unregister IPCP from N-1 DIF(s).");
return -1;
}
return 0;
}
-static int ap_unreg_id(uint32_t reg_ap_id,
- pid_t pid,
+static int ap_unreg_id(pid_t pid,
char ** difs,
size_t len)
{
int i;
int ret = 0;
- struct reg_name_entry * rne = NULL;
- struct list_head * pos = NULL;
+ struct reg_name_entry * rne = NULL;
+ struct list_head * pos = NULL;
- rne = find_reg_name_entry_by_id(reg_ap_id);
+ rne = get_reg_name_entry_by_id(pid);
if (rne == NULL)
return 0; /* no such id */
@@ -458,7 +598,6 @@ static int ap_reg(char * ap_name,
{
int i;
int ret = 0;
- int reg_ap_id = 0;
struct list_head * pos = NULL;
struct reg_name_entry * rne = NULL;
@@ -466,18 +605,18 @@ static int ap_reg(char * ap_name,
instance_name_t * ipcpi = NULL;
if (instance->ipcps.next == NULL)
- LOG_ERR("No IPCPs in this system.");
+ return -1;
/* check if this ap_name is already registered */
- rne = find_reg_name_entry_by_name(ap_name);
+ rne = get_reg_name_entry_by_name(ap_name);
if (rne != NULL)
return -1; /* can only register one instance for now */
- rne = reg_name_entry_create();
- if (rne == NULL)
+ api = instance_name_create();
+ if (api == NULL) {
return -1;
+ }
- api = instance_name_create();
if (instance_name_init_from(api, ap_name, ap_id) == NULL) {
instance_name_destroy(api);
return -1;
@@ -488,12 +627,6 @@ static int ap_reg(char * ap_name,
* contains a single instance only
*/
- if (reg_name_entry_init(rne, strdup(ap_name), api) == NULL) {
- reg_name_entry_destroy(rne);
- instance_name_destroy(api);
- return -1;
- }
-
if (strcmp(difs[0], ALL_DIFS) == 0) {
list_for_each(pos, &instance->ipcps) {
struct ipcp_entry * e =
@@ -528,11 +661,10 @@ static int ap_reg(char * ap_name,
return -1;
}
/* for now, we register single instances */
- reg_name_entry_add_name_instance(strdup(ap_name),
- instance_name_dup(api));
- instance_name_destroy(api);
+ ret = reg_name_entry_add_name_instance(strdup(ap_name),
+ api);
- return reg_ap_id;
+ return ret;
}
static int ap_unreg(char * ap_name,
@@ -542,149 +674,304 @@ static int ap_unreg(char * ap_name,
{
struct reg_name_entry * tmp = NULL;
- instance_name_t * api = instance_name_create();
- if (api == NULL)
- return -1;
-
- if (instance_name_init_from(api, ap_name, ap_id) == NULL) {
- instance_name_destroy(api);
- return -1;
- }
-
/* check if ap_name is registered */
- tmp = find_reg_name_entry_by_name(api->name);
- if (tmp == NULL) {
- instance_name_destroy(api);
+ tmp = get_reg_name_entry_by_id(ap_id);
+ if (tmp == NULL)
return 0;
- } else {
- return ap_unreg_id(tmp->reg_ap_id, api->id, difs, len);
- }
-}
+ if (strcmp(ap_name, tmp->api->name))
+ return 0;
-static int flow_accept(int fd,
- pid_t pid,
- char * ap_name,
- char * ae_name)
-{
- return -1;
+ return ap_unreg_id(ap_id, difs, len);
}
-static int flow_alloc_resp(int fd,
- int result)
+static struct port_map_entry * flow_accept(pid_t pid,
+ char ** ap_name,
+ char ** ae_name)
{
- return -1;
+ bool arrived = false;
+
+ struct timespec ts = {0, 100000};
+
+ struct port_map_entry * pme;
+ struct reg_name_entry * rne = get_reg_name_entry_by_id(pid);
+ if (rne == NULL) {
+ LOG_DBGF("Unregistered AP calling accept().");
+ return NULL;
+ }
+
+ if (rne->accept) {
+ LOG_DBGF("This AP still has a pending accept().");
+ return NULL;
+ }
+
+ rne->accept = true;
+
+ /* FIXME: wait for a thread that runs select() on flow_arrived */
+ while (!arrived) {
+ /* FIXME: this needs locking */
+ rne = get_reg_name_entry_by_id(pid);
+ if (rne == NULL)
+ return NULL;
+ arrived = rne->flow_arrived;
+ nanosleep(&ts, NULL);
+ }
+
+ pme = get_port_map_entry_n(pid);
+ if (pme == NULL) {
+ LOG_ERR("Port_id was not created yet.");
+ return NULL;
+ }
+
+ pthread_mutex_lock(&rne->fa_lock);
+ *ap_name = rne->req_ap_name;
+ if (ae_name != NULL)
+ *ae_name = rne->req_ae_name;
+ pthread_mutex_unlock(&rne->fa_lock);
+
+ return pme;
}
-static int flow_alloc(char * dst_name,
- char * src_ap_name,
- char * src_ae_name,
- struct qos_spec * qos,
- int oflags)
+static int flow_alloc_resp(pid_t n_pid,
+ uint32_t port_id,
+ int response)
{
- int port_id = 0;
- pid_t pid = get_ipcp_by_dst_name(dst_name)->id;
+ struct reg_name_entry * rne = get_reg_name_entry_by_id(n_pid);
+ struct port_map_entry * pme = get_port_map_entry(port_id);
- LOG_DBG("flow alloc received from %s-%s to %s.",
- src_ap_name, src_ae_name, dst_name);
+ if (rne == NULL || pme == NULL)
+ return -1;
+
+ /* FIXME: check all instances associated with the name */
+ if (!rne->accept) {
+ LOG_ERR("No process listening for this name.");
+ return -1;
+ }
+
+ /*
+ * consider the flow as handled
+ * once we can handle a list of AP-I's, remove it from the list
+ */
+
+ rne->flow_arrived = false;
+ rne->accept = false;
+
+ if (!response)
+ pme->state = FLOW_ALLOCATED;
- return ipcp_flow_alloc(pid,
- port_id,
- dst_name,
- src_ap_name,
- src_ae_name,
- qos);
+ return ipcp_flow_alloc_resp(pme->n_1_pid,
+ port_id,
+ pme->n_pid,
+ response);
}
-static int flow_alloc_res(int fd)
+static struct port_map_entry * flow_alloc(pid_t pid,
+ char * dst_name,
+ char * src_ap_name,
+ char * src_ae_name,
+ struct qos_spec * qos)
{
+ struct port_map_entry * e = malloc(sizeof(*e));
+ if (e == NULL) {
+ LOG_ERR("Failed malloc of port_map_entry.");
+ return NULL;
+ }
- return -1;
+ e->port_id = bmp_allocate(instance->port_ids);
+ e->n_pid = pid;
+ e->state = FLOW_PENDING;
+ e->n_1_pid = get_ipcp_by_dst_name(dst_name)->id;
+
+ list_add(&e->next, &instance->port_map);
+
+ if (ipcp_flow_alloc(get_ipcp_by_dst_name(dst_name)->id,
+ e->port_id,
+ e->n_pid,
+ dst_name,
+ src_ap_name,
+ src_ae_name,
+ qos) < 0) {
+ list_del(&e->next);
+ bmp_release(instance->port_ids, e->port_id);
+ free(e);
+ return NULL;
+ }
+
+ return e;
}
-static int flow_dealloc(int fd)
+static int flow_alloc_res(uint32_t port_id)
{
- return -1;
+ bool allocated = false;
+ struct port_map_entry * e;
+ struct timespec ts = {0,100000};
+
+ while (!allocated) {
+ /* FIXME: this needs locking */
+ e = get_port_map_entry(port_id);
+ if (e == NULL) {
+ LOG_DBGF("Could not locate port_id %u", port_id);
+ return -1;
+ }
+ if (e->state == FLOW_ALLOCATED)
+ allocated = true;
+ nanosleep(&ts, NULL);
+ }
+
+ return 0;
}
-static int flow_cntl(int fd,
- int oflags)
+static int flow_dealloc(uint32_t port_id)
{
- return -1;
+ pid_t n_1_pid;
+
+ struct port_map_entry * e = get_port_map_entry(port_id);
+ if (e == NULL)
+ return 0;
+
+ n_1_pid = e->n_1_pid;
+
+ list_del(&e->next);
+ free(e);
+
+ return ipcp_flow_dealloc(n_1_pid, port_id);
}
-static int flow_req_arr(char * dst_name,
- char * ap_name,
- char * ae_name)
+static struct port_map_entry * flow_req_arr(pid_t pid,
+ char * dst_name,
+ char * ap_name,
+ char * ae_name)
{
- return -1;
+ struct reg_name_entry * rne;
+ struct port_map_entry * pme;
+
+ rne = get_reg_name_entry_by_name(dst_name);
+ if (rne == NULL) {
+ LOG_DBGF("Destination name %s unknown.", dst_name);
+ return NULL;
+ }
+
+ pme = malloc(sizeof(*pme));
+ if (pme == NULL) {
+ LOG_ERR("Failed malloc of port_map_entry.");
+ return NULL;
+ }
+
+ pme->port_id = bmp_allocate(instance->port_ids);
+ pme->n_pid = rne->api->id;
+ pme->state = FLOW_PENDING;
+ pme->n_1_pid = pid;
+
+ list_add(&pme->next, &instance->port_map);
+
+ pthread_mutex_lock(&rne->fa_lock);
+
+ rne->req_ap_name = strdup(ap_name);
+ rne->req_ae_name = strdup(ae_name);
+
+ rne->flow_arrived = true;
+
+ pthread_mutex_unlock(&rne->fa_lock);
+
+ return pme;
}
static int flow_alloc_reply(uint32_t port_id,
- int result)
+ int response)
{
- return -1;
+ struct port_map_entry * e;
+
+ /* FIXME: do this under lock */
+ if (!response) {
+ e = get_port_map_entry(port_id);
+ if (e == NULL)
+ return -1;
+ e->state = FLOW_ALLOCATED;
+ }
+
+ /* FIXME: does this need to be propagated to the IPCP? */
+
+ return 0;
}
static int flow_dealloc_ipcp(uint32_t port_id)
{
- return -1;
+ struct port_map_entry * e = get_port_map_entry(port_id);
+ if (e == NULL)
+ return 0;
+
+ list_del(&e->next);
+ free(e);
+
+ return 0;
+}
+
+static void irm_destroy(struct irm * irm)
+{
+ struct list_head * h;
+ struct list_head * t;
+
+ if (irm == NULL)
+ return;
+
+ if (irm->threadpool != NULL)
+ free(irm->threadpool);
+
+ if (irm->port_ids != NULL)
+ bmp_destroy(irm->port_ids);
+ /* clear the lists */
+ list_for_each_safe(h, t, &irm->ipcps) {
+ struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next);
+ destroy_ipcp(e->api);
+ }
+
+ list_for_each_safe(h, t, &irm->reg_names) {
+ struct reg_name_entry * e = list_entry(h,
+ struct reg_name_entry,
+ next);
+ char * difs [1] = {ALL_DIFS};
+ ap_unreg_id(e->api->id, difs, 1);
+ }
+
+ list_for_each_safe(h, t, &irm->port_map) {
+ struct port_map_entry * e = list_entry(h,
+ struct port_map_entry,
+ next);
+ list_del(&e->next);
+ free(e);
+ }
+
+ if (irm->dum != NULL)
+ shm_du_map_destroy(irm->dum);
+
+ close(irm->sockfd);
+ free(irm);
}
void irmd_sig_handler(int sig, siginfo_t * info, void * c)
{
+ int i;
+
switch(sig) {
case SIGINT:
case SIGTERM:
case SIGHUP:
- shm_du_map_close(instance->dum);
- free(instance);
- exit(0);
+ if (instance->threadpool != NULL) {
+ for (i = 0; i < IRMD_THREADPOOL_SIZE; i++)
+ pthread_cancel(instance->threadpool[i]);
+ }
+
+ case SIGPIPE:
+ LOG_DBG("Ignoring SIGPIPE.");
default:
return;
}
}
-int main()
+void * mainloop()
{
- int sockfd;
uint8_t buf[IRM_MSG_BUF_SIZE];
- struct sigaction sig_act;
-
- /* init sig_act */
- memset(&sig_act, 0, sizeof sig_act);
-
- /* install signal traps */
- sig_act.sa_sigaction = &irmd_sig_handler;
- sig_act.sa_flags = SA_SIGINFO;
-
- sigaction(SIGINT, &sig_act, NULL);
- sigaction(SIGTERM, &sig_act, NULL);
- sigaction(SIGHUP, &sig_act, NULL);
-
- if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1)
- unlink("/dev/shm/" SHM_DU_MAP_FILENAME);
-
- instance = malloc(sizeof(*instance));
- if (instance == NULL)
- return -1;
-
- if ((instance->dum = shm_du_map_create()) == NULL) {
- free(instance);
- return -1;
- }
-
- INIT_LIST_HEAD(&instance->ipcps);
- INIT_LIST_HEAD(&instance->reg_names);
-
- sockfd = server_socket_open(IRM_SOCK_PATH);
- if (sockfd < 0) {
- shm_du_map_close(instance->dum);
- free(instance);
- return -1;
- }
-
while (true) {
int cli_sockfd;
irm_msg_t * msg;
@@ -692,18 +979,19 @@ int main()
instance_name_t api;
buffer_t buffer;
irm_msg_t ret_msg = IRM_MSG__INIT;
+ struct port_map_entry * e = NULL;
ret_msg.code = IRM_MSG_CODE__IRM_REPLY;
- cli_sockfd = accept(sockfd, 0, 0);
+ cli_sockfd = accept(instance->sockfd, 0, 0);
if (cli_sockfd < 0) {
- LOG_ERR("Cannot accept new connection");
+ LOG_ERR("Cannot accept new connection.");
continue;
}
count = read(cli_sockfd, buf, IRM_MSG_BUF_SIZE);
if (count <= 0) {
- LOG_ERR("Failed to read from socket");
+ LOG_ERR("Failed to read from socket.");
close(cli_sockfd);
continue;
}
@@ -750,11 +1038,11 @@ int main()
msg->n_dif_name);
break;
case IRM_MSG_CODE__IRM_AP_REG:
- ret_msg.has_fd = true;
- ret_msg.fd = ap_reg(msg->ap_name,
- msg->pid,
- msg->dif_name,
- msg->n_dif_name);
+ ret_msg.has_result = true;
+ ret_msg.result = ap_reg(msg->ap_name,
+ msg->pid,
+ msg->dif_name,
+ msg->n_dif_name);
break;
case IRM_MSG_CODE__IRM_AP_UNREG:
ret_msg.has_result = true;
@@ -764,43 +1052,57 @@ int main()
msg->n_dif_name);
break;
case IRM_MSG_CODE__IRM_FLOW_ACCEPT:
- ret_msg.has_fd = true;
- ret_msg.fd = flow_accept(msg->fd,
- msg->pid,
- ret_msg.ap_name,
- ret_msg.ae_name);
+ e = flow_accept(msg->pid,
+ &ret_msg.ap_name,
+ &ret_msg.ae_name);
+ if (e == NULL)
+ break;
+
+ ret_msg.has_port_id = true;
+ ret_msg.port_id = e->port_id;
+ ret_msg.has_pid = true;
+ ret_msg.pid = e->n_1_pid;
break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP:
ret_msg.has_result = true;
- ret_msg.result = flow_alloc_resp(msg->fd,
- msg->result);
+ ret_msg.result = flow_alloc_resp(msg->pid,
+ msg->port_id,
+ msg->response);
break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC:
- ret_msg.has_fd = true;
- ret_msg.fd = flow_alloc(msg->dst_name,
- msg->ap_name,
- msg->ae_name,
- NULL,
- msg->oflags);
+ e = flow_alloc(msg->pid,
+ msg->dst_name,
+ msg->ap_name,
+ msg->ae_name,
+ NULL);
+ if (e == NULL)
+ break;
+
+ ret_msg.has_port_id = true;
+ ret_msg.port_id = e->port_id;
+ ret_msg.has_pid = true;
+ ret_msg.pid = e->n_1_pid;
break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES:
- ret_msg.has_response = true;
- ret_msg.response = flow_alloc_res(msg->fd);
- break;
- case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
ret_msg.has_result = true;
- ret_msg.result = flow_dealloc(msg->fd);
+ ret_msg.result = flow_alloc_res(msg->port_id);
break;
- case IRM_MSG_CODE__IRM_FLOW_CONTROL:
+ case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
ret_msg.has_result = true;
- ret_msg.result = flow_cntl(msg->fd,
- msg->oflags);
+ ret_msg.result = flow_dealloc(msg->port_id);
break;
case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:
+ e = flow_req_arr(msg->pid,
+ msg->dst_name,
+ msg->ap_name,
+ msg->ae_name);
+ if (e == NULL)
+ break;
+
ret_msg.has_port_id = true;
- ret_msg.port_id = flow_req_arr(msg->dst_name,
- msg->ap_name,
- msg->ae_name);
+ ret_msg.port_id = e->port_id;
+ ret_msg.has_pid = true;
+ ret_msg.pid = e->n_pid;
break;
case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY:
ret_msg.has_result = true;
@@ -812,7 +1114,7 @@ int main()
ret_msg.result = flow_dealloc_ipcp(msg->port_id);
break;
default:
- LOG_ERR("Don't know that message code");
+ LOG_ERR("Don't know that message code.");
break;
}
@@ -820,7 +1122,7 @@ int main()
buffer.size = irm_msg__get_packed_size(&ret_msg);
if (buffer.size == 0) {
- LOG_ERR("Failed to send reply message");
+ LOG_ERR("Failed to send reply message.");
close(cli_sockfd);
continue;
}
@@ -842,6 +1144,84 @@ int main()
free(buffer.data);
close(cli_sockfd);
}
+}
+
+static struct irm * irm_create()
+{
+ struct irm * i = malloc(sizeof(*i));
+ if (i == NULL)
+ return NULL;
+
+ if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1)
+ unlink("/dev/shm/" SHM_DU_MAP_FILENAME);
+
+ i->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE);
+ if (i->threadpool == NULL) {
+ irm_destroy(i);
+ return NULL;
+ }
+
+ if ((i->dum = shm_du_map_create()) == NULL) {
+ irm_destroy(i);
+ return NULL;
+ }
+
+ INIT_LIST_HEAD(&i->ipcps);
+ INIT_LIST_HEAD(&i->reg_names);
+ INIT_LIST_HEAD(&i->port_map);
+
+ i->port_ids = bmp_create(IRMD_MAX_FLOWS, 0);
+ if (i->port_ids == NULL) {
+ irm_destroy(i);
+ return NULL;
+ }
+
+ i->sockfd = server_socket_open(IRM_SOCK_PATH);
+ if (i->sockfd < 0) {
+ irm_destroy(instance);
+ return NULL;
+ }
+
+ pthread_mutex_init(&i->lock, NULL);
+
+ return i;
+}
+
+int main()
+{
+ struct sigaction sig_act;
+
+ int t = 0;
+
+ /* init sig_act */
+ memset(&sig_act, 0, sizeof sig_act);
+
+ /* install signal traps */
+ sig_act.sa_sigaction = &irmd_sig_handler;
+ sig_act.sa_flags = SA_SIGINFO;
+
+ sigaction(SIGINT, &sig_act, NULL);
+ sigaction(SIGTERM, &sig_act, NULL);
+ sigaction(SIGHUP, &sig_act, NULL);
+ sigaction(SIGPIPE, &sig_act, NULL);
+
+ instance = irm_create();
+ if (instance == NULL)
+ return 1;
+
+ /*
+ * FIXME: we need a main loop that delegates messages to subthreads in a
+ * way that avoids all possible deadlocks for local apps
+ */
+
+ for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
+ pthread_create(&instance->threadpool[t], NULL, mainloop, NULL);
+
+ /* wait for (all of them) to return */
+ for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
+ pthread_join(instance->threadpool[t], NULL);
+
+ irm_destroy(instance);
return 0;
}