From cf719963be2e42026012e152ae49f4c764dd9b4f Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Thu, 28 Jul 2016 17:23:42 +0200 Subject: ipcpd: normal: Allow initiating enrollment This will add more functionality for enrolling two normal IPCPs with each other. Some bugs were fixed in CDAP. Now on enrolling, an IPCP will send a START message to the other IPCP. Next step is syncing the RIBs. --- src/ipcpd/ipcp.h | 1 + src/ipcpd/normal/fmgr.c | 42 ++++++++++++++++++++++++--------------- src/ipcpd/normal/fmgr.h | 3 --- src/ipcpd/normal/frct.c | 5 +---- src/ipcpd/normal/main.c | 12 ++++++------ src/ipcpd/normal/ribmgr.c | 50 +++++++++++++++++++++++++++++++++++------------ src/ipcpd/normal/ribmgr.h | 7 +++---- src/lib/cdap.c | 4 ++++ src/lib/shm_ap_rbuff.c | 2 +- 9 files changed, 80 insertions(+), 46 deletions(-) (limited to 'src') diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index bbf1d1f7..630f7922 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -33,6 +33,7 @@ enum ipcp_state { IPCP_INIT = 0, + IPCP_PENDING_ENROLL, IPCP_ENROLLED, IPCP_DISCONNECTED, IPCP_SHUTDOWN diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 4b3c49e6..a539b289 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -35,6 +35,9 @@ #include "fmgr.h" #include "ribmgr.h" #include "frct.h" +#include "ipcp.h" + +extern struct ipcp * _ipcp; struct n_1_flow { int fd; @@ -77,28 +80,37 @@ static void * fmgr_listen(void * o) int fd; char * ae_name; - /* FIXME: Only start to listen once we are enrolled */ + /* FIXME: Avoid busy wait and react to pthread_cond_t */ + pthread_rwlock_rdlock(&_ipcp->state_lock); + while (_ipcp->state != IPCP_ENROLLED || + _ipcp->state != IPCP_SHUTDOWN) { + pthread_rwlock_unlock(&_ipcp->state_lock); + sched_yield(); + pthread_rwlock_rdlock(&_ipcp->state_lock); + } - while (true) { + while (_ipcp->state != IPCP_SHUTDOWN) { + pthread_rwlock_unlock(&_ipcp->state_lock); fd = flow_accept(&ae_name); if (fd < 0) { LOG_ERR("Failed to accept flow."); + pthread_rwlock_rdlock(&_ipcp->state_lock); continue; } - LOG_DBG("New flow alloc request for AE %s", ae_name); - if (!(strcmp(ae_name, MGMT_AE) == 0 || strcmp(ae_name, DT_AE) == 0)) { if (flow_alloc_resp(fd, -1)) LOG_ERR("Failed to reply to flow allocation."); flow_dealloc(fd); + pthread_rwlock_rdlock(&_ipcp->state_lock); continue; } if (flow_alloc_resp(fd, 0)) { LOG_ERR("Failed to reply to flow allocation."); flow_dealloc(fd); + pthread_rwlock_rdlock(&_ipcp->state_lock); continue; } @@ -106,9 +118,10 @@ static void * fmgr_listen(void * o) ae_name); if (strcmp(ae_name, MGMT_AE) == 0) { - if (ribmgr_mgmt_flow(fd)) { + if (ribmgr_add_flow(fd)) { LOG_ERR("Failed to hand fd to RIB."); flow_dealloc(fd); + pthread_rwlock_rdlock(&_ipcp->state_lock); continue; } } @@ -117,6 +130,7 @@ static void * fmgr_listen(void * o) if (frct_dt_flow(fd)) { LOG_ERR("Failed to hand fd to FRCT."); flow_dealloc(fd); + pthread_rwlock_rdlock(&_ipcp->state_lock); continue; } } @@ -124,8 +138,11 @@ static void * fmgr_listen(void * o) if (add_n_1_fd(fd, ae_name)) { LOG_ERR("Failed to add file descriptor to list."); flow_dealloc(fd); + pthread_rwlock_rdlock(&_ipcp->state_lock); continue; } + + pthread_rwlock_rdlock(&_ipcp->state_lock); } return (void *) 0; @@ -134,9 +151,8 @@ static void * fmgr_listen(void * o) int fmgr_init() { fmgr = malloc(sizeof(*fmgr)); - if (fmgr == NULL) { + if (fmgr == NULL) return -1; - } INIT_LIST_HEAD(&fmgr->n_1_flows); @@ -164,7 +180,8 @@ int fmgr_fini() list_entry(pos, struct n_1_flow, next); if (e->ae_name != NULL) free(e->ae_name); - flow_dealloc(e->fd); + if (ribmgr_remove_flow(e->fd)) + LOG_ERR("Failed to remove management flow."); } free(fmgr); @@ -191,7 +208,7 @@ int fmgr_mgmt_flow(char * dst_name) return -1; } - if (ribmgr_mgmt_flow(fd)) { + if (ribmgr_add_flow(fd)) { LOG_ERR("Failed to hand file descriptor to RIB manager"); flow_dealloc(fd); return -1; @@ -239,10 +256,3 @@ int fmgr_flow_dealloc(int port_id) return -1; } - -int fmgr_flow_msg() -{ - LOG_MISSING; - - return -1; -} diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index 867cbff6..dc88bbdf 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -52,7 +52,4 @@ int fmgr_flow_alloc_resp(pid_t n_api, int fmgr_flow_dealloc(int port_id); -/* RIB Manager calls this (param will be of type fmgr_msg_t) */ -int fmgr_flow_msg(); - #endif diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index 49006276..ba465540 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -29,12 +29,10 @@ #include "frct.h" struct frct_i { - }; struct frct { struct dt_const * dtc; - } * frct = NULL; int frct_init(struct dt_const * dtc) @@ -43,9 +41,8 @@ int frct_init(struct dt_const * dtc) return -1; frct = malloc(sizeof(*frct)); - if (frct == NULL) { + if (frct == NULL) return -1; - } frct->dtc = dtc; diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 54ebd674..57fb72df 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -80,14 +80,14 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) pthread_cancel(normal_data(_ipcp)->mainloop); + if (fmgr_fini()) + LOG_ERR("Failed to finalize flow manager."); + if (ribmgr_fini()) LOG_ERR("Failed to finalize RIB manager."); if (frct_fini()) LOG_ERR("Failed to finalize FRCT."); - - if (fmgr_fini()) - LOG_ERR("Failed to finalize flow manager."); } default: return; @@ -138,15 +138,15 @@ static int normal_ipcp_enroll(char * dif_name) return -1; /* -ENOTINIT */ } + pthread_rwlock_unlock(&_ipcp->state_lock); + if (fmgr_mgmt_flow(dif_name)) { pthread_rwlock_unlock(&_ipcp->state_lock); LOG_ERR("Failed to establish management flow."); return -1; } - _ipcp->state = IPCP_ENROLLED; - - pthread_rwlock_unlock(&_ipcp->state_lock); + /* FIXME: Wait until state changed to ENROLLED */ return 0; } diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 4d29b098..8bb320c0 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -34,9 +34,12 @@ #include "ribmgr.h" #include "dt_const.h" #include "frct.h" +#include "ipcp.h" #define ENROLLMENT "enrollment" +extern struct ipcp * _ipcp; + enum cdap_opcode { READ = 0, WRITE, @@ -103,11 +106,11 @@ int cdap_request_add(struct cdap * instance, int ribmgr_init() { rib = malloc(sizeof(*rib)); - if (rib == NULL) { + if (rib == NULL) return -1; - } INIT_LIST_HEAD(&rib->flows); + INIT_LIST_HEAD(&rib->cdap_reqs); if (pthread_rwlock_init(&rib->flows_lock, NULL)) { LOG_ERR("Failed to initialize rwlock."); @@ -141,11 +144,13 @@ int ribmgr_fini() pthread_mutex_unlock(&rib->cdap_reqs_lock); pthread_rwlock_wrlock(&rib->flows_lock); - list_for_each_safe(pos, n, &rib->cdap_reqs) { + list_for_each_safe(pos, n, &rib->flows) { struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); if (cdap_destroy(flow->instance)) LOG_ERR("Failed to destroy CDAP instance."); + list_del(&flow->next); + free(flow); } pthread_rwlock_unlock(&rib->flows_lock); @@ -232,7 +237,7 @@ static struct cdap_ops ribmgr_ops = { .cdap_stop = ribmgr_cdap_stop }; -int ribmgr_mgmt_flow(int fd) +int ribmgr_add_flow(int fd) { struct cdap * instance = NULL; struct mgmt_flow * flow; @@ -253,8 +258,14 @@ int ribmgr_mgmt_flow(int fd) flow->instance = instance; flow->fd = fd; + pthread_rwlock_rdlock(&_ipcp->state_lock); pthread_rwlock_wrlock(&rib->flows_lock); - if (list_empty(&rib->flows)) { + if (list_empty(&rib->flows) && + (_ipcp->state == IPCP_INIT || + _ipcp->state == IPCP_DISCONNECTED)) { + _ipcp->state = IPCP_PENDING_ENROLL; + pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_lock(&rib->cdap_reqs_lock); iid = cdap_send_start(instance, ENROLLMENT); @@ -277,6 +288,7 @@ int ribmgr_mgmt_flow(int fd) } pthread_mutex_unlock(&rib->cdap_reqs_lock); } + pthread_rwlock_unlock(&_ipcp->state_lock); list_add(&flow->next, &rib->flows); pthread_rwlock_unlock(&rib->flows_lock); @@ -284,6 +296,27 @@ int ribmgr_mgmt_flow(int fd) return 0; } +int ribmgr_remove_flow(int fd) +{ + struct list_head * pos, * n = NULL; + + pthread_rwlock_wrlock(&rib->flows_lock); + list_for_each_safe(pos, n, &rib->flows) { + struct mgmt_flow * flow = + list_entry(pos, struct mgmt_flow, next); + if (flow->fd == fd) { + if (cdap_destroy(flow->instance)) + LOG_ERR("Failed to destroy CDAP instance."); + list_del(&flow->next); + free(flow); + return 0; + } + } + pthread_rwlock_unlock(&rib->flows_lock); + + return -1; +} + int ribmgr_bootstrap(struct dif_config * conf) { if (conf == NULL || @@ -310,10 +343,3 @@ int ribmgr_bootstrap(struct dif_config * conf) return 0; } - -int ribmgr_fmgr_msg() -{ - LOG_MISSING; - - return -1; -} diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h index 335189f9..e85c65be 100644 --- a/src/ipcpd/normal/ribmgr.h +++ b/src/ipcpd/normal/ribmgr.h @@ -28,10 +28,9 @@ int ribmgr_init(); int ribmgr_fini(); -int ribmgr_mgmt_flow(int fd); -int ribmgr_bootstrap(struct dif_config * conf); +int ribmgr_add_flow(int fd); +int ribmgr_remove_flow(int fd); -/* Called by Flow Manager (param of type fmgr_msg_t) */ -int ribmgr_fmgr_msg(); +int ribmgr_bootstrap(struct dif_config * conf); #endif diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 441f7e44..4599fd8b 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -212,6 +212,7 @@ struct cdap * cdap_create(struct cdap_ops * ops, } instance->ops = ops; + instance->fd = fd; instance->ids = bmp_create(IDS_SIZE, 0); if (instance->ids == NULL) { @@ -234,6 +235,9 @@ int cdap_destroy(struct cdap * instance) pthread_cancel(instance->reader); + if (flow_dealloc(instance->fd)) + return -1; + pthread_mutex_lock(&instance->ids_lock); bmp_destroy(instance->ids); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 618c4c19..86570d98 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -296,7 +296,7 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) while (tail_el_ptr->port_id < 0) *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); - while(shm_rbuff_empty(rb)) + while (shm_rbuff_empty(rb)) if (pthread_cond_wait(rb->work, rb->shm_mutex) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); -- cgit v1.2.3