summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ipcpd/ipcp.h1
-rw-r--r--src/ipcpd/normal/fmgr.c42
-rw-r--r--src/ipcpd/normal/fmgr.h3
-rw-r--r--src/ipcpd/normal/frct.c5
-rw-r--r--src/ipcpd/normal/main.c12
-rw-r--r--src/ipcpd/normal/ribmgr.c50
-rw-r--r--src/ipcpd/normal/ribmgr.h7
-rw-r--r--src/lib/cdap.c4
-rw-r--r--src/lib/shm_ap_rbuff.c2
9 files changed, 80 insertions, 46 deletions
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index bbf1d1f7..630f7922 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -33,6 +33,7 @@
enum ipcp_state {
IPCP_INIT = 0,
+ IPCP_PENDING_ENROLL,
IPCP_ENROLLED,
IPCP_DISCONNECTED,
IPCP_SHUTDOWN
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 4b3c49e6..a539b289 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -35,6 +35,9 @@
#include "fmgr.h"
#include "ribmgr.h"
#include "frct.h"
+#include "ipcp.h"
+
+extern struct ipcp * _ipcp;
struct n_1_flow {
int fd;
@@ -77,28 +80,37 @@ static void * fmgr_listen(void * o)
int fd;
char * ae_name;
- /* FIXME: Only start to listen once we are enrolled */
+ /* FIXME: Avoid busy wait and react to pthread_cond_t */
+ pthread_rwlock_rdlock(&_ipcp->state_lock);
+ while (_ipcp->state != IPCP_ENROLLED ||
+ _ipcp->state != IPCP_SHUTDOWN) {
+ pthread_rwlock_unlock(&_ipcp->state_lock);
+ sched_yield();
+ pthread_rwlock_rdlock(&_ipcp->state_lock);
+ }
- while (true) {
+ while (_ipcp->state != IPCP_SHUTDOWN) {
+ pthread_rwlock_unlock(&_ipcp->state_lock);
fd = flow_accept(&ae_name);
if (fd < 0) {
LOG_ERR("Failed to accept flow.");
+ pthread_rwlock_rdlock(&_ipcp->state_lock);
continue;
}
- LOG_DBG("New flow alloc request for AE %s", ae_name);
-
if (!(strcmp(ae_name, MGMT_AE) == 0 ||
strcmp(ae_name, DT_AE) == 0)) {
if (flow_alloc_resp(fd, -1))
LOG_ERR("Failed to reply to flow allocation.");
flow_dealloc(fd);
+ pthread_rwlock_rdlock(&_ipcp->state_lock);
continue;
}
if (flow_alloc_resp(fd, 0)) {
LOG_ERR("Failed to reply to flow allocation.");
flow_dealloc(fd);
+ pthread_rwlock_rdlock(&_ipcp->state_lock);
continue;
}
@@ -106,9 +118,10 @@ static void * fmgr_listen(void * o)
ae_name);
if (strcmp(ae_name, MGMT_AE) == 0) {
- if (ribmgr_mgmt_flow(fd)) {
+ if (ribmgr_add_flow(fd)) {
LOG_ERR("Failed to hand fd to RIB.");
flow_dealloc(fd);
+ pthread_rwlock_rdlock(&_ipcp->state_lock);
continue;
}
}
@@ -117,6 +130,7 @@ static void * fmgr_listen(void * o)
if (frct_dt_flow(fd)) {
LOG_ERR("Failed to hand fd to FRCT.");
flow_dealloc(fd);
+ pthread_rwlock_rdlock(&_ipcp->state_lock);
continue;
}
}
@@ -124,8 +138,11 @@ static void * fmgr_listen(void * o)
if (add_n_1_fd(fd, ae_name)) {
LOG_ERR("Failed to add file descriptor to list.");
flow_dealloc(fd);
+ pthread_rwlock_rdlock(&_ipcp->state_lock);
continue;
}
+
+ pthread_rwlock_rdlock(&_ipcp->state_lock);
}
return (void *) 0;
@@ -134,9 +151,8 @@ static void * fmgr_listen(void * o)
int fmgr_init()
{
fmgr = malloc(sizeof(*fmgr));
- if (fmgr == NULL) {
+ if (fmgr == NULL)
return -1;
- }
INIT_LIST_HEAD(&fmgr->n_1_flows);
@@ -164,7 +180,8 @@ int fmgr_fini()
list_entry(pos, struct n_1_flow, next);
if (e->ae_name != NULL)
free(e->ae_name);
- flow_dealloc(e->fd);
+ if (ribmgr_remove_flow(e->fd))
+ LOG_ERR("Failed to remove management flow.");
}
free(fmgr);
@@ -191,7 +208,7 @@ int fmgr_mgmt_flow(char * dst_name)
return -1;
}
- if (ribmgr_mgmt_flow(fd)) {
+ if (ribmgr_add_flow(fd)) {
LOG_ERR("Failed to hand file descriptor to RIB manager");
flow_dealloc(fd);
return -1;
@@ -239,10 +256,3 @@ int fmgr_flow_dealloc(int port_id)
return -1;
}
-
-int fmgr_flow_msg()
-{
- LOG_MISSING;
-
- return -1;
-}
diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h
index 867cbff6..dc88bbdf 100644
--- a/src/ipcpd/normal/fmgr.h
+++ b/src/ipcpd/normal/fmgr.h
@@ -52,7 +52,4 @@ int fmgr_flow_alloc_resp(pid_t n_api,
int fmgr_flow_dealloc(int port_id);
-/* RIB Manager calls this (param will be of type fmgr_msg_t) */
-int fmgr_flow_msg();
-
#endif
diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c
index 49006276..ba465540 100644
--- a/src/ipcpd/normal/frct.c
+++ b/src/ipcpd/normal/frct.c
@@ -29,12 +29,10 @@
#include "frct.h"
struct frct_i {
-
};
struct frct {
struct dt_const * dtc;
-
} * frct = NULL;
int frct_init(struct dt_const * dtc)
@@ -43,9 +41,8 @@ int frct_init(struct dt_const * dtc)
return -1;
frct = malloc(sizeof(*frct));
- if (frct == NULL) {
+ if (frct == NULL)
return -1;
- }
frct->dtc = dtc;
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index 54ebd674..57fb72df 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -80,14 +80,14 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
pthread_cancel(normal_data(_ipcp)->mainloop);
+ if (fmgr_fini())
+ LOG_ERR("Failed to finalize flow manager.");
+
if (ribmgr_fini())
LOG_ERR("Failed to finalize RIB manager.");
if (frct_fini())
LOG_ERR("Failed to finalize FRCT.");
-
- if (fmgr_fini())
- LOG_ERR("Failed to finalize flow manager.");
}
default:
return;
@@ -138,15 +138,15 @@ static int normal_ipcp_enroll(char * dif_name)
return -1; /* -ENOTINIT */
}
+ pthread_rwlock_unlock(&_ipcp->state_lock);
+
if (fmgr_mgmt_flow(dif_name)) {
pthread_rwlock_unlock(&_ipcp->state_lock);
LOG_ERR("Failed to establish management flow.");
return -1;
}
- _ipcp->state = IPCP_ENROLLED;
-
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ /* FIXME: Wait until state changed to ENROLLED */
return 0;
}
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index 4d29b098..8bb320c0 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -34,9 +34,12 @@
#include "ribmgr.h"
#include "dt_const.h"
#include "frct.h"
+#include "ipcp.h"
#define ENROLLMENT "enrollment"
+extern struct ipcp * _ipcp;
+
enum cdap_opcode {
READ = 0,
WRITE,
@@ -103,11 +106,11 @@ int cdap_request_add(struct cdap * instance,
int ribmgr_init()
{
rib = malloc(sizeof(*rib));
- if (rib == NULL) {
+ if (rib == NULL)
return -1;
- }
INIT_LIST_HEAD(&rib->flows);
+ INIT_LIST_HEAD(&rib->cdap_reqs);
if (pthread_rwlock_init(&rib->flows_lock, NULL)) {
LOG_ERR("Failed to initialize rwlock.");
@@ -141,11 +144,13 @@ int ribmgr_fini()
pthread_mutex_unlock(&rib->cdap_reqs_lock);
pthread_rwlock_wrlock(&rib->flows_lock);
- list_for_each_safe(pos, n, &rib->cdap_reqs) {
+ list_for_each_safe(pos, n, &rib->flows) {
struct mgmt_flow * flow =
list_entry(pos, struct mgmt_flow, next);
if (cdap_destroy(flow->instance))
LOG_ERR("Failed to destroy CDAP instance.");
+ list_del(&flow->next);
+ free(flow);
}
pthread_rwlock_unlock(&rib->flows_lock);
@@ -232,7 +237,7 @@ static struct cdap_ops ribmgr_ops = {
.cdap_stop = ribmgr_cdap_stop
};
-int ribmgr_mgmt_flow(int fd)
+int ribmgr_add_flow(int fd)
{
struct cdap * instance = NULL;
struct mgmt_flow * flow;
@@ -253,8 +258,14 @@ int ribmgr_mgmt_flow(int fd)
flow->instance = instance;
flow->fd = fd;
+ pthread_rwlock_rdlock(&_ipcp->state_lock);
pthread_rwlock_wrlock(&rib->flows_lock);
- if (list_empty(&rib->flows)) {
+ if (list_empty(&rib->flows) &&
+ (_ipcp->state == IPCP_INIT ||
+ _ipcp->state == IPCP_DISCONNECTED)) {
+ _ipcp->state = IPCP_PENDING_ENROLL;
+ pthread_rwlock_unlock(&_ipcp->state_lock);
+
pthread_mutex_lock(&rib->cdap_reqs_lock);
iid = cdap_send_start(instance,
ENROLLMENT);
@@ -277,6 +288,7 @@ int ribmgr_mgmt_flow(int fd)
}
pthread_mutex_unlock(&rib->cdap_reqs_lock);
}
+ pthread_rwlock_unlock(&_ipcp->state_lock);
list_add(&flow->next, &rib->flows);
pthread_rwlock_unlock(&rib->flows_lock);
@@ -284,6 +296,27 @@ int ribmgr_mgmt_flow(int fd)
return 0;
}
+int ribmgr_remove_flow(int fd)
+{
+ struct list_head * pos, * n = NULL;
+
+ pthread_rwlock_wrlock(&rib->flows_lock);
+ list_for_each_safe(pos, n, &rib->flows) {
+ struct mgmt_flow * flow =
+ list_entry(pos, struct mgmt_flow, next);
+ if (flow->fd == fd) {
+ if (cdap_destroy(flow->instance))
+ LOG_ERR("Failed to destroy CDAP instance.");
+ list_del(&flow->next);
+ free(flow);
+ return 0;
+ }
+ }
+ pthread_rwlock_unlock(&rib->flows_lock);
+
+ return -1;
+}
+
int ribmgr_bootstrap(struct dif_config * conf)
{
if (conf == NULL ||
@@ -310,10 +343,3 @@ int ribmgr_bootstrap(struct dif_config * conf)
return 0;
}
-
-int ribmgr_fmgr_msg()
-{
- LOG_MISSING;
-
- return -1;
-}
diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h
index 335189f9..e85c65be 100644
--- a/src/ipcpd/normal/ribmgr.h
+++ b/src/ipcpd/normal/ribmgr.h
@@ -28,10 +28,9 @@
int ribmgr_init();
int ribmgr_fini();
-int ribmgr_mgmt_flow(int fd);
-int ribmgr_bootstrap(struct dif_config * conf);
+int ribmgr_add_flow(int fd);
+int ribmgr_remove_flow(int fd);
-/* Called by Flow Manager (param of type fmgr_msg_t) */
-int ribmgr_fmgr_msg();
+int ribmgr_bootstrap(struct dif_config * conf);
#endif
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index 441f7e44..4599fd8b 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -212,6 +212,7 @@ struct cdap * cdap_create(struct cdap_ops * ops,
}
instance->ops = ops;
+ instance->fd = fd;
instance->ids = bmp_create(IDS_SIZE, 0);
if (instance->ids == NULL) {
@@ -234,6 +235,9 @@ int cdap_destroy(struct cdap * instance)
pthread_cancel(instance->reader);
+ if (flow_dealloc(instance->fd))
+ return -1;
+
pthread_mutex_lock(&instance->ids_lock);
bmp_destroy(instance->ids);
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 618c4c19..86570d98 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -296,7 +296,7 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
while (tail_el_ptr->port_id < 0)
*rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
- while(shm_rbuff_empty(rb))
+ while (shm_rbuff_empty(rb))
if (pthread_cond_wait(rb->work, rb->shm_mutex)
== EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");